| # Copyright 2016 Red Hat, Inc. |
| # All Rights Reserved. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may |
| # not use this file except in compliance with the License. You may obtain |
| # a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations |
| # under the License. |
| import errno |
| import socket |
| import time |
| |
| from neutron_lib.services.qos import constants as qos_consts |
| from oslo_log import log as logging |
| from tempest.common import utils as tutils |
| from tempest.common import waiters |
| from tempest.lib import decorators |
| |
| from neutron_tempest_plugin.api import base as base_api |
| from neutron_tempest_plugin.common import ssh |
| from neutron_tempest_plugin.common import utils |
| from neutron_tempest_plugin import config |
| from neutron_tempest_plugin.scenario import base |
| from neutron_tempest_plugin.scenario import constants |
| from neutron_tempest_plugin.scenario import exceptions as sc_exceptions |
| |
| CONF = config.CONF |
| LOG = logging.getLogger(__name__) |
| |
| |
def _try_connect(host_ip, port, socket_timeout):
    """Open a TCP connection to ``host_ip:port`` and return the socket.

    The timeout is applied only after the connection is established, so it
    governs later operations on the returned socket (e.g. ``recv``), not the
    connect call itself.

    :param host_ip: IPv4 address of the remote host.
    :param port: TCP port to connect to.
    :param socket_timeout: timeout in seconds set on the connected socket.
    :returns: the connected ``socket.socket`` object; the caller owns it and
              is responsible for closing it.
    :raises SocketConnectionRefused: if the peer refuses the connection.
    :raises socket.error: any other socket failure is re-raised unchanged.
    """
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        client_socket.connect((host_ip, port))
        client_socket.settimeout(socket_timeout)
    except socket.error as serr:
        # Callers retry in a loop; release the descriptor on every failed
        # attempt instead of leaking one socket per retry.
        client_socket.close()
        if serr.errno == errno.ECONNREFUSED:
            raise sc_exceptions.SocketConnectionRefused(host=host_ip,
                                                        port=port)
        raise
    return client_socket
| |
| |
def _connect_socket(host, port, socket_timeout):
    """Connect to ``host:port``, retrying while the peer refuses.

    The listening side may not be up yet, so a refused connection is retried
    until ``constants.SOCKET_CONNECT_TIMEOUT`` seconds have passed since the
    first attempt. Any other error from ``_try_connect`` propagates
    immediately.

    :returns: whatever ``_try_connect`` returns on success.
    :raises ConnectionTimeoutException: if the connection is still refused
                                        once the timeout has elapsed.
    """
    began_at = time.time()
    while True:
        try:
            connection = _try_connect(host, port, socket_timeout)
        except sc_exceptions.SocketConnectionRefused:
            waited = time.time() - began_at
            if waited > constants.SOCKET_CONNECT_TIMEOUT:
                raise sc_exceptions.ConnectionTimeoutException(host=host,
                                                               port=port)
        else:
            return connection
