Test multi-cast messaging between VMs.

Test multi-cast messages are delivered bewteen two VMs
using IPv4 through the same tenant network.

This new test scenario requires advanced image to be run as
it needs to run python3 scripts and python3 is not available
on Cirros.

Co-authored-by: Slawek Kaplonski <skaplons@redhat.com>

Change-Id: Idd1589adbff6e556290f43fabbb8a23737a34adf
diff --git a/neutron_tempest_plugin/common/socat.py b/neutron_tempest_plugin/common/socat.py
deleted file mode 100644
index 6bd1fdc..0000000
--- a/neutron_tempest_plugin/common/socat.py
+++ /dev/null
@@ -1,105 +0,0 @@
-# Copyright 2018 Red Hat, Inc.
-# All Rights Reserved.
-#    Licensed under the Apache License, Version 2.0 (the "License"); you may
-#    not use this file except in compliance with the License. You may obtain
-#    a copy of the License at
-#         http://www.apache.org/licenses/LICENSE-2.0
-#    Unless required by applicable law or agreed to in writing, software
-#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-#    License for the specific language governing permissions and limitations
-#    under the License.
-COMMAND = 'socat'
-class SocatAddress(object):
-    def __init__(self, address, args=None, options=None):
-        self.address = address
-        self.args = args
-        self.options = options
-    @classmethod
-    def udp_datagram(cls, host, port, options=None, ip_version=None):
-        address = 'UDP{}-DATAGRAM'.format(ip_version or '')
-        return cls(address, (host, int(port)), options)
-    @classmethod
-    def udp_recvfrom(cls, port, options=None, ip_version=None):
-        address = 'UDP{}-RECVFROM'.format(ip_version or '')
-        return cls(address, (int(port),), options)
-    @classmethod
-    def stdio(cls):
-        return cls('STDIO')
-    def __str__(self):
-        address = self.address
-        if self.args:
-            address += ':' + ':'.join(str(a) for a in self.args)
-        if self.options:
-            address += ',' + ','.join(str(o) for o in self.options)
-        return address
-    def format(self, *args, **kwargs):
-        return str(self).format(*args, **kwargs)
-STDIO = SocatAddress.stdio()
-class SocatOption(object):
-    def __init__(self, name, *args):
-        self.name = name
-        self.args = args
-    @classmethod
-    def bind(cls, host):
-        return cls('bind', host)
-    @classmethod
-    def fork(cls):
-        return cls('fork')
-    @classmethod
-    def ip_multicast_ttl(cls, ttl):
-        return cls('ip-multicast-ttl', int(ttl))
-    @classmethod
-    def ip_multicast_if(cls, interface_address):
-        return cls('ip-multicast-if', interface_address)
-    @classmethod
-    def ip_add_membership(cls, multicast_address, interface_address):
-        return cls('ip-add-membership', multicast_address, interface_address)
-    def __str__(self):
-        result = self.name
-        args = self.args
-        if args:
-            result += '=' + ':'.join(str(a) for a in args)
-        return result
-class SocatCommand(object):
-    def __init__(self, source=STDIO, destination=STDIO, command=COMMAND):
-        self.source = source
-        self.destination = destination
-        self.command = command
-    def __str__(self):
-        words = [self.command, self.source, self.destination]
-        return ' '.join(str(obj) for obj in words)
-def socat_command(source=STDIO, destination=STDIO, command=COMMAND):
-    command = SocatCommand(source=source, destination=destination,
-                           command=command)
-    return str(command)
diff --git a/neutron_tempest_plugin/config.py b/neutron_tempest_plugin/config.py
index e07c92a..74dfcb1 100644
--- a/neutron_tempest_plugin/config.py
+++ b/neutron_tempest_plugin/config.py
@@ -65,6 +65,12 @@
                choices=['None', 'linuxbridge', 'ovs', 'sriov'],
                help='Agent used for devstack@q-agt.service'),
+    # Multicast tests settings
+    cfg.StrOpt('multicast_group_range',
+               default='',
+               help='Unallocated multi-cast IPv4 range, which will be used to '
+                    'test the multi-cast support.'),
     # Option for feature to connect via SSH to VMs using an intermediate SSH
     # server
