blob: 67aef2a1e66f8c2f90fc5f2ac7425ed3021359ba [file] [log] [blame]
koder aka kdanilov2c473092015-03-29 17:12:13 +03001import sys
Ved-vampir0c7e2d42015-03-18 17:18:47 +03002import time
Alyona Kiseleva7f6de4f2015-04-21 01:04:20 +03003import struct
koder aka kdanilovdda86d32015-03-16 11:20:04 +02004import socket
5import select
6import cPickle as pickle
7from urlparse import urlparse
8
9
class Timeout(Exception):
    """Raised by a transport's ``recv`` when no data became available
    before the requested timeout expired."""
12
13
class CantUnpack(Exception):
    """Raised when a data packet cannot be decoded, e.g. because the
    field order for its source has not been received yet."""
16
17
# ------------------------------------- Serializers --------------------------
19
20
class ISensortResultsSerializer(object):
    """Interface for sensor-result serializers.

    Concrete serializers override ``pack`` and/or ``unpack``; the
    implementations here do nothing and return ``None``.
    """

    def pack(self, data):
        """Turn ``data`` into its wire representation (no-op here)."""
        return None

    def unpack(self, data):
        """Turn a wire representation back into data (no-op here)."""
        return None
27
28
class StructSerializerSend(ISensortResultsSerializer):
    """Sender-side binary packer for sensor results.

    Wire format produced by ``pack``:

    * optional header record:
      ``'h' + source_id + '\\0' + hostname + '\\0' + uint16(field count)
      + '\\n'.join(field names) + '\\0'``
    * data record:
      ``'d' + source_id + '\\0'`` followed by the timestamp and every
      field value packed as network-order unsigned 32-bit integers.

    The header record is prepended to the first ``initial_times`` packets
    and afterwards whenever more than ``resend_timeout`` seconds passed
    since the header was last sent.
    """

    # how many of the first packets carry the header record
    initial_times = 5
    # seconds after which the header record is sent again
    resend_timeout = 60
    # record type markers
    HEADERS = 'h'
    DATA = 'd'
    # terminators / separators used inside the records
    END_OF_HEADERS = '\x00'
    END_OF_SOURCE_ID = '\x00'
    HEADERS_SEPARATOR = '\n'

    def __init__(self):
        # field names in packing order; fixed by the first pack() call
        self.field_order = None
        # struct format matching field_order (timestamp first)
        self.pack_fmt = None
        self.headers_send_cycles_left = self.initial_times
        # time.time() value after which the header must be resent
        self.next_header_send_time = None

    def pack(self, data):
        """Serialize one sample.

        ``data`` must be a dict holding a ``"source_id"`` string, a
        ``"time"`` entry and any number of sensor entries; every entry
        except ``"source_id"`` is read through its ``.value`` attribute
        and converted with ``int``.  The dict itself is not modified.

        Returns the (optional) header record followed by the data record.
        """
        payload = data.copy()

        source_id = payload.pop("source_id")
        values = [int(payload.pop("time").value)]

        # the very first sample defines the field order for this sender
        if self.field_order is None:
            self.field_order = sorted(payload.keys())
            self.pack_fmt = "!I" + "I" * len(self.field_order)

        deadline = self.next_header_send_time
        resend_due = deadline is not None and time.time() > deadline

        if self.headers_send_cycles_left > 0 or resend_due:
            header_parts = [
                self.HEADERS,
                source_id,
                self.END_OF_SOURCE_ID,
                socket.gethostname(),
                self.END_OF_SOURCE_ID,
                struct.pack("!H", len(self.field_order)),
                self.HEADERS_SEPARATOR.join(self.field_order),
                self.END_OF_HEADERS,
            ]
            prefix = "".join(header_parts)

            if self.headers_send_cycles_left > 0:
                self.headers_send_cycles_left -= 1

            self.next_header_send_time = time.time() + self.resend_timeout
        else:
            prefix = ""

        # values are collected after the header bookkeeping on purpose:
        # a missing field raises KeyError only once the state is updated
        values.extend(int(payload[name].value) for name in self.field_order)

        record = (self.DATA + source_id + self.END_OF_SOURCE_ID +
                  struct.pack(self.pack_fmt, *values))

        return prefix + record
