Merge "Adds a pool re-encryption scenario test"
diff --git a/devstack/plugin.sh b/devstack/plugin.sh
index 5eac133..fb4e932 100644
--- a/devstack/plugin.sh
+++ b/devstack/plugin.sh
@@ -24,6 +24,15 @@
${DEST}/octavia-tempest-plugin/octavia_tempest_plugin/contrib/test_server/test_server.go
}
+function _configure_tempest {
+ if [ -n "$Q_ROUTER_NAME" ]; then
+ iniset $TEMPEST_CONFIG load_balancer default_router "$Q_ROUTER_NAME"
+ fi
+ if [ -n "$SUBNETPOOL_NAME_V6" ]; then
+ iniset $TEMPEST_CONFIG load_balancer default_ipv6_subnetpool "$SUBNETPOOL_NAME_V6"
+ fi
+}
+
if [[ "$1" == "stack" ]]; then
case "$2" in
install)
@@ -40,6 +49,7 @@
test-config)
echo_summary "Building backend test server"
build_backend_test_server
+ _configure_tempest
;;
esac
fi
diff --git a/octavia_tempest_plugin/clients.py b/octavia_tempest_plugin/clients.py
deleted file mode 100644
index 7fe3606..0000000
--- a/octavia_tempest_plugin/clients.py
+++ /dev/null
@@ -1,90 +0,0 @@
-# Copyright 2017 GoDaddy
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-#
-from tempest import clients
-from tempest import config
-
-from octavia_tempest_plugin.services.load_balancer.v2 import (
- amphora_client)
-from octavia_tempest_plugin.services.load_balancer.v2 import (
- availability_zone_capabilities_client)
-from octavia_tempest_plugin.services.load_balancer.v2 import (
- availability_zone_client)
-from octavia_tempest_plugin.services.load_balancer.v2 import (
- availability_zone_profile_client)
-from octavia_tempest_plugin.services.load_balancer.v2 import (
- flavor_capabilities_client)
-from octavia_tempest_plugin.services.load_balancer.v2 import (
- flavor_client)
-from octavia_tempest_plugin.services.load_balancer.v2 import (
- flavor_profile_client)
-from octavia_tempest_plugin.services.load_balancer.v2 import (
- healthmonitor_client)
-from octavia_tempest_plugin.services.load_balancer.v2 import (
- l7policy_client)
-from octavia_tempest_plugin.services.load_balancer.v2 import (
- l7rule_client)
-from octavia_tempest_plugin.services.load_balancer.v2 import (
- listener_client)
-from octavia_tempest_plugin.services.load_balancer.v2 import (
- loadbalancer_client)
-from octavia_tempest_plugin.services.load_balancer.v2 import (
- member_client)
-from octavia_tempest_plugin.services.load_balancer.v2 import (
- pool_client)
-from octavia_tempest_plugin.services.load_balancer.v2 import (
- provider_client)
-
-CONF = config.CONF
-
-
-class ManagerV2(clients.Manager):
-
- def __init__(self, credentials):
- super(ManagerV2, self).__init__(credentials)
-
- params = dict(self.default_params)
- params.update({
- 'auth_provider': self.auth_provider,
- 'service': CONF.load_balancer.catalog_type,
- 'region': CONF.load_balancer.region or CONF.identity.region,
- 'endpoint_type': CONF.load_balancer.endpoint_type,
- 'build_interval': CONF.load_balancer.build_interval,
- 'build_timeout': CONF.load_balancer.build_timeout
- })
-
- self.loadbalancer_client = loadbalancer_client.LoadbalancerClient(
- **params)
- self.listener_client = listener_client.ListenerClient(**params)
- self.pool_client = pool_client.PoolClient(**params)
- self.member_client = member_client.MemberClient(**params)
- self.healthmonitor_client = healthmonitor_client.HealthMonitorClient(
- **params)
- self.l7policy_client = l7policy_client.L7PolicyClient(**params)
- self.l7rule_client = l7rule_client.L7RuleClient(**params)
- self.amphora_client = amphora_client.AmphoraClient(**params)
- self.flavor_profile_client = flavor_profile_client.FlavorProfileClient(
- **params)
- self.flavor_client = flavor_client.FlavorClient(**params)
- self.provider_client = provider_client.ProviderClient(**params)
- self.flavor_capabilities_client = (
- flavor_capabilities_client.FlavorCapabilitiesClient(**params))
- self.availability_zone_capabilities_client = (
- availability_zone_capabilities_client
- .AvailabilityZoneCapabilitiesClient(**params))
- self.availability_zone_profile_client = (
- availability_zone_profile_client.AvailabilityZoneProfileClient(
- **params))
- self.availability_zone_client = (
- availability_zone_client.AvailabilityZoneClient(**params))
diff --git a/octavia_tempest_plugin/common/constants.py b/octavia_tempest_plugin/common/constants.py
index d466082..174589c 100644
--- a/octavia_tempest_plugin/common/constants.py
+++ b/octavia_tempest_plugin/common/constants.py
@@ -186,6 +186,7 @@
# RBAC options
ADVANCED = 'advanced'
+KEYSTONE_DEFAULT_ROLES = 'keystone_default_roles'
OWNERADMIN = 'owner_or_admin'
NONE = 'none'
diff --git a/octavia_tempest_plugin/config.py b/octavia_tempest_plugin/config.py
index f44bf96..4d1543b 100644
--- a/octavia_tempest_plugin/config.py
+++ b/octavia_tempest_plugin/config.py
@@ -86,6 +86,12 @@
cfg.StrOpt('admin_role',
default='load-balancer_admin',
help='The load balancing admin RBAC role.'),
+ cfg.StrOpt('observer_role',
+ default='load-balancer_observer',
+ help='The load balancing observer RBAC role.'),
+ cfg.StrOpt('global_observer_role',
+ default='load-balancer_global_observer',
+ help='The load balancing global observer RBAC role.'),
cfg.IntOpt('scp_connection_timeout',
default=5,
help='Timeout in seconds to wait for a '
@@ -97,10 +103,13 @@
default='octavia',
help='The provider driver to use for the tests.'),
cfg.StrOpt('RBAC_test_type', default=const.ADVANCED,
- choices=[const.ADVANCED, const.OWNERADMIN, const.NONE],
+ choices=[const.ADVANCED, const.KEYSTONE_DEFAULT_ROLES,
+ const.OWNERADMIN, const.NONE],
help='Type of RBAC tests to run. "advanced" runs the octavia '
'default RBAC tests. "owner_or_admin" runs the legacy '
- 'owner or admin tests. "none" disables the RBAC tests.'),
+ 'owner or admin tests. "keystone_default_roles" runs the '
+ 'tests using only the keystone default roles. "none" '
+ 'disables the RBAC tests.'),
cfg.DictOpt('enabled_provider_drivers',
help=('A comma separated list of dictionaries of the '
'enabled provider driver names and descriptions. '
@@ -179,6 +188,13 @@
cfg.StrOpt('member_2_ipv6_subnet_cidr',
default='fd77:1457:4cf0:26a8::/64',
help='CIDR format subnet to use for the member 1 ipv6 subnet.'),
+ cfg.StrOpt('default_router',
+ default='router1',
+ help='The default router connected to the public network.'),
+ cfg.StrOpt('default_ipv6_subnetpool',
+ default='shared-default-subnetpool-v6',
+ help='The default IPv6 subnetpool to use when creating the '
+ 'IPv6 VIP subnet.'),
# Amphora specific options
cfg.StrOpt('amphora_ssh_user',
default='ubuntu',
@@ -217,6 +233,15 @@
default='/opt/octavia-tempest-plugin/test_server.bin',
help='Filesystem path to the test web server that will be '
'installed in the web server VMs.'),
+ # RBAC related options
+ # Note: Also see the enforce_scope section (from tempest) for Octavia API
+ # scope checking setting.
+ cfg.BoolOpt('enforce_new_defaults',
+ default=False,
+ help='Does the load-balancer service API policies enforce '
+ 'the new keystone default roles? This configuration '
+ 'value should be same as octavia.conf: '
+ '[oslo_policy].enforce_new_defaults option.'),
]
lb_feature_enabled_group = cfg.OptGroup(name='loadbalancer-feature-enabled',
@@ -261,3 +286,15 @@
"the tempest instance have access to the log files "
"specified in the tempest configuration."),
]
+
+# Extend the enforce_scope group defined in tempest
+enforce_scope_group = cfg.OptGroup(name="enforce_scope",
+ title="OpenStack Services with "
+ "enforce scope")
+EnforceScopeGroup = [
+ cfg.BoolOpt('octavia',
+ default=False,
+ help='Does the load-balancer service API policies enforce '
+ 'scope? This configuration value should be same as '
+ 'octavia.conf: [oslo_policy].enforce_scope option.'),
+]
diff --git a/octavia_tempest_plugin/plugin.py b/octavia_tempest_plugin/plugin.py
index ec093e7..ec8e7c5 100644
--- a/octavia_tempest_plugin/plugin.py
+++ b/octavia_tempest_plugin/plugin.py
@@ -19,6 +19,7 @@
from tempest.test_discover import plugins
from octavia_tempest_plugin import config as project_config
+from octavia_tempest_plugin.services.load_balancer import v2 as lb_v2_services
class OctaviaTempestPlugin(plugins.TempestPlugin):
@@ -38,6 +39,8 @@
config.register_opt_group(conf,
project_config.lb_feature_enabled_group,
project_config.LBFeatureEnabledGroup)
+ config.register_opt_group(conf, project_config.enforce_scope_group,
+ project_config.EnforceScopeGroup)
def get_opt_lists(self):
return [
@@ -55,10 +58,10 @@
)
params = {
- 'name': 'load-balancer_v2',
+ 'name': 'load_balancer_v2',
'service_version': 'load-balancer.v2',
'module_path': 'octavia_tempest_plugin.services.load_balancer.v2',
- 'client_names': ['LoadbalancerClient'],
+ 'client_names': lb_v2_services.__all__,
}
params.update(octavia_config)
diff --git a/octavia_tempest_plugin/services/load_balancer/v2/__init__.py b/octavia_tempest_plugin/services/load_balancer/v2/__init__.py
index 04cb473..2067372 100644
--- a/octavia_tempest_plugin/services/load_balancer/v2/__init__.py
+++ b/octavia_tempest_plugin/services/load_balancer/v2/__init__.py
@@ -12,6 +12,35 @@
# License for the specific language governing permissions and limitations
# under the License.
+from .amphora_client import AmphoraClient
+from .availability_zone_capabilities_client import (
+ AvailabilityZoneCapabilitiesClient)
+from .availability_zone_client import AvailabilityZoneClient
+from .availability_zone_profile_client import AvailabilityZoneProfileClient
+from .flavor_capabilities_client import FlavorCapabilitiesClient
+from .flavor_client import FlavorClient
+from .flavor_profile_client import FlavorProfileClient
+from .healthmonitor_client import HealthMonitorClient
+from .l7policy_client import L7PolicyClient
+from .l7rule_client import L7RuleClient
+from .listener_client import ListenerClient
from .loadbalancer_client import LoadbalancerClient
+from .member_client import MemberClient
+from .pool_client import PoolClient
+from .provider_client import ProviderClient
-__all__ = ['LoadbalancerClient']
+__all__ = ['LoadbalancerClient',
+ 'ListenerClient',
+ 'PoolClient',
+ 'MemberClient',
+ 'HealthMonitorClient',
+ 'L7PolicyClient',
+ 'L7RuleClient',
+ 'FlavorClient',
+ 'FlavorProfileClient',
+ 'FlavorCapabilitiesClient',
+ 'AmphoraClient',
+ 'ProviderClient',
+ 'AvailabilityZoneClient',
+ 'AvailabilityZoneProfileClient',
+ 'AvailabilityZoneCapabilitiesClient']
diff --git a/octavia_tempest_plugin/tests/RBAC_tests.py b/octavia_tempest_plugin/tests/RBAC_tests.py
new file mode 100644
index 0000000..d31d506
--- /dev/null
+++ b/octavia_tempest_plugin/tests/RBAC_tests.py
@@ -0,0 +1,476 @@
+# Copyright 2021 Red Hat, Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import copy
+
+from oslo_log import log as logging
+from tempest import config
+from tempest.lib import exceptions
+from tempest import test
+
+from octavia_tempest_plugin.common import constants
+from octavia_tempest_plugin.tests import waiters
+
+CONF = config.CONF
+LOG = logging.getLogger(__name__)
+
+
+class RBACTestsMixin(test.BaseTestCase):
+
+ def _get_client_method(self, cred_obj, client_str, method_str):
+ """Get requested method from registered clients in Tempest."""
+ lb_clients = getattr(cred_obj, 'load_balancer_v2')
+ client = getattr(lb_clients, client_str)
+ client_obj = client()
+ method = getattr(client_obj, method_str)
+ return method
+
+ def _check_allowed(self, client_str, method_str, allowed_list,
+ *args, **kwargs):
+ """Test an API call allowed RBAC enforcement.
+
+ :param client_str: The service client to use for the test, without the
+ credential. Example: 'AmphoraClient'
+ :param method_str: The method on the client to call for the test.
+ Example: 'list_amphorae'
+ :param allowed_list: The list of credentials expected to be
+ allowed. Example: ['os_roles_lb_member'].
+ :param args: Any positional parameters needed by the method.
+ :param kwargs: Any named parameters needed by the method.
+ :raises AssertionError: Raised if the RBAC tests fail.
+ :raises Forbidden: Raised if a credential that should have access does
+ not and is denied.
+ :raises InvalidScope: Raised if a credential that should have the
+ correct scope for access is denied.
+ :returns: None on success
+ """
+ for cred in allowed_list:
+ try:
+ cred_obj = getattr(self, cred)
+ except AttributeError:
+ # TODO(johnsom) Remove once scoped tokens is the default.
+ if ((cred == 'os_system_admin' or cred == 'os_system_reader')
+ and not CONF.enforce_scope.octavia):
+ LOG.info('Skipping %s allowed RBAC test because '
+ 'enforce_scope.octavia is not True', cred)
+ continue
+ else:
+ self.fail('Credential {} "expected_allowed" for RBAC '
+ 'testing was not created by tempest '
+ 'credentials setup. This is likely a bug in the '
+ 'test.'.format(cred))
+ method = self._get_client_method(cred_obj, client_str, method_str)
+ try:
+ method(*args, **kwargs)
+ except exceptions.Forbidden as e:
+ self.fail('Method {}.{} failed to allow access via RBAC using '
+ 'credential {}. Error: {}'.format(
+ client_str, method_str, cred, str(e)))
+
+ def _check_disallowed(self, client_str, method_str, allowed_list,
+ status_method=None, obj_id=None, *args, **kwargs):
+ """Test an API call disallowed RBAC enforcement.
+
+ :param client_str: The service client to use for the test, without the
+ credential. Example: 'AmphoraClient'
+ :param method_str: The method on the client to call for the test.
+ Example: 'list_amphorae'
+ :param allowed_list: The list of credentials expected to be
+ allowed. Example: ['os_roles_lb_member'].
+ :param status_method: The service client method that will provide
+ the object status for a status change waiter.
+ :param obj_id: The ID of the object to check for the expected status
+ update.
+ :param args: Any positional parameters needed by the method.
+ :param kwargs: Any named parameters needed by the method.
+ :raises AssertionError: Raised if the RBAC tests fail.
+ :raises Forbidden: Raised if a credential that should have access does
+ not and is denied.
+ :raises InvalidScope: Raised if a credential that should have the
+ correct scope for access is denied.
+ :returns: None on success
+ """
+ expected_disallowed = (set(self.allocated_credentials) -
+ set(allowed_list))
+ for cred in expected_disallowed:
+ cred_obj = getattr(self, cred)
+ method = self._get_client_method(cred_obj, client_str, method_str)
+
+ # Unfortunately tempest uses testtools assertRaises[1] which means
+ # we cannot use the unittest assertRaises context[2] with msg= to
+ # give a useful error.
+ # Also, testtools doesn't work with subTest[3], so we can't use
+ # that to expose the failing credential.
+ # This all means the exception raised by testtools assertRaises
+ # is less than useful.
+ # TODO(johnsom) Remove this try block once testtools is useful.
+ # [1] https://testtools.readthedocs.io/en/latest/
+ # api.html#testtools.TestCase.assertRaises
+ # [2] https://docs.python.org/3/library/
+ # unittest.html#unittest.TestCase.assertRaises
+ # [3] https://github.com/testing-cabal/testtools/issues/235
+ try:
+ method(*args, **kwargs)
+ except exceptions.Forbidden:
+ if status_method:
+ waiters.wait_for_status(
+ status_method, obj_id,
+ constants.PROVISIONING_STATUS, constants.ACTIVE,
+ CONF.load_balancer.check_interval,
+ CONF.load_balancer.check_timeout)
+
+ continue
+ self.fail('Method {}.{} failed to deny access via RBAC using '
+ 'credential {}.'.format(client_str, method_str, cred))
+
+ def _list_get_RBAC_enforcement(self, client_str, method_str,
+ expected_allowed, *args, **kwargs):
+ """Test an API call RBAC enforcement.
+
+ :param client_str: The service client to use for the test, without the
+ credential. Example: 'AmphoraClient'
+ :param method_str: The method on the client to call for the test.
+ Example: 'list_amphorae'
+ :param expected_allowed: The list of credentials expected to be
+ allowed. Example: ['os_roles_lb_member'].
+ :param args: Any positional parameters needed by the method.
+ :param kwargs: Any named parameters needed by the method.
+ :raises AssertionError: Raised if the RBAC tests fail.
+ :raises Forbidden: Raised if a credential that should have access does
+ not and is denied.
+ :raises InvalidScope: Raised if a credential that should have the
+ correct scope for access is denied.
+ :returns: None on success
+ """
+
+ allowed_list = copy.deepcopy(expected_allowed)
+ # os_admin is a special case as it is valid with the old defaults,
+ # but will not be with the new defaults and/or token scoping.
+ # The old keystone role "admin" becomes project scoped "admin"
+ # instead of being a global admin.
+ # To keep the tests simple, handle that edge case here.
+ # TODO(johnsom) Once token scope is default, remove this.
+ if ('os_system_admin' in expected_allowed and
+ not CONF.load_balancer.enforce_new_defaults and
+ not CONF.enforce_scope.octavia):
+ allowed_list.append('os_admin')
+
+ # #### Test that disallowed credentials cannot access the API.
+ self._check_disallowed(client_str, method_str, allowed_list,
+ None, None, *args, **kwargs)
+
+ # #### Test that allowed credentials can access the API.
+ self._check_allowed(client_str, method_str, allowed_list,
+ *args, **kwargs)
+
+ def check_show_RBAC_enforcement(self, client_str, method_str,
+ expected_allowed, *args, **kwargs):
+ """Test an API show call RBAC enforcement.
+
+ :param client_str: The service client to use for the test, without the
+ credential. Example: 'AmphoraClient'
+ :param method_str: The method on the client to call for the test.
+ Example: 'list_amphorae'
+ :param expected_allowed: The list of credentials expected to be
+ allowed. Example: ['os_roles_lb_member'].
+ :param args: Any positional parameters needed by the method.
+ :param kwargs: Any named parameters needed by the method.
+ :raises AssertionError: Raised if the RBAC tests fail.
+ :raises Forbidden: Raised if a credential that should have access does
+ not and is denied.
+ :raises InvalidScope: Raised if a credential that should have the
+ correct scope for access is denied.
+ :returns: None on success
+ """
+ self._list_get_RBAC_enforcement(client_str, method_str,
+ expected_allowed, *args, **kwargs)
+
+ def check_list_RBAC_enforcement(self, client_str, method_str,
+ expected_allowed, *args, **kwargs):
+ """Test an API list call RBAC enforcement.
+
+ :param client_str: The service client to use for the test, without the
+ credential. Example: 'AmphoraClient'
+ :param method_str: The method on the client to call for the test.
+ Example: 'list_amphorae'
+ :param expected_allowed: The list of credentials expected to be
+ allowed. Example: ['os_roles_lb_member'].
+ :param args: Any positional parameters needed by the method.
+ :param kwargs: Any named parameters needed by the method.
+ :raises AssertionError: Raised if the RBAC tests fail.
+ :raises Forbidden: Raised if a credential that should have access does
+ not and is denied.
+ :raises InvalidScope: Raised if a credential that should have the
+ correct scope for access is denied.
+ :returns: None on success
+ """
+ self._list_get_RBAC_enforcement(client_str, method_str,
+ expected_allowed, *args, **kwargs)
+
+ def _CUD_RBAC_enforcement(self, client_str, method_str, expected_allowed,
+ status_method=None, obj_id=None,
+ *args, **kwargs):
+ """Test an API create/update/delete call RBAC enforcement.
+
+ :param client_str: The service client to use for the test, without the
+ credential. Example: 'AmphoraClient'
+ :param method_str: The method on the client to call for the test.
+ Example: 'list_amphorae'
+ :param expected_allowed: The list of credentials expected to be
+ allowed. Example: ['os_roles_lb_member'].
+ :param status_method: The service client method that will provide
+ the object status for a status change waiter.
+ :param obj_id: The ID of the object to check for the expected status
+ update.
+ :param args: Any positional parameters needed by the method.
+ :param kwargs: Any named parameters needed by the method.
+ :raises AssertionError: Raised if the RBAC tests fail.
+ :raises Forbidden: Raised if a credential that should have access does
+ not and is denied.
+ :raises InvalidScope: Raised if a credential that should have the
+ correct scope for access is denied.
+ :returns: None on success
+ """
+
+ allowed_list = copy.deepcopy(expected_allowed)
+ # os_admin is a special case as it is valid with the old defaults,
+ # but will not be with the new defaults and/or token scoping.
+ # The old keystone role "admin" becomes project scoped "admin"
+ # instead of being a global admin.
+ # To keep the tests simple, handle that edge case here.
+ # TODO(johnsom) Once token scope is default, remove this.
+ if ('os_system_admin' in expected_allowed and
+ not CONF.load_balancer.enforce_new_defaults and
+ not CONF.enforce_scope.octavia):
+ allowed_list.append('os_admin')
+
+ # #### Test that disallowed credentials cannot access the API.
+ self._check_disallowed(client_str, method_str, allowed_list,
+ status_method, obj_id, *args, **kwargs)
+
+ def check_create_RBAC_enforcement(
+ self, client_str, method_str, expected_allowed,
+ status_method=None, obj_id=None, *args, **kwargs):
+ """Test an API create call RBAC enforcement.
+
+ :param client_str: The service client to use for the test, without the
+ credential. Example: 'AmphoraClient'
+ :param method_str: The method on the client to call for the test.
+ Example: 'list_amphorae'
+ :param expected_allowed: The list of credentials expected to be
+ allowed. Example: ['os_roles_lb_member'].
+ :param status_method: The service client method that will provide
+ the object status for a status change waiter.
+ :param obj_id: The ID of the object to check for the expected status
+ update.
+ :param args: Any positional parameters needed by the method.
+ :param kwargs: Any named parameters needed by the method.
+ :raises AssertionError: Raised if the RBAC tests fail.
+ :raises Forbidden: Raised if a credential that should have access does
+ not and is denied.
+ :raises InvalidScope: Raised if a credential that should have the
+ correct scope for access is denied.
+ :returns: None on success
+ """
+ self._CUD_RBAC_enforcement(client_str, method_str, expected_allowed,
+ status_method, obj_id, *args, **kwargs)
+
+ def check_delete_RBAC_enforcement(
+ self, client_str, method_str, expected_allowed,
+ status_method=None, obj_id=None, *args, **kwargs):
+ """Test an API delete call RBAC enforcement.
+
+ :param client_str: The service client to use for the test, without the
+ credential. Example: 'AmphoraClient'
+ :param method_str: The method on the client to call for the test.
+ Example: 'list_amphorae'
+ :param expected_allowed: The list of credentials expected to be
+ allowed. Example: ['os_roles_lb_member'].
+ :param status_method: The service client method that will provide
+ the object status for a status change waiter.
+ :param obj_id: The ID of the object to check for the expected status
+ update.
+ :param args: Any positional parameters needed by the method.
+ :param kwargs: Any named parameters needed by the method.
+ :raises AssertionError: Raised if the RBAC tests fail.
+ :raises Forbidden: Raised if a credential that should have access does
+ not and is denied.
+ :raises InvalidScope: Raised if a credential that should have the
+ correct scope for access is denied.
+ :returns: None on success
+ """
+ self._CUD_RBAC_enforcement(client_str, method_str, expected_allowed,
+ status_method, obj_id, *args, **kwargs)
+
+ def check_update_RBAC_enforcement(
+ self, client_str, method_str, expected_allowed,
+ status_method=None, obj_id=None, *args, **kwargs):
+ """Test an API update call RBAC enforcement.
+
+ :param client_str: The service client to use for the test, without the
+ credential. Example: 'AmphoraClient'
+ :param method_str: The method on the client to call for the test.
+ Example: 'list_amphorae'
+ :param expected_allowed: The list of credentials expected to be
+ allowed. Example: ['os_roles_lb_member'].
+ :param status_method: The service client method that will provide
+ the object status for a status change waiter.
+ :param obj_id: The ID of the object to check for the expected status
+ update.
+ :param args: Any positional parameters needed by the method.
+ :param kwargs: Any named parameters needed by the method.
+ :raises AssertionError: Raised if the RBAC tests fail.
+ :raises Forbidden: Raised if a credential that should have access does
+ not and is denied.
+ :raises InvalidScope: Raised if a credential that should have the
+ correct scope for access is denied.
+ :returns: None on success
+ """
+ self._CUD_RBAC_enforcement(client_str, method_str, expected_allowed,
+ status_method, obj_id, *args, **kwargs)
+
+ def check_list_RBAC_enforcement_count(
+ self, client_str, method_str, expected_allowed, expected_count,
+ *args, **kwargs):
+ """Test an API list call RBAC enforcement result count.
+
+ List APIs will return the object list for the project associated
+ with the token used to access the API. This means most credentials
+ will have access, but will get differing results.
+
+ This test will query the list API using a list of credentials and
+ will validate that only the expected count of results are returned.
+
+ :param client_str: The service client to use for the test, without the
+ credential. Example: 'AmphoraClient'
+ :param method_str: The method on the client to call for the test.
+ Example: 'list_amphorae'
+ :param expected_allowed: The list of credentials expected to be
+ allowed. Example: ['os_roles_lb_member'].
+ :param expected_count: The number of results expected in the list
+ returned from the API.
+ :param args: Any positional parameters needed by the method.
+ :param kwargs: Any named parameters needed by the method.
+ :raises AssertionError: Raised if the RBAC tests fail.
+ :raises Forbidden: Raised if a credential that should have access does
+ not and is denied.
+ :raises InvalidScope: Raised if a credential that should have the
+ correct scope for access is denied.
+ :returns: None on success
+ """
+
+ allowed_list = copy.deepcopy(expected_allowed)
+ # os_admin is a special case as it is valid with the old defaults,
+ # but will not be with the new defaults and/or token scoping.
+ # The old keystone role "admin" becomes project scoped "admin"
+ # instead of being a global admin.
+ # To keep the tests simple, handle that edge case here.
+ # TODO(johnsom) Once token scope is default, remove this.
+ if ('os_system_admin' in expected_allowed and
+ not CONF.load_balancer.enforce_new_defaults and
+ not CONF.enforce_scope.octavia):
+ allowed_list.append('os_admin')
+
+ for cred in allowed_list:
+ try:
+ cred_obj = getattr(self, cred)
+ except AttributeError:
+ # TODO(johnsom) Remove once scoped tokens is the default.
+ if ((cred == 'os_system_admin' or cred == 'os_system_reader')
+ and not CONF.enforce_scope.octavia):
+ LOG.info('Skipping %s allowed RBAC test because '
+ 'enforce_scope.octavia is not True', cred)
+ continue
+ else:
+ self.fail('Credential {} "expected_allowed" for RBAC '
+ 'testing was not created by tempest '
+ 'credentials setup. This is likely a bug in the '
+ 'test.'.format(cred))
+ method = self._get_client_method(cred_obj, client_str, method_str)
+ try:
+ result = method(*args, **kwargs)
+ except exceptions.Forbidden as e:
+ self.fail('Method {}.{} failed to allow access via RBAC using '
+ 'credential {}. Error: {}'.format(
+ client_str, method_str, cred, str(e)))
+ self.assertEqual(expected_count, len(result), message='Credential '
+ '{} saw {} objects when {} was expected.'.format(
+ cred, len(result), expected_count))
+
+ def check_list_IDs_RBAC_enforcement(
+ self, client_str, method_str, expected_allowed, expected_ids,
+ *args, **kwargs):
+ """Test an API list call RBAC enforcement result contains IDs.
+
+ List APIs will return the object list for the project associated
+ with the token used to access the API. This means most credentials
+ will have access, but will get differing results.
+
+ This test will query the list API using a list of credentials and
+ will validate that the expected object IDs are included in the results.
+
+ :param client_str: The service client to use for the test, without the
+ credential. Example: 'AmphoraClient'
+ :param method_str: The method on the client to call for the test.
+ Example: 'list_amphorae'
+ :param expected_allowed: The list of credentials expected to be
+ allowed. Example: ['os_roles_lb_member'].
+ :param expected_ids: The list of object IDs to validate are included
+ in the returned list from the API.
+ :param args: Any positional parameters needed by the method.
+ :param kwargs: Any named parameters needed by the method.
+ :raises AssertionError: Raised if the RBAC tests fail.
+ :raises Forbidden: Raised if a credential that should have access does
+ not and is denied.
+ :raises InvalidScope: Raised if a credential that should have the
+ correct scope for access is denied.
+ :returns: None on success
+ """
+
+ allowed_list = copy.deepcopy(expected_allowed)
+ # os_admin is a special case as it is valid with the old defaults,
+ # but will not be with the new defaults and/or token scoping.
+ # The old keystone role "admin" becomes project scoped "admin"
+ # instead of being a global admin.
+ # To keep the tests simple, handle that edge case here.
+ # TODO(johnsom) Once token scope is default, remove this.
+ if ('os_system_admin' in expected_allowed and
+ not CONF.load_balancer.enforce_new_defaults and
+ not CONF.enforce_scope.octavia):
+ allowed_list.append('os_admin')
+
+ for cred in allowed_list:
+ try:
+ cred_obj = getattr(self, cred)
+ except AttributeError:
+ # TODO(johnsom) Remove once scoped tokens is the default.
+ if ((cred == 'os_system_admin' or cred == 'os_system_reader')
+ and not CONF.enforce_scope.octavia):
+ LOG.info('Skipping %s allowed RBAC test because '
+ 'enforce_scope.octavia is not True', cred)
+ continue
+ else:
+ self.fail('Credential {} "expected_allowed" for RBAC '
+ 'testing was not created by tempest '
+ 'credentials setup. This is likely a bug in the '
+ 'test.'.format(cred))
+ method = self._get_client_method(cred_obj, client_str, method_str)
+ try:
+ result = method(*args, **kwargs)
+ except exceptions.Forbidden as e:
+ self.fail('Method {}.{} failed to allow access via RBAC using '
+ 'credential {}. Error: {}'.format(
+ client_str, method_str, cred, str(e)))
+ result_ids = [lb[constants.ID] for lb in result]
+ self.assertTrue(set(expected_ids).issubset(set(result_ids)))
diff --git a/octavia_tempest_plugin/tests/act_stdby_scenario/v2/test_active_standby.py b/octavia_tempest_plugin/tests/act_stdby_scenario/v2/test_active_standby.py
index 5655b3f..11b0cec 100644
--- a/octavia_tempest_plugin/tests/act_stdby_scenario/v2/test_active_standby.py
+++ b/octavia_tempest_plugin/tests/act_stdby_scenario/v2/test_active_standby.py
@@ -183,6 +183,7 @@
* Sends traffic through the load balancer
* Validates that the Backup has assumed the Master role
"""
+ amphora_client = self.os_admin.load_balancer_v2.AmphoraClient()
# We have to do this here as the api_version and clients are not
# setup in time to use a decorator or the skip_checks mixin
if not self.mem_listener_client.is_version_supported(
@@ -197,7 +198,7 @@
self.check_members_balanced(self.lb_vip_address)
# Get the amphorae associated with this load balancer
- amphorae = self.os_admin.amphora_client.list_amphorae(
+ amphorae = amphora_client.list_amphorae(
query_params='{loadbalancer_id}={lb_id}'.format(
loadbalancer_id=const.LOADBALANCER_ID,
lb_id=self.lb_id))
@@ -216,7 +217,7 @@
start = int(time.time())
while True:
for amp in amphorae:
- amphora_stats = self.os_admin.amphora_client.get_amphora_stats(
+ amphora_stats = amphora_client.get_amphora_stats(
amp[const.ID])
for listener in amphora_stats:
if listener[const.TOTAL_CONNECTIONS] > 0:
@@ -243,7 +244,7 @@
else:
backup_amp = amp
self.assertIsNotNone(backup_amp)
- amphora_stats = self.os_admin.amphora_client.get_amphora_stats(
+ amphora_stats = amphora_client.get_amphora_stats(
backup_amp[const.ID])
for listener in amphora_stats:
self.assertEqual(0, listener[const.TOTAL_CONNECTIONS])
@@ -265,7 +266,7 @@
time.sleep(1)
# Check that the Backup amphora is now Master
- amphora_stats = self.os_admin.amphora_client.get_amphora_stats(
+ amphora_stats = amphora_client.get_amphora_stats(
backup_amp[const.ID])
connections = 0
for listener in amphora_stats:
diff --git a/octavia_tempest_plugin/tests/api/v2/test_amphora.py b/octavia_tempest_plugin/tests/api/v2/test_amphora.py
index 146096d..180e4f3 100644
--- a/octavia_tempest_plugin/tests/api/v2/test_amphora.py
+++ b/octavia_tempest_plugin/tests/api/v2/test_amphora.py
@@ -20,7 +20,6 @@
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
-from tempest.lib import exceptions
from octavia_tempest_plugin.common import constants as const
from octavia_tempest_plugin.tests import test_base
@@ -90,12 +89,17 @@
CONF.load_balancer.lb_build_interval,
CONF.load_balancer.lb_build_timeout)
- # Test that a user, without the load balancer member role, cannot
- # list amphorae
+ # Test RBAC for listing amphorae
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.amphora_client.list_amphorae)
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+ if expected_allowed:
+ self.check_list_RBAC_enforcement(
+ 'AmphoraClient', 'list_amphorae', expected_allowed)
# Get an actual list of the amphorae
amphorae = self.lb_admin_amphora_client.list_amphorae()
@@ -112,13 +116,11 @@
self.assertEqual(self._expected_amp_count(amphorae), len(amphorae))
self.assertEqual(self.lb_id, amphorae[0][const.LOADBALANCER_ID])
- # Test that a different user, with load balancer member role, cannot
- # see this amphora
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.amphora_client
- self.assertRaises(exceptions.Forbidden,
- member2_client.show_amphora,
- amphora_id=amphorae[0][const.ID])
+ # Test RBAC for showing an amphora
+ if expected_allowed:
+ self.check_show_RBAC_enforcement(
+ 'AmphoraClient', 'show_amphora', expected_allowed,
+ amphora_id=amphorae[0][const.ID])
show_amphora_response_fields = const.SHOW_AMPHORA_RESPONSE_FIELDS
if self.lb_admin_amphora_client.is_version_supported(
@@ -175,13 +177,18 @@
loadbalancer_id=const.LOADBALANCER_ID, lb_id=self.lb_id))
amphora_1 = amphorae[0]
- # Test that a user without the load balancer admin role cannot
- # create a flavor
+ # Test RBAC for updating an amphora
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.amphora_client.update_amphora_config,
- amphora_1[const.ID])
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+ if expected_allowed:
+ self.check_update_RBAC_enforcement(
+ 'AmphoraClient', 'update_amphora_config', expected_allowed,
+ None, None, amphora_1[const.ID])
self.lb_admin_amphora_client.update_amphora_config(amphora_1[const.ID])
@@ -205,15 +212,18 @@
loadbalancer_id=const.LOADBALANCER_ID, lb_id=self.lb_id))
amphora_1 = amphorae[0]
- # Test RBAC not authorized for non-admin role
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- self.assertRaises(exceptions.Forbidden,
- self.os_primary.amphora_client.amphora_failover,
- amphora_1[const.ID])
- self.assertRaises(
- exceptions.Forbidden,
- self.os_roles_lb_member.amphora_client.amphora_failover,
- amphora_1[const.ID])
+ # Test RBAC for failing over an amphora
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+ if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+ if expected_allowed:
+ self.check_update_RBAC_enforcement(
+ 'AmphoraClient', 'amphora_failover', expected_allowed,
+ None, None, amphora_1[const.ID])
self.lb_admin_amphora_client.amphora_failover(amphora_1[const.ID])
diff --git a/octavia_tempest_plugin/tests/api/v2/test_availability_zone.py b/octavia_tempest_plugin/tests/api/v2/test_availability_zone.py
index 29426c2..fa7b6a4 100644
--- a/octavia_tempest_plugin/tests/api/v2/test_availability_zone.py
+++ b/octavia_tempest_plugin/tests/api/v2/test_availability_zone.py
@@ -104,12 +104,18 @@
self.availability_zone_profile_id}
# Test that a user without the load balancer admin role cannot
- # create an availability zone
+ # create an availability zone.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(exceptions.Forbidden,
- self.os_primary.availability_zone_client
- .create_availability_zone,
- **availability_zone_kwargs)
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+ if expected_allowed:
+ self.check_create_RBAC_enforcement(
+ 'AvailabilityZoneClient', 'create_availability_zone',
+ expected_allowed, **availability_zone_kwargs)
# Happy path
availability_zone = (
@@ -220,11 +226,25 @@
# Test that a user without the load balancer role cannot
# list availability zones.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = [
+ 'os_admin', 'os_primary', 'os_roles_lb_admin',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_admin', 'os_primary', 'os_system_admin',
+ 'os_system_reader', 'os_roles_lb_observer',
+ 'os_roles_lb_global_observer',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.availability_zone_client
- .list_availability_zones)
+ expected_allowed = [
+ 'os_system_admin', 'os_system_reader', 'os_roles_lb_admin',
+ 'os_roles_lb_observer', 'os_roles_lb_global_observer',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
+ if expected_allowed:
+ self.check_list_RBAC_enforcement(
+ 'AvailabilityZoneClient', 'list_availability_zones',
+ expected_allowed)
# Check the default sort order (by ID)
availability_zones = (
@@ -359,12 +379,25 @@
# Test that a user without the load balancer role cannot
# show availability zone details.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = [
+ 'os_admin', 'os_primary', 'os_roles_lb_admin',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_admin', 'os_primary', 'os_system_admin',
+ 'os_system_reader', 'os_roles_lb_observer',
+ 'os_roles_lb_global_observer',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.availability_zone_client
- .show_availability_zone,
- availability_zone[const.NAME])
+ expected_allowed = [
+ 'os_system_admin', 'os_system_reader', 'os_roles_lb_admin',
+ 'os_roles_lb_observer', 'os_roles_lb_global_observer',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
+ if expected_allowed:
+ self.check_show_RBAC_enforcement(
+ 'AvailabilityZoneClient', 'show_availability_zone',
+ expected_allowed, availability_zone[const.NAME])
result = self.mem_availability_zone_client.show_availability_zone(
availability_zone[const.NAME])
@@ -420,13 +453,18 @@
const.ENABLED: False}
# Test that a user without the load balancer role cannot
- # show availability zone details.
+ # update availability zone details.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.availability_zone_client
- .update_availability_zone,
- availability_zone[const.NAME],
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+ if expected_allowed:
+ self.check_update_RBAC_enforcement(
+ 'AvailabilityZoneClient', 'update_availability_zone',
+ expected_allowed, None, None, availability_zone[const.NAME],
**availability_zone_updated_kwargs)
updated_availability_zone = (
@@ -493,12 +531,17 @@
# Test that a user without the load balancer admin role cannot
# delete an availability zone.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.availability_zone_client
- .delete_availability_zone,
- availability_zone[const.NAME])
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+ if expected_allowed:
+ self.check_delete_RBAC_enforcement(
+ 'AvailabilityZoneClient', 'delete_availability_zone',
+ expected_allowed, None, None, availability_zone[const.NAME])
# Happy path
self.lb_admin_availability_zone_client.delete_availability_zone(
diff --git a/octavia_tempest_plugin/tests/api/v2/test_availability_zone_capabilities.py b/octavia_tempest_plugin/tests/api/v2/test_availability_zone_capabilities.py
index 4a12057..d3833f6 100644
--- a/octavia_tempest_plugin/tests/api/v2/test_availability_zone_capabilities.py
+++ b/octavia_tempest_plugin/tests/api/v2/test_availability_zone_capabilities.py
@@ -15,7 +15,6 @@
from tempest import config
from tempest.lib import decorators
-from tempest.lib import exceptions
from octavia_tempest_plugin.common import constants as const
from octavia_tempest_plugin.tests import test_base
@@ -45,13 +44,17 @@
# Test that a user without the load balancer admin role cannot
# list provider availability zone capabilities.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- os_primary_capabilities_client = (
- self.os_primary.availability_zone_capabilities_client)
- self.assertRaises(
- exceptions.Forbidden,
- (os_primary_capabilities_client
- .list_availability_zone_capabilities),
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+ if expected_allowed:
+ self.check_list_RBAC_enforcement(
+ 'AvailabilityZoneCapabilitiesClient',
+ 'list_availability_zone_capabilities', expected_allowed,
CONF.load_balancer.provider)
# Check for an expected availability zone capability for the
diff --git a/octavia_tempest_plugin/tests/api/v2/test_availability_zone_profile.py b/octavia_tempest_plugin/tests/api/v2/test_availability_zone_profile.py
index f5e4b7e..456a01e 100644
--- a/octavia_tempest_plugin/tests/api/v2/test_availability_zone_profile.py
+++ b/octavia_tempest_plugin/tests/api/v2/test_availability_zone_profile.py
@@ -75,13 +75,19 @@
}
# Test that a user without the load balancer admin role cannot
- # create an availability zone profile profile
+ # create an availability zone profile.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.availability_zone_profile_client
- .create_availability_zone_profile,
- **availability_zone_profile_kwargs)
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+ if expected_allowed:
+ self.check_create_RBAC_enforcement(
+ 'AvailabilityZoneProfileClient',
+ 'create_availability_zone_profile',
+ expected_allowed, **availability_zone_profile_kwargs)
# Happy path
availability_zone_profile = (
@@ -225,11 +231,17 @@
# Test that a user without the load balancer admin role cannot
# list availability zone profiles.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.availability_zone_profile_client
- .list_availability_zone_profiles)
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+ if expected_allowed:
+ self.check_list_RBAC_enforcement(
+ 'AvailabilityZoneProfileClient',
+ 'list_availability_zone_profiles', expected_allowed)
# Check the default sort order (by ID)
profiles = (self.lb_admin_availability_zone_profile_client
@@ -379,12 +391,18 @@
availability_zone_profile[const.ID])
# Test that a user without the load balancer admin role cannot
- # show an availability zone profile profile
+ # show an availability zone profile
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.availability_zone_profile_client
- .show_availability_zone_profile,
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+ if expected_allowed:
+ self.check_show_RBAC_enforcement(
+ 'AvailabilityZoneProfileClient',
+ 'show_availability_zone_profile', expected_allowed,
availability_zone_profile[const.ID])
result = (
@@ -475,13 +493,19 @@
}
# Test that a user without the load balancer admin role cannot
- # create an availability zone profile profile
+ # update an availability zone profile.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.availability_zone_profile_client
- .update_availability_zone_profile,
- availability_zone_profile[const.ID],
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+ if expected_allowed:
+ self.check_update_RBAC_enforcement(
+ 'AvailabilityZoneProfileClient',
+ 'update_availability_zone_profile', expected_allowed,
+ None, None, availability_zone_profile[const.ID],
**availability_zone_profile_updated_kwargs)
result = (
@@ -551,13 +575,19 @@
availability_zone_profile[const.ID])
# Test that a user without the load balancer admin role cannot
- # delete an availability zone profile profile
+ # delete an availability zone profile.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.availability_zone_profile_client
- .delete_availability_zone_profile,
- availability_zone_profile[const.ID])
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+ if expected_allowed:
+ self.check_delete_RBAC_enforcement(
+ 'AvailabilityZoneProfileClient',
+ 'delete_availability_zone_profile', expected_allowed,
+ None, None, availability_zone_profile[const.ID])
# Happy path
(self.lb_admin_availability_zone_profile_client
diff --git a/octavia_tempest_plugin/tests/api/v2/test_flavor.py b/octavia_tempest_plugin/tests/api/v2/test_flavor.py
index 4333121..b5b4254 100644
--- a/octavia_tempest_plugin/tests/api/v2/test_flavor.py
+++ b/octavia_tempest_plugin/tests/api/v2/test_flavor.py
@@ -87,11 +87,18 @@
const.FLAVOR_PROFILE_ID: self.flavor_profile_id}
# Test that a user without the load balancer admin role cannot
- # create a flavor
+ # create a flavor.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(exceptions.Forbidden,
- self.os_primary.flavor_client.create_flavor,
- **flavor_kwargs)
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+ if expected_allowed:
+ self.check_create_RBAC_enforcement(
+ 'FlavorClient', 'create_flavor',
+ expected_allowed, None, None, **flavor_kwargs)
# Happy path
flavor = self.lb_admin_flavor_client.create_flavor(**flavor_kwargs)
@@ -185,10 +192,24 @@
# Test that a user without the load balancer role cannot
# list flavors.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = [
+ 'os_admin', 'os_primary', 'os_roles_lb_admin',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_admin', 'os_primary', 'os_system_admin',
+ 'os_system_reader', 'os_roles_lb_observer',
+ 'os_roles_lb_global_observer',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.flavor_client.list_flavors)
+ expected_allowed = [
+ 'os_system_admin', 'os_system_reader', 'os_roles_lb_admin',
+ 'os_roles_lb_observer', 'os_roles_lb_global_observer',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
+ if expected_allowed:
+ self.check_list_RBAC_enforcement(
+ 'FlavorClient', 'list_flavors', expected_allowed)
# Check the default sort order (by ID)
flavors = self.mem_flavor_client.list_flavors()
@@ -299,10 +320,24 @@
# Test that a user without the load balancer role cannot
# show flavor details.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = [
+ 'os_admin', 'os_primary', 'os_roles_lb_admin',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_admin', 'os_primary', 'os_system_admin',
+ 'os_system_reader', 'os_roles_lb_observer',
+ 'os_roles_lb_global_observer',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.flavor_client.show_flavor,
+ expected_allowed = [
+ 'os_system_admin', 'os_system_reader', 'os_roles_lb_admin',
+ 'os_roles_lb_observer', 'os_roles_lb_global_observer',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
+ if expected_allowed:
+ self.check_show_RBAC_enforcement(
+ 'FlavorClient', 'show_flavor', expected_allowed,
flavor[const.ID])
result = self.mem_flavor_client.show_flavor(flavor[const.ID])
@@ -354,11 +389,17 @@
const.ENABLED: False}
# Test that a user without the load balancer role cannot
- # show flavor details.
+ # update flavor details.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.flavor_client.update_flavor,
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+ if expected_allowed:
+ self.check_update_RBAC_enforcement(
+ 'FlavorClient', 'update_flavor', expected_allowed, None, None,
flavor[const.ID], **flavor_updated_kwargs)
updated_flavor = self.lb_admin_flavor_client.update_flavor(
@@ -413,11 +454,17 @@
# Test that a user without the load balancer admin role cannot
# delete a flavor.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.flavor_client.delete_flavor,
- flavor[const.ID])
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+ if expected_allowed:
+ self.check_delete_RBAC_enforcement(
+ 'FlavorClient', 'delete_flavor', expected_allowed,
+ None, None, flavor[const.ID])
# Happy path
self.lb_admin_flavor_client.delete_flavor(flavor[const.ID])
diff --git a/octavia_tempest_plugin/tests/api/v2/test_flavor_capabilities.py b/octavia_tempest_plugin/tests/api/v2/test_flavor_capabilities.py
index 7f9da51..884f656 100644
--- a/octavia_tempest_plugin/tests/api/v2/test_flavor_capabilities.py
+++ b/octavia_tempest_plugin/tests/api/v2/test_flavor_capabilities.py
@@ -14,7 +14,6 @@
from tempest import config
from tempest.lib import decorators
-from tempest.lib import exceptions
from octavia_tempest_plugin.common import constants as const
from octavia_tempest_plugin.tests import test_base
@@ -43,12 +42,17 @@
# Test that a user without the load balancer admin role cannot
# list provider flavor capabilities.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- os_primary_capabilities_client = (
- self.os_primary.flavor_capabilities_client)
- self.assertRaises(
- exceptions.Forbidden,
- os_primary_capabilities_client.list_flavor_capabilities,
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+ if expected_allowed:
+ self.check_list_RBAC_enforcement(
+ 'FlavorCapabilitiesClient',
+ 'list_flavor_capabilities', expected_allowed,
CONF.load_balancer.provider)
# Check for an expected flavor capability for the configured provider
diff --git a/octavia_tempest_plugin/tests/api/v2/test_flavor_profile.py b/octavia_tempest_plugin/tests/api/v2/test_flavor_profile.py
index 8f9c7d3..39f3338 100644
--- a/octavia_tempest_plugin/tests/api/v2/test_flavor_profile.py
+++ b/octavia_tempest_plugin/tests/api/v2/test_flavor_profile.py
@@ -60,11 +60,17 @@
# Test that a user without the load balancer admin role cannot
# create a flavor profile
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.flavor_profile_client.create_flavor_profile,
- **flavor_profile_kwargs)
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+ if expected_allowed:
+ self.check_create_RBAC_enforcement(
+ 'FlavorProfileClient', 'create_flavor_profile',
+ expected_allowed, None, None, **flavor_profile_kwargs)
# Happy path
flavor_profile = (
@@ -174,10 +180,17 @@
# Test that a user without the load balancer admin role cannot
# list flavor profiles.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.flavor_profile_client.list_flavor_profiles)
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+ if expected_allowed:
+ self.check_list_RBAC_enforcement(
+ 'FlavorProfileClient', 'list_flavor_profiles',
+ expected_allowed)
# Check the default sort order (by ID)
profiles = self.lb_admin_flavor_profile_client.list_flavor_profiles()
@@ -295,12 +308,18 @@
flavor_profile[const.ID])
# Test that a user without the load balancer admin role cannot
- # show a flavor profile
+ # show a flavor profile.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.flavor_profile_client.show_flavor_profile,
- flavor_profile[const.ID])
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+ if expected_allowed:
+ self.check_show_RBAC_enforcement(
+ 'FlavorProfileClient', 'show_flavor_profile',
+ expected_allowed, flavor_profile[const.ID])
result = (
self.lb_admin_flavor_profile_client.show_flavor_profile(
@@ -367,12 +386,19 @@
}
# Test that a user without the load balancer admin role cannot
- # create a flavor profile
+ # update a flavor profile.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.flavor_profile_client.update_flavor_profile,
- flavor_profile[const.ID], **flavor_profile_updated_kwargs)
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+ if expected_allowed:
+ self.check_update_RBAC_enforcement(
+ 'FlavorProfileClient', 'update_flavor_profile',
+ expected_allowed, None, None, flavor_profile[const.ID],
+ **flavor_profile_updated_kwargs)
result = self.lb_admin_flavor_profile_client.update_flavor_profile(
flavor_profile[const.ID], **flavor_profile_updated_kwargs)
@@ -428,11 +454,17 @@
# Test that a user without the load balancer admin role cannot
# delete a flavor profile
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.flavor_profile_client.delete_flavor_profile,
- flavor_profile[const.ID])
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+ if expected_allowed:
+ self.check_delete_RBAC_enforcement(
+ 'FlavorProfileClient', 'delete_flavor_profile',
+ expected_allowed, None, None, flavor_profile[const.ID])
# Happy path
self.lb_admin_flavor_profile_client.delete_flavor_profile(
diff --git a/octavia_tempest_plugin/tests/api/v2/test_healthmonitor.py b/octavia_tempest_plugin/tests/api/v2/test_healthmonitor.py
index ff07c73..a305ead 100644
--- a/octavia_tempest_plugin/tests/api/v2/test_healthmonitor.py
+++ b/octavia_tempest_plugin/tests/api/v2/test_healthmonitor.py
@@ -277,11 +277,21 @@
# Test that a user without the loadbalancer role cannot
# create a healthmonitor
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.healthmonitor_client.create_healthmonitor,
- **hm_kwargs)
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_create_RBAC_enforcement(
+ 'HealthMonitorClient', 'create_healthmonitor',
+ expected_allowed,
+ status_method=self.mem_lb_client.show_loadbalancer,
+ obj_id=self.lb_id, **hm_kwargs)
hm = self.mem_healthmonitor_client.create_healthmonitor(**hm_kwargs)
self.addCleanup(
@@ -488,6 +498,9 @@
* List the healthmonitors filtering to one of the three.
* List the healthmonitors filtered, one field, and sorted.
"""
+ # IDs of health monitors created in the test
+ test_ids = []
+
if (pool_algorithm == const.LB_ALGORITHM_SOURCE_IP_PORT and not
self.mem_listener_client.is_version_supported(
self.api_version, '2.13')):
@@ -614,6 +627,7 @@
const.ACTIVE,
CONF.load_balancer.build_interval,
CONF.load_balancer.build_timeout)
+ test_ids.append(hm1[const.ID])
# Time resolution for created_at is only to the second, and we need to
# ensure that each object has a distinct creation time. Delaying one
# second is both a simple and a reliable way to accomplish this.
@@ -658,6 +672,7 @@
const.ACTIVE,
CONF.load_balancer.build_interval,
CONF.load_balancer.build_timeout)
+ test_ids.append(hm2[const.ID])
# Time resolution for created_at is only to the second, and we need to
# ensure that each object has a distinct creation time. Delaying one
# second is both a simple and a reliable way to accomplish this.
@@ -702,19 +717,65 @@
const.ACTIVE,
CONF.load_balancer.build_interval,
CONF.load_balancer.build_timeout)
+ test_ids.append(hm3[const.ID])
- # Test that a different user cannot list healthmonitors
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.healthmonitor_client
- primary = member2_client.list_healthmonitors(
- query_params='pool_id={pool_id}'.format(pool_id=pool1_id))
- self.assertEqual(0, len(primary))
+ # Test that different users cannot see the lb_member healthmonitors
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_primary', 'os_roles_lb_member2']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_admin', 'os_primary',
+ 'os_roles_lb_member2', 'os_roles_lb_observer',
+ 'os_roles_lb_global_observer']
+ if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+ expected_allowed = ['os_roles_lb_observer', 'os_roles_lb_member2']
+ if expected_allowed:
+ self.check_list_RBAC_enforcement_count(
+ 'HealthMonitorClient', 'list_healthmonitors',
+ expected_allowed, 0)
+
+ # Test credentials that should see these healthmonitors can see them.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_admin', 'os_roles_lb_member',
+ 'os_roles_lb_global_observer']
+ if expected_allowed:
+ self.check_list_IDs_RBAC_enforcement(
+ 'HealthMonitorClient', 'list_healthmonitors',
+ expected_allowed, test_ids)
# Test that users without the lb member role cannot list healthmonitors
+ # Note: non-owners can still call this API, they will just get the list
+ # of health monitors for their project (zero). The above tests
+ # are intended to cover the cross project use case.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_primary', 'os_roles_lb_admin',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
+ # Note: os_admin is here because it evaluates to "project_admin"
+ # in oslo_policy, and keystone considers "project_admin" a
+ # superscope of "project_reader". This means it can read
+ # objects in the "admin" credential's project.
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_admin', 'os_primary', 'os_system_admin',
+ 'os_system_reader', 'os_roles_lb_observer',
+ 'os_roles_lb_global_observer',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.healthmonitor_client.list_healthmonitors)
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_admin', 'os_roles_lb_observer',
+ 'os_roles_lb_global_observer',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
+ if expected_allowed:
+ self.check_list_RBAC_enforcement(
+ 'HealthMonitorClient', 'list_healthmonitors',
+ expected_allowed)
# Check the default sort order, created_at
hms = self.mem_healthmonitor_client.list_healthmonitors()
@@ -1125,33 +1186,24 @@
for item in equal_items:
self.assertEqual(hm_kwargs[item], hm[item])
- # Test that a user with lb_admin role can see the healthmonitor
+ # Test that the appropriate users can see or not see the health
+ # monitors based on the API RBAC.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- healthmonitor_client = self.os_roles_lb_admin.healthmonitor_client
- hm_adm = healthmonitor_client.show_healthmonitor(hm[const.ID])
- self.assertEqual(hm_name, hm_adm[const.NAME])
-
- # Test that a user with cloud admin role can see the healthmonitor
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- adm = self.os_admin.healthmonitor_client.show_healthmonitor(
- hm[const.ID])
- self.assertEqual(hm_name, adm[const.NAME])
-
- # Test that a different user, with loadbalancer member role, cannot
- # see this healthmonitor
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.healthmonitor_client
- self.assertRaises(exceptions.Forbidden,
- member2_client.show_healthmonitor,
- hm[const.ID])
-
- # Test that a user, without the loadbalancer member role, cannot
- # show healthmonitors
- if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.healthmonitor_client.show_healthmonitor,
- hm[const.ID])
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_admin',
+ 'os_roles_lb_global_observer',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_show_RBAC_enforcement(
+ 'HealthMonitorClient', 'show_healthmonitor',
+ expected_allowed, hm[const.ID])
@decorators.idempotent_id('2417164b-ec03-4488-afd2-60b096dc0077')
def test_LC_HTTP_healthmonitor_update(self):
@@ -1417,27 +1469,21 @@
self.assertCountEqual(hm_kwargs[const.TAGS], hm[const.TAGS])
# Test that a user, without the loadbalancer member role, cannot
- # use this command
+ # update this healthmonitor.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.healthmonitor_client.update_healthmonitor,
- hm[const.ID], admin_state_up=True)
-
- # Assert we didn't go into PENDING_*
- hm_check = self.mem_healthmonitor_client.show_healthmonitor(
- hm[const.ID])
- self.assertEqual(const.ACTIVE,
- hm_check[const.PROVISIONING_STATUS])
- self.assertFalse(hm_check[const.ADMIN_STATE_UP])
-
- # Test that a user, without the loadbalancer member role, cannot
- # update this healthmonitor
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.healthmonitor_client
- self.assertRaises(exceptions.Forbidden,
- member2_client.update_healthmonitor,
- hm[const.ID], admin_state_up=True)
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_update_RBAC_enforcement(
+ 'HealthMonitorClient', 'update_healthmonitor',
+ expected_allowed, None, None, hm[const.ID],
+ admin_state_up=True)
# Assert we didn't go into PENDING_*
hm_check = self.mem_healthmonitor_client.show_healthmonitor(
@@ -1725,21 +1771,21 @@
CONF.load_balancer.build_interval,
CONF.load_balancer.build_timeout)
- # Test that a user without the loadbalancer role cannot
- # delete this healthmonitor
+ # Test that a user without the loadbalancer role cannot delete this
+ # healthmonitor.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.healthmonitor_client.delete_healthmonitor,
- hm[const.ID])
-
- # Test that a different user, with the loadbalancer member role
- # cannot delete this healthmonitor
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.healthmonitor_client
- self.assertRaises(exceptions.Forbidden,
- member2_client.delete_healthmonitor,
- hm[const.ID])
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_delete_RBAC_enforcement(
+ 'HealthMonitorClient', 'delete_healthmonitor',
+ expected_allowed, None, None, hm[const.ID])
self.mem_healthmonitor_client.delete_healthmonitor(hm[const.ID])
diff --git a/octavia_tempest_plugin/tests/api/v2/test_l7policy.py b/octavia_tempest_plugin/tests/api/v2/test_l7policy.py
index 440a533..e7ed5a6 100644
--- a/octavia_tempest_plugin/tests/api/v2/test_l7policy.py
+++ b/octavia_tempest_plugin/tests/api/v2/test_l7policy.py
@@ -19,7 +19,6 @@
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
-from tempest.lib import exceptions
from octavia_tempest_plugin.common import constants as const
from octavia_tempest_plugin.tests import test_base
@@ -135,11 +134,21 @@
# Test that a user without the load balancer role cannot
# create a l7policy
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.l7policy_client.create_l7policy,
- **l7policy_kwargs)
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_create_RBAC_enforcement(
+ 'L7PolicyClient', 'create_l7policy',
+ expected_allowed,
+ status_method=self.mem_lb_client.show_loadbalancer,
+ obj_id=self.lb_id, **l7policy_kwargs)
l7policy = self.mem_l7policy_client.create_l7policy(**l7policy_kwargs)
@@ -208,6 +217,9 @@
* List the l7policies filtering to one of the three.
* List the l7policies filtered, one field, and sorted.
"""
+ # IDs of L7 policies created in the test
+ test_ids = []
+
listener_name = data_utils.rand_name(
"lb_member_listener2_l7policy-list")
listener_kwargs = {
@@ -263,6 +275,7 @@
const.ACTIVE,
CONF.load_balancer.build_interval,
CONF.load_balancer.build_timeout)
+ test_ids.append(l7policy1[const.ID])
# Time resolution for created_at is only to the second, and we need to
# ensure that each object has a distinct creation time. Delaying one
# second is both a simple and a reliable way to accomplish this.
@@ -303,6 +316,7 @@
const.ACTIVE,
CONF.load_balancer.build_interval,
CONF.load_balancer.build_timeout)
+ test_ids.append(l7policy2[const.ID])
# Time resolution for created_at is only to the second, and we need to
# ensure that each object has a distinct creation time. Delaying one
# second is both a simple and a reliable way to accomplish this.
@@ -344,21 +358,67 @@
const.ACTIVE,
CONF.load_balancer.build_interval,
CONF.load_balancer.build_timeout)
+ test_ids.append(l7policy3[const.ID])
- # Test that a different user cannot list l7policies
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.l7policy_client
- primary = member2_client.list_l7policies(
+ # Test that different users cannot see the lb_member l7policies
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_primary', 'os_roles_lb_member2']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_admin', 'os_primary',
+ 'os_roles_lb_member2', 'os_roles_lb_observer',
+ 'os_roles_lb_global_observer']
+ if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+ expected_allowed = ['os_roles_lb_observer', 'os_roles_lb_member2']
+ if expected_allowed:
+ self.check_list_RBAC_enforcement_count(
+ 'L7PolicyClient', 'list_l7policies',
+ expected_allowed, 0)
+
+ # Test credentials that should see these l7policies can see them.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_admin', 'os_roles_lb_member',
+ 'os_roles_lb_global_observer']
+ if expected_allowed:
+ self.check_list_IDs_RBAC_enforcement(
+ 'L7PolicyClient', 'list_l7policies',
+ expected_allowed, test_ids,
query_params='listener_id={listener_id}'.format(
listener_id=listener_id))
- self.assertEqual(0, len(primary))
- # Test that a user without the lb member role cannot list load
- # balancers
+ # Test that users without the lb member role cannot list l7policies
+ # Note: non-owners can still call this API, they will just get the list
+ # of L7 policies for their project (zero). The above tests
+ # are intended to cover the cross project use case.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_primary', 'os_roles_lb_admin',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
+ # Note: os_admin is here because it evaluates to "project_admin"
+ # in oslo_policy, and keystone considers "project_admin" a
+ # superscope of "project_reader". This means it can read
+ # objects in the "admin" credential's project.
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_admin', 'os_primary', 'os_system_admin',
+ 'os_system_reader', 'os_roles_lb_observer',
+ 'os_roles_lb_global_observer',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.l7policy_client.list_l7policies)
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_admin', 'os_roles_lb_observer',
+ 'os_roles_lb_global_observer',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
+ if expected_allowed:
+ self.check_list_RBAC_enforcement(
+ 'L7PolicyClient', 'list_l7policies',
+ expected_allowed)
# Check the default sort order, created_at
l7policies = self.mem_l7policy_client.list_l7policies(
@@ -585,33 +645,24 @@
self.assertIsNone(l7policy.pop(const.REDIRECT_URL, None))
self.assertIsNone(l7policy.pop(const.REDIRECT_POOL_ID, None))
- # Test that a user with lb_admin role can see the l7policy
+ # Test that the appropriate users can see or not see the L7 policies
+ # based on the API RBAC.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- l7policy_client = self.os_roles_lb_admin.l7policy_client
- l7policy_adm = l7policy_client.show_l7policy(l7policy[const.ID])
- self.assertEqual(l7policy_name, l7policy_adm[const.NAME])
-
- # Test that a user with cloud admin role can see the l7policy
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- adm = self.os_admin.l7policy_client.show_l7policy(
- l7policy[const.ID])
- self.assertEqual(l7policy_name, adm[const.NAME])
-
- # Test that a different user, with load balancer member role, cannot
- # see this l7policy
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.l7policy_client
- self.assertRaises(exceptions.Forbidden,
- member2_client.show_l7policy,
- l7policy[const.ID])
-
- # Test that a user, without the load balancer member role, cannot
- # show l7policies
- if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.l7policy_client.show_l7policy,
- l7policy[const.ID])
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_admin',
+ 'os_roles_lb_global_observer',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_show_RBAC_enforcement(
+ 'L7PolicyClient', 'show_l7policy',
+ expected_allowed, l7policy[const.ID])
@decorators.idempotent_id('08f73b22-550b-4e5a-b3d6-2ec03251ca13')
def test_l7policy_update(self):
@@ -703,28 +754,22 @@
self.assertCountEqual(l7policy_kwargs[const.TAGS],
l7policy[const.TAGS])
- # Test that a user, without the load balancer member role, cannot
- # use this command
+ # Test that a user, without the loadbalancer member role, cannot
+ # update this L7 policy.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.l7policy_client.update_l7policy,
- l7policy[const.ID], admin_state_up=True)
-
- # Assert we didn't go into PENDING_*
- l7policy_check = self.mem_l7policy_client.show_l7policy(
- l7policy[const.ID])
- self.assertEqual(const.ACTIVE,
- l7policy_check[const.PROVISIONING_STATUS])
- self.assertFalse(l7policy_check[const.ADMIN_STATE_UP])
-
- # Test that a user, without the load balancer member role, cannot
- # update this l7policy
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.l7policy_client
- self.assertRaises(exceptions.Forbidden,
- member2_client.update_l7policy,
- l7policy[const.ID], admin_state_up=True)
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_update_RBAC_enforcement(
+ 'L7PolicyClient', 'update_l7policy',
+ expected_allowed, None, None, l7policy[const.ID],
+ admin_state_up=True)
# Assert we didn't go into PENDING_*
l7policy_check = self.mem_l7policy_client.show_l7policy(
@@ -820,21 +865,21 @@
CONF.load_balancer.build_interval,
CONF.load_balancer.build_timeout)
- # Test that a user without the load balancer role cannot
- # delete this l7policy
+ # Test that a user without the loadbalancer role cannot delete this
+ # L7 policy.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.l7policy_client.delete_l7policy,
- l7policy[const.ID])
-
- # Test that a different user, with the load balancer member role
- # cannot delete this l7policy
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.l7policy_client
- self.assertRaises(exceptions.Forbidden,
- member2_client.delete_l7policy,
- l7policy[const.ID])
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_delete_RBAC_enforcement(
+ 'L7PolicyClient', 'delete_l7policy',
+ expected_allowed, None, None, l7policy[const.ID])
self.mem_l7policy_client.delete_l7policy(l7policy[const.ID])
diff --git a/octavia_tempest_plugin/tests/api/v2/test_l7rule.py b/octavia_tempest_plugin/tests/api/v2/test_l7rule.py
index e44c9f8..5cb85c4 100644
--- a/octavia_tempest_plugin/tests/api/v2/test_l7rule.py
+++ b/octavia_tempest_plugin/tests/api/v2/test_l7rule.py
@@ -19,7 +19,6 @@
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
-from tempest.lib import exceptions
from octavia_tempest_plugin.common import constants as const
from octavia_tempest_plugin.tests import test_base
@@ -141,13 +140,23 @@
const.TAGS: l7_rule_tags
})
- # Test that a user without the load balancer role cannot
- # create a l7rule
+ # Test that a user without the loadbalancer role cannot
+ # create an L7 rule.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.l7rule_client.create_l7rule,
- **l7rule_kwargs)
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_create_RBAC_enforcement(
+ 'L7RuleClient', 'create_l7rule',
+ expected_allowed,
+ status_method=self.mem_lb_client.show_loadbalancer,
+ obj_id=self.lb_id, **l7rule_kwargs)
l7rule = self.mem_l7rule_client.create_l7rule(**l7rule_kwargs)
self.addClassResourceCleanup(
@@ -211,6 +220,9 @@
* List the l7rules filtering to one of the three.
* List the l7rules filtered, one field, and sorted.
"""
+ # IDs of L7 rules created in the test
+ test_ids = []
+
l7policy_name = data_utils.rand_name("lb_member_l7policy2_l7rule-list")
l7policy = self.mem_l7policy_client.create_l7policy(
name=l7policy_name, listener_id=self.listener_id,
@@ -260,6 +272,7 @@
const.ACTIVE,
CONF.load_balancer.check_interval,
CONF.load_balancer.check_timeout)
+ test_ids.append(l7rule1[const.ID])
# Time resolution for created_at is only to the second, and we need to
# ensure that each object has a distinct creation time. Delaying one
# second is both a simple and a reliable way to accomplish this.
@@ -298,6 +311,7 @@
const.ACTIVE,
CONF.load_balancer.check_interval,
CONF.load_balancer.check_timeout)
+ test_ids.append(l7rule2[const.ID])
# Time resolution for created_at is only to the second, and we need to
# ensure that each object has a distinct creation time. Delaying one
# second is both a simple and a reliable way to accomplish this.
@@ -336,23 +350,47 @@
const.ACTIVE,
CONF.load_balancer.check_interval,
CONF.load_balancer.check_timeout)
+ test_ids.append(l7rule3[const.ID])
- # Test that a different user cannot list l7rules
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.l7rule_client
- self.assertRaises(
- exceptions.Forbidden,
- member2_client.list_l7rules,
- l7policy_id)
-
- # Test that a user without the lb l7rule role cannot list load
- # balancers
+ # Test credentials that should see these L7 rules can see them.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.l7rule_client.list_l7rules,
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_admin', 'os_roles_lb_member',
+ 'os_roles_lb_global_observer']
+ if expected_allowed:
+ self.check_list_IDs_RBAC_enforcement(
+ 'L7RuleClient', 'list_l7rules', expected_allowed, test_ids,
l7policy_id)
+ # Test that users without the lb member role cannot list L7 rules.
+ # Note: The parent policy ID blocks non-owners from listing
+ # L7 Rules.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ # Note: os_admin is here because it evaluates to "project_admin"
+ # in oslo_policy, and keystone considers "project_admin" a
+ # superscope of "project_reader". This means it can read
+ # objects in the "admin" credential's project.
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_admin', 'os_system_admin',
+ 'os_system_reader', 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_admin',
+ 'os_roles_lb_global_observer',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_list_RBAC_enforcement(
+ 'L7RuleClient', 'list_l7rules', expected_allowed, l7policy_id)
+
# Check the default sort order, created_at
l7rules = self.mem_l7rule_client.list_l7rules(l7policy_id)
self.assertEqual(l7rule1[const.VALUE],
@@ -521,34 +559,25 @@
for item in equal_items:
self.assertEqual(l7rule_kwargs[item], l7rule[item])
- # Test that a user with lb_admin role can see the l7rule
+ # Test that the appropriate users can see or not see the L7 rule
+ # based on the API RBAC.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- l7rule_client = self.os_roles_lb_admin.l7rule_client
- l7rule_adm = l7rule_client.show_l7rule(
- l7rule[const.ID], l7policy_id=self.l7policy_id)
- self.assertEqual(l7rule_kwargs[const.KEY], l7rule_adm[const.KEY])
-
- # Test that a user with cloud admin role can see the l7rule
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- adm = self.os_admin.l7rule_client.show_l7rule(
- l7rule[const.ID], l7policy_id=self.l7policy_id)
- self.assertEqual(l7rule_kwargs[const.KEY], adm[const.KEY])
-
- # Test that a different user, with load balancer member role, cannot
- # see this l7rule
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.l7rule_client
- self.assertRaises(exceptions.Forbidden,
- member2_client.show_l7rule,
- l7rule[const.ID], l7policy_id=self.l7policy_id)
-
- # Test that a user, without the load balancer member role, cannot
- # show l7rules
- if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.l7rule_client.show_l7rule,
- l7rule[const.ID], l7policy_id=self.l7policy_id)
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_admin',
+ 'os_roles_lb_global_observer',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_show_RBAC_enforcement(
+ 'L7RuleClient', 'show_l7rule',
+ expected_allowed, l7rule[const.ID],
+ l7policy_id=self.l7policy_id)
@decorators.idempotent_id('f8cee23b-89b6-4f3a-a842-1463daf42cf7')
def test_l7rule_update(self):
@@ -618,29 +647,22 @@
for item in equal_items:
self.assertEqual(l7rule_kwargs[item], l7rule[item])
- # Test that a user, without the load balancer member role, cannot
- # use this command
+ # Test that a user, without the loadbalancer member role, cannot
+ # update this L7 rule.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.l7rule_client.update_l7rule,
- l7rule[const.ID], l7policy_id=self.l7policy_id,
- admin_state_up=True)
-
- # Assert we didn't go into PENDING_*
- l7rule_check = self.mem_l7rule_client.show_l7rule(
- l7rule[const.ID], l7policy_id=self.l7policy_id)
- self.assertEqual(const.ACTIVE, l7rule_check[const.PROVISIONING_STATUS])
- self.assertFalse(l7rule_check[const.ADMIN_STATE_UP])
-
- # Test that a user, without the load balancer member role, cannot
- # update this l7rule
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.l7rule_client
- self.assertRaises(exceptions.Forbidden,
- member2_client.update_l7rule,
- l7rule[const.ID], l7policy_id=self.l7policy_id,
- admin_state_up=True)
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_update_RBAC_enforcement(
+ 'L7RuleClient', 'update_l7rule',
+ expected_allowed, None, None, l7rule[const.ID],
+ l7policy_id=self.l7policy_id, admin_state_up=True)
# Assert we didn't go into PENDING_*
l7rule_check = self.mem_l7rule_client.show_l7rule(
@@ -727,21 +749,22 @@
CONF.load_balancer.build_interval,
CONF.load_balancer.build_timeout)
- # Test that a user without the load balancer role cannot
- # delete this l7rule
+ # Test that a user without the loadbalancer role cannot delete this
+ # L7 rule.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.l7rule_client.delete_l7rule,
- l7rule[const.ID], l7policy_id=self.l7policy_id)
-
- # Test that a different user, with the load balancer member role
- # cannot delete this l7rule
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.l7rule_client
- self.assertRaises(exceptions.Forbidden,
- member2_client.delete_l7rule,
- l7rule[const.ID], l7policy_id=self.l7policy_id)
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_delete_RBAC_enforcement(
+ 'L7RuleClient', 'delete_l7rule',
+ expected_allowed, None, None, l7rule[const.ID],
+ l7policy_id=self.l7policy_id)
self.mem_l7rule_client.delete_l7rule(l7rule[const.ID],
l7policy_id=self.l7policy_id)
diff --git a/octavia_tempest_plugin/tests/api/v2/test_listener.py b/octavia_tempest_plugin/tests/api/v2/test_listener.py
index c688fdd..152f6ff 100644
--- a/octavia_tempest_plugin/tests/api/v2/test_listener.py
+++ b/octavia_tempest_plugin/tests/api/v2/test_listener.py
@@ -59,6 +59,14 @@
if CONF.load_balancer.test_with_ipv6:
cls.allowed_cidrs = ['2001:db8:a0b:12f0::/64']
+ @classmethod
+ def setup_clients(cls):
+ """Set up primary and lb_member2 ListenerClient aliases."""
+ super(ListenerAPITest, cls).setup_clients()
+ cls.listener_client = cls.os_primary.load_balancer_v2.ListenerClient()
+ cls.member2_listener_client = (
+ cls.os_roles_lb_member2.load_balancer_v2.ListenerClient())
+
@decorators.idempotent_id('88d0ec83-7b08-48d9-96e2-0df1d2f8cd98')
def test_http_listener_create(self):
self._test_listener_create(const.HTTP, 8000)
@@ -143,13 +151,23 @@
listener_kwargs.update({const.ALLOWED_CIDRS: self.allowed_cidrs})
- # Test that a user without the load balancer role cannot
- # create a listener
+ # Test that a user without the loadbalancer role cannot
+ # create a listener.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.listener_client.create_listener,
- **listener_kwargs)
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_create_RBAC_enforcement(
+ 'ListenerClient', 'create_listener',
+ expected_allowed,
+ status_method=self.mem_lb_client.show_loadbalancer,
+ obj_id=self.lb_id, **listener_kwargs)
listener = self.mem_listener_client.create_listener(**listener_kwargs)
@@ -378,6 +396,9 @@
* List the listeners filtering to one of the three.
* List the listeners filtered, one field, and sorted.
"""
+ # IDs of listeners created in the test
+ test_ids = []
+
lb_name = data_utils.rand_name("lb_member_lb2_listener-list")
lb = self.mem_lb_client.create_loadbalancer(
name=lb_name, provider=CONF.load_balancer.provider,
@@ -427,6 +448,7 @@
const.ACTIVE,
CONF.load_balancer.build_interval,
CONF.load_balancer.build_timeout)
+ test_ids.append(listener1[const.ID])
# Time resolution for created_at is only to the second, and we need to
# ensure that each object has a distinct creation time. Delaying one
# second is both a simple and a reliable way to accomplish this.
@@ -465,6 +487,7 @@
const.ACTIVE,
CONF.load_balancer.build_interval,
CONF.load_balancer.build_timeout)
+ test_ids.append(listener2[const.ID])
# Time resolution for created_at is only to the second, and we need to
# ensure that each object has a distinct creation time. Delaying one
# second is both a simple and a reliable way to accomplish this.
@@ -503,6 +526,7 @@
const.ACTIVE,
CONF.load_balancer.build_interval,
CONF.load_balancer.build_timeout)
+ test_ids.append(listener3[const.ID])
if not CONF.load_balancer.test_with_noop:
# Wait for the enabled listeners to come ONLINE
@@ -517,18 +541,64 @@
CONF.load_balancer.build_interval,
CONF.load_balancer.build_timeout)
- # Test that a different user cannot list listeners
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.listener_client
- primary = member2_client.list_listeners(
- query_params='loadbalancer_id={lb_id}'.format(lb_id=lb_id))
- self.assertEqual(0, len(primary))
-
- # Test that a user without the lb member role cannot list listeners
+ # Test that different users cannot see the lb_member listeners.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_primary', 'os_roles_lb_member2']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_admin', 'os_primary',
+ 'os_roles_lb_member2', 'os_roles_lb_observer',
+ 'os_roles_lb_global_observer']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.listener_client.list_listeners)
+ expected_allowed = ['os_roles_lb_observer', 'os_roles_lb_member2']
+ if expected_allowed:
+ self.check_list_RBAC_enforcement_count(
+ 'ListenerClient', 'list_listeners', expected_allowed, 0,
+ query_params='loadbalancer_id={lb_id}'.format(lb_id=lb_id))
+
+ # Test credentials that should see these listeners can see them.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_admin', 'os_roles_lb_member',
+ 'os_roles_lb_global_observer']
+ if expected_allowed:
+ self.check_list_IDs_RBAC_enforcement(
+ 'ListenerClient', 'list_listeners', expected_allowed,
+ test_ids,
+ query_params='loadbalancer_id={lb_id}'.format(lb_id=lb_id))
+
+ # Test that users without the lb member role cannot list listeners.
+ # Note: non-owners can still call this API, they will just get the list
+ # of listeners for their project (zero). The above tests
+ # are intended to cover the cross project use case.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_primary', 'os_roles_lb_admin',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
+ # Note: os_admin is here because it evaluates to "project_admin"
+ # in oslo_policy and since keystone considers "project_admin"
+ # a superscope of "project_reader". This means it can read
+ # objects in the "admin" credential's project.
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_admin', 'os_primary', 'os_system_admin',
+ 'os_system_reader', 'os_roles_lb_observer',
+ 'os_roles_lb_global_observer',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
+ if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_admin', 'os_roles_lb_observer',
+ 'os_roles_lb_global_observer',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
+ if expected_allowed:
+ self.check_list_RBAC_enforcement(
+ 'ListenerClient', 'list_listeners', expected_allowed,
+ query_params='loadbalancer_id={lb_id}'.format(lb_id=lb_id))
# Check the default sort order, created_at
listeners = self.mem_listener_client.list_listeners(
@@ -786,33 +856,24 @@
self.api_version, '2.12'):
self.assertEqual(self.allowed_cidrs, listener[const.ALLOWED_CIDRS])
- # Test that a user with lb_admin role can see the listener
+ # Test that the appropriate users can see or not see the listener
+ # based on the API RBAC.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- listener_client = self.os_roles_lb_admin.listener_client
- listener_adm = listener_client.show_listener(listener[const.ID])
- self.assertEqual(listener_name, listener_adm[const.NAME])
-
- # Test that a user with cloud admin role can see the listener
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- adm = self.os_admin.listener_client.show_listener(
- listener[const.ID])
- self.assertEqual(listener_name, adm[const.NAME])
-
- # Test that a different user, with load balancer member role, cannot
- # see this listener
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.listener_client
- self.assertRaises(exceptions.Forbidden,
- member2_client.show_listener,
- listener[const.ID])
-
- # Test that a user, without the load balancer member role, cannot
- # show listeners
- if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.listener_client.show_listener,
- listener[const.ID])
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_admin',
+ 'os_roles_lb_global_observer',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_show_RBAC_enforcement(
+ 'ListenerClient', 'show_listener',
+ expected_allowed, listener[const.ID])
@decorators.idempotent_id('aaae0298-5778-4c7e-a27a-01549a71b319')
def test_http_listener_update(self):
@@ -943,7 +1004,7 @@
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
self.assertRaises(
exceptions.Forbidden,
- self.os_primary.listener_client.update_listener,
+ self.listener_client.update_listener,
listener[const.ID], admin_state_up=True)
# Assert we didn't go into PENDING_*
@@ -956,7 +1017,7 @@
# Test that a user, without the load balancer member role, cannot
# update this listener
if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.listener_client
+ member2_client = self.member2_listener_client
self.assertRaises(exceptions.Forbidden,
member2_client.update_listener,
listener[const.ID], admin_state_up=True)
@@ -1127,13 +1188,13 @@
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
self.assertRaises(
exceptions.Forbidden,
- self.os_primary.listener_client.delete_listener,
+ self.listener_client.delete_listener,
listener[const.ID])
# Test that a different user, with the load balancer member role
# cannot delete this listener
if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.listener_client
+ member2_client = self.member2_listener_client
self.assertRaises(exceptions.Forbidden,
member2_client.delete_listener,
listener[const.ID])
@@ -1226,13 +1287,13 @@
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
self.assertRaises(
exceptions.Forbidden,
- self.os_primary.listener_client.get_listener_stats,
+ self.listener_client.get_listener_stats,
listener[const.ID])
# Test that a different user, with the load balancer role, cannot see
# the listener stats
if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.listener_client
+ member2_client = self.member2_listener_client
self.assertRaises(exceptions.Forbidden,
member2_client.get_listener_stats,
listener[const.ID])
diff --git a/octavia_tempest_plugin/tests/api/v2/test_load_balancer.py b/octavia_tempest_plugin/tests/api/v2/test_load_balancer.py
index 0cf3576..7ade642 100644
--- a/octavia_tempest_plugin/tests/api/v2/test_load_balancer.py
+++ b/octavia_tempest_plugin/tests/api/v2/test_load_balancer.py
@@ -14,6 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
+import copy
import testtools
import time
from uuid import UUID
@@ -78,13 +79,25 @@
self._setup_lb_network_kwargs(lb_kwargs, ip_version, use_fixed_ip=True)
- # Test that a user without the load balancer role cannot
- # create a load balancer
+ # Test that a user without the loadbalancer role cannot
+ # create a load balancer.
+ lb_kwargs_with_project_id = copy.deepcopy(lb_kwargs)
+ lb_kwargs_with_project_id[const.PROJECT_ID] = (
+ self.os_roles_lb_member.credentials.project_id)
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_primary', 'os_roles_lb_admin',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.loadbalancer_client.create_loadbalancer,
- **lb_kwargs)
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
+ if expected_allowed:
+ self.check_create_RBAC_enforcement(
+ 'LoadbalancerClient', 'create_loadbalancer',
+ expected_allowed, None, None, **lb_kwargs_with_project_id)
lb = self.mem_lb_client.create_loadbalancer(**lb_kwargs)
@@ -173,21 +186,21 @@
CONF.load_balancer.lb_build_interval,
CONF.load_balancer.lb_build_timeout)
- # Test that a user without the load balancer role cannot
- # delete this load balancer
+ # Test that a user without the loadbalancer role cannot delete this
+ # load balancer.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.loadbalancer_client.delete_loadbalancer,
- lb[const.ID])
-
- # Test that a different user, with the load balancer member role
- # cannot delete this load balancer
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.loadbalancer_client
- self.assertRaises(exceptions.Forbidden,
- member2_client.delete_loadbalancer,
- lb[const.ID])
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_delete_RBAC_enforcement(
+ 'LoadbalancerClient', 'delete_loadbalancer',
+ expected_allowed, None, None, lb[const.ID])
self.mem_lb_client.delete_loadbalancer(lb[const.ID])
@@ -222,21 +235,21 @@
# TODO(johnsom) Add other objects when we have clients for them
- # Test that a user without the load balancer role cannot
- # delete this load balancer
+ # Test that a user without the loadbalancer role cannot delete this
+ # load balancer.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.loadbalancer_client.delete_loadbalancer,
- lb[const.ID], cascade=True)
-
- # Test that a different user, with the load balancer member role
- # cannot delete this load balancer
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.loadbalancer_client
- self.assertRaises(exceptions.Forbidden,
- member2_client.delete_loadbalancer,
- lb[const.ID], cascade=True)
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_delete_RBAC_enforcement(
+ 'LoadbalancerClient', 'delete_loadbalancer',
+ expected_allowed, None, None, lb[const.ID], cascade=True)
self.mem_lb_client.delete_loadbalancer(lb[const.ID], cascade=True)
@@ -267,6 +280,8 @@
* List the load balancers filtering to one of the three.
* List the load balancers filtered, one field, and sorted.
"""
+ # IDs of load balancers created in the test
+ test_ids = []
# Get a list of pre-existing LBs to filter from test data
pretest_lbs = self.mem_lb_client.list_loadbalancers()
# Store their IDs for easy access
@@ -313,6 +328,7 @@
const.ONLINE,
CONF.load_balancer.check_interval,
CONF.load_balancer.check_timeout)
+ test_ids.append(lb1[const.ID])
# Time resolution for created_at is only to the second, and we need to
# ensure that each object has a distinct creation time. Delaying one
@@ -356,6 +372,7 @@
const.ONLINE,
CONF.load_balancer.check_interval,
CONF.load_balancer.check_timeout)
+ test_ids.append(lb2[const.ID])
# Time resolution for created_at is only to the second, and we need to
# ensure that each object has a distinct creation time. Delaying one
@@ -394,19 +411,64 @@
const.ACTIVE,
CONF.load_balancer.lb_build_interval,
CONF.load_balancer.lb_build_timeout)
+ test_ids.append(lb3[const.ID])
- # Test that a different user cannot list load balancers
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.loadbalancer_client
- primary = member2_client.list_loadbalancers()
- self.assertEqual(0, len(primary))
-
- # Test that a user without the lb member role cannot list load
- # balancers
+ # Test that different users cannot see the lb_member load balancers.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_primary', 'os_roles_lb_member2']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_admin', 'os_primary',
+ 'os_roles_lb_member2', 'os_roles_lb_observer',
+ 'os_roles_lb_global_observer']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.loadbalancer_client.list_loadbalancers)
+ expected_allowed = ['os_roles_lb_observer', 'os_roles_lb_member2']
+ if expected_allowed:
+ self.check_list_RBAC_enforcement_count(
+ 'LoadbalancerClient', 'list_loadbalancers',
+ expected_allowed, 0)
+
+ # Test credentials that should see these load balancers can see them.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_admin', 'os_roles_lb_member',
+ 'os_roles_lb_global_observer']
+ if expected_allowed:
+ self.check_list_IDs_RBAC_enforcement(
+ 'LoadbalancerClient', 'list_loadbalancers',
+ expected_allowed, test_ids)
+
+ # Test that users without the lb member role cannot list load balancers
+ # Note: non-owners can still call this API, they will just get the list
+ # of load balancers for their project (zero). The above tests
+ # are intended to cover the cross project use case.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_primary', 'os_roles_lb_admin',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
+ # Note: os_admin is here because it evaluates to "project_admin"
+ # in oslo_policy and since keystone considers "project_admin"
+ # a superscope of "project_reader". This means it can read
+ # objects in the "admin" credential's project.
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_admin', 'os_primary', 'os_system_admin',
+ 'os_system_reader', 'os_roles_lb_observer',
+ 'os_roles_lb_global_observer',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
+ if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_admin', 'os_roles_lb_observer',
+ 'os_roles_lb_global_observer',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
+ if expected_allowed:
+ self.check_list_RBAC_enforcement(
+ 'LoadbalancerClient', 'list_loadbalancers', expected_allowed)
# Check the default sort order, created_at
lbs = self.mem_lb_client.list_loadbalancers()
@@ -566,33 +628,24 @@
self.assertEqual(lb_kwargs[const.VIP_SUBNET_ID],
lb[const.VIP_SUBNET_ID])
- # Test that a user with lb_admin role can see the load balanacer
+ # Test that the appropriate users can see or not see the load
+ # balancer based on the API RBAC.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- lb_client = self.os_roles_lb_admin.loadbalancer_client
- lb_adm = lb_client.show_loadbalancer(lb[const.ID])
- self.assertEqual(lb_name, lb_adm[const.NAME])
-
- # Test that a user with cloud admin role can see the load balanacer
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- adm = self.os_admin.loadbalancer_client.show_loadbalancer(
- lb[const.ID])
- self.assertEqual(lb_name, adm[const.NAME])
-
- # Test that a different user, with load balancer member role, cannot
- # see this load balancer
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.loadbalancer_client
- self.assertRaises(exceptions.Forbidden,
- member2_client.show_loadbalancer,
- lb[const.ID])
-
- # Test that a user, without the load balancer member role, cannot
- # show load balancers
- if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.loadbalancer_client.show_loadbalancer,
- lb[const.ID])
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_admin',
+ 'os_roles_lb_global_observer',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_show_RBAC_enforcement(
+ 'LoadbalancerClient', 'show_loadbalancer',
+ expected_allowed, lb[const.ID])
# Attempt to clean up so that one full test run doesn't start 10+
# amps before the cleanup phase fires
@@ -679,26 +732,22 @@
new_description = data_utils.arbitrary_string(size=255,
base_text='new')
- # Test that a user, without the load balancer member role, cannot
- # use this command
+ # Test that a user, without the loadbalancer member role, cannot
+ # update this load balancer.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.loadbalancer_client.update_loadbalancer,
- lb[const.ID], admin_state_up=True)
-
- # Assert we didn't go into PENDING_*
- lb_check = self.mem_lb_client.show_loadbalancer(lb[const.ID])
- self.assertEqual(const.ACTIVE, lb_check[const.PROVISIONING_STATUS])
- self.assertFalse(lb_check[const.ADMIN_STATE_UP])
-
- # Test that a user, without the load balancer member role, cannot
- # update this load balancer
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.loadbalancer_client
- self.assertRaises(exceptions.Forbidden,
- member2_client.update_loadbalancer,
- lb[const.ID], admin_state_up=True)
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_update_RBAC_enforcement(
+ 'LoadbalancerClient', 'update_loadbalancer',
+ expected_allowed, None, None, lb[const.ID],
+ admin_state_up=True)
# Assert we didn't go into PENDING_*
lb_check = self.mem_lb_client.show_loadbalancer(lb[const.ID])
@@ -775,21 +824,24 @@
CONF.load_balancer.lb_build_interval,
CONF.load_balancer.lb_build_timeout)
- # Test that a user, without the load balancer member role, cannot
- # use this command
+ # Test that the appropriate users can see or not see the load
+ # balancer stats based on the API RBAC.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.loadbalancer_client.get_loadbalancer_stats,
- lb[const.ID])
-
- # Test that a different user, with the load balancer role, cannot see
- # the load balancer stats
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.loadbalancer_client
- self.assertRaises(exceptions.Forbidden,
- member2_client.get_loadbalancer_stats,
- lb[const.ID])
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_admin',
+ 'os_roles_lb_global_observer',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_show_RBAC_enforcement(
+ 'LoadbalancerClient', 'get_loadbalancer_stats',
+ expected_allowed, lb[const.ID])
stats = self.mem_lb_client.get_loadbalancer_stats(lb[const.ID])
@@ -843,21 +895,24 @@
CONF.load_balancer.check_interval,
CONF.load_balancer.check_timeout)
- # Test that a user, without the load balancer member role, cannot
- # use this method
+ # Test that the appropriate users can see or not see the load
+ # balancer status based on the API RBAC.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.loadbalancer_client.get_loadbalancer_status,
- lb[const.ID])
-
- # Test that a different user, with load balancer role, cannot see
- # the load balancer status
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.loadbalancer_client
- self.assertRaises(exceptions.Forbidden,
- member2_client.get_loadbalancer_status,
- lb[const.ID])
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_admin',
+ 'os_roles_lb_global_observer',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_show_RBAC_enforcement(
+ 'LoadbalancerClient', 'get_loadbalancer_status',
+ expected_allowed, lb[const.ID])
status = self.mem_lb_client.get_loadbalancer_status(lb[const.ID])
@@ -917,6 +972,20 @@
self.mem_lb_client.failover_loadbalancer,
lb[const.ID])
+ # Test that a user without the load balancer admin role cannot
+ # failover a load balancer.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+ if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+ if expected_allowed:
+ self.check_update_RBAC_enforcement(
+ 'LoadbalancerClient', 'failover_loadbalancer',
+ expected_allowed, None, None, lb[const.ID])
+
# Assert we didn't go into PENDING_*
lb = self.mem_lb_client.show_loadbalancer(lb[const.ID])
self.assertEqual(const.ACTIVE, lb[const.PROVISIONING_STATUS])
@@ -926,8 +995,9 @@
query_params='{loadbalancer_id}={lb_id}'.format(
loadbalancer_id=const.LOADBALANCER_ID, lb_id=lb[const.ID]))
- self.os_roles_lb_admin.loadbalancer_client.failover_loadbalancer(
- lb[const.ID])
+ admin_lb_client = (
+ self.os_roles_lb_admin.load_balancer_v2.LoadbalancerClient())
+ admin_lb_client.failover_loadbalancer(lb[const.ID])
lb = waiters.wait_for_status(self.mem_lb_client.show_loadbalancer,
lb[const.ID], const.PROVISIONING_STATUS,
diff --git a/octavia_tempest_plugin/tests/api/v2/test_member.py b/octavia_tempest_plugin/tests/api/v2/test_member.py
index 66e367e..aa7cf25 100644
--- a/octavia_tempest_plugin/tests/api/v2/test_member.py
+++ b/octavia_tempest_plugin/tests/api/v2/test_member.py
@@ -887,11 +887,30 @@
# Test that a user without the load balancer role cannot
# create a member
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+ member_client = self.os_primary.load_balancer_v2.MemberClient()
self.assertRaises(
exceptions.Forbidden,
- self.os_primary.member_client.create_member,
+ member_client.create_member,
**member_kwargs)
+ # Test that a user without the loadbalancer role cannot
+ # create a member.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_create_RBAC_enforcement(
+ 'MemberClient', 'create_member',
+ expected_allowed,
+ status_method=self.mem_lb_client.show_loadbalancer,
+ obj_id=self.lb_id, **member_kwargs)
+
member = self.mem_member_client.create_member(**member_kwargs)
waiters.wait_for_status(
@@ -1048,6 +1067,9 @@
* List the members filtering to one of the three.
* List the members filtered, one field, and sorted.
"""
+ # IDs of members created in the test
+ test_ids = []
+
if (algorithm == const.LB_ALGORITHM_SOURCE_IP_PORT and not
self.mem_listener_client.is_version_supported(
self.api_version, '2.13')):
@@ -1124,6 +1146,7 @@
const.ACTIVE,
CONF.load_balancer.check_interval,
CONF.load_balancer.check_timeout)
+ test_ids.append(member1[const.ID])
# Time resolution for created_at is only to the second, and we need to
# ensure that each object has a distinct creation time. Delaying one
# second is both a simple and a reliable way to accomplish this.
@@ -1162,6 +1185,7 @@
const.ACTIVE,
CONF.load_balancer.check_interval,
CONF.load_balancer.check_timeout)
+ test_ids.append(member2[const.ID])
# Time resolution for created_at is only to the second, and we need to
# ensure that each object has a distinct creation time. Delaying one
# second is both a simple and a reliable way to accomplish this.
@@ -1200,22 +1224,45 @@
const.ACTIVE,
CONF.load_balancer.check_interval,
CONF.load_balancer.check_timeout)
+ test_ids.append(member3[const.ID])
- # Test that a different user cannot list members
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.member_client
- self.assertRaises(
- exceptions.Forbidden,
- member2_client.list_members,
- pool_id)
-
- # Test that a user without the lb member role cannot list load
- # balancers
+ # Test credentials that should see these members can see them.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.member_client.list_members,
- pool_id)
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_admin', 'os_roles_lb_member',
+ 'os_roles_lb_global_observer']
+ if expected_allowed:
+ self.check_list_IDs_RBAC_enforcement(
+ 'MemberClient', 'list_members', expected_allowed,
+ test_ids, pool_id)
+
+ # Test that users without the lb member role cannot list members
+ # Note: The parent pool ID blocks non-owners from listing members.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ # Note: os_admin is here because it evaluates to "project_admin"
+ # in oslo_policy and since keystone considers "project_admin"
+ # a superscope of "project_reader". This means it can read
+ # objects in the "admin" credential's project.
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_admin', 'os_system_admin',
+ 'os_system_reader', 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_admin',
+ 'os_roles_lb_global_observer',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_list_RBAC_enforcement(
+ 'MemberClient', 'list_members', expected_allowed, pool_id)
# Check the default sort order, created_at
members = self.mem_member_client.list_members(pool_id)
@@ -1740,34 +1787,25 @@
for item in equal_items:
self.assertEqual(member_kwargs[item], member[item])
- # Test that a user with lb_admin role can see the member
+ # Test that the appropriate users can see or not see the member
+ # based on the API RBAC.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- member_client = self.os_roles_lb_admin.member_client
- member_adm = member_client.show_member(
- member[const.ID], pool_id=pool_id)
- self.assertEqual(member_name, member_adm[const.NAME])
-
- # Test that a user with cloud admin role can see the member
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- adm = self.os_admin.member_client.show_member(
- member[const.ID], pool_id=pool_id)
- self.assertEqual(member_name, adm[const.NAME])
-
- # Test that a different user, with load balancer member role, cannot
- # see this member
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.member_client
- self.assertRaises(exceptions.Forbidden,
- member2_client.show_member,
- member[const.ID], pool_id=pool_id)
-
- # Test that a user, without the load balancer member role, cannot
- # show members
- if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.member_client.show_member,
- member[const.ID], pool_id=pool_id)
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_admin',
+ 'os_roles_lb_global_observer',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_show_RBAC_enforcement(
+ 'MemberClient', 'show_member',
+ expected_allowed, member[const.ID],
+ pool_id=pool_id)
@decorators.idempotent_id('65680d48-1d49-4959-a7d1-677797e54f6b')
def test_HTTP_LC_alt_monitor_member_update(self):
@@ -2206,30 +2244,22 @@
for item in equal_items:
self.assertEqual(member_kwargs[item], member[item])
- # Test that a user, without the load balancer member role, cannot
- # use this command
+ # Test that a user, without the loadbalancer member role, cannot
+ # update this member.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.member_client.update_member,
- member[const.ID], pool_id=pool_id, admin_state_up=True)
-
- # Assert we didn't go into PENDING_*
- member_check = self.mem_member_client.show_member(
- member[const.ID], pool_id=pool_id)
- self.assertEqual(const.ACTIVE,
- member_check[const.PROVISIONING_STATUS])
- self.assertEqual(member_kwargs[const.ADMIN_STATE_UP],
- member_check[const.ADMIN_STATE_UP])
-
- # Test that a user, without the load balancer member role, cannot
- # update this member
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.member_client
- self.assertRaises(exceptions.Forbidden,
- member2_client.update_member,
- member[const.ID], pool_id=pool_id,
- admin_state_up=True)
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_update_RBAC_enforcement(
+ 'MemberClient', 'update_member',
+ expected_allowed, None, None, member[const.ID],
+ pool_id=pool_id, admin_state_up=True)
# Assert we didn't go into PENDING_*
member_check = self.mem_member_client.show_member(
@@ -2672,12 +2702,21 @@
member2_kwargs.pop(const.POOL_ID)
batch_update_list = [member2_kwargs, member3_kwargs]
- # Test that a user, without the load balancer member role, cannot
- # use this command
+ # Test that a user, without the loadbalancer member role, cannot
+ # batch update this member.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.member_client.update_members,
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_update_RBAC_enforcement(
+ 'MemberClient', 'update_members',
+ expected_allowed, None, None,
pool_id=pool_id, members_list=batch_update_list)
# Assert we didn't go into PENDING_*
@@ -2908,21 +2947,22 @@
CONF.load_balancer.build_interval,
CONF.load_balancer.build_timeout)
- # Test that a user without the load balancer role cannot
- # delete this member
+ # Test that a user without the loadbalancer role cannot delete this
+ # member.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.member_client.delete_member,
- member[const.ID], pool_id=pool_id)
-
- # Test that a different user, with the load balancer member role
- # cannot delete this member
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.member_client
- self.assertRaises(exceptions.Forbidden,
- member2_client.delete_member,
- member[const.ID], pool_id=pool_id)
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_delete_RBAC_enforcement(
+ 'MemberClient', 'delete_member',
+ expected_allowed, None, None, member[const.ID],
+ pool_id=pool_id)
self.mem_member_client.delete_member(member[const.ID],
pool_id=pool_id)
diff --git a/octavia_tempest_plugin/tests/api/v2/test_pool.py b/octavia_tempest_plugin/tests/api/v2/test_pool.py
index 3ab4892..ba31a8e 100644
--- a/octavia_tempest_plugin/tests/api/v2/test_pool.py
+++ b/octavia_tempest_plugin/tests/api/v2/test_pool.py
@@ -401,13 +401,23 @@
else:
pool_kwargs[const.LOADBALANCER_ID] = self.lb_id
- # Test that a user without the load balancer role cannot
- # create a pool
+ # Test that a user without the loadbalancer role cannot
+ # create a pool.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.pool_client.create_pool,
- **pool_kwargs)
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_create_RBAC_enforcement(
+ 'PoolClient', 'create_pool',
+ expected_allowed,
+ status_method=self.mem_lb_client.show_loadbalancer,
+ obj_id=self.lb_id, **pool_kwargs)
# This is a special case as the reference driver does not support
# SOURCE-IP-PORT. Since it runs with not_implemented_is_error, we must
@@ -585,6 +595,9 @@
* List the pools filtering to one of the three.
* List the pools filtered, one field, and sorted.
"""
+ # IDs of pools created in the test
+ test_ids = []
+
if (algorithm == const.LB_ALGORITHM_SOURCE_IP_PORT and not
self.mem_listener_client.is_version_supported(
self.api_version, '2.13')):
@@ -655,6 +668,7 @@
const.ACTIVE,
CONF.load_balancer.build_interval,
CONF.load_balancer.build_timeout)
+ test_ids.append(pool1[const.ID])
# Time resolution for created_at is only to the second, and we need to
# ensure that each object has a distinct creation time. Delaying one
# second is both a simple and a reliable way to accomplish this.
@@ -694,6 +708,7 @@
const.ACTIVE,
CONF.load_balancer.build_interval,
CONF.load_balancer.build_timeout)
+ test_ids.append(pool2[const.ID])
# Time resolution for created_at is only to the second, and we need to
# ensure that each object has a distinct creation time. Delaying one
# second is both a simple and a reliable way to accomplish this.
@@ -733,20 +748,65 @@
const.ACTIVE,
CONF.load_balancer.build_interval,
CONF.load_balancer.build_timeout)
+ test_ids.append(pool3[const.ID])
- # Test that a different user cannot list pools
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.pool_client
- primary = member2_client.list_pools(
- query_params='loadbalancer_id={lb_id}'.format(lb_id=lb_id))
- self.assertEqual(0, len(primary))
-
- # Test that a user without the lb member role cannot list load
- # balancers
+ # Test that different users cannot see the lb_member pools.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_primary', 'os_roles_lb_member2']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_admin', 'os_primary',
+ 'os_roles_lb_member2', 'os_roles_lb_observer',
+ 'os_roles_lb_global_observer']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.pool_client.list_pools)
+ expected_allowed = ['os_roles_lb_observer', 'os_roles_lb_member2']
+ if expected_allowed:
+ self.check_list_RBAC_enforcement_count(
+ 'PoolClient', 'list_pools', expected_allowed, 0,
+ query_params='loadbalancer_id={lb_id}'.format(lb_id=lb_id))
+
+ # Test credentials that should see these pools can see them.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_admin', 'os_roles_lb_member',
+ 'os_roles_lb_global_observer']
+ if expected_allowed:
+ self.check_list_IDs_RBAC_enforcement(
+ 'PoolClient', 'list_pools', expected_allowed, test_ids,
+ query_params='loadbalancer_id={lb_id}'.format(lb_id=lb_id))
+
+ # Test that users without the lb member role cannot list pools.
+ # Note: non-owners can still call this API, they will just get the list
+ # of pools for their project (zero). The above tests
+ # are intended to cover the cross project use case.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_primary', 'os_roles_lb_admin',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
+ # Note: os_admin is here because it evaluates to "project_admin"
+ # in oslo_policy, and keystone considers "project_admin"
+ # a superscope of "project_reader". This means it can read
+ # objects in the "admin" credential's project.
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_admin', 'os_primary', 'os_system_admin',
+ 'os_system_reader', 'os_roles_lb_observer',
+ 'os_roles_lb_global_observer',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
+ if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_admin', 'os_roles_lb_observer',
+ 'os_roles_lb_global_observer',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
+ if expected_allowed:
+ self.check_list_RBAC_enforcement(
+ 'PoolClient', 'list_pools', expected_allowed,
+ query_params='loadbalancer_id={lb_id}'.format(lb_id=lb_id))
# Check the default sort order, created_at
pools = self.mem_pool_client.list_pools(
@@ -960,7 +1020,7 @@
@decorators.idempotent_id('bd732c36-bdaa-4591-bf4e-28268874d22c')
def test_UDP_RR_source_IP_pool_show(self):
self._test_pool_show(
- const.HTTP, const.LB_ALGORITHM_ROUND_ROBIN,
+ const.UDP, const.LB_ALGORITHM_ROUND_ROBIN,
session_persistence=const.SESSION_PERSISTENCE_SOURCE_IP)
def _test_pool_show(self, pool_protocol, algorithm,
@@ -1062,33 +1122,24 @@
self.assertEqual(const.SESSION_PERSISTENCE_SOURCE_IP,
pool[const.SESSION_PERSISTENCE][const.TYPE])
- # Test that a user with lb_admin role can see the pool
+ # Test that the appropriate users can see or not see the pool
+ # based on the API RBAC.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- pool_client = self.os_roles_lb_admin.pool_client
- pool_adm = pool_client.show_pool(pool[const.ID])
- self.assertEqual(pool_name, pool_adm[const.NAME])
-
- # Test that a user with cloud admin role can see the pool
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- adm = self.os_admin.pool_client.show_pool(
- pool[const.ID])
- self.assertEqual(pool_name, adm[const.NAME])
-
- # Test that a different user, with load balancer member role, cannot
- # see this pool
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.pool_client
- self.assertRaises(exceptions.Forbidden,
- member2_client.show_pool,
- pool[const.ID])
-
- # Test that a user, without the load balancer member role, cannot
- # show pools
- if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.pool_client.show_pool,
- pool[const.ID])
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_admin',
+ 'os_roles_lb_global_observer',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_show_RBAC_enforcement(
+ 'PoolClient', 'show_pool',
+ expected_allowed, pool[const.ID])
@decorators.idempotent_id('d73755fe-ba3a-4248-9543-8e167a5aa7f4')
def test_HTTP_LC_pool_update(self):
@@ -1195,7 +1246,7 @@
@decorators.idempotent_id('28b90650-a612-4b10-981f-d3dd6a366e4f')
def test_UDP_RR_source_IP_pool_update(self):
self._test_pool_update(
- const.HTTP, const.LB_ALGORITHM_ROUND_ROBIN,
+ const.UDP, const.LB_ALGORITHM_ROUND_ROBIN,
session_persistence=const.SESSION_PERSISTENCE_SOURCE_IP)
def _test_pool_update(self, pool_protocol, algorithm,
@@ -1306,28 +1357,22 @@
self.assertEqual(const.SESSION_PERSISTENCE_SOURCE_IP,
pool[const.SESSION_PERSISTENCE][const.TYPE])
- # Test that a user, without the load balancer member role, cannot
- # use this command
+ # Test that a user, without the loadbalancer member role, cannot
+ # update this pool.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.pool_client.update_pool,
- pool[const.ID], admin_state_up=True)
-
- # Assert we didn't go into PENDING_*
- pool_check = self.mem_pool_client.show_pool(
- pool[const.ID])
- self.assertEqual(const.ACTIVE,
- pool_check[const.PROVISIONING_STATUS])
- self.assertFalse(pool_check[const.ADMIN_STATE_UP])
-
- # Test that a user, without the load balancer member role, cannot
- # update this pool
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.pool_client
- self.assertRaises(exceptions.Forbidden,
- member2_client.update_pool,
- pool[const.ID], admin_state_up=True)
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_update_RBAC_enforcement(
+ 'PoolClient', 'update_pool',
+ expected_allowed, None, None, pool[const.ID],
+ admin_state_up=True)
# Assert we didn't go into PENDING_*
pool_check = self.mem_pool_client.show_pool(
@@ -1545,7 +1590,7 @@
@decorators.idempotent_id('cc69c0d0-9191-4faf-a154-e33df880f44e')
def test_UDP_RR_source_IP_pool_delete(self):
self._test_pool_delete(
- const.HTTP, const.LB_ALGORITHM_ROUND_ROBIN,
+ const.UDP, const.LB_ALGORITHM_ROUND_ROBIN,
session_persistence=const.SESSION_PERSISTENCE_SOURCE_IP)
def _test_pool_delete(self, pool_protocol, algorithm,
@@ -1609,21 +1654,21 @@
CONF.load_balancer.build_interval,
CONF.load_balancer.build_timeout)
- # Test that a user without the load balancer role cannot
- # delete this pool
+ # Test that a user without the loadbalancer role cannot delete this
+ # pool.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.pool_client.delete_pool,
- pool[const.ID])
-
- # Test that a different user, with the load balancer member role
- # cannot delete this pool
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.pool_client
- self.assertRaises(exceptions.Forbidden,
- member2_client.delete_pool,
- pool[const.ID])
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_delete_RBAC_enforcement(
+ 'PoolClient', 'delete_pool',
+ expected_allowed, None, None, pool[const.ID])
self.mem_pool_client.delete_pool(pool[const.ID])
diff --git a/octavia_tempest_plugin/tests/api/v2/test_provider.py b/octavia_tempest_plugin/tests/api/v2/test_provider.py
index 917b365..9a9dd28 100644
--- a/octavia_tempest_plugin/tests/api/v2/test_provider.py
+++ b/octavia_tempest_plugin/tests/api/v2/test_provider.py
@@ -15,7 +15,6 @@
from oslo_log import log as logging
from tempest import config
from tempest.lib import decorators
-from tempest.lib import exceptions
from octavia_tempest_plugin.common import constants as const
from octavia_tempest_plugin.tests import test_base
@@ -44,10 +43,24 @@
# Test that a user without the load balancer role cannot
# list providers.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = [
+ 'os_admin', 'os_primary', 'os_roles_lb_admin',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_admin', 'os_primary', 'os_system_admin',
+ 'os_system_reader', 'os_roles_lb_observer',
+ 'os_roles_lb_global_observer',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.provider_client.list_providers)
+ expected_allowed = [
+ 'os_system_admin', 'os_system_reader', 'os_roles_lb_observer',
+ 'os_roles_lb_global_observer', 'os_roles_lb_admin',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
+ if expected_allowed:
+ self.check_list_RBAC_enforcement(
+ 'ProviderClient', 'list_providers', expected_allowed)
providers = self.mem_provider_client.list_providers()
diff --git a/octavia_tempest_plugin/tests/scenario/v2/test_ipv6_traffic_ops.py b/octavia_tempest_plugin/tests/scenario/v2/test_ipv6_traffic_ops.py
index 1495b82..4efa241 100644
--- a/octavia_tempest_plugin/tests/scenario/v2/test_ipv6_traffic_ops.py
+++ b/octavia_tempest_plugin/tests/scenario/v2/test_ipv6_traffic_ops.py
@@ -297,7 +297,7 @@
@decorators.idempotent_id('843a13f7-e00f-4151-8817-b5395eb69b52')
def test_ipv6_tcp_LC_listener_with_allowed_cidrs(self):
self._test_listener_with_allowed_cidrs(
- const.TCP, 91, const.LB_ALGORITHM_LEAST_CONNECTIONS)
+ const.TCP, 91, const.LB_ALGORITHM_LEAST_CONNECTIONS, delay=0.2)
@decorators.idempotent_id('cc0d55b1-87e8-4a87-bf50-66299947a469')
def test_ipv6_udp_LC_listener_with_allowed_cidrs(self):
@@ -350,7 +350,7 @@
const.UDP, 101, const.LB_ALGORITHM_SOURCE_IP_PORT)
def _test_listener_with_allowed_cidrs(self, protocol, protocol_port,
- algorithm):
+ algorithm, delay=None):
"""Tests traffic through a loadbalancer with allowed CIDRs set.
* Set up listener with allowed CIDRS (allow all) on a loadbalancer.
@@ -476,7 +476,7 @@
self.check_members_balanced(
self.lb_vip_address, protocol=protocol,
protocol_port=protocol_port, persistent=False,
- traffic_member_count=members)
+ traffic_member_count=members, delay=delay)
listener_kwargs = {
const.LISTENER_ID: listener_id,
@@ -493,6 +493,8 @@
# wait until Neutron completes the SG update.
# See https://bugs.launchpad.net/neutron/+bug/1866353.
def expect_timeout_error(address, protocol, protocol_port):
+ if protocol != const.UDP:
+ address = "[{}]".format(address)
try:
self.make_request(address, protocol=protocol,
protocol_port=protocol_port)
diff --git a/octavia_tempest_plugin/tests/scenario/v2/test_traffic_ops.py b/octavia_tempest_plugin/tests/scenario/v2/test_traffic_ops.py
index 356327b..d4d43b5 100644
--- a/octavia_tempest_plugin/tests/scenario/v2/test_traffic_ops.py
+++ b/octavia_tempest_plugin/tests/scenario/v2/test_traffic_ops.py
@@ -15,6 +15,7 @@
import datetime
import ipaddress
import shlex
+import socket
import testtools
import time
@@ -90,7 +91,8 @@
@classmethod
def _listener_pool_create(cls, protocol, protocol_port,
- pool_algorithm=const.LB_ALGORITHM_ROUND_ROBIN):
+ pool_algorithm=const.LB_ALGORITHM_ROUND_ROBIN,
+ insert_headers_dic=None):
if (protocol == const.UDP and
not cls.mem_listener_client.is_version_supported(
cls.api_version, '2.1')):
@@ -112,6 +114,10 @@
# haproxy process and use haproxy>=1.8:
const.CONNECTION_LIMIT: 200,
}
+
+ if insert_headers_dic:
+ listener_kwargs[const.INSERT_HEADERS] = insert_headers_dic
+
listener = cls.mem_listener_client.create_listener(**listener_kwargs)
waiters.wait_for_status(cls.mem_lb_client.show_loadbalancer,
@@ -138,9 +144,8 @@
return listener[const.ID], pool[const.ID]
def _test_basic_traffic(
- self, protocol, protocol_port, listener_id, pool_id,
- persistent=True, traffic_member_count=2, source_port=None,
- delay=None):
+ self, protocol, protocol_port, pool_id, persistent=True,
+ traffic_member_count=2, source_port=None, delay=None):
"""Tests sending traffic through a loadbalancer
* Set up members on a loadbalancer.
@@ -204,15 +209,15 @@
'Traffic tests will not work in noop mode.')
@decorators.idempotent_id('6751135d-e15a-4e22-89f4-bfcc3408d424')
def test_basic_http_traffic(self):
- listener_id, pool_id = self._listener_pool_create(const.HTTP, 80)
- self._test_basic_traffic(const.HTTP, 80, listener_id, pool_id)
+ pool_id = self._listener_pool_create(const.HTTP, 80)[1]
+ self._test_basic_traffic(const.HTTP, 80, pool_id)
@testtools.skipIf(CONF.load_balancer.test_with_noop,
'Traffic tests will not work in noop mode.')
@decorators.idempotent_id('332a08e0-eff1-4c19-b46c-bf87148a6d84')
def test_basic_tcp_traffic(self):
- listener_id, pool_id = self._listener_pool_create(const.TCP, 81)
- self._test_basic_traffic(const.TCP, 81, listener_id, pool_id,
+ pool_id = self._listener_pool_create(const.TCP, 81)[1]
+ self._test_basic_traffic(const.TCP, 81, pool_id,
persistent=False)
@testtools.skipIf(CONF.load_balancer.test_with_noop,
@@ -223,11 +228,11 @@
self.api_version, '2.1'):
raise self.skipException('UDP listener support is only available '
'in Octavia API version 2.1 or newer')
- listener_id, pool_id = self._listener_pool_create(const.UDP, 8080)
- self._test_basic_traffic(const.UDP, 8080, listener_id, pool_id)
+ pool_id = self._listener_pool_create(const.UDP, 8080)[1]
+ self._test_basic_traffic(const.UDP, 8080, pool_id)
def _test_healthmonitor_traffic(self, protocol, protocol_port,
- listener_id, pool_id, persistent=True):
+ pool_id, persistent=True):
"""Tests traffic is correctly routed based on healthmonitor status
* Create three members:
@@ -462,13 +467,13 @@
@decorators.idempotent_id('a16f8eb4-a77c-4b0e-8b1b-91c237039713')
def test_healthmonitor_http_traffic(self):
- listener_id, pool_id = self._listener_pool_create(const.HTTP, 82)
- self._test_healthmonitor_traffic(const.HTTP, 82, listener_id, pool_id)
+ pool_id = self._listener_pool_create(const.HTTP, 82)[1]
+ self._test_healthmonitor_traffic(const.HTTP, 82, pool_id)
@decorators.idempotent_id('22f00c34-343b-4aa9-90be-4567ecf85772')
def test_healthmonitor_tcp_traffic(self):
- listener_id, pool_id = self._listener_pool_create(const.TCP, 83)
- self._test_healthmonitor_traffic(const.TCP, 83, listener_id, pool_id,
+ pool_id = self._listener_pool_create(const.TCP, 83)[1]
+ self._test_healthmonitor_traffic(const.TCP, 83, pool_id,
persistent=False)
@decorators.idempotent_id('80b86513-1a76-4e42-91c9-cb23c879e536')
@@ -478,8 +483,8 @@
raise self.skipException('UDP listener support is only available '
'in Octavia API version 2.1 or newer')
- listener_id, pool_id = self._listener_pool_create(const.UDP, 8081)
- self._test_healthmonitor_traffic(const.UDP, 8081, listener_id, pool_id)
+ pool_id = self._listener_pool_create(const.UDP, 8081)[1]
+ self._test_healthmonitor_traffic(const.UDP, 8081, pool_id)
@decorators.idempotent_id('3558186d-6dcd-4d9d-b7f7-adc190b66149')
def test_http_l7policies_and_l7rules(self):
@@ -727,8 +732,7 @@
headers={'reject': 'true'})
def _test_mixed_ipv4_ipv6_members_traffic(self, protocol, protocol_port,
- listener_id, pool_id,
- persistent=True):
+ pool_id, persistent=True):
"""Tests traffic through a loadbalancer with IPv4 and IPv6 members.
* Set up members on a loadbalancer.
@@ -795,9 +799,8 @@
'Mixed IPv4/IPv6 member test requires IPv6.')
@decorators.idempotent_id('20b6b671-0101-4bed-a249-9af6ee3aa6d9')
def test_mixed_ipv4_ipv6_members_http_traffic(self):
- listener_id, pool_id = self._listener_pool_create(const.HTTP, 85)
- self._test_mixed_ipv4_ipv6_members_traffic(const.HTTP, 85,
- listener_id, pool_id)
+ pool_id = self._listener_pool_create(const.HTTP, 85)[1]
+ self._test_mixed_ipv4_ipv6_members_traffic(const.HTTP, 85, pool_id)
@testtools.skipIf(CONF.load_balancer.test_with_noop,
'Traffic tests will not work in noop mode.')
@@ -805,10 +808,9 @@
'Mixed IPv4/IPv6 member test requires IPv6.')
@decorators.idempotent_id('c442ae84-0abc-4470-8c7e-14a07e92a6fa')
def test_mixed_ipv4_ipv6_members_tcp_traffic(self):
- listener_id, pool_id = self._listener_pool_create(const.TCP, 86)
+ pool_id = self._listener_pool_create(const.TCP, 86)[1]
self._test_mixed_ipv4_ipv6_members_traffic(const.TCP, 86,
- listener_id, pool_id,
- persistent=False)
+ pool_id, persistent=False)
@testtools.skipIf(CONF.load_balancer.test_with_noop,
'Traffic tests will not work in noop mode.')
@@ -827,26 +829,26 @@
self.api_version, '2.1'):
raise self.skipException('UDP listener support is only available '
'in Octavia API version 2.1 or newer')
- listener_id, pool_id = self._listener_pool_create(const.UDP, 8082)
- self._test_mixed_ipv4_ipv6_members_traffic(const.UDP, 8082,
- listener_id, pool_id)
+ pool_id = self._listener_pool_create(const.UDP, 8082)[1]
+ self._test_mixed_ipv4_ipv6_members_traffic(const.UDP, 8082, pool_id)
@testtools.skipIf(CONF.load_balancer.test_with_noop,
'Traffic tests will not work in noop mode.')
@decorators.idempotent_id('a58063fb-b9e8-4cfc-8a8c-7b2e9e884e7a')
def test_least_connections_http_traffic(self):
- listener_id, pool_id = self._listener_pool_create(
+ pool_id = self._listener_pool_create(
const.HTTP, 87,
- pool_algorithm=const.LB_ALGORITHM_LEAST_CONNECTIONS)
- self._test_basic_traffic(const.HTTP, 87, listener_id, pool_id)
+ pool_algorithm=const.LB_ALGORITHM_LEAST_CONNECTIONS)[1]
+ self._test_basic_traffic(const.HTTP, 87, pool_id)
@testtools.skipIf(CONF.load_balancer.test_with_noop,
'Traffic tests will not work in noop mode.')
@decorators.idempotent_id('e1056709-6a1a-4a15-80c2-5cbb8279f924')
def test_least_connections_tcp_traffic(self):
- listener_id, pool_id = self._listener_pool_create(
- const.TCP, 88, pool_algorithm=const.LB_ALGORITHM_LEAST_CONNECTIONS)
- self._test_basic_traffic(const.TCP, 88, listener_id, pool_id,
+ pool_id = self._listener_pool_create(
+ const.TCP, 88,
+ pool_algorithm=const.LB_ALGORITHM_LEAST_CONNECTIONS)[1]
+ self._test_basic_traffic(const.TCP, 88, pool_id,
persistent=False, delay=0.2)
@testtools.skipIf(CONF.load_balancer.test_with_noop,
@@ -857,28 +859,28 @@
self.api_version, '2.1'):
raise self.skipException('UDP listener support is only available '
'in Octavia API version 2.1 or newer')
- listener_id, pool_id = self._listener_pool_create(
+ pool_id = self._listener_pool_create(
const.UDP, 8083,
- pool_algorithm=const.LB_ALGORITHM_LEAST_CONNECTIONS)
- self._test_basic_traffic(const.UDP, 8083, listener_id, pool_id)
+ pool_algorithm=const.LB_ALGORITHM_LEAST_CONNECTIONS)[1]
+ self._test_basic_traffic(const.UDP, 8083, pool_id)
@testtools.skipIf(CONF.load_balancer.test_with_noop,
'Traffic tests will not work in noop mode.')
@decorators.idempotent_id('881cc3e9-a011-4043-b0e3-a6185f736053')
def test_source_ip_http_traffic(self):
- listener_id, pool_id = self._listener_pool_create(
+ pool_id = self._listener_pool_create(
const.HTTP, 89,
- pool_algorithm=const.LB_ALGORITHM_SOURCE_IP)
- self._test_basic_traffic(const.HTTP, 89, listener_id, pool_id,
+ pool_algorithm=const.LB_ALGORITHM_SOURCE_IP)[1]
+ self._test_basic_traffic(const.HTTP, 89, pool_id,
traffic_member_count=1, persistent=False)
@testtools.skipIf(CONF.load_balancer.test_with_noop,
'Traffic tests will not work in noop mode.')
@decorators.idempotent_id('4568db0e-4243-4191-a822-9d327a55fa64')
def test_source_ip_tcp_traffic(self):
- listener_id, pool_id = self._listener_pool_create(
- const.TCP, 90, pool_algorithm=const.LB_ALGORITHM_SOURCE_IP)
- self._test_basic_traffic(const.TCP, 90, listener_id, pool_id,
+ pool_id = self._listener_pool_create(
+ const.TCP, 90, pool_algorithm=const.LB_ALGORITHM_SOURCE_IP)[1]
+ self._test_basic_traffic(const.TCP, 90, pool_id,
traffic_member_count=1, persistent=False)
@testtools.skipIf(CONF.load_balancer.test_with_noop,
@@ -889,10 +891,10 @@
self.api_version, '2.1'):
raise self.skipException('UDP listener support is only available '
'in Octavia API version 2.1 or newer')
- listener_id, pool_id = self._listener_pool_create(
+ pool_id = self._listener_pool_create(
const.UDP, 8084,
- pool_algorithm=const.LB_ALGORITHM_SOURCE_IP)
- self._test_basic_traffic(const.UDP, 8084, listener_id, pool_id,
+ pool_algorithm=const.LB_ALGORITHM_SOURCE_IP)[1]
+ self._test_basic_traffic(const.UDP, 8084, pool_id,
traffic_member_count=1, persistent=False)
@testtools.skipIf(CONF.load_balancer.test_with_noop,
@@ -903,11 +905,11 @@
# this test. Since it runs with not_implemented_is_error, we must
# handle this test case special.
try:
- listener_id, pool_id = self._listener_pool_create(
+ pool_id = self._listener_pool_create(
const.HTTP, 60091,
- pool_algorithm=const.LB_ALGORITHM_SOURCE_IP_PORT)
+ pool_algorithm=const.LB_ALGORITHM_SOURCE_IP_PORT)[1]
self._test_basic_traffic(
- const.HTTP, 60091, listener_id, pool_id,
+ const.HTTP, 60091, pool_id,
traffic_member_count=1, persistent=False, source_port=60091)
except exceptions.NotImplemented as e:
message = ("The configured provider driver '{driver}' "
@@ -931,7 +933,7 @@
# Without a delay this can trigger a "Cannot assign requested
# address" warning setting the source port, leading to failure
self._test_basic_traffic(
- const.TCP, 60092, listener_id, pool_id, traffic_member_count=1,
+ const.TCP, 60092, pool_id, traffic_member_count=1,
persistent=False, source_port=60092, delay=0.2)
except exceptions.NotImplemented as e:
message = ("The configured provider driver '{driver}' "
@@ -953,11 +955,11 @@
# this test. Since it runs with not_implemented_is_error, we must
# handle this test case special.
try:
- listener_id, pool_id = self._listener_pool_create(
+ pool_id = self._listener_pool_create(
const.UDP, 8085,
- pool_algorithm=const.LB_ALGORITHM_SOURCE_IP_PORT)
+ pool_algorithm=const.LB_ALGORITHM_SOURCE_IP_PORT)[1]
self._test_basic_traffic(
- const.UDP, 8085, listener_id, pool_id, traffic_member_count=1,
+ const.UDP, 8085, pool_id, traffic_member_count=1,
persistent=False, source_port=8085)
except exceptions.NotImplemented as e:
message = ("The configured provider driver '{driver}' "
@@ -1322,3 +1324,81 @@
protocol_port)
self.assertConsistentResponse(
(None, None), url_for_vip, repeat=3, expect_connection_error=True)
+
+ @decorators.idempotent_id('d3a28e76-76bc-11eb-a7c3-74e5f9e2a801')
+ def test_insert_headers(self):
+ """Verify the listener inserts the enabled X_FORWARDED_* headers."""
+ # Create listener, enable insert of "X_FORWARDED_FOR" HTTP header
+ listener_port = 102
+ listener_id, pool_id = self._listener_pool_create(
+ const.HTTP, listener_port, insert_headers_dic={
+ const.X_FORWARDED_FOR: "true"})
+ self._test_basic_traffic(
+ const.HTTP, listener_port, pool_id)
+
+ # Initiate HTTP traffic
+ test_url = 'http://{}:{}/request'.format(
+ self.lb_vip_address, listener_port)
+ data = self.validate_URL_response(test_url)
+ LOG.info('Received payload is: {}'.format(data))
+
+ # Detect source IP that is used to create TCP socket toward LB_VIP.
+ try:
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ s.connect((self.lb_vip_address, listener_port))
+ client_source_ip = s.getsockname()[0]
+ s.close()
+ except Exception:
+ LOG.exception('Failed to initiate TCP socket toward LB_VIP')
+ raise Exception('LB_VIP is not available')
+
+ # Parse payload into a dict of relevant headers (values lowercased).
+ def _data_parser(payload, relevant_headers):
+ retrieved_headers = {}
+ for line in payload.split('\n'):
+ try:
+ key, value = line.split(': ', 1)
+ except ValueError:
+ continue
+ if key in relevant_headers:
+ retrieved_headers[key] = value.lower()
+ return retrieved_headers
+
+ # Make sure that "X_FORWARDED_FOR" header was inserted with the
+ # expected IP (client_source_ip). It should be present in data.
+ expected_headers = {const.X_FORWARDED_FOR: client_source_ip}
+ received_headers = _data_parser(data, expected_headers)
+ self.assertEqual(expected_headers, received_headers)
+
+ # Update listener to insert the "X_FORWARDED_PORT" and
+ # "X_FORWARDED_PROTO" headers.
+ listener_kwargs = {
+ const.LISTENER_ID: listener_id,
+ const.INSERT_HEADERS: {
+ const.X_FORWARDED_PORT: "true",
+ const.X_FORWARDED_PROTO: "true"}}
+ self.mem_listener_client.update_listener(**listener_kwargs)
+ waiters.wait_for_status(self.mem_lb_client.show_loadbalancer,
+ self.lb_id, const.PROVISIONING_STATUS,
+ const.ACTIVE,
+ CONF.load_balancer.check_interval,
+ CONF.load_balancer.check_timeout)
+
+ # Initiate HTTP traffic again now that the listener was updated
+ data = self.validate_URL_response(test_url)
+ LOG.info('Received payload is: {}'.format(data))
+ expected_headers = {const.X_FORWARDED_PORT: '{}'.format(
+ listener_port), const.X_FORWARDED_PROTO: const.HTTP.lower()}
+ received_headers = _data_parser(data, expected_headers)
+ self.assertEqual(expected_headers, received_headers)
+
+ @decorators.idempotent_id('2b05229c-0254-11eb-8610-74e5f9e2a801')
+ def test_tcp_and_udp_traffic_on_same_port(self):
+ """Verify a UDP and a TCP listener can share the same VIP port."""
+ common_vip_port = 103
+ # Only the pool IDs are used, so take element [1] of each result.
+ pool_id_udp = self._listener_pool_create(const.UDP, common_vip_port)[1]
+ pool_id_tcp = self._listener_pool_create(const.TCP, common_vip_port)[1]
+ self._test_basic_traffic(const.UDP, common_vip_port, pool_id_udp)
+ self._test_basic_traffic(const.TCP, common_vip_port, pool_id_tcp,
+ persistent=False)
diff --git a/octavia_tempest_plugin/tests/spare_pool_scenario/v2/test_spare_pool.py b/octavia_tempest_plugin/tests/spare_pool_scenario/v2/test_spare_pool.py
index e641bc4..0ccfe55 100644
--- a/octavia_tempest_plugin/tests/spare_pool_scenario/v2/test_spare_pool.py
+++ b/octavia_tempest_plugin/tests/spare_pool_scenario/v2/test_spare_pool.py
@@ -57,10 +57,10 @@
* Send traffic through load balancer
* Validate amphora spare pool size is restored
"""
-
+ amphora_client = self.os_admin.load_balancer_v2.AmphoraClient()
# Check there is at least one amphora in spare pool
spare_amps = waiters.wait_for_spare_amps(
- self.os_admin.amphora_client.list_amphorae,
+ amphora_client.list_amphorae,
CONF.load_balancer.lb_build_interval,
CONF.load_balancer.lb_build_timeout)
@@ -100,7 +100,7 @@
# Confirm the spare pool has changed since last check
spare_amps_2 = waiters.wait_for_spare_amps(
- self.os_admin.amphora_client.list_amphorae,
+ amphora_client.list_amphorae,
CONF.load_balancer.lb_build_interval,
CONF.load_balancer.lb_build_timeout)
self.assertNotEqual(spare_amps, spare_amps_2)
@@ -180,12 +180,12 @@
# Check there is at least one amphora in spare pool
spare_amps = waiters.wait_for_spare_amps(
- self.os_admin.amphora_client.list_amphorae,
+ amphora_client.list_amphorae,
CONF.load_balancer.lb_build_interval,
CONF.load_balancer.lb_build_timeout)
# Delete amphora compute instance
- amp = self.os_admin.amphora_client.list_amphorae(
+ amp = amphora_client.list_amphorae(
query_params='{loadbalancer_id}={lb_id}'.format(
loadbalancer_id=const.LOADBALANCER_ID, lb_id=self.lb_id))
@@ -211,12 +211,12 @@
# Confirm the spare pool has changed since last check
spare_amps_2 = waiters.wait_for_spare_amps(
- self.os_admin.amphora_client.list_amphorae,
+ amphora_client.list_amphorae,
CONF.load_balancer.lb_build_interval,
CONF.load_balancer.lb_build_timeout)
self.assertNotEqual(spare_amps, spare_amps_2)
# Check there is at least one amphora in spare pool
- waiters.wait_for_spare_amps(self.os_admin.amphora_client.list_amphorae,
+ waiters.wait_for_spare_amps(amphora_client.list_amphorae,
CONF.load_balancer.lb_build_interval,
CONF.load_balancer.lb_build_timeout)
diff --git a/octavia_tempest_plugin/tests/test_base.py b/octavia_tempest_plugin/tests/test_base.py
index 8a2c3c7..e7a344d 100644
--- a/octavia_tempest_plugin/tests/test_base.py
+++ b/octavia_tempest_plugin/tests/test_base.py
@@ -30,9 +30,9 @@
from tempest import test
import tenacity
-from octavia_tempest_plugin import clients
from octavia_tempest_plugin.common import cert_utils
from octavia_tempest_plugin.common import constants as const
+from octavia_tempest_plugin.tests import RBAC_tests
from octavia_tempest_plugin.tests import validators
from octavia_tempest_plugin.tests import waiters
@@ -45,17 +45,49 @@
RETRY_MAX = 5
-class LoadBalancerBaseTest(validators.ValidatorsMixin, test.BaseTestCase):
+class LoadBalancerBaseTest(validators.ValidatorsMixin,
+ RBAC_tests.RBACTestsMixin, test.BaseTestCase):
"""Base class for load balancer tests."""
- # Setup cls.os_roles_lb_member. cls.os_primary, cls.os_roles_lb_member,
- # and cls.os_roles_lb_admin credentials.
- credentials = ['admin', 'primary',
- ['lb_member', CONF.load_balancer.member_role],
- ['lb_member2', CONF.load_balancer.member_role],
- ['lb_admin', CONF.load_balancer.admin_role]]
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ credentials = [
+ 'admin', 'primary', ['lb_admin', CONF.load_balancer.admin_role],
+ ['lb_member', CONF.load_balancer.member_role],
+ ['lb_member2', CONF.load_balancer.member_role]]
+ elif CONF.load_balancer.enforce_new_defaults:
+ credentials = [
+ 'admin', 'primary', ['lb_admin', CONF.load_balancer.admin_role],
+ ['lb_observer', CONF.load_balancer.observer_role, 'reader'],
+ ['lb_global_observer', CONF.load_balancer.global_observer_role,
+ 'reader'],
+ ['lb_member', CONF.load_balancer.member_role, 'member'],
+ ['lb_member2', CONF.load_balancer.member_role, 'member'],
+ ['lb_member_not_default_member', CONF.load_balancer.member_role]]
+ else:
+ credentials = [
+ 'admin', 'primary', ['lb_admin', CONF.load_balancer.admin_role],
+ ['lb_observer', CONF.load_balancer.observer_role, 'reader'],
+ ['lb_global_observer', CONF.load_balancer.global_observer_role,
+ 'reader'],
+ ['lb_member', CONF.load_balancer.member_role],
+ ['lb_member2', CONF.load_balancer.member_role]]
- client_manager = clients.ManagerV2
+ # If scope enforcement is enabled, add in the system scope credentials.
+ # The project scope is already handled by the above credentials.
+ if CONF.enforce_scope.octavia:
+ credentials.extend(['system_admin', 'system_reader'])
+
+ # A tuple of credentials that will be allocated by tempest using the
+ # 'credentials' list above. These are used to build RBAC test lists.
+ allocated_creds = []
+ for cred in credentials:
+ if isinstance(cred, list):
+ allocated_creds.append('os_roles_' + cred[0])
+ else:
+ allocated_creds.append('os_' + cred)
+ # Tests shall not mess with the list of allocated credentials
+ allocated_credentials = tuple(allocated_creds)
+
webserver1_response = 1
webserver2_response = 5
used_ips = []
@@ -102,10 +134,36 @@
cls.set_network_resources()
super(LoadBalancerBaseTest, cls).setup_credentials()
+ # Log the user roles for this test run
+ role_name_cache = {}
+ for cred in cls.credentials:
+ user_roles = []
+ if isinstance(cred, list):
+ user_name = cred[0]
+ cred_obj = getattr(cls, 'os_roles_' + cred[0])
+ else:
+ user_name = cred
+ cred_obj = getattr(cls, 'os_' + cred)
+ params = {'user.id': cred_obj.credentials.user_id,
+ 'project.id': cred_obj.credentials.project_id}
+ roles = cls.os_admin.role_assignments_client.list_role_assignments(
+ **params)['role_assignments']
+ for role in roles:
+ role_id = role['role']['id']
+ try:
+ role_name = role_name_cache[role_id]
+ except KeyError:
+ role_name = cls.os_admin.roles_v3_client.show_role(
+ role_id)['role']['name']
+ role_name_cache[role_id] = role_name
+ user_roles.append([role_name, role['scope']])
+ LOG.info("User %s has roles: %s", user_name, user_roles)
+
@classmethod
def setup_clients(cls):
"""Setup client aliases."""
super(LoadBalancerBaseTest, cls).setup_clients()
+ lb_admin_prefix = cls.os_roles_lb_admin.load_balancer_v2
cls.lb_mem_float_ip_client = cls.os_roles_lb_member.floating_ips_client
cls.lb_mem_keypairs_client = cls.os_roles_lb_member.keypairs_client
cls.lb_mem_net_client = cls.os_roles_lb_member.networks_client
@@ -116,31 +174,41 @@
cls.os_roles_lb_member.security_group_rules_client)
cls.lb_mem_servers_client = cls.os_roles_lb_member.servers_client
cls.lb_mem_subnet_client = cls.os_roles_lb_member.subnets_client
- cls.mem_lb_client = cls.os_roles_lb_member.loadbalancer_client
- cls.mem_listener_client = cls.os_roles_lb_member.listener_client
- cls.mem_pool_client = cls.os_roles_lb_member.pool_client
- cls.mem_member_client = cls.os_roles_lb_member.member_client
+ cls.mem_lb_client = (
+ cls.os_roles_lb_member.load_balancer_v2.LoadbalancerClient())
+ cls.mem_listener_client = (
+ cls.os_roles_lb_member.load_balancer_v2.ListenerClient())
+ cls.mem_pool_client = (
+ cls.os_roles_lb_member.load_balancer_v2.PoolClient())
+ cls.mem_member_client = (
+ cls.os_roles_lb_member.load_balancer_v2.MemberClient())
cls.mem_healthmonitor_client = (
- cls.os_roles_lb_member.healthmonitor_client)
- cls.mem_l7policy_client = cls.os_roles_lb_member.l7policy_client
- cls.mem_l7rule_client = cls.os_roles_lb_member.l7rule_client
- cls.lb_admin_amphora_client = cls.os_roles_lb_admin.amphora_client
+ cls.os_roles_lb_member.load_balancer_v2.HealthMonitorClient())
+ cls.mem_l7policy_client = (
+ cls.os_roles_lb_member.load_balancer_v2.L7PolicyClient())
+ cls.mem_l7rule_client = (
+ cls.os_roles_lb_member.load_balancer_v2.L7RuleClient())
+ cls.lb_admin_amphora_client = lb_admin_prefix.AmphoraClient()
cls.lb_admin_flavor_profile_client = (
- cls.os_roles_lb_admin.flavor_profile_client)
- cls.lb_admin_flavor_client = cls.os_roles_lb_admin.flavor_client
- cls.mem_flavor_client = cls.os_roles_lb_member.flavor_client
- cls.mem_provider_client = cls.os_roles_lb_member.provider_client
+ lb_admin_prefix.FlavorProfileClient())
+ cls.lb_admin_flavor_client = lb_admin_prefix.FlavorClient()
+ cls.mem_flavor_client = (
+ cls.os_roles_lb_member.load_balancer_v2.FlavorClient())
+ cls.mem_provider_client = (
+ cls.os_roles_lb_member.load_balancer_v2.ProviderClient())
cls.os_admin_servers_client = cls.os_admin.servers_client
+ cls.os_admin_routers_client = cls.os_admin.routers_client
+ cls.os_admin_subnetpools_client = cls.os_admin.subnetpools_client
cls.lb_admin_flavor_capabilities_client = (
- cls.os_roles_lb_admin.flavor_capabilities_client)
+ lb_admin_prefix.FlavorCapabilitiesClient())
cls.lb_admin_availability_zone_capabilities_client = (
- cls.os_roles_lb_admin.availability_zone_capabilities_client)
+ lb_admin_prefix.AvailabilityZoneCapabilitiesClient())
cls.lb_admin_availability_zone_profile_client = (
- cls.os_roles_lb_admin.availability_zone_profile_client)
+ lb_admin_prefix.AvailabilityZoneProfileClient())
cls.lb_admin_availability_zone_client = (
- cls.os_roles_lb_admin.availability_zone_client)
+ lb_admin_prefix.AvailabilityZoneClient())
cls.mem_availability_zone_client = (
- cls.os_roles_lb_member.availability_zone_client)
+ cls.os_roles_lb_member.load_balancer_v2.AvailabilityZoneClient())
@classmethod
def resource_setup(cls):
@@ -327,33 +395,38 @@
# Create tenant VIP IPv6 subnet
if CONF.load_balancer.test_with_ipv6:
- # See if ipv6-private-subnet exists and use it if so.
- priv_ipv6_subnet = cls.os_admin.subnets_client.list_subnets(
- name='ipv6-private-subnet')['subnets']
-
cls.lb_member_vip_ipv6_subnet_stateful = False
- if len(priv_ipv6_subnet) == 1:
- if (priv_ipv6_subnet[0]['ipv6_address_mode'] ==
- 'dhcpv6-stateful'):
- cls.lb_member_vip_ipv6_subnet_stateful = True
- cls.lb_member_vip_ipv6_subnet = priv_ipv6_subnet[0]
- cls.lb_member_vip_ipv6_net = {
- 'id': priv_ipv6_subnet[0]['network_id']}
- else:
- subnet_kwargs = {
- 'name': data_utils.rand_name("lb_member_vip_ipv6_subnet"),
- 'network_id': cls.lb_member_vip_net['id'],
- 'cidr': CONF.load_balancer.vip_ipv6_subnet_cidr,
- 'ip_version': 6}
- result = cls.lb_mem_subnet_client.create_subnet(
- **subnet_kwargs)
- cls.lb_member_vip_ipv6_net = cls.lb_member_vip_net
- cls.lb_member_vip_ipv6_subnet = result['subnet']
- cls.addClassResourceCleanup(
- waiters.wait_for_not_found,
- cls._logging_delete_subnet,
- cls.lb_mem_subnet_client.show_subnet,
- cls.lb_member_vip_ipv6_subnet['id'])
+ cls.lb_member_vip_ipv6_subnet_use_subnetpool = False
+ subnet_kwargs = {
+ 'name': data_utils.rand_name("lb_member_vip_ipv6_subnet"),
+ 'network_id': cls.lb_member_vip_net['id'],
+ 'ip_version': 6}
+
+ # Use a CIDR from devstack's default IPv6 subnetpool if it exists,
+ # the subnetpool's cidr is routable from the devstack node
+ # through the default router
+ subnetpool_name = CONF.load_balancer.default_ipv6_subnetpool
+ if subnetpool_name:
+ subnetpool = cls.os_admin_subnetpools_client.list_subnetpools(
+ name=subnetpool_name)['subnetpools']
+ if len(subnetpool) == 1:
+ subnetpool = subnetpool[0]
+ subnet_kwargs['subnetpool_id'] = subnetpool['id']
+ cls.lb_member_vip_ipv6_subnet_use_subnetpool = True
+
+ if 'subnetpool_id' not in subnet_kwargs:
+ subnet_kwargs['cidr'] = (
+ CONF.load_balancer.vip_ipv6_subnet_cidr)
+
+ result = cls.lb_mem_subnet_client.create_subnet(
+ **subnet_kwargs)
+ cls.lb_member_vip_ipv6_net = cls.lb_member_vip_net
+ cls.lb_member_vip_ipv6_subnet = result['subnet']
+ cls.addClassResourceCleanup(
+ waiters.wait_for_not_found,
+ cls._logging_delete_subnet,
+ cls.lb_mem_subnet_client.show_subnet,
+ cls.lb_member_vip_ipv6_subnet['id'])
LOG.info('lb_member_vip_ipv6_subnet: {}'.format(
cls.lb_member_vip_ipv6_subnet))
@@ -769,6 +842,30 @@
cls.lb_member_router['id'],
subnet_id=cls.lb_member_vip_subnet['id'])
+ if (CONF.load_balancer.test_with_ipv6 and
+ CONF.load_balancer.default_router and
+ cls.lb_member_vip_ipv6_subnet_use_subnetpool):
+
+ router_name = CONF.load_balancer.default_router
+ # if lb_member_vip_ipv6_subnet uses devstack's subnetpool,
+ # plug the subnet into the default router
+ router = cls.os_admin.routers_client.list_routers(
+ name=router_name)['routers']
+
+ if len(router) == 1:
+ router = router[0]
+
+ # Add IPv6 VIP subnet to the default router
+ cls.os_admin_routers_client.add_router_interface(
+ router['id'],
+ subnet_id=cls.lb_member_vip_ipv6_subnet['id'])
+ cls.addClassResourceCleanup(
+ waiters.wait_for_not_found,
+ cls.os_admin_routers_client.remove_router_interface,
+ cls.os_admin_routers_client.remove_router_interface,
+ router['id'],
+ subnet_id=cls.lb_member_vip_ipv6_subnet['id'])
+
# Add member subnet 1 to router
cls.lb_mem_routers_client.add_router_interface(
cls.lb_member_router['id'],
diff --git a/playbooks/prepare-ovn-multinode.yaml b/playbooks/prepare-ovn-multinode.yaml
new file mode 100644
index 0000000..a653a6c
--- /dev/null
+++ b/playbooks/prepare-ovn-multinode.yaml
@@ -0,0 +1,4 @@
+- hosts: all
+ roles:
+ - multi-node-bridge
+ - multi-node-setup
diff --git a/releasenotes/notes/Add-RBAC-scoped-tokens-tests-920aa35faf4a8c9d.yaml b/releasenotes/notes/Add-RBAC-scoped-tokens-tests-920aa35faf4a8c9d.yaml
new file mode 100644
index 0000000..d2d5d23
--- /dev/null
+++ b/releasenotes/notes/Add-RBAC-scoped-tokens-tests-920aa35faf4a8c9d.yaml
@@ -0,0 +1,17 @@
+---
+features:
+ - |
+ Added API test support for keystone default roles and scoped tokens.
+issues:
+ - |
+ Currently the API tests will not pass with the
+ keystone_default_roles-policy.yaml override file. This is because the
+ tempest framework credentials do not yet support token scopes.
+ This issue is tracked in https://bugs.launchpad.net/tempest/+bug/1917168
+ Once that bug is fixed, octavia-tempest-plugin can be updated to use the
+ required scope in the test credentials.
+upgrade:
+ - |
+ Two new tempest.conf settings enable/disable keystone default roles and
+ scoped token testing, [enforce_scope] octavia = True/False and
+ [load_balancer] enforce_new_defaults = True/False.
diff --git a/zuul.d/jobs.yaml b/zuul.d/jobs.yaml
index 20aa0b4..eb5a593 100644
--- a/zuul.d/jobs.yaml
+++ b/zuul.d/jobs.yaml
@@ -9,6 +9,16 @@
- controller
- nodeset:
+ name: octavia-single-node-ubuntu-focal
+ nodes:
+ - name: controller
+ label: nested-virt-ubuntu-focal
+ groups:
+ - name: tempest
+ nodes:
+ - controller
+
+- nodeset:
name: octavia-single-node-centos-7
nodes:
- name: controller
@@ -32,9 +42,9 @@
name: octavia-two-node
nodes:
- name: controller
- label: nested-virt-ubuntu-bionic
+ label: nested-virt-ubuntu-focal
- name: controller2
- label: nested-virt-ubuntu-bionic
+ label: nested-virt-ubuntu-focal
groups:
- name: controller
nodes:
@@ -179,7 +189,7 @@
- job:
name: octavia-dsvm-live-base
parent: octavia-dsvm-base
- nodeset: octavia-single-node-ubuntu-bionic
+ nodeset: octavia-single-node-ubuntu-focal
timeout: 9000
required-projects:
- openstack/diskimage-builder
@@ -215,7 +225,7 @@
- job:
name: octavia-dsvm-live-base-ipv6-only
parent: octavia-dsvm-base-ipv6-only
- nodeset: octavia-single-node-ubuntu-bionic
+ nodeset: octavia-single-node-ubuntu-focal
timeout: 9000
required-projects:
- openstack/diskimage-builder
@@ -248,9 +258,17 @@
name: octavia-dsvm-live-two-node-base
parent: octavia-dsvm-base
nodeset: octavia-two-node
- timeout: 9000
+ timeout: 10800
required-projects:
- openstack/diskimage-builder
+ roles:
+ - zuul: openstack/neutron-tempest-plugin
+ pre-run: playbooks/prepare-ovn-multinode.yaml
+ vars:
+ zuul_copy_output:
+ '/var/log/dib-build': logs
+ '/var/log/octavia-amphora.log': logs
+ '/var/log/octavia-tenant-traffic.log': logs
host-vars:
controller:
configure_swap_size: 8192
@@ -290,10 +308,6 @@
octavia-tempest-plugin: https://opendev.org/openstack/octavia-tempest-plugin.git
tempest_plugins:
- octavia-tempest-plugin
- zuul_copy_output:
- '/var/log/dib-build' : logs
- '/var/log/octavia-amphora.log': logs
- '/var/log/octavia-tenant-traffic.log': logs
controller2:
configure_swap_size: 8192
devstack_localrc:
@@ -330,13 +344,8 @@
OCTAVIA_USE_PREGENERATED_CERTS: true
OCTAVIA_MGMT_PORT_IP: 192.168.0.4
devstack_plugins:
+ neutron: https://opendev.org/openstack/neutron.git
octavia: https://opendev.org/openstack/octavia.git
- octavia-tempest-plugin: https://opendev.org/openstack/octavia-tempest-plugin.git
- tempest_plugins:
- - octavia-tempest-plugin
- zuul_copy_output:
- '/var/log/octavia-amphora.log': logs
- '/var/log/octavia-tenant-traffic.log': logs
group-vars:
controller:
devstack_local_conf:
@@ -348,6 +357,9 @@
api_v1_enabled: False
amphora_agent:
forward_all_logs: True
+ "/$NEUTRON_CORE_PLUGIN_CONF":
+ ovn:
+ enable_distributed_floating_ip: True
devstack_services:
base: false
barbican: false
@@ -371,11 +383,16 @@
o-cw: true
o-hm: true
o-hk: true
+ ovn-controller: true
+ ovn-northd: true
+ ovn-vswitchd: true
+ ovsdb-server: true
placement-api: true
- q-agt: true
- q-dhcp: true
- q-l3: true
- q-meta: true
+ q-agt: false
+ q-dhcp: false
+ q-l3: false
+ q-meta: false
+ q-ovn-metadata-agent: true
q-svc: true
rabbit: true
tempest: true
@@ -390,6 +407,9 @@
api_v1_enabled: False
amphora_agent:
forward_all_logs: True
+ "/$NEUTRON_CORE_PLUGIN_CONF":
+ ovn:
+ enable_distributed_floating_ip: True
devstack_services:
c-vol: false
c-bak: false
@@ -398,7 +418,16 @@
o-cw: true
o-hm: true
o-hk: true
- q-agt: true
+ ovn-controller: true
+ ovn-northd: false
+ ovn-vswitchd: true
+ ovsdb-server: true
+ q-fake: true
+ q-agt: false
+ q-dhcp: false
+ q-l3: false
+ q-meta: false
+ q-ovn-metadata-agent: true
- job:
name: octavia-dsvm-noop-base
@@ -434,6 +463,7 @@
- job:
name: octavia-v2-dsvm-noop-api
parent: octavia-dsvm-noop-base
+ timeout: 10800
vars:
devstack_local_conf:
post-config:
@@ -456,6 +486,23 @@
- ^octavia_tempest_plugin/tests/(?!api/|\w+\.py).*
- job:
+ name: octavia-v2-dsvm-noop-api-scoped-tokens
+ parent: octavia-v2-dsvm-noop-api
+ vars:
+ devstack_local_conf:
+ post-config:
+ $OCTAVIA_CONF:
+ oslo_policy:
+ enforce_scope: True
+ enforce_new_defaults: True
+ test-config:
+ "$TEMPEST_CONFIG":
+ enforce_scope:
+ octavia: True
+ load_balancer:
+ enforce_new_defaults: True
+
+- job:
name: octavia-v2-dsvm-noop-py2-api
parent: octavia-v2-dsvm-noop-api
vars:
@@ -463,6 +510,11 @@
USE_PYTHON3: False
- job:
+ name: octavia-v2-dsvm-noop-api-stable-wallaby
+ parent: octavia-v2-dsvm-noop-api
+ override-checkout: stable/wallaby
+
+- job:
name: octavia-v2-dsvm-noop-api-stable-victoria
parent: octavia-v2-dsvm-noop-api
override-checkout: stable/victoria
@@ -470,11 +522,13 @@
- job:
name: octavia-v2-dsvm-noop-api-stable-ussuri
parent: octavia-v2-dsvm-noop-api
+ nodeset: octavia-single-node-ubuntu-bionic
override-checkout: stable/ussuri
- job:
name: octavia-v2-dsvm-noop-api-stable-train
parent: octavia-v2-dsvm-noop-api
+ nodeset: octavia-single-node-ubuntu-bionic
override-checkout: stable/train
- job:
@@ -532,6 +586,10 @@
override-checkout: 2.30.0
- job:
+ name: octavia-v2-dsvm-scenario-stable-wallaby
+ parent: octavia-v2-dsvm-scenario
+ override-checkout: stable/wallaby
+- job:
name: octavia-v2-dsvm-scenario-stable-victoria
parent: octavia-v2-dsvm-scenario
override-checkout: stable/victoria
@@ -539,11 +597,13 @@
- job:
name: octavia-v2-dsvm-scenario-stable-ussuri
parent: octavia-v2-dsvm-scenario
+ nodeset: octavia-single-node-ubuntu-bionic
override-checkout: stable/ussuri
- job:
name: octavia-v2-dsvm-scenario-stable-train
parent: octavia-v2-dsvm-scenario
+ nodeset: octavia-single-node-ubuntu-bionic
override-checkout: stable/train
# Legacy jobs for the transition to the act-stdby two node jobs
@@ -583,19 +643,30 @@
- job:
name: octavia-v2-act-stdby-dsvm-scenario-two-node
parent: octavia-dsvm-live-two-node-base
- vars:
- tempest_concurrency: 2
- tempest_test_regex: ^octavia_tempest_plugin.tests.scenario.v2
- tox_envlist: all
- devstack_local_conf:
- post-config:
+ group-vars:
+ controller:
+ tempest_concurrency: 2
+ tempest_test_regex: ^octavia_tempest_plugin.tests.scenario.v2
+ tox_envlist: all
+ devstack_local_conf:
+ post-config:
$OCTAVIA_CONF:
nova:
enable_anti_affinity: True
- test-config:
- "$TEMPEST_CONFIG":
- load_balancer:
- loadbalancer_topology: ACTIVE_STANDBY
+ controller_worker:
+ loadbalancer_topology: ACTIVE_STANDBY
+ test-config:
+ "$TEMPEST_CONFIG":
+ load_balancer:
+ loadbalancer_topology: ACTIVE_STANDBY
+ subnode:
+ devstack_local_conf:
+ post-config:
+ $OCTAVIA_CONF:
+ nova:
+ enable_anti_affinity: True
+ controller_worker:
+ loadbalancer_topology: ACTIVE_STANDBY
- job:
name: octavia-v2-dsvm-py2-scenario-centos-7
@@ -618,23 +689,12 @@
OCTAVIA_AMP_IMAGE_SIZE: 3
- job:
- name: octavia-v2-dsvm-scenario-ubuntu-bionic
+ name: octavia-v2-dsvm-scenario-ubuntu-focal
parent: octavia-v2-dsvm-scenario
vars:
devstack_localrc:
OCTAVIA_AMP_BASE_OS: ubuntu
- OCTAVIA_AMP_DISTRIBUTION_RELEASE_ID: bionic
-
-- job:
- name: octavia-v2-dsvm-scenario-ubuntu-xenial
- parent: octavia-v2-dsvm-scenario
- nodeset: openstack-single-node-xenial
- vars:
- devstack_localrc:
- OCTAVIA_AMP_BASE_OS: ubuntu
- OCTAVIA_AMP_DISTRIBUTION_RELEASE_ID: xenial
- USE_PYTHON3: false
- TEMPEST_BRANCH: 23.0.0
+ OCTAVIA_AMP_DISTRIBUTION_RELEASE_ID: focal
- job:
name: octavia-v2-dsvm-tls-barbican
@@ -662,6 +722,11 @@
- ^octavia_tempest_plugin/tests/(?!barbican_scenario/|\w+\.py).*
- job:
+ name: octavia-v2-dsvm-tls-barbican-stable-wallaby
+ parent: octavia-v2-dsvm-tls-barbican
+ override-checkout: stable/wallaby
+
+- job:
name: octavia-v2-dsvm-tls-barbican-stable-victoria
parent: octavia-v2-dsvm-tls-barbican
override-checkout: stable/victoria
@@ -669,11 +734,13 @@
- job:
name: octavia-v2-dsvm-tls-barbican-stable-ussuri
parent: octavia-v2-dsvm-tls-barbican
+ nodeset: octavia-single-node-ubuntu-bionic
override-checkout: stable/ussuri
- job:
name: octavia-v2-dsvm-tls-barbican-stable-train
parent: octavia-v2-dsvm-tls-barbican
+ nodeset: octavia-single-node-ubuntu-bionic
override-checkout: stable/train
- job:
@@ -714,6 +781,11 @@
override-checkout: 2.30.0
- job:
+ name: octavia-v2-dsvm-spare-pool-stable-wallaby
+ parent: octavia-v2-dsvm-spare-pool
+ override-checkout: stable/wallaby
+
+- job:
name: octavia-v2-dsvm-spare-pool-stable-victoria
parent: octavia-v2-dsvm-spare-pool
override-checkout: stable/victoria
@@ -721,11 +793,13 @@
- job:
name: octavia-v2-dsvm-spare-pool-stable-ussuri
parent: octavia-v2-dsvm-spare-pool
+ nodeset: octavia-single-node-ubuntu-bionic
override-checkout: stable/ussuri
- job:
name: octavia-v2-dsvm-spare-pool-stable-train
parent: octavia-v2-dsvm-spare-pool
+ nodeset: octavia-single-node-ubuntu-bionic
override-checkout: stable/train
- job:
@@ -858,6 +932,11 @@
tox_envlist: all
- job:
+ name: octavia-v2-act-stdby-dsvm-scenario-stable-wallaby
+ parent: octavia-v2-act-stdby-dsvm-scenario
+ override-checkout: stable/wallaby
+
+- job:
name: octavia-v2-act-stdby-dsvm-scenario-stable-victoria
parent: octavia-v2-act-stdby-dsvm-scenario
override-checkout: stable/victoria
@@ -865,11 +944,13 @@
- job:
name: octavia-v2-act-stdby-dsvm-scenario-stable-ussuri
parent: octavia-v2-act-stdby-dsvm-scenario
+ nodeset: octavia-single-node-ubuntu-bionic
override-checkout: stable/ussuri
- job:
name: octavia-v2-act-stdby-dsvm-scenario-stable-train
parent: octavia-v2-act-stdby-dsvm-scenario
+ nodeset: octavia-single-node-ubuntu-bionic
override-checkout: stable/train
######### Third party jobs ##########
diff --git a/zuul.d/projects.yaml b/zuul.d/projects.yaml
index d7a5ea4..02d3ac7 100644
--- a/zuul.d/projects.yaml
+++ b/zuul.d/projects.yaml
@@ -9,14 +9,18 @@
check:
jobs:
- octavia-v2-dsvm-noop-api
+ - octavia-v2-dsvm-noop-api-stable-wallaby
- octavia-v2-dsvm-noop-api-stable-victoria
- octavia-v2-dsvm-noop-api-stable-ussuri
- octavia-v2-dsvm-noop-api-stable-train
+ - octavia-v2-dsvm-noop-api-scoped-tokens
- octavia-v2-dsvm-scenario
+ - octavia-v2-dsvm-scenario-stable-wallaby
- octavia-v2-dsvm-scenario-stable-victoria
- octavia-v2-dsvm-scenario-stable-ussuri
- octavia-v2-dsvm-scenario-stable-train
- octavia-v2-dsvm-tls-barbican
+ - octavia-v2-dsvm-tls-barbican-stable-wallaby
- octavia-v2-dsvm-tls-barbican-stable-victoria
- octavia-v2-dsvm-tls-barbican-stable-ussuri
- octavia-v2-dsvm-tls-barbican-stable-train
@@ -28,6 +32,8 @@
voting: false
- octavia-v2-act-stdby-dsvm-scenario:
voting: false
+ - octavia-v2-act-stdby-dsvm-scenario-stable-wallaby:
+ voting: false
- octavia-v2-act-stdby-dsvm-scenario-stable-victoria:
voting: false
- octavia-v2-act-stdby-dsvm-scenario-stable-ussuri:
@@ -36,6 +42,8 @@
voting: false
- octavia-v2-dsvm-spare-pool:
voting: false
+ - octavia-v2-dsvm-spare-pool-stable-wallaby:
+ voting: false
- octavia-v2-dsvm-spare-pool-stable-victoria:
voting: false
- octavia-v2-dsvm-spare-pool-stable-ussuri:
@@ -54,14 +62,18 @@
queue: octavia
jobs:
- octavia-v2-dsvm-noop-api
+ - octavia-v2-dsvm-noop-api-stable-wallaby
- octavia-v2-dsvm-noop-api-stable-victoria
- octavia-v2-dsvm-noop-api-stable-ussuri
- octavia-v2-dsvm-noop-api-stable-train
+ - octavia-v2-dsvm-noop-api-scoped-tokens
- octavia-v2-dsvm-scenario
+ - octavia-v2-dsvm-scenario-stable-wallaby
- octavia-v2-dsvm-scenario-stable-victoria
- octavia-v2-dsvm-scenario-stable-ussuri
- octavia-v2-dsvm-scenario-stable-train
- octavia-v2-dsvm-tls-barbican
+ - octavia-v2-dsvm-tls-barbican-stable-wallaby
- octavia-v2-dsvm-tls-barbican-stable-victoria
- octavia-v2-dsvm-tls-barbican-stable-ussuri
- octavia-v2-dsvm-tls-barbican-stable-train