blob: fa8892799c0573fdaf065b9d24e38fa250dd5fc8 [file] [log] [blame]
koder aka kdanilovdda86d32015-03-16 11:20:04 +02001import socket
2import select
3import cPickle as pickle
4from urlparse import urlparse
5
6
class Timeout(Exception):
    """Raised when a receive call finds no data before its timeout expires.

    In this file it is raised by ``UDPTransport.recv`` when ``select``
    reports no readable socket.
    """
9
10
11# ------------------------------------- Serializers --------------------------
12
13
class ISensortResultsSerializer(object):
    """Interface for serializers used by the transports.

    Both methods are placeholders that do nothing and return ``None``;
    concrete serializers (e.g. ``PickleSerializer``) override them.
    """

    def pack(self, data):
        """Turn *data* into its serialized form (no-op in this base class)."""
        return None

    def unpack(self, data):
        """Rebuild an object from serialized *data* (no-op in this base class)."""
        return None
20
21
class PickleSerializer(ISensortResultsSerializer):
    """Serializer that uses the ``pickle`` module as its wire format."""

    def pack(self, data):
        """Pickle a plain dict holding the ``.value`` of every item in *data*.

        *data* is a mapping; each of its values must expose a ``.value``
        attribute. Only those attribute values are serialized, keyed by the
        original mapping keys.
        """
        plain = {}
        for name, item in data.items():
            plain[name] = item.value
        return pickle.dumps(plain)

    def unpack(self, data):
        """Return the object stored in the pickled string *data*."""
        # NOTE(review): UDPTransport.recv passes datagrams read from the
        # socket straight into this method. Unpickling data from an untrusted
        # peer can execute arbitrary code, so this is only safe on a trusted
        # network.
        return pickle.loads(data)
29
30
31# ------------------------------------- Transports ---------------------------
32
class ITransport(object):
    """Interface shared by the transports in this module.

    Every method here is a placeholder that does nothing and returns
    ``None``; subclasses supply the real behaviour.
    """

    def __init__(self, receiver):
        """Accept the *receiver* flag; the base class ignores it."""

    def send(self, data):
        """Send *data* (no-op in this base class)."""
        return None

    def recv(self, timeout=None):
        """Receive data, waiting at most *timeout* (no-op in this base class)."""
        return None
42
43
class StdoutTransport(ITransport):
    """Send-only transport that prints values as a right-aligned text table.

    The column set is fixed by the first ``send`` call: the sorted keys of
    that first mapping become the headers, and a header row is printed once.
    Every call then prints one row of values.
    """

    # Lower bound for the width of each printed column.
    MIN_COL_WIDTH = 10

    def __init__(self, receiver, delta=True):
        """Create the transport.

        receiver -- must be false; this transport cannot receive.
        delta    -- when true, each row shows the change since the previous
                    row instead of the raw values.

        Raises ValueError if *receiver* is true.
        """
        if receiver:
            raise ValueError("StdoutTransport don't allows receiving")

        self.delta = delta
        self.prev = {}
        self.line_format = ""
        self.headers = None

    def send(self, data):
        """Print one table row for *data*.

        *data* is a mapping whose values expose a ``.value`` attribute. It
        must contain every header captured on the first call.
        """
        if self.headers is None:
            self.headers = sorted(data)

            # One right-aligned positional field per header, wide enough for
            # the header text plus one separating space.
            columns = []
            for idx, name in enumerate(self.headers):
                width = max(len(name) + 1, self.MIN_COL_WIDTH)
                columns.append("{%s:>%s}" % (idx, width))
            self.line_format += "".join(columns)

            print(self.line_format.format(*self.headers))

        if not self.delta:
            row = [data[name].value for name in self.headers]
        else:
            current = [(name, data[name].value) for name in self.headers]
            # A header with no previous value is compared against 0, so the
            # first row shows the raw values.
            row = [value - self.prev.get(name, 0) for name, value in current]
            self.prev.update(current)

        print(self.line_format.format(*row))

    def recv(self, timeout=None):
        """Always raise ValueError: this transport cannot receive."""
        raise ValueError("StdoutTransport don't allows receiving")