diff --git a/neutron_tempest_plugin/scenario/test_multicast.py b/neutron_tempest_plugin/scenario/test_multicast.py
new file mode 100644
index 0000000..cfaa73f
--- /dev/null
+++ b/neutron_tempest_plugin/scenario/test_multicast.py
@@ -0,0 +1,297 @@
+# Copyright 2018 Red Hat, Inc.
+# All Rights Reserved.
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#         http://www.apache.org/licenses/LICENSE-2.0
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+import netaddr
+from neutron_lib import constants
+from oslo_log import log
+from tempest.lib.common.utils import data_utils
+from tempest.lib import decorators
+from neutron_tempest_plugin.common import ssh
+from neutron_tempest_plugin.common import utils
+from neutron_tempest_plugin import config
+from neutron_tempest_plugin.scenario import base
+CONF = config.CONF
+LOG = log.getLogger(__name__)
+def get_receiver_script(group, port, hello_message, ack_message, result_file):
+    return """
+import socket
+import struct
+import sys
+multicast_group = '%(group)s'
+server_address = ('', %(port)s)
+# Create the socket
+sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
+# Bind to the server address
+# Tell the operating system to add the socket to the multicast group
+# on all interfaces.
+group = socket.inet_aton(multicast_group)
+mreq = struct.pack('4sL', group, socket.INADDR_ANY)
+sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
+# Receive/respond loop
+with open('%(result_file)s', 'w') as f:
+    f.write('%(hello_message)s')
+    f.flush()
+    data, address = sock.recvfrom(1024)
+    f.write('received ' + str(len(data)) + ' bytes from ' + str(address))
+    f.write(str(data))
+sock.sendto(b'%(ack_message)s', address)
+    """ % {'group': group,
+           'port': port,
+           'hello_message': hello_message,
+           'ack_message': ack_message,
+           'result_file': result_file}
+def get_sender_script(group, port, message, result_file):
+    return """
+import socket
+import sys
+message = b'%(message)s'
+multicast_group = ('%(group)s', %(port)s)
+# Create the datagram socket
+sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
+# Set the time-to-live for messages to 1 so they do not go past the
+# local network segment.
+sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 1)
+# Set a timeout so the socket does not block indefinitely when trying
+# to receive data.
+with open('%(result_file)s', 'w') as f:
+    try:
+        # Send data to the multicast group
+        sent = sock.sendto(message, multicast_group)
+        # Look for responses from all recipients
+        while True:
+            try:
+                data, server = sock.recvfrom(1024)
+            except socket.timeout:
+                f.write('timed out, no more responses')
+                break
+            else:
+                f.write('received reply ' + str(data) + ' from ' + str(server))
+    finally:
+        sys.stdout.write('closing socket')
+        sock.close()
+    """ % {'group': group,
+           'port': port,
+           'message': message,
+           'result_file': result_file}
+class BaseMulticastTest(object):
+    credentials = ['primary']
+    force_tenant_isolation = False
+    # Import configuration options
+    available_type_drivers = (
+        CONF.neutron_plugin_options.available_type_drivers)
+    hello_message = "I am waiting..."
+    multicast_port = 5007
+    multicast_message = "Big Bang"
+    receiver_output_file = "/tmp/receiver_mcast_out"
+    sender_output_file = "/tmp/sender_mcast_out"
+    @classmethod
+    def skip_checks(cls):
+        super(BaseMulticastTest, cls).skip_checks()
+        advanced_image_available = (
+            CONF.neutron_plugin_options.advanced_image_ref or
+            CONF.neutron_plugin_options.default_image_is_advanced)
+        if not advanced_image_available:
+            skip_reason = "This test require advanced tools for this test"
+            raise cls.skipException(skip_reason)
+    @classmethod
+    def resource_setup(cls):
+        super(BaseMulticastTest, cls).resource_setup()
+        if CONF.neutron_plugin_options.default_image_is_advanced:
+            cls.flavor_ref = CONF.compute.flavor_ref
+            cls.image_ref = CONF.compute.image_ref
+            cls.username = CONF.validation.image_ssh_user
+        else:
+            cls.flavor_ref = (
+                CONF.neutron_plugin_options.advanced_image_flavor_ref)
+            cls.image_ref = CONF.neutron_plugin_options.advanced_image_ref
+            cls.username = CONF.neutron_plugin_options.advanced_image_ssh_user
+        # setup basic topology for servers we can log into it
+        cls.network = cls.create_network()
+        cls.subnet = cls.create_subnet(cls.network)
+        cls.router = cls.create_router_by_client()
+        cls.create_router_interface(cls.router['id'], cls.subnet['id'])
+        cls.keypair = cls.create_keypair()
+        cls.secgroup = cls.os_primary.network_client.create_security_group(
+            name='secgroup_mtu')
+        cls.security_groups.append(cls.secgroup['security_group'])
+        cls.create_loginable_secgroup_rule(
+            secgroup_id=cls.secgroup['security_group']['id'])
+        cls.create_pingable_secgroup_rule(
+            secgroup_id=cls.secgroup['security_group']['id'])
+        # Create security group rule for UDP (multicast traffic)
+        cls.create_secgroup_rules(
+            rule_list=[dict(protocol=constants.PROTO_NAME_UDP,
+                            direction=constants.INGRESS_DIRECTION,
+                            remote_ip_prefix=cls.any_addresses,
+                            ethertype=cls.ethertype)],
+            secgroup_id=cls.secgroup['security_group']['id'])
+        # Multicast IP range to be used for multicast group IP asignement
+        if '-' in cls.multicast_group_range:
+            multicast_group_range = netaddr.IPRange(
+                *cls.multicast_group_range.split('-'))
+        else:
+            multicast_group_range = netaddr.IPNetwork(
+                cls.multicast_group_range)
+        cls.multicast_group_iter = iter(multicast_group_range)
+    def _create_server(self):
+        name = data_utils.rand_name("multicast-server")
+        server = self.create_server(
+            flavor_ref=self.flavor_ref,
+            image_ref=self.image_ref,
+            key_name=self.keypair['name'], name=name,
+            networks=[{'uuid': self.network['id']}],
+            security_groups=[{'name': self.secgroup['security_group']['name']}]
+        )['server']
+        self.wait_for_server_active(server)
+        port = self.client.list_ports(
+            network_id=self.network['id'], device_id=server['id'])['ports'][0]
+        server['fip'] = self.create_floatingip(port=port)
+        return server
+    def _prepare_sender(self, server, mcast_address):
+        check_script = get_sender_script(
+            group=mcast_address, port=self.multicast_port,
+            message=self.multicast_message,
+            result_file=self.sender_output_file)
+        ssh_client = ssh.Client(server['fip']['floating_ip_address'],
+                                self.username,
+                                pkey=self.keypair['private_key'])
+        ssh_client.execute_script(
+            'echo "%s" > ~/multicast_traffic_sender.py' % check_script)
+        return ssh_client
+    def _prepare_receiver(self, server, mcast_address):
+        check_script = get_receiver_script(
+            group=mcast_address, port=self.multicast_port,
+            hello_message=self.hello_message, ack_message=server['id'],
+            result_file=self.receiver_output_file)
+        ssh_client = ssh.Client(
+            server['fip']['floating_ip_address'],
+            self.username,
+            pkey=self.keypair['private_key'])
+        ssh_client.execute_script(
+            'echo "%s" > ~/multicast_traffic_receiver.py' % check_script)
+        return ssh_client
+    @decorators.idempotent_id('113486fc-24c9-4be4-8361-03b1c9892867')
+    def test_multicast_between_vms_on_same_network(self):
+        """Test multicast messaging between two servers on the same network
+        [Sender server] -> (Multicast network) -> [Receiver server]
+        """
+        sender = self._create_server()
+        receivers = [self._create_server() for _ in range(1)]
+        # Sender can be also receiver of multicast traffic
+        receivers.append(sender)
+        self._check_multicast_conectivity(sender=sender, receivers=receivers)
+    def _check_multicast_conectivity(self, sender, receivers):
+        """Test multi-cast messaging between two servers
+        [Sender server] -> ... some network topology ... -> [Receiver server]
+        """
+        mcast_address = next(self.multicast_group_iter)
+        LOG.debug("Multicast group address: %s", mcast_address)
+        def _message_received(client, msg, file_path):
+            result = client.execute_script(
+                "cat {path} || echo '{path} not exists yet'".format(
+                    path=file_path))
+            return msg in result
+        sender_ssh_client = self._prepare_sender(sender, mcast_address)
+        receiver_ssh_clients = []
+        receiver_ids = []
+        for receiver in receivers:
+            receiver_ssh_client = self._prepare_receiver(
+                receiver, mcast_address)
+            receiver_ssh_client.execute_script(
+                "python3 ~/multicast_traffic_receiver.py &", shell="bash")
+            utils.wait_until_true(
+                lambda: _message_received(
+                    receiver_ssh_client, self.hello_message,
+                    self.receiver_output_file),
+                exception=RuntimeError(
+                    "Receiver script didn't start properly on server "
+                    "{!r}.".format(receiver['id'])))
+            receiver_ssh_clients.append(receiver_ssh_client)
+            receiver_ids.append(receiver['id'])
+        # Now lets run scripts on sender
+        sender_ssh_client.execute_script(
+            "python3 ~/multicast_traffic_sender.py")
+        # And check if message was received
+        for receiver_ssh_client in receiver_ssh_clients:
+            utils.wait_until_true(
+                lambda: _message_received(
+                    receiver_ssh_client, self.multicast_message,
+                    self.receiver_output_file),
+                exception=RuntimeError(
+                    "Receiver {!r} didn't get multicast message".format(
+                        receiver['id'])))
+        # TODO(slaweq): add validation of answears on sended server
+        replies_result = sender_ssh_client.execute_script(
+            "cat {path} || echo '{path} not exists yet'".format(
+                path=self.sender_output_file))
+        for receiver_id in receiver_ids:
+            self.assertIn(receiver_id, replies_result)
+class MulticastTestIPv4(BaseMulticastTest, base.BaseTempestTestCase):
+    # Import configuration options
+    multicast_group_range = CONF.neutron_plugin_options.multicast_group_range
+    # IP version specific parameters
+    _ip_version = constants.IP_VERSION_4
+    any_addresses = constants.IPv4_ANY