Switch to enabled version of identity clients
Patrole always uses (e.g.) v3 roles client to retrieve
list of roles which is bad if the v3 identity service is not enabled.
Cases like the following:
self.roles_client = test_obj.os_admin.roles_v3_client
Should be changed to:
self.roles_client = test_obj.os_admin.roles_v3_client \
if CONF.identity_feature_enabled.api_v3 \
else test_obj.os_admin.roles_client
This commit switches between the correct identity client
depending on the identity version that is enabled in tempest.conf.
The v3 client is prioritized as identity v3 is current.
This commit also corrects/improves upon some documentation errata.
Change-Id: I9a12196f11473ac4e045ae90c4321219beab7ca6
Closes-Bug: #1702980
diff --git a/patrole_tempest_plugin/rbac_policy_parser.py b/patrole_tempest_plugin/rbac_policy_parser.py
index 41871cf..254bb18 100644
--- a/patrole_tempest_plugin/rbac_policy_parser.py
+++ b/patrole_tempest_plugin/rbac_policy_parser.py
@@ -17,17 +17,17 @@
import json
import os
-from oslo_config import cfg
from oslo_log import log as logging
from oslo_policy import policy
import stevedore
from tempest.common import credentials_factory as credentials
+from tempest import config
from patrole_tempest_plugin import rbac_exceptions
from patrole_tempest_plugin.rbac_utils import RbacAuthority
-CONF = cfg.CONF
+CONF = config.CONF
LOG = logging.getLogger(__name__)
@@ -94,8 +94,10 @@
# doing an API call every time.
if not hasattr(cls, 'available_services'):
admin_mgr = credentials.AdminManager()
- services = admin_mgr.identity_services_v3_client.\
- list_services()['services']
+ services_client = (admin_mgr.identity_services_v3_client
+ if CONF.identity_feature_enabled.api_v3
+ else admin_mgr.identity_services_client)
+ services = services_client.list_services()['services']
cls.available_services = [s['name'] for s in services]
if not service or service not in cls.available_services:
diff --git a/patrole_tempest_plugin/rbac_rule_validation.py b/patrole_tempest_plugin/rbac_rule_validation.py
index 0a101d7..51b9d92 100644
--- a/patrole_tempest_plugin/rbac_rule_validation.py
+++ b/patrole_tempest_plugin/rbac_rule_validation.py
@@ -143,6 +143,9 @@
reference nested BaseTestCase attributes
:returns: True if the current RBAC role can perform the policy action else
False
+
+ :raises RbacResourceSetupFailed: if project_id or user_id are missing from
+ the Tempest test object's `auth_provider`
:raises RbacParsingException: if ``CONF.rbac.strict_policy_check`` is
enabled and the ``rule_name`` does not exist in the system
:raises skipException: if ``CONF.rbac.strict_policy_check`` is
diff --git a/patrole_tempest_plugin/rbac_utils.py b/patrole_tempest_plugin/rbac_utils.py
index 9fd8aee..9d7a807 100644
--- a/patrole_tempest_plugin/rbac_utils.py
+++ b/patrole_tempest_plugin/rbac_utils.py
@@ -76,6 +76,15 @@
rbac_role_id = None
def switch_role(self, test_obj, toggle_rbac_role=False):
+ """Switch the role used by `os_primary` Tempest credentials.
+
+ Switch the role used by `os_primary` credentials to:
+ * admin if `toggle_rbac_role` is False
+ * `CONF.rbac.rbac_test_role` if `toggle_rbac_role` is True
+
+ :param test_obj: test object of type tempest.lib.base.BaseTestCase
+ :param toggle_rbac_role: role to switch `os_primary` Tempest creds to
+ """
self.user_id = test_obj.os_primary.credentials.user_id
self.project_id = test_obj.os_primary.credentials.tenant_id
self.token = test_obj.os_primary.auth_provider.get_token()
@@ -92,20 +101,16 @@
else:
self._add_role_to_user(self.admin_role_id)
except Exception as exp:
- LOG.error(exp)
+ LOG.exception(exp)
raise
finally:
- # NOTE(felipemonteiro): These two comments below are copied from
- # tempest.api.identity.v2/v3.test_users.
- #
- # Reset auth again to verify the password restore does work.
- # Clear auth restores the original credentials and deletes
- # cached auth data.
test_obj.os_primary.auth_provider.clear_auth()
- # Fernet tokens are not subsecond aware and Keystone should only be
- # precise to the second. Sleep to ensure we are passing the second
- # boundary before attempting to authenticate. If token is of type
- # uuid, then do not sleep.
+ # Fernet tokens are not subsecond aware so sleep to ensure we are
+ # passing the second boundary before attempting to authenticate.
+ #
+ # FIXME(felipemonteiro): Rather than skipping sleep if the token
+ # is not uuid, this should instead be skipped if the token is not
+ # Fernet.
if not uuid_utils.is_uuid_like(self.token):
time.sleep(1)
test_obj.os_primary.auth_provider.set_auth()
@@ -140,6 +145,9 @@
* `switch_role` is not called with a boolean value
* `switch_role` is never called in a test file, except in tearDown
* `switch_role` is called with the same boolean value twice
+
+ If a `skipException` is thrown then this is a legitimate reason why
+ `switch_role` is not called.
"""
if not isinstance(toggle_rbac_role, bool):
raise rbac_exceptions.RbacResourceSetupFailed(
diff --git a/patrole_tempest_plugin/tests/unit/test_rbac_policy_parser.py b/patrole_tempest_plugin/tests/unit/test_rbac_policy_parser.py
index 95e6bff..7ce925a 100644
--- a/patrole_tempest_plugin/tests/unit/test_rbac_policy_parser.py
+++ b/patrole_tempest_plugin/tests/unit/test_rbac_policy_parser.py
@@ -40,10 +40,12 @@
def setUp(self):
super(RbacPolicyTest, self).setUp()
- self.mock_admin_mgr = self.patchobject(
- rbac_policy_parser, 'credentials')
- self.mock_admin_mgr.AdminManager().identity_services_v3_client.\
- list_services.return_value = self.services
+
+ m_creds = self.patchobject(rbac_policy_parser, 'credentials')
+ m_creds.AdminManager().identity_services_client.list_services.\
+ return_value = self.services
+ m_creds.AdminManager().identity_services_v3_client.list_services.\
+ return_value = self.services
current_directory = os.path.dirname(os.path.realpath(__file__))
self.custom_policy_file = os.path.join(current_directory,
@@ -71,6 +73,13 @@
if attr in dir(rbac_policy_parser.RbacPolicyParser):
delattr(rbac_policy_parser.RbacPolicyParser, attr)
+ # TODO(fm577c): Use fixture for setting/clearing CONF.
+ CONF.set_override('api_v3', True, group='identity-feature-enabled')
+ self.addCleanup(CONF.clear_override, 'api_v2',
+ group='identity-feature-enabled')
+ self.addCleanup(CONF.clear_override, 'api_v3',
+ group='identity-feature-enabled')
+
def _get_fake_policy_rule(self, name, rule):
fake_rule = mock.Mock(check=rule)
fake_rule.name = name
@@ -449,13 +458,15 @@
@mock.patch.object(rbac_policy_parser, 'policy', autospec=True)
@mock.patch.object(rbac_policy_parser.RbacPolicyParser, '_get_policy_data',
autospec=True)
+ @mock.patch.object(rbac_policy_parser, 'credentials', autospec=True)
@mock.patch.object(rbac_policy_parser, 'os', autospec=True)
- def test_discover_policy_files_with_many_invalid_one_valid(self, m_os, *a):
+ def test_discover_policy_files_with_many_invalid_one_valid(self, m_os,
+ m_creds, *args):
# Only the 3rd path is valid.
m_os.path.isfile.side_effect = [False, False, True, False]
# Ensure the outer for loop runs only once in `discover_policy_files`.
- self.mock_admin_mgr.AdminManager().identity_services_v3_client.\
+ m_creds.AdminManager().identity_services_v3_client.\
list_services.return_value = {
'services': [{'name': 'test_service'}]}
@@ -490,3 +501,94 @@
self.assertNotIn(
'test_service',
rbac_policy_parser.RbacPolicyParser.policy_files.keys())
+
+ def _test_validate_service(self, v2_services, v3_services,
+ expected_failure=False, expected_services=None):
+ with mock.patch.object(rbac_policy_parser, 'credentials') as m_creds:
+ m_creds.AdminManager().identity_services_client.list_services.\
+ return_value = v2_services
+ m_creds.AdminManager().identity_services_v3_client.list_services.\
+ return_value = v3_services
+
+ test_tenant_id = mock.sentinel.tenant_id
+ test_user_id = mock.sentinel.user_id
+
+ mock_os = self.patchobject(rbac_policy_parser, 'os')
+ mock_os.path.join.return_value = self.admin_policy_file
+
+ if not expected_services:
+ expected_services = [s['name'] for s in self.services['services']]
+
+ # Guarantee a blank slate for this test.
+ if hasattr(rbac_policy_parser.RbacPolicyParser, 'available_services'):
+ delattr(rbac_policy_parser.RbacPolicyParser,
+ 'available_services')
+
+ if expected_failure:
+ policy_parser = None
+
+ expected_exception = 'invalid_service is NOT a valid service'
+ with self.assertRaisesRegex(rbac_exceptions.RbacInvalidService,
+ expected_exception):
+ rbac_policy_parser.RbacPolicyParser(
+ test_tenant_id, test_user_id, "INVALID_SERVICE")
+ else:
+ policy_parser = rbac_policy_parser.RbacPolicyParser(
+ test_tenant_id, test_user_id, "tenant_rbac_policy")
+
+ # Check that the attribute is available at object and class levels.
+ # If initialization failed, only check at class level.
+ if policy_parser:
+ self.assertTrue(hasattr(policy_parser, 'available_services'))
+ self.assertEqual(expected_services,
+ policy_parser.available_services)
+ self.assertTrue(hasattr(rbac_policy_parser.RbacPolicyParser,
+ 'available_services'))
+ self.assertEqual(
+ expected_services,
+ rbac_policy_parser.RbacPolicyParser.available_services)
+
+ def test_validate_service(self):
+ """Positive test case to ensure ``validate_service`` works.
+
+ There are 3 possibilities:
+ 1) Identity v3 API enabled.
+ 2) Identity v2 API enabled.
+ 3) Both are enabled.
+ """
+ CONF.set_override('api_v2', True, group='identity-feature-enabled')
+ CONF.set_override('api_v3', False, group='identity-feature-enabled')
+ self._test_validate_service(self.services, [], False)
+
+ CONF.set_override('api_v2', False, group='identity-feature-enabled')
+ CONF.set_override('api_v3', True, group='identity-feature-enabled')
+ self._test_validate_service([], self.services, False)
+
+ CONF.set_override('api_v2', True, group='identity-feature-enabled')
+ CONF.set_override('api_v3', True, group='identity-feature-enabled')
+ self._test_validate_service(self.services, self.services, False)
+
+ def test_validate_service_except_invalid_service(self):
+ """Negative test case to ensure ``validate_service`` works.
+
+ There are 4 possibilities:
+ 1) Identity v3 API enabled.
+ 2) Identity v2 API enabled.
+ 3) Both are enabled.
+ 4) Neither are enabled.
+ """
+ CONF.set_override('api_v2', True, group='identity-feature-enabled')
+ CONF.set_override('api_v3', False, group='identity-feature-enabled')
+ self._test_validate_service(self.services, [], True)
+
+ CONF.set_override('api_v2', False, group='identity-feature-enabled')
+ CONF.set_override('api_v3', True, group='identity-feature-enabled')
+ self._test_validate_service([], self.services, True)
+
+ CONF.set_override('api_v2', True, group='identity-feature-enabled')
+ CONF.set_override('api_v3', True, group='identity-feature-enabled')
+ self._test_validate_service(self.services, self.services, True)
+
+ CONF.set_override('api_v2', False, group='identity-feature-enabled')
+ CONF.set_override('api_v3', False, group='identity-feature-enabled')
+ self._test_validate_service([], [], True, [])