Merge "Adds a pool re-encryption scenario test"
diff --git a/devstack/plugin.sh b/devstack/plugin.sh
index 5eac133..fb4e932 100644
--- a/devstack/plugin.sh
+++ b/devstack/plugin.sh
@@ -24,6 +24,15 @@
         ${DEST}/octavia-tempest-plugin/octavia_tempest_plugin/contrib/test_server/test_server.go
 }
 
+function _configure_tempest {
+    if [ -n "$Q_ROUTER_NAME" ]; then
+        iniset $TEMPEST_CONFIG load_balancer default_router "$Q_ROUTER_NAME"
+    fi
+    if [ -n "$SUBNETPOOL_NAME_V6" ]; then
+        iniset $TEMPEST_CONFIG load_balancer default_ipv6_subnetpool "$SUBNETPOOL_NAME_V6"
+    fi
+}
+
 if [[ "$1" == "stack" ]]; then
     case "$2" in
         install)
@@ -40,6 +49,7 @@
         test-config)
             echo_summary "Building backend test server"
             build_backend_test_server
+            _configure_tempest
             ;;
     esac
 fi
diff --git a/octavia_tempest_plugin/clients.py b/octavia_tempest_plugin/clients.py
deleted file mode 100644
index 7fe3606..0000000
--- a/octavia_tempest_plugin/clients.py
+++ /dev/null
@@ -1,90 +0,0 @@
-#   Copyright 2017 GoDaddy
-#
-#   Licensed under the Apache License, Version 2.0 (the "License"); you may
-#   not use this file except in compliance with the License. You may obtain
-#   a copy of the License at
-#
-#        http://www.apache.org/licenses/LICENSE-2.0
-#
-#   Unless required by applicable law or agreed to in writing, software
-#   distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-#   WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-#   License for the specific language governing permissions and limitations
-#   under the License.
-#
-from tempest import clients
-from tempest import config
-
-from octavia_tempest_plugin.services.load_balancer.v2 import (
-    amphora_client)
-from octavia_tempest_plugin.services.load_balancer.v2 import (
-    availability_zone_capabilities_client)
-from octavia_tempest_plugin.services.load_balancer.v2 import (
-    availability_zone_client)
-from octavia_tempest_plugin.services.load_balancer.v2 import (
-    availability_zone_profile_client)
-from octavia_tempest_plugin.services.load_balancer.v2 import (
-    flavor_capabilities_client)
-from octavia_tempest_plugin.services.load_balancer.v2 import (
-    flavor_client)
-from octavia_tempest_plugin.services.load_balancer.v2 import (
-    flavor_profile_client)
-from octavia_tempest_plugin.services.load_balancer.v2 import (
-    healthmonitor_client)
-from octavia_tempest_plugin.services.load_balancer.v2 import (
-    l7policy_client)
-from octavia_tempest_plugin.services.load_balancer.v2 import (
-    l7rule_client)
-from octavia_tempest_plugin.services.load_balancer.v2 import (
-    listener_client)
-from octavia_tempest_plugin.services.load_balancer.v2 import (
-    loadbalancer_client)
-from octavia_tempest_plugin.services.load_balancer.v2 import (
-    member_client)
-from octavia_tempest_plugin.services.load_balancer.v2 import (
-    pool_client)
-from octavia_tempest_plugin.services.load_balancer.v2 import (
-    provider_client)
-
-CONF = config.CONF
-
-
-class ManagerV2(clients.Manager):
-
-    def __init__(self, credentials):
-        super(ManagerV2, self).__init__(credentials)
-
-        params = dict(self.default_params)
-        params.update({
-            'auth_provider': self.auth_provider,
-            'service': CONF.load_balancer.catalog_type,
-            'region': CONF.load_balancer.region or CONF.identity.region,
-            'endpoint_type': CONF.load_balancer.endpoint_type,
-            'build_interval': CONF.load_balancer.build_interval,
-            'build_timeout': CONF.load_balancer.build_timeout
-        })
-
-        self.loadbalancer_client = loadbalancer_client.LoadbalancerClient(
-            **params)
-        self.listener_client = listener_client.ListenerClient(**params)
-        self.pool_client = pool_client.PoolClient(**params)
-        self.member_client = member_client.MemberClient(**params)
-        self.healthmonitor_client = healthmonitor_client.HealthMonitorClient(
-            **params)
-        self.l7policy_client = l7policy_client.L7PolicyClient(**params)
-        self.l7rule_client = l7rule_client.L7RuleClient(**params)
-        self.amphora_client = amphora_client.AmphoraClient(**params)
-        self.flavor_profile_client = flavor_profile_client.FlavorProfileClient(
-            **params)
-        self.flavor_client = flavor_client.FlavorClient(**params)
-        self.provider_client = provider_client.ProviderClient(**params)
-        self.flavor_capabilities_client = (
-            flavor_capabilities_client.FlavorCapabilitiesClient(**params))
-        self.availability_zone_capabilities_client = (
-            availability_zone_capabilities_client
-            .AvailabilityZoneCapabilitiesClient(**params))
-        self.availability_zone_profile_client = (
-            availability_zone_profile_client.AvailabilityZoneProfileClient(
-                **params))
-        self.availability_zone_client = (
-            availability_zone_client.AvailabilityZoneClient(**params))
diff --git a/octavia_tempest_plugin/common/constants.py b/octavia_tempest_plugin/common/constants.py
index d466082..174589c 100644
--- a/octavia_tempest_plugin/common/constants.py
+++ b/octavia_tempest_plugin/common/constants.py
@@ -186,6 +186,7 @@
 
 # RBAC options
 ADVANCED = 'advanced'
+KEYSTONE_DEFAULT_ROLES = 'keystone_default_roles'
 OWNERADMIN = 'owner_or_admin'
 NONE = 'none'
 
diff --git a/octavia_tempest_plugin/config.py b/octavia_tempest_plugin/config.py
index f44bf96..4d1543b 100644
--- a/octavia_tempest_plugin/config.py
+++ b/octavia_tempest_plugin/config.py
@@ -86,6 +86,12 @@
     cfg.StrOpt('admin_role',
                default='load-balancer_admin',
                help='The load balancing admin RBAC role.'),
+    cfg.StrOpt('observer_role',
+               default='load-balancer_observer',
+               help='The load balancing observer RBAC role.'),
+    cfg.StrOpt('global_observer_role',
+               default='load-balancer_global_observer',
+               help='The load balancing global observer RBAC role.'),
     cfg.IntOpt('scp_connection_timeout',
                default=5,
                help='Timeout in seconds to wait for a '
@@ -97,10 +103,13 @@
                default='octavia',
                help='The provider driver to use for the tests.'),
     cfg.StrOpt('RBAC_test_type', default=const.ADVANCED,
-               choices=[const.ADVANCED, const.OWNERADMIN, const.NONE],
+               choices=[const.ADVANCED, const.KEYSTONE_DEFAULT_ROLES,
+                        const.OWNERADMIN, const.NONE],
                help='Type of RBAC tests to run. "advanced" runs the octavia '
                     'default RBAC tests. "owner_or_admin" runs the legacy '
-                    'owner or admin tests. "none" disables the RBAC tests.'),
+                    'owner or admin tests. "keystone_default_roles" runs the '
+                    'tests using only the keystone default roles. "none" '
+                    'disables the RBAC tests.'),
     cfg.DictOpt('enabled_provider_drivers',
                 help=('A comma separated list of dictionaries of the '
                       'enabled provider driver names and descriptions. '
@@ -179,6 +188,13 @@
     cfg.StrOpt('member_2_ipv6_subnet_cidr',
                default='fd77:1457:4cf0:26a8::/64',
                help='CIDR format subnet to use for the member 1 ipv6 subnet.'),
+    cfg.StrOpt('default_router',
+               default='router1',
+               help='The default router connected to the public network.'),
+    cfg.StrOpt('default_ipv6_subnetpool',
+               default='shared-default-subnetpool-v6',
+               help='The default IPv6 subnetpool to use when creating the '
+                    'IPv6 VIP subnet.'),
     # Amphora specific options
     cfg.StrOpt('amphora_ssh_user',
                default='ubuntu',
@@ -217,6 +233,15 @@
                default='/opt/octavia-tempest-plugin/test_server.bin',
                help='Filesystem path to the test web server that will be '
                     'installed in the web server VMs.'),
+    # RBAC related options
+    # Note: Also see the enforce_scope section (from tempest) for Octavia API
+    #       scope checking setting.
+    cfg.BoolOpt('enforce_new_defaults',
+                default=False,
+                help='Does the load-balancer service API policies enforce '
+                     'the new keystone default roles? This configuration '
+                     'value should be same as octavia.conf: '
+                     '[oslo_policy].enforce_new_defaults option.'),
 ]
 
 lb_feature_enabled_group = cfg.OptGroup(name='loadbalancer-feature-enabled',
@@ -261,3 +286,15 @@
                      "the tempest instance have access to the log files "
                      "specified in the tempest configuration."),
 ]
+
+# Extend the enforce_scope group that is defined in tempest.
+enforce_scope_group = cfg.OptGroup(name="enforce_scope",
+                                   title="OpenStack Services with "
+                                         "enforce scope")
+EnforceScopeGroup = [
+    cfg.BoolOpt('octavia',
+                default=False,
+                help='Does the load-balancer service API policies enforce '
+                     'scope? This configuration value should be same as '
+                     'octavia.conf: [oslo_policy].enforce_scope option.'),
+]
diff --git a/octavia_tempest_plugin/plugin.py b/octavia_tempest_plugin/plugin.py
index ec093e7..ec8e7c5 100644
--- a/octavia_tempest_plugin/plugin.py
+++ b/octavia_tempest_plugin/plugin.py
@@ -19,6 +19,7 @@
 from tempest.test_discover import plugins
 
 from octavia_tempest_plugin import config as project_config
+from octavia_tempest_plugin.services.load_balancer import v2 as lb_v2_services
 
 
 class OctaviaTempestPlugin(plugins.TempestPlugin):
@@ -38,6 +39,8 @@
         config.register_opt_group(conf,
                                   project_config.lb_feature_enabled_group,
                                   project_config.LBFeatureEnabledGroup)
+        config.register_opt_group(conf, project_config.enforce_scope_group,
+                                  project_config.EnforceScopeGroup)
 
     def get_opt_lists(self):
         return [
@@ -55,10 +58,10 @@
         )
 
         params = {
-            'name': 'load-balancer_v2',
+            'name': 'load_balancer_v2',
             'service_version': 'load-balancer.v2',
             'module_path': 'octavia_tempest_plugin.services.load_balancer.v2',
-            'client_names': ['LoadbalancerClient'],
+            'client_names': lb_v2_services.__all__,
         }
         params.update(octavia_config)
 
diff --git a/octavia_tempest_plugin/services/load_balancer/v2/__init__.py b/octavia_tempest_plugin/services/load_balancer/v2/__init__.py
index 04cb473..2067372 100644
--- a/octavia_tempest_plugin/services/load_balancer/v2/__init__.py
+++ b/octavia_tempest_plugin/services/load_balancer/v2/__init__.py
@@ -12,6 +12,35 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+from .amphora_client import AmphoraClient
+from .availability_zone_capabilities_client import (
+    AvailabilityZoneCapabilitiesClient)
+from .availability_zone_client import AvailabilityZoneClient
+from .availability_zone_profile_client import AvailabilityZoneProfileClient
+from .flavor_capabilities_client import FlavorCapabilitiesClient
+from .flavor_client import FlavorClient
+from .flavor_profile_client import FlavorProfileClient
+from .healthmonitor_client import HealthMonitorClient
+from .l7policy_client import L7PolicyClient
+from .l7rule_client import L7RuleClient
+from .listener_client import ListenerClient
 from .loadbalancer_client import LoadbalancerClient
+from .member_client import MemberClient
+from .pool_client import PoolClient
+from .provider_client import ProviderClient
 
