blob: 7c8aa0e592227bbd692987b68b29752dc90bb11a [file] [log] [blame]
koder aka kdanilov2c473092015-03-29 17:12:13 +03001import sys
koder aka kdanilov416b87a2015-05-12 00:26:04 +03002import csv
Ved-vampir0c7e2d42015-03-18 17:18:47 +03003import time
Alyona Kiseleva7f6de4f2015-04-21 01:04:20 +03004import struct
koder aka kdanilovdda86d32015-03-16 11:20:04 +02005import socket
6import select
7import cPickle as pickle
8from urlparse import urlparse
9
10
class Timeout(Exception):
    """Raised by a transport's recv() when no data arrives in time."""
13
14
class CantUnpack(Exception):
    """Raised when a data frame cannot be decoded yet.

    The receiving struct serializer raises it when a data frame arrives
    for a source whose field order has not been received.
    """
17
18
koder aka kdanilovdda86d32015-03-16 11:20:04 +020019# ------------------------------------- Serializers --------------------------
20
21
class ISensortResultsSerializer(object):
    """Interface for sensor result serializers.

    Both methods are no-op placeholders here; concrete serializers
    override the direction(s) they support.
    """

    def pack(self, data):
        """Turn a sensor data mapping into a wire representation."""
        return None

    def unpack(self, data):
        """Turn a wire representation back into sensor data."""
        return None
28
29
class StructSerializerSend(ISensortResultsSerializer):
    """Sender-side binary serializer for sensor readings.

    Two frame types are produced:

    * header frame: HEADERS + source_id + END_OF_SOURCE_ID + hostname +
      END_OF_SOURCE_ID + field count ("!H") + field names joined by
      HEADERS_SEPARATOR + END_OF_HEADERS
    * data frame: DATA + source_id + END_OF_SOURCE_ID + the time value
      followed by every field value, each packed as "I" in network order

    A header frame is prepended to the data frame for the first
    ``initial_times`` calls of pack(), and afterwards whenever more than
    ``resend_timeout`` seconds passed since the last header was sent.
    """

    # how many initial pack() calls carry a header frame
    initial_times = 5
    # seconds after which the header frame is sent again
    resend_timeout = 60
    HEADERS = 'h'
    DATA = 'd'
    END_OF_HEADERS = '\x00'
    END_OF_SOURCE_ID = '\x00'
    HEADERS_SEPARATOR = '\n'

    def __init__(self):
        self.field_order = None
        self.headers_send_cycles_left = self.initial_times
        self.pack_fmt = None
        self.next_header_send_time = None

    def _build_header(self, source_id):
        """Return a header frame and update the resend bookkeeping."""
        names = self.HEADERS_SEPARATOR.join(self.field_order)
        count = struct.pack("!H", len(self.field_order))

        prefix = self.HEADERS + source_id + self.END_OF_SOURCE_ID
        host_part = socket.gethostname() + self.END_OF_SOURCE_ID
        header = prefix + host_part + count + names + self.END_OF_HEADERS

        if self.headers_send_cycles_left > 0:
            self.headers_send_cycles_left -= 1

        self.next_header_send_time = time.time() + self.resend_timeout
        return header

    def pack(self, data):
        """Serialize one reading.

        ``data`` must contain "source_id" (a string), "time" and any
        number of sensor fields; "time" and the fields are objects with a
        ``value`` attribute convertible to int. The field set seen on the
        first call fixes the field order for all later calls.

        Returns the data frame, preceded by a header frame when one is
        due.
        """
        fields = data.copy()

        source_id = fields.pop("source_id")
        timestamp = int(fields.pop("time").value)

        if self.field_order is None:
            self.field_order = sorted(fields.keys())
            self.pack_fmt = "!I" + "I" * len(self.field_order)

        resend_due = (self.next_header_send_time is not None and
                      time.time() > self.next_header_send_time)

        if self.headers_send_cycles_left > 0 or resend_due:
            header = self._build_header(source_id)
        else:
            header = ""

        values = [timestamp] + [int(fields[name].value)
                                for name in self.field_order]

        frame = self.DATA + source_id + self.END_OF_SOURCE_ID
        frame += struct.pack(self.pack_fmt, *values)

        return header + frame
