blob: 9fd1a841b113dec81f2a09c1e382dad805e6032c [file] [log] [blame]
koder aka kdanilov2c473092015-03-29 17:12:13 +03001import sys
Ved-vampir0c7e2d42015-03-18 17:18:47 +03002import time
Alyona Kiseleva7f6de4f2015-04-21 01:04:20 +03003import struct
koder aka kdanilovdda86d32015-03-16 11:20:04 +02004import socket
5import select
6import cPickle as pickle
7from urlparse import urlparse
8
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +03009from . import cp_transport
Ved-vampir0c7e2d42015-03-18 17:18:47 +030010
koder aka kdanilovdda86d32015-03-16 11:20:04 +020011
class Timeout(Exception):
    """Signals that a receive call found no data within its timeout."""
14
15
16# ------------------------------------- Serializers --------------------------
17
18
class ISensortResultsSerializer(object):
    """Interface for converting sensor results to and from a wire form.

    Both methods here are placeholders that return ``None``; concrete
    serializers override them.
    """

    def pack(self, data):
        """Encode ``data`` for sending. The base version returns ``None``."""
        return None

    def unpack(self, data):
        """Decode received ``data``. The base version returns ``None``."""
        return None
25
26
class StructSerializer(ISensortResultsSerializer):
    """Compact binary serializer built on the ``struct`` module.

    Two kinds of messages are produced, both as byte strings:

    * format message: ``"\\x00" + source_id + "\\x00" + <!H field count> +
      "\\n".join(field names)`` - announces the order of the value fields;
    * data message: ``"\\x01" + source_id + "\\x00" + <!I time> +
      <!I value> * len(field order)`` - values in the announced order.

    ``"source_id"`` and ``"time"`` travel outside of the ordered field list,
    so they are never part of the announced field names.

    One instance keeps independent state per ``source_id`` in
    ``self.configs``; the same class is used for packing and unpacking.
    """

    FORMAT_CODE = b"\x00"
    DATA_CODE = b"\x01"
    SEPARATOR = b"\x00"
    FIELDS_DELIMITER = b"\n"
    # keys that are transferred separately from the ordered value fields
    SERVICE_FIELDS = ("source_id", "time")
    # seconds after which the field order is announced again
    OLD_FORMAT = 5
    # size in bytes of one "!I" value
    VALUE_SIZE = 4

    class LocalConfig(object):
        """Per-source state of the format announcement."""

        def __init__(self):
            # time.time() of the last format message, -1 if never sent
            self.last_format_sent = -1
            # True once the initial series of format messages is complete
            self.initial_sent = False
            # how many format messages are still left in the initial series
            self.initial_times = 5
            # list of field names, None until a format is sent/received
            self.field_order = None

    def __init__(self):
        # source_id -> LocalConfig
        self.configs = {}

    @staticmethod
    def _to_bytes(text):
        """Return ``text`` as a byte string (UTF-8 encoded when needed)."""
        if isinstance(text, bytes):
            return text
        return text.encode("utf-8")

    @staticmethod
    def _to_text(raw):
        """Return ``raw`` as a native ``str`` (UTF-8 decoded when needed)."""
        if isinstance(raw, str):
            return raw
        return raw.decode("utf-8")

    @staticmethod
    def _as_uint(val):
        """Return the integer to pack for ``val``.

        Accepts either a plain number or an object exposing the number in
        a ``value`` attribute.
        """
        return int(getattr(val, "value", val))

    def _config_for(self, source_id):
        """Return the LocalConfig of ``source_id``, creating it on demand."""
        config = self.configs.get(source_id)
        if config is None:
            config = self.configs[source_id] = StructSerializer.LocalConfig()
        return config

    def pack(self, data):
        """Serialize one sensor record into a format or a data message.

        ``data`` must contain ``"source_id"`` and ``"time"``; every other
        key is a value field. A format message is returned (instead of the
        data itself) for the first ``initial_times`` calls per source, when
        the last announcement is older than ``OLD_FORMAT`` seconds, or when
        the set of fields differs from the announced one. Otherwise a data
        message is returned.

        Raises:
            KeyError: ``"source_id"`` or ``"time"`` is missing.
            struct.error: a value does not fit an unsigned 32-bit integer.
        """
        source_id = data["source_id"]
        config = self._config_for(source_id)
        raw_source_id = self._to_bytes(source_id)

        # source_id is a string and time is sent explicitly as the first
        # value, so neither may appear in the ordered field list
        field_order = sorted(name for name in data
                             if name not in self.SERVICE_FIELDS)

        need_format = (
            config.field_order != field_order or
            not config.initial_sent or
            time.time() - config.last_format_sent > self.OLD_FORMAT
        )

        if need_format:
            # send|resend format
            config.field_order = field_order
            config.last_format_sent = time.time()
            if not config.initial_sent:
                config.initial_times -= 1
                config.initial_sent = (config.initial_times <= 0)

            flen = struct.pack("!H", len(field_order))
            forder = self.FIELDS_DELIMITER.join(
                self._to_bytes(name) for name in field_order)
            return (self.FORMAT_CODE + raw_source_id +
                    self.SEPARATOR + flen + forder)

        # send data: time goes first, then values in the announced order
        vals = [self._as_uint(data["time"])]
        vals.extend(self._as_uint(data[name]) for name in config.field_order)
        packed_data = struct.pack("!" + "I" * len(vals), *vals)
        return self.DATA_CODE + raw_source_id + self.SEPARATOR + packed_data

    def unpack(self, data):
        """Parse a message produced by ``pack``.

        A format message only updates the stored field order and yields
        ``{"source_id": ...}``; a truncated or inconsistent format message
        is ignored. A data message yields a dict with ``"source_id"``,
        ``"time"`` and one entry per announced field.

        Raises:
            ValueError: a data message arrived before any format message
                for its source, or its payload does not match the
                announced field order.
        """
        # slicing (not indexing) keeps the code a byte string on Python 3
        code = data[:1]
        raw_source_id, _, packed_data = data[1:].partition(self.SEPARATOR)
        source_id = self._to_text(raw_source_id)
        config = self._config_for(source_id)
        unpacked_data = {"source_id": source_id}

        if code == self.FORMAT_CODE:
            # fields order provided
            if len(packed_data) < 2:
                return unpacked_data
            flen, = struct.unpack("!H", packed_data[:2])
            names_blob = packed_data[2:]
            if names_blob:
                forder = [self._to_text(name) for name in
                          names_blob.split(self.FIELDS_DELIMITER)]
            else:
                forder = []
            if len(forder) == flen:
                config.field_order = forder
            return unpacked_data

        # data provided, field order must already be known
        if config.field_order is None:
            raise ValueError("No fields order provided"
                             " for {0}, cannot unpack".format(source_id))

        if len(packed_data) % self.VALUE_SIZE != 0:
            raise ValueError("Bad packet received"
                             " from {0}, cannot unpack".format(source_id))

        datalen = len(packed_data) // self.VALUE_SIZE
        if datalen != len(config.field_order) + 1:
            raise ValueError("Unexpected values count"
                             " from {0}, cannot unpack".format(source_id))

        vals = struct.unpack("!" + "I" * datalen, packed_data)
        unpacked_data["time"] = vals[0]
        unpacked_data.update(zip(config.field_order, vals[1:]))
        return unpacked_data
