Merge "Use system_reader in identity v3 admin tests"
diff --git a/tempest/api/identity/admin/v3/test_domains.py b/tempest/api/identity/admin/v3/test_domains.py
index 80c4d1c..7291a0b 100644
--- a/tempest/api/identity/admin/v3/test_domains.py
+++ b/tempest/api/identity/admin/v3/test_domains.py
@@ -26,6 +26,27 @@
class DomainsTestJSON(base.BaseIdentityV3AdminTest):
"""Test identity domains"""
+ credentials = ['primary', 'admin', 'system_reader']
+
+ @classmethod
+ def setup_clients(cls):
+ super(DomainsTestJSON, cls).setup_clients()
+ if CONF.identity.use_system_token:
+ # Use system reader for listing/showing domains
+ cls.reader_domains_client = (
+ cls.os_system_reader.domains_client)
+ # Use system reader for showing users
+ cls.reader_users_client = (
+ cls.os_system_reader.users_v3_client)
+ # Use system reader for showing groups
+ cls.reader_groups_client = (
+ cls.os_system_reader.groups_client)
+ else:
+ # Use admin client by default
+ cls.reader_domains_client = cls.domains_client
+ cls.reader_users_client = cls.users_client
+ cls.reader_groups_client = cls.groups_client
+
@classmethod
def resource_setup(cls):
super(DomainsTestJSON, cls).resource_setup()
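Review bot: The `else` branch of `setup_clients` binds `reader_domains_client`, `reader_users_client` and `reader_groups_client` to the admin clients, so with `CONF.identity.use_system_token` unset every read in this class still runs with admin rights and a pass says nothing about the reader role. The attribute names hide that, so the branch comment should state it, for example `# Without use_system_token the reader_* clients are the admin clients; reads are not checked under a read-only role`. `credentials` also lists `'system_reader'` unconditionally, which presumably makes the credential provider allocate that account even when the flag is off; this is worth confirming for pre-provisioned setups. The same `setup_clients` pattern is added in the other files of this commit, and the comment applies there too.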
@@ -41,7 +62,7 @@
"""Test listing domains"""
fetched_ids = list()
# List and Verify Domains
- body = self.domains_client.list_domains()['domains']
+ body = self.reader_domains_client.list_domains()['domains']
for d in body:
fetched_ids.append(d['id'])
missing_doms = [d for d in self.setup_domains
@@ -52,7 +73,7 @@
def test_list_domains_filter_by_name(self):
"""Test listing domains filtering by name"""
params = {'name': self.setup_domains[0]['name']}
- fetched_domains = self.domains_client.list_domains(
+ fetched_domains = self.reader_domains_client.list_domains(
**params)['domains']
# Verify the filtered list is correct, domain names are unique
# so exactly one domain should be found with the provided name
@@ -64,7 +85,7 @@
def test_list_domains_filter_by_enabled(self):
"""Test listing domains filtering by enabled domains"""
params = {'enabled': True}
- fetched_domains = self.domains_client.list_domains(
+ fetched_domains = self.reader_domains_client.list_domains(
**params)['domains']
# Verify the filtered list is correct
self.assertIn(self.setup_domains[0], fetched_domains)
@@ -108,14 +129,14 @@
self.assertEqual(new_desc, updated_domain['description'])
self.assertEqual(False, updated_domain['enabled'])
# Show domain
- fetched_domain = self.domains_client.show_domain(
+ fetched_domain = self.reader_domains_client.show_domain(
domain['id'])['domain']
self.assertEqual(new_name, fetched_domain['name'])
self.assertEqual(new_desc, fetched_domain['description'])
self.assertEqual(False, fetched_domain['enabled'])
# Delete domain
self.domains_client.delete_domain(domain['id'])
- body = self.domains_client.list_domains()['domains']
+ body = self.reader_domains_client.list_domains()['domains']
domains_list = [d['id'] for d in body]
self.assertNotIn(domain['id'], domains_list)
@@ -130,11 +151,11 @@
self.delete_domain(domain['id'])
# Check the domain, its users and groups are gone
self.assertRaises(exceptions.NotFound,
- self.domains_client.show_domain, domain['id'])
+ self.reader_domains_client.show_domain, domain['id'])
self.assertRaises(exceptions.NotFound,
- self.users_client.show_user, user['id'])
+ self.reader_users_client.show_user, user['id'])
self.assertRaises(exceptions.NotFound,
- self.groups_client.show_group, group['id'])
+ self.reader_groups_client.show_group, group['id'])
@decorators.idempotent_id('036df86e-bb5d-42c0-a7c2-66b9db3a6046')
def test_create_domain_with_disabled_status(self):
diff --git a/tempest/api/identity/admin/v3/test_endpoints.py b/tempest/api/identity/admin/v3/test_endpoints.py
index f9f3e72..defdcc7 100644
--- a/tempest/api/identity/admin/v3/test_endpoints.py
+++ b/tempest/api/identity/admin/v3/test_endpoints.py
@@ -30,10 +30,21 @@
# pre-provisioned credentials provider.
force_tenant_isolation = False
+ credentials = ['primary', 'admin', 'system_reader']
+
@classmethod
def setup_clients(cls):
super(EndPointsTestJSON, cls).setup_clients()
cls.client = cls.endpoints_client
+ if CONF.identity.use_system_token:
+ # Use system reader for listing/showing endpoints
+ cls.reader_client = cls.os_system_reader.endpoints_v3_client
+ # Use system reader for showing regions
+ cls.reader_regions_client = cls.os_system_reader.regions_client
+ else:
+ # Use admin client by default
+ cls.reader_client = cls.client
+ cls.reader_regions_client = cls.regions_client
@classmethod
def resource_setup(cls):
@@ -55,7 +66,8 @@
endpoint = cls.client.create_endpoint(
service_id=cls.service_ids[i], interface=interfaces[i],
url=url, region=region_name, enabled=True)['endpoint']
- region = cls.regions_client.show_region(region_name)['region']
+ region = cls.reader_regions_client.show_region(region_name)[
+ 'region']
cls.addClassResourceCleanup(
cls.regions_client.delete_region, region['id'])
cls.addClassResourceCleanup(
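Review bot: The `show_region` lookup in `resource_setup` runs after `create_endpoint` and before the cleanups are registered, so routing it through `reader_regions_client` means a reader credential or policy failure aborts setup with the endpoint already created and, as far as the visible lines show, no cleanup recorded for it. This lookup only feeds `delete_region` and is not the read under test, so it can stay on the admin client: `region = cls.regions_client.show_region(region_name)['region']`. The hunks at -129, -187 and -199 in this file have the same create, reader lookup, then cleanup order in the test bodies. `resource_setup` continues outside the hunk.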
@@ -81,7 +93,7 @@
def test_list_endpoints(self):
"""Test listing keystone endpoints by filters"""
# Get the list of all the endpoints.
- fetched_endpoints = self.client.list_endpoints()['endpoints']
+ fetched_endpoints = self.reader_client.list_endpoints()['endpoints']
fetched_endpoint_ids = [e['id'] for e in fetched_endpoints]
# Check that all the created endpoints are present in
# "fetched_endpoints".
@@ -93,9 +105,9 @@
', '.join(str(e) for e in missing_endpoints))
# Check that filtering endpoints by service_id works.
- fetched_endpoints_for_service = self.client.list_endpoints(
+ fetched_endpoints_for_service = self.reader_client.list_endpoints(
service_id=self.service_ids[0])['endpoints']
- fetched_endpoints_for_alt_service = self.client.list_endpoints(
+ fetched_endpoints_for_alt_service = self.reader_client.list_endpoints(
service_id=self.service_ids[1])['endpoints']
# Assert that both filters returned the correct result.
@@ -106,9 +118,9 @@
fetched_endpoints_for_alt_service[0]['id']]))
# Check that filtering endpoints by interface works.
- fetched_public_endpoints = self.client.list_endpoints(
+ fetched_public_endpoints = self.reader_client.list_endpoints(
interface='public')['endpoints']
- fetched_internal_endpoints = self.client.list_endpoints(
+ fetched_internal_endpoints = self.reader_client.list_endpoints(
interface='internal')['endpoints']
# Check that the expected endpoint_id is present per filter. [0] is
@@ -129,7 +141,7 @@
interface=interface,
url=url, region=region_name,
enabled=True)['endpoint']
- region = self.regions_client.show_region(region_name)['region']
+ region = self.reader_regions_client.show_region(region_name)['region']
self.addCleanup(self.regions_client.delete_region, region['id'])
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.client.delete_endpoint, endpoint['id'])
@@ -138,13 +150,13 @@
self.assertEqual(url, endpoint['url'])
# Checking if created endpoint is present in the list of endpoints
- fetched_endpoints = self.client.list_endpoints()['endpoints']
+ fetched_endpoints = self.reader_client.list_endpoints()['endpoints']
fetched_endpoints_id = [e['id'] for e in fetched_endpoints]
self.assertIn(endpoint['id'], fetched_endpoints_id)
# Show endpoint
fetched_endpoint = (
- self.client.show_endpoint(endpoint['id'])['endpoint'])
+ self.reader_client.show_endpoint(endpoint['id'])['endpoint'])
# Asserting if the attributes of endpoint are the same
self.assertEqual(self.service_ids[0], fetched_endpoint['service_id'])
self.assertEqual(interface, fetched_endpoint['interface'])
@@ -156,7 +168,7 @@
self.client.delete_endpoint(endpoint['id'])
# Checking whether endpoint is deleted successfully
- fetched_endpoints = self.client.list_endpoints()['endpoints']
+ fetched_endpoints = self.reader_client.list_endpoints()['endpoints']
fetched_endpoints_id = [e['id'] for e in fetched_endpoints]
self.assertNotIn(endpoint['id'], fetched_endpoints_id)
@@ -187,7 +199,8 @@
interface=interface1,
url=url1, region=region1_name,
enabled=True)['endpoint'])
- region1 = self.regions_client.show_region(region1_name)['region']
+ region1 = self.reader_regions_client.show_region(region1_name)[
+ 'region']
self.addCleanup(self.regions_client.delete_region, region1['id'])
# Updating endpoint with new values
@@ -199,7 +212,8 @@
interface=interface2,
url=url2, region=region2_name,
enabled=False)['endpoint']
- region2 = self.regions_client.show_region(region2_name)['region']
+ region2 = self.reader_regions_client.show_region(region2_name)[
+ 'region']
self.addCleanup(self.regions_client.delete_region, region2['id'])
self.addCleanup(self.client.delete_endpoint, endpoint_for_update['id'])
diff --git a/tempest/api/identity/admin/v3/test_groups.py b/tempest/api/identity/admin/v3/test_groups.py
index 96218bb..f704f02 100644
--- a/tempest/api/identity/admin/v3/test_groups.py
+++ b/tempest/api/identity/admin/v3/test_groups.py
@@ -30,6 +30,23 @@
# pre-provisioned credentials provider.
force_tenant_isolation = False
+ credentials = ['primary', 'admin', 'system_reader']
+
+ @classmethod
+ def setup_clients(cls):
+ super(GroupsV3TestJSON, cls).setup_clients()
+ if CONF.identity.use_system_token:
+ # Use system reader for listing/showing groups
+ cls.reader_groups_client = (
+ cls.os_system_reader.groups_client)
+ # Use system reader for listing user groups
+ cls.reader_users_client = (
+ cls.os_system_reader.users_v3_client)
+ else:
+ # Use admin client by default
+ cls.reader_groups_client = cls.groups_client
+ cls.reader_users_client = cls.users_client
+
@classmethod
def resource_setup(cls):
super(GroupsV3TestJSON, cls).resource_setup()
@@ -60,7 +77,7 @@
self.assertEqual(updated_group['description'], first_desc_update)
# Verify that the updated values are reflected after performing show.
- new_group = self.groups_client.show_group(group['id'])['group']
+ new_group = self.reader_groups_client.show_group(group['id'])['group']
self.assertEqual(group['id'], new_group['id'])
self.assertEqual(first_name_update, new_group['name'])
self.assertEqual(first_desc_update, new_group['description'])
@@ -94,7 +111,8 @@
self.groups_client.add_group_user(group['id'], user['id'])
# list users in group
- group_users = self.groups_client.list_group_users(group['id'])['users']
+ group_users = self.reader_groups_client.list_group_users(group['id'])[
+ 'users']
self.assertEqual(sorted(users, key=lambda k: k['name']),
sorted(group_users, key=lambda k: k['name']))
# check and delete user in group
@@ -102,7 +120,8 @@
self.groups_client.check_group_user_existence(
group['id'], user['id'])
self.groups_client.delete_group_user(group['id'], user['id'])
- group_users = self.groups_client.list_group_users(group['id'])['users']
+ group_users = self.reader_groups_client.list_group_users(group['id'])[
+ 'users']
self.assertEqual(len(group_users), 0)
@decorators.idempotent_id('64573281-d26a-4a52-b899-503cb0f4e4ec')
@@ -121,7 +140,8 @@
groups.append(group)
self.groups_client.add_group_user(group['id'], user['id'])
# list groups which user belongs to
- user_groups = self.users_client.list_user_groups(user['id'])['groups']
+ user_groups = self.reader_users_client.list_user_groups(user['id'])[
+ 'groups']
# The `membership_expires_at` attribute is present when listing user
# group memberships, and is not an attribute of the groups themselves.
# Therefore we remove it from the comparison.
@@ -146,10 +166,10 @@
# of listing all users and listing all groups are not supported,
# they need a domain filter to be specified
if CONF.identity_feature_enabled.domain_specific_drivers:
- body = self.groups_client.list_groups(
+ body = self.reader_groups_client.list_groups(
domain_id=self.domain['id'])['groups']
else:
- body = self.groups_client.list_groups()['groups']
+ body = self.reader_groups_client.list_groups()['groups']
for g in body:
fetched_ids.append(g['id'])
missing_groups = [g for g in group_ids if g not in fetched_ids]
diff --git a/tempest/api/identity/admin/v3/test_list_projects.py b/tempest/api/identity/admin/v3/test_list_projects.py
index 2135fcc..c758dfa 100644
--- a/tempest/api/identity/admin/v3/test_list_projects.py
+++ b/tempest/api/identity/admin/v3/test_list_projects.py
@@ -26,13 +26,26 @@
class BaseListProjectsTestJSON(base.BaseIdentityV3AdminTest):
+ credentials = ['primary', 'admin', 'system_reader']
+
+ @classmethod
+ def setup_clients(cls):
+ super(BaseListProjectsTestJSON, cls).setup_clients()
+ if CONF.identity.use_system_token:
+ # Use system reader for listing projects
+ cls.reader_projects_client = (
+ cls.os_system_reader.projects_client)
+ else:
+ # Use admin client by default
+ cls.reader_projects_client = cls.projects_client
+
def _list_projects_with_params(self, included, excluded, params, key):
# Validate that projects in ``included`` belongs to the projects
# returned that match ``params`` but not projects in ``excluded``
- all_projects = self.projects_client.list_projects()['projects']
+ all_projects = self.reader_projects_client.list_projects()['projects']
LOG.debug("Complete list of projects available in keystone: %s",
all_projects)
- body = self.projects_client.list_projects(params)['projects']
+ body = self.reader_projects_client.list_projects(params)['projects']
for p in included:
self.assertIn(p[key], map(lambda x: x[key], body))
for p in excluded:
@@ -75,7 +88,7 @@
def test_list_projects_with_parent(self):
"""Test listing projects with parent"""
params = {'parent_id': self.p3['parent_id']}
- fetched_projects = self.projects_client.list_projects(
+ fetched_projects = self.reader_projects_client.list_projects(
params)['projects']
self.assertNotEmpty(fetched_projects)
for project in fetched_projects:
@@ -111,10 +124,10 @@
@decorators.idempotent_id('1d830662-22ad-427c-8c3e-4ec854b0af44')
def test_list_projects(self):
"""Test listing projects"""
- list_projects = self.projects_client.list_projects()['projects']
+ list_projects = self.reader_projects_client.list_projects()['projects']
for p in [self.p1, self.p2]:
- show_project = self.projects_client.show_project(p['id'])[
+ show_project = self.reader_projects_client.show_project(p['id'])[
'project']
self.assertIn(show_project, list_projects)
diff --git a/tempest/api/identity/admin/v3/test_list_users.py b/tempest/api/identity/admin/v3/test_list_users.py
index 3884989..e8d0ff5 100644
--- a/tempest/api/identity/admin/v3/test_list_users.py
+++ b/tempest/api/identity/admin/v3/test_list_users.py
@@ -24,12 +24,25 @@
class UsersV3TestJSON(base.BaseIdentityV3AdminTest):
"""Test listing keystone users"""
+ credentials = ['primary', 'admin', 'system_reader']
+
+ @classmethod
+ def setup_clients(cls):
+ super(UsersV3TestJSON, cls).setup_clients()
+ if CONF.identity.use_system_token:
+ # Use system reader for listing users
+ cls.reader_users_client = (
+ cls.os_system_reader.users_v3_client)
+ else:
+ # Use admin client by default
+ cls.reader_users_client = cls.users_client
+
def _list_users_with_params(self, params, key, expected, not_expected):
# Helper method to list users filtered with params and
# assert the response based on expected and not_expected
# expected: user expected in the list response
# not_expected: user, which should not be present in list response
- body = self.users_client.list_users(**params)['users']
+ body = self.reader_users_client.list_users(**params)['users']
self.assertIn(expected[key], map(lambda x: x[key], body))
self.assertNotIn(not_expected[key],
map(lambda x: x[key], body))
@@ -105,13 +118,13 @@
# of listing all users and listing all groups are not supported,
# they need a domain filter to be specified
if CONF.identity_feature_enabled.domain_specific_drivers:
- body_enabled_user = self.users_client.list_users(
+ body_enabled_user = self.reader_users_client.list_users(
domain_id=self.domain_enabled_user['domain_id'])['users']
- body_non_enabled_user = self.users_client.list_users(
+ body_non_enabled_user = self.reader_users_client.list_users(
domain_id=self.non_domain_enabled_user['domain_id'])['users']
body = (body_enabled_user + body_non_enabled_user)
else:
- body = self.users_client.list_users()['users']
+ body = self.reader_users_client.list_users()['users']
fetched_ids = [u['id'] for u in body]
missing_users = [u['id'] for u in self.users
@@ -123,7 +136,7 @@
@decorators.idempotent_id('b4baa3ae-ac00-4b4e-9e27-80deaad7771f')
def test_get_user(self):
"""Get a user detail"""
- user = self.users_client.show_user(self.users[0]['id'])['user']
+ user = self.reader_users_client.show_user(self.users[0]['id'])['user']
self.assertEqual(self.users[0]['id'], user['id'])
self.assertEqual(self.users[0]['name'], user['name'])
self.assertEqual(self.alt_email, user['email'])
diff --git a/tempest/api/identity/admin/v3/test_policies.py b/tempest/api/identity/admin/v3/test_policies.py
index 2d3775a..6bce533 100644
--- a/tempest/api/identity/admin/v3/test_policies.py
+++ b/tempest/api/identity/admin/v3/test_policies.py
@@ -24,6 +24,19 @@
class PoliciesTestJSON(base.BaseIdentityV3AdminTest):
"""Test keystone policies"""
+ credentials = ['primary', 'admin', 'system_reader']
+
+ @classmethod
+ def setup_clients(cls):
+ super(PoliciesTestJSON, cls).setup_clients()
+ if CONF.identity.use_system_token:
+ # Use system reader for listing/showing policies
+ cls.reader_policies_client = (
+ cls.os_system_reader.policies_client)
+ else:
+ # Use admin client by default
+ cls.reader_policies_client = cls.policies_client
+
def _delete_policy(self, policy_id):
self.policies_client.delete_policy(policy_id)
@@ -43,7 +56,7 @@
self.addCleanup(self._delete_policy, policy['id'])
policy_ids.append(policy['id'])
# List and Verify Policies
- body = self.policies_client.list_policies()['policies']
+ body = self.reader_policies_client.list_policies()['policies']
for p in body:
fetched_ids.append(p['id'])
missing_pols = [p for p in policy_ids if p not in fetched_ids]
@@ -70,7 +83,7 @@
policy['id'], type=update_type)['policy']
self.assertIn('type', data)
# Assertion for updated value with fetched value
- fetched_policy = self.policies_client.show_policy(
+ fetched_policy = self.reader_policies_client.show_policy(
policy['id'])['policy']
self.assertIn('id', fetched_policy)
self.assertIn('blob', fetched_policy)
diff --git a/tempest/api/identity/admin/v3/test_projects.py b/tempest/api/identity/admin/v3/test_projects.py
index 3b0052c..c191955 100644
--- a/tempest/api/identity/admin/v3/test_projects.py
+++ b/tempest/api/identity/admin/v3/test_projects.py
@@ -30,6 +30,27 @@
# pre-provisioned credentials provider.
force_tenant_isolation = False
+ credentials = ['primary', 'admin', 'system_reader']
+
+ @classmethod
+ def setup_clients(cls):
+ super(ProjectsTestJSON, cls).setup_clients()
+ if CONF.identity.use_system_token:
+ # Use system reader for listing/showing projects
+ cls.reader_projects_client = (
+ cls.os_system_reader.projects_client)
+ # Use system reader for listing/showing domains
+ cls.reader_domains_client = (
+ cls.os_system_reader.domains_client)
+ # Use system reader for showing users
+ cls.reader_users_client = (
+ cls.os_system_reader.users_v3_client)
+ else:
+ # Use admin client by default
+ cls.reader_projects_client = cls.projects_client
+ cls.reader_domains_client = cls.domains_client
+ cls.reader_users_client = cls.users_client
+
@decorators.idempotent_id('0ecf465c-0dc4-4532-ab53-91ffeb74d12d')
def test_project_create_with_description(self):
"""Test creating project with a description"""
@@ -40,7 +61,7 @@
desc1 = project['description']
self.assertEqual(desc1, project_desc, 'Description should have '
'been sent in response for create')
- body = self.projects_client.show_project(project_id)['project']
+ body = self.reader_projects_client.show_project(project_id)['project']
desc2 = body['description']
self.assertEqual(desc2, project_desc, 'Description does not appear '
'to be set')
@@ -56,7 +77,7 @@
project_id = project['id']
self.assertEqual(project_name, project['name'])
self.assertEqual(domain['id'], project['domain_id'])
- body = self.projects_client.show_project(project_id)['project']
+ body = self.reader_projects_client.show_project(project_id)['project']
self.assertEqual(project_name, body['name'])
self.assertEqual(domain['id'], body['domain_id'])
@@ -97,15 +118,15 @@
# Check if the is_domain project is correctly returned by both
# project and domain APIs
- projects_list = self.projects_client.list_projects(
+ projects_list = self.reader_projects_client.list_projects(
params={'is_domain': True})['projects']
project_ids = [p['id'] for p in projects_list]
self.assertIn(project['id'], project_ids)
# The domains API return different attributes for the entity, so we
# compare the entities IDs
- domains_ids = [d['id'] for d in self.domains_client.list_domains()[
- 'domains']]
+ domains_list = self.reader_domains_client.list_domains()['domains']
+ domains_ids = [d['id'] for d in domains_list]
self.assertIn(project['id'], domains_ids)
@decorators.idempotent_id('1f66dc76-50cc-4741-a200-af984509e480')
@@ -115,7 +136,7 @@
project_id = project['id']
self.assertTrue(project['enabled'],
'Enable should be True in response')
- body = self.projects_client.show_project(project_id)['project']
+ body = self.reader_projects_client.show_project(project_id)['project']
self.assertTrue(body['enabled'], 'Enable should be True in lookup')
@decorators.idempotent_id('78f96a9c-e0e0-4ee6-a3ba-fbf6dfd03207')
@@ -124,7 +145,8 @@
project = self.setup_test_project(enabled=False)
self.assertFalse(project['enabled'],
'Enable should be False in response')
- body = self.projects_client.show_project(project['id'])['project']
+ body = self.reader_projects_client.show_project(project['id'])[
+ 'project']
self.assertFalse(body['enabled'],
'Enable should be False in lookup')
@@ -144,7 +166,8 @@
resp2_name = body['name']
self.assertNotEqual(resp1_name, resp2_name)
- body = self.projects_client.show_project(project['id'])['project']
+ body = self.reader_projects_client.show_project(project['id'])[
+ 'project']
resp3_name = body['name']
self.assertNotEqual(resp1_name, resp3_name)
@@ -166,7 +189,8 @@
resp2_desc = body['description']
self.assertNotEqual(resp1_desc, resp2_desc)
- body = self.projects_client.show_project(project['id'])['project']
+ body = self.reader_projects_client.show_project(project['id'])[
+ 'project']
resp3_desc = body['description']
self.assertNotEqual(resp1_desc, resp3_desc)
@@ -187,7 +211,8 @@
resp2_en = body['enabled']
self.assertNotEqual(resp1_en, resp2_en)
- body = self.projects_client.show_project(project['id'])['project']
+ body = self.reader_projects_client.show_project(project['id'])[
+ 'project']
resp3_en = body['enabled']
self.assertNotEqual(resp1_en, resp3_en)
@@ -217,7 +242,7 @@
self.addCleanup(self.users_client.delete_user, user['id'])
# Get User To validate the user details
- new_user_get = self.users_client.show_user(user['id'])['user']
+ new_user_get = self.reader_users_client.show_user(user['id'])['user']
# Assert response body of GET
self.assertEqual(u_name, new_user_get['name'])
self.assertEqual(u_desc, new_user_get['description'])
@@ -238,9 +263,9 @@
project = self.setup_test_project(tags=tags)
# Show and list for the project
- project_get = self.projects_client.show_project(
+ project_get = self.reader_projects_client.show_project(
project['id'])['project']
- _projects = self.projects_client.list_projects()['projects']
+ _projects = self.reader_projects_client.list_projects()['projects']
project_list = next(x for x in _projects if x['id'] == project['id'])
# Assert the expected fields exist. More fields than expected may
diff --git a/tempest/api/identity/admin/v3/test_regions.py b/tempest/api/identity/admin/v3/test_regions.py
index 870a406..f021cc2 100644
--- a/tempest/api/identity/admin/v3/test_regions.py
+++ b/tempest/api/identity/admin/v3/test_regions.py
@@ -30,10 +30,18 @@
# pre-provisioned credentials provider.
force_tenant_isolation = False
+ credentials = ['primary', 'admin', 'system_reader']
+
@classmethod
def setup_clients(cls):
super(RegionsTestJSON, cls).setup_clients()
cls.client = cls.regions_client
+ if CONF.identity.use_system_token:
+ # Use system reader for listing/showing regions
+ cls.reader_client = cls.os_system_reader.regions_client
+ else:
+ # Use admin client by default
+ cls.reader_client = cls.client
@classmethod
def resource_setup(cls):
@@ -77,13 +85,13 @@
self.assertEqual(self.setup_regions[1]['id'],
region['parent_region_id'])
# Get the details of region
- region = self.client.show_region(region['id'])['region']
+ region = self.reader_client.show_region(region['id'])['region']
self.assertEqual(r_alt_description, region['description'])
self.assertEqual(self.setup_regions[1]['id'],
region['parent_region_id'])
# Delete the region
self.client.delete_region(region['id'])
- body = self.client.list_regions()['regions']
+ body = self.reader_client.list_regions()['regions']
regions_list = [r['id'] for r in body]
self.assertNotIn(region['id'], regions_list)
@@ -104,7 +112,7 @@
@decorators.idempotent_id('d180bf99-544a-445c-ad0d-0c0d27663796')
def test_list_regions(self):
"""Test getting a list of regions"""
- fetched_regions = self.client.list_regions()['regions']
+ fetched_regions = self.reader_client.list_regions()['regions']
missing_regions =\
[e for e in self.setup_regions if e not in fetched_regions]
# Asserting List Regions response
@@ -124,7 +132,8 @@
self.addCleanup(self.client.delete_region, region['id'])
# Get the list of regions filtering with the parent_region_id
params = {'parent_region_id': self.setup_regions[0]['id']}
- fetched_regions = self.client.list_regions(params=params)['regions']
+ fetched_regions = self.reader_client.list_regions(params=params)[
+ 'regions']
# Asserting list regions response
self.assertIn(region, fetched_regions)
for r in fetched_regions:
diff --git a/tempest/api/identity/admin/v3/test_roles.py b/tempest/api/identity/admin/v3/test_roles.py
index ab96027..d1c90dc 100644
--- a/tempest/api/identity/admin/v3/test_roles.py
+++ b/tempest/api/identity/admin/v3/test_roles.py
@@ -32,6 +32,19 @@
# pre-provisioned credentials provider.
force_tenant_isolation = False
+ credentials = ['primary', 'admin', 'system_reader']
+
+ @classmethod
+ def setup_clients(cls):
+ super(RolesV3TestJSON, cls).setup_clients()
+ if CONF.identity.use_system_token:
+ # Use system reader for listing/showing roles
+ cls.reader_roles_client = (
+ cls.os_system_reader.roles_v3_client)
+ else:
+ # Use admin client by default
+ cls.reader_roles_client = cls.roles_client
+
@classmethod
def resource_setup(cls):
super(RolesV3TestJSON, cls).resource_setup()
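Review bot: `reader_roles_client` is only used in this file on calls that are expected to succeed, so nothing added here would notice a reader role that is also allowed to write. When `use_system_token` is set, a denial check such as `self.assertRaises(lib_exc.Forbidden, self.reader_roles_client.delete_role, role['id'])` would pin the read-only boundary. It has to be skipped in the `else` case, where the reader client is the admin client. Such negative coverage may already exist elsewhere in the suite, which this diff does not show.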
@@ -97,11 +110,11 @@
self.assertIn('links', updated_role)
self.assertNotEqual(r_name, updated_role['name'])
- new_role = self.roles_client.show_role(role['id'])['role']
+ new_role = self.reader_roles_client.show_role(role['id'])['role']
self.assertEqual(new_name, new_role['name'])
self.assertEqual(updated_role['id'], new_role['id'])
- roles = self.roles_client.list_roles()['roles']
+ roles = self.reader_roles_client.list_roles()['roles']
self.assertIn(role['id'], [r['id'] for r in roles])
@decorators.idempotent_id('c6b80012-fe4a-498b-9ce8-eb391c05169f')
@@ -114,7 +127,7 @@
self.user_body['id'],
self.role['id'])
- roles = self.roles_client.list_user_roles_on_project(
+ roles = self.reader_roles_client.list_user_roles_on_project(
self.project['id'], self.user_body['id'])['roles']
self.assertEqual(1, len(roles))
@@ -135,7 +148,7 @@
self.roles_client.create_user_role_on_domain(
self.domain['id'], self.user_body['id'], self.role['id'])
- roles = self.roles_client.list_user_roles_on_domain(
+ roles = self.reader_roles_client.list_user_roles_on_domain(
self.domain['id'], self.user_body['id'])['roles']
self.assertEqual(1, len(roles))
@@ -155,7 +168,7 @@
self.roles_client.create_user_role_on_system(
self.user_body['id'], self.role['id'])
- roles = self.roles_client.list_user_roles_on_system(
+ roles = self.reader_roles_client.list_user_roles_on_system(
self.user_body['id'])['roles']
self.assertEqual(1, len(roles))
@@ -177,7 +190,7 @@
self.roles_client.create_group_role_on_project(
self.project['id'], self.group_body['id'], self.role['id'])
# List group roles on project
- roles = self.roles_client.list_group_roles_on_project(
+ roles = self.reader_roles_client.list_group_roles_on_project(
self.project['id'], self.group_body['id'])['roles']
self.assertEqual(1, len(roles))
@@ -210,7 +223,7 @@
self.roles_client.create_group_role_on_domain(
self.domain['id'], self.group_body['id'], self.role['id'])
- roles = self.roles_client.list_group_roles_on_domain(
+ roles = self.reader_roles_client.list_group_roles_on_domain(
self.domain['id'], self.group_body['id'])['roles']
self.assertEqual(1, len(roles))
@@ -227,7 +240,7 @@
self.roles_client.create_group_role_on_system(
self.group_body['id'], self.role['id'])
- roles = self.roles_client.list_group_roles_on_system(
+ roles = self.reader_roles_client.list_group_roles_on_system(
self.group_body['id'])['roles']
self.assertEqual(1, len(roles))
@@ -243,7 +256,7 @@
def test_list_roles(self):
"""Test listing roles"""
# Return a list of all roles
- body = self.roles_client.list_roles()['roles']
+ body = self.reader_roles_client.list_roles()['roles']
found = [role for role in body if role in self.roles]
self.assertEqual(len(found), len(self.roles))
@@ -278,7 +291,7 @@
prior_role_id, implies_role_id)
# Show the inference rule and check its elements
- resp_body = self.roles_client.show_role_inference_rule(
+ resp_body = self.reader_roles_client.show_role_inference_rule(
prior_role_id, implies_role_id)
self.assertIn('role_inference', resp_body)
role_inference = resp_body['role_inference']
@@ -293,7 +306,7 @@
# Check if the inference rule no longer exists
self.assertRaises(
lib_exc.NotFound,
- self.roles_client.show_role_inference_rule,
+ self.reader_roles_client.show_role_inference_rule,
prior_role_id,
implies_role_id)
@@ -313,14 +326,14 @@
self.roles[2]['id'], self.role['id'])
# Listing inferences rules from "roles[2]" should only return "role"
- rules = self.roles_client.list_role_inferences_rules(
+ rules = self.reader_roles_client.list_role_inferences_rules(
self.roles[2]['id'])['role_inference']
self.assertEqual(1, len(rules['implies']))
self.assertEqual(self.role['id'], rules['implies'][0]['id'])
# Listing inferences rules from "roles[0]" should return "roles[1]" and
# "roles[2]" (only direct rules are listed)
- rules = self.roles_client.list_role_inferences_rules(
+ rules = self.reader_roles_client.list_role_inferences_rules(
self.roles[0]['id'])['role_inference']
implies_ids = [role['id'] for role in rules['implies']]
self.assertEqual(2, len(implies_ids))
@@ -384,13 +397,13 @@
self.roles_client.delete_role,
domain_role['id'])
- domain_roles = self.roles_client.list_roles(
+ domain_roles = self.reader_roles_client.list_roles(
domain_id=self.domain['id'])['roles']
self.assertEqual(1, len(domain_roles))
self.assertIn(domain_role, domain_roles)
self.roles_client.delete_role(domain_role['id'])
- domain_roles = self.roles_client.list_roles(
+ domain_roles = self.reader_roles_client.list_roles(
domain_id=self.domain['id'])['roles']
self.assertEmpty(domain_roles)
@@ -465,7 +478,7 @@
self._create_implied_role(
self.roles[2]['id'], self.role['id'])
- rules = self.roles_client.list_all_role_inference_rules()[
+ rules = self.reader_roles_client.list_all_role_inference_rules()[
'role_inferences']
# NOTE(jaosorior): With the work related to the define-default-roles
diff --git a/tempest/api/identity/admin/v3/test_services.py b/tempest/api/identity/admin/v3/test_services.py
index b67e175..3379c3e 100644
--- a/tempest/api/identity/admin/v3/test_services.py
+++ b/tempest/api/identity/admin/v3/test_services.py
@@ -25,11 +25,25 @@
class ServicesTestJSON(base.BaseIdentityV3AdminTest):
"""Test keystone services"""
+ credentials = ['primary', 'admin', 'system_reader']
+
+ @classmethod
+ def setup_clients(cls):
+ super(ServicesTestJSON, cls).setup_clients()
+ if CONF.identity.use_system_token:
+ # Use system reader for listing/showing services
+ cls.reader_services_client = (
+ cls.os_system_reader.identity_services_v3_client)
+ else:
+ # Use admin client by default
+ cls.reader_services_client = cls.services_client
+
def _del_service(self, service_id):
# Used for deleting the services created in this class
self.services_client.delete_service(service_id)
# Checking whether service is deleted successfully
- self.assertRaises(lib_exc.NotFound, self.services_client.show_service,
+ self.assertRaises(lib_exc.NotFound,
+ self.reader_services_client.show_service,
service_id)
@decorators.attr(type='smoke')
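Review bot: The `else` branch of `setup_clients` aliases `reader_services_client` to the admin `services_client`, so a run without `CONF.identity.use_system_token` passes without ever exercising the reader role, and nothing in the method says so. A docstring would make that explicit, e.g. `"""Bind reader_* clients to system_reader when use_system_token is set, else to admin."""`. The `credentials` list also requests `system_reader` unconditionally, although `os_system_reader` is only read in the `if` branch. Presumably the credential provider then has to supply a system reader on every deployment, which is worth confirming for pre-provisioned accounts. The same applies to `setup_clients` in `TrustsV3TestJSON` and `UsersV3TestJSON` later in this diff.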
@@ -61,7 +75,8 @@
self.assertNotEqual(resp1_desc, resp2_desc)
# Get service
- fetched_service = self.services_client.show_service(s_id)['service']
+ fetched_service = self.reader_services_client.show_service(s_id)[
+ 'service']
resp3_desc = fetched_service['description']
self.assertEqual(resp2_desc, resp3_desc)
@@ -100,14 +115,14 @@
service_types.append(serv_type)
# List and Verify Services
- services = self.services_client.list_services()['services']
+ services = self.reader_services_client.list_services()['services']
fetched_ids = [service['id'] for service in services]
found = [s for s in fetched_ids if s in service_ids]
self.assertEqual(len(found), len(service_ids))
# Check that filtering by service type works.
for serv_type in service_types:
- fetched_services = self.services_client.list_services(
+ fetched_services = self.reader_services_client.list_services(
type=serv_type)['services']
self.assertEqual(1, len(fetched_services))
self.assertEqual(serv_type, fetched_services[0]['type'])
diff --git a/tempest/api/identity/admin/v3/test_trusts.py b/tempest/api/identity/admin/v3/test_trusts.py
index 5bd6756..d843abf 100644
--- a/tempest/api/identity/admin/v3/test_trusts.py
+++ b/tempest/api/identity/admin/v3/test_trusts.py
@@ -29,6 +29,19 @@
class TrustsV3TestJSON(base.BaseIdentityV3AdminTest):
"""Test keystone trusts"""
+ credentials = ['primary', 'admin', 'system_reader']
+
+ @classmethod
+ def setup_clients(cls):
+ super(TrustsV3TestJSON, cls).setup_clients()
+ if CONF.identity.use_system_token:
+ # Use system reader for listing trusts
+ cls.reader_trusts_client = (
+ cls.os_system_reader.trusts_client)
+ else:
+ # Use admin client by default
+ cls.reader_trusts_client = cls.trusts_client
+
@classmethod
def skip_checks(cls):
super(TrustsV3TestJSON, cls).skip_checks()
@@ -293,7 +306,7 @@
original_scope = self.os_admin.auth_provider.scope
set_scope(self.os_admin.auth_provider, 'project')
self.addCleanup(set_scope, self.os_admin.auth_provider, original_scope)
- trusts_get = self.trusts_client.list_trusts()['trusts']
+ trusts_get = self.reader_trusts_client.list_trusts()['trusts']
trusts = [t for t in trusts_get
if t['id'] == self.trust_id]
self.assertEqual(1, len(trusts))
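Review bot: The scope switch just above still targets `self.os_admin.auth_provider`, but `list_trusts()` now goes through `reader_trusts_client`, which under `CONF.identity.use_system_token` is bound to `os_system_reader` and is presumably unaffected by that switch. If the unfiltered listing depends on a project-scoped token, as the `set_scope(..., 'project')` call suggests, the system-reader path runs with a different scope than the test was written for. Either apply the scope handling to the provider behind the reader client, or record the intent with a comment such as `# system_reader lists trusts with its own scope; the admin scope switch only matters for the fallback client`. The test method starts and ends outside this hunk, so the surrounding setup is not visible here.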
diff --git a/tempest/api/identity/admin/v3/test_users.py b/tempest/api/identity/admin/v3/test_users.py
index 9bcbba5..1272adb 100644
--- a/tempest/api/identity/admin/v3/test_users.py
+++ b/tempest/api/identity/admin/v3/test_users.py
@@ -29,6 +29,27 @@
class UsersV3TestJSON(base.BaseIdentityV3AdminTest):
"""Test keystone users"""
+ credentials = ['primary', 'admin', 'system_reader']
+
+ @classmethod
+ def setup_clients(cls):
+ super(UsersV3TestJSON, cls).setup_clients()
+ if CONF.identity.use_system_token:
+ # Use system reader for listing/showing users
+ cls.reader_users_client = (
+ cls.os_system_reader.users_v3_client)
+ # Use system reader for showing roles
+ cls.reader_roles_client = (
+ cls.os_system_reader.roles_v3_client)
+ # Use system reader for showing projects
+ cls.reader_projects_client = (
+ cls.os_system_reader.projects_client)
+ else:
+ # Use admin client by default
+ cls.reader_users_client = cls.users_client
+ cls.reader_roles_client = cls.roles_client
+ cls.reader_projects_client = cls.projects_client
+
@classmethod
def skip_checks(cls):
super(UsersV3TestJSON, cls).skip_checks()
@@ -67,7 +88,7 @@
self.assertEqual(update_kwargs[field], updated_user[field])
# GET by id after updating
- new_user_get = self.users_client.show_user(user['id'])['user']
+ new_user_get = self.reader_users_client.show_user(user['id'])['user']
# Assert response body of GET after updation
for field in update_kwargs:
self.assertEqual(update_kwargs[field], new_user_get[field])
@@ -120,19 +141,20 @@
# Creating Role
role_body = self.setup_test_role()
- user = self.users_client.show_user(user_body['id'])['user']
- role = self.roles_client.show_role(role_body['id'])['role']
+ user = self.reader_users_client.show_user(user_body['id'])['user']
+ role = self.reader_roles_client.show_role(role_body['id'])['role']
for _ in range(2):
# Creating project so as to assign role
project_body = self.setup_test_project()
- project = self.projects_client.show_project(
+ project = self.reader_projects_client.show_project(
project_body['id'])['project']
# Assigning roles to user on project
self.roles_client.create_user_role_on_project(project['id'],
user['id'],
role['id'])
assigned_project_ids.append(project['id'])
- body = self.users_client.list_user_projects(user['id'])['projects']
+ body = self.reader_users_client.list_user_projects(user['id'])[
+ 'projects']
for i in body:
fetched_project_ids.append(i['id'])
# verifying the project ids in list
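Review bot: The new `list_user_projects` call breaks the line inside the subscript (`...(user['id'])[` then `'projects']`), which separates the key from its bracket and reads worse than the argument break used for `show_project` a few lines up. Breaking after the opening parenthesis keeps the two consistent: `body = self.reader_users_client.list_user_projects(` followed by `user['id'])['projects']`. The `show_service` call in test_services.py is wrapped the same way. The enclosing test method extends beyond this hunk.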
@@ -148,7 +170,7 @@
def test_get_user(self):
"""Test getting a user detail"""
user = self.setup_test_user()
- fetched_user = self.users_client.show_user(user['id'])['user']
+ fetched_user = self.reader_users_client.show_user(user['id'])['user']
self.assertEqual(user['id'], fetched_user['id'])
@testtools.skipUnless(CONF.identity_feature_enabled.security_compliance,