-__all__ = ['LoadbalancerClient']
+__all__ = ['LoadbalancerClient',
+           'ListenerClient',
+           'PoolClient',
+           'MemberClient',
+           'HealthMonitorClient',
+           'L7PolicyClient',
+           'L7RuleClient',
+           'FlavorClient',
+           'FlavorProfileClient',
+           'FlavorCapabilitiesClient',
+           'AmphoraClient',
+           'ProviderClient',
+           'AvailabilityZoneClient',
+           'AvailabilityZoneProfileClient',
+           'AvailabilityZoneCapabilitiesClient']
diff --git a/octavia_tempest_plugin/tests/RBAC_tests.py b/octavia_tempest_plugin/tests/RBAC_tests.py
new file mode 100644
index 0000000..d31d506
--- /dev/null
+++ b/octavia_tempest_plugin/tests/RBAC_tests.py
@@ -0,0 +1,476 @@
+# Copyright 2021 Red Hat, Inc. All rights reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import copy
+
+from oslo_log import log as logging
+from tempest import config
+from tempest.lib import exceptions
+from tempest import test
+
+from octavia_tempest_plugin.common import constants
+from octavia_tempest_plugin.tests import waiters
+
+CONF = config.CONF
+LOG = logging.getLogger(__name__)
+
+
+class RBACTestsMixin(test.BaseTestCase):
+
+    def _get_client_method(self, cred_obj, client_str, method_str):
+        """Get requested method from registered clients in Tempest."""
+        lb_clients = getattr(cred_obj, 'load_balancer_v2')
+        client = getattr(lb_clients, client_str)
+        client_obj = client()
+        method = getattr(client_obj, method_str)
+        return method
+
+    def _check_allowed(self, client_str, method_str, allowed_list,
+                       *args, **kwargs):
+        """Test that an API call is allowed by RBAC enforcement.
+
+        :param client_str: The service client to use for the test, without the
+                           credential.  Example: 'AmphoraClient'
+        :param method_str: The method on the client to call for the test.
+                           Example: 'list_amphorae'
+        :param allowed_list: The list of credentials expected to be
+                             allowed.  Example: ['os_roles_lb_member'].
+        :param args: Any positional parameters needed by the method.
+        :param kwargs: Any named parameters needed by the method.
+        :raises AssertionError: Raised if the RBAC tests fail.
+        :raises Forbidden: Raised if a credential that should have access does
+                           not and is denied.
+        :raises InvalidScope: Raised if a credential that should have the
+                              correct scope for access is denied.
+        :returns: None on success
+        """
+        for cred in allowed_list:
+            try:
+                cred_obj = getattr(self, cred)
+            except AttributeError:
+                # TODO(johnsom) Remove once scoped tokens is the default.
+                if ((cred == 'os_system_admin' or cred == 'os_system_reader')
+                        and not CONF.enforce_scope.octavia):
+                    LOG.info('Skipping %s allowed RBAC test because '
+                             'enforce_scope.octavia is not True', cred)
+                    continue
+                else:
+                    self.fail('Credential {} "expected_allowed" for RBAC '
+                              'testing was not created by tempest '
+                              'credentials setup. This is likely a bug in the '
+                              'test.'.format(cred))
+            method = self._get_client_method(cred_obj, client_str, method_str)
+            try:
+                method(*args, **kwargs)
+            except exceptions.Forbidden as e:
+                self.fail('Method {}.{} failed to allow access via RBAC using '
+                          'credential {}. Error: {}'.format(
+                              client_str, method_str, cred, str(e)))
+
+    def _check_disallowed(self, client_str, method_str, allowed_list,
+                          status_method=None, obj_id=None, *args, **kwargs):
+        """Test that an API call is disallowed by RBAC enforcement.
+
+        :param client_str: The service client to use for the test, without the
+                           credential.  Example: 'AmphoraClient'
+        :param method_str: The method on the client to call for the test.
+                           Example: 'list_amphorae'
+        :param allowed_list: The list of credentials expected to be
+                             allowed.  Example: ['os_roles_lb_member'].
+        :param status_method: The service client method that will provide
+                              the object status for a status change waiter.
+        :param obj_id: The ID of the object to check for the expected status
+                       update.
+        :param args: Any positional parameters needed by the method.
+        :param kwargs: Any named parameters needed by the method.
+        :raises AssertionError: Raised if the RBAC tests fail.
+        :raises Forbidden: Raised if a credential that should have access does
+                           not and is denied.
+        :raises InvalidScope: Raised if a credential that should have the
+                              correct scope for access is denied.
+        :returns: None on success
+        """
+        expected_disallowed = (set(self.allocated_credentials) -
+                               set(allowed_list))
+        for cred in expected_disallowed:
+            cred_obj = getattr(self, cred)
+            method = self._get_client_method(cred_obj, client_str, method_str)
+
+            # Unfortunately tempest uses testtools assertRaises[1] which means
+            # we cannot use the unittest assertRaises context[2] with msg= to
+            # give a useful error.
+            # Also, testtools doesn't work with subTest[3], so we can't use
+            # that to expose the failing credential.
+            # This all means the exception raised by testtools assertRaises
+            # is less than useful.
+            # TODO(johnsom) Remove this try block once testtools is useful.
+            # [1] https://testtools.readthedocs.io/en/latest/
+            #     api.html#testtools.TestCase.assertRaises
+            # [2] https://docs.python.org/3/library/
+            #     unittest.html#unittest.TestCase.assertRaises
+            # [3] https://github.com/testing-cabal/testtools/issues/235
+            try:
+                method(*args, **kwargs)
+            except exceptions.Forbidden:
+                if status_method:
+                    waiters.wait_for_status(
+                        status_method, obj_id,
+                        constants.PROVISIONING_STATUS, constants.ACTIVE,
+                        CONF.load_balancer.check_interval,
+                        CONF.load_balancer.check_timeout)
+
+                continue
+            self.fail('Method {}.{} failed to deny access via RBAC using '
+                      'credential {}.'.format(client_str, method_str, cred))
+
+    def _list_get_RBAC_enforcement(self, client_str, method_str,
+                                   expected_allowed, *args, **kwargs):
+        """Test an API call RBAC enforcement.
+
+        :param client_str: The service client to use for the test, without the
+                           credential.  Example: 'AmphoraClient'
+        :param method_str: The method on the client to call for the test.
+                           Example: 'list_amphorae'
+        :param expected_allowed: The list of credentials expected to be
+                                 allowed.  Example: ['os_roles_lb_member'].
+        :param args: Any positional parameters needed by the method.
+        :param kwargs: Any named parameters needed by the method.
+        :raises AssertionError: Raised if the RBAC tests fail.
+        :raises Forbidden: Raised if a credential that should have access does
+                           not and is denied.
+        :raises InvalidScope: Raised if a credential that should have the
+                              correct scope for access is denied.
+        :returns: None on success
+        """
+
+        allowed_list = copy.deepcopy(expected_allowed)
+        # os_admin is a special case as it is valid with the old defaults,
+        # but will not be with the new defaults and/or token scoping.
+        # The old keystone role "admin" becomes project scoped "admin"
+        # instead of being a global admin.
+        # To keep the tests simple, handle that edge case here.
+        # TODO(johnsom) Once token scope is default, remove this.
+        if ('os_system_admin' in expected_allowed and
+                not CONF.load_balancer.enforce_new_defaults and
+                not CONF.enforce_scope.octavia):
+            allowed_list.append('os_admin')
+
+        # #### Test that disallowed credentials cannot access the API.
+        self._check_disallowed(client_str, method_str, allowed_list,
+                               None, None, *args, **kwargs)
+
+        # #### Test that allowed credentials can access the API.
+        self._check_allowed(client_str, method_str, allowed_list,
+                            *args, **kwargs)
+
+    def check_show_RBAC_enforcement(self, client_str, method_str,
+                                    expected_allowed, *args, **kwargs):
+        """Test an API show call RBAC enforcement.
+
+        :param client_str: The service client to use for the test, without the
+                           credential.  Example: 'AmphoraClient'
+        :param method_str: The method on the client to call for the test.
+                           Example: 'list_amphorae'
+        :param expected_allowed: The list of credentials expected to be
+                                 allowed.  Example: ['os_roles_lb_member'].
+        :param args: Any positional parameters needed by the method.
+        :param kwargs: Any named parameters needed by the method.
+        :raises AssertionError: Raised if the RBAC tests fail.
+        :raises Forbidden: Raised if a credential that should have access does
+                           not and is denied.
+        :raises InvalidScope: Raised if a credential that should have the
+                              correct scope for access is denied.
+        :returns: None on success
+        """
+        self._list_get_RBAC_enforcement(client_str, method_str,
+                                        expected_allowed, *args, **kwargs)
+
+    def check_list_RBAC_enforcement(self, client_str, method_str,
+                                    expected_allowed, *args, **kwargs):
+        """Test an API list call RBAC enforcement.
+
+        :param client_str: The service client to use for the test, without the
+                           credential.  Example: 'AmphoraClient'
+        :param method_str: The method on the client to call for the test.
+                           Example: 'list_amphorae'
+        :param expected_allowed: The list of credentials expected to be
+                                 allowed.  Example: ['os_roles_lb_member'].
+        :param args: Any positional parameters needed by the method.
+        :param kwargs: Any named parameters needed by the method.
+        :raises AssertionError: Raised if the RBAC tests fail.
+        :raises Forbidden: Raised if a credential that should have access does
+                           not and is denied.
+        :raises InvalidScope: Raised if a credential that should have the
+                              correct scope for access is denied.
+        :returns: None on success
+        """
+        self._list_get_RBAC_enforcement(client_str, method_str,
+                                        expected_allowed, *args, **kwargs)
+
+    def _CUD_RBAC_enforcement(self, client_str, method_str, expected_allowed,
+                              status_method=None, obj_id=None,
+                              *args, **kwargs):
+        """Test an API create/update/delete call RBAC enforcement.
+
+        :param client_str: The service client to use for the test, without the
+                           credential.  Example: 'AmphoraClient'
+        :param method_str: The method on the client to call for the test.
+                           Example: 'list_amphorae'
+        :param expected_allowed: The list of credentials expected to be
+                                 allowed.  Example: ['os_roles_lb_member'].
+        :param status_method: The service client method that will provide
+                              the object status for a status change waiter.
+        :param obj_id: The ID of the object to check for the expected status
+                       update.
+        :param args: Any positional parameters needed by the method.
+        :param kwargs: Any named parameters needed by the method.
+        :raises AssertionError: Raised if the RBAC tests fail.
+        :raises Forbidden: Raised if a credential that should have access does
+                           not and is denied.
+        :raises InvalidScope: Raised if a credential that should have the
+                              correct scope for access is denied.
+        :returns: None on success
+        """
+
+        allowed_list = copy.deepcopy(expected_allowed)
+        # os_admin is a special case as it is valid with the old defaults,
+        # but will not be with the new defaults and/or token scoping.
+        # The old keystone role "admin" becomes project scoped "admin"
+        # instead of being a global admin.
+        # To keep the tests simple, handle that edge case here.
+        # TODO(johnsom) Once token scope is default, remove this.
+        if ('os_system_admin' in expected_allowed and
+                not CONF.load_balancer.enforce_new_defaults and
+                not CONF.enforce_scope.octavia):
+            allowed_list.append('os_admin')
+
+        # #### Test that disallowed credentials cannot access the API.
+        self._check_disallowed(client_str, method_str, allowed_list,
+                               status_method, obj_id, *args, **kwargs)
+
+    def check_create_RBAC_enforcement(
+            self, client_str, method_str, expected_allowed,
+            status_method=None, obj_id=None, *args, **kwargs):
+        """Test an API create call RBAC enforcement.
+
+        :param client_str: The service client to use for the test, without the
+                           credential.  Example: 'AmphoraClient'
+        :param method_str: The method on the client to call for the test.
+                           Example: 'list_amphorae'
+        :param expected_allowed: The list of credentials expected to be
+                                 allowed.  Example: ['os_roles_lb_member'].
+        :param status_method: The service client method that will provide
+                              the object status for a status change waiter.
+        :param obj_id: The ID of the object to check for the expected status
+                       update.
+        :param args: Any positional parameters needed by the method.
+        :param kwargs: Any named parameters needed by the method.
+        :raises AssertionError: Raised if the RBAC tests fail.
+        :raises Forbidden: Raised if a credential that should have access does
+                           not and is denied.
+        :raises InvalidScope: Raised if a credential that should have the
+                              correct scope for access is denied.
+        :returns: None on success
+        """
+        self._CUD_RBAC_enforcement(client_str, method_str, expected_allowed,
+                                   status_method, obj_id, *args, **kwargs)
+
+    def check_delete_RBAC_enforcement(
+            self, client_str, method_str, expected_allowed,
+            status_method=None, obj_id=None, *args, **kwargs):
+        """Test an API delete call RBAC enforcement.
+
+        :param client_str: The service client to use for the test, without the
+                           credential.  Example: 'AmphoraClient'
+        :param method_str: The method on the client to call for the test.
+                           Example: 'list_amphorae'
+        :param expected_allowed: The list of credentials expected to be
+                                 allowed.  Example: ['os_roles_lb_member'].
+        :param status_method: The service client method that will provide
+                              the object status for a status change waiter.
+        :param obj_id: The ID of the object to check for the expected status
+                       update.
+        :param args: Any positional parameters needed by the method.
+        :param kwargs: Any named parameters needed by the method.
+        :raises AssertionError: Raised if the RBAC tests fail.
+        :raises Forbidden: Raised if a credential that should have access does
+                           not and is denied.
+        :raises InvalidScope: Raised if a credential that should have the
+                              correct scope for access is denied.
+        :returns: None on success
+        """
+        self._CUD_RBAC_enforcement(client_str, method_str, expected_allowed,
+                                   status_method, obj_id, *args, **kwargs)
+
+    def check_update_RBAC_enforcement(
+            self, client_str, method_str, expected_allowed,
+            status_method=None, obj_id=None, *args, **kwargs):
+        """Test an API update call RBAC enforcement.
+
+        :param client_str: The service client to use for the test, without the
+                           credential.  Example: 'AmphoraClient'
+        :param method_str: The method on the client to call for the test.
+                           Example: 'list_amphorae'
+        :param expected_allowed: The list of credentials expected to be
+                                 allowed.  Example: ['os_roles_lb_member'].
+        :param status_method: The service client method that will provide
+                              the object status for a status change waiter.
+        :param obj_id: The ID of the object to check for the expected status
+                       update.
+        :param args: Any positional parameters needed by the method.
+        :param kwargs: Any named parameters needed by the method.
+        :raises AssertionError: Raised if the RBAC tests fail.
+        :raises Forbidden: Raised if a credential that should have access does
+                           not and is denied.
+        :raises InvalidScope: Raised if a credential that should have the
+                              correct scope for access is denied.
+        :returns: None on success
+        """
+        self._CUD_RBAC_enforcement(client_str, method_str, expected_allowed,
+                                   status_method, obj_id, *args, **kwargs)
+
+    def check_list_RBAC_enforcement_count(
+            self, client_str, method_str, expected_allowed, expected_count,
+            *args, **kwargs):
+        """Test an API list call RBAC enforcement result count.
+
+        List APIs will return the object list for the project associated
+        with the token used to access the API. This means most credentials
+        will have access, but will get differing results.
+
+        This test will query the list API using a list of credentials and
+        will validate that only the expected count of results are returned.
+
+        :param client_str: The service client to use for the test, without the
+                           credential.  Example: 'AmphoraClient'
+        :param method_str: The method on the client to call for the test.
+                           Example: 'list_amphorae'
+        :param expected_allowed: The list of credentials expected to be
+                                 allowed.  Example: ['os_roles_lb_member'].
+        :param expected_count: The number of results expected in the list
+                               returned from the API.
+        :param args: Any positional parameters needed by the method.
+        :param kwargs: Any named parameters needed by the method.
+        :raises AssertionError: Raised if the RBAC tests fail.
+        :raises Forbidden: Raised if a credential that should have access does
+                           not and is denied.
+        :raises InvalidScope: Raised if a credential that should have the
+                              correct scope for access is denied.
+        :returns: None on success
+        """
+
+        allowed_list = copy.deepcopy(expected_allowed)
+        # os_admin is a special case as it is valid with the old defaults,
+        # but will not be with the new defaults and/or token scoping.
+        # The old keystone role "admin" becomes project scoped "admin"
+        # instead of being a global admin.
+        # To keep the tests simple, handle that edge case here.
+        # TODO(johnsom) Once token scope is default, remove this.
+        if ('os_system_admin' in expected_allowed and
+                not CONF.load_balancer.enforce_new_defaults and
+                not CONF.enforce_scope.octavia):
+            allowed_list.append('os_admin')
+
+        for cred in allowed_list:
+            try:
+                cred_obj = getattr(self, cred)
+            except AttributeError:
+                # TODO(johnsom) Remove once scoped tokens is the default.
+                if ((cred == 'os_system_admin' or cred == 'os_system_reader')
+                        and not CONF.enforce_scope.octavia):
+                    LOG.info('Skipping %s allowed RBAC test because '
+                             'enforce_scope.octavia is not True', cred)
+                    continue
+                else:
+                    self.fail('Credential {} "expected_allowed" for RBAC '
+                              'testing was not created by tempest '
+                              'credentials setup. This is likely a bug in the '
+                              'test.'.format(cred))
+            method = self._get_client_method(cred_obj, client_str, method_str)
+            try:
+                result = method(*args, **kwargs)
+            except exceptions.Forbidden as e:
+                self.fail('Method {}.{} failed to allow access via RBAC using '
+                          'credential {}. Error: {}'.format(
+                              client_str, method_str, cred, str(e)))
+            self.assertEqual(expected_count, len(result), message='Credential '
+                             '{} saw {} objects when {} was expected.'.format(
+                                 cred, len(result), expected_count))
+
+    def check_list_IDs_RBAC_enforcement(
+            self, client_str, method_str, expected_allowed, expected_ids,
+            *args, **kwargs):
+        """Test an API list call RBAC enforcement result contains IDs.
+
+        List APIs will return the object list for the project associated
+        with the token used to access the API. This means most credentials
+        will have access, but will get differing results.
+
+        This test will query the list API using a list of credentials and
+        will validate that the expected object IDs are included in the results.
+
+        :param client_str: The service client to use for the test, without the
+                           credential.  Example: 'AmphoraClient'
+        :param method_str: The method on the client to call for the test.
+                           Example: 'list_amphorae'
+        :param expected_allowed: The list of credentials expected to be
+                                 allowed.  Example: ['os_roles_lb_member'].
+        :param expected_ids: The list of object IDs to validate are included
+                             in the returned list from the API.
+        :param args: Any positional parameters needed by the method.
+        :param kwargs: Any named parameters needed by the method.
+        :raises AssertionError: Raised if the RBAC tests fail.
+        :raises Forbidden: Raised if a credential that should have access does
+                           not and is denied.
+        :raises InvalidScope: Raised if a credential that should have the
+                              correct scope for access is denied.
+        :returns: None on success
+        """
+
+        allowed_list = copy.deepcopy(expected_allowed)
+        # os_admin is a special case as it is valid with the old defaults,
+        # but will not be with the new defaults and/or token scoping.
+        # The old keystone role "admin" becomes project scoped "admin"
+        # instead of being a global admin.
+        # To keep the tests simple, handle that edge case here.
+        # TODO(johnsom) Once token scope is default, remove this.
+        if ('os_system_admin' in expected_allowed and
+                not CONF.load_balancer.enforce_new_defaults and
+                not CONF.enforce_scope.octavia):
+            allowed_list.append('os_admin')
+
+        for cred in allowed_list:
+            try:
+                cred_obj = getattr(self, cred)
+            except AttributeError:
+                # TODO(johnsom) Remove once scoped tokens is the default.
+                if ((cred == 'os_system_admin' or cred == 'os_system_reader')
+                        and not CONF.enforce_scope.octavia):
+                    LOG.info('Skipping %s allowed RBAC test because '
+                             'enforce_scope.octavia is not True', cred)
+                    continue
+                else:
+                    self.fail('Credential {} "expected_allowed" for RBAC '
+                              'testing was not created by tempest '
+                              'credentials setup. This is likely a bug in the '
+                              'test.'.format(cred))
+            method = self._get_client_method(cred_obj, client_str, method_str)
+            try:
+                result = method(*args, **kwargs)
+            except exceptions.Forbidden as e:
+                self.fail('Method {}.{} failed to allow access via RBAC using '
+                          'credential {}. Error: {}'.format(
+                              client_str, method_str, cred, str(e)))
+            result_ids = [lb[constants.ID] for lb in result]
+            self.assertTrue(set(expected_ids).issubset(set(result_ids)))
diff --git a/octavia_tempest_plugin/tests/act_stdby_scenario/v2/test_active_standby.py b/octavia_tempest_plugin/tests/act_stdby_scenario/v2/test_active_standby.py
index 5655b3f..11b0cec 100644
--- a/octavia_tempest_plugin/tests/act_stdby_scenario/v2/test_active_standby.py
+++ b/octavia_tempest_plugin/tests/act_stdby_scenario/v2/test_active_standby.py
@@ -183,6 +183,7 @@
         * Sends traffic through the load balancer
         * Validates that the Backup has assumed the Master role
         """
+        amphora_client = self.os_admin.load_balancer_v2.AmphoraClient()
         # We have to do this here as the api_version and clients are not
         # setup in time to use a decorator or the skip_checks mixin
         if not self.mem_listener_client.is_version_supported(
@@ -197,7 +198,7 @@
         self.check_members_balanced(self.lb_vip_address)
 
         # Get the amphorae associated with this load balancer
-        amphorae = self.os_admin.amphora_client.list_amphorae(
+        amphorae = amphora_client.list_amphorae(
             query_params='{loadbalancer_id}={lb_id}'.format(
                 loadbalancer_id=const.LOADBALANCER_ID,
                 lb_id=self.lb_id))
@@ -216,7 +217,7 @@
         start = int(time.time())
         while True:
             for amp in amphorae:
-                amphora_stats = self.os_admin.amphora_client.get_amphora_stats(
+                amphora_stats = amphora_client.get_amphora_stats(
                     amp[const.ID])
                 for listener in amphora_stats:
                     if listener[const.TOTAL_CONNECTIONS] > 0:
@@ -243,7 +244,7 @@
             else:
                 backup_amp = amp
         self.assertIsNotNone(backup_amp)
-        amphora_stats = self.os_admin.amphora_client.get_amphora_stats(
+        amphora_stats = amphora_client.get_amphora_stats(
             backup_amp[const.ID])
         for listener in amphora_stats:
             self.assertEqual(0, listener[const.TOTAL_CONNECTIONS])
@@ -265,7 +266,7 @@
             time.sleep(1)
 
         # Check that the Backup amphora is now Master
-        amphora_stats = self.os_admin.amphora_client.get_amphora_stats(
+        amphora_stats = amphora_client.get_amphora_stats(
             backup_amp[const.ID])
         connections = 0
         for listener in amphora_stats:
diff --git a/octavia_tempest_plugin/tests/api/v2/test_amphora.py b/octavia_tempest_plugin/tests/api/v2/test_amphora.py
index 146096d..180e4f3 100644
--- a/octavia_tempest_plugin/tests/api/v2/test_amphora.py
+++ b/octavia_tempest_plugin/tests/api/v2/test_amphora.py
@@ -20,7 +20,6 @@
 from tempest import config
 from tempest.lib.common.utils import data_utils
 from tempest.lib import decorators
-from tempest.lib import exceptions
 
 from octavia_tempest_plugin.common import constants as const
 from octavia_tempest_plugin.tests import test_base
@@ -90,12 +89,17 @@
                                 CONF.load_balancer.lb_build_interval,
                                 CONF.load_balancer.lb_build_timeout)
 
-        # Test that a user, without the load balancer member role, cannot
-        # list amphorae
+        # Test RBAC for list amphorae
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.amphora_client.list_amphorae)
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+        if expected_allowed:
+            self.check_list_RBAC_enforcement(
+                'AmphoraClient', 'list_amphorae', expected_allowed)
 
         # Get an actual list of the amphorae
         amphorae = self.lb_admin_amphora_client.list_amphorae()
@@ -112,13 +116,11 @@
         self.assertEqual(self._expected_amp_count(amphorae), len(amphorae))
         self.assertEqual(self.lb_id, amphorae[0][const.LOADBALANCER_ID])
 
-        # Test that a different user, with load balancer member role, cannot
-        # see this amphora
-        if not CONF.load_balancer.RBAC_test_type == const.NONE:
-            member2_client = self.os_roles_lb_member2.amphora_client
-            self.assertRaises(exceptions.Forbidden,
-                              member2_client.show_amphora,
-                              amphora_id=amphorae[0][const.ID])
+        # Test RBAC for show amphora
+        if expected_allowed:
+            self.check_show_RBAC_enforcement(
+                'AmphoraClient', 'show_amphora', expected_allowed,
+                amphora_id=amphorae[0][const.ID])
 
         show_amphora_response_fields = const.SHOW_AMPHORA_RESPONSE_FIELDS
         if self.lb_admin_amphora_client.is_version_supported(
@@ -175,13 +177,18 @@
                 loadbalancer_id=const.LOADBALANCER_ID, lb_id=self.lb_id))
         amphora_1 = amphorae[0]
 
-        # Test that a user without the load balancer admin role cannot
-        # create a flavor
+        # Test RBAC for update an amphora
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.amphora_client.update_amphora_config,
-                amphora_1[const.ID])
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+        if expected_allowed:
+            self.check_update_RBAC_enforcement(
+                'AmphoraClient', 'update_amphora_config', expected_allowed,
+                None, None, amphora_1[const.ID])
 
         self.lb_admin_amphora_client.update_amphora_config(amphora_1[const.ID])
 
@@ -205,15 +212,18 @@
                 loadbalancer_id=const.LOADBALANCER_ID, lb_id=self.lb_id))
         amphora_1 = amphorae[0]
 
-        # Test RBAC not authorized for non-admin role
-        if not CONF.load_balancer.RBAC_test_type == const.NONE:
-            self.assertRaises(exceptions.Forbidden,
-                              self.os_primary.amphora_client.amphora_failover,
-                              amphora_1[const.ID])
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_roles_lb_member.amphora_client.amphora_failover,
-                amphora_1[const.ID])
+        # Test RBAC for failover an amphora
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+        if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+        if expected_allowed:
+            self.check_update_RBAC_enforcement(
+                'AmphoraClient', 'amphora_failover', expected_allowed,
+                None, None, amphora_1[const.ID])
 
         self.lb_admin_amphora_client.amphora_failover(amphora_1[const.ID])
 
diff --git a/octavia_tempest_plugin/tests/api/v2/test_availability_zone.py b/octavia_tempest_plugin/tests/api/v2/test_availability_zone.py
index 29426c2..fa7b6a4 100644
--- a/octavia_tempest_plugin/tests/api/v2/test_availability_zone.py
+++ b/octavia_tempest_plugin/tests/api/v2/test_availability_zone.py
@@ -104,12 +104,18 @@
                 self.availability_zone_profile_id}
 
         # Test that a user without the load balancer admin role cannot
-        # create an availability zone
+        # create an availability zone.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(exceptions.Forbidden,
-                              self.os_primary.availability_zone_client
-                              .create_availability_zone,
-                              **availability_zone_kwargs)
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+        if expected_allowed:
+            self.check_create_RBAC_enforcement(
+                'AvailabilityZoneClient', 'create_availability_zone',
+                expected_allowed, **availability_zone_kwargs)
 
         # Happy path
         availability_zone = (
@@ -220,11 +226,25 @@
 
         # Test that a user without the load balancer role cannot
         # list availability zones.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = [
+                'os_admin', 'os_primary', 'os_roles_lb_admin',
+                'os_roles_lb_member', 'os_roles_lb_member2']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_admin', 'os_primary', 'os_system_admin',
+                                'os_system_reader', 'os_roles_lb_observer',
+                                'os_roles_lb_global_observer',
+                                'os_roles_lb_member', 'os_roles_lb_member2']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.availability_zone_client
-                    .list_availability_zones)
+            expected_allowed = [
+                'os_system_admin', 'os_system_reader', 'os_roles_lb_admin',
+                'os_roles_lb_observer', 'os_roles_lb_global_observer',
+                'os_roles_lb_member', 'os_roles_lb_member2']
+        if expected_allowed:
+            self.check_list_RBAC_enforcement(
+                'AvailabilityZoneClient', 'list_availability_zones',
+                expected_allowed)
 
         # Check the default sort order (by ID)
         availability_zones = (
@@ -359,12 +379,25 @@
 
         # Test that a user without the load balancer role cannot
         # show availability zone details.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = [
+                'os_admin', 'os_primary', 'os_roles_lb_admin',
+                'os_roles_lb_member', 'os_roles_lb_member2']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_admin', 'os_primary', 'os_system_admin',
+                                'os_system_reader', 'os_roles_lb_observer',
+                                'os_roles_lb_global_observer',
+                                'os_roles_lb_member', 'os_roles_lb_member2']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.availability_zone_client
-                    .show_availability_zone,
-                availability_zone[const.NAME])
+            expected_allowed = [
+                'os_system_admin', 'os_system_reader', 'os_roles_lb_admin',
+                'os_roles_lb_observer', 'os_roles_lb_global_observer',
+                'os_roles_lb_member', 'os_roles_lb_member2']
+        if expected_allowed:
+            self.check_show_RBAC_enforcement(
+                'AvailabilityZoneClient', 'show_availability_zone',
+                expected_allowed, availability_zone[const.NAME])
 
         result = self.mem_availability_zone_client.show_availability_zone(
             availability_zone[const.NAME])
@@ -420,13 +453,18 @@
             const.ENABLED: False}
 
         # Test that a user without the load balancer role cannot
-        # show availability zone details.
+        # update availability zone details.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.availability_zone_client
-                    .update_availability_zone,
-                availability_zone[const.NAME],
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+        if expected_allowed:
+            self.check_update_RBAC_enforcement(
+                'AvailabilityZoneClient', 'update_availability_zone',
+                expected_allowed, None, None, availability_zone[const.NAME],
                 **availability_zone_updated_kwargs)
 
         updated_availability_zone = (
@@ -493,12 +531,17 @@
 
         # Test that a user without the load balancer admin role cannot
         # delete an availability zone.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.availability_zone_client
-                    .delete_availability_zone,
-                availability_zone[const.NAME])
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+        if expected_allowed:
+            self.check_delete_RBAC_enforcement(
+                'AvailabilityZoneClient', 'delete_availability_zone',
+                expected_allowed, None, None, availability_zone[const.NAME])
 
         # Happy path
         self.lb_admin_availability_zone_client.delete_availability_zone(
diff --git a/octavia_tempest_plugin/tests/api/v2/test_availability_zone_capabilities.py b/octavia_tempest_plugin/tests/api/v2/test_availability_zone_capabilities.py
index 4a12057..d3833f6 100644
--- a/octavia_tempest_plugin/tests/api/v2/test_availability_zone_capabilities.py
+++ b/octavia_tempest_plugin/tests/api/v2/test_availability_zone_capabilities.py
@@ -15,7 +15,6 @@
 
 from tempest import config
 from tempest.lib import decorators
-from tempest.lib import exceptions
 
 from octavia_tempest_plugin.common import constants as const
 from octavia_tempest_plugin.tests import test_base
@@ -45,13 +44,17 @@
 
         # Test that a user without the load balancer admin role cannot
         # list provider availability zone capabilities.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            os_primary_capabilities_client = (
-                self.os_primary.availability_zone_capabilities_client)
-            self.assertRaises(
-                exceptions.Forbidden,
-                (os_primary_capabilities_client
-                 .list_availability_zone_capabilities),
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+        if expected_allowed:
+            self.check_list_RBAC_enforcement(
+                'AvailabilityZoneCapabilitiesClient',
+                'list_availability_zone_capabilities', expected_allowed,
                 CONF.load_balancer.provider)
 
         # Check for an expected availability zone capability for the
diff --git a/octavia_tempest_plugin/tests/api/v2/test_availability_zone_profile.py b/octavia_tempest_plugin/tests/api/v2/test_availability_zone_profile.py
index f5e4b7e..456a01e 100644
--- a/octavia_tempest_plugin/tests/api/v2/test_availability_zone_profile.py
+++ b/octavia_tempest_plugin/tests/api/v2/test_availability_zone_profile.py
@@ -75,13 +75,19 @@
         }
 
         # Test that a user without the load balancer admin role cannot
-        # create an availability zone profile profile
+        # create an availability zone profile.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.availability_zone_profile_client
-                    .create_availability_zone_profile,
-                **availability_zone_profile_kwargs)
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+        if expected_allowed:
+            self.check_create_RBAC_enforcement(
+                'AvailabilityZoneProfileClient',
+                'create_availability_zone_profile',
+                expected_allowed, **availability_zone_profile_kwargs)
 
         # Happy path
         availability_zone_profile = (
@@ -225,11 +231,17 @@
 
         # Test that a user without the load balancer admin role cannot
         # list availability zone profiles.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.availability_zone_profile_client
-                    .list_availability_zone_profiles)
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+        if expected_allowed:
+            self.check_list_RBAC_enforcement(
+                'AvailabilityZoneProfileClient',
+                'list_availability_zone_profiles', expected_allowed)
 
         # Check the default sort order (by ID)
         profiles = (self.lb_admin_availability_zone_profile_client
@@ -379,12 +391,18 @@
             availability_zone_profile[const.ID])
 
         # Test that a user without the load balancer admin role cannot
-        # show an availability zone profile profile
+        # show an availability zone profile
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.availability_zone_profile_client
-                    .show_availability_zone_profile,
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+        if expected_allowed:
+            self.check_show_RBAC_enforcement(
+                'AvailabilityZoneProfileClient',
+                'show_availability_zone_profile', expected_allowed,
                 availability_zone_profile[const.ID])
 
         result = (
@@ -475,13 +493,19 @@
         }
 
         # Test that a user without the load balancer admin role cannot
-        # create an availability zone profile profile
+        # update an availability zone profile.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.availability_zone_profile_client
-                    .update_availability_zone_profile,
-                availability_zone_profile[const.ID],
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+        if expected_allowed:
+            self.check_update_RBAC_enforcement(
+                'AvailabilityZoneProfileClient',
+                'update_availability_zone_profile', expected_allowed,
+                None, None, availability_zone_profile[const.ID],
                 **availability_zone_profile_updated_kwargs)
 
         result = (
@@ -551,13 +575,19 @@
             availability_zone_profile[const.ID])
 
         # Test that a user without the load balancer admin role cannot
-        # delete an availability zone profile profile
+        # delete an availability zone profile.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.availability_zone_profile_client
-                    .delete_availability_zone_profile,
-                availability_zone_profile[const.ID])
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+        if expected_allowed:
+            self.check_delete_RBAC_enforcement(
+                'AvailabilityZoneProfileClient',
+                'delete_availability_zone_profile', expected_allowed,
+                None, None, availability_zone_profile[const.ID])
 
         # Happy path
         (self.lb_admin_availability_zone_profile_client
diff --git a/octavia_tempest_plugin/tests/api/v2/test_flavor.py b/octavia_tempest_plugin/tests/api/v2/test_flavor.py
index 4333121..b5b4254 100644
--- a/octavia_tempest_plugin/tests/api/v2/test_flavor.py
+++ b/octavia_tempest_plugin/tests/api/v2/test_flavor.py
@@ -87,11 +87,18 @@
             const.FLAVOR_PROFILE_ID: self.flavor_profile_id}
 
         # Test that a user without the load balancer admin role cannot
-        # create a flavor
+        # create a flavor.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(exceptions.Forbidden,
-                              self.os_primary.flavor_client.create_flavor,
-                              **flavor_kwargs)
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+        if expected_allowed:
+            self.check_create_RBAC_enforcement(
+                'FlavorClient', 'create_flavor',
+                expected_allowed, None, None, **flavor_kwargs)
 
         # Happy path
         flavor = self.lb_admin_flavor_client.create_flavor(**flavor_kwargs)
@@ -185,10 +192,24 @@
 
         # Test that a user without the load balancer role cannot
         # list flavors.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = [
+                'os_admin', 'os_primary', 'os_roles_lb_admin',
+                'os_roles_lb_member', 'os_roles_lb_member2']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_admin', 'os_primary', 'os_system_admin',
+                                'os_system_reader', 'os_roles_lb_observer',
+                                'os_roles_lb_global_observer',
+                                'os_roles_lb_member', 'os_roles_lb_member2']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.flavor_client.list_flavors)
+            expected_allowed = [
+                'os_system_admin', 'os_system_reader', 'os_roles_lb_admin',
+                'os_roles_lb_observer', 'os_roles_lb_global_observer',
+                'os_roles_lb_member', 'os_roles_lb_member2']
+        if expected_allowed:
+            self.check_list_RBAC_enforcement(
+                'FlavorClient', 'list_flavors', expected_allowed)
 
         # Check the default sort order (by ID)
         flavors = self.mem_flavor_client.list_flavors()
@@ -299,10 +320,24 @@
 
         # Test that a user without the load balancer role cannot
         # show flavor details.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = [
+                'os_admin', 'os_primary', 'os_roles_lb_admin',
+                'os_roles_lb_member', 'os_roles_lb_member2']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_admin', 'os_primary', 'os_system_admin',
+                                'os_system_reader', 'os_roles_lb_observer',
+                                'os_roles_lb_global_observer',
+                                'os_roles_lb_member', 'os_roles_lb_member2']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.flavor_client.show_flavor,
+            expected_allowed = [
+                'os_system_admin', 'os_system_reader', 'os_roles_lb_admin',
+                'os_roles_lb_observer', 'os_roles_lb_global_observer',
+                'os_roles_lb_member', 'os_roles_lb_member2']
+        if expected_allowed:
+            self.check_show_RBAC_enforcement(
+                'FlavorClient', 'show_flavor', expected_allowed,
                 flavor[const.ID])
 
         result = self.mem_flavor_client.show_flavor(flavor[const.ID])
@@ -354,11 +389,17 @@
             const.ENABLED: False}
 
         # Test that a user without the load balancer role cannot
-        # show flavor details.
+        # update flavor details.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.flavor_client.update_flavor,
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+        if expected_allowed:
+            self.check_update_RBAC_enforcement(
+                'FlavorClient', 'update_flavor', expected_allowed, None, None,
                 flavor[const.ID], **flavor_updated_kwargs)
 
         updated_flavor = self.lb_admin_flavor_client.update_flavor(
@@ -413,11 +454,17 @@
 
         # Test that a user without the load balancer admin role cannot
         # delete a flavor.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.flavor_client.delete_flavor,
-                flavor[const.ID])
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+        if expected_allowed:
+            self.check_delete_RBAC_enforcement(
+                'FlavorClient', 'delete_flavor', expected_allowed,
+                None, None, flavor[const.ID])
 
         # Happy path
         self.lb_admin_flavor_client.delete_flavor(flavor[const.ID])
diff --git a/octavia_tempest_plugin/tests/api/v2/test_flavor_capabilities.py b/octavia_tempest_plugin/tests/api/v2/test_flavor_capabilities.py
index 7f9da51..884f656 100644
--- a/octavia_tempest_plugin/tests/api/v2/test_flavor_capabilities.py
+++ b/octavia_tempest_plugin/tests/api/v2/test_flavor_capabilities.py
@@ -14,7 +14,6 @@
 
 from tempest import config
 from tempest.lib import decorators
-from tempest.lib import exceptions
 
 from octavia_tempest_plugin.common import constants as const
 from octavia_tempest_plugin.tests import test_base
@@ -43,12 +42,17 @@
 
         # Test that a user without the load balancer admin role cannot
         # list provider flavor capabilities.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            os_primary_capabilities_client = (
-                self.os_primary.flavor_capabilities_client)
-            self.assertRaises(
-                exceptions.Forbidden,
-                os_primary_capabilities_client.list_flavor_capabilities,
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+        if expected_allowed:
+            self.check_list_RBAC_enforcement(
+                'FlavorCapabilitiesClient',
+                'list_flavor_capabilities', expected_allowed,
                 CONF.load_balancer.provider)
 
         # Check for an expected flavor capability for the configured provider
diff --git a/octavia_tempest_plugin/tests/api/v2/test_flavor_profile.py b/octavia_tempest_plugin/tests/api/v2/test_flavor_profile.py
index 8f9c7d3..39f3338 100644
--- a/octavia_tempest_plugin/tests/api/v2/test_flavor_profile.py
+++ b/octavia_tempest_plugin/tests/api/v2/test_flavor_profile.py
@@ -60,11 +60,17 @@
 
         # Test that a user without the load balancer admin role cannot
         # create a flavor profile
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.flavor_profile_client.create_flavor_profile,
-                **flavor_profile_kwargs)
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+        if expected_allowed:
+            self.check_create_RBAC_enforcement(
+                'FlavorProfileClient', 'create_flavor_profile',
+                expected_allowed, None, None, **flavor_profile_kwargs)
 
         # Happy path
         flavor_profile = (
@@ -174,10 +180,17 @@
 
         # Test that a user without the load balancer admin role cannot
         # list flavor profiles.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.flavor_profile_client.list_flavor_profiles)
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+        if expected_allowed:
+            self.check_list_RBAC_enforcement(
+                'FlavorProfileClient', 'list_flavor_profiles',
+                expected_allowed)
 
         # Check the default sort order (by ID)
         profiles = self.lb_admin_flavor_profile_client.list_flavor_profiles()
@@ -295,12 +308,18 @@
             flavor_profile[const.ID])
 
         # Test that a user without the load balancer admin role cannot
-        # show a flavor profile
+        # show a flavor profile.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.flavor_profile_client.show_flavor_profile,
-                flavor_profile[const.ID])
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+        if expected_allowed:
+            self.check_show_RBAC_enforcement(
+                'FlavorProfileClient', 'show_flavor_profile',
+                expected_allowed, flavor_profile[const.ID])
 
         result = (
             self.lb_admin_flavor_profile_client.show_flavor_profile(
@@ -367,12 +386,19 @@
         }
 
         # Test that a user without the load balancer admin role cannot
-        # create a flavor profile
+        # update a flavor profile.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.flavor_profile_client.update_flavor_profile,
-                flavor_profile[const.ID], **flavor_profile_updated_kwargs)
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+        if expected_allowed:
+            self.check_update_RBAC_enforcement(
+                'FlavorProfileClient', 'update_flavor_profile',
+                expected_allowed, None, None, flavor_profile[const.ID],
+                **flavor_profile_updated_kwargs)
 
         result = self.lb_admin_flavor_profile_client.update_flavor_profile(
             flavor_profile[const.ID], **flavor_profile_updated_kwargs)
@@ -428,11 +454,17 @@
 
         # Test that a user without the load balancer admin role cannot
         # delete a flavor profile
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.flavor_profile_client.delete_flavor_profile,
-                flavor_profile[const.ID])
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+        if expected_allowed:
+            self.check_delete_RBAC_enforcement(
+                'FlavorProfileClient', 'delete_flavor_profile',
+                expected_allowed, None, None, flavor_profile[const.ID])
 
         # Happy path
         self.lb_admin_flavor_profile_client.delete_flavor_profile(
diff --git a/octavia_tempest_plugin/tests/api/v2/test_healthmonitor.py b/octavia_tempest_plugin/tests/api/v2/test_healthmonitor.py
index ff07c73..a305ead 100644
--- a/octavia_tempest_plugin/tests/api/v2/test_healthmonitor.py
+++ b/octavia_tempest_plugin/tests/api/v2/test_healthmonitor.py
@@ -277,11 +277,21 @@
 
         # Test that a user without the loadbalancer role cannot
         # create a healthmonitor
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin',
+                                'os_roles_lb_member']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_roles_lb_member']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.healthmonitor_client.create_healthmonitor,
-                **hm_kwargs)
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+                                'os_roles_lb_member']
+        if expected_allowed:
+            self.check_create_RBAC_enforcement(
+                'HealthMonitorClient', 'create_healthmonitor',
+                expected_allowed,
+                status_method=self.mem_lb_client.show_loadbalancer,
+                obj_id=self.lb_id, **hm_kwargs)
 
         hm = self.mem_healthmonitor_client.create_healthmonitor(**hm_kwargs)
         self.addCleanup(
@@ -488,6 +498,9 @@
         * List the healthmonitors filtering to one of the three.
         * List the healthmonitors filtered, one field, and sorted.
         """
