Add UDP test scenario

Update test_healthmonitor_traffic & test_basic_traffic tests
to support UDP traffic in test_traffic_ops
Add simple UDP test in test_ipv6_traffic_ops

Add a UDP test server, merged with the existing HTTP test server.

Change-Id: I1e497b75672753ed0e7acf482bc0e4a6138d3437
diff --git a/octavia_tempest_plugin/tests/test_base.py b/octavia_tempest_plugin/tests/test_base.py
index 2f94bcc..aab78c1 100644
--- a/octavia_tempest_plugin/tests/test_base.py
+++ b/octavia_tempest_plugin/tests/test_base.py
@@ -12,12 +12,14 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+import errno
 import ipaddress
 import pkg_resources
 import random
 import requests
 import shlex
 import six
+import socket
 import string
 import subprocess
 import tempfile
@@ -45,6 +47,9 @@
 RETRY_BACKOFF = 1
 RETRY_MAX = 5
 
+SRC_PORT_NUMBER_MIN = 32768
+SRC_PORT_NUMBER_MAX = 61000
+
 
 class LoadBalancerBaseTest(test.BaseTestCase):
     """Base class for load balancer tests."""
@@ -61,6 +66,8 @@
     webserver2_response = 5
     used_ips = []
 
+    src_port_number = SRC_PORT_NUMBER_MIN
+
     @classmethod
     def skip_checks(cls):
         """Check if we should skip all of the children tests."""
@@ -548,6 +555,34 @@
                 cls.lb_mem_SGr_client.delete_security_group_rule,
                 cls.lb_mem_SGr_client.show_security_group_rule,
                 SGr['id'])
+            # Create a security group rule to allow UDP 80-81 (test webservers)
+            SGr = cls.lb_mem_SGr_client.create_security_group_rule(
+                direction='ingress',
+                security_group_id=cls.lb_member_sec_group['id'],
+                protocol='udp',
+                ethertype='IPv4',
+                port_range_min=80,
+                port_range_max=81)['security_group_rule']
+            cls.addClassResourceCleanup(
+                waiters.wait_for_not_found,
+                cls.lb_mem_SGr_client.delete_security_group_rule,
+                cls.lb_mem_SGr_client.show_security_group_rule,
+                SGr['id'])
+            # Create a security group rule to allow UDP 9999 (test webservers)
+            # Port 9999 is used to illustrate health monitor ERRORs on closed
+            # ports.
+            SGr = cls.lb_mem_SGr_client.create_security_group_rule(
+                direction='ingress',
+                security_group_id=cls.lb_member_sec_group['id'],
+                protocol='udp',
+                ethertype='IPv4',
+                port_range_min=9999,
+                port_range_max=9999)['security_group_rule']
+            cls.addClassResourceCleanup(
+                waiters.wait_for_not_found,
+                cls.lb_mem_SGr_client.delete_security_group_rule,
+                cls.lb_mem_SGr_client.show_security_group_rule,
+                SGr['id'])
             # Create a security group rule to allow 22 (ssh)
             SGr = cls.lb_mem_SGr_client.create_security_group_rule(
                 direction='ingress',
@@ -575,6 +610,20 @@
                     cls.lb_mem_SGr_client.delete_security_group_rule,
                     cls.lb_mem_SGr_client.show_security_group_rule,
                     SGr['id'])
