Fixes bug/902405. Adds users tests and methods to admin client
Change-Id: Ifa9d5001c9961f747b91ff7cec9b7ad2fc5a4aaa
diff --git a/tempest/common/rest_client.py b/tempest/common/rest_client.py
index fd2f684..a18e563 100644
--- a/tempest/common/rest_client.py
+++ b/tempest/common/rest_client.py
@@ -18,7 +18,6 @@
import json
import httplib2
import logging
-import sys
import time
from tempest import exceptions
@@ -74,6 +73,15 @@
self.token = None
self.base_url = None
+ def get_auth(self):
+ """Returns the token of the current request or sets the token if
+ none"""
+
+ if not self.token:
+ self._set_auth()
+
+ return self.token
+
def basic_auth(self, user, password, auth_url):
"""
Provides authentication for the target API
@@ -177,7 +185,7 @@
req_url = "%s/%s" % (self.base_url, url)
resp, resp_body = self.http_obj.request(req_url, method,
headers=headers, body=body)
- if resp.status == 401:
+ if resp.status == 401 or resp.status == 403:
self._log(req_url, body, resp, resp_body)
raise exceptions.Unauthorized()
diff --git a/tempest/openstack.py b/tempest/openstack.py
index 3e19ba5..6e58272 100644
--- a/tempest/openstack.py
+++ b/tempest/openstack.py
@@ -31,6 +31,7 @@
from tempest.services.nova.json.keypairs_client import KeyPairsClient
from tempest.services.nova.json.volumes_client import VolumesClient
from tempest.services.identity.json.admin_client import AdminClient
+from tempest.services.identity.json.admin_client import TokenClient
LOG = logging.getLogger(__name__)
@@ -81,6 +82,7 @@
self.floating_ips_client = FloatingIPsClient(*client_args)
self.volumes_client = VolumesClient(*client_args)
self.admin_client = AdminClient(*client_args)
+ self.token_client = TokenClient(self.config)
class AltManager(Manager):
diff --git a/tempest/services/identity/json/admin_client.py b/tempest/services/identity/json/admin_client.py
index 3b3e86c..7784d81 100644
--- a/tempest/services/identity/json/admin_client.py
+++ b/tempest/services/identity/json/admin_client.py
@@ -1,4 +1,6 @@
from tempest.common.rest_client import RestClient
+from tempest import exceptions
+import httplib2
import json
@@ -95,3 +97,86 @@
self.headers)
body = json.loads(body)
return resp, body['tenant']
+
+ def create_user(self, name, password, tenant_id, email):
+ """Create a user"""
+ post_body = {
+ 'name': name,
+ 'password': password,
+ 'tenantId': tenant_id,
+ 'email': email
+ }
+ post_body = json.dumps({'user': post_body})
+ resp, body = self.post('users', post_body, self.headers)
+ body = json.loads(body)
+ return resp, body['user']
+
+ def delete_user(self, user_id):
+ """Delete a user"""
+ resp, body = self.delete("users/%s" % user_id)
+ return resp, body
+
+ def get_users(self):
+ """Get the list of users"""
+ resp, body = self.get("users")
+ body = json.loads(body)
+ return resp, body['users']
+
+ def enable_disable_user(self, user_id, enabled):
+ """Enables or disables a user"""
+ put_body = {
+ 'enabled': enabled
+ }
+ put_body = json.dumps({'user': put_body})
+ resp, body = self.put('users/%s/enabled' % user_id,
+ put_body, self.headers)
+ body = json.loads(body)
+ return resp, body
+
+ def delete_token(self, token_id):
+ """Delete a token"""
+ resp, body = self.delete("tokens/%s" % token_id)
+ return resp, body
+
+
+class TokenClient(RestClient):
+
+ def __init__(self, config):
+ self.auth_url = config.identity.auth_url
+
+ def auth(self, user, password, tenant):
+ creds = {'auth': {
+ 'passwordCredentials': {
+ 'username': user,
+ 'password': password,
+ },
+ 'tenantName': tenant
+ }
+ }
+ headers = {'Content-Type': 'application/json'}
+ body = json.dumps(creds)
+ resp, body = self.post(self.auth_url, headers=headers, body=body)
+ return resp, body
+
+ def request(self, method, url, headers=None, body=None):
+ """A simple HTTP request interface."""
+ self.http_obj = httplib2.Http()
+ if headers == None:
+ headers = {}
+
+ resp, resp_body = self.http_obj.request(url, method,
+ headers=headers, body=body)
+
+ if resp.status in (401, 403):
+ resp_body = json.loads(resp_body)
+ raise exceptions.Unauthorized(resp_body['error']['message'])
+
+ return resp, resp_body
+
+ def get_token(self, user, password, tenant):
+ resp, body = self.auth(user, password, tenant)
+ if resp['status'] != '202':
+ body = json.loads(body)
+ access = body['access']
+ token = access['token']
+ return token['id']
diff --git a/tempest/tests/identity/base_admin_test.py b/tempest/tests/identity/base_admin_test.py
new file mode 100644
index 0000000..e8796c2
--- /dev/null
+++ b/tempest/tests/identity/base_admin_test.py
@@ -0,0 +1,105 @@
+import unittest2 as unittest
+import nose
+import tempest.config
+from tempest import openstack
+from tempest.common.utils.data_utils import rand_name
+
+
+class BaseAdminTest(unittest.TestCase):
+ """Base class for Identity Admin Tests"""
+
+ @classmethod
+ def setUpClass(cls):
+ cls.config = tempest.config.TempestConfig()
+ cls.admin_username = cls.config.compute_admin.username
+ cls.admin_password = cls.config.compute_admin.password
+ cls.admin_tenant = cls.config.compute_admin.tenant_name
+
+ if not(cls.admin_username and cls.admin_password and cls.admin_tenant):
+ raise nose.SkipTest("Missing Admin credentials in configuration")
+
+ cls.admin_os = openstack.AdminManager()
+ cls.client = cls.admin_os.admin_client
+ cls.token_client = cls.admin_os.token_client
+
+ if not cls.client.has_admin_extensions():
+ raise nose.SkipTest("Admin extensions disabled")
+
+ cls.os = openstack.Manager()
+ cls.non_admin_client = cls.os.admin_client
+ cls.data = DataGenerator(cls.client)
+
+ @classmethod
+ def tearDownClass(cls):
+ cls.data.teardown_all()
+
+ def disable_user(self, user_name):
+ user = self.get_user_by_name(user_name)
+ self.client.enable_disable_user(user['id'], False)
+
+ def disable_tenant(self, tenant_name):
+ tenant = self.get_tenant_by_name(tenant_name)
+ self.client.update_tenant(tenant['id'], tenant['description'], False)
+
+ def get_user_by_name(self, name):
+ _, users = self.client.get_users()
+ user = [u for u in users if u['name'] == name]
+ if len(user) > 0:
+ return user[0]
+
+ def get_tenant_by_name(self, name):
+ _, tenants = self.client.get_tenants()
+ tenant = [t for t in tenants if t['name'] == name]
+ if len(tenant) > 0:
+ return tenant[0]
+
+ def get_role_by_name(self, name):
+ _, roles = self.client.get_roles()
+ role = [r for r in roles if r['name'] == name]
+ if len(role) > 0:
+ return role[0]
+
+
+class DataGenerator(object):
+
+ def __init__(self, client):
+ self.client = client
+ self.users = []
+ self.tenants = []
+ self.roles = []
+ self.role_name = None
+
+ def setup_test_user(self):
+ """Set up a test user"""
+ self.setup_test_tenant()
+ self.test_user = rand_name('test_user_')
+ self.test_password = rand_name('pass_')
+ self.test_email = self.test_user + '@testmail.tm'
+ resp, self.user = self.client.create_user(self.test_user,
+ self.test_password,
+ self.tenant['id'],
+ self.test_email)
+ self.users.append(self.user)
+
+ def setup_test_tenant(self):
+ """Set up a test tenant"""
+ self.test_tenant = rand_name('test_tenant_')
+ self.test_description = rand_name('desc_')
+ resp, self.tenant = self.client.create_tenant(
+ name=self.test_tenant,
+ description=self.test_description)
+ self.tenants.append(self.tenant)
+
+ def setup_test_role(self):
+ """Set up a test role"""
+ self.role_name = rand_name('role')
+ resp, role = self.client.create_role(self.role_name)
+ self.roles.append(role)
+
+ def teardown_all(self):
+ for user in self.users:
+ self.client.delete_user(user['id'])
+ for tenant in self.tenants:
+ self.client.delete_tenant(tenant['id'])
+ for role in self.roles:
+ self.client.delete_role(role['id'])
diff --git a/tempest/tests/identity/test_tenants.py b/tempest/tests/identity/test_tenants.py
index e98048b..92feea8 100644
--- a/tempest/tests/identity/test_tenants.py
+++ b/tempest/tests/identity/test_tenants.py
@@ -1,39 +1,33 @@
-import unittest2 as unittest
import nose
-from tempest import openstack
from tempest import exceptions
from tempest.common.utils.data_utils import rand_name
-from tempest.tests import utils
+from base_admin_test import BaseAdminTest
-class TenantsTest(unittest.TestCase):
+class TenantsTest(BaseAdminTest):
@classmethod
def setUpClass(cls):
- cls.os = openstack.AdminManager()
- cls.client = cls.os.admin_client
- cls.config = cls.os.config
+ super(TenantsTest, cls).setUpClass()
if not cls.client.has_admin_extensions():
raise nose.SkipTest("Admin extensions disabled")
- cls.tenants = []
for _ in xrange(5):
- resp, body = cls.client.create_tenant(rand_name('tenant-'))
- cls.tenants.append(body['id'])
+ resp, tenant = cls.client.create_tenant(rand_name('tenant-'))
+ cls.data.tenants.append(tenant)
@classmethod
def tearDownClass(cls):
- for tenant in cls.tenants:
- cls.client.delete_tenant(tenant)
+ super(TenantsTest, cls).tearDownClass()
def test_list_tenants(self):
"""Return a list of all tenants"""
resp, body = self.client.list_tenants()
- found = [tenant for tenant in body if tenant['id'] in self.tenants]
+ found = [tenant for tenant in body if tenant in self.data.tenants]
self.assertTrue(any(found), 'List did not return newly created '
'tenants')
- self.assertEqual(len(found), len(self.tenants))
+ self.assertEqual(len(found), len(self.data.tenants))
self.assertTrue(resp['status'].startswith('2'))
def test_tenant_delete(self):
diff --git a/tempest/tests/identity/test_users.py b/tempest/tests/identity/test_users.py
new file mode 100644
index 0000000..109ce37
--- /dev/null
+++ b/tempest/tests/identity/test_users.py
@@ -0,0 +1,230 @@
+import unittest2 as unittest
+from nose.plugins.attrib import attr
+from base_admin_test import BaseAdminTest
+from tempest import exceptions
+from tempest.common.utils.data_utils import rand_name
+
+
+class UsersTest(BaseAdminTest):
+
+ alt_user = rand_name('test_user_')
+ alt_password = rand_name('pass_')
+ alt_email = alt_user + '@testmail.tm'
+ alt_tenant = rand_name('test_tenant_')
+ alt_description = rand_name('desc_')
+
+ @attr(type='smoke')
+ def test_create_user(self):
+ """Create a user"""
+ self.data.setup_test_tenant()
+ resp, user = self.client.create_user(self.alt_user, self.alt_password,
+ self.data.tenant['id'],
+ self.alt_email)
+ self.data.users.append(user)
+ self.assertEqual('200', resp['status'])
+ self.assertEqual(self.alt_user, user['name'])
+
+ @attr(type='negative')
+ def test_create_user_by_unauthorized_user(self):
+ """Non-admin should not be authorized to create a user"""
+ self.data.setup_test_tenant()
+ self.assertRaises(exceptions.Unauthorized,
+ self.non_admin_client.create_user, self.alt_user,
+ self.alt_password, self.data.tenant['id'],
+ self.alt_email)
+
+ @attr(type='negative')
+ @unittest.skip("Until Bug 987121 is fixed")
+ def test_create_user_with_empty_name(self):
+ """User with an empty name should not be created"""
+ self.data.setup_test_tenant()
+ self.assertRaises(exceptions.BadRequest, self.client.create_user, '',
+ self.alt_password, self.data.tenant['id'],
+ self.alt_email)
+
+ @attr(type='negative')
+ @unittest.skip("Until Bug 966249 is fixed")
+ def test_create_user_with_name_length_over_64(self):
+ """Length of user name filed should be restricted to 64 characters"""
+ self.data.setup_test_tenant()
+ self.assertRaises(exceptions.BadRequest, self.client.create_user,
+ 'a' * 64, self.alt_password,
+ self.data.tenant['id'], self.alt_email)
+
+ @attr(type='negative')
+ def test_create_user_with_duplicate_name(self):
+ """Duplicate user should not be created"""
+ self.data.setup_test_user()
+ self.assertRaises(exceptions.Duplicate, self.client.create_user,
+ self.data.test_user, self.data.test_password,
+ self.data.tenant['id'], self.data.test_email)
+
+ @attr(type='negative')
+ @unittest.skip("Until Bug 999084 is fixed")
+ def test_create_user_with_empty_password(self):
+ """User with an empty password should not be created"""
+ self.data.setup_test_tenant()
+ self.assertRaises(exceptions.BadRequest, self.client.create_user,
+ self.alt_user, '', self.data.tenant['id'],
+ self.alt_email)
+
+ @attr(type='nagative')
+ @unittest.skip("Until Bug 999084 is fixed")
+ def test_create_user_with_long_password(self):
+ """User having password exceeding max length should not be created"""
+ self.data.setup_test_tenant()
+ self.assertRaises(exceptions.BadRequest, self.client.create_user,
+ self.alt_user, 'a' * 64, self.data.tenant['id'],
+ self.alt_email)
+
+ @attr(type='negative')
+ @unittest.skip("Until Bug 999084 is fixed")
+ def test_create_user_with_invalid_email_format(self):
+ """Email format should be validated while creating a user"""
+ self.data.setup_test_tenant()
+ self.assertRaises(exceptions.BadRequest, self.client.create_user,
+ self.alt_user, '', self.data.tenant['id'], '12345')
+
+ @attr(type='negative')
+ def test_create_user_for_non_existant_tenant(self):
+ """Attempt to create a user in a non-existent tenant should fail"""
+ self.assertRaises(exceptions.NotFound, self.client.create_user,
+ self.alt_user, self.alt_password, '49ffgg99999',
+ self.alt_email)
+
+ @attr(type='negative')
+ def test_create_user_request_without_a_token(self):
+ """Request to create a user without a valid token should fail"""
+ self.data.setup_test_tenant()
+ # Get the token of the current client
+ token = self.client.get_auth()
+ # Delete the token from database
+ self.client.delete_token(token)
+ self.assertRaises(exceptions.Unauthorized, self.client.create_user,
+ self.alt_user, self.alt_password,
+ self.data.tenant['id'], self.alt_email)
+
+ # Unset the token to allow further tests to generate a new token
+ self.client.clear_auth()
+
+ @attr(type='smoke')
+ def test_delete_user(self):
+ """Delete a user"""
+ self.data.setup_test_tenant()
+ resp, user = self.client.create_user('user_1234', self.alt_password,
+ self.data.tenant['id'],
+ self.alt_email)
+ resp, body = self.client.delete_user(user['id'])
+ self.assertEquals('204', resp['status'])
+
+ @attr(type='negative')
+ def test_delete_users_by_unauthorized_user(self):
+ """Non admin user should not be authorized to delete a user"""
+ self.data.setup_test_user()
+ self.assertRaises(exceptions.Unauthorized,
+ self.non_admin_client.delete_user,
+ self.data.user['id'])
+
+ @attr(type='negative')
+ def test_delete_non_existant_user(self):
+ """Attempt to delete a non-existent user should fail"""
+ self.assertRaises(exceptions.NotFound, self.client.delete_user,
+ 'junk12345123')
+
+ @attr(type='smoke')
+ def test_user_authentication(self):
+ """Valid user's token is authenticated"""
+ self.data.setup_test_user()
+ # Get a token
+ self.token_client.auth(self.data.test_user, self.data.test_password,
+ self.data.test_tenant)
+ # Re-auth
+ resp, body = self.token_client.auth(self.data.test_user,
+ self.data.test_password,
+ self.data.test_tenant)
+ self.assertEqual('200', resp['status'])
+
+ @attr(type='negative')
+ def test_authentication_for_disabled_user(self):
+ """Disabled user's token should not get authenticated"""
+ self.data.setup_test_user()
+ self.disable_user(self.data.test_user)
+ self.assertRaises(exceptions.Unauthorized, self.token_client.auth,
+ self.data.test_user,
+ self.data.test_password,
+ self.data.test_tenant)
+
+ @attr(type='negative')
+ @unittest.skip('Until Bug 988920 is fixed')
+ def test_authentication_when_tenant_is_disabled(self):
+ """User's token for a disabled tenant should not be authenticated"""
+ self.data.setup_test_user()
+ self.disable_tenant(self.data.test_tenant)
+ self.assertRaises(exceptions.Unauthorized, self.token_client.auth,
+ self.data.test_user,
+ self.data.test_password,
+ self.data.test_tenant)
+
+ @attr(type='negative')
+ @unittest.skip('Until Bug 988920 is fixed')
+ def test_authentication_with_invalid_tenant(self):
+ """User's token for an invalid tenant should not be authenticated"""
+ self.data.setup_one_user()
+ self.assertRaises(exceptions.Unauthorized, self.token_client.auth,
+ self.data.test_user,
+ self.data.test_password,
+ 'junktenant1234')
+
+ @attr(type='negative')
+ def test_authentication_with_invalid_username(self):
+ """Non-existent user's token should not get authenticated"""
+ self.assertRaises(exceptions.Unauthorized, self.token_client.auth,
+ 'junkuser123', self.data.test_password,
+ self.data.test_tenant)
+
+ @attr(type='negative')
+ def test_authentication_with_invalid_password(self):
+ """User's token with invalid password should not be authenticated"""
+ self.data.setup_test_user()
+ self.assertRaises(exceptions.Unauthorized, self.token_client.auth,
+ self.data.test_user, 'junkpass1234',
+ self.data.test_tenant)
+
+ @attr(type='positive')
+ def test_authentication_request_without_token(self):
+ """Request for token authentication with a valid token in header"""
+ self.data.setup_test_user()
+ self.token_client.auth(self.data.test_user, self.data.test_password,
+ self.data.test_tenant)
+ # Get the token of the current client
+ token = self.client.get_auth()
+ # Delete the token from database
+ self.client.delete_token(token)
+ # Re-auth
+ resp, body = self.token_client.auth(self.data.test_user,
+ self.data.test_password,
+ self.data.test_tenant)
+ self.assertEqual('200', resp['status'])
+ self.client.clear_auth()
+
+ @attr(type='smoke')
+ def test_get_users(self):
+ """Get a list of users and find the test user"""
+ self.data.setup_test_user()
+ resp, users = self.client.get_users()
+ self.assertIn(self.data.test_user, [u['name'] for u in users],
+ "Could not find %s" % self.data.test_user)
+
+ @attr(type='negative')
+ def test_get_users_by_unauthorized_user(self):
+ """Non admin user should not be authorized to get user list"""
+ self.data.setup_test_user()
+ self.assertRaises(exceptions.Unauthorized,
+ self.non_admin_client.get_users)
+
+ def test_get_users_request_without_token(self):
+ """Request to get list of users without a valid token should fail"""
+ token = self.client.get_auth()
+ self.client.delete_token(token)
+ self.assertRaises(exceptions.Unauthorized, self.client.get_users)
+ self.client.clear_auth()