Merge "Docs: Add requirements authority module to documentation"
diff --git a/patrole_tempest_plugin/policy_authority.py b/patrole_tempest_plugin/policy_authority.py
index 9499bf6..b813f88 100644
--- a/patrole_tempest_plugin/policy_authority.py
+++ b/patrole_tempest_plugin/policy_authority.py
@@ -156,9 +156,9 @@
def allowed(self, rule_name, role):
"""Checks if a given rule in a policy is allowed with given role.
- :param string rule_name: Rule to be checked using ``oslo.policy``.
- :param bool is_admin: Whether admin context is used.
- :raises RbacParsingException: If `rule_name`` does not exist in the
+ :param string rule_name: Policy name to pass to``oslo.policy``.
+ :param string role: Role to validate for authorization.
+ :raises RbacParsingException: If ``rule_name`` does not exist in the
cloud (in policy file or among registered in-code policy defaults).
"""
is_admin_context = self._is_admin_context(role)
diff --git a/patrole_tempest_plugin/requirements_authority.py b/patrole_tempest_plugin/requirements_authority.py
index 683a7eb..75df9f4 100644
--- a/patrole_tempest_plugin/requirements_authority.py
+++ b/patrole_tempest_plugin/requirements_authority.py
@@ -16,14 +16,17 @@
from oslo_log import log as logging
+from tempest import config
from tempest.lib import exceptions
from patrole_tempest_plugin.rbac_authority import RbacAuthority
+CONF = config.CONF
LOG = logging.getLogger(__name__)
class RequirementsParser(object):
+ """A class that parses a custom requirements file."""
_inner = None
class Inner(object):
@@ -40,6 +43,27 @@
@staticmethod
def parse(component):
+ """Parses a requirements file with the following format:
+
+ .. code-block:: yaml
+
+ <service_foo>:
+ <api_action_a>:
+ - <allowed_role_1>
+ - <allowed_role_2>
+ - <allowed_role_3>
+ <api_action_b>:
+ - <allowed_role_2>
+ - <allowed_role_4>
+ <service_bar>:
+ <api_action_c>:
+ - <allowed_role_3>
+
+ :param str component: Name of the OpenStack service to be validated.
+ :returns: The dictionary that maps each policy action to the list
+ of allowed roles, for the given ``component``.
+ :rtype: dict
+ """
try:
for section in RequirementsParser.Inner._rbac_map:
if component in section:
@@ -51,13 +75,39 @@
class RequirementsAuthority(RbacAuthority):
+ """A class that uses a custom requirements file to validate RBAC."""
+
def __init__(self, filepath=None, component=None):
- if filepath is not None and component is not None:
+ """This class can be used to achieve a requirements-driven approach to
+ validating an OpenStack cloud's RBAC implementation. Using this
+ approach, Patrole computes expected test results by performing lookups
+ against a custom requirements file which precisely defines the cloud's
+ RBAC requirements.
+
+ :param str filepath: Path where the custom requirements file lives.
+ Defaults to ``[patrole].custom_requirements_file``.
+ :param str component: Name of the OpenStack service to be validated.
+ """
+ filepath = filepath or CONF.patrole.custom_requirements_file
+
+ if component is not None:
self.roles_dict = RequirementsParser(filepath).parse(component)
else:
self.roles_dict = None
def allowed(self, rule_name, role):
+ """Checks if a given rule in a policy is allowed with given role.
+
+ :param string rule_name: Rule to be checked using provided requirements
+ file specified by ``[patrole].custom_requirements_file``. Must be
+ a key present in this file, under the appropriate component.
+ :param string role: Role to validate against custom requirements file.
+ :returns: True if ``role`` is allowed to perform ``rule_name``, else
+ False.
+ :rtype: bool
+ :raises KeyError: If ``rule_name`` does not exist among the keyed
+ policy names in the custom requirements file.
+ """
if self.roles_dict is None:
raise exceptions.InvalidConfiguration(
"Roles dictionary parsed from requirements YAML file is "
diff --git a/patrole_tempest_plugin/tests/api/compute/test_server_misc_policy_actions_rbac.py b/patrole_tempest_plugin/tests/api/compute/test_server_misc_policy_actions_rbac.py
index 58a829d..f6dfcca 100644
--- a/patrole_tempest_plugin/tests/api/compute/test_server_misc_policy_actions_rbac.py
+++ b/patrole_tempest_plugin/tests/api/compute/test_server_misc_policy_actions_rbac.py
@@ -729,41 +729,3 @@
with self.rbac_utils.override_role(self):
self.servers_client.add_fixed_ip(self.server['id'],
networkId=network_id)
-
-
-class VirtualInterfacesRbacTest(rbac_base.BaseV2ComputeRbacTest):
- # The compute os-virtual-interfaces API is deprecated from the Microversion
- # 2.44 onward. For more information, see:
- # https://developer.openstack.org/api-ref/compute/#servers-virtual-interfaces-servers-os-virtual-interfaces-deprecated
- max_microversion = '2.43'
-
- @classmethod
- def setup_credentials(cls):
- # This test needs a network and a subnet
- cls.set_network_resources(network=True, subnet=True)
- super(VirtualInterfacesRbacTest, cls).setup_credentials()
-
- @classmethod
- def resource_setup(cls):
- super(VirtualInterfacesRbacTest, cls).resource_setup()
- cls.server = cls.create_test_server(wait_until='ACTIVE')
-
- @rbac_rule_validation.action(
- service="nova",
- rule="os_compute_api:os-virtual-interfaces")
- @decorators.idempotent_id('fc719ae3-0f73-4689-8378-1b841f0f2818')
- def test_list_virtual_interfaces(self):
- """Test list virtual interfaces, part of os-virtual-interfaces.
-
- If Neutron is available, then call the API and expect it to fail
- with a 400 BadRequest (policy enforcement is done before that happens).
- """
- with self.rbac_utils.override_role(self):
- if CONF.service_available.neutron:
- msg = ("Listing virtual interfaces is not supported by this "
- "cloud.")
- with self.assertRaisesRegex(lib_exc.BadRequest, msg):
- self.servers_client.list_virtual_interfaces(
- self.server['id'])
- else:
- self.servers_client.list_virtual_interfaces(self.server['id'])
diff --git a/patrole_tempest_plugin/tests/api/compute/test_virtual_interfaces_rbac.py b/patrole_tempest_plugin/tests/api/compute/test_virtual_interfaces_rbac.py
new file mode 100644
index 0000000..6edb8d9
--- /dev/null
+++ b/patrole_tempest_plugin/tests/api/compute/test_virtual_interfaces_rbac.py
@@ -0,0 +1,61 @@
+# Copyright 2017 AT&T Corporation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from tempest import config
+from tempest.lib import decorators
+from tempest.lib import exceptions as lib_exc
+
+from patrole_tempest_plugin import rbac_rule_validation
+from patrole_tempest_plugin.tests.api.compute import rbac_base
+
+CONF = config.CONF
+
+
+class VirtualInterfacesRbacTest(rbac_base.BaseV2ComputeRbacTest):
+ # The compute os-virtual-interfaces API is deprecated from the Microversion
+ # 2.44 onward. For more information, see:
+ # https://developer.openstack.org/api-ref/compute/#servers-virtual-interfaces-servers-os-virtual-interfaces-deprecated
+ max_microversion = '2.43'
+
+ @classmethod
+ def setup_credentials(cls):
+ # This test needs a network and a subnet
+ cls.set_network_resources(network=True, subnet=True)
+ super(VirtualInterfacesRbacTest, cls).setup_credentials()
+
+ @classmethod
+ def resource_setup(cls):
+ super(VirtualInterfacesRbacTest, cls).resource_setup()
+ cls.server = cls.create_test_server(wait_until='ACTIVE')
+
+ @rbac_rule_validation.action(
+ service="nova",
+ rule="os_compute_api:os-virtual-interfaces")
+ @decorators.idempotent_id('fc719ae3-0f73-4689-8378-1b841f0f2818')
+ def test_list_virtual_interfaces(self):
+ """Test list virtual interfaces, part of os-virtual-interfaces.
+
+ If Neutron is available, then call the API and expect it to fail
+ with a 400 BadRequest (policy enforcement is done before that happens).
+ """
+ with self.rbac_utils.override_role(self):
+ if CONF.service_available.neutron:
+ msg = ("Listing virtual interfaces is not supported by this "
+ "cloud.")
+ with self.assertRaisesRegex(lib_exc.BadRequest, msg):
+ self.servers_client.list_virtual_interfaces(
+ self.server['id'])
+ else:
+ self.servers_client.list_virtual_interfaces(self.server['id'])