Add role-switching validation to Patrole framework.

Currently, no role validation is performed when calling switch_role. This
is problematic for the following reasons:
  - The only "validation" right now checks whether switchToRbacRole is None.
    If so, None is returned. The validation used is nowhere near as robust
    as it should be -- what if a string or int is passed in? -- and an error
    should be thrown instead of silently returning None.
  - If switch_role is called with the same boolean value twice, then the
    rbac_role under test is never switched to: this should be detected
    and flagged as an error.
  - If switch_role is not called in a test, then an error should definitely
    be thrown as well, because then the test may pass as a false positive.

This patch adds role validation so that the above cases are avoided.

This patch also updated unit tests and added additional ones
where needed.

Implements: blueprint add-switch-role-validation
Change-Id: Ida0f03af236eb0f91d8cc96d51ca57671b4eef7c
diff --git a/patrole_tempest_plugin/rbac_utils.py b/patrole_tempest_plugin/rbac_utils.py
index db648df..f61ccdf 100644
--- a/patrole_tempest_plugin/rbac_utils.py
+++ b/patrole_tempest_plugin/rbac_utils.py
@@ -13,15 +13,15 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-import oslo_utils.uuidutils as uuid_utils
-import six
+import sys
+import testtools
 import time
 
-from tempest.common import credentials_factory
-from tempest import config
-from tempest.test import BaseTestCase
-
 from oslo_log import log as logging
+import oslo_utils.uuidutils as uuid_utils
+import six
+
+from tempest import config
 
 from patrole_tempest_plugin import rbac_exceptions
 
@@ -42,51 +42,40 @@
 @six.add_metaclass(Singleton)
 class RbacUtils(object):
 
-    def __init__(cls):
-        creds_provider = credentials_factory.get_credentials_provider(
-            name=__name__,
-            force_tenant_isolation=True,
-            identity_version=BaseTestCase.get_identity_version())
+    # References the last value of `switch_to_rbac_role` that was passed to
+    # `switch_role`. Used for ensuring that `switch_role` is correctly used
+    # in a test file, so that false positives are prevented. The key used
+    # to index into the dictionary is the class name, which is unique.
+    switch_role_history = {}
+    admin_role_id = None
+    rbac_role_id = None
 
-        cls.creds_client = creds_provider.creds_client
-        cls.available_roles = cls.creds_client.roles_client.list_roles()
-        cls.admin_role_id = cls.rbac_role_id = None
-        for item in cls.available_roles['roles']:
-            if item['name'] == CONF.rbac.rbac_test_role:
-                cls.rbac_role_id = item['id']
-            if item['name'] == 'admin':
-                cls.admin_role_id = item['id']
+    def switch_role(self, test_obj, switchToRbacRole=False):
+        self.user_id = test_obj.auth_provider.credentials.user_id
+        self.project_id = test_obj.auth_provider.credentials.tenant_id
+        self.token = test_obj.auth_provider.get_token()
+        self.identity_version = test_obj.get_identity_version()
 
-    def switch_role(cls, test_obj, switchToRbacRole=None):
+        if self.identity_version.endswith('v3'):
+            self.roles_client = test_obj.os_admin.roles_v3_client
+        else:
+            self.roles_client = test_obj.os_admin.roles_client
+
         LOG.debug('Switching role to: %s', switchToRbacRole)
-        # Check if admin and rbac roles exist.
-        if not cls.admin_role_id or not cls.rbac_role_id:
-            msg = ("Defined 'rbac_role' or 'admin' role does not exist"
-                   " in the system.")
-            raise rbac_exceptions.RbacResourceSetupFailed(msg)
-
-        if not isinstance(switchToRbacRole, bool):
-            msg = ("Wrong value for parameter 'switchToRbacRole' is passed."
-                   " It should be either 'True' or 'False'.")
-            raise rbac_exceptions.RbacResourceSetupFailed(msg)
 
         try:
-            user_id = test_obj.auth_provider.credentials.user_id
-            project_id = test_obj.auth_provider.credentials.tenant_id
+            if not self.admin_role_id or not self.rbac_role_id:
+                self._get_roles()
 
-            cls._clear_user_roles(user_id, project_id)
+            rbac_utils._validate_switch_role(self, test_obj, switchToRbacRole)
 
             if switchToRbacRole:
