blob: 69a43ea4c0280e4c11d6c51da8a9b5fb941b246c [file] [log] [blame]
# Copyright 2017 AT&T Corporation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import functools
import logging
import sys
from oslo_log import versionutils
from oslo_utils import excutils
import six
from tempest import config
from tempest.lib import exceptions
from tempest import test
from patrole_tempest_plugin import policy_authority
from patrole_tempest_plugin import rbac_exceptions
from patrole_tempest_plugin import requirements_authority
CONF = config.CONF
LOG = logging.getLogger(__name__)
_SUPPORTED_ERROR_CODES = [403, 404]
RBACLOG = logging.getLogger('rbac_reporting')
def action(service, rule='', rules=None, expected_error_code=403,
extra_target_data=None):
"""A decorator for verifying OpenStack policy enforcement.
A decorator which allows for positive and negative RBAC testing. Given:
* an OpenStack service,
* a policy action (``rule``) enforced by that service, and
* the test role defined by ``[patrole] rbac_test_role``
determines whether the test role has sufficient permissions to perform an
API call that enforces the ``rule``.
This decorator should only be applied to an instance or subclass of
``tempest.test.BaseTestCase``.
The result from ``_is_authorized`` is used to determine the *expected*
test result. The *actual* test result is determined by running the
Tempest test this decorator applies to.
Below are the following possibilities from comparing the *expected* and
*actual* results:
1) If *expected* is True and the test passes (*actual*), this is a success.
2) If *expected* is True and the test fails (*actual*), this results in a
`Forbidden` exception failure.
3) If *expected* is False and the test passes (*actual*), this results in
an `OverPermission` exception failure.
4) If *expected* is False and the test fails (*actual*), this is a success.
As such, negative and positive testing can be applied using this decorator.
:param str service: An OpenStack service. Examples: "nova" or "neutron".
:param str rule: (DEPRECATED) A policy action defined in a policy.json file
or in code.
:param list rules: A list of policy actions defined in a policy.json file
or in code. The rules are logical-ANDed together to derive the expected
result.
.. note::
Patrole currently only supports custom JSON policy files.
:param int expected_error_code: Overrides default value of 403 (Forbidden)
with endpoint-specific error code. Currently only supports 403 and 404.
Support for 404 is needed because some services, like Neutron,
intentionally throw a 404 for security reasons.
.. warning::
A 404 should not be provided *unless* the endpoint masks a
``Forbidden`` exception as a ``NotFound`` exception.
:param dict extra_target_data: Dictionary, keyed with ``oslo.policy``
generic check names, whose values are string literals that reference
nested ``tempest.test.BaseTestCase`` attributes. Used by
``oslo.policy`` for performing matching against attributes that are
sent along with the API calls. Example::
extra_target_data={
"target.token.user_id":
"os_alt.auth_provider.credentials.user_id"
})
:raises NotFound: If ``service`` is invalid.
:raises Forbidden: For item (2) above.
:raises RbacOverPermission: For item (3) above.
Examples::
@rbac_rule_validation.action(
service="nova", rule="os_compute_api:os-agents")
def test_list_agents_rbac(self):
# The call to `override_role` is mandatory.
with self.rbac_utils.override_role(self):
self.agents_client.list_agents()
"""
if extra_target_data is None:
extra_target_data = {}
rules = _prepare_rules(rule, rules)
def decorator(test_func):
role = CONF.patrole.rbac_test_role
@functools.wraps(test_func)
def wrapper(*args, **kwargs):
if args and isinstance(args[0], test.BaseTestCase):
test_obj = args[0]
else:
raise rbac_exceptions.RbacResourceSetupFailed(
'`rbac_rule_validation` decorator can only be applied to '
'an instance of `tempest.test.BaseTestCase`.')
allowed = True
disallowed_rules = []
for rule in rules:
_allowed = _is_authorized(
test_obj, service, rule, extra_target_data)
if not _allowed:
disallowed_rules.append(rule)
allowed = allowed and _allowed
expected_exception, irregular_msg = _get_exception_type(
expected_error_code)
test_status = 'Allowed'
try:
test_func(*args, **kwargs)
except rbac_exceptions.RbacInvalidService as e:
msg = ("%s is not a valid service." % service)
test_status = ('Error, %s' % (msg))
LOG.error(msg)
raise exceptions.NotFound(
"%s RbacInvalidService was: %s" % (msg, e))
except (expected_exception,
rbac_exceptions.RbacConflictingPolicies,
rbac_exceptions.RbacMalformedResponse) as e:
test_status = 'Denied'
if irregular_msg:
LOG.warning(irregular_msg.format(rule, service))
if allowed:
msg = ("Role %s was not allowed to perform the following "
"actions: %s. Expected allowed actions: %s. "
"Expected disallowed actions: %s." % (
role, sorted(rules),
sorted(set(rules) - set(disallowed_rules)),
sorted(disallowed_rules)))
LOG.error(msg)
raise exceptions.Forbidden(
"%s Exception was: %s" % (msg, e))
except Exception as e:
with excutils.save_and_reraise_exception():
exc_info = sys.exc_info()
error_details = six.text_type(exc_info[1])
msg = ("An unexpected exception has occurred during test: "
"%s. Exception was: %s" % (test_func.__name__,
error_details))
test_status = 'Error, %s' % (error_details)
LOG.error(msg)
else:
if not allowed:
msg = (
"OverPermission: Role %s was allowed to perform the "
"following disallowed actions: %s" % (
role, sorted(disallowed_rules)
)
)
LOG.error(msg)
raise rbac_exceptions.RbacOverPermission(msg)
finally:
if CONF.patrole_log.enable_reporting:
RBACLOG.info(
"[Service]: %s, [Test]: %s, [Rule]: %s, "
"[Expected]: %s, [Actual]: %s",
service, test_func.__name__, rule,
"Allowed" if allowed else "Denied",
test_status)
return wrapper
return decorator
def _prepare_rules(rule, rules):
if rules is None:
rules = []
elif not isinstance(rules, (tuple, list)):
rules = [rules]
if rule:
deprecation_msg = (
"The `rule` argument has been deprecated in favor of `rules` "
"and will be removed in a future version.")
versionutils.report_deprecated_feature(LOG, deprecation_msg)
if rules:
LOG.debug("The `rules` argument will be used instead of `rule`.")
else:
rules.append(rule)
return rules
def _is_authorized(test_obj, service, rule, extra_target_data):
"""Validates whether current RBAC role has permission to do policy action.
:param test_obj: An instance or subclass of ``tempest.test.BaseTestCase``.
:param service: The OpenStack service that enforces ``rule``.
:param rule: The name of the policy action. Examples include
"identity:create_user" or "os_compute_api:os-agents".
:param extra_target_data: Dictionary, keyed with ``oslo.policy`` generic
check names, whose values are string literals that reference nested
``tempest.test.BaseTestCase`` attributes. Used by ``oslo.policy`` for
performing matching against attributes that are sent along with the API
calls.
:returns: True if the current RBAC role can perform the policy action,
else False.
:raises RbacResourceSetupFailed: If `project_id` or `user_id` are missing
from the `auth_provider` attribute in `test_obj`.
"""
try:
project_id = test_obj.os_primary.credentials.project_id
user_id = test_obj.os_primary.credentials.user_id
except AttributeError as e:
msg = ("{0}: project_id or user_id not found in os_primary.credentials"
.format(e))
LOG.error(msg)
raise rbac_exceptions.RbacResourceSetupFailed(msg)
role = CONF.patrole.rbac_test_role
# Test RBAC against custom requirements. Otherwise use oslo.policy.
if CONF.patrole.test_custom_requirements:
authority = requirements_authority.RequirementsAuthority(
CONF.patrole.custom_requirements_file, service)
else:
formatted_target_data = _format_extra_target_data(
test_obj, extra_target_data)
authority = policy_authority.PolicyAuthority(
project_id, user_id, service,
extra_target_data=formatted_target_data)
is_allowed = authority.allowed(rule, role)
if is_allowed:
LOG.debug("[Action]: %s, [Role]: %s is allowed!", rule,
role)
else:
LOG.debug("[Action]: %s, [Role]: %s is NOT allowed!",
rule, role)
return is_allowed
def _get_exception_type(expected_error_code=403):
"""Dynamically calculate the expected exception to be caught.
Dynamically calculate the expected exception to be caught by the test case.
Only ``Forbidden`` and ``NotFound`` exceptions are permitted. ``NotFound``
is supported because Neutron, for security reasons, masks ``Forbidden``
exceptions as ``NotFound`` exceptions.
:param expected_error_code: the integer representation of the expected
exception to be caught. Must be contained in
``_SUPPORTED_ERROR_CODES``.
:returns: tuple of the exception type corresponding to
``expected_error_code`` and a message explaining that a non-Forbidden
exception was expected, if applicable.
"""
expected_exception = None
irregular_msg = None
if not isinstance(expected_error_code, six.integer_types) \
or expected_error_code not in _SUPPORTED_ERROR_CODES:
msg = ("Please pass an expected error code. Currently "
"supported codes: {0}".format(_SUPPORTED_ERROR_CODES))
LOG.error(msg)
raise rbac_exceptions.RbacInvalidErrorCode(msg)
if expected_error_code == 403:
expected_exception = exceptions.Forbidden
elif expected_error_code == 404:
expected_exception = exceptions.NotFound
irregular_msg = ("NotFound exception was caught for policy action "
"{0}. The service {1} throws a 404 instead of a 403, "
"which is irregular.")
return expected_exception, irregular_msg
def _format_extra_target_data(test_obj, extra_target_data):
"""Formats the "extra_target_data" dictionary with correct test data.
Before being formatted, "extra_target_data" is a dictionary that maps a
policy string like "trust.trustor_user_id" to a nested list of
``tempest.test.BaseTestCase`` attributes. For example, the attribute list
in::
"trust.trustor_user_id": "os.auth_provider.credentials.user_id"
is parsed by iteratively calling ``getattr`` until the value of "user_id"
is resolved. The resulting dictionary returns::
"trust.trustor_user_id": "the user_id of the `os_primary` credential"
:param test_obj: An instance or subclass of ``tempest.test.BaseTestCase``.
:param extra_target_data: Dictionary, keyed with ``oslo.policy`` generic
check names, whose values are string literals that reference nested
``tempest.test.BaseTestCase`` attributes. Used by ``oslo.policy`` for
performing matching against attributes that are sent along with the API
calls.
:returns: Dictionary containing additional object data needed by
``oslo.policy`` to validate generic checks.
"""
attr_value = test_obj
formatted_target_data = {}
for user_attribute, attr_string in extra_target_data.items():
attrs = attr_string.split('.')
for attr in attrs:
attr_value = getattr(attr_value, attr)
formatted_target_data[user_attribute] = attr_value
return formatted_target_data