blob: fad7e00f316a2be1efdbe89c0fcd87e8fad8351f [file] [log] [blame]
koder aka kdanilov2c473092015-03-29 17:12:13 +03001import sys
Ved-vampir0c7e2d42015-03-18 17:18:47 +03002import time
Alyona Kiseleva7f6de4f2015-04-21 01:04:20 +03003import struct
koder aka kdanilovdda86d32015-03-16 11:20:04 +02004import socket
5import select
6import cPickle as pickle
7from urlparse import urlparse
8
9
class Timeout(Exception):
    """Signals that a receive call got no data within its timeout."""
12
13
class CantUnpack(Exception):
    """Signals that a received packet can not be decoded yet."""
16
17
# ------------------------------------- Serializers --------------------------
19
20
class ISensortResultsSerializer(object):
    """Interface of sensor-result serializers.

    Both methods are no-op placeholders that return ``None``; concrete
    serializers override the direction(s) they support.
    """

    def pack(self, data):
        """Turn ``data`` into its serialized form (placeholder)."""
        return None

    def unpack(self, data):
        """Restore results from serialized ``data`` (placeholder)."""
        return None
27
28
class StructSerializerSend(ISensortResultsSerializer):
    """Sender side of the compact ``struct`` based wire format.

    ``pack`` produces a data packet, optionally preceded by a header
    packet that lists the field names:

    * header: ``HEADERS + source_id + END_OF_SOURCE_ID + <uint16 field
      count> + <field names joined by HEADERS_SEPARATOR> + END_OF_HEADERS``
    * data:   ``DATA + source_id + END_OF_SOURCE_ID + <time and field
      values packed as network-order unsigned 32 bit integers>``

    The header is included in the first ``initial_times`` calls and, after
    that, whenever more than ``resend_timeout`` seconds passed since the
    header was last sent.
    """

    # how many of the first pack() calls always carry the header
    initial_times = 5
    # seconds between header re-sends once the initial cycles are used up
    resend_timeout = 60
    # packet type codes
    HEADERS = 'h'
    DATA = 'd'
    # delimiters used inside a packet
    END_OF_HEADERS = '\x00'
    END_OF_SOURCE_ID = '\x00'
    HEADERS_SEPARATOR = '\n'

    def __init__(self):
        # both are fixed by the first pack() call
        self.field_order = None
        self.pack_fmt = None
        self.headers_send_cycles_left = self.initial_times
        # None until a header has been sent at least once
        self.next_header_send_time = None

    def pack(self, data):
        """Serialize one set of sensor readings.

        ``data`` is a mapping with a ``"source_id"`` string, a ``"time"``
        entry and one entry per sensor field; every entry except
        ``"source_id"`` must expose a ``.value`` attribute convertible
        with ``int()``.  The mapping itself is not modified.

        Returns the data packet, prefixed by the header packet when one
        is due.
        """
        readings = data.copy()

        source_id = readings.pop("source_id")
        values = [int(readings.pop("time").value)]

        # The field set seen on the first call defines the layout of
        # every later packet.
        if self.field_order is None:
            self.field_order = sorted(readings.keys())
            self.pack_fmt = "!I" + "I" * len(self.field_order)

        resend_due = (self.next_header_send_time is not None and
                      time.time() > self.next_header_send_time)

        if self.headers_send_cycles_left > 0 or resend_due:
            field_names = self.HEADERS_SEPARATOR.join(self.field_order)
            field_count = struct.pack("!H", len(self.field_order))

            header_packet = (self.HEADERS + source_id +
                             self.END_OF_SOURCE_ID +
                             field_count + field_names +
                             self.END_OF_HEADERS)

            if self.headers_send_cycles_left > 0:
                self.headers_send_cycles_left -= 1

            self.next_header_send_time = time.time() + self.resend_timeout
        else:
            header_packet = ""

        values.extend(int(readings[name].value)
                      for name in self.field_order)

        data_packet = (self.DATA + source_id + self.END_OF_SOURCE_ID +
                       struct.pack(self.pack_fmt, *values))

        return header_packet + data_packet
