Add ovs arping check

Related-PROD: PROD-27908

Change-Id: Ibc68294c59494972b3da81d458efc9ea70add164
diff --git a/telegraf/files/script/check_ovs_arping.py b/telegraf/files/script/check_ovs_arping.py
new file mode 100644
index 0000000..3711359
--- /dev/null
+++ b/telegraf/files/script/check_ovs_arping.py
@@ -0,0 +1,288 @@
+#!/usr/bin/env python2
+
+import argparse
+from ctypes import cdll
+import multiprocessing
+from multiprocessing.dummy import Pool
+import os
+import re
+import select
+import socket
+import struct
+import subprocess
+import time
+
+import requests
+
+
+parser = argparse.ArgumentParser()
+parser.add_argument("--host", default="mon")
+parser.add_argument("--port", default="15016")
+# argparse hands command-line values over as strings unless a type is given.
+parser.add_argument("--arp_count", type=int, default=3)
+parser.add_argument("--threads", type=int, default=multiprocessing.cpu_count() * 20)
+parser.add_argument("--timeout", type=float, default=0.5)
+args = parser.parse_args()
+
+prometheus_query_api = "http://{}:{}/api/v1/query".format(args.host, args.port)
+# setns(2) taken from libc; Namespace uses it to switch network namespaces.
+libc = cdll.LoadLibrary('libc.so.6')
+setns = libc.setns
+# Matched against the "hostname" label of the DHCP agent metrics in gather().
+HOSTNAME = socket.gethostname()
+
+# Seconds make_arping passes to send_arp_packet as the wait for each ARP reply.
+TIMEOUT = args.timeout
+
+# EtherType of ARP; also the protocol the raw socket is bound to.
+TYPEFRAME = 0x0806
+
+# ARP hardware type field (1 = Ethernet).
+TYPEHRD = 1
+
+# ARP protocol type field (0x0800 = IPv4).
+PROTOCOLTYPE = 0x0800
+
+# Frame size in bytes: 14-byte Ethernet header + 28-byte IPv4-over-Ethernet ARP body.
+PACKETSIZE = 42
+# ARP operation codes.
+ARPREQUEST = 1
+ARPREPLY = 2
+
+
+class ARPSendPacket(object):
+    """Ethernet frame holding an ARP request; str() yields the bytes to send."""
+
+    @staticmethod
+    def _convert_mac(mac):
+        """Convert a colon-separated hex MAC string to its raw byte string."""
+        return ''.join(chr(int('0x' + octet, 16)) for octet in mac.split(':'))
+
+    def __init__(self, eth_dest, eth_src, ip_sedr, mac_sedr, ip_recvr, mac_recvr):
+        """Pack every header field once so that __str__ only concatenates."""
+        # Ethernet header: destination, source, EtherType.
+        self._eth_dest = self._convert_mac(eth_dest)
+        self._eth_src = self._convert_mac(eth_src)
+        self._type_frame = struct.pack('!H', TYPEFRAME)
+        # ARP addresses: sender MAC/IP, then target MAC/IP.
+        self._mac_sedr = self._convert_mac(mac_sedr)
+        self._ip_sedr = socket.inet_aton(ip_sedr)
+        self._mac_recvr = self._convert_mac(mac_recvr)
+        self._ip_recvr = socket.inet_aton(ip_recvr)
+        # ARP fixed fields; '!H' packs in network byte order.
+        self._type_hrd = struct.pack('!H', TYPEHRD)
+        self._type_pro = struct.pack('!H', PROTOCOLTYPE)
+        self._mac_len = struct.pack('B', struct.calcsize('6B'))
+        self._ip_len = struct.pack('B', len(self._ip_sedr))
+        self._op = struct.pack('!H', ARPREQUEST)
+
+    def __str__(self):
+        """Join the fields in on-wire order."""
+        fields = (self._eth_dest, self._eth_src, self._type_frame, self._type_hrd, self._type_pro, self._mac_len,
+                  self._ip_len, self._op, self._mac_sedr, self._ip_sedr, self._mac_recvr, self._ip_recvr)
+        return ''.join(fields)
+
+
+class Namespace(object):
+    """Context manager that runs its body in another network namespace via setns."""
+
+    @staticmethod
+    def _get_ns_path(nspath=None, nsname=None, nspid=None):
+        if nsname:
+            return '/var/run/netns/%s' % nsname
+        return '/proc/%d/ns/net' % nspid if nspid else nspath
+
+    def __init__(self, nsname=None, nspath=None, nspid=None):
+        self.mypath = self._get_ns_path(nspid=os.getpid())
+        self.targetpath = self._get_ns_path(nspath, nsname=nsname, nspid=nspid)
+        if not self.targetpath:
+            raise ValueError('invalid namespace')
+
+    def __enter__(self):
+        # Target is opened first, so a missing namespace leaves nothing to clean up.
+        with open(self.targetpath) as fd:
+            self.myns = open(self.mypath)
+            if setns(fd.fileno(), 0) != 0:  # non-zero: the switch did not happen
+                self.myns.close()
+                raise OSError('setns failed for %s' % self.targetpath)
+
+    def __exit__(self, *args):
+        setns(self.myns.fileno(), 0)
+        self.myns.close()
+
+
+def send_arp_packet(soc, packet, timeout):
+    """Send packet; return the matching ARP reply frame, or "" if none arrives in time."""
+    soc.send(packet)
+    wanted_ip = packet[38:42]  # target IP field of the request just sent
+    remaining = timeout
+    while True:
+        started = time.time()
+        readable = select.select([soc], [], [], remaining)[0]
+        remaining -= time.time() - started
+        if not readable:
+            return ""
+        frame = soc.recv(PACKETSIZE)
+        # Accept only a full-size ARP reply whose sender IP is the requested one.
+        is_reply = len(frame) == PACKETSIZE and ord(frame[21]) == ARPREPLY
+        if is_reply and frame[28:32] == wanted_ip:
+            return frame
+        if remaining <= 0:
+            return ""
+
+
+def make_arping(namespace, interface, mac_sedr, ip_sedr, ip_recvr):
+    """Broadcast one ARP request, then unicast arp_count - 1 more; True if all are answered by one MAC."""
+    with Namespace(nsname=namespace):
+        soc = socket.socket(socket.PF_PACKET, socket.SOCK_RAW)
+        try:  # the finally below closes the socket on the early returns too
+            soc.bind((interface, TYPEFRAME))
+            bc_packet = ARPSendPacket('ff:ff:ff:ff:ff:ff', mac_sedr, ip_sedr, mac_sedr, ip_recvr, '00:00:00:00:00:00')
+            data = send_arp_packet(soc, str(bc_packet), TIMEOUT)
+            if not data:
+                return False
+            res_mac = data[6:12]
+            res_mac_str = ':'.join(x.encode('hex') for x in res_mac)
+            uc_packet = ARPSendPacket(res_mac_str, mac_sedr, ip_sedr, mac_sedr, ip_recvr, res_mac_str)
+            for _ in range(args.arp_count - 1):
+                data = send_arp_packet(soc, str(uc_packet), TIMEOUT)
+                if not data or data[6:12] != res_mac:
+                    return False
+            return True
+        finally:
+            soc.close()
+
+
+def instant_query(expression):
+    """Run a Prometheus instant query and return the "result" list of its "data"."""
+    response = requests.get(prometheus_query_api, params={"query": expression})
+    return response.json()["data"]["result"]
+
+
+def parse_ip_a(data):
+    """Map interface name to its "mac" and "addresses" (ip, prefix) from `ip address` output.
+
+    Expected data format:
+    1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1000
+        link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
+        inet 127.0.0.1/8 scope host lo
+           valid_lft forever preferred_lft forever
+
+    Only "inet" (IPv4) lines are collected and "lo" is dropped.  A name printed as
+    "name@peer" is stored as "name": the "@peer" suffix is not part of the device name.
+    """
+    interfaces = {}
+    current = None
+    for line in data.splitlines():
+        m = re.match(r"(\d+):\s?(.+):", line)
+        if m:
+            name = m.group(2).split("@", 1)[0]
+            current = interfaces[name] = {"addresses": [], "mac": ""}
+            continue
+        if current is None:
+            continue
+        m = re.match(r"\s+link/\w+\s([0-9a-f:]+)", line)
+        if m:
+            current["mac"] = m.group(1)
+        m = re.match(r"\s+inet\s([0-9.]+)/(\d+)", line)
+        if m:
+            current["addresses"].append((m.group(1), m.group(2)))
+    interfaces.pop("", None)
+    interfaces.pop("lo", None)
+    return interfaces
+
+
+def get_ip4_network(ip, mask):
+    """Return the dotted-quad network address of ip for prefix length mask."""
+    mask_octets = bytearray(struct.pack('!I', (1 << 32) - (1 << (32 - int(mask)))))
+    ip_octets = [int(part) for part in ip.split(".")]
+    return ".".join(str(octet & bits) for octet, bits in zip(ip_octets, mask_octets))
+
+
+def gather():
+    """Print one "instance_arping" line per probed instance port.
+
+    Prometheus metrics select the ports: the port's network must have a DHCP agent
+    metric naming this host, its subnet must have DHCP enabled and its device must
+    have status "0".  Each port is ARP-pinged from the network's qdhcp namespace.
+    """
+    # Networks whose DHCP agent metric carries this host's name.
+    hosted_networks = set()
+    for agent in instant_query("openstack_neutron_agent_dhcp_metadata"):
+        if agent["metric"]["hostname"] == HOSTNAME:
+            hosted_networks.add(agent["metric"]["network_id"])
+
+    # DHCP-enabled subnets, and the hosted networks that own at least one of them.
+    target_networks = set()
+    dhcp_subnets = set()
+    for net in instant_query("openstack_neutron_subnet_enable_dhcp"):
+        if net["value"][1] == "1":
+            dhcp_subnets.add(net["metric"]["id"])
+            if net["metric"]["network_id"] in hosted_networks:
+                target_networks.add(net["metric"]["network_id"])
+
+    # Instances whose status metric value is "0".
+    active_instances = set()
+    for instance in instant_query("openstack_nova_instance_status"):
+        if instance["value"][1] == "0":
+            active_instances.add(instance["metric"]["id"])
+
+    # Interfaces found in each target network's qdhcp namespace.
+    namespaces_ifs = {}
+    for network_id in target_networks:
+        cmd = "ip netns exec qdhcp-{} ip address".format(network_id)
+        process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        output, _ = process.communicate()
+        namespaces_ifs[network_id] = parse_ip_a(output)
+
+    ports = instant_query("openstack_neutron_port_metadata")
+    checks = []
+    pool = Pool(args.threads)
+    for port in ports:
+        labels = port["metric"]
+        network_id = labels["network_id"]
+        if network_id not in hosted_networks:
+            continue
+        if labels["subnet_id"] not in dhcp_subnets:
+            continue
+        device_id = labels["device_id"]
+        if device_id not in active_instances:
+            continue
+        ip_address = labels["ip_address"]
+        # A network without parsed namespace data is skipped rather than raising KeyError.
+        source = _find_arp_source(namespaces_ifs.get(network_id, {}), ip_address)
+        if source is None:
+            continue
+        name, mac, a_ip = source
+        ns = "qdhcp-{}".format(network_id)
+        result = pool.apply_async(make_arping, (ns, name, mac, a_ip, ip_address))
+        checks.append({"result": result, "network_id": network_id,
+                       "ip_address": ip_address, "device_id": device_id})
+    # Wait for every probe to finish before any result is printed.
+    pool.close()
+    pool.join()
+    line = "instance_arping,network_id=%(network_id)s,ip_address=%(ip_address)s,id=%(id)s success=%(success)s"
+    for check in checks:
+        print(line % {'network_id': check['network_id'], 'ip_address': check['ip_address'],
+                      'id': check['device_id'], 'success': int(check["result"].get())})
+
+
+def _find_arp_source(ns_ifs, ip_address):
+    """Return (ifname, mac, ip) of the first interface on ip_address's subnet, else None.
+
+    Every interface is examined, in name order, so a namespace with several
+    interfaces still yields the one that shares a subnet with the port.
+    """
+    for name in sorted(ns_ifs):
+        interface = ns_ifs[name]
+        for a_ip, mask in interface.get("addresses", []):
+            if get_ip4_network(a_ip, mask) == get_ip4_network(ip_address, mask):
+                return name, interface.get("mac", ""), a_ip
+    return None
+
+if __name__ == "__main__":
+    try:
+        gather()
+        print "instance_arping check_up=1"  # reached only when gather() did not raise
+    except Exception:  # any failure in gather() is reported as a metric value instead
+        print "instance_arping check_up=0"
diff --git a/telegraf/init.sls b/telegraf/init.sls
index 7345d3b..99cbb0f 100644
--- a/telegraf/init.sls
+++ b/telegraf/init.sls
@@ -5,3 +5,4 @@
   {%- if pillar.telegraf.remote_agent is defined %}
   - telegraf.remote_agent
   {%- endif %}
