Merge "Add role-switching validation to Patrole framework."
diff --git a/patrole_tempest_plugin/rbac_utils.py b/patrole_tempest_plugin/rbac_utils.py
index db648df..f61ccdf 100644
--- a/patrole_tempest_plugin/rbac_utils.py
+++ b/patrole_tempest_plugin/rbac_utils.py
@@ -13,15 +13,15 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-import oslo_utils.uuidutils as uuid_utils
-import six
+import sys
+import testtools
 import time
 
-from tempest.common import credentials_factory
-from tempest import config
-from tempest.test import BaseTestCase
-
 from oslo_log import log as logging
+import oslo_utils.uuidutils as uuid_utils
+import six
+
+from tempest import config
 
 from patrole_tempest_plugin import rbac_exceptions
 
@@ -42,51 +42,40 @@
 @six.add_metaclass(Singleton)
 class RbacUtils(object):
 
-    def __init__(cls):
-        creds_provider = credentials_factory.get_credentials_provider(
-            name=__name__,
-            force_tenant_isolation=True,
-            identity_version=BaseTestCase.get_identity_version())
+    # Holds, per test class, the last value of `switchToRbacRole` that was
+    # passed to `switch_role`. Used for ensuring that `switch_role` is
+    # correctly used in a test file, so that false positives are prevented.
+    # The dictionary is keyed by the test class name, which is unique.
+    switch_role_history = {}
+    admin_role_id = None
+    rbac_role_id = None
 
-        cls.creds_client = creds_provider.creds_client
-        cls.available_roles = cls.creds_client.roles_client.list_roles()
-        cls.admin_role_id = cls.rbac_role_id = None
-        for item in cls.available_roles['roles']:
-            if item['name'] == CONF.rbac.rbac_test_role:
-                cls.rbac_role_id = item['id']
-            if item['name'] == 'admin':
-                cls.admin_role_id = item['id']
+    def switch_role(self, test_obj, switchToRbacRole=False):
+        self.user_id = test_obj.auth_provider.credentials.user_id
+        self.project_id = test_obj.auth_provider.credentials.tenant_id
+        self.token = test_obj.auth_provider.get_token()
+        self.identity_version = test_obj.get_identity_version()
 
-    def switch_role(cls, test_obj, switchToRbacRole=None):
+        if self.identity_version.endswith('v3'):
+            self.roles_client = test_obj.os_admin.roles_v3_client
+        else:
+            self.roles_client = test_obj.os_admin.roles_client
+
         LOG.debug('Switching role to: %s', switchToRbacRole)
-        # Check if admin and rbac roles exist.
-        if not cls.admin_role_id or not cls.rbac_role_id:
-            msg = ("Defined 'rbac_role' or 'admin' role does not exist"
-                   " in the system.")
-            raise rbac_exceptions.RbacResourceSetupFailed(msg)
-
-        if not isinstance(switchToRbacRole, bool):
-            msg = ("Wrong value for parameter 'switchToRbacRole' is passed."
-                   " It should be either 'True' or 'False'.")
-            raise rbac_exceptions.RbacResourceSetupFailed(msg)
 
         try:
-            user_id = test_obj.auth_provider.credentials.user_id
-            project_id = test_obj.auth_provider.credentials.tenant_id
+            if not self.admin_role_id or not self.rbac_role_id:
+                self._get_roles()
 
-            cls._clear_user_roles(user_id, project_id)
+            rbac_utils._validate_switch_role(self, test_obj, switchToRbacRole)
 
             if switchToRbacRole:
-                cls.creds_client.roles_client.create_user_role_on_project(
-                    project_id, user_id, cls.rbac_role_id)
+                self._add_role_to_user(self.rbac_role_id)
             else:
-                cls.creds_client.roles_client.create_user_role_on_project(
-                    project_id, user_id, cls.admin_role_id)
-
+                self._add_role_to_user(self.admin_role_id)
         except Exception as exp:
             LOG.error(exp)
             raise
-
         finally:
             # NOTE(felipemonteiro): These two comments below are copied from
             # tempest.api.identity.v2/v3.test_users.
@@ -99,17 +88,80 @@
             # precise to the second. Sleep to ensure we are passing the second
             # boundary before attempting to authenticate. If token is of type
             # uuid, then do not sleep.
-            if not uuid_utils.is_uuid_like(cls.creds_client.
-                                           identity_client.token):
+            if not uuid_utils.is_uuid_like(self.token):
                 time.sleep(1)
             test_obj.auth_provider.set_auth()
 