82
83
class StructSerializerRecv(ISensortResultsSerializer):
    """Receiver side of the ``StructSerializerSend`` wire format.

    Remembers, per source id, the field order announced by header packets
    and uses it to decode the data packets of that source.
    """

    def __init__(self):
        # source_id -> ['time', field1, field2, ...]
        self.fields = {}
        # source_id -> struct format string of the data payload
        self.formats = {}

    def unpack(self, data):
        """Decode one received datagram.

        ``data`` starts either with a header packet (optionally followed
        by a data packet) or with a data packet.

        Returns ``None`` when ``data`` holds only a header packet,
        otherwise a dict mapping ``'time'`` and every field name to its
        integer value, plus ``'source_id'``.

        Raises ``CantUnpack`` when a data packet arrives before the field
        order of its source is known.  Malformed packets fail the
        ``assert`` checks below (``AssertionError``).
        """
        code = data[0]
        source_id, _, packed_data = data[1:].partition(
            StructSerializerSend.END_OF_SOURCE_ID)

        if code == StructSerializerSend.HEADERS:
            # fields order provided
            flen_sz = struct.calcsize("!H")
            flen = struct.unpack("!H", packed_data[:flen_sz])[0]

            headers_data, rest = packed_data[flen_sz:].split(
                StructSerializerSend.END_OF_HEADERS, 1)

            # ''.split(sep) yields [''], which would make a header that
            # announces zero fields fail the length check below.
            if headers_data:
                forder = headers_data.split(
                    StructSerializerSend.HEADERS_SEPARATOR)
            else:
                forder = []

            assert len(forder) == flen, \
                "Wrong len {0} != {1}".format(len(forder), flen)

            announced = ['time'] + forder

            # Look up the actual source id (not the literal string
            # 'source_id'), so that a repeated header is verified against
            # the stored order instead of silently replacing it.
            if source_id in self.fields:
                assert self.fields[source_id] == announced, \
                    "New field order"
            else:
                self.fields[source_id] = announced
                self.formats[source_id] = "!I" + "I" * flen

            # A data packet may directly follow the header packet.
            if rest:
                return self.unpack(rest)
            return None

        assert code == StructSerializerSend.DATA, \
            "Unknown code {0!r}".format(code)

        try:
            fields = self.fields[source_id]
        except KeyError:
            raise CantUnpack("No fields order provided"
                             " for {0} yet".format(source_id))
        s_format = self.formats[source_id]

        exp_size = struct.calcsize(s_format)
        assert len(packed_data) == exp_size, \
            "Wrong data len {0} != {1}".format(len(packed_data), exp_size)

        vals = struct.unpack(s_format, packed_data)
        res = dict(zip(fields, vals))
        res['source_id'] = source_id
        return res
Alyona Kiseleva7f6de4f2015-04-21 01:04:20 +0300137
138
class PickleSerializer(ISensortResultsSerializer):
    """Serializer that ships results as a pickled plain dict."""

    def pack(self, data):
        """Pickle ``data`` after flattening its values.

        String values are kept as they are; every other value is replaced
        by its ``.value`` attribute.
        """
        flat = dict(
            (name, item if isinstance(item, basestring) else item.value)
            for name, item in data.items())
        return pickle.dumps(flat)

    def unpack(self, data):
        """Return the object stored in the pickled string ``data``."""
        # NOTE(review): unpickling can execute arbitrary code when the
        # input is crafted - confirm this is only fed trusted data.
        return pickle.loads(data)
151
# ------------------------------------- Transports ---------------------------
153
koder aka kdanilov2c473092015-03-29 17:12:13 +0300154
class ITransport(object):
    """Interface of result transports.

    All methods are no-op placeholders; concrete transports override them.
    """

    def __init__(self, receiver):
        """``receiver`` selects the receiving (true) or sending side."""

    def send(self, data):
        """Deliver ``data`` (placeholder, returns ``None``)."""
        return None

    def recv(self, timeout=None):
        """Fetch the next piece of data (placeholder, returns ``None``)."""
        return None
