Add role-switching validation to Patrole framework.
Currently, no role validation is performed when calling switch_role. This
is problematic for the following reasons:
- The only "validation" right now checks whether switchToRbacRole is None.
If so, None is returned. The validation used is nowhere near as robust
as it should be -- what if a string or int is passed in? -- and an error
should be thrown instead of silently returning None.
- If switch_role is called with the same boolean value twice, then the
rbac_role under test is never switched to: this should be detected
and flagged as an error.
- If switch_role is not called in a test, then an error should definitely
be thrown as well, because then the test may pass as a false positive.
This patch adds role validation so that the above cases are avoided.
This patch also updated unit tests and added additional ones
where needed.
Implements: blueprint add-switch-role-validation
Change-Id: Ida0f03af236eb0f91d8cc96d51ca57671b4eef7c
diff --git a/patrole_tempest_plugin/ b/patrole_tempest_plugin/
index db648df..f61ccdf 100644
--- a/patrole_tempest_plugin/
+++ b/patrole_tempest_plugin/
@@ -13,15 +13,15 @@
# License for the specific language governing permissions and limitations
# under the License.
-import oslo_utils.uuidutils as uuid_utils
-import six
+import sys
+import testtools
import time
-from tempest.common import credentials_factory
-from tempest import config
-from tempest.test import BaseTestCase
from oslo_log import log as logging
+import oslo_utils.uuidutils as uuid_utils
+import six
+from tempest import config
from patrole_tempest_plugin import rbac_exceptions
@@ -42,51 +42,40 @@
class RbacUtils(object):
- def __init__(cls):
- creds_provider = credentials_factory.get_credentials_provider(
- name=__name__,
- force_tenant_isolation=True,
- identity_version=BaseTestCase.get_identity_version())
+ # References the last value of `switch_to_rbac_role` that was passed to
+ # `switch_role`. Used for ensuring that `switch_role` is correctly used
+ # in a test file, so that false positives are prevented. The key used
+ # to index into the dictionary is the class name, which is unique.
+ switch_role_history = {}
+ admin_role_id = None
+ rbac_role_id = None
- cls.creds_client = creds_provider.creds_client
- cls.available_roles = cls.creds_client.roles_client.list_roles()
- cls.admin_role_id = cls.rbac_role_id = None
- for item in cls.available_roles['roles']:
- if item['name'] == CONF.rbac.rbac_test_role:
- cls.rbac_role_id = item['id']
- if item['name'] == 'admin':
- cls.admin_role_id = item['id']
+ def switch_role(self, test_obj, switchToRbacRole=False):
+ self.user_id = test_obj.auth_provider.credentials.user_id
+ self.project_id = test_obj.auth_provider.credentials.tenant_id
+ self.token = test_obj.auth_provider.get_token()
+ self.identity_version = test_obj.get_identity_version()
- def switch_role(cls, test_obj, switchToRbacRole=None):
+ if self.identity_version.endswith('v3'):
+ self.roles_client = test_obj.os_admin.roles_v3_client
+ else:
+ self.roles_client = test_obj.os_admin.roles_client
LOG.debug('Switching role to: %s', switchToRbacRole)
- # Check if admin and rbac roles exist.
- if not cls.admin_role_id or not cls.rbac_role_id:
- msg = ("Defined 'rbac_role' or 'admin' role does not exist"
- " in the system.")
- raise rbac_exceptions.RbacResourceSetupFailed(msg)
- if not isinstance(switchToRbacRole, bool):
- msg = ("Wrong value for parameter 'switchToRbacRole' is passed."
- " It should be either 'True' or 'False'.")
- raise rbac_exceptions.RbacResourceSetupFailed(msg)
- user_id = test_obj.auth_provider.credentials.user_id
- project_id = test_obj.auth_provider.credentials.tenant_id
+ if not self.admin_role_id or not self.rbac_role_id:
+ self._get_roles()
- cls._clear_user_roles(user_id, project_id)
+ rbac_utils._validate_switch_role(self, test_obj, switchToRbacRole)
if switchToRbacRole:
- cls.creds_client.roles_client.create_user_role_on_project(
- project_id, user_id, cls.rbac_role_id)
+ self._add_role_to_user(self.rbac_role_id)
- cls.creds_client.roles_client.create_user_role_on_project(
- project_id, user_id, cls.admin_role_id)
+ self._add_role_to_user(self.admin_role_id)
except Exception as exp:
# NOTE(felipemonteiro): These two comments below are copied from
# tempest.api.identity.v2/v3.test_users.
@@ -99,17 +88,80 @@
# precise to the second. Sleep to ensure we are passing the second
# boundary before attempting to authenticate. If token is of type
# uuid, then do not sleep.
- if not uuid_utils.is_uuid_like(cls.creds_client.
- identity_client.token):
+ if not uuid_utils.is_uuid_like(self.token):
- def _clear_user_roles(cls, user_id, tenant_id):
- roles = cls.creds_client.roles_client.list_user_roles_on_project(
- tenant_id, user_id)['roles']
+ def _add_role_to_user(self, role_id):
+ role_already_present = self._clear_user_roles(role_id)
+ if role_already_present:
+ return
+ self.roles_client.create_user_role_on_project(
+ self.project_id, self.user_id, role_id)
+ def _clear_user_roles(self, role_id):
+ roles = self.roles_client.list_user_roles_on_project(
+ self.project_id, self.user_id)['roles']
+ # If the user already has the role that is required, return early.
+ role_ids = [role['id'] for role in roles]
+ if role_ids == [role_id]:
+ return True
for role in roles:
- cls.creds_client.roles_client.delete_role_from_user_on_project(
- tenant_id, user_id, role['id'])
+ self.roles_client.delete_role_from_user_on_project(
+ self.project_id, self.user_id, role['id'])
+ return False
+ def _validate_switch_role(self, test_obj, switchToRbacRole):
+ """Validates that the rbac role passed to `switch_role` is legal.
+ Throws an error for the following improper usages of `switch_role`:
+ * `switch_role` is not called with a boolean value
+ * `switch_role` is never called in a test file, except in tearDown
+ * `switch_role` is called with the same boolean value twice
+ """
+ if not isinstance(switchToRbacRole, bool):
+ raise rbac_exceptions.RbacResourceSetupFailed(
+ 'switchToRbacRole must be a boolean value.')
+ key = test_obj.__name__ if isinstance(test_obj, type) else \
+ test_obj.__class__.__name__
+ self.switch_role_history.setdefault(key, None)
+ if self.switch_role_history[key] == switchToRbacRole:
+ # If the test was skipped, then this is a legitimate use case,
+ # so do not throw an exception.
+ exc_value = sys.exc_info()[1]
+ if not isinstance(exc_value, testtools.TestCase.skipException):
+ self.switch_role_history[key] = False
+ error_message = '`switchToRbacRole` must not be called with '\
+ 'the same bool value twice. Make sure that you included '\
+ 'a rbac_utils.switch_role method call inside the test.'
+ LOG.error(error_message)
+ raise rbac_exceptions.RbacResourceSetupFailed(error_message)
+ else:
+ self.switch_role_history[key] = switchToRbacRole
+ def _get_roles(self):
+ available_roles = self.roles_client.list_roles()
+ admin_role_id = rbac_role_id = None
+ for role in available_roles['roles']:
+ if role['name'] == CONF.rbac.rbac_test_role:
+ rbac_role_id = role['id']
+ if role['name'] == 'admin':
+ admin_role_id = role['id']
+ if not admin_role_id or not rbac_role_id:
+ msg = "Role with name 'admin' does not exist in the system."\
+ if not admin_role_id else "Role defined by rbac_test_role "\
+ "does not exist in the system."
+ raise rbac_exceptions.RbacResourceSetupFailed(msg)
+ self.admin_role_id = admin_role_id
+ self.rbac_role_id = rbac_role_id
rbac_utils = RbacUtils
diff --git a/patrole_tempest_plugin/tests/api/image/v2/ b/patrole_tempest_plugin/tests/api/image/v2/
index 7d99d55..c53480d 100644
--- a/patrole_tempest_plugin/tests/api/image/v2/
+++ b/patrole_tempest_plugin/tests/api/image/v2/
@@ -40,10 +40,6 @@
cls.image_client = cls.os.image_client_v2
cls.image_member_client = cls.os.image_member_client_v2
- def setUp(self):
- self.rbac_utils.switch_role(self, switchToRbacRole=False)
- super(ImagesMemberRbacTest, self).setUp()
diff --git a/patrole_tempest_plugin/tests/unit/ b/patrole_tempest_plugin/tests/unit/
index add1770..692c0b9 100644
--- a/patrole_tempest_plugin/tests/unit/
+++ b/patrole_tempest_plugin/tests/unit/
@@ -14,6 +14,7 @@
# under the License.
import mock
+import testtools
from tempest import config
from tempest.lib import exceptions as lib_exc
@@ -27,62 +28,87 @@
class RBACUtilsTest(base.TestCase):
- @mock.patch.object(rbac_utils, 'time', autospec=True)
- def setUp(self, _):
+ def setUp(self):
super(RBACUtilsTest, self).setUp()
- self.mock_creds_provider = mock.patch.object(
- rbac_utils, 'credentials_factory', autospec=True).start()
available_roles = {
'roles': [
{'name': 'admin', 'id': 'admin_id'},
{'name': 'Member', 'id': 'member_id'}
- self.mock_creds_provider.get_credentials_provider.return_value.\
- creds_client.roles_client.list_roles.return_value = \
- available_roles
- self.addCleanup(mock.patch.stopall)
- CONF.set_override('rbac_test_role', 'Member', group='rbac',
- enforce_type=True)
- self.addCleanup(CONF.clear_override, 'rbac_test_role', group='rbac')
# Because rbac_utils is a singleton, reset all of its role-related
# parameters to the correct values for each test run.
self.rbac_utils = rbac_utils.rbac_utils()
- self.rbac_utils.available_roles = available_roles
+ self.rbac_utils.switch_role_history = {}
self.rbac_utils.admin_role_id = 'admin_id'
self.rbac_utils.rbac_role_id = 'member_id'
- def test_initialization_with_missing_admin_role(self):
- self.rbac_utils.admin_role_id = None
- e = self.assertRaises(rbac_exceptions.RbacResourceSetupFailed,
- self.rbac_utils.switch_role, None)
- self.assertIn("Defined 'rbac_role' or 'admin' role does not exist"
- " in the system.", e.__str__())
+ self.mock_test_obj = mock.Mock()
+ self.mock_test_obj.auth_provider = mock.Mock(
+ **{'credentials.user_id': mock.sentinel.user_id,
+ 'credentials.tenant_id': mock.sentinel.project_id})
+ self.mock_test_obj.os_admin = mock.Mock(
+ **{'roles_v3_client.list_roles.return_value': available_roles})
- def test_initialization_with_missing_rbac_role(self):
+ CONF.set_override('rbac_test_role', 'Member', group='rbac',
+ enforce_type=True)
+ CONF.set_override('auth_version', 'v3', group='identity',
+ enforce_type=True)
+ self.addCleanup(CONF.clear_override, 'rbac_test_role', group='rbac')
+ self.addCleanup(CONF.clear_override, 'auth_version', group='identity')
+ self.addCleanup(mock.patch.stopall)
+ def _mock_list_user_roles_on_project(self, return_value):
+ self.mock_test_obj.admin_manager = mock.Mock(
+ **{'roles_client.list_user_roles_on_project.'
+ 'return_value': {'roles': [{'id': return_value}]}})
+ @mock.patch.object(rbac_utils.rbac_utils, '_clear_user_roles',
+ autospec=True, return_value=False)
+ def test_initialization_with_missing_admin_role(self, _):
+ self.mock_test_obj.os_admin = mock.Mock(
+ **{'roles_v3_client.list_roles.return_value':
+ {'roles': [{'name': 'Member', 'id': 'member_id'}]}})
+ self.rbac_utils.admin_role_id = None
self.rbac_utils.rbac_role_id = None
e = self.assertRaises(rbac_exceptions.RbacResourceSetupFailed,
- self.rbac_utils.switch_role, None)
- self.assertIn("Defined 'rbac_role' or 'admin' role does not exist"
- " in the system.", e.__str__())
+ self.rbac_utils.switch_role, self.mock_test_obj,
+ True)
+ self.assertIn("Role with name 'admin' does not exist in the system.",
+ e.__str__())
+ @mock.patch.object(rbac_utils.rbac_utils, '_clear_user_roles',
+ autospec=True, return_value=False)
+ def test_initialization_with_missing_rbac_role(self, _):
+ self.mock_test_obj.os_admin = mock.Mock(
+ **{'roles_v3_client.list_roles.return_value':
+ {'roles': [{'name': 'admin', 'id': 'admin_id'}]}})
+ self.rbac_utils.admin_role_id = None
+ self.rbac_utils.rbac_role_id = None
+ e = self.assertRaises(rbac_exceptions.RbacResourceSetupFailed,
+ self.rbac_utils.switch_role, self.mock_test_obj,
+ True)
+ self.assertIn("Role defined by rbac_test_role does not exist in the "
+ "system.", e.__str__())
def test_clear_user_roles(self):
- self.rbac_utils.creds_client = mock.Mock()
- creds_client = self.rbac_utils.creds_client
- creds_client.roles_client.list_user_roles_on_project.return_value = {
+ roles_client = self.mock_test_obj.os_admin.roles_v3_client
+ roles_client.list_user_roles_on_project.return_value = {
'roles': [{'id': 'admin_id'}, {'id': 'member_id'}]
- self.rbac_utils._clear_user_roles(mock.sentinel.user_id,
- mock.sentinel.project_id)
+ self.rbac_utils.roles_client = roles_client
+ self.rbac_utils.project_id = mock.sentinel.project_id
+ self.rbac_utils.user_id = mock.sentinel.user_id
- creds_client.roles_client.list_user_roles_on_project.\
+ self.rbac_utils._clear_user_roles(None)
+ roles_client.list_user_roles_on_project.\
- creds_client.roles_client.delete_role_from_user_on_project.\
+ roles_client.delete_role_from_user_on_project.\
assert_has_calls([, mock.sentinel.user_id,
@@ -91,65 +117,109 @@
@mock.patch.object(rbac_utils.rbac_utils, '_clear_user_roles',
- autospec=True)
- def test_rbac_utils_switch_role_to_admin(self, mock_clear_user_roles):
- mock_test_object = mock.Mock()
- mock_test_object.auth_provider.credentials.user_id = \
- mock.sentinel.user_id
- mock_test_object.auth_provider.credentials.tenant_id = \
- mock.sentinel.project_id
+ autospec=True, return_value=False)
+ @mock.patch.object(rbac_utils, 'time', autospec=True)
+ def test_rbac_utils_switch_role_to_admin_role(self, mock_time,
+ mock_clear_user_roles):
+ self.rbac_utils.prev_switch_role = True
+ self._mock_list_user_roles_on_project('admin_id')
+ roles_client = self.mock_test_obj.os_admin.roles_v3_client
- self.rbac_utils.creds_client = mock.Mock()
- creds_client = self.rbac_utils.creds_client
+ self.rbac_utils.switch_role(self.mock_test_obj, False)
- self.rbac_utils.switch_role(mock_test_object, False)
- creds_client.roles_client.create_user_role_on_project.\
+ roles_client.create_user_role_on_project.\
- self.rbac_utils, mock.sentinel.user_id, mock.sentinel.project_id)
- mock_test_object.auth_provider.clear_auth.assert_called_once_with()
- mock_test_object.auth_provider.set_auth.assert_called_once_with()
+ self.rbac_utils, 'admin_id')
+ self.mock_test_obj.auth_provider.clear_auth.assert_called_once_with()
+ self.mock_test_obj.auth_provider.set_auth.assert_called_once_with()
+ mock_time.sleep.assert_called_once_with(1)
@mock.patch.object(rbac_utils.rbac_utils, '_clear_user_roles',
- autospec=True)
- def test_rbac_utils_switch_role_to_rbac_role(self, mock_clear_user_roles):
- mock_test_object = mock.Mock()
- mock_test_object.auth_provider.credentials.user_id = \
- mock.sentinel.user_id
- mock_test_object.auth_provider.credentials.tenant_id = \
- mock.sentinel.project_id
+ autospec=True, return_value=False)
+ @mock.patch.object(rbac_utils, 'time', autospec=True)
+ def test_rbac_utils_switch_role_to_rbac_role(self, mock_time,
+ mock_clear_user_roles):
+ self._mock_list_user_roles_on_project('member_id')
+ roles_client = self.mock_test_obj.os_admin.roles_v3_client
- self.rbac_utils.creds_client = mock.Mock()
- creds_client = self.rbac_utils.creds_client
+ self.rbac_utils.switch_role(self.mock_test_obj, True)
- self.rbac_utils.switch_role(mock_test_object, True)
- creds_client.roles_client.create_user_role_on_project.\
+ roles_client.create_user_role_on_project.\
- self.rbac_utils, mock.sentinel.user_id, mock.sentinel.project_id)
- mock_test_object.auth_provider.clear_auth.assert_called_once_with()
- mock_test_object.auth_provider.set_auth.assert_called_once_with()
+ self.rbac_utils, 'member_id')
+ self.mock_test_obj.auth_provider.clear_auth.assert_called_once_with()
+ self.mock_test_obj.auth_provider.set_auth.assert_called_once_with()
+ mock_time.sleep.assert_called_once_with(1)
- def test_rbac_utils_switch_roles_with_invalid_value(self):
- e = self.assertRaises(rbac_exceptions.RbacResourceSetupFailed,
- self.rbac_utils.switch_role, None)
- self.assertIn("Wrong value for parameter 'switchToRbacRole' is passed."
- " It should be either 'True' or 'False'.", e.__str__())
+ def test_RBAC_utils_switch_roles_without_boolean_value(self):
+ self.assertRaises(rbac_exceptions.RbacResourceSetupFailed,
+ self.rbac_utils.switch_role, self.mock_test_obj,
+ "admin")
+ self.assertRaises(rbac_exceptions.RbacResourceSetupFailed,
+ self.rbac_utils.switch_role, self.mock_test_obj,
+ None)
@mock.patch.object(rbac_utils.rbac_utils, '_clear_user_roles',
- autospec=True)
+ autospec=True, return_value=False)
+ def test_rbac_utils_switch_roles_with_false_value_twice(self, _):
+ self._mock_list_user_roles_on_project('admin_id')
+ self.rbac_utils.switch_role(self.mock_test_obj, False)
+ e = self.assertRaises(rbac_exceptions.RbacResourceSetupFailed,
+ self.rbac_utils.switch_role, self.mock_test_obj,
+ False)
+ self.assertIn(
+ '`switchToRbacRole` must not be called with the same bool value '
+ 'twice. Make sure that you included a rbac_utils.switch_role '
+ 'method call inside the test.', str(e))
+ @mock.patch.object(rbac_utils.rbac_utils, '_clear_user_roles',
+ autospec=True, return_value=False)
+ def test_rbac_utils_switch_roles_with_true_value_twice(self, _):
+ self._mock_list_user_roles_on_project('admin_id')
+ self.rbac_utils.switch_role(self.mock_test_obj, True)
+ e = self.assertRaises(rbac_exceptions.RbacResourceSetupFailed,
+ self.rbac_utils.switch_role, self.mock_test_obj,
+ True)
+ self.assertIn(
+ '`switchToRbacRole` must not be called with the same bool value '
+ 'twice. Make sure that you included a rbac_utils.switch_role '
+ 'method call inside the test.', str(e))
+ @mock.patch.object(rbac_utils.rbac_utils, '_clear_user_roles',
+ autospec=True, return_value=False)
+ @mock.patch.object(rbac_utils, 'LOG', autospec=True)
+ @mock.patch.object(rbac_utils, 'sys', autospec=True)
+ def test_rbac_utils_switch_roles_with_skip_exception(self, mock_sys,
+ mock_log, _):
+ self._mock_list_user_roles_on_project('member_id')
+ mock_skip_exception = mock.Mock(spec=testtools.TestCase.skipException)
+ mock_sys.exc_info.return_value = [None, mock_skip_exception]
+ # Ordinarily switching to the same role would result in an error,
+ # but because the skipException is thrown before the test finishes,
+ # this is not treated as a failure.
+ self.rbac_utils.switch_role(self.mock_test_obj, False)
+ self.rbac_utils.switch_role(self.mock_test_obj, False)
+ mock_log.error.assert_not_called()
+ self.rbac_utils.switch_role(self.mock_test_obj, True)
+ self.rbac_utils.switch_role(self.mock_test_obj, True)
+ mock_log.error.assert_not_called()
+ @mock.patch.object(rbac_utils.rbac_utils, '_clear_user_roles',
+ autospec=True, return_value=False)
def test_rbac_utils_switch_role_except_exception(self,
- self.rbac_utils.creds_client = mock.Mock()
- creds_client = self.rbac_utils.creds_client
- creds_client.roles_client.create_user_role_on_project.side_effect =\
+ roles_client = self.mock_test_obj.os_admin.roles_v3_client
+ roles_client.create_user_role_on_project.side_effect =\
self.assertRaises(lib_exc.NotFound, self.rbac_utils.switch_role,
- mock.Mock(), True)
+ self.mock_test_obj, True)