Merge "Improve is_admin support in Patrole converter framework."
diff --git a/patrole_tempest_plugin/rbac_role_converter.py b/patrole_tempest_plugin/rbac_role_converter.py
index 420de7f..fb0d2fe 100644
--- a/patrole_tempest_plugin/rbac_role_converter.py
+++ b/patrole_tempest_plugin/rbac_role_converter.py
@@ -17,6 +17,7 @@
import os
from oslo_log import log as logging
+from oslo_policy import generator
from oslo_policy import policy
from tempest import config
@@ -39,51 +40,73 @@
def __init__(self, tenant_id, service, path=None):
"""Initialization of Policy Converter.
- Parse policy files to create dictionary mapping policy actions to
- roles.
+ Parses a policy file to create a dictionary, mapping policy actions to
+ roles. If a policy file does not exist, checks whether the service's
+ policy is registered as a namespace under oslo.policy.policies. Nova,
+ for example, doesn't use a policy.json file by default; its policy is
+ implemented in code and registered as 'nova' under
+ oslo.policy.policies.
+
+ If the policy is not found in either place, raises an exception.
+
+ Additionally, if the policy exists both in code and as a policy.json
+ file (for example, by creating a custom nova policy.json file), the
+ custom policy file is prioritized over the default in-code policy
+ implementation.
:param tenant_id: type uuid
:param service: type string
:param path: type string
"""
+ service = service.lower().strip()
if path is None:
self.path = '/etc/{0}/policy.json'.format(service)
else:
self.path = path
- if not os.path.isfile(self.path):
- raise rbac_exceptions.RbacResourceSetupFailed(
- 'Policy file for service: {0}, {1} not found.'
- .format(service, self.path))
+ policy_data = "{}"
+ # First check whether policy file exists.
+ if os.path.isfile(self.path):
+ policy_data = open(self.path, 'r').read()
+ # Otherwise use oslo_policy to fetch the rules for provided service.
+ else:
+ policy_generator = generator._get_policies_dict([service])
+ if policy_generator and service in policy_generator:
+ policy_data = "{\n"
+ for r in policy_generator[service]:
+ policy_data = policy_data + r.__str__() + ",\n"
+ policy_data = policy_data[:-2] + "\n}"
+ # Otherwise raise an exception.
+ else:
+ raise rbac_exceptions.RbacResourceSetupFailed(
+ 'Policy file for service: {0}, {1} not found.'
+ .format(service, self.path))
+
+ self.rules = policy.Rules.load(policy_data, "default")
self.tenant_id = tenant_id
def allowed(self, rule_name, role):
- policy_file = open(self.path, 'r')
- access_token = self._get_access_token(role)
-
+ is_admin_context = self._is_admin_context(role)
is_allowed = self._allowed(
- policy_file=policy_file,
- access=access_token,
+ access=self._get_access_token(role),
apply_rule=rule_name,
- is_admin=False)
+ is_admin=is_admin_context)
- policy_file = open(self.path, 'r')
- access_token = self._get_access_token(role)
- allowed_as_admin_context = self._allowed(
- policy_file=policy_file,
- access=access_token,
- apply_rule=rule_name,
- is_admin=True)
+ return is_allowed
- if allowed_as_admin_context and is_allowed:
- return True
- if allowed_as_admin_context and not is_allowed:
- return False
- if not allowed_as_admin_context and is_allowed:
- return True
- if not allowed_as_admin_context and not is_allowed:
- return False
+ def _is_admin_context(self, role):
+ """Checks whether a role has admin context.
+
+ If context_is_admin is defined in the policy, then checks whether the
+ given role satisfies the context_is_admin rule. If it is not defined,
+ then defaults to treating only the 'admin' role as admin context.
+ """
+ if 'context_is_admin' in self.rules.keys():
+ return self._allowed(
+ access=self._get_access_token(role),
+ apply_rule='context_is_admin')
+ return role == 'admin'
def _get_access_token(self, role):
access_token = {
@@ -100,12 +123,11 @@
}
return access_token
- def _allowed(self, policy_file, access, apply_rule, is_admin=False):
+ def _allowed(self, access, apply_rule, is_admin=False):
"""Checks if a given rule in a policy is allowed with given access.
Adapted from oslo_policy.shell.
- :param policy file: type string: path to policy file
:param access: type dict: dictionary from ``_get_access_token``
:param apply_rule: type string: rule to be checked
:param is_admin: type bool: whether admin context is used
@@ -114,24 +136,30 @@
access_data['roles'] = [role['name'] for role in access_data['roles']]
access_data['project_id'] = access_data['project']['id']
access_data['is_admin'] = is_admin
- policy_data = policy_file.read()
- rules = policy.Rules.load(policy_data, "default")
+ # TODO(felipemonteiro): Dynamically calculate is_admin_project rather
+ # than hard-coding it to True. is_admin_project cannot be determined
+ # from the role, but rather from project and domain names. See
+ # _populate_is_admin_project in keystone.token.providers.common
+ # for more information.
+ access_data['is_admin_project'] = True
class Object(object):
pass
o = Object()
- o.rules = rules
+ o.rules = self.rules
target = {"project_id": access_data['project_id']}
- key = apply_rule
- rule = rules[apply_rule]
- result = self._try_rule(key, rule, target, access_data, o)
+ result = self._try_rule(apply_rule, target, access_data, o)
return result
- def _try_rule(self, key, rule, target, access_data, o):
+ def _try_rule(self, apply_rule, target, access_data, o):
try:
+ rule = self.rules[apply_rule]
return rule(target, access_data, o)
+ except KeyError as e:
+ LOG.debug("{0} not found in policy file.".format(apply_rule))
+ return False
except Exception as e:
- LOG.debug("Exception: {0} for rule: {1}".format(e, rule))
+ LOG.debug("Exception: {0} for rule: {1}.".format(e, rule))
return False
diff --git a/tests/resources/alt_admin_rbac_policy.json b/tests/resources/alt_admin_rbac_policy.json
new file mode 100644
index 0000000..bf07182
--- /dev/null
+++ b/tests/resources/alt_admin_rbac_policy.json
@@ -0,0 +1,5 @@
+{
+ "context_is_admin": "role:super_admin",
+ "admin_rule": "role:super_admin",
+ "non_admin_rule": "role:fake_admin"
+}
diff --git a/tests/test_rbac_role_converter.py b/tests/test_rbac_role_converter.py
index 04eb626..f3a97ab 100644
--- a/tests/test_rbac_role_converter.py
+++ b/tests/test_rbac_role_converter.py
@@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
+import mock
import os
from tempest import config
@@ -35,8 +36,12 @@
self.admin_policy_file = os.path.join(current_directory,
'resources',
'admin_rbac_policy.json')
+ self.alt_admin_policy_file = os.path.join(current_directory,
+ 'resources',
+ 'alt_admin_rbac_policy.json')
- def test_custom_policy(self):
+ @mock.patch.object(rbac_role_converter, 'LOG', autospec=True)
+ def test_custom_policy(self, m_log):
default_roles = ['zero', 'one', 'two', 'three', 'four',
'five', 'six', 'seven', 'eight', 'nine']
@@ -56,7 +61,10 @@
fake_rule = 'fake_rule'
for role in default_roles:
- self.assertRaises(KeyError, converter.allowed, fake_rule, role)
+ self.assertFalse(converter.allowed(fake_rule, role))
+ m_log.debug.assert_called_once_with(
+ "{0} not found in policy file.".format('fake_rule'))
+ m_log.debug.reset_mock()
for rule, role_list in expected.items():
for role in role_list:
@@ -70,10 +78,9 @@
role = 'admin'
allowed_rules = [
- 'admin_rule'
+ 'admin_rule', 'is_admin_rule', 'alt_admin_rule'
]
- disallowed_rules = [
- 'is_admin_rule', 'alt_admin_rule', 'non_admin_rule']
+ disallowed_rules = ['non_admin_rule']
for rule in allowed_rules:
allowed = converter.allowed(rule, role)
@@ -101,3 +108,31 @@
for rule in disallowed_rules:
allowed = converter.allowed(rule, role)
self.assertFalse(allowed)
+
+ def test_admin_policy_file_with_context_is_admin(self):
+ converter = rbac_role_converter.RbacPolicyConverter(
+ None, "test", self.alt_admin_policy_file)
+
+ role = 'fake_admin'
+ allowed_rules = ['non_admin_rule']
+ disallowed_rules = ['admin_rule']
+
+ for rule in allowed_rules:
+ allowed = converter.allowed(rule, role)
+ self.assertTrue(allowed)
+
+ for rule in disallowed_rules:
+ allowed = converter.allowed(rule, role)
+ self.assertFalse(allowed)
+
+ role = 'super_admin'
+ allowed_rules = ['admin_rule']
+ disallowed_rules = ['non_admin_rule']
+
+ for rule in allowed_rules:
+ allowed = converter.allowed(rule, role)
+ self.assertTrue(allowed)
+
+ for rule in disallowed_rules:
+ allowed = converter.allowed(rule, role)
+ self.assertFalse(allowed)