Fixes bug 902402- Integration Testcases for Keystone - users, Roles, and tenants
Change-Id: Id191c62aae76375c7f6205f80a52d45d0c645ed7
diff --git a/tempest/services/identity/json/admin_client.py b/tempest/services/identity/json/admin_client.py
index b5aef5f..89a068e 100644
--- a/tempest/services/identity/json/admin_client.py
+++ b/tempest/services/identity/json/admin_client.py
@@ -159,6 +159,36 @@
resp, body = self.delete("tokens/%s" % token_id)
return resp, body
+ def list_users_for_tenant(self, tenant_id):
+ """List users for a Tenant"""
+ resp, body = self.get('/tenants/%s/users' % tenant_id)
+ body = json.loads(body)
+ return resp, body['users']
+
+ def create_service(self, name, type, **kwargs):
+ """Create a service"""
+ post_body = {
+ 'name': name,
+ 'type': type,
+ 'description': kwargs.get('description')}
+ post_body = json.dumps({'OS-KSADM:service': post_body})
+ resp, body = self.post('/OS-KSADM/services', post_body,
+ self.headers)
+ body = json.loads(body)
+ return resp, body['OS-KSADM:service']
+
+ def get_service(self, service_id):
+ """Get Service"""
+ url = '/OS-KSADM/services/%s' % service_id
+ resp, body = self.get(url)
+ body = json.loads(body)
+ return resp, body['OS-KSADM:service']
+
+ def delete_service(self, service_id):
+ """Delete Service"""
+ url = '/OS-KSADM/services/%s' % service_id
+ return self.delete(url)
+
class TokenClient(RestClient):
diff --git a/tempest/tests/identity/admin/test_services.py b/tempest/tests/identity/admin/test_services.py
new file mode 100644
index 0000000..d107a6e
--- /dev/null
+++ b/tempest/tests/identity/admin/test_services.py
@@ -0,0 +1,66 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012 OpenStack, LLC
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from nose.plugins.attrib import attr
+import unittest2 as unittest
+
+from tempest import exceptions
+from tempest.common.utils.data_utils import rand_name
+from tempest.tests.identity.base import BaseIdentityAdminTest
+
+
+class ServicesTest(BaseIdentityAdminTest):
+
+ def test_create_get_delete_service(self):
+ """GET Service"""
+ try:
+ #Creating a Service
+ name = rand_name('service-')
+ type = rand_name('type--')
+ description = rand_name('description-')
+ resp, service_data = \
+ self.client.create_service(name, type, description=description)
+ self.assertTrue(resp['status'].startswith('2'))
+ #Verifying response body of create service
+ self.assertTrue('id' in service_data)
+ self.assertFalse(service_data['id'] is None)
+ self.assertTrue('name' in service_data)
+ self.assertEqual(name, service_data['name'])
+ self.assertTrue('type' in service_data)
+ self.assertEqual(type, service_data['type'])
+ self.assertTrue('description' in service_data)
+ self.assertEqual(description, service_data['description'])
+ #Get service
+ resp, fetched_service = self.client.get_service(service_data['id'])
+ self.assertTrue(resp['status'].startswith('2'))
+ #verifying the existence of service created
+ self.assertTrue('id' in fetched_service)
+ self.assertEquals(fetched_service['id'], service_data['id'])
+ self.assertTrue('name' in fetched_service)
+ self.assertEqual(fetched_service['name'], service_data['name'])
+ self.assertTrue('type' in fetched_service)
+ self.assertEqual(fetched_service['type'], service_data['type'])
+ self.assertTrue('description' in fetched_service)
+ self.assertEqual(fetched_service['description'],
+ service_data['description'])
+ finally:
+ #Deleting the service created in this method
+ resp, _ = self.client.delete_service(service_data['id'])
+ self.assertTrue(resp['status'].startswith('2'))
+ #Checking whether service is deleted successfully
+ self.assertRaises(exceptions.NotFound, self.client.get_service,
+ service_data['id'])
diff --git a/tempest/tests/identity/admin/test_users.py b/tempest/tests/identity/admin/test_users.py
index 12dcb5d..1e1b752 100644
--- a/tempest/tests/identity/admin/test_users.py
+++ b/tempest/tests/identity/admin/test_users.py
@@ -239,9 +239,95 @@
self.assertRaises(exceptions.Unauthorized,
self.non_admin_client.get_users)
+ @attr(type='negative')
def test_get_users_request_without_token(self):
"""Request to get list of users without a valid token should fail"""
token = self.client.get_auth()
self.client.delete_token(token)
self.assertRaises(exceptions.Unauthorized, self.client.get_users)
self.client.clear_auth()
+
+ @attr(type='positive')
+ def test_list_users_for_tenant(self):
+ """Return a list of all users for a tenant"""
+ self.data.setup_test_tenant()
+ user_ids = list()
+ fetched_user_ids = list()
+ resp, user1 = self.client.create_user('tenant_user1', 'password1',
+ self.data.tenant['id'],
+ 'user1@123')
+ user_ids.append(user1['id'])
+ self.data.users.append(user1)
+ resp, user2 = self.client.create_user('tenant_user2', 'password2',
+ self.data.tenant['id'],
+ 'user2@123')
+ user_ids.append(user2['id'])
+ self.data.users.append(user2)
+ #List of users for the respective tenant ID
+ resp, body = self.client.list_users_for_tenant(self.data.tenant['id'])
+ self.assertTrue(resp['status'].startswith('2'))
+ for i in body:
+ fetched_user_ids.append(i['id'])
+ #verifying the user Id in the list
+ missing_users =\
+ [user for user in user_ids if user not in fetched_user_ids]
+ self.assertEqual(0, len(missing_users),
+ "Failed to find user %s in fetched list"
+ % ', '.join(m_user for m_user in missing_users))
+
+ @attr(type='positive')
+ def test_list_users_with_roles_for_tenant(self):
+ """Return list of users on tenant when roles are assigned to users"""
+ self.data.setup_test_user()
+ self.data.setup_test_role()
+ user = self.get_user_by_name(self.data.test_user)
+ tenant = self.get_tenant_by_name(self.data.test_tenant)
+ role = self.get_role_by_name(self.data.test_role)
+ #Assigning roles to two users
+ user_ids = list()
+ fetched_user_ids = list()
+ user_ids.append(user['id'])
+ self.client.assign_user_role(tenant['id'], user['id'], role['id'])
+ resp, second_user = self.client.create_user('second_user', 'password1',
+ self.data.tenant['id'],
+ 'user1@123')
+ user_ids.append(second_user['id'])
+ self.data.users.append(second_user)
+ self.client.assign_user_role(tenant['id'], second_user['id'],
+ role['id'])
+ #List of users with roles for the respective tenant ID
+ resp, body = self.client.list_users_for_tenant(self.data.tenant['id'])
+ self.assertTrue(resp['status'].startswith('2'))
+ for i in body:
+ fetched_user_ids.append(i['id'])
+ #verifying the user Id in the list
+ missing_users =\
+ [user for user in user_ids if user not in fetched_user_ids]
+ self.assertEqual(0, len(missing_users),
+ "Failed to find user %s in fetched list"
+ % ', '.join(m_user for m_user in missing_users))
+
+ @attr(type='negative')
+ def test_list_users_with_invalid_tenant(self):
+ """
+ Should not be able to return a list of all
+ users for a nonexistant tenant
+ """
+ #Assign invalid tenant ids
+ invalid_id = list()
+ invalid_id.append(rand_name('999'))
+ invalid_id.append('alpha')
+ invalid_id.append(rand_name("dddd@#%%^$"))
+ invalid_id.append('!@#()$%^&*?<>{}[]')
+ #List the users with invalid tenant id
+ fail = list()
+ for invalid in invalid_id:
+ try:
+ resp, body = self.client.list_users_for_tenant(invalid)
+ except exceptions.NotFound:
+ pass
+ else:
+ fail.append(invalid)
+ if len(fail) != 0:
+ self.fail('Should raise Not Found when list users with invalid'
+ 'tenant ids %s' % fail)