blob: a7da2d33ae99f09ac9aacf0fc8b327a653553e1d [file] [log] [blame]
DavidPurcellb25f93d2017-01-27 12:46:27 -05001# Copyright 2017 AT&T Corporation.
DavidPurcell029d8c32017-01-06 15:27:41 -05002# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
15
Rick Bartraed950052017-06-29 17:20:33 -040016import abc
17import six
Felipe Monteirofa01d5f2017-04-01 06:18:25 +010018import sys
DavidPurcell029d8c32017-01-06 15:27:41 -050019import time
Felipe Monteiro34a138c2017-03-02 17:01:37 -050020
Rajiv Kumar645dfc92017-01-19 13:48:27 +053021from oslo_log import log as logging
Felipe Monteiro2693bf72017-08-12 22:56:47 +010022from oslo_utils import excutils
Felipe Monteiroe8d93e02017-07-19 20:52:20 +010023import testtools
Felipe Monteirofa01d5f2017-04-01 06:18:25 +010024
Felipe Monteiroe7e552e2017-05-02 17:04:12 +010025from tempest.common import credentials_factory as credentials
Felipe Monteirofa01d5f2017-04-01 06:18:25 +010026from tempest import config
DavidPurcell029d8c32017-01-06 15:27:41 -050027
Felipe Monteiro34a138c2017-03-02 17:01:37 -050028from patrole_tempest_plugin import rbac_exceptions
DavidPurcell029d8c32017-01-06 15:27:41 -050029
DavidPurcell029d8c32017-01-06 15:27:41 -050030CONF = config.CONF
Felipe Monteiro34a138c2017-03-02 17:01:37 -050031LOG = logging.getLogger(__name__)
DavidPurcell029d8c32017-01-06 15:27:41 -050032
33
class RbacUtils(object):
    """Utility class responsible for switching os_primary role.

    This class is responsible for overriding the value of the primary Tempest
    credential's role (i.e. "os_primary" role). By doing so, it is possible to
    seamlessly swap between admin credentials, needed for setup and clean up,
    and primary credentials, needed to perform the API call which does
    policy enforcement. The primary credentials always cycle between roles
    defined by ``CONF.identity.admin_role`` and
    ``CONF.patrole.rbac_test_role``.
    """

    # References the last value of `toggle_rbac_role` that was passed to
    # `switch_role`. Used for ensuring that `switch_role` is correctly used
    # in a test file, so that false positives are prevented. The key used
    # to index into the dictionary is the module path plus class name, which
    # is unique. NOTE: this dictionary is deliberately a class attribute, so
    # the history is shared by every ``RbacUtils`` instance.
    switch_role_history = {}
    # Role IDs resolved lazily by `_get_roles_by_name` on the first call to
    # `switch_role`; ``None`` until then.
    admin_role_id = None
    rbac_role_id = None

    def __init__(self, test_obj):
        """Constructor for ``RbacUtils``.

        Builds the admin roles client and immediately switches the
        ``os_primary`` credentials to the admin role.

        :param test_obj: An instance of `tempest.test.BaseTestCase`.
        :raises testtools.TestCase.skipException: If admin credentials are
            not available for the identity version used by ``test_obj``.
        """
        # Since we are going to instantiate a client manager with
        # admin credentials, first check if admin is available.
        if not credentials.is_admin_available(
                identity_version=test_obj.get_identity_version()):
            msg = "Missing Identity Admin API credentials in configuration."
            raise testtools.TestCase.skipException(msg)

        # Initialize the admin roles_client to perform role switching.
        admin_creds = test_obj.get_client_manager(credential_type='admin')
        if test_obj.get_identity_version() == 'v3':
            admin_roles_client = admin_creds.roles_v3_client
        else:
            admin_roles_client = admin_creds.roles_client

        self.admin_roles_client = admin_roles_client
        self.switch_role(test_obj, toggle_rbac_role=False)

    def switch_role(self, test_obj, toggle_rbac_role=False):
        """Switch the role used by `os_primary` Tempest credentials.

        Switch the role used by `os_primary` credentials to:
          * admin if `toggle_rbac_role` is False
          * `CONF.patrole.rbac_test_role` if `toggle_rbac_role` is True

        Regardless of success or failure, the cached auth data of the
        ``os_primary`` auth provider is cleared and then set again.

        :param test_obj: An instance of `tempest.test.BaseTestCase`.
        :param toggle_rbac_role: Boolean selecting which role to switch the
            `os_primary` Tempest credentials to.
        :raises Exception: Any error raised while resolving, validating or
            assigning roles is logged and then re-raised unchanged.
        """
        self.user_id = test_obj.os_primary.credentials.user_id
        self.project_id = test_obj.os_primary.credentials.tenant_id
        self.token = test_obj.os_primary.auth_provider.get_token()

        LOG.debug('Switching role to: %s.', toggle_rbac_role)
        role_already_present = False

        try:
            # Resolve the role IDs only once; later calls reuse them.
            if not all([self.admin_role_id, self.rbac_role_id]):
                self._get_roles_by_name()

            self._validate_switch_role(test_obj, toggle_rbac_role)

            target_role = (
                self.rbac_role_id if toggle_rbac_role else self.admin_role_id)
            role_already_present = self._list_and_clear_user_roles_on_project(
                target_role)

            # Do not switch roles if `target_role` already exists.
            if not role_already_present:
                self._create_user_role_on_project(target_role)
        except Exception as exp:
            with excutils.save_and_reraise_exception():
                LOG.exception(exp)
        finally:
            test_obj.os_primary.auth_provider.clear_auth()
            # Fernet tokens are not subsecond aware so sleep to ensure we are
            # passing the second boundary before attempting to authenticate.
            # Only sleep if a token revocation occurred as a result of role
            # switching. This will optimize test runtime in the case where
            # ``[identity] admin_role`` == ``[patrole] rbac_test_role``.
            # If an exception interrupted the switch, the flag is still False
            # and we sleep as well, since roles may have been modified.
            if not role_already_present:
                time.sleep(1)
            test_obj.os_primary.auth_provider.set_auth()

    def _get_roles_by_name(self):
        """Resolve and cache the IDs of the admin and RBAC test roles.

        Looks up the roles named by ``CONF.identity.admin_role`` and
        ``CONF.patrole.rbac_test_role`` among the roles returned by the admin
        roles client and stores their IDs on ``self.admin_role_id`` and
        ``self.rbac_role_id``.

        :raises rbac_exceptions.RbacResourceSetupFailed: If either role is
            not found.
        """
        available_roles = self.admin_roles_client.list_roles()
        admin_role_id = rbac_role_id = None

        # Two independent checks (not elif): both options may name the same
        # role, in which case both IDs must be populated from one entry.
        for role in available_roles['roles']:
            if role['name'] == CONF.patrole.rbac_test_role:
                rbac_role_id = role['id']
            if role['name'] == CONF.identity.admin_role:
                admin_role_id = role['id']

        if not all([admin_role_id, rbac_role_id]):
            msg = ("Roles defined by `[patrole] rbac_test_role` and "
                   "`[identity] admin_role` must be defined in the system.")
            raise rbac_exceptions.RbacResourceSetupFailed(msg)

        self.admin_role_id = admin_role_id
        self.rbac_role_id = rbac_role_id

    def _create_user_role_on_project(self, role_id):
        """Assign ``role_id`` to the current user on the current project."""
        self.admin_roles_client.create_user_role_on_project(
            self.project_id, self.user_id, role_id)

    def _list_and_clear_user_roles_on_project(self, role_id):
        """Remove the user's project roles unless they are exactly ``role_id``.

        :param role_id: ID of the role that is about to be assigned.
        :returns: True if the user's only role on the project is already
            ``role_id`` (nothing is deleted); False after every role the
            user had on the project has been deleted.
        """
        roles = self.admin_roles_client.list_user_roles_on_project(
            self.project_id, self.user_id)['roles']
        role_ids = [role['id'] for role in roles]

        # NOTE(felipemonteiro): We do not use ``role_id in role_ids`` here to
        # avoid over-permission errors: if the current list of roles on the
        # project includes "admin" and "Member", and we are switching to the
        # "Member" role, then we must delete the "admin" role. Thus, we only
        # return early if the user's roles on the project are an exact match.
        if [role_id] == role_ids:
            return True

        for role in roles:
            self.admin_roles_client.delete_role_from_user_on_project(
                self.project_id, self.user_id, role['id'])

        return False

    def _validate_switch_role(self, test_obj, toggle_rbac_role):
        """Validates that the test role passed to `switch_role` is legal.

        Throws an error for the following improper usages of `switch_role`:
          * `switch_role` is not called with a boolean value
          * `switch_role` is never called inside a test, except in tearDown
          * `switch_role` is called with the same boolean value twice

        If a `skipException` is thrown then this is a legitimate reason why
        `switch_role` is not called.

        :param test_obj: Test class or test instance; its module and class
            name form the key into ``switch_role_history``.
        :param toggle_rbac_role: Value that was passed to `switch_role`.
        :raises rbac_exceptions.RbacResourceSetupFailed: On improper usage.
        """
        if not isinstance(toggle_rbac_role, bool):
            raise rbac_exceptions.RbacResourceSetupFailed(
                '`toggle_rbac_role` must be a boolean value.')

        # The unique key is the combination of module path plus class name.
        if isinstance(test_obj, type):
            class_name = test_obj.__name__
        else:
            class_name = test_obj.__class__.__name__
        module_name = test_obj.__module__
        key = '%s.%s' % (module_name, class_name)

        self.switch_role_history.setdefault(key, None)

        if self.switch_role_history[key] == toggle_rbac_role:
            # If an exception was thrown, like a skipException or otherwise,
            # then this is a legitimate reason why `switch_role` was not
            # called, so only raise an exception if no current exception is
            # being handled.
            if sys.exc_info()[0] is None:
                # Reset the history before raising.
                self.switch_role_history[key] = False
                error_message = '`toggle_rbac_role` must not be called with '\
                    'the same bool value twice. Make sure that you included '\
                    'a rbac_utils.switch_role method call inside the test.'
                LOG.error(error_message)
                raise rbac_exceptions.RbacResourceSetupFailed(error_message)
        else:
            self.switch_role_history[key] = toggle_rbac_role

    def _get_roles(self):
        """Backward-compatible alias of ``_get_roles_by_name``.

        This used to be a separate copy of the same lookup whose error
        message hard-coded the role name 'admin' instead of referring to
        ``[identity] admin_role``. It now delegates so there is a single
        implementation.

        :raises rbac_exceptions.RbacResourceSetupFailed: If either role is
            not found.
        """
        self._get_roles_by_name()
218
Felipe Monteiro17e9b492017-05-27 05:45:20 +0100219
def is_admin():
    """Report whether the configured RBAC test role is the admin role.

    :returns: True if ``CONF.patrole.rbac_test_role`` equals
        ``CONF.identity.admin_role``, False otherwise.
    """
    test_role = CONF.patrole.rbac_test_role
    admin_role = CONF.identity.admin_role
    return test_role == admin_role
Rick Bartraed950052017-06-29 17:20:33 -0400226
227
@six.add_metaclass(abc.ABCMeta)
class RbacAuthority(object):
    """Abstract interface for deciding whether a role may perform an API.

    Concrete subclasses must implement ``allowed``.
    """
    # TODO(rb560u): Add documentation explaining what this class is for

    @abc.abstractmethod
    def allowed(self, rule_name, role):
        """Determine whether the role should be able to perform the API"""
235 return