164
165
class StdoutTransport(ITransport):
    """Send-only transport that writes results as a right-aligned table.

    The first ``send`` call fixes the columns (sorted keys of the data)
    and writes a header row; every call writes one row of values.  All
    output goes to ``self.fd`` (``sys.stdout`` here; subclasses may
    replace it).
    """

    # minimal width of a column, in characters
    MIN_COL_WIDTH = 10

    def __init__(self, receiver, delta=True):
        """Create the transport.

        ``receiver`` must be false - receiving is not supported and
        raises ``ValueError``.  With ``delta`` true each row shows the
        difference to the previously sent values instead of the values
        themselves.
        """
        if receiver:
            cname = self.__class__.__name__
            raise ValueError("{0} don't allows receiving".format(cname))

        self.headers = None
        self.line_format = ""
        # last sent value per header, used in delta mode
        self.prev = {}
        self.delta = delta
        self.fd = sys.stdout

    def send(self, data):
        """Write one row for ``data``.

        ``data`` maps column names to objects with a numeric ``.value``
        attribute.
        """
        if self.headers is None:
            self.headers = sorted(data)

            self.line_format = "".join(
                "{%s:>%s}" % (pos, max(len(header) + 1, self.MIN_COL_WIDTH))
                for pos, header in enumerate(self.headers))

            # The header row has to go to the same stream as the data
            # rows; a bare print would always use stdout, even when
            # self.fd points to a file.
            self.fd.write(self.line_format.format(*self.headers) + "\n")

        current = [data[header].value for header in self.headers]

        if self.delta:
            # values never seen before are compared against 0
            vals = [value - self.prev.get(header, 0)
                    for header, value in zip(self.headers, current)]
            self.prev.update(zip(self.headers, current))
        else:
            vals = current

        self.fd.write(self.line_format.format(*vals) + "\n")

    def recv(self, timeout=None):
        """Always raises ``ValueError`` - this transport is send-only."""
        cname = self.__class__.__name__
        raise ValueError("{0} don't allows receiving".format(cname))
205
206
class FileTransport(StdoutTransport):
    """``StdoutTransport`` variant whose output stream is a file."""

    def __init__(self, receiver, fname, delta=True):
        """Set up the table writer and open ``fname`` for writing.

        The file is opened with mode ``"w"``, so existing content is
        truncated.  ``receiver`` and ``delta`` are handled by
        ``StdoutTransport``.
        """
        super(FileTransport, self).__init__(receiver, delta)
        # NOTE: nothing in this class closes the file again.
        self.fd = open(fname, "w")
koder aka kdanilovdda86d32015-03-16 11:20:04 +0200211
212
class UDPTransport(ITransport):
    """Transport that exchanges serialized results over UDP datagrams.

    A receiver binds to ``(ip, port)`` and keeps one packer instance per
    sender address; a sender owns a single packer and sends every
    datagram to ``(ip, port)``.
    """

    def __init__(self, receiver, ip, port, packer_cls):
        """Create the socket for the receiving or the sending side.

        ``packer_cls`` is called without arguments to create packers.
        """
        # NOTE: despite its name this attribute holds the UDP socket.
        self.port = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        if receiver:
            self.port.bind((ip, port))
            self.packer_cls = packer_cls
            # sender address -> packer used for its datagrams
            self.packers = {}
        else:
            self.packer = packer_cls()
            self.dst = (ip, port)

    def send(self, data):
        """Pack ``data`` and send it as one datagram (sender side only)."""
        raw_data = self.packer.pack(data)
        self.port.sendto(raw_data, self.dst)

    def recv(self, timeout=None):
        """Wait for one datagram and decode it (receiver side only).

        ``timeout`` is passed to ``select.select``; ``None`` blocks until
        data arrives.

        Returns ``(addr, unpacked)`` where ``addr`` is the sender address
        and ``unpacked`` the result of the packer's ``unpack``.

        Raises ``Timeout`` when nothing arrives in time.
        """
        readable, _, _ = select.select([self.port], [], [], timeout)
        if not readable:
            raise Timeout()

        raw_data, addr = self.port.recvfrom(10000)

        # Create a packer only for addresses not seen before;
        # setdefault() would instantiate one on every datagram.
        try:
            packer = self.packers[addr]
        except KeyError:
            packer = self.packers[addr] = self.packer_cls()

        return addr, packer.unpack(raw_data)
236
237
# -------------------------- Factory function --------------------------------
239
240
def create_protocol(uri, receiver=False):
    """Build the transport described by ``uri``.

    Supported schemes:

    * ``stdout://``        - ``StdoutTransport``
    * ``file://<path>``    - ``FileTransport`` writing to ``<path>``
    * ``udp://<ip>:<port>`` - ``UDPTransport``; uses
      ``StructSerializerRecv`` when ``receiver`` is true and
      ``StructSerializerSend`` otherwise

    ``receiver`` is forwarded to the transport.  Raises ``ValueError``
    for any other scheme.
    """
    parts = urlparse(uri)
    scheme = parts.scheme

    if scheme == 'stdout':
        return StdoutTransport(receiver)

    if scheme == 'file':
        return FileTransport(receiver, parts.path)

    if scheme == 'udp':
        ip, port = parts.netloc.split(":")
        packer_cls = StructSerializerRecv if receiver else StructSerializerSend
        return UDPTransport(receiver, ip=ip, port=int(port),
                            packer_cls=packer_cls)

    raise ValueError("Can't instantiate transport from {0!r}".format(uri))