Helper for validating RBAC list actions

List RBAC actions typically perform soft authorization checks meaning
that the response bodies omit resources that the user isn't authorized
to see.
For example, if an admin user creates a user, member role might not be
able to see that user when listing all the users in a tenant,
depending on the RBAC rule.
This patch set adds override_role_and_validate_list function to
RbacUtils to validate RBAC flows for API list actions.

Change-Id: I5f39efc8aa0004d4ad435cbd6b8fb037c33832d6
diff --git a/patrole_tempest_plugin/rbac_exceptions.py b/patrole_tempest_plugin/rbac_exceptions.py
index ad697b0..c30961b 100644
--- a/patrole_tempest_plugin/rbac_exceptions.py
+++ b/patrole_tempest_plugin/rbac_exceptions.py
@@ -47,7 +47,7 @@
     """Raised when a list or show action is empty following RBAC authorization
     failure.
     """
-    message = ("The response body is empty due to policy enforcement failure.")
+    message = "The response body is empty due to policy enforcement failure."
 
 
 class RbacResourceSetupFailed(BasePatroleException):
@@ -104,3 +104,16 @@
     * an exception is raised after ``override_role`` context
     """
     message = "Override role failure or incorrect usage"
+
+
+class RbacValidateListException(BasePatroleException):
+    """Raised when override_role_and_validate_list is used incorrectly.
+
+    Specifically, when:
+
+    * Neither ``resource_id`` nor ``resources`` is initialized
+    * Both ``resource_id`` and ``resources`` are initialized
+    * The ``ctx.resources`` variable wasn't set in
+        override_role_and_validate_list context.
+    """
+    message = "Incorrect usage of override_role_and_validate_list: %(reason)s"
diff --git a/patrole_tempest_plugin/rbac_utils.py b/patrole_tempest_plugin/rbac_utils.py
index 33955c3..6aab4d7 100644
--- a/patrole_tempest_plugin/rbac_utils.py
+++ b/patrole_tempest_plugin/rbac_utils.py
@@ -13,7 +13,7 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-from contextlib import contextmanager
+import contextlib
 import sys
 import time
 
@@ -31,6 +31,77 @@
 LOG = logging.getLogger(__name__)
 
 
+class _ValidateListContext(object):
+    """Context class responsible for validation of the list functions.
+
+    This class is used in ``override_role_and_validate_list`` function and
+    the result of a list function must be assigned to the ``ctx.resources``
+    variable.
+
+    Example::
+
+        with self.rbac_utils.override_role_and_validate_list(...) as ctx:
+            ctx.resources = list_function()
+
+    """
+    def __init__(self, admin_resources=None, admin_resource_id=None):
+        """Constructor for ``ValidateListContext``.
+
+        Either ``admin_resources`` or ``admin_resource_id`` should be used,
+            not both.
+
+        :param list admin_resources: The list of resources received before
+            calling the ``override_role_and_validate_list`` function. To
+            validate will be used the ``_validate_len`` function.
+        :param UUID admin_resource_id: An ID of a resource created before
+            calling the ``override_role_and_validate_list`` function. To
+            validate will be used the ``_validate_resource`` function.
+        :raises RbacValidateListException: if both ``admin_resources`` and
+            ``admin_resource_id`` are set or unset.
+        """
+        self.resources = None
+        if admin_resources is not None and not admin_resource_id:
+            self._admin_len = len(admin_resources)
+            if not self._admin_len:
+                raise rbac_exceptions.RbacValidateListException(
+                    reason="the list of admin resources cannot be empty")
+            self._validate_func = self._validate_len
+        elif admin_resource_id and admin_resources is None:
+            self._admin_resource_id = admin_resource_id
+            self._validate_func = self._validate_resource
+        else:
+            raise rbac_exceptions.RbacValidateListException(
+                reason="admin_resources and admin_resource_id are mutually "
+                       "exclusive")
+
+    def _validate_len(self):
+        """Validates that the number of resources is less than admin resources.
+        """
+        if not len(self.resources):
+            raise rbac_exceptions.RbacEmptyResponseBody()
+        elif self._admin_len > len(self.resources):
+            raise rbac_exceptions.RbacPartialResponseBody(body=self.resources)
+
+    def _validate_resource(self):
+        """Validates that the admin resource is present in the resources.
+        """
+        for resource in self.resources:
+            if resource['id'] == self._admin_resource_id:
+                return
+        raise rbac_exceptions.RbacPartialResponseBody(body=self.resources)
+
+    def _validate(self):
+        """Calls the proper validation function.
+
+        :raises RbacValidateListException: if the ``ctx.resources`` variable is
+            not assigned.
+        """
+        if self.resources is None:
+            raise rbac_exceptions.RbacValidateListException(
+                reason="ctx.resources is not assigned")
+        self._validate_func()
+
+
 class RbacUtils(object):
     """Utility class responsible for switching ``os_primary`` role.
 
