# blob: 6cbaa8b38573d19182cb97916c0c1a1f19ddf7c4 [file] [log] [blame]
# Copyright 2012 OpenStack Foundation
# Copyright 2013 IBM Corp.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import netaddr
from oslo_log import log
from oslo_utils import uuidutils
from tempest.common import image as common_image
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
from tempest.lib import exceptions as lib_exc
from tempest.scenario import manager
CONF = config.CONF
LOG = log.getLogger(__name__)
class ScenarioTest(manager.NetworkScenarioTest):
    """Scenario-test base class that works with tempest's own clients.

    Binds the service clients of the primary credentials onto the class and
    offers helpers to upload the configured scenario image to glance.
    """

    credentials = ['primary']

    @classmethod
    def setup_clients(cls):
        """Expose the primary credentials' clients as class attributes.

        :raises lib_exc.InvalidConfiguration: if glance is available but
            neither image API version is enabled in the configuration.
        """
        super(ScenarioTest, cls).setup_clients()
        primary = cls.os_primary

        # Clients (in alphabetical order)
        cls.flavors_client = primary.flavors_client
        cls.compute_floating_ips_client = primary.compute_floating_ips_client

        if CONF.service_available.glance:
            # The v1 client is preferred whenever glance v1 is enabled;
            # v2 is only used when v1 is switched off.
            image_features = CONF.image_feature_enabled
            if image_features.api_v1:
                cls.image_client = primary.image_client
            elif image_features.api_v2:
                cls.image_client = primary.image_client_v2
            else:
                raise lib_exc.InvalidConfiguration(
                    'Either api_v1 or api_v2 must be True in '
                    '[image-feature-enabled].')

        # Compute image client
        cls.compute_images_client = primary.compute_images_client
        cls.keypairs_client = primary.keypairs_client

        # Nova security groups client
        cls.compute_security_groups_client = (
            primary.compute_security_groups_client)
        cls.compute_security_group_rules_client = (
            primary.compute_security_group_rules_client)
        cls.servers_client = primary.servers_client
        cls.interface_client = primary.interfaces_client

        # Neutron network client
        cls.networks_client = primary.networks_client
        cls.ports_client = primary.ports_client
        cls.routers_client = primary.routers_client
        cls.subnets_client = primary.subnets_client
        cls.floating_ips_client = primary.floating_ips_client
        cls.security_groups_client = primary.security_groups_client
        cls.security_group_rules_client = primary.security_group_rules_client

    # ## Test functions library
    #
    # The create_[resource] functions only return body and discard the
    # resp part which is not used in scenario tests

    def _image_create(self, name, fmt, path,
                      disk_format=None, properties=None):
        """Register an image, upload its data and schedule its deletion.

        :param name: prefix of the randomly generated image name.
        :param fmt: container format; also the disk format when
            ``disk_format`` is not given.
        :param path: path of the file holding the image data.
        :param disk_format: disk format of the image.
        :param properties: extra image properties.
        :returns: the id of the created image.
        """
        extra_properties = {} if properties is None else properties
        image_meta = {
            'name': data_utils.rand_name('%s-' % name),
            'container_format': fmt,
            'disk_format': disk_format or fmt,
        }

        if CONF.image_feature_enabled.api_v1:
            # The v1 API takes the image metadata as request headers.
            image_meta['is_public'] = 'False'
            image_meta['properties'] = extra_properties
            create_kwargs = {
                'headers': common_image.image_meta_to_headers(**image_meta)}
        else:
            image_meta['visibility'] = 'private'
            # Additional properties are flattened out in the v2 API.
            image_meta.update(extra_properties)
            create_kwargs = image_meta

        body = self.image_client.create_image(**create_kwargs)
        # Some responses wrap the image in an 'image' key, others do not.
        image = body['image'] if 'image' in body else body
        self.addCleanup(self.image_client.delete_image, image['id'])
        self.assertEqual("queued", image['status'])

        with open(path, 'rb') as image_file:
            if CONF.image_feature_enabled.api_v1:
                self.image_client.update_image(image['id'], data=image_file)
            else:
                self.image_client.store_image_file(image['id'], image_file)
        return image['id']

    def glance_image_create(self):
        """Upload the image described by the ``[scenario]`` options.

        :returns: the value returned by ``_image_create`` (the image id).
        """
        scenario = CONF.scenario
        img_path = scenario.img_file
        container_format = scenario.img_container_format
        disk_format = scenario.img_disk_format
        properties = scenario.img_properties
        LOG.debug("paths: img: %s, container_format: %s, disk_format: %s, "
                  "properties: %s",
                  img_path, container_format, disk_format, properties)
        image = self._image_create('scenario-img',
                                   container_format,
                                   img_path,
                                   disk_format=disk_format,
                                   properties=properties)
        LOG.debug("image:%s", image)
        return image

    def _log_net_info(self, exc):
        """Emit a network debug line unless ``exc`` is an SSH timeout."""
        # network debug is called as part of ssh init
        if isinstance(exc, lib_exc.SSHTimeout):
            return
        LOG.debug('Network information on a devstack host')
