Use tempest plugin interface
Make use of the Tempest plugin interface instead of copying Neutron
files into Tempest. This will remove the burden to port Neutron
tests onto Tempest master recurrently.
It uses neutron/tests/tempest/ as new top folder for all Tempest
tests. It follows the model of Heat [1].
[1]: https://github.com/openstack/heat/tree/master/heat_integrationtests
Partially implements bp external-plugin-interface
Change-Id: Ia233aa162746845f6ae08a8157dcd242dcd58eab
diff --git a/neutron/tests/tempest/api/base.py b/neutron/tests/tempest/api/base.py
new file mode 100644
index 0000000..c1bd622
--- /dev/null
+++ b/neutron/tests/tempest/api/base.py
@@ -0,0 +1,462 @@
+# Copyright 2012 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import netaddr
+from tempest.lib.common.utils import data_utils
+from tempest.lib import exceptions as lib_exc
+from tempest import test
+
+from neutron.tests.tempest.api import clients
+from neutron.tests.tempest import config
+from neutron.tests.tempest import exceptions
+
+CONF = config.CONF
+
+
+class BaseNetworkTest(test.BaseTestCase):
+
+ """
+ Base class for the Neutron tests that use the Tempest Neutron REST client
+
+ Per the Neutron API Guide, API v1.x was removed from the source code tree
+ (docs.openstack.org/api/openstack-network/2.0/content/Overview-d1e71.html)
+ Therefore, v2.x of the Neutron API is assumed. It is also assumed that the
+ following options are defined in the [network] section of etc/tempest.conf:
+
+ project_network_cidr with a block of cidr's from which smaller blocks
+ can be allocated for tenant networks
+
+ project_network_mask_bits with the mask bits to be used to partition
+ the block defined by tenant-network_cidr
+
+ Finally, it is assumed that the following option is defined in the
+ [service_available] section of etc/tempest.conf
+
+ neutron as True
+ """
+
+ force_tenant_isolation = False
+ credentials = ['primary']
+
+ # Default to ipv4.
+ _ip_version = 4
+
+ @classmethod
+ def get_client_manager(cls, credential_type=None, roles=None,
+ force_new=None):
+ manager = test.BaseTestCase.get_client_manager(
+ credential_type=credential_type,
+ roles=roles,
+ force_new=force_new)
+ # Neutron uses a different clients manager than the one in the Tempest
+ return clients.Manager(manager.credentials)
+
+ @classmethod
+ def skip_checks(cls):
+ super(BaseNetworkTest, cls).skip_checks()
+ if not CONF.service_available.neutron:
+ raise cls.skipException("Neutron support is required")
+ if cls._ip_version == 6 and not CONF.network_feature_enabled.ipv6:
+ raise cls.skipException("IPv6 Tests are disabled.")
+
+ @classmethod
+ def setup_credentials(cls):
+ # Create no network resources for these test.
+ cls.set_network_resources()
+ super(BaseNetworkTest, cls).setup_credentials()
+
+ @classmethod
+ def setup_clients(cls):
+ super(BaseNetworkTest, cls).setup_clients()
+ cls.client = cls.os.network_client
+
+ @classmethod
+ def resource_setup(cls):
+ super(BaseNetworkTest, cls).resource_setup()
+
+ cls.networks = []
+ cls.shared_networks = []
+ cls.subnets = []
+ cls.ports = []
+ cls.routers = []
+ cls.floating_ips = []
+ cls.metering_labels = []
+ cls.service_profiles = []
+ cls.flavors = []
+ cls.metering_label_rules = []
+ cls.qos_rules = []
+ cls.qos_policies = []
+ cls.ethertype = "IPv" + str(cls._ip_version)
+ cls.address_scopes = []
+ cls.admin_address_scopes = []
+ cls.subnetpools = []
+ cls.admin_subnetpools = []
+
+ @classmethod
+ def resource_cleanup(cls):
+ if CONF.service_available.neutron:
+ # Clean up QoS rules
+ for qos_rule in cls.qos_rules:
+ cls._try_delete_resource(cls.admin_client.delete_qos_rule,
+ qos_rule['id'])
+ # Clean up QoS policies
+ for qos_policy in cls.qos_policies:
+ cls._try_delete_resource(cls.admin_client.delete_qos_policy,
+ qos_policy['id'])
+ # Clean up floating IPs
+ for floating_ip in cls.floating_ips:
+ cls._try_delete_resource(cls.client.delete_floatingip,
+ floating_ip['id'])
+ # Clean up routers
+ for router in cls.routers:
+ cls._try_delete_resource(cls.delete_router,
+ router)
+ # Clean up metering label rules
+ for metering_label_rule in cls.metering_label_rules:
+ cls._try_delete_resource(
+ cls.admin_client.delete_metering_label_rule,
+ metering_label_rule['id'])
+ # Clean up metering labels
+ for metering_label in cls.metering_labels:
+ cls._try_delete_resource(
+ cls.admin_client.delete_metering_label,
+ metering_label['id'])
+ # Clean up flavors
+ for flavor in cls.flavors:
+ cls._try_delete_resource(
+ cls.admin_client.delete_flavor,
+ flavor['id'])
+ # Clean up service profiles
+ for service_profile in cls.service_profiles:
+ cls._try_delete_resource(
+ cls.admin_client.delete_service_profile,
+ service_profile['id'])
+ # Clean up ports
+ for port in cls.ports:
+ cls._try_delete_resource(cls.client.delete_port,
+ port['id'])
+ # Clean up subnets
+ for subnet in cls.subnets:
+ cls._try_delete_resource(cls.client.delete_subnet,
+ subnet['id'])
+ # Clean up networks
+ for network in cls.networks:
+ cls._try_delete_resource(cls.client.delete_network,
+ network['id'])
+
+ # Clean up shared networks
+ for network in cls.shared_networks:
+ cls._try_delete_resource(cls.admin_client.delete_network,
+ network['id'])
+
+ for subnetpool in cls.subnetpools:
+ cls._try_delete_resource(cls.client.delete_subnetpool,
+ subnetpool['id'])
+
+ for subnetpool in cls.admin_subnetpools:
+ cls._try_delete_resource(cls.admin_client.delete_subnetpool,
+ subnetpool['id'])
+
+ for address_scope in cls.address_scopes:
+ cls._try_delete_resource(cls.client.delete_address_scope,
+ address_scope['id'])
+
+ for address_scope in cls.admin_address_scopes:
+ cls._try_delete_resource(
+ cls.admin_client.delete_address_scope,
+ address_scope['id'])
+
+ super(BaseNetworkTest, cls).resource_cleanup()
+
+ @classmethod
+ def _try_delete_resource(cls, delete_callable, *args, **kwargs):
+ """Cleanup resources in case of test-failure
+
+ Some resources are explicitly deleted by the test.
+ If the test failed to delete a resource, this method will execute
+ the appropriate delete methods. Otherwise, the method ignores NotFound
+ exceptions thrown for resources that were correctly deleted by the
+ test.
+
+ :param delete_callable: delete method
+ :param args: arguments for delete method
+ :param kwargs: keyword arguments for delete method
+ """
+ try:
+ delete_callable(*args, **kwargs)
+ # if resource is not found, this means it was deleted in the test
+ except lib_exc.NotFound:
+ pass
+
+ @classmethod
+ def create_network(cls, network_name=None, **kwargs):
+ """Wrapper utility that returns a test network."""
+ network_name = network_name or data_utils.rand_name('test-network-')
+
+ body = cls.client.create_network(name=network_name, **kwargs)
+ network = body['network']
+ cls.networks.append(network)
+ return network
+
+ @classmethod
+ def create_shared_network(cls, network_name=None, **post_body):
+ network_name = network_name or data_utils.rand_name('sharednetwork-')
+ post_body.update({'name': network_name, 'shared': True})
+ body = cls.admin_client.create_network(**post_body)
+ network = body['network']
+ cls.shared_networks.append(network)
+ return network
+
+ @classmethod
+ def create_subnet(cls, network, gateway='', cidr=None, mask_bits=None,
+ ip_version=None, client=None, **kwargs):
+ """Wrapper utility that returns a test subnet."""
+
+ # allow tests to use admin client
+ if not client:
+ client = cls.client
+
+ # The cidr and mask_bits depend on the ip version.
+ ip_version = ip_version if ip_version is not None else cls._ip_version
+ gateway_not_set = gateway == ''
+ if ip_version == 4:
+ cidr = cidr or netaddr.IPNetwork(
+ config.safe_get_config_value(
+ 'network', 'project_network_cidr'))
+ mask_bits = (
+ mask_bits or config.safe_get_config_value(
+ 'network', 'project_network_mask_bits'))
+ elif ip_version == 6:
+ cidr = (
+ cidr or netaddr.IPNetwork(
+ config.safe_get_config_value(
+ 'network', 'project_network_v6_cidr')))
+ mask_bits = (
+ mask_bits or config.safe_get_config_value(
+ 'network', 'project_network_v6_mask_bits'))
+ # Find a cidr that is not in use yet and create a subnet with it
+ for subnet_cidr in cidr.subnet(mask_bits):
+ if gateway_not_set:
+ gateway_ip = str(netaddr.IPAddress(subnet_cidr) + 1)
+ else:
+ gateway_ip = gateway
+ try:
+ body = client.create_subnet(
+ network_id=network['id'],
+ cidr=str(subnet_cidr),
+ ip_version=ip_version,
+ gateway_ip=gateway_ip,
+ **kwargs)
+ break
+ except lib_exc.BadRequest as e:
+ is_overlapping_cidr = 'overlaps with another subnet' in str(e)
+ if not is_overlapping_cidr:
+ raise
+ else:
+ message = 'Available CIDR for subnet creation could not be found'
+ raise ValueError(message)
+ subnet = body['subnet']
+ cls.subnets.append(subnet)
+ return subnet
+
+ @classmethod
+ def create_port(cls, network, **kwargs):
+ """Wrapper utility that returns a test port."""
+ body = cls.client.create_port(network_id=network['id'],
+ **kwargs)
+ port = body['port']
+ cls.ports.append(port)
+ return port
+
+ @classmethod
+ def update_port(cls, port, **kwargs):
+ """Wrapper utility that updates a test port."""
+ body = cls.client.update_port(port['id'],
+ **kwargs)
+ return body['port']
+
+ @classmethod
+ def create_router(cls, router_name=None, admin_state_up=False,
+ external_network_id=None, enable_snat=None,
+ **kwargs):
+ ext_gw_info = {}
+ if external_network_id:
+ ext_gw_info['network_id'] = external_network_id
+ if enable_snat:
+ ext_gw_info['enable_snat'] = enable_snat
+ body = cls.client.create_router(
+ router_name, external_gateway_info=ext_gw_info,
+ admin_state_up=admin_state_up, **kwargs)
+ router = body['router']
+ cls.routers.append(router)
+ return router
+
+ @classmethod
+ def create_floatingip(cls, external_network_id):
+ """Wrapper utility that returns a test floating IP."""
+ body = cls.client.create_floatingip(
+ floating_network_id=external_network_id)
+ fip = body['floatingip']
+ cls.floating_ips.append(fip)
+ return fip
+
+ @classmethod
+ def create_router_interface(cls, router_id, subnet_id):
+ """Wrapper utility that returns a router interface."""
+ interface = cls.client.add_router_interface_with_subnet_id(
+ router_id, subnet_id)
+ return interface
+
+ @classmethod
+ def create_qos_policy(cls, name, description, shared, tenant_id=None):
+ """Wrapper utility that returns a test QoS policy."""
+ body = cls.admin_client.create_qos_policy(
+ name, description, shared, tenant_id)
+ qos_policy = body['policy']
+ cls.qos_policies.append(qos_policy)
+ return qos_policy
+
+ @classmethod
+ def create_qos_bandwidth_limit_rule(cls, policy_id,
+ max_kbps, max_burst_kbps):
+ """Wrapper utility that returns a test QoS bandwidth limit rule."""
+ body = cls.admin_client.create_bandwidth_limit_rule(
+ policy_id, max_kbps, max_burst_kbps)
+ qos_rule = body['bandwidth_limit_rule']
+ cls.qos_rules.append(qos_rule)
+ return qos_rule
+
+ @classmethod
+ def delete_router(cls, router):
+ body = cls.client.list_router_interfaces(router['id'])
+ interfaces = body['ports']
+ for i in interfaces:
+ try:
+ cls.client.remove_router_interface_with_subnet_id(
+ router['id'], i['fixed_ips'][0]['subnet_id'])
+ except lib_exc.NotFound:
+ pass
+ cls.client.delete_router(router['id'])
+
+ @classmethod
+ def create_address_scope(cls, name, is_admin=False, **kwargs):
+ if is_admin:
+ body = cls.admin_client.create_address_scope(name=name, **kwargs)
+ cls.admin_address_scopes.append(body['address_scope'])
+ else:
+ body = cls.client.create_address_scope(name=name, **kwargs)
+ cls.address_scopes.append(body['address_scope'])
+ return body['address_scope']
+
+ @classmethod
+ def create_subnetpool(cls, name, is_admin=False, **kwargs):
+ if is_admin:
+ body = cls.admin_client.create_subnetpool(name, **kwargs)
+ cls.admin_subnetpools.append(body['subnetpool'])
+ else:
+ body = cls.client.create_subnetpool(name, **kwargs)
+ cls.subnetpools.append(body['subnetpool'])
+ return body['subnetpool']
+
+
+class BaseAdminNetworkTest(BaseNetworkTest):
+
+ credentials = ['primary', 'admin']
+
+ @classmethod
+ def setup_clients(cls):
+ super(BaseAdminNetworkTest, cls).setup_clients()
+ cls.admin_client = cls.os_adm.network_client
+ cls.identity_admin_client = cls.os_adm.tenants_client
+
+ @classmethod
+ def create_metering_label(cls, name, description):
+ """Wrapper utility that returns a test metering label."""
+ body = cls.admin_client.create_metering_label(
+ description=description,
+ name=data_utils.rand_name("metering-label"))
+ metering_label = body['metering_label']
+ cls.metering_labels.append(metering_label)
+ return metering_label
+
+ @classmethod
+ def create_metering_label_rule(cls, remote_ip_prefix, direction,
+ metering_label_id):
+ """Wrapper utility that returns a test metering label rule."""
+ body = cls.admin_client.create_metering_label_rule(
+ remote_ip_prefix=remote_ip_prefix, direction=direction,
+ metering_label_id=metering_label_id)
+ metering_label_rule = body['metering_label_rule']
+ cls.metering_label_rules.append(metering_label_rule)
+ return metering_label_rule
+
+ @classmethod
+ def create_flavor(cls, name, description, service_type):
+ """Wrapper utility that returns a test flavor."""
+ body = cls.admin_client.create_flavor(
+ description=description, service_type=service_type,
+ name=name)
+ flavor = body['flavor']
+ cls.flavors.append(flavor)
+ return flavor
+
+ @classmethod
+ def create_service_profile(cls, description, metainfo, driver):
+ """Wrapper utility that returns a test service profile."""
+ body = cls.admin_client.create_service_profile(
+ driver=driver, metainfo=metainfo, description=description)
+ service_profile = body['service_profile']
+ cls.service_profiles.append(service_profile)
+ return service_profile
+
+ @classmethod
+ def get_unused_ip(cls, net_id, ip_version=None):
+ """Get an unused ip address in a allocaion pool of net"""
+ body = cls.admin_client.list_ports(network_id=net_id)
+ ports = body['ports']
+ used_ips = []
+ for port in ports:
+ used_ips.extend(
+ [fixed_ip['ip_address'] for fixed_ip in port['fixed_ips']])
+ body = cls.admin_client.list_subnets(network_id=net_id)
+ subnets = body['subnets']
+
+ for subnet in subnets:
+ if ip_version and subnet['ip_version'] != ip_version:
+ continue
+ cidr = subnet['cidr']
+ allocation_pools = subnet['allocation_pools']
+ iterators = []
+ if allocation_pools:
+ for allocation_pool in allocation_pools:
+ iterators.append(netaddr.iter_iprange(
+ allocation_pool['start'], allocation_pool['end']))
+ else:
+ net = netaddr.IPNetwork(cidr)
+
+ def _iterip():
+ for ip in net:
+ if ip not in (net.network, net.broadcast):
+ yield ip
+ iterators.append(iter(_iterip()))
+
+ for iterator in iterators:
+ for ip in iterator:
+ if str(ip) not in used_ips:
+ return str(ip)
+
+ message = (
+ "net(%s) has no usable IP address in allocation pools" % net_id)
+ raise exceptions.InvalidConfiguration(message)