Add support for handling multiple error codes
Patrole currently cannot handle the scenario where two possible
error codes can be returned by Neutron policy enforcement for a
failed policy check (403 Forbidden and 404 NotFound), depending
on what role is being tested. The Patrole framework can only
handle one expected_exception.
This change builds upon the recent multi-policy support to allow
the tester to specify multiple policy actions for one API test.
For each policy action, the tester would need to specify an
error code that is expected if the action should fail. If multiple
policy actions fail, the error code for the first policy action
that fails will be expected to be returned from the service.
This handles the cases in Neutron where Neutron may use a second
policy rule to determine whether to return a 403 error code or a
404 error code. The tester is expected to list out which policy
rules are being tested by the API endpoint test.
Change-Id: I5cd861e184da90bb27f8ba454c94fa4d4f99c269
Closes-Bug: #1772710
diff --git a/patrole_tempest_plugin/rbac_rule_validation.py b/patrole_tempest_plugin/rbac_rule_validation.py
index 69a43ea..7d48870 100644
--- a/patrole_tempest_plugin/rbac_rule_validation.py
+++ b/patrole_tempest_plugin/rbac_rule_validation.py
@@ -33,11 +33,13 @@
LOG = logging.getLogger(__name__)
_SUPPORTED_ERROR_CODES = [403, 404]
+_DEFAULT_ERROR_CODE = 403
RBACLOG = logging.getLogger('rbac_reporting')
-def action(service, rule='', rules=None, expected_error_code=403,
+def action(service, rule='', rules=None,
+ expected_error_code=_DEFAULT_ERROR_CODE, expected_error_codes=None,
extra_target_data=None):
"""A decorator for verifying OpenStack policy enforcement.
@@ -90,6 +92,25 @@
A 404 should not be provided *unless* the endpoint masks a
``Forbidden`` exception as a ``NotFound`` exception.
+ :param list expected_error_codes: When the ``rules`` list parameter is
+ used, then this list indicates the expected error code to use if one
+ of the rules does not allow the role being tested. This list must
+ coincide with and its elements remain in the same order as the rules
+ in the rules list.
+
+ Example::
+ rules=["api_action1", "api_action2"]
+ expected_error_codes=[404, 403]
+
+ a) If api_action1 fails and api_action2 passes, then the expected
+ error code is 404.
+ b) if api_action2 fails and api_action1 passes, then the expected
+ error code is 403.
+ c) if both api_action1 and api_action2 fail, then the expected error
+ code is the first error seen (404).
+
+ If an error code is missing from the list, it is defaulted to 403.
+
:param dict extra_target_data: Dictionary, keyed with ``oslo.policy``
generic check names, whose values are string literals that reference
nested ``tempest.test.BaseTestCase`` attributes. Used by
@@ -118,7 +139,9 @@
if extra_target_data is None:
extra_target_data = {}
- rules = _prepare_rules(rule, rules)
+ rules, expected_error_codes = _prepare_multi_policy(rule, rules,
+ expected_error_code,
+ expected_error_codes)
def decorator(test_func):
role = CONF.patrole.rbac_test_role
@@ -141,8 +164,18 @@
disallowed_rules.append(rule)
allowed = allowed and _allowed
+ exp_error_code = expected_error_code
+ if disallowed_rules:
+ # Choose the first disallowed rule and expect the error
+ # code corresponding to it.
+ first_error_index = rules.index(disallowed_rules[0])
+ exp_error_code = expected_error_codes[first_error_index]
+ LOG.debug("%s: Expecting %d to be raised for policy name: %s",
+ test_func.__name__, exp_error_code,
+ disallowed_rules[0])
+
expected_exception, irregular_msg = _get_exception_type(
- expected_error_code)
+ exp_error_code)
test_status = 'Allowed'
@@ -202,7 +235,32 @@
return decorator
-def _prepare_rules(rule, rules):
+def _prepare_multi_policy(rule, rules, exp_error_code, exp_error_codes):
+
+ if exp_error_codes:
+ if not rules:
+ msg = ("The `rules` list must be provided if using the "
+ "`expected_error_codes` list.")
+ raise ValueError(msg)
+ if len(rules) != len(exp_error_codes):
+ msg = ("The `expected_error_codes` list is not the same length "
+ "as the `rules` list.")
+ raise ValueError(msg)
+ if exp_error_code:
+ deprecation_msg = (
+ "The `exp_error_code` argument has been deprecated in favor "
+ "of `exp_error_codes` and will be removed in a future "
+ "version.")
+ versionutils.report_deprecated_feature(LOG, deprecation_msg)
+ LOG.debug("The `exp_error_codes` argument will be used instead of "
+ "`exp_error_code`.")
+ if not isinstance(exp_error_codes, (tuple, list)):
+ exp_error_codes = [exp_error_codes]
+ else:
+ exp_error_codes = []
+ if exp_error_code:
+ exp_error_codes.append(exp_error_code)
+
if rules is None:
rules = []
elif not isinstance(rules, (tuple, list)):
@@ -216,7 +274,18 @@
LOG.debug("The `rules` argument will be used instead of `rule`.")
else:
rules.append(rule)
- return rules
+
+ # Fill in the exp_error_codes if needed. This is needed for the scenarios
+ # where no exp_error_codes array is provided, so the error codes must be
+ # set to the default error code value and there must be the same number
+ # of error codes as rules.
+ num_ecs = len(exp_error_codes)
+ num_rules = len(rules)
+ if (num_ecs < num_rules):
+ for i in range(num_rules - num_ecs):
+ exp_error_codes.append(_DEFAULT_ERROR_CODE)
+
+ return rules, exp_error_codes
def _is_authorized(test_obj, service, rule, extra_target_data):