+                # Create a security group rule to allow UDP 80-81 (test
+                # webservers)
+                SGr = cls.lb_mem_SGr_client.create_security_group_rule(
+                    direction='ingress',
+                    security_group_id=cls.lb_member_sec_group['id'],
+                    protocol='udp',
+                    ethertype='IPv6',
+                    port_range_min=80,
+                    port_range_max=81)['security_group_rule']
+                cls.addClassResourceCleanup(
+                    waiters.wait_for_not_found,
+                    cls.lb_mem_SGr_client.delete_security_group_rule,
+                    cls.lb_mem_SGr_client.show_security_group_rule,
+                    SGr['id'])
                 # Create a security group rule to allow 22 (ssh)
                 SGr = cls.lb_mem_SGr_client.create_security_group_rule(
                     direction='ingress',
@@ -647,6 +696,10 @@
         cls._validate_webserver(cls.webserver1_public_ip,
                                 cls.webserver1_response)
 
+        # Validate udp server 1
+        cls._validate_udp_server(cls.webserver1_public_ip,
+                                 cls.webserver1_response)
+
         # Set up serving on webserver 2
         cls._install_start_webserver(cls.webserver2_public_ip,
                                      cls.lb_member_keypair['private_key'],
@@ -656,6 +709,10 @@
         cls._validate_webserver(cls.webserver2_public_ip,
                                 cls.webserver2_response)
 
+        # Validate udp server 2
+        cls._validate_udp_server(cls.webserver2_public_ip,
+                                 cls.webserver2_response)
+
     @classmethod
     def _create_networks(cls):
         super(LoadBalancerBaseTestWithCompute, cls)._create_networks()
@@ -796,8 +853,8 @@
     @classmethod
     def _install_start_webserver(cls, ip_address, ssh_key, start_id):
         local_file = pkg_resources.resource_filename(
-            'octavia_tempest_plugin.contrib.httpd', 'httpd.bin')
-        dest_file = '/dev/shm/httpd.bin'
+            'octavia_tempest_plugin.contrib.test_server', 'test_server.bin')
+        dest_file = '/dev/shm/test_server.bin'
 
         linux_client = remote_client.RemoteClient(
             ip_address, CONF.validation.image_ssh_user, pkey=ssh_key)
@@ -859,20 +916,101 @@
         URL = 'http://{0}:81'.format(ip_address)
         validators.validate_URL_response(URL, expected_body=str(start_id + 1))
 
-    def _wait_for_lb_functional(self, vip_address,
-                                protocol='http', verify=True):
-        session = requests.Session()
+    @classmethod
+    def _validate_udp_server(cls, ip_address, start_id):
+        res = cls._udp_request(ip_address, 80)
+        if res != str(start_id):
+            raise Exception("Response from test server doesn't match the "
+                            "expected value ({0} != {1}).".format(
+                                res, str(start_id)))
+
+        res = cls._udp_request(ip_address, 81)
+        if res != str(start_id + 1):
+            raise Exception("Response from test server doesn't match the "
+                            "expected value ({0} != {1}).".format(
+                                res, str(start_id + 1)))
+
+    @classmethod
+    def _udp_request(cls, vip_address, port=80, timeout=None):
+        if ipaddress.ip_address(vip_address).version == 6:
+            family = socket.AF_INET6
+        else:
+            family = socket.AF_INET
+
+        sock = socket.socket(family, socket.SOCK_DGRAM)
+
+        # Force the use of an incremental port number for source to avoid
+        # re-use of a previous source port that will affect the round-robin
+        # dispatch
+        while True:
+            port_number = cls.src_port_number
+            cls.src_port_number += 1
+            if cls.src_port_number >= SRC_PORT_NUMBER_MAX:
+                cls.src_port_number = SRC_PORT_NUMBER_MIN
+
+            # catch and skip already used ports on the host
+            try:
+                sock.bind(('', port_number))
+            except OSError as e:
+                # if error is 'Address already in use', try next port number
+                if e.errno != errno.EADDRINUSE:
+                    raise e
+            else:
+                # successfully bind the socket
+                break
+
+        server_address = (vip_address, port)
+        data = b"data\n"
+
+        if timeout is not None:
+            sock.settimeout(timeout)
+
+        sock.sendto(data, server_address)
+        data, addr = sock.recvfrom(4096)
+
+        sock.close()
+
+        return data.decode('utf-8')
+
+    def _wait_for_lb_functional(self, vip_address, traffic_member_count,
+                                protocol_port, protocol, verify):
+        if protocol != const.UDP:
+            session = requests.Session()
         start = time.time()
 
+        response_counts = {}
+
+        # Send requests to the load balancer until at least
+        # "traffic_member_count" members have replied (ensure network
+        # connectivity is functional between the load balancer and the membesr)
         while time.time() - start < CONF.load_balancer.build_timeout:
             try:
-                session.get("{0}://{1}".format(protocol, vip_address),
-                            timeout=2, verify=verify)
-                time.sleep(1)
-                return
+                if protocol != const.UDP:
+                    url = "{0}://{1}{2}{3}".format(
+                        protocol.lower(),
+                        vip_address,
+                        ':' if protocol_port else '',
+                        protocol_port or '')
+                    r = session.get(url, timeout=2, verify=verify)
+                    data = r.content
+                else:
+                    data = self._udp_request(vip_address, port=protocol_port,
+                                             timeout=2)
+                if data in response_counts:
+                    response_counts[data] += 1
+                else:
+                    response_counts[data] = 1
+
+                if traffic_member_count == len(response_counts):
+                    LOG.debug('Loadbalancer response totals: %s',
+                              response_counts)
+                    time.sleep(1)
+                    return
             except Exception:
                 LOG.warning('Server is not passing initial traffic. Waiting.')
                 time.sleep(1)
+
+        LOG.debug('Loadbalancer response totals: %s', response_counts)
         LOG.error('Server did not begin passing traffic within the timeout '
                   'period. Failing test.')
         raise Exception()
@@ -880,16 +1018,27 @@
     def _send_lb_request(self, handler, protocol, vip_address,
                          verify, protocol_port, num=20):
         response_counts = {}
+
         # Send a number requests to lb vip
         for i in range(num):
             try:
-                r = handler.get('{0}://{1}:{2}'.format(protocol, vip_address,
-                                                       protocol_port),
-                                timeout=2, verify=verify)
-                if r.content in response_counts:
-                    response_counts[r.content] += 1
+                if protocol != const.UDP:
+                    url = "{0}://{1}{2}{3}".format(
+                        protocol.lower(),
+                        vip_address,
+                        ':' if protocol_port else '',
+                        protocol_port or '')
+                    r = handler.get(url, timeout=2, verify=verify)
+                    data = r.content
                 else:
-                    response_counts[r.content] = 1
+                    data = self._udp_request(vip_address, port=protocol_port,
+                                             timeout=2)
+
+                if data in response_counts:
+                    response_counts[data] += 1
+                else:
+                    response_counts[data] = 1
+
             except Exception:
                 LOG.exception('Failed to send request to loadbalancer vip')
                 raise Exception('Failed to connect to lb')
@@ -897,7 +1046,7 @@
         return response_counts
 
     def _check_members_balanced_round_robin(
-            self, vip_address, traffic_member_count=2, protocol='http',
+            self, vip_address, traffic_member_count=2, protocol=const.HTTP,
             verify=True, protocol_port=80):
 
         handler = requests.Session()
@@ -912,7 +1061,7 @@
         self.assertEqual(1, len(set(response_counts.values())))
 
     def _check_members_balanced_source_ip_port(
-            self, vip_address, traffic_member_count=2, protocol='http',
+            self, vip_address, traffic_member_count=2, protocol=const.HTTP,
             verify=True, protocol_port=80):
 
         handler = requests
@@ -931,11 +1080,14 @@
             self.assertEqual(1, len(response_counts))
 
     def check_members_balanced(self, vip_address, traffic_member_count=2,
-                               protocol='http', verify=True, protocol_port=80):
+                               protocol=const.HTTP, verify=True,
+                               protocol_port=80):
 
-        if ipaddress.ip_address(vip_address).version == 6:
+        if (ipaddress.ip_address(vip_address).version == 6 and
+                protocol != const.UDP):
             vip_address = '[{}]'.format(vip_address)
-        self._wait_for_lb_functional(vip_address, protocol, verify)
+        self._wait_for_lb_functional(vip_address, traffic_member_count,
+                                     protocol_port, protocol, verify)
 
         validate_func = '_check_members_balanced_%s' % self.lb_algorithm
         validate_func = getattr(self, validate_func.lower())