# Copyright 2017 AT&T Corporation.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import contextlib
import sys
import time

from oslo_log import log as logging
from oslo_utils import excutils

from tempest import clients
from tempest.common import credentials_factory as credentials
from tempest import config
from tempest.lib import exceptions as lib_exc

from patrole_tempest_plugin import rbac_exceptions

# Shared Tempest configuration object. This module reads the ``[patrole]``,
# ``[identity]`` and ``[identity-feature-enabled]`` option groups from it.
CONF = config.CONF
# Module-level logger named after this module.
LOG = logging.getLogger(__name__)


class _ValidateListContext(object):
    """Context class responsible for validation of the list functions.

    This class is used in ``override_role_and_validate_list`` function and
    the result of a list function must be assigned to the ``ctx.resources``
    variable.

    Example::

        with self.override_role_and_validate_list(...) as ctx:
            ctx.resources = list_function()

    """

    def __init__(self, admin_resources=None, admin_resource_id=None):
        """Constructor for ``_ValidateListContext``.

        Exactly one of ``admin_resources`` or ``admin_resource_id`` must be
        used.

        :param list admin_resources: The list of resources received before
            calling the ``override_role_and_validate_list`` function. The
            ``_validate_len`` function is used for validation.
        :param UUID admin_resource_id: An ID of a resource created before
            calling the ``override_role_and_validate_list`` function. The
            ``_validate_resource`` function is used for validation.
        :raises RbacValidateListException: if both ``admin_resources`` and
            ``admin_resource_id`` are set, if neither is set, or if
            ``admin_resources`` is an empty list.
        """
        # Filled in by the caller inside the ``with`` block.
        self.resources = None

        # ``admin_resources`` is tested against None (an empty list is a
        # distinct, explicitly rejected case) while ``admin_resource_id`` is
        # tested for truthiness.
        has_admin_resources = admin_resources is not None
        has_admin_resource_id = bool(admin_resource_id)

        if has_admin_resources and not has_admin_resource_id:
            self._admin_len = len(admin_resources)
            if not self._admin_len:
                raise rbac_exceptions.RbacValidateListException(
                    reason="the list of admin resources cannot be empty")
            self._validate_func = self._validate_len
        elif has_admin_resource_id and not has_admin_resources:
            self._admin_resource_id = admin_resource_id
            self._validate_func = self._validate_resource
        elif has_admin_resources:
            # Both arguments were supplied.
            raise rbac_exceptions.RbacValidateListException(
                reason="admin_resources and admin_resource_id are mutually "
                       "exclusive")
        else:
            # Neither argument was supplied; report that instead of the
            # misleading "mutually exclusive" reason.
            raise rbac_exceptions.RbacValidateListException(
                reason="either admin_resources or admin_resource_id must be "
                       "provided")

    def _validate_len(self):
        """Validates that no fewer resources than the admin saw are returned.

        :raises RbacEmptyResponseBody: if ``self.resources`` is empty.
        :raises RbacPartialResponseBody: if ``self.resources`` contains fewer
            items than the admin list had.
        """
        if not self.resources:
            raise rbac_exceptions.RbacEmptyResponseBody()
        if self._admin_len > len(self.resources):
            raise rbac_exceptions.RbacPartialResponseBody(body=self.resources)

    def _validate_resource(self):
        """Validates that the admin resource is present in the resources.

        :raises RbacPartialResponseBody: if no item of ``self.resources`` has
            an ``'id'`` equal to the admin resource ID.
        """
        found = any(resource['id'] == self._admin_resource_id
                    for resource in self.resources)
        if not found:
            raise rbac_exceptions.RbacPartialResponseBody(body=self.resources)

    def _validate(self):
        """Calls the proper validation function.

        :raises RbacValidateListException: if the ``ctx.resources`` variable is
            not assigned.
        """
        if self.resources is None:
            raise rbac_exceptions.RbacValidateListException(
                reason="ctx.resources is not assigned")
        self._validate_func()