+        # IDs of health monitors created in the test
+        test_ids = []
+
         if (pool_algorithm == const.LB_ALGORITHM_SOURCE_IP_PORT and not
             self.mem_listener_client.is_version_supported(
                 self.api_version, '2.13')):
@@ -614,6 +627,7 @@
                                 const.ACTIVE,
                                 CONF.load_balancer.build_interval,
                                 CONF.load_balancer.build_timeout)
+        test_ids.append(hm1[const.ID])
         # Time resolution for created_at is only to the second, and we need to
         # ensure that each object has a distinct creation time. Delaying one
         # second is both a simple and a reliable way to accomplish this.
@@ -658,6 +672,7 @@
                                 const.ACTIVE,
                                 CONF.load_balancer.build_interval,
                                 CONF.load_balancer.build_timeout)
+        test_ids.append(hm2[const.ID])
         # Time resolution for created_at is only to the second, and we need to
         # ensure that each object has a distinct creation time. Delaying one
         # second is both a simple and a reliable way to accomplish this.
@@ -702,19 +717,65 @@
                                 const.ACTIVE,
                                 CONF.load_balancer.build_interval,
                                 CONF.load_balancer.build_timeout)
+        test_ids.append(hm3[const.ID])
 
-        # Test that a different user cannot list healthmonitors
-        if not CONF.load_balancer.RBAC_test_type == const.NONE:
-            member2_client = self.os_roles_lb_member2.healthmonitor_client
-            primary = member2_client.list_healthmonitors(
-                query_params='pool_id={pool_id}'.format(pool_id=pool1_id))
-            self.assertEqual(0, len(primary))
+        # Test that different users cannot see the lb_member healthmonitors
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_primary', 'os_roles_lb_member2']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_admin', 'os_primary',
+                                'os_roles_lb_member2', 'os_roles_lb_observer',
+                                'os_roles_lb_global_observer']
+        if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+            expected_allowed = ['os_roles_lb_observer', 'os_roles_lb_member2']
+        if expected_allowed:
+            self.check_list_RBAC_enforcement_count(
+                'HealthMonitorClient', 'list_healthmonitors',
+                expected_allowed, 0)
+
+        # Test credentials that should see these healthmonitors can see them.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_member']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_system_reader',
+                                'os_roles_lb_member']
+        if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+            expected_allowed = ['os_system_admin', 'os_system_reader',
+                                'os_roles_lb_admin', 'os_roles_lb_member',
+                                'os_roles_lb_global_observer']
+        if expected_allowed:
+            self.check_list_IDs_RBAC_enforcement(
+                'HealthMonitorClient', 'list_healthmonitors',
+                expected_allowed, test_ids)
 
         # Test that users without the lb member role cannot list healthmonitors
+        # Note: non-owners can still call this API, they will just get the list
+        #       of health monitors for their project (zero). The above tests
+        #       are intended to cover the cross project use case.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_primary', 'os_roles_lb_admin',
+                                'os_roles_lb_member', 'os_roles_lb_member2']
+        # Note: os_admin is here because it evaluates to "project_admin"
+        #       in oslo_policy, and keystone considers "project_admin"
+        #       a superscope of "project_reader". This means it can read
+        #       objects in the "admin" credential's project.
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_admin', 'os_primary', 'os_system_admin',
+                                'os_system_reader', 'os_roles_lb_observer',
+                                'os_roles_lb_global_observer',
+                                'os_roles_lb_member', 'os_roles_lb_member2']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.healthmonitor_client.list_healthmonitors)
+            expected_allowed = ['os_system_admin', 'os_system_reader',
+                                'os_roles_lb_admin', 'os_roles_lb_observer',
+                                'os_roles_lb_global_observer',
+                                'os_roles_lb_member', 'os_roles_lb_member2']
+        if expected_allowed:
+            self.check_list_RBAC_enforcement(
+                'HealthMonitorClient', 'list_healthmonitors',
+                expected_allowed)
 
         # Check the default sort order, created_at
         hms = self.mem_healthmonitor_client.list_healthmonitors()
@@ -1125,33 +1186,24 @@
         for item in equal_items:
             self.assertEqual(hm_kwargs[item], hm[item])
 
-        # Test that a user with lb_admin role can see the healthmonitor
+        # Test that the appropriate users can see or not see the health
+        # monitors based on the API RBAC.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin',
+                                'os_roles_lb_member']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_system_reader',
+                                'os_roles_lb_member']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            healthmonitor_client = self.os_roles_lb_admin.healthmonitor_client
-            hm_adm = healthmonitor_client.show_healthmonitor(hm[const.ID])
-            self.assertEqual(hm_name, hm_adm[const.NAME])
-
-        # Test that a user with cloud admin role can see the healthmonitor
-        if not CONF.load_balancer.RBAC_test_type == const.NONE:
-            adm = self.os_admin.healthmonitor_client.show_healthmonitor(
-                hm[const.ID])
-            self.assertEqual(hm_name, adm[const.NAME])
-
-        # Test that a different user, with loadbalancer member role, cannot
-        # see this healthmonitor
-        if not CONF.load_balancer.RBAC_test_type == const.NONE:
-            member2_client = self.os_roles_lb_member2.healthmonitor_client
-            self.assertRaises(exceptions.Forbidden,
-                              member2_client.show_healthmonitor,
-                              hm[const.ID])
-
-        # Test that a user, without the loadbalancer member role, cannot
-        # show healthmonitors
-        if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.healthmonitor_client.show_healthmonitor,
-                hm[const.ID])
+            expected_allowed = ['os_system_admin', 'os_system_reader',
+                                'os_roles_lb_admin',
+                                'os_roles_lb_global_observer',
+                                'os_roles_lb_member']
+        if expected_allowed:
+            self.check_show_RBAC_enforcement(
+                'HealthMonitorClient', 'show_healthmonitor',
+                expected_allowed, hm[const.ID])
 
     @decorators.idempotent_id('2417164b-ec03-4488-afd2-60b096dc0077')
     def test_LC_HTTP_healthmonitor_update(self):
@@ -1417,27 +1469,21 @@
             self.assertCountEqual(hm_kwargs[const.TAGS], hm[const.TAGS])
 
         # Test that a user, without the loadbalancer member role, cannot
-        # use this command
+        # update this healthmonitor.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin',
+                                'os_roles_lb_member']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_roles_lb_member']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.healthmonitor_client.update_healthmonitor,
-                hm[const.ID], admin_state_up=True)
-
-        # Assert we didn't go into PENDING_*
-        hm_check = self.mem_healthmonitor_client.show_healthmonitor(
-            hm[const.ID])
-        self.assertEqual(const.ACTIVE,
-                         hm_check[const.PROVISIONING_STATUS])
-        self.assertFalse(hm_check[const.ADMIN_STATE_UP])
-
-        # Test that a user, without the loadbalancer member role, cannot
-        # update this healthmonitor
-        if not CONF.load_balancer.RBAC_test_type == const.NONE:
-            member2_client = self.os_roles_lb_member2.healthmonitor_client
-            self.assertRaises(exceptions.Forbidden,
-                              member2_client.update_healthmonitor,
-                              hm[const.ID], admin_state_up=True)
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+                                'os_roles_lb_member']
+        if expected_allowed:
+            self.check_update_RBAC_enforcement(
+                'HealthMonitorClient', 'update_healthmonitor',
+                expected_allowed, None, None, hm[const.ID],
+                admin_state_up=True)
 
         # Assert we didn't go into PENDING_*
         hm_check = self.mem_healthmonitor_client.show_healthmonitor(
@@ -1725,21 +1771,21 @@
             CONF.load_balancer.build_interval,
             CONF.load_balancer.build_timeout)
 
-        # Test that a user without the loadbalancer role cannot
-        # delete this healthmonitor
+        # Test that a user without the loadbalancer role cannot delete this
+        # healthmonitor.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin',
+                                'os_roles_lb_member']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_roles_lb_member']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.healthmonitor_client.delete_healthmonitor,
-                hm[const.ID])
-
-        # Test that a different user, with the loadbalancer member role
-        # cannot delete this healthmonitor
-        if not CONF.load_balancer.RBAC_test_type == const.NONE:
-            member2_client = self.os_roles_lb_member2.healthmonitor_client
-            self.assertRaises(exceptions.Forbidden,
-                              member2_client.delete_healthmonitor,
-                              hm[const.ID])
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+                                'os_roles_lb_member']
+        if expected_allowed:
+            self.check_delete_RBAC_enforcement(
+                'HealthMonitorClient', 'delete_healthmonitor',
+                expected_allowed, None, None, hm[const.ID])
 
         self.mem_healthmonitor_client.delete_healthmonitor(hm[const.ID])
 
diff --git a/octavia_tempest_plugin/tests/api/v2/test_l7policy.py b/octavia_tempest_plugin/tests/api/v2/test_l7policy.py
index 440a533..e7ed5a6 100644
--- a/octavia_tempest_plugin/tests/api/v2/test_l7policy.py
+++ b/octavia_tempest_plugin/tests/api/v2/test_l7policy.py
@@ -19,7 +19,6 @@
 from tempest import config
 from tempest.lib.common.utils import data_utils
 from tempest.lib import decorators
-from tempest.lib import exceptions
 
 from octavia_tempest_plugin.common import constants as const
 from octavia_tempest_plugin.tests import test_base
@@ -135,11 +134,21 @@
 
         # Test that a user without the load balancer role cannot
         # create a l7policy
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin',
+                                'os_roles_lb_member']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_roles_lb_member']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.l7policy_client.create_l7policy,
-                **l7policy_kwargs)
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+                                'os_roles_lb_member']
+        if expected_allowed:
+            self.check_create_RBAC_enforcement(
+                'L7PolicyClient', 'create_l7policy',
+                expected_allowed,
+                status_method=self.mem_lb_client.show_loadbalancer,
+                obj_id=self.lb_id, **l7policy_kwargs)
 
         l7policy = self.mem_l7policy_client.create_l7policy(**l7policy_kwargs)
 
