blob: cfb7856b99a3c61a37d83a784fcab041570c99ef [file] [log] [blame]
DavidPurcell029d8c32017-01-06 15:27:41 -05001# Copyright 2016 AT&T Corp
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
15
16import os
17
18from oslo_config import cfg
19from oslo_log import log as logging
20from oslo_policy import _checks
21from oslo_policy import policy
22from tempest import config
23
24from patrole_tempest_plugin.rbac_exceptions import RbacResourceSetupFailed
25
26CONF = config.CONF
27LOG = logging.getLogger(__name__)
28
29RULES_TO_SKIP = []
30TESTED_RULES = []
31PARSED_RULES = []
32
33
34class RbacPolicyConverter(object):
35 """A class for parsing policy rules into lists of allowed roles.
36
37 RBAC testing requires that each rule in a policy file be broken up into
38 the roles that constitute it. This class automates that process.
39 """
40
41 def __init__(self, service, path=None):
42 """Initialization of Policy Converter
43
44 Parse policy files to create dictionary mapping
45 policy actions to roles.
46 :param service: type string
47 :param path: type string
48 """
49
50 if path is None:
51 path = '/etc/{0}/policy.json'.format(service)
52
53 if not os.path.isfile(path):
54 raise RbacResourceSetupFailed('Policy file for service: {0}, {1}'
55 ' not found.'.format(service, path))
56
57 self.default_roles = CONF.rbac.rbac_roles
58 self.rules = {}
59
60 self._get_roles_for_each_rule_in_policy_file(path)
61
62 def _get_roles_for_each_rule_in_policy_file(self, path):
63 """Gets the roles for each rule in the policy file at given path."""
64
65 global PARSED_RULES
66 global TESTED_RULES
67 global RULES_TO_SKIP
68
69 rule_to_roles_dict = {}
70 enforcer = self._init_policy_enforcer(path)
71
72 base_rules = set()
73 for rule_name, rule_checker in enforcer.rules.items():
74 if isinstance(rule_checker, _checks.OrCheck):
75 for sub_rule in rule_checker.rules:
76 if hasattr(sub_rule, 'match'):
77 base_rules.add(sub_rule.match)
78 elif isinstance(rule_checker, _checks.RuleCheck):
79 if hasattr(rule_checker, 'match'):
80 base_rules.add(rule_checker.match)
81
82 RULES_TO_SKIP.extend(base_rules)
83 generic_check_dict = self._get_generic_check_dict(enforcer.rules)
84
85 for rule_name, rule_checker in enforcer.rules.items():
86 PARSED_RULES.append(rule_name)
87
88 if rule_name in RULES_TO_SKIP:
89 continue
90 if isinstance(rule_checker, _checks.GenericCheck):
91 continue
92
93 # Determine whether each role is contained within the current rule.
94 for role in self.default_roles:
95 roles = {'roles': [role]}
96 roles.update(generic_check_dict)
97 is_role_in_rule = rule_checker(
98 generic_check_dict, roles, enforcer)
99 if is_role_in_rule:
100 rule_to_roles_dict.setdefault(rule_name, set())
101 rule_to_roles_dict[rule_name].add(role)
102
103 self.rules = rule_to_roles_dict
104
105 def _init_policy_enforcer(self, policy_file):
106 """Initializes oslo policy enforcer"""
107
108 def find_file(path):
109 realpath = os.path.realpath(path)
110 if os.path.isfile(realpath):
111 return realpath
112 else:
113 return None
114
115 CONF = cfg.CONF
116 CONF.find_file = find_file
117
118 enforcer = policy.Enforcer(CONF,
119 policy_file=policy_file,
120 rules=None,
121 default_rule=None,
122 use_conf=True)
123 enforcer.load_rules()
124 return enforcer
125
126 def _get_generic_check_dict(self, enforcer_rules):
127 """Creates permissions dictionary that oslo policy uses
128
129 to determine if a user can perform an action.
130 """
131
132 generic_checks = set()
133 for rule_checker in enforcer_rules.values():
134 entries = set()
135 self._get_generic_check_entries(rule_checker, entries)
136 generic_checks |= entries
137 return {e: '' for e in generic_checks}
138
139 def _get_generic_check_entries(self, rule_checker, entries):
140 if isinstance(rule_checker, _checks.GenericCheck):
141 if hasattr(rule_checker, 'match'):
142 if rule_checker.match.startswith('%(') and\
143 rule_checker.match.endswith(')s'):
144 entries.add(rule_checker.match[2:-2])
145 if hasattr(rule_checker, 'rule'):
146 if isinstance(rule_checker.rule, _checks.GenericCheck) and\
147 hasattr(rule_checker.rule, 'match'):
148 if rule_checker.rule.match.startswith('%(') and\
149 rule_checker.rule.match.endswith(')s'):
150 entries.add(rule_checker.rule.match[2:-2])
151 if hasattr(rule_checker, 'rules'):
152 for rule in rule_checker.rules:
153 self._get_generic_check_entries(rule, entries)