Merge "Update tests for scoped tokens and default roles"
diff --git a/octavia_tempest_plugin/common/constants.py b/octavia_tempest_plugin/common/constants.py
index 927aa84..aea476e 100644
--- a/octavia_tempest_plugin/common/constants.py
+++ b/octavia_tempest_plugin/common/constants.py
@@ -183,6 +183,7 @@
# RBAC options
ADVANCED = 'advanced'
+KEYSTONE_DEFAULT_ROLES = 'keystone_default_roles'
OWNERADMIN = 'owner_or_admin'
NONE = 'none'
diff --git a/octavia_tempest_plugin/config.py b/octavia_tempest_plugin/config.py
index 7af7a1f..4d1543b 100644
--- a/octavia_tempest_plugin/config.py
+++ b/octavia_tempest_plugin/config.py
@@ -86,6 +86,12 @@
cfg.StrOpt('admin_role',
default='load-balancer_admin',
help='The load balancing admin RBAC role.'),
+ cfg.StrOpt('observer_role',
+ default='load-balancer_observer',
+ help='The load balancing observer RBAC role.'),
+ cfg.StrOpt('global_observer_role',
+ default='load-balancer_global_observer',
+ help='The load balancing global observer RBAC role.'),
cfg.IntOpt('scp_connection_timeout',
default=5,
help='Timeout in seconds to wait for a '
@@ -97,10 +103,13 @@
default='octavia',
help='The provider driver to use for the tests.'),
cfg.StrOpt('RBAC_test_type', default=const.ADVANCED,
- choices=[const.ADVANCED, const.OWNERADMIN, const.NONE],
+ choices=[const.ADVANCED, const.KEYSTONE_DEFAULT_ROLES,
+ const.OWNERADMIN, const.NONE],
help='Type of RBAC tests to run. "advanced" runs the octavia '
'default RBAC tests. "owner_or_admin" runs the legacy '
- 'owner or admin tests. "none" disables the RBAC tests.'),
+ 'owner or admin tests. "keystone_default_roles" runs the '
+ 'tests using only the keystone default roles. "none" '
+ 'disables the RBAC tests.'),
cfg.DictOpt('enabled_provider_drivers',
help=('A comma separated list of dictionaries of the '
'enabled provider driver names and descriptions. '
@@ -224,6 +233,15 @@
default='/opt/octavia-tempest-plugin/test_server.bin',
help='Filesystem path to the test web server that will be '
'installed in the web server VMs.'),
+ # RBAC related options
+ # Note: Also see the enforce_scope section (from tempest) for Octavia API
+ # scope checking setting.
+ cfg.BoolOpt('enforce_new_defaults',
+ default=False,
+ help='Does the load-balancer service API policies enforce '
+ 'the new keystone default roles? This configuration '
+ 'value should be same as octavia.conf: '
+ '[oslo_policy].enforce_new_defaults option.'),
]
lb_feature_enabled_group = cfg.OptGroup(name='loadbalancer-feature-enabled',
@@ -268,3 +286,15 @@
"the tempest instance have access to the log files "
"specified in the tempest configuration."),
]
+
+# Extend the enforce_scope option group that is defined in tempest
+enforce_scope_group = cfg.OptGroup(name="enforce_scope",
+ title="OpenStack Services with "
+ "enforce scope")
+EnforceScopeGroup = [
+ cfg.BoolOpt('octavia',
+ default=False,
+ help='Does the load-balancer service API policies enforce '
+ 'scope? This configuration value should be same as '
+ 'octavia.conf: [oslo_policy].enforce_scope option.'),
+]
diff --git a/octavia_tempest_plugin/plugin.py b/octavia_tempest_plugin/plugin.py
index ec093e7..655cdba 100644
--- a/octavia_tempest_plugin/plugin.py
+++ b/octavia_tempest_plugin/plugin.py
@@ -38,6 +38,8 @@
config.register_opt_group(conf,
project_config.lb_feature_enabled_group,
project_config.LBFeatureEnabledGroup)
+ config.register_opt_group(conf, project_config.enforce_scope_group,
+ project_config.EnforceScopeGroup)
def get_opt_lists(self):
return [
diff --git a/octavia_tempest_plugin/tests/RBAC_tests.py b/octavia_tempest_plugin/tests/RBAC_tests.py
new file mode 100644
index 0000000..b24e5c4
--- /dev/null
+++ b/octavia_tempest_plugin/tests/RBAC_tests.py
@@ -0,0 +1,472 @@
+# Copyright 2021 Red Hat, Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import copy
+
+from oslo_log import log as logging
+from tempest import config
+from tempest.lib import exceptions
+from tempest import test
+
+from octavia_tempest_plugin.common import constants
+from octavia_tempest_plugin.tests import waiters
+
+CONF = config.CONF
+LOG = logging.getLogger(__name__)
+
+
+class RBACTestsMixin(test.BaseTestCase):
+
+ def _check_allowed(self, client_str, method_str, allowed_list,
+ *args, **kwargs):
+ """Test an API call allowed RBAC enforcement.
+
+ :param client_str: The service client to use for the test, without the
+ credential. Example: 'amphora_client'
+ :param method_str: The method on the client to call for the test.
+ Example: 'list_amphorae'
+ :param allowed_list: The list of credentials expected to be
+ allowed. Example: ['os_roles_lb_member'].
+ :param args: Any positional parameters needed by the method.
+ :param kwargs: Any named parameters needed by the method.
+ :raises AssertionError: Raised if the RBAC tests fail.
+ :raises Forbidden: Raised if a credential that should have access does
+ not and is denied.
+ :raises InvalidScope: Raised if a credential that should have the
+ correct scope for access is denied.
+ :returns: None on success
+ """
+ for cred in allowed_list:
+ try:
+ cred_obj = getattr(self, cred)
+ except AttributeError:
+ # TODO(johnsom) Remove once scoped tokens are the default.
+ if ((cred == 'os_system_admin' or cred == 'os_system_reader')
+ and not CONF.enforce_scope.octavia):
+ LOG.info('Skipping %s allowed RBAC test because '
+ 'enforce_scope.octavia is not True', cred)
+ continue
+ else:
+ self.fail('Credential {} "expected_allowed" for RBAC '
+ 'testing was not created by tempest '
+ 'credentials setup. This is likely a bug in the '
+ 'test.'.format(cred))
+ client = getattr(cred_obj, client_str)
+ method = getattr(client, method_str)
+ try:
+ method(*args, **kwargs)
+ except exceptions.Forbidden as e:
+ self.fail('Method {}.{} failed to allow access via RBAC using '
+ 'credential {}. Error: {}'.format(
+ client_str, method_str, cred, str(e)))
+
+ def _check_disallowed(self, client_str, method_str, allowed_list,
+ status_method=None, obj_id=None, *args, **kwargs):
+ """Test an API call disallowed RBAC enforcement.
+
+ :param client_str: The service client to use for the test, without the
+ credential. Example: 'amphora_client'
+ :param method_str: The method on the client to call for the test.
+ Example: 'list_amphorae'
+ :param allowed_list: The list of credentials expected to be
+ allowed. Example: ['os_roles_lb_member'].
+ :param status_method: The service client method that will provide
+ the object status for a status change waiter.
+ :param obj_id: The ID of the object to check for the expected status
+ update.
+ :param args: Any positional parameters needed by the method.
+ :param kwargs: Any named parameters needed by the method.
+ :raises AssertionError: Raised if the RBAC tests fail.
+ :raises Forbidden: Raised if a credential that should have access does
+ not and is denied.
+ :raises InvalidScope: Raised if a credential that should have the
+ correct scope for access is denied.
+ :returns: None on success
+ """
+ expected_disallowed = (set(self.allocated_credentials) -
+ set(allowed_list))
+ for cred in expected_disallowed:
+ cred_obj = getattr(self, cred)
+ client = getattr(cred_obj, client_str)
+ method = getattr(client, method_str)
+
+ # Unfortunately tempest uses testtools assertRaises[1] which means
+ # we cannot use the unittest assertRaises context[2] with msg= to
+ # give a useful error.
+ # Also, testtools doesn't work with subTest[3], so we can't use
+ # that to expose the failing credential.
+ # This all means the exception raised by testtools assertRaises
+ # is less than useful.
+ # TODO(johnsom) Remove this try block once testtools is useful.
+ # [1] https://testtools.readthedocs.io/en/latest/
+ # api.html#testtools.TestCase.assertRaises
+ # [2] https://docs.python.org/3/library/
+ # unittest.html#unittest.TestCase.assertRaises
+ # [3] https://github.com/testing-cabal/testtools/issues/235
+ try:
+ method(*args, **kwargs)
+ except exceptions.Forbidden:
+ if status_method:
+ waiters.wait_for_status(
+ status_method, obj_id,
+ constants.PROVISIONING_STATUS, constants.ACTIVE,
+ CONF.load_balancer.check_interval,
+ CONF.load_balancer.check_timeout)
+
+ continue
+ self.fail('Method {}.{} failed to deny access via RBAC using '
+ 'credential {}.'.format(client_str, method_str, cred))
+
+ def _list_get_RBAC_enforcement(self, client_str, method_str,
+ expected_allowed, *args, **kwargs):
+ """Test an API call RBAC enforcement.
+
+ :param client_str: The service client to use for the test, without the
+ credential. Example: 'amphora_client'
+ :param method_str: The method on the client to call for the test.
+ Example: 'list_amphorae'
+ :param expected_allowed: The list of credentials expected to be
+ allowed. Example: ['os_roles_lb_member'].
+ :param args: Any positional parameters needed by the method.
+ :param kwargs: Any named parameters needed by the method.
+ :raises AssertionError: Raised if the RBAC tests fail.
+ :raises Forbidden: Raised if a credential that should have access does
+ not and is denied.
+ :raises InvalidScope: Raised if a credential that should have the
+ correct scope for access is denied.
+ :returns: None on success
+ """
+
+ allowed_list = copy.deepcopy(expected_allowed)
+ # os_admin is a special case as it is valid with the old defaults,
+ # but will not be with the new defaults and/or token scoping.
+ # The old keystone role "admin" becomes project scoped "admin"
+ # instead of being a global admin.
+ # To keep the tests simple, handle that edge case here.
+ # TODO(johnsom) Once token scope is default, remove this.
+ if ('os_system_admin' in expected_allowed and
+ not CONF.load_balancer.enforce_new_defaults and
+ not CONF.enforce_scope.octavia):
+ allowed_list.append('os_admin')
+
+ # #### Test that disallowed credentials cannot access the API.
+ self._check_disallowed(client_str, method_str, allowed_list,
+ None, None, *args, **kwargs)
+
+ # #### Test that allowed credentials can access the API.
+ self._check_allowed(client_str, method_str, allowed_list,
+ *args, **kwargs)
+
+ def check_show_RBAC_enforcement(self, client_str, method_str,
+ expected_allowed, *args, **kwargs):
+ """Test an API show call RBAC enforcement.
+
+ :param client_str: The service client to use for the test, without the
+ credential. Example: 'amphora_client'
+ :param method_str: The method on the client to call for the test.
+ Example: 'list_amphorae'
+ :param expected_allowed: The list of credentials expected to be
+ allowed. Example: ['os_roles_lb_member'].
+ :param args: Any positional parameters needed by the method.
+ :param kwargs: Any named parameters needed by the method.
+ :raises AssertionError: Raised if the RBAC tests fail.
+ :raises Forbidden: Raised if a credential that should have access does
+ not and is denied.
+ :raises InvalidScope: Raised if a credential that should have the
+ correct scope for access is denied.
+ :returns: None on success
+ """
+ self._list_get_RBAC_enforcement(client_str, method_str,
+ expected_allowed, *args, **kwargs)
+
+ def check_list_RBAC_enforcement(self, client_str, method_str,
+ expected_allowed, *args, **kwargs):
+ """Test an API list call RBAC enforcement.
+
+ :param client_str: The service client to use for the test, without the
+ credential. Example: 'amphora_client'
+ :param method_str: The method on the client to call for the test.
+ Example: 'list_amphorae'
+ :param expected_allowed: The list of credentials expected to be
+ allowed. Example: ['os_roles_lb_member'].
+ :param args: Any positional parameters needed by the method.
+ :param kwargs: Any named parameters needed by the method.
+ :raises AssertionError: Raised if the RBAC tests fail.
+ :raises Forbidden: Raised if a credential that should have access does
+ not and is denied.
+ :raises InvalidScope: Raised if a credential that should have the
+ correct scope for access is denied.
+ :returns: None on success
+ """
+ self._list_get_RBAC_enforcement(client_str, method_str,
+ expected_allowed, *args, **kwargs)
+
+ def _CUD_RBAC_enforcement(self, client_str, method_str, expected_allowed,
+ status_method=None, obj_id=None,
+ *args, **kwargs):
+ """Test an API create/update/delete call RBAC enforcement.
+
+ :param client_str: The service client to use for the test, without the
+ credential. Example: 'amphora_client'
+ :param method_str: The method on the client to call for the test.
+ Example: 'list_amphorae'
+ :param expected_allowed: The list of credentials expected to be
+ allowed. Example: ['os_roles_lb_member'].
+ :param status_method: The service client method that will provide
+ the object status for a status change waiter.
+ :param obj_id: The ID of the object to check for the expected status
+ update.
+ :param args: Any positional parameters needed by the method.
+ :param kwargs: Any named parameters needed by the method.
+ :raises AssertionError: Raised if the RBAC tests fail.
+ :raises Forbidden: Raised if a credential that should have access does
+ not and is denied.
+ :raises InvalidScope: Raised if a credential that should have the
+ correct scope for access is denied.
+ :returns: None on success
+ """
+
+ allowed_list = copy.deepcopy(expected_allowed)
+ # os_admin is a special case as it is valid with the old defaults,
+ # but will not be with the new defaults and/or token scoping.
+ # The old keystone role "admin" becomes project scoped "admin"
+ # instead of being a global admin.
+ # To keep the tests simple, handle that edge case here.
+ # TODO(johnsom) Once token scope is default, remove this.
+ if ('os_system_admin' in expected_allowed and
+ not CONF.load_balancer.enforce_new_defaults and
+ not CONF.enforce_scope.octavia):
+ allowed_list.append('os_admin')
+
+ # #### Test that disallowed credentials cannot access the API.
+ self._check_disallowed(client_str, method_str, allowed_list,
+ status_method, obj_id, *args, **kwargs)
+
+ def check_create_RBAC_enforcement(
+ self, client_str, method_str, expected_allowed,
+ status_method=None, obj_id=None, *args, **kwargs):
+ """Test an API create call RBAC enforcement.
+
+ :param client_str: The service client to use for the test, without the
+ credential. Example: 'amphora_client'
+ :param method_str: The method on the client to call for the test.
+ Example: 'list_amphorae'
+ :param expected_allowed: The list of credentials expected to be
+ allowed. Example: ['os_roles_lb_member'].
+ :param status_method: The service client method that will provide
+ the object status for a status change waiter.
+ :param obj_id: The ID of the object to check for the expected status
+ update.
+ :param args: Any positional parameters needed by the method.
+ :param kwargs: Any named parameters needed by the method.
+ :raises AssertionError: Raised if the RBAC tests fail.
+ :raises Forbidden: Raised if a credential that should have access does
+ not and is denied.
+ :raises InvalidScope: Raised if a credential that should have the
+ correct scope for access is denied.
+ :returns: None on success
+ """
+ self._CUD_RBAC_enforcement(client_str, method_str, expected_allowed,
+ status_method, obj_id, *args, **kwargs)
+
+ def check_delete_RBAC_enforcement(
+ self, client_str, method_str, expected_allowed,
+ status_method=None, obj_id=None, *args, **kwargs):
+ """Test an API delete call RBAC enforcement.
+
+ :param client_str: The service client to use for the test, without the
+ credential. Example: 'amphora_client'
+ :param method_str: The method on the client to call for the test.
+ Example: 'list_amphorae'
+ :param expected_allowed: The list of credentials expected to be
+ allowed. Example: ['os_roles_lb_member'].
+ :param status_method: The service client method that will provide
+ the object status for a status change waiter.
+ :param obj_id: The ID of the object to check for the expected status
+ update.
+ :param args: Any positional parameters needed by the method.
+ :param kwargs: Any named parameters needed by the method.
+ :raises AssertionError: Raised if the RBAC tests fail.
+ :raises Forbidden: Raised if a credential that should have access does
+ not and is denied.
+ :raises InvalidScope: Raised if a credential that should have the
+ correct scope for access is denied.
+ :returns: None on success
+ """
+ self._CUD_RBAC_enforcement(client_str, method_str, expected_allowed,
+ status_method, obj_id, *args, **kwargs)
+
+ def check_update_RBAC_enforcement(
+ self, client_str, method_str, expected_allowed,
+ status_method=None, obj_id=None, *args, **kwargs):
+ """Test an API update call RBAC enforcement.
+
+ :param client_str: The service client to use for the test, without the
+ credential. Example: 'amphora_client'
+ :param method_str: The method on the client to call for the test.
+ Example: 'list_amphorae'
+ :param expected_allowed: The list of credentials expected to be
+ allowed. Example: ['os_roles_lb_member'].
+ :param status_method: The service client method that will provide
+ the object status for a status change waiter.
+ :param obj_id: The ID of the object to check for the expected status
+ update.
+ :param args: Any positional parameters needed by the method.
+ :param kwargs: Any named parameters needed by the method.
+ :raises AssertionError: Raised if the RBAC tests fail.
+ :raises Forbidden: Raised if a credential that should have access does
+ not and is denied.
+ :raises InvalidScope: Raised if a credential that should have the
+ correct scope for access is denied.
+ :returns: None on success
+ """
+ self._CUD_RBAC_enforcement(client_str, method_str, expected_allowed,
+ status_method, obj_id, *args, **kwargs)
+
+ def check_list_RBAC_enforcement_count(
+ self, client_str, method_str, expected_allowed, expected_count,
+ *args, **kwargs):
+ """Test an API list call RBAC enforcement result count.
+
+ List APIs will return the object list for the project associated
+ with the token used to access the API. This means most credentials
+ will have access, but will get differing results.
+
+ This test will query the list API using a list of credentials and
+ will validate that only the expected count of results are returned.
+
+ :param client_str: The service client to use for the test, without the
+ credential. Example: 'amphora_client'
+ :param method_str: The method on the client to call for the test.
+ Example: 'list_amphorae'
+ :param expected_allowed: The list of credentials expected to be
+ allowed. Example: ['os_roles_lb_member'].
+ :param expected_count: The number of results expected in the list
+ returned from the API.
+ :param args: Any positional parameters needed by the method.
+ :param kwargs: Any named parameters needed by the method.
+ :raises AssertionError: Raised if the RBAC tests fail.
+ :raises Forbidden: Raised if a credential that should have access does
+ not and is denied.
+ :raises InvalidScope: Raised if a credential that should have the
+ correct scope for access is denied.
+ :returns: None on success
+ """
+
+ allowed_list = copy.deepcopy(expected_allowed)
+ # os_admin is a special case as it is valid with the old defaults,
+ # but will not be with the new defaults and/or token scoping.
+ # The old keystone role "admin" becomes project scoped "admin"
+ # instead of being a global admin.
+ # To keep the tests simple, handle that edge case here.
+ # TODO(johnsom) Once token scope is default, remove this.
+ if ('os_system_admin' in expected_allowed and
+ not CONF.load_balancer.enforce_new_defaults and
+ not CONF.enforce_scope.octavia):
+ allowed_list.append('os_admin')
+
+ for cred in allowed_list:
+ try:
+ cred_obj = getattr(self, cred)
+ except AttributeError:
+ # TODO(johnsom) Remove once scoped tokens are the default.
+ if ((cred == 'os_system_admin' or cred == 'os_system_reader')
+ and not CONF.enforce_scope.octavia):
+ LOG.info('Skipping %s allowed RBAC test because '
+ 'enforce_scope.octavia is not True', cred)
+ continue
+ else:
+ self.fail('Credential {} "expected_allowed" for RBAC '
+ 'testing was not created by tempest '
+ 'credentials setup. This is likely a bug in the '
+ 'test.'.format(cred))
+ client = getattr(cred_obj, client_str)
+ method = getattr(client, method_str)
+ try:
+ result = method(*args, **kwargs)
+ except exceptions.Forbidden as e:
+ self.fail('Method {}.{} failed to allow access via RBAC using '
+ 'credential {}. Error: {}'.format(
+ client_str, method_str, cred, str(e)))
+ self.assertEqual(expected_count, len(result), message='Credential '
+ '{} saw {} objects when {} was expected.'.format(
+ cred, len(result), expected_count))
+
+ def check_list_IDs_RBAC_enforcement(
+ self, client_str, method_str, expected_allowed, expected_ids,
+ *args, **kwargs):
+ """Test an API list call RBAC enforcement result contains IDs.
+
+ List APIs will return the object list for the project associated
+ with the token used to access the API. This means most credentials
+ will have access, but will get differing results.
+
+ This test will query the list API using a list of credentials and
+ will validate that the expected object IDs are included in the results.
+
+ :param client_str: The service client to use for the test, without the
+ credential. Example: 'amphora_client'
+ :param method_str: The method on the client to call for the test.
+ Example: 'list_amphorae'
+ :param expected_allowed: The list of credentials expected to be
+ allowed. Example: ['os_roles_lb_member'].
+ :param expected_ids: The list of object IDs to validate are included
+ in the returned list from the API.
+ :param args: Any positional parameters needed by the method.
+ :param kwargs: Any named parameters needed by the method.
+ :raises AssertionError: Raised if the RBAC tests fail.
+ :raises Forbidden: Raised if a credential that should have access does
+ not and is denied.
+ :raises InvalidScope: Raised if a credential that should have the
+ correct scope for access is denied.
+ :returns: None on success
+ """
+
+ allowed_list = copy.deepcopy(expected_allowed)
+ # os_admin is a special case as it is valid with the old defaults,
+ # but will not be with the new defaults and/or token scoping.
+ # The old keystone role "admin" becomes project scoped "admin"
+ # instead of being a global admin.
+ # To keep the tests simple, handle that edge case here.
+ # TODO(johnsom) Once token scope is default, remove this.
+ if ('os_system_admin' in expected_allowed and
+ not CONF.load_balancer.enforce_new_defaults and
+ not CONF.enforce_scope.octavia):
+ allowed_list.append('os_admin')
+
+ for cred in allowed_list:
+ try:
+ cred_obj = getattr(self, cred)
+ except AttributeError:
+ # TODO(johnsom) Remove once scoped tokens are the default.
+ if ((cred == 'os_system_admin' or cred == 'os_system_reader')
+ and not CONF.enforce_scope.octavia):
+ LOG.info('Skipping %s allowed RBAC test because '
+ 'enforce_scope.octavia is not True', cred)
+ continue
+ else:
+ self.fail('Credential {} "expected_allowed" for RBAC '
+ 'testing was not created by tempest '
+ 'credentials setup. This is likely a bug in the '
+ 'test.'.format(cred))
+ client = getattr(cred_obj, client_str)
+ method = getattr(client, method_str)
+ try:
+ result = method(*args, **kwargs)
+ except exceptions.Forbidden as e:
+ self.fail('Method {}.{} failed to allow access via RBAC using '
+ 'credential {}. Error: {}'.format(
+ client_str, method_str, cred, str(e)))
+ result_ids = [lb[constants.ID] for lb in result]
+ self.assertTrue(set(expected_ids).issubset(set(result_ids)))
diff --git a/octavia_tempest_plugin/tests/api/v2/test_amphora.py b/octavia_tempest_plugin/tests/api/v2/test_amphora.py
index 146096d..a1a6441 100644
--- a/octavia_tempest_plugin/tests/api/v2/test_amphora.py
+++ b/octavia_tempest_plugin/tests/api/v2/test_amphora.py
@@ -20,7 +20,6 @@
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
-from tempest.lib import exceptions
from octavia_tempest_plugin.common import constants as const
from octavia_tempest_plugin.tests import test_base
@@ -90,12 +89,17 @@
CONF.load_balancer.lb_build_interval,
CONF.load_balancer.lb_build_timeout)
- # Test that a user, without the load balancer member role, cannot
- # list amphorae
+ # Test RBAC for list amphorae
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.amphora_client.list_amphorae)
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+ if expected_allowed:
+ self.check_list_RBAC_enforcement(
+ 'amphora_client', 'list_amphorae', expected_allowed)
# Get an actual list of the amphorae
amphorae = self.lb_admin_amphora_client.list_amphorae()
@@ -112,13 +116,11 @@
self.assertEqual(self._expected_amp_count(amphorae), len(amphorae))
self.assertEqual(self.lb_id, amphorae[0][const.LOADBALANCER_ID])
- # Test that a different user, with load balancer member role, cannot
- # see this amphora
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.amphora_client
- self.assertRaises(exceptions.Forbidden,
- member2_client.show_amphora,
- amphora_id=amphorae[0][const.ID])
+ # Test RBAC for show amphora
+ if expected_allowed:
+ self.check_show_RBAC_enforcement(
+ 'amphora_client', 'show_amphora', expected_allowed,
+ amphora_id=amphorae[0][const.ID])
show_amphora_response_fields = const.SHOW_AMPHORA_RESPONSE_FIELDS
if self.lb_admin_amphora_client.is_version_supported(
@@ -175,13 +177,18 @@
loadbalancer_id=const.LOADBALANCER_ID, lb_id=self.lb_id))
amphora_1 = amphorae[0]
- # Test that a user without the load balancer admin role cannot
- # create a flavor
+ # Test RBAC for updating an amphora
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.amphora_client.update_amphora_config,
- amphora_1[const.ID])
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+ if expected_allowed:
+ self.check_update_RBAC_enforcement(
+ 'amphora_client', 'update_amphora_config', expected_allowed,
+ None, None, amphora_1[const.ID])
self.lb_admin_amphora_client.update_amphora_config(amphora_1[const.ID])
@@ -205,15 +212,18 @@
loadbalancer_id=const.LOADBALANCER_ID, lb_id=self.lb_id))
amphora_1 = amphorae[0]
- # Test RBAC not authorized for non-admin role
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- self.assertRaises(exceptions.Forbidden,
- self.os_primary.amphora_client.amphora_failover,
- amphora_1[const.ID])
- self.assertRaises(
- exceptions.Forbidden,
- self.os_roles_lb_member.amphora_client.amphora_failover,
- amphora_1[const.ID])
+ # Test RBAC for failing over an amphora
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+ if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+ if expected_allowed:
+ self.check_update_RBAC_enforcement(
+ 'amphora_client', 'amphora_failover', expected_allowed,
+ None, None, amphora_1[const.ID])
self.lb_admin_amphora_client.amphora_failover(amphora_1[const.ID])
diff --git a/octavia_tempest_plugin/tests/api/v2/test_availability_zone.py b/octavia_tempest_plugin/tests/api/v2/test_availability_zone.py
index 29426c2..8ea6808 100644
--- a/octavia_tempest_plugin/tests/api/v2/test_availability_zone.py
+++ b/octavia_tempest_plugin/tests/api/v2/test_availability_zone.py
@@ -104,12 +104,18 @@
self.availability_zone_profile_id}
# Test that a user without the load balancer admin role cannot
- # create an availability zone
+ # create an availability zone.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(exceptions.Forbidden,
- self.os_primary.availability_zone_client
- .create_availability_zone,
- **availability_zone_kwargs)
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+ if expected_allowed:
+ self.check_create_RBAC_enforcement(
+ 'availability_zone_client', 'create_availability_zone',
+ expected_allowed, **availability_zone_kwargs)
# Happy path
availability_zone = (
@@ -220,11 +226,26 @@
# Test that a user without the load balancer role cannot
# list availability zones.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = [
+ 'os_admin', 'os_primary', 'os_roles_lb_admin',
+ 'os_roles_lb_observer', 'os_roles_lb_global_observer',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_admin', 'os_primary', 'os_system_admin',
+ 'os_system_reader', 'os_roles_lb_observer',
+ 'os_roles_lb_global_observer',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.availability_zone_client
- .list_availability_zones)
+ expected_allowed = [
+ 'os_system_admin', 'os_system_reader', 'os_roles_lb_admin',
+ 'os_roles_lb_observer', 'os_roles_lb_global_observer',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
+ if expected_allowed:
+ self.check_list_RBAC_enforcement(
+ 'availability_zone_client', 'list_availability_zones',
+ expected_allowed)
# Check the default sort order (by ID)
availability_zones = (
@@ -359,12 +380,26 @@
# Test that a user without the load balancer role cannot
# show availability zone details.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = [
+ 'os_admin', 'os_primary', 'os_roles_lb_admin',
+ 'os_roles_lb_observer', 'os_roles_lb_global_observer',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_admin', 'os_primary', 'os_system_admin',
+ 'os_system_reader', 'os_roles_lb_observer',
+ 'os_roles_lb_global_observer',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.availability_zone_client
- .show_availability_zone,
- availability_zone[const.NAME])
+ expected_allowed = [
+ 'os_system_admin', 'os_system_reader', 'os_roles_lb_admin',
+ 'os_roles_lb_observer', 'os_roles_lb_global_observer',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
+ if expected_allowed:
+ self.check_show_RBAC_enforcement(
+ 'availability_zone_client', 'show_availability_zone',
+ expected_allowed, availability_zone[const.NAME])
result = self.mem_availability_zone_client.show_availability_zone(
availability_zone[const.NAME])
@@ -420,13 +455,18 @@
const.ENABLED: False}
# Test that a user without the load balancer role cannot
- # show availability zone details.
+ # update availability zone details.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.availability_zone_client
- .update_availability_zone,
- availability_zone[const.NAME],
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+ if expected_allowed:
+ self.check_update_RBAC_enforcement(
+ 'availability_zone_client', 'update_availability_zone',
+ expected_allowed, None, None, availability_zone[const.NAME],
**availability_zone_updated_kwargs)
updated_availability_zone = (
@@ -493,12 +533,17 @@
# Test that a user without the load balancer admin role cannot
# delete an availability zone.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.availability_zone_client
- .delete_availability_zone,
- availability_zone[const.NAME])
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+ if expected_allowed:
+ self.check_delete_RBAC_enforcement(
+ 'availability_zone_client', 'delete_availability_zone',
+ expected_allowed, None, None, availability_zone[const.NAME])
# Happy path
self.lb_admin_availability_zone_client.delete_availability_zone(
diff --git a/octavia_tempest_plugin/tests/api/v2/test_availability_zone_capabilities.py b/octavia_tempest_plugin/tests/api/v2/test_availability_zone_capabilities.py
index 4a12057..4831ede 100644
--- a/octavia_tempest_plugin/tests/api/v2/test_availability_zone_capabilities.py
+++ b/octavia_tempest_plugin/tests/api/v2/test_availability_zone_capabilities.py
@@ -15,7 +15,6 @@
from tempest import config
from tempest.lib import decorators
-from tempest.lib import exceptions
from octavia_tempest_plugin.common import constants as const
from octavia_tempest_plugin.tests import test_base
@@ -45,13 +44,17 @@
# Test that a user without the load balancer admin role cannot
# list provider availability zone capabilities.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- os_primary_capabilities_client = (
- self.os_primary.availability_zone_capabilities_client)
- self.assertRaises(
- exceptions.Forbidden,
- (os_primary_capabilities_client
- .list_availability_zone_capabilities),
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+ if expected_allowed:
+ self.check_list_RBAC_enforcement(
+ 'availability_zone_capabilities_client',
+ 'list_availability_zone_capabilities', expected_allowed,
CONF.load_balancer.provider)
# Check for an expected availability zone capability for the
diff --git a/octavia_tempest_plugin/tests/api/v2/test_availability_zone_profile.py b/octavia_tempest_plugin/tests/api/v2/test_availability_zone_profile.py
index f5e4b7e..079d125 100644
--- a/octavia_tempest_plugin/tests/api/v2/test_availability_zone_profile.py
+++ b/octavia_tempest_plugin/tests/api/v2/test_availability_zone_profile.py
@@ -75,13 +75,19 @@
}
# Test that a user without the load balancer admin role cannot
- # create an availability zone profile profile
+ # create an availability zone profile.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.availability_zone_profile_client
- .create_availability_zone_profile,
- **availability_zone_profile_kwargs)
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+ if expected_allowed:
+ self.check_create_RBAC_enforcement(
+ 'availability_zone_profile_client',
+ 'create_availability_zone_profile',
+ expected_allowed, **availability_zone_profile_kwargs)
# Happy path
availability_zone_profile = (
@@ -225,11 +231,17 @@
# Test that a user without the load balancer admin role cannot
# list availability zone profiles.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.availability_zone_profile_client
- .list_availability_zone_profiles)
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+ if expected_allowed:
+ self.check_list_RBAC_enforcement(
+ 'availability_zone_profile_client',
+ 'list_availability_zone_profiles', expected_allowed)
# Check the default sort order (by ID)
profiles = (self.lb_admin_availability_zone_profile_client
@@ -379,12 +391,18 @@
availability_zone_profile[const.ID])
# Test that a user without the load balancer admin role cannot
- # show an availability zone profile profile
+ # show an availability zone profile
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.availability_zone_profile_client
- .show_availability_zone_profile,
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+ if expected_allowed:
+ self.check_show_RBAC_enforcement(
+ 'availability_zone_profile_client',
+ 'show_availability_zone_profile', expected_allowed,
availability_zone_profile[const.ID])
result = (
@@ -475,13 +493,19 @@
}
# Test that a user without the load balancer admin role cannot
- # create an availability zone profile profile
+ # update an availability zone profile.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.availability_zone_profile_client
- .update_availability_zone_profile,
- availability_zone_profile[const.ID],
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+ if expected_allowed:
+ self.check_update_RBAC_enforcement(
+ 'availability_zone_profile_client',
+ 'update_availability_zone_profile', expected_allowed,
+ None, None, availability_zone_profile[const.ID],
**availability_zone_profile_updated_kwargs)
result = (
@@ -551,13 +575,19 @@
availability_zone_profile[const.ID])
# Test that a user without the load balancer admin role cannot
- # delete an availability zone profile profile
+ # delete an availability zone profile.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.availability_zone_profile_client
- .delete_availability_zone_profile,
- availability_zone_profile[const.ID])
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+ if expected_allowed:
+ self.check_delete_RBAC_enforcement(
+ 'availability_zone_profile_client',
+ 'delete_availability_zone_profile', expected_allowed,
+ None, None, availability_zone_profile[const.ID])
# Happy path
(self.lb_admin_availability_zone_profile_client
diff --git a/octavia_tempest_plugin/tests/api/v2/test_flavor.py b/octavia_tempest_plugin/tests/api/v2/test_flavor.py
index 4333121..c30f830 100644
--- a/octavia_tempest_plugin/tests/api/v2/test_flavor.py
+++ b/octavia_tempest_plugin/tests/api/v2/test_flavor.py
@@ -87,11 +87,18 @@
const.FLAVOR_PROFILE_ID: self.flavor_profile_id}
# Test that a user without the load balancer admin role cannot
- # create a flavor
+ # create a flavor.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(exceptions.Forbidden,
- self.os_primary.flavor_client.create_flavor,
- **flavor_kwargs)
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+ if expected_allowed:
+ self.check_create_RBAC_enforcement(
+ 'flavor_client', 'create_flavor',
+ expected_allowed, None, None, **flavor_kwargs)
# Happy path
flavor = self.lb_admin_flavor_client.create_flavor(**flavor_kwargs)
@@ -185,10 +192,25 @@
# Test that a user without the load balancer role cannot
# list flavors.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = [
+ 'os_admin', 'os_primary', 'os_roles_lb_admin',
+ 'os_roles_lb_observer', 'os_roles_lb_global_observer',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_admin', 'os_primary', 'os_system_admin',
+ 'os_system_reader', 'os_roles_lb_observer',
+ 'os_roles_lb_global_observer',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.flavor_client.list_flavors)
+ expected_allowed = [
+ 'os_system_admin', 'os_system_reader', 'os_roles_lb_admin',
+ 'os_roles_lb_observer', 'os_roles_lb_global_observer',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
+ if expected_allowed:
+ self.check_list_RBAC_enforcement(
+ 'flavor_client', 'list_flavors', expected_allowed)
# Check the default sort order (by ID)
flavors = self.mem_flavor_client.list_flavors()
@@ -299,10 +321,25 @@
# Test that a user without the load balancer role cannot
# show flavor details.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = [
+ 'os_admin', 'os_primary', 'os_roles_lb_observer',
+ 'os_roles_lb_global_observer', 'os_roles_lb_admin',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_admin', 'os_primary', 'os_system_admin',
+ 'os_system_reader', 'os_roles_lb_observer',
+ 'os_roles_lb_global_observer',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.flavor_client.show_flavor,
+ expected_allowed = [
+ 'os_system_admin', 'os_system_reader', 'os_roles_lb_admin',
+ 'os_roles_lb_observer', 'os_roles_lb_global_observer',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
+ if expected_allowed:
+ self.check_show_RBAC_enforcement(
+ 'flavor_client', 'show_flavor', expected_allowed,
flavor[const.ID])
result = self.mem_flavor_client.show_flavor(flavor[const.ID])
@@ -354,11 +391,17 @@
const.ENABLED: False}
# Test that a user without the load balancer role cannot
- # show flavor details.
+ # update flavor details.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.flavor_client.update_flavor,
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+ if expected_allowed:
+ self.check_update_RBAC_enforcement(
+ 'flavor_client', 'update_flavor', expected_allowed, None, None,
flavor[const.ID], **flavor_updated_kwargs)
updated_flavor = self.lb_admin_flavor_client.update_flavor(
@@ -413,11 +456,17 @@
# Test that a user without the load balancer admin role cannot
# delete a flavor.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.flavor_client.delete_flavor,
- flavor[const.ID])
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+ if expected_allowed:
+ self.check_delete_RBAC_enforcement(
+ 'flavor_client', 'delete_flavor', expected_allowed,
+ None, None, flavor[const.ID])
# Happy path
self.lb_admin_flavor_client.delete_flavor(flavor[const.ID])
diff --git a/octavia_tempest_plugin/tests/api/v2/test_flavor_capabilities.py b/octavia_tempest_plugin/tests/api/v2/test_flavor_capabilities.py
index 7f9da51..5b48833 100644
--- a/octavia_tempest_plugin/tests/api/v2/test_flavor_capabilities.py
+++ b/octavia_tempest_plugin/tests/api/v2/test_flavor_capabilities.py
@@ -14,7 +14,6 @@
from tempest import config
from tempest.lib import decorators
-from tempest.lib import exceptions
from octavia_tempest_plugin.common import constants as const
from octavia_tempest_plugin.tests import test_base
@@ -43,12 +42,17 @@
# Test that a user without the load balancer admin role cannot
# list provider flavor capabilities.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- os_primary_capabilities_client = (
- self.os_primary.flavor_capabilities_client)
- self.assertRaises(
- exceptions.Forbidden,
- os_primary_capabilities_client.list_flavor_capabilities,
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+ if expected_allowed:
+ self.check_list_RBAC_enforcement(
+ 'flavor_capabilities_client',
+ 'list_flavor_capabilities', expected_allowed,
CONF.load_balancer.provider)
# Check for an expected flavor capability for the configured provider
diff --git a/octavia_tempest_plugin/tests/api/v2/test_flavor_profile.py b/octavia_tempest_plugin/tests/api/v2/test_flavor_profile.py
index 8f9c7d3..eec8679 100644
--- a/octavia_tempest_plugin/tests/api/v2/test_flavor_profile.py
+++ b/octavia_tempest_plugin/tests/api/v2/test_flavor_profile.py
@@ -60,11 +60,17 @@
# Test that a user without the load balancer admin role cannot
# create a flavor profile
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.flavor_profile_client.create_flavor_profile,
- **flavor_profile_kwargs)
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+ if expected_allowed:
+ self.check_create_RBAC_enforcement(
+ 'flavor_profile_client', 'create_flavor_profile',
+ expected_allowed, None, None, **flavor_profile_kwargs)
# Happy path
flavor_profile = (
@@ -174,10 +180,17 @@
# Test that a user without the load balancer admin role cannot
# list flavor profiles.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.flavor_profile_client.list_flavor_profiles)
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+ if expected_allowed:
+ self.check_list_RBAC_enforcement(
+ 'flavor_profile_client', 'list_flavor_profiles',
+ expected_allowed)
# Check the default sort order (by ID)
profiles = self.lb_admin_flavor_profile_client.list_flavor_profiles()
@@ -295,12 +308,18 @@
flavor_profile[const.ID])
# Test that a user without the load balancer admin role cannot
- # show a flavor profile
+ # show a flavor profile.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.flavor_profile_client.show_flavor_profile,
- flavor_profile[const.ID])
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+ if expected_allowed:
+ self.check_show_RBAC_enforcement(
+ 'flavor_profile_client', 'show_flavor_profile',
+ expected_allowed, flavor_profile[const.ID])
result = (
self.lb_admin_flavor_profile_client.show_flavor_profile(
@@ -367,12 +386,19 @@
}
# Test that a user without the load balancer admin role cannot
- # create a flavor profile
+ # update a flavor profile.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.flavor_profile_client.update_flavor_profile,
- flavor_profile[const.ID], **flavor_profile_updated_kwargs)
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+ if expected_allowed:
+ self.check_update_RBAC_enforcement(
+ 'flavor_profile_client', 'update_flavor_profile',
+ expected_allowed, None, None, flavor_profile[const.ID],
+ **flavor_profile_updated_kwargs)
result = self.lb_admin_flavor_profile_client.update_flavor_profile(
flavor_profile[const.ID], **flavor_profile_updated_kwargs)
@@ -428,11 +454,17 @@
# Test that a user without the load balancer admin role cannot
# delete a flavor profile
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.flavor_profile_client.delete_flavor_profile,
- flavor_profile[const.ID])
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+ if expected_allowed:
+ self.check_delete_RBAC_enforcement(
+ 'flavor_profile_client', 'delete_flavor_profile',
+ expected_allowed, None, None, flavor_profile[const.ID])
# Happy path
self.lb_admin_flavor_profile_client.delete_flavor_profile(
diff --git a/octavia_tempest_plugin/tests/api/v2/test_healthmonitor.py b/octavia_tempest_plugin/tests/api/v2/test_healthmonitor.py
index ff07c73..0b4036d 100644
--- a/octavia_tempest_plugin/tests/api/v2/test_healthmonitor.py
+++ b/octavia_tempest_plugin/tests/api/v2/test_healthmonitor.py
@@ -277,11 +277,21 @@
# Test that a user without the loadbalancer role cannot
# create a healthmonitor
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.healthmonitor_client.create_healthmonitor,
- **hm_kwargs)
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_create_RBAC_enforcement(
+ 'healthmonitor_client', 'create_healthmonitor',
+ expected_allowed,
+ status_method=self.mem_lb_client.show_loadbalancer,
+ obj_id=self.lb_id, **hm_kwargs)
hm = self.mem_healthmonitor_client.create_healthmonitor(**hm_kwargs)
self.addCleanup(
@@ -488,6 +498,9 @@
* List the healthmonitors filtering to one of the three.
* List the healthmonitors filtered, one field, and sorted.
"""
+ # IDs of health monitors created in the test
+ test_ids = []
+
if (pool_algorithm == const.LB_ALGORITHM_SOURCE_IP_PORT and not
self.mem_listener_client.is_version_supported(
self.api_version, '2.13')):
@@ -614,6 +627,7 @@
const.ACTIVE,
CONF.load_balancer.build_interval,
CONF.load_balancer.build_timeout)
+ test_ids.append(hm1[const.ID])
# Time resolution for created_at is only to the second, and we need to
# ensure that each object has a distinct creation time. Delaying one
# second is both a simple and a reliable way to accomplish this.
@@ -658,6 +672,7 @@
const.ACTIVE,
CONF.load_balancer.build_interval,
CONF.load_balancer.build_timeout)
+ test_ids.append(hm2[const.ID])
# Time resolution for created_at is only to the second, and we need to
# ensure that each object has a distinct creation time. Delaying one
# second is both a simple and a reliable way to accomplish this.
@@ -702,19 +717,68 @@
const.ACTIVE,
CONF.load_balancer.build_interval,
CONF.load_balancer.build_timeout)
+ test_ids.append(hm3[const.ID])
- # Test that a different user cannot list healthmonitors
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.healthmonitor_client
- primary = member2_client.list_healthmonitors(
- query_params='pool_id={pool_id}'.format(pool_id=pool1_id))
- self.assertEqual(0, len(primary))
+ # Test that different users cannot see the lb_member healthmonitors
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_primary', 'os_roles_lb_member2',
+ 'os_roles_lb_observer']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_admin', 'os_primary',
+ 'os_roles_lb_member2', 'os_roles_lb_observer',
+ 'os_roles_lb_global_observer']
+ if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+ expected_allowed = ['os_roles_lb_observer', 'os_roles_lb_member2']
+ if expected_allowed:
+ self.check_list_RBAC_enforcement_count(
+ 'healthmonitor_client', 'list_healthmonitors',
+ expected_allowed, 0)
+
+ # Test credentials that should see these healthmonitors can see them.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_admin', 'os_roles_lb_member',
+ 'os_roles_lb_global_observer']
+ if expected_allowed:
+ self.check_list_IDs_RBAC_enforcement(
+ 'healthmonitor_client', 'list_healthmonitors',
+ expected_allowed, test_ids)
# Test that users without the lb member role cannot list healthmonitors
+ # Note: non-owners can still call this API, they will just get the list
+ # of health monitors for their project (zero). The above tests
+ # are intended to cover the cross project use case.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_primary', 'os_roles_lb_admin',
+ 'os_roles_lb_member', 'os_roles_lb_member2',
+ 'os_roles_lb_observer',
+ 'os_roles_lb_global_observer']
+ # Note: os_admin is here because it evaluates to "project_admin"
+ # in oslo_policy, and keystone considers "project_admin"
+ # a superscope of "project_reader". This means it can read
+ # objects in the "admin" credential's project.
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_admin', 'os_primary', 'os_system_admin',
+ 'os_system_reader', 'os_roles_lb_observer',
+ 'os_roles_lb_global_observer',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.healthmonitor_client.list_healthmonitors)
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_admin', 'os_roles_lb_observer',
+ 'os_roles_lb_global_observer',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
+ if expected_allowed:
+ self.check_list_RBAC_enforcement(
+ 'healthmonitor_client', 'list_healthmonitors',
+ expected_allowed)
# Check the default sort order, created_at
hms = self.mem_healthmonitor_client.list_healthmonitors()
@@ -1125,33 +1189,24 @@
for item in equal_items:
self.assertEqual(hm_kwargs[item], hm[item])
- # Test that a user with lb_admin role can see the healthmonitor
+ # Test that the appropriate users can see or not see the health
+ # monitors based on the API RBAC.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- healthmonitor_client = self.os_roles_lb_admin.healthmonitor_client
- hm_adm = healthmonitor_client.show_healthmonitor(hm[const.ID])
- self.assertEqual(hm_name, hm_adm[const.NAME])
-
- # Test that a user with cloud admin role can see the healthmonitor
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- adm = self.os_admin.healthmonitor_client.show_healthmonitor(
- hm[const.ID])
- self.assertEqual(hm_name, adm[const.NAME])
-
- # Test that a different user, with loadbalancer member role, cannot
- # see this healthmonitor
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.healthmonitor_client
- self.assertRaises(exceptions.Forbidden,
- member2_client.show_healthmonitor,
- hm[const.ID])
-
- # Test that a user, without the loadbalancer member role, cannot
- # show healthmonitors
- if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.healthmonitor_client.show_healthmonitor,
- hm[const.ID])
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_admin',
+ 'os_roles_lb_global_observer',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_show_RBAC_enforcement(
+ 'healthmonitor_client', 'show_healthmonitor',
+ expected_allowed, hm[const.ID])
@decorators.idempotent_id('2417164b-ec03-4488-afd2-60b096dc0077')
def test_LC_HTTP_healthmonitor_update(self):
@@ -1417,27 +1472,21 @@
self.assertCountEqual(hm_kwargs[const.TAGS], hm[const.TAGS])
# Test that a user, without the loadbalancer member role, cannot
- # use this command
+ # update this healthmonitor.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.healthmonitor_client.update_healthmonitor,
- hm[const.ID], admin_state_up=True)
-
- # Assert we didn't go into PENDING_*
- hm_check = self.mem_healthmonitor_client.show_healthmonitor(
- hm[const.ID])
- self.assertEqual(const.ACTIVE,
- hm_check[const.PROVISIONING_STATUS])
- self.assertFalse(hm_check[const.ADMIN_STATE_UP])
-
- # Test that a user, without the loadbalancer member role, cannot
- # update this healthmonitor
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.healthmonitor_client
- self.assertRaises(exceptions.Forbidden,
- member2_client.update_healthmonitor,
- hm[const.ID], admin_state_up=True)
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_update_RBAC_enforcement(
+ 'healthmonitor_client', 'update_healthmonitor',
+ expected_allowed, None, None, hm[const.ID],
+ admin_state_up=True)
# Assert we didn't go into PENDING_*
hm_check = self.mem_healthmonitor_client.show_healthmonitor(
@@ -1725,21 +1774,21 @@
CONF.load_balancer.build_interval,
CONF.load_balancer.build_timeout)
- # Test that a user without the loadbalancer role cannot
- # delete this healthmonitor
+ # Test that a user without the loadbalancer role cannot delete this
+ # healthmonitor.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.healthmonitor_client.delete_healthmonitor,
- hm[const.ID])
-
- # Test that a different user, with the loadbalancer member role
- # cannot delete this healthmonitor
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.healthmonitor_client
- self.assertRaises(exceptions.Forbidden,
- member2_client.delete_healthmonitor,
- hm[const.ID])
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_delete_RBAC_enforcement(
+ 'healthmonitor_client', 'delete_healthmonitor',
+ expected_allowed, None, None, hm[const.ID])
self.mem_healthmonitor_client.delete_healthmonitor(hm[const.ID])
diff --git a/octavia_tempest_plugin/tests/api/v2/test_l7policy.py b/octavia_tempest_plugin/tests/api/v2/test_l7policy.py
index 440a533..6e1406b 100644
--- a/octavia_tempest_plugin/tests/api/v2/test_l7policy.py
+++ b/octavia_tempest_plugin/tests/api/v2/test_l7policy.py
@@ -19,7 +19,6 @@
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
-from tempest.lib import exceptions
from octavia_tempest_plugin.common import constants as const
from octavia_tempest_plugin.tests import test_base
@@ -135,11 +134,21 @@
# Test that a user without the load balancer role cannot
# create a l7policy
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.l7policy_client.create_l7policy,
- **l7policy_kwargs)
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_create_RBAC_enforcement(
+ 'l7policy_client', 'create_l7policy',
+ expected_allowed,
+ status_method=self.mem_lb_client.show_loadbalancer,
+ obj_id=self.lb_id, **l7policy_kwargs)
l7policy = self.mem_l7policy_client.create_l7policy(**l7policy_kwargs)
@@ -208,6 +217,9 @@
* List the l7policies filtering to one of the three.
* List the l7policies filtered, one field, and sorted.
"""
+ # IDs of L7 policies created in the test
+ test_ids = []
+
listener_name = data_utils.rand_name(
"lb_member_listener2_l7policy-list")
listener_kwargs = {
@@ -263,6 +275,7 @@
const.ACTIVE,
CONF.load_balancer.build_interval,
CONF.load_balancer.build_timeout)
+ test_ids.append(l7policy1[const.ID])
# Time resolution for created_at is only to the second, and we need to
# ensure that each object has a distinct creation time. Delaying one
# second is both a simple and a reliable way to accomplish this.
@@ -303,6 +316,7 @@
const.ACTIVE,
CONF.load_balancer.build_interval,
CONF.load_balancer.build_timeout)
+ test_ids.append(l7policy2[const.ID])
# Time resolution for created_at is only to the second, and we need to
# ensure that each object has a distinct creation time. Delaying one
# second is both a simple and a reliable way to accomplish this.
@@ -344,21 +358,70 @@
const.ACTIVE,
CONF.load_balancer.build_interval,
CONF.load_balancer.build_timeout)
+ test_ids.append(l7policy3[const.ID])
- # Test that a different user cannot list l7policies
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.l7policy_client
- primary = member2_client.list_l7policies(
+ # Test that different users cannot see the lb_member l7policies
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_primary', 'os_roles_lb_member2',
+ 'os_roles_lb_observer']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_admin', 'os_primary',
+ 'os_roles_lb_member2', 'os_roles_lb_observer',
+ 'os_roles_lb_global_observer']
+ if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+ expected_allowed = ['os_roles_lb_observer', 'os_roles_lb_member2']
+ if expected_allowed:
+ self.check_list_RBAC_enforcement_count(
+ 'l7policy_client', 'list_l7policies',
+ expected_allowed, 0)
+
+ # Test credentials that should see these l7policies can see them.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_admin', 'os_roles_lb_member',
+ 'os_roles_lb_global_observer']
+ if expected_allowed:
+ self.check_list_IDs_RBAC_enforcement(
+ 'l7policy_client', 'list_l7policies',
+ expected_allowed, test_ids,
query_params='listener_id={listener_id}'.format(
listener_id=listener_id))
- self.assertEqual(0, len(primary))
- # Test that a user without the lb member role cannot list load
- # balancers
+ # Test that users without the lb member role cannot list l7policies
+ # Note: non-owners can still call this API, they will just get the list
+ # of L7 policies for their project (zero). The above tests
+ # are intended to cover the cross project use case.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_primary', 'os_roles_lb_admin',
+ 'os_roles_lb_member', 'os_roles_lb_member2',
+ 'os_roles_lb_observer',
+ 'os_roles_lb_global_observer']
+ # Note: os_admin is here because it evaluates to "project_admin"
+ # in oslo_policy, and keystone considers "project_admin"
+ # a superscope of "project_reader". This means it can read
+ # objects in the "admin" credential's project.
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_admin', 'os_primary', 'os_system_admin',
+ 'os_system_reader', 'os_roles_lb_observer',
+ 'os_roles_lb_global_observer',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.l7policy_client.list_l7policies)
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_admin', 'os_roles_lb_observer',
+ 'os_roles_lb_global_observer',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
+ if expected_allowed:
+ self.check_list_RBAC_enforcement(
+ 'l7policy_client', 'list_l7policies',
+ expected_allowed)
# Check the default sort order, created_at
l7policies = self.mem_l7policy_client.list_l7policies(
@@ -585,33 +648,24 @@
self.assertIsNone(l7policy.pop(const.REDIRECT_URL, None))
self.assertIsNone(l7policy.pop(const.REDIRECT_POOL_ID, None))
- # Test that a user with lb_admin role can see the l7policy
+ # Test that the appropriate users can see or not see the L7 policies
+ # based on the API RBAC.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- l7policy_client = self.os_roles_lb_admin.l7policy_client
- l7policy_adm = l7policy_client.show_l7policy(l7policy[const.ID])
- self.assertEqual(l7policy_name, l7policy_adm[const.NAME])
-
- # Test that a user with cloud admin role can see the l7policy
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- adm = self.os_admin.l7policy_client.show_l7policy(
- l7policy[const.ID])
- self.assertEqual(l7policy_name, adm[const.NAME])
-
- # Test that a different user, with load balancer member role, cannot
- # see this l7policy
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.l7policy_client
- self.assertRaises(exceptions.Forbidden,
- member2_client.show_l7policy,
- l7policy[const.ID])
-
- # Test that a user, without the load balancer member role, cannot
- # show l7policies
- if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.l7policy_client.show_l7policy,
- l7policy[const.ID])
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_admin',
+ 'os_roles_lb_global_observer',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_show_RBAC_enforcement(
+ 'l7policy_client', 'show_l7policy',
+ expected_allowed, l7policy[const.ID])
@decorators.idempotent_id('08f73b22-550b-4e5a-b3d6-2ec03251ca13')
def test_l7policy_update(self):
@@ -703,28 +757,22 @@
self.assertCountEqual(l7policy_kwargs[const.TAGS],
l7policy[const.TAGS])
- # Test that a user, without the load balancer member role, cannot
- # use this command
+ # Test that a user, without the loadbalancer member role, cannot
+ # update this L7 policy.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.l7policy_client.update_l7policy,
- l7policy[const.ID], admin_state_up=True)
-
- # Assert we didn't go into PENDING_*
- l7policy_check = self.mem_l7policy_client.show_l7policy(
- l7policy[const.ID])
- self.assertEqual(const.ACTIVE,
- l7policy_check[const.PROVISIONING_STATUS])
- self.assertFalse(l7policy_check[const.ADMIN_STATE_UP])
-
- # Test that a user, without the load balancer member role, cannot
- # update this l7policy
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.l7policy_client
- self.assertRaises(exceptions.Forbidden,
- member2_client.update_l7policy,
- l7policy[const.ID], admin_state_up=True)
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_update_RBAC_enforcement(
+ 'l7policy_client', 'update_l7policy',
+ expected_allowed, None, None, l7policy[const.ID],
+ admin_state_up=True)
# Assert we didn't go into PENDING_*
l7policy_check = self.mem_l7policy_client.show_l7policy(
@@ -820,21 +868,21 @@
CONF.load_balancer.build_interval,
CONF.load_balancer.build_timeout)
- # Test that a user without the load balancer role cannot
- # delete this l7policy
+ # Test that a user without the loadbalancer role cannot delete this
+ # L7 policy.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.l7policy_client.delete_l7policy,
- l7policy[const.ID])
-
- # Test that a different user, with the load balancer member role
- # cannot delete this l7policy
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.l7policy_client
- self.assertRaises(exceptions.Forbidden,
- member2_client.delete_l7policy,
- l7policy[const.ID])
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_delete_RBAC_enforcement(
+ 'l7policy_client', 'delete_l7policy',
+ expected_allowed, None, None, l7policy[const.ID])
self.mem_l7policy_client.delete_l7policy(l7policy[const.ID])
diff --git a/octavia_tempest_plugin/tests/api/v2/test_l7rule.py b/octavia_tempest_plugin/tests/api/v2/test_l7rule.py
index e44c9f8..adaeaad 100644
--- a/octavia_tempest_plugin/tests/api/v2/test_l7rule.py
+++ b/octavia_tempest_plugin/tests/api/v2/test_l7rule.py
@@ -19,7 +19,6 @@
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
-from tempest.lib import exceptions
from octavia_tempest_plugin.common import constants as const
from octavia_tempest_plugin.tests import test_base
@@ -141,13 +140,23 @@
const.TAGS: l7_rule_tags
})
- # Test that a user without the load balancer role cannot
- # create a l7rule
+ # Test that a user without the loadbalancer role cannot
+ # create an L7 rule.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.l7rule_client.create_l7rule,
- **l7rule_kwargs)
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_create_RBAC_enforcement(
+ 'l7rule_client', 'create_l7rule',
+ expected_allowed,
+ status_method=self.mem_lb_client.show_loadbalancer,
+ obj_id=self.lb_id, **l7rule_kwargs)
l7rule = self.mem_l7rule_client.create_l7rule(**l7rule_kwargs)
self.addClassResourceCleanup(
@@ -211,6 +220,9 @@
* List the l7rules filtering to one of the three.
* List the l7rules filtered, one field, and sorted.
"""
+ # IDs of L7 rules created in the test
+ test_ids = []
+
l7policy_name = data_utils.rand_name("lb_member_l7policy2_l7rule-list")
l7policy = self.mem_l7policy_client.create_l7policy(
name=l7policy_name, listener_id=self.listener_id,
@@ -260,6 +272,7 @@
const.ACTIVE,
CONF.load_balancer.check_interval,
CONF.load_balancer.check_timeout)
+ test_ids.append(l7rule1[const.ID])
# Time resolution for created_at is only to the second, and we need to
# ensure that each object has a distinct creation time. Delaying one
# second is both a simple and a reliable way to accomplish this.
@@ -298,6 +311,7 @@
const.ACTIVE,
CONF.load_balancer.check_interval,
CONF.load_balancer.check_timeout)
+ test_ids.append(l7rule2[const.ID])
# Time resolution for created_at is only to the second, and we need to
# ensure that each object has a distinct creation time. Delaying one
# second is both a simple and a reliable way to accomplish this.
@@ -336,23 +350,47 @@
const.ACTIVE,
CONF.load_balancer.check_interval,
CONF.load_balancer.check_timeout)
+ test_ids.append(l7rule3[const.ID])
- # Test that a different user cannot list l7rules
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.l7rule_client
- self.assertRaises(
- exceptions.Forbidden,
- member2_client.list_l7rules,
- l7policy_id)
-
- # Test that a user without the lb l7rule role cannot list load
- # balancers
+ # Test credentials that should see these L7 rules can see them.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.l7rule_client.list_l7rules,
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_admin', 'os_roles_lb_member',
+ 'os_roles_lb_global_observer']
+ if expected_allowed:
+ self.check_list_IDs_RBAC_enforcement(
+ 'l7rule_client', 'list_l7rules', expected_allowed, test_ids,
l7policy_id)
+ # Test that users without the lb member role cannot list L7 rules.
+ # Note: The parent policy ID blocks non-owners from listing
+ # L7 Rules.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ # Note: os_admin is here because it evaluates to "project_admin"
+ # in oslo_policy, and keystone considers "project_admin"
+ # a superscope of "project_reader". This means it can read
+ # objects in the "admin" credential's project.
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_admin', 'os_system_admin',
+ 'os_system_reader', 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_admin',
+ 'os_roles_lb_global_observer',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_list_RBAC_enforcement(
+ 'l7rule_client', 'list_l7rules', expected_allowed, l7policy_id)
+
# Check the default sort order, created_at
l7rules = self.mem_l7rule_client.list_l7rules(l7policy_id)
self.assertEqual(l7rule1[const.VALUE],
@@ -521,34 +559,25 @@
for item in equal_items:
self.assertEqual(l7rule_kwargs[item], l7rule[item])
- # Test that a user with lb_admin role can see the l7rule
+ # Test that the appropriate users can see or not see the L7 rule
+ # based on the API RBAC.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- l7rule_client = self.os_roles_lb_admin.l7rule_client
- l7rule_adm = l7rule_client.show_l7rule(
- l7rule[const.ID], l7policy_id=self.l7policy_id)
- self.assertEqual(l7rule_kwargs[const.KEY], l7rule_adm[const.KEY])
-
- # Test that a user with cloud admin role can see the l7rule
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- adm = self.os_admin.l7rule_client.show_l7rule(
- l7rule[const.ID], l7policy_id=self.l7policy_id)
- self.assertEqual(l7rule_kwargs[const.KEY], adm[const.KEY])
-
- # Test that a different user, with load balancer member role, cannot
- # see this l7rule
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.l7rule_client
- self.assertRaises(exceptions.Forbidden,
- member2_client.show_l7rule,
- l7rule[const.ID], l7policy_id=self.l7policy_id)
-
- # Test that a user, without the load balancer member role, cannot
- # show l7rules
- if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.l7rule_client.show_l7rule,
- l7rule[const.ID], l7policy_id=self.l7policy_id)
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_admin',
+ 'os_roles_lb_global_observer',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_show_RBAC_enforcement(
+ 'l7rule_client', 'show_l7rule',
+ expected_allowed, l7rule[const.ID],
+ l7policy_id=self.l7policy_id)
@decorators.idempotent_id('f8cee23b-89b6-4f3a-a842-1463daf42cf7')
def test_l7rule_update(self):
@@ -618,29 +647,22 @@
for item in equal_items:
self.assertEqual(l7rule_kwargs[item], l7rule[item])
- # Test that a user, without the load balancer member role, cannot
- # use this command
+ # Test that a user, without the loadbalancer member role, cannot
+ # update this L7 rule.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.l7rule_client.update_l7rule,
- l7rule[const.ID], l7policy_id=self.l7policy_id,
- admin_state_up=True)
-
- # Assert we didn't go into PENDING_*
- l7rule_check = self.mem_l7rule_client.show_l7rule(
- l7rule[const.ID], l7policy_id=self.l7policy_id)
- self.assertEqual(const.ACTIVE, l7rule_check[const.PROVISIONING_STATUS])
- self.assertFalse(l7rule_check[const.ADMIN_STATE_UP])
-
- # Test that a user, without the load balancer member role, cannot
- # update this l7rule
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.l7rule_client
- self.assertRaises(exceptions.Forbidden,
- member2_client.update_l7rule,
- l7rule[const.ID], l7policy_id=self.l7policy_id,
- admin_state_up=True)
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_update_RBAC_enforcement(
+ 'l7rule_client', 'update_l7rule',
+ expected_allowed, None, None, l7rule[const.ID],
+ l7policy_id=self.l7policy_id, admin_state_up=True)
# Assert we didn't go into PENDING_*
l7rule_check = self.mem_l7rule_client.show_l7rule(
@@ -727,21 +749,22 @@
CONF.load_balancer.build_interval,
CONF.load_balancer.build_timeout)
- # Test that a user without the load balancer role cannot
- # delete this l7rule
+ # Test that a user without the loadbalancer role cannot delete this
+ # L7 rule.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.l7rule_client.delete_l7rule,
- l7rule[const.ID], l7policy_id=self.l7policy_id)
-
- # Test that a different user, with the load balancer member role
- # cannot delete this l7rule
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.l7rule_client
- self.assertRaises(exceptions.Forbidden,
- member2_client.delete_l7rule,
- l7rule[const.ID], l7policy_id=self.l7policy_id)
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_delete_RBAC_enforcement(
+ 'l7rule_client', 'delete_l7rule',
+ expected_allowed, None, None, l7rule[const.ID],
+ l7policy_id=self.l7policy_id)
self.mem_l7rule_client.delete_l7rule(l7rule[const.ID],
l7policy_id=self.l7policy_id)
diff --git a/octavia_tempest_plugin/tests/api/v2/test_listener.py b/octavia_tempest_plugin/tests/api/v2/test_listener.py
index c688fdd..a482dc0 100644
--- a/octavia_tempest_plugin/tests/api/v2/test_listener.py
+++ b/octavia_tempest_plugin/tests/api/v2/test_listener.py
@@ -143,13 +143,23 @@
listener_kwargs.update({const.ALLOWED_CIDRS: self.allowed_cidrs})
- # Test that a user without the load balancer role cannot
- # create a listener
+ # Test that a user without the loadbalancer role cannot
+ # create a listener.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.listener_client.create_listener,
- **listener_kwargs)
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_create_RBAC_enforcement(
+ 'listener_client', 'create_listener',
+ expected_allowed,
+ status_method=self.mem_lb_client.show_loadbalancer,
+ obj_id=self.lb_id, **listener_kwargs)
listener = self.mem_listener_client.create_listener(**listener_kwargs)
@@ -378,6 +388,9 @@
* List the listeners filtering to one of the three.
* List the listeners filtered, one field, and sorted.
"""
+ # IDs of listeners created in the test
+ test_ids = []
+
lb_name = data_utils.rand_name("lb_member_lb2_listener-list")
lb = self.mem_lb_client.create_loadbalancer(
name=lb_name, provider=CONF.load_balancer.provider,
@@ -427,6 +440,7 @@
const.ACTIVE,
CONF.load_balancer.build_interval,
CONF.load_balancer.build_timeout)
+ test_ids.append(listener1[const.ID])
# Time resolution for created_at is only to the second, and we need to
# ensure that each object has a distinct creation time. Delaying one
# second is both a simple and a reliable way to accomplish this.
@@ -465,6 +479,7 @@
const.ACTIVE,
CONF.load_balancer.build_interval,
CONF.load_balancer.build_timeout)
+ test_ids.append(listener2[const.ID])
# Time resolution for created_at is only to the second, and we need to
# ensure that each object has a distinct creation time. Delaying one
# second is both a simple and a reliable way to accomplish this.
@@ -503,6 +518,7 @@
const.ACTIVE,
CONF.load_balancer.build_interval,
CONF.load_balancer.build_timeout)
+ test_ids.append(listener3[const.ID])
if not CONF.load_balancer.test_with_noop:
# Wait for the enabled listeners to come ONLINE
@@ -517,18 +533,67 @@
CONF.load_balancer.build_interval,
CONF.load_balancer.build_timeout)
- # Test that a different user cannot list listeners
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.listener_client
- primary = member2_client.list_listeners(
- query_params='loadbalancer_id={lb_id}'.format(lb_id=lb_id))
- self.assertEqual(0, len(primary))
-
- # Test that a user without the lb member role cannot list listeners
+ # Test that different users cannot see the lb_member listeners.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_primary', 'os_roles_lb_member2',
+ 'os_roles_lb_observer']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_admin', 'os_primary',
+ 'os_roles_lb_member2', 'os_roles_lb_observer',
+ 'os_roles_lb_global_observer']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.listener_client.list_listeners)
+ expected_allowed = ['os_roles_lb_observer', 'os_roles_lb_member2']
+ if expected_allowed:
+ self.check_list_RBAC_enforcement_count(
+ 'listener_client', 'list_listeners', expected_allowed, 0,
+ query_params='loadbalancer_id={lb_id}'.format(lb_id=lb_id))
+
+ # Test credentials that should see these listeners can see them.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_admin', 'os_roles_lb_member',
+ 'os_roles_lb_global_observer']
+ if expected_allowed:
+ self.check_list_IDs_RBAC_enforcement(
+ 'listener_client', 'list_listeners', expected_allowed,
+ test_ids,
+ query_params='loadbalancer_id={lb_id}'.format(lb_id=lb_id))
+
+ # Test that users without the lb member role cannot list listeners.
+ # Note: non-owners can still call this API, they will just get the list
+ # of listeners for their project (zero). The above tests
+ # are intended to cover the cross project use case.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_primary', 'os_roles_lb_admin',
+ 'os_roles_lb_member', 'os_roles_lb_member2',
+ 'os_roles_lb_observer',
+ 'os_roles_lb_global_observer']
+ # Note: os_admin is here because it evaluates to "project_admin"
+ # in oslo_policy, and keystone considers "project_admin"
+ # a superscope of "project_reader". This means it can read
+ # objects in the "admin" credential's project.
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_admin', 'os_primary', 'os_system_admin',
+ 'os_system_reader', 'os_roles_lb_observer',
+ 'os_roles_lb_global_observer',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
+ if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_admin', 'os_roles_lb_observer',
+ 'os_roles_lb_global_observer',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
+ if expected_allowed:
+ self.check_list_RBAC_enforcement(
+ 'listener_client', 'list_listeners', expected_allowed,
+ query_params='loadbalancer_id={lb_id}'.format(lb_id=lb_id))
# Check the default sort order, created_at
listeners = self.mem_listener_client.list_listeners(
@@ -786,33 +851,24 @@
self.api_version, '2.12'):
self.assertEqual(self.allowed_cidrs, listener[const.ALLOWED_CIDRS])
- # Test that a user with lb_admin role can see the listener
+ # Test that the appropriate users can see or not see the listener
+ # based on the API RBAC.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- listener_client = self.os_roles_lb_admin.listener_client
- listener_adm = listener_client.show_listener(listener[const.ID])
- self.assertEqual(listener_name, listener_adm[const.NAME])
-
- # Test that a user with cloud admin role can see the listener
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- adm = self.os_admin.listener_client.show_listener(
- listener[const.ID])
- self.assertEqual(listener_name, adm[const.NAME])
-
- # Test that a different user, with load balancer member role, cannot
- # see this listener
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.listener_client
- self.assertRaises(exceptions.Forbidden,
- member2_client.show_listener,
- listener[const.ID])
-
- # Test that a user, without the load balancer member role, cannot
- # show listeners
- if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.listener_client.show_listener,
- listener[const.ID])
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_admin',
+ 'os_roles_lb_global_observer',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_show_RBAC_enforcement(
+ 'listener_client', 'show_listener',
+ expected_allowed, listener[const.ID])
@decorators.idempotent_id('aaae0298-5778-4c7e-a27a-01549a71b319')
def test_http_listener_update(self):
diff --git a/octavia_tempest_plugin/tests/api/v2/test_load_balancer.py b/octavia_tempest_plugin/tests/api/v2/test_load_balancer.py
index 0cf3576..ba1187c 100644
--- a/octavia_tempest_plugin/tests/api/v2/test_load_balancer.py
+++ b/octavia_tempest_plugin/tests/api/v2/test_load_balancer.py
@@ -14,6 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
+import copy
import testtools
import time
from uuid import UUID
@@ -78,13 +79,25 @@
self._setup_lb_network_kwargs(lb_kwargs, ip_version, use_fixed_ip=True)
- # Test that a user without the load balancer role cannot
- # create a load balancer
+ # Test that a user without the loadbalancer role cannot
+ # create a load balancer.
+ lb_kwargs_with_project_id = copy.deepcopy(lb_kwargs)
+ lb_kwargs_with_project_id[const.PROJECT_ID] = (
+ self.os_roles_lb_member.credentials.project_id)
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_primary', 'os_roles_lb_admin',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.loadbalancer_client.create_loadbalancer,
- **lb_kwargs)
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
+ if expected_allowed:
+ self.check_create_RBAC_enforcement(
+ 'loadbalancer_client', 'create_loadbalancer',
+ expected_allowed, None, None, **lb_kwargs_with_project_id)
lb = self.mem_lb_client.create_loadbalancer(**lb_kwargs)
@@ -173,21 +186,21 @@
CONF.load_balancer.lb_build_interval,
CONF.load_balancer.lb_build_timeout)
- # Test that a user without the load balancer role cannot
- # delete this load balancer
+ # Test that a user without the loadbalancer role cannot delete this
+ # load balancer.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.loadbalancer_client.delete_loadbalancer,
- lb[const.ID])
-
- # Test that a different user, with the load balancer member role
- # cannot delete this load balancer
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.loadbalancer_client
- self.assertRaises(exceptions.Forbidden,
- member2_client.delete_loadbalancer,
- lb[const.ID])
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_delete_RBAC_enforcement(
+ 'loadbalancer_client', 'delete_loadbalancer',
+ expected_allowed, None, None, lb[const.ID])
self.mem_lb_client.delete_loadbalancer(lb[const.ID])
@@ -222,21 +235,21 @@
# TODO(johnsom) Add other objects when we have clients for them
- # Test that a user without the load balancer role cannot
- # delete this load balancer
+ # Test that a user without the loadbalancer role cannot delete this
+ # load balancer.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.loadbalancer_client.delete_loadbalancer,
- lb[const.ID], cascade=True)
-
- # Test that a different user, with the load balancer member role
- # cannot delete this load balancer
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.loadbalancer_client
- self.assertRaises(exceptions.Forbidden,
- member2_client.delete_loadbalancer,
- lb[const.ID], cascade=True)
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_delete_RBAC_enforcement(
+ 'loadbalancer_client', 'delete_loadbalancer',
+ expected_allowed, None, None, lb[const.ID], cascade=True)
self.mem_lb_client.delete_loadbalancer(lb[const.ID], cascade=True)
@@ -267,6 +280,8 @@
* List the load balancers filtering to one of the three.
* List the load balancers filtered, one field, and sorted.
"""
+ # IDs of load balancers created in the test
+ test_ids = []
# Get a list of pre-existing LBs to filter from test data
pretest_lbs = self.mem_lb_client.list_loadbalancers()
# Store their IDs for easy access
@@ -313,6 +328,7 @@
const.ONLINE,
CONF.load_balancer.check_interval,
CONF.load_balancer.check_timeout)
+ test_ids.append(lb1[const.ID])
# Time resolution for created_at is only to the second, and we need to
# ensure that each object has a distinct creation time. Delaying one
@@ -356,6 +372,7 @@
const.ONLINE,
CONF.load_balancer.check_interval,
CONF.load_balancer.check_timeout)
+ test_ids.append(lb2[const.ID])
# Time resolution for created_at is only to the second, and we need to
# ensure that each object has a distinct creation time. Delaying one
@@ -394,19 +411,67 @@
const.ACTIVE,
CONF.load_balancer.lb_build_interval,
CONF.load_balancer.lb_build_timeout)
+ test_ids.append(lb3[const.ID])
- # Test that a different user cannot list load balancers
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.loadbalancer_client
- primary = member2_client.list_loadbalancers()
- self.assertEqual(0, len(primary))
-
- # Test that a user without the lb member role cannot list load
- # balancers
+ # Test that different users cannot see the lb_member load balancers.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_primary', 'os_roles_lb_member2',
+ 'os_roles_lb_observer']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_admin', 'os_primary',
+ 'os_roles_lb_member2', 'os_roles_lb_observer',
+ 'os_roles_lb_global_observer']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.loadbalancer_client.list_loadbalancers)
+ expected_allowed = ['os_roles_lb_observer', 'os_roles_lb_member2']
+ if expected_allowed:
+ self.check_list_RBAC_enforcement_count(
+ 'loadbalancer_client', 'list_loadbalancers',
+ expected_allowed, 0)
+
+ # Test credentials that should see these load balancers can see them.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_admin', 'os_roles_lb_member',
+ 'os_roles_lb_global_observer']
+ if expected_allowed:
+ self.check_list_IDs_RBAC_enforcement(
+ 'loadbalancer_client', 'list_loadbalancers',
+ expected_allowed, test_ids)
+
+ # Test that users without the lb member role cannot list load balancers
+ # Note: non-owners can still call this API, they will just get the list
+ # of load balancers for their project (zero). The above tests
+ # are intended to cover the cross project use case.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_primary', 'os_roles_lb_admin',
+ 'os_roles_lb_member', 'os_roles_lb_member2',
+ 'os_roles_lb_observer',
+ 'os_roles_lb_global_observer']
+ # Note: os_admin is here because it evaluates to "project_admin"
+ # in oslo_policy, and keystone considers "project_admin"
+ # a superscope of "project_reader". This means it can read
+ # objects in the "admin" credential's project.
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_admin', 'os_primary', 'os_system_admin',
+ 'os_system_reader', 'os_roles_lb_observer',
+ 'os_roles_lb_global_observer',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
+ if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_admin', 'os_roles_lb_observer',
+ 'os_roles_lb_global_observer',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
+ if expected_allowed:
+ self.check_list_RBAC_enforcement(
+ 'loadbalancer_client', 'list_loadbalancers', expected_allowed)
# Check the default sort order, created_at
lbs = self.mem_lb_client.list_loadbalancers()
@@ -566,33 +631,24 @@
self.assertEqual(lb_kwargs[const.VIP_SUBNET_ID],
lb[const.VIP_SUBNET_ID])
- # Test that a user with lb_admin role can see the load balanacer
+ # Test that the appropriate users can see or not see the load
+ # balancer based on the API RBAC.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- lb_client = self.os_roles_lb_admin.loadbalancer_client
- lb_adm = lb_client.show_loadbalancer(lb[const.ID])
- self.assertEqual(lb_name, lb_adm[const.NAME])
-
- # Test that a user with cloud admin role can see the load balanacer
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- adm = self.os_admin.loadbalancer_client.show_loadbalancer(
- lb[const.ID])
- self.assertEqual(lb_name, adm[const.NAME])
-
- # Test that a different user, with load balancer member role, cannot
- # see this load balancer
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.loadbalancer_client
- self.assertRaises(exceptions.Forbidden,
- member2_client.show_loadbalancer,
- lb[const.ID])
-
- # Test that a user, without the load balancer member role, cannot
- # show load balancers
- if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.loadbalancer_client.show_loadbalancer,
- lb[const.ID])
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_admin',
+ 'os_roles_lb_global_observer',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_show_RBAC_enforcement(
+ 'loadbalancer_client', 'show_loadbalancer',
+ expected_allowed, lb[const.ID])
# Attempt to clean up so that one full test run doesn't start 10+
# amps before the cleanup phase fires
@@ -679,26 +735,22 @@
new_description = data_utils.arbitrary_string(size=255,
base_text='new')
- # Test that a user, without the load balancer member role, cannot
- # use this command
+ # Test that a user, without the loadbalancer member role, cannot
+ # update this load balancer.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.loadbalancer_client.update_loadbalancer,
- lb[const.ID], admin_state_up=True)
-
- # Assert we didn't go into PENDING_*
- lb_check = self.mem_lb_client.show_loadbalancer(lb[const.ID])
- self.assertEqual(const.ACTIVE, lb_check[const.PROVISIONING_STATUS])
- self.assertFalse(lb_check[const.ADMIN_STATE_UP])
-
- # Test that a user, without the load balancer member role, cannot
- # update this load balancer
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.loadbalancer_client
- self.assertRaises(exceptions.Forbidden,
- member2_client.update_loadbalancer,
- lb[const.ID], admin_state_up=True)
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_update_RBAC_enforcement(
+ 'loadbalancer_client', 'update_loadbalancer',
+ expected_allowed, None, None, lb[const.ID],
+ admin_state_up=True)
# Assert we didn't go into PENDING_*
lb_check = self.mem_lb_client.show_loadbalancer(lb[const.ID])
@@ -775,21 +827,24 @@
CONF.load_balancer.lb_build_interval,
CONF.load_balancer.lb_build_timeout)
- # Test that a user, without the load balancer member role, cannot
- # use this command
+ # Test that the appropriate users can see or not see the load
+ # balancer stats based on the API RBAC.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.loadbalancer_client.get_loadbalancer_stats,
- lb[const.ID])
-
- # Test that a different user, with the load balancer role, cannot see
- # the load balancer stats
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.loadbalancer_client
- self.assertRaises(exceptions.Forbidden,
- member2_client.get_loadbalancer_stats,
- lb[const.ID])
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_admin',
+ 'os_roles_lb_global_observer',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_show_RBAC_enforcement(
+ 'loadbalancer_client', 'get_loadbalancer_stats',
+ expected_allowed, lb[const.ID])
stats = self.mem_lb_client.get_loadbalancer_stats(lb[const.ID])
@@ -843,21 +898,24 @@
CONF.load_balancer.check_interval,
CONF.load_balancer.check_timeout)
- # Test that a user, without the load balancer member role, cannot
- # use this method
+ # Test that the appropriate users can see or not see the load
+ # balancer status based on the API RBAC.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.loadbalancer_client.get_loadbalancer_status,
- lb[const.ID])
-
- # Test that a different user, with load balancer role, cannot see
- # the load balancer status
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.loadbalancer_client
- self.assertRaises(exceptions.Forbidden,
- member2_client.get_loadbalancer_status,
- lb[const.ID])
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_admin',
+ 'os_roles_lb_global_observer',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_show_RBAC_enforcement(
+ 'loadbalancer_client', 'get_loadbalancer_status',
+ expected_allowed, lb[const.ID])
status = self.mem_lb_client.get_loadbalancer_status(lb[const.ID])
@@ -917,6 +975,20 @@
self.mem_lb_client.failover_loadbalancer,
lb[const.ID])
+ # Test that a user without the load balancer admin role cannot
+ # failover a load balancer.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+ if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin']
+ if expected_allowed:
+ self.check_update_RBAC_enforcement(
+ 'loadbalancer_client', 'failover_loadbalancer',
+ expected_allowed, None, None, lb[const.ID])
+
# Assert we didn't go into PENDING_*
lb = self.mem_lb_client.show_loadbalancer(lb[const.ID])
self.assertEqual(const.ACTIVE, lb[const.PROVISIONING_STATUS])
diff --git a/octavia_tempest_plugin/tests/api/v2/test_member.py b/octavia_tempest_plugin/tests/api/v2/test_member.py
index 66e367e..a5607da 100644
--- a/octavia_tempest_plugin/tests/api/v2/test_member.py
+++ b/octavia_tempest_plugin/tests/api/v2/test_member.py
@@ -892,6 +892,24 @@
self.os_primary.member_client.create_member,
**member_kwargs)
+ # Test that a user without the loadbalancer role cannot
+ # create a member.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_create_RBAC_enforcement(
+ 'member_client', 'create_member',
+ expected_allowed,
+ status_method=self.mem_lb_client.show_loadbalancer,
+ obj_id=self.lb_id, **member_kwargs)
+
member = self.mem_member_client.create_member(**member_kwargs)
waiters.wait_for_status(
@@ -1048,6 +1066,9 @@
* List the members filtering to one of the three.
* List the members filtered, one field, and sorted.
"""
+ # IDs of members created in the test
+ test_ids = []
+
if (algorithm == const.LB_ALGORITHM_SOURCE_IP_PORT and not
self.mem_listener_client.is_version_supported(
self.api_version, '2.13')):
@@ -1124,6 +1145,7 @@
const.ACTIVE,
CONF.load_balancer.check_interval,
CONF.load_balancer.check_timeout)
+ test_ids.append(member1[const.ID])
# Time resolution for created_at is only to the second, and we need to
# ensure that each object has a distinct creation time. Delaying one
# second is both a simple and a reliable way to accomplish this.
@@ -1162,6 +1184,7 @@
const.ACTIVE,
CONF.load_balancer.check_interval,
CONF.load_balancer.check_timeout)
+ test_ids.append(member2[const.ID])
# Time resolution for created_at is only to the second, and we need to
# ensure that each object has a distinct creation time. Delaying one
# second is both a simple and a reliable way to accomplish this.
@@ -1200,22 +1223,45 @@
const.ACTIVE,
CONF.load_balancer.check_interval,
CONF.load_balancer.check_timeout)
+ test_ids.append(member3[const.ID])
- # Test that a different user cannot list members
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.member_client
- self.assertRaises(
- exceptions.Forbidden,
- member2_client.list_members,
- pool_id)
-
- # Test that a user without the lb member role cannot list load
- # balancers
+ # Test credentials that should see these members can see them.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.member_client.list_members,
- pool_id)
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_admin', 'os_roles_lb_member',
+ 'os_roles_lb_global_observer']
+ if expected_allowed:
+ self.check_list_IDs_RBAC_enforcement(
+ 'member_client', 'list_members', expected_allowed,
+ test_ids, pool_id)
+
+ # Test that users without the lb member role cannot list members
+ # Note: The parent pool ID blocks non-owners from listing members.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ # Note: os_admin is here because it evaluates to "project_admin"
+ # in oslo_policy, and keystone considers "project_admin"
+ # a superscope of "project_reader". This means it can read
+ # objects in the "admin" credential's project.
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_admin', 'os_system_admin',
+ 'os_system_reader', 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_admin',
+ 'os_roles_lb_global_observer',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_list_RBAC_enforcement(
+ 'member_client', 'list_members', expected_allowed, pool_id)
# Check the default sort order, created_at
members = self.mem_member_client.list_members(pool_id)
@@ -1740,34 +1786,25 @@
for item in equal_items:
self.assertEqual(member_kwargs[item], member[item])
- # Test that a user with lb_admin role can see the member
+ # Test that the appropriate users can see or not see the member
+ # based on the API RBAC.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- member_client = self.os_roles_lb_admin.member_client
- member_adm = member_client.show_member(
- member[const.ID], pool_id=pool_id)
- self.assertEqual(member_name, member_adm[const.NAME])
-
- # Test that a user with cloud admin role can see the member
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- adm = self.os_admin.member_client.show_member(
- member[const.ID], pool_id=pool_id)
- self.assertEqual(member_name, adm[const.NAME])
-
- # Test that a different user, with load balancer member role, cannot
- # see this member
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.member_client
- self.assertRaises(exceptions.Forbidden,
- member2_client.show_member,
- member[const.ID], pool_id=pool_id)
-
- # Test that a user, without the load balancer member role, cannot
- # show members
- if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.member_client.show_member,
- member[const.ID], pool_id=pool_id)
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_admin',
+ 'os_roles_lb_global_observer',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_show_RBAC_enforcement(
+ 'member_client', 'show_member',
+ expected_allowed, member[const.ID],
+ pool_id=pool_id)
@decorators.idempotent_id('65680d48-1d49-4959-a7d1-677797e54f6b')
def test_HTTP_LC_alt_monitor_member_update(self):
@@ -2206,30 +2243,22 @@
for item in equal_items:
self.assertEqual(member_kwargs[item], member[item])
- # Test that a user, without the load balancer member role, cannot
- # use this command
+ # Test that a user, without the loadbalancer member role, cannot
+ # update this member.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.member_client.update_member,
- member[const.ID], pool_id=pool_id, admin_state_up=True)
-
- # Assert we didn't go into PENDING_*
- member_check = self.mem_member_client.show_member(
- member[const.ID], pool_id=pool_id)
- self.assertEqual(const.ACTIVE,
- member_check[const.PROVISIONING_STATUS])
- self.assertEqual(member_kwargs[const.ADMIN_STATE_UP],
- member_check[const.ADMIN_STATE_UP])
-
- # Test that a user, without the load balancer member role, cannot
- # update this member
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.member_client
- self.assertRaises(exceptions.Forbidden,
- member2_client.update_member,
- member[const.ID], pool_id=pool_id,
- admin_state_up=True)
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_update_RBAC_enforcement(
+ 'member_client', 'update_member',
+ expected_allowed, None, None, member[const.ID],
+ pool_id=pool_id, admin_state_up=True)
# Assert we didn't go into PENDING_*
member_check = self.mem_member_client.show_member(
@@ -2672,12 +2701,21 @@
member2_kwargs.pop(const.POOL_ID)
batch_update_list = [member2_kwargs, member3_kwargs]
- # Test that a user, without the load balancer member role, cannot
- # use this command
+ # Test that a user, without the loadbalancer member role, cannot
+ # batch update this member.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.member_client.update_members,
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_update_RBAC_enforcement(
+ 'member_client', 'update_members',
+ expected_allowed, None, None,
pool_id=pool_id, members_list=batch_update_list)
# Assert we didn't go into PENDING_*
@@ -2908,21 +2946,22 @@
CONF.load_balancer.build_interval,
CONF.load_balancer.build_timeout)
- # Test that a user without the load balancer role cannot
- # delete this member
+ # Test that a user without the loadbalancer role cannot delete this
+ # member.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.member_client.delete_member,
- member[const.ID], pool_id=pool_id)
-
- # Test that a different user, with the load balancer member role
- # cannot delete this member
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.member_client
- self.assertRaises(exceptions.Forbidden,
- member2_client.delete_member,
- member[const.ID], pool_id=pool_id)
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_delete_RBAC_enforcement(
+ 'member_client', 'delete_member',
+ expected_allowed, None, None, member[const.ID],
+ pool_id=pool_id)
self.mem_member_client.delete_member(member[const.ID],
pool_id=pool_id)
diff --git a/octavia_tempest_plugin/tests/api/v2/test_pool.py b/octavia_tempest_plugin/tests/api/v2/test_pool.py
index 3ab4892..61a17cb 100644
--- a/octavia_tempest_plugin/tests/api/v2/test_pool.py
+++ b/octavia_tempest_plugin/tests/api/v2/test_pool.py
@@ -401,13 +401,23 @@
else:
pool_kwargs[const.LOADBALANCER_ID] = self.lb_id
- # Test that a user without the load balancer role cannot
- # create a pool
+ # Test that a user without the loadbalancer role cannot
+ # create a pool.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.pool_client.create_pool,
- **pool_kwargs)
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_create_RBAC_enforcement(
+ 'pool_client', 'create_pool',
+ expected_allowed,
+ status_method=self.mem_lb_client.show_loadbalancer,
+ obj_id=self.lb_id, **pool_kwargs)
# This is a special case as the reference driver does not support
# SOURCE-IP-PORT. Since it runs with not_implemented_is_error, we must
@@ -585,6 +595,9 @@
* List the pools filtering to one of the three.
* List the pools filtered, one field, and sorted.
"""
+ # IDs of pools created in the test
+ test_ids = []
+
if (algorithm == const.LB_ALGORITHM_SOURCE_IP_PORT and not
self.mem_listener_client.is_version_supported(
self.api_version, '2.13')):
@@ -655,6 +668,7 @@
const.ACTIVE,
CONF.load_balancer.build_interval,
CONF.load_balancer.build_timeout)
+ test_ids.append(pool1[const.ID])
# Time resolution for created_at is only to the second, and we need to
# ensure that each object has a distinct creation time. Delaying one
# second is both a simple and a reliable way to accomplish this.
@@ -694,6 +708,7 @@
const.ACTIVE,
CONF.load_balancer.build_interval,
CONF.load_balancer.build_timeout)
+ test_ids.append(pool2[const.ID])
# Time resolution for created_at is only to the second, and we need to
# ensure that each object has a distinct creation time. Delaying one
# second is both a simple and a reliable way to accomplish this.
@@ -733,20 +748,68 @@
const.ACTIVE,
CONF.load_balancer.build_interval,
CONF.load_balancer.build_timeout)
+ test_ids.append(pool3[const.ID])
- # Test that a different user cannot list pools
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.pool_client
- primary = member2_client.list_pools(
- query_params='loadbalancer_id={lb_id}'.format(lb_id=lb_id))
- self.assertEqual(0, len(primary))
-
- # Test that a user without the lb member role cannot list load
- # balancers
+ # Test that different users cannot see the lb_member pools.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_primary', 'os_roles_lb_member2',
+ 'os_roles_lb_observer']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_admin', 'os_primary',
+ 'os_roles_lb_member2', 'os_roles_lb_observer',
+ 'os_roles_lb_global_observer']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.pool_client.list_pools)
+ expected_allowed = ['os_roles_lb_observer', 'os_roles_lb_member2']
+ if expected_allowed:
+ self.check_list_RBAC_enforcement_count(
+ 'pool_client', 'list_pools', expected_allowed, 0,
+ query_params='loadbalancer_id={lb_id}'.format(lb_id=lb_id))
+
+ # Test credentials that should see these pools can see them.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_admin', 'os_roles_lb_member',
+ 'os_roles_lb_global_observer']
+ if expected_allowed:
+ self.check_list_IDs_RBAC_enforcement(
+ 'pool_client', 'list_pools', expected_allowed, test_ids,
+ query_params='loadbalancer_id={lb_id}'.format(lb_id=lb_id))
+
+ # Test that users without the lb member role cannot list pools.
+ # Note: non-owners can still call this API, they will just get the list
+ # of pools for their project (zero). The above tests
+ # are intended to cover the cross project use case.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_primary', 'os_roles_lb_admin',
+ 'os_roles_lb_member', 'os_roles_lb_member2',
+ 'os_roles_lb_observer',
+ 'os_roles_lb_global_observer']
+ # Note: os_admin is here because it evaluates to "project_admin"
+ # in oslo_policy, and keystone considers "project_admin"
+ # a superscope of "project_reader". This means it can read
+ # objects in the "admin" credential's project.
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_admin', 'os_primary', 'os_system_admin',
+ 'os_system_reader', 'os_roles_lb_observer',
+ 'os_roles_lb_global_observer',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
+ if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_admin', 'os_roles_lb_observer',
+ 'os_roles_lb_global_observer',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
+ if expected_allowed:
+ self.check_list_RBAC_enforcement(
+ 'pool_client', 'list_pools', expected_allowed,
+ query_params='loadbalancer_id={lb_id}'.format(lb_id=lb_id))
# Check the default sort order, created_at
pools = self.mem_pool_client.list_pools(
@@ -1062,33 +1125,24 @@
self.assertEqual(const.SESSION_PERSISTENCE_SOURCE_IP,
pool[const.SESSION_PERSISTENCE][const.TYPE])
- # Test that a user with lb_admin role can see the pool
+ # Test that the appropriate users can see or not see the pool
+ # based on the API RBAC.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- pool_client = self.os_roles_lb_admin.pool_client
- pool_adm = pool_client.show_pool(pool[const.ID])
- self.assertEqual(pool_name, pool_adm[const.NAME])
-
- # Test that a user with cloud admin role can see the pool
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- adm = self.os_admin.pool_client.show_pool(
- pool[const.ID])
- self.assertEqual(pool_name, adm[const.NAME])
-
- # Test that a different user, with load balancer member role, cannot
- # see this pool
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.pool_client
- self.assertRaises(exceptions.Forbidden,
- member2_client.show_pool,
- pool[const.ID])
-
- # Test that a user, without the load balancer member role, cannot
- # show pools
- if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.pool_client.show_pool,
- pool[const.ID])
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_roles_lb_admin',
+ 'os_roles_lb_global_observer',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_show_RBAC_enforcement(
+ 'pool_client', 'show_pool',
+ expected_allowed, pool[const.ID])
@decorators.idempotent_id('d73755fe-ba3a-4248-9543-8e167a5aa7f4')
def test_HTTP_LC_pool_update(self):
@@ -1306,28 +1360,22 @@
self.assertEqual(const.SESSION_PERSISTENCE_SOURCE_IP,
pool[const.SESSION_PERSISTENCE][const.TYPE])
- # Test that a user, without the load balancer member role, cannot
- # use this command
+ # Test that only the expected users can update this pool, based on
+ # the API RBAC.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.pool_client.update_pool,
- pool[const.ID], admin_state_up=True)
-
- # Assert we didn't go into PENDING_*
- pool_check = self.mem_pool_client.show_pool(
- pool[const.ID])
- self.assertEqual(const.ACTIVE,
- pool_check[const.PROVISIONING_STATUS])
- self.assertFalse(pool_check[const.ADMIN_STATE_UP])
-
- # Test that a user, without the load balancer member role, cannot
- # update this pool
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.pool_client
- self.assertRaises(exceptions.Forbidden,
- member2_client.update_pool,
- pool[const.ID], admin_state_up=True)
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_update_RBAC_enforcement(
+ 'pool_client', 'update_pool',
+ expected_allowed, None, None, pool[const.ID],
+ admin_state_up=True)
# Assert we didn't go into PENDING_*
pool_check = self.mem_pool_client.show_pool(
@@ -1609,21 +1657,21 @@
CONF.load_balancer.build_interval,
CONF.load_balancer.build_timeout)
- # Test that a user without the load balancer role cannot
- # delete this pool
+ # Test that only the expected users can delete this pool, based on
+ # the API RBAC.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = ['os_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_system_admin', 'os_roles_lb_member']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.pool_client.delete_pool,
- pool[const.ID])
-
- # Test that a different user, with the load balancer member role
- # cannot delete this pool
- if not CONF.load_balancer.RBAC_test_type == const.NONE:
- member2_client = self.os_roles_lb_member2.pool_client
- self.assertRaises(exceptions.Forbidden,
- member2_client.delete_pool,
- pool[const.ID])
+ expected_allowed = ['os_system_admin', 'os_roles_lb_admin',
+ 'os_roles_lb_member']
+ if expected_allowed:
+ self.check_delete_RBAC_enforcement(
+ 'pool_client', 'delete_pool',
+ expected_allowed, None, None, pool[const.ID])
self.mem_pool_client.delete_pool(pool[const.ID])
diff --git a/octavia_tempest_plugin/tests/api/v2/test_provider.py b/octavia_tempest_plugin/tests/api/v2/test_provider.py
index 917b365..f85d598 100644
--- a/octavia_tempest_plugin/tests/api/v2/test_provider.py
+++ b/octavia_tempest_plugin/tests/api/v2/test_provider.py
@@ -15,7 +15,6 @@
from oslo_log import log as logging
from tempest import config
from tempest.lib import decorators
-from tempest.lib import exceptions
from octavia_tempest_plugin.common import constants as const
from octavia_tempest_plugin.tests import test_base
@@ -44,10 +43,25 @@
# Test that a user without the load balancer role cannot
# list providers.
+ expected_allowed = []
+ if CONF.load_balancer.RBAC_test_type == const.OWNERADMIN:
+ expected_allowed = [
+ 'os_admin', 'os_primary', 'os_roles_lb_admin',
+ 'os_roles_lb_observer', 'os_roles_lb_global_observer',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
+ if CONF.load_balancer.RBAC_test_type == const.KEYSTONE_DEFAULT_ROLES:
+ expected_allowed = ['os_admin', 'os_primary', 'os_system_admin',
+ 'os_system_reader', 'os_roles_lb_observer',
+ 'os_roles_lb_global_observer',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
- self.assertRaises(
- exceptions.Forbidden,
- self.os_primary.provider_client.list_providers)
+ expected_allowed = [
+ 'os_system_admin', 'os_system_reader', 'os_roles_lb_observer',
+ 'os_roles_lb_global_observer', 'os_roles_lb_admin',
+ 'os_roles_lb_member', 'os_roles_lb_member2']
+ if expected_allowed:
+ self.check_list_RBAC_enforcement(
+ 'provider_client', 'list_providers', expected_allowed)
providers = self.mem_provider_client.list_providers()
diff --git a/octavia_tempest_plugin/tests/test_base.py b/octavia_tempest_plugin/tests/test_base.py
index d14ab58..e182d58 100644
--- a/octavia_tempest_plugin/tests/test_base.py
+++ b/octavia_tempest_plugin/tests/test_base.py
@@ -33,6 +33,7 @@
from octavia_tempest_plugin import clients
from octavia_tempest_plugin.common import cert_utils
from octavia_tempest_plugin.common import constants as const
+from octavia_tempest_plugin.tests import RBAC_tests
from octavia_tempest_plugin.tests import validators
from octavia_tempest_plugin.tests import waiters
@@ -45,15 +46,43 @@
RETRY_MAX = 5
-class LoadBalancerBaseTest(validators.ValidatorsMixin, test.BaseTestCase):
+class LoadBalancerBaseTest(validators.ValidatorsMixin,
+ RBAC_tests.RBACTestsMixin, test.BaseTestCase):
"""Base class for load balancer tests."""
- # Setup cls.os_roles_lb_member. cls.os_primary, cls.os_roles_lb_member,
- # and cls.os_roles_lb_admin credentials.
- credentials = ['admin', 'primary',
- ['lb_member', CONF.load_balancer.member_role],
- ['lb_member2', CONF.load_balancer.member_role],
- ['lb_admin', CONF.load_balancer.admin_role]]
+ if CONF.load_balancer.enforce_new_defaults:
+ credentials = [
+ 'admin', 'primary', ['lb_admin', CONF.load_balancer.admin_role],
+ ['lb_observer', CONF.load_balancer.observer_role, 'reader'],
+ ['lb_global_observer', CONF.load_balancer.global_observer_role,
+ 'reader'],
+ ['lb_member', CONF.load_balancer.member_role, 'member'],
+ ['lb_member2', CONF.load_balancer.member_role, 'member'],
+ ['lb_member_not_default_member', CONF.load_balancer.member_role]]
+ else:
+ credentials = [
+ 'admin', 'primary', ['lb_admin', CONF.load_balancer.admin_role],
+ ['lb_observer', CONF.load_balancer.observer_role, 'reader'],
+ ['lb_global_observer', CONF.load_balancer.global_observer_role,
+ 'reader'],
+ ['lb_member', CONF.load_balancer.member_role],
+ ['lb_member2', CONF.load_balancer.member_role]]
+
+ # If scope enforcement is enabled, add in the system scope credentials.
+ # The project scope is already handled by the above credentials.
+ if CONF.enforce_scope.octavia:
+ credentials.extend(['system_admin', 'system_reader'])
+
+ # A tuple of credentials that will be allocated by tempest using the
+ # 'credentials' list above. These are used to build RBAC test lists.
+ allocated_creds = []
+ for cred in credentials:
+ if isinstance(cred, list):
+ allocated_creds.append('os_roles_' + cred[0])
+ else:
+ allocated_creds.append('os_' + cred)
+ # Tests shall not mess with the list of allocated credentials
+ allocated_credentials = tuple(allocated_creds)
client_manager = clients.ManagerV2
webserver1_response = 1
@@ -102,6 +131,31 @@
cls.set_network_resources()
super(LoadBalancerBaseTest, cls).setup_credentials()
+ # Log the user roles for this test run
+ role_name_cache = {}
+ for cred in cls.credentials:
+ user_roles = []
+ if isinstance(cred, list):
+ user_name = cred[0]
+ cred_obj = getattr(cls, 'os_roles_' + cred[0])
+ else:
+ user_name = cred
+ cred_obj = getattr(cls, 'os_' + cred)
+ params = {'user.id': cred_obj.credentials.user_id,
+ 'project.id': cred_obj.credentials.project_id}
+ roles = cls.os_admin.role_assignments_client.list_role_assignments(
+ **params)['role_assignments']
+ for role in roles:
+ role_id = role['role']['id']
+ try:
+ role_name = role_name_cache[role_id]
+ except KeyError:
+ role_name = cls.os_admin.roles_v3_client.show_role(
+ role_id)['role']['name']
+ role_name_cache[role_id] = role_name
+ user_roles.append([role_name, role['scope']])
+ LOG.info("User %s has roles: %s", user_name, user_roles)
+
@classmethod
def setup_clients(cls):
"""Setup client aliases."""
diff --git a/releasenotes/notes/Add-RBAC-scoped-tokens-tests-920aa35faf4a8c9d.yaml b/releasenotes/notes/Add-RBAC-scoped-tokens-tests-920aa35faf4a8c9d.yaml
new file mode 100644
index 0000000..d2d5d23
--- /dev/null
+++ b/releasenotes/notes/Add-RBAC-scoped-tokens-tests-920aa35faf4a8c9d.yaml
@@ -0,0 +1,17 @@
+---
+features:
+ - |
+ Added API test support for keystone default roles and scoped tokens.
+issues:
+ - |
+ Currently the API tests will not pass with the
+ keystone_default_roles-policy.yaml override file. This is because the
+ tempest framework credentials do not yet support token scopes.
+ This issue is tracked in https://bugs.launchpad.net/tempest/+bug/1917168
+ Once that bug is fixed, octavia-tempest-plugin can be updated to use the
+ required scope in the test credentials.
+upgrade:
+ - |
+ Two new tempest.conf settings enable/disable keystone default roles and
+ scoped token testing, [enforce_scope] octavia = True/False and
+ [load_balancer] enforce_new_defaults = True/False.
diff --git a/zuul.d/jobs.yaml b/zuul.d/jobs.yaml
index 20aa0b4..08263e3 100644
--- a/zuul.d/jobs.yaml
+++ b/zuul.d/jobs.yaml
@@ -456,6 +456,23 @@
- ^octavia_tempest_plugin/tests/(?!api/|\w+\.py).*
- job:
+ name: octavia-v2-dsvm-noop-api-scoped-tokens
+ parent: octavia-v2-dsvm-noop-api
+ vars:
+ devstack_local_conf:
+ post-config:
+ $OCTAVIA_CONF:
+ oslo_policy:
+ enforce_scope: True
+ enforce_new_defaults: True
+ test-config:
+ "$TEMPEST_CONFIG":
+ enforce_scope:
+ octavia: True
+ load_balancer:
+ enforce_new_defaults: True
+
+- job:
name: octavia-v2-dsvm-noop-py2-api
parent: octavia-v2-dsvm-noop-api
vars:
diff --git a/zuul.d/projects.yaml b/zuul.d/projects.yaml
index d7a5ea4..b3782b9 100644
--- a/zuul.d/projects.yaml
+++ b/zuul.d/projects.yaml
@@ -12,6 +12,7 @@
- octavia-v2-dsvm-noop-api-stable-victoria
- octavia-v2-dsvm-noop-api-stable-ussuri
- octavia-v2-dsvm-noop-api-stable-train
+ - octavia-v2-dsvm-noop-api-scoped-tokens
- octavia-v2-dsvm-scenario
- octavia-v2-dsvm-scenario-stable-victoria
- octavia-v2-dsvm-scenario-stable-ussuri
@@ -57,6 +58,7 @@
- octavia-v2-dsvm-noop-api-stable-victoria
- octavia-v2-dsvm-noop-api-stable-ussuri
- octavia-v2-dsvm-noop-api-stable-train
+ - octavia-v2-dsvm-noop-api-scoped-tokens
- octavia-v2-dsvm-scenario
- octavia-v2-dsvm-scenario-stable-victoria
- octavia-v2-dsvm-scenario-stable-ussuri