| |
| |
class QoSTestMixin(object):
    """Helpers shared by QoS bandwidth-limit scenario tests.

    The mixin relies on attributes and methods supplied by the test class it
    is mixed into (``self.fip``, ``self.keypair``, ``self.network``,
    ``self.security_groups``, ``self.os_admin``, ``create_*`` helpers, ...).
    """
    credentials = ['primary', 'admin']
    force_tenant_isolation = False

    # Size in bytes of the file the guest serves for the download test.
    FILE_SIZE = 1024 * 1024
    # Slack multiplier applied both to the allowed bandwidth and to the
    # socket read timeout.
    TOLERANCE_FACTOR = 1.5
    # dd block size and the chunk size requested from each recv() call.
    BUFFER_SIZE = 512
    # Number of dd blocks; floor division keeps this an int on Python 3.
    COUNT = FILE_SIZE // BUFFER_SIZE
    # Configured limit (kilobits/s) converted to bytes/s, including the
    # tolerance factor.
    LIMIT_BYTES_SEC = (constants.LIMIT_KILO_BITS_PER_SECOND * 1024 *
                       TOLERANCE_FACTOR / 8.0)
    # Location of the served file on the guest.
    FILE_PATH = "/tmp/img"

    # TCP port nc listens on inside the guest.
    NC_PORT = 1234
    # Timeout in seconds that tests pass to wait_until_true() around
    # _check_bw().
    FILE_DOWNLOAD_TIMEOUT = 120

    def _create_file_for_bw_tests(self, ssh_client):
        """Create the FILE_SIZE-byte test file on the guest and verify it.

        :param ssh_client: client used to run commands on the guest.
        :raises FileCreationFailedException: if the size reported by
            ``stat`` differs from ``FILE_SIZE``.
        """
        cmd = ("(dd if=/dev/zero bs=%(bs)d count=%(count)d of=%(file_path)s) "
               % {'bs': self.BUFFER_SIZE, 'count': self.COUNT,
                  'file_path': self.FILE_PATH})
        ssh_client.exec_command(cmd, timeout=5)
        # "%%s" becomes a literal "%s" for stat: print the size in bytes.
        cmd = "stat -c %%s %s" % self.FILE_PATH
        filesize = ssh_client.exec_command(cmd, timeout=5)
        if int(filesize.strip()) != self.FILE_SIZE:
            raise sc_exceptions.FileCreationFailedException(
                file=self.FILE_PATH)

    def _check_bw(self, ssh_client, host, port, expected_bw=LIMIT_BYTES_SEC):
        """Download the test file from the guest and check the bandwidth.

        Starts an ``nc`` listener on the guest that serves ``FILE_PATH``,
        reads ``FILE_SIZE`` bytes over a TCP socket and compares the measured
        rate with ``expected_bw``. The measured time includes the time spent
        establishing the connection.

        :param ssh_client: client used to run commands on the guest.
        :param host: address of the guest to download from.
        :param port: TCP port the guest should serve the file on.
        :param expected_bw: maximum acceptable rate in bytes per second.
        :returns: True if the whole file was read at a rate not above
                  ``expected_bw``; False if the read timed out or the peer
                  closed the connection before the whole file arrived.
        """
        utils.kill_nc_process(ssh_client)
        cmd = ("(nc -ll -p %(port)d < %(file_path)s > /dev/null &)" % {
            'port': port, 'file_path': self.FILE_PATH})
        ssh_client.exec_command(cmd, timeout=5)

        # Open TCP socket to remote VM and download big file
        start_time = time.time()
        # Allow each read up to the time the whole file would need at the
        # expected rate, plus tolerance.
        socket_timeout = self.FILE_SIZE * self.TOLERANCE_FACTOR / expected_bw
        client_socket = _connect_socket(host, port, socket_timeout)
        total_bytes_read = 0
        try:
            while total_bytes_read < self.FILE_SIZE:
                data = client_socket.recv(self.BUFFER_SIZE)
                if not data:
                    # recv() returns an empty result once the peer has
                    # closed the connection; without this check the loop
                    # would spin forever on a truncated transfer.
                    LOG.warning('Connection closed by the remote host '
                                'while reading the remote file, bytes '
                                'read: %s', total_bytes_read)
                    utils.kill_nc_process(ssh_client)
                    return False
                total_bytes_read += len(data)

            # Calculate and return actual BW + logging result
            time_elapsed = time.time() - start_time
            bytes_per_second = total_bytes_read / time_elapsed

            LOG.debug("time_elapsed = %(time_elapsed).16f, "
                      "total_bytes_read = %(total_bytes_read)d, "
                      "bytes_per_second = %(bytes_per_second)d",
                      {'time_elapsed': time_elapsed,
                       'total_bytes_read': total_bytes_read,
                       'bytes_per_second': bytes_per_second})
            return bytes_per_second <= expected_bw
        except socket.timeout:
            LOG.warning('Socket timeout while reading the remote file, bytes '
                        'read: %s', total_bytes_read)
            utils.kill_nc_process(ssh_client)
            return False
        finally:
            # Runs on every exit path, including the early returns above.
            client_socket.close()

    def _create_ssh_client(self):
        """Return an SSH client for the guest behind ``self.fip``.

        Uses the configured image SSH user and the private key stored in
        ``self.keypair``.
        """
        return ssh.Client(self.fip['floating_ip_address'],
                          CONF.validation.image_ssh_user,
                          pkey=self.keypair['private_key'])

    def _test_basic_resources(self):
        """Set up network and server, then allow TCP ingress on NC_PORT.

        The ingress rule is added to the last entry of
        ``self.security_groups``.
        """
        self.setup_network_and_server()
        self.check_connectivity(self.fip['floating_ip_address'],
                                CONF.validation.image_ssh_user,
                                self.keypair['private_key'])
        rulesets = [{'protocol': 'tcp',
                     'direction': 'ingress',
                     'port_range_min': self.NC_PORT,
                     'port_range_max': self.NC_PORT,
                     'remote_ip_prefix': '0.0.0.0/0'}]
        self.create_secgroup_rules(rulesets,
                                   self.security_groups[-1]['id'])

    def _create_qos_policy(self):
        """Create a shared QoS policy as admin and return its ID."""
        policy = self.os_admin.network_client.create_qos_policy(
            name='test-policy',
            description='test-qos-policy',
            shared=True)
        return policy['policy']['id']

    def _create_server_by_port(self, port=None):
        """Launch an instance attached to a Neutron port.

        If ``port`` is None, a new security group (loginable, plus TCP
        ingress on NC_PORT) and a new port on ``self.network`` are created,
        and a floating IP for that port is stored in ``self.fip``. If a port
        is given, it is used as is and ``self.fip`` is left untouched, so it
        must already have been set.

        :param port: existing port dict to boot the server with, or None.
        :returns: tuple of (server, port).
        """
        # Create and activate the port that will be assigned to the instance.
        if port is None:
            secgroup = self.create_security_group()
            self.create_loginable_secgroup_rule(
                secgroup_id=secgroup['id'])

            secgroup_rules = [{'protocol': 'tcp',
                               'direction': 'ingress',
                               'port_range_min': self.NC_PORT,
                               'port_range_max': self.NC_PORT,
                               'remote_ip_prefix': '0.0.0.0/0'}]

            self.create_secgroup_rules(secgroup_rules,
                                       secgroup['id'])

            port = self.create_port(self.network,
                                    security_groups=[secgroup['id']])
            self.fip = self.create_floatingip(port=port)

        keypair = self.create_keypair()

        server_kwargs = {
            'flavor_ref': CONF.compute.flavor_ref,
            'image_ref': CONF.compute.image_ref,
            'key_name': keypair['name'],
            'networks': [{'port': port['id']}],
        }

        server = self.create_server(**server_kwargs)
        self.wait_for_server_active(server['server'])
        self.check_connectivity(self.fip['floating_ip_address'],
                                CONF.validation.image_ssh_user,
                                keypair['private_key'])
        return server, port