-    def _clear_user_roles(cls, user_id, tenant_id):
-        roles = cls.creds_client.roles_client.list_user_roles_on_project(
-            tenant_id, user_id)['roles']
+    def _add_role_to_user(self, role_id):
+        role_already_present = self._clear_user_roles(role_id)
+        if role_already_present:
+            return
+
+        self.roles_client.create_user_role_on_project(
+            self.project_id, self.user_id, role_id)
+
+    def _clear_user_roles(self, role_id):
+        roles = self.roles_client.list_user_roles_on_project(
+            self.project_id, self.user_id)['roles']
+
+        # If the user already has the role that is required, return early.
+        role_ids = [role['id'] for role in roles]
+        if role_ids == [role_id]:
+            return True
 
         for role in roles:
-            cls.creds_client.roles_client.delete_role_from_user_on_project(
-                tenant_id, user_id, role['id'])
+            self.roles_client.delete_role_from_user_on_project(
+                self.project_id, self.user_id, role['id'])
+
+        return False
+
+    def _validate_switch_role(self, test_obj, switchToRbacRole):
+        """Validates that the rbac role passed to `switch_role` is legal.
+
+        Throws an error for the following improper usages of `switch_role`:
+            * `switch_role` is not called with a boolean value
+            * `switch_role` is never called in a test file, except in tearDown
+            * `switch_role` is called with the same boolean value twice
+        """
+        if not isinstance(switchToRbacRole, bool):
+            raise rbac_exceptions.RbacResourceSetupFailed(
+                'switchToRbacRole must be a boolean value.')
+
+        key = test_obj.__name__ if isinstance(test_obj, type) else \
+            test_obj.__class__.__name__
+        self.switch_role_history.setdefault(key, None)
+
+        if self.switch_role_history[key] == switchToRbacRole:
+            # If the test was skipped, then this is a legitimate use case,
+            # so do not throw an exception.
+            exc_value = sys.exc_info()[1]
+            if not isinstance(exc_value, testtools.TestCase.skipException):
+                self.switch_role_history[key] = False
+                error_message = '`switchToRbacRole` must not be called with '\
+                    'the same bool value twice. Make sure that you included '\
+                    'a rbac_utils.switch_role method call inside the test.'
+                LOG.error(error_message)
+                raise rbac_exceptions.RbacResourceSetupFailed(error_message)
+        else:
+            self.switch_role_history[key] = switchToRbacRole
+
+    def _get_roles(self):
+        available_roles = self.roles_client.list_roles()
+        admin_role_id = rbac_role_id = None
+
+        for role in available_roles['roles']:
+            if role['name'] == CONF.rbac.rbac_test_role:
+                rbac_role_id = role['id']
+            if role['name'] == 'admin':
+                admin_role_id = role['id']
+
+        if not admin_role_id or not rbac_role_id:
+            msg = "Role with name 'admin' does not exist in the system."\
+                if not admin_role_id else "Role defined by rbac_test_role "\
+                "does not exist in the system."
+            raise rbac_exceptions.RbacResourceSetupFailed(msg)
+
+        self.admin_role_id = admin_role_id
+        self.rbac_role_id = rbac_role_id
 
 rbac_utils = RbacUtils
diff --git a/patrole_tempest_plugin/tests/api/image/v2/test_images_member_rbac.py b/patrole_tempest_plugin/tests/api/image/v2/test_images_member_rbac.py
index 7d99d55..c53480d 100644
--- a/patrole_tempest_plugin/tests/api/image/v2/test_images_member_rbac.py
+++ b/patrole_tempest_plugin/tests/api/image/v2/test_images_member_rbac.py
@@ -40,10 +40,6 @@
         cls.image_client = cls.os.image_client_v2
         cls.image_member_client = cls.os.image_member_client_v2
 
-    def setUp(self):
-        self.rbac_utils.switch_role(self, switchToRbacRole=False)
-        super(ImagesMemberRbacTest, self).setUp()
-
     @rbac_rule_validation.action(service="glance",
                                  rule="add_member")
     @decorators.idempotent_id('b1b85ace-6484-11e6-881e-080027d0d606')
diff --git a/patrole_tempest_plugin/tests/unit/test_rbac_utils.py b/patrole_tempest_plugin/tests/unit/test_rbac_utils.py
index add1770..692c0b9 100644
--- a/patrole_tempest_plugin/tests/unit/test_rbac_utils.py
+++ b/patrole_tempest_plugin/tests/unit/test_rbac_utils.py
@@ -14,6 +14,7 @@
 #    under the License.
 
 import mock
+import testtools
 
 from tempest import config
 from tempest.lib import exceptions as lib_exc
@@ -27,62 +28,87 @@
 
 class RBACUtilsTest(base.TestCase):
 
