Base implementation of override_role for automatic role re-switch

This PS deprecates switch_role in rbac_utils and replaces it with
override_role. override_role provides the same functionality as
switch_role, but uses @contextmanager so that the role is switched
back automatically when the block exits. This approach offers the
following advantages:

  1) Role switching is performed in one class only. Once all tests
     use override_role, ``rbac_rule_validation`` no longer needs to
     call ``switch_role(test_obj, toggle_rbac_role=False)`` on
     ``test_obj.rbac_utils``. Decoupling the two modules leads to
     cleaner code.
  2) Improves test code readability.
  3) Improves role switch granularity, meaning the role remains
     switched within the narrowest scope possible.
  4) Simplifies the interface, making it easier for test-writers to
     use the Patrole framework.

Rather than doing:

    # setup code here
    rand_name = data_utils.rand_name(...)
    # ...
    # more setup code here
    self.rbac_utils.switch_role(self, toggle_rbac_role=True)
    # execute the test here

(Without blank lines around the call, it is hard to see where the role
switch happens.)

It is now possible to do:

    # setup code here
    rand_name = data_utils.rand_name(...)
    # ...
    # more setup code here
    with self.rbac_utils.override_role(self):
        # execute the test here
        # notice the indentation... visually it is easy to see
        # that this block here is where the role is switched
    # now we are back to admin credentials in case we still
    # need them in the test... this was not possible before
    # without calling ``switch_role`` yet again...
    waiters.wait_for_volume_status(self.volumes_client, ...)
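
The automatic re-switch relies on wrapping the ``yield`` of a
@contextmanager generator in try/finally. The following is only a
minimal standalone sketch of that mechanism: ``FakeRbacUtils`` is a
hypothetical stand-in that records the toggle values instead of
talking to Keystone, and the RuntimeError simulates a failing API
call.

```python
from contextlib import contextmanager


class FakeRbacUtils(object):
    """Hypothetical stand-in for RbacUtils; only records role toggles."""

    def __init__(self):
        self.calls = []

    def _override_role(self, test_obj, toggle_rbac_role=False):
        # The real helper updates roles in Keystone; here we only log.
        self.calls.append(toggle_rbac_role)

    @contextmanager
    def override_role(self, test_obj):
        self._override_role(test_obj, True)
        try:
            yield
        finally:
            # Runs whether the with-block passed or raised.
            self._override_role(test_obj, False)


utils = FakeRbacUtils()
try:
    with utils.override_role(test_obj=None):
        raise RuntimeError("simulated failure inside the test body")
except RuntimeError:
    pass

print(utils.calls)  # [True, False]
```

Even though the block raised, the recorded toggles end with False,
i.e. the admin role is restored before the exception propagates.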

This commit:
  * Adds the necessary logic to rbac_utils to allow for automatic
    role re-switch following test execution (i.e. override_role)
  * Deprecates switch_role method in rbac_utils.
  * Refactors RBAC tests in test_volumes_extend_rbac to prove
    the concept introduced here.
  * Removes _validate_switch_role functionality since its purpose
    was to compensate for the old switch_role interface, which
    allowed users to pass in a boolean flag; this validation is no
    longer needed. Also removes the associated unit tests.
  * Updates a docstring in rbac_utils module.
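
For illustration only, the deprecation pattern can be sketched as a
thin shim that warns and delegates to the private helper. This
sketch uses the standard library ``warnings`` module as a stand-in
for ``debtcollector.removals.remove``, and ``FakeRbacUtils`` is again
a hypothetical class, not the real RbacUtils.

```python
import warnings


class FakeRbacUtils(object):
    """Hypothetical stand-in for RbacUtils; only records role toggles."""

    def __init__(self):
        self.calls = []

    def _override_role(self, test_obj, toggle_rbac_role=False):
        self.calls.append(toggle_rbac_role)

    def switch_role(self, test_obj, toggle_rbac_role):
        # Stand-in for @debtcollector.removals.remove(...): warn the
        # caller, then delegate so existing tests keep working.
        warnings.warn("switch_role is deprecated; use override_role",
                      DeprecationWarning, stacklevel=2)
        self._override_role(test_obj, toggle_rbac_role)


utils = FakeRbacUtils()
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    utils.switch_role(None, True)
```

Existing callers keep their behavior through the shim while being
nudged toward override_role.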

Partially Implements: blueprint rbac-utils-contextmanager

Change-Id: I670fba358bf321eae0d22d18cea6d2f530f00716
diff --git a/patrole_tempest_plugin/rbac_rule_validation.py b/patrole_tempest_plugin/rbac_rule_validation.py
index 927c803..82bc1a0 100644
--- a/patrole_tempest_plugin/rbac_rule_validation.py
+++ b/patrole_tempest_plugin/rbac_rule_validation.py
@@ -175,6 +175,8 @@
                         "OverPermission: Role %s was allowed to perform %s" %
                         (role, rule))
             finally:
+                # TODO(felipemonteiro): Remove the `switch_role` call below
+                # once all the tests have migrated over to `override_role`.
                 test_obj.rbac_utils.switch_role(test_obj,
                                                 toggle_rbac_role=False)
                 if CONF.patrole_log.enable_reporting:
diff --git a/patrole_tempest_plugin/rbac_utils.py b/patrole_tempest_plugin/rbac_utils.py
index 2bb9eed..4ef0f80 100644
--- a/patrole_tempest_plugin/rbac_utils.py
+++ b/patrole_tempest_plugin/rbac_utils.py
@@ -14,8 +14,9 @@
 #    under the License.
 
 import abc
+from contextlib import contextmanager
+import debtcollector.removals
 import six
-import sys
 import time
 
 from oslo_log import log as logging
@@ -32,14 +33,15 @@
 
 
 class RbacUtils(object):
-    """Utility class responsible for switching os_primary role.
+    """Utility class responsible for switching ``os_primary`` role.
 
     This class is responsible for overriding the value of the primary Tempest
-    credential's role (i.e. "os_primary" role). By doing so, it is possible to
-    seamlessly swap between admin credentials, needed for setup and clean up,
-    and primary credentials, needed to perform the API call which does
+    credential's role (i.e. ``os_primary`` role). By doing so, it is possible
+    to seamlessly swap between admin credentials, needed for setup and clean
+    up, and primary credentials, needed to perform the API call which does
     policy enforcement. The primary credentials always cycle between roles
-    defined by ``CONF.identity.admin_role`` and `CONF.patrole.rbac_test_role``.
+    defined by ``CONF.identity.admin_role`` and
+    ``CONF.patrole.rbac_test_role``.
     """
 
     def __init__(self, test_obj):
@@ -56,18 +58,56 @@
             admin_roles_client = admin_mgr.roles_client
 
         self.admin_roles_client = admin_roles_client
-        self.switch_role(test_obj, toggle_rbac_role=False)
+        self._override_role(test_obj, False)
 
-    # References the last value of `toggle_rbac_role` that was passed to
-    # `switch_role`. Used for ensuring that `switch_role` is correctly used
-    # in a test file, so that false positives are prevented. The key used
-    # to index into the dictionary is the module path plus class name, which is
-    # unique.
-    switch_role_history = {}
     admin_role_id = None
     rbac_role_id = None
 
-    def switch_role(self, test_obj, toggle_rbac_role=False):
+    @contextmanager
+    def override_role(self, test_obj):
+        """Override the role used by ``os_primary`` Tempest credentials.
+
+        Temporarily change the role used by ``os_primary`` credentials to:
+          * ``[patrole] rbac_test_role`` before test execution
+          * ``[identity] admin_role`` after test execution
+
+        Automatically switches to admin role after test execution.
+
+        :param test_obj: Instance of ``tempest.test.BaseTestCase``.
+        :returns: None
+
+        .. warning::
+
+            This function can alter user roles for pre-provisioned credentials.
+            Work is underway to safely clean up after this function.
+
+        Example::
+
+            @rbac_rule_validation.action(service='test',
+                                         rule='a:test:rule')
+            def test_foo(self):
+                # Allocate test-level resources here.
+                with self.rbac_utils.override_role(self):
+                    # The role for `os_primary` has now been overridden. Within
+                    # this block, call the API endpoint that enforces the
+                    # expected policy specified by "rule" in the decorator.
+                    self.foo_service.bar_api_call()
+                # The role is switched back to admin automatically. Note that
+                # if the API call above threw an exception, any code below this
+                # point in the test is not executed.
+        """
+        self._override_role(test_obj, True)
+        try:
+            # Execute the test.
+            yield
+        finally:
+            # This code block is always executed, no matter the result of the
+            # test. Automatically switch back to the admin role for test clean
+            # up.
+            self._override_role(test_obj, False)
+
+    @debtcollector.removals.remove(removal_version='Rocky')
+    def switch_role(self, test_obj, toggle_rbac_role):
         """Switch the role used by `os_primary` Tempest credentials.
 
         Switch the role used by `os_primary` credentials to:
@@ -77,25 +117,34 @@
         :param test_obj: test object of type tempest.lib.base.BaseTestCase
         :param toggle_rbac_role: role to switch `os_primary` Tempest creds to
         """
+        self._override_role(test_obj, toggle_rbac_role)
+
+    def _override_role(self, test_obj, toggle_rbac_role=False):
+        """Private helper for overriding ``os_primary`` Tempest credentials.
+
+        :param test_obj: test object of type tempest.lib.base.BaseTestCase
+        :param toggle_rbac_role: Boolean value that controls the role that
+            overrides default role of ``os_primary`` credentials.
+            * If True: role is set to ``[patrole] rbac_test_role``
+            * If False: role is set to ``[identity] admin_role``
+        """
         self.user_id = test_obj.os_primary.credentials.user_id
         self.project_id = test_obj.os_primary.credentials.tenant_id
         self.token = test_obj.os_primary.auth_provider.get_token()
 
-        LOG.debug('Switching role to: %s.', toggle_rbac_role)
+        LOG.debug('Overriding role to: %s.', toggle_rbac_role)
         role_already_present = False
 
         try:
             if not all([self.admin_role_id, self.rbac_role_id]):
                 self._get_roles_by_name()
 
-            self._validate_switch_role(test_obj, toggle_rbac_role)
-
             target_role = (
                 self.rbac_role_id if toggle_rbac_role else self.admin_role_id)
             role_already_present = self._list_and_clear_user_roles_on_project(
                 target_role)
 
-            # Do not switch roles if `target_role` already exists.
+            # Do not override roles if `target_role` already exists.
             if not role_already_present:
                 self._create_user_role_on_project(target_role)
         except Exception as exp:
@@ -106,7 +155,7 @@
             # Fernet tokens are not subsecond aware so sleep to ensure we are
             # passing the second boundary before attempting to authenticate.
             # Only sleep if a token revocation occurred as a result of role
-            # switching. This will optimize test runtime in the case where
+            # overriding. This will optimize test runtime in the case where
             # ``[identity] admin_role`` == ``[patrole] rbac_test_role``.
             if not role_already_present:
                 time.sleep(1)
@@ -153,44 +202,6 @@
 
         return False
 
-    def _validate_switch_role(self, test_obj, toggle_rbac_role):
-        """Validates that the test role passed to `switch_role` is legal.
-
-        Throws an error for the following improper usages of `switch_role`:
-            * `switch_role` is not called with a boolean value
-            * `switch_role` is never called inside a test, except in tearDown
-            * `switch_role` is called with the same boolean value twice
-
-        If a `skipException` is thrown then this is a legitimate reason why
-        `switch_role` is not called.
-        """
-        if not isinstance(toggle_rbac_role, bool):
-            raise rbac_exceptions.RbacResourceSetupFailed(
-                '`toggle_rbac_role` must be a boolean value.')
-
-        # The unique key is the combination of module path plus class name.
-        class_name = test_obj.__name__ if isinstance(test_obj, type) else \
-            test_obj.__class__.__name__
-        module_name = test_obj.__module__
-        key = '%s.%s' % (module_name, class_name)
-
-        self.switch_role_history.setdefault(key, None)
-
-        if self.switch_role_history[key] == toggle_rbac_role:
-            # If an exception was thrown, like a skipException or otherwise,
-            # then this is a legitimate reason why `switch_role` was not
-            # called, so only raise an exception if no current exception is
-            # being handled.
-            if sys.exc_info()[0] is None:
-                self.switch_role_history[key] = False
-                error_message = '`toggle_rbac_role` must not be called with '\
-                    'the same bool value twice. Make sure that you included '\
-                    'a rbac_utils.switch_role method call inside the test.'
-                LOG.error(error_message)
-                raise rbac_exceptions.RbacResourceSetupFailed(error_message)
-        else:
-            self.switch_role_history[key] = toggle_rbac_role
-
 
 def is_admin():
     """Verifies whether the current test role equals the admin role.
diff --git a/patrole_tempest_plugin/tests/api/volume/test_volumes_extend_rbac.py b/patrole_tempest_plugin/tests/api/volume/test_volumes_extend_rbac.py
index 8a34923..18883c9 100644
--- a/patrole_tempest_plugin/tests/api/volume/test_volumes_extend_rbac.py
+++ b/patrole_tempest_plugin/tests/api/volume/test_volumes_extend_rbac.py
@@ -14,22 +14,13 @@
 #    under the License.
 
 from tempest.common import waiters
-from tempest import config
 from tempest.lib import decorators
 
 from patrole_tempest_plugin import rbac_rule_validation
 from patrole_tempest_plugin.tests.api.volume import rbac_base
 
-CONF = config.CONF
-
 
 class VolumesExtendV3RbacTest(rbac_base.BaseVolumeRbacTest):
-    credentials = ['primary', 'admin']
-
-    @classmethod
-    def setup_clients(cls):
-        super(VolumesExtendV3RbacTest, cls).setup_clients()
-        cls.admin_volumes_client = cls.os_admin.volumes_client_latest
 
     @classmethod
     def resource_setup(cls):
@@ -42,8 +33,8 @@
     def test_volume_extend(self):
         # Extend volume test
         extend_size = int(self.volume['size']) + 1
-        self.rbac_utils.switch_role(self, toggle_rbac_role=True)
-        self.volumes_client.extend_volume(self.volume['id'],
-                                          new_size=extend_size)
+        with self.rbac_utils.override_role(self):
+            self.volumes_client.extend_volume(self.volume['id'],
+                                              new_size=extend_size)
         waiters.wait_for_volume_resource_status(
-            self.admin_volumes_client, self.volume['id'], 'available')
+            self.volumes_client, self.volume['id'], 'available')
diff --git a/patrole_tempest_plugin/tests/unit/fixtures.py b/patrole_tempest_plugin/tests/unit/fixtures.py
index 52c2598..4e3387e 100644
--- a/patrole_tempest_plugin/tests/unit/fixtures.py
+++ b/patrole_tempest_plugin/tests/unit/fixtures.py
@@ -92,21 +92,21 @@
 
         self.set_roles(['admin', 'member'], [])
 
-    def switch_role(self, *role_toggles):
-        """Instantiate `rbac_utils.RbacUtils` and call `switch_role`.
+    def override_role(self, *role_toggles):
+        """Instantiate `rbac_utils.RbacUtils` and call `override_role`.
 
-        Create an instance of `rbac_utils.RbacUtils` and call `switch_role`
+        Create an instance of `rbac_utils.RbacUtils` and call `override_role`
         for each boolean value in `role_toggles`. The number of calls to
-        `switch_role` is always 1 + len(`role_toggles`) because the
-        `rbac_utils.RbacUtils` constructor automatically calls `switch_role`.
+        `override_role` is always 1 + len(`role_toggles`) because the
+        `rbac_utils.RbacUtils` constructor automatically calls `override_role`.
 
         :param role_toggles: the list of boolean values iterated over and
-            passed to `switch_role`.
+            passed to `override_role`.
         """
-        self.fake_rbac_utils = rbac_utils.RbacUtils(self.mock_test_obj)
+        _rbac_utils = rbac_utils.RbacUtils(self.mock_test_obj)
 
         for role_toggle in role_toggles:
-            self.fake_rbac_utils.switch_role(self.mock_test_obj, role_toggle)
+            _rbac_utils._override_role(self.mock_test_obj, role_toggle)
             # NOTE(felipemonteiro): Simulate that a role switch has occurred
             # by updating the user's current role to the new role. This means
             # that all API actions involved during a role switch -- listing,
diff --git a/patrole_tempest_plugin/tests/unit/test_rbac_utils.py b/patrole_tempest_plugin/tests/unit/test_rbac_utils.py
index 87adff0..0d75c3e 100644
--- a/patrole_tempest_plugin/tests/unit/test_rbac_utils.py
+++ b/patrole_tempest_plugin/tests/unit/test_rbac_utils.py
@@ -16,6 +16,7 @@
 import mock
 import testtools
 
+from tempest.lib import exceptions as lib_exc
 from tempest.tests import base
 
 from patrole_tempest_plugin import rbac_exceptions
@@ -29,27 +30,27 @@
         super(RBACUtilsTest, self).setUp()
         # Reset the role history after each test run to avoid validation
         # errors between tests.
-        rbac_utils.RbacUtils.switch_role_history = {}
+        rbac_utils.RbacUtils.override_role_history = {}
         self.rbac_utils = self.useFixture(patrole_fixtures.RbacUtilsFixture())
 
-    def test_switch_role_with_missing_admin_role(self):
+    def test_override_role_with_missing_admin_role(self):
         self.rbac_utils.set_roles('member')
         error_re = (
             'Roles defined by `\[patrole\] rbac_test_role` and `\[identity\] '
             'admin_role` must be defined in the system.')
         self.assertRaisesRegex(rbac_exceptions.RbacResourceSetupFailed,
-                               error_re, self.rbac_utils.switch_role)
+                               error_re, self.rbac_utils.override_role)
 
-    def test_switch_role_with_missing_rbac_role(self):
+    def test_override_role_with_missing_rbac_role(self):
         self.rbac_utils.set_roles('admin')
         error_re = (
             'Roles defined by `\[patrole\] rbac_test_role` and `\[identity\] '
             'admin_role` must be defined in the system.')
         self.assertRaisesRegex(rbac_exceptions.RbacResourceSetupFailed,
-                               error_re, self.rbac_utils.switch_role)
+                               error_re, self.rbac_utils.override_role)
 
-    def test_switch_role_to_admin_role(self):
-        self.rbac_utils.switch_role()
+    def test_override_role_to_admin_role(self):
+        self.rbac_utils.override_role()
 
         mock_test_obj = self.rbac_utils.mock_test_obj
         roles_client = self.rbac_utils.roles_v3_client
@@ -63,9 +64,9 @@
             .assert_called_once_with()
         mock_time.sleep.assert_called_once_with(1)
 
-    def test_switch_role_to_admin_role_avoids_role_switch(self):
+    def test_override_role_to_admin_role_avoids_role_switch(self):
         self.rbac_utils.set_roles(['admin', 'member'], 'admin')
-        self.rbac_utils.switch_role()
+        self.rbac_utils.override_role()
 
         roles_client = self.rbac_utils.roles_v3_client
         mock_time = self.rbac_utils.mock_time
@@ -73,8 +74,8 @@
         roles_client.create_user_role_on_project.assert_not_called()
         mock_time.sleep.assert_not_called()
 
-    def test_switch_role_to_member_role(self):
-        self.rbac_utils.switch_role(True)
+    def test_override_role_to_member_role(self):
+        self.rbac_utils.override_role(True)
 
         mock_test_obj = self.rbac_utils.mock_test_obj
         roles_client = self.rbac_utils.roles_v3_client
@@ -92,9 +93,9 @@
             [mock.call()] * 2)
         mock_time.sleep.assert_has_calls([mock.call(1)] * 2)
 
-    def test_switch_role_to_member_role_avoids_role_switch(self):
+    def test_override_role_to_member_role_avoids_role_switch(self):
         self.rbac_utils.set_roles(['admin', 'member'], 'member')
-        self.rbac_utils.switch_role(True)
+        self.rbac_utils.override_role(True)
 
         roles_client = self.rbac_utils.roles_v3_client
         mock_time = self.rbac_utils.mock_time
@@ -105,8 +106,8 @@
         ])
         mock_time.sleep.assert_called_once_with(1)
 
-    def test_switch_role_to_member_role_then_admin_role(self):
-        self.rbac_utils.switch_role(True, False)
+    def test_override_role_to_member_role_then_admin_role(self):
+        self.rbac_utils.override_role(True, False)
 
         mock_test_obj = self.rbac_utils.mock_test_obj
         roles_client = self.rbac_utils.roles_v3_client
@@ -126,47 +127,12 @@
             [mock.call()] * 3)
         mock_time.sleep.assert_has_calls([mock.call(1)] * 3)
 
-    def test_switch_role_without_boolean_value(self):
-        self.assertRaises(rbac_exceptions.RbacResourceSetupFailed,
-                          self.rbac_utils.switch_role, "admin")
-        self.assertRaises(rbac_exceptions.RbacResourceSetupFailed,
-                          self.rbac_utils.switch_role, None)
-
-    def test_switch_role_with_false_value_twice(self):
-        expected_error_message = (
-            '`toggle_rbac_role` must not be called with the same bool value '
-            'twice. Make sure that you included a rbac_utils.switch_role '
-            'method call inside the test.')
-
-        e = self.assertRaises(rbac_exceptions.RbacResourceSetupFailed,
-                              self.rbac_utils.switch_role, False)
-        self.assertIn(expected_error_message, str(e))
-
-        e = self.assertRaises(rbac_exceptions.RbacResourceSetupFailed,
-                              self.rbac_utils.switch_role, True, False, False)
-        self.assertIn(expected_error_message, str(e))
-
-    def test_switch_role_with_true_value_twice(self):
-        expected_error_message = (
-            '`toggle_rbac_role` must not be called with the same bool value '
-            'twice. Make sure that you included a rbac_utils.switch_role '
-            'method call inside the test.')
-
-        e = self.assertRaises(rbac_exceptions.RbacResourceSetupFailed,
-                              self.rbac_utils.switch_role, True, True)
-        self.assertIn(expected_error_message, str(e))
-
-        e = self.assertRaises(rbac_exceptions.RbacResourceSetupFailed,
-                              self.rbac_utils.switch_role, True, False, True,
-                              True)
-        self.assertIn(expected_error_message, str(e))
-
     def test_clear_user_roles(self):
         # NOTE(felipemonteiro): Set the user's roles on the project to
         # include 'random' to coerce a role switch, or else it will be
         # skipped.
         self.rbac_utils.set_roles(['admin', 'member'], ['member', 'random'])
-        self.rbac_utils.switch_role()
+        self.rbac_utils.override_role()
 
         roles_client = self.rbac_utils.roles_v3_client
 
@@ -179,20 +145,57 @@
                 mock.call(mock.sentinel.project_id, mock.sentinel.user_id,
                           'random_id')])
 
-    @mock.patch.object(rbac_utils, 'LOG', autospec=True)
-    @mock.patch.object(rbac_utils, 'sys', autospec=True)
-    def test_switch_roles_with_unexpected_exception(self, mock_sys, mock_log):
-        """Test whether unexpected exceptions don't throw error.
-
-        If an unexpected exception or skip exception is raised, then that
-        should not result in an error being raised.
+    @mock.patch.object(rbac_utils.RbacUtils, '_override_role', autospec=True)
+    def test_override_role_context_manager_simulate_pass(self,
+                                                         mock_override_role):
+        """Validate that expected override_role calls are made when switching
+        to admin role for success path.
         """
-        unexpected_exceptions = [testtools.TestCase.skipException,
-                                 AttributeError]
+        test_obj = mock.MagicMock()
+        _rbac_utils = rbac_utils.RbacUtils(test_obj)
 
-        for unexpected_exception in unexpected_exceptions:
-            mock_sys.exc_info.return_value = [unexpected_exception()]
-            # Ordinarily calling switch_role twice with the same value should
-            # result in an error being thrown -- but not in this case.
-            self.rbac_utils.switch_role(False)
-            mock_log.error.assert_not_called()
+        # Validate constructor called _override_role with False.
+        mock_override_role.assert_called_once_with(_rbac_utils, test_obj,
+                                                   False)
+        mock_override_role.reset_mock()
+
+        with _rbac_utils.override_role(test_obj):
+            # Validate `override_role` public method called private method
+            # `_override_role` with True.
+            mock_override_role.assert_called_once_with(_rbac_utils, test_obj,
+                                                       True)
+            mock_override_role.reset_mock()
+        # Validate that `override_role` switched back to admin role after
+        # contextmanager.
+        mock_override_role.assert_called_once_with(_rbac_utils, test_obj,
+                                                   False)
+
+    @mock.patch.object(rbac_utils.RbacUtils, '_override_role', autospec=True)
+    def test_override_role_context_manager_simulate_fail(self,
+                                                         mock_override_role):
+        """Validate that expected override_role calls are made when switching
+        to admin role for failure path (i.e. when test raises exception).
+        """
+        test_obj = mock.MagicMock()
+        _rbac_utils = rbac_utils.RbacUtils(test_obj)
+
+        # Validate constructor called _override_role with False.
+        mock_override_role.assert_called_once_with(_rbac_utils, test_obj,
+                                                   False)
+        mock_override_role.reset_mock()
+
+        def _do_test():
+            with _rbac_utils.override_role(test_obj):
+                # Validate `override_role` public method called private method
+                # `_override_role` with True.
+                mock_override_role.assert_called_once_with(
+                    _rbac_utils, test_obj, True)
+                mock_override_role.reset_mock()
+                # Raise exc to verify role switch works for negative case.
+                raise lib_exc.Forbidden()
+
+        # Validate that role is switched back to admin, despite test failure.
+        with testtools.ExpectedException(lib_exc.Forbidden):
+            _do_test()
+        mock_override_role.assert_called_once_with(_rbac_utils, test_obj,
+                                                   False)