blob: 7c8aa0e592227bbd692987b68b29752dc90bb11a [file] [log] [blame]
import sys
import csv
import time
import struct
import socket
import select
import cPickle as pickle
from urlparse import urlparse
class Timeout(Exception):
pass
class CantUnpack(Exception):
pass
# ------------------------------------- Serializers --------------------------
class ISensortResultsSerializer(object):
def pack(self, data):
pass
def unpack(self, data):
pass
class StructSerializerSend(ISensortResultsSerializer):
initial_times = 5
resend_timeout = 60
HEADERS = 'h'
DATA = 'd'
END_OF_HEADERS = '\x00'
END_OF_SOURCE_ID = '\x00'
HEADERS_SEPARATOR = '\n'
def __init__(self):
self.field_order = None
self.headers_send_cycles_left = self.initial_times
self.pack_fmt = None
self.next_header_send_time = None
def pack(self, data):
data = data.copy()
source_id = data.pop("source_id")
vals = [int(data.pop("time").value)]
if self.field_order is None:
self.field_order = sorted(data.keys())
self.pack_fmt = "!I" + "I" * len(self.field_order)
need_resend = False
if self.next_header_send_time is not None:
if time.time() > self.next_header_send_time:
need_resend = True
if self.headers_send_cycles_left > 0 or need_resend:
forder = self.HEADERS_SEPARATOR.join(self.field_order)
flen = struct.pack("!H", len(self.field_order))
result = (self.HEADERS + source_id +
self.END_OF_SOURCE_ID +
socket.gethostname() +
self.END_OF_SOURCE_ID +
flen + forder + self.END_OF_HEADERS)
if self.headers_send_cycles_left > 0:
self.headers_send_cycles_left -= 1
self.next_header_send_time = time.time() + self.resend_timeout
else:
result = ""
for name in self.field_order:
vals.append(int(data[name].value))
packed_data = self.DATA + source_id
packed_data += self.END_OF_SOURCE_ID
packed_data += struct.pack(self.pack_fmt, *vals)
return result + packed_data
class StructSerializerRecv(ISensortResultsSerializer):
def __init__(self):
self.fields = {}
self.formats = {}
self.hostnames = {}
def unpack(self, data):
code = data[0]
if code == StructSerializerSend.HEADERS:
source_id, hostname, packed_data = data[1:].split(
StructSerializerSend.END_OF_SOURCE_ID, 2)
# fields order provided
flen_sz = struct.calcsize("!H")
flen = struct.unpack("!H", packed_data[:flen_sz])[0]
headers_data, rest = packed_data[flen_sz:].split(
StructSerializerSend.END_OF_HEADERS, 1)
forder = headers_data.split(
StructSerializerSend.HEADERS_SEPARATOR)
assert len(forder) == flen, \
"Wrong len {0} != {1}".format(len(forder), flen)
if 'source_id' in self.fields:
assert self.fields[source_id] == ['time'] + forder,\
"New field order"
else:
self.fields[source_id] = ['time'] + forder
self.formats[source_id] = "!I" + "I" * flen
self.hostnames[source_id] = hostname
if len(rest) != 0:
return self.unpack(rest)
return None
else:
source_id, packed_data = data[1:].split(
StructSerializerSend.END_OF_SOURCE_ID, 1)
assert code == StructSerializerSend.DATA,\
"Unknown code {0!r}".format(code)
try:
fields = self.fields[source_id]
except KeyError:
raise CantUnpack("No fields order provided"
" for {0} yet".format(source_id))
s_format = self.formats[source_id]
exp_size = struct.calcsize(s_format)
assert len(packed_data) == exp_size, \
"Wrong data len {0} != {1}".format(len(packed_data), exp_size)
vals = struct.unpack(s_format, packed_data)
res = dict(zip(fields, vals))
res['source_id'] = source_id
res['hostname'] = self.hostnames[source_id]
return res
class PickleSerializer(ISensortResultsSerializer):
def pack(self, data):
ndata = {}
for key, val in data.items():
if isinstance(val, basestring):
ndata[key] = val
else:
ndata[key] = val.value
return pickle.dumps(ndata)
def unpack(self, data):
return pickle.loads(data)
# ------------------------------------- Transports ---------------------------
class ITransport(object):
def __init__(self, receiver):
pass
def send(self, data):
pass
def recv(self, timeout=None):
pass
class StdoutTransport(ITransport):
MIN_COL_WIDTH = 10
def __init__(self, receiver, delta=True):
if receiver:
cname = self.__class__.__name__
raise ValueError("{0} don't allows receiving".format(cname))
self.headers = None
self.line_format = ""
self.prev = {}
self.delta = delta
self.fd = sys.stdout
def send(self, data):
if self.headers is None:
self.headers = sorted(data)
self.headers.remove('source_id')
for pos, header in enumerate(self.headers):
self.line_format += "{%s:>%s}" % (pos,
max(len(header) + 1,
self.MIN_COL_WIDTH))
print self.line_format.format(*self.headers)
if self.delta:
vals = [data[header].value - self.prev.get(header, 0)
for header in self.headers]
self.prev.update(dict((header, data[header].value)
for header in self.headers))
else:
vals = [data[header].value for header in self.headers]
self.fd.write(self.line_format.format(*vals) + "\n")
def recv(self, timeout=None):
cname = self.__class__.__name__
raise ValueError("{0} don't allows receiving".format(cname))
class FileTransport(StdoutTransport):
def __init__(self, receiver, fname, delta=True):
StdoutTransport.__init__(self, receiver, delta)
self.fd = open(fname, "w")
class CSVFileTransport(ITransport):
required_keys = set(['time', 'source_id'])
def __init__(self, receiver, fname):
ITransport.__init__(self, receiver)
self.fd = open(fname, "w")
self.csv_fd = csv.writer(self.fd)
self.field_list = []
self.csv_fd.writerow(['NEW_DATA'])
def send(self, data):
if self.field_list == []:
keys = set(data)
assert self.required_keys.issubset(keys)
keys -= self.required_keys
self.field_list = sorted(keys)
self.csv_fd.writerow([data['source_id'], socket.getfqdn()] +
self.field_list)
self.field_list = ['time'] + self.field_list
self.csv_fd.writerow([data[sens].value for sens in self.field_list])
class RAMTransport(ITransport):
def __init__(self, next_tr):
self.next = next_tr
self.data = []
def send(self, data):
self.data.append(data)
def flush(self):
for data in self.data:
self.next.send(data)
self.data = []
class UDPTransport(ITransport):
def __init__(self, receiver, ip, port, packer_cls):
self.port = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
if receiver:
self.port.bind((ip, port))
self.packer_cls = packer_cls
self.packers = {}
else:
self.packer = packer_cls()
self.dst = (ip, port)
def send(self, data):
raw_data = self.packer.pack(data)
self.port.sendto(raw_data, self.dst)
def recv(self, timeout=None):
r, _, _ = select.select([self.port], [], [], timeout)
if len(r) != 0:
raw_data, addr = self.port.recvfrom(10000)
packer = self.packers.setdefault(addr, self.packer_cls())
return addr, packer.unpack(raw_data)
else:
raise Timeout()
# -------------------------- Factory function --------------------------------
def create_protocol(uri, receiver=False):
if uri == 'stdout':
return StdoutTransport(receiver)
parsed_uri = urlparse(uri)
if parsed_uri.scheme == 'udp':
ip, port = parsed_uri.netloc.split(":")
if receiver:
packer_cls = StructSerializerRecv
else:
packer_cls = StructSerializerSend
return UDPTransport(receiver, ip=ip, port=int(port),
packer_cls=packer_cls)
elif parsed_uri.scheme == 'file':
return FileTransport(receiver, parsed_uri.path)
elif parsed_uri.scheme == 'csvfile':
return CSVFileTransport(receiver, parsed_uri.path)
elif parsed_uri.scheme == 'ram':
intenal_recv = CSVFileTransport(receiver, parsed_uri.path)
return RAMTransport(intenal_recv)
else:
templ = "Can't instantiate transport from {0!r}"
raise ValueError(templ.format(uri))