Performance improvements for the OVS checker
Related-PROD: PROD-27908
Change-Id: I49d1a28454a1c30a3710e412d44e3709117a1557
diff --git a/telegraf/files/script/check_ovs_arping.py b/telegraf/files/script/check_ovs_arping.py
index 3711359..099532f 100644
--- a/telegraf/files/script/check_ovs_arping.py
+++ b/telegraf/files/script/check_ovs_arping.py
@@ -3,7 +3,7 @@
import argparse
from ctypes import cdll
import multiprocessing
-from multiprocessing.dummy import Pool
+from multiprocessing.pool import ThreadPool
import os
import re
import select
@@ -18,13 +18,12 @@
parser = argparse.ArgumentParser()
parser.add_argument("--host", default="mon")
parser.add_argument("--port", default="15016")
-parser.add_argument("--arp_count", default=3)
-parser.add_argument("--threads", default=multiprocessing.cpu_count() * 20)
-parser.add_argument("--timeout", default=0.5)
-
+parser.add_argument("--arp_count", default=3, type=int)
+parser.add_argument("--timeout", default=0.5, type=float)
+parser.add_argument("--processes", default=multiprocessing.cpu_count(), type=int)
args = parser.parse_args()
-prometheus_query_api = "http://{}:{}/api/v1/query".format(args.host, args.port)
+PROMETHEUS_QUERY_API = "http://{}:{}/api/v1/query".format(args.host, args.port)
libc = cdll.LoadLibrary('libc.so.6')
setns = libc.setns
@@ -111,51 +110,73 @@
self.myns.close()
-def send_arp_packet(soc, packet, timeout):
- soc.send(packet)
- ip_to = packet[38:42]
+def call_process(cmd):
+ p = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ output = p.communicate()
+ return output
+
+
+def recv_arp_packets(soc, ip_bin, ip_recvrs, timeout):
time_left = timeout
+ packets = set()
+ ips_left = ip_recvrs.copy()
while True:
time_start = time.time()
srecv = select.select([soc], [], [], time_left)
time_received = time.time()
time_left -= time_received - time_start
if not srecv[0]:
- return ""
+ return packets
data = soc.recv(PACKETSIZE)
if len(data) == PACKETSIZE and ord(data[21]) == ARPREPLY:
- ip_from = data[28:32]
- if ip_to == ip_from:
- return data
- if time_left <= 0:
- return ""
+ ip_to = data[38:42]
+ if ip_to == ip_bin:
+ ip_from = socket.inet_ntoa(data[28:32])
+ if ip_from in ips_left:
+ packets.add(data)
+ ips_left.discard(ip_from)
+ if time_left <= 0 or len(ips_left) == 0:
+ return packets
-def make_arping(namespace, interface, mac_sedr, ip_sedr, ip_recvr):
+def make_subnet_arping(namespace, interface, mac_sedr, ip_sedr, ip_recvrs):
with Namespace(nsname=namespace):
soc = socket.socket(socket.PF_PACKET, socket.SOCK_RAW)
soc.bind((interface, TYPEFRAME))
- bc_packet = ARPSendPacket('ff:ff:ff:ff:ff:ff', mac_sedr, ip_sedr, mac_sedr, ip_recvr, '00:00:00:00:00:00')
- data = send_arp_packet(soc, bc_packet.__str__(), TIMEOUT)
- if not data:
- return False
- res_mac = data[6:12]
- res_mac_str = ':'.join(x.encode('hex') for x in res_mac)
- uc_packet = ARPSendPacket(res_mac_str, mac_sedr, ip_sedr, mac_sedr, ip_recvr, res_mac_str)
+ for ip_recvr in ip_recvrs:
+ bc_packet = ARPSendPacket('ff:ff:ff:ff:ff:ff', mac_sedr, ip_sedr, mac_sedr, ip_recvr, '00:00:00:00:00:00')
+ soc.send(bc_packet.__str__())
+ packets = recv_arp_packets(soc, socket.inet_aton(ip_sedr), ip_recvrs, TIMEOUT)
+ res_addrs = set()
+ for packet in packets:
+ ip_from = packet[28:32]
+ mac_from = packet[6:12]
+ res_addrs.add((ip_from, mac_from))
for _ in range(args.arp_count - 1):
- data = send_arp_packet(soc, uc_packet.__str__(), TIMEOUT)
- if not data:
- return False
- mac = data[6:12]
- if res_mac != mac:
- return False
+ tmp_res_addrs = set()
+ str_ips = set()
+ for addr in res_addrs:
+ ip_str = socket.inet_ntoa(addr[0])
+ str_ips.add(ip_str)
+ mac_str = ':'.join(x.encode('hex') for x in addr[1])
+ uc_packet = ARPSendPacket(mac_str, mac_sedr, ip_sedr, mac_sedr, ip_str, mac_str)
+ soc.send(uc_packet.__str__())
+ packets = recv_arp_packets(soc, socket.inet_aton(ip_sedr), str_ips, TIMEOUT)
+ for packet in packets:
+ ip_from = packet[28:32]
+ mac_from = packet[6:12]
+ tmp_res_addrs.add((ip_from, mac_from))
+ res_addrs = tmp_res_addrs
+ ips = set()
+ for addr in res_addrs:
+ ips.add(socket.inet_ntoa(addr[0]))
soc.close()
- return True
+ return ips, ip_recvrs.difference(ips)
def instant_query(expression):
- url = prometheus_query_api + "?query=" + expression
- result = requests.get(url).json()["data"]["result"]
+ params = {"query": expression}
+ result = requests.get(PROMETHEUS_QUERY_API, params=params).json()["data"]["result"]
return result
@@ -228,27 +249,35 @@
instance_id = instance["metric"]["id"]
active_instances.add(instance_id)
- namespaces_ifs = {}
+ thread_pool = ThreadPool(multiprocessing.cpu_count())
+ ns_checks = []
for network_id in target_networks:
cmd = "ip netns exec qdhcp-{} ip address".format(network_id)
- process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- output, _ = process.communicate()
- namespaces_ifs[network_id] = parse_ip_a(output)
+ result = thread_pool.apply_async(call_process, (cmd,))
+ ns_checks.append({"result": result, "network_id": network_id})
+ thread_pool.close()
+ thread_pool.join()
+
+ namespaces_ifs = {}
+ for ns_check in ns_checks:
+ network_id = ns_check["network_id"]
+ out, err = ns_check["result"].get()
+ namespaces_ifs[network_id] = parse_ip_a(out)
ports = instant_query("openstack_neutron_port_metadata")
- checks = []
- pool = Pool(args.threads)
+ check_map = {}
+ mapping = {}
for port in ports:
- network_id = port["metric"]["network_id"]
+ network_id = port["metric"].get("network_id", "")
if network_id not in hosted_networks:
continue
- subnet_id = port["metric"]["subnet_id"]
+ subnet_id = port["metric"].get("subnet_id", "")
if subnet_id not in dhcp_subnets:
continue
- device_id = port["metric"]["device_id"]
+ device_id = port["metric"].get("device_id", "")
if device_id not in active_instances:
continue
- ip_address = port["metric"]["ip_address"]
+ ip_address = port["metric"].get("ip_address", "")
ns_ifs = namespaces_ifs[network_id]
if len(ns_ifs) == 0:
@@ -264,22 +293,44 @@
net = get_ip4_network(a_ip, mask)
target_net = get_ip4_network(ip_address, mask)
if net == target_net:
- result = pool.apply_async(make_arping, (ns, name, mac, a_ip, ip_address))
- checks.append({"result": result, "network_id": network_id,
- "ip_address": ip_address, "device_id": device_id})
+ id = ns + " " + name + " " + mac + " " + a_ip
+ if id not in check_map:
+ check_map[id] = set()
+ check_map[id].add(ip_address)
+ id_long = id + " " + ip_address
+ mapping[id_long] = {"network_id": network_id, "device_id": device_id, "ip": ip_address}
break
+ checks = []
+ pool = ThreadPool(args.processes)
+ for id in check_map:
+ ns, name, mac, a_ip = id.split(' ')
+ result = pool.apply_async(make_subnet_arping, (ns, name, mac, a_ip, check_map[id]))
+ checks.append({"result": result, "id": id})
pool.close()
pool.join()
for check in checks:
- result_get = check["result"].get()
- print "instance_arping,network_id=%(network_id)s,ip_address=%(ip_address)s,id=%(id)s success=%(success)s" % \
- {
- 'network_id': check['network_id'],
- 'ip_address': check['ip_address'],
- 'id': check['device_id'],
- 'success': int(result_get)
- }
-
+ id = check["id"]
+ passed, failed = check["result"].get()
+ for ip in passed:
+ id_long = id + " " + ip
+ res = mapping[id_long]
+ print "instance_arping,network_id=%(network_id)s,ip_address=%(ip_address)s,id=%(id)s success=%(success)s" % \
+ {
+ 'network_id': res['network_id'],
+ 'ip_address': res['ip'],
+ 'id': res['device_id'],
+ 'success': "1"
+ }
+ for ip in failed:
+ id_long = id + " " + ip
+ res = mapping[id_long]
+ print "instance_arping,network_id=%(network_id)s,ip_address=%(ip_address)s,id=%(id)s success=%(success)s" % \
+ {
+ 'network_id': res['network_id'],
+ 'ip_address': res['ip'],
+ 'id': res['device_id'],
+ 'success': "0"
+ }
if __name__ == "__main__":
try:
gather()