85
86
class StructSerializerRecv(ISensortResultsSerializer):
    """Receiver-side counterpart of StructSerializerSend.

    Remembers, per source id, the field order, the struct format and the
    hostname announced in header frames, and uses them to decode data
    frames.
    """

    def __init__(self):
        # source_id -> ['time', field, ...]
        self.fields = {}
        # source_id -> struct format of a data frame payload
        self.formats = {}
        # source_id -> hostname announced in the header frame
        self.hostnames = {}

    def unpack(self, data):
        """Decode one datagram.

        Returns a dict of field values (plus 'source_id' and 'hostname')
        when the datagram carries a data frame, or None when it carries
        only a header frame.

        Raises CantUnpack when a data frame arrives for a source whose
        header was not seen yet, and AssertionError on malformed frames
        or when a source announces a different field order than before.
        """
        code = data[0]

        if code == StructSerializerSend.HEADERS:
            return self._unpack_headers(data[1:])
        return self._unpack_data(code, data[1:])

    def _unpack_headers(self, payload):
        """Register the field order from a header frame, then decode the
        rest of the datagram, if any."""
        # maxsplit=2 keeps zero bytes inside the packed field count intact
        source_id, hostname, packed_data = payload.split(
            StructSerializerSend.END_OF_SOURCE_ID, 2)

        flen_sz = struct.calcsize("!H")
        flen = struct.unpack("!H", packed_data[:flen_sz])[0]

        headers_data, rest = packed_data[flen_sz:].split(
            StructSerializerSend.END_OF_HEADERS, 1)

        # "".split(sep) yields [''], so an empty field list needs a
        # special case to pass the length check below
        if headers_data:
            forder = headers_data.split(
                StructSerializerSend.HEADERS_SEPARATOR)
        else:
            forder = []

        assert len(forder) == flen, \
            "Wrong len {0} != {1}".format(len(forder), flen)

        expected = ['time'] + forder

        # look the actual source id up, not the literal string 'source_id'
        if source_id in self.fields:
            assert self.fields[source_id] == expected, \
                "New field order"
        else:
            self.fields[source_id] = expected
            self.formats[source_id] = "!I" + "I" * flen
            self.hostnames[source_id] = hostname

        if len(rest) != 0:
            return self.unpack(rest)
        return None

    def _unpack_data(self, code, payload):
        """Decode a data frame using the stored layout of its source."""
        source_id, packed_data = payload.split(
            StructSerializerSend.END_OF_SOURCE_ID, 1)
        assert code == StructSerializerSend.DATA, \
            "Unknown code {0!r}".format(code)

        try:
            fields = self.fields[source_id]
        except KeyError:
            raise CantUnpack("No fields order provided"
                             " for {0} yet".format(source_id))
        s_format = self.formats[source_id]

        exp_size = struct.calcsize(s_format)
        assert len(packed_data) == exp_size, \
            "Wrong data len {0} != {1}".format(len(packed_data), exp_size)

        vals = struct.unpack(s_format, packed_data)
        res = dict(zip(fields, vals))
        res['source_id'] = source_id
        res['hostname'] = self.hostnames[source_id]
        return res
Alyona Kiseleva7f6de4f2015-04-21 01:04:20 +0300145
146
class PickleSerializer(ISensortResultsSerializer):
    """Serializer that pickles a plain dict of the reading."""

    def pack(self, data):
        """Pickle ``data`` after replacing every non-string item with its
        ``value`` attribute; string items are kept as they are."""
        plain = dict(
            (key, val if isinstance(val, basestring) else val.value)
            for key, val in data.items())
        return pickle.dumps(plain)

    def unpack(self, data):
        """Return the object stored in the pickled string ``data``."""
        # NOTE(review): pickle.loads can execute arbitrary code; only feed
        # it data from trusted peers.
        return pickle.loads(data)
