Merge "Base implementation of override_role for automatic role re-switch"
diff --git a/patrole_tempest_plugin/rbac_rule_validation.py b/patrole_tempest_plugin/rbac_rule_validation.py
index 927c803..82bc1a0 100644
--- a/patrole_tempest_plugin/rbac_rule_validation.py
+++ b/patrole_tempest_plugin/rbac_rule_validation.py
@@ -175,6 +175,8 @@
"OverPermission: Role %s was allowed to perform %s" %
(role, rule))
finally:
+ # TODO(felipemonteiro): Remove the `switch_role` call below
+ # once all the tests have migrated over to `override_role`.
test_obj.rbac_utils.switch_role(test_obj,
toggle_rbac_role=False)
if CONF.patrole_log.enable_reporting:
diff --git a/patrole_tempest_plugin/rbac_utils.py b/patrole_tempest_plugin/rbac_utils.py
index 2bb9eed..4ef0f80 100644
--- a/patrole_tempest_plugin/rbac_utils.py
+++ b/patrole_tempest_plugin/rbac_utils.py
@@ -14,8 +14,9 @@
# under the License.
import abc
+from contextlib import contextmanager
+import debtcollector.removals
import six
-import sys
import time
from oslo_log import log as logging
@@ -32,14 +33,15 @@
class RbacUtils(object):
- """Utility class responsible for switching os_primary role.
+ """Utility class responsible for switching ``os_primary`` role.
This class is responsible for overriding the value of the primary Tempest
- credential's role (i.e. "os_primary" role). By doing so, it is possible to
- seamlessly swap between admin credentials, needed for setup and clean up,
- and primary credentials, needed to perform the API call which does
+ credential's role (i.e. ``os_primary`` role). By doing so, it is possible
+ to seamlessly swap between admin credentials, needed for setup and clean
+ up, and primary credentials, needed to perform the API call which does
policy enforcement. The primary credentials always cycle between roles
- defined by ``CONF.identity.admin_role`` and `CONF.patrole.rbac_test_role``.
+ defined by ``CONF.identity.admin_role`` and
+ ``CONF.patrole.rbac_test_role``.
"""
def __init__(self, test_obj):
@@ -56,18 +58,56 @@
admin_roles_client = admin_mgr.roles_client
self.admin_roles_client = admin_roles_client
- self.switch_role(test_obj, toggle_rbac_role=False)
+ self._override_role(test_obj, False)
- # References the last value of `toggle_rbac_role` that was passed to
- # `switch_role`. Used for ensuring that `switch_role` is correctly used
- # in a test file, so that false positives are prevented. The key used
- # to index into the dictionary is the module path plus class name, which is
- # unique.
- switch_role_history = {}
admin_role_id = None
rbac_role_id = None
- def switch_role(self, test_obj, toggle_rbac_role=False):
+ @contextmanager
+ def override_role(self, test_obj):
+ """Override the role used by ``os_primary`` Tempest credentials.
+
+ Temporarily change the role used by ``os_primary`` credentials to:
+ * ``[patrole] rbac_test_role`` before test execution
+ * ``[identity] admin_role`` after test execution
+
+ Automatically switches to admin role after test execution.
+
+ :param test_obj: Instance of ``tempest.test.BaseTestCase``.
+ :returns: None
+
+ .. warning::
+
+ This function can alter user roles for pre-provisioned credentials.
+ Work is underway to safely clean up after this function.
+
+ Example::
+
+ @rbac_rule_validation.action(service='test',
+ rule='a:test:rule')
+ def test_foo(self):
+ # Allocate test-level resources here.
+ with self.rbac_utils.override_role(self):
+ # The role for `os_primary` has now been overridden. Within
+ # this block, call the API endpoint that enforces the
+ # expected policy specified by "rule" in the decorator.
+ self.foo_service.bar_api_call()
+ # The role is switched back to admin automatically. Note that
+ # if the API call above threw an exception, any code below this
+ # point in the test is not executed.
+ """
+ self._override_role(test_obj, True)
+ try:
+ # Execute the test.
+ yield
+ finally:
+ # This code block is always executed, no matter the result of the
+ # test. Automatically switch back to the admin role for test clean
+ # up.
+ self._override_role(test_obj, False)
+
+ @debtcollector.removals.remove(removal_version='Rocky')
+ def switch_role(self, test_obj, toggle_rbac_role):
"""Switch the role used by `os_primary` Tempest credentials.
Switch the role used by `os_primary` credentials to:
@@ -77,25 +117,34 @@
:param test_obj: test object of type tempest.lib.base.BaseTestCase
:param toggle_rbac_role: role to switch `os_primary` Tempest creds to
"""
+ self._override_role(test_obj, toggle_rbac_role)
+
+ def _override_role(self, test_obj, toggle_rbac_role=False):
+ """Private helper for overriding ``os_primary`` Tempest credentials.
+
+ :param test_obj: test object of type tempest.lib.base.BaseTestCase
+ :param toggle_rbac_role: Boolean value that controls the role that
+ overrides default role of ``os_primary`` credentials.
+ * If True: role is set to ``[patrole] rbac_test_role``
+ * If False: role is set to ``[identity] admin_role``
+ """
self.user_id = test_obj.os_primary.credentials.user_id
self.project_id = test_obj.os_primary.credentials.tenant_id
self.token = test_obj.os_primary.auth_provider.get_token()
- LOG.debug('Switching role to: %s.', toggle_rbac_role)
+ LOG.debug('Overriding role to: %s.', toggle_rbac_role)
role_already_present = False
try:
if not all([self.admin_role_id, self.rbac_role_id]):
self._get_roles_by_name()
- self._validate_switch_role(test_obj, toggle_rbac_role)
-
target_role = (
self.rbac_role_id if toggle_rbac_role else self.admin_role_id)
role_already_present = self._list_and_clear_user_roles_on_project(
target_role)
- # Do not switch roles if `target_role` already exists.
+ # Do not override roles if `target_role` already exists.
if not role_already_present:
self._create_user_role_on_project(target_role)
except Exception as exp:
@@ -106,7 +155,7 @@
# Fernet tokens are not subsecond aware so sleep to ensure we are
# passing the second boundary before attempting to authenticate.
# Only sleep if a token revocation occurred as a result of role
- # switching. This will optimize test runtime in the case where
+ # overriding. This will optimize test runtime in the case where
# ``[identity] admin_role`` == ``[patrole] rbac_test_role``.
if not role_already_present:
time.sleep(1)
@@ -153,44 +202,6 @@
return False
- def _validate_switch_role(self, test_obj, toggle_rbac_role):
- """Validates that the test role passed to `switch_role` is legal.
-
- Throws an error for the following improper usages of `switch_role`:
- * `switch_role` is not called with a boolean value
- * `switch_role` is never called inside a test, except in tearDown
- * `switch_role` is called with the same boolean value twice
-
- If a `skipException` is thrown then this is a legitimate reason why
- `switch_role` is not called.
- """
- if not isinstance(toggle_rbac_role, bool):
- raise rbac_exceptions.RbacResourceSetupFailed(
- '`toggle_rbac_role` must be a boolean value.')
-
- # The unique key is the combination of module path plus class name.
- class_name = test_obj.__name__ if isinstance(test_obj, type) else \
- test_obj.__class__.__name__
- module_name = test_obj.__module__
- key = '%s.%s' % (module_name, class_name)
-
- self.switch_role_history.setdefault(key, None)
-
- if self.switch_role_history[key] == toggle_rbac_role:
- # If an exception was thrown, like a skipException or otherwise,
- # then this is a legitimate reason why `switch_role` was not
- # called, so only raise an exception if no current exception is
- # being handled.
- if sys.exc_info()[0] is None:
- self.switch_role_history[key] = False
- error_message = '`toggle_rbac_role` must not be called with '\
- 'the same bool value twice. Make sure that you included '\
- 'a rbac_utils.switch_role method call inside the test.'
- LOG.error(error_message)
- raise rbac_exceptions.RbacResourceSetupFailed(error_message)
- else:
- self.switch_role_history[key] = toggle_rbac_role
-
def is_admin():
"""Verifies whether the current test role equals the admin role.
diff --git a/patrole_tempest_plugin/tests/api/volume/test_volumes_extend_rbac.py b/patrole_tempest_plugin/tests/api/volume/test_volumes_extend_rbac.py
index 8a34923..18883c9 100644
--- a/patrole_tempest_plugin/tests/api/volume/test_volumes_extend_rbac.py
+++ b/patrole_tempest_plugin/tests/api/volume/test_volumes_extend_rbac.py
@@ -14,22 +14,13 @@
# under the License.
from tempest.common import waiters
-from tempest import config
from tempest.lib import decorators
from patrole_tempest_plugin import rbac_rule_validation
from patrole_tempest_plugin.tests.api.volume import rbac_base
-CONF = config.CONF
-
class VolumesExtendV3RbacTest(rbac_base.BaseVolumeRbacTest):
- credentials = ['primary', 'admin']
-
- @classmethod
- def setup_clients(cls):
- super(VolumesExtendV3RbacTest, cls).setup_clients()
- cls.admin_volumes_client = cls.os_admin.volumes_client_latest
@classmethod
def resource_setup(cls):
@@ -42,8 +33,8 @@
def test_volume_extend(self):
# Extend volume test
extend_size = int(self.volume['size']) + 1
- self.rbac_utils.switch_role(self, toggle_rbac_role=True)
- self.volumes_client.extend_volume(self.volume['id'],
- new_size=extend_size)
+ with self.rbac_utils.override_role(self):
+ self.volumes_client.extend_volume(self.volume['id'],
+ new_size=extend_size)
waiters.wait_for_volume_resource_status(
- self.admin_volumes_client, self.volume['id'], 'available')
+ self.volumes_client, self.volume['id'], 'available')
diff --git a/patrole_tempest_plugin/tests/unit/fixtures.py b/patrole_tempest_plugin/tests/unit/fixtures.py
index 52c2598..4e3387e 100644
--- a/patrole_tempest_plugin/tests/unit/fixtures.py
+++ b/patrole_tempest_plugin/tests/unit/fixtures.py
@@ -92,21 +92,21 @@
self.set_roles(['admin', 'member'], [])
- def switch_role(self, *role_toggles):
- """Instantiate `rbac_utils.RbacUtils` and call `switch_role`.
+ def override_role(self, *role_toggles):
+ """Instantiate `rbac_utils.RbacUtils` and call `override_role`.
- Create an instance of `rbac_utils.RbacUtils` and call `switch_role`
+ Create an instance of `rbac_utils.RbacUtils` and call `_override_role`
for each boolean value in `role_toggles`. The number of calls to
- `switch_role` is always 1 + len(`role_toggles`) because the
- `rbac_utils.RbacUtils` constructor automatically calls `switch_role`.
+ `_override_role` is always 1 + len(`role_toggles`) because the
+ `rbac_utils.RbacUtils` constructor itself calls `_override_role` once.
:param role_toggles: the list of boolean values iterated over and
- passed to `switch_role`.
+ passed to `_override_role`.
"""
- self.fake_rbac_utils = rbac_utils.RbacUtils(self.mock_test_obj)
+ _rbac_utils = rbac_utils.RbacUtils(self.mock_test_obj)
for role_toggle in role_toggles:
- self.fake_rbac_utils.switch_role(self.mock_test_obj, role_toggle)
+ _rbac_utils._override_role(self.mock_test_obj, role_toggle)
# NOTE(felipemonteiro): Simulate that a role switch has occurred
# by updating the user's current role to the new role. This means
# that all API actions involved during a role switch -- listing,
diff --git a/patrole_tempest_plugin/tests/unit/test_rbac_utils.py b/patrole_tempest_plugin/tests/unit/test_rbac_utils.py
index 87adff0..0d75c3e 100644
--- a/patrole_tempest_plugin/tests/unit/test_rbac_utils.py
+++ b/patrole_tempest_plugin/tests/unit/test_rbac_utils.py
@@ -16,6 +16,7 @@
import mock
import testtools
+from tempest.lib import exceptions as lib_exc
from tempest.tests import base
from patrole_tempest_plugin import rbac_exceptions
@@ -29,27 +30,27 @@
super(RBACUtilsTest, self).setUp()
# Reset the role history after each test run to avoid validation
# errors between tests.
- rbac_utils.RbacUtils.switch_role_history = {}
+ rbac_utils.RbacUtils.override_role_history = {}
self.rbac_utils = self.useFixture(patrole_fixtures.RbacUtilsFixture())
- def test_switch_role_with_missing_admin_role(self):
+ def test_override_role_with_missing_admin_role(self):
self.rbac_utils.set_roles('member')
error_re = (
'Roles defined by `\[patrole\] rbac_test_role` and `\[identity\] '
'admin_role` must be defined in the system.')
self.assertRaisesRegex(rbac_exceptions.RbacResourceSetupFailed,
- error_re, self.rbac_utils.switch_role)
+ error_re, self.rbac_utils.override_role)
- def test_switch_role_with_missing_rbac_role(self):
+ def test_override_role_with_missing_rbac_role(self):
self.rbac_utils.set_roles('admin')
error_re = (
'Roles defined by `\[patrole\] rbac_test_role` and `\[identity\] '
'admin_role` must be defined in the system.')
self.assertRaisesRegex(rbac_exceptions.RbacResourceSetupFailed,
- error_re, self.rbac_utils.switch_role)
+ error_re, self.rbac_utils.override_role)
- def test_switch_role_to_admin_role(self):
- self.rbac_utils.switch_role()
+ def test_override_role_to_admin_role(self):
+ self.rbac_utils.override_role()
mock_test_obj = self.rbac_utils.mock_test_obj
roles_client = self.rbac_utils.roles_v3_client
@@ -63,9 +64,9 @@
.assert_called_once_with()
mock_time.sleep.assert_called_once_with(1)
- def test_switch_role_to_admin_role_avoids_role_switch(self):
+ def test_override_role_to_admin_role_avoids_role_switch(self):
self.rbac_utils.set_roles(['admin', 'member'], 'admin')
- self.rbac_utils.switch_role()
+ self.rbac_utils.override_role()
roles_client = self.rbac_utils.roles_v3_client
mock_time = self.rbac_utils.mock_time
@@ -73,8 +74,8 @@
roles_client.create_user_role_on_project.assert_not_called()
mock_time.sleep.assert_not_called()
- def test_switch_role_to_member_role(self):
- self.rbac_utils.switch_role(True)
+ def test_override_role_to_member_role(self):
+ self.rbac_utils.override_role(True)
mock_test_obj = self.rbac_utils.mock_test_obj
roles_client = self.rbac_utils.roles_v3_client
@@ -92,9 +93,9 @@
[mock.call()] * 2)
mock_time.sleep.assert_has_calls([mock.call(1)] * 2)
- def test_switch_role_to_member_role_avoids_role_switch(self):
+ def test_override_role_to_member_role_avoids_role_switch(self):
self.rbac_utils.set_roles(['admin', 'member'], 'member')
- self.rbac_utils.switch_role(True)
+ self.rbac_utils.override_role(True)
roles_client = self.rbac_utils.roles_v3_client
mock_time = self.rbac_utils.mock_time
@@ -105,8 +106,8 @@
])
mock_time.sleep.assert_called_once_with(1)
- def test_switch_role_to_member_role_then_admin_role(self):
- self.rbac_utils.switch_role(True, False)
+ def test_override_role_to_member_role_then_admin_role(self):
+ self.rbac_utils.override_role(True, False)
mock_test_obj = self.rbac_utils.mock_test_obj
roles_client = self.rbac_utils.roles_v3_client
@@ -126,47 +127,12 @@
[mock.call()] * 3)
mock_time.sleep.assert_has_calls([mock.call(1)] * 3)
- def test_switch_role_without_boolean_value(self):
- self.assertRaises(rbac_exceptions.RbacResourceSetupFailed,
- self.rbac_utils.switch_role, "admin")
- self.assertRaises(rbac_exceptions.RbacResourceSetupFailed,
- self.rbac_utils.switch_role, None)
-
- def test_switch_role_with_false_value_twice(self):
- expected_error_message = (
- '`toggle_rbac_role` must not be called with the same bool value '
- 'twice. Make sure that you included a rbac_utils.switch_role '
- 'method call inside the test.')
-
- e = self.assertRaises(rbac_exceptions.RbacResourceSetupFailed,
- self.rbac_utils.switch_role, False)
- self.assertIn(expected_error_message, str(e))
-
- e = self.assertRaises(rbac_exceptions.RbacResourceSetupFailed,
- self.rbac_utils.switch_role, True, False, False)
- self.assertIn(expected_error_message, str(e))
-
- def test_switch_role_with_true_value_twice(self):
- expected_error_message = (
- '`toggle_rbac_role` must not be called with the same bool value '
- 'twice. Make sure that you included a rbac_utils.switch_role '
- 'method call inside the test.')
-
- e = self.assertRaises(rbac_exceptions.RbacResourceSetupFailed,
- self.rbac_utils.switch_role, True, True)
- self.assertIn(expected_error_message, str(e))
-
- e = self.assertRaises(rbac_exceptions.RbacResourceSetupFailed,
- self.rbac_utils.switch_role, True, False, True,
- True)
- self.assertIn(expected_error_message, str(e))
-
def test_clear_user_roles(self):
# NOTE(felipemonteiro): Set the user's roles on the project to
# include 'random' to coerce a role switch, or else it will be
# skipped.
self.rbac_utils.set_roles(['admin', 'member'], ['member', 'random'])
- self.rbac_utils.switch_role()
+ self.rbac_utils.override_role()
roles_client = self.rbac_utils.roles_v3_client
@@ -179,20 +145,57 @@
mock.call(mock.sentinel.project_id, mock.sentinel.user_id,
'random_id')])
- @mock.patch.object(rbac_utils, 'LOG', autospec=True)
- @mock.patch.object(rbac_utils, 'sys', autospec=True)
- def test_switch_roles_with_unexpected_exception(self, mock_sys, mock_log):
- """Test whether unexpected exceptions don't throw error.
-
- If an unexpected exception or skip exception is raised, then that
- should not result in an error being raised.
+ @mock.patch.object(rbac_utils.RbacUtils, '_override_role', autospec=True)
+ def test_override_role_context_manager_simulate_pass(self,
+ mock_override_role):
+ """Validate that expected override_role calls are made when switching
+ to admin role for success path.
"""
- unexpected_exceptions = [testtools.TestCase.skipException,
- AttributeError]
+ test_obj = mock.MagicMock()
+ _rbac_utils = rbac_utils.RbacUtils(test_obj)
- for unexpected_exception in unexpected_exceptions:
- mock_sys.exc_info.return_value = [unexpected_exception()]
- # Ordinarily calling switch_role twice with the same value should
- # result in an error being thrown -- but not in this case.
- self.rbac_utils.switch_role(False)
- mock_log.error.assert_not_called()
+ # Validate constructor called _override_role with False.
+ mock_override_role.assert_called_once_with(_rbac_utils, test_obj,
+ False)
+ mock_override_role.reset_mock()
+
+ with _rbac_utils.override_role(test_obj):
+ # Validate `override_role` public method called private method
+ # `_override_role` with True.
+ mock_override_role.assert_called_once_with(_rbac_utils, test_obj,
+ True)
+ mock_override_role.reset_mock()
+ # Validate that `override_role` switched back to admin role after
+ # contextmanager.
+ mock_override_role.assert_called_once_with(_rbac_utils, test_obj,
+ False)
+
+ @mock.patch.object(rbac_utils.RbacUtils, '_override_role', autospec=True)
+ def test_override_role_context_manager_simulate_fail(self,
+ mock_override_role):
+ """Validate that expected override_role calls are made when switching
+ to admin role for failure path (i.e. when test raises exception).
+ """
+ test_obj = mock.MagicMock()
+ _rbac_utils = rbac_utils.RbacUtils(test_obj)
+
+ # Validate constructor called _override_role with False.
+ mock_override_role.assert_called_once_with(_rbac_utils, test_obj,
+ False)
+ mock_override_role.reset_mock()
+
+ def _do_test():
+ with _rbac_utils.override_role(test_obj):
+ # Validate `override_role` public method called private method
+ # `_override_role` with True.
+ mock_override_role.assert_called_once_with(
+ _rbac_utils, test_obj, True)
+ mock_override_role.reset_mock()
+ # Raise exc to verify role switch works for negative case.
+ raise lib_exc.Forbidden()
+
+ # Validate that role is switched back to admin, despite test failure.
+ with testtools.ExpectedException(lib_exc.Forbidden):
+ _do_test()
+ mock_override_role.assert_called_once_with(_rbac_utils, test_obj,
+ False)
diff --git a/releasenotes/notes/deprecate-rbac-utils-switch-role-a959f7bb3ebab353.yaml b/releasenotes/notes/deprecate-rbac-utils-switch-role-a959f7bb3ebab353.yaml
new file mode 100644
index 0000000..6b55879
--- /dev/null
+++ b/releasenotes/notes/deprecate-rbac-utils-switch-role-a959f7bb3ebab353.yaml
@@ -0,0 +1,13 @@
+---
+features:
+ - |
+ Implemented a new method ``override_role`` in the ``rbac_utils`` module,
+ which provides the same functionality as the now-deprecated
+ ``switch_role`` method, with one difference: ``override_role`` is a
+ context manager, which provides better policy validation granularity.
+ Immediately after the code inside the context manager has executed, the
+ role is switched back to the admin role automatically.
+deprecations:
+ - |
+ The ``switch_role`` method in the ``rbac_utils`` module has been deprecated
+ and will be removed during the Rocky release cycle.