@@ -208,6 +217,9 @@
         * List the l7policies filtering to one of the three.
         * List the l7policies filtered, one field, and sorted.
         """
+        # IDs of L7 policies created in the test
+        test_ids = []
+
         listener_name = data_utils.rand_name(
             "lb_member_listener2_l7policy-list")
         listener_kwargs = {
@@ -263,6 +275,7 @@
                                 const.ACTIVE,
                                 CONF.load_balancer.build_interval,
                                 CONF.load_balancer.build_timeout)
+        test_ids.append(l7policy1[const.ID])
         # Time resolution for created_at is only to the second, and we need to
         # ensure that each object has a distinct creation time. Delaying one
         # second is both a simple and a reliable way to accomplish this.
@@ -303,6 +316,7 @@
                                 const.ACTIVE,
                                 CONF.load_balancer.build_interval,
                                 CONF.load_balancer.build_timeout)
+        test_ids.append(l7policy2[const.ID])
         # Time resolution for created_at is only to the second, and we need to
         # ensure that each object has a distinct creation time. Delaying one
         # second is both a simple and a reliable way to accomplish this.
@@ -344,21 +358,67 @@
                                 const.ACTIVE,
                                 CONF.load_balancer.build_interval,
                                 CONF.load_balancer.build_timeout)
+        test_ids.append(l7policy3[const.ID])
 
-        # Test that a different user cannot list l7policies
-        if not CONF.load_balancer.RBAC_test_type == const.NONE:
-            member2_client = self.os_roles_lb_member2.l7policy_client
-            primary = member2_client.list_l7policies(
+        # Test that different users cannot see the lb_member l7policies
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_primary', 'os_roles_lb_member2']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_admin', 'os_primary',
+                                'os_roles_lb_member2', 'os_roles_lb_observer',
+                                'os_roles_lb_global_observer']
+        if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+            expected_allowed = ['os_roles_lb_observer', 'os_roles_lb_member2']
+        if expected_allowed:
+            self.check_list_RBAC_enforcement_count(
+                'L7PolicyClient', 'list_l7policies',
+                expected_allowed, 0)
+
+        # Test credentials that should see these l7policies can see them.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_member']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_system_reader',
+                                'os_roles_lb_member']
+        if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+            expected_allowed = ['os_system_admin', 'os_system_reader',
+                                'os_roles_lb_admin', 'os_roles_lb_member',
+                                'os_roles_lb_global_observer']
+        if expected_allowed:
+            self.check_list_IDs_RBAC_enforcement(
+                'L7PolicyClient', 'list_l7policies',
+                expected_allowed, test_ids,
                 query_params='listener_id={listener_id}'.format(
                     listener_id=listener_id))
-            self.assertEqual(0, len(primary))
 
-        # Test that a user without the lb member role cannot list load
-        # balancers
+        # Test that users without the lb member role cannot list l7policies
+        # Note: non-owners can still call this API, they will just get the list
+        #       of L7 policies for their project (zero). The above tests
+        #       are intended to cover the cross project use case.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_primary', 'os_roles_lb_admin',
+                                'os_roles_lb_member', 'os_roles_lb_member2']
+        # Note: os_admin is here because it evaluates to "project_admin"
+        #       in oslo_policy, and keystone considers "project_admin"
+        #       a superscope of "project_reader". This means it can read
+        #       objects in the "admin" credential's project.
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_admin', 'os_primary', 'os_system_admin',
+                                'os_system_reader', 'os_roles_lb_observer',
+                                'os_roles_lb_global_observer',
+                                'os_roles_lb_member', 'os_roles_lb_member2']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.l7policy_client.list_l7policies)
+            expected_allowed = ['os_system_admin', 'os_system_reader',
+                                'os_roles_lb_admin', 'os_roles_lb_observer',
+                                'os_roles_lb_global_observer',
+                                'os_roles_lb_member', 'os_roles_lb_member2']
+        if expected_allowed:
+            self.check_list_RBAC_enforcement(
+                'L7PolicyClient', 'list_l7policies',
+                expected_allowed)
 
         # Check the default sort order, created_at
         l7policies = self.mem_l7policy_client.list_l7policies(
@@ -585,33 +645,24 @@
         self.assertIsNone(l7policy.pop(const.REDIRECT_URL, None))
         self.assertIsNone(l7policy.pop(const.REDIRECT_POOL_ID, None))
 
-        # Test that a user with lb_admin role can see the l7policy
+        # Test that the appropriate users can see or not see the L7 policies
+        # based on the API RBAC.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin',
+                                'os_roles_lb_member']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_system_reader',
+                                'os_roles_lb_member']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            l7policy_client = self.os_roles_lb_admin.l7policy_client
-            l7policy_adm = l7policy_client.show_l7policy(l7policy[const.ID])
-            self.assertEqual(l7policy_name, l7policy_adm[const.NAME])
-
-        # Test that a user with cloud admin role can see the l7policy
-        if not CONF.load_balancer.RBAC_test_type == const.NONE:
-            adm = self.os_admin.l7policy_client.show_l7policy(
-                l7policy[const.ID])
-            self.assertEqual(l7policy_name, adm[const.NAME])
-
-        # Test that a different user, with load balancer member role, cannot
-        # see this l7policy
-        if not CONF.load_balancer.RBAC_test_type == const.NONE:
-            member2_client = self.os_roles_lb_member2.l7policy_client
-            self.assertRaises(exceptions.Forbidden,
-                              member2_client.show_l7policy,
-                              l7policy[const.ID])
-
-        # Test that a user, without the load balancer member role, cannot
-        # show l7policies
-        if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.l7policy_client.show_l7policy,
-                l7policy[const.ID])
+            expected_allowed = ['os_system_admin', 'os_system_reader',
+                                'os_roles_lb_admin',
+                                'os_roles_lb_global_observer',
+                                'os_roles_lb_member']
+        if expected_allowed:
+            self.check_show_RBAC_enforcement(
+                'L7PolicyClient', 'show_l7policy',
+                expected_allowed, l7policy[const.ID])
 
     @decorators.idempotent_id('08f73b22-550b-4e5a-b3d6-2ec03251ca13')
     def test_l7policy_update(self):
@@ -703,28 +754,22 @@
             self.assertCountEqual(l7policy_kwargs[const.TAGS],
                                   l7policy[const.TAGS])
 
-        # Test that a user, without the load balancer member role, cannot
-        # use this command
+        # Test that a user, without the loadbalancer member role, cannot
+        # update this L7 policy.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin',
+                                'os_roles_lb_member']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_roles_lb_member']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.l7policy_client.update_l7policy,
-                l7policy[const.ID], admin_state_up=True)
-
-        # Assert we didn't go into PENDING_*
-        l7policy_check = self.mem_l7policy_client.show_l7policy(
-            l7policy[const.ID])
-        self.assertEqual(const.ACTIVE,
-                         l7policy_check[const.PROVISIONING_STATUS])
-        self.assertFalse(l7policy_check[const.ADMIN_STATE_UP])
-
-        # Test that a user, without the load balancer member role, cannot
-        # update this l7policy
-        if not CONF.load_balancer.RBAC_test_type == const.NONE:
-            member2_client = self.os_roles_lb_member2.l7policy_client
-            self.assertRaises(exceptions.Forbidden,
-                              member2_client.update_l7policy,
-                              l7policy[const.ID], admin_state_up=True)
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+                                'os_roles_lb_member']
+        if expected_allowed:
+            self.check_update_RBAC_enforcement(
+                'L7PolicyClient', 'update_l7policy',
+                expected_allowed, None, None, l7policy[const.ID],
+                admin_state_up=True)
 
         # Assert we didn't go into PENDING_*
         l7policy_check = self.mem_l7policy_client.show_l7policy(
@@ -820,21 +865,21 @@
             CONF.load_balancer.build_interval,
             CONF.load_balancer.build_timeout)
 
-        # Test that a user without the load balancer role cannot
-        # delete this l7policy
+        # Test that a user without the loadbalancer role cannot delete this
+        # L7 policy.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin',
+                                'os_roles_lb_member']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_roles_lb_member']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.l7policy_client.delete_l7policy,
-                l7policy[const.ID])
-
-        # Test that a different user, with the load balancer member role
-        # cannot delete this l7policy
-        if not CONF.load_balancer.RBAC_test_type == const.NONE:
-            member2_client = self.os_roles_lb_member2.l7policy_client
-            self.assertRaises(exceptions.Forbidden,
-                              member2_client.delete_l7policy,
-                              l7policy[const.ID])
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+                                'os_roles_lb_member']
+        if expected_allowed:
+            self.check_delete_RBAC_enforcement(
+                'L7PolicyClient', 'delete_l7policy',
+                expected_allowed, None, None, l7policy[const.ID])
 
         self.mem_l7policy_client.delete_l7policy(l7policy[const.ID])
 
diff --git a/octavia_tempest_plugin/tests/api/v2/test_l7rule.py b/octavia_tempest_plugin/tests/api/v2/test_l7rule.py
index e44c9f8..5cb85c4 100644
--- a/octavia_tempest_plugin/tests/api/v2/test_l7rule.py
+++ b/octavia_tempest_plugin/tests/api/v2/test_l7rule.py
@@ -19,7 +19,6 @@
 from tempest import config
 from tempest.lib.common.utils import data_utils
 from tempest.lib import decorators
-from tempest.lib import exceptions
 
 from octavia_tempest_plugin.common import constants as const
 from octavia_tempest_plugin.tests import test_base
@@ -141,13 +140,23 @@
                 const.TAGS: l7_rule_tags
             })
 
-        # Test that a user without the load balancer role cannot
-        # create a l7rule
+        # Test that a user without the loadbalancer role cannot
+        # create an L7 rule.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin',
+                                'os_roles_lb_member']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_roles_lb_member']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.l7rule_client.create_l7rule,
-                **l7rule_kwargs)
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+                                'os_roles_lb_member']
+        if expected_allowed:
+            self.check_create_RBAC_enforcement(
+                'L7RuleClient', 'create_l7rule',
+                expected_allowed,
+                status_method=self.mem_lb_client.show_loadbalancer,
+                obj_id=self.lb_id, **l7rule_kwargs)
 
         l7rule = self.mem_l7rule_client.create_l7rule(**l7rule_kwargs)
         self.addClassResourceCleanup(
@@ -211,6 +220,9 @@
         * List the l7rules filtering to one of the three.
         * List the l7rules filtered, one field, and sorted.
         """
+        # IDs of L7 rules created in the test
+        test_ids = []
+
         l7policy_name = data_utils.rand_name("lb_member_l7policy2_l7rule-list")
         l7policy = self.mem_l7policy_client.create_l7policy(
             name=l7policy_name, listener_id=self.listener_id,
@@ -260,6 +272,7 @@
                                 const.ACTIVE,
                                 CONF.load_balancer.check_interval,
                                 CONF.load_balancer.check_timeout)
+        test_ids.append(l7rule1[const.ID])
         # Time resolution for created_at is only to the second, and we need to
         # ensure that each object has a distinct creation time. Delaying one
         # second is both a simple and a reliable way to accomplish this.
@@ -298,6 +311,7 @@
                                 const.ACTIVE,
                                 CONF.load_balancer.check_interval,
                                 CONF.load_balancer.check_timeout)
+        test_ids.append(l7rule2[const.ID])
         # Time resolution for created_at is only to the second, and we need to
         # ensure that each object has a distinct creation time. Delaying one
         # second is both a simple and a reliable way to accomplish this.
@@ -336,23 +350,47 @@
                                 const.ACTIVE,
                                 CONF.load_balancer.check_interval,
                                 CONF.load_balancer.check_timeout)
+        test_ids.append(l7rule3[const.ID])
 
-        # Test that a different user cannot list l7rules
-        if not CONF.load_balancer.RBAC_test_type == const.NONE:
-            member2_client = self.os_roles_lb_member2.l7rule_client
-            self.assertRaises(
-                exceptions.Forbidden,
-                member2_client.list_l7rules,
-                l7policy_id)
-
-        # Test that a user without the lb l7rule role cannot list load
-        # balancers
+        # Test credentials that should see these L7 rules can see them.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_member']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_system_reader',
+                                'os_roles_lb_member']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.l7rule_client.list_l7rules,
+            expected_allowed = ['os_system_admin', 'os_system_reader',
+                                'os_roles_lb_admin', 'os_roles_lb_member',
+                                'os_roles_lb_global_observer']
+        if expected_allowed:
+            self.check_list_IDs_RBAC_enforcement(
+                'L7RuleClient', 'list_l7rules', expected_allowed, test_ids,
                 l7policy_id)
 
+        # Test that users without the lb member role cannot list L7 rules.
+        # Note: The parent policy ID blocks non-owners from listing
+        #       L7 Rules.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin',
+                                'os_roles_lb_member']
+        # Note: os_admin is here because it evaluates to "project_admin"
+        #       in oslo_policy, and keystone considers "project_admin"
+        #       a superscope of "project_reader". This means it can read
+        #       objects in the "admin" credential's project.
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_admin', 'os_system_admin',
+                                'os_system_reader', 'os_roles_lb_member']
+        if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+            expected_allowed = ['os_system_admin', 'os_system_reader',
+                                'os_roles_lb_admin',
+                                'os_roles_lb_global_observer',
+                                'os_roles_lb_member']
+        if expected_allowed:
+            self.check_list_RBAC_enforcement(
+                'L7RuleClient', 'list_l7rules', expected_allowed, l7policy_id)
+
         # Check the default sort order, created_at
         l7rules = self.mem_l7rule_client.list_l7rules(l7policy_id)
         self.assertEqual(l7rule1[const.VALUE],
@@ -521,34 +559,25 @@
         for item in equal_items:
             self.assertEqual(l7rule_kwargs[item], l7rule[item])
 
-        # Test that a user with lb_admin role can see the l7rule
+        # Test that the appropriate users can see or not see the L7 rule
+        # based on the API RBAC.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin',
+                                'os_roles_lb_member']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_system_reader',
+                                'os_roles_lb_member']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            l7rule_client = self.os_roles_lb_admin.l7rule_client
-            l7rule_adm = l7rule_client.show_l7rule(
-                l7rule[const.ID], l7policy_id=self.l7policy_id)
-            self.assertEqual(l7rule_kwargs[const.KEY], l7rule_adm[const.KEY])
-
-        # Test that a user with cloud admin role can see the l7rule
-        if not CONF.load_balancer.RBAC_test_type == const.NONE:
-            adm = self.os_admin.l7rule_client.show_l7rule(
-                l7rule[const.ID], l7policy_id=self.l7policy_id)
-            self.assertEqual(l7rule_kwargs[const.KEY], adm[const.KEY])
-
-        # Test that a different user, with load balancer member role, cannot
-        # see this l7rule
-        if not CONF.load_balancer.RBAC_test_type == const.NONE:
-            member2_client = self.os_roles_lb_member2.l7rule_client
-            self.assertRaises(exceptions.Forbidden,
-                              member2_client.show_l7rule,
-                              l7rule[const.ID], l7policy_id=self.l7policy_id)
-
-        # Test that a user, without the load balancer member role, cannot
-        # show l7rules
-        if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.l7rule_client.show_l7rule,
-                l7rule[const.ID], l7policy_id=self.l7policy_id)
+            expected_allowed = ['os_system_admin', 'os_system_reader',
+                                'os_roles_lb_admin',
+                                'os_roles_lb_global_observer',
+                                'os_roles_lb_member']
+        if expected_allowed:
+            self.check_show_RBAC_enforcement(
+                'L7RuleClient', 'show_l7rule',
+                expected_allowed, l7rule[const.ID],
+                l7policy_id=self.l7policy_id)
 
     @decorators.idempotent_id('f8cee23b-89b6-4f3a-a842-1463daf42cf7')
     def test_l7rule_update(self):
@@ -618,29 +647,22 @@
         for item in equal_items:
             self.assertEqual(l7rule_kwargs[item], l7rule[item])
 
-        # Test that a user, without the load balancer member role, cannot
-        # use this command
+        # Test that a user, without the loadbalancer member role, cannot
+        # update this L7 rule.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin',
+                                'os_roles_lb_member']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_roles_lb_member']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.l7rule_client.update_l7rule,
-                l7rule[const.ID], l7policy_id=self.l7policy_id,
-                admin_state_up=True)
-
-        # Assert we didn't go into PENDING_*
-        l7rule_check = self.mem_l7rule_client.show_l7rule(
-            l7rule[const.ID], l7policy_id=self.l7policy_id)
-        self.assertEqual(const.ACTIVE, l7rule_check[const.PROVISIONING_STATUS])
-        self.assertFalse(l7rule_check[const.ADMIN_STATE_UP])
-
-        # Test that a user, without the load balancer member role, cannot
-        # update this l7rule
-        if not CONF.load_balancer.RBAC_test_type == const.NONE:
-            member2_client = self.os_roles_lb_member2.l7rule_client
-            self.assertRaises(exceptions.Forbidden,
-                              member2_client.update_l7rule,
-                              l7rule[const.ID], l7policy_id=self.l7policy_id,
-                              admin_state_up=True)
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+                                'os_roles_lb_member']
+        if expected_allowed:
+            self.check_update_RBAC_enforcement(
+                'L7RuleClient', 'update_l7rule',
+                expected_allowed, None, None, l7rule[const.ID],
+                l7policy_id=self.l7policy_id, admin_state_up=True)
 
         # Assert we didn't go into PENDING_*
         l7rule_check = self.mem_l7rule_client.show_l7rule(
@@ -727,21 +749,22 @@
             CONF.load_balancer.build_interval,
             CONF.load_balancer.build_timeout)
 
-        # Test that a user without the load balancer role cannot
-        # delete this l7rule
+        # Test that a user without the loadbalancer role cannot delete this
+        # L7 rule.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin',
+                                'os_roles_lb_member']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_roles_lb_member']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.l7rule_client.delete_l7rule,
-                l7rule[const.ID], l7policy_id=self.l7policy_id)
-
-        # Test that a different user, with the load balancer member role
-        # cannot delete this l7rule
-        if not CONF.load_balancer.RBAC_test_type == const.NONE:
-            member2_client = self.os_roles_lb_member2.l7rule_client
-            self.assertRaises(exceptions.Forbidden,
-                              member2_client.delete_l7rule,
-                              l7rule[const.ID], l7policy_id=self.l7policy_id)
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+                                'os_roles_lb_member']
+        if expected_allowed:
+            self.check_delete_RBAC_enforcement(
+                'L7RuleClient', 'delete_l7rule',
+                expected_allowed, None, None, l7rule[const.ID],
+                l7policy_id=self.l7policy_id)
 
         self.mem_l7rule_client.delete_l7rule(l7rule[const.ID],
                                              l7policy_id=self.l7policy_id)
diff --git a/octavia_tempest_plugin/tests/api/v2/test_listener.py b/octavia_tempest_plugin/tests/api/v2/test_listener.py
index c688fdd..152f6ff 100644
--- a/octavia_tempest_plugin/tests/api/v2/test_listener.py
+++ b/octavia_tempest_plugin/tests/api/v2/test_listener.py
@@ -59,6 +59,14 @@
         if CONF.load_balancer.test_with_ipv6:
             cls.allowed_cidrs = ['2001:db8:a0b:12f0::/64']
 
+    @classmethod
+    def setup_clients(cls):
+        """Set up listener client aliases for os_primary and lb_member2."""
+        super(ListenerAPITest, cls).setup_clients()
+        cls.listener_client = cls.os_primary.load_balancer_v2.ListenerClient()
+        cls.member2_listener_client = (
+            cls.os_roles_lb_member2.load_balancer_v2.ListenerClient())
+
     @decorators.idempotent_id('88d0ec83-7b08-48d9-96e2-0df1d2f8cd98')
     def test_http_listener_create(self):
         self._test_listener_create(const.HTTP, 8000)
@@ -143,13 +151,23 @@
 
             listener_kwargs.update({const.ALLOWED_CIDRS: self.allowed_cidrs})
 
-        # Test that a user without the load balancer role cannot
-        # create a listener
+        # Test that a user without the loadbalancer role cannot
+        # create a listener.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin',
+                                'os_roles_lb_member']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_roles_lb_member']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.listener_client.create_listener,
-                **listener_kwargs)
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+                                'os_roles_lb_member']
+        if expected_allowed:
+            self.check_create_RBAC_enforcement(
+                'ListenerClient', 'create_listener',
+                expected_allowed,
+                status_method=self.mem_lb_client.show_loadbalancer,
+                obj_id=self.lb_id, **listener_kwargs)
 
         listener = self.mem_listener_client.create_listener(**listener_kwargs)
 
@@ -378,6 +396,9 @@
         * List the listeners filtering to one of the three.
         * List the listeners filtered, one field, and sorted.
         """
+        # IDs of listeners created in the test
+        test_ids = []
+
         lb_name = data_utils.rand_name("lb_member_lb2_listener-list")
         lb = self.mem_lb_client.create_loadbalancer(
             name=lb_name, provider=CONF.load_balancer.provider,
@@ -427,6 +448,7 @@
                                 const.ACTIVE,
                                 CONF.load_balancer.build_interval,
                                 CONF.load_balancer.build_timeout)
+        test_ids.append(listener1[const.ID])
         # Time resolution for created_at is only to the second, and we need to
         # ensure that each object has a distinct creation time. Delaying one
         # second is both a simple and a reliable way to accomplish this.
@@ -465,6 +487,7 @@
                                 const.ACTIVE,
                                 CONF.load_balancer.build_interval,
                                 CONF.load_balancer.build_timeout)
+        test_ids.append(listener2[const.ID])
         # Time resolution for created_at is only to the second, and we need to
         # ensure that each object has a distinct creation time. Delaying one
         # second is both a simple and a reliable way to accomplish this.
@@ -503,6 +526,7 @@
                                 const.ACTIVE,
                                 CONF.load_balancer.build_interval,
                                 CONF.load_balancer.build_timeout)
+        test_ids.append(listener3[const.ID])
 
         if not CONF.load_balancer.test_with_noop:
             # Wait for the enabled listeners to come ONLINE
@@ -517,18 +541,64 @@
                 CONF.load_balancer.build_interval,
                 CONF.load_balancer.build_timeout)
 
-        # Test that a different user cannot list listeners
-        if not CONF.load_balancer.RBAC_test_type == const.NONE:
-            member2_client = self.os_roles_lb_member2.listener_client
-            primary = member2_client.list_listeners(
-                query_params='loadbalancer_id={lb_id}'.format(lb_id=lb_id))
-            self.assertEqual(0, len(primary))
-
-        # Test that a user without the lb member role cannot list listeners
+        # Test that different users cannot see the lb_member listeners.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_primary', 'os_roles_lb_member2']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_admin', 'os_primary',
+                                'os_roles_lb_member2', 'os_roles_lb_observer',
+                                'os_roles_lb_global_observer']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.listener_client.list_listeners)
+            expected_allowed = ['os_roles_lb_observer', 'os_roles_lb_member2']
+        if expected_allowed:
+            self.check_list_RBAC_enforcement_count(
+                'ListenerClient', 'list_listeners', expected_allowed, 0,
+                query_params='loadbalancer_id={lb_id}'.format(lb_id=lb_id))
+
+        # Test credentials that should see these listeners can see them.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_member']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_system_reader',
+                                'os_roles_lb_member']
+        if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+            expected_allowed = ['os_system_admin', 'os_system_reader',
+                                'os_roles_lb_admin', 'os_roles_lb_member',
+                                'os_roles_lb_global_observer']
+        if expected_allowed:
+            self.check_list_IDs_RBAC_enforcement(
+                'ListenerClient', 'list_listeners', expected_allowed,
+                test_ids,
+                query_params='loadbalancer_id={lb_id}'.format(lb_id=lb_id))
+
+        # Test that users without the lb member role cannot list listeners.
+        # Note: non-owners can still call this API, they will just get the list
+        #       of listeners for their project (zero). The above tests
+        #       are intended to cover the cross project use case.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_primary', 'os_roles_lb_admin',
+                                'os_roles_lb_member', 'os_roles_lb_member2']
+        # Note: os_admin is here because it evaluates to "project_admin"
+        #       in oslo_policy, and keystone considers "project_admin"
+        #       a superscope of "project_reader". This means it can read
+        #       objects in the "admin" credential's project.
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_admin', 'os_primary', 'os_system_admin',
+                                'os_system_reader', 'os_roles_lb_observer',
+                                'os_roles_lb_global_observer',
+                                'os_roles_lb_member', 'os_roles_lb_member2']
+        if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+            expected_allowed = ['os_system_admin', 'os_system_reader',
+                                'os_roles_lb_admin', 'os_roles_lb_observer',
+                                'os_roles_lb_global_observer',
+                                'os_roles_lb_member', 'os_roles_lb_member2']
+        if expected_allowed:
+            self.check_list_RBAC_enforcement(
+                'ListenerClient', 'list_listeners', expected_allowed,
+                query_params='loadbalancer_id={lb_id}'.format(lb_id=lb_id))
 
         # Check the default sort order, created_at
         listeners = self.mem_listener_client.list_listeners(
@@ -786,33 +856,24 @@
                 self.api_version, '2.12'):
             self.assertEqual(self.allowed_cidrs, listener[const.ALLOWED_CIDRS])
 
-        # Test that a user with lb_admin role can see the listener
+        # Test that the appropriate users can see or not see the listener
+        # based on the API RBAC.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin',
+                                'os_roles_lb_member']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_system_reader',
+                                'os_roles_lb_member']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            listener_client = self.os_roles_lb_admin.listener_client
-            listener_adm = listener_client.show_listener(listener[const.ID])
-            self.assertEqual(listener_name, listener_adm[const.NAME])
-
-        # Test that a user with cloud admin role can see the listener
-        if not CONF.load_balancer.RBAC_test_type == const.NONE:
-            adm = self.os_admin.listener_client.show_listener(
-                listener[const.ID])
-            self.assertEqual(listener_name, adm[const.NAME])
-
-        # Test that a different user, with load balancer member role, cannot
-        # see this listener
-        if not CONF.load_balancer.RBAC_test_type == const.NONE:
-            member2_client = self.os_roles_lb_member2.listener_client
-            self.assertRaises(exceptions.Forbidden,
-                              member2_client.show_listener,
-                              listener[const.ID])
-
-        # Test that a user, without the load balancer member role, cannot
-        # show listeners
-        if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.listener_client.show_listener,
-                listener[const.ID])
+            expected_allowed = ['os_system_admin', 'os_system_reader',
+                                'os_roles_lb_admin',
+                                'os_roles_lb_global_observer',
+                                'os_roles_lb_member']
+        if expected_allowed:
+            self.check_show_RBAC_enforcement(
+                'ListenerClient', 'show_listener',
+                expected_allowed, listener[const.ID])
 
     @decorators.idempotent_id('aaae0298-5778-4c7e-a27a-01549a71b319')
     def test_http_listener_update(self):
@@ -943,7 +1004,7 @@
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
             self.assertRaises(
                 exceptions.Forbidden,
-                self.os_primary.listener_client.update_listener,
+                self.listener_client.update_listener,
                 listener[const.ID], admin_state_up=True)
 
         # Assert we didn't go into PENDING_*
@@ -956,7 +1017,7 @@
         # Test that a user, without the load balancer member role, cannot
         # update this listener
         if not CONF.load_balancer.RBAC_test_type == const.NONE:
-            member2_client = self.os_roles_lb_member2.listener_client
+            member2_client = self.member2_listener_client
             self.assertRaises(exceptions.Forbidden,
                               member2_client.update_listener,
                               listener[const.ID], admin_state_up=True)
@@ -1127,13 +1188,13 @@
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
             self.assertRaises(
                 exceptions.Forbidden,
-                self.os_primary.listener_client.delete_listener,
+                self.listener_client.delete_listener,
                 listener[const.ID])
 
         # Test that a different user, with the load balancer member role
         # cannot delete this listener
         if not CONF.load_balancer.RBAC_test_type == const.NONE:
-            member2_client = self.os_roles_lb_member2.listener_client
+            member2_client = self.member2_listener_client
             self.assertRaises(exceptions.Forbidden,
                               member2_client.delete_listener,
                               listener[const.ID])
@@ -1226,13 +1287,13 @@
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
             self.assertRaises(
                 exceptions.Forbidden,
-                self.os_primary.listener_client.get_listener_stats,
+                self.listener_client.get_listener_stats,
                 listener[const.ID])
 
         # Test that a different user, with the load balancer role, cannot see
         # the listener stats
         if not CONF.load_balancer.RBAC_test_type == const.NONE:
-            member2_client = self.os_roles_lb_member2.listener_client
+            member2_client = self.member2_listener_client
             self.assertRaises(exceptions.Forbidden,
                               member2_client.get_listener_stats,
                               listener[const.ID])
diff --git a/octavia_tempest_plugin/tests/api/v2/test_load_balancer.py b/octavia_tempest_plugin/tests/api/v2/test_load_balancer.py
index 0cf3576..7ade642 100644
--- a/octavia_tempest_plugin/tests/api/v2/test_load_balancer.py
+++ b/octavia_tempest_plugin/tests/api/v2/test_load_balancer.py
@@ -14,6 +14,7 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+import copy
 import testtools
 import time
 from uuid import UUID
@@ -78,13 +79,25 @@
 
         self._setup_lb_network_kwargs(lb_kwargs, ip_version, use_fixed_ip=True)
 
-        # Test that a user without the load balancer role cannot
-        # create a load balancer
+        # Test that a user without the loadbalancer role cannot
+        # create a load balancer.
+        lb_kwargs_with_project_id = copy.deepcopy(lb_kwargs)
+        lb_kwargs_with_project_id[const.PROJECT_ID] = (
+            self.os_roles_lb_member.credentials.project_id)
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_primary', 'os_roles_lb_admin',
+                                'os_roles_lb_member', 'os_roles_lb_member2']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin',
+                                'os_roles_lb_member', 'os_roles_lb_member2']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.loadbalancer_client.create_loadbalancer,
-                **lb_kwargs)
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+                                'os_roles_lb_member', 'os_roles_lb_member2']
+        if expected_allowed:
+            self.check_create_RBAC_enforcement(
+                'LoadbalancerClient', 'create_loadbalancer',
+                expected_allowed, None, None, **lb_kwargs_with_project_id)
 
         lb = self.mem_lb_client.create_loadbalancer(**lb_kwargs)
 
@@ -173,21 +186,21 @@
                                      CONF.load_balancer.lb_build_interval,
                                      CONF.load_balancer.lb_build_timeout)
 
-        # Test that a user without the load balancer role cannot
-        # delete this load balancer
+        # Test that a user without the loadbalancer role cannot delete this
+        # load balancer.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin',
+                                'os_roles_lb_member']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_roles_lb_member']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.loadbalancer_client.delete_loadbalancer,
-                lb[const.ID])
-
-        # Test that a different user, with the load balancer member role
-        # cannot delete this load balancer
-        if not CONF.load_balancer.RBAC_test_type == const.NONE:
-            member2_client = self.os_roles_lb_member2.loadbalancer_client
-            self.assertRaises(exceptions.Forbidden,
-                              member2_client.delete_loadbalancer,
-                              lb[const.ID])
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+                                'os_roles_lb_member']
+        if expected_allowed:
+            self.check_delete_RBAC_enforcement(
+                'LoadbalancerClient', 'delete_loadbalancer',
+                expected_allowed, None, None, lb[const.ID])
 
         self.mem_lb_client.delete_loadbalancer(lb[const.ID])
 
@@ -222,21 +235,21 @@
 
         # TODO(johnsom) Add other objects when we have clients for them
 
-        # Test that a user without the load balancer role cannot
-        # delete this load balancer
+        # Test that a user without the loadbalancer role cannot delete this
+        # load balancer.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin',
+                                'os_roles_lb_member']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_roles_lb_member']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.loadbalancer_client.delete_loadbalancer,
-                lb[const.ID], cascade=True)
-
-        # Test that a different user, with the load balancer member role
-        # cannot delete this load balancer
-        if not CONF.load_balancer.RBAC_test_type == const.NONE:
-            member2_client = self.os_roles_lb_member2.loadbalancer_client
-            self.assertRaises(exceptions.Forbidden,
-                              member2_client.delete_loadbalancer,
-                              lb[const.ID], cascade=True)
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+                                'os_roles_lb_member']
+        if expected_allowed:
+            self.check_delete_RBAC_enforcement(
+                'LoadbalancerClient', 'delete_loadbalancer',
+                expected_allowed, None, None, lb[const.ID], cascade=True)
 
         self.mem_lb_client.delete_loadbalancer(lb[const.ID], cascade=True)
 
@@ -267,6 +280,8 @@
         * List the load balancers filtering to one of the three.
         * List the load balancers filtered, one field, and sorted.
         """
