Merge "Helper for validating RBAC list actions"
diff --git a/patrole_tempest_plugin/rbac_exceptions.py b/patrole_tempest_plugin/rbac_exceptions.py
index ad697b0..c30961b 100644
--- a/patrole_tempest_plugin/rbac_exceptions.py
+++ b/patrole_tempest_plugin/rbac_exceptions.py
@@ -47,7 +47,7 @@
     """Raised when a list or show action is empty following RBAC authorization
     failure.
     """
-    message = ("The response body is empty due to policy enforcement failure.")
+    message = "The response body is empty due to policy enforcement failure."
 
 
 class RbacResourceSetupFailed(BasePatroleException):
@@ -104,3 +104,16 @@
     * an exception is raised after ``override_role`` context
     """
     message = "Override role failure or incorrect usage"
+
+
+class RbacValidateListException(BasePatroleException):
+    """Raised when override_role_and_validate_list is used incorrectly.
+
+    Specifically, when:
+
+    * Neither ``admin_resource_id`` nor ``admin_resources`` is initialized
+    * Both ``admin_resource_id`` and ``admin_resources`` are initialized
+    * The ``ctx.resources`` variable wasn't set in
+        override_role_and_validate_list context.
+    """
+    message = "Incorrect usage of override_role_and_validate_list: %(reason)s"
diff --git a/patrole_tempest_plugin/rbac_utils.py b/patrole_tempest_plugin/rbac_utils.py
index 33955c3..6aab4d7 100644
--- a/patrole_tempest_plugin/rbac_utils.py
+++ b/patrole_tempest_plugin/rbac_utils.py
@@ -13,7 +13,7 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-from contextlib import contextmanager
+import contextlib
 import sys
 import time
 
@@ -31,6 +31,77 @@
 LOG = logging.getLogger(__name__)
 
 
+class _ValidateListContext(object):
+    """Context class responsible for validation of the list functions.
+
+    This class is used in ``override_role_and_validate_list`` function and
+    the result of a list function must be assigned to the ``ctx.resources``
+    variable.
+
+    Example::
+
+        with self.rbac_utils.override_role_and_validate_list(...) as ctx:
+            ctx.resources = list_function()
+
+    """
+    def __init__(self, admin_resources=None, admin_resource_id=None):
+        """Constructor for ``ValidateListContext``.
+
+        Either ``admin_resources`` or ``admin_resource_id`` should be used,
+            not both.
+
+        :param list admin_resources: The list of resources received before
+            calling the ``override_role_and_validate_list`` function. The
+            ``_validate_len`` function is used for validation.
+        :param UUID admin_resource_id: An ID of a resource created before
+            calling the ``override_role_and_validate_list`` function. The
+            ``_validate_resource`` function is used for validation.
+        :raises RbacValidateListException: if both ``admin_resources`` and
+            ``admin_resource_id`` are set or unset.
+        """
+        self.resources = None
+        if admin_resources is not None and not admin_resource_id:
+            self._admin_len = len(admin_resources)
+            if not self._admin_len:
+                raise rbac_exceptions.RbacValidateListException(
+                    reason="the list of admin resources cannot be empty")
+            self._validate_func = self._validate_len
+        elif admin_resource_id and admin_resources is None:
+            self._admin_resource_id = admin_resource_id
+            self._validate_func = self._validate_resource
+        else:
+            raise rbac_exceptions.RbacValidateListException(
+                reason="admin_resources and admin_resource_id are mutually "
+                       "exclusive")
+
+    def _validate_len(self):
+        """Validates that the resources are not fewer than the admin resources.
+        """
+        if not len(self.resources):
+            raise rbac_exceptions.RbacEmptyResponseBody()
+        elif self._admin_len > len(self.resources):
+            raise rbac_exceptions.RbacPartialResponseBody(body=self.resources)
+
+    def _validate_resource(self):
+        """Validates that the admin resource is present in the resources.
+        """
+        for resource in self.resources:
+            if resource['id'] == self._admin_resource_id:
+                return
+        raise rbac_exceptions.RbacPartialResponseBody(body=self.resources)
+
+    def _validate(self):
+        """Calls the proper validation function.
+
+        :raises RbacValidateListException: if the ``ctx.resources`` variable is
+            not assigned.
+        """
+        if self.resources is None:
+            raise rbac_exceptions.RbacValidateListException(
+                reason="ctx.resources is not assigned")
+        self._validate_func()
+
+
 class RbacUtils(object):
     """Utility class responsible for switching ``os_primary`` role.
 
