# blob: a38dcd14460b584c947920bd6e5ed1ca3f2f775d [file] [log] [blame]
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack, LLC
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
from tempest.api import compute
from tempest import clients
from tempest.common import log as logging
from tempest.common.utils.data_utils import parse_image_id
from tempest.common.utils.data_utils import rand_name
from tempest import exceptions
import tempest.test
LOG = logging.getLogger(__name__)
class BaseComputeTest(tempest.test.BaseTestCase):
    """Base test case class for all Compute API tests."""

    # Evaluated once, when the class body is executed.
    conclusion = compute.generic_setup_package()

    @classmethod
    def setUpClass(cls):
        """Build the API clients and shared settings used by compute tests.

        Raises ``cls.skipException`` when nova is not flagged as available
        in the configuration.  When ``compute.allow_tenant_isolation`` is
        set, a fresh user/tenant pair is created and the client manager is
        built with those credentials; otherwise the manager is built
        without explicit credentials.
        """
        if not cls.config.service_available.nova:
            skip_msg = ("%s skipped as nova is not available" % cls.__name__)
            raise cls.skipException(skip_msg)

        # Must exist before _get_isolated_creds() runs: that method both
        # inspects this list and appends the created (user, tenant) to it.
        cls.isolated_creds = []

        if cls.config.compute.allow_tenant_isolation:
            creds = cls._get_isolated_creds()
            username, tenant_name, password = creds
            os = clients.Manager(username=username,
                                 password=password,
                                 tenant_name=tenant_name,
                                 interface=cls._interface)
        else:
            os = clients.Manager(interface=cls._interface)

        cls.os = os
        cls.servers_client = os.servers_client
        cls.flavors_client = os.flavors_client
        cls.images_client = os.images_client
        cls.extensions_client = os.extensions_client
        cls.floating_ips_client = os.floating_ips_client
        cls.keypairs_client = os.keypairs_client
        cls.security_groups_client = os.security_groups_client
        cls.quotas_client = os.quotas_client
        cls.limits_client = os.limits_client
        cls.volumes_extensions_client = os.volumes_extensions_client
        cls.volumes_client = os.volumes_client
        cls.interfaces_client = os.interfaces_client
        cls.fixed_ips_client = os.fixed_ips_client
        cls.availability_zone_client = os.availability_zone_client
        cls.aggregates_client = os.aggregates_client
        cls.services_client = os.services_client
        cls.hypervisor_client = os.hypervisor_client

        cls.build_interval = cls.config.compute.build_interval
        cls.build_timeout = cls.config.compute.build_timeout
        cls.ssh_user = cls.config.compute.ssh_user
        cls.image_ref = cls.config.compute.image_ref
        cls.image_ref_alt = cls.config.compute.image_ref_alt
        cls.flavor_ref = cls.config.compute.flavor_ref
        cls.flavor_ref_alt = cls.config.compute.flavor_ref_alt

        # Resources registered here are removed in tearDownClass().
        cls.servers = []
        cls.images = []

        cls.servers_client_v3_auth = os.servers_client_v3_auth

    @classmethod
    def _get_identity_admin_client(cls):
        """
        Returns an instance of the Identity Admin API client
        """
        os = clients.AdminManager(interface=cls._interface)
        admin_client = os.identity_client
        return admin_client

    @classmethod
    def _get_client_args(cls):
        """Return (config, admin username, admin password, identity URI)."""
        return (
            cls.config,
            cls.config.identity.admin_username,
            cls.config.identity.admin_password,
            cls.config.identity.uri
        )

    @classmethod
    def _get_isolated_creds(cls, admin=False):
        """
        Creates a new set of user/tenant/password credentials for a
        user of the Compute API so that a test case can operate in an
        isolated tenant container.

        The created ``(user, tenant)`` pair is appended to
        ``cls.isolated_creds`` so that it can be deleted later.

        :param admin: when true, the new user is additionally assigned the
                      role named 'admin' on the new tenant.
        :returns: a ``(username, tenant_name, password)`` tuple.
        :raises exceptions.NotFound: if ``admin`` is true and no role named
                                     'admin' exists.
        """
        admin_client = cls._get_identity_admin_client()
        password = "pass"

        # Keep generating random names until a tenant is created.  On a
        # name clash either re-use the existing tenant (if the configuration
        # allows it) or loop again with a new random name.
        while True:
            try:
                rand_name_root = rand_name(cls.__name__)
                if cls.isolated_creds:
                    # Main user already created. Create the alt one...
                    rand_name_root += '-alt'
                tenant_name = rand_name_root + "-tenant"
                tenant_desc = tenant_name + "-desc"

                resp, tenant = admin_client.create_tenant(
                    name=tenant_name, description=tenant_desc)
                break
            except exceptions.Duplicate:
                if cls.config.compute.allow_tenant_reuse:
                    tenant = admin_client.get_tenant_by_name(tenant_name)
                    LOG.info('Re-using existing tenant %s', tenant)
                    break

        # Same retry-or-reuse strategy for the user inside that tenant.
        while True:
            try:
                rand_name_root = rand_name(cls.__name__)
                if cls.isolated_creds:
                    # Main user already created. Create the alt one...
                    rand_name_root += '-alt'
                username = rand_name_root + "-user"
                email = rand_name_root + "@example.com"
                resp, user = admin_client.create_user(username,
                                                      password,
                                                      tenant['id'],
                                                      email)
                break
            except exceptions.Duplicate:
                if cls.config.compute.allow_tenant_reuse:
                    user = admin_client.get_user_by_username(tenant['id'],
                                                             username)
                    LOG.info('Re-using existing user %s', user)
                    break

        # Store the complete creds (including UUID ids...) for later
        # but return just the username, tenant_name, password tuple
        # that the various clients will use.
        cls.isolated_creds.append((user, tenant))

        # Assign admin role if this is for admin creds
        if admin:
            # The role list is fetched once; only the lookup can raise
            # StopIteration, so only the lookup is guarded.
            _, roles = admin_client.list_roles()
            try:
                role = next(r for r in roles if r['name'] == 'admin')
            except StopIteration:
                msg = "No admin role found"
                raise exceptions.NotFound(msg)
            admin_client.assign_user_role(tenant['id'], user['id'], role['id'])

        return username, tenant_name, password

    @classmethod
    def clear_isolated_creds(cls):
        """Delete every user and tenant recorded in ``cls.isolated_creds``."""
        if not cls.isolated_creds:
            return
        admin_client = cls._get_identity_admin_client()

        for user, tenant in cls.isolated_creds:
            admin_client.delete_user(user['id'])
            admin_client.delete_tenant(tenant['id'])

    @classmethod
    def clear_servers(cls):
        """Delete all servers in ``cls.servers`` and wait for termination.

        Cleanup is deliberately best-effort: any error raised while deleting
        or waiting is ignored so that the remaining servers are still
        processed.
        """
        # First issue every delete request, then wait, so that the
        # deletions can proceed while earlier ones are being waited on.
        for server in cls.servers:
            try:
                cls.servers_client.delete_server(server['id'])
            except Exception:
                pass

        for server in cls.servers:
            try:
                cls.servers_client.wait_for_server_termination(server['id'])
            except Exception:
                pass

    @classmethod
    def clear_images(cls):
        """Delete all images in ``cls.images``, logging any failure."""
        for image_id in cls.images:
            try:
                cls.images_client.delete_image(image_id)
            except Exception as exc:
                # Best-effort: log and carry on with the remaining images.
                LOG.info('Exception raised deleting image %s', image_id)
                LOG.exception(exc)

    @classmethod
    def tearDownClass(cls):
        """Remove the images, servers and credentials created by the class."""
        cls.clear_images()
        cls.clear_servers()
        cls.clear_isolated_creds()

    @classmethod
    def create_server(cls, **kwargs):
        """Wrapper utility that returns a test server.

        Recognised keyword arguments:

        * ``name`` - server name; removed from ``kwargs`` before the client
          call.  A random name is generated when absent.
        * ``flavor`` / ``image_id`` - default to ``cls.flavor_ref`` and
          ``cls.image_ref``.
        * ``min_count`` / ``max_count`` - when either is present, every
          listed server whose name starts with ``name`` is tracked.
        * ``wait_until`` - status to wait for on each tracked server.

        Everything left in ``kwargs`` (including the keys above other than
        ``name``) is forwarded to ``servers_client.create_server``.

        :returns: the ``(resp, body)`` pair of the create request.
        """
        name = rand_name(cls.__name__ + "-instance")
        if 'name' in kwargs:
            name = kwargs.pop('name')
        flavor = kwargs.get('flavor', cls.flavor_ref)
        image_id = kwargs.get('image_id', cls.image_ref)

        resp, body = cls.servers_client.create_server(
            name, image_id, flavor, **kwargs)

        # handle the case of multiple servers
        servers = [body]
        if 'min_count' in kwargs or 'max_count' in kwargs:
            # Get servers created which name match with name param.
            r, b = cls.servers_client.list_servers()
            servers = [s for s in b['servers'] if s['name'].startswith(name)]

        # Register for cleanup before waiting, so the servers are still
        # deleted if the wait below raises.
        cls.servers.extend(servers)

        if 'wait_until' in kwargs:
            for server in servers:
                cls.servers_client.wait_for_server_status(
                    server['id'], kwargs['wait_until'])

        return resp, body

    @classmethod
    def create_image_from_server(cls, server_id, **kwargs):
        """Wrapper utility that returns a test image.

        Recognised keyword arguments:

        * ``name`` - image name; a random name is generated when absent.
        * ``wait_until`` - image status to wait for; when given, the image
          is fetched again afterwards and that response is returned.

        Other keyword arguments are not passed to the images client.

        :param server_id: id of the server to create the image from.
        :returns: a ``(resp, image)`` pair.
        """
        name = rand_name(cls.__name__ + "-image")
        if 'name' in kwargs:
            name = kwargs.pop('name')

        resp, image = cls.images_client.create_image(
            server_id, name)
        image_id = parse_image_id(resp['location'])
        # Register for cleanup before any waiting takes place.
        cls.images.append(image_id)

        if 'wait_until' in kwargs:
            cls.images_client.wait_for_image_status(image_id,
                                                    kwargs['wait_until'])
            resp, image = cls.images_client.get_image(image_id)

        return resp, image

    def wait_for(self, condition):
        """Repeatedly calls condition() until a timeout.

        Returns as soon as ``condition()`` completes without raising.
        Between attempts the method sleeps ``self.build_interval`` seconds.
        Once ``self.build_timeout`` seconds have elapsed, ``condition()`` is
        called one final time without protection, so its exception (if any)
        propagates to the caller.
        """
        start_time = int(time.time())
        while True:
            try:
                condition()
            except Exception:
                pass
            else:
                return
            if int(time.time()) - start_time >= self.build_timeout:
                # Last attempt: let a failure surface to the caller.
                condition()
                return
            time.sleep(self.build_interval)
class BaseComputeAdminTest(BaseComputeTest):
    """Common base for test cases that exercise the Compute Admin API."""

    @classmethod
    def setUpClass(cls):
        """Extend the base setup with an admin client manager, ``cls.os_adm``.

        Raises ``cls.skipException`` if the compute admin username, password
        or tenant name is missing from the configuration.  With tenant
        isolation enabled, new admin credentials are created and used;
        otherwise ``clients.ComputeAdminManager`` is used.
        """
        super(BaseComputeAdminTest, cls).setUpClass()

        admin_conf = cls.config.compute_admin
        configured = (admin_conf.username,
                      admin_conf.password,
                      admin_conf.tenant_name)
        if not all(configured):
            raise cls.skipException("Missing Compute Admin API credentials "
                                    "in configuration.")

        if not cls.config.compute.allow_tenant_isolation:
            cls.os_adm = clients.ComputeAdminManager(interface=cls._interface)
            return

        # Isolated run: create a dedicated user that holds the admin role.
        user, tenant, secret = cls._get_isolated_creds(admin=True)
        cls.os_adm = clients.Manager(username=user,
                                     password=secret,
                                     tenant_name=tenant,
                                     interface=cls._interface)