# blob: b7ac8d9eb177bffa09de23639bac4fc96223ad5c
# Copyright 2017 AT&T Corporation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from contextlib import contextmanager
import sys
import time
from oslo_log import log as logging
from oslo_utils import excutils
from tempest import clients
from tempest.common import credentials_factory as credentials
from tempest import config
from tempest.lib import exceptions as lib_exc
from patrole_tempest_plugin import rbac_exceptions
CONF = config.CONF
LOG = logging.getLogger(__name__)
class RbacUtils(object):
    """Utility class responsible for switching ``os_primary`` role.

    This class is responsible for overriding the value of the primary Tempest
    credential's role (i.e. ``os_primary`` role). By doing so, it is possible
    to seamlessly swap between admin credentials, needed for setup and clean
    up, and primary credentials, needed to perform the API call which does
    policy enforcement. The primary credentials always cycle between roles
    defined by ``CONF.identity.admin_role`` and
    ``CONF.patrole.rbac_test_role``.
    """

    # Class-level defaults. The real IDs are looked up on first use by
    # ``_get_roles_by_name`` and then stored on the instance.
    admin_role_id = None
    rbac_role_id = None

    def __init__(self, test_obj):
        """Constructor for ``RbacUtils``.

        Builds the admin roles client and immediately switches the
        ``os_primary`` credentials to the admin role.

        :param test_obj: An instance of `tempest.test.BaseTestCase`.
        :raises lib_exc.InvalidConfiguration: If the v3 identity API is not
            enabled in the Tempest configuration.
        """
        # Initialize the admin roles_client to perform role switching.
        admin_mgr = clients.Manager(
            credentials.get_configured_admin_credentials())
        if not CONF.identity_feature_enabled.api_v3:
            raise lib_exc.InvalidConfiguration(
                "Patrole role overriding only supports v3 identity API.")
        self.admin_roles_client = admin_mgr.roles_v3_client

        self._override_role(test_obj, False)

    @contextmanager
    def override_role(self, test_obj):
        """Override the role used by ``os_primary`` Tempest credentials.

        Temporarily change the role used by ``os_primary`` credentials to:

        * ``[patrole] rbac_test_role`` before test execution
        * ``[identity] admin_role`` after test execution

        Automatically switches to admin role after test execution.

        :param test_obj: Instance of ``tempest.test.BaseTestCase``.
        :returns: None

        .. warning::

            This function can alter user roles for pre-provisioned
            credentials. Work is underway to safely clean up after this
            function.

        Example::

            @rbac_rule_validation.action(service='test',
                                         rules=['a:test:rule'])
            def test_foo(self):
                # Allocate test-level resources here.
                with self.rbac_utils.override_role(self):
                    # The role for `os_primary` has now been overridden.
                    # Within this block, call the API endpoint that enforces
                    # the expected policy specified by "rule" in the
                    # decorator.
                    self.foo_service.bar_api_call()
                # The role is switched back to admin automatically. Note that
                # if the API call above threw an exception, any code below
                # this point in the test is not executed.
        """
        test_obj._set_override_role_called()
        self._override_role(test_obj, True)
        try:
            # Execute the test.
            yield
        except BaseException:
            # Remember, for future validation, that the ``with`` body raised.
            # An explicit handler is used instead of inspecting
            # ``sys.exc_info()`` in ``finally``: the latter also reports an
            # unrelated exception that is already being handled by the caller
            # when the context manager is entered inside an ``except`` block.
            test_obj._set_override_role_caught_exc()
            raise
        finally:
            # This code block is always executed, no matter the result of the
            # test. Automatically switch back to the admin role for test clean
            # up.
            self._override_role(test_obj, False)

    def _override_role(self, test_obj, toggle_rbac_role=False):
        """Private helper for overriding ``os_primary`` Tempest credentials.

        Whether or not the role switch succeeds, every auth provider returned
        by ``test_obj.get_auth_providers()`` has its cached auth cleared and
        then re-established.

        :param test_obj: instance of :py:class:`tempest.test.BaseTestCase`
        :param toggle_rbac_role: Boolean value that controls the role that
            overrides default role of ``os_primary`` credentials.

            * If True: role is set to ``[patrole] rbac_test_role``
            * If False: role is set to ``[identity] admin_role``
        """
        self.user_id = test_obj.os_primary.credentials.user_id
        self.project_id = test_obj.os_primary.credentials.tenant_id
        self.token = test_obj.os_primary.auth_provider.get_token()

        LOG.debug('Overriding role to: %s.', toggle_rbac_role)
        role_already_present = False

        try:
            # Resolve the role IDs only once; later calls reuse them.
            if not all([self.admin_role_id, self.rbac_role_id]):
                self._get_roles_by_name()

            target_role = (
                self.rbac_role_id if toggle_rbac_role else self.admin_role_id)
            role_already_present = self._list_and_clear_user_roles_on_project(
                target_role)

            # Do not override roles if `target_role` already exists.
            if not role_already_present:
                self._create_user_role_on_project(target_role)
        except Exception as exp:
            # Log the failure, then let the original exception propagate.
            with excutils.save_and_reraise_exception():
                LOG.exception(exp)
        finally:
            auth_providers = test_obj.get_auth_providers()
            for provider in auth_providers:
                provider.clear_auth()
            # Fernet tokens are not subsecond aware so sleep to ensure we are
            # passing the second boundary before attempting to authenticate.
            # Only sleep if a token revocation occurred as a result of role
            # overriding. This will optimize test runtime in the case where
            # ``[identity] admin_role`` == ``[patrole] rbac_test_role``.
            if not role_already_present:
                time.sleep(1)

            for provider in auth_providers:
                provider.set_auth()

    def _get_roles_by_name(self):
        """Look up and store the IDs of the admin and RBAC test roles.

        Lists all roles through the admin roles client and stores the IDs of
        ``[identity] admin_role`` and ``[patrole] rbac_test_role`` in
        ``self.admin_role_id`` and ``self.rbac_role_id``.

        :raises rbac_exceptions.RbacResourceSetupFailed: If either role is
            not among the roles returned by the identity service.
        """
        available_roles = self.admin_roles_client.list_roles()['roles']
        role_map = {r['name']: r['id'] for r in available_roles}
        LOG.debug('Available roles: %s', list(role_map.keys()))

        admin_role_id = role_map.get(CONF.identity.admin_role)
        rbac_role_id = role_map.get(CONF.patrole.rbac_test_role)

        if not all([admin_role_id, rbac_role_id]):
            missing_roles = []
            msg = ("Could not find `[patrole] rbac_test_role` or "
                   "`[identity] admin_role`, both of which are required for "
                   "RBAC testing.")
            if not admin_role_id:
                missing_roles.append(CONF.identity.admin_role)
            if not rbac_role_id:
                missing_roles.append(CONF.patrole.rbac_test_role)
            msg += " Following roles were not found: %s." % (
                ", ".join(missing_roles))
            msg += " Available roles: %s." % ", ".join(role_map)
            raise rbac_exceptions.RbacResourceSetupFailed(msg)

        self.admin_role_id = admin_role_id
        self.rbac_role_id = rbac_role_id

    def _create_user_role_on_project(self, role_id):
        """Assign ``role_id`` to the stored user on the stored project.

        :param role_id: ID of the role to grant.
        """
        self.admin_roles_client.create_user_role_on_project(
            self.project_id, self.user_id, role_id)

    def _list_and_clear_user_roles_on_project(self, role_id):
        """Remove the user's project roles unless they already equal the target.

        :param role_id: ID of the role the user is about to be switched to.
        :returns: True if the user's roles on the project are exactly
            ``[role_id]`` (nothing is deleted); False otherwise, after every
            listed role has been deleted from the user on the project.
        """
        roles = self.admin_roles_client.list_user_roles_on_project(
            self.project_id, self.user_id)['roles']
        role_ids = [role['id'] for role in roles]

        # NOTE(felipemonteiro): We do not use ``role_id in role_ids`` here to
        # avoid over-permission errors: if the current list of roles on the
        # project includes "admin" and "Member", and we are switching to the
        # "Member" role, then we must delete the "admin" role. Thus, we only
        # return early if the user's roles on the project are an exact match.
        if [role_id] == role_ids:
            return True

        for role in roles:
            self.admin_roles_client.delete_role_from_user_on_project(
                self.project_id, self.user_id, role['id'])

        return False
