Add role-switching validation to Patrole framework.
Currently, no role validation is performed when calling switch_role. This
is problematic for the following reasons:
- The only "validation" right now checks whether switchToRbacRole is None.
If so, None is returned. The validation used is nowhere near as robust
as it should be -- what if a string or int is passed in? -- and an error
should be thrown instead of silently returning None.
- If switch_role is called with the same boolean value twice in a row,
then the rbac_role under test is never actually switched to; this
should be detected and flagged as an error.
- If switch_role is not called in a test, then an error should definitely
be thrown as well, because then the test may pass as a false positive.
This patch adds role validation so that the above cases are avoided.
This patch also updates the existing unit tests and adds new ones
where needed.
Implements: blueprint add-switch-role-validation
Change-Id: Ida0f03af236eb0f91d8cc96d51ca57671b4eef7c
diff --git a/patrole_tempest_plugin/rbac_utils.py b/patrole_tempest_plugin/rbac_utils.py
index db648df..f61ccdf 100644
--- a/patrole_tempest_plugin/rbac_utils.py
+++ b/patrole_tempest_plugin/rbac_utils.py
@@ -13,15 +13,15 @@
# License for the specific language governing permissions and limitations
# under the License.
-import oslo_utils.uuidutils as uuid_utils
-import six
+import sys
+import testtools
import time
-from tempest.common import credentials_factory
-from tempest import config
-from tempest.test import BaseTestCase
-
from oslo_log import log as logging
+import oslo_utils.uuidutils as uuid_utils
+import six
+
+from tempest import config
from patrole_tempest_plugin import rbac_exceptions
@@ -42,51 +42,40 @@
@six.add_metaclass(Singleton)
class RbacUtils(object):
- def __init__(cls):
- creds_provider = credentials_factory.get_credentials_provider(
- name=__name__,
- force_tenant_isolation=True,
- identity_version=BaseTestCase.get_identity_version())
+ # References the last value of `switch_to_rbac_role` that was passed to
+ # `switch_role`. Used for ensuring that `switch_role` is correctly used
+ # in a test file, so that false positives are prevented. The key used
+ # to index into the dictionary is the class name, which is unique.
+ switch_role_history = {}
+ admin_role_id = None
+ rbac_role_id = None
- cls.creds_client = creds_provider.creds_client
- cls.available_roles = cls.creds_client.roles_client.list_roles()
- cls.admin_role_id = cls.rbac_role_id = None
- for item in cls.available_roles['roles']:
- if item['name'] == CONF.rbac.rbac_test_role:
- cls.rbac_role_id = item['id']
- if item['name'] == 'admin':
- cls.admin_role_id = item['id']
+ def switch_role(self, test_obj, switchToRbacRole=False):
+ self.user_id = test_obj.auth_provider.credentials.user_id
+ self.project_id = test_obj.auth_provider.credentials.tenant_id
+ self.token = test_obj.auth_provider.get_token()
+ self.identity_version = test_obj.get_identity_version()
- def switch_role(cls, test_obj, switchToRbacRole=None):
+ if self.identity_version.endswith('v3'):
+ self.roles_client = test_obj.os_admin.roles_v3_client
+ else:
+ self.roles_client = test_obj.os_admin.roles_client
+
LOG.debug('Switching role to: %s', switchToRbacRole)
- # Check if admin and rbac roles exist.
- if not cls.admin_role_id or not cls.rbac_role_id:
- msg = ("Defined 'rbac_role' or 'admin' role does not exist"
- " in the system.")
- raise rbac_exceptions.RbacResourceSetupFailed(msg)
-
- if not isinstance(switchToRbacRole, bool):
- msg = ("Wrong value for parameter 'switchToRbacRole' is passed."
- " It should be either 'True' or 'False'.")
- raise rbac_exceptions.RbacResourceSetupFailed(msg)
try:
- user_id = test_obj.auth_provider.credentials.user_id
- project_id = test_obj.auth_provider.credentials.tenant_id
+ if not self.admin_role_id or not self.rbac_role_id:
+ self._get_roles()
- cls._clear_user_roles(user_id, project_id)
+ rbac_utils._validate_switch_role(self, test_obj, switchToRbacRole)
if switchToRbacRole:
- cls.creds_client.roles_client.create_user_role_on_project(
- project_id, user_id, cls.rbac_role_id)
+ self._add_role_to_user(self.rbac_role_id)
else:
- cls.creds_client.roles_client.create_user_role_on_project(
- project_id, user_id, cls.admin_role_id)
-
+ self._add_role_to_user(self.admin_role_id)
except Exception as exp:
LOG.error(exp)
raise
-
finally:
# NOTE(felipemonteiro): These two comments below are copied from
# tempest.api.identity.v2/v3.test_users.
@@ -99,17 +88,80 @@
# precise to the second. Sleep to ensure we are passing the second
# boundary before attempting to authenticate. If token is of type
# uuid, then do not sleep.
- if not uuid_utils.is_uuid_like(cls.creds_client.
- identity_client.token):
+ if not uuid_utils.is_uuid_like(self.token):
time.sleep(1)
test_obj.auth_provider.set_auth()
- def _clear_user_roles(cls, user_id, tenant_id):
- roles = cls.creds_client.roles_client.list_user_roles_on_project(
- tenant_id, user_id)['roles']
+ def _add_role_to_user(self, role_id):
+ role_already_present = self._clear_user_roles(role_id)
+ if role_already_present:
+ return
+
+ self.roles_client.create_user_role_on_project(
+ self.project_id, self.user_id, role_id)
+
+ def _clear_user_roles(self, role_id):
+ roles = self.roles_client.list_user_roles_on_project(
+ self.project_id, self.user_id)['roles']
+
+ # If the user already has the role that is required, return early.
+ role_ids = [role['id'] for role in roles]
+ if role_ids == [role_id]:
+ return True
for role in roles:
- cls.creds_client.roles_client.delete_role_from_user_on_project(
- tenant_id, user_id, role['id'])
+ self.roles_client.delete_role_from_user_on_project(
+ self.project_id, self.user_id, role['id'])
+
+ return False
+
+ def _validate_switch_role(self, test_obj, switchToRbacRole):
+ """Validates that the rbac role passed to `switch_role` is legal.
+
+ Throws an error for the following improper usages of `switch_role`:
+ * `switch_role` is not called with a boolean value
+ * `switch_role` is never called in a test file, except in tearDown
+ * `switch_role` is called with the same boolean value twice
+ """
+ if not isinstance(switchToRbacRole, bool):
+ raise rbac_exceptions.RbacResourceSetupFailed(
+ 'switchToRbacRole must be a boolean value.')
+
+ key = test_obj.__name__ if isinstance(test_obj, type) else \
+ test_obj.__class__.__name__
+ self.switch_role_history.setdefault(key, None)
+
+ if self.switch_role_history[key] == switchToRbacRole:
+ # If the test was skipped, then this is a legitimate use case,
+ # so do not throw an exception.
+ exc_value = sys.exc_info()[1]
+ if not isinstance(exc_value, testtools.TestCase.skipException):
+ self.switch_role_history[key] = False
+ error_message = '`switchToRbacRole` must not be called with '\
+ 'the same bool value twice. Make sure that you included '\
+ 'a rbac_utils.switch_role method call inside the test.'
+ LOG.error(error_message)
+ raise rbac_exceptions.RbacResourceSetupFailed(error_message)
+ else:
+ self.switch_role_history[key] = switchToRbacRole
+
+ def _get_roles(self):
+ available_roles = self.roles_client.list_roles()
+ admin_role_id = rbac_role_id = None
+
+ for role in available_roles['roles']:
+ if role['name'] == CONF.rbac.rbac_test_role:
+ rbac_role_id = role['id']
+ if role['name'] == 'admin':
+ admin_role_id = role['id']
+
+ if not admin_role_id or not rbac_role_id:
+ msg = "Role with name 'admin' does not exist in the system."\
+ if not admin_role_id else "Role defined by rbac_test_role "\
+ "does not exist in the system."
+ raise rbac_exceptions.RbacResourceSetupFailed(msg)
+
+ self.admin_role_id = admin_role_id
+ self.rbac_role_id = rbac_role_id
rbac_utils = RbacUtils