blob: d9ba6dcb0de9079cee052a2f485dc4240e8a62d5 [file] [log] [blame]
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import functools
import math
import netaddr
from tempest.lib.common.utils import data_utils
from tempest.lib import exceptions as lib_exc
from tempest import test
from neutron.tests.tempest.common import constants
from neutron.tests.tempest.common import utils
from neutron.tests.tempest.api import clients
from neutron.tests.tempest import config
from neutron.tests.tempest import exceptions
CONF = config.CONF
class BaseNetworkTest(test.BaseTestCase):
"""
Base class for the Neutron tests that use the Tempest Neutron REST client
Per the Neutron API Guide, API v1.x was removed from the source code tree
(docs.openstack.org/api/openstack-network/2.0/content/Overview-d1e71.html)
Therefore, v2.x of the Neutron API is assumed. It is also assumed that the
following options are defined in the [network] section of etc/tempest.conf:
project_network_cidr with a block of cidr's from which smaller blocks
can be allocated for tenant networks
project_network_mask_bits with the mask bits to be used to partition
the block defined by tenant-network_cidr
Finally, it is assumed that the following option is defined in the
[service_available] section of etc/tempest.conf
neutron as True
"""
force_tenant_isolation = False
credentials = ['primary']
# Default to ipv4.
_ip_version = 4
@classmethod
def get_client_manager(cls, credential_type=None, roles=None,
force_new=None):
manager = super(BaseNetworkTest, cls).get_client_manager(
credential_type=credential_type,
roles=roles,
force_new=force_new
)
# Neutron uses a different clients manager than the one in the Tempest
return clients.Manager(manager.credentials)
@classmethod
def skip_checks(cls):
super(BaseNetworkTest, cls).skip_checks()
if not CONF.service_available.neutron:
raise cls.skipException("Neutron support is required")
if cls._ip_version == 6 and not CONF.network_feature_enabled.ipv6:
raise cls.skipException("IPv6 Tests are disabled.")
for req_ext in getattr(cls, 'required_extensions', []):
if not test.is_extension_enabled(req_ext, 'network'):
msg = "%s extension not enabled." % req_ext
raise cls.skipException(msg)
@classmethod
def setup_credentials(cls):
# Create no network resources for these test.
cls.set_network_resources()
super(BaseNetworkTest, cls).setup_credentials()
@classmethod
def setup_clients(cls):
super(BaseNetworkTest, cls).setup_clients()
cls.client = cls.os_primary.network_client
@classmethod
def resource_setup(cls):
super(BaseNetworkTest, cls).resource_setup()
cls.networks = []
cls.admin_networks = []
cls.subnets = []
cls.admin_subnets = []
cls.ports = []
cls.routers = []
cls.floating_ips = []
cls.metering_labels = []
cls.service_profiles = []
cls.flavors = []
cls.metering_label_rules = []
cls.qos_rules = []
cls.qos_policies = []
cls.ethertype = "IPv" + str(cls._ip_version)
cls.address_scopes = []
cls.admin_address_scopes = []
cls.subnetpools = []
cls.admin_subnetpools = []
cls.security_groups = []
@classmethod
def resource_cleanup(cls):
if CONF.service_available.neutron:
# Clean up floating IPs
for floating_ip in cls.floating_ips:
cls._try_delete_resource(cls.client.delete_floatingip,
floating_ip['id'])
# Clean up routers
for router in cls.routers:
cls._try_delete_resource(cls.delete_router,
router)
# Clean up metering label rules
for metering_label_rule in cls.metering_label_rules:
cls._try_delete_resource(
cls.admin_client.delete_metering_label_rule,
metering_label_rule['id'])
# Clean up metering labels
for metering_label in cls.metering_labels:
cls._try_delete_resource(
cls.admin_client.delete_metering_label,
metering_label['id'])
# Clean up flavors
for flavor in cls.flavors:
cls._try_delete_resource(
cls.admin_client.delete_flavor,
flavor['id'])
# Clean up service profiles
for service_profile in cls.service_profiles:
cls._try_delete_resource(
cls.admin_client.delete_service_profile,
service_profile['id'])
# Clean up ports
for port in cls.ports:
cls._try_delete_resource(cls.client.delete_port,
port['id'])
# Clean up subnets
for subnet in cls.subnets:
cls._try_delete_resource(cls.client.delete_subnet,
subnet['id'])
# Clean up admin subnets
for subnet in cls.admin_subnets:
cls._try_delete_resource(cls.admin_client.delete_subnet,
subnet['id'])
# Clean up networks
for network in cls.networks:
cls._try_delete_resource(cls.client.delete_network,
network['id'])
# Clean up admin networks
for network in cls.admin_networks:
cls._try_delete_resource(cls.admin_client.delete_network,
network['id'])
# Clean up security groups
for secgroup in cls.security_groups:
cls._try_delete_resource(cls.client.delete_security_group,
secgroup['id'])
for subnetpool in cls.subnetpools:
cls._try_delete_resource(cls.client.delete_subnetpool,
subnetpool['id'])
for subnetpool in cls.admin_subnetpools:
cls._try_delete_resource(cls.admin_client.delete_subnetpool,
subnetpool['id'])
for address_scope in cls.address_scopes:
cls._try_delete_resource(cls.client.delete_address_scope,
address_scope['id'])
for address_scope in cls.admin_address_scopes:
cls._try_delete_resource(
cls.admin_client.delete_address_scope,
address_scope['id'])
# Clean up QoS rules
for qos_rule in cls.qos_rules:
cls._try_delete_resource(cls.admin_client.delete_qos_rule,
qos_rule['id'])
# Clean up QoS policies
# as all networks and ports are already removed, QoS policies
# shouldn't be "in use"
for qos_policy in cls.qos_policies:
cls._try_delete_resource(cls.admin_client.delete_qos_policy,
qos_policy['id'])
super(BaseNetworkTest, cls).resource_cleanup()
@classmethod
def _try_delete_resource(cls, delete_callable, *args, **kwargs):
"""Cleanup resources in case of test-failure
Some resources are explicitly deleted by the test.
If the test failed to delete a resource, this method will execute
the appropriate delete methods. Otherwise, the method ignores NotFound
exceptions thrown for resources that were correctly deleted by the
test.
:param delete_callable: delete method
:param args: arguments for delete method
:param kwargs: keyword arguments for delete method
"""
try:
delete_callable(*args, **kwargs)
# if resource is not found, this means it was deleted in the test
except lib_exc.NotFound:
pass
@classmethod
def create_network(cls, network_name=None, client=None, **kwargs):
"""Wrapper utility that returns a test network."""
network_name = network_name or data_utils.rand_name('test-network-')
client = client or cls.client
body = client.create_network(name=network_name, **kwargs)
network = body['network']
if client is cls.client:
cls.networks.append(network)
else:
cls.admin_networks.append(network)
return network
@classmethod
def create_shared_network(cls, network_name=None, **post_body):
network_name = network_name or data_utils.rand_name('sharednetwork-')
post_body.update({'name': network_name, 'shared': True})
body = cls.admin_client.create_network(**post_body)
network = body['network']
cls.admin_networks.append(network)
return network
@classmethod
def create_network_keystone_v3(cls, network_name=None, project_id=None,
tenant_id=None, client=None):
"""Wrapper utility that creates a test network with project_id."""
client = client or cls.client
network_name = network_name or data_utils.rand_name(
'test-network-with-project_id')
project_id = cls.client.tenant_id
body = client.create_network_keystone_v3(network_name, project_id,
tenant_id)
network = body['network']
if client is cls.client:
cls.networks.append(network)
else:
cls.admin_networks.append(network)
return network
@classmethod
def create_subnet(cls, network, gateway='', cidr=None, mask_bits=None,
ip_version=None, client=None, **kwargs):
"""Wrapper utility that returns a test subnet."""
# allow tests to use admin client
if not client:
client = cls.client
# The cidr and mask_bits depend on the ip version.
ip_version = ip_version if ip_version is not None else cls._ip_version
gateway_not_set = gateway == ''
if ip_version == 4:
cidr = cidr or netaddr.IPNetwork(
config.safe_get_config_value(
'network', 'project_network_cidr'))
mask_bits = (
mask_bits or config.safe_get_config_value(
'network', 'project_network_mask_bits'))
elif ip_version == 6:
cidr = (
cidr or netaddr.IPNetwork(
config.safe_get_config_value(
'network', 'project_network_v6_cidr')))
mask_bits = (
mask_bits or config.safe_get_config_value(
'network', 'project_network_v6_mask_bits'))
# Find a cidr that is not in use yet and create a subnet with it
for subnet_cidr in cidr.subnet(mask_bits):
if gateway_not_set:
gateway_ip = str(netaddr.IPAddress(subnet_cidr) + 1)
else:
gateway_ip = gateway
try:
body = client.create_subnet(
network_id=network['id'],
cidr=str(subnet_cidr),
ip_version=ip_version,
gateway_ip=gateway_ip,
**kwargs)
break
except lib_exc.BadRequest as e:
is_overlapping_cidr = 'overlaps with another subnet' in str(e)
if not is_overlapping_cidr:
raise
else:
message = 'Available CIDR for subnet creation could not be found'
raise ValueError(message)
subnet = body['subnet']
if client is cls.client:
cls.subnets.append(subnet)
else:
cls.admin_subnets.append(subnet)
return subnet
@classmethod
def create_port(cls, network, **kwargs):
"""Wrapper utility that returns a test port."""
body = cls.client.create_port(network_id=network['id'],
**kwargs)
port = body['port']
cls.ports.append(port)
return port
@classmethod
def update_port(cls, port, **kwargs):
"""Wrapper utility that updates a test port."""
body = cls.client.update_port(port['id'],
**kwargs)
return body['port']
@classmethod
def _create_router_with_client(
cls, client, router_name=None, admin_state_up=False,
external_network_id=None, enable_snat=None, **kwargs
):
ext_gw_info = {}
if external_network_id:
ext_gw_info['network_id'] = external_network_id
if enable_snat is not None:
ext_gw_info['enable_snat'] = enable_snat
body = client.create_router(
router_name, external_gateway_info=ext_gw_info,
admin_state_up=admin_state_up, **kwargs)
router = body['router']
cls.routers.append(router)
return router
@classmethod
def create_router(cls, *args, **kwargs):
return cls._create_router_with_client(cls.client, *args, **kwargs)
@classmethod
def create_admin_router(cls, *args, **kwargs):
return cls._create_router_with_client(cls.os_admin.network_client,
*args, **kwargs)
@classmethod
def create_floatingip(cls, external_network_id):
"""Wrapper utility that returns a test floating IP."""
body = cls.client.create_floatingip(
floating_network_id=external_network_id)
fip = body['floatingip']
cls.floating_ips.append(fip)
return fip
@classmethod
def create_router_interface(cls, router_id, subnet_id):
"""Wrapper utility that returns a router interface."""
interface = cls.client.add_router_interface_with_subnet_id(
router_id, subnet_id)
return interface
@classmethod
def get_supported_qos_rule_types(cls):
body = cls.client.list_qos_rule_types()
return [rule_type['type'] for rule_type in body['rule_types']]
@classmethod
def create_qos_policy(cls, name, description=None, shared=False,
tenant_id=None, is_default=False):
"""Wrapper utility that returns a test QoS policy."""
body = cls.admin_client.create_qos_policy(
name, description, shared, tenant_id, is_default)
qos_policy = body['policy']
cls.qos_policies.append(qos_policy)
return qos_policy
@classmethod
def create_qos_bandwidth_limit_rule(cls, policy_id, max_kbps,
max_burst_kbps,
direction=constants.EGRESS_DIRECTION):
"""Wrapper utility that returns a test QoS bandwidth limit rule."""
body = cls.admin_client.create_bandwidth_limit_rule(
policy_id, max_kbps, max_burst_kbps, direction)
qos_rule = body['bandwidth_limit_rule']
cls.qos_rules.append(qos_rule)
return qos_rule
@classmethod
def delete_router(cls, router):
body = cls.client.list_router_interfaces(router['id'])
interfaces = body['ports']
for i in interfaces:
try:
cls.client.remove_router_interface_with_subnet_id(
router['id'], i['fixed_ips'][0]['subnet_id'])
except lib_exc.NotFound:
pass
cls.client.delete_router(router['id'])
@classmethod
def create_address_scope(cls, name, is_admin=False, **kwargs):
if is_admin:
body = cls.admin_client.create_address_scope(name=name, **kwargs)
cls.admin_address_scopes.append(body['address_scope'])
else:
body = cls.client.create_address_scope(name=name, **kwargs)
cls.address_scopes.append(body['address_scope'])
return body['address_scope']
@classmethod
def create_subnetpool(cls, name, is_admin=False, **kwargs):
if is_admin:
body = cls.admin_client.create_subnetpool(name, **kwargs)
cls.admin_subnetpools.append(body['subnetpool'])
else:
body = cls.client.create_subnetpool(name, **kwargs)
cls.subnetpools.append(body['subnetpool'])
return body['subnetpool']
class BaseAdminNetworkTest(BaseNetworkTest):
credentials = ['primary', 'admin']
@classmethod
def setup_clients(cls):
super(BaseAdminNetworkTest, cls).setup_clients()
cls.admin_client = cls.os_admin.network_client
cls.identity_admin_client = cls.os_admin.projects_client
@classmethod
def create_metering_label(cls, name, description):
"""Wrapper utility that returns a test metering label."""
body = cls.admin_client.create_metering_label(
description=description,
name=data_utils.rand_name("metering-label"))
metering_label = body['metering_label']
cls.metering_labels.append(metering_label)
return metering_label
@classmethod
def create_metering_label_rule(cls, remote_ip_prefix, direction,
metering_label_id):
"""Wrapper utility that returns a test metering label rule."""
body = cls.admin_client.create_metering_label_rule(
remote_ip_prefix=remote_ip_prefix, direction=direction,
metering_label_id=metering_label_id)
metering_label_rule = body['metering_label_rule']
cls.metering_label_rules.append(metering_label_rule)
return metering_label_rule
@classmethod
def create_flavor(cls, name, description, service_type):
"""Wrapper utility that returns a test flavor."""
body = cls.admin_client.create_flavor(
description=description, service_type=service_type,
name=name)
flavor = body['flavor']
cls.flavors.append(flavor)
return flavor
@classmethod
def create_service_profile(cls, description, metainfo, driver):
"""Wrapper utility that returns a test service profile."""
body = cls.admin_client.create_service_profile(
driver=driver, metainfo=metainfo, description=description)
service_profile = body['service_profile']
cls.service_profiles.append(service_profile)
return service_profile
@classmethod
def get_unused_ip(cls, net_id, ip_version=None):
"""Get an unused ip address in a allocation pool of net"""
body = cls.admin_client.list_ports(network_id=net_id)
ports = body['ports']
used_ips = []
for port in ports:
used_ips.extend(
[fixed_ip['ip_address'] for fixed_ip in port['fixed_ips']])
body = cls.admin_client.list_subnets(network_id=net_id)
subnets = body['subnets']
for subnet in subnets:
if ip_version and subnet['ip_version'] != ip_version:
continue
cidr = subnet['cidr']
allocation_pools = subnet['allocation_pools']
iterators = []
if allocation_pools:
for allocation_pool in allocation_pools:
iterators.append(netaddr.iter_iprange(
allocation_pool['start'], allocation_pool['end']))
else:
net = netaddr.IPNetwork(cidr)
def _iterip():
for ip in net:
if ip not in (net.network, net.broadcast):
yield ip
iterators.append(iter(_iterip()))
for iterator in iterators:
for ip in iterator:
if str(ip) not in used_ips:
return str(ip)
message = (
"net(%s) has no usable IP address in allocation pools" % net_id)
raise exceptions.InvalidConfiguration(message)
def require_qos_rule_type(rule_type):
def decorator(f):
@functools.wraps(f)
def wrapper(self, *func_args, **func_kwargs):
if rule_type not in self.get_supported_qos_rule_types():
raise self.skipException(
"%s rule type is required." % rule_type)
return f(self, *func_args, **func_kwargs)
return wrapper
return decorator
def _require_sorting(f):
@functools.wraps(f)
def inner(self, *args, **kwargs):
if not test.is_extension_enabled("sorting", "network"):
self.skipTest('Sorting feature is required')
return f(self, *args, **kwargs)
return inner
def _require_pagination(f):
@functools.wraps(f)
def inner(self, *args, **kwargs):
if not test.is_extension_enabled("pagination", "network"):
self.skipTest('Pagination feature is required')
return f(self, *args, **kwargs)
return inner
class BaseSearchCriteriaTest(BaseNetworkTest):
# This should be defined by subclasses to reflect resource name to test
resource = None
field = 'name'
# NOTE(ihrachys): some names, like those starting with an underscore (_)
# are sorted differently depending on whether the plugin implements native
# sorting support, or not. So we avoid any such cases here, sticking to
# alphanumeric. Also test a case when there are multiple resources with the
# same name
resource_names = ('test1', 'abc1', 'test10', '123test') + ('test1',)
force_tenant_isolation = True
list_kwargs = {}
list_as_admin = False
def assertSameOrder(self, original, actual):
# gracefully handle iterators passed
original = list(original)
actual = list(actual)
self.assertEqual(len(original), len(actual))
for expected, res in zip(original, actual):
self.assertEqual(expected[self.field], res[self.field])
@utils.classproperty
def plural_name(self):
return '%ss' % self.resource
@property
def list_client(self):
return self.admin_client if self.list_as_admin else self.client
def list_method(self, *args, **kwargs):
method = getattr(self.list_client, 'list_%s' % self.plural_name)
kwargs.update(self.list_kwargs)
return method(*args, **kwargs)
def get_bare_url(self, url):
base_url = self.client.base_url
self.assertTrue(url.startswith(base_url))
return url[len(base_url):]
@classmethod
def _extract_resources(cls, body):
return body[cls.plural_name]
def _test_list_sorts(self, direction):
sort_args = {
'sort_dir': direction,
'sort_key': self.field
}
body = self.list_method(**sort_args)
resources = self._extract_resources(body)
self.assertNotEmpty(
resources, "%s list returned is empty" % self.resource)
retrieved_names = [res[self.field] for res in resources]
expected = sorted(retrieved_names)
if direction == constants.SORT_DIRECTION_DESC:
expected = list(reversed(expected))
self.assertEqual(expected, retrieved_names)
@_require_sorting
def _test_list_sorts_asc(self):
self._test_list_sorts(constants.SORT_DIRECTION_ASC)
@_require_sorting
def _test_list_sorts_desc(self):
self._test_list_sorts(constants.SORT_DIRECTION_DESC)
@_require_pagination
def _test_list_pagination(self):
for limit in range(1, len(self.resource_names) + 1):
pagination_args = {
'limit': limit,
}
body = self.list_method(**pagination_args)
resources = self._extract_resources(body)
self.assertEqual(limit, len(resources))
@_require_pagination
def _test_list_no_pagination_limit_0(self):
pagination_args = {
'limit': 0,
}
body = self.list_method(**pagination_args)
resources = self._extract_resources(body)
self.assertGreaterEqual(len(resources), len(self.resource_names))
def _test_list_pagination_iteratively(self, lister):
# first, collect all resources for later comparison
sort_args = {
'sort_dir': constants.SORT_DIRECTION_ASC,
'sort_key': self.field
}
body = self.list_method(**sort_args)
expected_resources = self._extract_resources(body)
self.assertNotEmpty(expected_resources)
resources = lister(
len(expected_resources), sort_args
)
# finally, compare that the list retrieved in one go is identical to
# the one containing pagination results
self.assertSameOrder(expected_resources, resources)
def _list_all_with_marker(self, niterations, sort_args):
# paginate resources one by one, using last fetched resource as a
# marker
resources = []
for i in range(niterations):
pagination_args = sort_args.copy()
pagination_args['limit'] = 1
if resources:
pagination_args['marker'] = resources[-1]['id']
body = self.list_method(**pagination_args)
resources_ = self._extract_resources(body)
self.assertEqual(1, len(resources_))
resources.extend(resources_)
return resources
@_require_pagination
@_require_sorting
def _test_list_pagination_with_marker(self):
self._test_list_pagination_iteratively(self._list_all_with_marker)
def _list_all_with_hrefs(self, niterations, sort_args):
# paginate resources one by one, using next href links
resources = []
prev_links = {}
for i in range(niterations):
if prev_links:
uri = self.get_bare_url(prev_links['next'])
else:
sort_args.update(self.list_kwargs)
uri = self.list_client.build_uri(
self.plural_name, limit=1, **sort_args)
prev_links, body = self.list_client.get_uri_with_links(
self.plural_name, uri
)
resources_ = self._extract_resources(body)
self.assertEqual(1, len(resources_))
resources.extend(resources_)
# The last element is empty and does not contain 'next' link
uri = self.get_bare_url(prev_links['next'])
prev_links, body = self.client.get_uri_with_links(
self.plural_name, uri
)
self.assertNotIn('next', prev_links)
# Now walk backwards and compare results
resources2 = []
for i in range(niterations):
uri = self.get_bare_url(prev_links['previous'])
prev_links, body = self.list_client.get_uri_with_links(
self.plural_name, uri
)
resources_ = self._extract_resources(body)
self.assertEqual(1, len(resources_))
resources2.extend(resources_)
self.assertSameOrder(resources, reversed(resources2))
return resources
@_require_pagination
@_require_sorting
def _test_list_pagination_with_href_links(self):
self._test_list_pagination_iteratively(self._list_all_with_hrefs)
@_require_pagination
@_require_sorting
def _test_list_pagination_page_reverse_with_href_links(
self, direction=constants.SORT_DIRECTION_ASC):
pagination_args = {
'sort_dir': direction,
'sort_key': self.field,
}
body = self.list_method(**pagination_args)
expected_resources = self._extract_resources(body)
page_size = 2
pagination_args['limit'] = page_size
prev_links = {}
resources = []
num_resources = len(expected_resources)
niterations = int(math.ceil(float(num_resources) / page_size))
for i in range(niterations):
if prev_links:
uri = self.get_bare_url(prev_links['previous'])
else:
pagination_args.update(self.list_kwargs)
uri = self.list_client.build_uri(
self.plural_name, page_reverse=True, **pagination_args)
prev_links, body = self.list_client.get_uri_with_links(
self.plural_name, uri
)
resources_ = self._extract_resources(body)
self.assertGreaterEqual(page_size, len(resources_))
resources.extend(reversed(resources_))
self.assertSameOrder(expected_resources, reversed(resources))
@_require_pagination
@_require_sorting
def _test_list_pagination_page_reverse_asc(self):
self._test_list_pagination_page_reverse(
direction=constants.SORT_DIRECTION_ASC)
@_require_pagination
@_require_sorting
def _test_list_pagination_page_reverse_desc(self):
self._test_list_pagination_page_reverse(
direction=constants.SORT_DIRECTION_DESC)
def _test_list_pagination_page_reverse(self, direction):
pagination_args = {
'sort_dir': direction,
'sort_key': self.field,
'limit': 3,
}
body = self.list_method(**pagination_args)
expected_resources = self._extract_resources(body)
pagination_args['limit'] -= 1
pagination_args['marker'] = expected_resources[-1]['id']
pagination_args['page_reverse'] = True
body = self.list_method(**pagination_args)
self.assertSameOrder(
# the last entry is not included in 2nd result when used as a
# marker
expected_resources[:-1],
self._extract_resources(body))
def _test_list_validation_filters(self):
validation_args = {
'unknown_filter': 'value',
}
body = self.list_method(**validation_args)
resources = self._extract_resources(body)
for resource in resources:
self.assertIn(resource['name'], self.resource_names)