Only sleep following a role switch
This commit drops support for not sleeping if the token being
used for keystone auth is UUID, as it is being deprecated
during the "R" release [0]. Thus, the condition for sleeping
has completely changed: Now, time.sleep(1) is only carried
out if the role actually switched (causing a token revocation
event in keystone and hence the need to sleep).
For example, if rbac_test_role = "admin" then there is no need
to sleep (except once, at the beginning, which strips the user
of any additional roles) as setup, API execution and clean up
all require admin. If rbac_test_role = "Member" then sleep
must be carried out each test at least twice, as setup requires
admin, API execution requires Member, and clean up requires
admin.
This commit also adds unit tests to confirm that sleep is
only performed if the role switched. This required
changes to some testing logic.
[0] http://lists.openstack.org/pipermail/openstack-dev/2017-August/121067.html
Change-Id: Iedc97397d9ae76c7f5efed5913850ced470c6c5f
Closes-Bug: #1710237
Depends-On: I34d324575c8117022724a944c034f089eb0c7541
diff --git a/patrole_tempest_plugin/rbac_utils.py b/patrole_tempest_plugin/rbac_utils.py
index 29d41d3..5543cbb 100644
--- a/patrole_tempest_plugin/rbac_utils.py
+++ b/patrole_tempest_plugin/rbac_utils.py
@@ -19,7 +19,7 @@
import time
from oslo_log import log as logging
-import oslo_utils.uuidutils as uuid_utils
+from oslo_utils import excutils
import testtools
from tempest.common import credentials_factory as credentials
@@ -39,15 +39,13 @@
seamlessly swap between admin credentials, needed for setup and clean up,
and primary credentials, needed to perform the API call which does
policy enforcement. The primary credentials always cycle between roles
- defined by ``CONF.identity.admin_role`` and `CONF.rbac.rbac_test_role``.
+ defined by ``[identity] admin_role`` and ``[rbac] rbac_test_role``.
"""
def __init__(self, test_obj):
"""Constructor for ``RbacUtils``.
- :param test_obj: A Tempest test instance.
- :type test_obj: tempest.lib.base.BaseTestCase or
- tempest.test.BaseTestCase
+ :param test_obj: An instance of `tempest.test.BaseTestCase`.
"""
# Since we are going to instantiate a client manager with
# admin credentials, first check if admin is available.
@@ -80,56 +78,83 @@
Switch the role used by `os_primary` credentials to:
* admin if `toggle_rbac_role` is False
- * `CONF.rbac.rbac_test_role` if `toggle_rbac_role` is True
+ * `[rbac] rbac_test_role` if `toggle_rbac_role` is True
- :param test_obj: test object of type tempest.lib.base.BaseTestCase
- :param toggle_rbac_role: role to switch `os_primary` Tempest creds to
+ :param test_obj: An instance of `tempest.test.BaseTestCase`.
+ :param toggle_rbac_role: Role to switch `os_primary` Tempest creds to.
+ :returns: None.
+ :raises RbacResourceSetupFailed: If admin or test roles are missing. Or
+ if `toggle_rbac_role` is not a boolean value or role validation
+ fails.
"""
self.user_id = test_obj.os_primary.credentials.user_id
self.project_id = test_obj.os_primary.credentials.tenant_id
self.token = test_obj.os_primary.auth_provider.get_token()
LOG.debug('Switching role to: %s.', toggle_rbac_role)
+ role_already_present = False
+
try:
- if not self.admin_role_id or not self.rbac_role_id:
- self._get_roles()
+ if not all([self.admin_role_id, self.rbac_role_id]):
+ self._get_roles_by_name()
self._validate_switch_role(test_obj, toggle_rbac_role)
- if toggle_rbac_role:
- self._add_role_to_user(self.rbac_role_id)
- else:
- self._add_role_to_user(self.admin_role_id)
+ target_role = (
+ self.rbac_role_id if toggle_rbac_role else self.admin_role_id)
+ role_already_present = self._list_and_clear_user_roles_on_project(
+ target_role)
+
+ # Do not switch roles if `target_role` already exists.
+ if not role_already_present:
+ self._create_user_role_on_project(target_role)
except Exception as exp:
- LOG.exception(exp)
- raise
+ with excutils.save_and_reraise_exception():
+ LOG.exception(exp)
finally:
test_obj.os_primary.auth_provider.clear_auth()
# Fernet tokens are not subsecond aware so sleep to ensure we are
# passing the second boundary before attempting to authenticate.
- #
- # FIXME(felipemonteiro): Rather than skipping sleep if the token
- # is not uuid, this should instead be skipped if the token is not
- # Fernet.
- if not uuid_utils.is_uuid_like(self.token):
+ # Only sleep if a token revocation occurred as a result of role
+ # switching. This will optimize test runtime in the case where
+ # ``[identity] admin_role`` == ``[rbac] rbac_test_role``.
+ if not role_already_present:
time.sleep(1)
test_obj.os_primary.auth_provider.set_auth()
- def _add_role_to_user(self, role_id):
- role_already_present = self._clear_user_roles(role_id)
- if role_already_present:
- return
+ def _get_roles_by_name(self):
+ available_roles = self.admin_roles_client.list_roles()
+ admin_role_id = rbac_role_id = None
+ for role in available_roles['roles']:
+ if role['name'] == CONF.rbac.rbac_test_role:
+ rbac_role_id = role['id']
+ if role['name'] == CONF.identity.admin_role:
+ admin_role_id = role['id']
+
+ if not all([admin_role_id, rbac_role_id]):
+ msg = ("Roles defined by `[rbac] rbac_test_role` and `[identity] "
+ "admin_role` must be defined in the system.")
+ raise rbac_exceptions.RbacResourceSetupFailed(msg)
+
+ self.admin_role_id = admin_role_id
+ self.rbac_role_id = rbac_role_id
+
+ def _create_user_role_on_project(self, role_id):
self.admin_roles_client.create_user_role_on_project(
self.project_id, self.user_id, role_id)
- def _clear_user_roles(self, role_id):
+ def _list_and_clear_user_roles_on_project(self, role_id):
roles = self.admin_roles_client.list_user_roles_on_project(
self.project_id, self.user_id)['roles']
-
- # If the user already has the role that is required, return early.
role_ids = [role['id'] for role in roles]
- if role_ids == [role_id]:
+
+ # NOTE(felipemonteiro): We do not use ``role_id in role_ids`` here to
+ # avoid over-permission errors: if the current list of roles on the
+ # project includes "admin" and "Member", and we are switching to the
+ # "Member" role, then we must delete the "admin" role. Thus, we only
+ # return early if the user's roles on the project are an exact match.
+ if [role_id] == role_ids:
return True
for role in roles:
@@ -139,11 +164,11 @@
return False
def _validate_switch_role(self, test_obj, toggle_rbac_role):
- """Validates that the rbac role passed to `switch_role` is legal.
+ """Validates that the test role passed to `switch_role` is legal.
Throws an error for the following improper usages of `switch_role`:
* `switch_role` is not called with a boolean value
- * `switch_role` is never called in a test file, except in tearDown
+ * `switch_role` is never called inside a test, except in tearDown
* `switch_role` is called with the same boolean value twice
If a `skipException` is thrown then this is a legitimate reason why
@@ -151,7 +176,7 @@
"""
if not isinstance(toggle_rbac_role, bool):
raise rbac_exceptions.RbacResourceSetupFailed(
- 'toggle_rbac_role must be a boolean value.')
+ '`toggle_rbac_role` must be a boolean value.')
# The unique key is the combination of module path plus class name.
class_name = test_obj.__name__ if isinstance(test_obj, type) else \
@@ -176,25 +201,6 @@
else:
self.switch_role_history[key] = toggle_rbac_role
- def _get_roles(self):
- available_roles = self.admin_roles_client.list_roles()
- admin_role_id = rbac_role_id = None
-
- for role in available_roles['roles']:
- if role['name'] == CONF.rbac.rbac_test_role:
- rbac_role_id = role['id']
- if role['name'] == CONF.identity.admin_role:
- admin_role_id = role['id']
-
- if not admin_role_id or not rbac_role_id:
- msg = "Role with name 'admin' does not exist in the system."\
- if not admin_role_id else "Role defined by rbac_test_role "\
- "does not exist in the system."
- raise rbac_exceptions.RbacResourceSetupFailed(msg)
-
- self.admin_role_id = admin_role_id
- self.rbac_role_id = rbac_role_id
-
def is_admin():
"""Verifies whether the current test role equals the admin role.
diff --git a/patrole_tempest_plugin/tests/unit/fixtures.py b/patrole_tempest_plugin/tests/unit/fixtures.py
index f25e05d..ed50e15 100644
--- a/patrole_tempest_plugin/tests/unit/fixtures.py
+++ b/patrole_tempest_plugin/tests/unit/fixtures.py
@@ -53,15 +53,11 @@
class RbacUtilsFixture(fixtures.Fixture):
- """Fixture for RbacUtils class."""
+ """Fixture for `RbacUtils` class."""
USER_ID = mock.sentinel.user_id
PROJECT_ID = mock.sentinel.project_id
- def __init__(self, **kwargs):
- super(RbacUtilsFixture, self).__init__()
- self.available_roles = None
-
def setUp(self):
super(RbacUtilsFixture, self).setUp()
@@ -81,29 +77,54 @@
self.roles_v3_client = (
self.mock_test_obj.get_client_manager.return_value.roles_v3_client)
+ self.set_roles(['admin', 'member'], [])
+
def switch_role(self, *role_toggles):
"""Instantiate `rbac_utils.RbacUtils` and call `switch_role`.
Create an instance of `rbac_utils.RbacUtils` and call `switch_role`
for each boolean value in `role_toggles`. The number of calls to
- `switch_role` is always 1 + len(role_toggles) because the
+ `switch_role` is always 1 + len(`role_toggles`) because the
`rbac_utils.RbacUtils` constructor automatically calls `switch_role`.
:param role_toggles: the list of boolean values iterated over and
passed to `switch_role`.
"""
- if not self.available_roles:
- self.set_roles('admin', 'member')
-
self.fake_rbac_utils = rbac_utils.RbacUtils(self.mock_test_obj)
+
for role_toggle in role_toggles:
self.fake_rbac_utils.switch_role(self.mock_test_obj, role_toggle)
+ # NOTE(felipemonteiro): Simulate that a role switch has occurred
+ # by updating the user's current role to the new role. This means
+ # that all API actions involved during a role switch -- listing,
+ # deleting and adding roles -- are executed, making it easier to
+ # assert that mock calls were called as expected.
+ new_role = 'member' if role_toggle else 'admin'
+ self.set_roles(['admin', 'member'], [new_role])
- def set_roles(self, *roles):
- """Set the list of available roles in the system to `roles`."""
- self.available_roles = {
+ def set_roles(self, roles, roles_on_project=None):
+ """Set the list of available roles in the system.
+
+ :param roles: List of roles returned by ``list_roles``.
+ :param roles_on_project: List of roles returned by
+ ``list_user_roles_on_project``.
+ :returns: None.
+ """
+ if not roles_on_project:
+ roles_on_project = []
+ if not isinstance(roles, list):
+ roles = [roles]
+ if not isinstance(roles_on_project, list):
+ roles_on_project = [roles_on_project]
+
+ available_roles = {
'roles': [{'name': role, 'id': '%s_id' % role} for role in roles]
}
- self.roles_v3_client.list_user_roles_on_project.return_value =\
- self.available_roles
- self.roles_v3_client.list_roles.return_value = self.available_roles
+ available_project_roles = {
+ 'roles': [{'name': role, 'id': '%s_id' % role}
+ for role in roles_on_project]
+ }
+
+ self.roles_v3_client.list_roles.return_value = available_roles
+ self.roles_v3_client.list_user_roles_on_project.return_value = (
+ available_project_roles)
diff --git a/patrole_tempest_plugin/tests/unit/test_rbac_utils.py b/patrole_tempest_plugin/tests/unit/test_rbac_utils.py
index 79503b1..540283a 100644
--- a/patrole_tempest_plugin/tests/unit/test_rbac_utils.py
+++ b/patrole_tempest_plugin/tests/unit/test_rbac_utils.py
@@ -34,17 +34,19 @@
def test_switch_role_with_missing_admin_role(self):
self.rbac_utils.set_roles('member')
- e = self.assertRaises(rbac_exceptions.RbacResourceSetupFailed,
- self.rbac_utils.switch_role, True)
- self.assertIn("Role with name 'admin' does not exist in the system.",
- e.__str__())
+ error_re = (
+ 'Roles defined by `\[rbac\] rbac_test_role` and `\[identity\] '
+ 'admin_role` must be defined in the system.')
+ self.assertRaisesRegex(rbac_exceptions.RbacResourceSetupFailed,
+ error_re, self.rbac_utils.switch_role)
def test_switch_role_with_missing_rbac_role(self):
self.rbac_utils.set_roles('admin')
- e = self.assertRaises(rbac_exceptions.RbacResourceSetupFailed,
- self.rbac_utils.switch_role, True)
- self.assertIn("Role defined by rbac_test_role does not exist in the "
- "system.", e.__str__())
+ error_re = (
+ 'Roles defined by `\[rbac\] rbac_test_role` and `\[identity\] '
+ 'admin_role` must be defined in the system.')
+ self.assertRaisesRegex(rbac_exceptions.RbacResourceSetupFailed,
+ error_re, self.rbac_utils.switch_role)
def test_switch_role_to_admin_role(self):
self.rbac_utils.switch_role()
@@ -61,6 +63,16 @@
.assert_called_once_with()
mock_time.sleep.assert_called_once_with(1)
+ def test_switch_role_to_admin_role_avoids_role_switch(self):
+ self.rbac_utils.set_roles(['admin', 'member'], 'admin')
+ self.rbac_utils.switch_role()
+
+ roles_client = self.rbac_utils.roles_v3_client
+ mock_time = self.rbac_utils.mock_time
+
+ roles_client.create_user_role_on_project.assert_not_called()
+ mock_time.sleep.assert_not_called()
+
def test_switch_role_to_member_role(self):
self.rbac_utils.switch_role(True)
@@ -80,6 +92,19 @@
[mock.call()] * 2)
mock_time.sleep.assert_has_calls([mock.call(1)] * 2)
+ def test_switch_role_to_member_role_avoids_role_switch(self):
+ self.rbac_utils.set_roles(['admin', 'member'], 'member')
+ self.rbac_utils.switch_role(True)
+
+ roles_client = self.rbac_utils.roles_v3_client
+ mock_time = self.rbac_utils.mock_time
+
+ roles_client.create_user_role_on_project.assert_has_calls([
+ mock.call(self.rbac_utils.PROJECT_ID, self.rbac_utils.USER_ID,
+ 'admin_id')
+ ])
+ mock_time.sleep.assert_called_once_with(1)
+
def test_switch_role_to_member_role_then_admin_role(self):
self.rbac_utils.switch_role(True, False)
@@ -137,19 +162,22 @@
self.assertIn(expected_error_message, str(e))
def test_clear_user_roles(self):
- self.rbac_utils.switch_role(True)
+ # NOTE(felipemonteiro): Set the user's roles on the project to
+ # include 'random' to coerce a role switch, or else it will be
+ # skipped.
+ self.rbac_utils.set_roles(['admin', 'member'], ['member', 'random'])
+ self.rbac_utils.switch_role()
roles_client = self.rbac_utils.roles_v3_client
- roles_client.list_user_roles_on_project.assert_has_calls([
- mock.call(self.rbac_utils.PROJECT_ID, self.rbac_utils.USER_ID)
- ] * 2)
+ roles_client.list_user_roles_on_project.assert_called_once_with(
+ self.rbac_utils.PROJECT_ID, self.rbac_utils.USER_ID)
roles_client.delete_role_from_user_on_project.\
assert_has_calls([
mock.call(mock.sentinel.project_id, mock.sentinel.user_id,
- 'admin_id'),
+ 'member_id'),
mock.call(mock.sentinel.project_id, mock.sentinel.user_id,
- 'member_id')] * 2)
+ 'random_id')])
@mock.patch.object(rbac_utils, 'LOG', autospec=True)
@mock.patch.object(rbac_utils, 'sys', autospec=True)