Merge "Add test "attach_previously_used_port_to_new_instance""
diff --git a/neutron_tempest_plugin/scenario/test_qos.py b/neutron_tempest_plugin/scenario/test_qos.py
index f8f1b03..e84fb3c 100644
--- a/neutron_tempest_plugin/scenario/test_qos.py
+++ b/neutron_tempest_plugin/scenario/test_qos.py
@@ -19,6 +19,7 @@
from neutron_lib.services.qos import constants as qos_consts
from oslo_log import log as logging
from tempest.common import utils as tutils
+from tempest.common import waiters
from tempest.lib import decorators
from neutron_tempest_plugin.api import base as base_api
@@ -151,6 +152,47 @@
shared=True)
return policy['policy']['id']
+ def _create_server_by_port(self, port=None):
+ """Launch an instance using a port interface;
+
+ In case that the given port is None, a new port is created,
+ activated and configured with inbound SSH and TCP connection.
+ """
+ # Create and activate the port that will be assign to the instance.
+ if port is None:
+ secgroup = self.create_security_group()
+ self.create_loginable_secgroup_rule(
+ secgroup_id=secgroup['id'])
+
+ secgroup_rules = [{'protocol': 'tcp',
+ 'direction': 'ingress',
+ 'port_range_min': self.NC_PORT,
+ 'port_range_max': self.NC_PORT,
+ 'remote_ip_prefix': '0.0.0.0/0'}]
+
+ self.create_secgroup_rules(secgroup_rules,
+ secgroup['id'])
+
+ port = self.create_port(self.network,
+ security_groups=[secgroup['id']])
+ self.fip = self.create_floatingip(port=port)
+
+ keypair = self.create_keypair()
+
+ server_kwargs = {
+ 'flavor_ref': CONF.compute.flavor_ref,
+ 'image_ref': CONF.compute.image_ref,
+ 'key_name': keypair['name'],
+ 'networks': [{'port': port['id']}],
+ }
+
+ server = self.create_server(**server_kwargs)
+ self.wait_for_server_active(server['server'])
+ self.check_connectivity(self.fip['floating_ip_address'],
+ CONF.validation.image_ssh_user,
+ keypair['private_key'])
+ return server, port
+
class QoSTest(QoSTestMixin, base.BaseTempestTestCase):
@classmethod
@@ -274,3 +316,58 @@
port=self.NC_PORT, expected_bw=QoSTest.LIMIT_BYTES_SEC * 3),
timeout=self.FILE_DOWNLOAD_TIMEOUT,
sleep=1)
+
+ @decorators.idempotent_id('66e5673e-0522-11ea-8d71-362b9e155667')
+ def test_attach_previously_used_port_to_new_instance(self):
+ """The test spawns new instance using port with QoS policy.
+
+ Ports with attached QoS policy could be used multiple times.
+ The policy rules have to be enforced on the new machines.
+ """
+ self.network = self.create_network()
+ self.subnet = self.create_subnet(self.network)
+ self.router = self.create_router_by_client()
+ self.create_router_interface(self.router['id'], self.subnet['id'])
+
+ vm, vm_port = self._create_server_by_port()
+
+ port_policy = self.os_admin.network_client.create_qos_policy(
+ name='port-policy',
+ description='policy for attach',
+ shared=False)['policy']
+
+ rule = self.os_admin.network_client.create_bandwidth_limit_rule(
+ policy_id=port_policy['id'],
+ max_kbps=constants.LIMIT_KILO_BITS_PER_SECOND,
+ max_burst_kbps=constants.LIMIT_KILO_BITS_PER_SECOND)[
+ 'bandwidth_limit_rule']
+
+ self.os_admin.network_client.update_port(
+ vm_port['id'], qos_policy_id=port_policy['id'])
+
+ self.os_primary.servers_client.delete_server(vm['server']['id'])
+ waiters.wait_for_server_termination(
+ self.os_primary.servers_client,
+ vm['server']['id'])
+
+ # Launch a new server using the same port with attached policy
+ self._create_server_by_port(port=vm_port)
+
+ retrieved_port = self.os_admin.network_client.show_port(
+ vm_port['id'])
+ self.assertEqual(port_policy['id'],
+ retrieved_port['port']['qos_policy_id'],
+ """The expected policy ID is {0},
+ the actual value is {1}""".
+ format(port_policy['id'],
+ retrieved_port['port']['qos_policy_id']))
+
+ retrieved_policy = self.os_admin.network_client.show_qos_policy(
+ retrieved_port['port']['qos_policy_id'])
+
+ retrieved_rule_id = retrieved_policy['policy']['rules'][0]['id']
+ self.assertEqual(rule['id'],
+ retrieved_rule_id,
+ """The expected rule ID is {0},
+ the actual value is {1}""".
+ format(rule['id'], retrieved_rule_id))