84
85
class StructSerializerRecv(ISensortResultsSerializer):
    """Receiver-side decoder for packets made by StructSerializerSend.

    Keeps, per ``source_id``, the field order, the matching struct format
    and the sender hostname announced in header records, and uses them to
    decode subsequent data records.
    """

    def __init__(self):
        # source_id -> ['time', field1, field2, ...]
        self.fields = {}
        # source_id -> struct format for the data record payload
        self.formats = {}
        # source_id -> hostname announced in the header record
        self.hostnames = {}

    def unpack(self, data):
        """Decode one received packet.

        A packet is either a header record (optionally followed by a data
        record) or a bare data record.

        Returns a dict mapping ``'time'`` and every field name to its
        integer value, plus ``'source_id'`` and ``'hostname'``; returns
        ``None`` when the packet contained only a header record.

        Raises ``CantUnpack`` when a data record arrives for a source
        whose header has not been seen yet, and ``AssertionError`` on
        malformed packets or when a known source announces a different
        field order.
        """
        code = data[0]

        if code == StructSerializerSend.HEADERS:
            source_id, hostname, packed_data = data[1:].split(
                StructSerializerSend.END_OF_SOURCE_ID, 2)

            # fields order provided
            flen_sz = struct.calcsize("!H")
            flen = struct.unpack("!H", packed_data[:flen_sz])[0]

            headers_data, rest = packed_data[flen_sz:].split(
                StructSerializerSend.END_OF_HEADERS, 1)

            # "".split(sep) yields [''], so an empty field list has to be
            # handled explicitly or the length check below would fail
            if headers_data:
                forder = headers_data.split(
                    StructSerializerSend.HEADERS_SEPARATOR)
            else:
                forder = []

            assert len(forder) == flen, \
                "Wrong len {0} != {1}".format(len(forder), flen)

            announced = ['time'] + forder

            # look up the actual source id (not the literal 'source_id'),
            # so that a changed field order is really detected
            if source_id in self.fields:
                assert self.fields[source_id] == announced, \
                    "New field order"
            else:
                self.fields[source_id] = announced
                self.formats[source_id] = "!I" + "I" * flen

            # refreshed on every header record
            self.hostnames[source_id] = hostname

            # a data record may directly follow the header record
            if len(rest) != 0:
                return self.unpack(rest)
            return None
        else:
            source_id, packed_data = data[1:].split(
                StructSerializerSend.END_OF_SOURCE_ID, 1)
            assert code == StructSerializerSend.DATA, \
                "Unknown code {0!r}".format(code)

            try:
                fields = self.fields[source_id]
            except KeyError:
                raise CantUnpack("No fields order provided"
                                 " for {0} yet".format(source_id))
            s_format = self.formats[source_id]

            exp_size = struct.calcsize(s_format)
            assert len(packed_data) == exp_size, \
                "Wrong data len {0} != {1}".format(len(packed_data), exp_size)

            vals = struct.unpack(s_format, packed_data)
            res = dict(zip(fields, vals))
            res['source_id'] = source_id
            res['hostname'] = self.hostnames[source_id]
            return res
Alyona Kiseleva7f6de4f2015-04-21 01:04:20 +0300144
145
class PickleSerializer(ISensortResultsSerializer):
    """Serializer that ships sensor results as a pickled plain dict."""

    def pack(self, data):
        """Pickle ``data`` after flattening it to plain values.

        String entries are stored as they are; every other entry is
        replaced by its ``.value`` attribute.
        """
        flattened = dict(
            (key, val if isinstance(val, basestring) else val.value)
            for key, val in data.items())
        return pickle.dumps(flattened)

    def unpack(self, data):
        """Return the object stored in the pickled string ``data``."""
        # NOTE(review): pickle.loads must only be fed trusted input -
        # confirm this serializer is never used on untrusted network data.
        return pickle.loads(data)
158
# ------------------------------------- Transports ---------------------------
160
koder aka kdanilov2c473092015-03-29 17:12:13 +0300161
class ITransport(object):
    """Interface for sensor-data transports.

    Every method here is a no-op returning ``None``; concrete transports
    override them.
    """

    def __init__(self, receiver):
        """``receiver`` tells whether the transport is used for receiving."""

    def send(self, data):
        """Deliver ``data`` to the other side (no-op here)."""
        return None

    def recv(self, timeout=None):
        """Wait up to ``timeout`` for incoming data (no-op here)."""
        return None
