koder aka kdanilov | 2c47309 | 2015-03-29 17:12:13 +0300 | [diff] [blame] | 1 | import sys |
koder aka kdanilov | 416b87a | 2015-05-12 00:26:04 +0300 | [diff] [blame] | 2 | import csv |
Ved-vampir | 0c7e2d4 | 2015-03-18 17:18:47 +0300 | [diff] [blame] | 3 | import time |
Alyona Kiseleva | 7f6de4f | 2015-04-21 01:04:20 +0300 | [diff] [blame] | 4 | import struct |
koder aka kdanilov | dda86d3 | 2015-03-16 11:20:04 +0200 | [diff] [blame] | 5 | import socket |
| 6 | import select |
| 7 | import cPickle as pickle |
| 8 | from urlparse import urlparse |
| 9 | |
| 10 | |
| 11 | class Timeout(Exception): |
| 12 | pass |
| 13 | |
| 14 | |
koder aka kdanilov | afd9874 | 2015-04-24 01:27:22 +0300 | [diff] [blame] | 15 | class CantUnpack(Exception): |
| 16 | pass |
| 17 | |
| 18 | |
koder aka kdanilov | dda86d3 | 2015-03-16 11:20:04 +0200 | [diff] [blame] | 19 | # ------------------------------------- Serializers -------------------------- |
| 20 | |
| 21 | |
| 22 | class ISensortResultsSerializer(object): |
| 23 | def pack(self, data): |
| 24 | pass |
| 25 | |
| 26 | def unpack(self, data): |
| 27 | pass |
| 28 | |
| 29 | |
koder aka kdanilov | afd9874 | 2015-04-24 01:27:22 +0300 | [diff] [blame] | 30 | class StructSerializerSend(ISensortResultsSerializer): |
| 31 | initial_times = 5 |
| 32 | resend_timeout = 60 |
| 33 | HEADERS = 'h' |
| 34 | DATA = 'd' |
| 35 | END_OF_HEADERS = '\x00' |
| 36 | END_OF_SOURCE_ID = '\x00' |
| 37 | HEADERS_SEPARATOR = '\n' |
Alyona Kiseleva | 7f6de4f | 2015-04-21 01:04:20 +0300 | [diff] [blame] | 38 | |
| 39 | def __init__(self): |
koder aka kdanilov | afd9874 | 2015-04-24 01:27:22 +0300 | [diff] [blame] | 40 | self.field_order = None |
| 41 | self.headers_send_cycles_left = self.initial_times |
| 42 | self.pack_fmt = None |
| 43 | self.next_header_send_time = None |
Alyona Kiseleva | 7f6de4f | 2015-04-21 01:04:20 +0300 | [diff] [blame] | 44 | |
| 45 | def pack(self, data): |
koder aka kdanilov | afd9874 | 2015-04-24 01:27:22 +0300 | [diff] [blame] | 46 | data = data.copy() |
Alyona Kiseleva | 7f6de4f | 2015-04-21 01:04:20 +0300 | [diff] [blame] | 47 | |
koder aka kdanilov | afd9874 | 2015-04-24 01:27:22 +0300 | [diff] [blame] | 48 | source_id = data.pop("source_id") |
| 49 | vals = [int(data.pop("time").value)] |
Alyona Kiseleva | 7f6de4f | 2015-04-21 01:04:20 +0300 | [diff] [blame] | 50 | |
koder aka kdanilov | afd9874 | 2015-04-24 01:27:22 +0300 | [diff] [blame] | 51 | if self.field_order is None: |
| 52 | self.field_order = sorted(data.keys()) |
| 53 | self.pack_fmt = "!I" + "I" * len(self.field_order) |
Alyona Kiseleva | 7f6de4f | 2015-04-21 01:04:20 +0300 | [diff] [blame] | 54 | |
koder aka kdanilov | afd9874 | 2015-04-24 01:27:22 +0300 | [diff] [blame] | 55 | need_resend = False |
| 56 | if self.next_header_send_time is not None: |
| 57 | if time.time() > self.next_header_send_time: |
| 58 | need_resend = True |
| 59 | |
| 60 | if self.headers_send_cycles_left > 0 or need_resend: |
| 61 | forder = self.HEADERS_SEPARATOR.join(self.field_order) |
| 62 | flen = struct.pack("!H", len(self.field_order)) |
| 63 | |
| 64 | result = (self.HEADERS + source_id + |
| 65 | self.END_OF_SOURCE_ID + |
koder aka kdanilov | f86d7af | 2015-05-06 04:01:54 +0300 | [diff] [blame] | 66 | socket.gethostname() + |
| 67 | self.END_OF_SOURCE_ID + |
koder aka kdanilov | afd9874 | 2015-04-24 01:27:22 +0300 | [diff] [blame] | 68 | flen + forder + self.END_OF_HEADERS) |
| 69 | |
| 70 | if self.headers_send_cycles_left > 0: |
| 71 | self.headers_send_cycles_left -= 1 |
| 72 | |
| 73 | self.next_header_send_time = time.time() + self.resend_timeout |
Alyona Kiseleva | 7f6de4f | 2015-04-21 01:04:20 +0300 | [diff] [blame] | 74 | else: |
koder aka kdanilov | afd9874 | 2015-04-24 01:27:22 +0300 | [diff] [blame] | 75 | result = "" |
| 76 | |
| 77 | for name in self.field_order: |
| 78 | vals.append(int(data[name].value)) |
| 79 | |
| 80 | packed_data = self.DATA + source_id |
| 81 | packed_data += self.END_OF_SOURCE_ID |
| 82 | packed_data += struct.pack(self.pack_fmt, *vals) |
| 83 | |
| 84 | return result + packed_data |
| 85 | |
| 86 | |
| 87 | class StructSerializerRecv(ISensortResultsSerializer): |
| 88 | def __init__(self): |
| 89 | self.fields = {} |
| 90 | self.formats = {} |
koder aka kdanilov | f86d7af | 2015-05-06 04:01:54 +0300 | [diff] [blame] | 91 | self.hostnames = {} |
Alyona Kiseleva | 7f6de4f | 2015-04-21 01:04:20 +0300 | [diff] [blame] | 92 | |
| 93 | def unpack(self, data): |
| 94 | code = data[0] |
Alyona Kiseleva | 7f6de4f | 2015-04-21 01:04:20 +0300 | [diff] [blame] | 95 | |
koder aka kdanilov | afd9874 | 2015-04-24 01:27:22 +0300 | [diff] [blame] | 96 | if code == StructSerializerSend.HEADERS: |
koder aka kdanilov | f86d7af | 2015-05-06 04:01:54 +0300 | [diff] [blame] | 97 | source_id, hostname, packed_data = data[1:].split( |
| 98 | StructSerializerSend.END_OF_SOURCE_ID, 2) |
Alyona Kiseleva | 7f6de4f | 2015-04-21 01:04:20 +0300 | [diff] [blame] | 99 | # fields order provided |
koder aka kdanilov | afd9874 | 2015-04-24 01:27:22 +0300 | [diff] [blame] | 100 | flen_sz = struct.calcsize("!H") |
| 101 | flen = struct.unpack("!H", packed_data[:flen_sz])[0] |
Alyona Kiseleva | 7f6de4f | 2015-04-21 01:04:20 +0300 | [diff] [blame] | 102 | |
koder aka kdanilov | afd9874 | 2015-04-24 01:27:22 +0300 | [diff] [blame] | 103 | headers_data, rest = packed_data[flen_sz:].split( |
| 104 | StructSerializerSend.END_OF_HEADERS, 1) |
| 105 | |
| 106 | forder = headers_data.split( |
| 107 | StructSerializerSend.HEADERS_SEPARATOR) |
| 108 | |
| 109 | assert len(forder) == flen, \ |
| 110 | "Wrong len {0} != {1}".format(len(forder), flen) |
| 111 | |
| 112 | if 'source_id' in self.fields: |
| 113 | assert self.fields[source_id] == ['time'] + forder,\ |
| 114 | "New field order" |
| 115 | else: |
| 116 | self.fields[source_id] = ['time'] + forder |
| 117 | self.formats[source_id] = "!I" + "I" * flen |
koder aka kdanilov | f86d7af | 2015-05-06 04:01:54 +0300 | [diff] [blame] | 118 | self.hostnames[source_id] = hostname |
koder aka kdanilov | afd9874 | 2015-04-24 01:27:22 +0300 | [diff] [blame] | 119 | |
| 120 | if len(rest) != 0: |
| 121 | return self.unpack(rest) |
| 122 | return None |
Alyona Kiseleva | 7f6de4f | 2015-04-21 01:04:20 +0300 | [diff] [blame] | 123 | else: |
koder aka kdanilov | f86d7af | 2015-05-06 04:01:54 +0300 | [diff] [blame] | 124 | source_id, packed_data = data[1:].split( |
| 125 | StructSerializerSend.END_OF_SOURCE_ID, 1) |
koder aka kdanilov | afd9874 | 2015-04-24 01:27:22 +0300 | [diff] [blame] | 126 | assert code == StructSerializerSend.DATA,\ |
| 127 | "Unknown code {0!r}".format(code) |
Alyona Kiseleva | 7f6de4f | 2015-04-21 01:04:20 +0300 | [diff] [blame] | 128 | |
koder aka kdanilov | afd9874 | 2015-04-24 01:27:22 +0300 | [diff] [blame] | 129 | try: |
| 130 | fields = self.fields[source_id] |
| 131 | except KeyError: |
| 132 | raise CantUnpack("No fields order provided" |
| 133 | " for {0} yet".format(source_id)) |
| 134 | s_format = self.formats[source_id] |
Alyona Kiseleva | 7f6de4f | 2015-04-21 01:04:20 +0300 | [diff] [blame] | 135 | |
koder aka kdanilov | afd9874 | 2015-04-24 01:27:22 +0300 | [diff] [blame] | 136 | exp_size = struct.calcsize(s_format) |
| 137 | assert len(packed_data) == exp_size, \ |
| 138 | "Wrong data len {0} != {1}".format(len(packed_data), exp_size) |
| 139 | |
| 140 | vals = struct.unpack(s_format, packed_data) |
| 141 | res = dict(zip(fields, vals)) |
| 142 | res['source_id'] = source_id |
koder aka kdanilov | f86d7af | 2015-05-06 04:01:54 +0300 | [diff] [blame] | 143 | res['hostname'] = self.hostnames[source_id] |
koder aka kdanilov | afd9874 | 2015-04-24 01:27:22 +0300 | [diff] [blame] | 144 | return res |
Alyona Kiseleva | 7f6de4f | 2015-04-21 01:04:20 +0300 | [diff] [blame] | 145 | |
| 146 | |
koder aka kdanilov | dda86d3 | 2015-03-16 11:20:04 +0200 | [diff] [blame] | 147 | class PickleSerializer(ISensortResultsSerializer): |
| 148 | def pack(self, data): |
koder aka kdanilov | 168f609 | 2015-04-19 02:33:38 +0300 | [diff] [blame] | 149 | ndata = {} |
| 150 | for key, val in data.items(): |
| 151 | if isinstance(val, basestring): |
| 152 | ndata[key] = val |
| 153 | else: |
| 154 | ndata[key] = val.value |
koder aka kdanilov | dda86d3 | 2015-03-16 11:20:04 +0200 | [diff] [blame] | 155 | return pickle.dumps(ndata) |
| 156 | |
| 157 | def unpack(self, data): |
| 158 | return pickle.loads(data) |
| 159 | |
koder aka kdanilov | dda86d3 | 2015-03-16 11:20:04 +0200 | [diff] [blame] | 160 | # ------------------------------------- Transports --------------------------- |
| 161 | |
koder aka kdanilov | 2c47309 | 2015-03-29 17:12:13 +0300 | [diff] [blame] | 162 | |
koder aka kdanilov | dda86d3 | 2015-03-16 11:20:04 +0200 | [diff] [blame] | 163 | class ITransport(object): |
| 164 | def __init__(self, receiver): |
| 165 | pass |
| 166 | |
| 167 | def send(self, data): |
| 168 | pass |
| 169 | |
| 170 | def recv(self, timeout=None): |
| 171 | pass |
| 172 | |
| 173 | |
| 174 | class StdoutTransport(ITransport): |
| 175 | MIN_COL_WIDTH = 10 |
| 176 | |
| 177 | def __init__(self, receiver, delta=True): |
| 178 | if receiver: |
koder aka kdanilov | 2c47309 | 2015-03-29 17:12:13 +0300 | [diff] [blame] | 179 | cname = self.__class__.__name__ |
| 180 | raise ValueError("{0} don't allows receiving".format(cname)) |
koder aka kdanilov | dda86d3 | 2015-03-16 11:20:04 +0200 | [diff] [blame] | 181 | |
| 182 | self.headers = None |
| 183 | self.line_format = "" |
| 184 | self.prev = {} |
| 185 | self.delta = delta |
koder aka kdanilov | 2c47309 | 2015-03-29 17:12:13 +0300 | [diff] [blame] | 186 | self.fd = sys.stdout |
koder aka kdanilov | dda86d3 | 2015-03-16 11:20:04 +0200 | [diff] [blame] | 187 | |
| 188 | def send(self, data): |
| 189 | if self.headers is None: |
| 190 | self.headers = sorted(data) |
koder aka kdanilov | 4af1c1d | 2015-05-18 15:48:58 +0300 | [diff] [blame] | 191 | self.headers.remove('source_id') |
koder aka kdanilov | dda86d3 | 2015-03-16 11:20:04 +0200 | [diff] [blame] | 192 | |
| 193 | for pos, header in enumerate(self.headers): |
| 194 | self.line_format += "{%s:>%s}" % (pos, |
| 195 | max(len(header) + 1, |
| 196 | self.MIN_COL_WIDTH)) |
| 197 | |
| 198 | print self.line_format.format(*self.headers) |
| 199 | |
| 200 | if self.delta: |
koder aka kdanilov | 4af1c1d | 2015-05-18 15:48:58 +0300 | [diff] [blame] | 201 | |
koder aka kdanilov | dda86d3 | 2015-03-16 11:20:04 +0200 | [diff] [blame] | 202 | vals = [data[header].value - self.prev.get(header, 0) |
| 203 | for header in self.headers] |
| 204 | |
koder aka kdanilov | 6b1341a | 2015-04-21 22:44:21 +0300 | [diff] [blame] | 205 | self.prev.update(dict((header, data[header].value) |
| 206 | for header in self.headers)) |
koder aka kdanilov | dda86d3 | 2015-03-16 11:20:04 +0200 | [diff] [blame] | 207 | else: |
| 208 | vals = [data[header].value for header in self.headers] |
| 209 | |
koder aka kdanilov | 2c47309 | 2015-03-29 17:12:13 +0300 | [diff] [blame] | 210 | self.fd.write(self.line_format.format(*vals) + "\n") |
koder aka kdanilov | dda86d3 | 2015-03-16 11:20:04 +0200 | [diff] [blame] | 211 | |
| 212 | def recv(self, timeout=None): |
koder aka kdanilov | 2c47309 | 2015-03-29 17:12:13 +0300 | [diff] [blame] | 213 | cname = self.__class__.__name__ |
| 214 | raise ValueError("{0} don't allows receiving".format(cname)) |
| 215 | |
| 216 | |
| 217 | class FileTransport(StdoutTransport): |
| 218 | def __init__(self, receiver, fname, delta=True): |
| 219 | StdoutTransport.__init__(self, receiver, delta) |
| 220 | self.fd = open(fname, "w") |
koder aka kdanilov | dda86d3 | 2015-03-16 11:20:04 +0200 | [diff] [blame] | 221 | |
| 222 | |
koder aka kdanilov | 416b87a | 2015-05-12 00:26:04 +0300 | [diff] [blame] | 223 | class CSVFileTransport(ITransport): |
koder aka kdanilov | 4af1c1d | 2015-05-18 15:48:58 +0300 | [diff] [blame] | 224 | required_keys = set(['time', 'source_id']) |
koder aka kdanilov | 416b87a | 2015-05-12 00:26:04 +0300 | [diff] [blame] | 225 | |
| 226 | def __init__(self, receiver, fname): |
| 227 | ITransport.__init__(self, receiver) |
| 228 | self.fd = open(fname, "w") |
| 229 | self.csv_fd = csv.writer(self.fd) |
| 230 | self.field_list = [] |
| 231 | self.csv_fd.writerow(['NEW_DATA']) |
| 232 | |
| 233 | def send(self, data): |
| 234 | if self.field_list == []: |
| 235 | keys = set(data) |
| 236 | assert self.required_keys.issubset(keys) |
| 237 | keys -= self.required_keys |
| 238 | self.field_list = sorted(keys) |
koder aka kdanilov | 4af1c1d | 2015-05-18 15:48:58 +0300 | [diff] [blame] | 239 | self.csv_fd.writerow([data['source_id'], socket.getfqdn()] + |
koder aka kdanilov | 416b87a | 2015-05-12 00:26:04 +0300 | [diff] [blame] | 240 | self.field_list) |
koder aka kdanilov | 4af1c1d | 2015-05-18 15:48:58 +0300 | [diff] [blame] | 241 | self.field_list = ['time'] + self.field_list |
koder aka kdanilov | 416b87a | 2015-05-12 00:26:04 +0300 | [diff] [blame] | 242 | |
koder aka kdanilov | 4af1c1d | 2015-05-18 15:48:58 +0300 | [diff] [blame] | 243 | self.csv_fd.writerow([data[sens].value for sens in self.field_list]) |
| 244 | |
| 245 | |
| 246 | class RAMTransport(ITransport): |
| 247 | def __init__(self, next_tr): |
| 248 | self.next = next_tr |
| 249 | self.data = [] |
| 250 | |
| 251 | def send(self, data): |
| 252 | self.data.append(data) |
| 253 | |
| 254 | def flush(self): |
| 255 | for data in self.data: |
| 256 | self.next.send(data) |
| 257 | self.data = [] |
koder aka kdanilov | 416b87a | 2015-05-12 00:26:04 +0300 | [diff] [blame] | 258 | |
| 259 | |
koder aka kdanilov | dda86d3 | 2015-03-16 11:20:04 +0200 | [diff] [blame] | 260 | class UDPTransport(ITransport): |
| 261 | def __init__(self, receiver, ip, port, packer_cls): |
| 262 | self.port = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) |
| 263 | if receiver: |
| 264 | self.port.bind((ip, port)) |
| 265 | self.packer_cls = packer_cls |
| 266 | self.packers = {} |
| 267 | else: |
| 268 | self.packer = packer_cls() |
| 269 | self.dst = (ip, port) |
| 270 | |
| 271 | def send(self, data): |
| 272 | raw_data = self.packer.pack(data) |
| 273 | self.port.sendto(raw_data, self.dst) |
| 274 | |
| 275 | def recv(self, timeout=None): |
| 276 | r, _, _ = select.select([self.port], [], [], timeout) |
| 277 | if len(r) != 0: |
| 278 | raw_data, addr = self.port.recvfrom(10000) |
| 279 | packer = self.packers.setdefault(addr, self.packer_cls()) |
| 280 | return addr, packer.unpack(raw_data) |
| 281 | else: |
| 282 | raise Timeout() |
| 283 | |
| 284 | |
| 285 | # -------------------------- Factory function -------------------------------- |
| 286 | |
| 287 | |
| 288 | def create_protocol(uri, receiver=False): |
koder aka kdanilov | 4af1c1d | 2015-05-18 15:48:58 +0300 | [diff] [blame] | 289 | if uri == 'stdout': |
koder aka kdanilov | dda86d3 | 2015-03-16 11:20:04 +0200 | [diff] [blame] | 290 | return StdoutTransport(receiver) |
koder aka kdanilov | 4af1c1d | 2015-05-18 15:48:58 +0300 | [diff] [blame] | 291 | |
| 292 | parsed_uri = urlparse(uri) |
| 293 | if parsed_uri.scheme == 'udp': |
koder aka kdanilov | dda86d3 | 2015-03-16 11:20:04 +0200 | [diff] [blame] | 294 | ip, port = parsed_uri.netloc.split(":") |
koder aka kdanilov | afd9874 | 2015-04-24 01:27:22 +0300 | [diff] [blame] | 295 | |
| 296 | if receiver: |
| 297 | packer_cls = StructSerializerRecv |
| 298 | else: |
| 299 | packer_cls = StructSerializerSend |
| 300 | |
koder aka kdanilov | dda86d3 | 2015-03-16 11:20:04 +0200 | [diff] [blame] | 301 | return UDPTransport(receiver, ip=ip, port=int(port), |
koder aka kdanilov | afd9874 | 2015-04-24 01:27:22 +0300 | [diff] [blame] | 302 | packer_cls=packer_cls) |
koder aka kdanilov | 2c47309 | 2015-03-29 17:12:13 +0300 | [diff] [blame] | 303 | elif parsed_uri.scheme == 'file': |
| 304 | return FileTransport(receiver, parsed_uri.path) |
koder aka kdanilov | 416b87a | 2015-05-12 00:26:04 +0300 | [diff] [blame] | 305 | elif parsed_uri.scheme == 'csvfile': |
| 306 | return CSVFileTransport(receiver, parsed_uri.path) |
koder aka kdanilov | 4af1c1d | 2015-05-18 15:48:58 +0300 | [diff] [blame] | 307 | elif parsed_uri.scheme == 'ram': |
| 308 | intenal_recv = CSVFileTransport(receiver, parsed_uri.path) |
| 309 | return RAMTransport(intenal_recv) |
koder aka kdanilov | dda86d3 | 2015-03-16 11:20:04 +0200 | [diff] [blame] | 310 | else: |
| 311 | templ = "Can't instantiate transport from {0!r}" |
| 312 | raise ValueError(templ.format(uri)) |