Merge "Migrate patrol jobs to bionic(Ubuntu LTS 18.04)"
diff --git a/.zuul.yaml b/.zuul.yaml
index 161f8aa..5203990 100644
--- a/.zuul.yaml
+++ b/.zuul.yaml
@@ -77,10 +77,7 @@
parent: patrole-base
description: Patrole job for member role.
# This currently works from stable/pike onward.
- branches:
- - master
- - stable/queens
- - stable/pike
+ branches: ^(?!stable/ocata).*$
vars:
devstack_localrc:
RBAC_TEST_ROLES: member
diff --git a/HACKING.rst b/HACKING.rst
index 9868e39..cd85d84 100644
--- a/HACKING.rst
+++ b/HACKING.rst
@@ -41,7 +41,24 @@
code that is more maintainable and easier to read
- [P104] RBAC `extension test class`_ names must end in 'ExtRbacTest'
-.. _extension test class: https://github.com/openstack/patrole/tree/master/patrole_tempest_plugin/tests/api/network#neutron-extension-rbac-tests
+.. _extension test class: https://git.openstack.org/cgit/openstack/patrole/plain/patrole_tempest_plugin/tests/api/network/README.rst
+
+Supported OpenStack Components
+------------------------------
+
+Patrole only offers **in-tree** integration testing coverage for the following
+components:
+
+* Cinder
+* Glance
+* Keystone
+* Neutron
+* Nova
+
+Patrole currently has no stable library, so reliance upon Patrole's framework
+for external RBAC testing should be done with caution. Nonetheless, even when
+Patrole has a stable library, it will only offer in-tree RBAC testing for
+the components listed above.
Role Overriding
---------------
diff --git a/README.rst b/README.rst
index 5331445..dfba71f 100644
--- a/README.rst
+++ b/README.rst
@@ -18,9 +18,6 @@
allowing deployments to verify that only intended roles have access to those
APIs.
-Patrole currently offers testing for the following OpenStack services: Nova,
-Neutron, Glance, Cinder and Keystone.
-
Patrole is currently undergoing heavy development. As more projects move
toward policy in code, Patrole will align its testing with the appropriate
documentation.
@@ -78,8 +75,8 @@
.. _Tempest plugin: https://docs.openstack.org/tempest/latest/plugin.html
.. _Tempest design principles: https://docs.openstack.org/tempest/latest/overview.html#design-principles
.. _policy in code: https://specs.openstack.org/openstack/oslo-specs/specs/newton/policy-in-code.html
-.. _Nova repository: https://github.com/openstack/nova/tree/master/nova/policies
-.. _Keystone repository: https://github.com/openstack/keystone/tree/master/keystone/common/policies
+.. _Nova repository: https://git.openstack.org/cgit/openstack/nova/tree/nova/policies
+.. _Keystone repository: https://git.openstack.org/cgit/openstack/keystone/tree/keystone/common/policies
Features
--------
@@ -177,7 +174,7 @@
the Patrole repository. To configure Patrole's logging, see the
`Patrole Configuration Guide <https://docs.openstack.org/patrole/latest/configuration.html#patrole-configuration>`_.
-.. _Tempest: https://github.com/openstack/tempest
+.. _Tempest: https://git.openstack.org/cgit/openstack/tempest
.. _Tempest_quickstart: https://docs.openstack.org/tempest/latest/overview.html#quickstart
.. _tempest_run: https://docs.openstack.org/tempest/latest/run.html
.. _testr: https://testrepository.readthedocs.org/en/latest/MANUAL.html
@@ -205,7 +202,11 @@
For more information about RBAC, reference the `rbac-overview`_
documentation page.
+For information regarding which projects Patrole offers RBAC testing for,
+reference the `HACKING`_ documentation page.
+
.. _rbac-overview: https://docs.openstack.org/patrole/latest/rbac-overview.html
+.. _HACKING: https://docs.openstack.org/patrole/latest/HACKING.html#supported-openstack-components
Unit Tests
----------
diff --git a/devstack/README.rst b/devstack/README.rst
index 053afd4..490f833 100644
--- a/devstack/README.rst
+++ b/devstack/README.rst
@@ -22,4 +22,4 @@
3. Run ``stack.sh`` found in the DevStack repo.
-.. _README file: https://github.com/openstack-dev/devstack/blob/master/README.rst
+.. _README file: https://git.openstack.org/cgit/openstack-dev/devstack/plain/README.rst
diff --git a/doc/source/multi-policy-validation.rst b/doc/source/multi-policy-validation.rst
index d38b31e..576fd68 100644
--- a/doc/source/multi-policy-validation.rst
+++ b/doc/source/multi-policy-validation.rst
@@ -20,7 +20,7 @@
Multi-policy support allows Patrole to more accurately offer RBAC tests for API
endpoints that enforce multiple policy actions.
-.. _this spec: https://github.com/openstack/qa-specs/blob/master/specs/patrole/rbac-testing-multiple-policies.rst
+.. _this spec: http://specs.openstack.org/openstack/qa-specs/specs/patrole/rbac-testing-multiple-policies.html
Scope
-----
diff --git a/doc/source/rbac-overview.rst b/doc/source/rbac-overview.rst
index cc47f75..747eab8 100644
--- a/doc/source/rbac-overview.rst
+++ b/doc/source/rbac-overview.rst
@@ -271,8 +271,8 @@
Related term: :term:`hard authorization`.
-.. _Nova repository: https://github.com/openstack/nova/tree/master/nova/policies
-.. _Keystone repository: https://github.com/openstack/keystone/tree/master/keystone/common/policies
+.. _Nova repository: https://git.openstack.org/cgit/openstack/nova/tree/nova/policies
+.. _Keystone repository: https://git.openstack.org/cgit/openstack/keystone/tree/keystone/common/policies
.. _governance goal: https://governance.openstack.org/tc/goals/queens/policy-in-code.html
.. _scope types: https://docs.openstack.org/keystone/latest/admin/identity-tokens.html#authorization-scopes
.. _policy.yaml: https://docs.openstack.org/ocata/config-reference/policy-yaml-file.html
diff --git a/etc/patrole.conf.sample b/etc/patrole.conf.sample
index 6433f40..42f1042 100644
--- a/etc/patrole.conf.sample
+++ b/etc/patrole.conf.sample
@@ -81,7 +81,7 @@
#
# Where:
#
-# service = the service that is being tested (Cinder, Nova, etc.).
+# service = the service that is being tested (cinder, nova, etc.).
#
# api_action = the policy action that is being tested. Examples:
#
diff --git a/patrole_tempest_plugin/config.py b/patrole_tempest_plugin/config.py
index 62337f7..63f0a8a 100644
--- a/patrole_tempest_plugin/config.py
+++ b/patrole_tempest_plugin/config.py
@@ -96,6 +96,11 @@
* add_image
allowed_role = the ``oslo.policy`` role that is allowed to perform the API.
+"""),
+ cfg.BoolOpt('validate_deprecated_rules', default=True,
+ help="""Some of the policy rules have deprecated version,
+Patrole should be able to run check against default and deprecated rules,
+otherwise the result of the tests may not be correct.
""")
]
diff --git a/patrole_tempest_plugin/policy_authority.py b/patrole_tempest_plugin/policy_authority.py
index e0a26a3..9c25e5f 100644
--- a/patrole_tempest_plugin/policy_authority.py
+++ b/patrole_tempest_plugin/policy_authority.py
@@ -169,6 +169,27 @@
is_admin=is_admin_context)
return is_allowed
+ def _handle_deprecated_rule(self, default):
+ deprecated_rule = default.deprecated_rule
+ deprecated_msg = (
+ 'Policy "%(old_name)s":"%(old_check_str)s" was deprecated in '
+ '%(release)s in favor of "%(name)s":"%(check_str)s". Reason: '
+ '%(reason)s. Either ensure your deployment is ready for the new '
+ 'default or copy/paste the deprecated policy into your policy '
+ 'file and maintain it manually.' % {
+ 'old_name': deprecated_rule.name,
+ 'old_check_str': deprecated_rule.check_str,
+ 'release': default.deprecated_since,
+ 'name': default.name,
+ 'check_str': default.check_str,
+ 'reason': default.deprecated_reason
+ }
+ )
+ LOG.warn(deprecated_msg)
+ check_str = '(%s) or (%s)' % (default.check_str,
+ deprecated_rule.check_str)
+ return policy.RuleDefault(default.name, check_str)
+
def get_rules(self):
rules = policy.Rules()
# Check whether policy file exists and attempt to read it.
@@ -203,6 +224,12 @@
if self.service in policy_generator:
for rule in policy_generator[self.service]:
if rule.name not in rules:
+ if CONF.patrole.validate_deprecated_rules:
+ # NOTE (sergey.vilgelm):
+ # The `DocumentedRuleDefault` object has no
+ # `deprecated_rule` attribute in Pike
+ if getattr(rule, 'deprecated_rule', False):
+ rule = self._handle_deprecated_rule(rule)
rules[rule.name] = rule.check
elif str(rule.check) != str(rules[rule.name]):
msg = ("The same policy name: %s was found in the "
@@ -231,20 +258,24 @@
whether the given role is contained in context_is_admin. If it is not
in the policy file, then default to context_is_admin: admin.
"""
- if 'context_is_admin' in self.rules.keys():
+ if 'context_is_admin' in self.rules:
return self._allowed(
access=self._get_access_token(roles),
apply_rule='context_is_admin')
return CONF.identity.admin_role in roles
def _get_access_token(self, roles):
+ roles = {r.lower() for r in roles if r}
+
+ # Extend roles for an user with admin or member role
+ if 'admin' in roles:
+ roles.add('member')
+ if 'member' in roles:
+ roles.add('reader')
+
access_token = {
"token": {
- "roles": [
- {
- "name": role
- } for role in roles
- ],
+ "roles": [{'name': r} for r in roles],
"project_id": self.project_id,
"tenant_id": self.project_id,
"user_id": self.user_id
@@ -266,7 +297,7 @@
# than hard-coding it to True. is_admin_project cannot be determined
# from the role, but rather from project and domain names. For more
# information, see:
- # https://github.com/openstack/keystone/blob/37ce5417418f8acbd27f3dacb70c605b0fe48301/keystone/token/providers/common.py#L150
+ # https://git.openstack.org/cgit/openstack/keystone/tree/keystone/token/providers/common.py?id=37ce5417418f8acbd27f3dacb70c605b0fe48301#n150
access_data['is_admin_project'] = True
class Object(object):
diff --git a/patrole_tempest_plugin/rbac_exceptions.py b/patrole_tempest_plugin/rbac_exceptions.py
index ad697b0..c30961b 100644
--- a/patrole_tempest_plugin/rbac_exceptions.py
+++ b/patrole_tempest_plugin/rbac_exceptions.py
@@ -47,7 +47,7 @@
"""Raised when a list or show action is empty following RBAC authorization
failure.
"""
- message = ("The response body is empty due to policy enforcement failure.")
+ message = "The response body is empty due to policy enforcement failure."
class RbacResourceSetupFailed(BasePatroleException):
@@ -104,3 +104,16 @@
* an exception is raised after ``override_role`` context
"""
message = "Override role failure or incorrect usage"
+
+
+class RbacValidateListException(BasePatroleException):
+ """Raised when override_role_and_validate_list is used incorrectly.
+
+ Specifically, when:
+
+ * Neither ``resource_id`` nor ``resources`` is initialized
+ * Both ``resource_id`` and ``resources`` are initialized
+ * The ``ctx.resources`` variable wasn't set in
+ override_role_and_validate_list context.
+ """
+ message = "Incorrect usage of override_role_and_validate_list: %(reason)s"
diff --git a/patrole_tempest_plugin/rbac_utils.py b/patrole_tempest_plugin/rbac_utils.py
index 33955c3..6aab4d7 100644
--- a/patrole_tempest_plugin/rbac_utils.py
+++ b/patrole_tempest_plugin/rbac_utils.py
@@ -13,7 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
-from contextlib import contextmanager
+import contextlib
import sys
import time
@@ -31,6 +31,77 @@
LOG = logging.getLogger(__name__)
+class _ValidateListContext(object):
+ """Context class responsible for validation of the list functions.
+
+ This class is used in ``override_role_and_validate_list`` function and
+ the result of a list function must be assigned to the ``ctx.resources``
+ variable.
+
+ Example::
+
+ with self.rbac_utils.override_role_and_validate_list(...) as ctx:
+ ctx.resources = list_function()
+
+ """
+ def __init__(self, admin_resources=None, admin_resource_id=None):
+ """Constructor for ``ValidateListContext``.
+
+ Either ``admin_resources`` or ``admin_resource_id`` should be used,
+ not both.
+
+ :param list admin_resources: The list of resources received before
+ calling the ``override_role_and_validate_list`` function. To
+ validate will be used the ``_validate_len`` function.
+ :param UUID admin_resource_id: An ID of a resource created before
+ calling the ``override_role_and_validate_list`` function. To
+ validate will be used the ``_validate_resource`` function.
+ :raises RbacValidateListException: if both ``admin_resources`` and
+ ``admin_resource_id`` are set or unset.
+ """
+ self.resources = None
+ if admin_resources is not None and not admin_resource_id:
+ self._admin_len = len(admin_resources)
+ if not self._admin_len:
+ raise rbac_exceptions.RbacValidateListException(
+ reason="the list of admin resources cannot be empty")
+ self._validate_func = self._validate_len
+ elif admin_resource_id and admin_resources is None:
+ self._admin_resource_id = admin_resource_id
+ self._validate_func = self._validate_resource
+ else:
+ raise rbac_exceptions.RbacValidateListException(
+ reason="admin_resources and admin_resource_id are mutually "
+ "exclusive")
+
+ def _validate_len(self):
+ """Validates that the number of resources is less than admin resources.
+ """
+ if not len(self.resources):
+ raise rbac_exceptions.RbacEmptyResponseBody()
+ elif self._admin_len > len(self.resources):
+ raise rbac_exceptions.RbacPartialResponseBody(body=self.resources)
+
+ def _validate_resource(self):
+ """Validates that the admin resource is present in the resources.
+ """
+ for resource in self.resources:
+ if resource['id'] == self._admin_resource_id:
+ return
+ raise rbac_exceptions.RbacPartialResponseBody(body=self.resources)
+
+ def _validate(self):
+ """Calls the proper validation function.
+
+ :raises RbacValidateListException: if the ``ctx.resources`` variable is
+ not assigned.
+ """
+ if self.resources is None:
+ raise rbac_exceptions.RbacValidateListException(
+ reason="ctx.resources is not assigned")
+ self._validate_func()
+
+
class RbacUtils(object):
"""Utility class responsible for switching ``os_primary`` role.
@@ -68,7 +139,7 @@
admin_role_id = None
rbac_role_ids = None
- @contextmanager
+ @contextlib.contextmanager
def override_role(self, test_obj):
"""Override the role used by ``os_primary`` Tempest credentials.
@@ -220,6 +291,41 @@
return False
+ @contextlib.contextmanager
+ def override_role_and_validate_list(self, test_obj, admin_resources=None,
+ admin_resource_id=None):
+ """Call ``override_role`` and validate RBAC for a list API action.
+
+ List actions usually do soft authorization: partial or empty response
+ bodies are returned instead of exceptions. This helper validates
+ that unauthorized roles only return a subset of the available
+ resources.
+ Should only be used for validating list API actions.
+
+ :param test_obj: Instance of ``tempest.test.BaseTestCase``.
+ :param list admin_resources: The list of resources received before
+ calling the ``override_role_and_validate_list`` function.
+ :param UUID admin_resource_id: An ID of a resource created before
+ calling the ``override_role_and_validate_list`` function.
+ :return: py:class:`_ValidateListContext` object.
+
+ Example::
+
+ # the resource created by admin
+ admin_resource_id = (
+ self.ntp_client.create_dscp_marking_rule()
+ ["dscp_marking_rule"]["id'])
+ with self.rbac_utils.override_role_and_validate_list(
+ self, admin_resource_id=admin_resource_id) as ctx:
+ # the list of resources available for member role
+ ctx.resources = self.ntp_client.list_dscp_marking_rules(
+ policy_id=self.policy_id)["dscp_marking_rules"]
+ """
+ ctx = _ValidateListContext(admin_resources, admin_resource_id)
+ with self.override_role(test_obj):
+ yield ctx
+ ctx._validate()
+
class RbacUtilsMixin(object):
"""Mixin class to be used alongside an instance of
diff --git a/patrole_tempest_plugin/tests/api/compute/test_server_misc_policy_actions_rbac.py b/patrole_tempest_plugin/tests/api/compute/test_server_misc_policy_actions_rbac.py
index 64e1300..5e38970 100644
--- a/patrole_tempest_plugin/tests/api/compute/test_server_misc_policy_actions_rbac.py
+++ b/patrole_tempest_plugin/tests/api/compute/test_server_misc_policy_actions_rbac.py
@@ -505,7 +505,7 @@
TODO(felipemonteiro): Once multiple policy testing is supported, this
test should also check for additional policies mentioned here:
- https://github.com/openstack/nova/blob/master/nova/policies/server_usage.py
+ https://git.openstack.org/cgit/openstack/nova/tree/nova/policies/server_usage.py?h=17.0.0
"""
expected_attrs = ('OS-SRV-USG:launched_at',
'OS-SRV-USG:terminated_at')
diff --git a/patrole_tempest_plugin/tests/api/identity/v3/test_auth_rbac.py b/patrole_tempest_plugin/tests/api/identity/v3/test_auth_rbac.py
index a63192f..a8cc767 100644
--- a/patrole_tempest_plugin/tests/api/identity/v3/test_auth_rbac.py
+++ b/patrole_tempest_plugin/tests/api/identity/v3/test_auth_rbac.py
@@ -23,7 +23,7 @@
"""Tests the APIs that enforce the auth policy actions.
For more information about the auth policy actions, see:
- https://github.com/openstack/keystone/blob/master/keystone/common/policies/auth.py
+ https://git.openstack.org/cgit/openstack/keystone/tree/keystone/common/policies/auth.py
"""
# TODO(felipemonteiro): Add tests for identity:get_auth_catalog
diff --git a/patrole_tempest_plugin/tests/api/identity/v3/test_domain_configuration_rbac.py b/patrole_tempest_plugin/tests/api/identity/v3/test_domain_configuration_rbac.py
index 4fa3937..35154eb 100644
--- a/patrole_tempest_plugin/tests/api/identity/v3/test_domain_configuration_rbac.py
+++ b/patrole_tempest_plugin/tests/api/identity/v3/test_domain_configuration_rbac.py
@@ -28,7 +28,7 @@
"""RBAC tests for domain configuration client.
Provides coverage for the following policy actions:
- https://github.com/openstack/keystone/blob/master/keystone/common/policies/domain_config.py
+ https://git.openstack.org/cgit/openstack/keystone/tree/keystone/common/policies/domain_config.py
"""
identity = {"driver": "ldap"}
diff --git a/patrole_tempest_plugin/tests/api/network/README.rst b/patrole_tempest_plugin/tests/api/network/README.rst
index 352af8a..6eaae08 100644
--- a/patrole_tempest_plugin/tests/api/network/README.rst
+++ b/patrole_tempest_plugin/tests/api/network/README.rst
@@ -41,7 +41,7 @@
possible (such as ``neutron_tempest_plugin.api.clients`` for the service
clients) because the module is not a `stable interface`_.
-.. _policy.json file: https://github.com/openstack/neutron/blob/master/etc/policy.json
-.. _Zuul jobs: https://github.com/openstack/patrole/blob/master/.zuul.yaml
-.. _neutron-tempest-plugin: https://github.com/openstack/neutron-tempest-plugin
-.. _stable interface: https://github.com/openstack/neutron-tempest-plugin/tree/master/neutron_tempest_plugin#warning
+.. _policy.json file: https://git.openstack.org/cgit/openstack/neutron/tree/etc/policy.json?h=12.0.0
+.. _Zuul jobs: https://git.openstack.org/cgit/openstack/patrole/tree/.zuul.yaml
+.. _neutron-tempest-plugin: https://git.openstack.org/cgit/openstack/neutron-tempest-plugin
+.. _stable interface: https://git.openstack.org/cgit/openstack/neutron-tempest-plugin/plain/neutron_tempest_plugin/README.rst
diff --git a/patrole_tempest_plugin/tests/api/network/test_address_scope_rbac.py b/patrole_tempest_plugin/tests/api/network/test_address_scope_rbac.py
index 6cdeccd..ad0a1d4 100644
--- a/patrole_tempest_plugin/tests/api/network/test_address_scope_rbac.py
+++ b/patrole_tempest_plugin/tests/api/network/test_address_scope_rbac.py
@@ -137,3 +137,18 @@
address_scope = self._create_address_scope()
with self.rbac_utils.override_role(self):
self.ntp_client.delete_address_scope(address_scope['id'])
+
+ @rbac_rule_validation.action(service="neutron",
+ rules=["get_address_scope"])
+ @decorators.idempotent_id('c093fd34-96ee-4abe-8fa5-916dc29653e3')
+ def test_list_address_scopes(self):
+ """List Address Scopes
+
+ RBAC test for the neutron ``list_address_scopes`` function and
+ the ``get_address_scope`` policy
+ """
+ admin_resource_id = self._create_address_scope()['id']
+ with (self.rbac_utils.override_role_and_validate_list(
+ self, admin_resource_id=admin_resource_id)) as ctx:
+ ctx.resources = self.ntp_client.list_address_scopes(
+ id=admin_resource_id)["address_scopes"]
diff --git a/patrole_tempest_plugin/tests/api/network/test_agents_rbac.py b/patrole_tempest_plugin/tests/api/network/test_agents_rbac.py
index c2b23f2..fe5f5a1 100644
--- a/patrole_tempest_plugin/tests/api/network/test_agents_rbac.py
+++ b/patrole_tempest_plugin/tests/api/network/test_agents_rbac.py
@@ -65,6 +65,20 @@
self.agents_client.update_agent(agent_id=self.agent['id'],
agent=agent_status)
+ @decorators.idempotent_id('f7a085e2-71b1-4d39-be3e-fea4bc10ccb8')
+ @rbac_rule_validation.action(service="neutron", rules=["get_agent"])
+ def test_list_agents(self):
+ """List agents test.
+
+ RBAC test for the neutron ``list_agents`` function and
+ the ``get_agent`` policy
+ """
+ admin_resource_id = self.agent['id']
+ with (self.rbac_utils.override_role_and_validate_list(
+ self, admin_resource_id=admin_resource_id)) as ctx:
+ ctx.resources = self.agents_client.list_agents(
+ id=admin_resource_id)["agents"]
+
class L3AgentSchedulerRbacTest(base.BaseNetworkRbacTest):
diff --git a/patrole_tempest_plugin/tests/api/network/test_availability_zones_rbac.py b/patrole_tempest_plugin/tests/api/network/test_availability_zones_rbac.py
new file mode 100644
index 0000000..b1c806b
--- /dev/null
+++ b/patrole_tempest_plugin/tests/api/network/test_availability_zones_rbac.py
@@ -0,0 +1,48 @@
+# Copyright 2018 AT&T Corporation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from tempest.common import utils
+from tempest.lib import decorators
+
+from patrole_tempest_plugin import rbac_rule_validation
+from patrole_tempest_plugin.tests.api.network import rbac_base as base
+
+
+class AvailabilityZoneExtRbacTest(base.BaseNetworkExtRbacTest):
+
+ @classmethod
+ def skip_checks(cls):
+ super(AvailabilityZoneExtRbacTest, cls).skip_checks()
+ if not utils.is_extension_enabled('availability_zone',
+ 'network'):
+ msg = "network_availability_zone extension not enabled."
+ raise cls.skipException(msg)
+
+ @rbac_rule_validation.action(service="neutron",
+ rules=["get_availability_zone"])
+ @decorators.idempotent_id('3c521be8-c32e-11e8-a611-080027758b73')
+ def test_list_availability_zone_rbac(self):
+
+ """List all available zones.
+
+ RBAC test for the neutron ``list_availability_zones``
+ function and the ``get_availability_zone`` policy
+ """
+ admin_resources = (self.ntp_client.list_availability_zones()
+ ["availability_zones"])
+ with self.rbac_utils.override_role_and_validate_list(
+ self, admin_resources=admin_resources) as ctx:
+ ctx.resources = (self.ntp_client.list_availability_zones()
+ ['availability_zones'])
diff --git a/patrole_tempest_plugin/tests/api/network/test_dscp_marking_rule_rbac.py b/patrole_tempest_plugin/tests/api/network/test_dscp_marking_rule_rbac.py
index e03de74..bdc2a7c 100644
--- a/patrole_tempest_plugin/tests/api/network/test_dscp_marking_rule_rbac.py
+++ b/patrole_tempest_plugin/tests/api/network/test_dscp_marking_rule_rbac.py
@@ -104,3 +104,18 @@
with self.rbac_utils.override_role(self):
self.ntp_client.delete_dscp_marking_rule(self.policy_id, rule_id)
+
+ @decorators.idempotent_id('c012fd4f-3a3e-4af4-9075-dd3e170daecd')
+ @rbac_rule_validation.action(service="neutron",
+ rules=["get_policy_dscp_marking_rule"])
+ def test_list_policy_dscp_marking_rules(self):
+ """List policy_dscp_marking_rules.
+
+ RBAC test for the neutron ``list_dscp_marking_rules`` function and
+ the ``get_policy_dscp_marking_rule`` policy
+ """
+ admin_resource_id = self.create_policy_dscp_marking_rule()
+ with (self.rbac_utils.override_role_and_validate_list(
+ self, admin_resource_id=admin_resource_id)) as ctx:
+ ctx.resources = self.ntp_client.list_dscp_marking_rules(
+ policy_id=self.policy_id)["dscp_marking_rules"]
diff --git a/patrole_tempest_plugin/tests/api/network/test_floating_ips_rbac.py b/patrole_tempest_plugin/tests/api/network/test_floating_ips_rbac.py
index 336490a..89772d9 100644
--- a/patrole_tempest_plugin/tests/api/network/test_floating_ips_rbac.py
+++ b/patrole_tempest_plugin/tests/api/network/test_floating_ips_rbac.py
@@ -130,3 +130,17 @@
with self.rbac_utils.override_role(self):
# Delete the floating IP
self.floating_ips_client.delete_floatingip(floating_ip['id'])
+
+ @rbac_rule_validation.action(service="neutron", rules=["get_floatingip"])
+ @decorators.idempotent_id('824965e3-8be8-46e2-be64-0d793533ad20')
+ def test_list_floating_ips(self):
+ """List Floating IPs.
+
+ RBAC test for the neutron ``list_floatingips`` function and
+ the ``get_floatingip`` policy
+ """
+ admin_resource_id = self._create_floatingip()['id']
+ with (self.rbac_utils.override_role_and_validate_list(
+ self, admin_resource_id=admin_resource_id)) as ctx:
+ ctx.resources = self.floating_ips_client.list_floatingips(
+ id=admin_resource_id)["floatingips"]
diff --git a/patrole_tempest_plugin/tests/api/network/test_metering_label_rules_rbac.py b/patrole_tempest_plugin/tests/api/network/test_metering_label_rules_rbac.py
index bf49053..6673201 100644
--- a/patrole_tempest_plugin/tests/api/network/test_metering_label_rules_rbac.py
+++ b/patrole_tempest_plugin/tests/api/network/test_metering_label_rules_rbac.py
@@ -101,3 +101,20 @@
with self.rbac_utils.override_role(self):
self.metering_label_rules_client.delete_metering_label_rule(
label_rule['id'])
+
+ @rbac_rule_validation.action(service="neutron",
+ rules=["get_metering_label_rule"])
+ @decorators.idempotent_id('eaaf9eb5-ee53-4b6b-a4d3-a721dd39bc40')
+ def test_list_metering_label_rules(self):
+ """List metering label rules.
+
+ RBAC test for the neutron ``list_metering_label_rules`` function and
+ the ``get_metering_label_rule`` policy
+ """
+ admin_resource_id = self._create_metering_label_rule(self.label)['id']
+ with (self.rbac_utils.override_role_and_validate_list(
+ self, admin_resource_id=admin_resource_id)) as ctx:
+ ctx.resources = (
+ self.metering_label_rules_client.
+ list_metering_label_rules(id=admin_resource_id)
+ ["metering_label_rules"])
diff --git a/patrole_tempest_plugin/tests/api/network/test_metering_labels_rbac.py b/patrole_tempest_plugin/tests/api/network/test_metering_labels_rbac.py
index ed6e316..bac55d1 100644
--- a/patrole_tempest_plugin/tests/api/network/test_metering_labels_rbac.py
+++ b/patrole_tempest_plugin/tests/api/network/test_metering_labels_rbac.py
@@ -83,3 +83,20 @@
label = self._create_metering_label()
with self.rbac_utils.override_role(self):
self.metering_labels_client.delete_metering_label(label['id'])
+
+ @rbac_rule_validation.action(service="neutron",
+ rules=["get_metering_label"])
+ @decorators.idempotent_id('d60d72b0-cb8f-44db-b10b-5092fa01cb0e')
+ def test_list_metering_labels(self):
+ """List metering label.
+
+ RBAC test for the neutron ``list_metering_labels`` function and
+ the ``get_metering_label`` policy
+ """
+ admin_resource_id = self._create_metering_label()['id']
+ with (self.rbac_utils.override_role_and_validate_list(
+ self, admin_resource_id=admin_resource_id)) as ctx:
+ ctx.resources = (
+ self.metering_labels_client.
+ list_metering_labels(id=admin_resource_id)
+ ["metering_labels"])
diff --git a/patrole_tempest_plugin/tests/api/network/test_network_ip_availability_rbac.py b/patrole_tempest_plugin/tests/api/network/test_network_ip_availability_rbac.py
new file mode 100644
index 0000000..b5346d5
--- /dev/null
+++ b/patrole_tempest_plugin/tests/api/network/test_network_ip_availability_rbac.py
@@ -0,0 +1,63 @@
+# Copyright 2018 AT&T Corporation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from tempest.common import utils
+from tempest.lib.common.utils import data_utils
+from tempest.lib import decorators
+
+from patrole_tempest_plugin import rbac_rule_validation
+from patrole_tempest_plugin.tests.api.network import rbac_base as base
+
+
+class NetworkIpAvailabilityExtRbacTest(base.BaseNetworkExtRbacTest):
+ @classmethod
+ def skip_checks(cls):
+ super(NetworkIpAvailabilityExtRbacTest, cls).skip_checks()
+ if not utils.is_extension_enabled('network-ip-availability',
+ 'network'):
+ msg = "network-ip-availability extension not enabled."
+ raise cls.skipException(msg)
+
+ @rbac_rule_validation.action(service="neutron",
+ rules=["get_network_ip_availability"],
+ expected_error_codes=[404])
+ @decorators.idempotent_id('93edc5ed-385f-4a8e-9b15-4370ec608253')
+ def test_get_network_ip_availability(self):
+ """Get network availability
+
+ RBAC test for the neutron get_network_ip_availability policy
+ """
+
+ network_name = data_utils.rand_name(
+ self.__class__.__name__ + '-Network')
+ network = self.create_network(network_name=network_name)
+
+ with self.rbac_utils.override_role(self):
+ self.ntp_client.show_network_ip_availability(network['id'])
+
+ @rbac_rule_validation.action(service="neutron",
+ rules=["get_network_ip_availabilities"])
+ @decorators.idempotent_id('d4ceb5f0-2342-4412-a617-4e1aaf7fcaf0')
+ def test_get_network_ip_availabilities(self):
+ """List network ip availabilities
+
+ RBAC test for the neutron get_network_ip_availabilities policy
+ """
+ admin_resources = (self.ntp_client.list_network_ip_availabilities()
+ ["network_ip_availabilities"])
+ with self.rbac_utils.override_role_and_validate_list(
+ self, admin_resources=admin_resources) as ctx:
+ ctx.resources = (self.ntp_client.list_network_ip_availabilities()
+ ["network_ip_availabilities"])
diff --git a/patrole_tempest_plugin/tests/api/network/test_networks_rbac.py b/patrole_tempest_plugin/tests/api/network/test_networks_rbac.py
index b39489a..d98febd 100644
--- a/patrole_tempest_plugin/tests/api/network/test_networks_rbac.py
+++ b/patrole_tempest_plugin/tests/api/network/test_networks_rbac.py
@@ -457,3 +457,18 @@
with self.rbac_utils.override_role(self):
self.networks_client.list_dhcp_agents_on_hosting_network(
self.network['id'])
+
+ @rbac_rule_validation.action(service="neutron", rules=["get_network"])
+ @decorators.idempotent_id('53d6d826-ec9a-4407-9362-b474187fae6d')
+ def test_list_networks(self):
+ """List Networks
+
+ RBAC test for the neutron ``list_networks`` function and
+ the ``get_network`` policy
+ """
+
+ admin_resource_id = self.network['id']
+ with (self.rbac_utils.override_role_and_validate_list(
+ self, admin_resource_id=admin_resource_id)) as ctx:
+ ctx.resources = self.networks_client.list_networks(
+ id=admin_resource_id)["networks"]
diff --git a/patrole_tempest_plugin/tests/api/network/test_ports_rbac.py b/patrole_tempest_plugin/tests/api/network/test_ports_rbac.py
index dd3537f..a5e4be6 100644
--- a/patrole_tempest_plugin/tests/api/network/test_ports_rbac.py
+++ b/patrole_tempest_plugin/tests/api/network/test_ports_rbac.py
@@ -388,3 +388,17 @@
port = self.create_port(self.network)
with self.rbac_utils.override_role(self):
self.ports_client.delete_port(port['id'])
+
+ @rbac_rule_validation.action(service="neutron", rules=["get_port"])
+ @decorators.idempotent_id('877ea70d-b000-4af4-9322-0a76b47b7890')
+ def test_list_ports(self):
+ """List Ports
+
+ RBAC test for the neutron ``list_ports`` function and
+ the ``get_port`` policy
+ """
+ admin_resource_id = self.port['id']
+ with (self.rbac_utils.override_role_and_validate_list(
+ self, admin_resource_id=admin_resource_id)) as ctx:
+ ctx.resources = self.ports_client.list_ports(
+ id=admin_resource_id)["ports"]
diff --git a/patrole_tempest_plugin/tests/api/network/test_qos_rbac.py b/patrole_tempest_plugin/tests/api/network/test_qos_rbac.py
index 3fcb7e4..95a1456 100644
--- a/patrole_tempest_plugin/tests/api/network/test_qos_rbac.py
+++ b/patrole_tempest_plugin/tests/api/network/test_qos_rbac.py
@@ -98,3 +98,17 @@
policy = self.create_policy()
with self.rbac_utils.override_role(self):
self.ntp_client.delete_qos_policy(policy['id'])
+
+ @rbac_rule_validation.action(service="neutron", rules=["get_policy"])
+ @decorators.idempotent_id('e84cec88-8478-4787-b603-5fcdd8ed7bd5')
+ def test_list_policies(self):
+ """List Policies Test
+
+ RBAC test for the neutron ``list_qos_policies`` function and
+ the ``get_policy``
+ """
+ admin_resource_id = self.create_policy()['id']
+ with (self.rbac_utils.override_role_and_validate_list(
+ self, admin_resource_id=admin_resource_id)) as ctx:
+ ctx.resources = self.ntp_client.list_qos_policies(
+ id=admin_resource_id)["policies"]
diff --git a/patrole_tempest_plugin/tests/api/network/test_rbac_policies_rbac.py b/patrole_tempest_plugin/tests/api/network/test_rbac_policies_rbac.py
index 2123eb3..599cab7 100644
--- a/patrole_tempest_plugin/tests/api/network/test_rbac_policies_rbac.py
+++ b/patrole_tempest_plugin/tests/api/network/test_rbac_policies_rbac.py
@@ -109,3 +109,18 @@
with self.rbac_utils.override_role(self):
self.ntp_client.delete_rbac_policy(policy_id)
+
+ @decorators.idempotent_id('5337d95a-2e75-47bb-a0ea-0a082be930bf')
+ @rbac_rule_validation.action(service="neutron", rules=["get_rbac_policy"])
+ def test_list_rbac_policies(self):
+ """List RBAC policies.
+
+ RBAC test for the neutron ``list_rbac_policies`` function and
+ the ``get_rbac_policy`` policy
+ """
+ admin_resource_id = self.create_rbac_policy(self.tenant_id,
+ self.network_id)
+ with (self.rbac_utils.override_role_and_validate_list(
+ self, admin_resource_id=admin_resource_id)) as ctx:
+ ctx.resources = self.ntp_client.list_rbac_policies(
+ id=admin_resource_id)["rbac_policies"]
diff --git a/patrole_tempest_plugin/tests/api/network/test_routers_rbac.py b/patrole_tempest_plugin/tests/api/network/test_routers_rbac.py
index 399ad47..e253b1e 100644
--- a/patrole_tempest_plugin/tests/api/network/test_routers_rbac.py
+++ b/patrole_tempest_plugin/tests/api/network/test_routers_rbac.py
@@ -401,3 +401,18 @@
self.routers_client.remove_router_interface(
router['id'],
subnet_id=subnet['id'])
+
+ @rbac_rule_validation.action(service="neutron", rules=["get_router"])
+ @decorators.idempotent_id('86816700-12d1-4173-a50f-34bd137f47e6')
+ def test_list_routers(self):
+ """List Routers
+
+ RBAC test for the neutron ``get_router policy`` and
+ the ``get_router`` policy
+ """
+
+ admin_resource_id = self.router['id']
+ with (self.rbac_utils.override_role_and_validate_list(
+ self, admin_resource_id=admin_resource_id)) as ctx:
+ ctx.resources = self.routers_client.list_routers(
+ id=admin_resource_id)["routers"]
diff --git a/patrole_tempest_plugin/tests/api/network/test_security_groups_rbac.py b/patrole_tempest_plugin/tests/api/network/test_security_groups_rbac.py
index e9fa018..750ba3d 100644
--- a/patrole_tempest_plugin/tests/api/network/test_security_groups_rbac.py
+++ b/patrole_tempest_plugin/tests/api/network/test_security_groups_rbac.py
@@ -119,14 +119,16 @@
rules=["get_security_group"])
@decorators.idempotent_id('fbaf8d96-ed3e-49af-b24c-5fb44f05bbb7')
def test_list_security_groups(self):
+ """List Security Groups
- with self.rbac_utils.override_role(self):
- security_groups = self.security_groups_client.\
- list_security_groups()
-
- # Neutron may return an empty list if access is denied.
- if not security_groups['security_groups']:
- raise rbac_exceptions.RbacEmptyResponseBody()
+ RBAC test for the neutron ``list_security_groups`` function and
+ the ``get_security_group`` policy
+ """
+ admin_resource_id = self.secgroup['id']
+ with (self.rbac_utils.override_role_and_validate_list(
+ self, admin_resource_id=admin_resource_id)) as ctx:
+ ctx.resources = self.security_groups_client.list_security_groups(
+ id=admin_resource_id)["security_groups"]
@rbac_rule_validation.action(service="neutron",
rules=["create_security_group_rule"])
diff --git a/patrole_tempest_plugin/tests/api/network/test_segments_rbac.py b/patrole_tempest_plugin/tests/api/network/test_segments_rbac.py
index 0b58649..a85b4d5 100644
--- a/patrole_tempest_plugin/tests/api/network/test_segments_rbac.py
+++ b/patrole_tempest_plugin/tests/api/network/test_segments_rbac.py
@@ -120,3 +120,17 @@
with self.rbac_utils.override_role(self):
self.ntp_client.delete_segment(segment['segment']['id'])
+
+ @decorators.idempotent_id('d68a0578-36ae-435e-8aaa-508ee96bdfae')
+ @rbac_rule_validation.action(service="neutron", rules=["get_segment"])
+ def test_list_segments(self):
+ """List segments.
+
+ RBAC test for the neutron ``list_segments`` function and
+ the``get_segment`` policy
+ """
+ admin_resource_id = self.create_segment(self.network)['segment']['id']
+ with (self.rbac_utils.override_role_and_validate_list(
+ self, admin_resource_id=admin_resource_id)) as ctx:
+ ctx.resources = self.ntp_client.list_segments(
+ id=admin_resource_id)["segments"]
diff --git a/patrole_tempest_plugin/tests/api/network/test_subnetpools_rbac.py b/patrole_tempest_plugin/tests/api/network/test_subnetpools_rbac.py
index bc6b923..3daeff1 100644
--- a/patrole_tempest_plugin/tests/api/network/test_subnetpools_rbac.py
+++ b/patrole_tempest_plugin/tests/api/network/test_subnetpools_rbac.py
@@ -164,3 +164,17 @@
subnetpool = self._create_subnetpool()
with self.rbac_utils.override_role(self):
self.subnetpools_client.delete_subnetpool(subnetpool['id'])
+
+ @rbac_rule_validation.action(service="neutron", rules=["get_subnetpool"])
+ @decorators.idempotent_id('f1caf0f6-bde5-11e8-a355-529269fb1459')
+ def test_list_subnetpools(self):
+ """List subnetpools.
+
+ RBAC test for the neutron ``list_subnetpools`` function and
+ the ``get_subnetpool`` policy
+ """
+ admin_resource_id = self._create_subnetpool()['id']
+ with (self.rbac_utils.override_role_and_validate_list(
+ self, admin_resource_id=admin_resource_id)) as ctx:
+ ctx.resources = self.subnetpools_client.list_subnetpools(
+ id=admin_resource_id)["subnetpools"]
diff --git a/patrole_tempest_plugin/tests/api/network/test_subnets_rbac.py b/patrole_tempest_plugin/tests/api/network/test_subnets_rbac.py
index 8fe157a..babb6ad 100644
--- a/patrole_tempest_plugin/tests/api/network/test_subnets_rbac.py
+++ b/patrole_tempest_plugin/tests/api/network/test_subnets_rbac.py
@@ -17,7 +17,6 @@
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
-from patrole_tempest_plugin import rbac_exceptions
from patrole_tempest_plugin import rbac_rule_validation
from patrole_tempest_plugin.tests.api.network import rbac_base as base
@@ -61,19 +60,18 @@
self.subnets_client.show_subnet(self.subnet['id'])
@decorators.idempotent_id('e2ddc415-5cab-43f4-9b61-166aed65d637')
- @rbac_rule_validation.action(service="neutron",
- rules=["get_subnet"])
+ @rbac_rule_validation.action(service="neutron", rules=["get_subnet"])
def test_list_subnets(self):
"""List subnets.
- RBAC test for the neutron "get_subnet" policy
+ RBAC test for the neutron ``list_subnets`` function and
+ the ``get_subnet`` policy
"""
- with self.rbac_utils.override_role(self):
- subnets = self.subnets_client.list_subnets()
-
- # Neutron may return an empty list if access is denied.
- if not subnets['subnets']:
- raise rbac_exceptions.RbacEmptyResponseBody()
+ admin_resource_id = self.subnet['id']
+ with (self.rbac_utils.override_role_and_validate_list(
+ self, admin_resource_id=admin_resource_id)) as ctx:
+ ctx.resources = self.subnets_client.list_subnets(
+ id=admin_resource_id)["subnets"]
@decorators.idempotent_id('f36cd821-dd22-4bd0-b43d-110fc4b553eb')
@rbac_rule_validation.action(service="neutron",
diff --git a/patrole_tempest_plugin/tests/api/network/test_trunks_rbac.py b/patrole_tempest_plugin/tests/api/network/test_trunks_rbac.py
index 4b2eefd..761820b 100644
--- a/patrole_tempest_plugin/tests/api/network/test_trunks_rbac.py
+++ b/patrole_tempest_plugin/tests/api/network/test_trunks_rbac.py
@@ -83,3 +83,119 @@
with self.rbac_utils.override_role(self):
self.ntp_client.delete_trunk(trunk['trunk']['id'])
+
+ @decorators.idempotent_id('047badd1-e4ff-40c5-9929-99ffcb8750a7')
+ @rbac_rule_validation.action(service="neutron", rules=["get_trunk"])
+ def test_list_trunks(self):
+ """Show trunk.
+
+ RBAC test for the neutron ``list_trunks``` function and
+ the ``get_trunk`` policy
+ """
+ admin_resource_id = self.create_trunk(self.port_id)["trunk"]['id']
+ with (self.rbac_utils.override_role_and_validate_list(
+ self, admin_resource_id=admin_resource_id)) as ctx:
+ ctx.resources = self.ntp_client.list_trunks(
+ id=admin_resource_id)["trunks"]
+
+
+class TrunksSubportsExtRbacTest(base.BaseNetworkExtRbacTest):
+
+ @classmethod
+ def skip_checks(cls):
+ super(TrunksSubportsExtRbacTest, cls).skip_checks()
+ if not utils.is_extension_enabled('trunk', 'network'):
+ msg = "trunk extension not enabled."
+ raise cls.skipException(msg)
+
+ @classmethod
+ def resource_setup(cls):
+ super(TrunksSubportsExtRbacTest, cls).resource_setup()
+ cls.network = cls.create_network()
+ cls.port_id = cls.create_port(cls.network)["id"]
+ cls.trunk_id = cls.ntp_client.create_trunk(
+ cls.port_id, [])['trunk']['id']
+
+ cls.addClassResourceCleanup(
+ test_utils.call_and_ignore_notfound_exc,
+ cls.ntp_client.delete_trunk, cls.trunk_id)
+
+ def create_subports(self, trunk_id, port_id):
+ subports = [{'port_id': port_id,
+ 'segmentation_type': 'vlan',
+ 'segmentation_id': 4000}]
+ sub = self.ntp_client.add_subports(trunk_id, subports)
+ self.addCleanup(
+ test_utils.call_and_ignore_notfound_exc,
+ self.ntp_client.remove_subports,
+ trunk_id, subports)
+ return sub["sub_ports"]
+
+ @decorators.idempotent_id('c02618e7-bb20-1a3a-83c8-6eec2af08133')
+ @rbac_rule_validation.action(service="neutron",
+ rules=["get_trunk",
+ "get_subports"],
+ expected_error_codes=[404, 403])
+ def test_get_subports(self):
+ """Get subports.
+
+ RBAC test for the neutron "get_subports" policy.
+
+ Error 403 expected due to implementation of subports as a part of
+ trunk object.
+ """
+ network = self.create_network()
+ port = self.create_port(network)
+
+ self.create_subports(self.trunk_id, port["id"])
+
+ with self.rbac_utils.override_role(self):
+ self.ntp_client.get_subports(self.trunk_id)
+
+ @decorators.idempotent_id('c02618e7-bb20-1a3a-83c8-6eec2af08134')
+ @rbac_rule_validation.action(service="neutron",
+ rules=["get_trunk",
+ "add_subports"],
+ expected_error_codes=[404, 403])
+ def test_add_subports(self):
+ """Add subports.
+
+ RBAC test for the neutron "add_subports" policy
+
+ Error 403 expected due to implementation of subports as a part of
+ trunk object.
+ """
+ network = self.create_network()
+ port = self.create_port(network)
+
+ subports = [{'port_id': port["id"],
+ 'segmentation_type': 'vlan',
+ 'segmentation_id': 4000}]
+ self.addCleanup(
+ test_utils.call_and_ignore_notfound_exc,
+ self.ntp_client.remove_subports,
+ self.trunk_id, subports)
+
+ with self.rbac_utils.override_role(self):
+ self.ntp_client.add_subports(self.trunk_id, subports)
+
+ @decorators.idempotent_id('c02618e7-bb20-1a3a-83c8-6eec2af08135')
+ @rbac_rule_validation.action(service="neutron",
+ rules=["get_trunk",
+ "remove_subports"],
+ expected_error_codes=[404, 403])
+ def test_remove_subports(self):
+ """Remove subports.
+
+ RBAC test for the neutron "remove_subports" policy
+
+ Error 403 expected due to implementation of subports as a part of
+ trunk object.
+ """
+ network = self.create_network()
+ port = self.create_port(network)
+
+ subports = self.create_subports(self.trunk_id, port["id"])
+
+ with self.rbac_utils.override_role(self):
+ self.ntp_client.remove_subports(self.trunk_id, subports)
diff --git a/patrole_tempest_plugin/tests/unit/resources/admin_rbac_policy.json b/patrole_tempest_plugin/tests/unit/resources/admin_rbac_policy.json
index 7828921..d6d9605 100644
--- a/patrole_tempest_plugin/tests/unit/resources/admin_rbac_policy.json
+++ b/patrole_tempest_plugin/tests/unit/resources/admin_rbac_policy.json
@@ -2,5 +2,6 @@
"admin_rule": "role:admin",
"is_admin_rule": "is_admin:True",
"alt_admin_rule": "is_admin:True or (role:admin and is_project_admin:True)",
- "non_admin_rule": "role:Member"
+ "member_rule": "role:member",
+ "reader_rule": "role:reader"
}
diff --git a/patrole_tempest_plugin/tests/unit/test_policy_authority.py b/patrole_tempest_plugin/tests/unit/test_policy_authority.py
index 6a4d219..90e45f9 100644
--- a/patrole_tempest_plugin/tests/unit/test_policy_authority.py
+++ b/patrole_tempest_plugin/tests/unit/test_policy_authority.py
@@ -83,9 +83,29 @@
for name, check in rules.items():
fake_rule = mock.Mock(check=check, __name__='foo')
fake_rule.name = name
+ fake_rule.deprecated_rule = False
fake_rules.append(fake_rule)
return fake_rules
+ def _test_policy_file(self, roles, allowed_rules,
+ disallowed_rules, authority=None, service=None):
+ if authority is None:
+ self.assertIsNotNone(service)
+ test_tenant_id = mock.sentinel.tenant_id
+ test_user_id = mock.sentinel.user_id
+ authority = policy_authority.PolicyAuthority(
+ test_tenant_id, test_user_id, service)
+
+ for rule in allowed_rules:
+ allowed = authority.allowed(rule, roles)
+ self.assertTrue(allowed)
+
+ for rule in disallowed_rules:
+ allowed = authority.allowed(rule, roles)
+ self.assertFalse(allowed)
+
+ return authority
+
@mock.patch.object(policy_authority, 'LOG', autospec=True)
def _test_custom_policy(self, *args):
default_roles = ['zero', 'one', 'two', 'three', 'four',
@@ -145,23 +165,14 @@
self.assertFalse(authority.allowed(rule, test_roles))
def test_empty_rbac_test_roles(self):
- test_tenant_id = mock.sentinel.tenant_id
- test_user_id = mock.sentinel.user_id
- authority = policy_authority.PolicyAuthority(
- test_tenant_id, test_user_id, "custom_rbac_policy")
-
- disallowed_for_empty_roles = ['policy_action_1', 'policy_action_2',
- 'policy_action_3', 'policy_action_4',
- 'policy_action_6']
-
- # Due to "policy_action_5": "rule:all_rule" / "all_rule": ""
- allowed_for_empty_roles = ['policy_action_5']
-
- for rule in disallowed_for_empty_roles:
- self.assertFalse(authority.allowed(rule, []))
-
- for rule in allowed_for_empty_roles:
- self.assertTrue(authority.allowed(rule, []))
+ self._test_policy_file(
+ roles=[],
+ allowed_rules=['policy_action_5'],
+ disallowed_rules=['policy_action_1', 'policy_action_2',
+ 'policy_action_3', 'policy_action_4',
+ 'policy_action_6'],
+ service="custom_rbac_policy"
+ )
def test_custom_policy_json(self):
# The CONF.patrole.custom_policy_files has a path to JSON file by
@@ -184,75 +195,47 @@
self._test_custom_multi_roles_policy()
def test_admin_policy_file_with_admin_role(self):
- test_tenant_id = mock.sentinel.tenant_id
- test_user_id = mock.sentinel.user_id
- authority = policy_authority.PolicyAuthority(
- test_tenant_id, test_user_id, "admin_rbac_policy")
-
- roles = ['admin']
- allowed_rules = [
- 'admin_rule', 'is_admin_rule', 'alt_admin_rule'
- ]
- disallowed_rules = ['non_admin_rule']
-
- for rule in allowed_rules:
- allowed = authority.allowed(rule, roles)
- self.assertTrue(allowed)
-
- for rule in disallowed_rules:
- allowed = authority.allowed(rule, roles)
- self.assertFalse(allowed)
+ # admin role implies member and reader roles
+ self._test_policy_file(
+ roles=['admin'],
+ allowed_rules=['admin_rule', 'is_admin_rule', 'alt_admin_rule',
+ 'member_rule', 'reader_rule'],
+ disallowed_rules=[],
+ service="admin_rbac_policy"
+ )
def test_admin_policy_file_with_member_role(self):
- test_tenant_id = mock.sentinel.tenant_id
- test_user_id = mock.sentinel.user_id
- authority = policy_authority.PolicyAuthority(
- test_tenant_id, test_user_id, "admin_rbac_policy")
+ # member role implies reader role
+ self._test_policy_file(
+ roles=['member'],
+ allowed_rules=['member_rule', 'reader_rule'],
+ disallowed_rules=['admin_rule', 'is_admin_rule', 'alt_admin_rule'],
+ service="admin_rbac_policy"
+ )
- roles = ['Member']
- allowed_rules = [
- 'non_admin_rule'
- ]
- disallowed_rules = [
- 'admin_rule', 'is_admin_rule', 'alt_admin_rule']
-
- for rule in allowed_rules:
- allowed = authority.allowed(rule, roles)
- self.assertTrue(allowed)
-
- for rule in disallowed_rules:
- allowed = authority.allowed(rule, roles)
- self.assertFalse(allowed)
+ def test_admin_policy_file_with_reader_role(self):
+ self._test_policy_file(
+ roles=['reader'],
+ allowed_rules=['reader_rule'],
+ disallowed_rules=['admin_rule', 'is_admin_rule', 'alt_admin_rule',
+ 'member_rule'],
+ service="admin_rbac_policy"
+ )
def test_alt_admin_policy_file_with_context_is_admin(self):
- test_tenant_id = mock.sentinel.tenant_id
- test_user_id = mock.sentinel.user_id
- authority = policy_authority.PolicyAuthority(
- test_tenant_id, test_user_id, "alt_admin_rbac_policy")
+ authority = self._test_policy_file(
+ roles=['fake_admin'],
+ allowed_rules=['non_admin_rule'],
+ disallowed_rules=['admin_rule'],
+ service="alt_admin_rbac_policy"
+ )
- roles = ['fake_admin']
- allowed_rules = ['non_admin_rule']
- disallowed_rules = ['admin_rule']
-
- for rule in allowed_rules:
- allowed = authority.allowed(rule, roles)
- self.assertTrue(allowed)
-
- for rule in disallowed_rules:
- allowed = authority.allowed(rule, roles)
- self.assertFalse(allowed)
-
- roles = ['super_admin']
- allowed_rules = ['admin_rule']
- disallowed_rules = ['non_admin_rule']
-
- for rule in allowed_rules:
- allowed = authority.allowed(rule, roles)
- self.assertTrue(allowed)
-
- for rule in disallowed_rules:
- allowed = authority.allowed(rule, roles)
- self.assertFalse(allowed)
+ self._test_policy_file(
+ roles=['super_admin'],
+ allowed_rules=['admin_rule'],
+ disallowed_rules=['non_admin_rule'],
+ authority=authority
+ )
def test_tenant_user_policy(self):
"""Test whether rules with format tenant_id/user_id formatting work.
@@ -261,26 +244,25 @@
network:tenant_id pass. And test whether Nova rules that contain
user_id pass.
"""
- test_tenant_id = mock.sentinel.tenant_id
- test_user_id = mock.sentinel.user_id
- authority = policy_authority.PolicyAuthority(
- test_tenant_id, test_user_id, "tenant_rbac_policy")
+ allowed_rules = ['rule1', 'rule2', 'rule3', 'rule4']
+ disallowed_rules = ['admin_tenant_rule', 'admin_user_rule']
# Check whether Member role can perform expected actions.
- allowed_rules = ['rule1', 'rule2', 'rule3', 'rule4']
- for rule in allowed_rules:
- allowed = authority.allowed(rule, ['Member'])
- self.assertTrue(allowed)
-
- disallowed_rules = ['admin_tenant_rule', 'admin_user_rule']
- for disallowed_rule in disallowed_rules:
- self.assertFalse(authority.allowed(disallowed_rule, ['Member']))
+ authority = self._test_policy_file(
+ roles=['member'],
+ allowed_rules=allowed_rules,
+ disallowed_rules=disallowed_rules,
+ service="tenant_rbac_policy",
+ )
# Check whether admin role can perform expected actions.
allowed_rules.extend(disallowed_rules)
- for rule in allowed_rules:
- allowed = authority.allowed(rule, ['admin'])
- self.assertTrue(allowed)
+ self._test_policy_file(
+ roles=['admin'],
+ allowed_rules=allowed_rules,
+ disallowed_rules=[],
+ authority=authority
+ )
# Check whether _try_rule is called with the correct target dictionary.
with mock.patch.object(
@@ -295,7 +277,7 @@
}
expected_access_data = {
- "roles": ['Member'],
+ "roles": sorted(['member', 'reader']),
"is_admin": False,
"is_admin_project": True,
"user_id": mock.sentinel.user_id,
@@ -304,8 +286,11 @@
}
for rule in allowed_rules:
- allowed = authority.allowed(rule, ['Member'])
+ allowed = authority.allowed(rule, ['member'])
self.assertTrue(allowed)
+ # for sure that roles are in same order
+ mock_try_rule.call_args[0][2]["roles"] = sorted(
+ mock_try_rule.call_args[0][2]["roles"])
mock_try_rule.assert_called_once_with(
rule, expected_target, expected_access_data, mock.ANY)
mock_try_rule.reset_mock()
diff --git a/patrole_tempest_plugin/tests/unit/test_rbac_utils.py b/patrole_tempest_plugin/tests/unit/test_rbac_utils.py
index bd13e34..9fe5ffa 100644
--- a/patrole_tempest_plugin/tests/unit/test_rbac_utils.py
+++ b/patrole_tempest_plugin/tests/unit/test_rbac_utils.py
@@ -200,6 +200,19 @@
mock_override_role.assert_called_once_with(_rbac_utils, test_obj,
False)
+ def test_override_role_and_validate_list(self):
+ self.patchobject(rbac_utils.RbacUtils, '_override_role')
+ test_obj = mock.MagicMock()
+ _rbac_utils = rbac_utils.RbacUtils(test_obj)
+ m_override_role = self.patchobject(_rbac_utils, 'override_role')
+
+ with (_rbac_utils.override_role_and_validate_list(
+ test_obj, 'foo')) as ctx:
+ self.assertIsInstance(ctx, rbac_utils._ValidateListContext)
+ m_validate = self.patchobject(ctx, '_validate')
+ m_override_role.assert_called_once_with(test_obj)
+ m_validate.assert_called_once()
+
class RBACUtilsMixinTest(base.TestCase):
@@ -233,3 +246,87 @@
self.assertTrue(hasattr(child_test, 'rbac_utils'))
self.assertIsInstance(child_test.rbac_utils, rbac_utils.RbacUtils)
+
+
+class ValidateListContextTest(base.TestCase):
+ @staticmethod
+ def _get_context(admin_resources=None, admin_resource_id=None):
+ return rbac_utils._ValidateListContext(
+ admin_resources=admin_resources,
+ admin_resource_id=admin_resource_id)
+
+ def test_incorrect_usage(self):
+ # admin_resources and admin_resource_is are not assigned
+ self.assertRaises(rbac_exceptions.RbacValidateListException,
+ self._get_context)
+
+ # both admin_resources and admin_resource_is are assigned
+ self.assertRaises(rbac_exceptions.RbacValidateListException,
+ self._get_context,
+ admin_resources='foo', admin_resource_id='bar')
+ # empty list assigned to admin_resources
+ self.assertRaises(rbac_exceptions.RbacValidateListException,
+ self._get_context, admin_resources=[])
+
+ # ctx.resources is not assigned
+ ctx = self._get_context(admin_resources='foo')
+ self.assertRaises(rbac_exceptions.RbacValidateListException,
+ ctx._validate)
+
+ def test_validate_len_negative(self):
+ ctx = self._get_context(admin_resources=[1, 2, 3, 4])
+ self.assertEqual(ctx._validate_len, ctx._validate_func)
+ self.assertEqual(4, ctx._admin_len)
+ self.assertFalse(hasattr(ctx, '_admin_resource_id'))
+
+ # the number of resources is less than admin resources
+ ctx.resources = [1, 2, 3]
+ self.assertRaises(rbac_exceptions.RbacPartialResponseBody,
+ ctx._validate_len)
+
+ # the resources is empty
+ ctx.resources = []
+ self.assertRaises(rbac_exceptions.RbacEmptyResponseBody,
+ ctx._validate_len)
+
+ def test_validate_len(self):
+ ctx = self._get_context(admin_resources=[1, 2, 3, 4])
+
+ # the number of resources and admin resources are same
+ ctx.resources = [1, 2, 3, 4]
+ self.assertIsNone(ctx._validate_len())
+
+ def test_validate_resource_negative(self):
+ ctx = self._get_context(admin_resource_id=1)
+ self.assertEqual(ctx._validate_resource, ctx._validate_func)
+ self.assertEqual(1, ctx._admin_resource_id)
+ self.assertFalse(hasattr(ctx, '_admin_len'))
+
+ # there is no admin resource in the resources
+ ctx.resources = [{'id': 2}, {'id': 3}]
+ self.assertRaises(rbac_exceptions.RbacPartialResponseBody,
+ ctx._validate_resource)
+
+ def test_validate_resource(self):
+ ctx = self._get_context(admin_resource_id=1)
+
+ # there is admin resource in the resources
+ ctx.resources = [{'id': 1}, {'id': 2}]
+ self.assertIsNone(ctx._validate_resource())
+
+ def test_validate(self):
+ ctx = self._get_context(admin_resources='foo')
+ ctx.resources = 'bar'
+ with mock.patch.object(ctx, '_validate_func',
+ autospec=False) as m_validate_func:
+ m_validate_func.side_effect = (
+ rbac_exceptions.RbacPartialResponseBody,
+ None
+ )
+ self.assertRaises(rbac_exceptions.RbacPartialResponseBody,
+ ctx._validate)
+ m_validate_func.assert_called_once()
+
+ m_validate_func.reset_mock()
+ ctx._validate()
+ m_validate_func.assert_called_once()
diff --git a/releasenotes/notes/override-role-and-validate-list-d3b80f773674a652.yaml b/releasenotes/notes/override-role-and-validate-list-d3b80f773674a652.yaml
new file mode 100644
index 0000000..de05b76
--- /dev/null
+++ b/releasenotes/notes/override-role-and-validate-list-d3b80f773674a652.yaml
@@ -0,0 +1,37 @@
+---
+features:
+ - |
+ In order to test the list actions which doesn't have its own policy,
+ implemented the ``override_role_and_validate_list`` function.
+ The function has two modes:
+
+ * Validating the number of the resources in a ``ResponseBody`` before
+ calling the ``override_role`` and after.
+
+ .. code-block:: python
+
+ # make sure at least one resource is available
+ self.ntp_client.create_policy_dscp_marking_rule()
+ # the list of resources available for a user with admin role
+ admin_resources = self.ntp_client.list_dscp_marking_rules(
+ policy_id=self.policy_id)["dscp_marking_rules"]
+ with self.rbac_utils.override_role_and_validate_list(
+ self, admin_resources=admin_resources) as ctx:
+ # the list of resources available for a user with member role
+ ctx.resources = self.ntp_client.list_dscp_marking_rules(
+ policy_id=self.policy_id)["dscp_marking_rules"]
+
+ * Validating that a resource, created before ``override_role``, is not
+ present in a ``ResponseBody``.
+
+ .. code-block:: python
+
+ # the resource created by a user with admin role
+ admin_resource_id = (
+ self.ntp_client.create_dscp_marking_rule()
+ ["dscp_marking_rule"]["id'])
+ with self.rbac_utils.override_role_and_validate_list(
+ self, admin_resource_id=admin_resource_id) as ctx:
+ # the list of resources available for a user wirh member role
+ ctx.resources = self.ntp_client.list_dscp_marking_rules(
+ policy_id=self.policy_id)["dscp_marking_rules"]
diff --git a/releasenotes/notes/support-deprecated-roles-eae9dc742cb4fa33.yaml b/releasenotes/notes/support-deprecated-roles-eae9dc742cb4fa33.yaml
new file mode 100644
index 0000000..0d581a0
--- /dev/null
+++ b/releasenotes/notes/support-deprecated-roles-eae9dc742cb4fa33.yaml
@@ -0,0 +1,7 @@
+---
+features:
+ - |
+ Patrole will validate the deprecated policy rules (if applicable) alongside
+ the current policy rule.
+ Add ``[patrole] validate_deprecated_rules`` enabled by default to validate
+ the deprecated rules.
diff --git a/setup.cfg b/setup.cfg
index 77a039a..12e2184 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -4,7 +4,7 @@
description-file =
README.rst
author = OpenStack
-author-email = openstack-dev@lists.openstack.org
+author-email = openstack-discuss@lists.openstack.org
home-page = https://docs.openstack.org/patrole/latest/
classifier =
Environment :: OpenStack