-    @mock.patch.object(rbac_utils, 'time', autospec=True)
-    def setUp(self, _):
+    def setUp(self):
         super(RBACUtilsTest, self).setUp()
-        self.mock_creds_provider = mock.patch.object(
-            rbac_utils, 'credentials_factory', autospec=True).start()
-
         available_roles = {
             'roles': [
                 {'name': 'admin', 'id': 'admin_id'},
                 {'name': 'Member', 'id': 'member_id'}
             ]
         }
-        self.mock_creds_provider.get_credentials_provider.return_value.\
-            creds_client.roles_client.list_roles.return_value = \
-            available_roles
-        self.addCleanup(mock.patch.stopall)
-
-        CONF.set_override('rbac_test_role', 'Member', group='rbac',
-                          enforce_type=True)
-        self.addCleanup(CONF.clear_override, 'rbac_test_role', group='rbac')
 
         # Because rbac_utils is a singleton, reset all of its role-related
         # parameters to the correct values for each test run.
         self.rbac_utils = rbac_utils.rbac_utils()
-        self.rbac_utils.available_roles = available_roles
+        self.rbac_utils.switch_role_history = {}
         self.rbac_utils.admin_role_id = 'admin_id'
         self.rbac_utils.rbac_role_id = 'member_id'
 
-    def test_initialization_with_missing_admin_role(self):
-        self.rbac_utils.admin_role_id = None
-        e = self.assertRaises(rbac_exceptions.RbacResourceSetupFailed,
-                              self.rbac_utils.switch_role, None)
-        self.assertIn("Defined 'rbac_role' or 'admin' role does not exist"
-                      " in the system.", e.__str__())
+        self.mock_test_obj = mock.Mock()
+        self.mock_test_obj.auth_provider = mock.Mock(
+            **{'credentials.user_id': mock.sentinel.user_id,
+               'credentials.tenant_id': mock.sentinel.project_id})
+        self.mock_test_obj.os_admin = mock.Mock(
+            **{'roles_v3_client.list_roles.return_value': available_roles})
 
-    def test_initialization_with_missing_rbac_role(self):
+        CONF.set_override('rbac_test_role', 'Member', group='rbac',
+                          enforce_type=True)
+        CONF.set_override('auth_version', 'v3', group='identity',
+                          enforce_type=True)
+
+        self.addCleanup(CONF.clear_override, 'rbac_test_role', group='rbac')
+        self.addCleanup(CONF.clear_override, 'auth_version', group='identity')
+        self.addCleanup(mock.patch.stopall)
+
+    def _mock_list_user_roles_on_project(self, return_value):
+        self.mock_test_obj.admin_manager = mock.Mock(
+            **{'roles_client.list_user_roles_on_project.'
+               'return_value': {'roles': [{'id': return_value}]}})
+
+    @mock.patch.object(rbac_utils.rbac_utils, '_clear_user_roles',
+                       autospec=True, return_value=False)
+    def test_initialization_with_missing_admin_role(self, _):
+        self.mock_test_obj.os_admin = mock.Mock(
+            **{'roles_v3_client.list_roles.return_value':
+               {'roles': [{'name': 'Member', 'id': 'member_id'}]}})
+        self.rbac_utils.admin_role_id = None
         self.rbac_utils.rbac_role_id = None
         e = self.assertRaises(rbac_exceptions.RbacResourceSetupFailed,
-                              self.rbac_utils.switch_role, None)
-        self.assertIn("Defined 'rbac_role' or 'admin' role does not exist"
-                      " in the system.", e.__str__())
+                              self.rbac_utils.switch_role, self.mock_test_obj,
+                              True)
+        self.assertIn("Role with name 'admin' does not exist in the system.",
+                      e.__str__())
+
+    @mock.patch.object(rbac_utils.rbac_utils, '_clear_user_roles',
+                       autospec=True, return_value=False)
+    def test_initialization_with_missing_rbac_role(self, _):
+        self.mock_test_obj.os_admin = mock.Mock(
+            **{'roles_v3_client.list_roles.return_value':
+               {'roles': [{'name': 'admin', 'id': 'admin_id'}]}})
+        self.rbac_utils.admin_role_id = None
+        self.rbac_utils.rbac_role_id = None
+        e = self.assertRaises(rbac_exceptions.RbacResourceSetupFailed,
+                              self.rbac_utils.switch_role, self.mock_test_obj,
+                              True)
+        self.assertIn("Role defined by rbac_test_role does not exist in the "
+                      "system.", e.__str__())
 
     def test_clear_user_roles(self):