-                cls.creds_client.roles_client.create_user_role_on_project(
-                    project_id, user_id, cls.rbac_role_id)
+                self._add_role_to_user(self.rbac_role_id)
             else:
-                cls.creds_client.roles_client.create_user_role_on_project(
-                    project_id, user_id, cls.admin_role_id)
-
+                self._add_role_to_user(self.admin_role_id)
         except Exception as exp:
             LOG.error(exp)
             raise
-
         finally:
             # NOTE(felipemonteiro): These two comments below are copied from
             # tempest.api.identity.v2/v3.test_users.
@@ -99,17 +88,80 @@
             # precise to the second. Sleep to ensure we are passing the second
             # boundary before attempting to authenticate. If token is of type
             # uuid, then do not sleep.
-            if not uuid_utils.is_uuid_like(cls.creds_client.
-                                           identity_client.token):
+            if not uuid_utils.is_uuid_like(self.token):
                 time.sleep(1)
             test_obj.auth_provider.set_auth()
 
-    def _clear_user_roles(cls, user_id, tenant_id):
-        roles = cls.creds_client.roles_client.list_user_roles_on_project(
-            tenant_id, user_id)['roles']
+    def _add_role_to_user(self, role_id):
+        role_already_present = self._clear_user_roles(role_id)
+        if role_already_present:
+            return
+
+        self.roles_client.create_user_role_on_project(
+            self.project_id, self.user_id, role_id)
+
+    def _clear_user_roles(self, role_id):
+        roles = self.roles_client.list_user_roles_on_project(
+            self.project_id, self.user_id)['roles']
+
+        # If the user already has the role that is required, return early.
+        role_ids = [role['id'] for role in roles]
+        if role_ids == [role_id]:
+            return True
 
         for role in roles:
-            cls.creds_client.roles_client.delete_role_from_user_on_project(
-                tenant_id, user_id, role['id'])
+            self.roles_client.delete_role_from_user_on_project(
+                self.project_id, self.user_id, role['id'])
+
+        return False
+
+    def _validate_switch_role(self, test_obj, switchToRbacRole):
+        """Validates that the rbac role passed to `switch_role` is legal.
+
+        Throws an error for the following improper usages of `switch_role`:
+            * `switch_role` is not called with a boolean value
+            * `switch_role` is never called in a test file, except in tearDown
+            * `switch_role` is called with the same boolean value twice
+        """
+        if not isinstance(switchToRbacRole, bool):
+            raise rbac_exceptions.RbacResourceSetupFailed(
+                'switchToRbacRole must be a boolean value.')
+
+        key = test_obj.__name__ if isinstance(test_obj, type) else \
+            test_obj.__class__.__name__
+        self.switch_role_history.setdefault(key, None)
+
+        if self.switch_role_history[key] == switchToRbacRole:
+            # If the test was skipped, then this is a legitimate use case,
+            # so do not throw an exception.
+            exc_value = sys.exc_info()[1]
+            if not isinstance(exc_value, testtools.TestCase.skipException):
+                self.switch_role_history[key] = False
+                error_message = '`switchToRbacRole` must not be called with '\
+                    'the same bool value twice. Make sure that you included '\
+                    'a rbac_utils.switch_role method call inside the test.'
+                LOG.error(error_message)
+                raise rbac_exceptions.RbacResourceSetupFailed(error_message)
+        else:
+            self.switch_role_history[key] = switchToRbacRole
+
+    def _get_roles(self):
+        available_roles = self.roles_client.list_roles()
+        admin_role_id = rbac_role_id = None
+
+        for role in available_roles['roles']:
+            if role['name'] == CONF.rbac.rbac_test_role:
+                rbac_role_id = role['id']
+            if role['name'] == 'admin':
+                admin_role_id = role['id']
+
+        if not admin_role_id or not rbac_role_id:
+            msg = "Role with name 'admin' does not exist in the system."\
+                if not admin_role_id else "Role defined by rbac_test_role "\
+                "does not exist in the system."
+            raise rbac_exceptions.RbacResourceSetupFailed(msg)
+
+        self.admin_role_id = admin_role_id
+        self.rbac_role_id = rbac_role_id
 
 rbac_utils = RbacUtils