class NetworkScenarioTest(ScenarioTest):
    """Base class for network scenario tests.

    This class provides helpers for network scenario tests, using the
    neutron API. Helpers from ancestors which use the nova network API are
    overridden with the neutron API.

    This class also enforces using Neutron instead of nova-network:
    subclassed tests will be skipped if Neutron is not enabled.
    """

    # Both the primary and the admin credentials are requested.
    credentials = ['primary', 'admin']
@classmethod
def skip_checks(cls):
    """Skip the test class when Neutron is not marked as available."""
    super(NetworkScenarioTest, cls).skip_checks()
    if CONF.service_available.neutron:
        return
    raise cls.skipException('Neutron not available')
def _create_subnet(self, network, subnets_client=None,
                   routers_client=None, namestart='subnet-smoke',
                   **kwargs):
    """Create a subnet for the given network.

    Unless a truthy ``use_default_subnetpool`` keyword is passed, the
    subnet is carved out of the cidr block configured for project
    networks: candidate blocks are tried in order, skipping those the
    network's tenant already uses and those the create request rejects as
    overlapping, until one is accepted.

    :param network: network dict; its ``id`` and ``tenant_id`` are used.
    :param subnets_client: client used to create (and later delete) the
        subnet; defaults to ``self.subnets_client``.
    :param routers_client: not used by this method; kept so existing
        callers that pass it keep working.
    :param namestart: prefix of the randomly generated subnet name.
    :param kwargs: extra attributes for the create request. ``ip_version``
        (default 4) is popped and selects the IPv4 or IPv6 settings.
    :returns: the created subnet dict.
    :raises lib_exc.Conflict: for any conflict other than an overlap with
        another subnet.
    """
    if not subnets_client:
        subnets_client = self.subnets_client

    def cidr_in_use(cidr, tenant_id):
        """Check cidr existence

        :returns: True if subnet with cidr already exist in tenant
                  False else
        """
        matching_subnets = self.os_admin.subnets_client.list_subnets(
            tenant_id=tenant_id, cidr=cidr)['subnets']
        return bool(matching_subnets)

    def _make_create_subnet_request(namestart, network,
                                    ip_version, subnets_client, **kwargs):
        """Send the create request; return None on an overlap conflict."""
        subnet = dict(
            name=data_utils.rand_name(namestart),
            network_id=network['id'],
            tenant_id=network['tenant_id'],
            ip_version=ip_version,
            **kwargs
        )
        if ip_version == 6:
            subnet['ipv6_address_mode'] = 'slaac'
            subnet['ipv6_ra_mode'] = 'slaac'
        try:
            return subnets_client.create_subnet(**subnet)
        except lib_exc.Conflict as e:
            if 'overlaps with another subnet' not in str(e):
                raise
            # An overlap only means this block is taken: tell the caller
            # to try the next candidate.
            return None

    result = None
    str_cidr = None
    use_default_subnetpool = kwargs.get('use_default_subnetpool', False)
    ip_version = kwargs.pop('ip_version', 4)

    if not use_default_subnetpool:
        if ip_version == 6:
            tenant_cidr = netaddr.IPNetwork(
                CONF.network.project_network_v6_cidr)
            num_bits = CONF.network.project_network_v6_mask_bits
        else:
            tenant_cidr = netaddr.IPNetwork(
                CONF.network.project_network_cidr)
            num_bits = CONF.network.project_network_mask_bits

        # Repeatedly attempt subnet creation with sequential cidr
        # blocks until an unallocated block is found.
        for subnet_cidr in tenant_cidr.subnet(num_bits):
            str_cidr = str(subnet_cidr)
            if cidr_in_use(str_cidr, tenant_id=network['tenant_id']):
                continue
            result = _make_create_subnet_request(
                namestart, network, ip_version, subnets_client,
                cidr=str_cidr, **kwargs)
            if result is not None:
                break
    else:
        # No cidr is passed here; the request carries only the caller's
        # keyword arguments (including use_default_subnetpool).
        result = _make_create_subnet_request(
            namestart, network, ip_version, subnets_client,
            **kwargs)

    # Fail with an explanation instead of a bare "unexpectedly None".
    self.assertIsNotNone(
        result,
        'Unable to allocate a subnet for network %s' % network['id'])

    subnet = result['subnet']
    if str_cidr is not None:
        self.assertEqual(subnet['cidr'], str_cidr)

    self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                    subnets_client.delete_subnet, subnet['id'])
    return subnet