-        self.rbac_utils.creds_client = mock.Mock()
-        creds_client = self.rbac_utils.creds_client
-        creds_client.roles_client.list_user_roles_on_project.return_value = {
+        roles_client = self.mock_test_obj.os_admin.roles_v3_client
+        roles_client.list_user_roles_on_project.return_value = {
             'roles': [{'id': 'admin_id'}, {'id': 'member_id'}]
         }
 
-        self.rbac_utils._clear_user_roles(mock.sentinel.user_id,
-                                          mock.sentinel.project_id)
+        self.rbac_utils.roles_client = roles_client
+        self.rbac_utils.project_id = mock.sentinel.project_id
+        self.rbac_utils.user_id = mock.sentinel.user_id
 
-        creds_client.roles_client.list_user_roles_on_project.\
+        self.rbac_utils._clear_user_roles(None)
+
+        roles_client.list_user_roles_on_project.\
             assert_called_once_with(mock.sentinel.project_id,
                                     mock.sentinel.user_id)
-        creds_client.roles_client.delete_role_from_user_on_project.\
+        roles_client.delete_role_from_user_on_project.\
             assert_has_calls([
                 mock.call(mock.sentinel.project_id, mock.sentinel.user_id,
                           'admin_id'),
@@ -91,65 +117,109 @@
             ])
 
     @mock.patch.object(rbac_utils.rbac_utils, '_clear_user_roles',
-                       autospec=True)
-    def test_rbac_utils_switch_role_to_admin(self, mock_clear_user_roles):
-        mock_test_object = mock.Mock()
-        mock_test_object.auth_provider.credentials.user_id = \
-            mock.sentinel.user_id
-        mock_test_object.auth_provider.credentials.tenant_id = \
-            mock.sentinel.project_id
+                       autospec=True, return_value=False)
+    @mock.patch.object(rbac_utils, 'time', autospec=True)
+    def test_rbac_utils_switch_role_to_admin_role(self, mock_time,
+                                                  mock_clear_user_roles):
+        self.rbac_utils.prev_switch_role = True
+        self._mock_list_user_roles_on_project('admin_id')
+        roles_client = self.mock_test_obj.os_admin.roles_v3_client
 
-        self.rbac_utils.creds_client = mock.Mock()
-        creds_client = self.rbac_utils.creds_client
+        self.rbac_utils.switch_role(self.mock_test_obj, False)
 
-        self.rbac_utils.switch_role(mock_test_object, False)
-
-        creds_client.roles_client.create_user_role_on_project.\
+        roles_client.create_user_role_on_project.\
             assert_called_once_with(mock.sentinel.project_id,
                                     mock.sentinel.user_id,
                                     'admin_id')
         mock_clear_user_roles.assert_called_once_with(
-            self.rbac_utils, mock.sentinel.user_id, mock.sentinel.project_id)
-        mock_test_object.auth_provider.clear_auth.assert_called_once_with()
-        mock_test_object.auth_provider.set_auth.assert_called_once_with()
+            self.rbac_utils, 'admin_id')
+        self.mock_test_obj.auth_provider.clear_auth.assert_called_once_with()
+        self.mock_test_obj.auth_provider.set_auth.assert_called_once_with()
+        mock_time.sleep.assert_called_once_with(1)
 
     @mock.patch.object(rbac_utils.rbac_utils, '_clear_user_roles',
-                       autospec=True)
-    def test_rbac_utils_switch_role_to_rbac_role(self, mock_clear_user_roles):
-        mock_test_object = mock.Mock()
-        mock_test_object.auth_provider.credentials.user_id = \
-            mock.sentinel.user_id
-        mock_test_object.auth_provider.credentials.tenant_id = \
-            mock.sentinel.project_id
+                       autospec=True, return_value=False)
+    @mock.patch.object(rbac_utils, 'time', autospec=True)
+    def test_rbac_utils_switch_role_to_rbac_role(self, mock_time,
+                                                 mock_clear_user_roles):
+        self._mock_list_user_roles_on_project('member_id')
+        roles_client = self.mock_test_obj.os_admin.roles_v3_client
 
-        self.rbac_utils.creds_client = mock.Mock()
-        creds_client = self.rbac_utils.creds_client
+        self.rbac_utils.switch_role(self.mock_test_obj, True)
 
