Merge "Test multi-cast messaging between VMs."
diff --git a/neutron_tempest_plugin/common/socat.py b/neutron_tempest_plugin/common/socat.py
deleted file mode 100644
index 6bd1fdc..0000000
--- a/neutron_tempest_plugin/common/socat.py
+++ /dev/null
@@ -1,105 +0,0 @@
-# Copyright 2018 Red Hat, Inc.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-
-COMMAND = 'socat'
-
-
-class SocatAddress(object):
-
- def __init__(self, address, args=None, options=None):
- self.address = address
- self.args = args
- self.options = options
-
- @classmethod
- def udp_datagram(cls, host, port, options=None, ip_version=None):
- address = 'UDP{}-DATAGRAM'.format(ip_version or '')
- return cls(address, (host, int(port)), options)
-
- @classmethod
- def udp_recvfrom(cls, port, options=None, ip_version=None):
- address = 'UDP{}-RECVFROM'.format(ip_version or '')
- return cls(address, (int(port),), options)
-
- @classmethod
- def stdio(cls):
- return cls('STDIO')
-
- def __str__(self):
- address = self.address
- if self.args:
- address += ':' + ':'.join(str(a) for a in self.args)
- if self.options:
- address += ',' + ','.join(str(o) for o in self.options)
- return address
-
- def format(self, *args, **kwargs):
- return str(self).format(*args, **kwargs)
-
-
-STDIO = SocatAddress.stdio()
-
-
-class SocatOption(object):
-
- def __init__(self, name, *args):
- self.name = name
- self.args = args
-
- @classmethod
- def bind(cls, host):
- return cls('bind', host)
-
- @classmethod
- def fork(cls):
- return cls('fork')
-
- @classmethod
- def ip_multicast_ttl(cls, ttl):
- return cls('ip-multicast-ttl', int(ttl))
-
- @classmethod
- def ip_multicast_if(cls, interface_address):
- return cls('ip-multicast-if', interface_address)
-
- @classmethod
- def ip_add_membership(cls, multicast_address, interface_address):
- return cls('ip-add-membership', multicast_address, interface_address)
-
- def __str__(self):
- result = self.name
- args = self.args
- if args:
- result += '=' + ':'.join(str(a) for a in args)
- return result
-
-
-class SocatCommand(object):
-
- def __init__(self, source=STDIO, destination=STDIO, command=COMMAND):
- self.source = source
- self.destination = destination
- self.command = command
-
- def __str__(self):
- words = [self.command, self.source, self.destination]
- return ' '.join(str(obj) for obj in words)
-
-
-def socat_command(source=STDIO, destination=STDIO, command=COMMAND):
- command = SocatCommand(source=source, destination=destination,
- command=command)
- return str(command)
diff --git a/neutron_tempest_plugin/config.py b/neutron_tempest_plugin/config.py
index 7581f3c..54dc16e 100644
--- a/neutron_tempest_plugin/config.py
+++ b/neutron_tempest_plugin/config.py
@@ -65,6 +65,12 @@
choices=['None', 'linuxbridge', 'ovs', 'sriov'],
help='Agent used for devstack@q-agt.service'),
+ # Multicast tests settings
+ cfg.StrOpt('multicast_group_range',
+ default='224.0.0.120-224.0.0.250',
+ help='Unallocated multi-cast IPv4 range, which will be used to '
+ 'test the multi-cast support.'),
+
# Option for feature to connect via SSH to VMs using an intermediate SSH
# server
cfg.StrOpt('ssh_proxy_jump_host',
diff --git a/neutron_tempest_plugin/scenario/test_multicast.py b/neutron_tempest_plugin/scenario/test_multicast.py
new file mode 100644
index 0000000..cfaa73f
--- /dev/null
+++ b/neutron_tempest_plugin/scenario/test_multicast.py
@@ -0,0 +1,297 @@
+# Copyright 2018 Red Hat, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import netaddr
+from neutron_lib import constants
+from oslo_log import log
+from tempest.lib.common.utils import data_utils
+from tempest.lib import decorators
+
+from neutron_tempest_plugin.common import ssh
+from neutron_tempest_plugin.common import utils
+from neutron_tempest_plugin import config
+from neutron_tempest_plugin.scenario import base
+
+
+CONF = config.CONF
+LOG = log.getLogger(__name__)
+
+
+def get_receiver_script(group, port, hello_message, ack_message, result_file):
+
+ return """
+import socket
+import struct
+import sys
+
+multicast_group = '%(group)s'
+server_address = ('', %(port)s)
+
+# Create the socket
+sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
+
+# Bind to the server address
+sock.bind(server_address)
+
+# Tell the operating system to add the socket to the multicast group
+# on all interfaces.
+group = socket.inet_aton(multicast_group)
+mreq = struct.pack('4sL', group, socket.INADDR_ANY)
+sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
+
+# Receive/respond loop
+with open('%(result_file)s', 'w') as f:
+ f.write('%(hello_message)s')
+ f.flush()
+ data, address = sock.recvfrom(1024)
+ f.write('received ' + str(len(data)) + ' bytes from ' + str(address))
+ f.write(str(data))
+sock.sendto(b'%(ack_message)s', address)
+ """ % {'group': group,
+ 'port': port,
+ 'hello_message': hello_message,
+ 'ack_message': ack_message,
+ 'result_file': result_file}
+
+
+def get_sender_script(group, port, message, result_file):
+
+ return """
+import socket
+import sys
+
+message = b'%(message)s'
+multicast_group = ('%(group)s', %(port)s)
+
+# Create the datagram socket
+sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
+# Set the time-to-live for messages to 1 so they do not go past the
+# local network segment.
+sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 1)
+
+# Set a timeout so the socket does not block indefinitely when trying
+# to receive data.
+sock.settimeout(1)
+
+with open('%(result_file)s', 'w') as f:
+ try:
+ # Send data to the multicast group
+ sent = sock.sendto(message, multicast_group)
+
+ # Look for responses from all recipients
+ while True:
+ try:
+ data, server = sock.recvfrom(1024)
+ except socket.timeout:
+ f.write('timed out, no more responses')
+ break
+ else:
+ f.write('received reply ' + str(data) + ' from ' + str(server))
+ finally:
+ sys.stdout.write('closing socket')
+ sock.close()
+ """ % {'group': group,
+ 'port': port,
+ 'message': message,
+ 'result_file': result_file}
+
+
+class BaseMulticastTest(object):
+
+ credentials = ['primary']
+ force_tenant_isolation = False
+
+ # Import configuration options
+ available_type_drivers = (
+ CONF.neutron_plugin_options.available_type_drivers)
+
+ hello_message = "I am waiting..."
+ multicast_port = 5007
+ multicast_message = "Big Bang"
+ receiver_output_file = "/tmp/receiver_mcast_out"
+ sender_output_file = "/tmp/sender_mcast_out"
+
+ @classmethod
+ def skip_checks(cls):
+ super(BaseMulticastTest, cls).skip_checks()
+ advanced_image_available = (
+ CONF.neutron_plugin_options.advanced_image_ref or
+ CONF.neutron_plugin_options.default_image_is_advanced)
+ if not advanced_image_available:
+ skip_reason = "This test require advanced tools for this test"
+ raise cls.skipException(skip_reason)
+
+ @classmethod
+ def resource_setup(cls):
+ super(BaseMulticastTest, cls).resource_setup()
+
+ if CONF.neutron_plugin_options.default_image_is_advanced:
+ cls.flavor_ref = CONF.compute.flavor_ref
+ cls.image_ref = CONF.compute.image_ref
+ cls.username = CONF.validation.image_ssh_user
+ else:
+ cls.flavor_ref = (
+ CONF.neutron_plugin_options.advanced_image_flavor_ref)
+ cls.image_ref = CONF.neutron_plugin_options.advanced_image_ref
+ cls.username = CONF.neutron_plugin_options.advanced_image_ssh_user
+
+ # setup basic topology for servers we can log into it
+ cls.network = cls.create_network()
+ cls.subnet = cls.create_subnet(cls.network)
+ cls.router = cls.create_router_by_client()
+ cls.create_router_interface(cls.router['id'], cls.subnet['id'])
+
+ cls.keypair = cls.create_keypair()
+
+ cls.secgroup = cls.os_primary.network_client.create_security_group(
+ name='secgroup_mtu')
+ cls.security_groups.append(cls.secgroup['security_group'])
+ cls.create_loginable_secgroup_rule(
+ secgroup_id=cls.secgroup['security_group']['id'])
+ cls.create_pingable_secgroup_rule(
+ secgroup_id=cls.secgroup['security_group']['id'])
+ # Create security group rule for UDP (multicast traffic)
+ cls.create_secgroup_rules(
+ rule_list=[dict(protocol=constants.PROTO_NAME_UDP,
+ direction=constants.INGRESS_DIRECTION,
+ remote_ip_prefix=cls.any_addresses,
+ ethertype=cls.ethertype)],
+ secgroup_id=cls.secgroup['security_group']['id'])
+
+ # Multicast IP range to be used for multicast group IP asignement
+ if '-' in cls.multicast_group_range:
+ multicast_group_range = netaddr.IPRange(
+ *cls.multicast_group_range.split('-'))
+ else:
+ multicast_group_range = netaddr.IPNetwork(
+ cls.multicast_group_range)
+ cls.multicast_group_iter = iter(multicast_group_range)
+
+ def _create_server(self):
+ name = data_utils.rand_name("multicast-server")
+ server = self.create_server(
+ flavor_ref=self.flavor_ref,
+ image_ref=self.image_ref,
+ key_name=self.keypair['name'], name=name,
+ networks=[{'uuid': self.network['id']}],
+ security_groups=[{'name': self.secgroup['security_group']['name']}]
+ )['server']
+ self.wait_for_server_active(server)
+ port = self.client.list_ports(
+ network_id=self.network['id'], device_id=server['id'])['ports'][0]
+ server['fip'] = self.create_floatingip(port=port)
+ return server
+
+ def _prepare_sender(self, server, mcast_address):
+ check_script = get_sender_script(
+ group=mcast_address, port=self.multicast_port,
+ message=self.multicast_message,
+ result_file=self.sender_output_file)
+ ssh_client = ssh.Client(server['fip']['floating_ip_address'],
+ self.username,
+ pkey=self.keypair['private_key'])
+
+ ssh_client.execute_script(
+ 'echo "%s" > ~/multicast_traffic_sender.py' % check_script)
+ return ssh_client
+
+ def _prepare_receiver(self, server, mcast_address):
+ check_script = get_receiver_script(
+ group=mcast_address, port=self.multicast_port,
+ hello_message=self.hello_message, ack_message=server['id'],
+ result_file=self.receiver_output_file)
+ ssh_client = ssh.Client(
+ server['fip']['floating_ip_address'],
+ self.username,
+ pkey=self.keypair['private_key'])
+ ssh_client.execute_script(
+ 'echo "%s" > ~/multicast_traffic_receiver.py' % check_script)
+ return ssh_client
+
+ @decorators.idempotent_id('113486fc-24c9-4be4-8361-03b1c9892867')
+ def test_multicast_between_vms_on_same_network(self):
+ """Test multicast messaging between two servers on the same network
+
+ [Sender server] -> (Multicast network) -> [Receiver server]
+ """
+ sender = self._create_server()
+ receivers = [self._create_server() for _ in range(1)]
+ # Sender can be also receiver of multicast traffic
+ receivers.append(sender)
+ self._check_multicast_conectivity(sender=sender, receivers=receivers)
+
+ def _check_multicast_conectivity(self, sender, receivers):
+ """Test multi-cast messaging between two servers
+
+ [Sender server] -> ... some network topology ... -> [Receiver server]
+ """
+ mcast_address = next(self.multicast_group_iter)
+ LOG.debug("Multicast group address: %s", mcast_address)
+
+ def _message_received(client, msg, file_path):
+ result = client.execute_script(
+ "cat {path} || echo '{path} not exists yet'".format(
+ path=file_path))
+ return msg in result
+
+ sender_ssh_client = self._prepare_sender(sender, mcast_address)
+ receiver_ssh_clients = []
+ receiver_ids = []
+ for receiver in receivers:
+ receiver_ssh_client = self._prepare_receiver(
+ receiver, mcast_address)
+ receiver_ssh_client.execute_script(
+ "python3 ~/multicast_traffic_receiver.py &", shell="bash")
+ utils.wait_until_true(
+ lambda: _message_received(
+ receiver_ssh_client, self.hello_message,
+ self.receiver_output_file),
+ exception=RuntimeError(
+ "Receiver script didn't start properly on server "
+ "{!r}.".format(receiver['id'])))
+
+ receiver_ssh_clients.append(receiver_ssh_client)
+ receiver_ids.append(receiver['id'])
+
+ # Now lets run scripts on sender
+ sender_ssh_client.execute_script(
+ "python3 ~/multicast_traffic_sender.py")
+
+ # And check if message was received
+ for receiver_ssh_client in receiver_ssh_clients:
+ utils.wait_until_true(
+ lambda: _message_received(
+ receiver_ssh_client, self.multicast_message,
+ self.receiver_output_file),
+ exception=RuntimeError(
+ "Receiver {!r} didn't get multicast message".format(
+ receiver['id'])))
+
+ # TODO(slaweq): add validation of answears on sended server
+ replies_result = sender_ssh_client.execute_script(
+ "cat {path} || echo '{path} not exists yet'".format(
+ path=self.sender_output_file))
+ for receiver_id in receiver_ids:
+ self.assertIn(receiver_id, replies_result)
+
+
+class MulticastTestIPv4(BaseMulticastTest, base.BaseTempestTestCase):
+
+ # Import configuration options
+ multicast_group_range = CONF.neutron_plugin_options.multicast_group_range
+
+ # IP version specific parameters
+ _ip_version = constants.IP_VERSION_4
+ any_addresses = constants.IPv4_ANY