blob: 5c67d68984199af7483e6475f015efd343ae8dc7 [file] [log] [blame]
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import netaddr
from tempest.common.utils import data_utils
from tempest import config
from tempest import exceptions
from tempest.lib.common.utils import test_utils
from tempest.lib import exceptions as lib_exc
import tempest.test
CONF = config.CONF
class BaseNetworkTest(tempest.test.BaseTestCase):
"""Base class for the Neutron tests
Per the Neutron API Guide, API v1.x was removed from the source code tree
(docs.openstack.org/api/openstack-network/2.0/content/Overview-d1e71.html)
Therefore, v2.x of the Neutron API is assumed. It is also assumed that the
following options are defined in the [network] section of etc/tempest.conf:
project_network_cidr with a block of cidr's from which smaller blocks
can be allocated for project networks
project_network_mask_bits with the mask bits to be used to partition
the block defined by project-network_cidr
Finally, it is assumed that the following option is defined in the
[service_available] section of etc/tempest.conf
neutron as True
"""
force_tenant_isolation = False
credentials = ['primary']
# Default to ipv4.
_ip_version = 4
@classmethod
def skip_checks(cls):
super(BaseNetworkTest, cls).skip_checks()
if not CONF.service_available.neutron:
raise cls.skipException("Neutron support is required")
if cls._ip_version == 6 and not CONF.network_feature_enabled.ipv6:
raise cls.skipException("IPv6 Tests are disabled.")
@classmethod
def setup_credentials(cls):
# Create no network resources for these test.
cls.set_network_resources()
super(BaseNetworkTest, cls).setup_credentials()
@classmethod
def setup_clients(cls):
super(BaseNetworkTest, cls).setup_clients()
cls.agents_client = cls.os.network_agents_client
cls.network_extensions_client = cls.os.network_extensions_client
cls.networks_client = cls.os.networks_client
cls.routers_client = cls.os.routers_client
cls.subnetpools_client = cls.os.subnetpools_client
cls.subnets_client = cls.os.subnets_client
cls.ports_client = cls.os.ports_client
cls.quotas_client = cls.os.network_quotas_client
cls.floating_ips_client = cls.os.floating_ips_client
cls.security_groups_client = cls.os.security_groups_client
cls.security_group_rules_client = (
cls.os.security_group_rules_client)
cls.network_versions_client = cls.os.network_versions_client
@classmethod
def resource_setup(cls):
super(BaseNetworkTest, cls).resource_setup()
cls.networks = []
cls.subnets = []
cls.ports = []
cls.routers = []
cls.floating_ips = []
cls.metering_labels = []
cls.metering_label_rules = []
cls.ethertype = "IPv" + str(cls._ip_version)
@classmethod
def resource_cleanup(cls):
if CONF.service_available.neutron:
# Clean up floating IPs
for floating_ip in cls.floating_ips:
test_utils.call_and_ignore_notfound_exc(
cls.floating_ips_client.delete_floatingip,
floating_ip['id'])
# Clean up metering label rules
# Not all classes in the hierarchy have the client class variable
if len(cls.metering_label_rules) > 0:
label_rules_client = cls.admin_metering_label_rules_client
for metering_label_rule in cls.metering_label_rules:
test_utils.call_and_ignore_notfound_exc(
label_rules_client.delete_metering_label_rule,
metering_label_rule['id'])
# Clean up metering labels
for metering_label in cls.metering_labels:
test_utils.call_and_ignore_notfound_exc(
cls.admin_metering_labels_client.delete_metering_label,
metering_label['id'])
# Clean up ports
for port in cls.ports:
test_utils.call_and_ignore_notfound_exc(
cls.ports_client.delete_port, port['id'])
# Clean up routers
for router in cls.routers:
test_utils.call_and_ignore_notfound_exc(
cls.delete_router, router)
# Clean up subnets
for subnet in cls.subnets:
test_utils.call_and_ignore_notfound_exc(
cls.subnets_client.delete_subnet, subnet['id'])
# Clean up networks
for network in cls.networks:
test_utils.call_and_ignore_notfound_exc(
cls.networks_client.delete_network, network['id'])
super(BaseNetworkTest, cls).resource_cleanup()
@classmethod
def create_network(cls, network_name=None):
"""Wrapper utility that returns a test network."""
network_name = network_name or data_utils.rand_name('test-network-')
body = cls.networks_client.create_network(name=network_name)
network = body['network']
cls.networks.append(network)
return network
@classmethod
def create_subnet(cls, network, gateway='', cidr=None, mask_bits=None,
ip_version=None, client=None, **kwargs):
"""Wrapper utility that returns a test subnet."""
# allow tests to use admin client
if not client:
client = cls.subnets_client
# The cidr and mask_bits depend on the ip version.
ip_version = ip_version if ip_version is not None else cls._ip_version
gateway_not_set = gateway == ''
if ip_version == 4:
cidr = cidr or netaddr.IPNetwork(CONF.network.project_network_cidr)
mask_bits = mask_bits or CONF.network.project_network_mask_bits
elif ip_version == 6:
cidr = (cidr or
netaddr.IPNetwork(CONF.network.project_network_v6_cidr))
mask_bits = mask_bits or CONF.network.project_network_v6_mask_bits
# Find a cidr that is not in use yet and create a subnet with it
for subnet_cidr in cidr.subnet(mask_bits):
if gateway_not_set:
gateway_ip = str(netaddr.IPAddress(subnet_cidr) + 1)
else:
gateway_ip = gateway
try:
body = client.create_subnet(
network_id=network['id'],
cidr=str(subnet_cidr),
ip_version=ip_version,
gateway_ip=gateway_ip,
**kwargs)
break
except lib_exc.BadRequest as e:
is_overlapping_cidr = 'overlaps with another subnet' in str(e)
if not is_overlapping_cidr:
raise
else:
message = 'Available CIDR for subnet creation could not be found'
raise exceptions.BuildErrorException(message)
subnet = body['subnet']
cls.subnets.append(subnet)
return subnet
@classmethod
def create_port(cls, network, **kwargs):
"""Wrapper utility that returns a test port."""
body = cls.ports_client.create_port(network_id=network['id'],
**kwargs)
port = body['port']
cls.ports.append(port)
return port
@classmethod
def update_port(cls, port, **kwargs):
"""Wrapper utility that updates a test port."""
body = cls.ports_client.update_port(port['id'],
**kwargs)
return body['port']
@classmethod
def create_router(cls, router_name=None, admin_state_up=False,
external_network_id=None, enable_snat=None,
**kwargs):
ext_gw_info = {}
if external_network_id:
ext_gw_info['network_id'] = external_network_id
if enable_snat is not None:
ext_gw_info['enable_snat'] = enable_snat
body = cls.routers_client.create_router(
name=router_name, external_gateway_info=ext_gw_info,
admin_state_up=admin_state_up, **kwargs)
router = body['router']
cls.routers.append(router)
return router
@classmethod
def create_floatingip(cls, external_network_id):
"""Wrapper utility that returns a test floating IP."""
body = cls.floating_ips_client.create_floatingip(
floating_network_id=external_network_id)
fip = body['floatingip']
cls.floating_ips.append(fip)
return fip
@classmethod
def create_router_interface(cls, router_id, subnet_id):
"""Wrapper utility that returns a router interface."""
interface = cls.routers_client.add_router_interface(
router_id, subnet_id=subnet_id)
return interface
@classmethod
def delete_router(cls, router):
body = cls.ports_client.list_ports(device_id=router['id'])
interfaces = body['ports']
for i in interfaces:
test_utils.call_and_ignore_notfound_exc(
cls.routers_client.remove_router_interface, router['id'],
subnet_id=i['fixed_ips'][0]['subnet_id'])
cls.routers_client.delete_router(router['id'])
class BaseAdminNetworkTest(BaseNetworkTest):
credentials = ['primary', 'admin']
@classmethod
def setup_clients(cls):
super(BaseAdminNetworkTest, cls).setup_clients()
cls.admin_agents_client = cls.os_adm.network_agents_client
cls.admin_networks_client = cls.os_adm.networks_client
cls.admin_routers_client = cls.os_adm.routers_client
cls.admin_subnets_client = cls.os_adm.subnets_client
cls.admin_ports_client = cls.os_adm.ports_client
cls.admin_quotas_client = cls.os_adm.network_quotas_client
cls.admin_floating_ips_client = cls.os_adm.floating_ips_client
cls.admin_metering_labels_client = cls.os_adm.metering_labels_client
cls.admin_metering_label_rules_client = (
cls.os_adm.metering_label_rules_client)
@classmethod
def create_metering_label(cls, name, description):
"""Wrapper utility that returns a test metering label."""
body = cls.admin_metering_labels_client.create_metering_label(
description=description,
name=name)
metering_label = body['metering_label']
cls.metering_labels.append(metering_label)
return metering_label
@classmethod
def create_metering_label_rule(cls, remote_ip_prefix, direction,
metering_label_id):
"""Wrapper utility that returns a test metering label rule."""
client = cls.admin_metering_label_rules_client
body = client.create_metering_label_rule(
remote_ip_prefix=remote_ip_prefix, direction=direction,
metering_label_id=metering_label_id)
metering_label_rule = body['metering_label_rule']
cls.metering_label_rules.append(metering_label_rule)
return metering_label_rule