Merge "Fix logging of console output in scenario tests"
diff --git a/neutron_tempest_plugin/api/test_security_groups.py b/neutron_tempest_plugin/api/test_security_groups.py
index 0221e79..a11a031 100644
--- a/neutron_tempest_plugin/api/test_security_groups.py
+++ b/neutron_tempest_plugin/api/test_security_groups.py
@@ -17,6 +17,7 @@
from neutron_lib import constants
from tempest.lib.common.utils import data_utils
+from tempest.lib.common.utils import test_utils
from tempest.lib import decorators
from tempest.lib import exceptions
import testtools
@@ -226,7 +227,7 @@
def _create_security_group_rules(self, amount, port_index=1):
for i in range(amount):
- self.create_security_group_rules(**{
+ self.create_security_group_rule(**{
'project_id': self.client.tenant_id,
'direction': 'ingress',
'port_range_max': port_index + i,
@@ -237,6 +238,8 @@
sg_rules_quota = self._get_sg_rules_quota()
new_sg_rules_quota = 2 * sg_rules_quota
self._set_sg_rules_quota(new_sg_rules_quota)
+ self.assertGreater(self._get_sg_rules_quota(), sg_rules_quota,
+ "Security group rules quota wasn't changed correctly")
return new_sg_rules_quota
def _decrease_sg_rules_quota(self):
@@ -264,6 +267,28 @@
return len(security_group_rules['security_group_rules'])
+class SecGroupRulesQuotaTest(BaseSecGroupRulesQuota):
+
+ credentials = ['primary', 'admin']
+ required_extensions = ['security-group', 'quotas']
+
+ def setUp(self):
+ super(SecGroupRulesQuotaTest, self).setUp()
+ self.addCleanup(test_utils.call_and_ignore_notfound_exc,
+ self.admin_client.reset_quotas, self.client.tenant_id)
+ self._set_sg_rules_quota(10)
+
+ @decorators.idempotent_id('77ec038c-5638-11ea-8e2d-0242ac130003')
+ def test_sg_rules_quota_increased(self):
+ self._create_max_allowed_sg_rules_amount()
+ new_quota = self._increase_sg_rules_quota()
+ port_index = new_quota
+ self._create_max_allowed_sg_rules_amount(port_index)
+ quota_set = self._get_sg_rules_quota()
+ self.assertEqual(quota_set, self._get_sg_rules_amount(),
+ "Amount of security groups rules doesn't match quota")
+
+
class SecGroupProtocolTest(base.BaseNetworkTest):
protocol_names = base_security_groups.V4_PROTOCOL_NAMES
diff --git a/neutron_tempest_plugin/scenario/test_qos.py b/neutron_tempest_plugin/scenario/test_qos.py
index f8f1b03..e84fb3c 100644
--- a/neutron_tempest_plugin/scenario/test_qos.py
+++ b/neutron_tempest_plugin/scenario/test_qos.py
@@ -19,6 +19,7 @@
from neutron_lib.services.qos import constants as qos_consts
from oslo_log import log as logging
from tempest.common import utils as tutils
+from tempest.common import waiters
from tempest.lib import decorators
from neutron_tempest_plugin.api import base as base_api
@@ -151,6 +152,47 @@
shared=True)
return policy['policy']['id']
+ def _create_server_by_port(self, port=None):
+ """Launch an instance using a port interface;
+
+ In case that the given port is None, a new port is created,
+ activated and configured with inbound SSH and TCP connection.
+ """
+ # Create and activate the port that will be assign to the instance.
+ if port is None:
+ secgroup = self.create_security_group()
+ self.create_loginable_secgroup_rule(
+ secgroup_id=secgroup['id'])
+
+ secgroup_rules = [{'protocol': 'tcp',
+ 'direction': 'ingress',
+ 'port_range_min': self.NC_PORT,
+ 'port_range_max': self.NC_PORT,
+ 'remote_ip_prefix': '0.0.0.0/0'}]
+
+ self.create_secgroup_rules(secgroup_rules,
+ secgroup['id'])
+
+ port = self.create_port(self.network,
+ security_groups=[secgroup['id']])
+ self.fip = self.create_floatingip(port=port)
+
+ keypair = self.create_keypair()
+
+ server_kwargs = {
+ 'flavor_ref': CONF.compute.flavor_ref,
+ 'image_ref': CONF.compute.image_ref,
+ 'key_name': keypair['name'],
+ 'networks': [{'port': port['id']}],
+ }
+
+ server = self.create_server(**server_kwargs)
+ self.wait_for_server_active(server['server'])
+ self.check_connectivity(self.fip['floating_ip_address'],
+ CONF.validation.image_ssh_user,
+ keypair['private_key'])
+ return server, port
+
class QoSTest(QoSTestMixin, base.BaseTempestTestCase):
@classmethod
@@ -274,3 +316,58 @@
port=self.NC_PORT, expected_bw=QoSTest.LIMIT_BYTES_SEC * 3),
timeout=self.FILE_DOWNLOAD_TIMEOUT,
sleep=1)
+
+ @decorators.idempotent_id('66e5673e-0522-11ea-8d71-362b9e155667')
+ def test_attach_previously_used_port_to_new_instance(self):
+ """The test spawns new instance using port with QoS policy.
+
+ Ports with attached QoS policy could be used multiple times.
+ The policy rules have to be enforced on the new machines.
+ """
+ self.network = self.create_network()
+ self.subnet = self.create_subnet(self.network)
+ self.router = self.create_router_by_client()
+ self.create_router_interface(self.router['id'], self.subnet['id'])
+
+ vm, vm_port = self._create_server_by_port()
+
+ port_policy = self.os_admin.network_client.create_qos_policy(
+ name='port-policy',
+ description='policy for attach',
+ shared=False)['policy']
+
+ rule = self.os_admin.network_client.create_bandwidth_limit_rule(
+ policy_id=port_policy['id'],
+ max_kbps=constants.LIMIT_KILO_BITS_PER_SECOND,
+ max_burst_kbps=constants.LIMIT_KILO_BITS_PER_SECOND)[
+ 'bandwidth_limit_rule']
+
+ self.os_admin.network_client.update_port(
+ vm_port['id'], qos_policy_id=port_policy['id'])
+
+ self.os_primary.servers_client.delete_server(vm['server']['id'])
+ waiters.wait_for_server_termination(
+ self.os_primary.servers_client,
+ vm['server']['id'])
+
+ # Launch a new server using the same port with attached policy
+ self._create_server_by_port(port=vm_port)
+
+ retrieved_port = self.os_admin.network_client.show_port(
+ vm_port['id'])
+ self.assertEqual(port_policy['id'],
+ retrieved_port['port']['qos_policy_id'],
+ """The expected policy ID is {0},
+ the actual value is {1}""".
+ format(port_policy['id'],
+ retrieved_port['port']['qos_policy_id']))
+
+ retrieved_policy = self.os_admin.network_client.show_qos_policy(
+ retrieved_port['port']['qos_policy_id'])
+
+ retrieved_rule_id = retrieved_policy['policy']['rules'][0]['id']
+ self.assertEqual(rule['id'],
+ retrieved_rule_id,
+ """The expected rule ID is {0},
+ the actual value is {1}""".
+ format(rule['id'], retrieved_rule_id))