Addresses lp#940832
* Refactored openstack class to be parameterizable
* Added basic authorization tests
* Added unauthorized exception
* Renamed users to be more specific of their type
Change-Id: I53fc0aa96c23b8cb33f329ff0d753f7f2d6e8d14
diff --git a/etc/tempest.conf.sample b/etc/tempest.conf.sample
index 780e48d..22d9f7e 100644
--- a/etc/tempest.conf.sample
+++ b/etc/tempest.conf.sample
@@ -4,9 +4,12 @@
port=5000
api_version=v2.0
path=tokens
-user=admin
-password=admin-password
-tenant_name=admin-project
+nonadmin_user1=user1
+nonadmin_user1_password=password
+nonadmin_user1_tenant_name=user1-project
+nonadmin_user2=user2
+nonadmin_user2_password=password
+nonadmin_user2_tenant_name=user2-project
strategy=keystone
[compute]
diff --git a/tempest/common/rest_client.py b/tempest/common/rest_client.py
index 325d56b..d4ad014 100644
--- a/tempest/common/rest_client.py
+++ b/tempest/common/rest_client.py
@@ -124,6 +124,9 @@
req_url = "%s/%s" % (self.base_url, url)
resp, resp_body = self.http_obj.request(req_url, method,
headers=headers, body=body)
+ if resp.status == 401:
+ self._log(req_url, body, resp, resp_body)
+ raise exceptions.Unauthorized()
if resp.status == 404:
self._log(req_url, body, resp, resp_body)
diff --git a/tempest/config.py b/tempest/config.py
index dc93910..f805d93 100644
--- a/tempest/config.py
+++ b/tempest/config.py
@@ -55,19 +55,34 @@
return self.get("use_ssl", 'false').lower() != 'false'
@property
- def username(self):
- """Username to use for Identity API requests."""
- return self.get("user", None)
+ def nonadmin_user1(self):
+ """Username to use for Nova API requests."""
+ return self.get("nonadmin_user1")
@property
- def tenant_name(self):
- """Tenant name to use for Identity API requests."""
- return self.get("tenant_name", None)
+ def nonadmin_user1_tenant_name(self):
+ """Tenant name to use for Nova API requests."""
+ return self.get("nonadmin_user1_tenant_name")
@property
- def password(self):
- """Password to use when authenticating."""
- return self.get("password", None)
+ def nonadmin_user1_password(self):
+ """API key to use when authenticating."""
+ return self.get("nonadmin_user1_password")
+
+ @property
+ def nonadmin_user2(self):
+ """Alternate username to use for Nova API requests."""
+ return self.get("nonadmin_user2")
+
+ @property
+ def nonadmin_user2_tenant_name(self):
+ """Alternate tenant name for Nova API requests."""
+ return self.get("nonadmin_user2_tenant_name")
+
+ @property
+ def nonadmin_user2_password(self):
+ """Alternate API key to use when authenticating."""
+ return self.get("nonadmin_user2_password")
@property
def strategy(self):
diff --git a/tempest/exceptions.py b/tempest/exceptions.py
index 9f33740..b49e9e6 100644
--- a/tempest/exceptions.py
+++ b/tempest/exceptions.py
@@ -35,6 +35,10 @@
message = "Object not found"
+class Unauthorized(TempestException):
+ message = 'Unauthorized'
+
+
class TimeoutException(TempestException):
message = "Request timed out"
diff --git a/tempest/openstack.py b/tempest/openstack.py
index 862b517..68d924c 100644
--- a/tempest/openstack.py
+++ b/tempest/openstack.py
@@ -14,16 +14,20 @@
class Manager(object):
- def __init__(self):
+ def __init__(self, username=None, api_key=None, tenant_name=None):
"""
Top level manager for all Openstack APIs
"""
self.config = tempest.config.TempestConfig()
- username = self.config.identity.username
- password = self.config.identity.password
- tenant_name = self.config.identity.tenant_name
if None in [username, password, tenant_name]:
+ # Pull from the default, the first non-admin user
+ username = self.config.identity.nonadmin_user1
+ password = self.config.identity.nonadmin_user1_password
+ tenant_name = self.config.identity.nonadmin_user1_tenant_name
+
+ if None in [username, password, tenant_name]:
+ # We can't find any usable credentials, fail early
raise exceptions.InvalidConfiguration(message="Missing complete \
user credentials.")
auth_url = self.config.identity.auth_url
diff --git a/tempest/tests/test_authorization.py b/tempest/tests/test_authorization.py
new file mode 100644
index 0000000..f71073d
--- /dev/null
+++ b/tempest/tests/test_authorization.py
@@ -0,0 +1,161 @@
+import unittest2 as unittest
+
+from nose.plugins.attrib import attr
+from nose.tools import raises
+
+from tempest import openstack
+from tempest.common.utils.data_utils import rand_name
+from tempest.exceptions import NotFound, ComputeFault, BadRequest, Unauthorized
+from tempest.tests import utils
+
+
+class AuthorizationTest(unittest.TestCase):
+
+ @classmethod
+ def setUpClass(cls):
+ cls.os = openstack.Manager()
+ cls.client = cls.os.servers_client
+ cls.images_client = cls.os.images_client
+ cls.config = cls.os.config
+ cls.image_ref = cls.config.compute.image_ref
+ cls.flavor_ref = cls.config.compute.flavor_ref
+ cls.image_ref_alt = cls.config.compute.image_ref_alt
+ cls.flavor_ref_alt = cls.config.compute.flavor_ref_alt
+
+ # Verify the second user is not the same as the first and is configured
+ cls.user1 = cls.config.identity.nonadmin_user1
+ cls.user2 = cls.config.identity.nonadmin_user2
+ cls.user2_password = cls.config.identity.nonadmin_user2_password
+ cls.user2_tenant_name = cls.config.identity.nonadmin_user2_tenant_name
+ cls.multi_user = False
+
+ if (cls.user2 != None and cls.user1 != cls.user2
+ and cls.user2_password != None
+ and cls.user2_tenant_name != None):
+
+ # Setup a client instance for the second user
+ cls.multi_user = True
+ cls.os_other = openstack.Manager(cls.user2, cls.user2_password,
+ cls.user2_tenant_name)
+ cls.other_client = cls.os_other.servers_client
+ cls.other_images_client = cls.os_other.images_client
+
+ name = rand_name('server')
+ resp, server = cls.client.create_server(name, cls.image_ref,
+ cls.flavor_ref)
+ cls.client.wait_for_server_status(server['id'], 'ACTIVE')
+ resp, cls.server = cls.client.get_server(server['id'])
+
+ name = rand_name('image')
+ resp, body = cls.client.create_image(server['id'], name)
+ image_id = cls._parse_image_id(resp['location'])
+ cls.images_client.wait_for_image_resp_code(image_id, 200)
+ cls.images_client.wait_for_image_status(image_id, 'ACTIVE')
+ resp, cls.image = cls.images_client.get_image(image_id)
+
+ @classmethod
+ def tearDownClass(cls):
+ if cls.multi_user:
+ cls.client.delete_server(cls.server['id'])
+ cls.images_client.delete_image(cls.image['id'])
+
+ @raises(NotFound)
+ @attr(type='negative')
+ @utils.skip_unless_attr('multi_user', 'Second user not configured')
+ def test_get_server_for_other_account_fails(self):
+ """A GET request for a server on another user's account should fail"""
+ self.other_client.get_server(self.server['id'])
+
+ @raises(NotFound)
+ @attr(type='negative')
+ @utils.skip_unless_attr('multi_user', 'Second user not configured')
+ def test_delete_server_for_other_account_fails(self):
+ """A DELETE request for another user's server should fail"""
+ self.other_client.delete_server(self.server['id'])
+
+ @raises(ComputeFault)
+ @attr(type='negative')
+ @utils.skip_unless_attr('multi_user', 'Second user not configured')
+ def test_update_server_for_other_account_fails(self):
+ """An update server request for another user's server should fail"""
+ self.other_client.update_server(self.server['id'], name='test')
+
+ @raises(NotFound)
+ @attr(type='negative')
+ @utils.skip_unless_attr('multi_user', 'Second user not configured')
+ def test_list_server_addresses_for_other_account_fails(self):
+ """A list addresses request for another user's server should fail"""
+ self.other_client.list_addresses(self.server['id'])
+
+ @raises(NotFound)
+ @attr(type='negative')
+ @utils.skip_unless_attr('multi_user', 'Second user not configured')
+ def test_list_server_addresses_by_network_for_other_account_fails(self):
+ """
+ A list address/network request for another user's server should fail
+ """
+ server_id = self.server['id']
+ self.other_client.list_addresses_by_network(server_id, 'public')
+
+ @raises(NotFound)
+ @attr(type='negative')
+ @utils.skip_unless_attr('multi_user', 'Second user not configured')
+ def test_change_password_for_other_account_fails(self):
+ """A change password request for another user's server should fail"""
+ self.other_client.change_password(self.server['id'], 'newpass')
+
+ @raises(NotFound)
+ @attr(type='negative')
+ @utils.skip_unless_attr('multi_user', 'Second user not configured')
+ def test_reboot_server_for_other_account_fails(self):
+ """A reboot request for another user's server should fail"""
+ self.other_client.reboot(self.server['id'], 'HARD')
+
+ @raises(NotFound)
+ @attr(type='negative')
+ @utils.skip_unless_attr('multi_user', 'Second user not configured')
+ def test_rebuild_server_for_other_account_fails(self):
+ """A rebuild request for another user's server should fail"""
+ self.other_client.rebuild(self.server['id'], self.image_ref_alt)
+
+ @raises(NotFound)
+ @attr(type='negative')
+ @utils.skip_unless_attr('multi_user', 'Second user not configured')
+ def test_resize_server_for_other_account_fails(self):
+ """A resize request for another user's server should fail"""
+ self.other_client.resize(self.server['id'], self.flavor_ref_alt)
+
+ @raises(NotFound)
+ @attr(type='negative')
+ @utils.skip_unless_attr('multi_user', 'Second user not configured')
+ def test_create_image_for_other_account_fails(self):
+ """A create image request for another user's server should fail"""
+ self.other_images_client.create_image(self.server['id'], 'testImage')
+
+ @raises(BadRequest)
+ @attr(type='negative')
+ @utils.skip_unless_attr('multi_user', 'Second user not configured')
+ def test_create_server_with_unauthorized_image(self):
+ """Server creation with another user's image should fail"""
+ self.other_client.create_server('test', self.image['id'],
+ self.flavor_ref)
+
+ @raises(Unauthorized)
+ @attr(type='negative')
+ @utils.skip_unless_attr('multi_user', 'Second user not configured')
+ def test_create_server_fails_when_tenant_incorrect(self):
+ """
+ A create server request should fail if the tenant id does not match
+ the current user
+ """
+ os_other = openstack.Manager(self.user2, self.user2_password,
+ self.user2_tenant_name)
+ os_other.servers_client.client.base_url = self.client.client.base_url
+ os_other.servers_client.create_server('test', self.image['id'],
+ self.flavor_ref)
+
+ @classmethod
+ def _parse_image_id(self, image_ref):
+ temp = image_ref.rsplit('/')
+ #Return the last item, which is the image id
+ return temp[len(temp) - 1]