| |
| |
class QoSTest(QoSTestMixin, base.BaseTempestTestCase):
    """Scenario tests for QoS bandwidth-limit rules."""

    @classmethod
    @tutils.requires_ext(extension="qos", service="network")
    @base_api.require_qos_rule_type(qos_consts.RULE_TYPE_BANDWIDTH_LIMIT)
    def resource_setup(cls):
        super(QoSTest, cls).resource_setup()

    @decorators.idempotent_id('00682a0c-b72e-11e8-b81e-8c16450ea513')
    def test_qos_basic_and_update(self):
        """Check that a bandwidth limit is enforced and follows rule updates.

        The instance serves a file which is downloaded on the test node, and
        the measured rate is compared with the limit that is currently
        expected. The scenario is run twice, once for each way a policy can
        reach the VM:

        Policy attached to the network:
            1) create a policy with a bandwidth limit rule and attach it to
               the network;
            2) download the file - the original limit is expected;
            3) update the rule to twice the original limit;
            4) download the file - the updated limit is expected.

        Policy attached to the Neutron port:
            1) create a second policy with the original limit and attach it
               to the port;
            2) download the file - the original limit is expected;
            3) update the rule to three times the original limit;
            4) download the file - the updated limit is expected.
        """
        admin_client = self.os_admin.network_client
        base_kbps = constants.LIMIT_KILO_BITS_PER_SECOND

        # Prepare network, server and an SSH session to the guest
        self._test_basic_resources()
        ssh_client = self._create_ssh_client()

        def wait_for_bw(**check_kwargs):
            # Repeat the download until the measured rate is acceptable or
            # the download timeout expires.
            utils.wait_until_true(
                lambda: self._check_bw(ssh_client,
                                       self.fip['floating_ip_address'],
                                       port=self.NC_PORT,
                                       **check_kwargs),
                timeout=self.FILE_DOWNLOAD_TIMEOUT,
                sleep=1)

        # Policy with a bandwidth limit rule, created as admin
        network_policy_id = self._create_qos_policy()
        network_rule = admin_client.create_bandwidth_limit_rule(
            policy_id=network_policy_id,
            max_kbps=base_kbps,
            max_burst_kbps=base_kbps)
        network_rule_id = network_rule['bandwidth_limit_rule']['id']

        # Attach the policy to the network
        admin_client.update_network(self.network['id'],
                                    qos_policy_id=network_policy_id)

        # File that the guest will serve
        self._create_file_for_bw_tests(ssh_client)

        # Original limit is expected
        wait_for_bw()

        # Double the limit on the existing rule and verify again
        admin_client.update_bandwidth_limit_rule(
            network_policy_id,
            network_rule_id,
            max_kbps=base_kbps * 2,
            max_burst_kbps=base_kbps * 2)
        wait_for_bw(expected_bw=QoSTest.LIMIT_BYTES_SEC * 2)

        # Second policy, this time attached to the port
        port_policy_id = self._create_qos_policy()
        port_rule = admin_client.create_bandwidth_limit_rule(
            policy_id=port_policy_id,
            max_kbps=base_kbps,
            max_burst_kbps=base_kbps)
        port_rule_id = port_rule['bandwidth_limit_rule']['id']
        admin_client.update_port(self.port['id'],
                                 qos_policy_id=port_policy_id)

        # Original limit is expected again
        wait_for_bw()

        # Triple the limit on the port policy rule and verify
        admin_client.update_bandwidth_limit_rule(
            port_policy_id,
            port_rule_id,
            max_kbps=base_kbps * 3,
            max_burst_kbps=base_kbps * 3)
        wait_for_bw(expected_bw=QoSTest.LIMIT_BYTES_SEC * 3)

    @decorators.idempotent_id('66e5673e-0522-11ea-8d71-362b9e155667')
    def test_attach_previously_used_port_to_new_instance(self):
        """Boot a second instance on a port that carries a QoS policy.

        A port keeps its QoS policy when the server using it is deleted, so
        a new server booted on the same port must still see the same policy
        and the same rule.
        """
        admin_client = self.os_admin.network_client
        servers_client = self.os_primary.servers_client

        self.network = self.create_network()
        self.subnet = self.create_subnet(self.network)
        self.router = self.create_router_by_client()
        self.create_router_interface(self.router['id'], self.subnet['id'])

        vm, vm_port = self._create_server_by_port()

        policy_response = admin_client.create_qos_policy(
            name='port-policy',
            description='policy for attach',
            shared=False)
        port_policy = policy_response['policy']

        rule_response = admin_client.create_bandwidth_limit_rule(
            policy_id=port_policy['id'],
            max_kbps=constants.LIMIT_KILO_BITS_PER_SECOND,
            max_burst_kbps=constants.LIMIT_KILO_BITS_PER_SECOND)
        rule = rule_response['bandwidth_limit_rule']

        admin_client.update_port(vm_port['id'],
                                 qos_policy_id=port_policy['id'])

        # Remove the first server and wait until it is really gone
        first_server_id = vm['server']['id']
        servers_client.delete_server(first_server_id)
        waiters.wait_for_server_termination(servers_client, first_server_id)

        # Boot a new server on the port that already has the policy
        self._create_server_by_port(port=vm_port)

        retrieved_port = admin_client.show_port(vm_port['id'])
        actual_policy_id = retrieved_port['port']['qos_policy_id']
        self.assertEqual(port_policy['id'],
                         actual_policy_id,
                         """The expected policy ID is {0},
                         the actual value is {1}""".
                         format(port_policy['id'],
                                actual_policy_id))

        retrieved_policy = admin_client.show_qos_policy(actual_policy_id)

        retrieved_rule_id = retrieved_policy['policy']['rules'][0]['id']
        self.assertEqual(rule['id'],
                         retrieved_rule_id,
                         """The expected rule ID is {0},
                         the actual value is {1}""".
                         format(rule['id'], retrieved_rule_id))