+        # IDs of load balancers created in the test
+        test_ids = []
         # Get a list of pre-existing LBs to filter from test data
         pretest_lbs = self.mem_lb_client.list_loadbalancers()
         # Store their IDs for easy access
@@ -313,6 +328,7 @@
                                           const.ONLINE,
                                           CONF.load_balancer.check_interval,
                                           CONF.load_balancer.check_timeout)
+        test_ids.append(lb1[const.ID])
 
         # Time resolution for created_at is only to the second, and we need to
         # ensure that each object has a distinct creation time. Delaying one
@@ -356,6 +372,7 @@
                                           const.ONLINE,
                                           CONF.load_balancer.check_interval,
                                           CONF.load_balancer.check_timeout)
+        test_ids.append(lb2[const.ID])
 
         # Time resolution for created_at is only to the second, and we need to
         # ensure that each object has a distinct creation time. Delaying one
@@ -394,19 +411,64 @@
                                       const.ACTIVE,
                                       CONF.load_balancer.lb_build_interval,
                                       CONF.load_balancer.lb_build_timeout)
+        test_ids.append(lb3[const.ID])
 
-        # Test that a different user cannot list load balancers
-        if not CONF.load_balancer.RBAC_test_type == const.NONE:
-            member2_client = self.os_roles_lb_member2.loadbalancer_client
-            primary = member2_client.list_loadbalancers()
-            self.assertEqual(0, len(primary))
-
-        # Test that a user without the lb member role cannot list load
-        # balancers
+        # Test that different users cannot see the lb_member load balancers.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_primary', 'os_roles_lb_member2']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_admin', 'os_primary',
+                                'os_roles_lb_member2', 'os_roles_lb_observer',
+                                'os_roles_lb_global_observer']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.loadbalancer_client.list_loadbalancers)
+            expected_allowed = ['os_roles_lb_observer', 'os_roles_lb_member2']
+        if expected_allowed:
+            self.check_list_RBAC_enforcement_count(
+                'LoadbalancerClient', 'list_loadbalancers',
+                expected_allowed, 0)
+
+        # Test credentials that should see these load balancers can see them.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_member']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_system_reader',
+                                'os_roles_lb_member']
+        if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+            expected_allowed = ['os_system_admin', 'os_system_reader',
+                                'os_roles_lb_admin', 'os_roles_lb_member',
+                                'os_roles_lb_global_observer']
+        if expected_allowed:
+            self.check_list_IDs_RBAC_enforcement(
+                'LoadbalancerClient', 'list_loadbalancers',
+                expected_allowed, test_ids)
+
+        # Test that users without the lb member role cannot list load balancers
+        # Note: non-owners can still call this API, they will just get the list
+        #       of load balancers for their project (zero). The above tests
+        #       are intended to cover the cross project use case.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_primary', 'os_roles_lb_admin',
+                                'os_roles_lb_member', 'os_roles_lb_member2']
+        # Note: os_admin is here because it evaluates to "project_admin"
+        #       in oslo_policy, and keystone considers "project_admin"
+        #       a superscope of "project_reader". This means it can read
+        #       objects in the "admin" credential's project.
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_admin', 'os_primary', 'os_system_admin',
+                                'os_system_reader', 'os_roles_lb_observer',
+                                'os_roles_lb_global_observer',
+                                'os_roles_lb_member', 'os_roles_lb_member2']
+        if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+            expected_allowed = ['os_system_admin', 'os_system_reader',
+                                'os_roles_lb_admin', 'os_roles_lb_observer',
+                                'os_roles_lb_global_observer',
+                                'os_roles_lb_member', 'os_roles_lb_member2']
+        if expected_allowed:
+            self.check_list_RBAC_enforcement(
+                'LoadbalancerClient', 'list_loadbalancers', expected_allowed)
 
         # Check the default sort order, created_at
         lbs = self.mem_lb_client.list_loadbalancers()
@@ -566,33 +628,24 @@
             self.assertEqual(lb_kwargs[const.VIP_SUBNET_ID],
                              lb[const.VIP_SUBNET_ID])
 
-        # Test that a user with lb_admin role can see the load balanacer
+        # Test that the appropriate users can see or not see the load
+        # balancer based on the API RBAC.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin',
+                                'os_roles_lb_member']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_system_reader',
+                                'os_roles_lb_member']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            lb_client = self.os_roles_lb_admin.loadbalancer_client
-            lb_adm = lb_client.show_loadbalancer(lb[const.ID])
-            self.assertEqual(lb_name, lb_adm[const.NAME])
-
-        # Test that a user with cloud admin role can see the load balanacer
-        if not CONF.load_balancer.RBAC_test_type == const.NONE:
-            adm = self.os_admin.loadbalancer_client.show_loadbalancer(
-                lb[const.ID])
-            self.assertEqual(lb_name, adm[const.NAME])
-
-        # Test that a different user, with load balancer member role, cannot
-        # see this load balancer
-        if not CONF.load_balancer.RBAC_test_type == const.NONE:
-            member2_client = self.os_roles_lb_member2.loadbalancer_client
-            self.assertRaises(exceptions.Forbidden,
-                              member2_client.show_loadbalancer,
-                              lb[const.ID])
-
-        # Test that a user, without the load balancer member role, cannot
-        # show load balancers
-        if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.loadbalancer_client.show_loadbalancer,
-                lb[const.ID])
+            expected_allowed = ['os_system_admin', 'os_system_reader',
+                                'os_roles_lb_admin',
+                                'os_roles_lb_global_observer',
+                                'os_roles_lb_member']
+        if expected_allowed:
+            self.check_show_RBAC_enforcement(
+                'LoadbalancerClient', 'show_loadbalancer',
+                expected_allowed, lb[const.ID])
 
         # Attempt to clean up so that one full test run doesn't start 10+
         # amps before the cleanup phase fires
@@ -679,26 +732,22 @@
         new_description = data_utils.arbitrary_string(size=255,
                                                       base_text='new')
 
-        # Test that a user, without the load balancer member role, cannot
-        # use this command
+        # Test that a user, without the loadbalancer member role, cannot
+        # update this load balancer.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin',
+                                'os_roles_lb_member']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_roles_lb_member']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.loadbalancer_client.update_loadbalancer,
-                lb[const.ID], admin_state_up=True)
-
-        # Assert we didn't go into PENDING_*
-        lb_check = self.mem_lb_client.show_loadbalancer(lb[const.ID])
-        self.assertEqual(const.ACTIVE, lb_check[const.PROVISIONING_STATUS])
-        self.assertFalse(lb_check[const.ADMIN_STATE_UP])
-
-        # Test that a user, without the load balancer member role, cannot
-        # update this load balancer
-        if not CONF.load_balancer.RBAC_test_type == const.NONE:
-            member2_client = self.os_roles_lb_member2.loadbalancer_client
-            self.assertRaises(exceptions.Forbidden,
-                              member2_client.update_loadbalancer,
-                              lb[const.ID], admin_state_up=True)
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+                                'os_roles_lb_member']
+        if expected_allowed:
+            self.check_update_RBAC_enforcement(
+                'LoadbalancerClient', 'update_loadbalancer',
+                expected_allowed, None, None, lb[const.ID],
+                admin_state_up=True)
 
         # Assert we didn't go into PENDING_*
         lb_check = self.mem_lb_client.show_loadbalancer(lb[const.ID])
@@ -775,21 +824,24 @@
                                      CONF.load_balancer.lb_build_interval,
                                      CONF.load_balancer.lb_build_timeout)
 
-        # Test that a user, without the load balancer member role, cannot
-        # use this command
+        # Test that the appropriate users can see or not see the load
+        # balancer stats based on the API RBAC.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin',
+                                'os_roles_lb_member']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_system_reader',
+                                'os_roles_lb_member']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.loadbalancer_client.get_loadbalancer_stats,
-                lb[const.ID])
-
-        # Test that a different user, with the load balancer role, cannot see
-        # the load balancer stats
-        if not CONF.load_balancer.RBAC_test_type == const.NONE:
-            member2_client = self.os_roles_lb_member2.loadbalancer_client
-            self.assertRaises(exceptions.Forbidden,
-                              member2_client.get_loadbalancer_stats,
-                              lb[const.ID])
+            expected_allowed = ['os_system_admin', 'os_system_reader',
+                                'os_roles_lb_admin',
+                                'os_roles_lb_global_observer',
+                                'os_roles_lb_member']
+        if expected_allowed:
+            self.check_show_RBAC_enforcement(
+                'LoadbalancerClient', 'get_loadbalancer_stats',
+                expected_allowed, lb[const.ID])
 
         stats = self.mem_lb_client.get_loadbalancer_stats(lb[const.ID])
 
@@ -843,21 +895,24 @@
                                          CONF.load_balancer.check_interval,
                                          CONF.load_balancer.check_timeout)
 
-        # Test that a user, without the load balancer member role, cannot
-        # use this method
+        # Test that the appropriate users can see or not see the load
+        # balancer status based on the API RBAC.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin',
+                                'os_roles_lb_member']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_system_reader',
+                                'os_roles_lb_member']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.loadbalancer_client.get_loadbalancer_status,
-                lb[const.ID])
-
-        # Test that a different user, with load balancer role, cannot see
-        # the load balancer status
-        if not CONF.load_balancer.RBAC_test_type == const.NONE:
-            member2_client = self.os_roles_lb_member2.loadbalancer_client
-            self.assertRaises(exceptions.Forbidden,
-                              member2_client.get_loadbalancer_status,
-                              lb[const.ID])
+            expected_allowed = ['os_system_admin', 'os_system_reader',
+                                'os_roles_lb_admin',
+                                'os_roles_lb_global_observer',
+                                'os_roles_lb_member']
+        if expected_allowed:
+            self.check_show_RBAC_enforcement(
+                'LoadbalancerClient', 'get_loadbalancer_status',
+                expected_allowed, lb[const.ID])
 
         status = self.mem_lb_client.get_loadbalancer_status(lb[const.ID])
 
@@ -917,6 +972,20 @@
                               self.mem_lb_client.failover_loadbalancer,
                               lb[const.ID])
 
+        # Test that a user without the load balancer admin role cannot
+        # failover a load balancer.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+        if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+        if expected_allowed:
+            self.check_update_RBAC_enforcement(
+                'LoadbalancerClient', 'failover_loadbalancer',
+                expected_allowed, None, None, lb[const.ID])
+
         # Assert we didn't go into PENDING_*
         lb = self.mem_lb_client.show_loadbalancer(lb[const.ID])
         self.assertEqual(const.ACTIVE, lb[const.PROVISIONING_STATUS])
@@ -926,8 +995,9 @@
                 query_params='{loadbalancer_id}={lb_id}'.format(
                     loadbalancer_id=const.LOADBALANCER_ID, lb_id=lb[const.ID]))
 
-        self.os_roles_lb_admin.loadbalancer_client.failover_loadbalancer(
-            lb[const.ID])
+        admin_lb_client = (
+            self.os_roles_lb_admin.load_balancer_v2.LoadbalancerClient())
+        admin_lb_client.failover_loadbalancer(lb[const.ID])
 
         lb = waiters.wait_for_status(self.mem_lb_client.show_loadbalancer,
                                      lb[const.ID], const.PROVISIONING_STATUS,
diff --git a/octavia_tempest_plugin/tests/api/v2/test_member.py b/octavia_tempest_plugin/tests/api/v2/test_member.py
index 66e367e..aa7cf25 100644
--- a/octavia_tempest_plugin/tests/api/v2/test_member.py
+++ b/octavia_tempest_plugin/tests/api/v2/test_member.py
@@ -887,11 +887,30 @@
         # Test that a user without the load balancer role cannot
         # create a member
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+            member_client = self.os_primary.load_balancer_v2.MemberClient()
             self.assertRaises(
                 exceptions.Forbidden,
-                self.os_primary.member_client.create_member,
+                member_client.create_member,
                 **member_kwargs)
 
+        # Test that a user without the loadbalancer role cannot
+        # create a member.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin',
+                                'os_roles_lb_member']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_roles_lb_member']
+        if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+                                'os_roles_lb_member']
+        if expected_allowed:
+            self.check_create_RBAC_enforcement(
+                'MemberClient', 'create_member',
+                expected_allowed,
+                status_method=self.mem_lb_client.show_loadbalancer,
+                obj_id=self.lb_id, **member_kwargs)
+
         member = self.mem_member_client.create_member(**member_kwargs)
 
         waiters.wait_for_status(
@@ -1048,6 +1067,9 @@
         * List the members filtering to one of the three.
         * List the members filtered, one field, and sorted.
         """
+        # IDs of members created in the test
+        test_ids = []
+
         if (algorithm == const.LB_ALGORITHM_SOURCE_IP_PORT and not
             self.mem_listener_client.is_version_supported(
                 self.api_version, '2.13')):
@@ -1124,6 +1146,7 @@
                                 const.ACTIVE,
                                 CONF.load_balancer.check_interval,
                                 CONF.load_balancer.check_timeout)
+        test_ids.append(member1[const.ID])
         # Time resolution for created_at is only to the second, and we need to
         # ensure that each object has a distinct creation time. Delaying one
         # second is both a simple and a reliable way to accomplish this.
@@ -1162,6 +1185,7 @@
                                 const.ACTIVE,
                                 CONF.load_balancer.check_interval,
                                 CONF.load_balancer.check_timeout)
+        test_ids.append(member2[const.ID])
         # Time resolution for created_at is only to the second, and we need to
         # ensure that each object has a distinct creation time. Delaying one
         # second is both a simple and a reliable way to accomplish this.
@@ -1200,22 +1224,45 @@
                                 const.ACTIVE,
                                 CONF.load_balancer.check_interval,
                                 CONF.load_balancer.check_timeout)
+        test_ids.append(member3[const.ID])
 
-        # Test that a different user cannot list members
-        if not CONF.load_balancer.RBAC_test_type == const.NONE:
-            member2_client = self.os_roles_lb_member2.member_client
-            self.assertRaises(
-                exceptions.Forbidden,
-                member2_client.list_members,
-                pool_id)
-
-        # Test that a user without the lb member role cannot list load
-        # balancers
+        # Test credentials that should see these members can see them.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_member']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_system_reader',
+                                'os_roles_lb_member']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.member_client.list_members,
-                pool_id)
+            expected_allowed = ['os_system_admin', 'os_system_reader',
+                                'os_roles_lb_admin', 'os_roles_lb_member',
+                                'os_roles_lb_global_observer']
+        if expected_allowed:
+            self.check_list_IDs_RBAC_enforcement(
+                'MemberClient', 'list_members', expected_allowed,
+                test_ids, pool_id)
+
+        # Test that users without the lb member role cannot list members
+        # Note: The parent pool ID blocks non-owners from listing members.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin',
+                                'os_roles_lb_member']
+        # Note: os_admin is here because it evaluates to "project_admin"
+        #       in oslo_policy, and keystone considers "project_admin"
+        #       a superscope of "project_reader". This means it can read
+        #       objects in the "admin" credential's project.
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_admin', 'os_system_admin',
+                                'os_system_reader', 'os_roles_lb_member']
+        if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+            expected_allowed = ['os_system_admin', 'os_system_reader',
+                                'os_roles_lb_admin',
+                                'os_roles_lb_global_observer',
+                                'os_roles_lb_member']
+        if expected_allowed:
+            self.check_list_RBAC_enforcement(
+                'MemberClient', 'list_members', expected_allowed, pool_id)
 
         # Check the default sort order, created_at
         members = self.mem_member_client.list_members(pool_id)
@@ -1740,34 +1787,25 @@
         for item in equal_items:
             self.assertEqual(member_kwargs[item], member[item])
 
-        # Test that a user with lb_admin role can see the member
+        # Test that the appropriate users can see or not see the member
+        # based on the API RBAC.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin',
+                                'os_roles_lb_member']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_system_reader',
+                                'os_roles_lb_member']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            member_client = self.os_roles_lb_admin.member_client
-            member_adm = member_client.show_member(
-                member[const.ID], pool_id=pool_id)
-            self.assertEqual(member_name, member_adm[const.NAME])
-
-        # Test that a user with cloud admin role can see the member
-        if not CONF.load_balancer.RBAC_test_type == const.NONE:
-            adm = self.os_admin.member_client.show_member(
-                member[const.ID], pool_id=pool_id)
-            self.assertEqual(member_name, adm[const.NAME])
-
-        # Test that a different user, with load balancer member role, cannot
-        # see this member
-        if not CONF.load_balancer.RBAC_test_type == const.NONE:
-            member2_client = self.os_roles_lb_member2.member_client
-            self.assertRaises(exceptions.Forbidden,
-                              member2_client.show_member,
-                              member[const.ID], pool_id=pool_id)
-
-        # Test that a user, without the load balancer member role, cannot
-        # show members
-        if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.member_client.show_member,
-                member[const.ID], pool_id=pool_id)
+            expected_allowed = ['os_system_admin', 'os_system_reader',
+                                'os_roles_lb_admin',
+                                'os_roles_lb_global_observer',
+                                'os_roles_lb_member']
+        if expected_allowed:
+            self.check_show_RBAC_enforcement(
+                'MemberClient', 'show_member',
+                expected_allowed, member[const.ID],
+                pool_id=pool_id)
 
     @decorators.idempotent_id('65680d48-1d49-4959-a7d1-677797e54f6b')
     def test_HTTP_LC_alt_monitor_member_update(self):
@@ -2206,30 +2244,22 @@
         for item in equal_items:
             self.assertEqual(member_kwargs[item], member[item])
 
-        # Test that a user, without the load balancer member role, cannot
-        # use this command
+        # Test that a user, without the loadbalancer member role, cannot
+        # update this member.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin',
+                                'os_roles_lb_member']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_roles_lb_member']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.member_client.update_member,
-                member[const.ID], pool_id=pool_id, admin_state_up=True)
-
-        # Assert we didn't go into PENDING_*
-        member_check = self.mem_member_client.show_member(
-            member[const.ID], pool_id=pool_id)
-        self.assertEqual(const.ACTIVE,
-                         member_check[const.PROVISIONING_STATUS])
-        self.assertEqual(member_kwargs[const.ADMIN_STATE_UP],
-                         member_check[const.ADMIN_STATE_UP])
-
-        # Test that a user, without the load balancer member role, cannot
-        # update this member
-        if not CONF.load_balancer.RBAC_test_type == const.NONE:
-            member2_client = self.os_roles_lb_member2.member_client
-            self.assertRaises(exceptions.Forbidden,
-                              member2_client.update_member,
-                              member[const.ID], pool_id=pool_id,
-                              admin_state_up=True)
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+                                'os_roles_lb_member']
+        if expected_allowed:
+            self.check_update_RBAC_enforcement(
+                'MemberClient', 'update_member',
+                expected_allowed, None, None, member[const.ID],
+                pool_id=pool_id, admin_state_up=True)
 
         # Assert we didn't go into PENDING_*
         member_check = self.mem_member_client.show_member(
@@ -2672,12 +2702,21 @@
         member2_kwargs.pop(const.POOL_ID)
         batch_update_list = [member2_kwargs, member3_kwargs]
 
-        # Test that a user, without the load balancer member role, cannot
-        # use this command
+        # Test that a user, without the loadbalancer member role, cannot
+        # batch update this member.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin',
+                                'os_roles_lb_member']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_roles_lb_member']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.member_client.update_members,
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+                                'os_roles_lb_member']
+        if expected_allowed:
+            self.check_update_RBAC_enforcement(
+                'MemberClient', 'update_members',
+                expected_allowed, None, None,
                 pool_id=pool_id, members_list=batch_update_list)
 
         # Assert we didn't go into PENDING_*
@@ -2908,21 +2947,22 @@
             CONF.load_balancer.build_interval,
             CONF.load_balancer.build_timeout)
 
-        # Test that a user without the load balancer role cannot
-        # delete this member
+        # Test that a user without the loadbalancer role cannot delete this
+        # member.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin',
+                                'os_roles_lb_member']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_roles_lb_member']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.member_client.delete_member,
-                member[const.ID], pool_id=pool_id)
-
-        # Test that a different user, with the load balancer member role
-        # cannot delete this member
-        if not CONF.load_balancer.RBAC_test_type == const.NONE:
-            member2_client = self.os_roles_lb_member2.member_client
-            self.assertRaises(exceptions.Forbidden,
-                              member2_client.delete_member,
-                              member[const.ID], pool_id=pool_id)
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+                                'os_roles_lb_member']
+        if expected_allowed:
+            self.check_delete_RBAC_enforcement(
+                'MemberClient', 'delete_member',
+                expected_allowed, None, None, member[const.ID],
+                pool_id=pool_id)
 
         self.mem_member_client.delete_member(member[const.ID],
                                              pool_id=pool_id)
diff --git a/octavia_tempest_plugin/tests/api/v2/test_pool.py b/octavia_tempest_plugin/tests/api/v2/test_pool.py
index 3ab4892..ba31a8e 100644
--- a/octavia_tempest_plugin/tests/api/v2/test_pool.py
+++ b/octavia_tempest_plugin/tests/api/v2/test_pool.py
@@ -401,13 +401,23 @@
         else:
             pool_kwargs[const.LOADBALANCER_ID] = self.lb_id
 
-        # Test that a user without the load balancer role cannot
-        # create a pool
+        # Test that a user without the loadbalancer role cannot
+        # create a pool.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin',
+                                'os_roles_lb_member']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_roles_lb_member']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.pool_client.create_pool,
-                **pool_kwargs)
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+                                'os_roles_lb_member']
+        if expected_allowed:
+            self.check_create_RBAC_enforcement(
+                'PoolClient', 'create_pool',
+                expected_allowed,
+                status_method=self.mem_lb_client.show_loadbalancer,
+                obj_id=self.lb_id, **pool_kwargs)
 
         # This is a special case as the reference driver does not support
         # SOURCE-IP-PORT. Since it runs with not_implemented_is_error, we must
@@ -585,6 +595,9 @@
         * List the pools filtering to one of the three.
         * List the pools filtered, one field, and sorted.
         """
+        # IDs of pools created in the test
+        test_ids = []
+
         if (algorithm == const.LB_ALGORITHM_SOURCE_IP_PORT and not
             self.mem_listener_client.is_version_supported(
                 self.api_version, '2.13')):
@@ -655,6 +668,7 @@
                                 const.ACTIVE,
                                 CONF.load_balancer.build_interval,
                                 CONF.load_balancer.build_timeout)