159
koder aka kdanilovdda86d32015-03-16 11:20:04 +0200160# ------------------------------------- Transports ---------------------------
161
koder aka kdanilov2c473092015-03-29 17:12:13 +0300162
class ITransport(object):
    """Interface of a sensor data transport.

    All methods are no-op placeholders; concrete transports override
    what they support.
    """

    def __init__(self, receiver):
        """``receiver`` tells whether the transport is used for reading."""
        return None

    def send(self, data):
        """Deliver one reading."""
        return None

    def recv(self, timeout=None):
        """Fetch one reading, waiting up to ``timeout``."""
        return None
172
173
class StdoutTransport(ITransport):
    """Send-only transport that writes readings as an aligned text table.

    The column set is fixed by the first reading: all of its keys except
    'source_id', sorted. With ``delta`` enabled every column shows the
    difference to the previous reading (the first row is relative to 0).
    """

    MIN_COL_WIDTH = 10

    def __init__(self, receiver, delta=True):
        """Raises ValueError when ``receiver`` is true."""
        if receiver:
            cname = self.__class__.__name__
            raise ValueError("{0} don't allows receiving".format(cname))

        self.headers = None
        self.line_format = ""
        self.prev = {}
        self.delta = delta
        self.fd = sys.stdout

    def _init_headers(self, data):
        """Fix the column order, build the row format and emit the
        header row."""
        self.headers = sorted(data)
        self.headers.remove('source_id')

        self.line_format += "".join(
            "{%s:>%s}" % (pos, max(len(header) + 1, self.MIN_COL_WIDTH))
            for pos, header in enumerate(self.headers))

        # write to self.fd, not sys.stdout, so subclasses that redirect
        # the output (e.g. to a file) get the header row as well
        self.fd.write(self.line_format.format(*self.headers) + "\n")

    def send(self, data):
        """Write one reading as a table row.

        ``data`` maps column names to objects with a ``value`` attribute
        and must contain a 'source_id' key, which is not printed.
        """
        if self.headers is None:
            self._init_headers(data)

        current = [data[header].value for header in self.headers]

        if self.delta:
            vals = [cur - self.prev.get(header, 0)
                    for header, cur in zip(self.headers, current)]
            self.prev.update(zip(self.headers, current))
        else:
            vals = current

        self.fd.write(self.line_format.format(*vals) + "\n")

    def recv(self, timeout=None):
        """Always raises ValueError: this transport cannot receive."""
        cname = self.__class__.__name__
        raise ValueError("{0} don't allows receiving".format(cname))
215
216
class FileTransport(StdoutTransport):
    """StdoutTransport variant whose rows go to the file ``fname``."""

    def __init__(self, receiver, fname, delta=True):
        """Initialize the table state, then open ``fname`` for writing
        (truncating it) as the output stream."""
        super(FileTransport, self).__init__(receiver, delta)
        self.fd = open(fname, "w")
koder aka kdanilovdda86d32015-03-16 11:20:04 +0200221
222
class CSVFileTransport(ITransport):
    """Transport that stores readings in a CSV file.

    File layout: a ['NEW_DATA'] marker row written on creation, then, on
    the first reading, a row of [source_id, fqdn, sensor names...],
    followed by one row of [time, sensor values...] per reading.
    """

    # keys every reading must have; they are not treated as sensors
    required_keys = set(['time', 'source_id'])

    def __init__(self, receiver, fname):
        ITransport.__init__(self, receiver)
        self.fd = open(fname, "w")
        self.csv_fd = csv.writer(self.fd)
        self.field_list = []
        self.csv_fd.writerow(['NEW_DATA'])

    def _write_header(self, data):
        """Derive the column order from ``data`` and write the header."""
        present = set(data)
        assert self.required_keys.issubset(present)

        self.field_list = sorted(present - self.required_keys)
        header_row = [data['source_id'], socket.getfqdn()]
        self.csv_fd.writerow(header_row + self.field_list)
        # data rows start with the time value
        self.field_list = ['time'] + self.field_list

    def send(self, data):
        """Append one reading; the first call also writes the header.

        ``data`` must contain 'time' and 'source_id'; 'time' and all
        sensor items are objects with a ``value`` attribute.
        """
        if not self.field_list:
            self._write_header(data)

        row = [data[name].value for name in self.field_list]
        self.csv_fd.writerow(row)