def _get_network_by_name_or_id(self, identifier):
    """Look up a network with the admin client by UUID or by name.

    :param identifier: a network UUID or a network name.
    :returns: the network dict; for a name, the first match in the listing.
    """
    admin_networks = self.os_admin.networks_client
    if not uuidutils.is_uuid_like(identifier):
        matches = admin_networks.list_networks(name=identifier)['networks']
        self.assertNotEqual(len(matches), 0,
                            "Unable to get network by name: %s" % identifier)
        return matches[0]
    return admin_networks.show_network(identifier)['network']
def get_networks(self):
    """Return the networks listed by the admin networks client."""
    listing = self.os_admin.networks_client.list_networks()
    return listing['networks']
def create_floating_ip(self, thing, external_network_id=None,
                       port_id=None, ip_addr=None, client=None):
    """Create a floating IP and associate it with a resource/port on Neutron

    :param thing: resource dict; its ``tenant_id`` owns the floating IP and,
        when no port is given, it is handed to
        ``get_server_port_id_and_ip4`` to find the port and fixed IPv4.
    :param external_network_id: floating network; defaults to the configured
        public network id.
    :param port_id: port to associate; when given no fixed IP is sent.
    :param ip_addr: forwarded to ``get_server_port_id_and_ip4``.
    :param client: floating IPs client; defaults to
        ``self.floating_ips_client``.
    :returns: the created floating IP dict.
    """
    external_network_id = (external_network_id or
                           CONF.network.public_network_id)
    fip_client = client or self.floating_ips_client

    fixed_ip = None
    if not port_id:
        port_id, fixed_ip = self.get_server_port_id_and_ip4(
            thing, ip_addr=ip_addr)

    floating_ip = fip_client.create_floatingip(
        floating_network_id=external_network_id,
        port_id=port_id,
        tenant_id=thing['tenant_id'],
        fixed_ip_address=fixed_ip,
    )['floatingip']
    self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                    fip_client.delete_floatingip,
                    floating_ip['id'])
    return floating_ip
def _check_tenant_network_connectivity(self, server,
                                       username,
                                       private_key,
                                       should_connect=True,
                                       servers_for_debug=None):
    """Check connectivity to every address of ``server``.

    Does nothing (beyond an info log) when project networks are not
    configured as reachable. On any failure the console output of
    ``servers_for_debug`` and network info are logged before re-raising.

    :param server: server dict whose ``addresses`` are checked.
    :param username: login name passed to ``check_vm_connectivity``.
    :param private_key: key passed to ``check_vm_connectivity``.
    :param should_connect: whether connecting is expected to work.
    :param servers_for_debug: servers whose console output is logged on
        failure.
    """
    if not CONF.network.project_networks_reachable:
        LOG.info('Tenant networks not configured to be reachable.')
        return

    # The target login is assumed to have been configured for
    # key-based authentication by cloud-init.
    try:
        addresses = (entry['addr']
                     for entries in server['addresses'].values()
                     for entry in entries)
        for addr in addresses:
            self.check_vm_connectivity(addr,
                                       username,
                                       private_key,
                                       should_connect=should_connect,
                                       server=server)
    except Exception as e:
        LOG.exception('Tenant network connectivity check failed')
        self.log_console_output(servers_for_debug)
        self._log_net_info(e)
        raise
