Add scenario test for trunk E2E flow

Add scenario test "test_parent_port_connectivity_after_trunk_deleted_lb"
to verify the end-to-end flow of the fix proposed for bug #1794424,
"Enable delete bound trunk for linux bridge agent".

Co-Authored-By: Allain Legacy <Allain.legacy@windriver.com>

Depends-On: https://review.openstack.org/#/c/605589/
Change-Id: Ic2e02f4b5dc8d7930e251340d8be194733b0a4f7
Related-Bug: #1794424
Story: 2003889
diff --git a/neutron_tempest_plugin/scenario/test_trunk.py b/neutron_tempest_plugin/scenario/test_trunk.py
index 1903180..85b16cb 100644
--- a/neutron_tempest_plugin/scenario/test_trunk.py
+++ b/neutron_tempest_plugin/scenario/test_trunk.py
@@ -47,8 +47,8 @@
         # setup basic topology for servers we can log into
         cls.network = cls.create_network()
         cls.subnet = cls.create_subnet(cls.network)
-        router = cls.create_router_by_client()
-        cls.create_router_interface(router['id'], cls.subnet['id'])
+        cls.router = cls.create_router_by_client()
+        cls.create_router_interface(cls.router['id'], cls.subnet['id'])
         cls.keypair = cls.create_keypair()
         cls.secgroup = cls.os_primary.network_client.create_security_group(
             name=data_utils.rand_name('secgroup'))
@@ -95,6 +95,27 @@
         t = self.client.show_trunk(trunk_id)['trunk']
         return t['status'] == 'ACTIVE'
 
+    def _create_server_with_network(self, network, use_advanced_image=False):
+        port = self.create_port(network, security_groups=[
+            self.secgroup['security_group']['id']])
+        server, fip = self._create_server_with_fip(
+            port['id'], use_advanced_image=use_advanced_image)
+        ssh_user = CONF.validation.image_ssh_user
+        if use_advanced_image:
+            ssh_user = CONF.neutron_plugin_options.advanced_image_ssh_user
+
+        server_ssh_client = ssh.Client(
+            fip['floating_ip_address'],
+            ssh_user,
+            pkey=self.keypair['private_key'])
+
+        return {
+            'server': server,
+            'fip': fip,
+            'ssh_client': server_ssh_client,
+            'port': port,
+        }
+
     def _create_server_with_port_and_subport(self, vlan_network, vlan_tag,
                                              use_advanced_image=False):
         parent_port = self.create_port(self.network, security_groups=[
@@ -107,7 +128,7 @@
             'port_id': port_for_subport['id'],
             'segmentation_type': 'vlan',
             'segmentation_id': vlan_tag}
-        self.create_trunk(parent_port, [subport])
+        trunk = self.create_trunk(parent_port, [subport])
 
         server, fip = self._create_server_with_fip(
             parent_port['id'], use_advanced_image=use_advanced_image)
@@ -126,6 +147,8 @@
             'fip': fip,
             'ssh_client': server_ssh_client,
             'subport': port_for_subport,
+            'parentport': parent_port,
+            'trunk': trunk,
         }
 
     def _wait_for_server(self, server, advanced_image=False):
@@ -260,3 +283,78 @@
             servers[1]['subport']['fixed_ips'][0]['ip_address'],
             should_succeed=True
         )
+
+    @testtools.skipUnless(
+          CONF.neutron_plugin_options.advanced_image_ref,
+          "Advanced image is required to run this test.")
+    @testtools.skipUnless(
+          CONF.neutron_plugin_options.q_agent == "linuxbridge",
+          "Linux bridge agent is required to run this test.")
+    @decorators.idempotent_id('d61cbdf6-1896-491c-b4b4-871caf7fbffe')
+    def test_parent_port_connectivity_after_trunk_deleted_lb(self):
+        vlan_tag = 10
+
+        vlan_network = self.create_network()
+        vlan_subnet = self.create_subnet(vlan_network)
+        self.create_router_interface(self.router['id'], vlan_subnet['id'])
+
+        trunk_network_server = self._create_server_with_port_and_subport(
+            vlan_network, vlan_tag, use_advanced_image=True)
+        normal_network_server = self._create_server_with_network(self.network)
+        vlan_network_server = self._create_server_with_network(vlan_network)
+
+        self._wait_for_server(trunk_network_server, advanced_image=True)
+        # Configure VLAN interfaces on server
+        command = CONFIGURE_VLAN_INTERFACE_COMMANDS % {'tag': vlan_tag}
+        trunk_network_server['ssh_client'].exec_command(command)
+        out = trunk_network_server['ssh_client'].exec_command(
+            'PATH=$PATH:/usr/sbin;ip addr list')
+        LOG.debug("Interfaces on server %s: %s", trunk_network_server, out)
+
+        self._wait_for_server(normal_network_server)
+        self._wait_for_server(vlan_network_server)
+
+        # Allow IPv4 ICMP ingress between members of the security group
+        rule = self.client.create_security_group_rule(
+            security_group_id=self.secgroup['security_group']['id'],
+            direction='ingress', ethertype='IPv4', protocol='icmp',
+            remote_group_id=self.secgroup['security_group']['id'])
+        self.addCleanup(self.client.delete_security_group_rule,
+                        rule['security_group_rule']['id'])
+
+        # Ping from trunk_network_server to normal_network_server
+        # via parent port
+        self.check_remote_connectivity(
+            trunk_network_server['ssh_client'],
+            normal_network_server['port']['fixed_ips'][0]['ip_address'],
+            should_succeed=True
+        )
+
+        # Ping from trunk_network_server to vlan_network_server via the
+        # VLAN interface should succeed
+        self.check_remote_connectivity(
+            trunk_network_server['ssh_client'],
+            vlan_network_server['port']['fixed_ips'][0]['ip_address'],
+            should_succeed=True
+        )
+
+        # Delete the trunk
+        self.delete_trunk(trunk_network_server['trunk'],
+            detach_parent_port=False)
+        LOG.debug("Trunk %s is deleted.", trunk_network_server['trunk']['id'])
+
+        # Ping from trunk_network_server to normal_network_server via the
+        # parent port should still succeed after the trunk is deleted
+        self.check_remote_connectivity(
+            trunk_network_server['ssh_client'],
+            normal_network_server['port']['fixed_ips'][0]['ip_address'],
+            should_succeed=True
+        )
+
+        # Ping from trunk_network_server to vlan_network_server via the
+        # VLAN interface should fail after the trunk is deleted
+        self.check_remote_connectivity(
+            trunk_network_server['ssh_client'],
+            vlan_network_server['port']['fixed_ips'][0]['ip_address'],
+            should_succeed=False
+        )