Expand Designate RBAC testing
This patch adds RBAC testing for allowed and disallowed credentials.
Change-Id: I0f7609b45bb21890a86144f74315f1d2f02a6e7d
diff --git a/designate_tempest_plugin/tests/api/v2/test_blacklists.py b/designate_tempest_plugin/tests/api/v2/test_blacklists.py
index 95688b3..6dea9ad 100644
--- a/designate_tempest_plugin/tests/api/v2/test_blacklists.py
+++ b/designate_tempest_plugin/tests/api/v2/test_blacklists.py
@@ -30,7 +30,6 @@
class BlacklistsAdminTest(BaseBlacklistsTest):
- credentials = ["admin", "system_admin", "primary"]
@classmethod
def setup_credentials(cls):
# Do not create network resources for these test.
@@ -59,6 +58,13 @@
self.assertExpected(blacklist, body, self.excluded_keys)
+ expected_allowed = ['os_admin']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin']
+
+ self.check_CUD_RBAC_enforcement('BlacklistsClient', 'create_blacklist',
+ expected_allowed)
+
@decorators.idempotent_id('ea608152-da3c-11eb-b8b8-74e5f9e2a801')
@decorators.skip_because(bug="1934252")
def test_create_blacklist_invalid_pattern(self):
@@ -95,6 +101,14 @@
LOG.info('Ensure the fetched response matches the created blacklist')
self.assertExpected(blacklist, body, self.excluded_keys)
+ expected_allowed = ['os_admin']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin', 'os_system_reader']
+
+ self.check_list_show_RBAC_enforcement(
+ 'BlacklistsClient', 'show_blacklist', expected_allowed,
+ blacklist['id'])
+
@decorators.idempotent_id('dcea40d9-8d36-43cb-8440-4a842faaef0d')
def test_delete_blacklist(self):
LOG.info('Create a blacklist')
@@ -108,6 +122,13 @@
# A blacklist delete returns an empty body
self.assertEqual(body.strip(), b"")
+ expected_allowed = ['os_admin']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin']
+
+ self.check_CUD_RBAC_enforcement('BlacklistsClient', 'delete_blacklist',
+ expected_allowed, blacklist['id'])
+
@decorators.idempotent_id('3a2a1e6c-8176-428c-b5dd-d85217c0209d')
def test_list_blacklists(self):
LOG.info('Create a blacklist')
@@ -120,6 +141,14 @@
# TODO(pglass): Assert that the created blacklist is in the response
self.assertGreater(len(body['blacklists']), 0)
+ expected_allowed = ['os_admin']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin', 'os_system_reader']
+
+ self.check_list_IDs_RBAC_enforcement(
+ 'BlacklistsClient', 'list_blacklists',
+ expected_allowed, [blacklist['id']])
+
@decorators.idempotent_id('0063d6ad-9557-49c7-b521-e64a14d4d0d0')
def test_update_blacklist(self):
LOG.info('Create a blacklist')
@@ -139,6 +168,14 @@
self.assertEqual(pattern, body['pattern'])
self.assertEqual(description, body['description'])
+ expected_allowed = ['os_admin']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin']
+
+ self.check_CUD_RBAC_enforcement(
+ 'BlacklistsClient', 'update_blacklist', expected_allowed,
+ uuid=blacklist['id'], pattern=pattern, description=description)
+
class TestBlacklistNotFoundAdmin(BaseBlacklistsTest):
diff --git a/designate_tempest_plugin/tests/base.py b/designate_tempest_plugin/tests/base.py
index e251ac8..9035c83 100644
--- a/designate_tempest_plugin/tests/base.py
+++ b/designate_tempest_plugin/tests/base.py
@@ -18,6 +18,7 @@
from designate_tempest_plugin.services.dns.query.query_client import \
QueryClient
+from designate_tempest_plugin.tests import rbac_utils
CONF = config.CONF
@@ -55,7 +56,7 @@
return False
-class BaseDnsTest(test.BaseTestCase):
+class BaseDnsTest(rbac_utils.RBACTestsMixin, test.BaseTestCase):
"""Base class for DNS tests."""
# NOTE(andreaf) credentials holds a list of the credentials to be allocated
@@ -64,9 +65,22 @@
# rest the actual roles.
# NOTE(kiall) primary will result in a manager @ cls.os_primary, alt will
# have cls.os_alt, and admin will have cls.os_admin.
- # NOTE(kiall) We should default to only primary, and request additional
- # credentials in the tests that require them.
- credentials = ['primary']
+ # NOTE(johnsom) We will allocate most credentials here so that each test
+ # can test for allowed and disallowed RBAC policies.
+ credentials = ['admin', 'primary']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ credentials.extend(['system_admin', 'system_reader', 'project_reader'])
+
+ # A tuple of credentials that will be allocated by tempest using the
+ # 'credentials' list above. These are used to build RBAC test lists.
+ allocated_creds = []
+ for cred in credentials:
+ if isinstance(cred, list):
+ allocated_creds.append('os_roles_' + cred[0])
+ else:
+ allocated_creds.append('os_' + cred)
+ # Tests shall not mess with the list of allocated credentials
+ allocated_credentials = tuple(allocated_creds)
@classmethod
def skip_checks(cls):
diff --git a/designate_tempest_plugin/tests/rbac_utils.py b/designate_tempest_plugin/tests/rbac_utils.py
new file mode 100644
index 0000000..e02fdf1
--- /dev/null
+++ b/designate_tempest_plugin/tests/rbac_utils.py
@@ -0,0 +1,304 @@
+# Copyright 2021 Red Hat, Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import copy
+
+from oslo_log import log as logging
+from tempest import config
+from tempest.lib import exceptions
+from tempest import test
+
+CONF = config.CONF
+LOG = logging.getLogger(__name__)
+
+
+class RBACTestsMixin(test.BaseTestCase):
+
+ def _get_client_method(self, cred_obj, client_str, method_str):
+ """Get requested method from registered clients in Tempest."""
+ dns_clients = getattr(cred_obj, 'dns_v2')
+ client = getattr(dns_clients, client_str)
+ client_obj = client()
+ method = getattr(client_obj, method_str)
+ return method
+
+ def _check_allowed(self, client_str, method_str, allowed_list,
+ *args, **kwargs):
+ """Test an API call allowed RBAC enforcement.
+
+ :param client_str: The service client to use for the test, without the
+ credential. Example: 'ZonesClient'
+ :param method_str: The method on the client to call for the test.
+ Example: 'list_zones'
+ :param allowed_list: The list of credentials expected to be
+ allowed. Example: ['primary'].
+ :param args: Any positional parameters needed by the method.
+ :param kwargs: Any named parameters needed by the method.
+ :raises AssertionError: Raised if the RBAC tests fail.
+ :raises Forbidden: Raised if a credential that should have access does
+ not and is denied.
+ :raises InvalidScope: Raised if a credential that should have the
+ correct scope for access is denied.
+ :returns: None on success
+ """
+ for cred in allowed_list:
+ try:
+ cred_obj = getattr(self, cred)
+ except AttributeError:
+ # TODO(johnsom) Remove once scoped tokens is the default.
+ if ((cred == 'os_system_admin' or
+ cred == 'os_system_reader') and
+ not CONF.enforce_scope.designate):
+ LOG.info('Skipping %s allowed RBAC test because '
+ 'enforce_scope.designate is not True', cred)
+ continue
+ else:
+ self.fail('Credential {} "expected_allowed" for RBAC '
+ 'testing was not created by tempest '
+ 'credentials setup. This is likely a bug in the '
+ 'test.'.format(cred))
+ method = self._get_client_method(cred_obj, client_str, method_str)
+ try:
+ method(*args, **kwargs)
+ except exceptions.Forbidden as e:
+ self.fail('Method {}.{} failed to allow access via RBAC using '
+ 'credential {}. Error: {}'.format(
+ client_str, method_str, cred, str(e)))
+
+ def _check_disallowed(self, client_str, method_str, allowed_list,
+ *args, **kwargs):
+ """Test an API call disallowed RBAC enforcement.
+
+ :param client_str: The service client to use for the test, without the
+ credential. Example: 'ZonesClient'
+ :param method_str: The method on the client to call for the test.
+ Example: 'list_zones'
+ :param allowed_list: The list of credentials expected to be
+ allowed. Example: ['primary'].
+ :param args: Any positional parameters needed by the method.
+ :param kwargs: Any named parameters needed by the method.
+ :raises AssertionError: Raised if the RBAC tests fail.
+ :raises Forbidden: Raised if a credential that should have access does
+ not and is denied.
+ :raises InvalidScope: Raised if a credential that should have the
+ correct scope for access is denied.
+ :returns: None on success
+ """
+ expected_disallowed = (set(self.allocated_credentials) -
+ set(allowed_list))
+ for cred in expected_disallowed:
+ cred_obj = getattr(self, cred)
+ method = self._get_client_method(cred_obj, client_str, method_str)
+
+ # Unfortunately tempest uses testtools assertRaises[1] which means
+ # we cannot use the unittest assertRaises context[2] with msg= to
+ # give a useful error.
+ # Also, testtools doesn't work with subTest[3], so we can't use
+ # that to expose the failing credential.
+ # This all means the exception raised testtools assertRaises
+ # is less than useful.
+ # TODO(johnsom) Remove this try block once testtools is useful.
+ # [1] https://testtools.readthedocs.io/en/latest/
+ # api.html#testtools.TestCase.assertRaises
+ # [2] https://docs.python.org/3/library/
+ # unittest.html#unittest.TestCase.assertRaises
+ # [3] https://github.com/testing-cabal/testtools/issues/235
+ try:
+ method(*args, **kwargs)
+ except exceptions.Forbidden:
+ continue
+ self.fail('Method {}.{} failed to deny access via RBAC using '
+ 'credential {}.'.format(client_str, method_str, cred))
+
+ def check_list_show_RBAC_enforcement(self, client_str, method_str,
+ expected_allowed, *args, **kwargs):
+ """Test list or show API call RBAC enforcement.
+
+ :param client_str: The service client to use for the test, without the
+ credential. Example: 'ZonesClient'
+ :param method_str: The method on the client to call for the test.
+ Example: 'list_zones'
+ :param expected_allowed: The list of credentials expected to be
+ allowed. Example: ['primary'].
+ :param args: Any positional parameters needed by the method.
+ :param kwargs: Any named parameters needed by the method.
+ :raises AssertionError: Raised if the RBAC tests fail.
+ :raises Forbidden: Raised if a credential that should have access does
+ not and is denied.
+ :raises InvalidScope: Raised if a credential that should have the
+ correct scope for access is denied.
+ :returns: None on success
+ """
+
+ allowed_list = copy.deepcopy(expected_allowed)
+
+ # #### Test that disallowed credentials cannot access the API.
+ self._check_disallowed(client_str, method_str, allowed_list,
+ *args, **kwargs)
+
+ # #### Test that allowed credentials can access the API.
+ self._check_allowed(client_str, method_str, allowed_list,
+ *args, **kwargs)
+
+ def check_CUD_RBAC_enforcement(self, client_str, method_str,
+ expected_allowed, *args, **kwargs):
+ """Test an API create/update/delete call RBAC enforcement.
+
+ :param client_str: The service client to use for the test, without the
+ credential. Example: 'ZonesClient'
+ :param method_str: The method on the client to call for the test.
+ Example: 'list_zones'
+ :param expected_allowed: The list of credentials expected to be
+ allowed. Example: ['primary'].
+ :param args: Any positional parameters needed by the method.
+ :param kwargs: Any named parameters needed by the method.
+ :raises AssertionError: Raised if the RBAC tests fail.
+ :raises Forbidden: Raised if a credential that should have access does
+ not and is denied.
+ :raises InvalidScope: Raised if a credential that should have the
+ correct scope for access is denied.
+ :returns: None on success
+ """
+
+ allowed_list = copy.deepcopy(expected_allowed)
+
+ # #### Test that disallowed credentials cannot access the API.
+ self._check_disallowed(client_str, method_str, allowed_list,
+ *args, **kwargs)
+
+ def check_list_RBAC_enforcement_count(
+ self, client_str, method_str, expected_allowed, expected_count,
+ *args, **kwargs):
+ """Test an API list call RBAC enforcement result count.
+
+ List APIs will return the object list for the project associated
+ with the token used to access the API. This means most credentials
+ will have access, but will get differing results.
+
+ This test will query the list API using a list of credentials and
+ will validate that only the expected count of results are returned.
+
+ :param client_str: The service client to use for the test, without the
+ credential. Example: 'ZonesClient'
+ :param method_str: The method on the client to call for the test.
+ Example: 'list_zones'
+ :param expected_allowed: The list of credentials expected to be
+ allowed. Example: ['primary'].
+ :param expected_count: The number of results expected in the list
+ returned from the API.
+ :param args: Any positional parameters needed by the method.
+ :param kwargs: Any named parameters needed by the method.
+ :raises AssertionError: Raised if the RBAC tests fail.
+ :raises Forbidden: Raised if a credential that should have access does
+ not and is denied.
+ :raises InvalidScope: Raised if a credential that should have the
+ correct scope for access is denied.
+ :returns: None on success
+ """
+
+ allowed_list = copy.deepcopy(expected_allowed)
+
+ for cred in allowed_list:
+ try:
+ cred_obj = getattr(self, cred)
+ except AttributeError:
+ # TODO(johnsom) Remove once scoped tokens is the default.
+ if ((cred == 'os_system_admin' or
+ cred == 'os_system_reader') and
+ not CONF.enforce_scope.designate):
+ LOG.info('Skipping %s allowed RBAC test because '
+ 'enforce_scope.designate is not True', cred)
+ continue
+ else:
+ self.fail('Credential {} "expected_allowed" for RBAC '
+ 'testing was not created by tempest '
+ 'credentials setup. This is likely a bug in the '
+ 'test.'.format(cred))
+ method = self._get_client_method(cred_obj, client_str, method_str)
+ try:
+ # Get the result body
+ result = method(*args, **kwargs)[1]
+ except exceptions.Forbidden as e:
+ self.fail('Method {}.{} failed to allow access via RBAC using '
+ 'credential {}. Error: {}'.format(
+ client_str, method_str, cred, str(e)))
+ # Remove the root element
+ result_objs = next(iter(result.values()))
+
+ self.assertEqual(expected_count, len(result_objs),
+ message='Credential {} saw {} objects when {} '
+ 'was expected.'.format(cred, len(result),
+ expected_count))
+
+ def check_list_IDs_RBAC_enforcement(
+ self, client_str, method_str, expected_allowed, expected_ids,
+ *args, **kwargs):
+ """Test an API list call RBAC enforcement result contains IDs.
+
+ List APIs will return the object list for the project associated
+ with the token used to access the API. This means most credentials
+ will have access, but will get differing results.
+
+ This test will query the list API using a list of credentials and
+ will validate that the expected object Ids in included in the results.
+
+ :param client_str: The service client to use for the test, without the
+ credential. Example: 'ZonesClient'
+ :param method_str: The method on the client to call for the test.
+ Example: 'list_zones'
+ :param expected_allowed: The list of credentials expected to be
+ allowed. Example: ['primary'].
+ :param expected_ids: The list of object IDs to validate are included
+ in the returned list from the API.
+ :param args: Any positional parameters needed by the method.
+ :param kwargs: Any named parameters needed by the method.
+ :raises AssertionError: Raised if the RBAC tests fail.
+ :raises Forbidden: Raised if a credential that should have access does
+ not and is denied.
+ :raises InvalidScope: Raised if a credential that should have the
+ correct scope for access is denied.
+ :returns: None on success
+ """
+
+ allowed_list = copy.deepcopy(expected_allowed)
+
+ for cred in allowed_list:
+ try:
+ cred_obj = getattr(self, cred)
+ except AttributeError:
+ # TODO(johnsom) Remove once scoped tokens is the default.
+ if ((cred == 'os_system_admin' or
+ cred == 'os_system_reader') and
+ not CONF.enforce_scope.designate):
+ LOG.info('Skipping %s allowed RBAC test because '
+ 'enforce_scope.designate is not True', cred)
+ continue
+ else:
+ self.fail('Credential {} "expected_allowed" for RBAC '
+ 'testing was not created by tempest '
+ 'credentials setup. This is likely a bug in the '
+ 'test.'.format(cred))
+ method = self._get_client_method(cred_obj, client_str, method_str)
+ try:
+ # Get the result body
+ result = method(*args, **kwargs)[1]
+ except exceptions.Forbidden as e:
+ self.fail('Method {}.{} failed to allow access via RBAC using '
+ 'credential {}. Error: {}'.format(
+ client_str, method_str, cred, str(e)))
+ # Remove the root element
+ result_objs = next(iter(result.values()))
+
+ result_ids = [result_obj["id"] for result_obj in result_objs]
+ self.assertTrue(set(expected_ids).issubset(set(result_ids)))