+        test_ids.append(pool1[const.ID])
         # Time resolution for created_at is only to the second, and we need to
         # ensure that each object has a distinct creation time. Delaying one
         # second is both a simple and a reliable way to accomplish this.
@@ -694,6 +708,7 @@
                                 const.ACTIVE,
                                 CONF.load_balancer.build_interval,
                                 CONF.load_balancer.build_timeout)
+        test_ids.append(pool2[const.ID])
         # Time resolution for created_at is only to the second, and we need to
         # ensure that each object has a distinct creation time. Delaying one
         # second is both a simple and a reliable way to accomplish this.
@@ -733,20 +748,65 @@
                                 const.ACTIVE,
                                 CONF.load_balancer.build_interval,
                                 CONF.load_balancer.build_timeout)
+        test_ids.append(pool3[const.ID])
 
-        # Test that a different user cannot list pools
-        if not CONF.load_balancer.RBAC_test_type == const.NONE:
-            member2_client = self.os_roles_lb_member2.pool_client
-            primary = member2_client.list_pools(
-                query_params='loadbalancer_id={lb_id}'.format(lb_id=lb_id))
-            self.assertEqual(0, len(primary))
-
-        # Test that a user without the lb member role cannot list load
-        # balancers
+        # Test that different users cannot see the lb_member pools.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_primary', 'os_roles_lb_member2']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_admin', 'os_primary',
+                                'os_roles_lb_member2', 'os_roles_lb_observer',
+                                'os_roles_lb_global_observer']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.pool_client.list_pools)
+            expected_allowed = ['os_roles_lb_observer', 'os_roles_lb_member2']
+        if expected_allowed:
+            self.check_list_RBAC_enforcement_count(
+                'PoolClient', 'list_pools', expected_allowed, 0,
+                query_params='loadbalancer_id={lb_id}'.format(lb_id=lb_id))
+
+        # Test credentials that should see these pools can see them.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_member']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_system_reader',
+                                'os_roles_lb_member']
+        if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+            expected_allowed = ['os_system_admin', 'os_system_reader',
+                                'os_roles_lb_admin', 'os_roles_lb_member',
+                                'os_roles_lb_global_observer']
+        if expected_allowed:
+            self.check_list_IDs_RBAC_enforcement(
+                'PoolClient', 'list_pools', expected_allowed, test_ids,
+                query_params='loadbalancer_id={lb_id}'.format(lb_id=lb_id))
+
+        # Test that users without the lb member role cannot list pools.
+        # Note: non-owners can still call this API; they will just get the list
+        #       of pools for their project (zero). The above tests
+        #       are intended to cover the cross-project use case.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_primary', 'os_roles_lb_admin',
+                                'os_roles_lb_member', 'os_roles_lb_member2']
+        # Note: os_admin is here because it evaluates to "project_admin"
+        #       in oslo_policy, and keystone considers "project_admin"
+        #       a superscope of "project_reader". This means it can read
+        #       objects in the "admin" credential's project.
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_admin', 'os_primary', 'os_system_admin',
+                                'os_system_reader', 'os_roles_lb_observer',
+                                'os_roles_lb_global_observer',
+                                'os_roles_lb_member', 'os_roles_lb_member2']
+        if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+            expected_allowed = ['os_system_admin', 'os_system_reader',
+                                'os_roles_lb_admin', 'os_roles_lb_observer',
+                                'os_roles_lb_global_observer',
+                                'os_roles_lb_member', 'os_roles_lb_member2']
+        if expected_allowed:
+            self.check_list_RBAC_enforcement(
+                'PoolClient', 'list_pools', expected_allowed,
+                query_params='loadbalancer_id={lb_id}'.format(lb_id=lb_id))
 
         # Check the default sort order, created_at
         pools = self.mem_pool_client.list_pools(
@@ -960,7 +1020,7 @@
     @decorators.idempotent_id('bd732c36-bdaa-4591-bf4e-28268874d22c')
     def test_UDP_RR_source_IP_pool_show(self):
         self._test_pool_show(
-            const.HTTP, const.LB_ALGORITHM_ROUND_ROBIN,
+            const.UDP, const.LB_ALGORITHM_ROUND_ROBIN,
             session_persistence=const.SESSION_PERSISTENCE_SOURCE_IP)
 
     def _test_pool_show(self, pool_protocol, algorithm,
@@ -1062,33 +1122,24 @@
             self.assertEqual(const.SESSION_PERSISTENCE_SOURCE_IP,
                              pool[const.SESSION_PERSISTENCE][const.TYPE])
 
-        # Test that a user with lb_admin role can see the pool
+        # Test that the appropriate users can see or not see the pool
+        # based on the API RBAC.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin',
+                                'os_roles_lb_member']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_system_reader',
+                                'os_roles_lb_member']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            pool_client = self.os_roles_lb_admin.pool_client
-            pool_adm = pool_client.show_pool(pool[const.ID])
-            self.assertEqual(pool_name, pool_adm[const.NAME])
-
-        # Test that a user with cloud admin role can see the pool
-        if not CONF.load_balancer.RBAC_test_type == const.NONE:
-            adm = self.os_admin.pool_client.show_pool(
-                pool[const.ID])
-            self.assertEqual(pool_name, adm[const.NAME])
-
-        # Test that a different user, with load balancer member role, cannot
-        # see this pool
-        if not CONF.load_balancer.RBAC_test_type == const.NONE:
-            member2_client = self.os_roles_lb_member2.pool_client
-            self.assertRaises(exceptions.Forbidden,
-                              member2_client.show_pool,
-                              pool[const.ID])
-
-        # Test that a user, without the load balancer member role, cannot
-        # show pools
-        if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.pool_client.show_pool,
-                pool[const.ID])
+            expected_allowed = ['os_system_admin', 'os_system_reader',
+                                'os_roles_lb_admin',
+                                'os_roles_lb_global_observer',
+                                'os_roles_lb_member']
+        if expected_allowed:
+            self.check_show_RBAC_enforcement(
+                'PoolClient', 'show_pool',
+                expected_allowed, pool[const.ID])
 
     @decorators.idempotent_id('d73755fe-ba3a-4248-9543-8e167a5aa7f4')
     def test_HTTP_LC_pool_update(self):
@@ -1195,7 +1246,7 @@
     @decorators.idempotent_id('28b90650-a612-4b10-981f-d3dd6a366e4f')
     def test_UDP_RR_source_IP_pool_update(self):
         self._test_pool_update(
-            const.HTTP, const.LB_ALGORITHM_ROUND_ROBIN,
+            const.UDP, const.LB_ALGORITHM_ROUND_ROBIN,
             session_persistence=const.SESSION_PERSISTENCE_SOURCE_IP)
 
     def _test_pool_update(self, pool_protocol, algorithm,
@@ -1306,28 +1357,22 @@
             self.assertEqual(const.SESSION_PERSISTENCE_SOURCE_IP,
                              pool[const.SESSION_PERSISTENCE][const.TYPE])
 
-        # Test that a user, without the load balancer member role, cannot
-        # use this command
+        # Test that a user, without the loadbalancer member role, cannot
+        # update this pool.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin',
+                                'os_roles_lb_member']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_roles_lb_member']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.pool_client.update_pool,
-                pool[const.ID], admin_state_up=True)
-
-        # Assert we didn't go into PENDING_*
-        pool_check = self.mem_pool_client.show_pool(
-            pool[const.ID])
-        self.assertEqual(const.ACTIVE,
-                         pool_check[const.PROVISIONING_STATUS])
-        self.assertFalse(pool_check[const.ADMIN_STATE_UP])
-
-        # Test that a user, without the load balancer member role, cannot
-        # update this pool
-        if not CONF.load_balancer.RBAC_test_type == const.NONE:
-            member2_client = self.os_roles_lb_member2.pool_client
-            self.assertRaises(exceptions.Forbidden,
-                              member2_client.update_pool,
-                              pool[const.ID], admin_state_up=True)
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+                                'os_roles_lb_member']
+        if expected_allowed:
+            self.check_update_RBAC_enforcement(
+                'PoolClient', 'update_pool',
+                expected_allowed, None, None, pool[const.ID],
+                admin_state_up=True)
 
         # Assert we didn't go into PENDING_*
         pool_check = self.mem_pool_client.show_pool(
@@ -1545,7 +1590,7 @@
     @decorators.idempotent_id('cc69c0d0-9191-4faf-a154-e33df880f44e')
     def test_UDP_RR_source_IP_pool_delete(self):
         self._test_pool_delete(
-            const.HTTP, const.LB_ALGORITHM_ROUND_ROBIN,
+            const.UDP, const.LB_ALGORITHM_ROUND_ROBIN,
             session_persistence=const.SESSION_PERSISTENCE_SOURCE_IP)
 
     def _test_pool_delete(self, pool_protocol, algorithm,
@@ -1609,21 +1654,21 @@
             CONF.load_balancer.build_interval,
             CONF.load_balancer.build_timeout)
 
-        # Test that a user without the load balancer role cannot
-        # delete this pool
+        # Test that a user without the loadbalancer role cannot delete this
+        # pool.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = ['os_admin', 'os_roles_lb_admin',
+                                'os_roles_lb_member']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_system_admin', 'os_roles_lb_member']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.pool_client.delete_pool,
-                pool[const.ID])
-
-        # Test that a different user, with the load balancer member role
-        # cannot delete this pool
-        if not CONF.load_balancer.RBAC_test_type == const.NONE:
-            member2_client = self.os_roles_lb_member2.pool_client
-            self.assertRaises(exceptions.Forbidden,
-                              member2_client.delete_pool,
-                              pool[const.ID])
+            expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+                                'os_roles_lb_member']
+        if expected_allowed:
+            self.check_delete_RBAC_enforcement(
+                'PoolClient', 'delete_pool',
+                expected_allowed, None, None, pool[const.ID])
 
         self.mem_pool_client.delete_pool(pool[const.ID])
 
diff --git a/octavia_tempest_plugin/tests/api/v2/test_provider.py b/octavia_tempest_plugin/tests/api/v2/test_provider.py
index 917b365..9a9dd28 100644
--- a/octavia_tempest_plugin/tests/api/v2/test_provider.py
+++ b/octavia_tempest_plugin/tests/api/v2/test_provider.py
@@ -15,7 +15,6 @@
 from oslo_log import log as logging
 from tempest import config
 from tempest.lib import decorators
-from tempest.lib import exceptions
 
 from octavia_tempest_plugin.common import constants as const
 from octavia_tempest_plugin.tests import test_base
@@ -44,10 +43,24 @@
 
         # Test that a user without the load balancer role cannot
         # list providers.
+        expected_allowed = []
+        if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+            expected_allowed = [
+                'os_admin', 'os_primary', 'os_roles_lb_admin',
+                'os_roles_lb_member', 'os_roles_lb_member2']
+        if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+            expected_allowed = ['os_admin', 'os_primary', 'os_system_admin',
+                                'os_system_reader', 'os_roles_lb_observer',
+                                'os_roles_lb_global_observer',
+                                'os_roles_lb_member', 'os_roles_lb_member2']
         if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
-            self.assertRaises(
-                exceptions.Forbidden,
-                self.os_primary.provider_client.list_providers)
+            expected_allowed = [
+                'os_system_admin', 'os_system_reader', 'os_roles_lb_observer',
+                'os_roles_lb_global_observer', 'os_roles_lb_admin',
+                'os_roles_lb_member', 'os_roles_lb_member2']
+        if expected_allowed:
+            self.check_list_RBAC_enforcement(
+                'ProviderClient', 'list_providers', expected_allowed)
 
         providers = self.mem_provider_client.list_providers()
 
diff --git a/octavia_tempest_plugin/tests/scenario/v2/test_ipv6_traffic_ops.py b/octavia_tempest_plugin/tests/scenario/v2/test_ipv6_traffic_ops.py
index 1495b82..4efa241 100644
--- a/octavia_tempest_plugin/tests/scenario/v2/test_ipv6_traffic_ops.py
+++ b/octavia_tempest_plugin/tests/scenario/v2/test_ipv6_traffic_ops.py
@@ -297,7 +297,7 @@
     @decorators.idempotent_id('843a13f7-e00f-4151-8817-b5395eb69b52')
     def test_ipv6_tcp_LC_listener_with_allowed_cidrs(self):
         self._test_listener_with_allowed_cidrs(
-            const.TCP, 91, const.LB_ALGORITHM_LEAST_CONNECTIONS)
+            const.TCP, 91, const.LB_ALGORITHM_LEAST_CONNECTIONS, delay=0.2)
 
     @decorators.idempotent_id('cc0d55b1-87e8-4a87-bf50-66299947a469')
     def test_ipv6_udp_LC_listener_with_allowed_cidrs(self):
@@ -350,7 +350,7 @@
             const.UDP, 101, const.LB_ALGORITHM_SOURCE_IP_PORT)
 
     def _test_listener_with_allowed_cidrs(self, protocol, protocol_port,
-                                          algorithm):
+                                          algorithm, delay=None):
         """Tests traffic through a loadbalancer with allowed CIDRs set.
 
         * Set up listener with allowed CIDRS (allow all) on a loadbalancer.
@@ -476,7 +476,7 @@
         self.check_members_balanced(
             self.lb_vip_address, protocol=protocol,
             protocol_port=protocol_port, persistent=False,
-            traffic_member_count=members)
+            traffic_member_count=members, delay=delay)
 
         listener_kwargs = {
             const.LISTENER_ID: listener_id,
@@ -493,6 +493,8 @@
         # wait until Neutron completes the SG update.
         # See https://bugs.launchpad.net/neutron/+bug/1866353.
         def expect_timeout_error(address, protocol, protocol_port):
+            if protocol != const.UDP:
+                address = "[{}]".format(address)
             try:
                 self.make_request(address, protocol=protocol,
                                   protocol_port=protocol_port)
diff --git a/octavia_tempest_plugin/tests/scenario/v2/test_traffic_ops.py b/octavia_tempest_plugin/tests/scenario/v2/test_traffic_ops.py
index 356327b..d4d43b5 100644
--- a/octavia_tempest_plugin/tests/scenario/v2/test_traffic_ops.py
+++ b/octavia_tempest_plugin/tests/scenario/v2/test_traffic_ops.py
@@ -15,6 +15,7 @@
 import datetime
 import ipaddress
 import shlex
+import socket
 import testtools
 import time
 
@@ -90,7 +91,8 @@
 
     @classmethod
     def _listener_pool_create(cls, protocol, protocol_port,
-                              pool_algorithm=const.LB_ALGORITHM_ROUND_ROBIN):
+                              pool_algorithm=const.LB_ALGORITHM_ROUND_ROBIN,
+                              insert_headers_dic=None):
         if (protocol == const.UDP and
                 not cls.mem_listener_client.is_version_supported(
                     cls.api_version, '2.1')):
@@ -112,6 +114,10 @@
             # haproxy process and use haproxy>=1.8:
             const.CONNECTION_LIMIT: 200,
         }
+
+        if insert_headers_dic:
+            listener_kwargs[const.INSERT_HEADERS] = insert_headers_dic
+
         listener = cls.mem_listener_client.create_listener(**listener_kwargs)
 
         waiters.wait_for_status(cls.mem_lb_client.show_loadbalancer,
@@ -138,9 +144,8 @@
         return listener[const.ID], pool[const.ID]
 
     def _test_basic_traffic(
-            self, protocol, protocol_port, listener_id, pool_id,
-            persistent=True, traffic_member_count=2, source_port=None,
-            delay=None):
+            self, protocol, protocol_port, pool_id, persistent=True,
+            traffic_member_count=2, source_port=None, delay=None):
         """Tests sending traffic through a loadbalancer
 
         * Set up members on a loadbalancer.
@@ -204,15 +209,15 @@
                       'Traffic tests will not work in noop mode.')
     @decorators.idempotent_id('6751135d-e15a-4e22-89f4-bfcc3408d424')
     def test_basic_http_traffic(self):
-        listener_id, pool_id = self._listener_pool_create(const.HTTP, 80)
-        self._test_basic_traffic(const.HTTP, 80, listener_id, pool_id)
+        pool_id = self._listener_pool_create(const.HTTP, 80)[1]
+        self._test_basic_traffic(const.HTTP, 80, pool_id)
 
     @testtools.skipIf(CONF.load_balancer.test_with_noop,
                       'Traffic tests will not work in noop mode.')
     @decorators.idempotent_id('332a08e0-eff1-4c19-b46c-bf87148a6d84')
     def test_basic_tcp_traffic(self):
-        listener_id, pool_id = self._listener_pool_create(const.TCP, 81)
-        self._test_basic_traffic(const.TCP, 81, listener_id, pool_id,
+        pool_id = self._listener_pool_create(const.TCP, 81)[1]
+        self._test_basic_traffic(const.TCP, 81, pool_id,
                                  persistent=False)
 
     @testtools.skipIf(CONF.load_balancer.test_with_noop,
@@ -223,11 +228,11 @@
                 self.api_version, '2.1'):
             raise self.skipException('UDP listener support is only available '
                                      'in Octavia API version 2.1 or newer')
-        listener_id, pool_id = self._listener_pool_create(const.UDP, 8080)
-        self._test_basic_traffic(const.UDP, 8080, listener_id, pool_id)
+        pool_id = self._listener_pool_create(const.UDP, 8080)[1]
+        self._test_basic_traffic(const.UDP, 8080, pool_id)
 
     def _test_healthmonitor_traffic(self, protocol, protocol_port,
-                                    listener_id, pool_id, persistent=True):
+                                    pool_id, persistent=True):
         """Tests traffic is correctly routed based on healthmonitor status
 
         * Create three members:
@@ -462,13 +467,13 @@
 
     @decorators.idempotent_id('a16f8eb4-a77c-4b0e-8b1b-91c237039713')
     def test_healthmonitor_http_traffic(self):
-        listener_id, pool_id = self._listener_pool_create(const.HTTP, 82)
-        self._test_healthmonitor_traffic(const.HTTP, 82, listener_id, pool_id)
+        pool_id = self._listener_pool_create(const.HTTP, 82)[1]
+        self._test_healthmonitor_traffic(const.HTTP, 82, pool_id)
 
     @decorators.idempotent_id('22f00c34-343b-4aa9-90be-4567ecf85772')
     def test_healthmonitor_tcp_traffic(self):
-        listener_id, pool_id = self._listener_pool_create(const.TCP, 83)
-        self._test_healthmonitor_traffic(const.TCP, 83, listener_id, pool_id,
+        pool_id = self._listener_pool_create(const.TCP, 83)[1]
+        self._test_healthmonitor_traffic(const.TCP, 83, pool_id,
                                          persistent=False)
 
     @decorators.idempotent_id('80b86513-1a76-4e42-91c9-cb23c879e536')
@@ -478,8 +483,8 @@
             raise self.skipException('UDP listener support is only available '
                                      'in Octavia API version 2.1 or newer')
 
-        listener_id, pool_id = self._listener_pool_create(const.UDP, 8081)
-        self._test_healthmonitor_traffic(const.UDP, 8081, listener_id, pool_id)
+        pool_id = self._listener_pool_create(const.UDP, 8081)[1]
+        self._test_healthmonitor_traffic(const.UDP, 8081, pool_id)
 
     @decorators.idempotent_id('3558186d-6dcd-4d9d-b7f7-adc190b66149')
     def test_http_l7policies_and_l7rules(self):
@@ -727,8 +732,7 @@
                                       headers={'reject': 'true'})
 
     def _test_mixed_ipv4_ipv6_members_traffic(self, protocol, protocol_port,
-                                              listener_id, pool_id,
-                                              persistent=True):
+                                              pool_id, persistent=True):
         """Tests traffic through a loadbalancer with IPv4 and IPv6 members.
 
         * Set up members on a loadbalancer.
