Expand Designate RBAC testing

This patch adds RBAC testing for allowed and disallowed credentials.

Change-Id: I0f7609b45bb21890a86144f74315f1d2f02a6e7d
diff --git a/designate_tempest_plugin/tests/api/v2/test_blacklists.py b/designate_tempest_plugin/tests/api/v2/test_blacklists.py
index 95688b3..6dea9ad 100644
--- a/designate_tempest_plugin/tests/api/v2/test_blacklists.py
+++ b/designate_tempest_plugin/tests/api/v2/test_blacklists.py
@@ -30,7 +30,6 @@
 
 class BlacklistsAdminTest(BaseBlacklistsTest):
 
-    credentials = ["admin", "system_admin", "primary"]
     @classmethod
     def setup_credentials(cls):
         # Do not create network resources for these test.
@@ -59,6 +58,13 @@
 
         self.assertExpected(blacklist, body, self.excluded_keys)
 
+        expected_allowed = ['os_admin']
+        if CONF.dns_feature_enabled.enforce_new_defaults:
+            expected_allowed = ['os_system_admin']
+
+        self.check_CUD_RBAC_enforcement('BlacklistsClient', 'create_blacklist',
+                                        expected_allowed)
+
     @decorators.idempotent_id('ea608152-da3c-11eb-b8b8-74e5f9e2a801')
     @decorators.skip_because(bug="1934252")
     def test_create_blacklist_invalid_pattern(self):
@@ -95,6 +101,14 @@
         LOG.info('Ensure the fetched response matches the created blacklist')
         self.assertExpected(blacklist, body, self.excluded_keys)
 
+        expected_allowed = ['os_admin']
+        if CONF.dns_feature_enabled.enforce_new_defaults:
+            expected_allowed = ['os_system_admin', 'os_system_reader']
+
+        self.check_list_show_RBAC_enforcement(
+            'BlacklistsClient', 'show_blacklist', expected_allowed,
+            blacklist['id'])
+
     @decorators.idempotent_id('dcea40d9-8d36-43cb-8440-4a842faaef0d')
     def test_delete_blacklist(self):
         LOG.info('Create a blacklist')
@@ -108,6 +122,13 @@
         # A blacklist delete returns an empty body
         self.assertEqual(body.strip(), b"")
 
+        expected_allowed = ['os_admin']
+        if CONF.dns_feature_enabled.enforce_new_defaults:
+            expected_allowed = ['os_system_admin']
+
+        self.check_CUD_RBAC_enforcement('BlacklistsClient', 'delete_blacklist',
+                                        expected_allowed, blacklist['id'])
+
     @decorators.idempotent_id('3a2a1e6c-8176-428c-b5dd-d85217c0209d')
     def test_list_blacklists(self):
         LOG.info('Create a blacklist')
@@ -120,6 +141,14 @@
         # TODO(pglass): Assert that the created blacklist is in the response
         self.assertGreater(len(body['blacklists']), 0)
 
+        expected_allowed = ['os_admin']
+        if CONF.dns_feature_enabled.enforce_new_defaults:
+            expected_allowed = ['os_system_admin', 'os_system_reader']
+
+        self.check_list_IDs_RBAC_enforcement(
+            'BlacklistsClient', 'list_blacklists',
+            expected_allowed, [blacklist['id']])
+
     @decorators.idempotent_id('0063d6ad-9557-49c7-b521-e64a14d4d0d0')
     def test_update_blacklist(self):
         LOG.info('Create a blacklist')
@@ -139,6 +168,14 @@
         self.assertEqual(pattern, body['pattern'])
         self.assertEqual(description, body['description'])
 
+        expected_allowed = ['os_admin']
+        if CONF.dns_feature_enabled.enforce_new_defaults:
+            expected_allowed = ['os_system_admin']
+
+        self.check_CUD_RBAC_enforcement(
+            'BlacklistsClient', 'update_blacklist', expected_allowed,
+            uuid=blacklist['id'], pattern=pattern, description=description)
+
 
 class TestBlacklistNotFoundAdmin(BaseBlacklistsTest):
 
diff --git a/designate_tempest_plugin/tests/base.py b/designate_tempest_plugin/tests/base.py
index e251ac8..9035c83 100644
--- a/designate_tempest_plugin/tests/base.py
+++ b/designate_tempest_plugin/tests/base.py
@@ -18,6 +18,7 @@
 
 from designate_tempest_plugin.services.dns.query.query_client import \
     QueryClient
+from designate_tempest_plugin.tests import rbac_utils
 
 
 CONF = config.CONF
