Add test "attach_previously_used_port_to_new_instance"

Ports with attached QoS policy should keep it even after
deletion of associated VMs. This test validates that
the policy's rules are enforced on new VM that launched using
such port.

Change-Id: I26b576c02921df0944abd850ad3ec3a174ef5cc9
diff --git a/neutron_tempest_plugin/scenario/test_qos.py b/neutron_tempest_plugin/scenario/test_qos.py
index ba8cc88..a59441c 100644
--- a/neutron_tempest_plugin/scenario/test_qos.py
+++ b/neutron_tempest_plugin/scenario/test_qos.py
@@ -19,6 +19,7 @@
 from neutron_lib.services.qos import constants as qos_consts
 from oslo_log import log as logging
 from tempest.common import utils as tutils
+from tempest.common import waiters
 from tempest.lib import decorators
 from tempest.lib import exceptions
 
@@ -160,6 +161,47 @@
                                         shared=True)
         return policy['policy']['id']
 
+    def _create_server_by_port(self, port=None):
+        """Launch an instance using a port interface;
+
+        In case that the given port is None, a new port is created,
+        activated and configured with inbound SSH and TCP connection.
+        """
+        # Create and activate the port that will be assign to the instance.
+        if port is None:
+            secgroup = self.create_security_group()
+            self.create_loginable_secgroup_rule(
+                secgroup_id=secgroup['id'])
+
+            secgroup_rules = [{'protocol': 'tcp',
+                               'direction': 'ingress',
+                               'port_range_min': self.NC_PORT,
+                               'port_range_max': self.NC_PORT,
+                               'remote_ip_prefix': '0.0.0.0/0'}]
+
+            self.create_secgroup_rules(secgroup_rules,
+                                       secgroup['id'])
+
+            port = self.create_port(self.network,
+                                    security_groups=[secgroup['id']])
+            self.fip = self.create_floatingip(port=port)
+
+        keypair = self.create_keypair()
+
+        server_kwargs = {
+            'flavor_ref': CONF.compute.flavor_ref,
+            'image_ref': CONF.compute.image_ref,
+            'key_name': keypair['name'],
+            'networks': [{'port': port['id']}],
+        }
+
+        server = self.create_server(**server_kwargs)
+        self.wait_for_server_active(server['server'])
+        self.check_connectivity(self.fip['floating_ip_address'],
+                                CONF.validation.image_ssh_user,
+                                keypair['private_key'])
+        return server, port
+
 
 class QoSTest(QoSTestMixin, base.BaseTempestTestCase):
     @classmethod
@@ -283,3 +325,58 @@
             port=self.NC_PORT, expected_bw=QoSTest.LIMIT_BYTES_SEC * 3),
             timeout=self.FILE_DOWNLOAD_TIMEOUT,
             sleep=1)
+
+    @decorators.idempotent_id('66e5673e-0522-11ea-8d71-362b9e155667')
+    def test_attach_previously_used_port_to_new_instance(self):
+        """The test spawns new instance using port with QoS policy.
+
+        Ports with attached QoS policy could be used multiple times.
+        The policy rules have to be enforced on the new machines.
+        """
+        self.network = self.create_network()
+        self.subnet = self.create_subnet(self.network)
+        self.router = self.create_router_by_client()
+        self.create_router_interface(self.router['id'], self.subnet['id'])
+
+        vm, vm_port = self._create_server_by_port()
+
+        port_policy = self.os_admin.network_client.create_qos_policy(
+            name='port-policy',
+            description='policy for attach',
+            shared=False)['policy']
+
+        rule = self.os_admin.network_client.create_bandwidth_limit_rule(
+            policy_id=port_policy['id'],
+            max_kbps=constants.LIMIT_KILO_BITS_PER_SECOND,
+            max_burst_kbps=constants.LIMIT_KILO_BITS_PER_SECOND)[
+                    'bandwidth_limit_rule']
+
+        self.os_admin.network_client.update_port(
+            vm_port['id'], qos_policy_id=port_policy['id'])
+
+        self.os_primary.servers_client.delete_server(vm['server']['id'])
+        waiters.wait_for_server_termination(
+            self.os_primary.servers_client,
+            vm['server']['id'])
+
+        # Launch a new server using the same port with attached policy
+        self._create_server_by_port(port=vm_port)
+
+        retrieved_port = self.os_admin.network_client.show_port(
+            vm_port['id'])
+        self.assertEqual(port_policy['id'],
+                         retrieved_port['port']['qos_policy_id'],
+                         """The expected policy ID is {0},
+                         the actual value is {1}""".
+                         format(port_policy['id'],
+                                retrieved_port['port']['qos_policy_id']))
+
+        retrieved_policy = self.os_admin.network_client.show_qos_policy(
+                           retrieved_port['port']['qos_policy_id'])
+
+        retrieved_rule_id = retrieved_policy['policy']['rules'][0]['id']
+        self.assertEqual(rule['id'],
+                         retrieved_rule_id,
+                         """The expected rule ID is {0},
+                         the actual value is {1}""".
+                         format(rule['id'], retrieved_rule_id))