protocol added
diff --git a/sensors/cp_protocol.py b/sensors/cp_protocol.py
new file mode 100644
index 0000000..9b52646
--- /dev/null
+++ b/sensors/cp_protocol.py
@@ -0,0 +1,240 @@
+#!/usr/bin/env python
+""" Protocol class """
+
+import re
+import zlib
+import json
+import binascii
+import logging
+
+import msgpack
+
+from logger import define_logger
+
+# protocol contains 2 type of packet:
+# 1 - header, which contains template schema of counters
+# 2 - body, which contains only values in order as in template
+# it uses msgpack for optimization
+#
+# packet has format:
+# begin_data_prefixSIZE\n\nDATAend_data_postfix
+# packet part has format:
+# SIZE\n\rDATA
+#
+# DATA use archivation
+
+
+class PacketException(Exception):
+ """ Exceptions from Packet"""
+ pass
+
+
+class Packet(object):
+ """ Class proceed packet by protocol"""
+
+ prefix = "begin_data_prefix"
+ postfix = "end_data_postfix"
+ header_prefix = "template"
+ # other fields
+ # is_begin
+ # is_end
+ # crc
+ # data
+ # data_len
+
+ def __init__(self):
+ # preinit
+ self.is_begin = False
+ self.is_end = False
+ self.crc = None
+ self.data = ""
+ self.data_len = None
+ self.srv_template = None
+ self.clt_template = None
+ self.tmpl_size = 0
+
+
+ def new_packet(self, part):
+ """ New packet adding """
+ # proceed packet
+ try:
+ # get size
+ local_size_s, _, part = part.partition("\n\r")
+ local_size = int(local_size_s)
+
+ # find prefix
+ begin = part.find(self.prefix)
+ if begin != -1:
+ # divide data if something before begin and prefix
+ from_i = begin + len(self.prefix)
+ part = part[from_i:]
+ # reset flags
+ self.is_begin = True
+ self.is_end = False
+ self.data = ""
+ # get size
+ data_len_s, _, part = part.partition("\n\r")
+ self.data_len = int(data_len_s)
+ # get crc
+ crc_s, _, part = part.partition("\n\r")
+ self.crc = int(crc_s)
+
+ # bad size?
+ if local_size != self.data_len:
+ raise PacketException("Part size error")
+
+ # find postfix
+ end = part.find(self.postfix)
+ if end != -1:
+ # divide postfix
+ part = part[:end]
+ self.is_end = True
+
+ self.data += part
+ # check if it is end
+ if self.is_end:
+ self.data = zlib.decompress(self.data)
+ if self.data_len != len(self.data):
+ raise PacketException("Total size error")
+ if binascii.crc32(self.data) != self.crc:
+ raise PacketException("CRC error")
+
+ # check, if it is template
+ if self.data.startswith(self.header_prefix):
+ self.srv_template = self.data
+ # template is for internal use
+ return None
+
+ # decode values list
+ vals = msgpack.unpackb(self.data)
+ dump = self.srv_template % tuple(vals)
+ return dump
+ else:
+ return None
+
+
+ except PacketException as e:
+ # if something wrong - skip packet
+ logger = logging.getLogger(__name__)
+ logger.warning("Packet skipped: %s", e)
+ self.is_begin = False
+ self.is_end = False
+ return None
+
+ except TypeError:
+ # if something wrong - skip packet
+ logger = logging.getLogger(__name__)
+ logger.warning("Packet skipped: doesn't match schema")
+ self.is_begin = False
+ self.is_end = False
+ return None
+
+ except:
+ # if something at all wrong - skip packet
+ logger = logging.getLogger(__name__)
+ logger.warning("Packet skipped: something is wrong")
+ self.is_begin = False
+ self.is_end = False
+ return None
+
+
+ @staticmethod
+ def create_packet(data, part_size):
+ """ Create packet divided by parts with part_size from data
+ No compression here """
+ # prepare data
+ data_len = "%i\n\r" % len(data)
+ header = "%s%s%s\n\r" % (Packet.prefix, data_len, binascii.crc32(data))
+ compact_data = zlib.compress(data)
+ packet = "%s%s%s" % (header, compact_data, Packet.postfix)
+
+ partheader_len = len(data_len)
+
+ beg = 0
+ end = part_size - partheader_len
+
+ result = []
+ while beg < len(packet):
+ block = packet[beg:beg+end]
+ result.append(data_len + block)
+ beg += end
+
+ return result
+
+
+ def create_packet_v2(self, data, part_size):
+ """ Create packet divided by parts with part_size from data
+ Compressed """
+ result = []
+ # create and add to result template header
+ if self.srv_template is None:
+ perf_string = json.dumps(data)
+ self.create_answer_template(perf_string)
+ template = self.header_prefix + self.srv_template
+ header = Packet.create_packet(template, part_size)
+ result.extend(header)
+
+ vals = self.get_matching_value_list(data)
+ body = msgpack.packb(vals)
+ parts = Packet.create_packet(body, part_size)
+ result.extend(parts)
+ return result
+
+
+ def get_matching_value_list(self, data):
+ """ Get values in order server expect"""
+ vals = range(0, self.tmpl_size)
+
+ try:
+ for node, groups in self.clt_template.items():
+ for group, counters in groups.items():
+ for counter, index in counters.items():
+ if not isinstance(index, dict):
+ vals[index] = data[node][group][counter]
+ else:
+ for k, i in index.items():
+ vals[i] = data[node][group][counter][k]
+
+ return vals
+
+ except (IndexError, KeyError):
+ logger = logging.getLogger(__name__)
+ logger.error("Data don't match last schema")
+ raise PacketException("Data don't match last schema")
+
+
+
+ def create_answer_template(self, perf_string):
+ """ Create template for server to insert counter values
+ Return tuple of server and clien templates + number of replaces"""
+ replacer = re.compile(": [0-9]+\.?[0-9]*")
+ # replace all values by %s
+ finditer = replacer.finditer(perf_string)
+ # server not need know positions
+ self.srv_template = ""
+ # client need positions
+ clt_template = ""
+ beg = 0
+ k = 0
+ # this could be done better?
+ for match in finditer:
+ # define input place in server template
+ self.srv_template += perf_string[beg:match.start()]
+ self.srv_template += ": %s"
+ # define match number in client template
+ clt_template += perf_string[beg:match.start()]
+ clt_template += ": %i" % k
+
+ beg = match.end()
+ k += 1
+
+ # add tail
+ self.srv_template += perf_string[beg:]
+ clt_template += perf_string[beg:]
+
+ self.tmpl_size = k
+ self.clt_template = json.loads(clt_template)
+
+
+
+define_logger(__name__)
diff --git a/sensors/cp_transport.py b/sensors/cp_transport.py
new file mode 100644
index 0000000..b8719a5
--- /dev/null
+++ b/sensors/cp_transport.py
@@ -0,0 +1,157 @@
+#!/usr/bin/env python
+""" UDP sender class """
+
+import socket
+import urlparse
+
+from cp_protocol import Packet
+from logger import define_logger
+
+
+class SenderException(Exception):
+ """ Exceptions in Sender class """
+ pass
+
+
+class Timeout(Exception):
+ """ Exceptions in Sender class """
+ pass
+
+
+class Sender(object):
+ """ UDP sender class """
+
+ def __init__(self, url=None, port=None, host="127.0.0.1", size=256):
+ """ Create connection object from input udp string or params"""
+
+ # test input
+ if url is None and port is None:
+ raise SenderException("Bad initialization")
+ if url is not None:
+ data = urlparse.urlparse(url)
+ # check schema
+ if data.scheme != "udp":
+ mes = "Bad protocol type: %s instead of UDP" % data.scheme
+ logger.error(mes)
+ raise SenderException("Bad protocol type")
+ # try to get port
+ try:
+ int_port = int(data.port)
+ except ValueError:
+ logger.error("Bad UDP port")
+ raise SenderException("Bad UDP port")
+ # save paths
+ self.sendto = (data.hostname, int_port)
+ self.bindto = (data.hostname, int_port)
+ # try to get size
+ try:
+ self.size = int(data.path.strip("/"))
+ except ValueError:
+ logger.error("Bad packet part size")
+ raise SenderException("Bad packet part size")
+ else:
+ # url is None - use size and port
+ self.sendto = (host, port)
+ self.bindto = ("0.0.0.0", port)
+ self.size = size
+
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ self.binded = False
+ self.all_data = {}
+ self.send_packer = None
+
+
+ def bind(self):
+ """ Prepare for listening """
+ self.sock.bind(self.bindto)
+ self.sock.settimeout(0.5)
+ self.binded = True
+
+
+ def send(self, data):
+ """ Send data to udp socket"""
+ if self.sock.sendto(data, self.sendto) != len(data):
+ mes = "Cannot send data to %s:%s" % self.sendto
+ logger.error(mes)
+ raise SenderException("Cannot send data")
+
+
+ def send_by_protocol(self, data):
+ """ Send data by Packet protocol
+ data = dict"""
+ if self.send_packer is None:
+ self.send_packer = Packet()
+ parts = self.send_packer.create_packet_v2(data, self.size)
+ for part in parts:
+ self.send(part)
+
+
+ def recv(self):
+ """ Receive data from udp socket"""
+ # check for binding
+ if not self.binded:
+ self.bind()
+ # try to recv
+ try:
+ data, (remote_ip, _) = self.sock.recvfrom(self.size)
+ return data, remote_ip
+ except socket.timeout:
+ raise Timeout()
+
+
+ def recv_by_protocol(self):
+ """ Receive data from udp socket by Packet protocol"""
+ data, remote_ip = self.recv()
+
+ if remote_ip not in self.all_data:
+ self.all_data[remote_ip] = Packet()
+
+ return self.all_data[remote_ip].new_packet(data)
+
+
+ def recv_with_answer(self, stop_event=None):
+ """ Receive data from udp socket and send 'ok' back
+ Command port = local port + 1
+ Answer port = local port
+ Waiting for command is blocking """
+ # create command socket
+ command_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ command_port = self.bindto[1]+1
+ command_sock.bind(("0.0.0.0", command_port))
+ command_sock.settimeout(1)
+ # try to recv
+ while True:
+ try:
+ data, (remote_ip, _) = command_sock.recvfrom(self.size)
+ self.send("ok")
+ return data, remote_ip
+ except socket.timeout:
+ if stop_event is not None and stop_event.is_set():
+ # return None if we are interrupted
+ return None
+
+
+ def verified_send(self, send_host, message, max_repeat=20):
+ """ Send and verify it by answer not more then max_repeat
+ Send port = local port + 1
+ Answer port = local port
+ Return True if send is verified """
+ # create send socket
+ send_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ send_port = self.sendto[1]+1
+ for repeat in range(0, max_repeat):
+ send_sock.sendto(message, (send_host, send_port))
+ try:
+ data, remote_ip = self.recv()
+ if remote_ip == send_host and data == "ok":
+ return True
+ else:
+ logger.warning("No answer from %s, try %i", send_host, repeat)
+ except Timeout:
+ logger.warning("No answer from %s, try %i", send_host, repeat)
+
+ return False
+
+
+
+logger = define_logger(__name__)
diff --git a/sensors/logger.py b/sensors/logger.py
new file mode 100644
index 0000000..bd4c6ef
--- /dev/null
+++ b/sensors/logger.py
@@ -0,0 +1,19 @@
+#!/usr/bin/env python
+""" Logger initialization """
+
+import logging
+
+
+def define_logger(name):
+ """ Initialization of logger"""
+ logger = logging.getLogger(name)
+ logger.setLevel(logging.INFO)
+ ch = logging.StreamHandler()
+ ch.setLevel(logging.INFO)
+ logger.addHandler(ch)
+
+ log_format = '%(asctime)s - %(levelname)s - %(name)s - %(message)s'
+ formatter = logging.Formatter(log_format,
+ "%H:%M:%S")
+ ch.setFormatter(formatter)
+ return logger
diff --git a/sensors/protocol.py b/sensors/protocol.py
index fa88927..5d3bf65 100644
--- a/sensors/protocol.py
+++ b/sensors/protocol.py
@@ -1,8 +1,11 @@
+import time
import socket
import select
import cPickle as pickle
from urlparse import urlparse
+import cp_transport
+
class Timeout(Exception):
pass
@@ -104,6 +107,34 @@
raise Timeout()
+class HugeUDPTransport(ITransport, cp_transport.Sender):
+ def __init__(self, receiver, ip, port):
+ cp_transport.Sender.__init__(self, port=port, host=ip)
+ if receiver:
+ self.bind()
+
+ def send(self, data):
+ self.send_by_protocol(data)
+
+ def recv(self, timeout=None):
+ begin = time.time()
+
+ while True:
+
+ try:
+ # return not None, if packet is ready
+ ready = self.recv_by_protocol()
+ # if data ready - return it
+ if ready is not None:
+ return ready
+ # if data not ready - check if it's time to die
+ if time.time() - begin >= timeout:
+ break
+
+ except cp_transport.Timeout:
+ # no answer yet - check, if timeout end
+ if time.time() - begin >= timeout:
+ break
# -------------------------- Factory function --------------------------------
@@ -115,6 +146,9 @@
ip, port = parsed_uri.netloc.split(":")
return UDPTransport(receiver, ip=ip, port=int(port),
packer_cls=PickleSerializer)
+ elif parsed_uri.scheme == 'hugeudp':
+ ip, port = parsed_uri.netloc.split(":")
+ return HugeUDPTransport(receiver, ip=ip, port=int(port))
else:
templ = "Can't instantiate transport from {0!r}"
raise ValueError(templ.format(uri))