def _check_remote_connectivity(self, source, dest, should_succeed=True,
                               nic=None):
    """Ping ``dest`` over the ``source`` ssh connection until as expected

    Note: This is an internal method. Use check_remote_connectivity
    instead.

    :param source: RemoteClient: an ssh connection from which to ping
    :param dest: an IP to ping against
    :param should_succeed: boolean should ping succeed or not
    :param nic: specific network interface to ping from
    :returns: the result of ``call_until_true`` polling the ping once per
        second for up to the configured ping timeout
    """
    def _ping_matches_expectation():
        try:
            source.ping_host(dest, nic=nic)
        except lib_exc.SSHExecCommandFailed:
            LOG.warning('Failed to ping IP: %s via a ssh connection '
                        'from: %s.', dest, source.ssh_client.host)
            # A failed ping is the wanted outcome only when the caller
            # expects the destination to be unreachable.
            return not should_succeed
        else:
            return should_succeed

    ping_timeout = CONF.validation.ping_timeout
    return test_utils.call_until_true(_ping_matches_expectation,
                                      ping_timeout,
                                      1)
def check_remote_connectivity(self, source, dest, should_succeed=True,
                              nic=None):
    """Assert that pinging ``dest`` from ``source`` behaves as expected

    :param source: RemoteClient: an ssh connection from which to ping
    :param dest: an IP to ping against
    :param should_succeed: boolean should ping succeed or not
    :param nic: specific network interface to ping from
    """
    as_expected = self._check_remote_connectivity(source, dest,
                                                  should_succeed, nic)
    origin = source.ssh_client.host
    template = ("Timed out waiting for %s to become reachable from %s"
                if should_succeed else "%s is reachable from %s")
    self.assertTrue(as_expected, template % (dest, origin))
def _create_security_group(self, security_group_rules_client=None,
tenant_id=None,
namestart='secgroup-smoke',
security_groups_client=None):
if security_group_rules_client is None:
security_group_rules_client = self.security_group_rules_client
if security_groups_client is None:
security_groups_client = self.security_groups_client
if tenant_id is None:
tenant_id = security_groups_client.tenant_id
secgroup = self._create_empty_security_group(
namestart=namestart, client=security_groups_client,
tenant_id=tenant_id)
# Add rules to the security group
rules = self._create_loginable_secgroup_rule(
security_group_rules_client=security_group_rules_client,
secgroup=secgroup,
security_groups_client=security_groups_client)
for rule in rules:
self.assertEqual(tenant_id, rule['tenant_id'])
self.assertEqual(secgroup['id'], rule['security_group_id'])
return secgroup
def _create_empty_security_group(self, client=None, tenant_id=None,
                                 namestart='secgroup-smoke'):
    """Create a security group without rules.

    Default rules will be created:
     - IPv4 egress to any
     - IPv6 egress to any

    :param client: security groups client; defaults to
        ``self.security_groups_client``
    :param tenant_id: secgroup will be created in this tenant
    :param namestart: prefix of the randomly generated group name
    :returns: the created security group
    """
    if client is None:
        client = self.security_groups_client
    tenant_id = tenant_id or client.tenant_id

    sg_name = data_utils.rand_name(namestart)
    sg_desc = sg_name + " description"
    request = {
        'name': sg_name,
        'description': sg_desc,
        'tenant_id': tenant_id,
    }
    secgroup = client.create_security_group(**request)['security_group']

    # The group that came back must echo what was asked for.
    self.assertEqual(secgroup['name'], sg_name)
    self.assertEqual(tenant_id, secgroup['tenant_id'])
    self.assertEqual(secgroup['description'], sg_desc)

    self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                    client.delete_security_group, secgroup['id'])
    return secgroup
