|  | # Copyright 2012 OpenStack Foundation | 
|  | # All Rights Reserved. | 
|  | # | 
|  | #    Licensed under the Apache License, Version 2.0 (the "License"); you may | 
|  | #    not use this file except in compliance with the License. You may obtain | 
|  | #    a copy of the License at | 
|  | # | 
|  | #         http://www.apache.org/licenses/LICENSE-2.0 | 
|  | # | 
|  | #    Unless required by applicable law or agreed to in writing, software | 
|  | #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | 
|  | #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | 
|  | #    License for the specific language governing permissions and limitations | 
|  | #    under the License. | 
|  |  | 
|  | import functools | 
|  | import math | 
|  | import time | 
|  |  | 
|  | import netaddr | 
|  | from neutron_lib import constants as const | 
|  | from oslo_log import log | 
|  | from tempest.common import utils as tutils | 
|  | from tempest.lib.common.utils import data_utils | 
|  | from tempest.lib import exceptions as lib_exc | 
|  | from tempest import test | 
|  |  | 
|  | from neutron_tempest_plugin.api import clients | 
|  | from neutron_tempest_plugin.common import constants | 
|  | from neutron_tempest_plugin.common import utils | 
|  | from neutron_tempest_plugin import config | 
|  | from neutron_tempest_plugin import exceptions | 
|  |  | 
|  | CONF = config.CONF | 
|  |  | 
|  | LOG = log.getLogger(__name__) | 
|  |  | 
|  |  | 
|  | class BaseNetworkTest(test.BaseTestCase): | 
|  |  | 
|  | """Base class for Neutron tests that use the Tempest Neutron REST client | 
|  |  | 
|  | Per the Neutron API Guide, API v1.x was removed from the source code tree | 
|  | (docs.openstack.org/api/openstack-network/2.0/content/Overview-d1e71.html) | 
|  | Therefore, v2.x of the Neutron API is assumed. It is also assumed that the | 
|  | following options are defined in the [network] section of etc/tempest.conf: | 
|  |  | 
|  | project_network_cidr with a block of cidr's from which smaller blocks | 
|  | can be allocated for tenant networks | 
|  |  | 
|  | project_network_mask_bits with the mask bits to be used to partition | 
|  | the block defined by tenant-network_cidr | 
|  |  | 
|  | Finally, it is assumed that the following option is defined in the | 
|  | [service_available] section of etc/tempest.conf | 
|  |  | 
|  | neutron as True | 
|  | """ | 
|  |  | 
|  | force_tenant_isolation = False | 
|  | credentials = ['primary'] | 
|  |  | 
|  | # Default to ipv4. | 
|  | _ip_version = const.IP_VERSION_4 | 
|  |  | 
|  | # Derive from BaseAdminNetworkTest class to have this initialized | 
|  | admin_client = None | 
|  |  | 
|  | external_network_id = CONF.network.public_network_id | 
|  |  | 
|  | __is_driver_ovn = None | 
|  |  | 
|  | @classmethod | 
|  | def _is_driver_ovn(cls): | 
|  | ovn_agents = cls.os_admin.network_client.list_agents( | 
|  | binary='ovn-controller')['agents'] | 
|  | return len(ovn_agents) > 0 | 
|  |  | 
|  | @property | 
|  | def is_driver_ovn(self): | 
|  | if self.__is_driver_ovn is None: | 
|  | if hasattr(self, 'os_admin'): | 
|  | self.__is_driver_ovn = self._is_driver_ovn() | 
|  | return self.__is_driver_ovn | 
|  |  | 
|  | @classmethod | 
|  | def get_client_manager(cls, credential_type=None, roles=None, | 
|  | force_new=None): | 
|  | manager = super(BaseNetworkTest, cls).get_client_manager( | 
|  | credential_type=credential_type, | 
|  | roles=roles, | 
|  | force_new=force_new | 
|  | ) | 
|  | # Neutron uses a different clients manager than the one in the Tempest | 
|  | # save the original in case mixed tests need it | 
|  | if credential_type == 'primary': | 
|  | cls.os_tempest = manager | 
|  | return clients.Manager(manager.credentials) | 
|  |  | 
|  | @classmethod | 
|  | def skip_checks(cls): | 
|  | super(BaseNetworkTest, cls).skip_checks() | 
|  | if not CONF.service_available.neutron: | 
|  | raise cls.skipException("Neutron support is required") | 
|  | if (cls._ip_version == const.IP_VERSION_6 and | 
|  | not CONF.network_feature_enabled.ipv6): | 
|  | raise cls.skipException("IPv6 Tests are disabled.") | 
|  | for req_ext in getattr(cls, 'required_extensions', []): | 
|  | if not tutils.is_extension_enabled(req_ext, 'network'): | 
|  | msg = "%s extension not enabled." % req_ext | 
|  | raise cls.skipException(msg) | 
|  |  | 
|  | @classmethod | 
|  | def setup_credentials(cls): | 
|  | # Create no network resources for these test. | 
|  | cls.set_network_resources() | 
|  | super(BaseNetworkTest, cls).setup_credentials() | 
|  |  | 
|  | @classmethod | 
|  | def setup_clients(cls): | 
|  | super(BaseNetworkTest, cls).setup_clients() | 
|  | cls.client = cls.os_primary.network_client | 
|  |  | 
|  | @classmethod | 
|  | def resource_setup(cls): | 
|  | super(BaseNetworkTest, cls).resource_setup() | 
|  |  | 
|  | cls.networks = [] | 
|  | cls.admin_networks = [] | 
|  | cls.subnets = [] | 
|  | cls.admin_subnets = [] | 
|  | cls.ports = [] | 
|  | cls.routers = [] | 
|  | cls.floating_ips = [] | 
|  | cls.port_forwardings = [] | 
|  | cls.local_ips = [] | 
|  | cls.local_ip_associations = [] | 
|  | cls.metering_labels = [] | 
|  | cls.service_profiles = [] | 
|  | cls.flavors = [] | 
|  | cls.metering_label_rules = [] | 
|  | cls.qos_rules = [] | 
|  | cls.qos_policies = [] | 
|  | cls.ethertype = "IPv" + str(cls._ip_version) | 
|  | cls.address_groups = [] | 
|  | cls.admin_address_groups = [] | 
|  | cls.address_scopes = [] | 
|  | cls.admin_address_scopes = [] | 
|  | cls.subnetpools = [] | 
|  | cls.admin_subnetpools = [] | 
|  | cls.security_groups = [] | 
|  | cls.admin_security_groups = [] | 
|  | cls.sg_rule_templates = [] | 
|  | cls.projects = [] | 
|  | cls.log_objects = [] | 
|  | cls.reserved_subnet_cidrs = set() | 
|  | cls.keypairs = [] | 
|  | cls.trunks = [] | 
|  | cls.network_segment_ranges = [] | 
|  | cls.conntrack_helpers = [] | 
|  | cls.ndp_proxies = [] | 
|  |  | 
|  | @classmethod | 
|  | def reserve_external_subnet_cidrs(cls): | 
|  | client = cls.os_admin.network_client | 
|  | ext_nets = client.list_networks( | 
|  | **{"router:external": True})['networks'] | 
|  | for ext_net in ext_nets: | 
|  | ext_subnets = client.list_subnets( | 
|  | network_id=ext_net['id'])['subnets'] | 
|  | for ext_subnet in ext_subnets: | 
|  | cls.reserve_subnet_cidr(ext_subnet['cidr']) | 
|  |  | 
|  | @classmethod | 
|  | def resource_cleanup(cls): | 
|  | if CONF.service_available.neutron: | 
|  | # Clean up trunks | 
|  | for trunk in cls.trunks: | 
|  | cls._try_delete_resource(cls.delete_trunk, trunk) | 
|  |  | 
|  | # Clean up ndp proxy | 
|  | for ndp_proxy in cls.ndp_proxies: | 
|  | cls._try_delete_resource(cls.delete_ndp_proxy, ndp_proxy) | 
|  |  | 
|  | # Clean up port forwardings | 
|  | for pf in cls.port_forwardings: | 
|  | cls._try_delete_resource(cls.delete_port_forwarding, pf) | 
|  |  | 
|  | # Clean up floating IPs | 
|  | for floating_ip in cls.floating_ips: | 
|  | cls._try_delete_resource(cls.delete_floatingip, floating_ip) | 
|  |  | 
|  | # Clean up Local IP Associations | 
|  | for association in cls.local_ip_associations: | 
|  | cls._try_delete_resource(cls.delete_local_ip_association, | 
|  | association) | 
|  | # Clean up Local IPs | 
|  | for local_ip in cls.local_ips: | 
|  | cls._try_delete_resource(cls.delete_local_ip, | 
|  | local_ip) | 
|  |  | 
|  | # Clean up conntrack helpers | 
|  | for cth in cls.conntrack_helpers: | 
|  | cls._try_delete_resource(cls.delete_conntrack_helper, cth) | 
|  |  | 
|  | # Clean up routers | 
|  | for router in cls.routers: | 
|  | cls._try_delete_resource(cls.delete_router, | 
|  | router) | 
|  | # Clean up metering label rules | 
|  | for metering_label_rule in cls.metering_label_rules: | 
|  | cls._try_delete_resource( | 
|  | cls.admin_client.delete_metering_label_rule, | 
|  | metering_label_rule['id']) | 
|  | # Clean up metering labels | 
|  | for metering_label in cls.metering_labels: | 
|  | cls._try_delete_resource( | 
|  | cls.admin_client.delete_metering_label, | 
|  | metering_label['id']) | 
|  | # Clean up flavors | 
|  | for flavor in cls.flavors: | 
|  | cls._try_delete_resource( | 
|  | cls.admin_client.delete_flavor, | 
|  | flavor['id']) | 
|  | # Clean up service profiles | 
|  | for service_profile in cls.service_profiles: | 
|  | cls._try_delete_resource( | 
|  | cls.admin_client.delete_service_profile, | 
|  | service_profile['id']) | 
|  | # Clean up ports | 
|  | for port in cls.ports: | 
|  | cls._try_delete_resource(cls.client.delete_port, | 
|  | port['id']) | 
|  | # Clean up subnets | 
|  | for subnet in cls.subnets: | 
|  | cls._try_delete_resource(cls.client.delete_subnet, | 
|  | subnet['id']) | 
|  | # Clean up admin subnets | 
|  | for subnet in cls.admin_subnets: | 
|  | cls._try_delete_resource(cls.admin_client.delete_subnet, | 
|  | subnet['id']) | 
|  | # Clean up networks | 
|  | for network in cls.networks: | 
|  | cls._try_delete_resource(cls.delete_network, network) | 
|  |  | 
|  | # Clean up admin networks | 
|  | for network in cls.admin_networks: | 
|  | cls._try_delete_resource(cls.admin_client.delete_network, | 
|  | network['id']) | 
|  |  | 
|  | # Clean up security groups | 
|  | for security_group in cls.security_groups: | 
|  | cls._try_delete_resource(cls.delete_security_group, | 
|  | security_group) | 
|  |  | 
|  | # Clean up admin security groups | 
|  | for security_group in cls.admin_security_groups: | 
|  | cls._try_delete_resource(cls.delete_security_group, | 
|  | security_group, | 
|  | client=cls.admin_client) | 
|  |  | 
|  | # Clean up security group rule templates | 
|  | for sg_rule_template in cls.sg_rule_templates: | 
|  | cls._try_delete_resource( | 
|  | cls.admin_client.delete_default_security_group_rule, | 
|  | sg_rule_template['id']) | 
|  |  | 
|  | for subnetpool in cls.subnetpools: | 
|  | cls._try_delete_resource(cls.client.delete_subnetpool, | 
|  | subnetpool['id']) | 
|  |  | 
|  | for subnetpool in cls.admin_subnetpools: | 
|  | cls._try_delete_resource(cls.admin_client.delete_subnetpool, | 
|  | subnetpool['id']) | 
|  |  | 
|  | for address_scope in cls.address_scopes: | 
|  | cls._try_delete_resource(cls.client.delete_address_scope, | 
|  | address_scope['id']) | 
|  |  | 
|  | for address_scope in cls.admin_address_scopes: | 
|  | cls._try_delete_resource( | 
|  | cls.admin_client.delete_address_scope, | 
|  | address_scope['id']) | 
|  |  | 
|  | for project in cls.projects: | 
|  | cls._try_delete_resource( | 
|  | cls.identity_admin_client.delete_project, | 
|  | project['id']) | 
|  |  | 
|  | # Clean up QoS rules | 
|  | for qos_rule in cls.qos_rules: | 
|  | cls._try_delete_resource(cls.admin_client.delete_qos_rule, | 
|  | qos_rule['id']) | 
|  | # Clean up QoS policies | 
|  | # as all networks and ports are already removed, QoS policies | 
|  | # shouldn't be "in use" | 
|  | for qos_policy in cls.qos_policies: | 
|  | cls._try_delete_resource(cls.admin_client.delete_qos_policy, | 
|  | qos_policy['id']) | 
|  |  | 
|  | # Clean up log_objects | 
|  | for log_object in cls.log_objects: | 
|  | cls._try_delete_resource(cls.admin_client.delete_log, | 
|  | log_object['id']) | 
|  |  | 
|  | for keypair in cls.keypairs: | 
|  | cls._try_delete_resource(cls.delete_keypair, keypair) | 
|  |  | 
|  | # Clean up network_segment_ranges | 
|  | for network_segment_range in cls.network_segment_ranges: | 
|  | cls._try_delete_resource( | 
|  | cls.admin_client.delete_network_segment_range, | 
|  | network_segment_range['id']) | 
|  |  | 
|  | super(BaseNetworkTest, cls).resource_cleanup() | 
|  |  | 
|  | @classmethod | 
|  | def _try_delete_resource(cls, delete_callable, *args, **kwargs): | 
|  | """Cleanup resources in case of test-failure | 
|  |  | 
|  | Some resources are explicitly deleted by the test. | 
|  | If the test failed to delete a resource, this method will execute | 
|  | the appropriate delete methods. Otherwise, the method ignores NotFound | 
|  | exceptions thrown for resources that were correctly deleted by the | 
|  | test. | 
|  |  | 
|  | :param delete_callable: delete method | 
|  | :param args: arguments for delete method | 
|  | :param kwargs: keyword arguments for delete method | 
|  | """ | 
|  | try: | 
|  | delete_callable(*args, **kwargs) | 
|  | # if resource is not found, this means it was deleted in the test | 
|  | except lib_exc.NotFound: | 
|  | pass | 
|  |  | 
|  | @classmethod | 
|  | def create_network(cls, network_name=None, client=None, external=None, | 
|  | shared=None, provider_network_type=None, | 
|  | provider_physical_network=None, | 
|  | provider_segmentation_id=None, **kwargs): | 
|  | """Create a network. | 
|  |  | 
|  | When client is not provider and admin_client is attribute is not None | 
|  | (for example when using BaseAdminNetworkTest base class) and using any | 
|  | of the convenience parameters (external, shared, provider_network_type, | 
|  | provider_physical_network and provider_segmentation_id) it silently | 
|  | uses admin_client. If the network is not shared then it uses the same | 
|  | project_id as regular client. | 
|  |  | 
|  | :param network_name: Human-readable name of the network | 
|  |  | 
|  | :param client: client to be used for connecting to network service | 
|  |  | 
|  | :param external: indicates whether the network has an external routing | 
|  | facility that's not managed by the networking service. | 
|  |  | 
|  | :param shared: indicates whether this resource is shared across all | 
|  | projects. By default, only administrative users can change this value. | 
|  | If True and admin_client attribute is not None, then the network is | 
|  | created under administrative project. | 
|  |  | 
|  | :param provider_network_type: the type of physical network that this | 
|  | network should be mapped to. For example, 'flat', 'vlan', 'vxlan', or | 
|  | 'gre'. Valid values depend on a networking back-end. | 
|  |  | 
|  | :param provider_physical_network: the physical network where this | 
|  | network should be implemented. The Networking API v2.0 does not provide | 
|  | a way to list available physical networks. For example, the Open | 
|  | vSwitch plug-in configuration file defines a symbolic name that maps to | 
|  | specific bridges on each compute host. | 
|  |  | 
|  | :param provider_segmentation_id: The ID of the isolated segment on the | 
|  | physical network. The network_type attribute defines the segmentation | 
|  | model. For example, if the network_type value is 'vlan', this ID is a | 
|  | vlan identifier. If the network_type value is 'gre', this ID is a gre | 
|  | key. | 
|  |  | 
|  | :param **kwargs: extra parameters to be forwarded to network service | 
|  | """ | 
|  |  | 
|  | name = (network_name or kwargs.pop('name', None) or | 
|  | data_utils.rand_name('test-network-')) | 
|  |  | 
|  | # translate convenience parameters | 
|  | admin_client_required = False | 
|  | if provider_network_type: | 
|  | admin_client_required = True | 
|  | kwargs['provider:network_type'] = provider_network_type | 
|  | if provider_physical_network: | 
|  | admin_client_required = True | 
|  | kwargs['provider:physical_network'] = provider_physical_network | 
|  | if provider_segmentation_id: | 
|  | admin_client_required = True | 
|  | kwargs['provider:segmentation_id'] = provider_segmentation_id | 
|  | if external is not None: | 
|  | admin_client_required = True | 
|  | kwargs['router:external'] = bool(external) | 
|  | if shared is not None: | 
|  | admin_client_required = True | 
|  | kwargs['shared'] = bool(shared) | 
|  |  | 
|  | if not client: | 
|  | if admin_client_required and cls.admin_client: | 
|  | # For convenience silently switch to admin client | 
|  | client = cls.admin_client | 
|  | if not shared: | 
|  | # Keep this network visible from current project | 
|  | project_id = (kwargs.get('project_id') or | 
|  | kwargs.get('tenant_id') or | 
|  | cls.client.project_id) | 
|  | kwargs.update(project_id=project_id, tenant_id=project_id) | 
|  | else: | 
|  | # Use default client | 
|  | client = cls.client | 
|  |  | 
|  | network = client.create_network(name=name, **kwargs)['network'] | 
|  | network['client'] = client | 
|  | cls.networks.append(network) | 
|  | return network | 
|  |  | 
|  | @classmethod | 
|  | def delete_network(cls, network, client=None): | 
|  | client = client or network.get('client') or cls.client | 
|  | client.delete_network(network['id']) | 
|  |  | 
|  | @classmethod | 
|  | def create_shared_network(cls, network_name=None, **kwargs): | 
|  | return cls.create_network(name=network_name, shared=True, **kwargs) | 
|  |  | 
|  | @classmethod | 
|  | def create_subnet(cls, network, gateway='', cidr=None, mask_bits=None, | 
|  | ip_version=None, client=None, reserve_cidr=True, | 
|  | allocation_pool_size=None, **kwargs): | 
|  | """Wrapper utility that returns a test subnet. | 
|  |  | 
|  | Convenient wrapper for client.create_subnet method. It reserves and | 
|  | allocates CIDRs to avoid creating overlapping subnets. | 
|  |  | 
|  | :param network: network where to create the subnet | 
|  | network['id'] must contain the ID of the network | 
|  |  | 
|  | :param gateway: gateway IP address | 
|  | It can be a str or a netaddr.IPAddress | 
|  | If gateway is not given, then it will use default address for | 
|  | given subnet CIDR, like "192.168.0.1" for "192.168.0.0/24" CIDR | 
|  | if gateway is given as None then no gateway will be assigned | 
|  |  | 
|  | :param cidr: CIDR of the subnet to create | 
|  | It can be either None, a str or a netaddr.IPNetwork instance | 
|  |  | 
|  | :param mask_bits: CIDR prefix length | 
|  | It can be either None or a numeric value. | 
|  | If cidr parameter is given then mask_bits is used to determinate a | 
|  | sequence of valid CIDR to use as generated. | 
|  | Please see netaddr.IPNetwork.subnet method documentation[1] | 
|  |  | 
|  | :param ip_version: ip version of generated subnet CIDRs | 
|  | It can be None, IP_VERSION_4 or IP_VERSION_6 | 
|  | It has to match given either given CIDR and gateway | 
|  |  | 
|  | :param ip_version: numeric value (either IP_VERSION_4 or IP_VERSION_6) | 
|  | this value must match CIDR and gateway IP versions if any of them is | 
|  | given | 
|  |  | 
|  | :param client: client to be used to connect to network service | 
|  |  | 
|  | :param reserve_cidr: if True then it reserves assigned CIDR to avoid | 
|  | using the same CIDR for further subnets in the scope of the same | 
|  | test case class | 
|  |  | 
|  | :param allocation_pool_size: if the CIDR is not defined, this method | 
|  | will assign one in ``get_subnet_cidrs``. Once done, the allocation pool | 
|  | will be defined reserving the number of IP addresses requested, | 
|  | starting from the end of the assigned CIDR. | 
|  |  | 
|  | :param **kwargs: optional parameters to be forwarded to wrapped method | 
|  |  | 
|  | [1] http://netaddr.readthedocs.io/en/latest/tutorial_01.html#supernets-and-subnets  # noqa | 
|  | """ | 
|  | def allocation_pool(cidr, pool_size): | 
|  | start = str(netaddr.IPAddress(cidr.last) - pool_size) | 
|  | end = str(netaddr.IPAddress(cidr.last) - 1) | 
|  | return {'start': start, 'end': end} | 
|  |  | 
|  | # allow tests to use admin client | 
|  | if not client: | 
|  | client = cls.client | 
|  |  | 
|  | if gateway: | 
|  | gateway_ip = netaddr.IPAddress(gateway) | 
|  | if ip_version: | 
|  | if ip_version != gateway_ip.version: | 
|  | raise ValueError( | 
|  | "Gateway IP version doesn't match IP version") | 
|  | else: | 
|  | ip_version = gateway_ip.version | 
|  | else: | 
|  | ip_version = ip_version or cls._ip_version | 
|  |  | 
|  | for subnet_cidr in cls.get_subnet_cidrs( | 
|  | ip_version=ip_version, cidr=cidr, mask_bits=mask_bits): | 
|  | if gateway is not None: | 
|  | kwargs['gateway_ip'] = str(gateway or (subnet_cidr.ip + 1)) | 
|  | else: | 
|  | kwargs['gateway_ip'] = None | 
|  | if allocation_pool_size: | 
|  | kwargs['allocation_pools'] = [ | 
|  | allocation_pool(subnet_cidr, allocation_pool_size)] | 
|  | try: | 
|  | body = client.create_subnet( | 
|  | network_id=network['id'], | 
|  | cidr=str(subnet_cidr), | 
|  | ip_version=subnet_cidr.version, | 
|  | **kwargs) | 
|  | break | 
|  | except lib_exc.BadRequest as e: | 
|  | if 'overlaps with another subnet' not in str(e): | 
|  | raise | 
|  | else: | 
|  | message = 'Available CIDR for subnet creation could not be found' | 
|  | raise ValueError(message) | 
|  | subnet = body['subnet'] | 
|  | if client is cls.client: | 
|  | cls.subnets.append(subnet) | 
|  | else: | 
|  | cls.admin_subnets.append(subnet) | 
|  | if reserve_cidr: | 
|  | cls.reserve_subnet_cidr(subnet_cidr) | 
|  | return subnet | 
|  |  | 
|  | @classmethod | 
|  | def reserve_subnet_cidr(cls, addr, **ipnetwork_kwargs): | 
|  | """Reserve given subnet CIDR making sure it's not used by create_subnet | 
|  |  | 
|  | :param addr: the CIDR address to be reserved | 
|  | It can be a str or netaddr.IPNetwork instance | 
|  |  | 
|  | :param **ipnetwork_kwargs: optional netaddr.IPNetwork constructor | 
|  | parameters | 
|  | """ | 
|  |  | 
|  | if not cls.try_reserve_subnet_cidr(addr, **ipnetwork_kwargs): | 
|  | raise ValueError('Subnet CIDR already reserved: {0!r}'.format( | 
|  | addr)) | 
|  |  | 
|  | @classmethod | 
|  | def try_reserve_subnet_cidr(cls, addr, **ipnetwork_kwargs): | 
|  | """Reserve given subnet CIDR if it hasn't been reserved before | 
|  |  | 
|  | :param addr: the CIDR address to be reserved | 
|  | It can be a str or netaddr.IPNetwork instance | 
|  |  | 
|  | :param **ipnetwork_kwargs: optional netaddr.IPNetwork constructor | 
|  | parameters | 
|  |  | 
|  | :return: True if it wasn't reserved before, False elsewhere. | 
|  | """ | 
|  |  | 
|  | subnet_cidr = netaddr.IPNetwork(addr, **ipnetwork_kwargs) | 
|  | if subnet_cidr in cls.reserved_subnet_cidrs: | 
|  | return False | 
|  | else: | 
|  | cls.reserved_subnet_cidrs.add(subnet_cidr) | 
|  | return True | 
|  |  | 
|  | @classmethod | 
|  | def get_subnet_cidrs( | 
|  | cls, cidr=None, mask_bits=None, ip_version=None): | 
|  | """Iterate over a sequence of unused subnet CIDR for IP version | 
|  |  | 
|  | :param cidr: CIDR of the subnet to create | 
|  | It can be either None, a str or a netaddr.IPNetwork instance | 
|  |  | 
|  | :param mask_bits: CIDR prefix length | 
|  | It can be either None or a numeric value. | 
|  | If cidr parameter is given then mask_bits is used to determinate a | 
|  | sequence of valid CIDR to use as generated. | 
|  | Please see netaddr.IPNetwork.subnet method documentation[1] | 
|  |  | 
|  | :param ip_version: ip version of generated subnet CIDRs | 
|  | It can be None, IP_VERSION_4 or IP_VERSION_6 | 
|  | It has to match given CIDR if given | 
|  |  | 
|  | :return: iterator over reserved CIDRs of type netaddr.IPNetwork | 
|  |  | 
|  | [1] http://netaddr.readthedocs.io/en/latest/tutorial_01.html#supernets-and-subnets  # noqa | 
|  | """ | 
|  |  | 
|  | if cidr: | 
|  | # Generate subnet CIDRs starting from given CIDR | 
|  | # checking it is of requested IP version | 
|  | cidr = netaddr.IPNetwork(cidr, version=ip_version) | 
|  | else: | 
|  | # Generate subnet CIDRs starting from configured values | 
|  | ip_version = ip_version or cls._ip_version | 
|  | if ip_version == const.IP_VERSION_4: | 
|  | mask_bits = mask_bits or CONF.network.project_network_mask_bits | 
|  | cidr = netaddr.IPNetwork(CONF.network.project_network_cidr) | 
|  | elif ip_version == const.IP_VERSION_6: | 
|  | mask_bits = CONF.network.project_network_v6_mask_bits | 
|  | cidr = netaddr.IPNetwork(CONF.network.project_network_v6_cidr) | 
|  | else: | 
|  | raise ValueError('Invalid IP version: {!r}'.format(ip_version)) | 
|  |  | 
|  | if mask_bits: | 
|  | subnet_cidrs = cidr.subnet(mask_bits) | 
|  | else: | 
|  | subnet_cidrs = iter([cidr]) | 
|  |  | 
|  | for subnet_cidr in subnet_cidrs: | 
|  | if subnet_cidr not in cls.reserved_subnet_cidrs: | 
|  | yield subnet_cidr | 
|  |  | 
|  | @classmethod | 
|  | def create_port(cls, network, **kwargs): | 
|  | """Wrapper utility that returns a test port.""" | 
|  | if CONF.network.port_vnic_type and 'binding:vnic_type' not in kwargs: | 
|  | kwargs['binding:vnic_type'] = CONF.network.port_vnic_type | 
|  | if CONF.network.port_profile and 'binding:profile' not in kwargs: | 
|  | kwargs['binding:profile'] = CONF.network.port_profile | 
|  | body = cls.client.create_port(network_id=network['id'], | 
|  | **kwargs) | 
|  | port = body['port'] | 
|  | cls.ports.append(port) | 
|  | return port | 
|  |  | 
|  | @classmethod | 
|  | def update_port(cls, port, **kwargs): | 
|  | """Wrapper utility that updates a test port.""" | 
|  | body = cls.client.update_port(port['id'], | 
|  | **kwargs) | 
|  | return body['port'] | 
|  |  | 
|  | @classmethod | 
|  | def _create_router_with_client( | 
|  | cls, client, router_name=None, admin_state_up=False, | 
|  | external_network_id=None, enable_snat=None, **kwargs | 
|  | ): | 
|  | ext_gw_info = {} | 
|  | if external_network_id: | 
|  | ext_gw_info['network_id'] = external_network_id | 
|  | if enable_snat is not None: | 
|  | ext_gw_info['enable_snat'] = enable_snat | 
|  | body = client.create_router( | 
|  | router_name, external_gateway_info=ext_gw_info, | 
|  | admin_state_up=admin_state_up, **kwargs) | 
|  | router = body['router'] | 
|  | cls.routers.append(router) | 
|  | return router | 
|  |  | 
|  | @classmethod | 
|  | def create_router(cls, *args, **kwargs): | 
|  | return cls._create_router_with_client(cls.client, *args, **kwargs) | 
|  |  | 
|  | @classmethod | 
|  | def create_admin_router(cls, *args, **kwargs): | 
|  | return cls._create_router_with_client(cls.os_admin.network_client, | 
|  | *args, **kwargs) | 
|  |  | 
|  | @classmethod | 
|  | def create_floatingip(cls, external_network_id=None, port=None, | 
|  | client=None, **kwargs): | 
|  | """Creates a floating IP. | 
|  |  | 
|  | Create a floating IP and schedule it for later deletion. | 
|  | If a client is passed, then it is used for deleting the IP too. | 
|  |  | 
|  | :param external_network_id: network ID where to create | 
|  | By default this is 'CONF.network.public_network_id'. | 
|  |  | 
|  | :param port: port to bind floating IP to | 
|  | This is translated to 'port_id=port['id']' | 
|  | By default it is None. | 
|  |  | 
|  | :param client: network client to be used for creating and cleaning up | 
|  | the floating IP. | 
|  |  | 
|  | :param **kwargs: additional creation parameters to be forwarded to | 
|  | networking server. | 
|  | """ | 
|  |  | 
|  | client = client or cls.client | 
|  | external_network_id = (external_network_id or | 
|  | cls.external_network_id) | 
|  |  | 
|  | if port: | 
|  | port_id = kwargs.setdefault('port_id', port['id']) | 
|  | if port_id != port['id']: | 
|  | message = "Port ID specified twice: {!s} != {!s}".format( | 
|  | port_id, port['id']) | 
|  | raise ValueError(message) | 
|  |  | 
|  | fip = client.create_floatingip(external_network_id, | 
|  | **kwargs)['floatingip'] | 
|  |  | 
|  | # save client to be used later in cls.delete_floatingip | 
|  | # for final cleanup | 
|  | fip['client'] = client | 
|  | cls.floating_ips.append(fip) | 
|  | return fip | 
|  |  | 
|  | @classmethod | 
|  | def delete_floatingip(cls, floating_ip, client=None): | 
|  | """Delete floating IP | 
|  |  | 
|  | :param client: Client to be used | 
|  | If client is not given it will use the client used to create | 
|  | the floating IP, or cls.client if unknown. | 
|  | """ | 
|  |  | 
|  | client = client or floating_ip.get('client') or cls.client | 
|  | client.delete_floatingip(floating_ip['id']) | 
|  |  | 
|  | @classmethod | 
|  | def create_port_forwarding(cls, fip_id, internal_port_id, | 
|  | internal_port, external_port, | 
|  | internal_ip_address=None, protocol="tcp", | 
|  | client=None): | 
|  | """Creates a port forwarding. | 
|  |  | 
|  | Create a port forwarding and schedule it for later deletion. | 
|  | If a client is passed, then it is used for deleting the PF too. | 
|  |  | 
|  | :param fip_id: The ID of the floating IP address. | 
|  |  | 
|  | :param internal_port_id: The ID of the Neutron port associated to | 
|  | the floating IP port forwarding. | 
|  |  | 
|  | :param internal_port: The TCP/UDP/other protocol port number of the | 
|  | Neutron port fixed IP address associated to the floating ip | 
|  | port forwarding. | 
|  |  | 
|  | :param external_port: The TCP/UDP/other protocol port number of | 
|  | the port forwarding floating IP address. | 
|  |  | 
|  | :param internal_ip_address: The fixed IPv4 address of the Neutron | 
|  | port associated to the floating IP port forwarding. | 
|  |  | 
|  | :param protocol: The IP protocol used in the floating IP port | 
|  | forwarding. | 
|  |  | 
|  | :param client: network client to be used for creating and cleaning up | 
|  | the floating IP port forwarding. | 
|  | """ | 
|  |  | 
|  | client = client or cls.client | 
|  |  | 
|  | pf = client.create_port_forwarding( | 
|  | fip_id, internal_port_id, internal_port, external_port, | 
|  | internal_ip_address, protocol)['port_forwarding'] | 
|  |  | 
|  | # save ID of floating IP associated with port forwarding for final | 
|  | # cleanup | 
|  | pf['floatingip_id'] = fip_id | 
|  |  | 
|  | # save client to be used later in cls.delete_port_forwarding | 
|  | # for final cleanup | 
|  | pf['client'] = client | 
|  | cls.port_forwardings.append(pf) | 
|  | return pf | 
|  |  | 
|  | @classmethod | 
|  | def update_port_forwarding(cls, fip_id, pf_id, client=None, **kwargs): | 
|  | """Wrapper utility for update_port_forwarding.""" | 
|  | client = client or cls.client | 
|  | return client.update_port_forwarding(fip_id, pf_id, **kwargs) | 
|  |  | 
|  | @classmethod | 
|  | def delete_port_forwarding(cls, pf, client=None): | 
|  | """Delete port forwarding | 
|  |  | 
|  | :param client: Client to be used | 
|  | If client is not given it will use the client used to create | 
|  | the port forwarding, or cls.client if unknown. | 
|  | """ | 
|  |  | 
|  | client = client or pf.get('client') or cls.client | 
|  | client.delete_port_forwarding(pf['floatingip_id'], pf['id']) | 
|  |  | 
|  | def create_local_ip(cls, network_id=None, | 
|  | client=None, **kwargs): | 
|  | """Creates a Local IP. | 
|  |  | 
|  | Create a Local IP and schedule it for later deletion. | 
|  | If a client is passed, then it is used for deleting the IP too. | 
|  |  | 
|  | :param network_id: network ID where to create | 
|  | By default this is 'CONF.network.public_network_id'. | 
|  |  | 
|  | :param client: network client to be used for creating and cleaning up | 
|  | the Local IP. | 
|  |  | 
|  | :param **kwargs: additional creation parameters to be forwarded to | 
|  | networking server. | 
|  | """ | 
|  |  | 
|  | client = client or cls.client | 
|  | network_id = (network_id or | 
|  | cls.external_network_id) | 
|  |  | 
|  | local_ip = client.create_local_ip(network_id, | 
|  | **kwargs)['local_ip'] | 
|  |  | 
|  | # save client to be used later in cls.delete_local_ip | 
|  | # for final cleanup | 
|  | local_ip['client'] = client | 
|  | cls.local_ips.append(local_ip) | 
|  | return local_ip | 
|  |  | 
|  | @classmethod | 
|  | def delete_local_ip(cls, local_ip, client=None): | 
|  | """Delete Local IP | 
|  |  | 
|  | :param client: Client to be used | 
|  | If client is not given it will use the client used to create | 
|  | the Local IP, or cls.client if unknown. | 
|  | """ | 
|  |  | 
|  | client = client or local_ip.get('client') or cls.client | 
|  | client.delete_local_ip(local_ip['id']) | 
|  |  | 
|  | @classmethod | 
|  | def create_local_ip_association(cls, local_ip_id, fixed_port_id, | 
|  | fixed_ip_address=None, client=None): | 
|  | """Creates a Local IP association. | 
|  |  | 
|  | Create a Local IP Association and schedule it for later deletion. | 
|  | If a client is passed, then it is used for deleting the association | 
|  | too. | 
|  |  | 
|  | :param local_ip_id: The ID of the Local IP. | 
|  |  | 
|  | :param fixed_port_id: The ID of the Neutron port | 
|  | to be associated with the Local IP | 
|  |  | 
|  | :param fixed_ip_address: The fixed IPv4 address of the Neutron | 
|  | port to be associated with the Local IP | 
|  |  | 
|  | :param client: network client to be used for creating and cleaning up | 
|  | the Local IP Association. | 
|  | """ | 
|  |  | 
|  | client = client or cls.client | 
|  |  | 
|  | association = client.create_local_ip_association( | 
|  | local_ip_id, fixed_port_id, | 
|  | fixed_ip_address)['port_association'] | 
|  |  | 
|  | # save ID of Local IP  for final cleanup | 
|  | association['local_ip_id'] = local_ip_id | 
|  |  | 
|  | # save client to be used later in | 
|  | # cls.delete_local_ip_association for final cleanup | 
|  | association['client'] = client | 
|  | cls.local_ip_associations.append(association) | 
|  | return association | 
|  |  | 
|  | @classmethod | 
|  | def delete_local_ip_association(cls, association, client=None): | 
|  |  | 
|  | """Delete Local IP Association | 
|  |  | 
|  | :param client: Client to be used | 
|  | If client is not given it will use the client used to create | 
|  | the local IP association, or cls.client if unknown. | 
|  | """ | 
|  |  | 
|  | client = client or association.get('client') or cls.client | 
|  | client.delete_local_ip_association(association['local_ip_id'], | 
|  | association['fixed_port_id']) | 
|  |  | 
|  | @classmethod | 
|  | def create_router_interface(cls, router_id, subnet_id, client=None): | 
|  | """Wrapper utility that returns a router interface.""" | 
|  | client = client or cls.client | 
|  | interface = client.add_router_interface_with_subnet_id( | 
|  | router_id, subnet_id) | 
|  | return interface | 
|  |  | 
|  | @classmethod | 
|  | def add_extra_routes_atomic(cls, *args, **kwargs): | 
|  | return cls.client.add_extra_routes_atomic(*args, **kwargs) | 
|  |  | 
|  | @classmethod | 
|  | def remove_extra_routes_atomic(cls, *args, **kwargs): | 
|  | return cls.client.remove_extra_routes_atomic(*args, **kwargs) | 
|  |  | 
|  | @classmethod | 
|  | def get_supported_qos_rule_types(cls): | 
|  | body = cls.client.list_qos_rule_types() | 
|  | return [rule_type['type'] for rule_type in body['rule_types']] | 
|  |  | 
|  | @classmethod | 
|  | def create_qos_policy(cls, name, description=None, shared=False, | 
|  | project_id=None, is_default=False): | 
|  | """Wrapper utility that returns a test QoS policy.""" | 
|  | body = cls.admin_client.create_qos_policy( | 
|  | name, description, shared, project_id, is_default) | 
|  | qos_policy = body['policy'] | 
|  | cls.qos_policies.append(qos_policy) | 
|  | return qos_policy | 
|  |  | 
|  | @classmethod | 
|  | def create_qos_dscp_marking_rule(cls, policy_id, dscp_mark): | 
|  | """Wrapper utility that creates and returns a QoS dscp rule.""" | 
|  | body = cls.admin_client.create_dscp_marking_rule( | 
|  | policy_id, dscp_mark) | 
|  | qos_rule = body['dscp_marking_rule'] | 
|  | cls.qos_rules.append(qos_rule) | 
|  | return qos_rule | 
|  |  | 
|  | @classmethod | 
|  | def delete_router(cls, router, client=None): | 
|  | client = client or cls.client | 
|  | if 'routes' in router: | 
|  | client.remove_router_extra_routes(router['id']) | 
|  | body = client.list_router_interfaces(router['id']) | 
|  | interfaces = [port for port in body['ports'] | 
|  | if port['device_owner'] in const.ROUTER_INTERFACE_OWNERS] | 
|  | for i in interfaces: | 
|  | try: | 
|  | client.remove_router_interface_with_subnet_id( | 
|  | router['id'], i['fixed_ips'][0]['subnet_id']) | 
|  | except lib_exc.NotFound: | 
|  | pass | 
|  | client.delete_router(router['id']) | 
|  |  | 
|  | @classmethod | 
|  | def create_address_scope(cls, name, is_admin=False, **kwargs): | 
|  | if is_admin: | 
|  | body = cls.admin_client.create_address_scope(name=name, **kwargs) | 
|  | cls.admin_address_scopes.append(body['address_scope']) | 
|  | else: | 
|  | body = cls.client.create_address_scope(name=name, **kwargs) | 
|  | cls.address_scopes.append(body['address_scope']) | 
|  | return body['address_scope'] | 
|  |  | 
|  | @classmethod | 
|  | def create_subnetpool(cls, name, is_admin=False, client=None, **kwargs): | 
|  | if client is None: | 
|  | client = cls.admin_client if is_admin else cls.client | 
|  |  | 
|  | if is_admin: | 
|  | body = client.create_subnetpool(name, **kwargs) | 
|  | cls.admin_subnetpools.append(body['subnetpool']) | 
|  | else: | 
|  | body = client.create_subnetpool(name, **kwargs) | 
|  | cls.subnetpools.append(body['subnetpool']) | 
|  | return body['subnetpool'] | 
|  |  | 
|  | @classmethod | 
|  | def create_address_group(cls, name, is_admin=False, **kwargs): | 
|  | if is_admin: | 
|  | body = cls.admin_client.create_address_group(name=name, **kwargs) | 
|  | cls.admin_address_groups.append(body['address_group']) | 
|  | else: | 
|  | body = cls.client.create_address_group(name=name, **kwargs) | 
|  | cls.address_groups.append(body['address_group']) | 
|  | return body['address_group'] | 
|  |  | 
|  | @classmethod | 
|  | def create_project(cls, name=None, description=None): | 
|  | test_project = name or data_utils.rand_name('test_project_') | 
|  | test_description = description or data_utils.rand_name('desc_') | 
|  | project = cls.identity_admin_client.create_project( | 
|  | name=test_project, | 
|  | description=test_description)['project'] | 
|  | cls.projects.append(project) | 
|  | # Create a project will create a default security group. | 
|  | sgs_list = cls.admin_client.list_security_groups( | 
|  | tenant_id=project['id'])['security_groups'] | 
|  | for security_group in sgs_list: | 
|  | # Make sure delete_security_group method will use | 
|  | # the admin client for this group | 
|  | security_group['client'] = cls.admin_client | 
|  | cls.security_groups.append(security_group) | 
|  | return project | 
|  |  | 
|  | @classmethod | 
|  | def create_security_group(cls, name=None, project=None, client=None, | 
|  | **kwargs): | 
|  | if project: | 
|  | client = client or cls.admin_client | 
|  | project_id = kwargs.setdefault('project_id', project['id']) | 
|  | tenant_id = kwargs.setdefault('tenant_id', project['id']) | 
|  | if project_id != project['id'] or tenant_id != project['id']: | 
|  | raise ValueError('Project ID specified multiple times') | 
|  | else: | 
|  | client = client or cls.client | 
|  |  | 
|  | name = name or data_utils.rand_name(cls.__name__) | 
|  | security_group = client.create_security_group(name=name, **kwargs)[ | 
|  | 'security_group'] | 
|  | security_group['client'] = client | 
|  | cls.security_groups.append(security_group) | 
|  | return security_group | 
|  |  | 
|  | @classmethod | 
|  | def delete_security_group(cls, security_group, client=None): | 
|  | client = client or security_group.get('client') or cls.client | 
|  | client.delete_security_group(security_group['id']) | 
|  |  | 
|  | @classmethod | 
|  | def get_security_group(cls, name='default', client=None): | 
|  | client = client or cls.client | 
|  | security_groups = client.list_security_groups()['security_groups'] | 
|  | for security_group in security_groups: | 
|  | if security_group['name'] == name: | 
|  | return security_group | 
|  | raise ValueError("No such security group named {!r}".format(name)) | 
|  |  | 
|  | @classmethod | 
|  | def create_security_group_rule(cls, security_group=None, project=None, | 
|  | client=None, ip_version=None, **kwargs): | 
|  | if project: | 
|  | client = client or cls.admin_client | 
|  | project_id = kwargs.setdefault('project_id', project['id']) | 
|  | tenant_id = kwargs.setdefault('tenant_id', project['id']) | 
|  | if project_id != project['id'] or tenant_id != project['id']: | 
|  | raise ValueError('Project ID specified multiple times') | 
|  |  | 
|  | if 'security_group_id' not in kwargs: | 
|  | security_group = (security_group or | 
|  | cls.get_security_group(client=client)) | 
|  |  | 
|  | if security_group: | 
|  | client = client or security_group.get('client') | 
|  | security_group_id = kwargs.setdefault('security_group_id', | 
|  | security_group['id']) | 
|  | if security_group_id != security_group['id']: | 
|  | raise ValueError('Security group ID specified multiple times.') | 
|  |  | 
|  | ip_version = ip_version or cls._ip_version | 
|  | default_params = ( | 
|  | constants.DEFAULT_SECURITY_GROUP_RULE_PARAMS[ip_version]) | 
|  | if (('remote_address_group_id' in kwargs or | 
|  | 'remote_group_id' in kwargs) and | 
|  | 'remote_ip_prefix' in default_params): | 
|  | default_params.pop('remote_ip_prefix') | 
|  | for key, value in default_params.items(): | 
|  | kwargs.setdefault(key, value) | 
|  |  | 
|  | client = client or cls.client | 
|  | return client.create_security_group_rule(**kwargs)[ | 
|  | 'security_group_rule'] | 
|  |  | 
|  | @classmethod | 
|  | def create_default_security_group_rule(cls, **kwargs): | 
|  | body = cls.admin_client.create_default_security_group_rule(**kwargs) | 
|  | default_sg_rule = body['default_security_group_rule'] | 
|  | cls.sg_rule_templates.append(default_sg_rule) | 
|  | return default_sg_rule | 
|  |  | 
|  | @classmethod | 
|  | def create_keypair(cls, client=None, name=None, **kwargs): | 
|  | client = client or cls.os_primary.keypairs_client | 
|  | name = name or data_utils.rand_name('keypair-test') | 
|  | keypair = client.create_keypair(name=name, **kwargs)['keypair'] | 
|  |  | 
|  | # save client for later cleanup | 
|  | keypair['client'] = client | 
|  | cls.keypairs.append(keypair) | 
|  | return keypair | 
|  |  | 
|  | @classmethod | 
|  | def delete_keypair(cls, keypair, client=None): | 
|  | client = (client or keypair.get('client') or | 
|  | cls.os_primary.keypairs_client) | 
|  | client.delete_keypair(keypair_name=keypair['name']) | 
|  |  | 
|  | @classmethod | 
|  | def create_trunk(cls, port=None, subports=None, client=None, **kwargs): | 
|  | """Create network trunk | 
|  |  | 
|  | :param port: dictionary containing parent port ID (port['id']) | 
|  | :param client: client to be used for connecting to networking service | 
|  | :param **kwargs: extra parameters to be forwarded to network service | 
|  |  | 
|  | :returns: dictionary containing created trunk details | 
|  | """ | 
|  | client = client or cls.client | 
|  |  | 
|  | if port: | 
|  | kwargs['port_id'] = port['id'] | 
|  |  | 
|  | trunk = client.create_trunk(subports=subports, **kwargs)['trunk'] | 
|  | # Save client reference for later deletion | 
|  | trunk['client'] = client | 
|  | cls.trunks.append(trunk) | 
|  | return trunk | 
|  |  | 
|  | @classmethod | 
|  | def delete_trunk(cls, trunk, client=None, detach_parent_port=True): | 
|  | """Delete network trunk | 
|  |  | 
|  | :param trunk: dictionary containing trunk ID (trunk['id']) | 
|  |  | 
|  | :param client: client to be used for connecting to networking service | 
|  | """ | 
|  | client = client or trunk.get('client') or cls.client | 
|  | trunk.update(client.show_trunk(trunk['id'])['trunk']) | 
|  |  | 
|  | if not trunk['admin_state_up']: | 
|  | # Cannot touch trunk before admin_state_up is True | 
|  | client.update_trunk(trunk['id'], admin_state_up=True) | 
|  | if trunk['sub_ports']: | 
|  | # Removes trunk ports before deleting it | 
|  | cls._try_delete_resource(client.remove_subports, trunk['id'], | 
|  | trunk['sub_ports']) | 
|  |  | 
|  | # we have to detach the interface from the server before | 
|  | # the trunk can be deleted. | 
|  | parent_port = {'id': trunk['port_id']} | 
|  |  | 
|  | def is_parent_port_detached(): | 
|  | parent_port.update(client.show_port(parent_port['id'])['port']) | 
|  | return not parent_port['device_id'] | 
|  |  | 
|  | if detach_parent_port and not is_parent_port_detached(): | 
|  | # this could probably happen when trunk is deleted and parent port | 
|  | # has been assigned to a VM that is still running. Here we are | 
|  | # assuming that device_id points to such VM. | 
|  | cls.os_primary.compute.InterfacesClient().delete_interface( | 
|  | parent_port['device_id'], parent_port['id']) | 
|  | utils.wait_until_true(is_parent_port_detached) | 
|  |  | 
|  | client.delete_trunk(trunk['id']) | 
|  |  | 
|  | @classmethod | 
|  | def create_conntrack_helper(cls, router_id, helper, protocol, port, | 
|  | client=None): | 
|  | """Create a conntrack helper | 
|  |  | 
|  | Create a conntrack helper and schedule it for later deletion. If a | 
|  | client is passed, then it is used for deleteing the CTH too. | 
|  |  | 
|  | :param router_id: The ID of the Neutron router associated to the | 
|  | conntrack helper. | 
|  |  | 
|  | :param helper: The conntrack helper module alias | 
|  |  | 
|  | :param protocol: The conntrack helper IP protocol used in the conntrack | 
|  | helper. | 
|  |  | 
|  | :param port: The conntrack helper IP protocol port number for the | 
|  | conntrack helper. | 
|  |  | 
|  | :param client: network client to be used for creating and cleaning up | 
|  | the conntrack helper. | 
|  | """ | 
|  |  | 
|  | client = client or cls.client | 
|  |  | 
|  | cth = client.create_conntrack_helper(router_id, helper, protocol, | 
|  | port)['conntrack_helper'] | 
|  |  | 
|  | # save ID of router associated with conntrack helper for final cleanup | 
|  | cth['router_id'] = router_id | 
|  |  | 
|  | # save client to be used later in cls.delete_conntrack_helper for final | 
|  | # cleanup | 
|  | cth['client'] = client | 
|  | cls.conntrack_helpers.append(cth) | 
|  | return cth | 
|  |  | 
|  | @classmethod | 
|  | def delete_conntrack_helper(cls, cth, client=None): | 
|  | """Delete conntrack helper | 
|  |  | 
|  | :param client: Client to be used | 
|  | If client is not given it will use the client used to create the | 
|  | conntrack helper, or cls.client if unknown. | 
|  | """ | 
|  |  | 
|  | client = client or cth.get('client') or cls.client | 
|  | client.delete_conntrack_helper(cth['router_id'], cth['id']) | 
|  |  | 
|  | @classmethod | 
|  | def create_ndp_proxy(cls, router_id, port_id, client=None, **kwargs): | 
|  | """Creates a ndp proxy. | 
|  |  | 
|  | Create a ndp proxy and schedule it for later deletion. | 
|  | If a client is passed, then it is used for deleting the NDP proxy too. | 
|  |  | 
|  | :param router_id: router ID where to create the ndp proxy. | 
|  |  | 
|  | :param port_id: port ID which the ndp proxy associate with | 
|  |  | 
|  | :param client: network client to be used for creating and cleaning up | 
|  | the ndp proxy. | 
|  |  | 
|  | :param **kwargs: additional creation parameters to be forwarded to | 
|  | networking server. | 
|  | """ | 
|  | client = client or cls.client | 
|  |  | 
|  | data = {'router_id': router_id, 'port_id': port_id} | 
|  | if kwargs: | 
|  | data.update(kwargs) | 
|  | ndp_proxy = client.create_ndp_proxy(**data)['ndp_proxy'] | 
|  |  | 
|  | # save client to be used later in cls.delete_ndp_proxy | 
|  | # for final cleanup | 
|  | ndp_proxy['client'] = client | 
|  | cls.ndp_proxies.append(ndp_proxy) | 
|  | return ndp_proxy | 
|  |  | 
|  | @classmethod | 
|  | def delete_ndp_proxy(cls, ndp_proxy, client=None): | 
|  | """Delete ndp proxy | 
|  |  | 
|  | :param client: Client to be used | 
|  | If client is not given it will use the client used to create | 
|  | the ndp proxy, or cls.client if unknown. | 
|  | """ | 
|  | client = client or ndp_proxy.get('client') or cls.client | 
|  | client.delete_ndp_proxy(ndp_proxy['id']) | 
|  |  | 
|  | @classmethod | 
|  | def get_loaded_network_extensions(cls): | 
|  | """Return the network service loaded extensions | 
|  |  | 
|  | :return: list of strings with the alias of the network service loaded | 
|  | extensions. | 
|  | """ | 
|  | body = cls.client.list_extensions() | 
|  | return [net_ext['alias'] for net_ext in body['extensions']] | 
|  |  | 
|  |  | 
|  | class BaseAdminNetworkTest(BaseNetworkTest): | 
|  |  | 
|  | credentials = ['primary', 'admin'] | 
|  |  | 
|  | @classmethod | 
|  | def setup_clients(cls): | 
|  | super(BaseAdminNetworkTest, cls).setup_clients() | 
|  | cls.admin_client = cls.os_admin.network_client | 
|  | cls.identity_admin_client = cls.os_admin.projects_client | 
|  |  | 
|  | @classmethod | 
|  | def create_metering_label(cls, name, description): | 
|  | """Wrapper utility that returns a test metering label.""" | 
|  | body = cls.admin_client.create_metering_label( | 
|  | description=description, | 
|  | name=data_utils.rand_name("metering-label")) | 
|  | metering_label = body['metering_label'] | 
|  | cls.metering_labels.append(metering_label) | 
|  | return metering_label | 
|  |  | 
|  | @classmethod | 
|  | def create_metering_label_rule(cls, remote_ip_prefix, direction, | 
|  | metering_label_id): | 
|  | """Wrapper utility that returns a test metering label rule.""" | 
|  | body = cls.admin_client.create_metering_label_rule( | 
|  | remote_ip_prefix=remote_ip_prefix, direction=direction, | 
|  | metering_label_id=metering_label_id) | 
|  | metering_label_rule = body['metering_label_rule'] | 
|  | cls.metering_label_rules.append(metering_label_rule) | 
|  | return metering_label_rule | 
|  |  | 
|  | @classmethod | 
|  | def create_network_segment_range(cls, name, shared, | 
|  | project_id, network_type, | 
|  | physical_network, minimum, | 
|  | maximum): | 
|  | """Wrapper utility that returns a test network segment range.""" | 
|  | network_segment_range_args = {'name': name, | 
|  | 'shared': shared, | 
|  | 'project_id': project_id, | 
|  | 'network_type': network_type, | 
|  | 'physical_network': physical_network, | 
|  | 'minimum': minimum, | 
|  | 'maximum': maximum} | 
|  | body = cls.admin_client.create_network_segment_range( | 
|  | **network_segment_range_args) | 
|  | network_segment_range = body['network_segment_range'] | 
|  | cls.network_segment_ranges.append(network_segment_range) | 
|  | return network_segment_range | 
|  |  | 
|  | @classmethod | 
|  | def create_flavor(cls, name, description, service_type): | 
|  | """Wrapper utility that returns a test flavor.""" | 
|  | body = cls.admin_client.create_flavor( | 
|  | description=description, service_type=service_type, | 
|  | name=name) | 
|  | flavor = body['flavor'] | 
|  | cls.flavors.append(flavor) | 
|  | return flavor | 
|  |  | 
|  | @classmethod | 
|  | def create_service_profile(cls, description, metainfo, driver): | 
|  | """Wrapper utility that returns a test service profile.""" | 
|  | body = cls.admin_client.create_service_profile( | 
|  | driver=driver, metainfo=metainfo, description=description) | 
|  | service_profile = body['service_profile'] | 
|  | cls.service_profiles.append(service_profile) | 
|  | return service_profile | 
|  |  | 
|  | @classmethod | 
|  | def create_log(cls, name, description=None, | 
|  | resource_type='security_group', resource_id=None, | 
|  | target_id=None, event='ALL', enabled=True): | 
|  | """Wrapper utility that returns a test log object.""" | 
|  | log_args = {'name': name, | 
|  | 'resource_type': resource_type, | 
|  | 'resource_id': resource_id, | 
|  | 'target_id': target_id, | 
|  | 'event': event, | 
|  | 'enabled': enabled} | 
|  | if description: | 
|  | log_args['description'] = description | 
|  | body = cls.admin_client.create_log(**log_args) | 
|  | log_object = body['log'] | 
|  | cls.log_objects.append(log_object) | 
|  | return log_object | 
|  |  | 
|  | @classmethod | 
|  | def get_unused_ip(cls, net_id, ip_version=None): | 
|  | """Get an unused ip address in a allocation pool of net""" | 
|  | body = cls.admin_client.list_ports(network_id=net_id) | 
|  | ports = body['ports'] | 
|  | used_ips = [] | 
|  | for port in ports: | 
|  | used_ips.extend( | 
|  | [fixed_ip['ip_address'] for fixed_ip in port['fixed_ips']]) | 
|  | body = cls.admin_client.list_subnets(network_id=net_id) | 
|  | subnets = body['subnets'] | 
|  |  | 
|  | for subnet in subnets: | 
|  | if ip_version and subnet['ip_version'] != ip_version: | 
|  | continue | 
|  | cidr = subnet['cidr'] | 
|  | allocation_pools = subnet['allocation_pools'] | 
|  | iterators = [] | 
|  | if allocation_pools: | 
|  | for allocation_pool in allocation_pools: | 
|  | iterators.append(netaddr.iter_iprange( | 
|  | allocation_pool['start'], allocation_pool['end'])) | 
|  | else: | 
|  | net = netaddr.IPNetwork(cidr) | 
|  |  | 
|  | def _iterip(): | 
|  | for ip in net: | 
|  | if ip not in (net.network, net.broadcast): | 
|  | yield ip | 
|  | iterators.append(iter(_iterip())) | 
|  |  | 
|  | for iterator in iterators: | 
|  | for ip in iterator: | 
|  | if str(ip) not in used_ips: | 
|  | return str(ip) | 
|  |  | 
|  | message = ( | 
|  | "net(%s) has no usable IP address in allocation pools" % net_id) | 
|  | raise exceptions.InvalidConfiguration(message) | 
|  |  | 
|  | @classmethod | 
|  | def create_provider_network(cls, physnet_name, start_segmentation_id, | 
|  | max_attempts=30, external=False): | 
|  | segmentation_id = start_segmentation_id | 
|  | for attempts in range(max_attempts): | 
|  | try: | 
|  | return cls.create_network( | 
|  | name=data_utils.rand_name('test_net'), | 
|  | shared=not external, | 
|  | external=external, | 
|  | provider_network_type='vlan', | 
|  | provider_physical_network=physnet_name, | 
|  | provider_segmentation_id=segmentation_id) | 
|  | except lib_exc.Conflict: | 
|  | segmentation_id += 1 | 
|  | if segmentation_id > 4095: | 
|  | raise lib_exc.TempestException( | 
|  | "No free segmentation id was found for provider " | 
|  | "network creation!") | 
|  | time.sleep(CONF.network.build_interval) | 
|  | LOG.exception("Failed to create provider network after " | 
|  | "%d attempts", max_attempts) | 
|  | raise lib_exc.TimeoutException | 
|  |  | 
|  |  | 
|  | def require_qos_rule_type(rule_type): | 
|  | def decorator(f): | 
|  | @functools.wraps(f) | 
|  | def wrapper(self, *func_args, **func_kwargs): | 
|  | if rule_type not in self.get_supported_qos_rule_types(): | 
|  | raise self.skipException( | 
|  | "%s rule type is required." % rule_type) | 
|  | return f(self, *func_args, **func_kwargs) | 
|  | return wrapper | 
|  | return decorator | 
|  |  | 
|  |  | 
|  | def _require_sorting(f): | 
|  | @functools.wraps(f) | 
|  | def inner(self, *args, **kwargs): | 
|  | if not tutils.is_extension_enabled("sorting", "network"): | 
|  | self.skipTest('Sorting feature is required') | 
|  | return f(self, *args, **kwargs) | 
|  | return inner | 
|  |  | 
|  |  | 
|  | def _require_pagination(f): | 
|  | @functools.wraps(f) | 
|  | def inner(self, *args, **kwargs): | 
|  | if not tutils.is_extension_enabled("pagination", "network"): | 
|  | self.skipTest('Pagination feature is required') | 
|  | return f(self, *args, **kwargs) | 
|  | return inner | 
|  |  | 
|  |  | 
|  | class BaseSearchCriteriaTest(BaseNetworkTest): | 
|  |  | 
|  | # This should be defined by subclasses to reflect resource name to test | 
|  | resource = None | 
|  |  | 
|  | field = 'name' | 
|  |  | 
|  | # NOTE(ihrachys): some names, like those starting with an underscore (_) | 
|  | # are sorted differently depending on whether the plugin implements native | 
|  | # sorting support, or not. So we avoid any such cases here, sticking to | 
|  | # alphanumeric. Also test a case when there are multiple resources with the | 
|  | # same name | 
|  | resource_names = ('test1', 'abc1', 'test10', '123test') + ('test1',) | 
|  |  | 
|  | force_tenant_isolation = True | 
|  |  | 
|  | list_kwargs = {} | 
|  |  | 
|  | list_as_admin = False | 
|  |  | 
|  | def assertSameOrder(self, original, actual): | 
|  | # gracefully handle iterators passed | 
|  | original = list(original) | 
|  | actual = list(actual) | 
|  | self.assertEqual(len(original), len(actual)) | 
|  | for expected, res in zip(original, actual): | 
|  | self.assertEqual(expected[self.field], res[self.field]) | 
|  |  | 
|  | @utils.classproperty | 
|  | def plural_name(self): | 
|  | return '%ss' % self.resource | 
|  |  | 
|  | @property | 
|  | def list_client(self): | 
|  | return self.admin_client if self.list_as_admin else self.client | 
|  |  | 
|  | def list_method(self, *args, **kwargs): | 
|  | method = getattr(self.list_client, 'list_%s' % self.plural_name) | 
|  | kwargs.update(self.list_kwargs) | 
|  | return method(*args, **kwargs) | 
|  |  | 
|  | def get_bare_url(self, url): | 
|  | base_url = self.client.base_url | 
|  | base_url_normalized = utils.normalize_url(base_url) | 
|  | url_normalized = utils.normalize_url(url) | 
|  | self.assertTrue(url_normalized.startswith(base_url_normalized)) | 
|  | return url_normalized[len(base_url_normalized):] | 
|  |  | 
|  | @classmethod | 
|  | def _extract_resources(cls, body): | 
|  | return body[cls.plural_name] | 
|  |  | 
|  | @classmethod | 
|  | def _test_resources(cls, resources): | 
|  | return [res for res in resources if res["name"] in cls.resource_names] | 
|  |  | 
|  | def _test_list_sorts(self, direction): | 
|  | sort_args = { | 
|  | 'sort_dir': direction, | 
|  | 'sort_key': self.field | 
|  | } | 
|  | body = self.list_method(**sort_args) | 
|  | resources = self._extract_resources(body) | 
|  | self.assertNotEmpty( | 
|  | resources, "%s list returned is empty" % self.resource) | 
|  | retrieved_names = [res[self.field] for res in resources] | 
|  | # sort without taking into account whether the network is named with | 
|  | # a capital letter or not | 
|  | expected = sorted(retrieved_names, key=lambda v: v.upper()) | 
|  | if direction == constants.SORT_DIRECTION_DESC: | 
|  | expected = list(reversed(expected)) | 
|  | self.assertEqual(expected, retrieved_names) | 
|  |  | 
|  | @_require_sorting | 
|  | def _test_list_sorts_asc(self): | 
|  | self._test_list_sorts(constants.SORT_DIRECTION_ASC) | 
|  |  | 
|  | @_require_sorting | 
|  | def _test_list_sorts_desc(self): | 
|  | self._test_list_sorts(constants.SORT_DIRECTION_DESC) | 
|  |  | 
|  | @_require_pagination | 
|  | def _test_list_pagination(self): | 
|  | for limit in range(1, len(self.resource_names) + 1): | 
|  | pagination_args = { | 
|  | 'limit': limit, | 
|  | } | 
|  | body = self.list_method(**pagination_args) | 
|  | resources = self._extract_resources(body) | 
|  | self.assertEqual(limit, len(resources)) | 
|  |  | 
|  | @_require_pagination | 
|  | def _test_list_no_pagination_limit_0(self): | 
|  | pagination_args = { | 
|  | 'limit': 0, | 
|  | } | 
|  | body = self.list_method(**pagination_args) | 
|  | resources = self._extract_resources(body) | 
|  | self.assertGreaterEqual(len(resources), len(self.resource_names)) | 
|  |  | 
|  | def _test_list_pagination_iteratively(self, lister): | 
|  | # first, collect all resources for later comparison | 
|  | sort_args = { | 
|  | 'sort_dir': constants.SORT_DIRECTION_ASC, | 
|  | 'sort_key': self.field | 
|  | } | 
|  | body = self.list_method(**sort_args) | 
|  | total_resources = self._extract_resources(body) | 
|  | expected_resources = self._test_resources(total_resources) | 
|  | self.assertNotEmpty(expected_resources) | 
|  |  | 
|  | resources = lister( | 
|  | len(total_resources), sort_args | 
|  | ) | 
|  |  | 
|  | # finally, compare that the list retrieved in one go is identical to | 
|  | # the one containing pagination results | 
|  | self.assertSameOrder(expected_resources, resources) | 
|  |  | 
|  | def _list_all_with_marker(self, niterations, sort_args): | 
|  | # paginate resources one by one, using last fetched resource as a | 
|  | # marker | 
|  | resources = [] | 
|  | for i in range(niterations): | 
|  | pagination_args = sort_args.copy() | 
|  | pagination_args['limit'] = 1 | 
|  | if resources: | 
|  | pagination_args['marker'] = resources[-1]['id'] | 
|  | body = self.list_method(**pagination_args) | 
|  | resources_ = self._extract_resources(body) | 
|  | # Empty resource list can be returned when any concurrent | 
|  | # tests delete them | 
|  | self.assertGreaterEqual(1, len(resources_)) | 
|  | resources.extend(resources_) | 
|  | return self._test_resources(resources) | 
|  |  | 
|  | @_require_pagination | 
|  | @_require_sorting | 
|  | def _test_list_pagination_with_marker(self): | 
|  | self._test_list_pagination_iteratively(self._list_all_with_marker) | 
|  |  | 
|  | def _list_all_with_hrefs(self, niterations, sort_args): | 
|  | # paginate resources one by one, using next href links | 
|  | resources = [] | 
|  | prev_links = {} | 
|  |  | 
|  | for i in range(niterations): | 
|  | if prev_links: | 
|  | uri = self.get_bare_url(prev_links['next']) | 
|  | else: | 
|  | sort_args.update(self.list_kwargs) | 
|  | uri = self.list_client.build_uri( | 
|  | self.plural_name, limit=1, **sort_args) | 
|  | prev_links, body = self.list_client.get_uri_with_links( | 
|  | self.plural_name, uri | 
|  | ) | 
|  | resources_ = self._extract_resources(body) | 
|  | # Empty resource list can be returned when any concurrent | 
|  | # tests delete them | 
|  | self.assertGreaterEqual(1, len(resources_)) | 
|  | resources.extend(self._test_resources(resources_)) | 
|  |  | 
|  | # The last element is empty and does not contain 'next' link | 
|  | uri = self.get_bare_url(prev_links['next']) | 
|  | prev_links, body = self.client.get_uri_with_links( | 
|  | self.plural_name, uri | 
|  | ) | 
|  | self.assertNotIn('next', prev_links) | 
|  |  | 
|  | # Now walk backwards and compare results | 
|  | resources2 = [] | 
|  | for i in range(niterations): | 
|  | uri = self.get_bare_url(prev_links['previous']) | 
|  | prev_links, body = self.list_client.get_uri_with_links( | 
|  | self.plural_name, uri | 
|  | ) | 
|  | resources_ = self._extract_resources(body) | 
|  | # Empty resource list can be returned when any concurrent | 
|  | # tests delete them | 
|  | self.assertGreaterEqual(1, len(resources_)) | 
|  | resources2.extend(self._test_resources(resources_)) | 
|  |  | 
|  | self.assertSameOrder(resources, reversed(resources2)) | 
|  |  | 
|  | return resources | 
|  |  | 
|  | @_require_pagination | 
|  | @_require_sorting | 
|  | def _test_list_pagination_with_href_links(self): | 
|  | self._test_list_pagination_iteratively(self._list_all_with_hrefs) | 
|  |  | 
|  | @_require_pagination | 
|  | @_require_sorting | 
|  | def _test_list_pagination_page_reverse_with_href_links( | 
|  | self, direction=constants.SORT_DIRECTION_ASC): | 
|  | pagination_args = { | 
|  | 'sort_dir': direction, | 
|  | 'sort_key': self.field, | 
|  | } | 
|  | body = self.list_method(**pagination_args) | 
|  | total_resources = self._extract_resources(body) | 
|  | expected_resources = self._test_resources(total_resources) | 
|  |  | 
|  | page_size = 2 | 
|  | pagination_args['limit'] = page_size | 
|  |  | 
|  | prev_links = {} | 
|  | resources = [] | 
|  | num_resources = len(total_resources) | 
|  | niterations = int(math.ceil(float(num_resources) / page_size)) | 
|  | for i in range(niterations): | 
|  | if prev_links: | 
|  | uri = self.get_bare_url(prev_links['previous']) | 
|  | else: | 
|  | pagination_args.update(self.list_kwargs) | 
|  | uri = self.list_client.build_uri( | 
|  | self.plural_name, page_reverse=True, **pagination_args) | 
|  | prev_links, body = self.list_client.get_uri_with_links( | 
|  | self.plural_name, uri | 
|  | ) | 
|  | resources_ = self._test_resources(self._extract_resources(body)) | 
|  | self.assertGreaterEqual(page_size, len(resources_)) | 
|  | resources.extend(reversed(resources_)) | 
|  |  | 
|  | self.assertSameOrder(expected_resources, reversed(resources)) | 
|  |  | 
|  | @_require_pagination | 
|  | @_require_sorting | 
|  | def _test_list_pagination_page_reverse_asc(self): | 
|  | self._test_list_pagination_page_reverse( | 
|  | direction=constants.SORT_DIRECTION_ASC) | 
|  |  | 
|  | @_require_pagination | 
|  | @_require_sorting | 
|  | def _test_list_pagination_page_reverse_desc(self): | 
|  | self._test_list_pagination_page_reverse( | 
|  | direction=constants.SORT_DIRECTION_DESC) | 
|  |  | 
|  | def _test_list_pagination_page_reverse(self, direction): | 
|  | pagination_args = { | 
|  | 'sort_dir': direction, | 
|  | 'sort_key': self.field, | 
|  | 'limit': 3, | 
|  | } | 
|  | body = self.list_method(**pagination_args) | 
|  | expected_resources = self._extract_resources(body) | 
|  |  | 
|  | pagination_args['limit'] -= 1 | 
|  | pagination_args['marker'] = expected_resources[-1]['id'] | 
|  | pagination_args['page_reverse'] = True | 
|  | body = self.list_method(**pagination_args) | 
|  |  | 
|  | self.assertSameOrder( | 
|  | # the last entry is not included in 2nd result when used as a | 
|  | # marker | 
|  | expected_resources[:-1], | 
|  | self._extract_resources(body)) | 
|  |  | 
|  | @tutils.requires_ext(extension="filter-validation", service="network") | 
|  | def _test_list_validation_filters( | 
|  | self, validation_args, filter_is_valid=True): | 
|  | if not filter_is_valid: | 
|  | self.assertRaises(lib_exc.BadRequest, self.list_method, | 
|  | **validation_args) | 
|  | else: | 
|  | body = self.list_method(**validation_args) | 
|  | resources = self._extract_resources(body) | 
|  | for resource in resources: | 
|  | self.assertIn(resource['name'], self.resource_names) |