+  - telegraf.script
diff --git a/telegraf/meta/telegraf.yml b/telegraf/meta/telegraf.yml
index 2965166..d0801fb 100644
--- a/telegraf/meta/telegraf.yml
+++ b/telegraf/meta/telegraf.yml
@@ -1,3 +1,15 @@
+{#- Enable the ARP check input on neutron gateway nodes and on compute nodes with dhcp_agent_enabled. #}
+{%- if pillar.neutron is defined %}
+  {%- if pillar.neutron.get('gateway', {}).get('enabled', False) == True or (pillar.neutron.get('compute',{}).get('enabled', False) == True and pillar.neutron.get('compute',{}).get('dhcp_agent_enabled', False) == True) %}
+    {%- set prometheus_address = pillar._param.stacklight_monitor_address %}
+agent:
+  input:
+    ovs_arping_check:
+      template: telegraf/files/input/exec.conf
+      commands: "/usr/local/bin/check_ovs_arping.py --host {{ prometheus_address }} --port 15016"
+      interval: 45s
+  {%- endif %}
+{%- endif %}
 {%- if pillar.telegraf.remote_agent is defined %}
   {%- set addresses = [] %}
   {%- for node_name, node_grains in salt['mine.get']('*', 'grains.items').items() %}
diff --git a/telegraf/script.sls b/telegraf/script.sls
new file mode 100644
index 0000000..6e058cf
--- /dev/null
+++ b/telegraf/script.sls
@@ -0,0 +1,12 @@
+{%- if pillar.neutron is defined %}
+  {%- if pillar.neutron.get('gateway', {}).get('enabled', False) == True or (pillar.neutron.get('compute',{}).get('enabled', False) == True and pillar.neutron.get('compute',{}).get('dhcp_agent_enabled', False) == True) %}
+# Installs the script that the telegraf ovs_arping_check exec input runs.
+ovs_arping_check_telegraf_script:
+  file.managed:
+  - name: /usr/local/bin/check_ovs_arping.py
+  - source: salt://telegraf/files/script/check_ovs_arping.py
+  - template: jinja  # NOTE(review): the script contains no Jinja markup; confirm rendering is intended
+  - mode: 755
+
+  {%- endif %}
+{%- endif %}