Add floating IP QoS test cases
This patch adds test cases for neutron bp/floating-ip-rate-limit.
We will create a server with one floating IP. And attach qos
bandwidth rules to the floating IP. Then try to download data
from the server to calculate the bandwidth whether is under
the qos limit.
Depends-On: Ibef48e7842a276fe77c901403d67760871f2b7e0
Partially-Implements blueprint: floating-ip-rate-limit
Change-Id: I50d1775cd1f86201bd21d24650f2c52f316ef1b8
diff --git a/neutron_tempest_plugin/scenario/constants.py b/neutron_tempest_plugin/scenario/constants.py
index 258c587..ddd45ff 100644
--- a/neutron_tempest_plugin/scenario/constants.py
+++ b/neutron_tempest_plugin/scenario/constants.py
@@ -15,4 +15,5 @@
SERVER_STATUS_ACTIVE = 'ACTIVE'
DEFAULT_SECURITY_GROUP = 'default'
LIMIT_KILO_BITS_PER_SECOND = 1000
+LIMIT_KILO_BYTES = 1000
SOCKET_CONNECT_TIMEOUT = 60
diff --git a/neutron_tempest_plugin/scenario/test_floatingip.py b/neutron_tempest_plugin/scenario/test_floatingip.py
index b253890..90e416d 100644
--- a/neutron_tempest_plugin/scenario/test_floatingip.py
+++ b/neutron_tempest_plugin/scenario/test_floatingip.py
@@ -14,6 +14,8 @@
# under the License.
import netaddr
+from neutron_lib import constants as lib_constants
+from neutron_lib.services.qos import constants as qos_consts
from tempest.common import utils
from tempest.common import waiters
from tempest.lib.common.utils import data_utils
@@ -21,12 +23,13 @@
import testscenarios
from testscenarios.scenarios import multiply_scenarios
-from neutron_lib import constants as lib_constants
+from neutron_tempest_plugin.api import base as base_api
from neutron_tempest_plugin.common import ssh
from neutron_tempest_plugin.common import utils as common_utils
from neutron_tempest_plugin import config
from neutron_tempest_plugin.scenario import base
from neutron_tempest_plugin.scenario import constants
+from neutron_tempest_plugin.scenario import test_qos
CONF = config.CONF
@@ -196,3 +199,62 @@
proxy_client=proxy_client)
self.check_remote_connectivity(ssh_client,
gateway_external_ip)
+
+
+class FloatingIPQosTest(FloatingIpTestCasesMixin,
+ test_qos.QoSTest):
+
+ same_network = True
+
+ @classmethod
+ @utils.requires_ext(extension="router", service="network")
+ @utils.requires_ext(extension="qos", service="network")
+ @base_api.require_qos_rule_type(qos_consts.RULE_TYPE_BANDWIDTH_LIMIT)
+ def resource_setup(cls):
+ super(FloatingIPQosTest, cls).resource_setup()
+
+ @decorators.idempotent_id('5eb48aea-eaba-4c20-8a6f-7740070a0aa3')
+ def test_qos(self):
+ """Test floating IP is binding to a QoS policy with
+
+ ingress and egress bandwidth limit rules. And it applied correctly
+ by sending a file from the instance to the test node.
+ Then calculating the bandwidth every ~1 sec by the number of bits
+ received / elapsed time.
+ """
+
+ self._test_basic_resources()
+ policy_id = self._create_qos_policy()
+ ssh_client = self._create_ssh_client()
+ self.os_admin.network_client.create_bandwidth_limit_rule(
+ policy_id, max_kbps=constants.LIMIT_KILO_BITS_PER_SECOND,
+ max_burst_kbps=constants.LIMIT_KILO_BYTES,
+ direction=lib_constants.INGRESS_DIRECTION)
+ self.os_admin.network_client.create_bandwidth_limit_rule(
+ policy_id, max_kbps=constants.LIMIT_KILO_BITS_PER_SECOND,
+ max_burst_kbps=constants.LIMIT_KILO_BYTES,
+ direction=lib_constants.EGRESS_DIRECTION)
+
+ rules = self.os_admin.network_client.list_bandwidth_limit_rules(
+ policy_id)
+ self.assertEqual(2, len(rules['bandwidth_limit_rules']))
+
+ fip = self.os_admin.network_client.get_floatingip(
+ self.fip['id'])['floatingip']
+ self.assertEqual(self.port['id'], fip['port_id'])
+
+ self.os_admin.network_client.update_floatingip(
+ self.fip['id'],
+ qos_policy_id=policy_id)
+
+ fip = self.os_admin.network_client.get_floatingip(
+ self.fip['id'])['floatingip']
+ self.assertEqual(policy_id, fip['qos_policy_id'])
+
+ self._create_file_for_bw_tests(ssh_client)
+ common_utils.wait_until_true(lambda: self._check_bw(
+ ssh_client,
+ self.fip['floating_ip_address'],
+ port=self.NC_PORT),
+ timeout=120,
+ sleep=1)
diff --git a/neutron_tempest_plugin/scenario/test_qos.py b/neutron_tempest_plugin/scenario/test_qos.py
index 67c00c2..58accb0 100644
--- a/neutron_tempest_plugin/scenario/test_qos.py
+++ b/neutron_tempest_plugin/scenario/test_qos.py
@@ -79,6 +79,8 @@
* TOLERANCE_FACTOR / 8.0)
FILE_PATH = "/tmp/img"
+ NC_PORT = 1234
+
@classmethod
@tutils.requires_ext(extension="qos", service="network")
@base_api.require_qos_rule_type(qos_consts.RULE_TYPE_BANDWIDTH_LIMIT)
@@ -117,7 +119,7 @@
time_elapsed = time.time() - start_time
bytes_per_second = total_bytes_read / time_elapsed
- LOG.debug("time_elapsed = %(time_elapsed)d, "
+ LOG.debug("time_elapsed = %(time_elapsed).16f, "
"total_bytes_read = %(total_bytes_read)d, "
"bytes_per_second = %(bytes_per_second)d",
{'time_elapsed': time_elapsed,
@@ -126,6 +128,31 @@
return bytes_per_second <= QoSTest.LIMIT_BYTES_SEC
+ def _create_ssh_client(self):
+ return ssh.Client(self.fip['floating_ip_address'],
+ CONF.validation.image_ssh_user,
+ pkey=self.keypair['private_key'])
+
+ def _test_basic_resources(self):
+ self.setup_network_and_server()
+ self.check_connectivity(self.fip['floating_ip_address'],
+ CONF.validation.image_ssh_user,
+ self.keypair['private_key'])
+ rulesets = [{'protocol': 'tcp',
+ 'direction': 'ingress',
+ 'port_range_min': self.NC_PORT,
+ 'port_range_max': self.NC_PORT,
+ 'remote_ip_prefix': '0.0.0.0/0'}]
+ self.create_secgroup_rules(rulesets,
+ self.security_groups[-1]['id'])
+
+ def _create_qos_policy(self):
+ policy = self.os_admin.network_client.create_qos_policy(
+ name='test-policy',
+ description='test-qos-policy',
+ shared=True)
+ return policy['policy']['id']
+
@decorators.idempotent_id('1f7ed39b-428f-410a-bd2b-db9f465680df')
def test_qos(self):
"""This is a basic test that check that a QoS policy with
@@ -135,29 +162,9 @@
Then calculating the bandwidth every ~1 sec by the number of bits
received / elapsed time.
"""
-
- NC_PORT = 1234
-
- self.setup_network_and_server()
- self.check_connectivity(self.fip['floating_ip_address'],
- CONF.validation.image_ssh_user,
- self.keypair['private_key'])
- rulesets = [{'protocol': 'tcp',
- 'direction': 'ingress',
- 'port_range_min': NC_PORT,
- 'port_range_max': NC_PORT,
- 'remote_ip_prefix': '0.0.0.0/0'}]
- self.create_secgroup_rules(rulesets,
- self.security_groups[-1]['id'])
-
- ssh_client = ssh.Client(self.fip['floating_ip_address'],
- CONF.validation.image_ssh_user,
- pkey=self.keypair['private_key'])
- policy = self.os_admin.network_client.create_qos_policy(
- name='test-policy',
- description='test-qos-policy',
- shared=True)
- policy_id = policy['policy']['id']
+ self._test_basic_resources()
+ policy_id = self._create_qos_policy()
+ ssh_client = self._create_ssh_client()
self.os_admin.network_client.create_bandwidth_limit_rule(
policy_id, max_kbps=constants.LIMIT_KILO_BITS_PER_SECOND,
max_burst_kbps=constants.LIMIT_KILO_BITS_PER_SECOND)
@@ -170,6 +177,6 @@
utils.wait_until_true(lambda: self._check_bw(
ssh_client,
self.fip['floating_ip_address'],
- port=NC_PORT),
+ port=self.NC_PORT),
timeout=120,
sleep=1)