@@ -55,7 +56,7 @@
         return False
 
 
-class BaseDnsTest(test.BaseTestCase):
+class BaseDnsTest(rbac_utils.RBACTestsMixin, test.BaseTestCase):
     """Base class for DNS tests."""
 
     # NOTE(andreaf) credentials holds a list of the credentials to be allocated
@@ -64,9 +65,22 @@
     # rest the actual roles.
     # NOTE(kiall) primary will result in a manager @ cls.os_primary, alt will
     # have cls.os_alt, and admin will have cls.os_admin.
-    # NOTE(kiall) We should default to only primary, and request additional
-    # credentials in the tests that require them.
-    credentials = ['primary']
+    # NOTE(johnsom) We will allocate most credentials here so that each test
+    # can test for allowed and disallowed RBAC policies.
+    credentials = ['admin', 'primary']
+    if CONF.dns_feature_enabled.enforce_new_defaults:
+        credentials.extend(['system_admin', 'system_reader', 'project_reader'])
+
+    # A tuple of credentials that will be allocated by tempest using the
+    # 'credentials' list above. These are used to build RBAC test lists.
+    allocated_creds = []
+    for cred in credentials:
+        if isinstance(cred, list):
+            allocated_creds.append('os_roles_' + cred[0])
+        else:
+            allocated_creds.append('os_' + cred)
+    # Tests shall not mess with the list of allocated credentials
+    allocated_credentials = tuple(allocated_creds)
 
     @classmethod
     def skip_checks(cls):