@@ -795,9 +799,8 @@
                           'Mixed IPv4/IPv6 member test requires IPv6.')
     @decorators.idempotent_id('20b6b671-0101-4bed-a249-9af6ee3aa6d9')
     def test_mixed_ipv4_ipv6_members_http_traffic(self):
-        listener_id, pool_id = self._listener_pool_create(const.HTTP, 85)
-        self._test_mixed_ipv4_ipv6_members_traffic(const.HTTP, 85,
-                                                   listener_id, pool_id)
+        pool_id = self._listener_pool_create(const.HTTP, 85)[1]
+        self._test_mixed_ipv4_ipv6_members_traffic(const.HTTP, 85, pool_id)
 
     @testtools.skipIf(CONF.load_balancer.test_with_noop,
                       'Traffic tests will not work in noop mode.')
@@ -805,10 +808,9 @@
                           'Mixed IPv4/IPv6 member test requires IPv6.')
     @decorators.idempotent_id('c442ae84-0abc-4470-8c7e-14a07e92a6fa')
     def test_mixed_ipv4_ipv6_members_tcp_traffic(self):
-        listener_id, pool_id = self._listener_pool_create(const.TCP, 86)
+        pool_id = self._listener_pool_create(const.TCP, 86)[1]
         self._test_mixed_ipv4_ipv6_members_traffic(const.TCP, 86,
-                                                   listener_id, pool_id,
-                                                   persistent=False)
+                                                   pool_id, persistent=False)
 
     @testtools.skipIf(CONF.load_balancer.test_with_noop,
                       'Traffic tests will not work in noop mode.')
@@ -827,26 +829,26 @@
                 self.api_version, '2.1'):
             raise self.skipException('UDP listener support is only available '
                                      'in Octavia API version 2.1 or newer')
-        listener_id, pool_id = self._listener_pool_create(const.UDP, 8082)
-        self._test_mixed_ipv4_ipv6_members_traffic(const.UDP, 8082,
-                                                   listener_id, pool_id)
+        pool_id = self._listener_pool_create(const.UDP, 8082)[1]
+        self._test_mixed_ipv4_ipv6_members_traffic(const.UDP, 8082, pool_id)
 
     @testtools.skipIf(CONF.load_balancer.test_with_noop,
                       'Traffic tests will not work in noop mode.')
     @decorators.idempotent_id('a58063fb-b9e8-4cfc-8a8c-7b2e9e884e7a')
     def test_least_connections_http_traffic(self):
-        listener_id, pool_id = self._listener_pool_create(
+        pool_id = self._listener_pool_create(
             const.HTTP, 87,
-            pool_algorithm=const.LB_ALGORITHM_LEAST_CONNECTIONS)
-        self._test_basic_traffic(const.HTTP, 87, listener_id, pool_id)
+            pool_algorithm=const.LB_ALGORITHM_LEAST_CONNECTIONS)[1]
+        self._test_basic_traffic(const.HTTP, 87, pool_id)
 
     @testtools.skipIf(CONF.load_balancer.test_with_noop,
                       'Traffic tests will not work in noop mode.')
     @decorators.idempotent_id('e1056709-6a1a-4a15-80c2-5cbb8279f924')
     def test_least_connections_tcp_traffic(self):
