Add scenario test for trunk E2E flow
Add scenario test "test_parent_port_connectivity_after_trunk_deleted_lb"
to verify the E2E flow of the fix proposed for bug #1794424,
"Enable delete bound trunk for linux bridge agent".
Co-Authored-By: Allain Legacy <Allain.legacy@windriver.com>
Depends-On: https://review.openstack.org/#/c/605589/
Change-Id: Ic2e02f4b5dc8d7930e251340d8be194733b0a4f7
Related-Bug: #1794424
Story: 2003889
diff --git a/neutron_tempest_plugin/scenario/test_trunk.py b/neutron_tempest_plugin/scenario/test_trunk.py
index 1903180..85b16cb 100644
--- a/neutron_tempest_plugin/scenario/test_trunk.py
+++ b/neutron_tempest_plugin/scenario/test_trunk.py
@@ -47,8 +47,8 @@
# setup basic topology for servers we can log into
cls.network = cls.create_network()
cls.subnet = cls.create_subnet(cls.network)
- router = cls.create_router_by_client()
- cls.create_router_interface(router['id'], cls.subnet['id'])
+ cls.router = cls.create_router_by_client()
+ cls.create_router_interface(cls.router['id'], cls.subnet['id'])
cls.keypair = cls.create_keypair()
cls.secgroup = cls.os_primary.network_client.create_security_group(
name=data_utils.rand_name('secgroup'))
@@ -95,6 +95,27 @@
t = self.client.show_trunk(trunk_id)['trunk']
return t['status'] == 'ACTIVE'
+ def _create_server_with_network(self, network, use_advanced_image=False):
+ port = self.create_port(network, security_groups=[
+ self.secgroup['security_group']['id']])
+ server, fip = self._create_server_with_fip(
+ port['id'], use_advanced_image=use_advanced_image)
+ ssh_user = CONF.validation.image_ssh_user
+ if use_advanced_image:
+ ssh_user = CONF.neutron_plugin_options.advanced_image_ssh_user
+
+ server_ssh_client = ssh.Client(
+ fip['floating_ip_address'],
+ ssh_user,
+ pkey=self.keypair['private_key'])
+
+ return {
+ 'server': server,
+ 'fip': fip,
+ 'ssh_client': server_ssh_client,
+ 'port': port,
+ }
+
def _create_server_with_port_and_subport(self, vlan_network, vlan_tag,
use_advanced_image=False):
parent_port = self.create_port(self.network, security_groups=[
@@ -107,7 +128,7 @@
'port_id': port_for_subport['id'],
'segmentation_type': 'vlan',
'segmentation_id': vlan_tag}
- self.create_trunk(parent_port, [subport])
+ trunk = self.create_trunk(parent_port, [subport])
server, fip = self._create_server_with_fip(
parent_port['id'], use_advanced_image=use_advanced_image)
@@ -126,6 +147,8 @@
'fip': fip,
'ssh_client': server_ssh_client,
'subport': port_for_subport,
+ 'parentport': parent_port,
+ 'trunk': trunk,
}
def _wait_for_server(self, server, advanced_image=False):
@@ -260,3 +283,78 @@
servers[1]['subport']['fixed_ips'][0]['ip_address'],
should_succeed=True
)
+
+ @testtools.skipUnless(
+ CONF.neutron_plugin_options.advanced_image_ref,
+ "Advanced image is required to run this test.")
+ @testtools.skipUnless(
+ CONF.neutron_plugin_options.q_agent == "linuxbridge",
+ "Linux bridge agent is required to run this test.")
+ @decorators.idempotent_id('d61cbdf6-1896-491c-b4b4-871caf7fbffe')
+ def test_parent_port_connectivity_after_trunk_deleted_lb(self):
+ vlan_tag = 10
+
+ vlan_network = self.create_network()
+ vlan_subnet = self.create_subnet(vlan_network)
+ self.create_router_interface(self.router['id'], vlan_subnet['id'])
+
+ trunk_network_server = self._create_server_with_port_and_subport(
+ vlan_network, vlan_tag, use_advanced_image=True)
+ normal_network_server = self._create_server_with_network(self.network)
+ vlan_network_server = self._create_server_with_network(vlan_network)
+
+ self._wait_for_server(trunk_network_server, advanced_image=True)
+ # Configure VLAN interfaces on server
+ command = CONFIGURE_VLAN_INTERFACE_COMMANDS % {'tag': vlan_tag}
+ trunk_network_server['ssh_client'].exec_command(command)
+ out = trunk_network_server['ssh_client'].exec_command(
+ 'PATH=$PATH:/usr/sbin;ip addr list')
+ LOG.debug("Interfaces on server %s: %s", trunk_network_server, out)
+
+ self._wait_for_server(normal_network_server)
+ self._wait_for_server(vlan_network_server)
+
+ # allow intra-securitygroup traffic
+ rule = self.client.create_security_group_rule(
+ security_group_id=self.secgroup['security_group']['id'],
+ direction='ingress', ethertype='IPv4', protocol='icmp',
+ remote_group_id=self.secgroup['security_group']['id'])
+ self.addCleanup(self.client.delete_security_group_rule,
+ rule['security_group_rule']['id'])
+
+ # Ping from trunk_network_server to normal_network_server
+ # via parent port
+ self.check_remote_connectivity(
+ trunk_network_server['ssh_client'],
+ normal_network_server['port']['fixed_ips'][0]['ip_address'],
+ should_succeed=True
+ )
+
+ # Ping from trunk_network_server to vlan_network_server via VLAN
+ # interface should succeed
+ self.check_remote_connectivity(
+ trunk_network_server['ssh_client'],
+ vlan_network_server['port']['fixed_ips'][0]['ip_address'],
+ should_succeed=True
+ )
+
+ # Delete the trunk
+ self.delete_trunk(trunk_network_server['trunk'],
+ detach_parent_port=False)
+ LOG.debug("Trunk %s is deleted.", trunk_network_server['trunk']['id'])
+
+ # Ping from trunk_network_server to normal_network_server
+ # via parent port should still succeed after the trunk is deleted
+ self.check_remote_connectivity(
+ trunk_network_server['ssh_client'],
+ normal_network_server['port']['fixed_ips'][0]['ip_address'],
+ should_succeed=True
+ )
+
+ # Ping from trunk_network_server to vlan_network_server via VLAN
+ # interface should fail after the trunk is deleted
+ self.check_remote_connectivity(
+ trunk_network_server['ssh_client'],
+ vlan_network_server['port']['fixed_ips'][0]['ip_address'],
+ should_succeed=False
+ )