@@ -68,7 +139,7 @@
     admin_role_id = None
     rbac_role_ids = None
 
-    @contextmanager
+    @contextlib.contextmanager
     def override_role(self, test_obj):
         """Override the role used by ``os_primary`` Tempest credentials.
 
@@ -220,6 +291,41 @@
 
         return False
 
+    @contextlib.contextmanager
+    def override_role_and_validate_list(self, test_obj, admin_resources=None,
+                                        admin_resource_id=None):
+        """Call ``override_role`` and validate RBAC for a list API action.
+
+        List actions usually do soft authorization: partial or empty response
+        bodies are returned instead of exceptions. This helper validates
+        that unauthorized roles only return a subset of the available
+        resources.
+        Should only be used for validating list API actions.
+
+        :param test_obj: Instance of ``tempest.test.BaseTestCase``.
+        :param list admin_resources: The list of resources received before
+            calling the ``override_role_and_validate_list`` function.
+        :param UUID admin_resource_id: An ID of a resource created before
+            calling the ``override_role_and_validate_list`` function.
+        :return: :py:class:`_ValidateListContext` object.
+
+        Example::
+
+            # the resource created by admin
+            admin_resource_id = (
+                self.ntp_client.create_dscp_marking_rule()
+                ["dscp_marking_rule"]["id"])
+            with self.rbac_utils.override_role_and_validate_list(
+                    self, admin_resource_id=admin_resource_id) as ctx:
+                # the list of resources available for member role
+                ctx.resources = self.ntp_client.list_dscp_marking_rules(
+                    policy_id=self.policy_id)["dscp_marking_rules"]
+        """
+        ctx = _ValidateListContext(admin_resources, admin_resource_id)
+        with self.override_role(test_obj):
+            yield ctx
+            ctx._validate()
+
 
 class RbacUtilsMixin(object):
     """Mixin class to be used alongside an instance of
diff --git a/patrole_tempest_plugin/tests/unit/test_rbac_utils.py b/patrole_tempest_plugin/tests/unit/test_rbac_utils.py
index bd13e34..9fe5ffa 100644
--- a/patrole_tempest_plugin/tests/unit/test_rbac_utils.py
+++ b/patrole_tempest_plugin/tests/unit/test_rbac_utils.py
@@ -200,6 +200,19 @@
         mock_override_role.assert_called_once_with(_rbac_utils, test_obj,
                                                    False)
 
+    def test_override_role_and_validate_list(self):
+        self.patchobject(rbac_utils.RbacUtils, '_override_role')
+        test_obj = mock.MagicMock()
+        _rbac_utils = rbac_utils.RbacUtils(test_obj)
+        m_override_role = self.patchobject(_rbac_utils, 'override_role')
+
+        with (_rbac_utils.override_role_and_validate_list(
+                test_obj, 'foo')) as ctx:
+            self.assertIsInstance(ctx, rbac_utils._ValidateListContext)
+            m_validate = self.patchobject(ctx, '_validate')
+        m_override_role.assert_called_once_with(test_obj)
+        m_validate.assert_called_once()
+
 
 class RBACUtilsMixinTest(base.TestCase):
 
@@ -233,3 +246,87 @@
 
         self.assertTrue(hasattr(child_test, 'rbac_utils'))
         self.assertIsInstance(child_test.rbac_utils, rbac_utils.RbacUtils)
