blob: d512ae898438e167c493e4cd7fed684b3a6d2f5b [file] [log] [blame]
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack, LLC
# Copyright 2013 IBM Corp.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import subprocess
# Default client libs
import cinderclient.client
import glanceclient
import keystoneclient.v2_0.client
import netaddr
from neutronclient.common import exceptions as exc
import neutronclient.v2_0.client
import novaclient.client
from tempest.api.network import common as net_common
from tempest.common import isolated_creds
from tempest.common import ssh
from tempest.common.utils.data_utils import rand_name
import tempest.manager
from tempest.openstack.common import log as logging
import tempest.test
LOG = logging.getLogger(__name__)
class OfficialClientManager(tempest.manager.Manager):
"""
Manager that provides access to the official python clients for
calling various OpenStack APIs.
"""
NOVACLIENT_VERSION = '2'
CINDERCLIENT_VERSION = '1'
def __init__(self, username, password, tenant_name):
super(OfficialClientManager, self).__init__()
self.compute_client = self._get_compute_client(username,
password,
tenant_name)
self.identity_client = self._get_identity_client(username,
password,
tenant_name)
self.image_client = self._get_image_client()
self.network_client = self._get_network_client()
self.volume_client = self._get_volume_client(username,
password,
tenant_name)
def _get_compute_client(self, username, password, tenant_name):
# Novaclient will not execute operations for anyone but the
# identified user, so a new client needs to be created for
# each user that operations need to be performed for.
self._validate_credentials(username, password, tenant_name)
auth_url = self.config.identity.uri
dscv = self.config.identity.disable_ssl_certificate_validation
client_args = (username, password, tenant_name, auth_url)
# Create our default Nova client to use in testing
service_type = self.config.compute.catalog_type
return novaclient.client.Client(self.NOVACLIENT_VERSION,
*client_args,
service_type=service_type,
no_cache=True,
insecure=dscv)
def _get_image_client(self):
token = self.identity_client.auth_token
endpoint = self.identity_client.service_catalog.url_for(
service_type='image', endpoint_type='publicURL')
dscv = self.config.identity.disable_ssl_certificate_validation
return glanceclient.Client('1', endpoint=endpoint, token=token,
insecure=dscv)
def _get_volume_client(self, username, password, tenant_name):
auth_url = self.config.identity.uri
return cinderclient.client.Client(self.CINDERCLIENT_VERSION,
username,
password,
tenant_name,
auth_url)
def _get_identity_client(self, username, password, tenant_name):
# This identity client is not intended to check the security
# of the identity service, so use admin credentials by default.
self._validate_credentials(username, password, tenant_name)
auth_url = self.config.identity.uri
dscv = self.config.identity.disable_ssl_certificate_validation
return keystoneclient.v2_0.client.Client(username=username,
password=password,
tenant_name=tenant_name,
auth_url=auth_url,
insecure=dscv)
def _get_network_client(self):
# The intended configuration is for the network client to have
# admin privileges and indicate for whom resources are being
# created via a 'tenant_id' parameter. This will often be
# preferable to authenticating as a specific user because
# working with certain resources (public routers and networks)
# often requires admin privileges anyway.
username = self.config.identity.admin_username
password = self.config.identity.admin_password
tenant_name = self.config.identity.admin_tenant_name
self._validate_credentials(username, password, tenant_name)
auth_url = self.config.identity.uri
dscv = self.config.identity.disable_ssl_certificate_validation
return neutronclient.v2_0.client.Client(username=username,
password=password,
tenant_name=tenant_name,
auth_url=auth_url,
insecure=dscv)
class OfficialClientTest(tempest.test.BaseTestCase):
"""
Official Client test base class for scenario testing.
Official Client tests are tests that have the following characteristics:
* Test basic operations of an API, typically in an order that
a regular user would perform those operations
* Test only the correct inputs and action paths -- no fuzz or
random input data is sent, only valid inputs.
* Use only the default client tool for calling an API
"""
@classmethod
def setUpClass(cls):
super(OfficialClientTest, cls).setUpClass()
cls.isolated_creds = isolated_creds.IsolatedCreds(
__name__, tempest_client=False)
if cls.config.compute.allow_tenant_isolation:
creds = cls.isolated_creds.get_primary_creds()
username, tenant_name, password = creds
else:
username = cls.config.identity.username
password = cls.config.identity.password
tenant_name = cls.config.identity.tenant_name
cls.manager = OfficialClientManager(username, password, tenant_name)
cls.compute_client = cls.manager.compute_client
cls.image_client = cls.manager.image_client
cls.identity_client = cls.manager.identity_client
cls.network_client = cls.manager.network_client
cls.volume_client = cls.manager.volume_client
cls.resource_keys = {}
cls.os_resources = []
@classmethod
def tearDownClass(cls):
# NOTE(jaypipes): Because scenario tests are typically run in a
# specific order, and because test methods in scenario tests
# generally create resources in a particular order, we destroy
# resources in the reverse order in which resources are added to
# the scenario test class object
while cls.os_resources:
thing = cls.os_resources.pop()
LOG.debug("Deleting %r from shared resources of %s" %
(thing, cls.__name__))
try:
# OpenStack resources are assumed to have a delete()
# method which destroys the resource...
thing.delete()
except Exception as e:
# If the resource is already missing, mission accomplished.
if e.__class__.__name__ == 'NotFound':
continue
raise
def is_deletion_complete():
# Deletion testing is only required for objects whose
# existence cannot be checked via retrieval.
if isinstance(thing, dict):
return True
try:
thing.get()
except Exception as e:
# Clients are expected to return an exception
# called 'NotFound' if retrieval fails.
if e.__class__.__name__ == 'NotFound':
return True
raise
return False
# Block until resource deletion has completed or timed-out
tempest.test.call_until_true(is_deletion_complete, 10, 1)
cls.isolated_creds.clear_isolated_creds()
super(OfficialClientTest, cls).tearDownClass()
@classmethod
def set_resource(cls, key, thing):
LOG.debug("Adding %r to shared resources of %s" %
(thing, cls.__name__))
cls.resource_keys[key] = thing
cls.os_resources.append(thing)
@classmethod
def get_resource(cls, key):
return cls.resource_keys[key]
@classmethod
def remove_resource(cls, key):
thing = cls.resource_keys[key]
cls.os_resources.remove(thing)
del cls.resource_keys[key]
def status_timeout(self, things, thing_id, expected_status):
"""
Given a thing and an expected status, do a loop, sleeping
for a configurable amount of time, checking for the
expected status to show. At any time, if the returned
status of the thing is ERROR, fail out.
"""
def check_status():
# python-novaclient has resources available to its client
# that all implement a get() method taking an identifier
# for the singular resource to retrieve.
thing = things.get(thing_id)
new_status = thing.status
if new_status == 'ERROR':
self.fail("%s failed to get to expected status. "
"In ERROR state."
% thing)
elif new_status == expected_status:
return True # All good.
LOG.debug("Waiting for %s to get to %s status. "
"Currently in %s status",
thing, expected_status, new_status)
if not tempest.test.call_until_true(
check_status,
self.config.compute.build_timeout,
self.config.compute.build_interval):
self.fail("Timed out waiting for thing %s to become %s"
% (thing_id, expected_status))
def create_server(self, client, name=None, image=None, flavor=None,
create_kwargs={}):
if name is None:
name = rand_name('scenario-server-')
if image is None:
image = self.config.compute.image_ref
if flavor is None:
flavor = self.config.compute.flavor_ref
LOG.debug("Creating a server (name: %s, image: %s, flavor: %s)",
name, image, flavor)
server = client.servers.create(name, image, flavor, **create_kwargs)
try:
self.assertEqual(server.name, name)
self.set_resource(name, server)
except AttributeError:
self.fail("Server not successfully created.")
self.status_timeout(client.servers, server.id, 'ACTIVE')
# The instance retrieved on creation is missing network
# details, necessitating retrieval after it becomes active to
# ensure correct details.
server = client.servers.get(server.id)
self.set_resource(name, server)
LOG.debug("Created server: %s", server)
return server
class NetworkScenarioTest(OfficialClientTest):
"""
Base class for network scenario tests
"""
@classmethod
def check_preconditions(cls):
if (cls.config.service_available.neutron):
cls.enabled = True
#verify that neutron_available is telling the truth
try:
cls.network_client.list_networks()
except exc.EndpointNotFound:
cls.enabled = False
raise
else:
cls.enabled = False
msg = 'Neutron not available'
raise cls.skipException(msg)
@classmethod
def setUpClass(cls):
super(NetworkScenarioTest, cls).setUpClass()
cls.tenant_id = cls.manager._get_identity_client(
cls.config.identity.username,
cls.config.identity.password,
cls.config.identity.tenant_name).tenant_id
def _create_keypair(self, client, namestart='keypair-smoke-'):
kp_name = rand_name(namestart)
keypair = client.keypairs.create(kp_name)
try:
self.assertEqual(keypair.id, kp_name)
self.set_resource(kp_name, keypair)
except AttributeError:
self.fail("Keypair object not successfully created.")
return keypair
def _create_security_group(self, client, namestart='secgroup-smoke-'):
# Create security group
sg_name = rand_name(namestart)
sg_desc = sg_name + " description"
secgroup = client.security_groups.create(sg_name, sg_desc)
try:
self.assertEqual(secgroup.name, sg_name)
self.assertEqual(secgroup.description, sg_desc)
self.set_resource(sg_name, secgroup)
except AttributeError:
self.fail("SecurityGroup object not successfully created.")
# Add rules to the security group
# These rules are intended to permit inbound ssh and icmp
# traffic from all sources, so no group_id is provided.
# Setting a group_id would only permit traffic from ports
# belonging to the same security group.
rulesets = [
{
# ssh
'ip_protocol': 'tcp',
'from_port': 22,
'to_port': 22,
'cidr': '0.0.0.0/0',
},
{
# ping
'ip_protocol': 'icmp',
'from_port': -1,
'to_port': -1,
'cidr': '0.0.0.0/0',
}
]
for ruleset in rulesets:
try:
client.security_group_rules.create(secgroup.id, **ruleset)
except Exception:
self.fail("Failed to create rule in security group.")
return secgroup
def _create_network(self, tenant_id, namestart='network-smoke-'):
name = rand_name(namestart)
body = dict(
network=dict(
name=name,
tenant_id=tenant_id,
),
)
result = self.network_client.create_network(body=body)
network = net_common.DeletableNetwork(client=self.network_client,
**result['network'])
self.assertEqual(network.name, name)
self.set_resource(name, network)
return network
def _list_networks(self):
nets = self.network_client.list_networks()
return nets['networks']
def _list_subnets(self):
subnets = self.network_client.list_subnets()
return subnets['subnets']
def _list_routers(self):
routers = self.network_client.list_routers()
return routers['routers']
def _create_subnet(self, network, namestart='subnet-smoke-'):
"""
Create a subnet for the given network within the cidr block
configured for tenant networks.
"""
cfg = self.config.network
tenant_cidr = netaddr.IPNetwork(cfg.tenant_network_cidr)
result = None
# Repeatedly attempt subnet creation with sequential cidr
# blocks until an unallocated block is found.
for subnet_cidr in tenant_cidr.subnet(cfg.tenant_network_mask_bits):
body = dict(
subnet=dict(
ip_version=4,
network_id=network.id,
tenant_id=network.tenant_id,
cidr=str(subnet_cidr),
),
)
try:
result = self.network_client.create_subnet(body=body)
break
except exc.NeutronClientException as e:
is_overlapping_cidr = 'overlaps with another subnet' in str(e)
if not is_overlapping_cidr:
raise
self.assertIsNotNone(result, 'Unable to allocate tenant network')
subnet = net_common.DeletableSubnet(client=self.network_client,
**result['subnet'])
self.assertEqual(subnet.cidr, str(subnet_cidr))
self.set_resource(rand_name(namestart), subnet)
return subnet
def _create_port(self, network, namestart='port-quotatest-'):
name = rand_name(namestart)
body = dict(
port=dict(name=name,
network_id=network.id,
tenant_id=network.tenant_id))
result = self.network_client.create_port(body=body)
self.assertIsNotNone(result, 'Unable to allocate port')
port = net_common.DeletablePort(client=self.network_client,
**result['port'])
self.set_resource(name, port)
return port
def _create_floating_ip(self, server, external_network_id):
result = self.network_client.list_ports(device_id=server.id)
ports = result.get('ports', [])
self.assertEqual(len(ports), 1,
"Unable to determine which port to target.")
port_id = ports[0]['id']
body = dict(
floatingip=dict(
floating_network_id=external_network_id,
port_id=port_id,
tenant_id=server.tenant_id,
)
)
result = self.network_client.create_floatingip(body=body)
floating_ip = net_common.DeletableFloatingIp(
client=self.network_client,
**result['floatingip'])
self.set_resource(rand_name('floatingip-'), floating_ip)
return floating_ip
def _ping_ip_address(self, ip_address):
cmd = ['ping', '-c1', '-w1', ip_address]
def ping():
proc = subprocess.Popen(cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
proc.wait()
if proc.returncode == 0:
return True
return tempest.test.call_until_true(
ping, self.config.compute.ping_timeout, 1)
def _is_reachable_via_ssh(self, ip_address, username, private_key,
timeout):
ssh_client = ssh.Client(ip_address, username,
pkey=private_key,
timeout=timeout)
return ssh_client.test_connection_auth()
def _check_vm_connectivity(self, ip_address, username, private_key):
self.assertTrue(self._ping_ip_address(ip_address),
"Timed out waiting for %s to become "
"reachable" % ip_address)
self.assertTrue(self._is_reachable_via_ssh(
ip_address,
username,
private_key,
timeout=self.config.compute.ssh_timeout),
'Auth failure in connecting to %s@%s via ssh' %
(username, ip_address))