blob: c0530117c6c4110a51eda21be92c9ae1fb48e618 [file] [log] [blame]
koder aka kdanilov2c473092015-03-29 17:12:13 +03001import sys
koder aka kdanilov416b87a2015-05-12 00:26:04 +03002import csv
Ved-vampir0c7e2d42015-03-18 17:18:47 +03003import time
Alyona Kiseleva7f6de4f2015-04-21 01:04:20 +03004import struct
koder aka kdanilovdda86d32015-03-16 11:20:04 +02005import socket
6import select
7import cPickle as pickle
8from urlparse import urlparse
9
10
class Timeout(Exception):
    """Signals that no data arrived within the allowed waiting time."""
13
14
class CantUnpack(Exception):
    """Signals that a received packet cannot be decoded (yet)."""
17
18
# ------------------------------------- Serializers --------------------------
20
21
class ISensortResultsSerializer(object):
    """Interface shared by all sensor-result serializers.

    The base methods are placeholders that do nothing and return None;
    concrete serializers override them.
    """

    def pack(self, data):
        """Serialize ``data``; the base implementation returns None."""
        return None

    def unpack(self, data):
        """Deserialize ``data``; the base implementation returns None."""
        return None
28
29
class StructSerializerSend(ISensortResultsSerializer):
    """Sender-side serializer producing compact ``struct``-packed packets.

    Two packet kinds are produced:

    * header packet: ``HEADERS`` + source id + ``END_OF_SOURCE_ID`` +
      local hostname + ``END_OF_SOURCE_ID`` + field count (``!H``) +
      field names joined with ``HEADERS_SEPARATOR`` + ``END_OF_HEADERS``;
    * data packet: ``DATA`` + source id + ``END_OF_SOURCE_ID`` + the time
      value followed by every field value, all packed as ``!I...``.

    The header packet is prepended to the first ``initial_times`` data
    packets and afterwards whenever more than ``resend_timeout`` seconds
    have passed since headers were last sent.
    """

    initial_times = 5
    resend_timeout = 60
    HEADERS = 'h'
    DATA = 'd'
    END_OF_HEADERS = '\x00'
    END_OF_SOURCE_ID = '\x00'
    HEADERS_SEPARATOR = '\n'

    def __init__(self):
        # Field order and struct format are fixed by the first pack() call.
        self.field_order = None
        self.pack_fmt = None
        self.headers_send_cycles_left = self.initial_times
        self.next_header_send_time = None

    def pack(self, data):
        """Encode one sample into a string ready to be sent.

        ``data`` is a mapping holding ``"source_id"`` (a string),
        ``"time"`` and any number of named fields; ``"time"`` and the
        fields must expose a ``.value`` attribute convertible to int.
        The mapping itself is not modified.

        Returns the data packet, preceded by a header packet when headers
        are due.
        """
        fields = data.copy()

        source_id = fields.pop("source_id")
        timestamp = int(fields.pop("time").value)

        if self.field_order is None:
            self.field_order = sorted(fields.keys())
            self.pack_fmt = "!I" + "I" * len(self.field_order)

        # Headers are re-sent periodically once the initial burst is over.
        resend_due = (self.next_header_send_time is not None and
                      time.time() > self.next_header_send_time)

        header_packet = ""
        if self.headers_send_cycles_left > 0 or resend_due:
            names = self.HEADERS_SEPARATOR.join(self.field_order)
            count = struct.pack("!H", len(self.field_order))

            header_packet = (self.HEADERS + source_id +
                             self.END_OF_SOURCE_ID +
                             socket.gethostname() +
                             self.END_OF_SOURCE_ID +
                             count + names + self.END_OF_HEADERS)

            if self.headers_send_cycles_left > 0:
                self.headers_send_cycles_left -= 1

            self.next_header_send_time = time.time() + self.resend_timeout

        values = [timestamp]
        values.extend(int(fields[name].value) for name in self.field_order)

        data_packet = (self.DATA + source_id + self.END_OF_SOURCE_ID +
                       struct.pack(self.pack_fmt, *values))

        return header_packet + data_packet
