Merge "Test multi-cast messaging between VMs."
diff --git a/neutron_tempest_plugin/common/socat.py b/neutron_tempest_plugin/common/socat.py
deleted file mode 100644
index 6bd1fdc..0000000
--- a/neutron_tempest_plugin/common/socat.py
+++ /dev/null
@@ -1,105 +0,0 @@
-# Copyright 2018 Red Hat, Inc.
-# All Rights Reserved.
-#
-#    Licensed under the Apache License, Version 2.0 (the "License"); you may
-#    not use this file except in compliance with the License. You may obtain
-#    a copy of the License at
-#
-#         http://www.apache.org/licenses/LICENSE-2.0
-#
-#    Unless required by applicable law or agreed to in writing, software
-#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-#    License for the specific language governing permissions and limitations
-#    under the License.
-
-
-COMMAND = 'socat'
-
-
-class SocatAddress(object):
-
-    def __init__(self, address, args=None, options=None):
-        self.address = address
-        self.args = args
-        self.options = options
-
-    @classmethod
-    def udp_datagram(cls, host, port, options=None, ip_version=None):
-        address = 'UDP{}-DATAGRAM'.format(ip_version or '')
-        return cls(address, (host, int(port)), options)
-
-    @classmethod
-    def udp_recvfrom(cls, port, options=None, ip_version=None):
-        address = 'UDP{}-RECVFROM'.format(ip_version or '')
-        return cls(address, (int(port),), options)
-
-    @classmethod
-    def stdio(cls):
-        return cls('STDIO')
-
-    def __str__(self):
-        address = self.address
-        if self.args:
-            address += ':' + ':'.join(str(a) for a in self.args)
-        if self.options:
-            address += ',' + ','.join(str(o) for o in self.options)
-        return address
-
-    def format(self, *args, **kwargs):
-        return str(self).format(*args, **kwargs)
-
-
-STDIO = SocatAddress.stdio()
-
-
-class SocatOption(object):
-
-    def __init__(self, name, *args):
-        self.name = name
-        self.args = args
-
-    @classmethod
-    def bind(cls, host):
-        return cls('bind', host)
-
-    @classmethod
-    def fork(cls):
-        return cls('fork')
-
-    @classmethod
-    def ip_multicast_ttl(cls, ttl):
-        return cls('ip-multicast-ttl', int(ttl))
-
-    @classmethod
-    def ip_multicast_if(cls, interface_address):
-        return cls('ip-multicast-if', interface_address)
-
-    @classmethod
-    def ip_add_membership(cls, multicast_address, interface_address):
-        return cls('ip-add-membership', multicast_address, interface_address)
-
-    def __str__(self):
-        result = self.name
-        args = self.args
-        if args:
-            result += '=' + ':'.join(str(a) for a in args)
-        return result
-
-
-class SocatCommand(object):
-
-    def __init__(self, source=STDIO, destination=STDIO, command=COMMAND):
-        self.source = source
-        self.destination = destination
-        self.command = command
-
-    def __str__(self):
-        words = [self.command, self.source, self.destination]
-        return ' '.join(str(obj) for obj in words)
-
-
-def socat_command(source=STDIO, destination=STDIO, command=COMMAND):
-    command = SocatCommand(source=source, destination=destination,
-                           command=command)
-    return str(command)
diff --git a/neutron_tempest_plugin/config.py b/neutron_tempest_plugin/config.py
index 7581f3c..54dc16e 100644
--- a/neutron_tempest_plugin/config.py
+++ b/neutron_tempest_plugin/config.py
@@ -65,6 +65,12 @@
                choices=['None', 'linuxbridge', 'ovs', 'sriov'],
                help='Agent used for devstack@q-agt.service'),
 
