Performance improvements for the OVS checker

ARP checks are now batched per namespace/interface: one raw socket broadcasts
a request to every target address in the subnet and collects the replies in a
single select() loop, instead of spawning one worker per port. The
"ip netns exec ... ip address" lookups are dispatched through a thread pool,
the worker pool is sized by the new --processes option (default: CPU count)
rather than 20 threads per CPU, and the Prometheus query is built with request
parameters instead of URL string concatenation.

Related-PROD: PROD-27908

Change-Id: I49d1a28454a1c30a3710e412d44e3709117a1557
diff --git a/telegraf/files/script/check_ovs_arping.py b/telegraf/files/script/check_ovs_arping.py
index 3711359..099532f 100644
--- a/telegraf/files/script/check_ovs_arping.py
+++ b/telegraf/files/script/check_ovs_arping.py
@@ -3,7 +3,7 @@
 import argparse
 from ctypes import cdll
 import multiprocessing
-from multiprocessing.dummy import Pool
+from multiprocessing.pool import ThreadPool
 import os
 import re
 import select
@@ -18,13 +18,12 @@
 parser = argparse.ArgumentParser()
 parser.add_argument("--host", default="mon")
 parser.add_argument("--port", default="15016")
-parser.add_argument("--arp_count", default=3)
-parser.add_argument("--threads", default=multiprocessing.cpu_count() * 20)
-parser.add_argument("--timeout", default=0.5)
-
+parser.add_argument("--arp_count", default=3, type=int)
+parser.add_argument("--timeout", default=0.5, type=float)
+parser.add_argument("--processes", default=multiprocessing.cpu_count(), type=int)
 args = parser.parse_args()
 
-prometheus_query_api = "http://{}:{}/api/v1/query".format(args.host, args.port)
+PROMETHEUS_QUERY_API = "http://{}:{}/api/v1/query".format(args.host, args.port)
 
 libc = cdll.LoadLibrary('libc.so.6')
 setns = libc.setns
@@ -111,51 +110,73 @@
         self.myns.close()
 
 
-def send_arp_packet(soc, packet, timeout):
-    soc.send(packet)
-    ip_to = packet[38:42]
+def call_process(cmd):
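+    # Run a command and return its (stdout, stderr) tuple; small wrapper so
+    # the namespace lookups below can be dispatched via ThreadPool.apply_async.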
+    p = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+    output = p.communicate()
+    return output
+
+
+def recv_arp_packets(soc, ip_bin, ip_recvrs, timeout):
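+    # Collect ARP replies addressed to ip_bin (the prober's own address) that
+    # come from any IP still in ip_recvrs; stop early once every expected
+    # sender has answered or the overall timeout has been spent.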
     time_left = timeout
+    packets = set()
+    ips_left = ip_recvrs.copy()
     while True:
         time_start = time.time()
         srecv = select.select([soc], [], [], time_left)
         time_received = time.time()
         time_left -= time_received - time_start
         if not srecv[0]:
-            return ""
+            return packets
         data = soc.recv(PACKETSIZE)
         if len(data) == PACKETSIZE and ord(data[21]) == ARPREPLY:
-            ip_from = data[28:32]
-            if ip_to == ip_from:
-                return data
-        if time_left <= 0:
-            return ""
+            ip_to = data[38:42]
+            if ip_to == ip_bin:
+                ip_from = socket.inet_ntoa(data[28:32])
+                if ip_from in ips_left:
+                    packets.add(data)
+                    ips_left.discard(ip_from)
+        if time_left <= 0 or len(ips_left) == 0:
+            return packets
 
 
-def make_arping(namespace, interface, mac_sedr, ip_sedr, ip_recvr):
+def make_subnet_arping(namespace, interface, mac_sedr, ip_sedr, ip_recvrs):
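+    # Probe a whole batch of addresses on one interface inside the given
+    # namespace: broadcast one ARP request per target, gather the replies,
+    # then re-check the responders with unicast requests. Returns the set of
+    # IPs that answered every round and the set that did not.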
     with Namespace(nsname=namespace):
         soc = socket.socket(socket.PF_PACKET, socket.SOCK_RAW)
         soc.bind((interface, TYPEFRAME))
-        bc_packet = ARPSendPacket('ff:ff:ff:ff:ff:ff', mac_sedr, ip_sedr, mac_sedr, ip_recvr, '00:00:00:00:00:00')
-        data = send_arp_packet(soc, bc_packet.__str__(), TIMEOUT)
-        if not data:
-            return False
-        res_mac = data[6:12]
-        res_mac_str = ':'.join(x.encode('hex') for x in res_mac)
-        uc_packet = ARPSendPacket(res_mac_str, mac_sedr, ip_sedr, mac_sedr, ip_recvr, res_mac_str)
+        for ip_recvr in ip_recvrs:
+            bc_packet = ARPSendPacket('ff:ff:ff:ff:ff:ff', mac_sedr, ip_sedr, mac_sedr, ip_recvr, '00:00:00:00:00:00')
+            soc.send(bc_packet.__str__())
+        packets = recv_arp_packets(soc, socket.inet_aton(ip_sedr), ip_recvrs, TIMEOUT)
+        res_addrs = set()
+        for packet in packets:
+            ip_from = packet[28:32]
+            mac_from = packet[6:12]
+            res_addrs.add((ip_from, mac_from))
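+        # Re-probe the responders arp_count - 1 more times with unicast
+        # requests; only addresses that reply in every round are kept.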
         for _ in range(args.arp_count - 1):
-            data = send_arp_packet(soc, uc_packet.__str__(), TIMEOUT)
-            if not data:
-                return False
-            mac = data[6:12]
-            if res_mac != mac:
-                return False
+            tmp_res_addrs = set()
+            str_ips = set()
+            for addr in res_addrs:
+                ip_str = socket.inet_ntoa(addr[0])
+                str_ips.add(ip_str)
+                mac_str = ':'.join(x.encode('hex') for x in addr[1])
+                uc_packet = ARPSendPacket(mac_str, mac_sedr, ip_sedr, mac_sedr, ip_str, mac_str)
+                soc.send(uc_packet.__str__())
+            packets = recv_arp_packets(soc, socket.inet_aton(ip_sedr), str_ips, TIMEOUT)
+            for packet in packets:
+                ip_from = packet[28:32]
+                mac_from = packet[6:12]
+                tmp_res_addrs.add((ip_from, mac_from))
+            res_addrs = tmp_res_addrs
+        ips = set()
+        for addr in res_addrs:
+            ips.add(socket.inet_ntoa(addr[0]))
         soc.close()
-        return True
+        return ips, ip_recvrs.difference(ips)
 
 
 def instant_query(expression):
-    url = prometheus_query_api + "?query=" + expression
-    result = requests.get(url).json()["data"]["result"]
+    params = {"query": expression}
+    result = requests.get(PROMETHEUS_QUERY_API, params=params).json()["data"]["result"]
     return result
 
 
@@ -228,27 +249,35 @@
             instance_id = instance["metric"]["id"]
             active_instances.add(instance_id)
 
-    namespaces_ifs = {}
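+    # Fetch "ip address" output for every qdhcp namespace concurrently instead
+    # of shelling out one namespace at a time.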
+    thread_pool = ThreadPool(multiprocessing.cpu_count())
+    ns_checks = []
     for network_id in target_networks:
         cmd = "ip netns exec qdhcp-{} ip address".format(network_id)
-        process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-        output, _ = process.communicate()
-        namespaces_ifs[network_id] = parse_ip_a(output)
+        result = thread_pool.apply_async(call_process, (cmd,))
+        ns_checks.append({"result": result, "network_id": network_id})
+    thread_pool.close()
+    thread_pool.join()
+
+    namespaces_ifs = {}
+    for ns_check in ns_checks:
+        network_id = ns_check["network_id"]
+        out, err = ns_check["result"].get()
+        namespaces_ifs[network_id] = parse_ip_a(out)
 
     ports = instant_query("openstack_neutron_port_metadata")
-    checks = []
-    pool = Pool(args.threads)
+    check_map = {}
+    mapping = {}
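+    # Group target addresses by namespace/interface/MAC/source IP so each
+    # subnet is checked in a single arping batch; "mapping" keeps the port
+    # metadata needed to report each individual address.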
     for port in ports:
-        network_id = port["metric"]["network_id"]
+        network_id = port["metric"].get("network_id", "")
         if network_id not in hosted_networks:
             continue
-        subnet_id = port["metric"]["subnet_id"]
+        subnet_id = port["metric"].get("subnet_id", "")
         if subnet_id not in dhcp_subnets:
             continue
-        device_id = port["metric"]["device_id"]
+        device_id = port["metric"].get("device_id", "")
         if device_id not in active_instances:
             continue
-        ip_address = port["metric"]["ip_address"]
+        ip_address = port["metric"].get("ip_address", "")
 
         ns_ifs = namespaces_ifs[network_id]
         if len(ns_ifs) == 0:
@@ -264,22 +293,44 @@
             net = get_ip4_network(a_ip, mask)
             target_net = get_ip4_network(ip_address, mask)
             if net == target_net:
-                result = pool.apply_async(make_arping, (ns, name, mac, a_ip, ip_address))
-                checks.append({"result": result, "network_id": network_id,
-                               "ip_address": ip_address, "device_id": device_id})
+                id = ns + " " + name + " " + mac + " " + a_ip
+                if id not in check_map:
+                    check_map[id] = set()
+                check_map[id].add(ip_address)
+                id_long = id + " " + ip_address
+                mapping[id_long] = {"network_id": network_id, "device_id": device_id, "ip": ip_address}
                 break
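+    # One worker task per namespace/interface batch instead of one per port.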
+    checks = []
+    pool = ThreadPool(args.processes)
+    for id in check_map:
+        ns, name, mac, a_ip = id.split(' ')
+        result = pool.apply_async(make_subnet_arping, (ns, name, mac, a_ip, check_map[id]))
+        checks.append({"result": result, "id": id})
     pool.close()
     pool.join()
     for check in checks:
-        result_get = check["result"].get()
-        print "instance_arping,network_id=%(network_id)s,ip_address=%(ip_address)s,id=%(id)s success=%(success)s" % \
-              {
-                  'network_id': check['network_id'],
-                  'ip_address': check['ip_address'],
-                  'id': check['device_id'],
-                  'success': int(result_get)
-              }
-
+        id = check["id"]
+        passed, failed = check["result"].get()
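+        # Emit one measurement line per probed address: success=1 for
+        # addresses that answered, success=0 for the rest.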
+        for ip in passed:
+            id_long = id + " " + ip
+            res = mapping[id_long]
+            print "instance_arping,network_id=%(network_id)s,ip_address=%(ip_address)s,id=%(id)s success=%(success)s" % \
+                  {
+                      'network_id': res['network_id'],
+                      'ip_address': res['ip'],
+                      'id': res['device_id'],
+                      'success': "1"
+                  }
+        for ip in failed:
+            id_long = id + " " + ip
+            res = mapping[id_long]
+            print "instance_arping,network_id=%(network_id)s,ip_address=%(ip_address)s,id=%(id)s success=%(success)s" % \
+                  {
+                      'network_id': res['network_id'],
+                      'ip_address': res['ip'],
+                      'id': res['device_id'],
+                      'success': "0"
+                  }
+
 if __name__ == "__main__":
     try:
         gather()