blob: d31d506086b1265e93e633816997a29a4d3c7086 [file] [log] [blame]
Michael Johnson6006de72021-02-21 01:42:39 +00001# Copyright 2021 Red Hat, Inc. All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain
5# a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations
13# under the License.
14
15import copy
16
17from oslo_log import log as logging
18from tempest import config
19from tempest.lib import exceptions
20from tempest import test
21
22from octavia_tempest_plugin.common import constants
23from octavia_tempest_plugin.tests import waiters
24
25CONF = config.CONF
26LOG = logging.getLogger(__name__)
27
28
29class RBACTestsMixin(test.BaseTestCase):
30
Michael Johnson29d8e612021-06-23 16:16:12 +000031 def _get_client_method(self, cred_obj, client_str, method_str):
32 """Get requested method from registered clients in Tempest."""
33 lb_clients = getattr(cred_obj, 'load_balancer_v2')
34 client = getattr(lb_clients, client_str)
35 client_obj = client()
36 method = getattr(client_obj, method_str)
37 return method
38
Michael Johnson6006de72021-02-21 01:42:39 +000039 def _check_allowed(self, client_str, method_str, allowed_list,
40 *args, **kwargs):
41 """Test an API call allowed RBAC enforcement.
42
43 :param client_str: The service client to use for the test, without the
Michael Johnson29d8e612021-06-23 16:16:12 +000044 credential. Example: 'AmphoraClient'
Michael Johnson6006de72021-02-21 01:42:39 +000045 :param method_str: The method on the client to call for the test.
46 Example: 'list_amphorae'
47 :param allowed_list: The list of credentials expected to be
48 allowed. Example: ['os_roles_lb_member'].
49 :param args: Any positional parameters needed by the method.
50 :param kwargs: Any named parameters needed by the method.
51 :raises AssertionError: Raised if the RBAC tests fail.
52 :raises Forbidden: Raised if a credential that should have access does
53 not and is denied.
54 :raises InvalidScope: Raised if a credential that should have the
55 correct scope for access is denied.
56 :returns: None on success
57 """
58 for cred in allowed_list:
59 try:
60 cred_obj = getattr(self, cred)
61 except AttributeError:
62 # TODO(johnsom) Remove once scoped tokens is the default.
63 if ((cred == 'os_system_admin' or cred == 'os_system_reader')
64 and not CONF.enforce_scope.octavia):
65 LOG.info('Skipping %s allowed RBAC test because '
66 'enforce_scope.octavia is not True', cred)
67 continue
68 else:
69 self.fail('Credential {} "expected_allowed" for RBAC '
70 'testing was not created by tempest '
71 'credentials setup. This is likely a bug in the '
72 'test.'.format(cred))
Michael Johnson29d8e612021-06-23 16:16:12 +000073 method = self._get_client_method(cred_obj, client_str, method_str)
Michael Johnson6006de72021-02-21 01:42:39 +000074 try:
75 method(*args, **kwargs)
76 except exceptions.Forbidden as e:
77 self.fail('Method {}.{} failed to allow access via RBAC using '
78 'credential {}. Error: {}'.format(
79 client_str, method_str, cred, str(e)))
80
81 def _check_disallowed(self, client_str, method_str, allowed_list,
82 status_method=None, obj_id=None, *args, **kwargs):
83 """Test an API call disallowed RBAC enforcement.
84
85 :param client_str: The service client to use for the test, without the
Michael Johnson29d8e612021-06-23 16:16:12 +000086 credential. Example: 'AmphoraClient'
Michael Johnson6006de72021-02-21 01:42:39 +000087 :param method_str: The method on the client to call for the test.
88 Example: 'list_amphorae'
89 :param allowed_list: The list of credentials expected to be
90 allowed. Example: ['os_roles_lb_member'].
91 :param status_method: The service client method that will provide
92 the object status for a status change waiter.
93 :param obj_id: The ID of the object to check for the expected status
94 update.
95 :param args: Any positional parameters needed by the method.
96 :param kwargs: Any named parameters needed by the method.
97 :raises AssertionError: Raised if the RBAC tests fail.
98 :raises Forbidden: Raised if a credential that should have access does
99 not and is denied.
100 :raises InvalidScope: Raised if a credential that should have the
101 correct scope for access is denied.
102 :returns: None on success
103 """
104 expected_disallowed = (set(self.allocated_credentials) -
105 set(allowed_list))
106 for cred in expected_disallowed:
107 cred_obj = getattr(self, cred)
Michael Johnson29d8e612021-06-23 16:16:12 +0000108 method = self._get_client_method(cred_obj, client_str, method_str)
Michael Johnson6006de72021-02-21 01:42:39 +0000109
110 # Unfortunately tempest uses testtools assertRaises[1] which means
111 # we cannot use the unittest assertRaises context[2] with msg= to
112 # give a useful error.
113 # Also, testtools doesn't work with subTest[3], so we can't use
114 # that to expose the failing credential.
115 # This all means the exception raised testtools assertRaises
116 # is less than useful.
117 # TODO(johnsom) Remove this try block once testtools is useful.
118 # [1] https://testtools.readthedocs.io/en/latest/
119 # api.html#testtools.TestCase.assertRaises
120 # [2] https://docs.python.org/3/library/
121 # unittest.html#unittest.TestCase.assertRaises
122 # [3] https://github.com/testing-cabal/testtools/issues/235
123 try:
124 method(*args, **kwargs)
125 except exceptions.Forbidden:
126 if status_method:
127 waiters.wait_for_status(
128 status_method, obj_id,
129 constants.PROVISIONING_STATUS, constants.ACTIVE,
130 CONF.load_balancer.check_interval,
131 CONF.load_balancer.check_timeout)
132
133 continue
134 self.fail('Method {}.{} failed to deny access via RBAC using '
135 'credential {}.'.format(client_str, method_str, cred))
136
137 def _list_get_RBAC_enforcement(self, client_str, method_str,
138 expected_allowed, *args, **kwargs):
139 """Test an API call RBAC enforcement.
140
141 :param client_str: The service client to use for the test, without the
Michael Johnson29d8e612021-06-23 16:16:12 +0000142 credential. Example: 'AmphoraClient'
Michael Johnson6006de72021-02-21 01:42:39 +0000143 :param method_str: The method on the client to call for the test.
144 Example: 'list_amphorae'
145 :param expected_allowed: The list of credentials expected to be
146 allowed. Example: ['os_roles_lb_member'].
147 :param args: Any positional parameters needed by the method.
148 :param kwargs: Any named parameters needed by the method.
149 :raises AssertionError: Raised if the RBAC tests fail.
150 :raises Forbidden: Raised if a credential that should have access does
151 not and is denied.
152 :raises InvalidScope: Raised if a credential that should have the
153 correct scope for access is denied.
154 :returns: None on success
155 """
156
157 allowed_list = copy.deepcopy(expected_allowed)
158 # os_admin is a special case as it is valid with the old defaults,
159 # but will not be with the new defaults and/or token scoping.
160 # The old keystone role "admin" becomes project scoped "admin"
161 # instead of being a global admin.
162 # To keep the tests simple, handle that edge case here.
163 # TODO(johnsom) Once token scope is default, remove this.
164 if ('os_system_admin' in expected_allowed and
165 not CONF.load_balancer.enforce_new_defaults and
166 not CONF.enforce_scope.octavia):
167 allowed_list.append('os_admin')
168
169 # #### Test that disallowed credentials cannot access the API.
170 self._check_disallowed(client_str, method_str, allowed_list,
171 None, None, *args, **kwargs)
172
173 # #### Test that allowed credentials can access the API.
174 self._check_allowed(client_str, method_str, allowed_list,
175 *args, **kwargs)
176
177 def check_show_RBAC_enforcement(self, client_str, method_str,
178 expected_allowed, *args, **kwargs):
179 """Test an API show call RBAC enforcement.
180
181 :param client_str: The service client to use for the test, without the
Michael Johnson29d8e612021-06-23 16:16:12 +0000182 credential. Example: 'AmphoraClient'
Michael Johnson6006de72021-02-21 01:42:39 +0000183 :param method_str: The method on the client to call for the test.
184 Example: 'list_amphorae'
185 :param expected_allowed: The list of credentials expected to be
186 allowed. Example: ['os_roles_lb_member'].
187 :param args: Any positional parameters needed by the method.
188 :param kwargs: Any named parameters needed by the method.
189 :raises AssertionError: Raised if the RBAC tests fail.
190 :raises Forbidden: Raised if a credential that should have access does
191 not and is denied.
192 :raises InvalidScope: Raised if a credential that should have the
193 correct scope for access is denied.
194 :returns: None on success
195 """
196 self._list_get_RBAC_enforcement(client_str, method_str,
197 expected_allowed, *args, **kwargs)
198
199 def check_list_RBAC_enforcement(self, client_str, method_str,
200 expected_allowed, *args, **kwargs):
201 """Test an API list call RBAC enforcement.
202
203 :param client_str: The service client to use for the test, without the
Michael Johnson29d8e612021-06-23 16:16:12 +0000204 credential. Example: 'AmphoraClient'
Michael Johnson6006de72021-02-21 01:42:39 +0000205 :param method_str: The method on the client to call for the test.
206 Example: 'list_amphorae'
207 :param expected_allowed: The list of credentials expected to be
208 allowed. Example: ['os_roles_lb_member'].
209 :param args: Any positional parameters needed by the method.
210 :param kwargs: Any named parameters needed by the method.
211 :raises AssertionError: Raised if the RBAC tests fail.
212 :raises Forbidden: Raised if a credential that should have access does
213 not and is denied.
214 :raises InvalidScope: Raised if a credential that should have the
215 correct scope for access is denied.
216 :returns: None on success
217 """
218 self._list_get_RBAC_enforcement(client_str, method_str,
219 expected_allowed, *args, **kwargs)
220
221 def _CUD_RBAC_enforcement(self, client_str, method_str, expected_allowed,
222 status_method=None, obj_id=None,
223 *args, **kwargs):
224 """Test an API create/update/delete call RBAC enforcement.
225
226 :param client_str: The service client to use for the test, without the
Michael Johnson29d8e612021-06-23 16:16:12 +0000227 credential. Example: 'AmphoraClient'
Michael Johnson6006de72021-02-21 01:42:39 +0000228 :param method_str: The method on the client to call for the test.
229 Example: 'list_amphorae'
230 :param expected_allowed: The list of credentials expected to be
231 allowed. Example: ['os_roles_lb_member'].
232 :param status_method: The service client method that will provide
233 the object status for a status change waiter.
234 :param obj_id: The ID of the object to check for the expected status
235 update.
236 :param args: Any positional parameters needed by the method.
237 :param kwargs: Any named parameters needed by the method.
238 :raises AssertionError: Raised if the RBAC tests fail.
239 :raises Forbidden: Raised if a credential that should have access does
240 not and is denied.
241 :raises InvalidScope: Raised if a credential that should have the
242 correct scope for access is denied.
243 :returns: None on success
244 """
245
246 allowed_list = copy.deepcopy(expected_allowed)
247 # os_admin is a special case as it is valid with the old defaults,
248 # but will not be with the new defaults and/or token scoping.
249 # The old keystone role "admin" becomes project scoped "admin"
250 # instead of being a global admin.
251 # To keep the tests simple, handle that edge case here.
252 # TODO(johnsom) Once token scope is default, remove this.
253 if ('os_system_admin' in expected_allowed and
254 not CONF.load_balancer.enforce_new_defaults and
255 not CONF.enforce_scope.octavia):
256 allowed_list.append('os_admin')
257
258 # #### Test that disallowed credentials cannot access the API.
259 self._check_disallowed(client_str, method_str, allowed_list,
260 status_method, obj_id, *args, **kwargs)
261
262 def check_create_RBAC_enforcement(
263 self, client_str, method_str, expected_allowed,
264 status_method=None, obj_id=None, *args, **kwargs):
265 """Test an API create call RBAC enforcement.
266
267 :param client_str: The service client to use for the test, without the
Michael Johnson29d8e612021-06-23 16:16:12 +0000268 credential. Example: 'AmphoraClient'
Michael Johnson6006de72021-02-21 01:42:39 +0000269 :param method_str: The method on the client to call for the test.
270 Example: 'list_amphorae'
271 :param expected_allowed: The list of credentials expected to be
272 allowed. Example: ['os_roles_lb_member'].
273 :param status_method: The service client method that will provide
274 the object status for a status change waiter.
275 :param obj_id: The ID of the object to check for the expected status
276 update.
277 :param args: Any positional parameters needed by the method.
278 :param kwargs: Any named parameters needed by the method.
279 :raises AssertionError: Raised if the RBAC tests fail.
280 :raises Forbidden: Raised if a credential that should have access does
281 not and is denied.
282 :raises InvalidScope: Raised if a credential that should have the
283 correct scope for access is denied.
284 :returns: None on success
285 """
286 self._CUD_RBAC_enforcement(client_str, method_str, expected_allowed,
287 status_method, obj_id, *args, **kwargs)
288
289 def check_delete_RBAC_enforcement(
290 self, client_str, method_str, expected_allowed,
291 status_method=None, obj_id=None, *args, **kwargs):
292 """Test an API delete call RBAC enforcement.
293
294 :param client_str: The service client to use for the test, without the
Michael Johnson29d8e612021-06-23 16:16:12 +0000295 credential. Example: 'AmphoraClient'
Michael Johnson6006de72021-02-21 01:42:39 +0000296 :param method_str: The method on the client to call for the test.
297 Example: 'list_amphorae'
298 :param expected_allowed: The list of credentials expected to be
299 allowed. Example: ['os_roles_lb_member'].
300 :param status_method: The service client method that will provide
301 the object status for a status change waiter.
302 :param obj_id: The ID of the object to check for the expected status
303 update.
304 :param args: Any positional parameters needed by the method.
305 :param kwargs: Any named parameters needed by the method.
306 :raises AssertionError: Raised if the RBAC tests fail.
307 :raises Forbidden: Raised if a credential that should have access does
308 not and is denied.
309 :raises InvalidScope: Raised if a credential that should have the
310 correct scope for access is denied.
311 :returns: None on success
312 """
313 self._CUD_RBAC_enforcement(client_str, method_str, expected_allowed,
314 status_method, obj_id, *args, **kwargs)
315
316 def check_update_RBAC_enforcement(
317 self, client_str, method_str, expected_allowed,
318 status_method=None, obj_id=None, *args, **kwargs):
319 """Test an API update call RBAC enforcement.
320
321 :param client_str: The service client to use for the test, without the
Michael Johnson29d8e612021-06-23 16:16:12 +0000322 credential. Example: 'AmphoraClient'
Michael Johnson6006de72021-02-21 01:42:39 +0000323 :param method_str: The method on the client to call for the test.
324 Example: 'list_amphorae'
325 :param expected_allowed: The list of credentials expected to be
326 allowed. Example: ['os_roles_lb_member'].
327 :param status_method: The service client method that will provide
328 the object status for a status change waiter.
329 :param obj_id: The ID of the object to check for the expected status
330 update.
331 :param args: Any positional parameters needed by the method.
332 :param kwargs: Any named parameters needed by the method.
333 :raises AssertionError: Raised if the RBAC tests fail.
334 :raises Forbidden: Raised if a credential that should have access does
335 not and is denied.
336 :raises InvalidScope: Raised if a credential that should have the
337 correct scope for access is denied.
338 :returns: None on success
339 """
340 self._CUD_RBAC_enforcement(client_str, method_str, expected_allowed,
341 status_method, obj_id, *args, **kwargs)
342
343 def check_list_RBAC_enforcement_count(
344 self, client_str, method_str, expected_allowed, expected_count,
345 *args, **kwargs):
346 """Test an API list call RBAC enforcement result count.
347
348 List APIs will return the object list for the project associated
349 with the token used to access the API. This means most credentials
350 will have access, but will get differing results.
351
352 This test will query the list API using a list of credentials and
353 will validate that only the expected count of results are returned.
354
355 :param client_str: The service client to use for the test, without the
Michael Johnson29d8e612021-06-23 16:16:12 +0000356 credential. Example: 'AmphoraClient'
Michael Johnson6006de72021-02-21 01:42:39 +0000357 :param method_str: The method on the client to call for the test.
358 Example: 'list_amphorae'
359 :param expected_allowed: The list of credentials expected to be
360 allowed. Example: ['os_roles_lb_member'].
361 :param expected_count: The number of results expected in the list
362 returned from the API.
363 :param args: Any positional parameters needed by the method.
364 :param kwargs: Any named parameters needed by the method.
365 :raises AssertionError: Raised if the RBAC tests fail.
366 :raises Forbidden: Raised if a credential that should have access does
367 not and is denied.
368 :raises InvalidScope: Raised if a credential that should have the
369 correct scope for access is denied.
370 :returns: None on success
371 """
372
373 allowed_list = copy.deepcopy(expected_allowed)
374 # os_admin is a special case as it is valid with the old defaults,
375 # but will not be with the new defaults and/or token scoping.
376 # The old keystone role "admin" becomes project scoped "admin"
377 # instead of being a global admin.
378 # To keep the tests simple, handle that edge case here.
379 # TODO(johnsom) Once token scope is default, remove this.
380 if ('os_system_admin' in expected_allowed and
381 not CONF.load_balancer.enforce_new_defaults and
382 not CONF.enforce_scope.octavia):
383 allowed_list.append('os_admin')
384
385 for cred in allowed_list:
386 try:
387 cred_obj = getattr(self, cred)
388 except AttributeError:
389 # TODO(johnsom) Remove once scoped tokens is the default.
390 if ((cred == 'os_system_admin' or cred == 'os_system_reader')
391 and not CONF.enforce_scope.octavia):
392 LOG.info('Skipping %s allowed RBAC test because '
393 'enforce_scope.octavia is not True', cred)
394 continue
395 else:
396 self.fail('Credential {} "expected_allowed" for RBAC '
397 'testing was not created by tempest '
398 'credentials setup. This is likely a bug in the '
399 'test.'.format(cred))
Michael Johnson29d8e612021-06-23 16:16:12 +0000400 method = self._get_client_method(cred_obj, client_str, method_str)
Michael Johnson6006de72021-02-21 01:42:39 +0000401 try:
402 result = method(*args, **kwargs)
403 except exceptions.Forbidden as e:
404 self.fail('Method {}.{} failed to allow access via RBAC using '
405 'credential {}. Error: {}'.format(
406 client_str, method_str, cred, str(e)))
407 self.assertEqual(expected_count, len(result), message='Credential '
408 '{} saw {} objects when {} was expected.'.format(
409 cred, len(result), expected_count))
410
411 def check_list_IDs_RBAC_enforcement(
412 self, client_str, method_str, expected_allowed, expected_ids,
413 *args, **kwargs):
414 """Test an API list call RBAC enforcement result contains IDs.
415
416 List APIs will return the object list for the project associated
417 with the token used to access the API. This means most credentials
418 will have access, but will get differing results.
419
420 This test will query the list API using a list of credentials and
421 will validate that the expected object Ids in included in the results.
422
423 :param client_str: The service client to use for the test, without the
Michael Johnson29d8e612021-06-23 16:16:12 +0000424 credential. Example: 'AmphoraClient'
Michael Johnson6006de72021-02-21 01:42:39 +0000425 :param method_str: The method on the client to call for the test.
426 Example: 'list_amphorae'
427 :param expected_allowed: The list of credentials expected to be
428 allowed. Example: ['os_roles_lb_member'].
429 :param expected_ids: The list of object IDs to validate are included
430 in the returned list from the API.
431 :param args: Any positional parameters needed by the method.
432 :param kwargs: Any named parameters needed by the method.
433 :raises AssertionError: Raised if the RBAC tests fail.
434 :raises Forbidden: Raised if a credential that should have access does
435 not and is denied.
436 :raises InvalidScope: Raised if a credential that should have the
437 correct scope for access is denied.
438 :returns: None on success
439 """
440
441 allowed_list = copy.deepcopy(expected_allowed)
442 # os_admin is a special case as it is valid with the old defaults,
443 # but will not be with the new defaults and/or token scoping.
444 # The old keystone role "admin" becomes project scoped "admin"
445 # instead of being a global admin.
446 # To keep the tests simple, handle that edge case here.
447 # TODO(johnsom) Once token scope is default, remove this.
448 if ('os_system_admin' in expected_allowed and
449 not CONF.load_balancer.enforce_new_defaults and
450 not CONF.enforce_scope.octavia):
451 allowed_list.append('os_admin')
452
453 for cred in allowed_list:
454 try:
455 cred_obj = getattr(self, cred)
456 except AttributeError:
457 # TODO(johnsom) Remove once scoped tokens is the default.
458 if ((cred == 'os_system_admin' or cred == 'os_system_reader')
459 and not CONF.enforce_scope.octavia):
460 LOG.info('Skipping %s allowed RBAC test because '
461 'enforce_scope.octavia is not True', cred)
462 continue
463 else:
464 self.fail('Credential {} "expected_allowed" for RBAC '
465 'testing was not created by tempest '
466 'credentials setup. This is likely a bug in the '
467 'test.'.format(cred))
Michael Johnson29d8e612021-06-23 16:16:12 +0000468 method = self._get_client_method(cred_obj, client_str, method_str)
Michael Johnson6006de72021-02-21 01:42:39 +0000469 try:
470 result = method(*args, **kwargs)
471 except exceptions.Forbidden as e:
472 self.fail('Method {}.{} failed to allow access via RBAC using '
473 'credential {}. Error: {}'.format(
474 client_str, method_str, cred, str(e)))
475 result_ids = [lb[constants.ID] for lb in result]
476 self.assertTrue(set(expected_ids).issubset(set(result_ids)))