Refactors Patrole framework to only use admin tenant credential type.
This patch adds following capablities to RBAC tempest framework:
1. Restricts admin only tenant to be used for rbac as much as possible,
and thus removes need of creating additional tenant with "primary"
credential type.
2. Patrole framework requires "tempest_roles" value in the conf file
to be set to "admin" role. Which again restricts tempest.conf to a
hardcoded value. This patch takes care of this problem also.
Note: Adding this patch will required some cleanup and refactoring in
test files, which will be taken care in separate commits component wise.
Co-Authored-By: Mh Raies <mh.raies@ericsson.com>
Co-Authored-By: Felipe Monteiro <felipe.monteiro@att.com>
Implements bp: modifying-switching-role-mechanism
Closes-Bug: #1664600
Closes-Bug: #1664278
Change-Id: Ic665d35332def6b6ec7b0065d1ebe65514a926b9
diff --git a/patrole_tempest_plugin/rbac_utils.py b/patrole_tempest_plugin/rbac_utils.py
index 48d5b4c..d5f1c2b 100644
--- a/patrole_tempest_plugin/rbac_utils.py
+++ b/patrole_tempest_plugin/rbac_utils.py
@@ -13,19 +13,19 @@
# License for the specific language governing permissions and limitations
# under the License.
-import json
import six
import time
-import urllib3
+
+from tempest.common import credentials_factory
+from tempest import config
+from tempest.test import BaseTestCase
from oslo_log import log as logging
-from tempest import config
-from patrole_tempest_plugin import rbac_exceptions as rbac_exc
+from patrole_tempest_plugin import rbac_exceptions
-LOG = logging.getLogger(__name__)
CONF = config.CONF
-http = urllib3.PoolManager()
+LOG = logging.getLogger(__name__)
class Singleton(type):
@@ -40,89 +40,64 @@
@six.add_metaclass(Singleton)
class RbacUtils(object):
- def __init__(self):
- RbacUtils.dictionary = {}
- @staticmethod
- def get_roles(caller):
- admin_role_id = None
- rbac_role_id = None
+ def __init__(cls):
+ creds_provider = credentials_factory.get_credentials_provider(
+ name=__name__,
+ force_tenant_isolation=True,
+ identity_version=BaseTestCase.get_identity_version())
- if bool(RbacUtils.dictionary) is False:
- admin_token = caller.admin_client.token
- headers = {'X-Auth-Token': admin_token,
- "Content-Type": "application/json"}
- url_to_get_role = CONF.identity.uri_v3 + '/roles/'
- response = http.request('GET', url_to_get_role, headers=headers)
- if response.status != 200:
- raise rbac_exc.RbacResourceSetupFailed('Unable to'
- ' retrieve roles')
- data = response.data
- roles = json.loads(data)
- for item in roles['roles']:
- if item['name'] == CONF.rbac.rbac_test_role:
- rbac_role_id = item['id']
- if item['name'] == 'admin':
- admin_role_id = item['id']
+ cls.creds_client = creds_provider.creds_client
+ cls.available_roles = cls.creds_client.roles_client.list_roles()
+ cls.admin_role_id = cls.rbac_role_id = None
+ for item in cls.available_roles['roles']:
+ if item['name'] == CONF.rbac.rbac_test_role:
+ cls.rbac_role_id = item['id']
+ if item['name'] == 'admin':
+ cls.admin_role_id = item['id']
+ # Check if admin and rbac role exits
+ if not cls.admin_role_id or not cls.rbac_role_id:
+ msg = ("defined 'rbac_role' or 'admin' role does not exist"
+ " in the system.")
+ raise rbac_exceptions.RbacResourceSetupFailed(msg)
- RbacUtils.dictionary.update({'admin_role_id': admin_role_id,
- 'rbac_role_id': rbac_role_id})
+ def clear_user_roles(cls, user_id, tenant_id):
+ roles = cls.creds_client.roles_client.list_user_roles_on_project(
+ tenant_id, user_id)['roles']
- return RbacUtils.dictionary
+ for role in roles:
+ cls.creds_client.roles_client.delete_role_from_user_on_project(
+ tenant_id, user_id, role['id'])
- @staticmethod
- def delete_all_roles(self, base_url, headers):
- # Find the current role
- response = http.request('GET', base_url, headers=headers)
- if response.status != 200:
- raise rbac_exc.RbacResourceSetupFailed('Unable to retrieve'
- ' user role')
- data = response.data
- roles = json.loads(data)
- for item in roles['roles']:
- url = base_url + item['id']
- response = http.request('DELETE', url, headers=headers)
- self.assertEqual(204, response.status)
-
- @staticmethod
- def switch_role(self, switchToRbacRole=None):
+ def switch_role(cls, test_obj, switchToRbacRole=None):
LOG.debug('Switching role to: %s', switchToRbacRole)
- if switchToRbacRole is None:
- return
-
- roles = rbac_utils.get_roles(self)
- rbac_role_id = roles.get('rbac_role_id')
- admin_role_id = roles.get('admin_role_id')
+ if not isinstance(switchToRbacRole, bool):
+ msg = ("Wrong value for parameter 'switchToRbacRole' is passed."
+ " It should be either 'True' or 'False'.")
+ raise rbac_exceptions.RbacActionFailed(msg)
try:
- user_id = self.auth_provider.credentials.user_id
- project_id = self.auth_provider.credentials.tenant_id
- admin_token = self.admin_client.token
+ user_id = test_obj.auth_provider.credentials.user_id
+ project_id = test_obj.auth_provider.credentials.tenant_id
- headers = {'X-Auth-Token': admin_token,
- "Content-Type": "application/json"}
- base_url = (CONF.identity.uri_v3 + '/projects/' + project_id +
- '/users/' + user_id + '/roles/')
-
- rbac_utils.delete_all_roles(self, base_url, headers)
+ cls.clear_user_roles(user_id, project_id)
if switchToRbacRole:
- url = base_url + rbac_role_id
- response = http.request('PUT', url, headers=headers)
- self.assertEqual(204, response.status)
+ cls.creds_client.roles_client.create_user_role_on_project(
+ project_id, user_id, cls.rbac_role_id)
else:
- url = base_url + admin_role_id
- response = http.request('PUT', url, headers=headers)
- self.assertEqual(204, response.status)
+ cls.creds_client.roles_client.create_user_role_on_project(
+ project_id, user_id, cls.admin_role_id)
except Exception as exp:
LOG.error(exp)
raise
- finally:
- self.auth_provider.clear_auth()
- # Sleep to avoid 401 errors caused by rounding
- # In timing of fernet token creation
- time.sleep(1)
- self.auth_provider.set_auth()
-rbac_utils = RbacUtils()
+ finally:
+ test_obj.auth_provider.clear_auth()
+ # Sleep to avoid 401 errors caused by rounding
+ # In timing of fernet token creation
+ time.sleep(1)
+ test_obj.auth_provider.set_auth()
+
+rbac_utils = RbacUtils