class RbacUtilsMixin(object):
    """Utility mixin responsible for switching ``os_primary`` role.

    Should be used as a mixin class alongside an instance of
    :py:class:`tempest.test.BaseTestCase` to perform Patrole class setup for a
    base RBAC class. Child classes should not use this mixin.

    Example::

        class BaseRbacTest(rbac_utils.RbacUtilsMixin, base.BaseV2ComputeTest):

            @classmethod
            def setup_clients(cls):
                super(BaseRbacTest, cls).setup_clients()

                cls.hosts_client = cls.os_primary.hosts_client
                ...

    This class is responsible for overriding the value of the primary Tempest
    credential's role (i.e. ``os_primary`` role). By doing so, it is possible
    to seamlessly swap between admin credentials, needed for setup and clean
    up, and primary credentials, needed to perform the API call which does
    policy enforcement. The primary credentials always cycle between roles
    defined by ``CONF.identity.admin_role`` and
    ``CONF.patrole.rbac_test_roles``.
    """

    def __init__(self, *args, **kwargs):
        super(RbacUtilsMixin, self).__init__(*args, **kwargs)

        # Shows if override_role was called.
        self.__override_role_called = False
        # Shows if exception raised during override_role.
        self.__override_role_caught_exc = False

    # Class-level state populated by ``setup_clients`` / ``_init_roles``.
    _admin_role_id = None  # ID of ``[identity] admin_role``
    _rbac_role_ids = None  # IDs of the roles under test
    _project_id = None  # project of the ``os_primary`` credentials
    _user_id = None  # user of the ``os_primary`` credentials
    _role_map = None  # role name -> ID and, after init, ID -> name
    _role_inferences_mapping = None  # role ID -> set of implied role IDs

    admin_roles_client = None

    @classmethod
    def setup_clients(cls):
        """Prepare the admin roles client and role data, then become admin.

        :raises tempest.lib.exceptions.InvalidConfiguration: if the identity
            v3 API is not enabled.
        """
        # Initialize the admin roles_client to perform role switching.
        admin_mgr = clients.Manager(
            credentials.get_configured_admin_credentials())
        if not CONF.identity_feature_enabled.api_v3:
            raise lib_exc.InvalidConfiguration(
                "Patrole role overriding only supports v3 identity API.")

        cls.admin_roles_client = admin_mgr.roles_v3_client

        cls._project_id = cls.os_primary.credentials.tenant_id
        cls._user_id = cls.os_primary.credentials.user_id
        cls._role_inferences_mapping = cls._prepare_role_inferences_mapping()

        cls._init_roles()

        # Change default role to admin
        cls._override_role(False)
        super(RbacUtilsMixin, cls).setup_clients()

    @classmethod
    def _prepare_role_inferences_mapping(cls):
        """Preparing roles mapping to support role inferences

        Making query to `list-all-role-inference-rules`_ keystone API
        returns all inference rules, which makes it possible to prepare
        roles mapping.

        It walks through the raw data::

            {"role_inferences": [
                {
                  "implies": [{"id": "3", "name": "reader"}],
                  "prior_role": {"id": "2", "name": "member"}
                },
                {
                  "implies": [{"id": "2", "name": "member"}],
                  "prior_role": {"id": "1", "name": "admin"}
                }
              ]
            }

        and converts it to the mapping::

            {
                "2": ["3"],      # "member": ["reader"],
                "1": ["2", "3"]  # "admin": ["member", "reader"]
            }

        Circular inference rules are tolerated: every role is visited at
        most once per traversal and a role never lists itself.

        :returns: dict mapping each prior role ID to the set of role IDs it
            implies, directly or transitively.

        .. _list-all-role-inference-rules: https://developer.openstack.org/api-ref/identity/v3/#list-all-role-inference-rules
        """  # noqa: E501
        raw_data = cls.admin_roles_client.list_all_role_inference_rules()

        # Direct implications only: prior role ID -> set of implied role IDs.
        # Entries sharing a prior role are merged rather than overwritten.
        direct = {}
        for rule in raw_data['role_inferences']:
            prior_role = rule['prior_role']['id']
            direct.setdefault(prior_role, set()).update(
                implied['id'] for implied in rule['implies'])

        def collect_implied(role_id):
            # Iterative traversal with a visited set so that a cycle in the
            # inference rules cannot cause unbounded recursion.
            reachable = set()
            pending = list(direct.get(role_id, ()))
            while pending:
                current = pending.pop()
                if current in reachable:
                    continue
                reachable.add(current)
                pending.extend(direct.get(current, ()))
            # Only reachable through a cycle; a role implying itself carries
            # no information.
            reachable.discard(role_id)
            return reachable

        return {role_id: collect_implied(role_id) for role_id in direct}

    def get_all_needed_roles(self, roles):
        """Extending given roles with roles from mapping

        Examples::
            ["admin"] >> ["admin", "member", "reader"]
            ["member"] >> ["member", "reader"]
            ["reader"] >> ["reader"]
            ["custom_role"] >> ["custom_role"]

        :param roles: list of roles
        :return: extended list of roles
        """
        role_map = self.__class__._role_map
        inferences = self.__class__._role_inferences_mapping

        res = set(roles)
        # Iterate over a snapshot because ``res`` grows inside the loop.
        for role in res.copy():
            role_id = role_map.get(role)
            implied_roles = inferences.get(role_id, set())
            # ``_role_map`` also holds the ID -> name direction.
            res.update(role_map[rid] for rid in implied_roles)
        LOG.debug('All needed roles: %s; Base roles: %s', res, roles)
        return list(res)

    @contextlib.contextmanager
    def override_role(self):
        """Override the role used by ``os_primary`` Tempest credentials.

        Temporarily change the role used by ``os_primary`` credentials to:

        * ``[patrole] rbac_test_roles`` before test execution
        * ``[identity] admin_role`` after test execution

        Automatically switches to admin role after test execution.

        :returns: None

        .. warning::

            This function can alter user roles for pre-provisioned credentials.
            Work is underway to safely clean up after this function.

        Example::

            @rbac_rule_validation.action(service='test',
                                         rules=['a:test:rule'])
            def test_foo(self):
                # Allocate test-level resources here.
                with self.override_role():
                    # The role for `os_primary` has now been overridden. Within
                    # this block, call the API endpoint that enforces the
                    # expected policy specified by "rule" in the decorator.
                    self.foo_service.bar_api_call()
                # The role is switched back to admin automatically. Note that
                # if the API call above threw an exception, any code below this
                # point in the test is not executed.
        """
        self._set_override_role_called()
        self._override_role(True)
        try:
            # Execute the test.
            yield
        finally:
            # Check whether an exception was raised. If so, remember that
            # for future validation.
            exc = sys.exc_info()[0]
            if exc is not None:
                self._set_override_role_caught_exc()
            # This code block is always executed, no matter the result of the
            # test. Automatically switch back to the admin role for test clean
            # up.
            self._override_role(False)

    @classmethod
    def _override_role(cls, toggle_rbac_role=False):
        """Private helper for overriding ``os_primary`` Tempest credentials.

        :param toggle_rbac_role: Boolean value that controls the role that
            overrides default role of ``os_primary`` credentials.
            * If True: roles are set to ``[patrole] rbac_test_roles``
            * If False: role is set to ``[identity] admin_role``
        """
        LOG.debug('Overriding role to: %s.', toggle_rbac_role)
        roles_already_present = False

        try:
            target_roles = (cls._rbac_role_ids
                            if toggle_rbac_role else [cls._admin_role_id])
            roles_already_present = cls._list_and_clear_user_roles_on_project(
                target_roles)

            # Do not override roles if `target_role` already exists.
            if not roles_already_present:
                cls._create_user_role_on_project(target_roles)
        except Exception as exp:
            with excutils.save_and_reraise_exception():
                LOG.exception(exp)
        finally:
            auth_providers = cls.get_auth_providers()
            for provider in auth_providers:
                provider.clear_auth()
            # Fernet tokens are not subsecond aware so sleep to ensure we are
            # passing the second boundary before attempting to authenticate.
            # Only sleep if a token revocation occurred as a result of role
            # overriding. This will optimize test runtime in the case where
            # ``[identity] admin_role`` == ``[patrole] rbac_test_roles``.
            if not roles_already_present:
                time.sleep(1)

            for provider in auth_providers:
                provider.set_auth()

    @classmethod
    def _init_roles(cls):
        """Resolve the configured role names to role IDs.

        Populates ``_role_map``, ``_admin_role_id`` and ``_rbac_role_ids``.

        :raises RbacResourceSetupFailed: if ``[identity] admin_role`` or any
            of the roles under test is not among the available roles.
        """
        available_roles = cls.admin_roles_client.list_roles()['roles']
        cls._role_map = {r['name']: r['id'] for r in available_roles}
        LOG.debug('Available roles: %s', cls._role_map.keys())

        # Work on a copy: appending to the list returned by CONF would
        # modify the shared configuration value seen by every other reader.
        roles = list(CONF.patrole.rbac_test_roles or [])
        # TODO(vegasq) drop once CONF.patrole.rbac_test_role is removed
        if CONF.patrole.rbac_test_role and not roles:
            roles.append(CONF.patrole.rbac_test_role)

        rbac_role_ids = [cls._role_map.get(role_name) for role_name in roles]
        admin_role_id = cls._role_map.get(CONF.identity.admin_role)

        if not all([admin_role_id, all(rbac_role_ids)]):
            missing_roles = []
            msg = ("Could not find `[patrole] rbac_test_roles` or "
                   "`[identity] admin_role`, both of which are required for "
                   "RBAC testing.")
            if not admin_role_id:
                missing_roles.append(CONF.identity.admin_role)
            if not all(rbac_role_ids):
                missing_roles += [role_name for role_name in roles
                                  if role_name not in cls._role_map]

            msg += " Following roles were not found: %s." % (
                ", ".join(missing_roles))
            msg += " Available roles: %s." % ", ".join(cls._role_map)
            raise rbac_exceptions.RbacResourceSetupFailed(msg)

        cls._admin_role_id = admin_role_id
        cls._rbac_role_ids = rbac_role_ids
        # Adding backward mapping
        cls._role_map.update({v: k for k, v in cls._role_map.items()})

    @classmethod
    def _create_user_role_on_project(cls, role_ids):
        """Assign every role in ``role_ids`` to the user on the project."""
        for role_id in role_ids:
            cls.admin_roles_client.create_user_role_on_project(
                cls._project_id, cls._user_id, role_id)

    @classmethod
    def _list_and_clear_user_roles_on_project(cls, role_ids):
        """Remove the user's project roles unless they already match.

        :param role_ids: IDs of the roles the user is expected to end up
            with.
        :returns: True if the user's current roles are exactly ``role_ids``
            (nothing is deleted), False after all current roles were deleted.
        """
        roles = cls.admin_roles_client.list_user_roles_on_project(
            cls._project_id, cls._user_id)['roles']
        all_role_ids = [role['id'] for role in roles]

        # NOTE(felipemonteiro): We do not use ``role_id in all_role_ids`` here
        # to avoid over-permission errors: if the current list of roles on the
        # project includes "admin" and "Member", and we are switching to the
        # "Member" role, then we must delete the "admin" role. Thus, we only
        # return early if the user's roles on the project are an exact match.
        if set(role_ids) == set(all_role_ids):
            return True

        for role in roles:
            cls.admin_roles_client.delete_role_from_user_on_project(
                cls._project_id, cls._user_id, role['id'])

        return False

    @contextlib.contextmanager
    def override_role_and_validate_list(self,
                                        admin_resources=None,
                                        admin_resource_id=None):
        """Call ``override_role`` and validate RBAC for a list API action.

        List actions usually do soft authorization: partial or empty response
        bodies are returned instead of exceptions. This helper validates
        that unauthorized roles only return a subset of the available
        resources.
        Should only be used for validating list API actions.

        :param list admin_resources: The list of resources received before
            calling the ``override_role_and_validate_list`` function.
        :param UUID admin_resource_id: An ID of a resource created before
            calling the ``override_role_and_validate_list`` function.
        :return: py:class:`_ValidateListContext` object.

        Example::

            # the resource created by admin
            admin_resource_id = (
                self.ntp_client.create_dscp_marking_rule()
                ["dscp_marking_rule"]["id"])
            with self.override_role_and_validate_list(
                    admin_resource_id=admin_resource_id) as ctx:
                # the list of resources available for member role
                ctx.resources = self.ntp_client.list_dscp_marking_rules(
                    policy_id=self.policy_id)["dscp_marking_rules"]
        """
        ctx = _ValidateListContext(admin_resources, admin_resource_id)
        with self.override_role():
            yield ctx
        # Only reached when the ``with`` body finished without raising.
        ctx._validate()

    @classmethod
    def get_auth_providers(cls):
        """Returns list of auth_providers used within test.

        Tests may redefine this method to include their own or third party
        client auth_providers.
        """
        return [cls.os_primary.auth_provider]

    def _set_override_role_called(self):
        """Helper for tracking whether ``override_role`` was called."""
        self.__override_role_called = True

    def _set_override_role_caught_exc(self):
        """Helper for tracking whether exception was thrown inside
        ``override_role``.
        """
        self.__override_role_caught_exc = True

    def _validate_override_role_called(self):
        """Report whether ``override_role`` was called and reset the flag to
        False for sequential tests.
        """
        was_called = self.__override_role_called
        self.__override_role_called = False
        return was_called

    def _validate_override_role_caught_exc(self):
        """Report whether an exception was caught inside ``override_role``
        and reset the flag, so that, by process of elimination, it can be
        determined whether one was thrown outside (which is invalid).
        """
        caught_exception = self.__override_role_caught_exc
        self.__override_role_caught_exc = False
        return caught_exception


def is_admin():
    """Verifies whether the current test role equals the admin role.

    Both ``[patrole] rbac_test_roles`` and, when set, the deprecated
    ``[patrole] rbac_test_role`` are taken into account.

    :returns: True if the roles under test contain ``[identity] admin_role``.
    """
    # Copy the configured list before extending it: appending to the object
    # returned by CONF would grow the shared configuration value on every
    # call and leak the deprecated role into every other reader.
    roles = list(CONF.patrole.rbac_test_roles or [])
    # TODO(vegasq) drop once CONF.patrole.rbac_test_role is removed
    if CONF.patrole.rbac_test_role:
        roles.append(CONF.patrole.rbac_test_role)

    # TODO(felipemonteiro): Make this more robust via a context is admin
    # lookup.
    return CONF.identity.admin_role in roles
Mykola Yakovlieve0f35502018-09-26 18:26:57 -0500487 return CONF.identity.admin_role in roles