80
81
class UDPTransport(ITransport):
    """Transport that exchanges serialized data as UDP/IPv4 datagrams.

    In receiver mode the socket is bound to ``(ip, port)`` and one packer
    instance is kept per remote address. In sender mode a single packer is
    created and ``(ip, port)`` is the destination of every datagram.
    """

    # Upper bound of a UDP datagram length. Reading with a smaller buffer
    # truncates larger datagrams, which would hand a corrupt payload to the
    # packer.
    MAX_DATAGRAM_SIZE = 65535

    def __init__(self, receiver, ip, port, packer_cls):
        """Open the socket and configure sender or receiver state.

        receiver   -- true to bind and receive, false to send.
        ip, port   -- address to bind to (receiver) or send to (sender).
        packer_cls -- zero-argument callable returning an object with
                      ``pack``/``unpack`` methods.
        """
        # NOTE: despite its name, ``self.port`` holds the socket object.
        self.port = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        if receiver:
            try:
                self.port.bind((ip, port))
            except Exception:
                # Do not leak the descriptor when the address is unusable.
                self.port.close()
                raise
            self.packer_cls = packer_cls
            self.packers = {}
        else:
            self.packer = packer_cls()
            self.dst = (ip, port)

    def send(self, data):
        """Serialize *data* and send it to the configured destination.

        Only usable on an instance created with a false *receiver* flag.
        """
        raw_data = self.packer.pack(data)
        self.port.sendto(raw_data, self.dst)

    def recv(self, timeout=None):
        """Wait for one datagram and return ``(addr, unpacked_data)``.

        timeout -- seconds to wait, or ``None`` to block until data arrives.

        Raises Timeout if nothing becomes readable within *timeout*.
        """
        readable, _, _ = select.select([self.port], [], [], timeout)
        if not readable:
            raise Timeout()

        raw_data, addr = self.port.recvfrom(self.MAX_DATAGRAM_SIZE)

        # Create a packer only the first time a peer is seen, instead of
        # building (and discarding) one for every datagram.
        packer = self.packers.get(addr)
        if packer is None:
            packer = self.packers[addr] = self.packer_cls()

        return addr, packer.unpack(raw_data)
105
106
107# -------------------------- Factory function --------------------------------
108
109
def create_protocol(uri, receiver=False):
    """Build the transport described by *uri*.

    Supported schemes:
      ``stdout``            -> ``StdoutTransport(receiver)``
      ``udp://<ip>:<port>`` -> ``UDPTransport`` using ``PickleSerializer``

    uri      -- transport URI string.
    receiver -- passed through to the transport constructor.

    Raises ValueError if the scheme is unsupported or a udp URI does not
    have the form ``<ip>:<port>`` with an integer port.
    """
    parsed_uri = urlparse(uri)
    scheme = parsed_uri.scheme

    if scheme == 'stdout':
        return StdoutTransport(receiver)

    if scheme == 'udp':
        # Validate explicitly so a malformed address is reported with the
        # offending URI rather than as an opaque unpacking/int() error.
        parts = parsed_uri.netloc.split(":")
        if len(parts) != 2:
            templ = "Expected udp://<ip>:<port>, got {0!r}"
            raise ValueError(templ.format(uri))

        ip, port = parts
        try:
            port = int(port)
        except ValueError:
            templ = "Invalid port in transport uri {0!r}"
            raise ValueError(templ.format(uri))

        return UDPTransport(receiver, ip=ip, port=port,
                            packer_cls=PickleSerializer)

    templ = "Can't instantiate transport from {0!r}"
    raise ValueError(templ.format(uri))
120 raise ValueError(templ.format(uri))