Merge "Helper for validating RBAC list actions"
diff --git a/patrole_tempest_plugin/rbac_exceptions.py b/patrole_tempest_plugin/rbac_exceptions.py
index ad697b0..c30961b 100644
--- a/patrole_tempest_plugin/rbac_exceptions.py
+++ b/patrole_tempest_plugin/rbac_exceptions.py
@@ -47,7 +47,7 @@
"""Raised when a list or show action is empty following RBAC authorization
failure.
"""
- message = ("The response body is empty due to policy enforcement failure.")
+ message = "The response body is empty due to policy enforcement failure."
class RbacResourceSetupFailed(BasePatroleException):
@@ -104,3 +104,16 @@
* an exception is raised after ``override_role`` context
"""
message = "Override role failure or incorrect usage"
+
+
+class RbacValidateListException(BasePatroleException):
+ """Raised when override_role_and_validate_list is used incorrectly.
+
+ Specifically, when:
+
+ * Neither ``admin_resource_id`` nor ``admin_resources`` is initialized
+ * Both ``admin_resource_id`` and ``admin_resources`` are initialized
+ * The ``ctx.resources`` variable wasn't set in
+ override_role_and_validate_list context.
+ """
+ message = "Incorrect usage of override_role_and_validate_list: %(reason)s"
diff --git a/patrole_tempest_plugin/rbac_utils.py b/patrole_tempest_plugin/rbac_utils.py
index 33955c3..6aab4d7 100644
--- a/patrole_tempest_plugin/rbac_utils.py
+++ b/patrole_tempest_plugin/rbac_utils.py
@@ -13,7 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
-from contextlib import contextmanager
+import contextlib
import sys
import time
@@ -31,6 +31,77 @@
LOG = logging.getLogger(__name__)
+class _ValidateListContext(object):
+ """Context class responsible for validation of the list functions.
+
+ This class is used in ``override_role_and_validate_list`` function and
+ the result of a list function must be assigned to the ``ctx.resources``
+ variable.
+
+ Example::
+
+ with self.rbac_utils.override_role_and_validate_list(...) as ctx:
+ ctx.resources = list_function()
+
+ """
+ def __init__(self, admin_resources=None, admin_resource_id=None):
+ """Constructor for ``_ValidateListContext``.
+
+ Either ``admin_resources`` or ``admin_resource_id`` should be used,
+ not both.
+
+ :param list admin_resources: The list of resources received before
+ calling the ``override_role_and_validate_list`` function. The
+ ``_validate_len`` function will be used for validation.
+ :param UUID admin_resource_id: An ID of a resource created before
+ calling the ``override_role_and_validate_list`` function. The
+ ``_validate_resource`` function will be used for validation.
+ :raises RbacValidateListException: if both ``admin_resources`` and
+ ``admin_resource_id`` are set or unset.
+ """
+ self.resources = None
+ if admin_resources is not None and not admin_resource_id:
+ self._admin_len = len(admin_resources)
+ if not self._admin_len:
+ raise rbac_exceptions.RbacValidateListException(
+ reason="the list of admin resources cannot be empty")
+ self._validate_func = self._validate_len
+ elif admin_resource_id and admin_resources is None:
+ self._admin_resource_id = admin_resource_id
+ self._validate_func = self._validate_resource
+ else:
+ raise rbac_exceptions.RbacValidateListException(
+ reason="admin_resources and admin_resource_id are mutually "
+ "exclusive")
+
+ def _validate_len(self):
+ """Validates that the resource count is not below the admin count.
+ """
+ if not len(self.resources):
+ raise rbac_exceptions.RbacEmptyResponseBody()
+ elif self._admin_len > len(self.resources):
+ raise rbac_exceptions.RbacPartialResponseBody(body=self.resources)
+
+ def _validate_resource(self):
+ """Validates that the admin resource is present in the resources.
+ """
+ for resource in self.resources:
+ if resource['id'] == self._admin_resource_id:
+ return
+ raise rbac_exceptions.RbacPartialResponseBody(body=self.resources)
+
+ def _validate(self):
+ """Calls the proper validation function.
+
+ :raises RbacValidateListException: if the ``ctx.resources`` variable is
+ not assigned.
+ """
+ if self.resources is None:
+ raise rbac_exceptions.RbacValidateListException(
+ reason="ctx.resources is not assigned")
+ self._validate_func()
+
+
class RbacUtils(object):
"""Utility class responsible for switching ``os_primary`` role.
@@ -68,7 +139,7 @@
admin_role_id = None
rbac_role_ids = None
- @contextmanager
+ @contextlib.contextmanager
def override_role(self, test_obj):
"""Override the role used by ``os_primary`` Tempest credentials.
@@ -220,6 +291,41 @@
return False
+ @contextlib.contextmanager
+ def override_role_and_validate_list(self, test_obj, admin_resources=None,
+ admin_resource_id=None):
+ """Call ``override_role`` and validate RBAC for a list API action.
+
+ List actions usually do soft authorization: partial or empty response
+ bodies are returned instead of exceptions. This helper validates
+ that unauthorized roles only return a subset of the available
+ resources.
+ Should only be used for validating list API actions.
+
+ :param test_obj: Instance of ``tempest.test.BaseTestCase``.
+ :param list admin_resources: The list of resources received before
+ calling the ``override_role_and_validate_list`` function.
+ :param UUID admin_resource_id: An ID of a resource created before
+ calling the ``override_role_and_validate_list`` function.
+ :return: :py:class:`_ValidateListContext` object.
+
+ Example::
+
+ # the resource created by admin
+ admin_resource_id = (
+ self.ntp_client.create_dscp_marking_rule()
+ ["dscp_marking_rule"]["id"])
+ with self.rbac_utils.override_role_and_validate_list(
+ self, admin_resource_id=admin_resource_id) as ctx:
+ # the list of resources available for member role
+ ctx.resources = self.ntp_client.list_dscp_marking_rules(
+ policy_id=self.policy_id)["dscp_marking_rules"]
+ """
+ ctx = _ValidateListContext(admin_resources, admin_resource_id)
+ with self.override_role(test_obj):
+ yield ctx
+ ctx._validate()
+
class RbacUtilsMixin(object):
"""Mixin class to be used alongside an instance of
diff --git a/patrole_tempest_plugin/tests/unit/test_rbac_utils.py b/patrole_tempest_plugin/tests/unit/test_rbac_utils.py
index bd13e34..9fe5ffa 100644
--- a/patrole_tempest_plugin/tests/unit/test_rbac_utils.py
+++ b/patrole_tempest_plugin/tests/unit/test_rbac_utils.py
@@ -200,6 +200,19 @@
mock_override_role.assert_called_once_with(_rbac_utils, test_obj,
False)
+ def test_override_role_and_validate_list(self):
+ self.patchobject(rbac_utils.RbacUtils, '_override_role')
+ test_obj = mock.MagicMock()
+ _rbac_utils = rbac_utils.RbacUtils(test_obj)
+ m_override_role = self.patchobject(_rbac_utils, 'override_role')
+
+ with (_rbac_utils.override_role_and_validate_list(
+ test_obj, 'foo')) as ctx:
+ self.assertIsInstance(ctx, rbac_utils._ValidateListContext)
+ m_validate = self.patchobject(ctx, '_validate')
+ m_override_role.assert_called_once_with(test_obj)
+ m_validate.assert_called_once()
+
class RBACUtilsMixinTest(base.TestCase):
@@ -233,3 +246,87 @@
self.assertTrue(hasattr(child_test, 'rbac_utils'))
self.assertIsInstance(child_test.rbac_utils, rbac_utils.RbacUtils)
+
+
+class ValidateListContextTest(base.TestCase):
+ @staticmethod
+ def _get_context(admin_resources=None, admin_resource_id=None):
+ return rbac_utils._ValidateListContext(
+ admin_resources=admin_resources,
+ admin_resource_id=admin_resource_id)
+
+ def test_incorrect_usage(self):
+ # admin_resources and admin_resource_id are not assigned
+ self.assertRaises(rbac_exceptions.RbacValidateListException,
+ self._get_context)
+
+ # both admin_resources and admin_resource_id are assigned
+ self.assertRaises(rbac_exceptions.RbacValidateListException,
+ self._get_context,
+ admin_resources='foo', admin_resource_id='bar')
+ # empty list assigned to admin_resources
+ self.assertRaises(rbac_exceptions.RbacValidateListException,
+ self._get_context, admin_resources=[])
+
+ # ctx.resources is not assigned
+ ctx = self._get_context(admin_resources='foo')
+ self.assertRaises(rbac_exceptions.RbacValidateListException,
+ ctx._validate)
+
+ def test_validate_len_negative(self):
+ ctx = self._get_context(admin_resources=[1, 2, 3, 4])
+ self.assertEqual(ctx._validate_len, ctx._validate_func)
+ self.assertEqual(4, ctx._admin_len)
+ self.assertFalse(hasattr(ctx, '_admin_resource_id'))
+
+ # the number of resources is less than admin resources
+ ctx.resources = [1, 2, 3]
+ self.assertRaises(rbac_exceptions.RbacPartialResponseBody,
+ ctx._validate_len)
+
+ # the resources is empty
+ ctx.resources = []
+ self.assertRaises(rbac_exceptions.RbacEmptyResponseBody,
+ ctx._validate_len)
+
+ def test_validate_len(self):
+ ctx = self._get_context(admin_resources=[1, 2, 3, 4])
+
+ # the number of resources and admin resources are same
+ ctx.resources = [1, 2, 3, 4]
+ self.assertIsNone(ctx._validate_len())
+
+ def test_validate_resource_negative(self):
+ ctx = self._get_context(admin_resource_id=1)
+ self.assertEqual(ctx._validate_resource, ctx._validate_func)
+ self.assertEqual(1, ctx._admin_resource_id)
+ self.assertFalse(hasattr(ctx, '_admin_len'))
+
+ # there is no admin resource in the resources
+ ctx.resources = [{'id': 2}, {'id': 3}]
+ self.assertRaises(rbac_exceptions.RbacPartialResponseBody,
+ ctx._validate_resource)
+
+ def test_validate_resource(self):
+ ctx = self._get_context(admin_resource_id=1)
+
+ # there is admin resource in the resources
+ ctx.resources = [{'id': 1}, {'id': 2}]
+ self.assertIsNone(ctx._validate_resource())
+
+ def test_validate(self):
+ ctx = self._get_context(admin_resources='foo')
+ ctx.resources = 'bar'
+ with mock.patch.object(ctx, '_validate_func',
+ autospec=False) as m_validate_func:
+ m_validate_func.side_effect = (
+ rbac_exceptions.RbacPartialResponseBody,
+ None
+ )
+ self.assertRaises(rbac_exceptions.RbacPartialResponseBody,
+ ctx._validate)
+ m_validate_func.assert_called_once()
+
+ m_validate_func.reset_mock()
+ ctx._validate()
+ m_validate_func.assert_called_once()
diff --git a/releasenotes/notes/override-role-and-validate-list-d3b80f773674a652.yaml b/releasenotes/notes/override-role-and-validate-list-d3b80f773674a652.yaml
new file mode 100644
index 0000000..de05b76
--- /dev/null
+++ b/releasenotes/notes/override-role-and-validate-list-d3b80f773674a652.yaml
@@ -0,0 +1,37 @@
+---
+features:
+ - |
+ In order to test list actions which don't have their own policy, the
+ ``override_role_and_validate_list`` function has been implemented.
+ The function has two modes:
+
+ * Validating the number of the resources in a ``ResponseBody`` before
+ calling the ``override_role`` and after.
+
+ .. code-block:: python
+
+ # make sure at least one resource is available
+ self.ntp_client.create_policy_dscp_marking_rule()
+ # the list of resources available for a user with admin role
+ admin_resources = self.ntp_client.list_dscp_marking_rules(
+ policy_id=self.policy_id)["dscp_marking_rules"]
+ with self.rbac_utils.override_role_and_validate_list(
+ self, admin_resources=admin_resources) as ctx:
+ # the list of resources available for a user with member role
+ ctx.resources = self.ntp_client.list_dscp_marking_rules(
+ policy_id=self.policy_id)["dscp_marking_rules"]
+
+ * Validating that a resource, created before ``override_role``, is
+ present in a ``ResponseBody``.
+
+ .. code-block:: python
+
+ # the resource created by a user with admin role
+ admin_resource_id = (
+ self.ntp_client.create_dscp_marking_rule()
+ ["dscp_marking_rule"]["id"])
+ with self.rbac_utils.override_role_and_validate_list(
+ self, admin_resource_id=admin_resource_id) as ctx:
+ # the list of resources available for a user with member role
+ ctx.resources = self.ntp_client.list_dscp_marking_rules(
+ policy_id=self.policy_id)["dscp_marking_rules"]