Merge "Switch to enabled version of identity clients"
diff --git a/patrole_tempest_plugin/rbac_policy_parser.py b/patrole_tempest_plugin/rbac_policy_parser.py
index 41871cf..254bb18 100644
--- a/patrole_tempest_plugin/rbac_policy_parser.py
+++ b/patrole_tempest_plugin/rbac_policy_parser.py
@@ -17,17 +17,17 @@
import json
import os
-from oslo_config import cfg
from oslo_log import log as logging
from oslo_policy import policy
import stevedore
from tempest.common import credentials_factory as credentials
+from tempest import config
from patrole_tempest_plugin import rbac_exceptions
from patrole_tempest_plugin.rbac_utils import RbacAuthority
-CONF = cfg.CONF
+CONF = config.CONF
LOG = logging.getLogger(__name__)
@@ -94,8 +94,10 @@
# doing an API call every time.
if not hasattr(cls, 'available_services'):
admin_mgr = credentials.AdminManager()
- services = admin_mgr.identity_services_v3_client.\
- list_services()['services']
+ services_client = (admin_mgr.identity_services_v3_client
+ if CONF.identity_feature_enabled.api_v3
+ else admin_mgr.identity_services_client)
+ services = services_client.list_services()['services']
cls.available_services = [s['name'] for s in services]
if not service or service not in cls.available_services:
diff --git a/patrole_tempest_plugin/rbac_rule_validation.py b/patrole_tempest_plugin/rbac_rule_validation.py
index 0a101d7..51b9d92 100644
--- a/patrole_tempest_plugin/rbac_rule_validation.py
+++ b/patrole_tempest_plugin/rbac_rule_validation.py
@@ -143,6 +143,9 @@
reference nested BaseTestCase attributes
:returns: True if the current RBAC role can perform the policy action else
False
+
+ :raises RbacResourceSetupFailed: if project_id or user_id is missing from
+ the Tempest test object's `auth_provider`
:raises RbacParsingException: if ``CONF.rbac.strict_policy_check`` is
enabled and the ``rule_name`` does not exist in the system
:raises skipException: if ``CONF.rbac.strict_policy_check`` is
diff --git a/patrole_tempest_plugin/rbac_utils.py b/patrole_tempest_plugin/rbac_utils.py
index 9fd8aee..9d7a807 100644
--- a/patrole_tempest_plugin/rbac_utils.py
+++ b/patrole_tempest_plugin/rbac_utils.py
@@ -76,6 +76,15 @@
rbac_role_id = None
def switch_role(self, test_obj, toggle_rbac_role=False):
+ """Switch the role used by `os_primary` Tempest credentials.
+
+ Switch the role used by `os_primary` credentials to:
+ * admin if `toggle_rbac_role` is False
+ * `CONF.rbac.rbac_test_role` if `toggle_rbac_role` is True
+
+ :param test_obj: test object of type tempest.lib.base.BaseTestCase
+ :param toggle_rbac_role: True for the RBAC test role, False for admin
+ """
self.user_id = test_obj.os_primary.credentials.user_id
self.project_id = test_obj.os_primary.credentials.tenant_id
self.token = test_obj.os_primary.auth_provider.get_token()
@@ -92,20 +101,16 @@
else:
self._add_role_to_user(self.admin_role_id)
except Exception as exp:
- LOG.error(exp)
+ LOG.exception(exp)
raise
finally:
- # NOTE(felipemonteiro): These two comments below are copied from
- # tempest.api.identity.v2/v3.test_users.
- #
- # Reset auth again to verify the password restore does work.
- # Clear auth restores the original credentials and deletes
- # cached auth data.
test_obj.os_primary.auth_provider.clear_auth()
- # Fernet tokens are not subsecond aware and Keystone should only be
- # precise to the second. Sleep to ensure we are passing the second
- # boundary before attempting to authenticate. If token is of type
- # uuid, then do not sleep.
+ # Fernet tokens are not subsecond aware so sleep to ensure we are
+ # passing the second boundary before attempting to authenticate.
+ #
+ # FIXME(felipemonteiro): Rather than skipping sleep if the token
+ # is uuid, this should instead be skipped if the token is not
+ # Fernet.
if not uuid_utils.is_uuid_like(self.token):
time.sleep(1)
test_obj.os_primary.auth_provider.set_auth()
@@ -140,6 +145,9 @@
* `switch_role` is not called with a boolean value
* `switch_role` is never called in a test file, except in tearDown
* `switch_role` is called with the same boolean value twice
+
+ If a `skipException` is thrown then this is a legitimate reason why
+ `switch_role` is not called.
"""
if not isinstance(toggle_rbac_role, bool):
raise rbac_exceptions.RbacResourceSetupFailed(
diff --git a/patrole_tempest_plugin/tests/unit/test_rbac_policy_parser.py b/patrole_tempest_plugin/tests/unit/test_rbac_policy_parser.py
index 95e6bff..7ce925a 100644
--- a/patrole_tempest_plugin/tests/unit/test_rbac_policy_parser.py
+++ b/patrole_tempest_plugin/tests/unit/test_rbac_policy_parser.py
@@ -40,10 +40,12 @@
def setUp(self):
super(RbacPolicyTest, self).setUp()
- self.mock_admin_mgr = self.patchobject(
- rbac_policy_parser, 'credentials')
- self.mock_admin_mgr.AdminManager().identity_services_v3_client.\
- list_services.return_value = self.services
+
+ m_creds = self.patchobject(rbac_policy_parser, 'credentials')
+ m_creds.AdminManager().identity_services_client.list_services.\
+ return_value = self.services
+ m_creds.AdminManager().identity_services_v3_client.list_services.\
+ return_value = self.services
current_directory = os.path.dirname(os.path.realpath(__file__))
self.custom_policy_file = os.path.join(current_directory,
@@ -71,6 +73,13 @@
if attr in dir(rbac_policy_parser.RbacPolicyParser):
delattr(rbac_policy_parser.RbacPolicyParser, attr)
+ # TODO(fm577c): Use fixture for setting/clearing CONF.
+ CONF.set_override('api_v3', True, group='identity-feature-enabled')
+ self.addCleanup(CONF.clear_override, 'api_v2',
+ group='identity-feature-enabled')
+ self.addCleanup(CONF.clear_override, 'api_v3',
+ group='identity-feature-enabled')
+
def _get_fake_policy_rule(self, name, rule):
fake_rule = mock.Mock(check=rule)
fake_rule.name = name
@@ -449,13 +458,15 @@
@mock.patch.object(rbac_policy_parser, 'policy', autospec=True)
@mock.patch.object(rbac_policy_parser.RbacPolicyParser, '_get_policy_data',
autospec=True)
+ @mock.patch.object(rbac_policy_parser, 'credentials', autospec=True)
@mock.patch.object(rbac_policy_parser, 'os', autospec=True)
- def test_discover_policy_files_with_many_invalid_one_valid(self, m_os, *a):
+ def test_discover_policy_files_with_many_invalid_one_valid(self, m_os,
+ m_creds, *args):
# Only the 3rd path is valid.
m_os.path.isfile.side_effect = [False, False, True, False]
# Ensure the outer for loop runs only once in `discover_policy_files`.
- self.mock_admin_mgr.AdminManager().identity_services_v3_client.\
+ m_creds.AdminManager().identity_services_v3_client.\
list_services.return_value = {
'services': [{'name': 'test_service'}]}
@@ -490,3 +501,94 @@
self.assertNotIn(
'test_service',
rbac_policy_parser.RbacPolicyParser.policy_files.keys())
+
+ def _test_validate_service(self, v2_services, v3_services,
+ expected_failure=False, expected_services=None):
+ with mock.patch.object(rbac_policy_parser, 'credentials') as m_creds:
+ m_creds.AdminManager().identity_services_client.list_services.\
+ return_value = v2_services
+ m_creds.AdminManager().identity_services_v3_client.list_services.\
+ return_value = v3_services
+
+ test_tenant_id = mock.sentinel.tenant_id
+ test_user_id = mock.sentinel.user_id
+
+ mock_os = self.patchobject(rbac_policy_parser, 'os')
+ mock_os.path.join.return_value = self.admin_policy_file
+
+ if not expected_services:
+ expected_services = [s['name'] for s in self.services['services']]
+
+ # Guarantee a blank slate for this test.
+ if hasattr(rbac_policy_parser.RbacPolicyParser, 'available_services'):
+ delattr(rbac_policy_parser.RbacPolicyParser,
+ 'available_services')
+
+ if expected_failure:
+ policy_parser = None
+
+ expected_exception = 'invalid_service is NOT a valid service'
+ with self.assertRaisesRegex(rbac_exceptions.RbacInvalidService,
+ expected_exception):
+ rbac_policy_parser.RbacPolicyParser(
+ test_tenant_id, test_user_id, "INVALID_SERVICE")
+ else:
+ policy_parser = rbac_policy_parser.RbacPolicyParser(
+ test_tenant_id, test_user_id, "tenant_rbac_policy")
+
+ # Check that the attribute is available at object and class levels.
+ # If initialization failed, only check at class level.
+ if policy_parser:
+ self.assertTrue(hasattr(policy_parser, 'available_services'))
+ self.assertEqual(expected_services,
+ policy_parser.available_services)
+ self.assertTrue(hasattr(rbac_policy_parser.RbacPolicyParser,
+ 'available_services'))
+ self.assertEqual(
+ expected_services,
+ rbac_policy_parser.RbacPolicyParser.available_services)
+
+ def test_validate_service(self):
+ """Positive test case to ensure ``validate_service`` works.
+
+ There are 3 possibilities:
+ 1) Only Identity v2 API enabled.
+ 2) Only Identity v3 API enabled.
+ 3) Both are enabled.
+ """
+ CONF.set_override('api_v2', True, group='identity-feature-enabled')
+ CONF.set_override('api_v3', False, group='identity-feature-enabled')
+ self._test_validate_service(self.services, [], False)
+
+ CONF.set_override('api_v2', False, group='identity-feature-enabled')
+ CONF.set_override('api_v3', True, group='identity-feature-enabled')
+ self._test_validate_service([], self.services, False)
+
+ CONF.set_override('api_v2', True, group='identity-feature-enabled')
+ CONF.set_override('api_v3', True, group='identity-feature-enabled')
+ self._test_validate_service(self.services, self.services, False)
+
+ def test_validate_service_except_invalid_service(self):
+ """Negative test case to ensure ``validate_service`` works.
+
+ There are 4 possibilities:
+ 1) Only Identity v2 API enabled.
+ 2) Only Identity v3 API enabled.
+ 3) Both are enabled.
+ 4) Neither is enabled.
+ """
+ CONF.set_override('api_v2', True, group='identity-feature-enabled')
+ CONF.set_override('api_v3', False, group='identity-feature-enabled')
+ self._test_validate_service(self.services, [], True)
+
+ CONF.set_override('api_v2', False, group='identity-feature-enabled')
+ CONF.set_override('api_v3', True, group='identity-feature-enabled')
+ self._test_validate_service([], self.services, True)
+
+ CONF.set_override('api_v2', True, group='identity-feature-enabled')
+ CONF.set_override('api_v3', True, group='identity-feature-enabled')
+ self._test_validate_service(self.services, self.services, True)
+
+ CONF.set_override('api_v2', False, group='identity-feature-enabled')
+ CONF.set_override('api_v3', False, group='identity-feature-enabled')
+ self._test_validate_service([], [], True, [])