85
86
class StructSerializerRecv(ISensortResultsSerializer):
    """Receiver-side decoder for packets built by StructSerializerSend.

    Header packets announce, per source id, the hostname and the order of
    the fields; that information is remembered and used to decode the
    data packets that follow.
    """

    def __init__(self):
        self.fields = {}     # source_id -> ['time'] + field names
        self.formats = {}    # source_id -> struct format of a data packet
        self.hostnames = {}  # source_id -> hostname announced in headers

    def unpack(self, data):
        """Decode one received datagram.

        A header packet updates the internal tables; if a data packet
        follows it in the same datagram, that packet is decoded as well.

        Returns a dict mapping ``'time'`` and every field name to its
        integer value, plus ``'source_id'`` and ``'hostname'``; returns
        None when the datagram contained only headers.

        Raises CantUnpack when a data packet arrives for a source whose
        headers were not seen yet, and AssertionError on malformed
        packets or when a source announces a different field order.
        """
        code = data[0]

        if code == StructSerializerSend.HEADERS:
            source_id, hostname, packed_data = data[1:].split(
                StructSerializerSend.END_OF_SOURCE_ID, 2)

            # Field count comes first, then the separator-joined names.
            flen_sz = struct.calcsize("!H")
            flen = struct.unpack("!H", packed_data[:flen_sz])[0]

            headers_data, rest = packed_data[flen_sz:].split(
                StructSerializerSend.END_OF_HEADERS, 1)

            # ''.split(sep) yields [''], so an empty name list needs
            # explicit handling to match a field count of zero.
            if headers_data:
                forder = headers_data.split(
                    StructSerializerSend.HEADERS_SEPARATOR)
            else:
                forder = []

            assert len(forder) == flen, \
                "Wrong len {0} != {1}".format(len(forder), flen)

            # Look the actual source id up (not the literal string
            # 'source_id'), so re-sent headers are really validated.
            if source_id in self.fields:
                assert self.fields[source_id] == ['time'] + forder,\
                    "New field order"
            else:
                self.fields[source_id] = ['time'] + forder
                self.formats[source_id] = "!I" + "I" * flen
                self.hostnames[source_id] = hostname

            if len(rest) != 0:
                return self.unpack(rest)
            return None

        # Validate the packet code before trying to split the payload so
        # unknown packets are reported with a meaningful message.
        assert code == StructSerializerSend.DATA,\
            "Unknown code {0!r}".format(code)

        source_id, packed_data = data[1:].split(
            StructSerializerSend.END_OF_SOURCE_ID, 1)

        try:
            fields = self.fields[source_id]
        except KeyError:
            raise CantUnpack("No fields order provided"
                             " for {0} yet".format(source_id))
        s_format = self.formats[source_id]

        exp_size = struct.calcsize(s_format)
        assert len(packed_data) == exp_size, \
            "Wrong data len {0} != {1}".format(len(packed_data), exp_size)

        vals = struct.unpack(s_format, packed_data)
        res = dict(zip(fields, vals))
        res['source_id'] = source_id
        res['hostname'] = self.hostnames[source_id]
        return res
Alyona Kiseleva7f6de4f2015-04-21 01:04:20 +0300145
146
class PickleSerializer(ISensortResultsSerializer):
    """Serializer that ships samples as pickled plain dictionaries."""

    def pack(self, data):
        """Pickle ``data`` after unwrapping its values.

        String values are kept as they are; every other value is replaced
        by its ``.value`` attribute.
        """
        plain = dict(
            (key, val if isinstance(val, basestring) else val.value)
            for key, val in data.items())
        return pickle.dumps(plain)

    def unpack(self, data):
        """Return the object stored in the pickled string ``data``."""
        # NOTE(review): pickle.loads can execute arbitrary code on crafted
        # input; confirm this is only ever fed data from trusted senders.
        return pickle.loads(data)
159
# ------------------------------------- Transports ---------------------------
161
koder aka kdanilov2c473092015-03-29 17:12:13 +0300162
class ITransport(object):
    """Interface shared by all transports.

    The base methods are placeholders that do nothing and return None;
    concrete transports override them.
    """

    def __init__(self, receiver):
        """Accept the ``receiver`` flag; the base class ignores it."""

    def send(self, data):
        """Send ``data``; the base implementation returns None."""
        return None

    def recv(self, timeout=None):
        """Receive data; the base implementation returns None."""
        return None
172
173
class StdoutTransport(ITransport):
    """Send-only transport printing samples as a right-aligned text table.

    The first ``send`` call fixes the column set (the sorted keys of the
    data) and writes a header row; every call writes one row of values.
    With ``delta`` enabled each column shows the difference from the
    previously sent value (the first row is the difference from 0).
    """

    MIN_COL_WIDTH = 10

    def __init__(self, receiver, delta=True):
        """Create the transport.

        Raises ValueError when ``receiver`` is true: this transport can
        only send.
        """
        if receiver:
            cname = self.__class__.__name__
            raise ValueError("{0} don't allows receiving".format(cname))

        self.headers = None
        self.line_format = ""
        self.prev = {}
        self.delta = delta
        self.fd = sys.stdout

    def send(self, data):
        """Write one table row built from ``data``.

        ``data`` maps column names to objects exposing ``.value``.
        """
        if self.headers is None:
            self.headers = sorted(data)

            for pos, header in enumerate(self.headers):
                self.line_format += "{%s:>%s}" % (pos,
                                                  max(len(header) + 1,
                                                      self.MIN_COL_WIDTH))

            # Write the header to the same stream as the data rows so
            # that subclasses replacing ``self.fd`` get a complete table
            # instead of a header on stdout and the rows elsewhere.
            self.fd.write(self.line_format.format(*self.headers) + "\n")

        if self.delta:
            vals = [data[header].value - self.prev.get(header, 0)
                    for header in self.headers]

            self.prev.update(dict((header, data[header].value)
                                  for header in self.headers))
        else:
            vals = [data[header].value for header in self.headers]

        self.fd.write(self.line_format.format(*vals) + "\n")

    def recv(self, timeout=None):
        """Always raises ValueError: this transport cannot receive."""
        cname = self.__class__.__name__
        raise ValueError("{0} don't allows receiving".format(cname))