244
245
class RAMTransport(ITransport):
    """Buffers readings in memory until flush() forwards them."""

    def __init__(self, next_tr):
        """``next_tr`` is the transport that receives the readings on
        flush()."""
        self.next = next_tr
        self.data = []

    def send(self, data):
        """Remember ``data`` for a later flush()."""
        self.data.append(data)

    def flush(self):
        """Forward all buffered readings, in order, and drop them."""
        forward = self.next.send
        for item in self.data:
            forward(item)
        self.data = []
koder aka kdanilov416b87a2015-05-12 00:26:04 +0300258
259
class UDPTransport(ITransport):
    """Transport that moves serialized readings over UDP.

    As a receiver it binds to (ip, port) and keeps one ``packer_cls``
    instance per sender address; as a sender it uses a single packer and
    sends to (ip, port).
    """

    # upper bound of bytes read per datagram
    MAX_DATAGRAM_SIZE = 10000

    def __init__(self, receiver, ip, port, packer_cls):
        self.port = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        if receiver:
            self.port.bind((ip, port))
            self.packer_cls = packer_cls
            # sender address -> serializer holding that sender's state
            self.packers = {}
        else:
            self.packer = packer_cls()
            self.dst = (ip, port)

    def send(self, data):
        """Serialize ``data`` and send it as one datagram."""
        raw_data = self.packer.pack(data)
        self.port.sendto(raw_data, self.dst)

    def recv(self, timeout=None):
        """Wait up to ``timeout`` seconds (forever when None) for one
        datagram.

        Returns (sender address, unpacked data). Raises Timeout when
        nothing arrived in time.
        """
        ready, _, _ = select.select([self.port], [], [], timeout)
        if not ready:
            raise Timeout()

        raw_data, addr = self.port.recvfrom(self.MAX_DATAGRAM_SIZE)

        # create a serializer only for senders not seen before;
        # dict.setdefault would build a throwaway one on every datagram
        try:
            packer = self.packers[addr]
        except KeyError:
            packer = self.packers[addr] = self.packer_cls()

        return addr, packer.unpack(raw_data)
283
284
285# -------------------------- Factory function --------------------------------
286
287
def create_protocol(uri, receiver=False):
    """Build a transport from ``uri``.

    Supported values: the literal 'stdout', and URIs with the schemes
    'udp' (netloc "ip:port"), 'file', 'csvfile' and 'ram' (an in-memory
    buffer in front of a CSV file transport); the last three take the
    file name from the URI path.

    Raises ValueError for anything else.
    """
    if uri == 'stdout':
        return StdoutTransport(receiver)

    parsed_uri = urlparse(uri)
    scheme = parsed_uri.scheme

    if scheme == 'udp':
        ip, port = parsed_uri.netloc.split(":")
        packer_cls = StructSerializerRecv if receiver \
            else StructSerializerSend
        return UDPTransport(receiver, ip=ip, port=int(port),
                            packer_cls=packer_cls)

    if scheme == 'file':
        return FileTransport(receiver, parsed_uri.path)

    if scheme == 'csvfile':
        return CSVFileTransport(receiver, parsed_uri.path)

    if scheme == 'ram':
        return RAMTransport(CSVFileTransport(receiver, parsed_uri.path))

    raise ValueError("Can't instantiate transport from {0!r}".format(uri))