requirements authority: Use better exception/return code
This patch set makes some minor modifications to the requirements
authority module:
* return {} instead of None in RequirementsParser.parse since
its return type is dict
* raise RbacParsingException if an invalid rule is passed to
RequirementsAuthority.allowed since KeyError is a builtin type
and is not specific enough
* change the exception message raise for the above case -- "API"
is not the right word; the word should be "rule name" as that
is what is being keyed into the roles_dict
Change-Id: Ia4408c0745d2b5ddb1c73c1eb9a6316ae0c1f646
diff --git a/patrole_tempest_plugin/requirements_authority.py b/patrole_tempest_plugin/requirements_authority.py
index 4697c3b..4d6f25b 100644
--- a/patrole_tempest_plugin/requirements_authority.py
+++ b/patrole_tempest_plugin/requirements_authority.py
@@ -18,9 +18,10 @@
from oslo_log import log as logging
from tempest import config
-from tempest.lib import exceptions
+from tempest.lib import exceptions as lib_exc
from patrole_tempest_plugin.rbac_authority import RbacAuthority
+from patrole_tempest_plugin import rbac_exceptions
CONF = config.CONF
LOG = logging.getLogger(__name__)
@@ -81,7 +82,7 @@
except yaml.parser.ParserError:
LOG.error("Error while parsing the requirements YAML file. Did "
"you pass a valid component name from the test case?")
- return None
+ return {}
class RequirementsAuthority(RbacAuthority):
@@ -98,10 +99,10 @@
Defaults to ``[patrole].custom_requirements_file``.
:param str component: Name of the OpenStack service to be validated.
"""
- filepath = filepath or CONF.patrole.custom_requirements_file
-
+ self.filepath = filepath or CONF.patrole.custom_requirements_file
if component is not None:
- self.roles_dict = RequirementsParser(filepath).parse(component)
+ self.roles_dict = RequirementsParser(self.filepath).parse(
+ component)
else:
self.roles_dict = None
@@ -116,33 +117,34 @@
:returns: True if ``role`` is allowed to perform ``rule_name``, else
False.
:rtype: bool
- :raises KeyError: If ``rule_name`` does not exist among the keyed
- policy names in the custom requirements file.
+ :raises RbacParsingException: If ``rule_name`` does not exist among the
+ keyed policy names in the custom requirements file.
"""
- if self.roles_dict is None:
- raise exceptions.InvalidConfiguration(
+ if not self.roles_dict:
+ raise lib_exc.InvalidConfiguration(
"Roles dictionary parsed from requirements YAML file is "
"empty. Ensure the requirements YAML file is correctly "
"formatted.")
try:
requirement_roles = self.roles_dict[rule_name]
-
- for role_reqs in requirement_roles:
- required_roles = [
- role for role in role_reqs if not role.startswith("!")]
- forbidden_roles = [
- role[1:] for role in role_reqs if role.startswith("!")]
-
- # User must have all required roles
- required_passed = all([r in roles for r in required_roles])
- # User must not have any forbidden roles
- forbidden_passed = all([r not in forbidden_roles
- for r in roles])
-
- if required_passed and forbidden_passed:
- return True
-
- return False
except KeyError:
- raise KeyError("'%s' API is not defined in the requirements YAML "
- "file" % rule_name)
+ raise rbac_exceptions.RbacParsingException(
+ "'%s' rule name is not defined in the requirements YAML file: "
+ "%s" % (rule_name, self.filepath))
+
+ for role_reqs in requirement_roles:
+ required_roles = [
+ role for role in role_reqs if not role.startswith("!")]
+ forbidden_roles = [
+ role[1:] for role in role_reqs if role.startswith("!")]
+
+ # User must have all required roles
+ required_passed = all([r in roles for r in required_roles])
+ # User must not have any forbidden roles
+ forbidden_passed = all([r not in forbidden_roles
+ for r in roles])
+
+ if required_passed and forbidden_passed:
+ return True
+
+ return False
diff --git a/patrole_tempest_plugin/tests/unit/test_requirements_authority.py b/patrole_tempest_plugin/tests/unit/test_requirements_authority.py
index 0f7310e..94af81f 100644
--- a/patrole_tempest_plugin/tests/unit/test_requirements_authority.py
+++ b/patrole_tempest_plugin/tests/unit/test_requirements_authority.py
@@ -17,6 +17,7 @@
from tempest.lib import exceptions
from tempest.tests import base
+from patrole_tempest_plugin import rbac_exceptions
from patrole_tempest_plugin import requirements_authority as req_auth
@@ -50,17 +51,17 @@
self.rbac_auth.allowed, "", [""])
def test_auth_allowed_role_in_api(self):
- self.rbac_auth.roles_dict = {'api': [['_member_']]}
- self.assertTrue(self.rbac_auth.allowed("api", ["_member_"]))
+ self.rbac_auth.roles_dict = {'rule': [['_member_']]}
+ self.assertTrue(self.rbac_auth.allowed("rule", ["_member_"]))
def test_auth_allowed_role_not_in_api(self):
- self.rbac_auth.roles_dict = {'api': [['_member_']]}
- self.assertFalse(self.rbac_auth.allowed("api", "support_member"))
+ self.rbac_auth.roles_dict = {'rule': [['_member_']]}
+ self.assertFalse(self.rbac_auth.allowed("rule", "support_member"))
- def test_parser_get_allowed_except_keyerror(self):
- self.rbac_auth.roles_dict = {}
- self.assertRaises(KeyError, self.rbac_auth.allowed,
- "api", "support_member")
+ def test_parser_get_allowed_invalid_rule_raises_parsing_exception(self):
+ self.rbac_auth.roles_dict = {"foo": "bar"}
+ self.assertRaises(rbac_exceptions.RbacParsingException,
+ self.rbac_auth.allowed, "baz", "support_member")
def test_parser_init(self):
req_auth.RequirementsParser(self.yaml_test_file)
@@ -90,7 +91,7 @@
self.rbac_auth.roles_dict = \
req_auth.RequirementsParser.parse("Failure")
- self.assertIsNone(self.rbac_auth.roles_dict)
+ self.assertFalse(self.rbac_auth.roles_dict)
self.assertRaises(exceptions.InvalidConfiguration,
self.rbac_auth.allowed, "", [""])
@@ -99,10 +100,10 @@
considered as part of role itself.
"""
- self.rbac_auth.roles_dict = {'api': [['!admin']]}
- self.assertTrue(self.rbac_auth.allowed("api", ["member"]))
- self.assertTrue(self.rbac_auth.allowed("api", ["!admin"]))
- self.assertFalse(self.rbac_auth.allowed("api", ["admin"]))
+ self.rbac_auth.roles_dict = {'rule': [['!admin']]}
+ self.assertTrue(self.rbac_auth.allowed("rule", ["member"]))
+ self.assertTrue(self.rbac_auth.allowed("rule", ["!admin"]))
+ self.assertFalse(self.rbac_auth.allowed("rule", ["admin"]))
class RequirementsAuthorityMultiRoleTest(BaseRequirementsAuthorityTest):
@@ -112,39 +113,39 @@
considered as part of role itself.
"""
- self.rbac_auth.roles_dict = {'api': [['member', '!admin']]}
- self.assertFalse(self.rbac_auth.allowed("api", ["member", "admin"]))
- self.assertTrue(self.rbac_auth.allowed("api", ["member", "!admin"]))
+ self.rbac_auth.roles_dict = {'rule': [['member', '!admin']]}
+ self.assertFalse(self.rbac_auth.allowed("rule", ["member", "admin"]))
+ self.assertTrue(self.rbac_auth.allowed("rule", ["member", "!admin"]))
def test_auth_allowed_single_rule_scenario(self):
# member and support and not admin and not manager
- self.rbac_auth.roles_dict = {'api': [['member', 'support',
- '!admin', '!manager']]}
+ self.rbac_auth.roles_dict = {'rule': [['member', 'support',
+ '!admin', '!manager']]}
# User is member and support and not manager or admin
- self.assertTrue(self.rbac_auth.allowed("api", ["member",
- "support"]))
+ self.assertTrue(self.rbac_auth.allowed("rule", ["member",
+ "support"]))
# User is member and not manager or admin, but not support
- self.assertFalse(self.rbac_auth.allowed("api", ["member"]))
+ self.assertFalse(self.rbac_auth.allowed("rule", ["member"]))
# User is support and not manager or admin, but not member
- self.assertFalse(self.rbac_auth.allowed("api", ["support"]))
+ self.assertFalse(self.rbac_auth.allowed("rule", ["support"]))
# User is member and support and not manager, but have admin role
- self.assertFalse(self.rbac_auth.allowed("api", ["member",
- "support",
- "admin"]))
+ self.assertFalse(self.rbac_auth.allowed("rule", ["member",
+ "support",
+ "admin"]))
# User is member and not manager, but have admin role and not support
- self.assertFalse(self.rbac_auth.allowed("api", ["member",
- "admin"]))
+ self.assertFalse(self.rbac_auth.allowed("rule", ["member",
+ "admin"]))
# User is member and support, but have manager and admin roles
- self.assertFalse(self.rbac_auth.allowed("api", ["member",
- "support",
- "admin",
- "manager"]))
+ self.assertFalse(self.rbac_auth.allowed("rule", ["member",
+ "support",
+ "admin",
+ "manager"]))
def test_auth_allowed_multi_rule_scenario(self):
rules = [
@@ -152,36 +153,36 @@
['member', 'admin'],
["manager"]
]
- self.rbac_auth.roles_dict = {'api': rules}
+ self.rbac_auth.roles_dict = {'rule': rules}
# Not a single role allows viewer
- self.assertFalse(self.rbac_auth.allowed("api", ["viewer"]))
+ self.assertFalse(self.rbac_auth.allowed("rule", ["viewer"]))
# We have no rule that allows support and admin
- self.assertFalse(self.rbac_auth.allowed("api", ["support",
- "admin"]))
+ self.assertFalse(self.rbac_auth.allowed("rule", ["support",
+ "admin"]))
# There is no rule that requires member without additional requirements
- self.assertFalse(self.rbac_auth.allowed("api", ["member"]))
+ self.assertFalse(self.rbac_auth.allowed("rule", ["member"]))
# Pass with rules[2]
- self.assertTrue(self.rbac_auth.allowed("api", ["manager"]))
+ self.assertTrue(self.rbac_auth.allowed("rule", ["manager"]))
# Pass with rules[0]
- self.assertTrue(self.rbac_auth.allowed("api", ["member",
- "support"]))
+ self.assertTrue(self.rbac_auth.allowed("rule", ["member",
+ "support"]))
# Pass with rules[1]
- self.assertTrue(self.rbac_auth.allowed("api", ["member",
- "admin"]))
+ self.assertTrue(self.rbac_auth.allowed("rule", ["member",
+ "admin"]))
# Pass with rules[2]
- self.assertTrue(self.rbac_auth.allowed("api", ["manager",
- "admin"]))
+ self.assertTrue(self.rbac_auth.allowed("rule", ["manager",
+ "admin"]))
# Pass with rules[1]
- self.assertTrue(self.rbac_auth.allowed("api", ["member",
- "support",
- "admin"]))
+ self.assertTrue(self.rbac_auth.allowed("rule", ["member",
+ "support",
+ "admin"]))
# Pass with rules[1]
- self.assertTrue(self.rbac_auth.allowed("api", ["member",
- "support",
- "admin",
- "manager"]))
+ self.assertTrue(self.rbac_auth.allowed("rule", ["member",
+ "support",
+ "admin",
+ "manager"]))
# Pass with rules[2]
- self.assertTrue(self.rbac_auth.allowed("api", ["admin",
- "manager"]))
+ self.assertTrue(self.rbac_auth.allowed("rule", ["admin",
+ "manager"]))