# blob: 30a94fb9829b0d7518af56f25bed39f79952b3ff [file] [log] [blame]
# Copyright 2012 OpenStack Foundation
# Copyright 2013 IBM Corp.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log
from oslo_utils import uuidutils
from tempest.common import image as common_image
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
from tempest.lib import exceptions as lib_exc
from tempest.scenario import manager
# Shared tempest configuration object; the classes below read its
# service_available, image_feature_enabled, scenario and network groups.
CONF = config.CONF
# Module-level logger, named after this module.
LOG = log.getLogger(__name__)
class ScenarioTest(manager.NetworkScenarioTest):
    """Common base for scenario tests, wired to tempest's own clients."""

    credentials = ['primary']

    @classmethod
    def setup_clients(cls):
        """Publish the primary credential's service clients on the class.

        :raises lib_exc.InvalidConfiguration: if glance is flagged as
            available but neither image API v1 nor v2 is enabled.
        """
        super(ScenarioTest, cls).setup_clients()
        primary = cls.os_primary

        # Compute-side helpers.
        cls.flavors_client = primary.flavors_client
        cls.compute_floating_ips_client = primary.compute_floating_ips_client

        if CONF.service_available.glance:
            # The v1 client wins whenever v1 is enabled; v2 is the fallback.
            image_features = CONF.image_feature_enabled
            if image_features.api_v1:
                cls.image_client = primary.image_client
            elif image_features.api_v2:
                cls.image_client = primary.image_client_v2
            else:
                raise lib_exc.InvalidConfiguration(
                    'Either api_v1 or api_v2 must be True in '
                    '[image-feature-enabled].')

        # Image client exposed through the compute API.
        cls.compute_images_client = primary.compute_images_client
        cls.keypairs_client = primary.keypairs_client

        # Security groups as exposed through nova.
        cls.compute_security_groups_client = (
            primary.compute_security_groups_client)
        cls.compute_security_group_rules_client = (
            primary.compute_security_group_rules_client)
        cls.servers_client = primary.servers_client
        cls.interface_client = primary.interfaces_client

        # Neutron clients.
        cls.networks_client = primary.networks_client
        cls.ports_client = primary.ports_client
        cls.routers_client = primary.routers_client
        cls.subnets_client = primary.subnets_client
        cls.floating_ips_client = primary.floating_ips_client
        cls.security_groups_client = primary.security_groups_client
        cls.security_group_rules_client = (
            primary.security_group_rules_client)

    # ## Test functions library
    #
    # The create_[resource] helpers hand back only the response body; the
    # resp part is dropped because scenario tests never look at it.

    def _image_create(self, name, fmt, path,
                      disk_format=None, properties=None):
        """Create a private image, upload its data and return the image id.

        :param name: prefix for the randomised image name
        :param fmt: container format; also the disk format when
            ``disk_format`` is falsy
        :param path: path of the file uploaded as the image data
        :param disk_format: optional disk format overriding ``fmt``
        :param properties: optional dict of extra image properties
        :returns: the id of the created image
        """
        extra_props = {} if properties is None else properties
        image_meta = {
            'name': data_utils.rand_name('%s-' % name),
            'container_format': fmt,
            'disk_format': disk_format or fmt,
        }
        use_v1 = CONF.image_feature_enabled.api_v1

        if use_v1:
            # v1 carries the image metadata in request headers.
            image_meta['is_public'] = 'False'
            image_meta['properties'] = extra_props
            create_kwargs = {
                'headers': common_image.image_meta_to_headers(**image_meta),
            }
        else:
            # v2 takes additional properties flattened next to the
            # regular attributes.
            image_meta['visibility'] = 'private'
            image_meta.update(extra_props)
            create_kwargs = image_meta

        body = self.image_client.create_image(**create_kwargs)
        image = body
        if 'image' in body:
            image = body['image']
        image_id = image['id']

        # Cleanup is registered before the status check so a failing
        # assertion still removes the image.
        self.addCleanup(self.image_client.delete_image, image_id)
        self.assertEqual("queued", image['status'])

        with open(path, 'rb') as image_data:
            if use_v1:
                self.image_client.update_image(image_id, data=image_data)
            else:
                self.image_client.store_image_file(image_id, image_data)
        return image_id

    def glance_image_create(self):
        """Create the image described by the ``[scenario]`` options.

        :returns: the id of the created image
        """
        scenario = CONF.scenario
        path = scenario.img_file
        container_format = scenario.img_container_format
        disk_format = scenario.img_disk_format
        props = scenario.img_properties
        LOG.debug("paths: img: %s, container_format: %s, disk_format: %s, "
                  "properties: %s",
                  path, container_format, disk_format, props)
        image_id = self._image_create(
            'scenario-img', container_format, path,
            disk_format=disk_format, properties=props)
        LOG.debug("image:%s", image_id)
        return image_id

    def _log_net_info(self, exc):
        """Emit a network debug marker unless ``exc`` is an SSH timeout."""
        # Network debug already runs as part of ssh init, so an SSH
        # timeout needs no extra logging here.
        if isinstance(exc, lib_exc.SSHTimeout):
            return
        LOG.debug('Network information on a devstack host')