diff --git a/designate_tempest_plugin/tests/rbac_utils.py b/designate_tempest_plugin/tests/rbac_utils.py
new file mode 100644
index 0000000..e02fdf1
--- /dev/null
+++ b/designate_tempest_plugin/tests/rbac_utils.py
@@ -0,0 +1,304 @@
+# Copyright 2021 Red Hat, Inc. All rights reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import copy
+
+from oslo_log import log as logging
+from tempest import config
+from tempest.lib import exceptions
+from tempest import test
+
+CONF = config.CONF
+LOG = logging.getLogger(__name__)
+
+
+class RBACTestsMixin(test.BaseTestCase):
+
+    def _get_client_method(self, cred_obj, client_str, method_str):
+        """Get requested method from registered clients in Tempest."""
+        dns_clients = getattr(cred_obj, 'dns_v2')
+        client = getattr(dns_clients, client_str)
+        client_obj = client()
+        method = getattr(client_obj, method_str)
+        return method
+
+    def _check_allowed(self, client_str, method_str, allowed_list,
+                       *args, **kwargs):
+        """Test an API call allowed RBAC enforcement.
+
+        :param client_str: The service client to use for the test, without the
+                           credential.  Example: 'ZonesClient'
+        :param method_str: The method on the client to call for the test.
+                           Example: 'list_zones'
+        :param allowed_list: The list of credentials expected to be
+                             allowed.  Example: ['primary'].
+        :param args: Any positional parameters needed by the method.
+        :param kwargs: Any named parameters needed by the method.
+        :raises AssertionError: Raised if the RBAC tests fail.
+        :raises Forbidden: Raised if a credential that should have access does
+                           not and is denied.
+        :raises InvalidScope: Raised if a credential that should have the
+                              correct scope for access is denied.
+        :returns: None on success
+        """
+        for cred in allowed_list:
+            try:
+                cred_obj = getattr(self, cred)
+            except AttributeError:
+                # TODO(johnsom) Remove once scoped tokens is the default.
+                if ((cred == 'os_system_admin' or
+                     cred == 'os_system_reader') and
+                        not CONF.enforce_scope.designate):
+                    LOG.info('Skipping %s allowed RBAC test because '
+                             'enforce_scope.designate is not True', cred)
+                    continue
+                else:
+                    self.fail('Credential {} "expected_allowed" for RBAC '
+                              'testing was not created by tempest '
+                              'credentials setup. This is likely a bug in the '
+                              'test.'.format(cred))
+            method = self._get_client_method(cred_obj, client_str, method_str)
+            try:
+                method(*args, **kwargs)
+            except exceptions.Forbidden as e:
+                self.fail('Method {}.{} failed to allow access via RBAC using '
+                          'credential {}. Error: {}'.format(
+                              client_str, method_str, cred, str(e)))
+
+    def _check_disallowed(self, client_str, method_str, allowed_list,
+                          *args, **kwargs):
+        """Test an API call disallowed RBAC enforcement.
+
+        :param client_str: The service client to use for the test, without the
+                           credential.  Example: 'ZonesClient'
+        :param method_str: The method on the client to call for the test.
+                           Example: 'list_zones'
+        :param allowed_list: The list of credentials expected to be
+                             allowed.  Example: ['primary'].
+        :param args: Any positional parameters needed by the method.
+        :param kwargs: Any named parameters needed by the method.
+        :raises AssertionError: Raised if the RBAC tests fail.
+        :raises Forbidden: Raised if a credential that should have access does
+                           not and is denied.
+        :raises InvalidScope: Raised if a credential that should have the
+                              correct scope for access is denied.
+        :returns: None on success
+        """
+        expected_disallowed = (set(self.allocated_credentials) -
+                               set(allowed_list))
+        for cred in expected_disallowed:
+            cred_obj = getattr(self, cred)
+            method = self._get_client_method(cred_obj, client_str, method_str)
+
+            # Unfortunately tempest uses testtools assertRaises[1] which means
+            # we cannot use the unittest assertRaises context[2] with msg= to
+            # give a useful error.
+            # Also, testtools doesn't work with subTest[3], so we can't use
+            # that to expose the failing credential.
+            # This all means the exception raised testtools assertRaises
+            # is less than useful.
+            # TODO(johnsom) Remove this try block once testtools is useful.
+            # [1] https://testtools.readthedocs.io/en/latest/
+            #     api.html#testtools.TestCase.assertRaises
+            # [2] https://docs.python.org/3/library/
+            #     unittest.html#unittest.TestCase.assertRaises
+            # [3] https://github.com/testing-cabal/testtools/issues/235
+            try:
+                method(*args, **kwargs)
+            except exceptions.Forbidden:
+                continue
+            self.fail('Method {}.{} failed to deny access via RBAC using '
+                      'credential {}.'.format(client_str, method_str, cred))
+
+    def check_list_show_RBAC_enforcement(self, client_str, method_str,
+                                   expected_allowed, *args, **kwargs):
+        """Test list or show API call RBAC enforcement.
+
+        :param client_str: The service client to use for the test, without the
+                           credential.  Example: 'ZonesClient'
+        :param method_str: The method on the client to call for the test.
+                           Example: 'list_zones'
+        :param expected_allowed: The list of credentials expected to be
+                                 allowed.  Example: ['primary'].
+        :param args: Any positional parameters needed by the method.
+        :param kwargs: Any named parameters needed by the method.
+        :raises AssertionError: Raised if the RBAC tests fail.
+        :raises Forbidden: Raised if a credential that should have access does
+                           not and is denied.
+        :raises InvalidScope: Raised if a credential that should have the
+                              correct scope for access is denied.
+        :returns: None on success
+        """
+
+        allowed_list = copy.deepcopy(expected_allowed)
+
+        # #### Test that disallowed credentials cannot access the API.
+        self._check_disallowed(client_str, method_str, allowed_list,
+                               *args, **kwargs)
+
+        # #### Test that allowed credentials can access the API.
+        self._check_allowed(client_str, method_str, allowed_list,
+                            *args, **kwargs)
+
+    def check_CUD_RBAC_enforcement(self, client_str, method_str,
+                                   expected_allowed, *args, **kwargs):
+        """Test an API create/update/delete call RBAC enforcement.
+
+        :param client_str: The service client to use for the test, without the
+                           credential.  Example: 'ZonesClient'
+        :param method_str: The method on the client to call for the test.
+                           Example: 'list_zones'
+        :param expected_allowed: The list of credentials expected to be
+                                 allowed.  Example: ['primary'].
+        :param args: Any positional parameters needed by the method.
+        :param kwargs: Any named parameters needed by the method.
+        :raises AssertionError: Raised if the RBAC tests fail.
+        :raises Forbidden: Raised if a credential that should have access does
+                           not and is denied.
+        :raises InvalidScope: Raised if a credential that should have the
+                              correct scope for access is denied.
+        :returns: None on success
+        """
+
+        allowed_list = copy.deepcopy(expected_allowed)
+
+        # #### Test that disallowed credentials cannot access the API.
+        self._check_disallowed(client_str, method_str, allowed_list,
+                               *args, **kwargs)
+
+    def check_list_RBAC_enforcement_count(
+            self, client_str, method_str, expected_allowed, expected_count,
+            *args, **kwargs):
+        """Test an API list call RBAC enforcement result count.
+
+        List APIs will return the object list for the project associated
+        with the token used to access the API. This means most credentials
+        will have access, but will get differing results.
+
+        This test will query the list API using a list of credentials and
+        will validate that only the expected count of results are returned.
+
+        :param client_str: The service client to use for the test, without the
+                           credential.  Example: 'ZonesClient'
+        :param method_str: The method on the client to call for the test.
+                           Example: 'list_zones'
+        :param expected_allowed: The list of credentials expected to be
+                                 allowed.  Example: ['primary'].
+        :param expected_count: The number of results expected in the list
+                               returned from the API.
+        :param args: Any positional parameters needed by the method.
+        :param kwargs: Any named parameters needed by the method.
+        :raises AssertionError: Raised if the RBAC tests fail.
+        :raises Forbidden: Raised if a credential that should have access does
+                           not and is denied.
+        :raises InvalidScope: Raised if a credential that should have the
+                              correct scope for access is denied.
+        :returns: None on success
+        """
+
+        allowed_list = copy.deepcopy(expected_allowed)
+
+        for cred in allowed_list:
+            try:
+                cred_obj = getattr(self, cred)
+            except AttributeError:
+                # TODO(johnsom) Remove once scoped tokens is the default.
+                if ((cred == 'os_system_admin' or
+                     cred == 'os_system_reader') and
+                        not CONF.enforce_scope.designate):
+                    LOG.info('Skipping %s allowed RBAC test because '
+                             'enforce_scope.designate is not True', cred)
+                    continue
+                else:
+                    self.fail('Credential {} "expected_allowed" for RBAC '
+                              'testing was not created by tempest '
+                              'credentials setup. This is likely a bug in the '
+                              'test.'.format(cred))
+            method = self._get_client_method(cred_obj, client_str, method_str)
+            try:
+                # Get the result body
+                result = method(*args, **kwargs)[1]
+            except exceptions.Forbidden as e:
+                self.fail('Method {}.{} failed to allow access via RBAC using '
+                          'credential {}. Error: {}'.format(
+                              client_str, method_str, cred, str(e)))
+            # Remove the root element
+            result_objs = next(iter(result.values()))
+
+            self.assertEqual(expected_count, len(result_objs),
+                             message='Credential {} saw {} objects when {} '
+                             'was expected.'.format(cred, len(result),
+                                                    expected_count))
+
+    def check_list_IDs_RBAC_enforcement(
+            self, client_str, method_str, expected_allowed, expected_ids,
+            *args, **kwargs):
+        """Test an API list call RBAC enforcement result contains IDs.
+
+        List APIs will return the object list for the project associated
+        with the token used to access the API. This means most credentials
+        will have access, but will get differing results.
+
+        This test will query the list API using a list of credentials and
+        will validate that the expected object Ids in included in the results.
+
+        :param client_str: The service client to use for the test, without the
+                           credential.  Example: 'ZonesClient'
+        :param method_str: The method on the client to call for the test.
+                           Example: 'list_zones'
+        :param expected_allowed: The list of credentials expected to be
+                                 allowed.  Example: ['primary'].
+        :param expected_ids: The list of object IDs to validate are included
+                             in the returned list from the API.
+        :param args: Any positional parameters needed by the method.
+        :param kwargs: Any named parameters needed by the method.
+        :raises AssertionError: Raised if the RBAC tests fail.
+        :raises Forbidden: Raised if a credential that should have access does
+                           not and is denied.
+        :raises InvalidScope: Raised if a credential that should have the
+                              correct scope for access is denied.
+        :returns: None on success
+        """
+
+        allowed_list = copy.deepcopy(expected_allowed)
+
+        for cred in allowed_list:
+            try:
+                cred_obj = getattr(self, cred)
+            except AttributeError:
+                # TODO(johnsom) Remove once scoped tokens is the default.
+                if ((cred == 'os_system_admin' or
+                     cred == 'os_system_reader') and
+                        not CONF.enforce_scope.designate):
+                    LOG.info('Skipping %s allowed RBAC test because '
+                             'enforce_scope.designate is not True', cred)
+                    continue
+                else:
+                    self.fail('Credential {} "expected_allowed" for RBAC '
+                              'testing was not created by tempest '
+                              'credentials setup. This is likely a bug in the '
+                              'test.'.format(cred))
+            method = self._get_client_method(cred_obj, client_str, method_str)
+            try:
+                # Get the result body
+                result = method(*args, **kwargs)[1]
+            except exceptions.Forbidden as e:
+                self.fail('Method {}.{} failed to allow access via RBAC using '
+                          'credential {}. Error: {}'.format(
+                              client_str, method_str, cred, str(e)))
+            # Remove the root element
+            result_objs = next(iter(result.values()))
+
+            result_ids = [result_obj["id"] for result_obj in result_objs]
+            self.assertTrue(set(expected_ids).issubset(set(result_ids)))