Fix OverPermission exception for keystone tests
Extend the roles in `access_token` to match the implementation [0]
of the basic-default-roles blueprint, in which roles are hierarchical:
`admin` implies `member`, and `member` implies `reader`.
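Also support deprecated rules: when `validate_deprecated_rules` is enabled, a rule passes if either its current or its deprecated check string passes.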
[0] Ie18a269e3d1075d955fe494acaf634a393c6bd7b
Change-Id: I0d0de2a20b03548a7e5dab1ee7af7b72651abcb6
Story: 2004709
Task: 28740
diff --git a/patrole_tempest_plugin/config.py b/patrole_tempest_plugin/config.py
index 62337f7..63f0a8a 100644
--- a/patrole_tempest_plugin/config.py
+++ b/patrole_tempest_plugin/config.py
@@ -96,6 +96,11 @@
* add_image
allowed_role = the ``oslo.policy`` role that is allowed to perform the API.
+"""),
+ cfg.BoolOpt('validate_deprecated_rules', default=True,
+ help="""Some of the policy rules have deprecated version,
+Patrole should be able to run check against default and deprecated rules,
+otherwise the result of the tests may not be correct.
""")
]
diff --git a/patrole_tempest_plugin/policy_authority.py b/patrole_tempest_plugin/policy_authority.py
index e0a26a3..fd7b9f7 100644
--- a/patrole_tempest_plugin/policy_authority.py
+++ b/patrole_tempest_plugin/policy_authority.py
@@ -169,6 +169,27 @@
is_admin=is_admin_context)
return is_allowed
+ def _handle_deprecated_rule(self, default):
+ deprecated_rule = default.deprecated_rule
+ deprecated_msg = (
+ 'Policy "%(old_name)s":"%(old_check_str)s" was deprecated in '
+ '%(release)s in favor of "%(name)s":"%(check_str)s". Reason: '
+ '%(reason)s. Either ensure your deployment is ready for the new '
+ 'default or copy/paste the deprecated policy into your policy '
+ 'file and maintain it manually.' % {
+ 'old_name': deprecated_rule.name,
+ 'old_check_str': deprecated_rule.check_str,
+ 'release': default.deprecated_since,
+ 'name': default.name,
+ 'check_str': default.check_str,
+ 'reason': default.deprecated_reason
+ }
+ )
+ LOG.warn(deprecated_msg)
+ check_str = '(%s) or (%s)' % (default.check_str,
+ deprecated_rule.check_str)
+ return policy.RuleDefault(default.name, check_str)
+
def get_rules(self):
rules = policy.Rules()
# Check whether policy file exists and attempt to read it.
@@ -203,6 +224,12 @@
if self.service in policy_generator:
for rule in policy_generator[self.service]:
if rule.name not in rules:
+ if CONF.patrole.validate_deprecated_rules:
+ # NOTE (sergey.vilgelm):
+ # The `DocumentedRuleDefault` object has no
+ # `deprecated_rule` attribute in Pike
+ if getattr(rule, 'deprecated_rule', False):
+ rule = self._handle_deprecated_rule(rule)
rules[rule.name] = rule.check
elif str(rule.check) != str(rules[rule.name]):
msg = ("The same policy name: %s was found in the "
@@ -238,13 +265,17 @@
return CONF.identity.admin_role in roles
def _get_access_token(self, roles):
+ roles = {r.lower() for r in roles if r}
+
+ # Extend roles for a user with the admin or member role
+ if 'admin' in roles:
+ roles.add('member')
+ if 'member' in roles:
+ roles.add('reader')
+
access_token = {
"token": {
- "roles": [
- {
- "name": role
- } for role in roles
- ],
+ "roles": [{'name': r} for r in roles],
"project_id": self.project_id,
"tenant_id": self.project_id,
"user_id": self.user_id
diff --git a/patrole_tempest_plugin/tests/unit/resources/admin_rbac_policy.json b/patrole_tempest_plugin/tests/unit/resources/admin_rbac_policy.json
index 7828921..d6d9605 100644
--- a/patrole_tempest_plugin/tests/unit/resources/admin_rbac_policy.json
+++ b/patrole_tempest_plugin/tests/unit/resources/admin_rbac_policy.json
@@ -2,5 +2,6 @@
"admin_rule": "role:admin",
"is_admin_rule": "is_admin:True",
"alt_admin_rule": "is_admin:True or (role:admin and is_project_admin:True)",
- "non_admin_rule": "role:Member"
+ "member_rule": "role:member",
+ "reader_rule": "role:reader"
}
diff --git a/patrole_tempest_plugin/tests/unit/test_policy_authority.py b/patrole_tempest_plugin/tests/unit/test_policy_authority.py
index 6a4d219..90e45f9 100644
--- a/patrole_tempest_plugin/tests/unit/test_policy_authority.py
+++ b/patrole_tempest_plugin/tests/unit/test_policy_authority.py
@@ -83,9 +83,29 @@
for name, check in rules.items():
fake_rule = mock.Mock(check=check, __name__='foo')
fake_rule.name = name
+ fake_rule.deprecated_rule = False
fake_rules.append(fake_rule)
return fake_rules
+ def _test_policy_file(self, roles, allowed_rules,
+ disallowed_rules, authority=None, service=None):
+ if authority is None:
+ self.assertIsNotNone(service)
+ test_tenant_id = mock.sentinel.tenant_id
+ test_user_id = mock.sentinel.user_id
+ authority = policy_authority.PolicyAuthority(
+ test_tenant_id, test_user_id, service)
+
+ for rule in allowed_rules:
+ allowed = authority.allowed(rule, roles)
+ self.assertTrue(allowed)
+
+ for rule in disallowed_rules:
+ allowed = authority.allowed(rule, roles)
+ self.assertFalse(allowed)
+
+ return authority
+
@mock.patch.object(policy_authority, 'LOG', autospec=True)
def _test_custom_policy(self, *args):
default_roles = ['zero', 'one', 'two', 'three', 'four',
@@ -145,23 +165,14 @@
self.assertFalse(authority.allowed(rule, test_roles))
def test_empty_rbac_test_roles(self):
- test_tenant_id = mock.sentinel.tenant_id
- test_user_id = mock.sentinel.user_id
- authority = policy_authority.PolicyAuthority(
- test_tenant_id, test_user_id, "custom_rbac_policy")
-
- disallowed_for_empty_roles = ['policy_action_1', 'policy_action_2',
- 'policy_action_3', 'policy_action_4',
- 'policy_action_6']
-
- # Due to "policy_action_5": "rule:all_rule" / "all_rule": ""
- allowed_for_empty_roles = ['policy_action_5']
-
- for rule in disallowed_for_empty_roles:
- self.assertFalse(authority.allowed(rule, []))
-
- for rule in allowed_for_empty_roles:
- self.assertTrue(authority.allowed(rule, []))
+ self._test_policy_file(
+ roles=[],
+ allowed_rules=['policy_action_5'],
+ disallowed_rules=['policy_action_1', 'policy_action_2',
+ 'policy_action_3', 'policy_action_4',
+ 'policy_action_6'],
+ service="custom_rbac_policy"
+ )
def test_custom_policy_json(self):
# The CONF.patrole.custom_policy_files has a path to JSON file by
@@ -184,75 +195,47 @@
self._test_custom_multi_roles_policy()
def test_admin_policy_file_with_admin_role(self):
- test_tenant_id = mock.sentinel.tenant_id
- test_user_id = mock.sentinel.user_id
- authority = policy_authority.PolicyAuthority(
- test_tenant_id, test_user_id, "admin_rbac_policy")
-
- roles = ['admin']
- allowed_rules = [
- 'admin_rule', 'is_admin_rule', 'alt_admin_rule'
- ]
- disallowed_rules = ['non_admin_rule']
-
- for rule in allowed_rules:
- allowed = authority.allowed(rule, roles)
- self.assertTrue(allowed)
-
- for rule in disallowed_rules:
- allowed = authority.allowed(rule, roles)
- self.assertFalse(allowed)
+ # admin role implies member and reader roles
+ self._test_policy_file(
+ roles=['admin'],
+ allowed_rules=['admin_rule', 'is_admin_rule', 'alt_admin_rule',
+ 'member_rule', 'reader_rule'],
+ disallowed_rules=[],
+ service="admin_rbac_policy"
+ )
def test_admin_policy_file_with_member_role(self):
- test_tenant_id = mock.sentinel.tenant_id
- test_user_id = mock.sentinel.user_id
- authority = policy_authority.PolicyAuthority(
- test_tenant_id, test_user_id, "admin_rbac_policy")
+ # member role implies reader role
+ self._test_policy_file(
+ roles=['member'],
+ allowed_rules=['member_rule', 'reader_rule'],
+ disallowed_rules=['admin_rule', 'is_admin_rule', 'alt_admin_rule'],
+ service="admin_rbac_policy"
+ )
- roles = ['Member']
- allowed_rules = [
- 'non_admin_rule'
- ]
- disallowed_rules = [
- 'admin_rule', 'is_admin_rule', 'alt_admin_rule']
-
- for rule in allowed_rules:
- allowed = authority.allowed(rule, roles)
- self.assertTrue(allowed)
-
- for rule in disallowed_rules:
- allowed = authority.allowed(rule, roles)
- self.assertFalse(allowed)
+ def test_admin_policy_file_with_reader_role(self):
+ self._test_policy_file(
+ roles=['reader'],
+ allowed_rules=['reader_rule'],
+ disallowed_rules=['admin_rule', 'is_admin_rule', 'alt_admin_rule',
+ 'member_rule'],
+ service="admin_rbac_policy"
+ )
def test_alt_admin_policy_file_with_context_is_admin(self):
- test_tenant_id = mock.sentinel.tenant_id
- test_user_id = mock.sentinel.user_id
- authority = policy_authority.PolicyAuthority(
- test_tenant_id, test_user_id, "alt_admin_rbac_policy")
+ authority = self._test_policy_file(
+ roles=['fake_admin'],
+ allowed_rules=['non_admin_rule'],
+ disallowed_rules=['admin_rule'],
+ service="alt_admin_rbac_policy"
+ )
- roles = ['fake_admin']
- allowed_rules = ['non_admin_rule']
- disallowed_rules = ['admin_rule']
-
- for rule in allowed_rules:
- allowed = authority.allowed(rule, roles)
- self.assertTrue(allowed)
-
- for rule in disallowed_rules:
- allowed = authority.allowed(rule, roles)
- self.assertFalse(allowed)
-
- roles = ['super_admin']
- allowed_rules = ['admin_rule']
- disallowed_rules = ['non_admin_rule']
-
- for rule in allowed_rules:
- allowed = authority.allowed(rule, roles)
- self.assertTrue(allowed)
-
- for rule in disallowed_rules:
- allowed = authority.allowed(rule, roles)
- self.assertFalse(allowed)
+ self._test_policy_file(
+ roles=['super_admin'],
+ allowed_rules=['admin_rule'],
+ disallowed_rules=['non_admin_rule'],
+ authority=authority
+ )
def test_tenant_user_policy(self):
"""Test whether rules with format tenant_id/user_id formatting work.
@@ -261,26 +244,25 @@
network:tenant_id pass. And test whether Nova rules that contain
user_id pass.
"""
- test_tenant_id = mock.sentinel.tenant_id
- test_user_id = mock.sentinel.user_id
- authority = policy_authority.PolicyAuthority(
- test_tenant_id, test_user_id, "tenant_rbac_policy")
+ allowed_rules = ['rule1', 'rule2', 'rule3', 'rule4']
+ disallowed_rules = ['admin_tenant_rule', 'admin_user_rule']
# Check whether Member role can perform expected actions.
- allowed_rules = ['rule1', 'rule2', 'rule3', 'rule4']
- for rule in allowed_rules:
- allowed = authority.allowed(rule, ['Member'])
- self.assertTrue(allowed)
-
- disallowed_rules = ['admin_tenant_rule', 'admin_user_rule']
- for disallowed_rule in disallowed_rules:
- self.assertFalse(authority.allowed(disallowed_rule, ['Member']))
+ authority = self._test_policy_file(
+ roles=['member'],
+ allowed_rules=allowed_rules,
+ disallowed_rules=disallowed_rules,
+ service="tenant_rbac_policy",
+ )
# Check whether admin role can perform expected actions.
allowed_rules.extend(disallowed_rules)
- for rule in allowed_rules:
- allowed = authority.allowed(rule, ['admin'])
- self.assertTrue(allowed)
+ self._test_policy_file(
+ roles=['admin'],
+ allowed_rules=allowed_rules,
+ disallowed_rules=[],
+ authority=authority
+ )
# Check whether _try_rule is called with the correct target dictionary.
with mock.patch.object(
@@ -295,7 +277,7 @@
}
expected_access_data = {
- "roles": ['Member'],
+ "roles": sorted(['member', 'reader']),
"is_admin": False,
"is_admin_project": True,
"user_id": mock.sentinel.user_id,
@@ -304,8 +286,11 @@
}
for rule in allowed_rules:
- allowed = authority.allowed(rule, ['Member'])
+ allowed = authority.allowed(rule, ['member'])
self.assertTrue(allowed)
+ # Sort the roles so the comparison does not depend on set ordering
+ mock_try_rule.call_args[0][2]["roles"] = sorted(
+ mock_try_rule.call_args[0][2]["roles"])
mock_try_rule.assert_called_once_with(
rule, expected_target, expected_access_data, mock.ANY)
mock_try_rule.reset_mock()