213
214
class FileTransport(StdoutTransport):
    """StdoutTransport variant whose output stream is a file."""

    def __init__(self, receiver, fname, delta=True):
        """Initialise the parent transport, then open ``fname`` for writing.

        The file is opened in ``"w"`` mode and becomes ``self.fd``.
        """
        super(FileTransport, self).__init__(receiver, delta)
        self.fd = open(fname, "w")
koder aka kdanilovdda86d32015-03-16 11:20:04 +0200219
220
class CSVFileTransport(ITransport):
    """Transport appending decoded samples to a CSV file.

    File layout: a ``NEW_DATA`` marker row written on creation, then (on
    the first ``send``) a header row ``source_id, hostname, <fields...>``,
    then one row ``time, <field values...>`` per sample.
    """

    required_keys = set(['time', 'source_id', 'hostname'])

    def __init__(self, receiver, fname):
        """Open ``fname`` for writing and emit the ``NEW_DATA`` marker."""
        ITransport.__init__(self, receiver)
        self.fd = open(fname, "w")
        self.csv_fd = csv.writer(self.fd)
        self.field_list = []
        # Tracked separately from ``field_list``: a sample holding only
        # the required keys legitimately yields an empty field list, and
        # the header row must still be written exactly once.
        self._header_written = False
        self.csv_fd.writerow(['NEW_DATA'])

    def send(self, data):
        """Write one sample row, preceded by the header row on first use.

        ``data`` is a mapping that must contain every key listed in
        ``required_keys``; the remaining keys, sorted, become the data
        columns. Raises AssertionError when a required key is missing
        from the first sample.
        """
        if not self._header_written:
            keys = set(data)
            assert self.required_keys.issubset(keys)
            self.field_list = sorted(keys - self.required_keys)
            self.csv_fd.writerow([data['source_id'], data['hostname']] +
                                 self.field_list)
            self._header_written = True

        self.csv_fd.writerow([data[name]
                              for name in ['time'] + self.field_list])
241
242
class UDPTransport(ITransport):
    """Transport exchanging serialized samples over UDP datagrams.

    In receiver mode the socket is bound to ``(ip, port)`` and a separate
    ``packer_cls`` instance is kept for every peer address. In sender
    mode one ``packer_cls`` instance is created and ``(ip, port)`` is the
    destination of every datagram.
    """

    def __init__(self, receiver, ip, port, packer_cls):
        self.port = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        if receiver:
            self.port.bind((ip, port))
            self.packer_cls = packer_cls
            self.packers = {}  # peer address -> serializer instance
        else:
            self.packer = packer_cls()
            self.dst = (ip, port)

    def send(self, data):
        """Serialize ``data`` and send it to the configured destination."""
        raw_data = self.packer.pack(data)
        self.port.sendto(raw_data, self.dst)

    def recv(self, timeout=None):
        """Wait for one datagram and decode it.

        ``timeout`` is passed to ``select.select`` (None blocks until
        data arrives). Returns ``(addr, unpacked)`` where ``addr`` is the
        sender's address and ``unpacked`` is whatever the peer's
        serializer returned. Raises Timeout when nothing arrived in time.
        """
        r, _, _ = select.select([self.port], [], [], timeout)
        if not r:
            raise Timeout()

        raw_data, addr = self.port.recvfrom(10000)

        # Create a serializer only for peers not seen before, instead of
        # instantiating a throwaway one for every datagram.
        try:
            packer = self.packers[addr]
        except KeyError:
            packer = self.packers[addr] = self.packer_cls()

        return addr, packer.unpack(raw_data)
266
267
# -------------------------- Factory function --------------------------------
269
270
def create_protocol(uri, receiver=False):
    """Build the transport described by ``uri``.

    Supported schemes:

    * ``stdout``  - StdoutTransport;
    * ``udp``     - UDPTransport on the ``ip:port`` given in the network
      location, using StructSerializerRecv when ``receiver`` is true and
      StructSerializerSend otherwise;
    * ``file``    - FileTransport writing to the URI path;
    * ``csvfile`` - CSVFileTransport writing to the URI path.

    Raises ValueError for any other scheme.
    """
    parsed_uri = urlparse(uri)
    scheme = parsed_uri.scheme

    if scheme == 'stdout':
        return StdoutTransport(receiver)

    if scheme == 'udp':
        ip, port = parsed_uri.netloc.split(":")
        packer_cls = StructSerializerRecv if receiver else StructSerializerSend
        return UDPTransport(receiver, ip=ip, port=int(port),
                            packer_cls=packer_cls)

    if scheme == 'file':
        return FileTransport(receiver, parsed_uri.path)

    if scheme == 'csvfile':
        return CSVFileTransport(receiver, parsed_uri.path)

    raise ValueError("Can't instantiate transport from {0!r}".format(uri))