Fixes converter not working for certain edge cases.
Currently, the converter framework is not robust enough to handle
all policy cases. For example, is_admin context breaks.
This patch makes the converter more robust. The converter was changed
to use oslo_policy's shell tool for figuring out which roles are
permitted for a given rule. The shell tool can be found here:
https://github.com/openstack/oslo.policy/blob/master/oslo_policy/shell.py
Because the shell tool is intended to be used as a CLI tool, it was
adapted from oslo policy to better work within Patrole.
implements blueprint: oslo-policy-converter
Change-Id: Ia0fe9113e2be44e609b0edbb4c6facd1425f28b5
diff --git a/patrole_tempest_plugin/rbac_role_converter.py b/patrole_tempest_plugin/rbac_role_converter.py
index cfb7856..65e3f27 100644
--- a/patrole_tempest_plugin/rbac_role_converter.py
+++ b/patrole_tempest_plugin/rbac_role_converter.py
@@ -13,141 +13,125 @@
# License for the specific language governing permissions and limitations
# under the License.
+import copy
import os
-from oslo_config import cfg
from oslo_log import log as logging
-from oslo_policy import _checks
from oslo_policy import policy
from tempest import config
-from patrole_tempest_plugin.rbac_exceptions import RbacResourceSetupFailed
+from patrole_tempest_plugin import rbac_exceptions
CONF = config.CONF
LOG = logging.getLogger(__name__)
-RULES_TO_SKIP = []
-TESTED_RULES = []
-PARSED_RULES = []
-
class RbacPolicyConverter(object):
"""A class for parsing policy rules into lists of allowed roles.
RBAC testing requires that each rule in a policy file be broken up into
the roles that constitute it. This class automates that process.
+
+ The list of roles per rule can be reverse-engineered by checking, for
+ each role, whether a given rule is allowed using oslo policy.
"""
- def __init__(self, service, path=None):
- """Initialization of Policy Converter
+ def __init__(self, tenant_id, service, path=None):
+ """Initialization of Policy Converter.
- Parse policy files to create dictionary mapping
- policy actions to roles.
+ Parse policy files to create dictionary mapping policy actions to
+ roles.
+
+ :param tenant_id: type uuid
:param service: type string
:param path: type string
"""
-
if path is None:
- path = '/etc/{0}/policy.json'.format(service)
+ self.path = '/etc/{0}/policy.json'.format(service)
+ else:
+ self.path = path
- if not os.path.isfile(path):
- raise RbacResourceSetupFailed('Policy file for service: {0}, {1}'
- ' not found.'.format(service, path))
+ if not os.path.isfile(self.path):
+ raise rbac_exceptions.RbacResourceSetupFailed(
+ 'Policy file for service: {0}, {1} not found.'
+ .format(service, self.path))
- self.default_roles = CONF.rbac.rbac_roles
- self.rules = {}
+ self.tenant_id = tenant_id
- self._get_roles_for_each_rule_in_policy_file(path)
+ def allowed(self, rule_name, role):
+ policy_file = open(self.path, 'r')
+ access_token = self._get_access_token(role)
- def _get_roles_for_each_rule_in_policy_file(self, path):
- """Gets the roles for each rule in the policy file at given path."""
+ is_allowed = self._allowed(
+ policy_file=policy_file,
+ access=access_token,
+ apply_rule=rule_name,
+ is_admin=False)
- global PARSED_RULES
- global TESTED_RULES
- global RULES_TO_SKIP
+ policy_file = open(self.path, 'r')
+ access_token = self._get_access_token(role)
+ allowed_as_admin_context = self._allowed(
+ policy_file=policy_file,
+ access=access_token,
+ apply_rule=rule_name,
+ is_admin=True)
- rule_to_roles_dict = {}
- enforcer = self._init_policy_enforcer(path)
+ if allowed_as_admin_context and is_allowed:
+ return True
+ if allowed_as_admin_context and not is_allowed:
+ return False
+ if not allowed_as_admin_context and is_allowed:
+ return True
+ if not allowed_as_admin_context and not is_allowed:
+ return False
- base_rules = set()
- for rule_name, rule_checker in enforcer.rules.items():
- if isinstance(rule_checker, _checks.OrCheck):
- for sub_rule in rule_checker.rules:
- if hasattr(sub_rule, 'match'):
- base_rules.add(sub_rule.match)
- elif isinstance(rule_checker, _checks.RuleCheck):
- if hasattr(rule_checker, 'match'):
- base_rules.add(rule_checker.match)
+ def _get_access_token(self, role):
+ access_token = {
+ "token": {
+ "roles": [
+ {
+ "name": role
+ }
+ ],
+ "project": {
+ "id": self.tenant_id
+ }
+ }
+ }
+ return access_token
- RULES_TO_SKIP.extend(base_rules)
- generic_check_dict = self._get_generic_check_dict(enforcer.rules)
+ def _allowed(self, policy_file, access, apply_rule, is_admin=False):
+ """Checks if a given rule in a policy is allowed with given access.
- for rule_name, rule_checker in enforcer.rules.items():
- PARSED_RULES.append(rule_name)
+ Adapted from oslo_policy.shell.
- if rule_name in RULES_TO_SKIP:
- continue
- if isinstance(rule_checker, _checks.GenericCheck):
- continue
-
- # Determine whether each role is contained within the current rule.
- for role in self.default_roles:
- roles = {'roles': [role]}
- roles.update(generic_check_dict)
- is_role_in_rule = rule_checker(
- generic_check_dict, roles, enforcer)
- if is_role_in_rule:
- rule_to_roles_dict.setdefault(rule_name, set())
- rule_to_roles_dict[rule_name].add(role)
-
- self.rules = rule_to_roles_dict
-
- def _init_policy_enforcer(self, policy_file):
- """Initializes oslo policy enforcer"""
-
- def find_file(path):
- realpath = os.path.realpath(path)
- if os.path.isfile(realpath):
- return realpath
- else:
- return None
-
- CONF = cfg.CONF
- CONF.find_file = find_file
-
- enforcer = policy.Enforcer(CONF,
- policy_file=policy_file,
- rules=None,
- default_rule=None,
- use_conf=True)
- enforcer.load_rules()
- return enforcer
-
- def _get_generic_check_dict(self, enforcer_rules):
- """Creates permissions dictionary that oslo policy uses
-
- to determine if a user can perform an action.
+ :param policy file: type string: path to policy file
+ :param access: type dict: dictionary from ``_get_access_token``
+ :param apply_rule: type string: rule to be checked
+ :param is_admin: type bool: whether admin context is used
"""
+ access_data = copy.copy(access['token'])
+ access_data['roles'] = [role['name'] for role in access_data['roles']]
+ access_data['project_id'] = access_data['project']['id']
+ access_data['is_admin'] = is_admin
+ policy_data = policy_file.read()
+ rules = policy.Rules.load(policy_data, "default")
- generic_checks = set()
- for rule_checker in enforcer_rules.values():
- entries = set()
- self._get_generic_check_entries(rule_checker, entries)
- generic_checks |= entries
- return {e: '' for e in generic_checks}
+ class Object(object):
+ pass
+ o = Object()
+ o.rules = rules
- def _get_generic_check_entries(self, rule_checker, entries):
- if isinstance(rule_checker, _checks.GenericCheck):
- if hasattr(rule_checker, 'match'):
- if rule_checker.match.startswith('%(') and\
- rule_checker.match.endswith(')s'):
- entries.add(rule_checker.match[2:-2])
- if hasattr(rule_checker, 'rule'):
- if isinstance(rule_checker.rule, _checks.GenericCheck) and\
- hasattr(rule_checker.rule, 'match'):
- if rule_checker.rule.match.startswith('%(') and\
- rule_checker.rule.match.endswith(')s'):
- entries.add(rule_checker.rule.match[2:-2])
- if hasattr(rule_checker, 'rules'):
- for rule in rule_checker.rules:
- self._get_generic_check_entries(rule, entries)
+ target = {"project_id": access_data['project_id']}
+
+ key = apply_rule
+ rule = rules[apply_rule]
+ result = self._try_rule(key, rule, target, access_data, o)
+ return result
+
+ def _try_rule(self, key, rule, target, access_data, o):
+ try:
+ return rule(target, access_data, o)
+ except Exception as e:
+ LOG.debug("Exception: {0} for rule: {1}".format(e, rule))
+ return False