class NetworkScenarioTest(ScenarioTest):
    """Base class for network scenario tests.

    This class provides helpers for network scenario tests, using the
    neutron API. Helpers from ancestors which use the nova network API are
    overridden with the neutron API.

    This class also enforces using Neutron instead of novanetwork.
    Subclassed tests will be skipped if Neutron is not enabled.
    """

    credentials = ['primary', 'admin']

    @classmethod
    def skip_checks(cls):
        """Skip the whole test class when neutron is not available."""
        super(NetworkScenarioTest, cls).skip_checks()
        if not CONF.service_available.neutron:
            raise cls.skipException('Neutron not available')

    def _get_network_by_name_or_id(self, identifier):
        """Look up a network with the admin networks client.

        :param identifier: a network UUID, or a network name
        :returns: the network body; for a name lookup, the first match
        """
        if uuidutils.is_uuid_like(identifier):
            return self.os_admin.networks_client.show_network(
                identifier)['network']

        networks = self.os_admin.networks_client.list_networks(
            name=identifier)['networks']
        self.assertNotEqual(len(networks), 0,
                            "Unable to get network by name: %s" % identifier)
        return networks[0]

    def get_networks(self):
        """Return the networks listed by the admin networks client."""
        return self.os_admin.networks_client.list_networks()['networks']

    def create_floating_ip(self, thing, external_network_id=None,
                           port_id=None, ip_addr=None, client=None):
        """Create a floating IP and associate it to a resource/port.

        :param thing: mapping providing the ``tenant_id`` for the floating
            IP; when ``port_id`` is not given it is also handed to
            ``get_server_port_id_and_ip4`` to resolve the port
            (presumably a server body -- confirm against that helper)
        :param external_network_id: external network to allocate from;
            defaults to ``CONF.network.public_network_id``
        :param port_id: port to associate; resolved from ``thing`` if unset
        :param ip_addr: forwarded to ``get_server_port_id_and_ip4`` when the
            port has to be resolved
        :param client: floating IPs client; defaults to
            ``self.floating_ips_client``
        :returns: the created floating IP body
        """
        if not external_network_id:
            external_network_id = CONF.network.public_network_id
        if not client:
            client = self.floating_ips_client
        if not port_id:
            port_id, ip4 = self.get_server_port_id_and_ip4(thing,
                                                           ip_addr=ip_addr)
        else:
            # With an explicit port no fixed address is requested.
            ip4 = None
        result = client.create_floatingip(
            floating_network_id=external_network_id,
            port_id=port_id,
            tenant_id=thing['tenant_id'],
            fixed_ip_address=ip4
        )
        floating_ip = result['floatingip']
        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                        client.delete_floatingip,
                        floating_ip['id'])
        return floating_ip

    def _create_security_group(self, security_group_rules_client=None,
                               tenant_id=None,
                               namestart='secgroup-smoke',
                               security_groups_client=None):
        """Create a security group populated with the loginable rules.

        :param security_group_rules_client: rules client; defaults to
            ``self.security_group_rules_client``
        :param tenant_id: owner of the group; defaults to the tenant of the
            security groups client
        :param namestart: prefix for the randomised group name
        :param security_groups_client: groups client; defaults to
            ``self.security_groups_client``
        :returns: the created security group body
        """
        if security_group_rules_client is None:
            security_group_rules_client = self.security_group_rules_client
        if security_groups_client is None:
            security_groups_client = self.security_groups_client
        if tenant_id is None:
            tenant_id = security_groups_client.tenant_id
        secgroup = self._create_empty_security_group(
            namestart=namestart, client=security_groups_client,
            tenant_id=tenant_id)

        # Add rules to the security group
        rules = self.create_loginable_secgroup_rule(
            security_group_rules_client=security_group_rules_client,
            secgroup=secgroup,
            security_groups_client=security_groups_client)
        for rule in rules:
            self.assertEqual(tenant_id, rule['tenant_id'])
            self.assertEqual(secgroup['id'], rule['security_group_id'])
        return secgroup

    def _create_empty_security_group(self, client=None, tenant_id=None,
                                     namestart='secgroup-smoke'):
        """Create a security group without rules.

        Default rules will be created:
         - IPv4 egress to any
         - IPv6 egress to any

        :param client: security groups client; defaults to
            ``self.security_groups_client``
        :param tenant_id: secgroup will be created in this tenant
        :param namestart: prefix for the randomised group name
        :returns: the created security group
        """
        if client is None:
            client = self.security_groups_client
        if not tenant_id:
            tenant_id = client.tenant_id
        sg_name = data_utils.rand_name(namestart)
        sg_desc = sg_name + " description"
        sg_dict = dict(name=sg_name,
                       description=sg_desc)
        sg_dict['tenant_id'] = tenant_id
        result = client.create_security_group(**sg_dict)
        secgroup = result['security_group']
        # Register the cleanup before any assertion so that a failed check
        # below cannot leak the freshly created security group.
        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                        client.delete_security_group, secgroup['id'])
        self.assertEqual(secgroup['name'], sg_name)
        self.assertEqual(tenant_id, secgroup['tenant_id'])
        self.assertEqual(secgroup['description'], sg_desc)
        return secgroup

    def _default_security_group(self, client=None, tenant_id=None):
        """Get default secgroup for given tenant_id.

        :param client: security groups client; defaults to
            ``self.security_groups_client``
        :param tenant_id: tenant to search; defaults to the client's tenant
        :returns: default secgroup for given tenant
        """
        if client is None:
            client = self.security_groups_client
        if not tenant_id:
            tenant_id = client.tenant_id
        # The list response is unwrapped by taking its first value
        # (presumably the 'security_groups' list -- confirm against the
        # client's response format).
        sgs = [
            sg for sg in list(client.list_security_groups().values())[0]
            if sg['tenant_id'] == tenant_id and sg['name'] == 'default'
        ]
        msg = "No default security group for tenant %s." % (tenant_id)
        self.assertGreater(len(sgs), 0, msg)
        return sgs[0]

    def _create_security_group_rule(self, secgroup=None,
                                    sec_group_rules_client=None,
                                    tenant_id=None,
                                    security_groups_client=None, **kwargs):
        """Create a rule from a dictionary of rule parameters.

        Create a rule in a secgroup. If secgroup is not defined, the default
        secgroup of tenant_id is looked up and used instead.

        :param secgroup: the security group.
        :param sec_group_rules_client: rules client; defaults to
            ``self.security_group_rules_client``
        :param tenant_id: if secgroup not passed -- the tenant in which to
            search for default secgroup
        :param security_groups_client: groups client; defaults to
            ``self.security_groups_client``
        :param kwargs: a dictionary containing rule parameters:
            for example, to allow incoming ssh:
            rule = {
                    direction: 'ingress'
                    protocol:'tcp',
                    port_range_min: 22,
                    port_range_max: 22
                    }
        :returns: the created security group rule body
        """
        if sec_group_rules_client is None:
            sec_group_rules_client = self.security_group_rules_client
        if security_groups_client is None:
            security_groups_client = self.security_groups_client
        if not tenant_id:
            tenant_id = security_groups_client.tenant_id
        if secgroup is None:
            secgroup = self._default_security_group(
                client=security_groups_client, tenant_id=tenant_id)

        # The rule is always created in the tenant that owns the group;
        # caller-supplied kwargs may override either default key.
        ruleset = dict(security_group_id=secgroup['id'],
                       tenant_id=secgroup['tenant_id'])
        ruleset.update(kwargs)

        sg_rule = sec_group_rules_client.create_security_group_rule(**ruleset)
        sg_rule = sg_rule['security_group_rule']

        self.assertEqual(secgroup['tenant_id'], sg_rule['tenant_id'])
        self.assertEqual(secgroup['id'], sg_rule['security_group_id'])

        return sg_rule

    def create_loginable_secgroup_rule(self, security_group_rules_client=None,
                                       secgroup=None,
                                       security_groups_client=None):
        """Create loginable security group rules.

        For both the ingress and the egress direction this creates:

        1. a tcp port 22 rule without an explicit ethertype (ssh).
        2. a tcp port 22 rule with ethertype IPv6 (ssh over IPv6).
        3. an icmp rule without an explicit ethertype (ping).
        4. an icmp rule with ethertype IPv6 (ping6).

        A rule rejected with a Conflict whose error string reports that the
        rule already exists is skipped; any other Conflict is re-raised.

        :param security_group_rules_client: rules client; defaults to
            ``self.security_group_rules_client``
        :param secgroup: group to add the rules to; when None the rule
            helper falls back to the tenant's default group
        :param security_groups_client: groups client; defaults to
            ``self.security_groups_client``
        :returns: list of the rules that were actually created
        """
        if security_group_rules_client is None:
            security_group_rules_client = self.security_group_rules_client
        if security_groups_client is None:
            security_groups_client = self.security_groups_client
        rules = []
        rulesets = [
            dict(
                # ssh
                protocol='tcp',
                port_range_min=22,
                port_range_max=22,
            ),
            dict(
                # ipv6-ssh
                protocol='tcp',
                port_range_min=22,
                port_range_max=22,
                ethertype='IPv6',
            ),
            dict(
                # ping
                protocol='icmp',
            ),
            dict(
                # ipv6-icmp for ping6
                protocol='icmp',
                ethertype='IPv6',
            )
        ]
        already_exists_msg = 'Security group rule already exists'
        for ruleset in rulesets:
            for r_direction in ['ingress', 'egress']:
                # Build a per-direction copy instead of mutating the shared
                # template dict.
                rule_args = dict(ruleset, direction=r_direction)
                try:
                    sg_rule = self._create_security_group_rule(
                        sec_group_rules_client=security_group_rules_client,
                        secgroup=secgroup,
                        security_groups_client=security_groups_client,
                        **rule_args)
                except lib_exc.Conflict as ex:
                    # if rule already exist - skip rule and continue
                    if already_exists_msg not in ex._error_string:
                        # Bare raise keeps the original traceback intact.
                        raise
                else:
                    self.assertEqual(r_direction, sg_rule['direction'])
                    rules.append(sg_rule)

        return rules