108
109
class PickleSerializer(ISensortResultsSerializer):
    """Serializer that pickles a plain dict of sensor values."""

    def pack(self, data):
        """Return the pickled form of ``data``.

        String values are stored unchanged; for any other value its
        ``value`` attribute is stored.
        """
        plain = dict(
            (name, item if isinstance(item, basestring) else item.value)
            for name, item in data.items()
        )
        return pickle.dumps(plain)

    def unpack(self, data):
        """Return the object restored from the pickled string ``data``."""
        # NOTE(review): pickle.loads can execute arbitrary code; only feed
        # it data from trusted senders.
        return pickle.loads(data)
122
koder aka kdanilovdda86d32015-03-16 11:20:04 +0200123# ------------------------------------- Transports ---------------------------
124
koder aka kdanilov2c473092015-03-29 17:12:13 +0300125
class ITransport(object):
    """Interface of a channel that sends and/or receives sensor data.

    All methods here are placeholders that do nothing and return ``None``.
    """

    def __init__(self, receiver):
        """Accept the ``receiver`` flag; the base class ignores it."""

    def send(self, data):
        """Send ``data``. The base version does nothing."""
        return None

    def recv(self, timeout=None):
        """Receive data. The base version returns ``None``."""
        return None
135
136
class StdoutTransport(ITransport):
    """Send-only transport that writes sensor values as a fixed-width table.

    Output goes to ``self.fd`` (``sys.stdout`` here). Each column is
    right-aligned and at least ``MIN_COL_WIDTH`` characters wide.
    """

    MIN_COL_WIDTH = 10

    def __init__(self, receiver, delta=True):
        """Create the transport.

        Args:
            receiver: must be false, this transport cannot receive.
            delta: when true, each row shows the difference from the
                previously sent values instead of the values themselves.

        Raises:
            ValueError: ``receiver`` is true.
        """
        if receiver:
            cname = self.__class__.__name__
            raise ValueError("{0} don't allows receiving".format(cname))

        self.headers = None
        self.line_format = ""
        self.prev = {}
        self.delta = delta
        self.fd = sys.stdout

    def send(self, data):
        """Write one row built from ``data`` (name -> object with ``value``).

        The first call fixes the column set (sorted keys of ``data``) and
        writes the header row. The header is written to ``self.fd`` as
        well, so it always lands in the same stream as the data rows.
        """
        if self.headers is None:
            self.headers = sorted(data)
            self.line_format = "".join(
                "{%s:>%s}" % (pos, max(len(header) + 1, self.MIN_COL_WIDTH))
                for pos, header in enumerate(self.headers))

            self.fd.write(self.line_format.format(*self.headers) + "\n")

        if self.delta:
            # a column that was never seen before is diffed against 0
            vals = [data[header].value - self.prev.get(header, 0)
                    for header in self.headers]

            self.prev.update(dict((header, data[header].value)
                                  for header in self.headers))
        else:
            vals = [data[header].value for header in self.headers]

        self.fd.write(self.line_format.format(*vals) + "\n")

    def recv(self, timeout=None):
        """Always raise ``ValueError``: this transport cannot receive."""
        cname = self.__class__.__name__
        raise ValueError("{0} don't allows receiving".format(cname))
