blob: 00bfd24dc7d01e3a28b96253df8a9caba4f40564 [file] [log] [blame]
DavidPurcellb25f93d2017-01-27 12:46:27 -05001# Copyright 2017 AT&T Corporation.
DavidPurcell029d8c32017-01-06 15:27:41 -05002# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
15
Rick Bartraed950052017-06-29 17:20:33 -040016import abc
17import six
Felipe Monteirofa01d5f2017-04-01 06:18:25 +010018import sys
DavidPurcell029d8c32017-01-06 15:27:41 -050019import time
Felipe Monteiro34a138c2017-03-02 17:01:37 -050020
Rajiv Kumar645dfc92017-01-19 13:48:27 +053021from oslo_log import log as logging
Felipe Monteirofa01d5f2017-04-01 06:18:25 +010022import oslo_utils.uuidutils as uuid_utils
Felipe Monteirofa01d5f2017-04-01 06:18:25 +010023
Felipe Monteiroe7e552e2017-05-02 17:04:12 +010024from tempest.common import credentials_factory as credentials
Felipe Monteirofa01d5f2017-04-01 06:18:25 +010025from tempest import config
DavidPurcell029d8c32017-01-06 15:27:41 -050026
Felipe Monteiro34a138c2017-03-02 17:01:37 -050027from patrole_tempest_plugin import rbac_exceptions
DavidPurcell029d8c32017-01-06 15:27:41 -050028
# Global tempest configuration object; this module reads its ``rbac`` and
# ``identity`` option groups.
CONF = config.CONF
# Module-level logger named after this module.
LOG = logging.getLogger(__name__)
DavidPurcell029d8c32017-01-06 15:27:41 -050031
32
class RbacUtils(object):
    """Toggle a test user's project role between admin and the RBAC role.

    Instantiating the class immediately switches the user behind
    ``test_obj`` to the admin role. Tests then call :meth:`switch_role`
    with ``toggle_rbac_role=True`` to run with the role named by
    ``CONF.rbac.rbac_test_role``.
    """

    # References the last value of `toggle_rbac_role` that was passed to
    # `switch_role`. Used for ensuring that `switch_role` is correctly used
    # in a test file, so that false positives are prevented. The key used
    # to index into the dictionary is the module path plus class name, which
    # is unique. This dictionary is a class attribute and is therefore shared
    # by every instance.
    switch_role_history = {}
    # Class-level defaults. `_get_roles` assigns the resolved IDs through
    # `self`, so the looked-up values are stored on the instance.
    admin_role_id = None
    rbac_role_id = None

    def __init__(self, test_obj):
        """Switch the user behind ``test_obj`` to the admin role.

        :param test_obj: test class or instance forwarded to `switch_role`.
        """
        self.switch_role(test_obj, toggle_rbac_role=False)

    def switch_role(self, test_obj, toggle_rbac_role=False):
        """Replace the test user's project roles with a single role.

        :param test_obj: object exposing ``auth_provider``, ``os_admin`` and
            ``get_identity_version()``.
        :param toggle_rbac_role: ``True`` to assign the RBAC test role,
            ``False`` to assign the admin role. Must be a ``bool``.
        :raises rbac_exceptions.RbacResourceSetupFailed: if admin credentials
            are not available, a required role cannot be found, or
            ``toggle_rbac_role`` is used illegally (see
            `_validate_switch_role`).
        """
        auth_provider = test_obj.auth_provider
        self.user_id = auth_provider.credentials.user_id
        self.project_id = auth_provider.credentials.tenant_id
        self.token = auth_provider.get_token()
        self.identity_version = test_obj.get_identity_version()

        if not credentials.is_admin_available(
                identity_version=self.identity_version):
            msg = "Missing Identity Admin API credentials in configuration."
            raise rbac_exceptions.RbacResourceSetupFailed(msg)

        self.roles_client = test_obj.os_admin.roles_v3_client

        LOG.debug('Switching role to: %s', toggle_rbac_role)

        try:
            if not self.admin_role_id or not self.rbac_role_id:
                self._get_roles()

            self._validate_switch_role(test_obj, toggle_rbac_role)

            if toggle_rbac_role:
                self._add_role_to_user(self.rbac_role_id)
            else:
                self._add_role_to_user(self.admin_role_id)
        except Exception as exp:
            # Log with the traceback so the failing call can be located,
            # then let the original exception propagate unchanged.
            LOG.exception(exp)
            raise
        finally:
            # NOTE(felipemonteiro): These two comments below are copied from
            # tempest.api.identity.v2/v3.test_users.
            #
            # Reset auth again to verify the password restore does work.
            # Clear auth restores the original credentials and deletes
            # cached auth data.
            test_obj.auth_provider.clear_auth()
            # Fernet tokens are not subsecond aware and Keystone should only
            # be precise to the second. Sleep to ensure we are passing the
            # second boundary before attempting to authenticate. If token is
            # of type uuid, then do not sleep.
            if not uuid_utils.is_uuid_like(self.token):
                time.sleep(1)
            test_obj.auth_provider.set_auth()

    def _add_role_to_user(self, role_id):
        """Make ``role_id`` the user's only role on the project.

        Nothing is created when `_clear_user_roles` reports that the user
        already holds exactly that role.
        """
        role_already_present = self._clear_user_roles(role_id)
        if role_already_present:
            return

        self.roles_client.create_user_role_on_project(
            self.project_id, self.user_id, role_id)

    def _clear_user_roles(self, role_id):
        """Remove the user's project roles unless only ``role_id`` is set.

        :returns: ``True`` if the user's sole role is already ``role_id``
            (nothing is deleted); ``False`` after every listed role has been
            deleted.
        """
        roles = self.roles_client.list_user_roles_on_project(
            self.project_id, self.user_id)['roles']

        # If the user already has the role that is required, return early.
        role_ids = [role['id'] for role in roles]
        if role_ids == [role_id]:
            return True

        for current_id in role_ids:
            self.roles_client.delete_role_from_user_on_project(
                self.project_id, self.user_id, current_id)

        return False

    def _validate_switch_role(self, test_obj, toggle_rbac_role):
        """Validates that the rbac role passed to `switch_role` is legal.

        Throws an error for the following improper usages of `switch_role`:
            * `switch_role` is not called with a boolean value
            * `switch_role` is never called in a test file, except in tearDown
            * `switch_role` is called with the same boolean value twice

        :raises rbac_exceptions.RbacResourceSetupFailed: on improper usage.
        """
        if not isinstance(toggle_rbac_role, bool):
            raise rbac_exceptions.RbacResourceSetupFailed(
                'toggle_rbac_role must be a boolean value.')

        # The unique key is the combination of module path plus class name.
        # `test_obj` may be either the test class itself or an instance.
        if isinstance(test_obj, type):
            class_name = test_obj.__name__
        else:
            class_name = test_obj.__class__.__name__
        key = '%s.%s' % (test_obj.__module__, class_name)

        self.switch_role_history.setdefault(key, None)

        if self.switch_role_history[key] == toggle_rbac_role:
            # If an exception was thrown, like a skipException or otherwise,
            # then this is a legitimate reason why `switch_role` was not
            # called, so only raise an exception if no current exception is
            # being handled.
            if sys.exc_info()[0] is None:
                # Reset the recorded value to False before raising.
                self.switch_role_history[key] = False
                error_message = '`toggle_rbac_role` must not be called with '\
                    'the same bool value twice. Make sure that you included '\
                    'a rbac_utils.switch_role method call inside the test.'
                LOG.error(error_message)
                raise rbac_exceptions.RbacResourceSetupFailed(error_message)
        else:
            self.switch_role_history[key] = toggle_rbac_role

    def _get_roles(self):
        """Look up and store the IDs of the admin and RBAC test roles.

        Roles are matched by name against ``CONF.identity.admin_role`` and
        ``CONF.rbac.rbac_test_role``. On success the IDs are stored in
        ``self.admin_role_id`` and ``self.rbac_role_id``.

        :raises rbac_exceptions.RbacResourceSetupFailed: if either role is
            not returned by ``list_roles``; the message names every missing
            role.
        """
        available_roles = self.roles_client.list_roles()
        admin_role_id = rbac_role_id = None

        for role in available_roles['roles']:
            # Two independent checks: both options may name the same role.
            if role['name'] == CONF.rbac.rbac_test_role:
                rbac_role_id = role['id']
            if role['name'] == CONF.identity.admin_role:
                admin_role_id = role['id']

        problems = []
        if not admin_role_id:
            # Report the configured admin role name, not a literal 'admin'.
            problems.append(
                "Role with name '%s' does not exist in the system."
                % CONF.identity.admin_role)
        if not rbac_role_id:
            problems.append(
                "Role defined by rbac_test_role does not exist in the "
                "system.")
        if problems:
            raise rbac_exceptions.RbacResourceSetupFailed(
                ' '.join(problems))

        self.admin_role_id = admin_role_id
        self.rbac_role_id = rbac_role_id

    @property
    def is_admin(self):
        """Verifies whether the current test role equals the admin role.

        :returns: True if ``rbac_test_role`` is the admin role.
        """
        return CONF.rbac.rbac_test_role == CONF.identity.admin_role
Rick Bartraed950052017-06-29 17:20:33 -0400175
176
@six.add_metaclass(abc.ABCMeta)
class RbacAuthority(object):
    """Abstract base class declaring the `allowed` check.

    Concrete subclasses must implement `allowed`.
    """
    # TODO(rb560u): Add documentation explaining what this class is for

    @abc.abstractmethod
    def allowed(self, rule_name, role):
        """Determine whether the role should be able to perform the API

        :param rule_name: name of the rule to check (presumably a policy
            rule name -- confirm against the concrete implementations).
        :param role: role to check the rule against.
        :returns: this abstract implementation returns ``None``; subclasses
            are presumably expected to return a truthy/falsy verdict.
        """