Improve is_admin support in Patrole converter framework.
Right now is_admin rules are still causing issues. This change fixes
those issues by checking whether context_is_admin is defined in the
policy file under test and, if it is, whether the current role
satisfies the context_is_admin rule. If the rule is not defined, only
the admin role is treated as having admin context. This ensures that
is_admin is reliably passed to oslo policy's check.
Also adds support for services like nova, which do not use a
policy.json file by default because their policy is implemented in
code. In that case the rules are fetched via oslo_policy.generator.
Change-Id: I91b567fd13130ebd9e3a9c49c46488c76d99d7a8
diff --git a/patrole_tempest_plugin/rbac_role_converter.py b/patrole_tempest_plugin/rbac_role_converter.py
index 420de7f..fb0d2fe 100644
--- a/patrole_tempest_plugin/rbac_role_converter.py
+++ b/patrole_tempest_plugin/rbac_role_converter.py
@@ -17,6 +17,7 @@
import os
from oslo_log import log as logging
+from oslo_policy import generator
from oslo_policy import policy
from tempest import config
@@ -39,51 +40,73 @@
def __init__(self, tenant_id, service, path=None):
"""Initialization of Policy Converter.
- Parse policy files to create dictionary mapping policy actions to
- roles.
+ Parses a policy file to create a dictionary, mapping policy actions to
+ roles. If a policy file does not exist, checks whether the policy file
+ is registered as a namespace under oslo.policy.policies. Nova, for
+ example, doesn't use a policy.json file by default; its policy is
+ implemented in code and registered as 'nova' under
+ oslo.policy.policies.
+
+ If the policy file is not found in either place, raises an exception.
+
+ Additionally, if the policy file exists in both code and as a
+ policy.json (for example, by creating a custom nova policy.json file),
+ the custom policy file over the default policy implementation is
+ prioritized.
:param tenant_id: type uuid
:param service: type string
:param path: type string
"""
+ service = service.lower().strip()
if path is None:
self.path = '/etc/{0}/policy.json'.format(service)
else:
self.path = path
- if not os.path.isfile(self.path):
- raise rbac_exceptions.RbacResourceSetupFailed(
- 'Policy file for service: {0}, {1} not found.'
- .format(service, self.path))
+ policy_data = "{}"
+ # First check whether policy file exists.
+ if os.path.isfile(self.path):
+ policy_data = open(self.path, 'r').read()
+ # Otherwise use oslo_policy to fetch the rules for provided service.
+ else:
+ policy_generator = generator._get_policies_dict([service])
+ if policy_generator and service in policy_generator:
+ policy_data = "{\n"
+ for r in policy_generator[service]:
+ policy_data = policy_data + r.__str__() + ",\n"
+ policy_data = policy_data[:-2] + "\n}"
+ # Otherwise raise an exception.
+ else:
+ raise rbac_exceptions.RbacResourceSetupFailed(
+ 'Policy file for service: {0}, {1} not found.'
+ .format(service, self.path))
+
+ self.rules = policy.Rules.load(policy_data, "default")
self.tenant_id = tenant_id
def allowed(self, rule_name, role):
- policy_file = open(self.path, 'r')
- access_token = self._get_access_token(role)
-
+ is_admin_context = self._is_admin_context(role)
is_allowed = self._allowed(
- policy_file=policy_file,
- access=access_token,
+ access=self._get_access_token(role),
apply_rule=rule_name,
- is_admin=False)
+ is_admin=is_admin_context)
- policy_file = open(self.path, 'r')
- access_token = self._get_access_token(role)
- allowed_as_admin_context = self._allowed(
- policy_file=policy_file,
- access=access_token,
- apply_rule=rule_name,
- is_admin=True)
+ return is_allowed
- if allowed_as_admin_context and is_allowed:
- return True
- if allowed_as_admin_context and not is_allowed:
- return False
- if not allowed_as_admin_context and is_allowed:
- return True
- if not allowed_as_admin_context and not is_allowed:
- return False
+ def _is_admin_context(self, role):
+ """Checks whether a role has admin context.
+
+ If context_is_admin is contained in the policy file, then checks
+ whether the given role is contained in context_is_admin. If it is not
+ in the policy file, then default to context_is_admin: admin.
+ """
+ if 'context_is_admin' in self.rules.keys():
+ return self._allowed(
+ access=self._get_access_token(role),
+ apply_rule='context_is_admin')
+ return role == 'admin'
def _get_access_token(self, role):
access_token = {
@@ -100,12 +123,11 @@
}
return access_token
- def _allowed(self, policy_file, access, apply_rule, is_admin=False):
+ def _allowed(self, access, apply_rule, is_admin=False):
"""Checks if a given rule in a policy is allowed with given access.
Adapted from oslo_policy.shell.
- :param policy file: type string: path to policy file
:param access: type dict: dictionary from ``_get_access_token``
:param apply_rule: type string: rule to be checked
:param is_admin: type bool: whether admin context is used
@@ -114,24 +136,30 @@
access_data['roles'] = [role['name'] for role in access_data['roles']]
access_data['project_id'] = access_data['project']['id']
access_data['is_admin'] = is_admin
- policy_data = policy_file.read()
- rules = policy.Rules.load(policy_data, "default")
+ # TODO(felipemonteiro): Dynamically calculate is_admin_project rather
+ # than hard-coding it to True. is_admin_project cannot be determined
+ # from the role, but rather from project and domain names. See
+ # _populate_is_admin_project in keystone.token.providers.common
+ # for more information.
+ access_data['is_admin_project'] = True
class Object(object):
pass
o = Object()
- o.rules = rules
+ o.rules = self.rules
target = {"project_id": access_data['project_id']}
- key = apply_rule
- rule = rules[apply_rule]
- result = self._try_rule(key, rule, target, access_data, o)
+ result = self._try_rule(apply_rule, target, access_data, o)
return result
- def _try_rule(self, key, rule, target, access_data, o):
+ def _try_rule(self, apply_rule, target, access_data, o):
try:
+ rule = self.rules[apply_rule]
return rule(target, access_data, o)
+ except KeyError as e:
+ LOG.debug("{0} not found in policy file.".format(apply_rule))
+ return False
except Exception as e:
- LOG.debug("Exception: {0} for rule: {1}".format(e, rule))
+ LOG.debug("Exception: {0} for rule: {1}.".format(e, rule))
return False