@@ -68,7 +139,7 @@
     admin_role_id = None
     rbac_role_ids = None
 
-    @contextmanager
+    @contextlib.contextmanager
     def override_role(self, test_obj):
         """Override the role used by ``os_primary`` Tempest credentials.
 
@@ -220,6 +291,41 @@
 
         return False
 
+    @contextlib.contextmanager
+    def override_role_and_validate_list(self, test_obj, admin_resources=None,
+                                        admin_resource_id=None):
+        """Call ``override_role`` and validate RBAC for a list API action.
+
+        List actions usually do soft authorization: partial or empty response
+        bodies are returned instead of exceptions. This helper validates
+        that unauthorized roles only return a subset of the available
+        resources.
+        Should only be used for validating list API actions.
+
+        :param test_obj: Instance of ``tempest.test.BaseTestCase``.
+        :param list admin_resources: The list of resources received before
+            calling the ``override_role_and_validate_list`` function.
+        :param UUID admin_resource_id: An ID of a resource created before
+            calling the ``override_role_and_validate_list`` function.
+        :return: py:class:`_ValidateListContext` object.
+
+        Example::
+
+            # the resource created by admin
+            admin_resource_id = (
+                self.ntp_client.create_dscp_marking_rule()
+                ["dscp_marking_rule"]["id'])
+            with self.rbac_utils.override_role_and_validate_list(
+                    self, admin_resource_id=admin_resource_id) as ctx:
+                # the list of resources available for member role
+                ctx.resources = self.ntp_client.list_dscp_marking_rules(
+                    policy_id=self.policy_id)["dscp_marking_rules"]
+        """
+        ctx = _ValidateListContext(admin_resources, admin_resource_id)
+        with self.override_role(test_obj):
+            yield ctx
+            ctx._validate()
+
 
 class RbacUtilsMixin(object):
     """Mixin class to be used alongside an instance of
diff --git a/patrole_tempest_plugin/tests/unit/test_rbac_utils.py b/patrole_tempest_plugin/tests/unit/test_rbac_utils.py
index bd13e34..9fe5ffa 100644
--- a/patrole_tempest_plugin/tests/unit/test_rbac_utils.py
+++ b/patrole_tempest_plugin/tests/unit/test_rbac_utils.py
@@ -200,6 +200,19 @@
         mock_override_role.assert_called_once_with(_rbac_utils, test_obj,
                                                    False)
 
+    def test_override_role_and_validate_list(self):
+        self.patchobject(rbac_utils.RbacUtils, '_override_role')
+        test_obj = mock.MagicMock()
+        _rbac_utils = rbac_utils.RbacUtils(test_obj)
+        m_override_role = self.patchobject(_rbac_utils, 'override_role')
+
+        with (_rbac_utils.override_role_and_validate_list(
+                test_obj, 'foo')) as ctx:
+            self.assertIsInstance(ctx, rbac_utils._ValidateListContext)
+            m_validate = self.patchobject(ctx, '_validate')
+        m_override_role.assert_called_once_with(test_obj)
+        m_validate.assert_called_once()
+
 
 class RBACUtilsMixinTest(base.TestCase):
 
@@ -233,3 +246,87 @@
 
         self.assertTrue(hasattr(child_test, 'rbac_utils'))
         self.assertIsInstance(child_test.rbac_utils, rbac_utils.RbacUtils)
+
+
+class ValidateListContextTest(base.TestCase):
+    @staticmethod
+    def _get_context(admin_resources=None, admin_resource_id=None):
+        return rbac_utils._ValidateListContext(
+            admin_resources=admin_resources,
+            admin_resource_id=admin_resource_id)
+
+    def test_incorrect_usage(self):
+        # admin_resources and admin_resource_is are not assigned
+        self.assertRaises(rbac_exceptions.RbacValidateListException,
+                          self._get_context)
+
+        # both admin_resources and admin_resource_is are assigned
+        self.assertRaises(rbac_exceptions.RbacValidateListException,
+                          self._get_context,
+                          admin_resources='foo', admin_resource_id='bar')
+        # empty list assigned to admin_resources
+        self.assertRaises(rbac_exceptions.RbacValidateListException,
+                          self._get_context, admin_resources=[])
+
+        # ctx.resources is not assigned
+        ctx = self._get_context(admin_resources='foo')
+        self.assertRaises(rbac_exceptions.RbacValidateListException,
+                          ctx._validate)
+
+    def test_validate_len_negative(self):
+        ctx = self._get_context(admin_resources=[1, 2, 3, 4])
+        self.assertEqual(ctx._validate_len, ctx._validate_func)
+        self.assertEqual(4, ctx._admin_len)
+        self.assertFalse(hasattr(ctx, '_admin_resource_id'))
+
+        # the number of resources is less than admin resources
+        ctx.resources = [1, 2, 3]
+        self.assertRaises(rbac_exceptions.RbacPartialResponseBody,
+                          ctx._validate_len)
+
+        # the resources is empty
+        ctx.resources = []
+        self.assertRaises(rbac_exceptions.RbacEmptyResponseBody,
+                          ctx._validate_len)
+
+    def test_validate_len(self):
+        ctx = self._get_context(admin_resources=[1, 2, 3, 4])
+
+        # the number of resources and admin resources are same
+        ctx.resources = [1, 2, 3, 4]
+        self.assertIsNone(ctx._validate_len())
+
+    def test_validate_resource_negative(self):
+        ctx = self._get_context(admin_resource_id=1)
+        self.assertEqual(ctx._validate_resource, ctx._validate_func)
+        self.assertEqual(1, ctx._admin_resource_id)
+        self.assertFalse(hasattr(ctx, '_admin_len'))
+
+        # there is no admin resource in the resources
+        ctx.resources = [{'id': 2}, {'id': 3}]
+        self.assertRaises(rbac_exceptions.RbacPartialResponseBody,
+                          ctx._validate_resource)
+
+    def test_validate_resource(self):
+        ctx = self._get_context(admin_resource_id=1)
+
+        # there is admin resource in the resources
+        ctx.resources = [{'id': 1}, {'id': 2}]
+        self.assertIsNone(ctx._validate_resource())
+
+    def test_validate(self):
+        ctx = self._get_context(admin_resources='foo')
+        ctx.resources = 'bar'
+        with mock.patch.object(ctx, '_validate_func',
+                               autospec=False) as m_validate_func:
+            m_validate_func.side_effect = (
+                rbac_exceptions.RbacPartialResponseBody,
+                None
+            )
+            self.assertRaises(rbac_exceptions.RbacPartialResponseBody,
+                              ctx._validate)
+            m_validate_func.assert_called_once()
+
+            m_validate_func.reset_mock()
+            ctx._validate()
+            m_validate_func.assert_called_once()
diff --git a/releasenotes/notes/override-role-and-validate-list-d3b80f773674a652.yaml b/releasenotes/notes/override-role-and-validate-list-d3b80f773674a652.yaml
new file mode 100644
index 0000000..de05b76
--- /dev/null
+++ b/releasenotes/notes/override-role-and-validate-list-d3b80f773674a652.yaml
@@ -0,0 +1,37 @@
+---
+features:
+  - |
+    In order to test the list actions which doesn't have its own policy,
+    implemented the ``override_role_and_validate_list`` function.
+    The function has two modes:
+
+    * Validating the number of the resources in a ``ResponseBody`` before
+      calling the ``override_role`` and after.
+
+      .. code-block:: python
+
+        # make sure at least one resource is available
+        self.ntp_client.create_policy_dscp_marking_rule()
+        # the list of resources available for a user with admin role
+        admin_resources = self.ntp_client.list_dscp_marking_rules(
+            policy_id=self.policy_id)["dscp_marking_rules"]
+        with self.rbac_utils.override_role_and_validate_list(
+                self, admin_resources=admin_resources) as ctx:
+            # the list of resources available for a user with member role
+            ctx.resources = self.ntp_client.list_dscp_marking_rules(
+                policy_id=self.policy_id)["dscp_marking_rules"]
+
+    * Validating that a resource, created before ``override_role``, is not
+      present in a ``ResponseBody``.
+
+      .. code-block:: python
+
+        # the resource created by a user with admin role
+        admin_resource_id = (
+            self.ntp_client.create_dscp_marking_rule()
+            ["dscp_marking_rule"]["id'])
+        with self.rbac_utils.override_role_and_validate_list(
+                self, admin_resource_id=admin_resource_id) as ctx:
+            # the list of resources available for a user wirh member role
+            ctx.resources = self.ntp_client.list_dscp_marking_rules(
+                policy_id=self.policy_id)["dscp_marking_rules"]