Refactoring RbacUtils
Remove RbacUtils class and move all functionality to RbacUtilsMixin.
Story: 2002604
Task: 22223
Change-Id: If476be8fd3df78b28669ca940ebeb288af534899
diff --git a/patrole_tempest_plugin/rbac_rule_validation.py b/patrole_tempest_plugin/rbac_rule_validation.py
index 5dd4731..8236df9 100644
--- a/patrole_tempest_plugin/rbac_rule_validation.py
+++ b/patrole_tempest_plugin/rbac_rule_validation.py
@@ -345,7 +345,7 @@
roles.append(CONF.patrole.rbac_test_role)
# Adding implied roles
- roles = test_obj.rbac_utils.get_all_needed_roles(roles)
+ roles = test_obj.get_all_needed_roles(roles)
# Test RBAC against custom requirements. Otherwise use oslo.policy.
if CONF.patrole.test_custom_requirements:
diff --git a/patrole_tempest_plugin/rbac_utils.py b/patrole_tempest_plugin/rbac_utils.py
index a587b72..6cf3c46 100644
--- a/patrole_tempest_plugin/rbac_utils.py
+++ b/patrole_tempest_plugin/rbac_utils.py
@@ -102,8 +102,23 @@
self._validate_func()
-class RbacUtils(object):
- """Utility class responsible for switching ``os_primary`` role.
+class RbacUtilsMixin(object):
+ """Utility mixin responsible for switching ``os_primary`` role.
+
+ Should be used as a mixin class alongside an instance of
+ :py:class:`tempest.test.BaseTestCase` to perform Patrole class setup for a
+ base RBAC class. Child classes should not use this mixin.
+
+ Example::
+
+ class BaseRbacTest(rbac_utils.RbacUtilsMixin, base.BaseV2ComputeTest):
+
+ @classmethod
+ def setup_clients(cls):
+ super(BaseRbacTest, cls).setup_clients()
+
+ cls.hosts_client = cls.os_primary.hosts_client
+ ...
This class is responsible for overriding the value of the primary Tempest
credential's role (i.e. ``os_primary`` role). By doing so, it is possible
@@ -114,15 +129,29 @@
``CONF.patrole.rbac_test_roles``.
"""
- def __init__(self, test_obj):
- """Constructor for ``RbacUtils``.
+ def __init__(self, *args, **kwargs):
+ super(RbacUtilsMixin, self).__init__(*args, **kwargs)
- :param test_obj: An instance of `tempest.test.BaseTestCase`.
- """
- self.admin_role_id = None
- self.rbac_role_ids = None
- self._role_map = None
+ # Shows if override_role was called.
+ self.__override_role_called = False
+ # Shows if exception raised during override_role.
+ self.__override_role_caught_exc = False
+ _admin_role_id = None
+ _rbac_role_ids = None
+ _project_id = None
+ _user_id = None
+ _role_map = None
+ _role_inferences_mapping = None
+
+ admin_roles_client = None
+
+ @property
+ def rbac_utils(self):
+ return self
+
+ @classmethod
+ def setup_clients(cls):
# Intialize the admin roles_client to perform role switching.
admin_mgr = clients.Manager(
credentials.get_configured_admin_credentials())
@@ -132,16 +161,20 @@
raise lib_exc.InvalidConfiguration(
"Patrole role overriding only supports v3 identity API.")
- self.admin_roles_client = admin_roles_client
+ cls.admin_roles_client = admin_roles_client
- self.user_id = test_obj.os_primary.credentials.user_id
- self.project_id = test_obj.os_primary.credentials.tenant_id
- self._role_inferences_mapping = self._prepare_role_inferences_mapping()
+ cls._project_id = cls.os_primary.credentials.tenant_id
+ cls._user_id = cls.os_primary.credentials.user_id
+ cls._role_inferences_mapping = cls._prepare_role_inferences_mapping()
+
+ cls._init_roles()
# Change default role to admin
- self._override_role(test_obj, False)
+ cls._override_role(False)
+ super(RbacUtilsMixin, cls).setup_clients()
- def _prepare_role_inferences_mapping(self):
+ @classmethod
+ def _prepare_role_inferences_mapping(cls):
"""Preparing roles mapping to support role inferences
Making query to `list-all-role-inference-rules`_ keystone API
@@ -186,7 +219,7 @@
res[prior_role] = implies
return res
- raw_data = self.admin_roles_client.list_all_role_inference_rules()
+ raw_data = cls.admin_roles_client.list_all_role_inference_rules()
data = convert_data(raw_data['role_inferences'])
res = {}
for role_id in data:
@@ -207,15 +240,17 @@
"""
res = set(r for r in roles)
for role in res.copy():
- role_id = self._role_map.get(role)
- implied_roles = self._role_inferences_mapping.get(role_id, set())
- role_names = {self._role_map[rid] for rid in implied_roles}
+ role_id = self.__class__._role_map.get(role)
+ implied_roles = self.__class__._role_inferences_mapping.get(
+ role_id, set())
+ role_names = {self.__class__._role_map[rid]
+ for rid in implied_roles}
res.update(role_names)
LOG.debug('All needed roles: %s; Base roles: %s', res, roles)
return list(res)
@contextlib.contextmanager
- def override_role(self, test_obj):
+ def override_role(self, test_obj=None):
"""Override the role used by ``os_primary`` Tempest credentials.
Temporarily change the role used by ``os_primary`` credentials to:
@@ -225,7 +260,6 @@
Automatically switches to admin role after test execution.
- :param test_obj: Instance of ``tempest.test.BaseTestCase``.
:returns: None
.. warning::
@@ -248,8 +282,8 @@
# if the API call above threw an exception, any code below this
# point in the test is not executed.
"""
- test_obj._set_override_role_called()
- self._override_role(test_obj, True)
+ self._set_override_role_called()
+ self._override_role(True)
try:
# Execute the test.
yield
@@ -258,16 +292,16 @@
# for future validation.
exc = sys.exc_info()[0]
if exc is not None:
- test_obj._set_override_role_caught_exc()
+ self._set_override_role_caught_exc()
# This code block is always executed, no matter the result of the
# test. Automatically switch back to the admin role for test clean
# up.
- self._override_role(test_obj, False)
+ self._override_role(False)
- def _override_role(self, test_obj, toggle_rbac_role=False):
+ @classmethod
+ def _override_role(cls, toggle_rbac_role=False):
"""Private helper for overriding ``os_primary`` Tempest credentials.
- :param test_obj: instance of :py:class:`tempest.test.BaseTestCase`
:param toggle_rbac_role: Boolean value that controls the role that
overrides default role of ``os_primary`` credentials.
* If True: role is set to ``[patrole] rbac_test_role``
@@ -277,22 +311,19 @@
roles_already_present = False
try:
- if not all([self.admin_role_id, self.rbac_role_ids]):
- self._get_roles_by_name()
-
- target_roles = (self.rbac_role_ids
- if toggle_rbac_role else [self.admin_role_id])
- roles_already_present = self._list_and_clear_user_roles_on_project(
+ target_roles = (cls._rbac_role_ids
+ if toggle_rbac_role else [cls._admin_role_id])
+ roles_already_present = cls._list_and_clear_user_roles_on_project(
target_roles)
# Do not override roles if `target_role` already exists.
if not roles_already_present:
- self._create_user_role_on_project(target_roles)
+ cls._create_user_role_on_project(target_roles)
except Exception as exp:
with excutils.save_and_reraise_exception():
LOG.exception(exp)
finally:
- auth_providers = test_obj.get_auth_providers()
+ auth_providers = cls.get_auth_providers()
for provider in auth_providers:
provider.clear_auth()
# Fernet tokens are not subsecond aware so sleep to ensure we are
@@ -306,10 +337,11 @@
for provider in auth_providers:
provider.set_auth()
- def _get_roles_by_name(self):
- available_roles = self.admin_roles_client.list_roles()['roles']
- self._role_map = {r['name']: r['id'] for r in available_roles}
- LOG.debug('Available roles: %s', list(self._role_map.keys()))
+ @classmethod
+ def _init_roles(cls):
+ available_roles = cls.admin_roles_client.list_roles()['roles']
+ cls._role_map = {r['name']: r['id'] for r in available_roles}
+ LOG.debug('Available roles: %s', cls._role_map.keys())
rbac_role_ids = []
roles = CONF.patrole.rbac_test_roles
@@ -319,9 +351,9 @@
roles.append(CONF.patrole.rbac_test_role)
for role_name in roles:
- rbac_role_ids.append(self._role_map.get(role_name))
+ rbac_role_ids.append(cls._role_map.get(role_name))
- admin_role_id = self._role_map.get(CONF.identity.admin_role)
+ admin_role_id = cls._role_map.get(CONF.identity.admin_role)
if not all([admin_role_id, all(rbac_role_ids)]):
missing_roles = []
@@ -332,27 +364,28 @@
missing_roles.append(CONF.identity.admin_role)
if not all(rbac_role_ids):
missing_roles += [role_name for role_name in roles
- if not self._role_map.get(role_name)]
+ if role_name not in cls._role_map]
msg += " Following roles were not found: %s." % (
", ".join(missing_roles))
- msg += " Available roles: %s." % ", ".join(list(
- self._role_map.keys()))
+ msg += " Available roles: %s." % ", ".join(cls._role_map)
raise rbac_exceptions.RbacResourceSetupFailed(msg)
- self.admin_role_id = admin_role_id
- self.rbac_role_ids = rbac_role_ids
+ cls._admin_role_id = admin_role_id
+ cls._rbac_role_ids = rbac_role_ids
# Adding backward mapping
- self._role_map.update({v: k for k, v in self._role_map.items()})
+ cls._role_map.update({v: k for k, v in cls._role_map.items()})
- def _create_user_role_on_project(self, role_ids):
+ @classmethod
+ def _create_user_role_on_project(cls, role_ids):
for role_id in role_ids:
- self.admin_roles_client.create_user_role_on_project(
- self.project_id, self.user_id, role_id)
+ cls.admin_roles_client.create_user_role_on_project(
+ cls._project_id, cls._user_id, role_id)
- def _list_and_clear_user_roles_on_project(self, role_ids):
- roles = self.admin_roles_client.list_user_roles_on_project(
- self.project_id, self.user_id)['roles']
+ @classmethod
+ def _list_and_clear_user_roles_on_project(cls, role_ids):
+ roles = cls.admin_roles_client.list_user_roles_on_project(
+ cls._project_id, cls._user_id)['roles']
all_role_ids = [role['id'] for role in roles]
# NOTE(felipemonteiro): We do not use ``role_id in all_role_ids`` here
@@ -364,13 +397,14 @@
return True
for role in roles:
- self.admin_roles_client.delete_role_from_user_on_project(
- self.project_id, self.user_id, role['id'])
+ cls.admin_roles_client.delete_role_from_user_on_project(
+ cls._project_id, cls._user_id, role['id'])
return False
@contextlib.contextmanager
- def override_role_and_validate_list(self, test_obj, admin_resources=None,
+ def override_role_and_validate_list(self, test_obj=None,
+ admin_resources=None,
admin_resource_id=None):
"""Call ``override_role`` and validate RBAC for a list API action.
@@ -400,38 +434,10 @@
policy_id=self.policy_id)["dscp_marking_rules"]
"""
ctx = _ValidateListContext(admin_resources, admin_resource_id)
- with self.override_role(test_obj):
+ with self.override_role():
yield ctx
ctx._validate()
-
-class RbacUtilsMixin(object):
- """Mixin class to be used alongside an instance of
- :py:class:`tempest.test.BaseTestCase`.
-
- Should be used to perform Patrole class setup for a base RBAC class. Child
- classes should not use this mixin.
-
- Example::
-
- class BaseRbacTest(rbac_utils.RbacUtilsMixin, base.BaseV2ComputeTest):
-
- @classmethod
- def skip_checks(cls):
- super(BaseRbacTest, cls).skip_checks()
- cls.skip_rbac_checks()
-
- @classmethod
- def setup_clients(cls):
- super(BaseRbacTest, cls).setup_clients()
- cls.setup_rbac_utils()
- """
-
- # Shows if override_role was called.
- __override_role_called = False
- # Shows if exception raised during override_role.
- __override_role_caught_exc = False
-
@classmethod
def get_auth_providers(cls):
"""Returns list of auth_providers used within test.
@@ -441,10 +447,6 @@
"""
return [cls.os_primary.auth_provider]
- @classmethod
- def setup_rbac_utils(cls):
- cls.rbac_utils = RbacUtils(cls)
-
def _set_override_role_called(self):
"""Helper for tracking whether ``override_role`` was called."""
self.__override_role_called = True
diff --git a/patrole_tempest_plugin/tests/api/compute/rbac_base.py b/patrole_tempest_plugin/tests/api/compute/rbac_base.py
index ab4551e..b798b16 100644
--- a/patrole_tempest_plugin/tests/api/compute/rbac_base.py
+++ b/patrole_tempest_plugin/tests/api/compute/rbac_base.py
@@ -24,7 +24,6 @@
@classmethod
def setup_clients(cls):
super(BaseV2ComputeRbacTest, cls).setup_clients()
- cls.setup_rbac_utils()
cls.hosts_client = cls.os_primary.hosts_client
cls.tenant_usages_client = cls.os_primary.tenant_usages_client
cls.networks_client = cls.os_primary.networks_client
diff --git a/patrole_tempest_plugin/tests/api/identity/rbac_base.py b/patrole_tempest_plugin/tests/api/identity/rbac_base.py
index 44f5962..e3fac27 100644
--- a/patrole_tempest_plugin/tests/api/identity/rbac_base.py
+++ b/patrole_tempest_plugin/tests/api/identity/rbac_base.py
@@ -28,11 +28,6 @@
base.BaseIdentityTest):
@classmethod
- def setup_clients(cls):
- super(BaseIdentityRbacTest, cls).setup_clients()
- cls.setup_rbac_utils()
-
- @classmethod
def setup_test_endpoint(cls, service=None):
"""Creates a service and an endpoint for test."""
interface = 'public'
diff --git a/patrole_tempest_plugin/tests/api/image/rbac_base.py b/patrole_tempest_plugin/tests/api/image/rbac_base.py
index becd564..2f7d065 100644
--- a/patrole_tempest_plugin/tests/api/image/rbac_base.py
+++ b/patrole_tempest_plugin/tests/api/image/rbac_base.py
@@ -18,8 +18,4 @@
class BaseV2ImageRbacTest(rbac_utils.RbacUtilsMixin,
image_base.BaseV2ImageTest):
-
- @classmethod
- def setup_clients(cls):
- super(BaseV2ImageRbacTest, cls).setup_clients()
- cls.setup_rbac_utils()
+ pass
diff --git a/patrole_tempest_plugin/tests/api/network/rbac_base.py b/patrole_tempest_plugin/tests/api/network/rbac_base.py
index 62e95c3..7093b5c 100644
--- a/patrole_tempest_plugin/tests/api/network/rbac_base.py
+++ b/patrole_tempest_plugin/tests/api/network/rbac_base.py
@@ -23,11 +23,7 @@
class BaseNetworkRbacTest(rbac_utils.RbacUtilsMixin,
network_base.BaseNetworkTest):
-
- @classmethod
- def setup_clients(cls):
- super(BaseNetworkRbacTest, cls).setup_clients()
- cls.setup_rbac_utils()
+ pass
class BaseNetworkExtRbacTest(BaseNetworkRbacTest):
diff --git a/patrole_tempest_plugin/tests/api/volume/rbac_base.py b/patrole_tempest_plugin/tests/api/volume/rbac_base.py
index 1d0a44d..daf5b6d 100644
--- a/patrole_tempest_plugin/tests/api/volume/rbac_base.py
+++ b/patrole_tempest_plugin/tests/api/volume/rbac_base.py
@@ -31,7 +31,6 @@
@classmethod
def setup_clients(cls):
super(BaseVolumeRbacTest, cls).setup_clients()
- cls.setup_rbac_utils()
cls.volume_hosts_client = cls.os_primary.volume_hosts_client_latest
cls.volume_types_client = cls.os_primary.volume_types_client_latest
cls.groups_client = cls.os_primary.groups_client_latest
diff --git a/patrole_tempest_plugin/tests/unit/base.py b/patrole_tempest_plugin/tests/unit/base.py
index 9a801bd..c08da43 100644
--- a/patrole_tempest_plugin/tests/unit/base.py
+++ b/patrole_tempest_plugin/tests/unit/base.py
@@ -16,17 +16,15 @@
from tempest.tests import base
+from patrole_tempest_plugin.tests.unit import fixtures as patrole_fixtures
+
class TestCase(base.TestCase):
-
"""Test case base class for all unit tests."""
- def get_all_needed_roles(self, roles):
- role_inferences_mapping = {
- "admin": {"member", "reader"},
- "member": {"reader"}
- }
- res = set(r.lower() for r in roles)
- for role in res.copy():
- res.update(role_inferences_mapping.get(role, set()))
- return list(res)
+ def setUp(self):
+ super(TestCase, self).setUp()
+ # Disable patrole log for unit tests.
+ self.useFixture(
+ patrole_fixtures.ConfPatcher(enable_reporting=False,
+ group='patrole_log'))
diff --git a/patrole_tempest_plugin/tests/unit/fixtures.py b/patrole_tempest_plugin/tests/unit/fixtures.py
index f7a9059..41afe9b 100644
--- a/patrole_tempest_plugin/tests/unit/fixtures.py
+++ b/patrole_tempest_plugin/tests/unit/fixtures.py
@@ -16,12 +16,10 @@
"""Fixtures for Patrole tests."""
from __future__ import absolute_import
-from contextlib import contextmanager
import fixtures
import mock
import time
-from tempest import clients
from tempest.common import credentials_factory as credentials
from tempest import config
from tempest import test
@@ -57,81 +55,84 @@
CONF.set_override(k, v, self.group)
-class RbacUtilsFixture(fixtures.Fixture):
+class FakeBaseRbacTest(rbac_utils.RbacUtilsMixin, test.BaseTestCase):
+ os_primary = None
+
+ def runTest(self):
+ pass
+
+
+class RbacUtilsMixinFixture(fixtures.Fixture):
"""Fixture for `RbacUtils` class."""
USER_ID = mock.sentinel.user_id
PROJECT_ID = mock.sentinel.project_id
- def setUp(self):
- super(RbacUtilsFixture, self).setUp()
+ def __init__(self, do_reset_mocks=True, rbac_test_roles=None):
+ self._do_reset_mocks = do_reset_mocks
+ self._rbac_test_roles = rbac_test_roles or ['member']
- self.useFixture(ConfPatcher(rbac_test_roles=['member'],
+ def patchobject(self, target, attribute, *args, **kwargs):
+ p = mock.patch.object(target, attribute, *args, **kwargs)
+ m = p.start()
+ self.addCleanup(p.stop)
+ return m
+
+ def setUp(self):
+ super(RbacUtilsMixinFixture, self).setUp()
+
+ self.useFixture(ConfPatcher(rbac_test_roles=self._rbac_test_roles,
group='patrole'))
self.useFixture(ConfPatcher(
admin_role='admin', auth_version='v3', group='identity'))
self.useFixture(ConfPatcher(
api_v3=True, group='identity-feature-enabled'))
- test_obj_kwargs = {
- 'os_primary.credentials.user_id': self.USER_ID,
- 'os_primary.credentials.tenant_id': self.PROJECT_ID,
- 'os_primary.credentials.project_id': self.PROJECT_ID,
- }
- self.mock_test_obj = mock.Mock(
- __name__='patrole_unit_test', spec=test.BaseTestCase,
- os_primary=mock.Mock(),
- get_auth_providers=mock.Mock(return_value=[mock.Mock()]),
- **test_obj_kwargs)
-
# Mock out functionality that can't be used by unit tests. Mocking out
# time.sleep is a test optimization.
- self.mock_time = mock.patch.object(
- rbac_utils, 'time', __name__='mock_time', spec=time).start()
- mock.patch.object(credentials, 'get_configured_admin_credentials',
- spec=object).start()
- mock_admin_mgr = mock.patch.object(
- clients, 'Manager', spec=clients.Manager,
- roles_v3_client=mock.Mock(), roles_client=mock.Mock()).start()
+ self.mock_time = self.patchobject(rbac_utils, 'time',
+ __name__='mock_time', spec=time)
+ self.patchobject(credentials, 'get_configured_admin_credentials',
+ spec=object)
+ mock_admin_mgr = self.patchobject(rbac_utils.clients, 'Manager',
+ spec=rbac_utils.clients.Manager,
+ roles_v3_client=mock.Mock(),
+ roles_client=mock.Mock())
self.admin_roles_client = mock_admin_mgr.return_value.roles_v3_client
self.admin_roles_client.list_all_role_inference_rules.return_value = {
- "role_inferences": []}
+ "role_inferences": [
+ {
+ "implies": [{"id": "reader_id", "name": "reader"}],
+ "prior_role": {"id": "member_id", "name": "member"}
+ },
+ {
+ "implies": [{"id": "member_id", "name": "member"}],
+ "prior_role": {"id": "admin_id", "name": "admin"}
+ }
+ ]
+ }
- self.set_roles(['admin', 'member'], [])
+ default_roles = {'admin', 'member', 'reader'}.union(
+ set(self._rbac_test_roles))
+ self.set_roles(list(default_roles), [])
- def override_role(self, *role_toggles):
- """Instantiate `rbac_utils.RbacUtils` and call `override_role`.
+ test_obj_kwargs = {
+ 'credentials.user_id': self.USER_ID,
+ 'credentials.tenant_id': self.PROJECT_ID,
+ 'credentials.project_id': self.PROJECT_ID,
+ }
- Create an instance of `rbac_utils.RbacUtils` and call `override_role`
- for each boolean value in `role_toggles`. The number of calls to
- `override_role` is always 1 + len(`role_toggles`) because the
- `rbac_utils.RbacUtils` constructor automatically calls `override_role`.
+ class FakeRbacTest(FakeBaseRbacTest):
+ os_primary = mock.Mock()
- :param role_toggles: the list of boolean values iterated over and
- passed to `override_role`.
- """
- _rbac_utils = rbac_utils.RbacUtils(self.mock_test_obj)
+ FakeRbacTest.os_primary.configure_mock(**test_obj_kwargs)
- for role_toggle in role_toggles:
- _rbac_utils._override_role(self.mock_test_obj, role_toggle)
- # NOTE(felipemonteiro): Simulate that a role switch has occurred
- # by updating the user's current role to the new role. This means
- # that all API actions involved during a role switch -- listing,
- # deleting and adding roles -- are executed, making it easier to
- # assert that mock calls were called as expected.
- new_role = 'member' if role_toggle else 'admin'
- self.set_roles(['admin', 'member'], [new_role])
-
- @contextmanager
- def real_override_role(self, test_obj):
- """Actual call to ``override_role``.
-
- Useful for ensuring all the necessary mocks are performed before
- the method in question is called.
- """
- _rbac_utils = rbac_utils.RbacUtils(test_obj)
- with _rbac_utils.override_role(test_obj):
- yield
+ FakeRbacTest.setUpClass()
+ self.test_obj = FakeRbacTest()
+ if self._do_reset_mocks:
+ self.admin_roles_client.reset_mock()
+ self.test_obj.os_primary.reset_mock()
+ self.mock_time.reset_mock()
def set_roles(self, roles, roles_on_project=None):
"""Set the list of available roles in the system.
@@ -159,28 +160,3 @@
self.admin_roles_client.list_roles.return_value = available_roles
self.admin_roles_client.list_user_roles_on_project.return_value = (
available_project_roles)
-
- def get_all_needed_roles(self, roles):
- self.admin_roles_client.list_all_role_inference_rules.return_value = {
- "role_inferences": [
- {
- "implies": [{"id": "3", "name": "reader"}],
- "prior_role": {"id": "2", "name": "member"}
- },
- {
- "implies": [{"id": "2", "name": "member"}],
- "prior_role": {"id": "1", "name": "admin"}
- }
- ]
- }
-
- # Call real get_all_needed_roles function
- with mock.patch.object(rbac_utils.RbacUtils, '_override_role',
- autospec=True):
- obj = rbac_utils.RbacUtils(mock.Mock())
- obj._role_map = {
- "1": "admin", "admin": "1",
- "2": "member", "member": "2",
- "3": "reader", "reader": "3"
- }
- return obj.get_all_needed_roles(roles)
diff --git a/patrole_tempest_plugin/tests/unit/test_policy_authority.py b/patrole_tempest_plugin/tests/unit/test_policy_authority.py
index 12457cb..185e449 100644
--- a/patrole_tempest_plugin/tests/unit/test_policy_authority.py
+++ b/patrole_tempest_plugin/tests/unit/test_policy_authority.py
@@ -76,6 +76,9 @@
if attr in dir(policy_authority.PolicyAuthority):
delattr(policy_authority.PolicyAuthority, attr)
+ self.test_obj = self.useFixture(fixtures.RbacUtilsMixinFixture()).\
+ test_obj
+
@staticmethod
def _get_fake_policies(rules):
fake_rules = []
@@ -96,7 +99,7 @@
authority = policy_authority.PolicyAuthority(
test_tenant_id, test_user_id, service)
- roles = self.get_all_needed_roles(roles)
+ roles = self.test_obj.get_all_needed_roles(roles)
for rule in allowed_rules:
allowed = authority.allowed(rule, roles)
@@ -289,7 +292,7 @@
for rule in allowed_rules:
allowed = authority.allowed(
- rule, self.get_all_needed_roles(['member']))
+ rule, self.test_obj.get_all_needed_roles(['member']))
self.assertTrue(allowed)
# for sure that roles are in same order
mock_try_rule.call_args[0][2]["roles"] = sorted(
diff --git a/patrole_tempest_plugin/tests/unit/test_rbac_rule_validation.py b/patrole_tempest_plugin/tests/unit/test_rbac_rule_validation.py
index 79e8b1d..05ad7b7 100644
--- a/patrole_tempest_plugin/tests/unit/test_rbac_rule_validation.py
+++ b/patrole_tempest_plugin/tests/unit/test_rbac_rule_validation.py
@@ -20,12 +20,9 @@
import fixtures
from tempest.lib import exceptions
-from tempest import manager
-from tempest import test
from patrole_tempest_plugin import rbac_exceptions
from patrole_tempest_plugin import rbac_rule_validation as rbac_rv
-from patrole_tempest_plugin import rbac_utils
from patrole_tempest_plugin.tests.unit import base
from patrole_tempest_plugin.tests.unit import fixtures as patrole_fixtures
@@ -38,25 +35,10 @@
def setUp(self):
super(BaseRBACRuleValidationTest, self).setUp()
- self.mock_test_args = mock.Mock(spec=test.BaseTestCase)
- self.mock_test_args.os_primary = mock.Mock(spec=manager.Manager)
- self.mock_test_args.rbac_utils = mock.Mock(
- spec_set=rbac_utils.RbacUtils)
- self.mock_test_args.rbac_utils.get_all_needed_roles.side_effect = \
- self.get_all_needed_roles
-
- # Setup credentials for mock client manager.
- mock_creds = mock.Mock(user_id=mock.sentinel.user_id,
- project_id=mock.sentinel.project_id)
- setattr(self.mock_test_args.os_primary, 'credentials', mock_creds)
-
- self.useFixture(
- patrole_fixtures.ConfPatcher(rbac_test_roles=self.test_roles,
- group='patrole'))
- # Disable patrole log for unit tests.
- self.useFixture(
- patrole_fixtures.ConfPatcher(enable_reporting=False,
- group='patrole_log'))
+ self.rbac_utils_fixture = self.useFixture(
+ patrole_fixtures.RbacUtilsMixinFixture(
+ rbac_test_roles=self.test_roles))
+ self.test_obj = self.rbac_utils_fixture.test_obj
class BaseRBACMultiRoleRuleValidationTest(BaseRBACRuleValidationTest):
@@ -89,7 +71,7 @@
def test_policy(*args):
pass
- test_policy(self.mock_test_args)
+ test_policy(self.test_obj)
mock_log.error.assert_not_called()
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
@@ -107,7 +89,7 @@
def test_policy(*args):
raise exceptions.Forbidden()
- test_policy(self.mock_test_args)
+ test_policy(self.test_obj)
mock_log.error.assert_not_called()
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
@@ -131,7 +113,7 @@
"the following actions: \[%s\].*" % (mock.sentinel.action))
self.assertRaisesRegex(
rbac_exceptions.RbacUnderPermissionException, test_re, test_policy,
- self.mock_test_args)
+ self.test_obj)
self.assertRegex(mock_log.error.mock_calls[0][1][0], test_re)
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
@@ -188,7 +170,7 @@
mock.sentinel.action))
self.assertRaisesRegex(
rbac_exceptions.RbacUnderPermissionException, test_re,
- test_policy, self.mock_test_args)
+ test_policy, self.test_obj)
self.assertRegex(mock_log.error.mock_calls[0][1][0], test_re)
_do_test(rbac_exceptions.RbacMissingAttributeResponseBody,
@@ -221,7 +203,7 @@
return_value = allowed
self.assertRaisesRegex(
rbac_exceptions.RbacExpectedWrongException, error_re,
- test_policy, self.mock_test_args)
+ test_policy, self.test_obj)
self.assertTrue(mock_log.error.called)
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
@@ -259,10 +241,10 @@
if error_re:
self.assertRaisesRegex(
rbac_exceptions.RbacUnderPermissionException, error_re,
- test_policy, self.mock_test_args)
+ test_policy, self.test_obj)
self.assertRegex(mock_log.error.mock_calls[0][1][0], error_re)
else:
- test_policy(self.mock_test_args)
+ test_policy(self.test_obj)
mock_log.error.assert_not_called()
mock_log.warning.assert_called_with(
@@ -302,7 +284,7 @@
error_re = ".*OverPermission: .* \[%s\]$" % mock.sentinel.action
self.assertRaisesRegex(rbac_exceptions.RbacOverPermissionException,
- error_re, test_policy, self.mock_test_args)
+ error_re, test_policy, self.test_obj)
self.assertRegex(mock_log.error.mock_calls[0][1][0], error_re)
mock_log.error.reset_mock()
@@ -320,7 +302,7 @@
error_re = 'Attempted to test an invalid policy file or action'
self.assertRaisesRegex(rbac_exceptions.RbacParsingException, error_re,
- test_policy, self.mock_test_args)
+ test_policy, self.test_obj)
mock_authority.PolicyAuthority.assert_called_once_with(
mock.sentinel.project_id, mock.sentinel.user_id,
@@ -403,7 +385,7 @@
(mock.sentinel.action))
self.assertRaisesRegex(
rbac_exceptions.RbacUnderPermissionException, test_re, test_policy,
- self.mock_test_args)
+ self.test_obj)
self.assertRegex(mock_log.error.mock_calls[0][1][0], test_re)
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
@@ -441,10 +423,10 @@
if error_re:
self.assertRaisesRegex(
rbac_exceptions.RbacUnderPermissionException, error_re,
- test_policy, self.mock_test_args)
+ test_policy, self.test_obj)
self.assertRegex(mock_log.error.mock_calls[0][1][0], error_re)
else:
- test_policy(self.mock_test_args)
+ test_policy(self.test_obj)
mock_log.error.assert_not_called()
mock_log.warning.assert_called_with(
@@ -486,7 +468,7 @@
def test_policy(*args):
pass
- test_policy(self.mock_test_args)
+ test_policy(self.test_obj)
self.assertFalse(mock_rbaclog.info.called)
@mock.patch.object(rbac_rv, 'RBACLOG', autospec=True)
@@ -506,7 +488,7 @@
def test_policy(*args):
pass
- test_policy(self.mock_test_args)
+ test_policy(self.test_obj)
mock_rbaclog.info.assert_called_once_with(
"[Service]: %s, [Test]: %s, [Rules]: %s, "
"[Expected]: %s, [Actual]: %s",
@@ -528,12 +510,12 @@
def test_policy(*args):
pass
- test_policy(self.mock_test_args)
+ test_policy(self.test_obj)
policy_authority = mock_authority.PolicyAuthority.return_value
policy_authority.allowed.assert_called_with(
mock.sentinel.action,
- self.get_all_needed_roles(CONF.patrole.rbac_test_roles))
+ self.test_obj.get_all_needed_roles(CONF.patrole.rbac_test_roles))
mock_log.error.assert_not_called()
@@ -545,7 +527,7 @@
evaluated correctly.
"""
mock_authority.PolicyAuthority.return_value.allowed.return_value = True
- expected_roles = self.get_all_needed_roles(
+ expected_roles = self.test_obj.get_all_needed_roles(
CONF.patrole.rbac_test_roles)
def partial_func(x):
@@ -563,14 +545,14 @@
def test_bar_policy(*args):
pass
- test_foo_policy(self.mock_test_args)
+ test_foo_policy(self.test_obj)
policy_authority = mock_authority.PolicyAuthority.return_value
policy_authority.allowed.assert_called_with(
"foo",
expected_roles)
policy_authority.allowed.reset_mock()
- test_bar_policy(self.mock_test_args)
+ test_bar_policy(self.test_obj)
policy_authority = mock_authority.PolicyAuthority.return_value
policy_authority.allowed.assert_called_with(
"qux",
@@ -603,7 +585,7 @@
pass
self.assertRaises(rbac_exceptions.RbacInvalidServiceException,
- test_policy, self.mock_test_args)
+ test_policy, self.test_obj)
class RBACMultiRoleRuleValidationNegativeTest(
@@ -627,7 +609,8 @@
m_authority.allowed.assert_has_calls([
mock.call(
rule,
- self.get_all_needed_roles(CONF.patrole.rbac_test_roles)
+ self.test_obj.get_all_needed_roles(
+ CONF.patrole.rbac_test_roles)
) for rule in rules
])
m_authority.allowed.reset_mock()
@@ -648,7 +631,7 @@
def test_policy(*args):
pass
- test_policy(self.mock_test_args)
+ test_policy(self.test_obj)
self._assert_policy_authority_called_with(rules, mock_authority)
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
@@ -677,7 +660,7 @@
error_re = ".*OverPermission: .* \[%s\]$" % fail_on_action
self.assertRaisesRegex(
rbac_exceptions.RbacOverPermissionException, error_re,
- test_policy, self.mock_test_args)
+ test_policy, self.test_obj)
mock_log.debug.assert_any_call(
"%s: Expecting %d to be raised for policy name: %s",
'test_policy', 403, fail_on_action)
@@ -710,7 +693,7 @@
def _do_test(allowed_list, fail_on_action):
mock_authority.PolicyAuthority.return_value.allowed.\
side_effect = allowed_list
- test_policy(self.mock_test_args)
+ test_policy(self.test_obj)
mock_log.debug.assert_called_with(
"%s: Expecting %d to be raised for policy name: %s",
'test_policy', 403, fail_on_action)
@@ -747,7 +730,7 @@
(rules, rules)).replace('[', '\[').replace(']', '\]')
self.assertRaisesRegex(
rbac_exceptions.RbacUnderPermissionException, error_re,
- test_policy, self.mock_test_args)
+ test_policy, self.test_obj)
self.assertRegex(mock_log.error.mock_calls[0][1][0], error_re)
self._assert_policy_authority_called_with(rules, mock_authority)
@@ -773,7 +756,7 @@
def _do_test(allowed_list, fail_on_action):
mock_authority.PolicyAuthority.return_value.allowed.\
side_effect = allowed_list
- test_policy(self.mock_test_args)
+ test_policy(self.test_obj)
mock_log.debug.assert_called_with(
"%s: Expecting %d to be raised for policy name: %s",
'test_policy', 403, fail_on_action)
@@ -806,7 +789,7 @@
def _do_test(allowed_list, fail_on_action):
mock_authority.PolicyAuthority.return_value.allowed.\
side_effect = allowed_list
- test_policy(self.mock_test_args)
+ test_policy(self.test_obj)
mock_log.debug.assert_called_with(
"%s: Expecting %d to be raised for policy name: %s",
'test_policy', 404, fail_on_action)
@@ -833,7 +816,7 @@
def test_policy(*args):
raise exceptions.Forbidden()
- test_policy(self.mock_test_args)
+ test_policy(self.test_obj)
self._assert_policy_authority_called_with(rules, mock_authority)
# Assert that 403 is expected.
mock_calls = [x[1] for x in mock_log.debug.mock_calls]
@@ -847,7 +830,7 @@
def test_policy(*args):
raise exceptions.Forbidden()
- test_policy(self.mock_test_args)
+ test_policy(self.test_obj)
self._assert_policy_authority_called_with(rules, mock_authority)
# Assert that 403 is expected.
mock_calls = [x[1] for x in mock_log.debug.mock_calls]
@@ -932,7 +915,7 @@
(rules, rules)).replace('[', '\[').replace(']', '\]')
self.assertRaisesRegex(
rbac_exceptions.RbacUnderPermissionException, error_re,
- test_policy, self.mock_test_args)
+ test_policy, self.test_obj)
self.assertRegex(mock_log.error.mock_calls[0][1][0], error_re)
self._assert_policy_authority_called_with(rules, mock_authority)
@@ -951,21 +934,7 @@
def setUp(self):
super(RBACOverrideRoleValidationTest, self).setUp()
- # Mixin automatically initializes __override_role_called to False.
- class FakeRbacTest(rbac_utils.RbacUtilsMixin, test.BaseTestCase):
- def runTest(self):
- pass
-
- # Stub out problematic function calls.
- FakeRbacTest.os_primary = mock.Mock(spec=manager.Manager)
- FakeRbacTest.rbac_utils = self.useFixture(
- patrole_fixtures.RbacUtilsFixture())
- mock_creds = mock.Mock(user_id=mock.sentinel.user_id,
- project_id=mock.sentinel.project_id)
- setattr(FakeRbacTest.os_primary, 'credentials', mock_creds)
- setattr(FakeRbacTest.os_primary, 'auth_provider', mock.Mock())
-
- self.parent_class = FakeRbacTest
+ self.parent_class = self.test_obj.__class__
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
def test_rule_validation_override_role_called_inside_ctx(self,
@@ -981,7 +950,7 @@
@rbac_rv.action(mock.sentinel.service, rules=["fake:rule"],
expected_error_codes=[404])
def test_called(self_):
- with self_.rbac_utils.real_override_role(self_):
+ with self_.override_role():
raise exceptions.NotFound()
child_test = ChildRbacTest()
@@ -991,8 +960,8 @@
def test_rule_validation_override_role_patrole_exception_ignored(
self, mock_authority):
"""Test success case where Patrole exception is raised (which is
- valid in case of e.g. RbacPartialResponseBody) after override_role
- passes.
+ valid in case of e.g. BasePatroleResponseBodyException) after
+ override_role passes.
"""
mock_authority.PolicyAuthority.return_value.allowed.return_value =\
True
@@ -1002,14 +971,14 @@
@rbac_rv.action(mock.sentinel.service, rules=["fake:rule"],
expected_error_codes=[404])
def test_called(self_):
- with self_.rbac_utils.real_override_role(self_):
+ with self_.override_role():
pass
- # Instances of BasePatroleException don't count as they are
- # part of the validation work flow.
- raise rbac_exceptions.BasePatroleException()
+ # Instances of BasePatroleResponseBodyException don't count as
+ # they are part of the validation work flow.
+ raise rbac_exceptions.BasePatroleResponseBodyException()
child_test = ChildRbacTest()
- self.assertRaises(rbac_exceptions.BasePatroleException,
+ self.assertRaises(rbac_exceptions.RbacUnderPermissionException,
child_test.test_called)
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
@@ -1057,7 +1026,7 @@
@rbac_rv.action(mock.sentinel.service, rules=["fake:rule"],
expected_error_codes=[404])
def test_called_after(self_):
- with self_.rbac_utils.real_override_role(self_):
+ with self_.override_role():
pass
# Simulates a test tearDown failure or some such.
raise exception_type()
@@ -1098,7 +1067,7 @@
@rbac_rv.action(mock.sentinel.service, rules=["fake:rule1"],
expected_error_codes=[404])
def test_called(self_):
- with self_.rbac_utils.real_override_role(self_):
+ with self_.override_role():
raise exceptions.NotFound()
@rbac_rv.action(mock.sentinel.service, rules=["fake:rule2"],
diff --git a/patrole_tempest_plugin/tests/unit/test_rbac_utils.py b/patrole_tempest_plugin/tests/unit/test_rbac_utils.py
index 8acc678..e84a56d 100644
--- a/patrole_tempest_plugin/tests/unit/test_rbac_utils.py
+++ b/patrole_tempest_plugin/tests/unit/test_rbac_utils.py
@@ -17,7 +17,6 @@
import testtools
from tempest.lib import exceptions as lib_exc
-from tempest import test
from patrole_tempest_plugin import rbac_exceptions
from patrole_tempest_plugin import rbac_utils
@@ -25,118 +24,132 @@
from patrole_tempest_plugin.tests.unit import fixtures as patrole_fixtures
-class RBACUtilsTest(base.TestCase):
+class RBACUtilsMixinTest(base.TestCase):
def setUp(self):
- super(RBACUtilsTest, self).setUp()
- # Reset the role history after each test run to avoid validation
- # errors between tests.
- rbac_utils.RbacUtils.override_role_history = {}
- self.rbac_utils = self.useFixture(patrole_fixtures.RbacUtilsFixture())
+ super(RBACUtilsMixinTest, self).setUp()
+ self.rbac_utils_fixture = self.useFixture(
+ patrole_fixtures.RbacUtilsMixinFixture())
+ self.test_obj = self.rbac_utils_fixture.test_obj
- def test_override_role_with_missing_admin_role(self):
- self.rbac_utils.set_roles('member')
+ def test_init_roles_with_missing_admin_role(self):
+ self.rbac_utils_fixture.set_roles('member')
error_re = (".*Following roles were not found: admin. Available "
"roles: member.")
self.assertRaisesRegex(rbac_exceptions.RbacResourceSetupFailed,
- error_re, self.rbac_utils.override_role)
+ error_re, self.test_obj._init_roles)
- def test_override_role_with_missing_rbac_role(self):
- self.rbac_utils.set_roles('admin')
+ def test_init_roles_with_missing_rbac_role(self):
+ self.rbac_utils_fixture.set_roles('admin')
error_re = (".*Following roles were not found: member. Available "
"roles: admin.")
self.assertRaisesRegex(rbac_exceptions.RbacResourceSetupFailed,
- error_re, self.rbac_utils.override_role)
+ error_re, self.test_obj._init_roles)
- def test_override_role_to_admin_role(self):
- self.rbac_utils.override_role()
-
- mock_test_obj = self.rbac_utils.mock_test_obj
- roles_client = self.rbac_utils.admin_roles_client
- mock_time = self.rbac_utils.mock_time
+ def test_override_role_to_admin_role_at_creating(self):
+ rbac_utils_fixture = self.useFixture(
+ patrole_fixtures.RbacUtilsMixinFixture(do_reset_mocks=False))
+ test_obj = rbac_utils_fixture.test_obj
+ roles_client = rbac_utils_fixture.admin_roles_client
+ mock_time = rbac_utils_fixture.mock_time
roles_client.create_user_role_on_project.assert_called_once_with(
- self.rbac_utils.PROJECT_ID, self.rbac_utils.USER_ID, 'admin_id')
- mock_test_obj.get_auth_providers()[0].clear_auth\
+ rbac_utils_fixture.PROJECT_ID,
+ rbac_utils_fixture.USER_ID,
+ 'admin_id')
+ test_obj.get_auth_providers()[0].clear_auth.assert_called_once_with()
+ test_obj.get_auth_providers()[0].set_auth.assert_called_once_with()
+ mock_time.sleep.assert_called_once_with(1)
+
+ def test_override_role_to_admin_role(self):
+ self.test_obj._override_role()
+
+ roles_client = self.rbac_utils_fixture.admin_roles_client
+ mock_time = self.rbac_utils_fixture.mock_time
+
+ roles_client.create_user_role_on_project.assert_called_once_with(
+ self.rbac_utils_fixture.PROJECT_ID,
+ self.rbac_utils_fixture.USER_ID,
+ 'admin_id')
+ self.test_obj.get_auth_providers()[0].clear_auth\
.assert_called_once_with()
- mock_test_obj.get_auth_providers()[0].set_auth\
+ self.test_obj.get_auth_providers()[0].set_auth\
.assert_called_once_with()
mock_time.sleep.assert_called_once_with(1)
def test_override_role_to_admin_role_avoids_role_switch(self):
- self.rbac_utils.set_roles(['admin', 'member'], 'admin')
- self.rbac_utils.override_role()
+ self.rbac_utils_fixture.set_roles(['admin', 'member'], 'admin')
+ self.test_obj._override_role()
- roles_client = self.rbac_utils.admin_roles_client
- mock_time = self.rbac_utils.mock_time
+ roles_client = self.rbac_utils_fixture.admin_roles_client
+ mock_time = self.rbac_utils_fixture.mock_time
roles_client.create_user_role_on_project.assert_not_called()
mock_time.sleep.assert_not_called()
def test_override_role_to_member_role(self):
- self.rbac_utils.override_role(True)
+ self.test_obj._override_role(True)
- mock_test_obj = self.rbac_utils.mock_test_obj
- roles_client = self.rbac_utils.admin_roles_client
- mock_time = self.rbac_utils.mock_time
+ roles_client = self.rbac_utils_fixture.admin_roles_client
+ mock_time = self.rbac_utils_fixture.mock_time
roles_client.create_user_role_on_project.assert_has_calls([
- mock.call(self.rbac_utils.PROJECT_ID, self.rbac_utils.USER_ID,
- 'admin_id'),
- mock.call(self.rbac_utils.PROJECT_ID, self.rbac_utils.USER_ID,
+ mock.call(self.rbac_utils_fixture.PROJECT_ID,
+ self.rbac_utils_fixture.USER_ID,
'member_id')
])
- mock_test_obj.get_auth_providers()[0].clear_auth.assert_has_calls(
- [mock.call()] * 2)
- mock_test_obj.get_auth_providers()[0].set_auth.assert_has_calls(
- [mock.call()] * 2)
- mock_time.sleep.assert_has_calls([mock.call(1)] * 2)
+ self.test_obj.get_auth_providers()[0].clear_auth.assert_has_calls(
+ [mock.call()])
+ self.test_obj.get_auth_providers()[0].set_auth.assert_has_calls(
+ [mock.call()])
+ mock_time.sleep.assert_has_calls([mock.call(1)])
def test_override_role_to_member_role_avoids_role_switch(self):
- self.rbac_utils.set_roles(['admin', 'member'], 'member')
- self.rbac_utils.override_role(True)
+ self.rbac_utils_fixture.set_roles(['admin', 'member'], 'member')
+ self.test_obj._override_role(True)
- roles_client = self.rbac_utils.admin_roles_client
- mock_time = self.rbac_utils.mock_time
+ roles_client = self.rbac_utils_fixture.admin_roles_client
+ mock_time = self.rbac_utils_fixture.mock_time
- roles_client.create_user_role_on_project.assert_has_calls([
- mock.call(self.rbac_utils.PROJECT_ID, self.rbac_utils.USER_ID,
- 'admin_id')
- ])
- mock_time.sleep.assert_called_once_with(1)
+ self.assertEqual(0,
+ roles_client.create_user_role_on_project.call_count)
+ self.assertEqual(0,
+ mock_time.sleep.call_count)
def test_override_role_to_member_role_then_admin_role(self):
- self.rbac_utils.override_role(True, False)
+ self.test_obj._override_role(True)
+ self.test_obj._override_role(False)
- mock_test_obj = self.rbac_utils.mock_test_obj
- roles_client = self.rbac_utils.admin_roles_client
- mock_time = self.rbac_utils.mock_time
+ roles_client = self.rbac_utils_fixture.admin_roles_client
+ mock_time = self.rbac_utils_fixture.mock_time
roles_client.create_user_role_on_project.assert_has_calls([
- mock.call(self.rbac_utils.PROJECT_ID, self.rbac_utils.USER_ID,
- 'admin_id'),
- mock.call(self.rbac_utils.PROJECT_ID, self.rbac_utils.USER_ID,
+ mock.call(self.rbac_utils_fixture.PROJECT_ID,
+ self.rbac_utils_fixture.USER_ID,
'member_id'),
- mock.call(self.rbac_utils.PROJECT_ID, self.rbac_utils.USER_ID,
+ mock.call(self.rbac_utils_fixture.PROJECT_ID,
+ self.rbac_utils_fixture.USER_ID,
'admin_id')
])
- mock_test_obj.get_auth_providers()[0].clear_auth.assert_has_calls(
- [mock.call()] * 3)
- mock_test_obj.get_auth_providers()[0].set_auth.assert_has_calls(
- [mock.call()] * 3)
- mock_time.sleep.assert_has_calls([mock.call(1)] * 3)
+ self.test_obj.get_auth_providers()[0].clear_auth.assert_has_calls(
+ [mock.call()] * 2)
+ self.test_obj.get_auth_providers()[0].set_auth.assert_has_calls(
+ [mock.call()] * 2)
+ mock_time.sleep.assert_has_calls([mock.call(1)] * 2)
def test_clear_user_roles(self):
# NOTE(felipemonteiro): Set the user's roles on the project to
# include 'random' to coerce a role switch, or else it will be
# skipped.
- self.rbac_utils.set_roles(['admin', 'member'], ['member', 'random'])
- self.rbac_utils.override_role()
+ self.rbac_utils_fixture.set_roles(['admin', 'member'],
+ ['member', 'random'])
+ self.test_obj._override_role()
- roles_client = self.rbac_utils.admin_roles_client
+ roles_client = self.rbac_utils_fixture.admin_roles_client
roles_client.list_user_roles_on_project.assert_called_once_with(
- self.rbac_utils.PROJECT_ID, self.rbac_utils.USER_ID)
+ self.rbac_utils_fixture.PROJECT_ID,
+ self.rbac_utils_fixture.USER_ID)
roles_client.delete_role_from_user_on_project.\
assert_has_calls([
mock.call(mock.sentinel.project_id, mock.sentinel.user_id,
@@ -144,52 +157,32 @@
mock.call(mock.sentinel.project_id, mock.sentinel.user_id,
'random_id')])
- @mock.patch.object(rbac_utils.RbacUtils, '_override_role', autospec=True)
- def test_override_role_context_manager_simulate_pass(self,
- mock_override_role):
+ def test_override_role_context_manager_simulate_pass(self):
"""Validate that expected override_role calls are made when switching
to admin role for success path.
"""
- test_obj = mock.MagicMock()
- _rbac_utils = rbac_utils.RbacUtils(test_obj)
- # Validate constructor called _override_role with False.
- mock_override_role.assert_called_once_with(_rbac_utils, test_obj,
- False)
- mock_override_role.reset_mock()
-
- with _rbac_utils.override_role(test_obj):
+ mock_override_role = self.patchobject(self.test_obj, '_override_role')
+ with self.test_obj.override_role():
# Validate `override_role` public method called private method
# `_override_role` with True.
- mock_override_role.assert_called_once_with(_rbac_utils, test_obj,
- True)
+ mock_override_role.assert_called_once_with(True)
mock_override_role.reset_mock()
# Validate that `override_role` switched back to admin role after
# contextmanager.
- mock_override_role.assert_called_once_with(_rbac_utils, test_obj,
- False)
+ mock_override_role.assert_called_once_with(False)
- @mock.patch.object(rbac_utils.RbacUtils, '_override_role',
- autospec=True)
- def test_override_role_context_manager_simulate_fail(self,
- mock_override_role):
+ def test_override_role_context_manager_simulate_fail(self):
"""Validate that expected override_role calls are made when switching
to admin role for failure path (i.e. when test raises exception).
"""
- test_obj = mock.MagicMock()
- _rbac_utils = rbac_utils.RbacUtils(test_obj)
-
- # Validate constructor called _override_role with False.
- mock_override_role.assert_called_once_with(_rbac_utils, test_obj,
- False)
- mock_override_role.reset_mock()
+ mock_override_role = self.patchobject(self.test_obj, '_override_role')
def _do_test():
- with _rbac_utils.override_role(test_obj):
+ with self.test_obj.override_role():
# Validate `override_role` public method called private method
# `_override_role` with True.
- mock_override_role.assert_called_once_with(
- _rbac_utils, test_obj, True)
+ mock_override_role.assert_called_once_with(True)
mock_override_role.reset_mock()
# Raise exc to verify role switch works for negative case.
raise lib_exc.Forbidden()
@@ -197,61 +190,51 @@
# Validate that role is switched back to admin, despite test failure.
with testtools.ExpectedException(lib_exc.Forbidden):
_do_test()
- mock_override_role.assert_called_once_with(_rbac_utils, test_obj,
- False)
+ mock_override_role.assert_called_once_with(False)
def test_override_role_and_validate_list(self):
- self.patchobject(rbac_utils.RbacUtils, '_override_role')
- test_obj = mock.MagicMock()
- _rbac_utils = rbac_utils.RbacUtils(test_obj)
- m_override_role = self.patchobject(_rbac_utils, 'override_role')
+ m_override_role = self.patchobject(self.test_obj, 'override_role')
- with (_rbac_utils.override_role_and_validate_list(
- test_obj, 'foo')) as ctx:
+ with (self.test_obj.override_role_and_validate_list(
+ admin_resource_id='foo')) as ctx:
self.assertIsInstance(ctx, rbac_utils._ValidateListContext)
m_validate = self.patchobject(ctx, '_validate')
- m_override_role.assert_called_once_with(test_obj)
+ m_override_role.assert_called_once_with()
m_validate.assert_called_once()
def test_prepare_role_inferences_mapping(self):
- self.patchobject(rbac_utils.RbacUtils, '_override_role')
- test_obj = mock.MagicMock()
- _rbac_utils = rbac_utils.RbacUtils(test_obj)
- _rbac_utils.admin_roles_client.list_all_role_inference_rules.\
+ self.test_obj.admin_roles_client.list_all_role_inference_rules.\
return_value = {
"role_inferences": [
{
- "implies": [{"id": "3", "name": "reader"}],
- "prior_role": {"id": "2", "name": "member"}
+ "implies": [{"id": "reader_id", "name": "reader"}],
+ "prior_role": {"id": "member_id", "name": "member"}
},
{
- "implies": [{"id": "2", "name": "member"}],
- "prior_role": {"id": "1", "name": "admin"}
+ "implies": [{"id": "member_id", "name": "member"}],
+ "prior_role": {"id": "admin_id", "name": "admin"}
}
]
}
expected_role_inferences_mapping = {
- "2": {"3"}, # "member": ["reader"],
- "1": {"2", "3"} # "admin": ["member", "reader"]
+ "member_id": {"reader_id"},
+ "admin_id": {"member_id", "reader_id"}
}
- actual_role_inferences_mapping = _rbac_utils.\
+ actual_role_inferences_mapping = self.test_obj.\
_prepare_role_inferences_mapping()
self.assertEqual(expected_role_inferences_mapping,
actual_role_inferences_mapping)
def test_get_all_needed_roles(self):
- self.patchobject(rbac_utils.RbacUtils, '_override_role')
- test_obj = mock.MagicMock()
- _rbac_utils = rbac_utils.RbacUtils(test_obj)
- _rbac_utils._role_inferences_mapping = {
- "2": {"3"}, # "member": ["reader"],
- "1": {"2", "3"} # "admin": ["member", "reader"]
+ self.test_obj.__class__._role_inferences_mapping = {
+ "member_id": {"reader_id"},
+ "admin_id": {"member_id", "reader_id"}
}
- _rbac_utils._role_map = {
- "1": "admin", "admin": "1",
- "2": "member", "member": "2",
- "3": "reader", "reader": "3"
+ self.test_obj.__class__._role_map = {
+ "admin_id": "admin", "admin": "admin_id",
+ "member_id": "member", "member": "member_id",
+ "reader_id": "reader", "reader": "reader_id"
}
for roles, expected_roles in (
(['admin'], ['admin', 'member', 'reader']),
@@ -262,44 +245,10 @@
(['admin', 'member'], ['admin', 'member', 'reader']),
):
expected_roles = sorted(expected_roles)
- actual_roles = sorted(_rbac_utils.get_all_needed_roles(roles))
+ actual_roles = sorted(self.test_obj.get_all_needed_roles(roles))
self.assertEqual(expected_roles, actual_roles)
-class RBACUtilsMixinTest(base.TestCase):
-
- def setUp(self):
- super(RBACUtilsMixinTest, self).setUp()
-
- class FakeRbacTest(rbac_utils.RbacUtilsMixin, test.BaseTestCase):
-
- @classmethod
- def setup_clients(cls):
- super(FakeRbacTest, cls).setup_clients()
- cls.setup_rbac_utils()
-
- def runTest(self):
- pass
-
- self.parent_class = FakeRbacTest
-
- def test_setup_rbac_utils(self):
- """Validate that the child class has the `rbac_utils` attribute after
- running parent class's `cls.setup_rbac_utils`.
- """
- class ChildRbacTest(self.parent_class):
- pass
-
- child_test = ChildRbacTest()
-
- with mock.patch.object(rbac_utils.RbacUtils, '__init__',
- lambda *args: None):
- child_test.setUpClass()
-
- self.assertTrue(hasattr(child_test, 'rbac_utils'))
- self.assertIsInstance(child_test.rbac_utils, rbac_utils.RbacUtils)
-
-
class ValidateListContextTest(base.TestCase):
@staticmethod
def _get_context(admin_resources=None, admin_resource_id=None):
diff --git a/releasenotes/notes/rbac-utils-refactoring-2f4f1e3b52fcae14.yaml b/releasenotes/notes/rbac-utils-refactoring-2f4f1e3b52fcae14.yaml
new file mode 100644
index 0000000..f6944cf
--- /dev/null
+++ b/releasenotes/notes/rbac-utils-refactoring-2f4f1e3b52fcae14.yaml
@@ -0,0 +1,49 @@
+---
+features:
+ - |
+ Merged ``RbacUtils`` and ``RbacUtilsMixin`` classes. Now there is only
+ ``RbacUtilsMixin`` class. The new class still provides all functionality of
+ the original ``RbacUtils`` class. New implementation simplifies the usage
+ of the rbac utils:
+
+ * there is no need in calling ``cls.setup_rbac_utils()`` function, because
+ it happens automatically at the ``setup_clients`` step.
+
+ * there is no ``rbac_utils`` variable, so if you need to call a
+ ``override_role`` function, just do it using ``self``:
+
+ .. code-block:: python
+
+ with self.override_role():
+ ...
+
+ * there is no need in ``test_obj`` variable for ``override_role`` function,
+ because it can use ``self``.
+
+upgrade:
+ - Remove usage of ``cls.setup_rbac_utils()`` function.
+ - |
+ Remove usage of ``self.rbac_utils`` variable:
+
+ .. code-block:: python
+
+ with self.rbac_utils.override_role(self):
+
+ convert to
+
+ .. code-block:: python
+
+ with self.override_role():
+
+ - |
+ Remove ``test_obj`` in usage of ``override_role`` context manager:
+
+ .. code-block:: python
+
+ with self.override_role(self):
+
+ convert to
+
+ .. code-block:: python
+
+ with self.override_role():