Merge "Assisted Volume snapshot RBAC test for Compute v2.1 API roles"
diff --git a/patrole_tempest_plugin/rbac_auth.py b/patrole_tempest_plugin/rbac_auth.py
index 1afc7ae..40a46a7 100644
--- a/patrole_tempest_plugin/rbac_auth.py
+++ b/patrole_tempest_plugin/rbac_auth.py
@@ -15,15 +15,15 @@
from oslo_log import log as logging
-from patrole_tempest_plugin import rbac_role_converter
+from patrole_tempest_plugin import rbac_policy_parser
LOG = logging.getLogger(__name__)
class RbacAuthority(object):
def __init__(self, tenant_id, service=None):
- self.converter = rbac_role_converter.RbacPolicyConverter(tenant_id,
- service)
+ self.converter = rbac_policy_parser.RbacPolicyParser(tenant_id,
+ service)
def get_permission(self, rule_name, role):
try:
diff --git a/patrole_tempest_plugin/rbac_role_converter.py b/patrole_tempest_plugin/rbac_policy_parser.py
similarity index 97%
rename from patrole_tempest_plugin/rbac_role_converter.py
rename to patrole_tempest_plugin/rbac_policy_parser.py
index bc6e006..860a53d 100644
--- a/patrole_tempest_plugin/rbac_role_converter.py
+++ b/patrole_tempest_plugin/rbac_policy_parser.py
@@ -19,15 +19,13 @@
from oslo_log import log as logging
from oslo_policy import generator
from oslo_policy import policy
-from tempest import config
from patrole_tempest_plugin import rbac_exceptions
-CONF = config.CONF
LOG = logging.getLogger(__name__)
-class RbacPolicyConverter(object):
+class RbacPolicyParser(object):
"""A class for parsing policy rules into lists of allowed roles.
RBAC testing requires that each rule in a policy file be broken up into
@@ -38,7 +36,7 @@
"""
def __init__(self, tenant_id, service, path=None):
- """Initialization of Policy Converter.
+ """Initialization of Rbac Policy Parser.
Parses a policy file to create a dictionary, mapping policy actions to
roles. If a policy file does not exist, checks whether the policy file
@@ -161,5 +159,5 @@
LOG.debug("{0} not found in policy file.".format(apply_rule))
return False
except Exception as e:
- LOG.debug("Exception: {0} for rule: {1}.".format(e, rule))
+ LOG.debug("Exception: {0} for rule: {1}.".format(e, apply_rule))
return False
diff --git a/patrole_tempest_plugin/tests/api/compute/test_config_drive_rbac.py b/patrole_tempest_plugin/tests/api/compute/test_config_drive_rbac.py
new file mode 100644
index 0000000..ed4edfd
--- /dev/null
+++ b/patrole_tempest_plugin/tests/api/compute/test_config_drive_rbac.py
@@ -0,0 +1,53 @@
+# Copyright 2017 AT&T Corporation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from tempest.lib import decorators
+from tempest import test
+
+from patrole_tempest_plugin import rbac_rule_validation
+from patrole_tempest_plugin.rbac_utils import rbac_utils
+from patrole_tempest_plugin.tests.api.compute import rbac_base
+
+
+class ConfigDriveRbacTest(rbac_base.BaseV2ComputeRbacTest):
+
+ @classmethod
+ def setup_clients(cls):
+ super(ConfigDriveRbacTest, cls).setup_clients()
+ cls.client = cls.servers_client
+
+ @classmethod
+ def skip_checks(cls):
+ super(ConfigDriveRbacTest, cls).skip_checks()
+ if not test.is_extension_enabled('os-config-drive', 'compute'):
+ msg = "%s skipped as os-config-drive extension not enabled." \
+ % cls.__name__
+ raise cls.skipException(msg)
+
+ def tearDown(self):
+ rbac_utils.switch_role(self, switchToRbacRole=False)
+ super(ConfigDriveRbacTest, self).tearDown()
+
+ @decorators.idempotent_id('55c62ef7-b72b-4970-acc6-05b0a4316e5d')
+ @rbac_rule_validation.action(
+ service="nova",
+ rule="os_compute_api:os-config-drive")
+ def test_create_test_server_with_config_drive(self):
+ rbac_utils.switch_role(self, switchToRbacRole=True)
+ # NOTE(felipemonteiro): This policy action is always enforced,
+ # regardless whether the config_drive flag is set to true or false.
+ # However, it has been explicitly set to true below, in case that this
+ # behavior ever changes in the future.
+ self.create_test_server(config_drive=True)
diff --git a/patrole_tempest_plugin/tests/api/compute/test_hypervisor_rbac.py b/patrole_tempest_plugin/tests/api/compute/test_hypervisor_rbac.py
index e495b7d..e50bb9e 100644
--- a/patrole_tempest_plugin/tests/api/compute/test_hypervisor_rbac.py
+++ b/patrole_tempest_plugin/tests/api/compute/test_hypervisor_rbac.py
@@ -13,15 +13,13 @@
# License for the specific language governing permissions and limitations
# under the License.
-from tempest import config
from tempest.lib import decorators
+from tempest import test
from patrole_tempest_plugin import rbac_rule_validation
from patrole_tempest_plugin.rbac_utils import rbac_utils
from patrole_tempest_plugin.tests.api.compute import rbac_base
-CONF = config.CONF
-
class HypervisorAdminRbacTest(rbac_base.BaseV2ComputeAdminRbacTest):
@@ -33,9 +31,10 @@
@classmethod
def skip_checks(cls):
super(HypervisorAdminRbacTest, cls).skip_checks()
- if not CONF.compute_feature_enabled.api_extensions:
- raise cls.skipException(
- '%s skipped as no compute extensions enabled' % cls.__name__)
+ if not test.is_extension_enabled('os-hypervisors', 'compute'):
+ msg = "%s skipped as os-hypervisors extension not enabled." \
+ % cls.__name__
+ raise cls.skipException(msg)
def tearDown(self):
rbac_utils.switch_role(self, switchToRbacRole=False)
@@ -46,4 +45,5 @@
service="nova",
rule="os_compute_api:os-hypervisors")
def test_list_hypervisors(self):
+ rbac_utils.switch_role(self, switchToRbacRole=True)
self.client.list_hypervisors()['hypervisors']
diff --git a/patrole_tempest_plugin/tests/api/compute/test_server_actions_rbac.py b/patrole_tempest_plugin/tests/api/compute/test_server_actions_rbac.py
new file mode 100644
index 0000000..9e3b259
--- /dev/null
+++ b/patrole_tempest_plugin/tests/api/compute/test_server_actions_rbac.py
@@ -0,0 +1,81 @@
+# Copyright 2017 AT&T Corporation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from tempest.common import waiters
+from tempest import config
+from tempest.lib import decorators
+
+from patrole_tempest_plugin import rbac_rule_validation
+from patrole_tempest_plugin.rbac_utils import rbac_utils
+from patrole_tempest_plugin.tests.api.compute import rbac_base
+
+CONF = config.CONF
+
+
+class ServerActionsRbacTest(rbac_base.BaseV2ComputeRbacTest):
+
+ def tearDown(self):
+ rbac_utils.switch_role(self, switchToRbacRole=False)
+ super(ServerActionsRbacTest, self).tearDown()
+
+ @classmethod
+ def setup_clients(cls):
+ super(ServerActionsRbacTest, cls).setup_clients()
+ cls.client = cls.servers_client
+
+ @classmethod
+ def skip_checks(cls):
+ super(ServerActionsRbacTest, cls).skip_checks()
+ if not CONF.compute_feature_enabled.api_extensions:
+ raise cls.skipException(
+ '%s skipped as no compute extensions enabled' % cls.__name__)
+ if not CONF.compute_feature_enabled.interface_attach:
+ raise cls.skipException(
+ '%s skipped as interface attachment is not available'
+ % cls.__name__)
+
+ @classmethod
+ def resource_setup(cls):
+ cls.set_validation_resources()
+ super(ServerActionsRbacTest, cls).resource_setup()
+ cls.server_id = cls.create_test_server(wait_until='ACTIVE',
+ validatable=True)['id']
+
+ def _test_start_server(self):
+ self.client.start_server(self.server_id)
+ waiters.wait_for_server_status(self.client, self.server_id,
+ 'ACTIVE')
+
+ def _test_stop_server(self):
+ self.client.stop_server(self.server_id)
+ waiters.wait_for_server_status(self.client, self.server_id,
+ 'SHUTOFF')
+
+ @rbac_rule_validation.action(
+ service="nova",
+ rule="os_compute_api:servers:stop")
+ @decorators.idempotent_id('ab4a17d2-166f-4a6d-9944-f17baa576cf2')
+ def test_stop_server(self):
+ rbac_utils.switch_role(self, switchToRbacRole=True)
+ self._test_stop_server()
+
+ @rbac_rule_validation.action(
+ service="nova",
+ rule="os_compute_api:servers:start")
+ @decorators.idempotent_id('8876bfa9-4d10-406e-a335-a57e451abb12')
+ def test_start_server(self):
+ self._test_stop_server()
+ rbac_utils.switch_role(self, switchToRbacRole=True)
+ self._test_start_server()
diff --git a/patrole_tempest_plugin/tests/api/network/test_floating_ips_rbac.py b/patrole_tempest_plugin/tests/api/network/test_floating_ips_rbac.py
new file mode 100644
index 0000000..8351905
--- /dev/null
+++ b/patrole_tempest_plugin/tests/api/network/test_floating_ips_rbac.py
@@ -0,0 +1,154 @@
+# Copyright 2017 AT&T Corporation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import netaddr
+
+from oslo_log import log
+from tempest import config
+from tempest.lib.common.utils import test_utils
+from tempest.lib import decorators
+from tempest.lib import exceptions
+
+from patrole_tempest_plugin import rbac_exceptions
+from patrole_tempest_plugin import rbac_rule_validation
+from patrole_tempest_plugin.rbac_utils import rbac_utils
+from patrole_tempest_plugin.tests.api.network import rbac_base as base
+
+CONF = config.CONF
+LOG = log.getLogger(__name__)
+
+
+class RbacFloatingIpsTest(base.BaseNetworkRbacTest):
+
+ @classmethod
+ def resource_setup(cls):
+ super(RbacFloatingIpsTest, cls).resource_setup()
+
+ # Create an external network for floating ip creation
+ cls.fip_extnet = cls.create_network(**{'router:external': True})
+ cls.fip_extnet_id = cls.fip_extnet['id']
+
+ # Create a subnet for the external network
+ cls.cidr = netaddr.IPNetwork(CONF.network.project_network_cidr)
+ cls.create_subnet(cls.fip_extnet,
+ cidr=cls.cidr,
+ mask_bits=24)
+
+ @classmethod
+ def resource_cleanup(cls):
+ # Update router:external attribute to False for proper subnet resource
+ # cleanup by base class
+ cls.networks_client.update_network(cls.fip_extnet_id,
+ **{'router:external': False})
+ super(RbacFloatingIpsTest, cls).resource_cleanup()
+
+ def _create_floatingip(self, floating_ip_address=None):
+ if floating_ip_address is not None:
+ body = self.floating_ips_client.create_floatingip(
+ floating_network_id=self.fip_extnet_id,
+ floating_ip_address=floating_ip_address)
+ else:
+ body = self.floating_ips_client.create_floatingip(
+ floating_network_id=self.fip_extnet_id)
+
+ floating_ip = body['floatingip']
+ self.addCleanup(test_utils.call_and_ignore_notfound_exc,
+ self.floating_ips_client.delete_floatingip,
+ floating_ip['id'])
+
+ return floating_ip
+
+ def tearDown(self):
+ rbac_utils.switch_role(self, switchToRbacRole=False)
+ super(RbacFloatingIpsTest, self).tearDown()
+
+ @rbac_rule_validation.action(service="neutron",
+ rule="create_floatingip")
+ @decorators.idempotent_id('f8f7474c-b8a5-4174-af84-73097d6ced38')
+ def test_create_floating_ip(self):
+ """Create floating IP.
+
+ RBAC test for the neutron create_floatingip policy
+ """
+ rbac_utils.switch_role(self, switchToRbacRole=True)
+ self._create_floatingip()
+
+ @rbac_rule_validation.action(service="neutron",
+ rule="create_floatingip:floating_ip_address")
+ @decorators.idempotent_id('a8bb826a-403d-4130-a55d-120a0a660806')
+ def test_create_floating_ip_floatingip_address(self):
+ """Create floating IP with address.
+
+ RBAC test for the neutron create_floatingip:floating_ip_address policy
+ """
+ fip = str(netaddr.IPAddress(self.cidr) + 10)
+
+ rbac_utils.switch_role(self, switchToRbacRole=True)
+ self._create_floatingip(floating_ip_address=fip)
+
+ @rbac_rule_validation.action(service="neutron",
+ rule="update_floatingip")
+ @decorators.idempotent_id('2ab1b060-19f8-4ef6-a838-e2ab7b377c63')
+ def test_update_floating_ip(self):
+ """Update floating IP.
+
+ RBAC test for the neutron update_floatingip policy
+ """
+ floating_ip = self._create_floatingip()
+ rbac_utils.switch_role(self, switchToRbacRole=True)
+
+ # Associate floating IP to the other port
+ self.floating_ips_client.update_floatingip(
+ floating_ip['id'], port_id=None)
+
+ @rbac_rule_validation.action(service="neutron", rule="get_floatingip")
+ @decorators.idempotent_id('f8846fd0-c976-48fe-a148-105303931b32')
+ def test_show_floating_ip(self):
+ """Show floating IP.
+
+ RBAC test for the neutron get_floatingip policy
+ """
+ floating_ip = self._create_floatingip()
+ rbac_utils.switch_role(self, switchToRbacRole=True)
+
+ try:
+ # Show floating IP
+ self.floating_ips_client.show_floatingip(floating_ip['id'])
+ except exceptions.NotFound as e:
+ LOG.info("NotFound exception caught. Exception is thrown when "
+ "role doesn't have access to the endpoint."
+ "This is irregular and should be fixed.")
+ raise rbac_exceptions.RbacActionFailed(e)
+
+ @rbac_rule_validation.action(service="neutron",
+ rule="delete_floatingip")
+ @decorators.idempotent_id('2611b068-30d4-4241-a78f-1b801a14db7e')
+ def test_delete_floating_ip(self):
+ """Delete floating IP.
+
+ RBAC test for the neutron delete_floatingip policy
+ """
+ floating_ip = self._create_floatingip()
+ rbac_utils.switch_role(self, switchToRbacRole=True)
+
+ try:
+ # Delete the floating IP
+ self.floating_ips_client.delete_floatingip(floating_ip['id'])
+
+ except exceptions.NotFound as e:
+ LOG.info("NotFound exception caught. Exception is thrown when "
+ "role doesn't have access to the endpoint."
+ "This is irregular and should be fixed.")
+ raise rbac_exceptions.RbacActionFailed(e)
diff --git a/tests/test_rbac_role_converter.py b/tests/test_rbac_policy_parser.py
similarity index 93%
rename from tests/test_rbac_role_converter.py
rename to tests/test_rbac_policy_parser.py
index 09fa081..cc0fc4f 100644
--- a/tests/test_rbac_role_converter.py
+++ b/tests/test_rbac_policy_parser.py
@@ -19,7 +19,7 @@
from tempest import config
from tempest.tests import base
-from patrole_tempest_plugin import rbac_role_converter
+from patrole_tempest_plugin import rbac_policy_parser
CONF = config.CONF
@@ -43,12 +43,12 @@
'resources',
'tenant_rbac_policy.json')
- @mock.patch.object(rbac_role_converter, 'LOG', autospec=True)
+ @mock.patch.object(rbac_policy_parser, 'LOG', autospec=True)
def test_custom_policy(self, m_log):
default_roles = ['zero', 'one', 'two', 'three', 'four',
'five', 'six', 'seven', 'eight', 'nine']
- converter = rbac_role_converter.RbacPolicyConverter(
+ converter = rbac_policy_parser.RbacPolicyParser(
None, "test", self.custom_policy_file)
expected = {
@@ -76,7 +76,7 @@
self.assertFalse(converter.allowed(rule, role))
def test_admin_policy_file_with_admin_role(self):
- converter = rbac_role_converter.RbacPolicyConverter(
+ converter = rbac_policy_parser.RbacPolicyParser(
None, "test", self.admin_policy_file)
role = 'admin'
@@ -94,7 +94,7 @@
self.assertFalse(allowed)
def test_admin_policy_file_with_member_role(self):
- converter = rbac_role_converter.RbacPolicyConverter(
+ converter = rbac_policy_parser.RbacPolicyParser(
None, "test", self.admin_policy_file)
role = 'Member'
@@ -113,7 +113,7 @@
self.assertFalse(allowed)
def test_admin_policy_file_with_context_is_admin(self):
- converter = rbac_role_converter.RbacPolicyConverter(
+ converter = rbac_policy_parser.RbacPolicyParser(
None, "test", self.alt_admin_policy_file)
role = 'fake_admin'
@@ -147,7 +147,7 @@
network:tenant_id pass.
"""
test_tenant_id = mock.sentinel.tenant_id
- converter = rbac_role_converter.RbacPolicyConverter(
+ converter = rbac_policy_parser.RbacPolicyParser(
test_tenant_id, "test", self.tenant_policy_file)
# Check whether Member role can perform expected actions.