Multi-role RBAC validation
This patchset replaces ``CONF.patrole.rbac_test_role`` with
``CONF.patrole.rbac_test_roles``, where, instead of a single role,
we can specify a list of roles to be assigned to the test user.
Change-Id: Ia68bcbdbb523dfe7c4abd6107fb4c426a566ae9d
diff --git a/patrole_tempest_plugin/config.py b/patrole_tempest_plugin/config.py
index dc0ed25..90d5fba 100644
--- a/patrole_tempest_plugin/config.py
+++ b/patrole_tempest_plugin/config.py
@@ -22,8 +22,16 @@
PatroleGroup = [
cfg.StrOpt('rbac_test_role',
default='admin',
+ deprecated_for_removal=True,
+ deprecated_reason="""This option is deprecated and being
+replaced with ``rbac_test_roles``.
+""",
help="""The current RBAC role against which to run
Patrole tests."""),
+ cfg.ListOpt('rbac_test_roles',
+ help="""List of the RBAC roles against which to run
+Patrole tests.""",
+ default=['admin']),
cfg.ListOpt('custom_policy_files',
default=['/etc/%s/policy.json'],
help="""List of the paths to search for policy files. Each
diff --git a/patrole_tempest_plugin/policy_authority.py b/patrole_tempest_plugin/policy_authority.py
index 2a49b6c..e0a26a3 100644
--- a/patrole_tempest_plugin/policy_authority.py
+++ b/patrole_tempest_plugin/policy_authority.py
@@ -154,17 +154,17 @@
if os.path.isfile(filename):
cls.policy_files[service].append(filename)
- def allowed(self, rule_name, role):
+ def allowed(self, rule_name, roles):
"""Checks if a given rule in a policy is allowed with given role.
:param string rule_name: Policy name to pass to``oslo.policy``.
- :param string role: Role to validate for authorization.
+ :param List[string] roles: List of roles to validate for authorization.
:raises RbacParsingException: If ``rule_name`` does not exist in the
cloud (in policy file or among registered in-code policy defaults).
"""
- is_admin_context = self._is_admin_context(role)
+ is_admin_context = self._is_admin_context(roles)
is_allowed = self._allowed(
- access=self._get_access_token(role),
+ access=self._get_access_token(roles),
apply_rule=rule_name,
is_admin=is_admin_context)
return is_allowed
@@ -224,7 +224,7 @@
return rules
- def _is_admin_context(self, role):
+ def _is_admin_context(self, roles):
"""Checks whether a role has admin context.
If context_is_admin is contained in the policy file, then checks
@@ -233,17 +233,17 @@
"""
if 'context_is_admin' in self.rules.keys():
return self._allowed(
- access=self._get_access_token(role),
+ access=self._get_access_token(roles),
apply_rule='context_is_admin')
- return role == CONF.identity.admin_role
+ return CONF.identity.admin_role in roles
- def _get_access_token(self, role):
+ def _get_access_token(self, roles):
access_token = {
"token": {
"roles": [
{
"name": role
- }
+ } for role in roles
],
"project_id": self.project_id,
"tenant_id": self.project_id,
diff --git a/patrole_tempest_plugin/rbac_rule_validation.py b/patrole_tempest_plugin/rbac_rule_validation.py
index c85376f..575e2c3 100644
--- a/patrole_tempest_plugin/rbac_rule_validation.py
+++ b/patrole_tempest_plugin/rbac_rule_validation.py
@@ -17,6 +17,7 @@
import logging
import sys
+from oslo_log import versionutils
from oslo_utils import excutils
import six
@@ -47,7 +48,7 @@
* an OpenStack service,
* a policy action (``rule``) enforced by that service, and
- * the test role defined by ``[patrole] rbac_test_role``
+ * the test roles defined by ``[patrole] rbac_test_roles``
determines whether the test role has sufficient permissions to perform an
API call that enforces the ``rule``.
@@ -142,7 +143,15 @@
expected_error_codes)
def decorator(test_func):
- role = CONF.patrole.rbac_test_role
+ roles = CONF.patrole.rbac_test_roles
+ # TODO(vegasq) drop once CONF.patrole.rbac_test_role is removed
+ if CONF.patrole.rbac_test_role:
+ msg = ('CONF.patrole.rbac_test_role is deprecated in favor of '
+ 'CONF.patrole.rbac_test_roles and will be removed in '
+ 'future.')
+ versionutils.report_deprecated_feature(LOG, msg)
+ if not roles:
+ roles.append(CONF.patrole.rbac_test_role)
@functools.wraps(test_func)
def wrapper(*args, **kwargs):
@@ -200,10 +209,10 @@
service)
if allowed:
- msg = ("Role %s was not allowed to perform the following "
- "actions: %s. Expected allowed actions: %s. "
- "Expected disallowed actions: %s." % (
- role, sorted(rules),
+ msg = ("User with roles %s was not allowed to perform the "
+ "following actions: %s. Expected allowed actions: "
+ "%s. Expected disallowed actions: %s." % (
+ roles, sorted(rules),
sorted(set(rules) - set(disallowed_rules)),
sorted(disallowed_rules)))
LOG.error(msg)
@@ -236,7 +245,7 @@
msg = (
"OverPermission: Role %s was allowed to perform the "
"following disallowed actions: %s" % (
- role, sorted(disallowed_rules)
+ roles, sorted(disallowed_rules)
)
)
LOG.error(msg)
@@ -328,7 +337,12 @@
LOG.error(msg)
raise rbac_exceptions.RbacResourceSetupFailed(msg)
- role = CONF.patrole.rbac_test_role
+ roles = CONF.patrole.rbac_test_roles
+ # TODO(vegasq) drop once CONF.patrole.rbac_test_role is removed
+ if CONF.patrole.rbac_test_role:
+ if not roles:
+ roles.append(CONF.patrole.rbac_test_role)
+
# Test RBAC against custom requirements. Otherwise use oslo.policy.
if CONF.patrole.test_custom_requirements:
authority = requirements_authority.RequirementsAuthority(
@@ -339,14 +353,14 @@
authority = policy_authority.PolicyAuthority(
project_id, user_id, service,
extra_target_data=formatted_target_data)
- is_allowed = authority.allowed(rule, role)
+ is_allowed = authority.allowed(rule, roles)
if is_allowed:
LOG.debug("[Policy action]: %s, [Role]: %s is allowed!", rule,
- role)
+ roles)
else:
LOG.debug("[Policy action]: %s, [Role]: %s is NOT allowed!",
- rule, role)
+ rule, roles)
return is_allowed
diff --git a/patrole_tempest_plugin/rbac_utils.py b/patrole_tempest_plugin/rbac_utils.py
index b7ac8d9..33955c3 100644
--- a/patrole_tempest_plugin/rbac_utils.py
+++ b/patrole_tempest_plugin/rbac_utils.py
@@ -40,7 +40,7 @@
up, and primary credentials, needed to perform the API call which does
policy enforcement. The primary credentials always cycle between roles
defined by ``CONF.identity.admin_role`` and
- ``CONF.patrole.rbac_test_role``.
+ ``CONF.patrole.rbac_test_roles``.
"""
def __init__(self, test_obj):
@@ -58,10 +58,15 @@
"Patrole role overriding only supports v3 identity API.")
self.admin_roles_client = admin_roles_client
+
+ self.user_id = test_obj.os_primary.credentials.user_id
+ self.project_id = test_obj.os_primary.credentials.tenant_id
+
+ # Change default role to admin
self._override_role(test_obj, False)
admin_role_id = None
- rbac_role_id = None
+ rbac_role_ids = None
@contextmanager
def override_role(self, test_obj):
@@ -69,7 +74,7 @@
Temporarily change the role used by ``os_primary`` credentials to:
- * ``[patrole] rbac_test_role`` before test execution
+ * ``[patrole] rbac_test_roles`` before test execution
* ``[identity] admin_role`` after test execution
Automatically switches to admin role after test execution.
@@ -122,25 +127,21 @@
* If True: role is set to ``[patrole] rbac_test_role``
* If False: role is set to ``[identity] admin_role``
"""
- self.user_id = test_obj.os_primary.credentials.user_id
- self.project_id = test_obj.os_primary.credentials.tenant_id
- self.token = test_obj.os_primary.auth_provider.get_token()
-
LOG.debug('Overriding role to: %s.', toggle_rbac_role)
- role_already_present = False
+ roles_already_present = False
try:
- if not all([self.admin_role_id, self.rbac_role_id]):
+ if not all([self.admin_role_id, self.rbac_role_ids]):
self._get_roles_by_name()
- target_role = (
- self.rbac_role_id if toggle_rbac_role else self.admin_role_id)
- role_already_present = self._list_and_clear_user_roles_on_project(
- target_role)
+ target_roles = (self.rbac_role_ids
+ if toggle_rbac_role else [self.admin_role_id])
+ roles_already_present = self._list_and_clear_user_roles_on_project(
+ target_roles)
# Do not override roles if `target_role` already exists.
- if not role_already_present:
- self._create_user_role_on_project(target_role)
+ if not roles_already_present:
+ self._create_user_role_on_project(target_roles)
except Exception as exp:
with excutils.save_and_reraise_exception():
LOG.exception(exp)
@@ -152,8 +153,8 @@
# passing the second boundary before attempting to authenticate.
# Only sleep if a token revocation occurred as a result of role
# overriding. This will optimize test runtime in the case where
- # ``[identity] admin_role`` == ``[patrole] rbac_test_role``.
- if not role_already_present:
+ # ``[identity] admin_role`` == ``[patrole] rbac_test_roles``.
+ if not roles_already_present:
time.sleep(1)
for provider in auth_providers:
@@ -164,41 +165,53 @@
role_map = {r['name']: r['id'] for r in available_roles}
LOG.debug('Available roles: %s', list(role_map.keys()))
- admin_role_id = role_map.get(CONF.identity.admin_role)
- rbac_role_id = role_map.get(CONF.patrole.rbac_test_role)
+ rbac_role_ids = []
+ roles = CONF.patrole.rbac_test_roles
+ # TODO(vegasq) drop once CONF.patrole.rbac_test_role is removed
+ if CONF.patrole.rbac_test_role:
+ if not roles:
+ roles.append(CONF.patrole.rbac_test_role)
- if not all([admin_role_id, rbac_role_id]):
+ for role_name in roles:
+ rbac_role_ids.append(role_map.get(role_name))
+
+ admin_role_id = role_map.get(CONF.identity.admin_role)
+
+ if not all([admin_role_id, all(rbac_role_ids)]):
missing_roles = []
- msg = ("Could not find `[patrole] rbac_test_role` or "
+ msg = ("Could not find `[patrole] rbac_test_roles` or "
"`[identity] admin_role`, both of which are required for "
"RBAC testing.")
if not admin_role_id:
missing_roles.append(CONF.identity.admin_role)
- if not rbac_role_id:
- missing_roles.append(CONF.patrole.rbac_test_role)
+ if not all(rbac_role_ids):
+ missing_roles += [role_name for role_name in roles
+ if not role_map.get(role_name)]
+
msg += " Following roles were not found: %s." % (
", ".join(missing_roles))
msg += " Available roles: %s." % ", ".join(list(role_map.keys()))
raise rbac_exceptions.RbacResourceSetupFailed(msg)
self.admin_role_id = admin_role_id
- self.rbac_role_id = rbac_role_id
+ self.rbac_role_ids = rbac_role_ids
- def _create_user_role_on_project(self, role_id):
- self.admin_roles_client.create_user_role_on_project(
- self.project_id, self.user_id, role_id)
+ def _create_user_role_on_project(self, role_ids):
+ for role_id in role_ids:
+ self.admin_roles_client.create_user_role_on_project(
+ self.project_id, self.user_id, role_id)
- def _list_and_clear_user_roles_on_project(self, role_id):
+ def _list_and_clear_user_roles_on_project(self, role_ids):
roles = self.admin_roles_client.list_user_roles_on_project(
self.project_id, self.user_id)['roles']
- role_ids = [role['id'] for role in roles]
+ all_role_ids = [role['id'] for role in roles]
- # NOTE(felipemonteiro): We do not use ``role_id in role_ids`` here to
- # avoid over-permission errors: if the current list of roles on the
+ # NOTE(felipemonteiro): We do not use ``role_id in all_role_ids`` here
+ # to avoid over-permission errors: if the current list of roles on the
# project includes "admin" and "Member", and we are switching to the
# "Member" role, then we must delete the "admin" role. Thus, we only
# return early if the user's roles on the project are an exact match.
- if [role_id] == role_ids:
+ if set(role_ids) == set(all_role_ids):
return True
for role in roles:
@@ -279,8 +292,14 @@
def is_admin():
"""Verifies whether the current test role equals the admin role.
- :returns: True if ``rbac_test_role`` is the admin role.
+ :returns: True if ``rbac_test_roles`` contain the admin role.
"""
+ roles = CONF.patrole.rbac_test_roles
+ # TODO(vegasq) drop once CONF.patrole.rbac_test_role is removed
+ if CONF.patrole.rbac_test_role:
+ roles.append(CONF.patrole.rbac_test_role)
+ roles = list(set(roles))
+
# TODO(felipemonteiro): Make this more robust via a context is admin
# lookup.
- return CONF.patrole.rbac_test_role == CONF.identity.admin_role
+ return CONF.identity.admin_role in roles
diff --git a/patrole_tempest_plugin/requirements_authority.py b/patrole_tempest_plugin/requirements_authority.py
index 75df9f4..57caf79 100644
--- a/patrole_tempest_plugin/requirements_authority.py
+++ b/patrole_tempest_plugin/requirements_authority.py
@@ -95,13 +95,14 @@
else:
self.roles_dict = None
- def allowed(self, rule_name, role):
+ def allowed(self, rule_name, roles):
"""Checks if a given rule in a policy is allowed with given role.
:param string rule_name: Rule to be checked using provided requirements
file specified by ``[patrole].custom_requirements_file``. Must be
a key present in this file, under the appropriate component.
- :param string role: Role to validate against custom requirements file.
+ :param List[string] roles: Roles to validate against custom
+ requirements file.
:returns: True if ``role`` is allowed to perform ``rule_name``, else
False.
:rtype: bool
@@ -115,8 +116,7 @@
"formatted.")
try:
_api = self.roles_dict[rule_name]
- return role in _api
+ return all(role in _api for role in roles)
except KeyError:
raise KeyError("'%s' API is not defined in the requirements YAML "
"file" % rule_name)
- return False
diff --git a/patrole_tempest_plugin/tests/unit/fixtures.py b/patrole_tempest_plugin/tests/unit/fixtures.py
index aee36fa..78e87e5 100644
--- a/patrole_tempest_plugin/tests/unit/fixtures.py
+++ b/patrole_tempest_plugin/tests/unit/fixtures.py
@@ -66,7 +66,8 @@
def setUp(self):
super(RbacUtilsFixture, self).setUp()
- self.useFixture(ConfPatcher(rbac_test_role='member', group='patrole'))
+ self.useFixture(ConfPatcher(rbac_test_roles=['member'],
+ group='patrole'))
self.useFixture(ConfPatcher(
admin_role='admin', auth_version='v3', group='identity'))
self.useFixture(ConfPatcher(
@@ -92,7 +93,7 @@
mock_admin_mgr = mock.patch.object(
clients, 'Manager', spec=clients.Manager,
roles_v3_client=mock.Mock(), roles_client=mock.Mock()).start()
- self.roles_v3_client = mock_admin_mgr.return_value.roles_v3_client
+ self.admin_roles_client = mock_admin_mgr.return_value.roles_v3_client
self.set_roles(['admin', 'member'], [])
@@ -153,6 +154,6 @@
for role in roles_on_project]
}
- self.roles_v3_client.list_roles.return_value = available_roles
- self.roles_v3_client.list_user_roles_on_project.return_value = (
+ self.admin_roles_client.list_roles.return_value = available_roles
+ self.admin_roles_client.list_user_roles_on_project.return_value = (
available_project_roles)
diff --git a/patrole_tempest_plugin/tests/unit/test_policy_authority.py b/patrole_tempest_plugin/tests/unit/test_policy_authority.py
index 624c0c5..6a4d219 100644
--- a/patrole_tempest_plugin/tests/unit/test_policy_authority.py
+++ b/patrole_tempest_plugin/tests/unit/test_policy_authority.py
@@ -108,9 +108,60 @@
for rule, role_list in expected.items():
for role in role_list:
- self.assertTrue(authority.allowed(rule, role))
+ self.assertTrue(authority.allowed(rule, [role]))
for role in set(default_roles) - set(role_list):
- self.assertFalse(authority.allowed(rule, role))
+ self.assertFalse(authority.allowed(rule, [role]))
+
+ @mock.patch.object(policy_authority, 'LOG', autospec=True)
+ def _test_custom_multi_roles_policy(self, *args):
+ default_roles = ['zero', 'one', 'two', 'three', 'four',
+ 'five', 'six', 'seven', 'eight', 'nine']
+
+ test_tenant_id = mock.sentinel.tenant_id
+ test_user_id = mock.sentinel.user_id
+ authority = policy_authority.PolicyAuthority(
+ test_tenant_id, test_user_id, "custom_rbac_policy")
+
+ expected = {
+ 'policy_action_1': ['two', 'four', 'six', 'eight'],
+ 'policy_action_2': ['one', 'three', 'five', 'seven', 'nine'],
+ 'policy_action_4': ['one', 'two', 'three', 'five', 'seven'],
+ 'policy_action_5': ['zero', 'one', 'two', 'three', 'four', 'five',
+ 'six', 'seven', 'eight', 'nine'],
+ }
+
+ for rule, role_list in expected.items():
+ allowed_roles_lists = [roles for roles in [
+ role_list[len(role_list) // 2:],
+ role_list[:len(role_list) // 2]] if roles]
+ for test_roles in allowed_roles_lists:
+ self.assertTrue(authority.allowed(rule, test_roles))
+
+ disallowed_roles = list(set(default_roles) - set(role_list))
+ disallowed_roles_lists = [roles for roles in [
+ disallowed_roles[len(disallowed_roles) // 2:],
+ disallowed_roles[:len(disallowed_roles) // 2]] if roles]
+ for test_roles in disallowed_roles_lists:
+ self.assertFalse(authority.allowed(rule, test_roles))
+
+ def test_empty_rbac_test_roles(self):
+ test_tenant_id = mock.sentinel.tenant_id
+ test_user_id = mock.sentinel.user_id
+ authority = policy_authority.PolicyAuthority(
+ test_tenant_id, test_user_id, "custom_rbac_policy")
+
+ disallowed_for_empty_roles = ['policy_action_1', 'policy_action_2',
+ 'policy_action_3', 'policy_action_4',
+ 'policy_action_6']
+
+ # Due to "policy_action_5": "rule:all_rule" / "all_rule": ""
+ allowed_for_empty_roles = ['policy_action_5']
+
+ for rule in disallowed_for_empty_roles:
+ self.assertFalse(authority.allowed(rule, []))
+
+ for rule in allowed_for_empty_roles:
+ self.assertTrue(authority.allowed(rule, []))
def test_custom_policy_json(self):
# The CONF.patrole.custom_policy_files has a path to JSON file by
@@ -122,24 +173,34 @@
custom_policy_files=[self.conf_policy_path_yaml], group='patrole'))
self._test_custom_policy()
+ def test_custom_multi_roles_policy_json(self):
+ # The CONF.patrole.custom_policy_files has a path to JSON file by
+ # default, so we don't need to use ConfPatcher here.
+ self._test_custom_multi_roles_policy()
+
+ def test_custom_multi_roles_policy_yaml(self):
+ self.useFixture(fixtures.ConfPatcher(
+ custom_policy_files=[self.conf_policy_path_yaml], group='patrole'))
+ self._test_custom_multi_roles_policy()
+
def test_admin_policy_file_with_admin_role(self):
test_tenant_id = mock.sentinel.tenant_id
test_user_id = mock.sentinel.user_id
authority = policy_authority.PolicyAuthority(
test_tenant_id, test_user_id, "admin_rbac_policy")
- role = 'admin'
+ roles = ['admin']
allowed_rules = [
'admin_rule', 'is_admin_rule', 'alt_admin_rule'
]
disallowed_rules = ['non_admin_rule']
for rule in allowed_rules:
- allowed = authority.allowed(rule, role)
+ allowed = authority.allowed(rule, roles)
self.assertTrue(allowed)
for rule in disallowed_rules:
- allowed = authority.allowed(rule, role)
+ allowed = authority.allowed(rule, roles)
self.assertFalse(allowed)
def test_admin_policy_file_with_member_role(self):
@@ -148,7 +209,7 @@
authority = policy_authority.PolicyAuthority(
test_tenant_id, test_user_id, "admin_rbac_policy")
- role = 'Member'
+ roles = ['Member']
allowed_rules = [
'non_admin_rule'
]
@@ -156,11 +217,11 @@
'admin_rule', 'is_admin_rule', 'alt_admin_rule']
for rule in allowed_rules:
- allowed = authority.allowed(rule, role)
+ allowed = authority.allowed(rule, roles)
self.assertTrue(allowed)
for rule in disallowed_rules:
- allowed = authority.allowed(rule, role)
+ allowed = authority.allowed(rule, roles)
self.assertFalse(allowed)
def test_alt_admin_policy_file_with_context_is_admin(self):
@@ -169,28 +230,28 @@
authority = policy_authority.PolicyAuthority(
test_tenant_id, test_user_id, "alt_admin_rbac_policy")
- role = 'fake_admin'
+ roles = ['fake_admin']
allowed_rules = ['non_admin_rule']
disallowed_rules = ['admin_rule']
for rule in allowed_rules:
- allowed = authority.allowed(rule, role)
+ allowed = authority.allowed(rule, roles)
self.assertTrue(allowed)
for rule in disallowed_rules:
- allowed = authority.allowed(rule, role)
+ allowed = authority.allowed(rule, roles)
self.assertFalse(allowed)
- role = 'super_admin'
+ roles = ['super_admin']
allowed_rules = ['admin_rule']
disallowed_rules = ['non_admin_rule']
for rule in allowed_rules:
- allowed = authority.allowed(rule, role)
+ allowed = authority.allowed(rule, roles)
self.assertTrue(allowed)
for rule in disallowed_rules:
- allowed = authority.allowed(rule, role)
+ allowed = authority.allowed(rule, roles)
self.assertFalse(allowed)
def test_tenant_user_policy(self):
@@ -208,17 +269,17 @@
# Check whether Member role can perform expected actions.
allowed_rules = ['rule1', 'rule2', 'rule3', 'rule4']
for rule in allowed_rules:
- allowed = authority.allowed(rule, 'Member')
+ allowed = authority.allowed(rule, ['Member'])
self.assertTrue(allowed)
disallowed_rules = ['admin_tenant_rule', 'admin_user_rule']
for disallowed_rule in disallowed_rules:
- self.assertFalse(authority.allowed(disallowed_rule, 'Member'))
+ self.assertFalse(authority.allowed(disallowed_rule, ['Member']))
# Check whether admin role can perform expected actions.
allowed_rules.extend(disallowed_rules)
for rule in allowed_rules:
- allowed = authority.allowed(rule, 'admin')
+ allowed = authority.allowed(rule, ['admin'])
self.assertTrue(allowed)
# Check whether _try_rule is called with the correct target dictionary.
@@ -243,7 +304,7 @@
}
for rule in allowed_rules:
- allowed = authority.allowed(rule, 'Member')
+ allowed = authority.allowed(rule, ['Member'])
self.assertTrue(allowed)
mock_try_rule.assert_called_once_with(
rule, expected_target, expected_access_data, mock.ANY)
@@ -292,7 +353,7 @@
fake_rule, [self.custom_policy_file], "custom_rbac_policy")
e = self.assertRaises(rbac_exceptions.RbacParsingException,
- authority.allowed, fake_rule, None)
+ authority.allowed, fake_rule, [None])
self.assertIn(expected_message, str(e))
m_log.debug.assert_called_once_with(expected_message)
@@ -309,13 +370,13 @@
mock.sentinel.error)})
expected_message = (
- 'Policy action "{0}" not found in policy files: {1} or among '
+ 'Policy action "[{0}]" not found in policy files: {1} or among '
'registered policy in code defaults for {2} service.').format(
mock.sentinel.rule, [self.custom_policy_file],
"custom_rbac_policy")
e = self.assertRaises(rbac_exceptions.RbacParsingException,
- authority.allowed, mock.sentinel.rule, None)
+ authority.allowed, [mock.sentinel.rule], [None])
self.assertIn(expected_message, str(e))
m_log.debug.assert_called_once_with(expected_message)
diff --git a/patrole_tempest_plugin/tests/unit/test_rbac_rule_validation.py b/patrole_tempest_plugin/tests/unit/test_rbac_rule_validation.py
index 1a2c691..1531df1 100644
--- a/patrole_tempest_plugin/tests/unit/test_rbac_rule_validation.py
+++ b/patrole_tempest_plugin/tests/unit/test_rbac_rule_validation.py
@@ -47,7 +47,7 @@
setattr(self.mock_test_args.os_primary, 'credentials', mock_creds)
self.useFixture(
- patrole_fixtures.ConfPatcher(rbac_test_role='Member',
+ patrole_fixtures.ConfPatcher(rbac_test_roles=['member'],
group='patrole'))
# Disable patrole log for unit tests.
self.useFixture(
@@ -55,6 +55,29 @@
group='patrole_log'))
+class BaseRBACMultiRoleRuleValidationTest(base.TestCase):
+
+ def setUp(self):
+ super(BaseRBACMultiRoleRuleValidationTest, self).setUp()
+ self.mock_test_args = mock.Mock(spec=test.BaseTestCase)
+ self.mock_test_args.os_primary = mock.Mock(spec=manager.Manager)
+ self.mock_test_args.rbac_utils = mock.Mock(
+ spec_set=rbac_utils.RbacUtils)
+
+ # Setup credentials for mock client manager.
+ mock_creds = mock.Mock(user_id=mock.sentinel.user_id,
+ project_id=mock.sentinel.project_id)
+ setattr(self.mock_test_args.os_primary, 'credentials', mock_creds)
+
+ self.useFixture(
+ patrole_fixtures.ConfPatcher(
+ rbac_test_roles=['member', 'anotherrole'], group='patrole'))
+ # Disable patrole log for unit tests.
+ self.useFixture(
+ patrole_fixtures.ConfPatcher(enable_reporting=False,
+ group='patrole_log'))
+
+
class RBACRuleValidationTest(BaseRBACRuleValidationTest):
"""Test suite for validating fundamental functionality for the
``rbac_rule_validation`` decorator.
@@ -118,8 +141,8 @@
def test_policy(*args):
raise exceptions.Forbidden()
- test_re = ("Role Member was not allowed to perform the following "
- "actions: \[%s\].*" % (mock.sentinel.action))
+ test_re = ("User with roles \['member'\] was not allowed to perform "
+ "the following actions: \[%s\].*" % (mock.sentinel.action))
self.assertRaisesRegex(
rbac_exceptions.RbacUnderPermissionException, test_re, test_policy,
self.mock_test_args)
@@ -159,11 +182,10 @@
def test_policy(*args):
raise rbac_exceptions.RbacMalformedResponse()
- test_re = ("Role Member was not allowed to perform the following "
- "actions: \[%s\].*" % (mock.sentinel.action))
- self.assertRaisesRegex(
- rbac_exceptions.RbacUnderPermissionException, test_re, test_policy,
- self.mock_test_args)
+ test_re = ("User with roles \['member'\] was not allowed to perform "
+ "the following actions: \[%s\]. " % (mock.sentinel.action))
+ self.assertRaisesRegex(rbac_exceptions.RbacUnderPermissionException,
+ test_re, test_policy, self.mock_test_args)
self.assertRegex(mock_log.error.mock_calls[0][1][0], test_re)
@mock.patch.object(rbac_rv, 'LOG', autospec=True)
@@ -214,8 +236,8 @@
raise exceptions.NotFound()
expected_errors = [
- ("Role Member was not allowed to perform the following "
- "actions: \['%s'\].*" % policy_names[0]),
+ ("User with roles \['member'\] was not allowed to perform the "
+ "following actions: \['%s'\].*" % policy_names[0]),
None
]
@@ -348,6 +370,108 @@
mock_log.error.reset_mock()
+class RBACMultiRoleRuleValidationTest(BaseRBACMultiRoleRuleValidationTest,
+ RBACRuleValidationTest):
+ @mock.patch.object(rbac_rv, 'LOG', autospec=True)
+ @mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
+ def test_rule_validation_forbidden_negative(self, mock_authority,
+ mock_log):
+ """Test RbacUnderPermissionException error is thrown and have
+ permission fails.
+
+ Negative test case: if Forbidden is thrown and the user should be
+ allowed to perform the action, then the RbacUnderPermissionException
+ exception should be raised.
+ """
+ mock_authority.PolicyAuthority.return_value.allowed.return_value = True
+
+ @rbac_rv.action(mock.sentinel.service, rules=[mock.sentinel.action])
+ def test_policy(*args):
+ raise exceptions.Forbidden()
+
+ test_re = ("User with roles \['member', 'anotherrole'\] was not "
+ "allowed to perform the following actions: \[%s\].*" %
+ (mock.sentinel.action))
+ self.assertRaisesRegex(
+ rbac_exceptions.RbacUnderPermissionException, test_re, test_policy,
+ self.mock_test_args)
+ self.assertRegex(mock_log.error.mock_calls[0][1][0], test_re)
+
+ @mock.patch.object(rbac_rv, 'LOG', autospec=True)
+ @mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
+ def test_rule_validation_rbac_malformed_response_negative(
+ self, mock_authority, mock_log):
+ """Test RbacMalformedResponse error is thrown with permission fails.
+
+ Negative test case: if RbacMalformedResponse is thrown and the user is
+ allowed to perform the action, then this is an expected failure.
+ """
+ mock_authority.PolicyAuthority.return_value.allowed.return_value = True
+
+ @rbac_rv.action(mock.sentinel.service, rules=[mock.sentinel.action])
+ def test_policy(*args):
+ raise rbac_exceptions.RbacMalformedResponse()
+
+ test_re = ("User with roles \['member', 'anotherrole'\] was not "
+ "allowed to perform the following actions: \[%s\]. " %
+ (mock.sentinel.action))
+ self.assertRaisesRegex(rbac_exceptions.RbacUnderPermissionException,
+ test_re, test_policy, self.mock_test_args)
+ self.assertRegex(mock_log.error.mock_calls[0][1][0], test_re)
+
+ @mock.patch.object(rbac_rv, 'LOG', autospec=True)
+ @mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
+ def test_expect_not_found_and_raise_not_found(self, mock_authority,
+ mock_log):
+ """Test that expecting 404 and getting 404 works for all scenarios.
+
+ Tests the following scenarios:
+ 1) Test no permission and 404 is expected and 404 is thrown succeeds.
+ 2) Test have permission and 404 is expected and 404 is thrown fails.
+
+ In both cases, a LOG.warning is called with the "irregular message"
+ that signals to user that a 404 was expected and caught.
+ """
+ policy_names = ['foo:bar']
+
+ @rbac_rv.action(mock.sentinel.service, rules=policy_names,
+ expected_error_codes=[404])
+ def test_policy(*args):
+ raise exceptions.NotFound()
+
+ expected_errors = [
+ ("User with roles \['member', 'anotherrole'\] was not allowed to "
+ "perform the following actions: \['%s'\].*" % policy_names[0]),
+ None
+ ]
+
+ for pos, allowed in enumerate([True, False]):
+ mock_authority.PolicyAuthority.return_value.allowed\
+ .return_value = allowed
+
+ error_re = expected_errors[pos]
+
+ if error_re:
+ self.assertRaisesRegex(
+ rbac_exceptions.RbacUnderPermissionException, error_re,
+ test_policy, self.mock_test_args)
+ self.assertRegex(mock_log.error.mock_calls[0][1][0], error_re)
+ else:
+ test_policy(self.mock_test_args)
+ mock_log.error.assert_not_called()
+
+ mock_log.warning.assert_called_with(
+ "NotFound exception was caught for test %s. Expected policies "
+ "which may have caused the error: %s. The service %s throws a "
+ "404 instead of a 403, which is irregular",
+ test_policy.__name__,
+ ', '.join(policy_names),
+ mock.sentinel.service)
+
+ mock_log.warning.reset_mock()
+ mock_log.error.reset_mock()
+
+
class RBACRuleValidationLoggingTest(BaseRBACRuleValidationTest):
"""Test class for validating the RBAC log, dedicated to just logging
Patrole RBAC validation work flows.
@@ -422,7 +546,7 @@
policy_authority = mock_authority.PolicyAuthority.return_value
policy_authority.allowed.assert_called_with(
mock.sentinel.action,
- CONF.patrole.rbac_test_role)
+ CONF.patrole.rbac_test_roles)
mock_log.error.assert_not_called()
@@ -454,18 +578,23 @@
policy_authority = mock_authority.PolicyAuthority.return_value
policy_authority.allowed.assert_called_with(
"foo",
- CONF.patrole.rbac_test_role)
+ CONF.patrole.rbac_test_roles)
policy_authority.allowed.reset_mock()
test_bar_policy(self.mock_test_args)
policy_authority = mock_authority.PolicyAuthority.return_value
policy_authority.allowed.assert_called_with(
"qux",
- CONF.patrole.rbac_test_role)
+ CONF.patrole.rbac_test_roles)
mock_log.error.assert_not_called()
+class RBACMultiRoleRuleValidationLoggingTest(
+ BaseRBACMultiRoleRuleValidationTest, RBACRuleValidationLoggingTest):
+ pass
+
+
class RBACRuleValidationNegativeTest(BaseRBACRuleValidationTest):
def setUp(self):
@@ -488,6 +617,11 @@
test_policy, self.mock_test_args)
+class RBACMultiRoleRuleValidationNegativeTest(
+ BaseRBACMultiRoleRuleValidationTest, RBACRuleValidationNegativeTest):
+ pass
+
+
class RBACRuleValidationTestMultiPolicy(BaseRBACRuleValidationTest):
"""Test suite for validating multi-policy support for the
``rbac_rule_validation`` decorator.
@@ -502,7 +636,7 @@
def _assert_policy_authority_called_with(self, rules, mock_authority):
m_authority = mock_authority.PolicyAuthority.return_value
m_authority.allowed.assert_has_calls([
- mock.call(rule, CONF.patrole.rbac_test_role) for rule in rules
+ mock.call(rule, CONF.patrole.rbac_test_roles) for rule in rules
])
@mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
@@ -614,10 +748,10 @@
mock_authority.PolicyAuthority.return_value.allowed\
.return_value = True
- error_re = ("Role Member was not allowed to perform the following "
- "actions: %s. Expected allowed actions: %s. Expected "
- "disallowed actions: []." % (rules, rules)).replace(
- '[', '\[').replace(']', '\]')
+ error_re = ("User with roles ['member'] was not allowed to perform "
+ "the following actions: %s. Expected allowed actions: %s. "
+ "Expected disallowed actions: []." %
+ (rules, rules)).replace('[', '\[').replace(']', '\]')
self.assertRaisesRegex(
rbac_exceptions.RbacUnderPermissionException, error_re,
test_policy, self.mock_test_args)
@@ -739,6 +873,39 @@
self.assertRaisesRegex(ValueError, error_re, _do_test, None, [404])
+class RBACMultiRoleRuleValidationTestMultiPolicy(
+ BaseRBACMultiRoleRuleValidationTest, RBACRuleValidationTestMultiPolicy):
+ @mock.patch.object(rbac_rv, 'LOG', autospec=True)
+ @mock.patch.object(rbac_rv, 'policy_authority', autospec=True)
+ def test_rule_validation_multi_policy_forbidden_failure(
+ self, mock_authority, mock_log):
+ """Test that when the expected result is authorized and the test
+ fails (with a Forbidden error code) that the overall evaluation
+ results in a RbacUnderPermissionException getting raised.
+ """
+
+ # NOTE: Avoid mock.sentinel here due to weird sorting with them.
+ rules = ['action1', 'action2', 'action3']
+
+ @rbac_rv.action(mock.sentinel.service, rules=rules,
+ expected_error_codes=[403, 403, 403])
+ def test_policy(*args):
+ raise exceptions.Forbidden()
+
+ mock_authority.PolicyAuthority.return_value.allowed\
+ .return_value = True
+
+ error_re = ("User with roles ['member', 'anotherrole'] was not "
+ "allowed to perform the following actions: %s. Expected "
+ "allowed actions: %s. Expected disallowed actions: []." %
+ (rules, rules)).replace('[', '\[').replace(']', '\]')
+ self.assertRaisesRegex(
+ rbac_exceptions.RbacUnderPermissionException, error_re,
+ test_policy, self.mock_test_args)
+ self.assertRegex(mock_log.error.mock_calls[0][1][0], error_re)
+ self._assert_policy_authority_called_with(rules, mock_authority)
+
+
class RBACOverrideRoleValidationTest(BaseRBACRuleValidationTest):
"""Class for validating that untimely exceptions (outside
``override_role`` is called) result in test failures.
diff --git a/patrole_tempest_plugin/tests/unit/test_rbac_utils.py b/patrole_tempest_plugin/tests/unit/test_rbac_utils.py
index 5132079..bd13e34 100644
--- a/patrole_tempest_plugin/tests/unit/test_rbac_utils.py
+++ b/patrole_tempest_plugin/tests/unit/test_rbac_utils.py
@@ -52,7 +52,7 @@
self.rbac_utils.override_role()
mock_test_obj = self.rbac_utils.mock_test_obj
- roles_client = self.rbac_utils.roles_v3_client
+ roles_client = self.rbac_utils.admin_roles_client
mock_time = self.rbac_utils.mock_time
roles_client.create_user_role_on_project.assert_called_once_with(
@@ -67,7 +67,7 @@
self.rbac_utils.set_roles(['admin', 'member'], 'admin')
self.rbac_utils.override_role()
- roles_client = self.rbac_utils.roles_v3_client
+ roles_client = self.rbac_utils.admin_roles_client
mock_time = self.rbac_utils.mock_time
roles_client.create_user_role_on_project.assert_not_called()
@@ -77,7 +77,7 @@
self.rbac_utils.override_role(True)
mock_test_obj = self.rbac_utils.mock_test_obj
- roles_client = self.rbac_utils.roles_v3_client
+ roles_client = self.rbac_utils.admin_roles_client
mock_time = self.rbac_utils.mock_time
roles_client.create_user_role_on_project.assert_has_calls([
@@ -96,7 +96,7 @@
self.rbac_utils.set_roles(['admin', 'member'], 'member')
self.rbac_utils.override_role(True)
- roles_client = self.rbac_utils.roles_v3_client
+ roles_client = self.rbac_utils.admin_roles_client
mock_time = self.rbac_utils.mock_time
roles_client.create_user_role_on_project.assert_has_calls([
@@ -109,7 +109,7 @@
self.rbac_utils.override_role(True, False)
mock_test_obj = self.rbac_utils.mock_test_obj
- roles_client = self.rbac_utils.roles_v3_client
+ roles_client = self.rbac_utils.admin_roles_client
mock_time = self.rbac_utils.mock_time
roles_client.create_user_role_on_project.assert_has_calls([
@@ -133,7 +133,7 @@
self.rbac_utils.set_roles(['admin', 'member'], ['member', 'random'])
self.rbac_utils.override_role()
- roles_client = self.rbac_utils.roles_v3_client
+ roles_client = self.rbac_utils.admin_roles_client
roles_client.list_user_roles_on_project.assert_called_once_with(
self.rbac_utils.PROJECT_ID, self.rbac_utils.USER_ID)
@@ -169,7 +169,8 @@
mock_override_role.assert_called_once_with(_rbac_utils, test_obj,
False)
- @mock.patch.object(rbac_utils.RbacUtils, '_override_role', autospec=True)
+ @mock.patch.object(rbac_utils.RbacUtils, '_override_role',
+ autospec=True)
def test_override_role_context_manager_simulate_fail(self,
mock_override_role):
"""Validate that expected override_role calls are made when switching
diff --git a/patrole_tempest_plugin/tests/unit/test_requirements_authority.py b/patrole_tempest_plugin/tests/unit/test_requirements_authority.py
index 1fb9636..4b75ef5 100644
--- a/patrole_tempest_plugin/tests/unit/test_requirements_authority.py
+++ b/patrole_tempest_plugin/tests/unit/test_requirements_authority.py
@@ -38,11 +38,11 @@
def test_auth_allowed_empty_roles(self):
self.rbac_auth.roles_dict = None
self.assertRaises(exceptions.InvalidConfiguration,
- self.rbac_auth.allowed, "", "")
+ self.rbac_auth.allowed, "", [""])
def test_auth_allowed_role_in_api(self):
self.rbac_auth.roles_dict = {'api': ['_member_']}
- self.assertTrue(self.rbac_auth.allowed("api", "_member_"))
+ self.assertTrue(self.rbac_auth.allowed("api", ["_member_"]))
def test_auth_allowed_role_not_in_api(self):
self.rbac_auth.roles_dict = {'api': ['_member_']}
@@ -64,7 +64,8 @@
self.rbac_auth.roles_dict = req_auth.RequirementsParser.parse("Test")
self.assertEqual(self.expected_result, self.rbac_auth.roles_dict)
- self.assertTrue(self.rbac_auth.allowed("test:create2", "test_member"))
+ self.assertTrue(
+ self.rbac_auth.allowed("test:create2", ["test_member"]))
def test_parser_role_not_in_api(self):
req_auth.RequirementsParser.Inner._rbac_map = \
@@ -82,4 +83,4 @@
self.assertIsNone(self.rbac_auth.roles_dict)
self.assertRaises(exceptions.InvalidConfiguration,
- self.rbac_auth.allowed, "", "")
+ self.rbac_auth.allowed, "", [""])