Merge "Fix FloatingIpTestCasesAdmin.test_two_vms_fips" into mcp/caracal
diff --git a/neutron_tempest_plugin/vpnaas/api/base_vpnaas.py b/neutron_tempest_plugin/vpnaas/api/base_vpnaas.py
index 3ccf48b..db9d79e 100644
--- a/neutron_tempest_plugin/vpnaas/api/base_vpnaas.py
+++ b/neutron_tempest_plugin/vpnaas/api/base_vpnaas.py
@@ -97,9 +97,9 @@
return vpnservice
@classmethod
- def create_ikepolicy(cls, name):
+ def create_ikepolicy(cls, name, ike_version='v1'):
"""Wrapper utility that returns a test ike policy."""
- body = cls.client.create_ikepolicy(name=name)
+ body = cls.client.create_ikepolicy(name=name, ike_version=ike_version)
ikepolicy = body['ikepolicy']
cls.ikepolicies.append(ikepolicy)
return ikepolicy
@@ -118,24 +118,34 @@
peer_address="172.24.4.233",
peer_id="172.24.4.233",
peer_cidrs=None,
- name=None):
+ name=None,
+ local_ep_group_id=None,
+ peer_ep_group_id=None):
"""Wrapper utility that returns a test vpn connection."""
if peer_cidrs is None:
peer_cidrs = ['1.1.1.0/24', '2.2.2.0/24']
if name is None:
name = data_utils.rand_name("ipsec_site_connection-")
- body = cls.client.create_ipsec_site_connection(
- psk=psk,
- initiator="bi-directional",
- ipsecpolicy_id=ipsecpolicy_id,
- admin_state_up=True,
- mtu=1500,
- ikepolicy_id=ikepolicy_id,
- vpnservice_id=vpnservice_id,
- peer_address=peer_address,
- peer_id=peer_id,
- peer_cidrs=peer_cidrs,
- name=name)
+ creation_args = {
+ "psk": psk,
+ "initiator": "bi-directional",
+ "ipsecpolicy_id": ipsecpolicy_id,
+ "admin_state_up": True,
+ "mtu": 1500,
+ "ikepolicy_id": ikepolicy_id,
+ "vpnservice_id": vpnservice_id,
+ "peer_address": peer_address,
+ "peer_id": peer_id,
+ "name": name,
+ }
+ # Endpoint groups override peer_cidrs, but only when both are given
+ if local_ep_group_id and peer_ep_group_id:
+ creation_args["local_ep_group_id"] = local_ep_group_id
+ creation_args["peer_ep_group_id"] = peer_ep_group_id
+ else:
+ creation_args["peer_cidrs"] = peer_cidrs
+
+ body = cls.client.create_ipsec_site_connection(**creation_args)
ipsec_site_connection = body['ipsec_site_connection']
cls.ipsec_site_connections.append(ipsec_site_connection)
return ipsec_site_connection
@@ -167,7 +177,7 @@
@classmethod
def create_endpoint_group(cls, name, type, endpoints):
- """Wrapper utility that returns a test ipsec policy."""
+ """Wrapper utility that returns a test endpoint group."""
body = cls.client.create_endpoint_group(
endpoints=endpoints,
type=type,
diff --git a/neutron_tempest_plugin/vpnaas/scenario/test_vpnaas.py b/neutron_tempest_plugin/vpnaas/scenario/test_vpnaas.py
index 942e7f0..f881a84 100644
--- a/neutron_tempest_plugin/vpnaas/scenario/test_vpnaas.py
+++ b/neutron_tempest_plugin/vpnaas/scenario/test_vpnaas.py
@@ -105,7 +105,7 @@
cls.create_loginable_secgroup_rule(secgroup_id=cls.secgroup['id'])
cls.create_pingable_secgroup_rule(secgroup_id=cls.secgroup['id'])
cls.ikepolicy = cls.create_ikepolicy(
- data_utils.rand_name("ike-policy-"))
+ data_utils.rand_name("ike-policy-"), 'v2')
cls.ipsecpolicy = cls.create_ipsecpolicy(
data_utils.rand_name("ipsec-policy-"))
@@ -196,7 +196,6 @@
cls.os_admin.network_client.update_subnet(
subnet['id'], host_routes=[{"destination": cls.left_cidr,
"nexthop": snat_ip}])
-
return network, subnet, router
def _create_server(self, create_floating_ip=True, network=None):
@@ -348,3 +347,278 @@
@test.unstable_test("bug 1882220")
def test_vpnaas_6in6(self):
self._test_vpnaas()
+
+
+class VpnaasWithEndpointGroupBase(Vpnaas):
+ """Test VPNaaS with endpoint groups on the following topology
+
+ .. code-block:: HTML
+
+ +-------------------+
+ | public |
+ | network |
+ | |
+ ++-----------------++
+ | |
+ | |
+ +-------+-+ +-+-------+
+ | LEFT | | RIGHT |
+ | router | <--VPN--> | router |
+ | | | |
+ +-+-----+-+ +-+-----+-+
+ | | | |
+ +-------+-+ +-+-------+ +-------+-+ +-+-------+
+ | LEFT | | LEFT | | RIGHT | | RIGHT |
+ | network | | network2| | network | | network2|
+ | | | | | | | |
+ +---------+ +---------+ +---------+ +---------+
+ """
+
+ check_failures = ""
+
+ @classmethod
+ @utils.requires_ext(extension="vpnaas", service="network")
+ def resource_setup(cls):
+ super(VpnaasWithEndpointGroupBase, cls).resource_setup()
+
+ left_v4_cidr2 = netaddr.IPNetwork('10.220.2.0/24')
+ left_v6_cidr2 = netaddr.IPNetwork('2001:db8:0:3::/64')
+ right_v4_cidr2 = netaddr.IPNetwork('10.210.2.0/24')
+ right_v6_cidr2 = netaddr.IPNetwork('2001:db8:0:4::/64')
+
+ cls.left_cidr2 = left_v6_cidr2 if cls.inner_ipv6 else left_v4_cidr2
+ cls.right_cidr2 = right_v6_cidr2 if cls.inner_ipv6 else right_v4_cidr2
+
+ cls.network2, cls.subnet2 = cls._add_network(
+ 'left2',
+ cls.router,
+ cls.left_cidr2
+ )
+
+ cls._right_network2, cls._right_subnet2 = cls._add_network(
+ 'right2',
+ cls._right_router,
+ cls.right_cidr2
+ )
+
+ # For distributed routers, route peer CIDRs via the SNAT port IP
+ cls._update_host_routes()
+
+ cls.left_ep_group_subnet = cls.create_endpoint_group(
+ name=data_utils.rand_name("left-endpoint-group-subnet-"),
+ type="subnet",
+ endpoints=[cls.subnet['id'], cls.subnet2['id']])
+ cls.left_ep_group_cidr = cls.create_endpoint_group(
+ name=data_utils.rand_name("left-endpoint-group-cidr-"),
+ type="cidr",
+ endpoints=[cls.left_cidr, cls.left_cidr2])
+ cls.right_ep_group_subnet = cls.create_endpoint_group(
+ name=data_utils.rand_name("right-endpoint-group-subnet-"),
+ type="subnet",
+ endpoints=[cls._right_subnet['id'], cls._right_subnet2['id']])
+ cls.right_ep_group_cidr = cls.create_endpoint_group(
+ name=data_utils.rand_name("right-endpoint-group-cidr-"),
+ type="cidr",
+ endpoints=[cls.right_cidr, cls.right_cidr2])
+
+ @classmethod
+ def _add_network(cls, prefix, router, cidr):
+ network = cls.create_network(network_name=f"{prefix}-network")
+ ip_version = 6 if cls.inner_ipv6 else 4
+ subnet = cls.create_subnet(
+ network, ip_version=ip_version, cidr=cidr,
+ name=f"{prefix}-subnet", **cls.extra_subnet_attributes)
+ cls.create_router_interface(router['id'], subnet['id'])
+
+ return network, subnet
+
+ @classmethod
+ def _update_host_routes(cls):
+ is_left_router_distributed = cls.os_admin.network_client.show_router(
+ cls.router['id'])['router'].get('distributed')
+ if is_left_router_distributed:
+ snat_port = cls.os_admin.network_client.list_ports(
+ device_id=cls.router['id'],
+ device_owner='network:router_centralized_snat')
+ for subnet in [cls.subnet, cls.subnet2]:
+ snat_ip = cls._get_snat_ip(cls, snat_port['ports'], subnet)
+ host_routes = [
+ {"destination": cls.right_cidr, "nexthop": snat_ip},
+ {"destination": cls.right_cidr2, "nexthop": snat_ip}
+ ]
+ cls.os_admin.network_client.update_subnet(
+ subnet['id'], host_routes=host_routes)
+ is_right_router_distributed = cls.os_admin.network_client.show_router(
+ cls._right_router['id'])['router'].get('distributed')
+ if is_right_router_distributed:
+ snat_port = cls.os_admin.network_client.list_ports(
+ device_id=cls._right_router['id'],
+ device_owner='network:router_centralized_snat')
+ for subnet in [cls._right_subnet, cls._right_subnet2]:
+ snat_ip = cls._get_snat_ip(cls, snat_port['ports'], subnet)
+ host_routes = [
+ {"destination": cls.left_cidr, "nexthop": snat_ip},
+ {"destination": cls.left_cidr2, "nexthop": snat_ip}
+ ]
+ cls.os_admin.network_client.update_subnet(
+ subnet['id'], host_routes=host_routes)
+
+ def _get_snat_ip(self, ports, subnet):
+ snat_ip = None
+ for port in ports:
+ if subnet['network_id'] == port['network_id']:
+ snat_ip = self._get_ip_on_subnet_for_port(
+ self, port, subnet['id'])
+ break
+ return snat_ip
+
+ def _setup_vpn(self):
+ sites = [
+ dict(
+ name="left",
+ local_ep_group_id=self.left_ep_group_subnet['id'],
+ peer_ep_group_id=self.right_ep_group_cidr['id'],
+ router=self.router,
+ ),
+ dict(
+ name="right",
+ local_ep_group_id=self.right_ep_group_subnet['id'],
+ peer_ep_group_id=self.left_ep_group_cidr['id'],
+ router=self._right_router,
+ ),
+ ]
+ psk = data_utils.rand_name('mysecret')
+ for i in range(0, 2):
+ site = sites[i]
+ site['vpnservice'] = self.create_vpnservice_no_subnet(
+ site['router']['id'])
+ site_connections = []
+ for i in range(0, 2):
+ site = sites[i]
+ vpnservice = site['vpnservice']
+ peer = sites[1 - i]
+ if self.outer_ipv6:
+ peer_address = peer['vpnservice']['external_v6_ip']
+ if not peer_address:
+ msg = "Public network must have an IPv6 subnet."
+ raise self.skipException(msg)
+ else:
+ peer_address = peer['vpnservice']['external_v4_ip']
+ site_connection = self.create_ipsec_site_connection(
+ self.ikepolicy['id'],
+ self.ipsecpolicy['id'],
+ vpnservice['id'],
+ peer_address=peer_address,
+ peer_id=peer_address,
+ local_ep_group_id=site['local_ep_group_id'],
+ peer_ep_group_id=site['peer_ep_group_id'],
+ psk=psk,
+ name=data_utils.rand_name(
+ '%s-ipsec-site-connection' % site['name']))
+ site_connections.append(site_connection)
+ for site_connection in site_connections:
+ self.wait_ipsec_site_connection_status(site_connection['id'],
+ status="ACTIVE")
+
+ def _union_tests(description):
+ def decorator(func):
+ def f(self, *args, **kwargs):
+ try:
+ func(self, *args, **kwargs)
+ except Exception:
+ self.check_failures += (
+ f"\nTest connection {description} '{args[0].host} " +
+ f"--> {args[1]}' failed"
+ )
+ return f
+ return decorator
+
+ @_union_tests('without VPN')
+ def _test_check_connectivity_fail(self, fip, dst_ip):
+ self.check_remote_connectivity(fip, dst_ip, should_succeed=False)
+
+ @_union_tests('via VPN')
+ def _test_check_connectivity_with_vpn(self, fip, dst_ip):
+ self.check_remote_connectivity(fip, dst_ip)
+
+ @_union_tests('via floating IP')
+ def _test_check_connectivity_with_fip(self, fip, dst_ip):
+ self.check_remote_connectivity(fip, dst_ip)
+
+ def _test_vpnaas(self, right_servers_fip=False):
+ # RIGHT: one server per right network; their IPs are the targets
+ self.right_server_A = self._create_server(network=self._right_network,
+ create_floating_ip=right_servers_fip)
+ self.right_ip_A = self._get_ip_on_subnet_for_port(
+ self.right_server_A['port'], self._right_subnet['id'])
+ self.right_server_B = self._create_server(network=self._right_network2,
+ create_floating_ip=right_servers_fip)
+ self.right_ip_B = self._get_ip_on_subnet_for_port(
+ self.right_server_B['port'], self._right_subnet2['id'])
+
+ # LEFT: two servers, each driven over SSH through its floating IP
+ left_server_A = self._create_server()
+ self.ssh_client_A = ssh.Client(
+ left_server_A['fip']['floating_ip_address'],
+ CONF.validation.image_ssh_user,
+ pkey=self.keypair['private_key'],
+ ssh_key_type=CONF.validation.ssh_key_type
+ )
+ left_server_B = self._create_server(network=self.network2)
+ self.ssh_client_B = ssh.Client(
+ left_server_B['fip']['floating_ip_address'],
+ CONF.validation.image_ssh_user,
+ pkey=self.keypair['private_key'],
+ ssh_key_type=CONF.validation.ssh_key_type
+ )
+
+ # check LEFT -> RIGHT connectivity without VPN
+ self._test_check_connectivity_fail(self.ssh_client_A, self.right_ip_A)
+ self._test_check_connectivity_fail(self.ssh_client_B, self.right_ip_A)
+ self._test_check_connectivity_fail(self.ssh_client_A, self.right_ip_B)
+ self._test_check_connectivity_fail(self.ssh_client_B, self.right_ip_B)
+ self.assertEmpty(self.check_failures, f"{self.check_failures}")
+
+ # check LEFT -> RIGHT connectivity via VPN
+ self._setup_vpn()
+ self._test_check_connectivity_with_vpn(
+ self.ssh_client_A, self.right_ip_A
+ )
+ self._test_check_connectivity_with_vpn(
+ self.ssh_client_B, self.right_ip_A
+ )
+ self._test_check_connectivity_with_vpn(
+ self.ssh_client_A, self.right_ip_B
+ )
+ self._test_check_connectivity_with_vpn(
+ self.ssh_client_B, self.right_ip_B
+ )
+ self.assertEmpty(self.check_failures, f"{self.check_failures}")
+
+ def _test_vpnaas_with_fip(self):
+ self._test_vpnaas(right_servers_fip=True)
+ # check LEFT -> RIGHT connectivity via floating IP
+ self._test_check_connectivity_with_fip(
+ self.ssh_client_A,
+ self.right_server_A['fip']['floating_ip_address']
+ )
+ self._test_check_connectivity_with_fip(
+ self.ssh_client_B,
+ self.right_server_A['fip']['floating_ip_address']
+ )
+ self._test_check_connectivity_with_fip(
+ self.ssh_client_A,
+ self.right_server_B['fip']['floating_ip_address']
+ )
+ self._test_check_connectivity_with_fip(
+ self.ssh_client_B,
+ self.right_server_B['fip']['floating_ip_address']
+ )
+ self.assertEmpty(self.check_failures, f"{self.check_failures}")
+
+
+class VpnaasEG4in4(VpnaasWithEndpointGroupBase):
+
+ @decorators.idempotent_id('26dad126-665f-4f59-ba2d-e7e27a9675de')
+ def test_vpnaas_with_fip(self):
+ self._test_vpnaas_with_fip()