koder aka kdanilov | 2c47309 | 2015-03-29 17:12:13 +0300 | [diff] [blame] | 1 | import sys |
Ved-vampir | 0c7e2d4 | 2015-03-18 17:18:47 +0300 | [diff] [blame] | 2 | import time |
Alyona Kiseleva | 7f6de4f | 2015-04-21 01:04:20 +0300 | [diff] [blame] | 3 | import struct |
koder aka kdanilov | dda86d3 | 2015-03-16 11:20:04 +0200 | [diff] [blame] | 4 | import socket |
| 5 | import select |
| 6 | import cPickle as pickle |
| 7 | from urlparse import urlparse |
| 8 | |
koder aka kdanilov | cff7b2e | 2015-04-18 20:48:15 +0300 | [diff] [blame] | 9 | from . import cp_transport |
Ved-vampir | 0c7e2d4 | 2015-03-18 17:18:47 +0300 | [diff] [blame] | 10 | |
koder aka kdanilov | dda86d3 | 2015-03-16 11:20:04 +0200 | [diff] [blame] | 11 | |
| 12 | class Timeout(Exception): |
| 13 | pass |
| 14 | |
| 15 | |
| 16 | # ------------------------------------- Serializers -------------------------- |
| 17 | |
| 18 | |
| 19 | class ISensortResultsSerializer(object): |
| 20 | def pack(self, data): |
| 21 | pass |
| 22 | |
| 23 | def unpack(self, data): |
| 24 | pass |
| 25 | |
| 26 | |
Alyona Kiseleva | 7f6de4f | 2015-04-21 01:04:20 +0300 | [diff] [blame] | 27 | class StructSerializer(ISensortResultsSerializer): |
| 28 | class LocalConfig(object): |
| 29 | def __init__(self): |
| 30 | self.last_format_sent = -1 |
| 31 | self.initial_sent = False |
| 32 | self.initial_times = 5 |
| 33 | self.field_order = None |
| 34 | |
| 35 | def __init__(self): |
| 36 | self.configs = {} |
| 37 | |
| 38 | def pack(self, data): |
| 39 | OLD_FORMAT = 5 |
| 40 | source_id = data["source_id"] |
| 41 | config = self.configs.setdefault(source_id, |
| 42 | StructSerializer.LocalConfig()) |
| 43 | |
| 44 | if config.field_order is None or \ |
| 45 | not config.initial_sent or \ |
| 46 | time.time() - config.last_format_sent > OLD_FORMAT: |
| 47 | # send|resend format |
| 48 | field_order = sorted(data.keys()) |
| 49 | |
| 50 | config.field_order = field_order |
| 51 | config.last_format_sent = time.time() |
| 52 | if not config.initial_sent: |
| 53 | config.initial_times -= 1 |
| 54 | config.initial_sent = (config.initial_times <= 0) |
| 55 | |
| 56 | forder = "\n".join(field_order) |
| 57 | flen = struct.pack("!H", len(field_order)) |
| 58 | return "\x00{0}\x00{1}{2}".format(source_id, flen, forder) |
| 59 | else: |
| 60 | # send data |
| 61 | # time will be first after source_id |
| 62 | vals = [data["time"]] |
| 63 | for name in config.field_order: |
| 64 | if name in data: |
| 65 | vals.append(data[name]) |
| 66 | pack_fmt = "!" + ("I" * len(vals)) |
| 67 | packed_data = struct.pack(pack_fmt, vals) |
| 68 | return "\x01{0}\x00{1}".format(source_id, packed_data) |
| 69 | |
| 70 | def unpack(self, data): |
| 71 | code = data[0] |
| 72 | data = data[1:] |
| 73 | source_id, _, packed_data = data.partition("\x00") |
| 74 | config = self.configs.setdefault(source_id, |
| 75 | StructSerializer.LocalConfig()) |
| 76 | unpacked_data = {"source_id":source_id} |
| 77 | |
| 78 | if code == "\x00": |
| 79 | # fields order provided |
| 80 | flen = struct.unpack("!H", packed_data[:2]) |
| 81 | forder = packed_data[2:].split("\n") |
| 82 | if len(forder) != flen: |
| 83 | return unpacked_data |
| 84 | config.field_order = forder |
| 85 | return unpacked_data |
| 86 | |
| 87 | else: |
| 88 | # data provided |
| 89 | # try to find fields_order |
| 90 | if config.field_order is None: |
| 91 | raise ValueError("No fields order provided" |
| 92 | " for {0}, cannot unpack".format(source_id)) |
| 93 | |
| 94 | val_size = 4 |
| 95 | if len(packed_data) % val_size != 0: |
| 96 | raise ValueError("Bad packet received" |
| 97 | " from {0}, cannot unpack".format(source_id)) |
| 98 | datalen = len(packed_data) / val_size |
| 99 | pack_fmt = "!" + ("I" * datalen) |
| 100 | vals = struct.unpack(pack_fmt, packed_data) |
| 101 | |
| 102 | unpacked_data['time'] = vals[0] |
| 103 | i = 1 |
| 104 | for field in config.field_order: |
| 105 | data[field] = vals[i] |
| 106 | i += 1 |
| 107 | return data |
| 108 | |
| 109 | |
koder aka kdanilov | dda86d3 | 2015-03-16 11:20:04 +0200 | [diff] [blame] | 110 | class PickleSerializer(ISensortResultsSerializer): |
| 111 | def pack(self, data): |
koder aka kdanilov | 168f609 | 2015-04-19 02:33:38 +0300 | [diff] [blame] | 112 | ndata = {} |
| 113 | for key, val in data.items(): |
| 114 | if isinstance(val, basestring): |
| 115 | ndata[key] = val |
| 116 | else: |
| 117 | ndata[key] = val.value |
koder aka kdanilov | dda86d3 | 2015-03-16 11:20:04 +0200 | [diff] [blame] | 118 | return pickle.dumps(ndata) |
| 119 | |
| 120 | def unpack(self, data): |
| 121 | return pickle.loads(data) |
| 122 | |
koder aka kdanilov | dda86d3 | 2015-03-16 11:20:04 +0200 | [diff] [blame] | 123 | # ------------------------------------- Transports --------------------------- |
| 124 | |
koder aka kdanilov | 2c47309 | 2015-03-29 17:12:13 +0300 | [diff] [blame] | 125 | |
koder aka kdanilov | dda86d3 | 2015-03-16 11:20:04 +0200 | [diff] [blame] | 126 | class ITransport(object): |
| 127 | def __init__(self, receiver): |
| 128 | pass |
| 129 | |
| 130 | def send(self, data): |
| 131 | pass |
| 132 | |
| 133 | def recv(self, timeout=None): |
| 134 | pass |
| 135 | |
| 136 | |
| 137 | class StdoutTransport(ITransport): |
| 138 | MIN_COL_WIDTH = 10 |
| 139 | |
| 140 | def __init__(self, receiver, delta=True): |
| 141 | if receiver: |
koder aka kdanilov | 2c47309 | 2015-03-29 17:12:13 +0300 | [diff] [blame] | 142 | cname = self.__class__.__name__ |
| 143 | raise ValueError("{0} don't allows receiving".format(cname)) |
koder aka kdanilov | dda86d3 | 2015-03-16 11:20:04 +0200 | [diff] [blame] | 144 | |
| 145 | self.headers = None |
| 146 | self.line_format = "" |
| 147 | self.prev = {} |
| 148 | self.delta = delta |
koder aka kdanilov | 2c47309 | 2015-03-29 17:12:13 +0300 | [diff] [blame] | 149 | self.fd = sys.stdout |
koder aka kdanilov | dda86d3 | 2015-03-16 11:20:04 +0200 | [diff] [blame] | 150 | |
| 151 | def send(self, data): |
| 152 | if self.headers is None: |
| 153 | self.headers = sorted(data) |
| 154 | |
| 155 | for pos, header in enumerate(self.headers): |
| 156 | self.line_format += "{%s:>%s}" % (pos, |
| 157 | max(len(header) + 1, |
| 158 | self.MIN_COL_WIDTH)) |
| 159 | |
| 160 | print self.line_format.format(*self.headers) |
| 161 | |
| 162 | if self.delta: |
| 163 | vals = [data[header].value - self.prev.get(header, 0) |
| 164 | for header in self.headers] |
| 165 | |
koder aka kdanilov | 6b1341a | 2015-04-21 22:44:21 +0300 | [diff] [blame] | 166 | self.prev.update(dict((header, data[header].value) |
| 167 | for header in self.headers)) |
koder aka kdanilov | dda86d3 | 2015-03-16 11:20:04 +0200 | [diff] [blame] | 168 | else: |
| 169 | vals = [data[header].value for header in self.headers] |
| 170 | |
koder aka kdanilov | 2c47309 | 2015-03-29 17:12:13 +0300 | [diff] [blame] | 171 | self.fd.write(self.line_format.format(*vals) + "\n") |
koder aka kdanilov | dda86d3 | 2015-03-16 11:20:04 +0200 | [diff] [blame] | 172 | |
| 173 | def recv(self, timeout=None): |
koder aka kdanilov | 2c47309 | 2015-03-29 17:12:13 +0300 | [diff] [blame] | 174 | cname = self.__class__.__name__ |
| 175 | raise ValueError("{0} don't allows receiving".format(cname)) |
| 176 | |
| 177 | |
| 178 | class FileTransport(StdoutTransport): |
| 179 | def __init__(self, receiver, fname, delta=True): |
| 180 | StdoutTransport.__init__(self, receiver, delta) |
| 181 | self.fd = open(fname, "w") |
koder aka kdanilov | dda86d3 | 2015-03-16 11:20:04 +0200 | [diff] [blame] | 182 | |
| 183 | |
| 184 | class UDPTransport(ITransport): |
| 185 | def __init__(self, receiver, ip, port, packer_cls): |
| 186 | self.port = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) |
| 187 | if receiver: |
| 188 | self.port.bind((ip, port)) |
| 189 | self.packer_cls = packer_cls |
| 190 | self.packers = {} |
| 191 | else: |
| 192 | self.packer = packer_cls() |
| 193 | self.dst = (ip, port) |
| 194 | |
| 195 | def send(self, data): |
| 196 | raw_data = self.packer.pack(data) |
| 197 | self.port.sendto(raw_data, self.dst) |
| 198 | |
| 199 | def recv(self, timeout=None): |
| 200 | r, _, _ = select.select([self.port], [], [], timeout) |
| 201 | if len(r) != 0: |
| 202 | raw_data, addr = self.port.recvfrom(10000) |
| 203 | packer = self.packers.setdefault(addr, self.packer_cls()) |
| 204 | return addr, packer.unpack(raw_data) |
| 205 | else: |
| 206 | raise Timeout() |
| 207 | |
| 208 | |
| 209 | # -------------------------- Factory function -------------------------------- |
| 210 | |
| 211 | |
| 212 | def create_protocol(uri, receiver=False): |
| 213 | parsed_uri = urlparse(uri) |
| 214 | if parsed_uri.scheme == 'stdout': |
| 215 | return StdoutTransport(receiver) |
| 216 | elif parsed_uri.scheme == 'udp': |
| 217 | ip, port = parsed_uri.netloc.split(":") |
| 218 | return UDPTransport(receiver, ip=ip, port=int(port), |
Alyona Kiseleva | 7f6de4f | 2015-04-21 01:04:20 +0300 | [diff] [blame] | 219 | packer_cls=StructSerializer) |
koder aka kdanilov | 2c47309 | 2015-03-29 17:12:13 +0300 | [diff] [blame] | 220 | elif parsed_uri.scheme == 'file': |
| 221 | return FileTransport(receiver, parsed_uri.path) |
koder aka kdanilov | dda86d3 | 2015-03-16 11:20:04 +0200 | [diff] [blame] | 222 | else: |
| 223 | templ = "Can't instantiate transport from {0!r}" |
| 224 | raise ValueError(templ.format(uri)) |