Merge "Refactor policy parser init so that validate service is in helper"
diff --git a/patrole_tempest_plugin/rbac_rule_validation.py b/patrole_tempest_plugin/rbac_rule_validation.py
index ba04a30..53f84ff 100644
--- a/patrole_tempest_plugin/rbac_rule_validation.py
+++ b/patrole_tempest_plugin/rbac_rule_validation.py
@@ -29,6 +29,8 @@
CONF = config.CONF
LOG = logging.getLogger(__name__)
+_SUPPORTED_ERROR_CODES = [403, 404]
+
def action(service, rule='', admin_only=False, expected_error_code=403,
extra_target_data=None):
@@ -47,15 +49,13 @@
:param service: A OpenStack service: for example, "nova" or "neutron".
:param rule: A policy action defined in a policy.json file (or in code).
:param admin_only: Skips over oslo.policy check because the policy action
- defined by `rule` is not enforced by the service's
- policy enforcement logic. For example, Keystone v2
- performs an admin check for most of its endpoints. If
- True, `rule` is effectively ignored.
+ defined by `rule` is not enforced by the service's policy enforcement
+ logic. For example, Keystone v2 performs an admin check for most of its
+ endpoints. If True, `rule` is effectively ignored.
:param expected_error_code: Overrides default value of 403 (Forbidden)
- with endpoint-specific error code. Currently
- only supports 403 and 404. Support for 404
- is needed because some services, like Neutron,
- intentionally throw a 404 for security reasons.
+ with endpoint-specific error code. Currently only supports 403 and 404.
+ Support for 404 is needed because some services, like Neutron,
+ intentionally throw a 404 for security reasons.
:raises NotFound: if `service` is invalid or
if Tempest credentials cannot be found.
@@ -163,10 +163,29 @@
return False
-def _get_exception_type(expected_error_code):
+def _get_exception_type(expected_error_code=403):
+ """Dynamically calculate the expected exception to be caught.
+
+ Maps `expected_error_code` to the exception to be caught by the test case.
+ Only `Forbidden` and `NotFound` exceptions are permitted. `NotFound` is
+ supported because Neutron, for security reasons, masks `Forbidden`
+ exceptions as `NotFound` exceptions.
+
+ :param expected_error_code: the integer representation of the expected
+ exception to be caught. Must be contained in `_SUPPORTED_ERROR_CODES`.
+ :returns: tuple of the exception type corresponding to
+ `expected_error_code` and a message explaining that a non-Forbidden
+ exception was expected, if applicable.
+ """
expected_exception = None
irregular_msg = None
- supported_error_codes = [403, 404]
+
+ if not isinstance(expected_error_code, six.integer_types) \
+ or expected_error_code not in _SUPPORTED_ERROR_CODES:
+ msg = ("Please pass an expected error code. Currently "
+ "supported codes: {0}".format(_SUPPORTED_ERROR_CODES))
+ LOG.error(msg)
+ raise rbac_exceptions.RbacInvalidErrorCode(msg)
if expected_error_code == 403:
expected_exception = exceptions.Forbidden
@@ -175,11 +194,6 @@
irregular_msg = ("NotFound exception was caught for policy action "
"{0}. The service {1} throws a 404 instead of a 403, "
"which is irregular.")
- else:
- msg = ("Please pass an expected error code. Currently "
- "supported codes: {0}".format(str(supported_error_codes)))
- LOG.error(msg)
- raise rbac_exceptions.RbacInvalidErrorCode()
return expected_exception, irregular_msg
@@ -200,7 +214,7 @@
:param test_obj: type BaseTestCase (tempest base test class)
:param extra_target_data: dictionary with unresolved string literals that
- reference nested BaseTestCase attributes
+ reference nested BaseTestCase attributes
:returns: dictionary with resolved BaseTestCase attributes
"""
attr_value = test_obj
diff --git a/patrole_tempest_plugin/tests/api/compute/test_admin_server_actions_rbac.py b/patrole_tempest_plugin/tests/api/compute/test_admin_server_actions_rbac.py
deleted file mode 100644
index 37fad18..0000000
--- a/patrole_tempest_plugin/tests/api/compute/test_admin_server_actions_rbac.py
+++ /dev/null
@@ -1,65 +0,0 @@
-# Copyright 2017 AT&T Corporation.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from tempest import config
-from tempest.lib import decorators
-from tempest import test
-
-from patrole_tempest_plugin import rbac_rule_validation
-from patrole_tempest_plugin.tests.api.compute import rbac_base
-
-CONF = config.CONF
-
-
-class AdminServerActionsRbacTest(rbac_base.BaseV2ComputeRbacTest):
-
- @classmethod
- def skip_checks(cls):
- super(AdminServerActionsRbacTest, cls).skip_checks()
- if not test.is_extension_enabled('os-admin-actions', 'compute'):
- msg = "%s skipped as os-admin-actions not enabled." % cls.__name__
- raise cls.skipException(msg)
-
- @classmethod
- def resource_setup(cls):
- super(AdminServerActionsRbacTest, cls).resource_setup()
- cls.server_id = cls.create_test_server(wait_until='ACTIVE')['id']
-
- @rbac_rule_validation.action(
- service="nova",
- rule="os_compute_api:os-admin-actions:reset_state")
- @decorators.idempotent_id('ae84dd0b-f364-462e-b565-3457f9c019ef')
- def test_reset_server_state(self):
- self.rbac_utils.switch_role(self, toggle_rbac_role=True)
- self.servers_client.reset_state(self.server_id, state='error')
- self.addCleanup(self.servers_client.reset_state,
- self.server_id,
- state='active')
-
- @rbac_rule_validation.action(
- service="nova",
- rule="os_compute_api:os-admin-actions:inject_network_info")
- @decorators.idempotent_id('ce48c340-51c1-4cff-9b6e-0cc5ef008630')
- def test_inject_network_info(self):
- self.rbac_utils.switch_role(self, toggle_rbac_role=True)
- self.servers_client.inject_network_info(self.server_id)
-
- @rbac_rule_validation.action(
- service="nova",
- rule="os_compute_api:os-admin-actions:reset_network")
- @decorators.idempotent_id('2911a242-15c4-4fcb-80d5-80a8930661b0')
- def test_reset_network(self):
- self.rbac_utils.switch_role(self, toggle_rbac_role=True)
- self.servers_client.reset_network(self.server_id)
diff --git a/patrole_tempest_plugin/tests/api/compute/test_deferred_delete_rbac.py b/patrole_tempest_plugin/tests/api/compute/test_deferred_delete_rbac.py
deleted file mode 100644
index ebeab76..0000000
--- a/patrole_tempest_plugin/tests/api/compute/test_deferred_delete_rbac.py
+++ /dev/null
@@ -1,47 +0,0 @@
-# Copyright 2017 AT&T Corporation.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from tempest.lib import decorators
-from tempest import test
-
-from patrole_tempest_plugin import rbac_rule_validation
-from patrole_tempest_plugin.tests.api.compute import rbac_base
-
-
-class DeferredDeleteRbacTest(rbac_base.BaseV2ComputeRbacTest):
-
- @classmethod
- def skip_checks(cls):
- super(DeferredDeleteRbacTest, cls).skip_checks()
- if not test.is_extension_enabled('os-deferred-delete', 'compute'):
- msg = "%s skipped as os-deferred-delete extension not enabled." \
- % cls.__name__
- raise cls.skipException(msg)
-
- @classmethod
- def resource_setup(cls):
- super(DeferredDeleteRbacTest, cls).resource_setup()
- cls.server = cls.create_test_server(wait_until='ACTIVE')
-
- @decorators.idempotent_id('189bfed4-1e6d-475c-bb8c-d57e60895391')
- @rbac_rule_validation.action(
- service="nova",
- rule="os_compute_api:os-deferred-delete")
- def test_force_delete_server(self):
- self.rbac_utils.switch_role(self, toggle_rbac_role=True)
- # Force-deleting a server enforces os-deferred-delete according to the
- # following API: https://github.com/openstack/nova/blob/master/nova/api
- # /openstack/compute/deferred_delete.py
- self.servers_client.force_delete_server(self.server['id'])
diff --git a/patrole_tempest_plugin/tests/api/compute/test_lock_server_rbac.py b/patrole_tempest_plugin/tests/api/compute/test_lock_server_rbac.py
new file mode 100644
index 0000000..1daf305
--- /dev/null
+++ b/patrole_tempest_plugin/tests/api/compute/test_lock_server_rbac.py
@@ -0,0 +1,58 @@
+# Copyright 2017 AT&T Corporation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from tempest.lib import decorators
+
+from patrole_tempest_plugin import rbac_rule_validation
+from patrole_tempest_plugin.tests.api.compute import rbac_base as base
+
+
+class ComputeLockServersRbacTest(base.BaseV2ComputeRbacTest):
+
+ @rbac_rule_validation.action(
+ service="nova",
+ rule="os_compute_api:os-lock-server:lock")
+ @decorators.idempotent_id('b81e10fb-1864-498f-8c1d-5175c6fec5fb')
+ def test_lock_server(self):
+ server = self.create_test_server(wait_until='ACTIVE')
+ self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+ self.servers_client.lock_server(server['id'])
+ self.addCleanup(self.servers_client.unlock_server, server['id'])
+
+ @rbac_rule_validation.action(
+ service="nova",
+ rule="os_compute_api:os-lock-server:unlock")
+ @decorators.idempotent_id('d50ef8e8-4bce-11e7-b114-b2f933d5fe66')
+ def test_unlock_server(self):
+ server = self.create_test_server(wait_until='ACTIVE')
+ self.servers_client.lock_server(server['id'])
+ self.addCleanup(self.servers_client.unlock_server, server['id'])
+ self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+ self.servers_client.unlock_server(server['id'])
+
+ @rbac_rule_validation.action(
+ service="nova",
+ rule="os_compute_api:os-lock-server:unlock:unlock_override")
+ @decorators.idempotent_id('40dfeef9-73ee-48a9-be19-a219875de457')
+ def test_unlock_server_override(self):
+ server = self.create_test_server(wait_until='ACTIVE')
+ # In order to trigger the unlock:unlock_override policy instead
+ # of the unlock policy, the server must be locked by a different
+ # user than the one who is attempting to unlock it.
+ self.os_admin.servers_client.lock_server(server['id'])
+ self.addCleanup(self.servers_client.unlock_server, server['id'])
+
+ self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+ self.servers_client.unlock_server(server['id'])
diff --git a/patrole_tempest_plugin/tests/api/compute/test_rescue_rbac.py b/patrole_tempest_plugin/tests/api/compute/test_rescue_rbac.py
deleted file mode 100644
index a0a021f..0000000
--- a/patrole_tempest_plugin/tests/api/compute/test_rescue_rbac.py
+++ /dev/null
@@ -1,43 +0,0 @@
-# Copyright 2017 AT&T Corporation.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from tempest.lib import decorators
-from tempest import test
-
-from patrole_tempest_plugin import rbac_rule_validation
-from patrole_tempest_plugin.tests.api.compute import rbac_base
-
-
-class RescueRbacTest(rbac_base.BaseV2ComputeRbacTest):
-
- @classmethod
- def skip_checks(cls):
- super(RescueRbacTest, cls).skip_checks()
- if not test.is_extension_enabled('os-rescue', 'compute'):
- msg = "%s skipped as os-rescue not enabled." % cls.__name__
- raise cls.skipException(msg)
-
- @classmethod
- def resource_setup(cls):
- super(RescueRbacTest, cls).resource_setup()
- cls.server = cls.create_test_server(wait_until='ACTIVE')
-
- @rbac_rule_validation.action(
- service="nova",
- rule="os_compute_api:os-rescue")
- @decorators.idempotent_id('fbbb2afc-ed0e-4552-887d-ac00fb5d436e')
- def test_rescue_server(self):
- self.rbac_utils.switch_role(self, toggle_rbac_role=True)
- self.servers_client.rescue_server(self.server['id'])
diff --git a/patrole_tempest_plugin/tests/api/compute/test_server_diagnostics_rbac.py b/patrole_tempest_plugin/tests/api/compute/test_server_diagnostics_rbac.py
deleted file mode 100644
index f6359b4..0000000
--- a/patrole_tempest_plugin/tests/api/compute/test_server_diagnostics_rbac.py
+++ /dev/null
@@ -1,44 +0,0 @@
-# Copyright 2017 AT&T Corporation.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from tempest.lib import decorators
-from tempest import test
-
-from patrole_tempest_plugin import rbac_rule_validation
-from patrole_tempest_plugin.tests.api.compute import rbac_base
-
-
-class ServerDiagnosticsRbacTest(rbac_base.BaseV2ComputeRbacTest):
-
- @classmethod
- def skip_checks(cls):
- super(ServerDiagnosticsRbacTest, cls).skip_checks()
- if not test.is_extension_enabled('os-server-diagnostics', 'compute'):
- msg = ("%s skipped as os-server-diagnostics not "
- "enabled." % cls.__name__)
- raise cls.skipException(msg)
-
- @classmethod
- def resource_setup(cls):
- super(ServerDiagnosticsRbacTest, cls).resource_setup()
- cls.server = cls.create_test_server(wait_until='ACTIVE')
-
- @rbac_rule_validation.action(
- service="nova",
- rule="os_compute_api:os-server-diagnostics")
- @decorators.idempotent_id('5dabfcc4-bedb-417b-8247-b3ee7c5c0f3e')
- def test_show_server_diagnostics(self):
- self.rbac_utils.switch_role(self, toggle_rbac_role=True)
- self.servers_client.show_server_diagnostics(self.server['id'])
diff --git a/patrole_tempest_plugin/tests/api/compute/test_server_misc_policy_actions_rbac.py b/patrole_tempest_plugin/tests/api/compute/test_server_misc_policy_actions_rbac.py
new file mode 100644
index 0000000..cb826e3
--- /dev/null
+++ b/patrole_tempest_plugin/tests/api/compute/test_server_misc_policy_actions_rbac.py
@@ -0,0 +1,154 @@
+# Copyright 2017 AT&T Corporation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from tempest.common import waiters
+from tempest.lib import decorators
+from tempest.lib import exceptions as lib_exc
+from tempest import test
+
+from patrole_tempest_plugin import rbac_rule_validation
+from patrole_tempest_plugin.tests.api.compute import rbac_base
+
+
+class MiscPolicyActionsRbacTest(rbac_base.BaseV2ComputeRbacTest):
+ """Test multiple policy actions that require a server to be created.
+
+ Minimize the number of servers that need to be created across classes
+ by consolidating test cases related to different policy "families" into
+ one class. This reduces the risk of `BuildErrorException` errors caused
+ by too many servers being created simultaneously, especially when
+ higher concurrency is used.
+
+ Only applies to:
+ * policy "families" that require server creation
+ * small policy "families" -- i.e. containing one to three policies
+ """
+
+ @classmethod
+ def resource_setup(cls):
+ super(MiscPolicyActionsRbacTest, cls).resource_setup()
+ cls.server_id = cls.create_test_server(wait_until='ACTIVE')['id']
+
+ def setUp(self):
+ super(MiscPolicyActionsRbacTest, self).setUp()
+ try:
+ waiters.wait_for_server_status(self.servers_client,
+ self.server_id, 'ACTIVE')
+ except lib_exc.NotFound:
+ # If the server was found to be deleted by a previous test,
+ # a new one is built
+ server = self.create_test_server(wait_until='ACTIVE')
+ self.__class__.server_id = server['id']
+ except Exception:
+ # Rebuilding the server in case something happened during a test
+ self.__class__.server_id = self.rebuild_server(self.server_id)
+
+ @test.requires_ext(extension='os-admin-actions', service='compute')
+ @rbac_rule_validation.action(
+ service="nova",
+ rule="os_compute_api:os-admin-actions:reset_state")
+ @decorators.idempotent_id('ae84dd0b-f364-462e-b565-3457f9c019ef')
+ def test_reset_server_state(self):
+ """Test reset server state, part of os-admin-actions."""
+ self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+ self.servers_client.reset_state(self.server_id, state='error')
+ self.addCleanup(self.servers_client.reset_state, self.server_id,
+ state='active')
+
+ @test.requires_ext(extension='os-admin-actions', service='compute')
+ @rbac_rule_validation.action(
+ service="nova",
+ rule="os_compute_api:os-admin-actions:inject_network_info")
+ @decorators.idempotent_id('ce48c340-51c1-4cff-9b6e-0cc5ef008630')
+ def test_inject_network_info(self):
+ """Test inject network info, part of os-admin-actions."""
+ self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+ self.servers_client.inject_network_info(self.server_id)
+
+ @test.attr(type=['slow'])
+ @test.requires_ext(extension='os-admin-actions', service='compute')
+ @rbac_rule_validation.action(
+ service="nova",
+ rule="os_compute_api:os-admin-actions:reset_network")
+ @decorators.idempotent_id('2911a242-15c4-4fcb-80d5-80a8930661b0')
+ def test_reset_network(self):
+ """Test reset network, part of os-admin-actions."""
+ self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+ self.servers_client.reset_network(self.server_id)
+
+ @test.requires_ext(extension='os-deferred-delete', service='compute')
+ @decorators.idempotent_id('189bfed4-1e6d-475c-bb8c-d57e60895391')
+ @rbac_rule_validation.action(
+ service="nova",
+ rule="os_compute_api:os-deferred-delete")
+ def test_force_delete_server(self):
+ """Test force delete server, part of os-deferred-delete."""
+ self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+ # Force-deleting a server enforces os-deferred-delete.
+ self.servers_client.force_delete_server(self.server_id)
+
+ @test.requires_ext(extension='os-rescue', service='compute')
+ @rbac_rule_validation.action(
+ service="nova",
+ rule="os_compute_api:os-rescue")
+ @decorators.idempotent_id('fbbb2afc-ed0e-4552-887d-ac00fb5d436e')
+ def test_rescue_server(self):
+ """Test rescue server, part of os-rescue."""
+ self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+ self.servers_client.rescue_server(self.server_id)
+
+ @test.requires_ext(extension='os-server-diagnostics', service='compute')
+ @rbac_rule_validation.action(
+ service="nova",
+ rule="os_compute_api:os-server-diagnostics")
+ @decorators.idempotent_id('5dabfcc4-bedb-417b-8247-b3ee7c5c0f3e')
+ def test_show_server_diagnostics(self):
+ """Test show server diagnostics, part of os-server-diagnostics."""
+ self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+ self.servers_client.show_server_diagnostics(self.server_id)
+
+ @test.requires_ext(extension='os-server-password', service='compute')
+ @decorators.idempotent_id('aaf43f78-c178-4581-ac18-14afd3f1f6ba')
+ @rbac_rule_validation.action(
+ service="nova",
+ rule="os_compute_api:os-server-password")
+ def test_delete_server_password(self):
+ """Test delete server password, part of os-server-password."""
+ self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+ self.servers_client.delete_password(self.server_id)
+
+ @test.requires_ext(extension='os-server-password', service='compute')
+ @rbac_rule_validation.action(
+ service="nova",
+ rule="os_compute_api:os-server-password")
+ @decorators.idempotent_id('f677971a-7d20-493c-977f-6ff0a74b5b2c')
+ def test_get_server_password(self):
+ """Test show server password, part of os-server-password."""
+ self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+ self.servers_client.show_password(self.server_id)
+
+ @test.requires_ext(extension='OS-SRV-USG', service='compute')
+ @rbac_rule_validation.action(
+ service="nova",
+ rule="os_compute_api:os-server-usage")
+ @decorators.idempotent_id('f0437ead-b9fb-462a-9f3d-ce53fac9d57a')
+ def test_show_server_usage(self):
+ """Test show server usage, part of os-server-usage.
+
+ TODO(felipemonteiro): Once multi-policy testing is supported, this
+ test can be combined with the generic test for showing a server.
+ """
+ self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+ self.servers_client.show_server(self.server_id)
diff --git a/patrole_tempest_plugin/tests/api/compute/test_server_password_rbac.py b/patrole_tempest_plugin/tests/api/compute/test_server_password_rbac.py
deleted file mode 100644
index 7826268..0000000
--- a/patrole_tempest_plugin/tests/api/compute/test_server_password_rbac.py
+++ /dev/null
@@ -1,52 +0,0 @@
-# Copyright 2017 AT&T Corporation.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from tempest.lib import decorators
-from tempest import test
-
-from patrole_tempest_plugin import rbac_rule_validation
-from patrole_tempest_plugin.tests.api.compute import rbac_base
-
-
-class ServerPasswordRbacTest(rbac_base.BaseV2ComputeRbacTest):
-
- @classmethod
- def skip_checks(cls):
- super(ServerPasswordRbacTest, cls).skip_checks()
- if not test.is_extension_enabled('os-server-password', 'compute'):
- msg = "%s skipped as os-server-password extension not enabled." \
- % cls.__name__
- raise cls.skipException(msg)
-
- @classmethod
- def resource_setup(cls):
- super(ServerPasswordRbacTest, cls).resource_setup()
- cls.server = cls.create_test_server(wait_until="ACTIVE")
-
- @decorators.idempotent_id('aaf43f78-c178-4581-ac18-14afd3f1f6ba')
- @rbac_rule_validation.action(
- service="nova",
- rule="os_compute_api:os-server-password")
- def test_delete_server_password(self):
- self.rbac_utils.switch_role(self, toggle_rbac_role=True)
- self.servers_client.delete_password(self.server['id'])
-
- @rbac_rule_validation.action(
- service="nova",
- rule="os_compute_api:os-server-password")
- @decorators.idempotent_id('f677971a-7d20-493c-977f-6ff0a74b5b2c')
- def test_get_server_password(self):
- self.rbac_utils.switch_role(self, toggle_rbac_role=True)
- self.servers_client.show_password(self.server['id'])
diff --git a/patrole_tempest_plugin/tests/api/compute/test_server_usage_rbac.py b/patrole_tempest_plugin/tests/api/compute/test_server_usage_rbac.py
deleted file mode 100644
index 777af29..0000000
--- a/patrole_tempest_plugin/tests/api/compute/test_server_usage_rbac.py
+++ /dev/null
@@ -1,43 +0,0 @@
-# Copyright 2017 AT&T Corporation.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from tempest.lib import decorators
-from tempest import test
-
-from patrole_tempest_plugin import rbac_rule_validation
-from patrole_tempest_plugin.tests.api.compute import rbac_base
-
-
-class ServerUsageRbacTest(rbac_base.BaseV2ComputeRbacTest):
-
- @classmethod
- def skip_checks(cls):
- super(ServerUsageRbacTest, cls).skip_checks()
- if not test.is_extension_enabled('OS-SRV-USG', 'compute'):
- msg = "%s skipped as OS-SRV-USG not enabled." % cls.__name__
- raise cls.skipException(msg)
-
- @classmethod
- def resource_setup(cls):
- super(ServerUsageRbacTest, cls).resource_setup()
- cls.server = cls.create_test_server(wait_until='ACTIVE')
-
- @rbac_rule_validation.action(
- service="nova",
- rule="os_compute_api:os-server-usage")
- @decorators.idempotent_id('f0437ead-b9fb-462a-9f3d-ce53fac9d57a')
- def test_show_server_diagnostics(self):
- self.rbac_utils.switch_role(self, toggle_rbac_role=True)
- self.servers_client.show_server(self.server['id'])
diff --git a/patrole_tempest_plugin/tests/api/identity/rbac_base.py b/patrole_tempest_plugin/tests/api/identity/rbac_base.py
index 6cb3d2f..a053614 100644
--- a/patrole_tempest_plugin/tests/api/identity/rbac_base.py
+++ b/patrole_tempest_plugin/tests/api/identity/rbac_base.py
@@ -183,6 +183,7 @@
def resource_setup(cls):
super(BaseIdentityV2AdminRbacTest, cls).resource_setup()
cls.tenants = []
+ cls.tokens = []
@classmethod
def resource_cleanup(cls):
@@ -190,6 +191,10 @@
test_utils.call_and_ignore_notfound_exc(
cls.tenants_client.delete_tenant, tenant['id'])
+ for token in cls.tokens:
+ test_utils.call_and_ignore_notfound_exc(
+ cls.client.delete_token, token)
+
super(BaseIdentityV2AdminRbacTest, cls).resource_cleanup()
@classmethod
@@ -203,6 +208,15 @@
cls.tenants.append(tenant)
return tenant
+ @classmethod
+ def setup_test_token(cls, user_name, password, tenant_name):
+ """Set up a test token."""
+ token = cls.token_client.auth(user_name, password,
+ tenant_name)['token']
+ token_id = token['id']
+ cls.tokens.append(token_id)
+ return token_id
+
class BaseIdentityV3RbacTest(BaseIdentityRbacTest):
diff --git a/patrole_tempest_plugin/tests/api/identity/v2/test_tokens_rbac.py b/patrole_tempest_plugin/tests/api/identity/v2/test_tokens_rbac.py
new file mode 100644
index 0000000..25150dd
--- /dev/null
+++ b/patrole_tempest_plugin/tests/api/identity/v2/test_tokens_rbac.py
@@ -0,0 +1,72 @@
+# Copyright 2017 AT&T Corporation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from tempest.lib import decorators
+
+from patrole_tempest_plugin import rbac_rule_validation
+from patrole_tempest_plugin.tests.api.identity import rbac_base
+
+
+class IdentityTokenV2RbacTest(rbac_base.BaseIdentityV2AdminRbacTest):
+
+ def _create_token(self):
+
+ user_name = self.client.auth_provider.credentials.username
+ tenant_name = self.client.auth_provider.credentials.tenant_name
+ password = self.client.auth_provider.credentials.password
+ token_id = self.setup_test_token(user_name, password,
+ tenant_name)
+ return token_id
+
+ @rbac_rule_validation.action(service="keystone",
+ rule="identity:validate_token")
+ @decorators.idempotent_id('71471202-4c4e-4a3d-9d41-57eb621bf3bb')
+ def test_validate_token(self):
+
+ """Validate token (get-token)
+
+ RBAC test for Identity v2 get token validation
+ """
+ token_id = self._create_token()
+
+ self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+ self.client.show_token(token_id)
+
+ @rbac_rule_validation.action(service="keystone",
+ rule="identity:revoke_token")
+ @decorators.idempotent_id('77338f66-0713-4f20-b11c-b0d750618276')
+ def test_revoke_token(self):
+
+ """Revoke token (delete-token)
+
+ RBAC test for Identity v2 delete token
+ """
+ token_id = self._create_token()
+
+ self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+ self.client.delete_token(token_id)
+
+ @rbac_rule_validation.action(service="keystone",
+ rule="identity:validate_token_head")
+ @decorators.idempotent_id('71471202-4c4e-4a3d-9d41-57eb621bf3ba')
+ def test_check_token_existence(self):
+
+ """Validate Token head
+
+ RBAC test for Identity v2 token head validation
+ """
+ token_id = self._create_token()
+ self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+ self.client.check_token_existence(token_id)
diff --git a/patrole_tempest_plugin/tests/api/network/test_agents_rbac.py b/patrole_tempest_plugin/tests/api/network/test_agents_rbac.py
new file mode 100644
index 0000000..506dd5b
--- /dev/null
+++ b/patrole_tempest_plugin/tests/api/network/test_agents_rbac.py
@@ -0,0 +1,237 @@
+# Copyright 2017 AT&T Corporation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from tempest.lib.common.utils import test_utils
+from tempest.lib import decorators
+from tempest import test
+
+from patrole_tempest_plugin import rbac_rule_validation
+from patrole_tempest_plugin.tests.api.network import rbac_base as base
+
+
+class AgentsRbacTest(base.BaseNetworkRbacTest):
+
+ @classmethod
+ def skip_checks(cls):
+ super(AgentsRbacTest, cls).skip_checks()
+ if not test.is_extension_enabled('agent', 'network'):
+ msg = "agent extension not enabled."
+ raise cls.skipException(msg)
+
+ @classmethod
+ def resource_setup(cls):
+ super(AgentsRbacTest, cls).resource_setup()
+ agents = cls.agents_client.list_agents()['agents']
+ cls.agent = agents[0]
+
+ @decorators.idempotent_id('f88e38e0-ab52-4b97-8ffa-48a27f9d199b')
+ @rbac_rule_validation.action(service="neutron",
+ rule="get_agent",
+ expected_error_code=404)
+ def test_show_agent(self):
+ """Show agent test.
+
+ RBAC test for the neutron get_agent policy
+ """
+ self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+ self.agents_client.show_agent(self.agent['id'])
+
+ @decorators.idempotent_id('8ca68fdb-eaf6-4880-af82-ba0982949dec')
+ @rbac_rule_validation.action(service="neutron",
+ rule="update_agent",
+ expected_error_code=404)
+ def test_update_agent(self):
+ """Update agent test.
+
+ RBAC test for the neutron update_agent policy
+ """
+ original_status = self.agent['admin_state_up']
+ agent_status = {'admin_state_up': original_status}
+
+ self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+ self.agents_client.update_agent(agent_id=self.agent['id'],
+ agent=agent_status)
+
+
+class L3AgentSchedulerRbacTest(base.BaseNetworkRbacTest):
+
+ @classmethod
+ def skip_checks(cls):
+ super(L3AgentSchedulerRbacTest, cls).skip_checks()
+ if not test.is_extension_enabled('l3_agent_scheduler', 'network'):
+ msg = "l3_agent_scheduler extension not enabled."
+ raise cls.skipException(msg)
+
+ @classmethod
+ def resource_setup(cls):
+ super(L3AgentSchedulerRbacTest, cls).resource_setup()
+ cls.router = cls.create_router()
+ cls.agent = None
+
+ def setUp(self):
+ super(L3AgentSchedulerRbacTest, self).setUp()
+ if self.agent is not None:
+ return
+
+ # Find an L3 agent and validate that it is correct.
+ agents = self.agents_client.list_agents()['agents']
+ agent = {'agent_type': None}
+ for a in agents:
+ if a['agent_type'] == 'L3 agent':
+ agent = a
+ break
+ self.assertEqual(agent['agent_type'], 'L3 agent', 'Could not find '
+ 'L3 agent in agent list though l3_agent_scheduler '
+ 'is enabled.')
+ self.agent = agent
+
+ @decorators.idempotent_id('5d2bbdbc-40a5-43d2-828a-84dc93fcc453')
+ @rbac_rule_validation.action(service="neutron",
+ rule="get_l3-routers")
+ def test_list_routers_on_l3_agent(self):
+ """List routers on L3 agent test.
+
+ RBAC test for the neutron get_l3-routers policy
+ """
+ self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+ self.agents_client.list_routers_on_l3_agent(self.agent['id'])
+
+ @decorators.idempotent_id('466b2a10-8747-4c09-855a-bd90a1c86ce7')
+ @rbac_rule_validation.action(service="neutron",
+ rule="create_l3-router")
+ def test_create_router_on_l3_agent(self):
+ """Create router on L3 agent test.
+
+ RBAC test for the neutron create_l3-router policy
+ """
+ self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+ self.agents_client.create_router_on_l3_agent(
+ self.agent['id'], router_id=self.router['id'])
+ self.addCleanup(
+ test_utils.call_and_ignore_notfound_exc,
+ self.agents_client.delete_router_from_l3_agent,
+ self.agent['id'], router_id=self.router['id'])
+
+ @decorators.idempotent_id('8138cfc9-3e48-4a34-adf6-894077aa1be4')
+ @rbac_rule_validation.action(service="neutron",
+ rule="delete_l3-router")
+ def test_delete_router_from_l3_agent(self):
+ """Delete router from L3 agent test.
+
+ RBAC test for the neutron delete_l3-router policy
+ """
+ self.agents_client.create_router_on_l3_agent(
+ self.agent['id'], router_id=self.router['id'])
+ self.addCleanup(
+ test_utils.call_and_ignore_notfound_exc,
+ self.agents_client.delete_router_from_l3_agent,
+ self.agent['id'], router_id=self.router['id'])
+
+ self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+ self.agents_client.delete_router_from_l3_agent(
+ self.agent['id'], router_id=self.router['id'])
+
+
+class DHCPAgentSchedulersRbacTest(base.BaseNetworkRbacTest):
+
+ @classmethod
+ def skip_checks(cls):
+ super(DHCPAgentSchedulersRbacTest, cls).skip_checks()
+ if not test.is_extension_enabled('dhcp_agent_scheduler', 'network'):
+ msg = "dhcp_agent_scheduler extension not enabled."
+ raise cls.skipException(msg)
+
+ @classmethod
+ def resource_setup(cls):
+ super(DHCPAgentSchedulersRbacTest, cls).resource_setup()
+ cls.agent = None
+
+ def setUp(self):
+ super(DHCPAgentSchedulersRbacTest, self).setUp()
+ if self.agent is not None:
+ return
+
+ # Find a DHCP agent and validate that it is correct.
+ agents = self.agents_client.list_agents()['agents']
+ agent = {'agent_type': None}
+ for a in agents:
+ if a['agent_type'] == 'DHCP agent':
+ agent = a
+ break
+ self.assertEqual(agent['agent_type'], 'DHCP agent', 'Could not find '
+ 'DHCP agent in agent list though dhcp_agent_scheduler'
+ ' is enabled.')
+ self.agent = agent
+
+ def _create_and_prepare_network_for_agent(self, agent_id):
+ """Create network and ensure it is not hosted by agent_id."""
+ network_id = self.create_network()['id']
+
+ if self._check_network_in_dhcp_agent(network_id, agent_id):
+ self.agents_client.delete_network_from_dhcp_agent(
+ agent_id=agent_id, network_id=network_id)
+
+ return network_id
+
+ def _check_network_in_dhcp_agent(self, network_id, agent_id):
+ networks = self.agents_client.list_networks_hosted_by_one_dhcp_agent(
+ agent_id)['networks'] or []
+ return network_id in [network['id'] for network in networks]
+
+ @decorators.idempotent_id('dc84087b-4c2a-4878-8ed0-40370e19da17')
+ @rbac_rule_validation.action(service="neutron",
+ rule="get_dhcp-networks")
+ def test_list_networks_hosted_by_one_dhcp_agent(self):
+ """List networks hosted by one DHCP agent test.
+
+ RBAC test for the neutron get_dhcp-networks policy
+ """
+ self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+ self.agents_client.list_networks_hosted_by_one_dhcp_agent(
+ self.agent['id'])
+
+ @decorators.idempotent_id('14e014ac-f355-46d3-b6d8-98f2c9ec1610')
+ @rbac_rule_validation.action(service="neutron",
+ rule="create_dhcp-network")
+ def test_add_dhcp_agent_to_network(self):
+ """Add DHCP agent to network test.
+
+ RBAC test for the neutron create_dhcp-network policy
+ """
+ network_id = self._create_and_prepare_network_for_agent(
+ self.agent['id'])
+
+ self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+ self.agents_client.add_dhcp_agent_to_network(
+ self.agent['id'], network_id=network_id)
+ # Cleanup is not necessary and might result in a 409 being raised.
+
+ @decorators.idempotent_id('937a4302-4b49-407d-9980-5843d7badc38')
+ @rbac_rule_validation.action(service="neutron",
+ rule="delete_dhcp-network")
+ def test_delete_network_from_dhcp_agent(self):
+ """Delete network from DHCP agent test.
+
+ RBAC test for the neutron delete_dhcp-network policy
+ """
+ network_id = self._create_and_prepare_network_for_agent(
+ self.agent['id'])
+ self.agents_client.add_dhcp_agent_to_network(
+ self.agent['id'], network_id=network_id)
+ # Cleanup is not necessary and might result in a 409 being raised.
+
+ self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+ self.agents_client.delete_network_from_dhcp_agent(
+ self.agent['id'], network_id=network_id)
diff --git a/patrole_tempest_plugin/tests/api/network/test_routers_rbac.py b/patrole_tempest_plugin/tests/api/network/test_routers_rbac.py
index 1be46ab..6aec5d1 100644
--- a/patrole_tempest_plugin/tests/api/network/test_routers_rbac.py
+++ b/patrole_tempest_plugin/tests/api/network/test_routers_rbac.py
@@ -15,21 +15,16 @@
import netaddr
-from oslo_log import log
-
from tempest.common.utils import net_utils
-from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
from tempest.lib import decorators
from tempest import test
+from patrole_tempest_plugin import rbac_exceptions
from patrole_tempest_plugin import rbac_rule_validation
from patrole_tempest_plugin.tests.api.network import rbac_base as base
-CONF = config.CONF
-LOG = log.getLogger(__name__)
-
class RouterRbacTest(base.BaseNetworkRbacTest):
@classmethod
@@ -42,8 +37,9 @@
@classmethod
def resource_setup(cls):
super(RouterRbacTest, cls).resource_setup()
- post_body = {}
- post_body['router:external'] = True
+ # Create an external network (``router:external``) so that
+ # ``external_gateway_info`` policies can be validated.
+ post_body = {'router:external': True}
cls.network = cls.create_network(**post_body)
cls.subnet = cls.create_subnet(cls.network)
cls.ip_range = netaddr.IPRange(
@@ -51,6 +47,14 @@
cls.subnet['allocation_pools'][0]['end'])
cls.router = cls.create_router()
+ def _get_unused_ip_address(self):
+ unused_ip = net_utils.get_unused_ip_addresses(self.ports_client,
+ self.subnets_client,
+ self.network['id'],
+ self.subnet['id'],
+ 1)
+ return unused_ip[0]
+
@rbac_rule_validation.action(service="neutron",
rule="create_router")
@decorators.idempotent_id('acc5005c-bdb6-4192-bc9f-ece9035bb488')
@@ -64,6 +68,35 @@
self.addCleanup(self.routers_client.delete_router,
router['router']['id'])
+ @decorators.idempotent_id('6139eb97-95c0-40d8-a109-99de11ab2e5e')
+ @test.requires_ext(extension='l3-ha', service='network')
+ @rbac_rule_validation.action(service="neutron",
+ rule="create_router:ha")
+ def test_create_high_availability_router(self):
+ """Create high-availability router
+
+ RBAC test for the neutron create_router:ha policy
+ """
+ self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+ router = self.routers_client.create_router(ha=True)
+ self.addCleanup(self.routers_client.delete_router,
+ router['router']['id'])
+
+ @decorators.idempotent_id('c6254ca6-2728-412d-803d-d4aa3935e56d')
+ @test.requires_ext(extension='dvr', service='network')
+ @rbac_rule_validation.action(service="neutron",
+ rule="create_router:distributed")
+ def test_create_distributed_router(self):
+ """Create distributed router
+
+ RBAC test for the neutron create_router:distributed policy
+ """
+ self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+ router = self.routers_client.create_router(distributed=True)
+ self.addCleanup(self.routers_client.delete_router,
+ router['router']['id'])
+
+ @test.requires_ext(extension='ext-gw-mode', service='network')
@rbac_rule_validation.action(
service="neutron",
rule="create_router:external_gateway_info:enable_snat")
@@ -96,16 +129,11 @@
"""
name = data_utils.rand_name(self.__class__.__name__ + '-snat-router')
- # Pick an unused IP address.
- ip_list = net_utils.get_unused_ip_addresses(self.ports_client,
- self.subnets_client,
- self.network['id'],
- self.subnet['id'],
- 1)
+ unused_ip = self._get_unused_ip_address()
external_fixed_ips = {'subnet_id': self.subnet['id'],
- 'ip_address': ip_list[0]}
+ 'ip_address': unused_ip}
external_gateway_info = {'network_id': self.network['id'],
- 'enable_snat': False,
+ 'enable_snat': False, # Default is True.
'external_fixed_ips': [external_fixed_ips]}
self.rbac_utils.switch_role(self, toggle_rbac_role=True)
@@ -124,7 +152,29 @@
RBAC test for the neutron get_router policy
"""
self.rbac_utils.switch_role(self, toggle_rbac_role=True)
- self.routers_client.show_router(self.router['id'])
+ # Prevent other policies from being enforced by using barebones fields.
+ self.routers_client.show_router(self.router['id'], fields=['id'])
+
+ @decorators.idempotent_id('3ed26ea2-b419-410c-b4b5-576c1edafa06')
+ @test.requires_ext(extension='dvr', service='network')
+ @rbac_rule_validation.action(service="neutron",
+ rule="get_router:distributed")
+ def test_show_distributed_router(self):
+ """Get distributed router
+
+ RBAC test for the neutron get_router:distributed policy
+ """
+ router = self.routers_client.create_router(distributed=True)['router']
+ self.addCleanup(self.routers_client.delete_router, router['id'])
+
+ self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+ retrieved_fields = self.routers_client.show_router(
+ router['id'], fields=['distributed'])['router']
+
+ # Rather than throwing a 403, the field is not present, so raise exc.
+ if 'distributed' not in retrieved_fields:
+ raise rbac_exceptions.RbacActionFailed(
+ '"distributed" parameter not present in response body')
@rbac_rule_validation.action(
service="neutron", rule="update_router")
@@ -134,11 +184,10 @@
RBAC test for the neutron update_router policy
"""
- new_name = data_utils.rand_name(self.__class__.__name__ +
- '-new-router-name')
+ new_name = data_utils.rand_name(
+ self.__class__.__name__ + '-new-router-name')
self.rbac_utils.switch_role(self, toggle_rbac_role=True)
- self.routers_client.update_router(self.router['id'],
- name=new_name)
+ self.routers_client.update_router(self.router['id'], name=new_name)
@rbac_rule_validation.action(
service="neutron", rule="update_router:external_gateway_info")
@@ -172,6 +221,7 @@
self.router['id'],
external_gateway_info=None)
+ @test.requires_ext(extension='ext-gw-mode', service='network')
@rbac_rule_validation.action(
service="neutron",
rule="update_router:external_gateway_info:enable_snat")
@@ -202,14 +252,9 @@
RBAC test for the neutron
update_router:external_gateway_info:external_fixed_ips policy
"""
- # Pick an unused IP address.
- ip_list = net_utils.get_unused_ip_addresses(self.ports_client,
- self.subnets_client,
- self.network['id'],
- self.subnet['id'],
- 1)
+ unused_ip = self._get_unused_ip_address()
external_fixed_ips = {'subnet_id': self.subnet['id'],
- 'ip_address': ip_list[0]}
+ 'ip_address': unused_ip}
external_gateway_info = {'network_id': self.network['id'],
'external_fixed_ips': [external_fixed_ips]}
@@ -222,6 +267,34 @@
self.router['id'],
external_gateway_info=None)
+ @decorators.idempotent_id('ddc20731-dea1-4321-9abf-8772bf0b5977')
+ @test.requires_ext(extension='l3-ha', service='network')
+ @rbac_rule_validation.action(service="neutron",
+ rule="update_router:ha")
+ def test_update_high_availability_router(self):
+ """Update high-availability router
+
+ RBAC test for the neutron update_router:ha policy
+ """
+ self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+ self.routers_client.update_router(self.router['id'], ha=True)
+ self.addCleanup(self.routers_client.update_router, self.router['id'],
+ ha=False)
+
+ @decorators.idempotent_id('e1932c19-8f73-41cd-b5d2-84c7ae5d530c')
+ @test.requires_ext(extension='dvr', service='network')
+ @rbac_rule_validation.action(service="neutron",
+ rule="update_router:distributed")
+ def test_update_distributed_router(self):
+ """Update distributed router
+
+ RBAC test for the neutron update_router:distributed policy
+ """
+ self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+ self.routers_client.update_router(self.router['id'], distributed=True)
+ self.addCleanup(self.routers_client.update_router, self.router['id'],
+ distributed=False)
+
@rbac_rule_validation.action(service="neutron",
rule="delete_router",
expected_error_code=404)
diff --git a/patrole_tempest_plugin/tests/api/network/test_subnets_rbac.py b/patrole_tempest_plugin/tests/api/network/test_subnets_rbac.py
new file mode 100644
index 0000000..68f5156
--- /dev/null
+++ b/patrole_tempest_plugin/tests/api/network/test_subnets_rbac.py
@@ -0,0 +1,96 @@
+# Copyright 2017 AT&T Corporation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from tempest.lib.common.utils import data_utils
+from tempest.lib import decorators
+from tempest import test
+
+from patrole_tempest_plugin import rbac_rule_validation
+from patrole_tempest_plugin.tests.api.network import rbac_base as base
+
+
+class SubnetsRbacTest(base.BaseNetworkRbacTest):
+
+ @classmethod
+ def skip_checks(cls):
+ super(SubnetsRbacTest, cls).skip_checks()
+ if not test.is_extension_enabled('subnet_allocation', 'network'):
+ msg = "subnet_allocation extension not enabled."
+ raise cls.skipException(msg)
+
+ @classmethod
+ def resource_setup(cls):
+ super(SubnetsRbacTest, cls).resource_setup()
+ cls.network = cls.create_network()
+ cls.subnet = cls.create_subnet(cls.network)
+
+ @decorators.idempotent_id('0481adeb-4301-44d5-851c-35910cc18a6b')
+ @rbac_rule_validation.action(service="neutron",
+ rule="create_subnet")
+ def test_create_subnet(self):
+ """Create subnet.
+
+ RBAC test for the neutron "create_subnet" policy
+ """
+ self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+ self.create_subnet(self.network)
+
+ @decorators.idempotent_id('c02618e7-bb20-4abd-83c8-6eec2af08752')
+ @rbac_rule_validation.action(service="neutron",
+ rule="get_subnet")
+ def test_show_subnet(self):
+ """Show subnet.
+
+ RBAC test for the neutron "get_subnet" policy
+ """
+ self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+ self.subnets_client.show_subnet(self.subnet['id'])
+
+ @decorators.idempotent_id('e2ddc415-5cab-43f4-9b61-166aed65d637')
+ @rbac_rule_validation.action(service="neutron",
+ rule="get_subnet")
+ def test_list_subnets(self):
+ """List subnets.
+
+ RBAC test for the neutron "get_subnet" policy
+ """
+ self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+ self.subnets_client.list_subnets()
+
+ @decorators.idempotent_id('f36cd821-dd22-4bd0-b43d-110fc4b553eb')
+ @rbac_rule_validation.action(service="neutron",
+ rule="update_subnet")
+ def test_update_subnet(self):
+ """Update subnet.
+
+ RBAC test for the neutron "update_subnet" policy
+ """
+ update_name = data_utils.rand_name(self.__class__.__name__ + '-Subnet')
+
+ self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+ self.subnets_client.update_subnet(self.subnet['id'], name=update_name)
+
+ @decorators.idempotent_id('bcfc7153-bbd1-43a4-a908-b3e1b0cde0dc')
+ @rbac_rule_validation.action(service="neutron",
+ rule="delete_subnet")
+ def test_delete_subnet(self):
+ """Delete subnet.
+
+ RBAC test for the neutron "delete_subnet" policy
+ """
+ subnet = self.create_subnet(self.network)
+
+ self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+ self.subnets_client.delete_subnet(subnet['id'])
diff --git a/patrole_tempest_plugin/tests/api/volume/test_volume_actions_rbac.py b/patrole_tempest_plugin/tests/api/volume/test_volume_actions_rbac.py
index f27b8a8..7916c8c 100644
--- a/patrole_tempest_plugin/tests/api/volume/test_volume_actions_rbac.py
+++ b/patrole_tempest_plugin/tests/api/volume/test_volume_actions_rbac.py
@@ -29,13 +29,10 @@
class VolumesActionsRbacTest(rbac_base.BaseVolumeRbacTest):
- # TODO(felipemonteiro): "volume_extension:volume_actions:upload_public"
- # test can be created once volumes v3 client is created in Tempest.
-
@classmethod
def setup_clients(cls):
super(VolumesActionsRbacTest, cls).setup_clients()
- cls.image_client = cls.os_primary.image_client_v2
+ cls.admin_image_client = cls.os_admin.image_client_v2
@classmethod
def resource_setup(cls):
@@ -87,6 +84,7 @@
self.rbac_utils.switch_role(self, toggle_rbac_role=True)
self._detach_volume()
+ @test.attr(type=["slow"])
@test.services('image')
@rbac_rule_validation.action(
service="cinder",
@@ -104,9 +102,10 @@
disk_format=CONF.volume.disk_format)['os-volume_upload_image']
image_id = body["image_id"]
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
- self.image_client.delete_image,
+ self.admin_image_client.delete_image,
image_id)
- waiters.wait_for_image_status(self.image_client, image_id, 'active')
+ waiters.wait_for_image_status(self.admin_image_client, image_id,
+ 'active')
waiters.wait_for_volume_resource_status(self.os_admin.volumes_client,
self.volume['id'], 'available')
@@ -210,6 +209,41 @@
_api_version = 3
+class VolumesActionsV310RbacTest(rbac_base.BaseVolumeRbacTest):
+ _api_version = 3
+ min_microversion = '3.10'
+ max_microversion = 'latest'
+
+ @classmethod
+ def setup_clients(cls):
+ super(VolumesActionsV310RbacTest, cls).setup_clients()
+ cls.admin_image_client = cls.os_admin.image_client_v2
+
+ @test.attr(type=["slow"])
+ @test.services('image')
+ @rbac_rule_validation.action(
+ service="cinder",
+ rule="volume_extension:volume_actions:upload_public")
+ @decorators.idempotent_id('578a84dd-a6bd-4f97-a418-4a0c3c272c08')
+ def test_volume_upload_public(self):
+ # This also enforces "volume_extension:volume_actions:upload_image".
+ volume = self.create_volume()
+ image_name = data_utils.rand_name(self.__class__.__name__ + '-Image')
+
+ self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+ body = self.volumes_client.upload_volume(
+ volume['id'], image_name=image_name, visibility="public",
+ disk_format=CONF.volume.disk_format)['os-volume_upload_image']
+ image_id = body["image_id"]
+ self.addCleanup(test_utils.call_and_ignore_notfound_exc,
+ self.admin_image_client.delete_image,
+ image_id)
+ waiters.wait_for_image_status(self.admin_image_client, image_id,
+ 'active')
+ waiters.wait_for_volume_resource_status(self.os_admin.volumes_client,
+ volume['id'], 'available')
+
+
class VolumesActionsV312RbacTest(rbac_base.BaseVolumeRbacTest):
_api_version = 3
min_microversion = '3.12'
diff --git a/patrole_tempest_plugin/tests/api/volume/test_volume_types_access_rbac.py b/patrole_tempest_plugin/tests/api/volume/test_volume_types_access_rbac.py
new file mode 100644
index 0000000..8fd68a3
--- /dev/null
+++ b/patrole_tempest_plugin/tests/api/volume/test_volume_types_access_rbac.py
@@ -0,0 +1,81 @@
+# Copyright 2017 AT&T Corporation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from tempest.lib.common.utils import test_utils
+from tempest.lib import decorators
+from tempest import test
+
+from patrole_tempest_plugin import rbac_rule_validation
+from patrole_tempest_plugin.tests.api.volume import rbac_base
+
+
+class VolumeTypesAccessRbacTest(rbac_base.BaseVolumeRbacTest):
+ _api_version = 3
+
+ @classmethod
+ def skip_checks(cls):
+ super(VolumeTypesAccessRbacTest, cls).skip_checks()
+ if not test.is_extension_enabled('os-volume-type-access', 'volume'):
+ msg = "os-volume-type-access extension not enabled."
+ raise cls.skipException(msg)
+
+ @classmethod
+ def resource_setup(cls):
+ super(VolumeTypesAccessRbacTest, cls).resource_setup()
+ cls.vol_type = cls.create_volume_type(
+ **{'os-volume-type-access:is_public': False})
+ cls.project_id = cls.auth_provider.credentials.project_id
+
+ def _add_type_access(self, ignore_not_found=False):
+ self.volume_types_client.add_type_access(
+ self.vol_type['id'], project=self.project_id)
+
+ if ignore_not_found:
+ self.addCleanup(test_utils.call_and_ignore_notfound_exc,
+ self.volume_types_client.remove_type_access,
+ self.vol_type['id'], project=self.project_id)
+ else:
+ self.addCleanup(self.volume_types_client.remove_type_access,
+ self.vol_type['id'], project=self.project_id)
+
+ @decorators.idempotent_id('af70e6ad-e931-419f-9200-8bcc284e4e47')
+ @rbac_rule_validation.action(
+ service="cinder",
+ rule="volume_extension:volume_type_access")
+ def test_list_type_access(self):
+ self._add_type_access()
+
+ self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+ self.volume_types_client.list_type_access(self.vol_type['id'])[
+ 'volume_type_access']
+
+ @decorators.idempotent_id('b462eeba-45d0-4d6e-945a-a1d27708d367')
+ @rbac_rule_validation.action(
+ service="cinder",
+ rule="volume_extension:volume_type_access:addProjectAccess")
+ def test_add_type_access(self):
+ self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+ self._add_type_access(ignore_not_found=True)
+
+ @decorators.idempotent_id('8f848aeb-636a-46f1-aeeb-e2a60e9d2bfe')
+ @rbac_rule_validation.action(
+ service="cinder",
+ rule="volume_extension:volume_type_access:removeProjectAccess")
+ def test_remove_type_access(self):
+ self._add_type_access(ignore_not_found=True)
+
+ self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+ self.volume_types_client.remove_type_access(
+ self.vol_type['id'], project=self.project_id)
diff --git a/patrole_tempest_plugin/tests/api/volume/test_volume_types_extra_specs_rbac.py b/patrole_tempest_plugin/tests/api/volume/test_volume_types_extra_specs_rbac.py
index 94199b5..97eaab7 100644
--- a/patrole_tempest_plugin/tests/api/volume/test_volume_types_extra_specs_rbac.py
+++ b/patrole_tempest_plugin/tests/api/volume/test_volume_types_extra_specs_rbac.py
@@ -13,21 +13,88 @@
# License for the specific language governing permissions and limitations
# under the License.
+from tempest.lib.common.utils import data_utils
+from tempest.lib.common.utils import test_utils
from tempest.lib import decorators
+from tempest import test
from patrole_tempest_plugin import rbac_rule_validation
from patrole_tempest_plugin.tests.api.volume import rbac_base
class VolumeTypesExtraSpecsRbacTest(rbac_base.BaseVolumeRbacTest):
+ _api_version = 3
+
+ @classmethod
+ def skip_checks(cls):
+ super(VolumeTypesExtraSpecsRbacTest, cls).skip_checks()
+ if not test.is_extension_enabled('os-types-extra-specs', 'volume'):
+ msg = "os-types-extra-specs extension not enabled."
+ raise cls.skipException(msg)
+
+ @classmethod
+ def resource_setup(cls):
+ super(VolumeTypesExtraSpecsRbacTest, cls).resource_setup()
+ cls.vol_type = cls.create_volume_type()
+ cls.spec_key = data_utils.rand_name(cls.__name__ + '-Spec')
+
+ def _create_volume_type_extra_specs(self, ignore_not_found=False):
+ extra_specs = {self.spec_key: "val1"}
+ self.volume_types_client.create_volume_type_extra_specs(
+ self.vol_type['id'], extra_specs)
+
+ if ignore_not_found:
+ self.addCleanup(
+ test_utils.call_and_ignore_notfound_exc,
+ self.volume_types_client.delete_volume_type_extra_specs,
+ self.vol_type['id'], self.spec_key)
+ else:
+ self.addCleanup(
+ self.volume_types_client.delete_volume_type_extra_specs,
+ self.vol_type['id'], self.spec_key)
+
+ @decorators.idempotent_id('76c36be2-2b6c-4acf-9aac-c9dc5c17cdbe')
+ @rbac_rule_validation.action(service="cinder",
+ rule="volume_extension:types_extra_specs")
+ def test_list_volume_types_extra_specs(self):
+ self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+ self.volume_types_client.list_volume_types_extra_specs(
+ self.vol_type['id'])['extra_specs']
@rbac_rule_validation.action(service="cinder",
rule="volume_extension:types_extra_specs")
@decorators.idempotent_id('eea40251-990b-49b0-99ae-10e4585b479b')
def test_create_volume_type_extra_specs(self):
- vol_type = self.create_volume_type()
- # List Volume types extra specs.
- extra_specs = {"spec1": "val1"}
self.rbac_utils.switch_role(self, toggle_rbac_role=True)
- self.volume_types_client.create_volume_type_extra_specs(
- vol_type['id'], extra_specs)
+ self._create_volume_type_extra_specs(ignore_not_found=True)
+
+ @decorators.idempotent_id('e2dcc9c6-2fef-431d-afaf-92b45bc76d1a')
+ @rbac_rule_validation.action(service="cinder",
+ rule="volume_extension:types_extra_specs")
+ def test_show_volume_type_extra_specs(self):
+ self._create_volume_type_extra_specs()
+
+ self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+ self.volume_types_client.show_volume_type_extra_specs(
+ self.vol_type['id'], self.spec_key)
+
+ @decorators.idempotent_id('93001912-f938-41c7-8787-62dc7010fd52')
+ @rbac_rule_validation.action(service="cinder",
+ rule="volume_extension:types_extra_specs")
+ def test_delete_volume_type_extra_specs(self):
+ self._create_volume_type_extra_specs(ignore_not_found=True)
+
+ self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+ self.volume_types_client.delete_volume_type_extra_specs(
+ self.vol_type['id'], self.spec_key)
+
+ @decorators.idempotent_id('0a444437-7402-4fbe-a18a-93af2ee00618')
+ @rbac_rule_validation.action(service="cinder",
+ rule="volume_extension:types_extra_specs")
+ def test_update_volume_type_extra_specs(self):
+ self._create_volume_type_extra_specs()
+ update_extra_specs = {self.spec_key: "val2"}
+
+ self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+ self.volume_types_client.update_volume_type_extra_specs(
+ self.vol_type['id'], self.spec_key, update_extra_specs)
diff --git a/patrole_tempest_plugin/tests/api/volume/test_volumes_backup_rbac.py b/patrole_tempest_plugin/tests/api/volume/test_volumes_backup_rbac.py
index 25c8504..2861531 100644
--- a/patrole_tempest_plugin/tests/api/volume/test_volumes_backup_rbac.py
+++ b/patrole_tempest_plugin/tests/api/volume/test_volumes_backup_rbac.py
@@ -100,6 +100,8 @@
volume_id=self.volume['id'])['backup']
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.backups_client.delete_backup, backup['id'])
+ waiters.wait_for_volume_resource_status(self.os_admin.backups_client,
+ backup['id'], 'available')
self.rbac_utils.switch_role(self, toggle_rbac_role=True)
self.backups_client.delete_backup(backup['id'])
diff --git a/releasenotes/notes/additional-router-rbac-tests-66ef013c54016326.yaml b/releasenotes/notes/additional-router-rbac-tests-66ef013c54016326.yaml
new file mode 100644
index 0000000..4f91a12
--- /dev/null
+++ b/releasenotes/notes/additional-router-rbac-tests-66ef013c54016326.yaml
@@ -0,0 +1,11 @@
+---
+features:
+ - |
+ Add additional RBAC tests for network routers API, providing coverage for
+ the following policy actions:
+
+ * create_router:ha
+ * create_router:distributed
+ * get_router:distributed
+ * update_router:ha
+ * update_router:distributed
diff --git a/releasenotes/notes/extra-volume-types-tests-2e4538bed7348be4.yaml b/releasenotes/notes/extra-volume-types-tests-2e4538bed7348be4.yaml
new file mode 100644
index 0000000..9be15fc
--- /dev/null
+++ b/releasenotes/notes/extra-volume-types-tests-2e4538bed7348be4.yaml
@@ -0,0 +1,10 @@
+---
+features:
+ - |
+ Added RBAC tests for volume type access and volume type extra specs
+ APIs, providing coverage for the following policy actions:
+
+ * "volume_extension:types_extra_specs"
+ * "volume_extension:volume_type_access"
+ * "volume_extension:volume_type_access:addProjectAccess"
+ * "volume_extension:volume_type_access:removeProjectAccess"
diff --git a/releasenotes/notes/lock-server-460767a02d15bb29.yaml b/releasenotes/notes/lock-server-460767a02d15bb29.yaml
new file mode 100644
index 0000000..7af7ff3
--- /dev/null
+++ b/releasenotes/notes/lock-server-460767a02d15bb29.yaml
@@ -0,0 +1,5 @@
+---
+features:
+ - |
+ Adds tests for Nova's lock_server policies: lock,
+ unlock, and unlock_override.
diff --git a/releasenotes/notes/rbac-tests-for-network-agents-fbc899925b5948b1.yaml b/releasenotes/notes/rbac-tests-for-network-agents-fbc899925b5948b1.yaml
new file mode 100644
index 0000000..64deadc
--- /dev/null
+++ b/releasenotes/notes/rbac-tests-for-network-agents-fbc899925b5948b1.yaml
@@ -0,0 +1,14 @@
+---
+features:
+ - |
+ Implements RBAC tests for Tempest network agents_client, providing
+ coverage for the following policies:
+
+ * update_agent
+ * get_agent
+ * create_dhcp-network
+ * delete_dhcp-network
+ * get_dhcp-networks
+ * create_l3-router
+ * delete_l3-router
+ * get_l3-routers
diff --git a/releasenotes/notes/subnet-rbac-tests-6d3cf54e39a7b486.yaml b/releasenotes/notes/subnet-rbac-tests-6d3cf54e39a7b486.yaml
new file mode 100644
index 0000000..8d3d22a
--- /dev/null
+++ b/releasenotes/notes/subnet-rbac-tests-6d3cf54e39a7b486.yaml
@@ -0,0 +1,10 @@
+---
+features:
+ - |
+ Add RBAC tests for network subnet endpoints, providing coverage for the
+ following policy actions:
+
+ * create_subnet
+ * get_subnet
+ * update_subnet
+ * delete_subnet
diff --git a/releasenotes/notes/test_tokens_rbac-7f36919b786e9ffc.yaml b/releasenotes/notes/test_tokens_rbac-7f36919b786e9ffc.yaml
new file mode 100644
index 0000000..5ff448b
--- /dev/null
+++ b/releasenotes/notes/test_tokens_rbac-7f36919b786e9ffc.yaml
@@ -0,0 +1,3 @@
+---
+features:
+ - Added RBAC test scenarios for the token-related admin v2 identity API.
diff --git a/releasenotes/notes/volume-upload-public-test-f8e741a838ae7607.yaml b/releasenotes/notes/volume-upload-public-test-f8e741a838ae7607.yaml
new file mode 100644
index 0000000..c8c0ecd
--- /dev/null
+++ b/releasenotes/notes/volume-upload-public-test-f8e741a838ae7607.yaml
@@ -0,0 +1,5 @@
+---
+features:
+ - |
+ Add RBAC test to provide coverage for the following cinder policy:
+ "volume_extension:volume_actions:upload_public".