+    # Multicast tests settings
+    cfg.StrOpt('multicast_group_range',
+               default='224.0.0.120-224.0.0.250',
+               help='Unallocated multi-cast IPv4 range, which will be used to '
+                    'test the multi-cast support.'),
+
     # Option for feature to connect via SSH to VMs using an intermediate SSH
     # server
     cfg.StrOpt('ssh_proxy_jump_host',
diff --git a/neutron_tempest_plugin/scenario/test_multicast.py b/neutron_tempest_plugin/scenario/test_multicast.py
new file mode 100644
index 0000000..cfaa73f
--- /dev/null
+++ b/neutron_tempest_plugin/scenario/test_multicast.py
@@ -0,0 +1,297 @@
+# Copyright 2018 Red Hat, Inc.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import netaddr
+from neutron_lib import constants
+from oslo_log import log
+from tempest.lib.common.utils import data_utils
+from tempest.lib import decorators
+
+from neutron_tempest_plugin.common import ssh
+from neutron_tempest_plugin.common import utils
+from neutron_tempest_plugin import config
+from neutron_tempest_plugin.scenario import base
+
+
+CONF = config.CONF
+LOG = log.getLogger(__name__)
+
+
+def get_receiver_script(group, port, hello_message, ack_message, result_file):
+    """Return the source of the script that a multicast receiver VM runs."""
+    script_template = """
+import socket
+import struct
+import sys
+
+multicast_group = '%(group)s'
+server_address = ('', %(port)s)
+
+# Create the socket
+sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
+
+# Bind to the server address
+sock.bind(server_address)
+
+# Tell the operating system to add the socket to the multicast group
+# on all interfaces.
+group = socket.inet_aton(multicast_group)
+mreq = struct.pack('4sL', group, socket.INADDR_ANY)
+sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
+
+# Receive/respond loop
+with open('%(result_file)s', 'w') as f:
+    f.write('%(hello_message)s')
+    f.flush()
+    data, address = sock.recvfrom(1024)
+    f.write('received ' + str(len(data)) + ' bytes from ' + str(address))
+    f.write(str(data))
+sock.sendto(b'%(ack_message)s', address)
+    """
+    return script_template % dict(
+        group=group, port=port,
+        hello_message=hello_message, ack_message=ack_message,
+        result_file=result_file)
+
+
+def get_sender_script(group, port, message, result_file):
+    """Return the source of the script that the multicast sender VM runs."""
+    script_template = """
+import socket
+import sys
+
+message = b'%(message)s'
+multicast_group = ('%(group)s', %(port)s)
+
+# Create the datagram socket
+sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
+# Set the time-to-live for messages to 1 so they do not go past the
+# local network segment.
+sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 1)
+
+# Set a timeout so the socket does not block indefinitely when trying
+# to receive data.
+sock.settimeout(1)
+
+with open('%(result_file)s', 'w') as f:
+    try:
+        # Send data to the multicast group
+        sent = sock.sendto(message, multicast_group)
+
+        # Look for responses from all recipients
+        while True:
+            try:
+                data, server = sock.recvfrom(1024)
+            except socket.timeout:
+                f.write('timed out, no more responses')
+                break
+            else:
+                f.write('received reply ' + str(data) + ' from ' + str(server))
+    finally:
+        sys.stdout.write('closing socket')
+        sock.close()
+    """
+    return script_template % dict(
+        group=group, port=port,
+        message=message, result_file=result_file)
+
+
+class BaseMulticastTest(object):
+    """Multicast scenario mixin; subclasses add the tempest base class."""
+    credentials = ['primary']
+    force_tenant_isolation = False
+
+    # Import configuration options
+    available_type_drivers = (
+        CONF.neutron_plugin_options.available_type_drivers)
+
+    hello_message = "I am waiting..."
+    multicast_port = 5007
+    multicast_message = "Big Bang"
+    receiver_output_file = "/tmp/receiver_mcast_out"
+    sender_output_file = "/tmp/sender_mcast_out"
+
+    @classmethod
+    def skip_checks(cls):
+        """Skip the tests when no advanced guest image is configured."""
+        super(BaseMulticastTest, cls).skip_checks()
+        plugin_options = CONF.neutron_plugin_options
+        if not (plugin_options.advanced_image_ref or
+                plugin_options.default_image_is_advanced):
+            skip_reason = "This test require advanced tools for this test"
+            raise cls.skipException(skip_reason)
+
+    @classmethod
+    def resource_setup(cls):
+        """Create shared resources and the multicast group iterator."""
+        super(BaseMulticastTest, cls).resource_setup()
+        if CONF.neutron_plugin_options.default_image_is_advanced:
+            cls.flavor_ref = CONF.compute.flavor_ref
+            cls.image_ref = CONF.compute.image_ref
+            cls.username = CONF.validation.image_ssh_user
+        else:
+            cls.flavor_ref = (
+                CONF.neutron_plugin_options.advanced_image_flavor_ref)
+            cls.image_ref = CONF.neutron_plugin_options.advanced_image_ref
+            cls.username = CONF.neutron_plugin_options.advanced_image_ssh_user
+
+        # Set up a basic topology so that we can log into the servers
+        cls.network = cls.create_network()
+        cls.subnet = cls.create_subnet(cls.network)
+        cls.router = cls.create_router_by_client()
+        cls.create_router_interface(cls.router['id'], cls.subnet['id'])
+
+        cls.keypair = cls.create_keypair()
+
+        cls.secgroup = cls.os_primary.network_client.create_security_group(
+            name='secgroup_mtu')
+        cls.security_groups.append(cls.secgroup['security_group'])
+        cls.create_loginable_secgroup_rule(
+            secgroup_id=cls.secgroup['security_group']['id'])
+        cls.create_pingable_secgroup_rule(
+            secgroup_id=cls.secgroup['security_group']['id'])
+        # Create security group rule for UDP (multicast traffic)
+        cls.create_secgroup_rules(
+            rule_list=[dict(protocol=constants.PROTO_NAME_UDP,
+                            direction=constants.INGRESS_DIRECTION,
+                            remote_ip_prefix=cls.any_addresses,
+                            ethertype=cls.ethertype)],
+            secgroup_id=cls.secgroup['security_group']['id'])
+
+        # Multicast IP range to be used for multicast group IP assignment
+        if '-' in cls.multicast_group_range:
+            multicast_group_range = netaddr.IPRange(
+                *cls.multicast_group_range.split('-'))
+        else:
+            multicast_group_range = netaddr.IPNetwork(
+                cls.multicast_group_range)
+        cls.multicast_group_iter = iter(multicast_group_range)
+
+    def _create_server(self):
+        """Boot a server on the test network and attach a floating IP."""
+        name = data_utils.rand_name("multicast-server")
+        server = self.create_server(
+            flavor_ref=self.flavor_ref, image_ref=self.image_ref,
+            key_name=self.keypair['name'], name=name,
+            networks=[{'uuid': self.network['id']}],
+            security_groups=[{'name': self.secgroup['security_group']['name']}]
+        )['server']
+        self.wait_for_server_active(server)
+        port = self.client.list_ports(
+            network_id=self.network['id'], device_id=server['id'])['ports'][0]
+        server['fip'] = self.create_floatingip(port=port)
+        return server
+
+    def _prepare_sender(self, server, mcast_address):
+        """Write the sender script to the server; return the SSH client."""
+        check_script = get_sender_script(
+            group=mcast_address, port=self.multicast_port,
+            message=self.multicast_message,
+            result_file=self.sender_output_file)
+        ssh_client = ssh.Client(server['fip']['floating_ip_address'],
+                                self.username,
+                                pkey=self.keypair['private_key'])
+        ssh_client.execute_script(
+            'echo "%s" > ~/multicast_traffic_sender.py' % check_script)
+        return ssh_client
+
+    def _prepare_receiver(self, server, mcast_address):
+        """Write the receiver script to the server; return the SSH client."""
+        check_script = get_receiver_script(
+            group=mcast_address, port=self.multicast_port,
+            hello_message=self.hello_message, ack_message=server['id'],
+            result_file=self.receiver_output_file)
+        ssh_client = ssh.Client(server['fip']['floating_ip_address'],
+                                self.username,
+                                pkey=self.keypair['private_key'])
+        ssh_client.execute_script(
+            'echo "%s" > ~/multicast_traffic_receiver.py' % check_script)
+        return ssh_client
+
+    @decorators.idempotent_id('113486fc-24c9-4be4-8361-03b1c9892867')
+    def test_multicast_between_vms_on_same_network(self):
+        """Test multicast messaging between two servers on the same network
+
+        [Sender server] -> (Multicast network) -> [Receiver server]
+        """
+        sender = self._create_server()
+        receivers = [self._create_server() for _ in range(1)]
+        # Sender can be also receiver of multicast traffic
+        receivers.append(sender)
+        self._check_multicast_conectivity(sender=sender, receivers=receivers)
+
+    def _check_multicast_conectivity(self, sender, receivers):
+        """Test multi-cast messaging between the sender and all receivers
+
+        [Sender server] -> ... some network topology ... -> [Receiver server]
+        """
+        mcast_address = next(self.multicast_group_iter)
+        LOG.debug("Multicast group address: %s", mcast_address)
+
+        def _message_received(client, msg, file_path):
+            result = client.execute_script(
+                "cat {path} || echo '{path} not exists yet'".format(
+                    path=file_path))
+            return msg in result
+
+        sender_ssh_client = self._prepare_sender(sender, mcast_address)
+        receiver_ssh_clients = []
+        receiver_ids = []
+        for receiver in receivers:
+            receiver_ssh_client = self._prepare_receiver(
+                receiver, mcast_address)
+            receiver_ssh_client.execute_script(
+                "python3 ~/multicast_traffic_receiver.py &", shell="bash")
+            utils.wait_until_true(
+                lambda: _message_received(
+                    receiver_ssh_client, self.hello_message,
+                    self.receiver_output_file),
+                exception=RuntimeError(
+                    "Receiver script didn't start properly on server "
+                    "{!r}.".format(receiver['id'])))
+
+            receiver_ssh_clients.append(receiver_ssh_client)
+            receiver_ids.append(receiver['id'])
+
+        # Now lets run scripts on sender
+        sender_ssh_client.execute_script(
+            "python3 ~/multicast_traffic_sender.py")
+
+        # And check if message was received, naming the receiver that failed
+        for client, receiver_id in zip(receiver_ssh_clients, receiver_ids):
+            utils.wait_until_true(
+                lambda: _message_received(
+                    client, self.multicast_message,
+                    self.receiver_output_file),
+                exception=RuntimeError(
+                    "Receiver {!r} didn't get multicast message".format(
+                        receiver_id)))
+
+        # TODO(slaweq): add validation of answers on sender server
+        replies_result = sender_ssh_client.execute_script(
+            "cat {path} || echo '{path} not exists yet'".format(
+                path=self.sender_output_file))
+        for receiver_id in receiver_ids:
+            self.assertIn(receiver_id, replies_result)
+
+
+class MulticastTestIPv4(BaseMulticastTest, base.BaseTempestTestCase):
+    """Run the multicast scenario with the configured IPv4 group range."""
+    # Import configuration options
+    multicast_group_range = CONF.neutron_plugin_options.multicast_group_range
+
+    # IP version specific parameters
+    _ip_version = constants.IP_VERSION_4
+    any_addresses = constants.IPv4_ANY