-        self.rbac_utils.switch_role(mock_test_object, True)
-
-        creds_client.roles_client.create_user_role_on_project.\
+        roles_client.create_user_role_on_project.\
             assert_called_once_with(mock.sentinel.project_id,
                                     mock.sentinel.user_id,
                                     'member_id')
         mock_clear_user_roles.assert_called_once_with(
-            self.rbac_utils, mock.sentinel.user_id, mock.sentinel.project_id)
-        mock_test_object.auth_provider.clear_auth.assert_called_once_with()
-        mock_test_object.auth_provider.set_auth.assert_called_once_with()
+            self.rbac_utils, 'member_id')
+        self.mock_test_obj.auth_provider.clear_auth.assert_called_once_with()
+        self.mock_test_obj.auth_provider.set_auth.assert_called_once_with()
+        mock_time.sleep.assert_called_once_with(1)
 
-    def test_rbac_utils_switch_roles_with_invalid_value(self):
-        e = self.assertRaises(rbac_exceptions.RbacResourceSetupFailed,
-                              self.rbac_utils.switch_role, None)
-        self.assertIn("Wrong value for parameter 'switchToRbacRole' is passed."
-                      " It should be either 'True' or 'False'.", e.__str__())
+    def test_RBAC_utils_switch_roles_without_boolean_value(self):
+        self.assertRaises(rbac_exceptions.RbacResourceSetupFailed,
+                          self.rbac_utils.switch_role, self.mock_test_obj,
+                          "admin")
+        self.assertRaises(rbac_exceptions.RbacResourceSetupFailed,
+                          self.rbac_utils.switch_role, self.mock_test_obj,
+                          None)
 
     @mock.patch.object(rbac_utils.rbac_utils, '_clear_user_roles',
-                       autospec=True)
+                       autospec=True, return_value=False)
+    def test_rbac_utils_switch_roles_with_false_value_twice(self, _):
+        self._mock_list_user_roles_on_project('admin_id')
+        self.rbac_utils.switch_role(self.mock_test_obj, False)
+        e = self.assertRaises(rbac_exceptions.RbacResourceSetupFailed,
+                              self.rbac_utils.switch_role, self.mock_test_obj,
+                              False)
+        self.assertIn(
+            '`switchToRbacRole` must not be called with the same bool value '
+            'twice. Make sure that you included a rbac_utils.switch_role '
+            'method call inside the test.', str(e))
+
+    @mock.patch.object(rbac_utils.rbac_utils, '_clear_user_roles',
+                       autospec=True, return_value=False)
+    def test_rbac_utils_switch_roles_with_true_value_twice(self, _):
+        self._mock_list_user_roles_on_project('admin_id')
+        self.rbac_utils.switch_role(self.mock_test_obj, True)
+        e = self.assertRaises(rbac_exceptions.RbacResourceSetupFailed,
+                              self.rbac_utils.switch_role, self.mock_test_obj,
+                              True)
+        self.assertIn(
+            '`switchToRbacRole` must not be called with the same bool value '
+            'twice. Make sure that you included a rbac_utils.switch_role '
+            'method call inside the test.', str(e))
+
+    @mock.patch.object(rbac_utils.rbac_utils, '_clear_user_roles',
+                       autospec=True, return_value=False)
+    @mock.patch.object(rbac_utils, 'LOG', autospec=True)
+    @mock.patch.object(rbac_utils, 'sys', autospec=True)
+    def test_rbac_utils_switch_roles_with_skip_exception(self, mock_sys,
+                                                         mock_log, _):
+        self._mock_list_user_roles_on_project('member_id')
+
+        mock_skip_exception = mock.Mock(spec=testtools.TestCase.skipException)
+        mock_sys.exc_info.return_value = [None, mock_skip_exception]
+
+        # Ordinarily switching to the same role would result in an error,
+        # but because the skipException is thrown before the test finishes,
+        # this is not treated as a failure.
+        self.rbac_utils.switch_role(self.mock_test_obj, False)
+        self.rbac_utils.switch_role(self.mock_test_obj, False)
+        mock_log.error.assert_not_called()
+
+        self.rbac_utils.switch_role(self.mock_test_obj, True)
+        self.rbac_utils.switch_role(self.mock_test_obj, True)
+        mock_log.error.assert_not_called()
+
+    @mock.patch.object(rbac_utils.rbac_utils, '_clear_user_roles',
+                       autospec=True, return_value=False)
     def test_rbac_utils_switch_role_except_exception(self,
                                                      mock_clear_user_roles):
-        self.rbac_utils.creds_client = mock.Mock()
-        creds_client = self.rbac_utils.creds_client
-        creds_client.roles_client.create_user_role_on_project.side_effect =\
+        roles_client = self.mock_test_obj.os_admin.roles_v3_client
+        roles_client.create_user_role_on_project.side_effect =\
             lib_exc.NotFound
 
         self.assertRaises(lib_exc.NotFound, self.rbac_utils.switch_role,
-                          mock.Mock(), True)
+                          self.mock_test_obj, True)