Refactoring RbacUtils
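Remove the RbacUtils class and move all of its functionality into
RbacUtilsMixin. Role setup now happens in the mixin's setup_clients(), and
the test_obj argument of override_role() and
override_role_and_validate_list() becomes optional.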
Story: 2002604
Task: 22223
Change-Id: If476be8fd3df78b28669ca940ebeb288af534899
diff --git a/patrole_tempest_plugin/rbac_utils.py b/patrole_tempest_plugin/rbac_utils.py
index a587b72..6cf3c46 100644
--- a/patrole_tempest_plugin/rbac_utils.py
+++ b/patrole_tempest_plugin/rbac_utils.py
@@ -102,8 +102,23 @@
self._validate_func()
-class RbacUtils(object):
- """Utility class responsible for switching ``os_primary`` role.
+class RbacUtilsMixin(object):
+ """Utility mixin responsible for switching ``os_primary`` role.
+
+ Should be used as a mixin class alongside an instance of
+ :py:class:`tempest.test.BaseTestCase` to perform Patrole class setup for a
+ base RBAC class. Child classes should not use this mixin.
+
+ Example::
+
+ class BaseRbacTest(rbac_utils.RbacUtilsMixin, base.BaseV2ComputeTest):
+
+ @classmethod
+ def setup_clients(cls):
+ super(BaseRbacTest, cls).setup_clients()
+
+ cls.hosts_client = cls.os_primary.hosts_client
+ ...
This class is responsible for overriding the value of the primary Tempest
credential's role (i.e. ``os_primary`` role). By doing so, it is possible
@@ -114,15 +129,29 @@
``CONF.patrole.rbac_test_roles``.
"""
- def __init__(self, test_obj):
- """Constructor for ``RbacUtils``.
+ def __init__(self, *args, **kwargs):
+ super(RbacUtilsMixin, self).__init__(*args, **kwargs)
- :param test_obj: An instance of `tempest.test.BaseTestCase`.
- """
- self.admin_role_id = None
- self.rbac_role_ids = None
- self._role_map = None
+ # Shows if override_role was called.
+ self.__override_role_called = False
+ # Shows if exception raised during override_role.
+ self.__override_role_caught_exc = False
+ _admin_role_id = None
+ _rbac_role_ids = None
+ _project_id = None
+ _user_id = None
+ _role_map = None
+ _role_inferences_mapping = None
+
+ admin_roles_client = None
+
+ @property
+ def rbac_utils(self):
+ return self
+
+ @classmethod
+ def setup_clients(cls):
# Intialize the admin roles_client to perform role switching.
admin_mgr = clients.Manager(
credentials.get_configured_admin_credentials())
@@ -132,16 +161,20 @@
raise lib_exc.InvalidConfiguration(
"Patrole role overriding only supports v3 identity API.")
- self.admin_roles_client = admin_roles_client
+ cls.admin_roles_client = admin_roles_client
- self.user_id = test_obj.os_primary.credentials.user_id
- self.project_id = test_obj.os_primary.credentials.tenant_id
- self._role_inferences_mapping = self._prepare_role_inferences_mapping()
+ cls._project_id = cls.os_primary.credentials.tenant_id
+ cls._user_id = cls.os_primary.credentials.user_id
+ cls._role_inferences_mapping = cls._prepare_role_inferences_mapping()
+
+ cls._init_roles()
# Change default role to admin
- self._override_role(test_obj, False)
+ cls._override_role(False)
+ super(RbacUtilsMixin, cls).setup_clients()
- def _prepare_role_inferences_mapping(self):
+ @classmethod
+ def _prepare_role_inferences_mapping(cls):
"""Preparing roles mapping to support role inferences
Making query to `list-all-role-inference-rules`_ keystone API
@@ -186,7 +219,7 @@
res[prior_role] = implies
return res
- raw_data = self.admin_roles_client.list_all_role_inference_rules()
+ raw_data = cls.admin_roles_client.list_all_role_inference_rules()
data = convert_data(raw_data['role_inferences'])
res = {}
for role_id in data:
@@ -207,15 +240,17 @@
"""
res = set(r for r in roles)
for role in res.copy():
- role_id = self._role_map.get(role)
- implied_roles = self._role_inferences_mapping.get(role_id, set())
- role_names = {self._role_map[rid] for rid in implied_roles}
+ role_id = self.__class__._role_map.get(role)
+ implied_roles = self.__class__._role_inferences_mapping.get(
+ role_id, set())
+ role_names = {self.__class__._role_map[rid]
+ for rid in implied_roles}
res.update(role_names)
LOG.debug('All needed roles: %s; Base roles: %s', res, roles)
return list(res)
@contextlib.contextmanager
- def override_role(self, test_obj):
+ def override_role(self, test_obj=None):
"""Override the role used by ``os_primary`` Tempest credentials.
Temporarily change the role used by ``os_primary`` credentials to:
@@ -225,7 +260,6 @@
Automatically switches to admin role after test execution.
- :param test_obj: Instance of ``tempest.test.BaseTestCase``.
:returns: None
.. warning::
@@ -248,8 +282,8 @@
# if the API call above threw an exception, any code below this
# point in the test is not executed.
"""
- test_obj._set_override_role_called()
- self._override_role(test_obj, True)
+ self._set_override_role_called()
+ self._override_role(True)
try:
# Execute the test.
yield
@@ -258,16 +292,16 @@
# for future validation.
exc = sys.exc_info()[0]
if exc is not None:
- test_obj._set_override_role_caught_exc()
+ self._set_override_role_caught_exc()
# This code block is always executed, no matter the result of the
# test. Automatically switch back to the admin role for test clean
# up.
- self._override_role(test_obj, False)
+ self._override_role(False)
- def _override_role(self, test_obj, toggle_rbac_role=False):
+ @classmethod
+ def _override_role(cls, toggle_rbac_role=False):
"""Private helper for overriding ``os_primary`` Tempest credentials.
- :param test_obj: instance of :py:class:`tempest.test.BaseTestCase`
:param toggle_rbac_role: Boolean value that controls the role that
overrides default role of ``os_primary`` credentials.
* If True: role is set to ``[patrole] rbac_test_role``
@@ -277,22 +311,19 @@
roles_already_present = False
try:
- if not all([self.admin_role_id, self.rbac_role_ids]):
- self._get_roles_by_name()
-
- target_roles = (self.rbac_role_ids
- if toggle_rbac_role else [self.admin_role_id])
- roles_already_present = self._list_and_clear_user_roles_on_project(
+ target_roles = (cls._rbac_role_ids
+ if toggle_rbac_role else [cls._admin_role_id])
+ roles_already_present = cls._list_and_clear_user_roles_on_project(
target_roles)
# Do not override roles if `target_role` already exists.
if not roles_already_present:
- self._create_user_role_on_project(target_roles)
+ cls._create_user_role_on_project(target_roles)
except Exception as exp:
with excutils.save_and_reraise_exception():
LOG.exception(exp)
finally:
- auth_providers = test_obj.get_auth_providers()
+ auth_providers = cls.get_auth_providers()
for provider in auth_providers:
provider.clear_auth()
# Fernet tokens are not subsecond aware so sleep to ensure we are
@@ -306,10 +337,11 @@
for provider in auth_providers:
provider.set_auth()
- def _get_roles_by_name(self):
- available_roles = self.admin_roles_client.list_roles()['roles']
- self._role_map = {r['name']: r['id'] for r in available_roles}
- LOG.debug('Available roles: %s', list(self._role_map.keys()))
+ @classmethod
+ def _init_roles(cls):
+ available_roles = cls.admin_roles_client.list_roles()['roles']
+ cls._role_map = {r['name']: r['id'] for r in available_roles}
+ LOG.debug('Available roles: %s', cls._role_map.keys())
rbac_role_ids = []
roles = CONF.patrole.rbac_test_roles
@@ -319,9 +351,9 @@
roles.append(CONF.patrole.rbac_test_role)
for role_name in roles:
- rbac_role_ids.append(self._role_map.get(role_name))
+ rbac_role_ids.append(cls._role_map.get(role_name))
- admin_role_id = self._role_map.get(CONF.identity.admin_role)
+ admin_role_id = cls._role_map.get(CONF.identity.admin_role)
if not all([admin_role_id, all(rbac_role_ids)]):
missing_roles = []
@@ -332,27 +364,28 @@
missing_roles.append(CONF.identity.admin_role)
if not all(rbac_role_ids):
missing_roles += [role_name for role_name in roles
- if not self._role_map.get(role_name)]
+ if role_name not in cls._role_map]
msg += " Following roles were not found: %s." % (
", ".join(missing_roles))
- msg += " Available roles: %s." % ", ".join(list(
- self._role_map.keys()))
+ msg += " Available roles: %s." % ", ".join(cls._role_map)
raise rbac_exceptions.RbacResourceSetupFailed(msg)
- self.admin_role_id = admin_role_id
- self.rbac_role_ids = rbac_role_ids
+ cls._admin_role_id = admin_role_id
+ cls._rbac_role_ids = rbac_role_ids
# Adding backward mapping
- self._role_map.update({v: k for k, v in self._role_map.items()})
+ cls._role_map.update({v: k for k, v in cls._role_map.items()})
- def _create_user_role_on_project(self, role_ids):
+ @classmethod
+ def _create_user_role_on_project(cls, role_ids):
for role_id in role_ids:
- self.admin_roles_client.create_user_role_on_project(
- self.project_id, self.user_id, role_id)
+ cls.admin_roles_client.create_user_role_on_project(
+ cls._project_id, cls._user_id, role_id)
- def _list_and_clear_user_roles_on_project(self, role_ids):
- roles = self.admin_roles_client.list_user_roles_on_project(
- self.project_id, self.user_id)['roles']
+ @classmethod
+ def _list_and_clear_user_roles_on_project(cls, role_ids):
+ roles = cls.admin_roles_client.list_user_roles_on_project(
+ cls._project_id, cls._user_id)['roles']
all_role_ids = [role['id'] for role in roles]
# NOTE(felipemonteiro): We do not use ``role_id in all_role_ids`` here
@@ -364,13 +397,14 @@
return True
for role in roles:
- self.admin_roles_client.delete_role_from_user_on_project(
- self.project_id, self.user_id, role['id'])
+ cls.admin_roles_client.delete_role_from_user_on_project(
+ cls._project_id, cls._user_id, role['id'])
return False
@contextlib.contextmanager
- def override_role_and_validate_list(self, test_obj, admin_resources=None,
+ def override_role_and_validate_list(self, test_obj=None,
+ admin_resources=None,
admin_resource_id=None):
"""Call ``override_role`` and validate RBAC for a list API action.
@@ -400,38 +434,10 @@
policy_id=self.policy_id)["dscp_marking_rules"]
"""
ctx = _ValidateListContext(admin_resources, admin_resource_id)
- with self.override_role(test_obj):
+ with self.override_role():
yield ctx
ctx._validate()
-
-class RbacUtilsMixin(object):
- """Mixin class to be used alongside an instance of
- :py:class:`tempest.test.BaseTestCase`.
-
- Should be used to perform Patrole class setup for a base RBAC class. Child
- classes should not use this mixin.
-
- Example::
-
- class BaseRbacTest(rbac_utils.RbacUtilsMixin, base.BaseV2ComputeTest):
-
- @classmethod
- def skip_checks(cls):
- super(BaseRbacTest, cls).skip_checks()
- cls.skip_rbac_checks()
-
- @classmethod
- def setup_clients(cls):
- super(BaseRbacTest, cls).setup_clients()
- cls.setup_rbac_utils()
- """
-
- # Shows if override_role was called.
- __override_role_called = False
- # Shows if exception raised during override_role.
- __override_role_caught_exc = False
-
@classmethod
def get_auth_providers(cls):
"""Returns list of auth_providers used within test.
@@ -441,10 +447,6 @@
"""
return [cls.os_primary.auth_provider]
- @classmethod
- def setup_rbac_utils(cls):
- cls.rbac_utils = RbacUtils(cls)
-
def _set_override_role_called(self):
"""Helper for tracking whether ``override_role`` was called."""
self.__override_role_called = True