Initial functionality framework.
Includes:
rbac_util - Utility for switching between roles for tests.
rbac_auth - Determines if a given role is valid for a given api call.
rbac_rule_validation - Determines whether a role was allowed proper access and denied improper access (403 error).
rbac_role_converter - Converts policy.json files into a list of APIs and the roles that can access them.
One example rbac_base in tests/api/rbac_base
One example test in tests/api/images/test_images_rbac.py
New config settings for rbac_flag, rbac_test_role, and rbac_roles
Implements bp: initial-framework
Co-Authored-By: Sangeet Gupta <sg774j@att.com>
Co-Authored-By: Rick Bartra <rb560u@att.com>
Co-Authored-By: Felipe Monteiro <felipe.monteiro@att.com>
Co-Authored-By: Anthony Bellino <ab2434@att.com>
Co-Authored-By: Avishek Dutta <ad620p@att.com>
Change-Id: Ic97b2558ba33ab47ac8174ae37629d36ceb1c9de
diff --git a/patrole_tempest_plugin/rbac_role_converter.py b/patrole_tempest_plugin/rbac_role_converter.py
new file mode 100644
index 0000000..cfb7856
--- /dev/null
+++ b/patrole_tempest_plugin/rbac_role_converter.py
@@ -0,0 +1,153 @@
+# Copyright 2016 AT&T Corp
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import os
+
+from oslo_config import cfg
+from oslo_log import log as logging
+from oslo_policy import _checks
+from oslo_policy import policy
+from tempest import config
+
+from patrole_tempest_plugin.rbac_exceptions import RbacResourceSetupFailed
+
+# Tempest configuration object; the converter reads CONF.rbac.rbac_roles.
+CONF = config.CONF
+# Module-level logger named after this module.
+LOG = logging.getLogger(__name__)
+
+# The three lists below are module-level and therefore shared by every
+# RbacPolicyConverter instance created in the same process.
+#
+# Names of rules that other rules refer to (gathered from RuleCheck matches
+# and from the direct sub-rules of OrCheck rules). Rules with these names are
+# skipped when the rule -> roles mapping is built. Extended on every parse,
+# so it may contain duplicates.
+RULES_TO_SKIP = []
+# Not written to anywhere in this module.
+# NOTE(review): presumably populated by other modules -- confirm.
+TESTED_RULES = []
+# Every rule name encountered while parsing a policy file, appended once per
+# parse (so it may contain duplicates across converter instances).
+PARSED_RULES = []
+
+
+class RbacPolicyConverter(object):
+ """A class for parsing policy rules into lists of allowed roles.
+
+ RBAC testing requires that each rule in a policy file be broken up into
+ the roles that constitute it. This class automates that process.
+ """
+
+ def __init__(self, service, path=None):
+ """Initialization of Policy Converter
+
+ Parse policy files to create dictionary mapping
+ policy actions to roles.
+ :param service: type string
+ :param path: type string
+ """
+
+ if path is None:
+ path = '/etc/{0}/policy.json'.format(service)
+
+ if not os.path.isfile(path):
+ raise RbacResourceSetupFailed('Policy file for service: {0}, {1}'
+ ' not found.'.format(service, path))
+
+ self.default_roles = CONF.rbac.rbac_roles
+ self.rules = {}
+
+ self._get_roles_for_each_rule_in_policy_file(path)
+
+ def _get_roles_for_each_rule_in_policy_file(self, path):
+ """Gets the roles for each rule in the policy file at given path."""
+
+ global PARSED_RULES
+ global TESTED_RULES
+ global RULES_TO_SKIP
+
+ rule_to_roles_dict = {}
+ enforcer = self._init_policy_enforcer(path)
+
+ base_rules = set()
+ for rule_name, rule_checker in enforcer.rules.items():
+ if isinstance(rule_checker, _checks.OrCheck):
+ for sub_rule in rule_checker.rules:
+ if hasattr(sub_rule, 'match'):
+ base_rules.add(sub_rule.match)
+ elif isinstance(rule_checker, _checks.RuleCheck):
+ if hasattr(rule_checker, 'match'):
+ base_rules.add(rule_checker.match)
+
+ RULES_TO_SKIP.extend(base_rules)
+ generic_check_dict = self._get_generic_check_dict(enforcer.rules)
+
+ for rule_name, rule_checker in enforcer.rules.items():
+ PARSED_RULES.append(rule_name)
+
+ if rule_name in RULES_TO_SKIP:
+ continue
+ if isinstance(rule_checker, _checks.GenericCheck):
+ continue
+
+ # Determine whether each role is contained within the current rule.
+ for role in self.default_roles:
+ roles = {'roles': [role]}
+ roles.update(generic_check_dict)
+ is_role_in_rule = rule_checker(
+ generic_check_dict, roles, enforcer)
+ if is_role_in_rule:
+ rule_to_roles_dict.setdefault(rule_name, set())
+ rule_to_roles_dict[rule_name].add(role)
+
+ self.rules = rule_to_roles_dict
+
+ def _init_policy_enforcer(self, policy_file):
+ """Initializes oslo policy enforcer"""
+
+ def find_file(path):
+ realpath = os.path.realpath(path)
+ if os.path.isfile(realpath):
+ return realpath
+ else:
+ return None
+
+ CONF = cfg.CONF
+ CONF.find_file = find_file
+
+ enforcer = policy.Enforcer(CONF,
+ policy_file=policy_file,
+ rules=None,
+ default_rule=None,
+ use_conf=True)
+ enforcer.load_rules()
+ return enforcer
+
+ def _get_generic_check_dict(self, enforcer_rules):
+ """Creates permissions dictionary that oslo policy uses
+
+ to determine if a user can perform an action.
+ """
+
+ generic_checks = set()
+ for rule_checker in enforcer_rules.values():
+ entries = set()
+ self._get_generic_check_entries(rule_checker, entries)
+ generic_checks |= entries
+ return {e: '' for e in generic_checks}
+
+ def _get_generic_check_entries(self, rule_checker, entries):
+ if isinstance(rule_checker, _checks.GenericCheck):
+ if hasattr(rule_checker, 'match'):
+ if rule_checker.match.startswith('%(') and\
+ rule_checker.match.endswith(')s'):
+ entries.add(rule_checker.match[2:-2])
+ if hasattr(rule_checker, 'rule'):
+ if isinstance(rule_checker.rule, _checks.GenericCheck) and\
+ hasattr(rule_checker.rule, 'match'):
+ if rule_checker.rule.match.startswith('%(') and\
+ rule_checker.rule.match.endswith(')s'):
+ entries.add(rule_checker.rule.match[2:-2])
+ if hasattr(rule_checker, 'rules'):
+ for rule in rule_checker.rules:
+ self._get_generic_check_entries(rule, entries)