class RbacUtilsMixin(object):
    """Mixin class to be used alongside an instance of
    :py:class:`tempest.test.BaseTestCase`.

    Should be used to perform Patrole class setup for a base RBAC class. Child
    classes should not use this mixin.

    Example::

        class BaseRbacTest(rbac_utils.RbacUtilsMixin, base.BaseV2ComputeTest):

            @classmethod
            def skip_checks(cls):
                super(BaseRbacTest, cls).skip_checks()
                cls.skip_rbac_checks()

            @classmethod
            def setup_clients(cls):
                super(BaseRbacTest, cls).setup_clients()
                cls.setup_rbac_utils()
    """

    # True once ``override_role`` has been entered; cleared again by
    # ``_validate_override_role_called``.
    __override_role_called = False
    # True once an exception was raised inside ``override_role``; cleared
    # again by ``_validate_override_role_caught_exc``.
    __override_role_caught_exc = False

    @classmethod
    def get_auth_providers(cls):
        """Return the list of auth providers used within the test.

        Only the ``os_primary`` auth provider is returned here. Tests may
        redefine this method to include their own or third party client
        auth providers.
        """
        primary_provider = cls.os_primary.auth_provider
        return [primary_provider]

    @classmethod
    def setup_rbac_utils(cls):
        """Create the ``RbacUtils`` helper and store it as ``cls.rbac_utils``."""
        cls.rbac_utils = RbacUtils(cls)

    def _set_override_role_called(self):
        """Record that ``override_role`` was called."""
        self.__override_role_called = True

    def _set_override_role_caught_exc(self):
        """Record that an exception was thrown inside ``override_role``."""
        self.__override_role_caught_exc = True

    def _validate_override_role_called(self):
        """Report whether ``override_role`` was called, then reset the flag.

        The flag is set back to False so that sequential tests start clean.

        :returns: The value of the flag before it was reset.
        """
        was_called, self.__override_role_called = (
            self.__override_role_called, False)
        return was_called

    def _validate_override_role_caught_exc(self):
        """Report whether an exception was caught inside ``override_role``,
        then reset the flag.

        Knowing this lets the caller determine, by process of elimination,
        whether an exception was thrown outside ``override_role`` (which is
        invalid).

        :returns: The value of the flag before it was reset.
        """
        caught_exception, self.__override_role_caught_exc = (
            self.__override_role_caught_exc, False)
        return caught_exception
def is_admin():
    """Verifies whether the current test role equals the admin role.

    Compares the configured ``[patrole] rbac_test_role`` with the configured
    ``[identity] admin_role``.

    :returns: True if ``rbac_test_role`` is the admin role.
    """
    # TODO(felipemonteiro): Make this more robust via a context is admin
    # lookup.
    rbac_test_role = CONF.patrole.rbac_test_role
    admin_role = CONF.identity.admin_role
    return rbac_test_role == admin_role