-        listener_id, pool_id = self._listener_pool_create(
-            const.TCP, 88, pool_algorithm=const.LB_ALGORITHM_LEAST_CONNECTIONS)
-        self._test_basic_traffic(const.TCP, 88, listener_id, pool_id,
+        pool_id = self._listener_pool_create(
+            const.TCP, 88,
+            pool_algorithm=const.LB_ALGORITHM_LEAST_CONNECTIONS)[1]
+        self._test_basic_traffic(const.TCP, 88, pool_id,
                                  persistent=False, delay=0.2)
 
     @testtools.skipIf(CONF.load_balancer.test_with_noop,
@@ -857,28 +859,28 @@
                 self.api_version, '2.1'):
             raise self.skipException('UDP listener support is only available '
                                      'in Octavia API version 2.1 or newer')
-        listener_id, pool_id = self._listener_pool_create(
+        pool_id = self._listener_pool_create(
             const.UDP, 8083,
-            pool_algorithm=const.LB_ALGORITHM_LEAST_CONNECTIONS)
-        self._test_basic_traffic(const.UDP, 8083, listener_id, pool_id)
+            pool_algorithm=const.LB_ALGORITHM_LEAST_CONNECTIONS)[1]
+        self._test_basic_traffic(const.UDP, 8083, pool_id)
 
     @testtools.skipIf(CONF.load_balancer.test_with_noop,
                       'Traffic tests will not work in noop mode.')
     @decorators.idempotent_id('881cc3e9-a011-4043-b0e3-a6185f736053')
     def test_source_ip_http_traffic(self):
-        listener_id, pool_id = self._listener_pool_create(
+        pool_id = self._listener_pool_create(
             const.HTTP, 89,
-            pool_algorithm=const.LB_ALGORITHM_SOURCE_IP)
-        self._test_basic_traffic(const.HTTP, 89, listener_id, pool_id,
+            pool_algorithm=const.LB_ALGORITHM_SOURCE_IP)[1]
+        self._test_basic_traffic(const.HTTP, 89, pool_id,
                                  traffic_member_count=1, persistent=False)
 
     @testtools.skipIf(CONF.load_balancer.test_with_noop,
                       'Traffic tests will not work in noop mode.')
     @decorators.idempotent_id('4568db0e-4243-4191-a822-9d327a55fa64')
     def test_source_ip_tcp_traffic(self):
-        listener_id, pool_id = self._listener_pool_create(
-            const.TCP, 90, pool_algorithm=const.LB_ALGORITHM_SOURCE_IP)
-        self._test_basic_traffic(const.TCP, 90, listener_id, pool_id,
+        pool_id = self._listener_pool_create(
+            const.TCP, 90, pool_algorithm=const.LB_ALGORITHM_SOURCE_IP)[1]
+        self._test_basic_traffic(const.TCP, 90, pool_id,
                                  traffic_member_count=1, persistent=False)
 
     @testtools.skipIf(CONF.load_balancer.test_with_noop,
@@ -889,10 +891,10 @@
                 self.api_version, '2.1'):
             raise self.skipException('UDP listener support is only available '
                                      'in Octavia API version 2.1 or newer')
-        listener_id, pool_id = self._listener_pool_create(
+        pool_id = self._listener_pool_create(
             const.UDP, 8084,
-            pool_algorithm=const.LB_ALGORITHM_SOURCE_IP)
-        self._test_basic_traffic(const.UDP, 8084, listener_id, pool_id,
+            pool_algorithm=const.LB_ALGORITHM_SOURCE_IP)[1]
+        self._test_basic_traffic(const.UDP, 8084, pool_id,
                                  traffic_member_count=1, persistent=False)
 
     @testtools.skipIf(CONF.load_balancer.test_with_noop,
@@ -903,11 +905,11 @@
         # this test. Since it runs with not_implemented_is_error, we must
         # handle this test case special.
         try:
-            listener_id, pool_id = self._listener_pool_create(
+            pool_id = self._listener_pool_create(
                 const.HTTP, 60091,
-                pool_algorithm=const.LB_ALGORITHM_SOURCE_IP_PORT)
+                pool_algorithm=const.LB_ALGORITHM_SOURCE_IP_PORT)[1]
             self._test_basic_traffic(
-                const.HTTP, 60091, listener_id, pool_id,
+                const.HTTP, 60091, pool_id,
                 traffic_member_count=1, persistent=False, source_port=60091)
         except exceptions.NotImplemented as e:
             message = ("The configured provider driver '{driver}' "
@@ -931,7 +933,7 @@
             # Without a delay this can trigger a "Cannot assign requested
             # address" warning setting the source port, leading to failure
             self._test_basic_traffic(
-                const.TCP, 60092, listener_id, pool_id, traffic_member_count=1,
+                const.TCP, 60092, pool_id, traffic_member_count=1,
                 persistent=False, source_port=60092, delay=0.2)
         except exceptions.NotImplemented as e:
             message = ("The configured provider driver '{driver}' "
@@ -953,11 +955,11 @@
         # this test. Since it runs with not_implemented_is_error, we must
         # handle this test case special.
         try:
-            listener_id, pool_id = self._listener_pool_create(
+            pool_id = self._listener_pool_create(
                 const.UDP, 8085,
-                pool_algorithm=const.LB_ALGORITHM_SOURCE_IP_PORT)
+                pool_algorithm=const.LB_ALGORITHM_SOURCE_IP_PORT)[1]
             self._test_basic_traffic(
-                const.UDP, 8085, listener_id, pool_id, traffic_member_count=1,
+                const.UDP, 8085, pool_id, traffic_member_count=1,
                 persistent=False, source_port=8085)
         except exceptions.NotImplemented as e:
             message = ("The configured provider driver '{driver}' "
@@ -1322,3 +1324,81 @@
                                                  protocol_port)
         self.assertConsistentResponse(
             (None, None), url_for_vip, repeat=3, expect_connection_error=True)
+
+    @decorators.idempotent_id('d3a28e76-76bc-11eb-a7c3-74e5f9e2a801')
+    def test_insert_headers(self):
+        # Create listener, enable insert of "X_FORWARDED_FOR" HTTP header
+        listener_port = 102
+        listener_id, pool_id = self._listener_pool_create(
+            const.HTTP, listener_port, insert_headers_dic={
+                const.X_FORWARDED_FOR: "true"})
+        self._test_basic_traffic(
+            const.HTTP, listener_port, pool_id)
+
+        # Initiate HTTP traffic
+        test_url = 'http://{}:{}/request'.format(
+            self.lb_vip_address, listener_port)
+        data = self.validate_URL_response(test_url)
+        LOG.info('Received payload is: {}'.format(data))
+
+        # Detect source IP that is used to create TCP socket toward LB_VIP.
+        try:
+            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+            s.connect((self.lb_vip_address, listener_port))
+            client_source_ip = s.getsockname()[0]
+            s.close()
+        except Exception:
+            LOG.exception('Failed to initiate TCP socket toward LB_VIP')
+            raise Exception('LB_VIP is not available')
+
+        # Function needed to parse the received payload from backend.
+        # Returns dictionary of relevant headers if found.
+        def _data_parser(payload, relevant_headers):
+            retrieved_headers = {}
+            for line in payload.split('\n'):
+                try:
+                    key, value = line.split(': ', 1)
+                except ValueError:
+                    continue
+                if key in relevant_headers:
+                    retrieved_headers[key] = value.lower()
+            return retrieved_headers
+
+        # Make sure that "X_FORWARDED_FOR" header was inserted with
+        # expected IP (client_source_ip). It should be present in data.
+        expected_headers = {const.X_FORWARDED_FOR: client_source_ip}
+        received_headers = _data_parser(data, expected_headers)
+        self.assertEqual(expected_headers, received_headers)
+
+        # Update listener to insert: "X_FORWARDED_PORT" and
+        # "X_FORWARDED_PROTO" type headers.
+        listener_kwargs = {
+            const.LISTENER_ID: listener_id,
+            const.INSERT_HEADERS: {
+                const.X_FORWARDED_PORT: "true",
+                const.X_FORWARDED_PROTO: "true"}}
+        self.mem_listener_client.update_listener(**listener_kwargs)
+        waiters.wait_for_status(self.mem_lb_client.show_loadbalancer,
+                                self.lb_id, const.PROVISIONING_STATUS,
+                                const.ACTIVE,
+                                CONF.load_balancer.check_interval,
+                                CONF.load_balancer.check_timeout)
+
+        # Initiate HTTP traffic
+        data = self.validate_URL_response(test_url)
+        LOG.info('Received payload is: {}'.format(data))
+        expected_headers = {const.X_FORWARDED_PORT: '{}'.format(
+            listener_port), const.X_FORWARDED_PROTO: const.HTTP.lower()}
+        received_headers = _data_parser(data, expected_headers)
+        self.assertEqual(expected_headers, received_headers)
+
+    @decorators.idempotent_id('2b05229c-0254-11eb-8610-74e5f9e2a801')
+    def test_tcp_and_udp_traffic_on_same_port(self):
+        common_vip_port = 103
+        listener_id_udp, pool_id_udp = self._listener_pool_create(
+            const.UDP, common_vip_port)
+        listener_id_tcp, pool_id_tcp = self._listener_pool_create(
+            const.TCP, common_vip_port)
+        self._test_basic_traffic(const.UDP, common_vip_port, pool_id_udp)
+        self._test_basic_traffic(const.TCP, common_vip_port, pool_id_tcp,
+                                 persistent=False)
diff --git a/octavia_tempest_plugin/tests/spare_pool_scenario/v2/test_spare_pool.py b/octavia_tempest_plugin/tests/spare_pool_scenario/v2/test_spare_pool.py
index e641bc4..0ccfe55 100644
--- a/octavia_tempest_plugin/tests/spare_pool_scenario/v2/test_spare_pool.py
+++ b/octavia_tempest_plugin/tests/spare_pool_scenario/v2/test_spare_pool.py
@@ -57,10 +57,10 @@
         * Send traffic through load balancer
         * Validate amphora spare pool size is restored
         """
-
+        amphora_client = self.os_admin.load_balancer_v2.AmphoraClient()
         # Check there is at least one amphora in spare pool
         spare_amps = waiters.wait_for_spare_amps(
-            self.os_admin.amphora_client.list_amphorae,
+            amphora_client.list_amphorae,
             CONF.load_balancer.lb_build_interval,
             CONF.load_balancer.lb_build_timeout)
 
@@ -100,7 +100,7 @@
 
         # Confirm the spare pool has changed since last check
         spare_amps_2 = waiters.wait_for_spare_amps(
-            self.os_admin.amphora_client.list_amphorae,
+            amphora_client.list_amphorae,
             CONF.load_balancer.lb_build_interval,
             CONF.load_balancer.lb_build_timeout)
         self.assertNotEqual(spare_amps, spare_amps_2)
@@ -180,12 +180,12 @@
 
         # Check there is at least one amphora in spare pool
         spare_amps = waiters.wait_for_spare_amps(
-            self.os_admin.amphora_client.list_amphorae,
+            amphora_client.list_amphorae,
             CONF.load_balancer.lb_build_interval,
             CONF.load_balancer.lb_build_timeout)
 
         # Delete amphora compute instance
-        amp = self.os_admin.amphora_client.list_amphorae(
+        amp = amphora_client.list_amphorae(
             query_params='{loadbalancer_id}={lb_id}'.format(
                 loadbalancer_id=const.LOADBALANCER_ID, lb_id=self.lb_id))
 
@@ -211,12 +211,12 @@
 
         # Confirm the spare pool has changed since last check
         spare_amps_2 = waiters.wait_for_spare_amps(
-            self.os_admin.amphora_client.list_amphorae,
+            amphora_client.list_amphorae,
             CONF.load_balancer.lb_build_interval,
             CONF.load_balancer.lb_build_timeout)
         self.assertNotEqual(spare_amps, spare_amps_2)
 
         # Check there is at least one amphora in spare pool
-        waiters.wait_for_spare_amps(self.os_admin.amphora_client.list_amphorae,
+        waiters.wait_for_spare_amps(amphora_client.list_amphorae,
                                     CONF.load_balancer.lb_build_interval,
                                     CONF.load_balancer.lb_build_timeout)
diff --git a/octavia_tempest_plugin/tests/test_base.py b/octavia_tempest_plugin/tests/test_base.py
index 8a2c3c7..e7a344d 100644
--- a/octavia_tempest_plugin/tests/test_base.py
+++ b/octavia_tempest_plugin/tests/test_base.py
@@ -30,9 +30,9 @@
 from tempest import test
 import tenacity
 
-from octavia_tempest_plugin import clients
 from octavia_tempest_plugin.common import cert_utils
 from octavia_tempest_plugin.common import constants as const
+from octavia_tempest_plugin.tests import RBAC_tests
 from octavia_tempest_plugin.tests import validators
 from octavia_tempest_plugin.tests import waiters
 
@@ -45,17 +45,49 @@
 RETRY_MAX = 5
 
 
-class LoadBalancerBaseTest(validators.ValidatorsMixin, test.BaseTestCase):
+class LoadBalancerBaseTest(validators.ValidatorsMixin,
+                           RBAC_tests.RBACTestsMixin, test.BaseTestCase):
     """Base class for load balancer tests."""
 
-    # Setup cls.os_roles_lb_member. cls.os_primary, cls.os_roles_lb_member,
-    # and cls.os_roles_lb_admin credentials.
-    credentials = ['admin', 'primary',
-                   ['lb_member', CONF.load_balancer.member_role],
-                   ['lb_member2', CONF.load_balancer.member_role],
-                   ['lb_admin', CONF.load_balancer.admin_role]]
+    if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+        credentials = [
+            'admin', 'primary', ['lb_admin', CONF.load_balancer.admin_role],
+            ['lb_member', CONF.load_balancer.member_role],
+            ['lb_member2', CONF.load_balancer.member_role]]
+    elif CONF.load_balancer.enforce_new_defaults:
+        credentials = [
+            'admin', 'primary', ['lb_admin', CONF.load_balancer.admin_role],
+            ['lb_observer', CONF.load_balancer.observer_role, 'reader'],
+            ['lb_global_observer', CONF.load_balancer.global_observer_role,
+             'reader'],
+            ['lb_member', CONF.load_balancer.member_role, 'member'],
+            ['lb_member2', CONF.load_balancer.member_role, 'member'],
+            ['lb_member_not_default_member', CONF.load_balancer.member_role]]
+    else:
+        credentials = [
+            'admin', 'primary', ['lb_admin', CONF.load_balancer.admin_role],
+            ['lb_observer', CONF.load_balancer.observer_role, 'reader'],
+            ['lb_global_observer', CONF.load_balancer.global_observer_role,
+             'reader'],
+            ['lb_member', CONF.load_balancer.member_role],
+            ['lb_member2', CONF.load_balancer.member_role]]
 
-    client_manager = clients.ManagerV2
+    # If scope enforcement is enabled, add in the system scope credentials.
+    # The project scope is already handled by the above credentials.
+    if CONF.enforce_scope.octavia:
+        credentials.extend(['system_admin', 'system_reader'])
+
+    # A tuple of credentials that will be allocated by tempest using the
+    # 'credentials' list above. These are used to build RBAC test lists.
+    allocated_creds = []
+    for cred in credentials:
+        if isinstance(cred, list):
+            allocated_creds.append('os_roles_' + cred[0])
+        else:
+            allocated_creds.append('os_' + cred)
+    # Tests shall not mess with the list of allocated credentials
+    allocated_credentials = tuple(allocated_creds)
+
     webserver1_response = 1
     webserver2_response = 5
     used_ips = []
@@ -102,10 +134,36 @@
         cls.set_network_resources()
         super(LoadBalancerBaseTest, cls).setup_credentials()
 
+        # Log the user roles for this test run
+        role_name_cache = {}
+        for cred in cls.credentials:
+            user_roles = []
+            if isinstance(cred, list):
+                user_name = cred[0]
+                cred_obj = getattr(cls, 'os_roles_' + cred[0])
+            else:
+                user_name = cred
+                cred_obj = getattr(cls, 'os_' + cred)
+            params = {'user.id': cred_obj.credentials.user_id,
+                      'project.id': cred_obj.credentials.project_id}
+            roles = cls.os_admin.role_assignments_client.list_role_assignments(
+                **params)['role_assignments']
+            for role in roles:
+                role_id = role['role']['id']
+                try:
+                    role_name = role_name_cache[role_id]
+                except KeyError:
+                    role_name = cls.os_admin.roles_v3_client.show_role(
+                        role_id)['role']['name']
+                    role_name_cache[role_id] = role_name
+                user_roles.append([role_name, role['scope']])
+            LOG.info("User %s has roles: %s", user_name, user_roles)
+
     @classmethod
     def setup_clients(cls):
         """Setup client aliases."""
         super(LoadBalancerBaseTest, cls).setup_clients()
+        lb_admin_prefix = cls.os_roles_lb_admin.load_balancer_v2
         cls.lb_mem_float_ip_client = cls.os_roles_lb_member.floating_ips_client
         cls.lb_mem_keypairs_client = cls.os_roles_lb_member.keypairs_client
         cls.lb_mem_net_client = cls.os_roles_lb_member.networks_client
@@ -116,31 +174,41 @@
             cls.os_roles_lb_member.security_group_rules_client)
         cls.lb_mem_servers_client = cls.os_roles_lb_member.servers_client
         cls.lb_mem_subnet_client = cls.os_roles_lb_member.subnets_client
-        cls.mem_lb_client = cls.os_roles_lb_member.loadbalancer_client
-        cls.mem_listener_client = cls.os_roles_lb_member.listener_client
-        cls.mem_pool_client = cls.os_roles_lb_member.pool_client
-        cls.mem_member_client = cls.os_roles_lb_member.member_client
+        cls.mem_lb_client = (
+            cls.os_roles_lb_member.load_balancer_v2.LoadbalancerClient())
+        cls.mem_listener_client = (
+            cls.os_roles_lb_member.load_balancer_v2.ListenerClient())
+        cls.mem_pool_client = (
+            cls.os_roles_lb_member.load_balancer_v2.PoolClient())
+        cls.mem_member_client = (
+            cls.os_roles_lb_member.load_balancer_v2.MemberClient())
         cls.mem_healthmonitor_client = (
-            cls.os_roles_lb_member.healthmonitor_client)
-        cls.mem_l7policy_client = cls.os_roles_lb_member.l7policy_client
-        cls.mem_l7rule_client = cls.os_roles_lb_member.l7rule_client
-        cls.lb_admin_amphora_client = cls.os_roles_lb_admin.amphora_client
+            cls.os_roles_lb_member.load_balancer_v2.HealthMonitorClient())
+        cls.mem_l7policy_client = (
+            cls.os_roles_lb_member.load_balancer_v2.L7PolicyClient())
+        cls.mem_l7rule_client = (
+            cls.os_roles_lb_member.load_balancer_v2.L7RuleClient())
+        cls.lb_admin_amphora_client = lb_admin_prefix.AmphoraClient()
         cls.lb_admin_flavor_profile_client = (
-            cls.os_roles_lb_admin.flavor_profile_client)
-        cls.lb_admin_flavor_client = cls.os_roles_lb_admin.flavor_client
-        cls.mem_flavor_client = cls.os_roles_lb_member.flavor_client
-        cls.mem_provider_client = cls.os_roles_lb_member.provider_client
+            lb_admin_prefix.FlavorProfileClient())
+        cls.lb_admin_flavor_client = lb_admin_prefix.FlavorClient()
+        cls.mem_flavor_client = (
+            cls.os_roles_lb_member.load_balancer_v2.FlavorClient())
+        cls.mem_provider_client = (
+            cls.os_roles_lb_member.load_balancer_v2.ProviderClient())
         cls.os_admin_servers_client = cls.os_admin.servers_client
+        cls.os_admin_routers_client = cls.os_admin.routers_client
+        cls.os_admin_subnetpools_client = cls.os_admin.subnetpools_client
         cls.lb_admin_flavor_capabilities_client = (
-            cls.os_roles_lb_admin.flavor_capabilities_client)
+            lb_admin_prefix.FlavorCapabilitiesClient())
         cls.lb_admin_availability_zone_capabilities_client = (
-            cls.os_roles_lb_admin.availability_zone_capabilities_client)
+            lb_admin_prefix.AvailabilityZoneCapabilitiesClient())
         cls.lb_admin_availability_zone_profile_client = (
-            cls.os_roles_lb_admin.availability_zone_profile_client)
+            lb_admin_prefix.AvailabilityZoneProfileClient())
         cls.lb_admin_availability_zone_client = (
-            cls.os_roles_lb_admin.availability_zone_client)
+            lb_admin_prefix.AvailabilityZoneClient())
         cls.mem_availability_zone_client = (
-            cls.os_roles_lb_member.availability_zone_client)
+            cls.os_roles_lb_member.load_balancer_v2.AvailabilityZoneClient())
 
     @classmethod
     def resource_setup(cls):
@@ -327,33 +395,38 @@
 
         # Create tenant VIP IPv6 subnet
         if CONF.load_balancer.test_with_ipv6:
-            # See if ipv6-private-subnet exists and use it if so.
-            priv_ipv6_subnet = cls.os_admin.subnets_client.list_subnets(
-                name='ipv6-private-subnet')['subnets']
-
             cls.lb_member_vip_ipv6_subnet_stateful = False
-            if len(priv_ipv6_subnet) == 1:
-                if (priv_ipv6_subnet[0]['ipv6_address_mode'] ==
-                        'dhcpv6-stateful'):
-                    cls.lb_member_vip_ipv6_subnet_stateful = True
-                cls.lb_member_vip_ipv6_subnet = priv_ipv6_subnet[0]
-                cls.lb_member_vip_ipv6_net = {
-                    'id': priv_ipv6_subnet[0]['network_id']}
-            else:
-                subnet_kwargs = {
-                    'name': data_utils.rand_name("lb_member_vip_ipv6_subnet"),
-                    'network_id': cls.lb_member_vip_net['id'],
-                    'cidr': CONF.load_balancer.vip_ipv6_subnet_cidr,
-                    'ip_version': 6}
-                result = cls.lb_mem_subnet_client.create_subnet(
-                    **subnet_kwargs)
-                cls.lb_member_vip_ipv6_net = cls.lb_member_vip_net
-                cls.lb_member_vip_ipv6_subnet = result['subnet']
-                cls.addClassResourceCleanup(
-                    waiters.wait_for_not_found,
-                    cls._logging_delete_subnet,
-                    cls.lb_mem_subnet_client.show_subnet,
-                    cls.lb_member_vip_ipv6_subnet['id'])
+            cls.lb_member_vip_ipv6_subnet_use_subnetpool = False
+            subnet_kwargs = {
+                'name': data_utils.rand_name("lb_member_vip_ipv6_subnet"),
+                'network_id': cls.lb_member_vip_net['id'],
+                'ip_version': 6}
+
+            # Use a CIDR from devstack's default IPv6 subnetpool if it exists;
+            # the subnetpool's CIDR is routable from the devstack node
+            # through the default router.
+            subnetpool_name = CONF.load_balancer.default_ipv6_subnetpool
+            if subnetpool_name:
+                subnetpool = cls.os_admin_subnetpools_client.list_subnetpools(
+                    name=subnetpool_name)['subnetpools']
+                if len(subnetpool) == 1:
+                    subnetpool = subnetpool[0]
+                    subnet_kwargs['subnetpool_id'] = subnetpool['id']
+                    cls.lb_member_vip_ipv6_subnet_use_subnetpool = True
+
+            if 'subnetpool_id' not in subnet_kwargs:
+                subnet_kwargs['cidr'] = (
+                    CONF.load_balancer.vip_ipv6_subnet_cidr)
+
+            result = cls.lb_mem_subnet_client.create_subnet(
+                **subnet_kwargs)
+            cls.lb_member_vip_ipv6_net = cls.lb_member_vip_net
+            cls.lb_member_vip_ipv6_subnet = result['subnet']
+            cls.addClassResourceCleanup(
+                waiters.wait_for_not_found,
+                cls._logging_delete_subnet,
+                cls.lb_mem_subnet_client.show_subnet,
+                cls.lb_member_vip_ipv6_subnet['id'])
 
             LOG.info('lb_member_vip_ipv6_subnet: {}'.format(
                 cls.lb_member_vip_ipv6_subnet))
@@ -769,6 +842,30 @@
             cls.lb_member_router['id'],
             subnet_id=cls.lb_member_vip_subnet['id'])
 
+        if (CONF.load_balancer.test_with_ipv6 and
+                CONF.load_balancer.default_router and
+                cls.lb_member_vip_ipv6_subnet_use_subnetpool):
+
+            router_name = CONF.load_balancer.default_router
+            # if lb_member_vip_ipv6_subnet uses devstack's subnetpool,
+            # plug the subnet into the default router
+            router = cls.os_admin.routers_client.list_routers(
+                name=router_name)['routers']
+
+            if len(router) == 1:
+                router = router[0]
+
+                # Add IPv6 VIP subnet to the default router
+                cls.os_admin_routers_client.add_router_interface(
+                    router['id'],
+                    subnet_id=cls.lb_member_vip_ipv6_subnet['id'])
+                cls.addClassResourceCleanup(
+                    waiters.wait_for_not_found,
+                    cls.os_admin_routers_client.remove_router_interface,
+                    cls.os_admin_routers_client.remove_router_interface,
+                    router['id'],
+                    subnet_id=cls.lb_member_vip_ipv6_subnet['id'])
+
         # Add member subnet 1 to router
         cls.lb_mem_routers_client.add_router_interface(
             cls.lb_member_router['id'],
diff --git a/playbooks/prepare-ovn-multinode.yaml b/playbooks/prepare-ovn-multinode.yaml
new file mode 100644
index 0000000..a653a6c
--- /dev/null
+++ b/playbooks/prepare-ovn-multinode.yaml
@@ -0,0 +1,4 @@
+- hosts: all
+  roles:
+    - multi-node-bridge
+    - multi-node-setup
diff --git a/releasenotes/notes/Add-RBAC-scoped-tokens-tests-920aa35faf4a8c9d.yaml b/releasenotes/notes/Add-RBAC-scoped-tokens-tests-920aa35faf4a8c9d.yaml
new file mode 100644
index 0000000..d2d5d23
--- /dev/null
+++ b/releasenotes/notes/Add-RBAC-scoped-tokens-tests-920aa35faf4a8c9d.yaml
@@ -0,0 +1,17 @@
+---
+features:
+  - |
+    Added API test support for keystone default roles and scoped tokens.
+issues:
+  - |
+    Currently the API tests will not pass with the
+    keystone_default_roles-policy.yaml override file. This is because the
+    tempest framework credentials do not yet support token scopes.
+    This issue is tracked in https://bugs.launchpad.net/tempest/+bug/1917168
+    Once that bug is fixed, octavia-tempest-plugin can be updated to use the
+    required scope in the test credentials.
+upgrade:
+  - |
+    Two new tempest.conf settings enable/disable keystone default roles and
+    scoped token testing, [enforce_scope] octavia = True/False and
+    [load_balancer] enforce_new_defaults = True/False.
diff --git a/zuul.d/jobs.yaml b/zuul.d/jobs.yaml
index 20aa0b4..eb5a593 100644
--- a/zuul.d/jobs.yaml
+++ b/zuul.d/jobs.yaml
@@ -9,6 +9,16 @@
           - controller
 
 - nodeset:
+    name: octavia-single-node-ubuntu-focal
+    nodes:
+      - name: controller
+        label: nested-virt-ubuntu-focal
+    groups:
+      - name: tempest
+        nodes:
+          - controller
+
+- nodeset:
     name: octavia-single-node-centos-7
     nodes:
       - name: controller
@@ -32,9 +42,9 @@
     name: octavia-two-node
     nodes:
       - name: controller
-        label: nested-virt-ubuntu-bionic
+        label: nested-virt-ubuntu-focal
       - name: controller2
-        label: nested-virt-ubuntu-bionic
+        label: nested-virt-ubuntu-focal
     groups:
       - name: controller
         nodes:
@@ -179,7 +189,7 @@
 - job:
     name: octavia-dsvm-live-base
     parent: octavia-dsvm-base
-    nodeset: octavia-single-node-ubuntu-bionic
+    nodeset: octavia-single-node-ubuntu-focal
     timeout: 9000
     required-projects:
       - openstack/diskimage-builder
@@ -215,7 +225,7 @@
 - job:
     name: octavia-dsvm-live-base-ipv6-only
     parent: octavia-dsvm-base-ipv6-only
-    nodeset: octavia-single-node-ubuntu-bionic
+    nodeset: octavia-single-node-ubuntu-focal
     timeout: 9000
     required-projects:
       - openstack/diskimage-builder
@@ -248,9 +258,17 @@
     name: octavia-dsvm-live-two-node-base
     parent: octavia-dsvm-base
     nodeset: octavia-two-node
-    timeout: 9000
+    timeout: 10800
     required-projects:
       - openstack/diskimage-builder
+    roles:
+      - zuul: openstack/neutron-tempest-plugin
+    pre-run: playbooks/prepare-ovn-multinode.yaml
+    vars:
+      zuul_copy_output:
+        '/var/log/dib-build': logs
+        '/var/log/octavia-amphora.log': logs
+        '/var/log/octavia-tenant-traffic.log': logs
     host-vars:
       controller:
         configure_swap_size: 8192
@@ -290,10 +308,6 @@
           octavia-tempest-plugin: https://opendev.org/openstack/octavia-tempest-plugin.git
         tempest_plugins:
           - octavia-tempest-plugin
-        zuul_copy_output:
-          '/var/log/dib-build' : logs
-          '/var/log/octavia-amphora.log': logs
-          '/var/log/octavia-tenant-traffic.log': logs
       controller2:
         configure_swap_size: 8192
         devstack_localrc:
@@ -330,13 +344,8 @@
           OCTAVIA_USE_PREGENERATED_CERTS: true
           OCTAVIA_MGMT_PORT_IP: 192.168.0.4
         devstack_plugins:
+          neutron: https://opendev.org/openstack/neutron.git
           octavia: https://opendev.org/openstack/octavia.git
-          octavia-tempest-plugin: https://opendev.org/openstack/octavia-tempest-plugin.git
-        tempest_plugins:
-          - octavia-tempest-plugin
-        zuul_copy_output:
-          '/var/log/octavia-amphora.log': logs
-          '/var/log/octavia-tenant-traffic.log': logs
     group-vars:
       controller:
         devstack_local_conf:
@@ -348,6 +357,9 @@
                 api_v1_enabled: False
               amphora_agent:
                 forward_all_logs: True
+            "/$NEUTRON_CORE_PLUGIN_CONF":
+              ovn:
+                enable_distributed_floating_ip: True
         devstack_services:
           base: false
           barbican: false
@@ -371,11 +383,16 @@
           o-cw: true
           o-hm: true
           o-hk: true
+          ovn-controller: true
+          ovn-northd: true
+          ovn-vswitchd: true
+          ovsdb-server: true
           placement-api: true
-          q-agt: true
-          q-dhcp: true
-          q-l3: true
-          q-meta: true
+          q-agt: false
+          q-dhcp: false
+          q-l3: false
+          q-meta: false
+          q-ovn-metadata-agent: true
           q-svc: true
           rabbit: true
           tempest: true
@@ -390,6 +407,9 @@
                 api_v1_enabled: False
               amphora_agent:
                 forward_all_logs: True
+            "/$NEUTRON_CORE_PLUGIN_CONF":
+              ovn:
+                enable_distributed_floating_ip: True
         devstack_services:
           c-vol: false
           c-bak: false
@@ -398,7 +418,16 @@
           o-cw: true
           o-hm: true
           o-hk: true
-          q-agt: true
+          ovn-controller: true
+          ovn-northd: false
+          ovn-vswitchd: true
+          ovsdb-server: true
+          q-fake: true
+          q-agt: false
+          q-dhcp: false
+          q-l3: false
+          q-meta: false
+          q-ovn-metadata-agent: true
 
 - job:
     name: octavia-dsvm-noop-base
@@ -434,6 +463,7 @@
 - job:
     name: octavia-v2-dsvm-noop-api
     parent: octavia-dsvm-noop-base
+    timeout: 10800
     vars:
       devstack_local_conf:
         post-config:
@@ -456,6 +486,23 @@
       - ^octavia_tempest_plugin/tests/(?!api/|\w+\.py).*
 
 - job:
+    name: octavia-v2-dsvm-noop-api-scoped-tokens
+    parent: octavia-v2-dsvm-noop-api
+    vars:
+      devstack_local_conf:
+        post-config:
+          $OCTAVIA_CONF:
+            oslo_policy:
+              enforce_scope: True
+              enforce_new_defaults: True
+        test-config:
+          "$TEMPEST_CONFIG":
+            enforce_scope:
+              octavia: True
+            load_balancer:
+              enforce_new_defaults: True
+
+- job:
     name: octavia-v2-dsvm-noop-py2-api
     parent: octavia-v2-dsvm-noop-api
     vars:
@@ -463,6 +510,11 @@
         USE_PYTHON3: False
 
 - job:
+    name: octavia-v2-dsvm-noop-api-stable-wallaby
+    parent: octavia-v2-dsvm-noop-api
+    override-checkout: stable/wallaby
+
+- job:
     name: octavia-v2-dsvm-noop-api-stable-victoria
     parent: octavia-v2-dsvm-noop-api
     override-checkout: stable/victoria
@@ -470,11 +522,13 @@
 - job:
     name: octavia-v2-dsvm-noop-api-stable-ussuri
     parent: octavia-v2-dsvm-noop-api
+    nodeset: octavia-single-node-ubuntu-bionic
     override-checkout: stable/ussuri
 
 - job:
     name: octavia-v2-dsvm-noop-api-stable-train
     parent: octavia-v2-dsvm-noop-api
+    nodeset: octavia-single-node-ubuntu-bionic
     override-checkout: stable/train
 
 - job:
@@ -532,6 +586,10 @@
         override-checkout: 2.30.0
 
 - job:
+    name: octavia-v2-dsvm-scenario-stable-wallaby
+    parent: octavia-v2-dsvm-scenario
+    override-checkout: stable/wallaby
+- job:
     name: octavia-v2-dsvm-scenario-stable-victoria
     parent: octavia-v2-dsvm-scenario
     override-checkout: stable/victoria
@@ -539,11 +597,13 @@
 - job:
     name: octavia-v2-dsvm-scenario-stable-ussuri
     parent: octavia-v2-dsvm-scenario
+    nodeset: octavia-single-node-ubuntu-bionic
     override-checkout: stable/ussuri
 
 - job:
     name: octavia-v2-dsvm-scenario-stable-train
     parent: octavia-v2-dsvm-scenario
+    nodeset: octavia-single-node-ubuntu-bionic
     override-checkout: stable/train
 
 # Legacy jobs for the transition to the act-stdby two node jobs
@@ -583,19 +643,30 @@
 - job:
     name: octavia-v2-act-stdby-dsvm-scenario-two-node
     parent: octavia-dsvm-live-two-node-base
-    vars:
-      tempest_concurrency: 2
-      tempest_test_regex: ^octavia_tempest_plugin.tests.scenario.v2
-      tox_envlist: all
-      devstack_local_conf:
-        post-config:
+    group-vars:
+      controller:
+        tempest_concurrency: 2
+        tempest_test_regex: ^octavia_tempest_plugin.tests.scenario.v2
+        tox_envlist: all
+        devstack_local_conf:
+          post-config:
             $OCTAVIA_CONF:
               nova:
                 enable_anti_affinity: True
-        test-config:
-          "$TEMPEST_CONFIG":
-            load_balancer:
-              loadbalancer_topology: ACTIVE_STANDBY
+              controller_worker:
+                loadbalancer_topology: ACTIVE_STANDBY
+          test-config:
+            "$TEMPEST_CONFIG":
+              load_balancer:
+                loadbalancer_topology: ACTIVE_STANDBY
+      subnode:
+        devstack_local_conf:
+          post-config:
+            $OCTAVIA_CONF:
+              nova:
+                enable_anti_affinity: True
+              controller_worker:
+                loadbalancer_topology: ACTIVE_STANDBY
 
 - job:
     name: octavia-v2-dsvm-py2-scenario-centos-7
@@ -618,23 +689,12 @@
         OCTAVIA_AMP_IMAGE_SIZE: 3
 
 - job:
-    name: octavia-v2-dsvm-scenario-ubuntu-bionic
+    name: octavia-v2-dsvm-scenario-ubuntu-focal
     parent: octavia-v2-dsvm-scenario
     vars:
       devstack_localrc:
         OCTAVIA_AMP_BASE_OS: ubuntu
-        OCTAVIA_AMP_DISTRIBUTION_RELEASE_ID: bionic
-
-- job:
-    name: octavia-v2-dsvm-scenario-ubuntu-xenial
-    parent: octavia-v2-dsvm-scenario
-    nodeset: openstack-single-node-xenial
-    vars:
-      devstack_localrc:
-        OCTAVIA_AMP_BASE_OS: ubuntu
-        OCTAVIA_AMP_DISTRIBUTION_RELEASE_ID: xenial
-        USE_PYTHON3: false
-        TEMPEST_BRANCH: 23.0.0
+        OCTAVIA_AMP_DISTRIBUTION_RELEASE_ID: focal
 
 - job:
     name: octavia-v2-dsvm-tls-barbican
@@ -662,6 +722,11 @@
       - ^octavia_tempest_plugin/tests/(?!barbican_scenario/|\w+\.py).*
 
 - job:
+    name: octavia-v2-dsvm-tls-barbican-stable-wallaby
+    parent: octavia-v2-dsvm-tls-barbican
+    override-checkout: stable/wallaby
+
+- job:
     name: octavia-v2-dsvm-tls-barbican-stable-victoria
     parent: octavia-v2-dsvm-tls-barbican
     override-checkout: stable/victoria
@@ -669,11 +734,13 @@
 - job:
     name: octavia-v2-dsvm-tls-barbican-stable-ussuri
     parent: octavia-v2-dsvm-tls-barbican
+    nodeset: octavia-single-node-ubuntu-bionic
     override-checkout: stable/ussuri
 
 - job:
     name: octavia-v2-dsvm-tls-barbican-stable-train
     parent: octavia-v2-dsvm-tls-barbican
+    nodeset: octavia-single-node-ubuntu-bionic
     override-checkout: stable/train
 
 - job:
@@ -714,6 +781,11 @@
         override-checkout: 2.30.0
 
 - job:
+    name: octavia-v2-dsvm-spare-pool-stable-wallaby
+    parent: octavia-v2-dsvm-spare-pool
+    override-checkout: stable/wallaby
+
+- job:
     name: octavia-v2-dsvm-spare-pool-stable-victoria
     parent: octavia-v2-dsvm-spare-pool
     override-checkout: stable/victoria
@@ -721,11 +793,13 @@
 - job:
     name: octavia-v2-dsvm-spare-pool-stable-ussuri
     parent: octavia-v2-dsvm-spare-pool
+    nodeset: octavia-single-node-ubuntu-bionic
     override-checkout: stable/ussuri
 
 - job:
     name: octavia-v2-dsvm-spare-pool-stable-train
     parent: octavia-v2-dsvm-spare-pool
+    nodeset: octavia-single-node-ubuntu-bionic
     override-checkout: stable/train
 
 - job:
@@ -858,6 +932,11 @@
       tox_envlist: all
 
 - job:
+    name: octavia-v2-act-stdby-dsvm-scenario-stable-wallaby
+    parent: octavia-v2-act-stdby-dsvm-scenario
+    override-checkout: stable/wallaby
+
+- job:
     name: octavia-v2-act-stdby-dsvm-scenario-stable-victoria
     parent: octavia-v2-act-stdby-dsvm-scenario
     override-checkout: stable/victoria
@@ -865,11 +944,13 @@
 - job:
     name: octavia-v2-act-stdby-dsvm-scenario-stable-ussuri
     parent: octavia-v2-act-stdby-dsvm-scenario
+    nodeset: octavia-single-node-ubuntu-bionic
     override-checkout: stable/ussuri
 
 - job:
     name: octavia-v2-act-stdby-dsvm-scenario-stable-train
     parent: octavia-v2-act-stdby-dsvm-scenario
+    nodeset: octavia-single-node-ubuntu-bionic
     override-checkout: stable/train
 
 ######### Third party jobs ##########
diff --git a/zuul.d/projects.yaml b/zuul.d/projects.yaml
index d7a5ea4..02d3ac7 100644
--- a/zuul.d/projects.yaml
+++ b/zuul.d/projects.yaml
@@ -9,14 +9,18 @@
     check:
       jobs:
         - octavia-v2-dsvm-noop-api
+        - octavia-v2-dsvm-noop-api-stable-wallaby
         - octavia-v2-dsvm-noop-api-stable-victoria
         - octavia-v2-dsvm-noop-api-stable-ussuri
         - octavia-v2-dsvm-noop-api-stable-train
+        - octavia-v2-dsvm-noop-api-scoped-tokens
         - octavia-v2-dsvm-scenario
+        - octavia-v2-dsvm-scenario-stable-wallaby
         - octavia-v2-dsvm-scenario-stable-victoria
         - octavia-v2-dsvm-scenario-stable-ussuri
         - octavia-v2-dsvm-scenario-stable-train
         - octavia-v2-dsvm-tls-barbican
+        - octavia-v2-dsvm-tls-barbican-stable-wallaby
         - octavia-v2-dsvm-tls-barbican-stable-victoria
         - octavia-v2-dsvm-tls-barbican-stable-ussuri
         - octavia-v2-dsvm-tls-barbican-stable-train
@@ -28,6 +32,8 @@
             voting: false
         - octavia-v2-act-stdby-dsvm-scenario:
             voting: false
+        - octavia-v2-act-stdby-dsvm-scenario-stable-wallaby:
+            voting: false
         - octavia-v2-act-stdby-dsvm-scenario-stable-victoria:
             voting: false
         - octavia-v2-act-stdby-dsvm-scenario-stable-ussuri:
@@ -36,6 +42,8 @@
             voting: false
         - octavia-v2-dsvm-spare-pool:
             voting: false
+        - octavia-v2-dsvm-spare-pool-stable-wallaby:
+            voting: false
         - octavia-v2-dsvm-spare-pool-stable-victoria:
             voting: false
         - octavia-v2-dsvm-spare-pool-stable-ussuri:
@@ -54,14 +62,18 @@
       queue: octavia
       jobs:
         - octavia-v2-dsvm-noop-api
+        - octavia-v2-dsvm-noop-api-stable-wallaby
         - octavia-v2-dsvm-noop-api-stable-victoria
         - octavia-v2-dsvm-noop-api-stable-ussuri
         - octavia-v2-dsvm-noop-api-stable-train
+        - octavia-v2-dsvm-noop-api-scoped-tokens
         - octavia-v2-dsvm-scenario
+        - octavia-v2-dsvm-scenario-stable-wallaby
         - octavia-v2-dsvm-scenario-stable-victoria
         - octavia-v2-dsvm-scenario-stable-ussuri
         - octavia-v2-dsvm-scenario-stable-train
         - octavia-v2-dsvm-tls-barbican
+        - octavia-v2-dsvm-tls-barbican-stable-wallaby
         - octavia-v2-dsvm-tls-barbican-stable-victoria
         - octavia-v2-dsvm-tls-barbican-stable-ussuri
         - octavia-v2-dsvm-tls-barbican-stable-train