blob: b24e5c43bf89dc34b773a21c131dd12ba1935430 [file] [log] [blame]
Michael Johnson6006de72021-02-21 01:42:39 +00001# Copyright 2021 Red Hat, Inc. All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain
5# a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations
13# under the License.
14
15import copy
16
17from oslo_log import log as logging
18from tempest import config
19from tempest.lib import exceptions
20from tempest import test
21
22from octavia_tempest_plugin.common import constants
23from octavia_tempest_plugin.tests import waiters
24
25CONF = config.CONF
26LOG = logging.getLogger(__name__)
27
28
29class RBACTestsMixin(test.BaseTestCase):
30
31 def _check_allowed(self, client_str, method_str, allowed_list,
32 *args, **kwargs):
33 """Test an API call allowed RBAC enforcement.
34
35 :param client_str: The service client to use for the test, without the
36 credential. Example: 'amphora_client'
37 :param method_str: The method on the client to call for the test.
38 Example: 'list_amphorae'
39 :param allowed_list: The list of credentials expected to be
40 allowed. Example: ['os_roles_lb_member'].
41 :param args: Any positional parameters needed by the method.
42 :param kwargs: Any named parameters needed by the method.
43 :raises AssertionError: Raised if the RBAC tests fail.
44 :raises Forbidden: Raised if a credential that should have access does
45 not and is denied.
46 :raises InvalidScope: Raised if a credential that should have the
47 correct scope for access is denied.
48 :returns: None on success
49 """
50 for cred in allowed_list:
51 try:
52 cred_obj = getattr(self, cred)
53 except AttributeError:
54 # TODO(johnsom) Remove once scoped tokens is the default.
55 if ((cred == 'os_system_admin' or cred == 'os_system_reader')
56 and not CONF.enforce_scope.octavia):
57 LOG.info('Skipping %s allowed RBAC test because '
58 'enforce_scope.octavia is not True', cred)
59 continue
60 else:
61 self.fail('Credential {} "expected_allowed" for RBAC '
62 'testing was not created by tempest '
63 'credentials setup. This is likely a bug in the '
64 'test.'.format(cred))
65 client = getattr(cred_obj, client_str)
66 method = getattr(client, method_str)
67 try:
68 method(*args, **kwargs)
69 except exceptions.Forbidden as e:
70 self.fail('Method {}.{} failed to allow access via RBAC using '
71 'credential {}. Error: {}'.format(
72 client_str, method_str, cred, str(e)))
73
74 def _check_disallowed(self, client_str, method_str, allowed_list,
75 status_method=None, obj_id=None, *args, **kwargs):
76 """Test an API call disallowed RBAC enforcement.
77
78 :param client_str: The service client to use for the test, without the
79 credential. Example: 'amphora_client'
80 :param method_str: The method on the client to call for the test.
81 Example: 'list_amphorae'
82 :param allowed_list: The list of credentials expected to be
83 allowed. Example: ['os_roles_lb_member'].
84 :param status_method: The service client method that will provide
85 the object status for a status change waiter.
86 :param obj_id: The ID of the object to check for the expected status
87 update.
88 :param args: Any positional parameters needed by the method.
89 :param kwargs: Any named parameters needed by the method.
90 :raises AssertionError: Raised if the RBAC tests fail.
91 :raises Forbidden: Raised if a credential that should have access does
92 not and is denied.
93 :raises InvalidScope: Raised if a credential that should have the
94 correct scope for access is denied.
95 :returns: None on success
96 """
97 expected_disallowed = (set(self.allocated_credentials) -
98 set(allowed_list))
99 for cred in expected_disallowed:
100 cred_obj = getattr(self, cred)
101 client = getattr(cred_obj, client_str)
102 method = getattr(client, method_str)
103
104 # Unfortunately tempest uses testtools assertRaises[1] which means
105 # we cannot use the unittest assertRaises context[2] with msg= to
106 # give a useful error.
107 # Also, testtools doesn't work with subTest[3], so we can't use
108 # that to expose the failing credential.
109 # This all means the exception raised testtools assertRaises
110 # is less than useful.
111 # TODO(johnsom) Remove this try block once testtools is useful.
112 # [1] https://testtools.readthedocs.io/en/latest/
113 # api.html#testtools.TestCase.assertRaises
114 # [2] https://docs.python.org/3/library/
115 # unittest.html#unittest.TestCase.assertRaises
116 # [3] https://github.com/testing-cabal/testtools/issues/235
117 try:
118 method(*args, **kwargs)
119 except exceptions.Forbidden:
120 if status_method:
121 waiters.wait_for_status(
122 status_method, obj_id,
123 constants.PROVISIONING_STATUS, constants.ACTIVE,
124 CONF.load_balancer.check_interval,
125 CONF.load_balancer.check_timeout)
126
127 continue
128 self.fail('Method {}.{} failed to deny access via RBAC using '
129 'credential {}.'.format(client_str, method_str, cred))
130
131 def _list_get_RBAC_enforcement(self, client_str, method_str,
132 expected_allowed, *args, **kwargs):
133 """Test an API call RBAC enforcement.
134
135 :param client_str: The service client to use for the test, without the
136 credential. Example: 'amphora_client'
137 :param method_str: The method on the client to call for the test.
138 Example: 'list_amphorae'
139 :param expected_allowed: The list of credentials expected to be
140 allowed. Example: ['os_roles_lb_member'].
141 :param args: Any positional parameters needed by the method.
142 :param kwargs: Any named parameters needed by the method.
143 :raises AssertionError: Raised if the RBAC tests fail.
144 :raises Forbidden: Raised if a credential that should have access does
145 not and is denied.
146 :raises InvalidScope: Raised if a credential that should have the
147 correct scope for access is denied.
148 :returns: None on success
149 """
150
151 allowed_list = copy.deepcopy(expected_allowed)
152 # os_admin is a special case as it is valid with the old defaults,
153 # but will not be with the new defaults and/or token scoping.
154 # The old keystone role "admin" becomes project scoped "admin"
155 # instead of being a global admin.
156 # To keep the tests simple, handle that edge case here.
157 # TODO(johnsom) Once token scope is default, remove this.
158 if ('os_system_admin' in expected_allowed and
159 not CONF.load_balancer.enforce_new_defaults and
160 not CONF.enforce_scope.octavia):
161 allowed_list.append('os_admin')
162
163 # #### Test that disallowed credentials cannot access the API.
164 self._check_disallowed(client_str, method_str, allowed_list,
165 None, None, *args, **kwargs)
166
167 # #### Test that allowed credentials can access the API.
168 self._check_allowed(client_str, method_str, allowed_list,
169 *args, **kwargs)
170
171 def check_show_RBAC_enforcement(self, client_str, method_str,
172 expected_allowed, *args, **kwargs):
173 """Test an API show call RBAC enforcement.
174
175 :param client_str: The service client to use for the test, without the
176 credential. Example: 'amphora_client'
177 :param method_str: The method on the client to call for the test.
178 Example: 'list_amphorae'
179 :param expected_allowed: The list of credentials expected to be
180 allowed. Example: ['os_roles_lb_member'].
181 :param args: Any positional parameters needed by the method.
182 :param kwargs: Any named parameters needed by the method.
183 :raises AssertionError: Raised if the RBAC tests fail.
184 :raises Forbidden: Raised if a credential that should have access does
185 not and is denied.
186 :raises InvalidScope: Raised if a credential that should have the
187 correct scope for access is denied.
188 :returns: None on success
189 """
190 self._list_get_RBAC_enforcement(client_str, method_str,
191 expected_allowed, *args, **kwargs)
192
193 def check_list_RBAC_enforcement(self, client_str, method_str,
194 expected_allowed, *args, **kwargs):
195 """Test an API list call RBAC enforcement.
196
197 :param client_str: The service client to use for the test, without the
198 credential. Example: 'amphora_client'
199 :param method_str: The method on the client to call for the test.
200 Example: 'list_amphorae'
201 :param expected_allowed: The list of credentials expected to be
202 allowed. Example: ['os_roles_lb_member'].
203 :param args: Any positional parameters needed by the method.
204 :param kwargs: Any named parameters needed by the method.
205 :raises AssertionError: Raised if the RBAC tests fail.
206 :raises Forbidden: Raised if a credential that should have access does
207 not and is denied.
208 :raises InvalidScope: Raised if a credential that should have the
209 correct scope for access is denied.
210 :returns: None on success
211 """
212 self._list_get_RBAC_enforcement(client_str, method_str,
213 expected_allowed, *args, **kwargs)
214
215 def _CUD_RBAC_enforcement(self, client_str, method_str, expected_allowed,
216 status_method=None, obj_id=None,
217 *args, **kwargs):
218 """Test an API create/update/delete call RBAC enforcement.
219
220 :param client_str: The service client to use for the test, without the
221 credential. Example: 'amphora_client'
222 :param method_str: The method on the client to call for the test.
223 Example: 'list_amphorae'
224 :param expected_allowed: The list of credentials expected to be
225 allowed. Example: ['os_roles_lb_member'].
226 :param status_method: The service client method that will provide
227 the object status for a status change waiter.
228 :param obj_id: The ID of the object to check for the expected status
229 update.
230 :param args: Any positional parameters needed by the method.
231 :param kwargs: Any named parameters needed by the method.
232 :raises AssertionError: Raised if the RBAC tests fail.
233 :raises Forbidden: Raised if a credential that should have access does
234 not and is denied.
235 :raises InvalidScope: Raised if a credential that should have the
236 correct scope for access is denied.
237 :returns: None on success
238 """
239
240 allowed_list = copy.deepcopy(expected_allowed)
241 # os_admin is a special case as it is valid with the old defaults,
242 # but will not be with the new defaults and/or token scoping.
243 # The old keystone role "admin" becomes project scoped "admin"
244 # instead of being a global admin.
245 # To keep the tests simple, handle that edge case here.
246 # TODO(johnsom) Once token scope is default, remove this.
247 if ('os_system_admin' in expected_allowed and
248 not CONF.load_balancer.enforce_new_defaults and
249 not CONF.enforce_scope.octavia):
250 allowed_list.append('os_admin')
251
252 # #### Test that disallowed credentials cannot access the API.
253 self._check_disallowed(client_str, method_str, allowed_list,
254 status_method, obj_id, *args, **kwargs)
255
256 def check_create_RBAC_enforcement(
257 self, client_str, method_str, expected_allowed,
258 status_method=None, obj_id=None, *args, **kwargs):
259 """Test an API create call RBAC enforcement.
260
261 :param client_str: The service client to use for the test, without the
262 credential. Example: 'amphora_client'
263 :param method_str: The method on the client to call for the test.
264 Example: 'list_amphorae'
265 :param expected_allowed: The list of credentials expected to be
266 allowed. Example: ['os_roles_lb_member'].
267 :param status_method: The service client method that will provide
268 the object status for a status change waiter.
269 :param obj_id: The ID of the object to check for the expected status
270 update.
271 :param args: Any positional parameters needed by the method.
272 :param kwargs: Any named parameters needed by the method.
273 :raises AssertionError: Raised if the RBAC tests fail.
274 :raises Forbidden: Raised if a credential that should have access does
275 not and is denied.
276 :raises InvalidScope: Raised if a credential that should have the
277 correct scope for access is denied.
278 :returns: None on success
279 """
280 self._CUD_RBAC_enforcement(client_str, method_str, expected_allowed,
281 status_method, obj_id, *args, **kwargs)
282
283 def check_delete_RBAC_enforcement(
284 self, client_str, method_str, expected_allowed,
285 status_method=None, obj_id=None, *args, **kwargs):
286 """Test an API delete call RBAC enforcement.
287
288 :param client_str: The service client to use for the test, without the
289 credential. Example: 'amphora_client'
290 :param method_str: The method on the client to call for the test.
291 Example: 'list_amphorae'
292 :param expected_allowed: The list of credentials expected to be
293 allowed. Example: ['os_roles_lb_member'].
294 :param status_method: The service client method that will provide
295 the object status for a status change waiter.
296 :param obj_id: The ID of the object to check for the expected status
297 update.
298 :param args: Any positional parameters needed by the method.
299 :param kwargs: Any named parameters needed by the method.
300 :raises AssertionError: Raised if the RBAC tests fail.
301 :raises Forbidden: Raised if a credential that should have access does
302 not and is denied.
303 :raises InvalidScope: Raised if a credential that should have the
304 correct scope for access is denied.
305 :returns: None on success
306 """
307 self._CUD_RBAC_enforcement(client_str, method_str, expected_allowed,
308 status_method, obj_id, *args, **kwargs)
309
310 def check_update_RBAC_enforcement(
311 self, client_str, method_str, expected_allowed,
312 status_method=None, obj_id=None, *args, **kwargs):
313 """Test an API update call RBAC enforcement.
314
315 :param client_str: The service client to use for the test, without the
316 credential. Example: 'amphora_client'
317 :param method_str: The method on the client to call for the test.
318 Example: 'list_amphorae'
319 :param expected_allowed: The list of credentials expected to be
320 allowed. Example: ['os_roles_lb_member'].
321 :param status_method: The service client method that will provide
322 the object status for a status change waiter.
323 :param obj_id: The ID of the object to check for the expected status
324 update.
325 :param args: Any positional parameters needed by the method.
326 :param kwargs: Any named parameters needed by the method.
327 :raises AssertionError: Raised if the RBAC tests fail.
328 :raises Forbidden: Raised if a credential that should have access does
329 not and is denied.
330 :raises InvalidScope: Raised if a credential that should have the
331 correct scope for access is denied.
332 :returns: None on success
333 """
334 self._CUD_RBAC_enforcement(client_str, method_str, expected_allowed,
335 status_method, obj_id, *args, **kwargs)
336
337 def check_list_RBAC_enforcement_count(
338 self, client_str, method_str, expected_allowed, expected_count,
339 *args, **kwargs):
340 """Test an API list call RBAC enforcement result count.
341
342 List APIs will return the object list for the project associated
343 with the token used to access the API. This means most credentials
344 will have access, but will get differing results.
345
346 This test will query the list API using a list of credentials and
347 will validate that only the expected count of results are returned.
348
349 :param client_str: The service client to use for the test, without the
350 credential. Example: 'amphora_client'
351 :param method_str: The method on the client to call for the test.
352 Example: 'list_amphorae'
353 :param expected_allowed: The list of credentials expected to be
354 allowed. Example: ['os_roles_lb_member'].
355 :param expected_count: The number of results expected in the list
356 returned from the API.
357 :param args: Any positional parameters needed by the method.
358 :param kwargs: Any named parameters needed by the method.
359 :raises AssertionError: Raised if the RBAC tests fail.
360 :raises Forbidden: Raised if a credential that should have access does
361 not and is denied.
362 :raises InvalidScope: Raised if a credential that should have the
363 correct scope for access is denied.
364 :returns: None on success
365 """
366
367 allowed_list = copy.deepcopy(expected_allowed)
368 # os_admin is a special case as it is valid with the old defaults,
369 # but will not be with the new defaults and/or token scoping.
370 # The old keystone role "admin" becomes project scoped "admin"
371 # instead of being a global admin.
372 # To keep the tests simple, handle that edge case here.
373 # TODO(johnsom) Once token scope is default, remove this.
374 if ('os_system_admin' in expected_allowed and
375 not CONF.load_balancer.enforce_new_defaults and
376 not CONF.enforce_scope.octavia):
377 allowed_list.append('os_admin')
378
379 for cred in allowed_list:
380 try:
381 cred_obj = getattr(self, cred)
382 except AttributeError:
383 # TODO(johnsom) Remove once scoped tokens is the default.
384 if ((cred == 'os_system_admin' or cred == 'os_system_reader')
385 and not CONF.enforce_scope.octavia):
386 LOG.info('Skipping %s allowed RBAC test because '
387 'enforce_scope.octavia is not True', cred)
388 continue
389 else:
390 self.fail('Credential {} "expected_allowed" for RBAC '
391 'testing was not created by tempest '
392 'credentials setup. This is likely a bug in the '
393 'test.'.format(cred))
394 client = getattr(cred_obj, client_str)
395 method = getattr(client, method_str)
396 try:
397 result = method(*args, **kwargs)
398 except exceptions.Forbidden as e:
399 self.fail('Method {}.{} failed to allow access via RBAC using '
400 'credential {}. Error: {}'.format(
401 client_str, method_str, cred, str(e)))
402 self.assertEqual(expected_count, len(result), message='Credential '
403 '{} saw {} objects when {} was expected.'.format(
404 cred, len(result), expected_count))
405
406 def check_list_IDs_RBAC_enforcement(
407 self, client_str, method_str, expected_allowed, expected_ids,
408 *args, **kwargs):
409 """Test an API list call RBAC enforcement result contains IDs.
410
411 List APIs will return the object list for the project associated
412 with the token used to access the API. This means most credentials
413 will have access, but will get differing results.
414
415 This test will query the list API using a list of credentials and
416 will validate that the expected object Ids in included in the results.
417
418 :param client_str: The service client to use for the test, without the
419 credential. Example: 'amphora_client'
420 :param method_str: The method on the client to call for the test.
421 Example: 'list_amphorae'
422 :param expected_allowed: The list of credentials expected to be
423 allowed. Example: ['os_roles_lb_member'].
424 :param expected_ids: The list of object IDs to validate are included
425 in the returned list from the API.
426 :param args: Any positional parameters needed by the method.
427 :param kwargs: Any named parameters needed by the method.
428 :raises AssertionError: Raised if the RBAC tests fail.
429 :raises Forbidden: Raised if a credential that should have access does
430 not and is denied.
431 :raises InvalidScope: Raised if a credential that should have the
432 correct scope for access is denied.
433 :returns: None on success
434 """
435
436 allowed_list = copy.deepcopy(expected_allowed)
437 # os_admin is a special case as it is valid with the old defaults,
438 # but will not be with the new defaults and/or token scoping.
439 # The old keystone role "admin" becomes project scoped "admin"
440 # instead of being a global admin.
441 # To keep the tests simple, handle that edge case here.
442 # TODO(johnsom) Once token scope is default, remove this.
443 if ('os_system_admin' in expected_allowed and
444 not CONF.load_balancer.enforce_new_defaults and
445 not CONF.enforce_scope.octavia):
446 allowed_list.append('os_admin')
447
448 for cred in allowed_list:
449 try:
450 cred_obj = getattr(self, cred)
451 except AttributeError:
452 # TODO(johnsom) Remove once scoped tokens is the default.
453 if ((cred == 'os_system_admin' or cred == 'os_system_reader')
454 and not CONF.enforce_scope.octavia):
455 LOG.info('Skipping %s allowed RBAC test because '
456 'enforce_scope.octavia is not True', cred)
457 continue
458 else:
459 self.fail('Credential {} "expected_allowed" for RBAC '
460 'testing was not created by tempest '
461 'credentials setup. This is likely a bug in the '
462 'test.'.format(cred))
463 client = getattr(cred_obj, client_str)
464 method = getattr(client, method_str)
465 try:
466 result = method(*args, **kwargs)
467 except exceptions.Forbidden as e:
468 self.fail('Method {}.{} failed to allow access via RBAC using '
469 'credential {}. Error: {}'.format(
470 client_str, method_str, cred, str(e)))
471 result_ids = [lb[constants.ID] for lb in result]
472 self.assertTrue(set(expected_ids).issubset(set(result_ids)))