176
177
class FileTransport(StdoutTransport):
    """StdoutTransport whose ``fd`` attribute is a file opened for writing."""

    def __init__(self, receiver, fname, delta=True):
        """Initialise the parent, then point ``self.fd`` at ``fname``.

        The file is opened in ``"w"`` mode, so existing content is
        truncated.
        """
        super(FileTransport, self).__init__(receiver, delta)
        self.fd = open(fname, "w")
koder aka kdanilovdda86d32015-03-16 11:20:04 +0200182
183
class UDPTransport(ITransport):
    """Transport that exchanges serialized sensor data over UDP datagrams."""

    def __init__(self, receiver, ip, port, packer_cls):
        """Create the UDP socket in receiving or sending mode.

        Args:
            receiver: true - bind to ``(ip, port)`` and keep one packer per
                sender address; false - remember ``(ip, port)`` as the
                destination and use a single packer.
            ip: address to bind to, or to send to.
            port: port to bind to, or to send to.
            packer_cls: serializer class, instantiated without arguments.
        """
        self.port = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

        if not receiver:
            self.packer = packer_cls()
            self.dst = (ip, port)
            return

        self.port.bind((ip, port))
        self.packer_cls = packer_cls
        self.packers = {}

    def send(self, data):
        """Pack ``data`` and send it to the destination as one datagram."""
        self.port.sendto(self.packer.pack(data), self.dst)

    def recv(self, timeout=None):
        """Wait for one datagram and return ``(sender_addr, unpacked_data)``.

        Raises:
            Timeout: the socket did not become readable within ``timeout``.
        """
        readable = select.select([self.port], [], [], timeout)[0]
        if not readable:
            raise Timeout()

        raw_data, addr = self.port.recvfrom(10000)
        # each sender address gets its own packer instance
        packer = self.packers.setdefault(addr, self.packer_cls())
        return addr, packer.unpack(raw_data)
207
208
209# -------------------------- Factory function --------------------------------
210
211
def create_protocol(uri, receiver=False):
    """Build the transport selected by the scheme of ``uri``.

    Supported schemes:
        ``stdout`` - StdoutTransport;
        ``udp``    - UDPTransport with StructSerializer, the netloc must
                     look like ``ip:port``;
        ``file``   - FileTransport writing to the path part of ``uri``.

    Raises:
        ValueError: the scheme is not one of the above.
    """
    parsed_uri = urlparse(uri)
    scheme = parsed_uri.scheme

    if scheme == 'stdout':
        return StdoutTransport(receiver)

    if scheme == 'udp':
        ip, port = parsed_uri.netloc.split(":")
        return UDPTransport(receiver,
                            ip=ip,
                            port=int(port),
                            packer_cls=StructSerializer)

    if scheme == 'file':
        return FileTransport(receiver, parsed_uri.path)

    templ = "Can't instantiate transport from {0!r}"
    raise ValueError(templ.format(uri))