def _default_security_group(self, client=None, tenant_id=None):
"""Get default secgroup for given tenant_id.
:returns: default secgroup for given tenant
"""
if client is None:
client = self.security_groups_client
if not tenant_id:
tenant_id = client.tenant_id
sgs = [
sg for sg in list(client.list_security_groups().values())[0]
if sg['tenant_id'] == tenant_id and sg['name'] == 'default'
]
msg = "No default security group for tenant %s." % (tenant_id)
self.assertGreater(len(sgs), 0, msg)
return sgs[0]
def _create_security_group_rule(self, secgroup=None,
sec_group_rules_client=None,
tenant_id=None,
security_groups_client=None, **kwargs):
"""Create a rule from a dictionary of rule parameters.
Create a rule in a secgroup. if secgroup not defined will search for
default secgroup in tenant_id.
:param secgroup: the security group.
:param tenant_id: if secgroup not passed -- the tenant in which to
search for default secgroup
:param kwargs: a dictionary containing rule parameters:
for example, to allow incoming ssh:
rule = {
direction: 'ingress'
protocol:'tcp',
port_range_min: 22,
port_range_max: 22
}
"""
if sec_group_rules_client is None:
sec_group_rules_client = self.security_group_rules_client
if security_groups_client is None:
security_groups_client = self.security_groups_client
if not tenant_id:
tenant_id = security_groups_client.tenant_id
if secgroup is None:
secgroup = self._default_security_group(
client=security_groups_client, tenant_id=tenant_id)
ruleset = dict(security_group_id=secgroup['id'],
tenant_id=secgroup['tenant_id'])
ruleset.update(kwargs)
sg_rule = sec_group_rules_client.create_security_group_rule(**ruleset)
sg_rule = sg_rule['security_group_rule']
self.assertEqual(secgroup['tenant_id'], sg_rule['tenant_id'])
self.assertEqual(secgroup['id'], sg_rule['security_group_id'])
return sg_rule
def _create_loginable_secgroup_rule(self, security_group_rules_client=None,
secgroup=None,
security_groups_client=None):
"""Create loginable security group rule
This function will create:
1. egress and ingress tcp port 22 allow rule in order to allow ssh
access for ipv4.
2. egress and ingress ipv6 icmp allow rule, in order to allow icmpv6.
3. egress and ingress ipv4 icmp allow rule, in order to allow icmpv4.
"""
if security_group_rules_client is None:
security_group_rules_client = self.security_group_rules_client
if security_groups_client is None:
security_groups_client = self.security_groups_client
rules = []
rulesets = [
dict(
# ssh
protocol='tcp',
port_range_min=22,
port_range_max=22,
),
dict(
# ipv6-ssh
protocol='tcp',
port_range_min=22,
port_range_max=22,
ethertype='IPv6',
),
dict(
# ping
protocol='icmp',
),
dict(
# ipv6-icmp for ping6
protocol='icmp',
ethertype='IPv6',
)
]
sec_group_rules_client = security_group_rules_client
for ruleset in rulesets:
for r_direction in ['ingress', 'egress']:
ruleset['direction'] = r_direction
try:
sg_rule = self._create_security_group_rule(
sec_group_rules_client=sec_group_rules_client,
secgroup=secgroup,
security_groups_client=security_groups_client,
**ruleset)
except lib_exc.Conflict as ex:
# if rule already exist - skip rule and continue
msg = 'Security group rule already exists'
if msg not in ex._error_string:
raise ex
else:
self.assertEqual(r_direction, sg_rule['direction'])
rules.append(sg_rule)
return rules
def _get_router(self, client=None, tenant_id=None):
    """Retrieve a router for the given tenant id.

    If a public router has been configured, it will be returned.

    If a public router has not been configured, but a public
    network has, a tenant router will be created and returned that
    routes traffic to the public network.

    :param client: routers client; defaults to ``self.routers_client``.
    :param tenant_id: tenant for a newly created router; defaults to the
        client's tenant.
    :returns: the router dict.
    :raises lib_exc.InvalidConfiguration: if neither a public router nor a
        public network is configured.
    """
    if not client:
        client = self.routers_client
    if not tenant_id:
        tenant_id = client.tenant_id
    router_id = CONF.network.public_router_id
    network_id = CONF.network.public_network_id

    if router_id:
        return client.show_router(router_id)['router']

    if network_id:
        router = self._create_router(client, tenant_id)
        # Attach the new router to the public network as its gateway.
        kwargs = {'external_gateway_info': dict(network_id=network_id)}
        return client.update_router(router['id'], **kwargs)['router']

    # A missing option is a configuration problem; report it with the same
    # exception type the rest of this module uses for that (it is still an
    # Exception subclass, so existing handlers keep working).
    raise lib_exc.InvalidConfiguration(
        "Neither of 'public_router_id' or "
        "'public_network_id' has been defined.")
