# blob: db648dfbec42421fe2f178db82a99b1634937b7d [file] [log] [blame]
# Copyright 2017 AT&T Corporation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import oslo_utils.uuidutils as uuid_utils
import six
import time
from tempest.common import credentials_factory
from tempest import config
from tempest.test import BaseTestCase
from oslo_log import log as logging
from patrole_tempest_plugin import rbac_exceptions
# Configuration object exposed by tempest.config; this module reads
# ``CONF.rbac.rbac_test_role`` from it.
CONF = config.CONF
# Logger named after this module.
LOG = logging.getLogger(__name__)
class Singleton(type):
    """Metaclass that hands out a single cached instance per class.

    The first call to a class using this metaclass constructs the instance
    with the supplied arguments and stores it; every later call returns the
    stored instance and ignores its arguments.
    """

    # Maps each class to the one instance created for it.
    _instances = {}

    def __call__(cls, *args, **kwargs):
        """Return the cached instance of ``cls``, creating it on first use."""
        registry = cls._instances
        if cls in registry:
            return registry[cls]
        instance = super(Singleton, cls).__call__(*args, **kwargs)
        registry[cls] = instance
        return instance
@six.add_metaclass(Singleton)
class RbacUtils(object):
    """Switch a test user between the admin role and the RBAC test role.

    The class is built with the ``Singleton`` metaclass via
    ``six.add_metaclass``.
    """

    def __init__(self):
        """Look up and store the IDs of the admin and RBAC test roles.

        Obtains a credentials provider from ``credentials_factory``, lists
        the roles through its ``creds_client`` and records the ID of the
        role named ``CONF.rbac.rbac_test_role`` and of the role named
        ``admin``. An ID is left as ``None`` when no listed role has that
        name; ``switch_role`` rejects that state.
        """
        creds_provider = credentials_factory.get_credentials_provider(
            name=__name__,
            force_tenant_isolation=True,
            identity_version=BaseTestCase.get_identity_version())

        self.creds_client = creds_provider.creds_client
        self.available_roles = self.creds_client.roles_client.list_roles()
        self.admin_role_id = self.rbac_role_id = None
        for role in self.available_roles['roles']:
            # Two independent checks (not if/elif): the configured RBAC
            # test role may itself be named 'admin'.
            if role['name'] == CONF.rbac.rbac_test_role:
                self.rbac_role_id = role['id']
            if role['name'] == 'admin':
                self.admin_role_id = role['id']

    def switch_role(self, test_obj, switchToRbacRole=None):
        """Replace the test user's roles on its project with a single role.

        All roles the user currently holds on the project are removed, then
        either the RBAC test role or the admin role is assigned. Afterwards
        the auth data of ``test_obj.auth_provider`` is cleared and set again,
        whether or not the role change succeeded.

        :param test_obj: object whose ``auth_provider.credentials`` supplies
            ``user_id`` and ``tenant_id``.
        :param switchToRbacRole: ``True`` to assign the RBAC test role,
            ``False`` to assign the admin role. Must be a ``bool``; the
            default ``None`` is rejected.
        :raises rbac_exceptions.RbacResourceSetupFailed: if the admin or
            RBAC role ID is missing, or ``switchToRbacRole`` is not a bool.
        """
        LOG.debug('Switching role to: %s', switchToRbacRole)
        # Check if admin and rbac roles exist.
        if not self.admin_role_id or not self.rbac_role_id:
            msg = ("Defined 'rbac_role' or 'admin' role does not exist"
                   " in the system.")
            raise rbac_exceptions.RbacResourceSetupFailed(msg)

        if not isinstance(switchToRbacRole, bool):
            msg = ("Wrong value for parameter 'switchToRbacRole' is passed."
                   " It should be either 'True' or 'False'.")
            raise rbac_exceptions.RbacResourceSetupFailed(msg)

        try:
            user_id = test_obj.auth_provider.credentials.user_id
            project_id = test_obj.auth_provider.credentials.tenant_id

            self._clear_user_roles(user_id, project_id)

            if switchToRbacRole:
                role_id = self.rbac_role_id
            else:
                role_id = self.admin_role_id
            self.creds_client.roles_client.create_user_role_on_project(
                project_id, user_id, role_id)
        except Exception as exp:
            # LOG.exception records the traceback along with the message;
            # the error is still propagated to the caller.
            LOG.exception(exp)
            raise
        finally:
            # Drop the auth data held by the provider so that set_auth()
            # below authenticates again after the role change attempt.
            test_obj.auth_provider.clear_auth()
            # Fernet tokens are not subsecond aware and Keystone should only
            # be precise to the second. Sleep to ensure we are passing the
            # second boundary before attempting to authenticate. If token is
            # of type uuid, then do not sleep.
            token = self.creds_client.identity_client.token
            if not uuid_utils.is_uuid_like(token):
                time.sleep(1)
            test_obj.auth_provider.set_auth()

    def _clear_user_roles(self, user_id, tenant_id):
        """Remove every role the user currently holds on the project.

        :param user_id: ID of the user whose roles are removed.
        :param tenant_id: ID of the project the roles are removed from.
        """
        roles_client = self.creds_client.roles_client
        roles = roles_client.list_user_roles_on_project(
            tenant_id, user_id)['roles']

        for role in roles:
            roles_client.delete_role_from_user_on_project(
                tenant_id, user_id, role['id'])
# Lowercase module-level alias bound to the RbacUtils class itself (not to
# an instance of it).
rbac_utils = RbacUtils