171
172
class StdoutTransport(ITransport):
    """Send-only transport that writes samples as a text table.

    The first ``send`` call fixes the column set (sorted keys of the first
    sample) and writes a header row; every call writes one row of values.
    All output, header row included, goes to ``self.fd``.
    """

    # minimal width of a table column, in characters
    MIN_COL_WIDTH = 10

    def __init__(self, receiver, delta=True):
        """Create the transport.

        ``receiver`` must be false - this transport cannot receive;
        ``ValueError`` is raised otherwise.  With ``delta`` set, each row
        shows the difference to the previously sent value of the column
        instead of the raw value.
        """
        if receiver:
            cname = self.__class__.__name__
            raise ValueError("{0} don't allows receiving".format(cname))

        self.headers = None
        self.line_format = ""
        # column name -> value seen in the previous sample (delta mode)
        self.prev = {}
        self.delta = delta
        self.fd = sys.stdout

    def send(self, data):
        """Write one sample; ``data`` maps column names to objects whose
        ``.value`` attribute holds the number to print."""
        if self.headers is None:
            self.headers = sorted(data)

            self.line_format = "".join(
                "{%s:>%s}" % (pos, max(len(header) + 1, self.MIN_COL_WIDTH))
                for pos, header in enumerate(self.headers))

            # the header row goes to the same stream as the data rows,
            # so subclasses that redirect self.fd get a complete table
            self.fd.write(self.line_format.format(*self.headers) + "\n")

        current = [data[header].value for header in self.headers]

        if self.delta:
            vals = [value - self.prev.get(header, 0)
                    for header, value in zip(self.headers, current)]
            self.prev.update(zip(self.headers, current))
        else:
            vals = current

        self.fd.write(self.line_format.format(*vals) + "\n")

    def recv(self, timeout=None):
        """Always raises ``ValueError`` - receiving is not supported."""
        cname = self.__class__.__name__
        raise ValueError("{0} don't allows receiving".format(cname))
212
213
class FileTransport(StdoutTransport):
    """StdoutTransport variant whose output stream is a file."""

    def __init__(self, receiver, fname, delta=True):
        """Initialise like StdoutTransport, then open ``fname`` for
        writing (truncating it) and use it as the output stream."""
        super(FileTransport, self).__init__(receiver, delta)
        self.fd = open(fname, "w")
217 self.fd = open(fname, "w")
koder aka kdanilovdda86d32015-03-16 11:20:04 +0200218
219
class UDPTransport(ITransport):
    """Transport exchanging serialized samples over UDP datagrams.

    In receiver mode the socket is bound to ``(ip, port)`` and a separate
    serializer instance is kept per sender address; in sender mode one
    serializer is used and datagrams are sent to ``(ip, port)``.
    """

    def __init__(self, receiver, ip, port, packer_cls):
        """Create the UDP socket.

        ``packer_cls`` is the serializer class: instantiated once right
        away in sender mode, and once per sender address in receiver mode.
        """
        # NB: despite its name, ``self.port`` holds the socket object
        self.port = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        if receiver:
            self.port.bind((ip, port))
            self.packer_cls = packer_cls
            # sender address -> serializer instance
            self.packers = {}
        else:
            self.packer = packer_cls()
            self.dst = (ip, port)

    def send(self, data):
        """Serialize ``data`` and send it as one datagram to ``self.dst``."""
        raw_data = self.packer.pack(data)
        self.port.sendto(raw_data, self.dst)

    def recv(self, timeout=None):
        """Wait for one datagram and decode it.

        Returns ``(sender_address, unpacked_data)``.  Raises ``Timeout``
        when nothing arrives within ``timeout`` seconds (``None`` waits
        indefinitely).
        """
        readable, _, _ = select.select([self.port], [], [], timeout)
        if not readable:
            raise Timeout()

        raw_data, addr = self.port.recvfrom(10000)

        # create a serializer only for senders not seen before, instead of
        # instantiating a throwaway one for every datagram
        try:
            packer = self.packers[addr]
        except KeyError:
            packer = self.packers[addr] = self.packer_cls()

        return addr, packer.unpack(raw_data)
242 raise Timeout()
243
244
# -------------------------- Factory function --------------------------------
246
247
def create_protocol(uri, receiver=False):
    """Build the transport described by ``uri``.

    Supported forms are ``stdout://``, ``file://<path>`` and
    ``udp://<ip>:<port>``.  For UDP the struct serializer matching the
    direction (``receiver``) is selected.  Any other scheme raises
    ``ValueError``.
    """
    parsed_uri = urlparse(uri)
    scheme = parsed_uri.scheme

    if scheme == 'stdout':
        return StdoutTransport(receiver)

    if scheme == 'udp':
        ip, port = parsed_uri.netloc.split(":")
        serializer = StructSerializerRecv if receiver else StructSerializerSend
        return UDPTransport(receiver, ip=ip, port=int(port),
                            packer_cls=serializer)

    if scheme == 'file':
        return FileTransport(receiver, parsed_uri.path)

    raise ValueError("Can't instantiate transport from {0!r}".format(uri))
266 raise ValueError(templ.format(uri))