def _create_router(self, client=None, tenant_id=None,
                   namestart='router-smoke'):
    """Create an administratively-up router and schedule its deletion.

    :param client: routers client; defaults to ``self.routers_client``.
    :param tenant_id: owner of the router; defaults to the client's tenant.
    :param namestart: prefix of the randomly generated router name.
    :returns: the created router dict.
    """
    client = client or self.routers_client
    tenant_id = tenant_id or client.tenant_id

    router_name = data_utils.rand_name(namestart)
    router = client.create_router(name=router_name,
                                  admin_state_up=True,
                                  tenant_id=tenant_id)['router']
    self.assertEqual(router['name'], router_name)
    self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                    client.delete_router,
                    router['id'])
    return router
def _update_router_admin_state(self, router, admin_state_up):
kwargs = dict(admin_state_up=admin_state_up)
router = self.routers_client.update_router(
router['id'], **kwargs)['router']
self.assertEqual(admin_state_up, router['admin_state_up'])
def create_networks(self, networks_client=None,
                    routers_client=None, subnets_client=None,
                    tenant_id=None, dns_nameservers=None,
                    port_security_enabled=True):
    """Create a network with a subnet connected to a router.

    The baremetal driver is a special case since all nodes are
    on the same shared network.

    :param networks_client: client handed to ``create_network``.
    :param routers_client: client used to find the router and to attach the
        subnet to it; defaults to ``self.routers_client`` for the latter.
    :param subnets_client: client handed to ``_create_subnet``.
    :param tenant_id: id of tenant to create resources in.
    :param dns_nameservers: list of dns servers to send to subnet.
    :param port_security_enabled: handed to ``create_network``.
    :returns: network, subnet, router
    :raises lib_exc.InvalidConfiguration: on a shared physical network
        when no fixed network name is configured.
    """
    if CONF.network.shared_physical_network:
        # NOTE(Shrews): This exception is for environments where tenant
        # credential isolation is available, but network separation is
        # not (the current baremetal case).  Likely can be removed when
        # test account mgmt is reworked:
        # https://blueprints.launchpad.net/tempest/+spec/test-accounts
        if not CONF.compute.fixed_network_name:
            m = 'fixed_network_name must be specified in config'
            raise lib_exc.InvalidConfiguration(m)
        shared_network = self._get_network_by_name_or_id(
            CONF.compute.fixed_network_name)
        # Nothing is created here, so there is no subnet or router.
        return shared_network, None, None

    network = self.create_network(
        networks_client=networks_client,
        tenant_id=tenant_id,
        port_security_enabled=port_security_enabled)
    router = self._get_router(client=routers_client,
                              tenant_id=tenant_id)

    subnet_kwargs = {
        'network': network,
        'subnets_client': subnets_client,
        'routers_client': routers_client,
    }
    # use explicit check because empty list is a valid option
    if dns_nameservers is not None:
        subnet_kwargs['dns_nameservers'] = dns_nameservers
    subnet = self._create_subnet(**subnet_kwargs)

    attach_client = routers_client or self.routers_client
    router_id = router['id']
    subnet_id = subnet['id']
    attach_client.add_router_interface(router_id, subnet_id=subnet_id)

    # save a cleanup job to remove this association between
    # router and subnet
    self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                    attach_client.remove_router_interface, router_id,
                    subnet_id=subnet_id)
    return network, subnet, router