+
+
+class ValidateListContextTest(base.TestCase):
+    @staticmethod
+    def _get_context(admin_resources=None, admin_resource_id=None):
+        return rbac_utils._ValidateListContext(
+            admin_resources=admin_resources,
+            admin_resource_id=admin_resource_id)
+
+    def test_incorrect_usage(self):
+        # admin_resources and admin_resource_id are not assigned
+        self.assertRaises(rbac_exceptions.RbacValidateListException,
+                          self._get_context)
+
+        # both admin_resources and admin_resource_id are assigned
+        self.assertRaises(rbac_exceptions.RbacValidateListException,
+                          self._get_context,
+                          admin_resources='foo', admin_resource_id='bar')
+        # empty list assigned to admin_resources
+        self.assertRaises(rbac_exceptions.RbacValidateListException,
+                          self._get_context, admin_resources=[])
+
+        # ctx.resources is not assigned
+        ctx = self._get_context(admin_resources='foo')
+        self.assertRaises(rbac_exceptions.RbacValidateListException,
+                          ctx._validate)
+
+    def test_validate_len_negative(self):
+        ctx = self._get_context(admin_resources=[1, 2, 3, 4])
+        self.assertEqual(ctx._validate_len, ctx._validate_func)
+        self.assertEqual(4, ctx._admin_len)
+        self.assertFalse(hasattr(ctx, '_admin_resource_id'))
+
+        # the number of resources is less than admin resources
+        ctx.resources = [1, 2, 3]
+        self.assertRaises(rbac_exceptions.RbacPartialResponseBody,
+                          ctx._validate_len)
+
+        # the resources is empty
+        ctx.resources = []
+        self.assertRaises(rbac_exceptions.RbacEmptyResponseBody,
+                          ctx._validate_len)
+
+    def test_validate_len(self):
+        ctx = self._get_context(admin_resources=[1, 2, 3, 4])
+
+        # the number of resources and admin resources are same
+        ctx.resources = [1, 2, 3, 4]
+        self.assertIsNone(ctx._validate_len())
+
+    def test_validate_resource_negative(self):
+        ctx = self._get_context(admin_resource_id=1)
+        self.assertEqual(ctx._validate_resource, ctx._validate_func)
+        self.assertEqual(1, ctx._admin_resource_id)
+        self.assertFalse(hasattr(ctx, '_admin_len'))
+
+        # there is no admin resource in the resources
+        ctx.resources = [{'id': 2}, {'id': 3}]
+        self.assertRaises(rbac_exceptions.RbacPartialResponseBody,
+                          ctx._validate_resource)
+
+    def test_validate_resource(self):
+        ctx = self._get_context(admin_resource_id=1)
+
+        # there is admin resource in the resources
+        ctx.resources = [{'id': 1}, {'id': 2}]
+        self.assertIsNone(ctx._validate_resource())
+
+    def test_validate(self):
+        ctx = self._get_context(admin_resources='foo')
+        ctx.resources = 'bar'
+        with mock.patch.object(ctx, '_validate_func',
+                               autospec=False) as m_validate_func:
+            m_validate_func.side_effect = (
+                rbac_exceptions.RbacPartialResponseBody,
+                None
+            )
+            self.assertRaises(rbac_exceptions.RbacPartialResponseBody,
+                              ctx._validate)
+            m_validate_func.assert_called_once()
+
+            m_validate_func.reset_mock()
+            ctx._validate()
+            m_validate_func.assert_called_once()
diff --git a/releasenotes/notes/override-role-and-validate-list-d3b80f773674a652.yaml b/releasenotes/notes/override-role-and-validate-list-d3b80f773674a652.yaml
new file mode 100644
index 0000000..de05b76
--- /dev/null
+++ b/releasenotes/notes/override-role-and-validate-list-d3b80f773674a652.yaml
@@ -0,0 +1,37 @@
+---
+features:
+  - |
+    In order to test list actions that don't have their own policy, the
+    ``override_role_and_validate_list`` function was implemented.
+    The function has two modes:
+
+    * Validating the number of the resources in a ``ResponseBody`` before
+      calling the ``override_role`` and after.
+
+      .. code-block:: python
+
+        # make sure at least one resource is available
+        self.ntp_client.create_policy_dscp_marking_rule()
+        # the list of resources available for a user with admin role
+        admin_resources = self.ntp_client.list_dscp_marking_rules(
+            policy_id=self.policy_id)["dscp_marking_rules"]
+        with self.rbac_utils.override_role_and_validate_list(
+                self, admin_resources=admin_resources) as ctx:
+            # the list of resources available for a user with member role
+            ctx.resources = self.ntp_client.list_dscp_marking_rules(
+                policy_id=self.policy_id)["dscp_marking_rules"]
+
+    * Validating that a resource, created before ``override_role``, is
+      present in a ``ResponseBody``.
+
+      .. code-block:: python
+
+        # the resource created by a user with admin role
+        admin_resource_id = (
+            self.ntp_client.create_dscp_marking_rule()
+            ["dscp_marking_rule"]["id"])
+        with self.rbac_utils.override_role_and_validate_list(
+                self, admin_resource_id=admin_resource_id) as ctx:
+            # the list of resources available for a user with member role
+            ctx.resources = self.ntp_client.list_dscp_marking_rules(
+                policy_id=self.policy_id)["dscp_marking_rules"]