koder aka kdanilov | 2c47309 | 2015-03-29 17:12:13 +0300 | [diff] [blame] | 1 | import sys |
Ved-vampir | 0c7e2d4 | 2015-03-18 17:18:47 +0300 | [diff] [blame] | 2 | import time |
koder aka kdanilov | dda86d3 | 2015-03-16 11:20:04 +0200 | [diff] [blame] | 3 | import socket |
| 4 | import select |
| 5 | import cPickle as pickle |
| 6 | from urlparse import urlparse |
| 7 | |
Ved-vampir | 0c7e2d4 | 2015-03-18 17:18:47 +0300 | [diff] [blame] | 8 | import cp_transport |
| 9 | |
koder aka kdanilov | dda86d3 | 2015-03-16 11:20:04 +0200 | [diff] [blame] | 10 | |
class Timeout(Exception):
    """Raised by a transport's ``recv`` when no data arrived in time."""
| 13 | |
| 14 | |
| 15 | # ------------------------------------- Serializers -------------------------- |
| 16 | |
| 17 | |
class ISensortResultsSerializer(object):
    """Interface for converting sensor results to and from a wire format.

    The base implementation is a no-op: both methods ignore their argument
    and return None. Concrete serializers override them.
    """

    def pack(self, data):
        """Serialize ``data`` for sending. The base version returns None."""
        return None

    def unpack(self, data):
        """Deserialize received ``data``. The base version returns None."""
        return None
| 24 | |
| 25 | |
class PickleSerializer(ISensortResultsSerializer):
    """Serializer that uses ``pickle`` as the wire format.

    Note that the two directions are not symmetric: ``pack`` strips each
    item down to its ``.value`` attribute, so ``unpack`` yields a mapping of
    plain values rather than the original objects.
    """

    def pack(self, data):
        """Return a pickled ``{key: item.value}`` mapping built from ``data``.

        ``data`` must be a mapping whose values expose a ``.value`` attribute.
        """
        plain = dict((name, item.value) for name, item in data.items())
        return pickle.dumps(plain)

    def unpack(self, data):
        """Return the object stored in the pickled string ``data``."""
        # NOTE(review): unpickling can execute arbitrary code. If ``data``
        # can come from an untrusted peer, this is a security risk - confirm
        # that only trusted senders can reach the receiver.
        return pickle.loads(data)
| 33 | |
try:
    # Prefer the full-featured msgpack library when it can be imported.
    import msgpack
except ImportError:
    # Otherwise fall back to the local umsgpack module.
    import umsgpack

    class umsgspackSerializer(ISensortResultsSerializer):
        """MessagePack serializer implemented on top of ``umsgpack``."""

        def pack(self, data):
            """Return ``data`` encoded with ``umsgpack.packb``."""
            return umsgpack.packb(data)

        def unpack(self, data):
            """Return the object decoded by ``umsgpack.unpackb``."""
            return umsgpack.unpackb(data)

    MSGPackSerializer = umsgspackSerializer
else:
    class mgspackSerializer(ISensortResultsSerializer):
        """MessagePack serializer implemented on top of ``msgpack``."""

        def pack(self, data):
            """Return ``data`` encoded with ``msgpack.packb``."""
            return msgpack.packb(data)

        def unpack(self, data):
            """Return the object decoded by ``msgpack.unpackb``."""
            return msgpack.unpackb(data)

    # Public alias: whichever implementation was importable.
    MSGPackSerializer = mgspackSerializer
koder aka kdanilov | dda86d3 | 2015-03-16 11:20:04 +0200 | [diff] [blame] | 58 | |
| 59 | # ------------------------------------- Transports --------------------------- |
| 60 | |
koder aka kdanilov | 2c47309 | 2015-03-29 17:12:13 +0300 | [diff] [blame] | 61 | |
class ITransport(object):
    """Interface shared by all transports in this module.

    The base implementation does nothing; ``send`` and ``recv`` return None.
    """

    def __init__(self, receiver):
        """``receiver`` selects the receiving (true) or sending side."""

    def send(self, data):
        """Send ``data``. The base version is a no-op returning None."""
        return None

    def recv(self, timeout=None):
        """Receive data. The base version is a no-op returning None."""
        return None
| 71 | |
| 72 | |
class StdoutTransport(ITransport):
    """Send-only transport that writes results as a text table.

    Output goes to ``self.fd`` (``sys.stdout`` by default; a subclass may
    replace it with another file-like object). The first ``send`` fixes the
    column set and writes a header row; every ``send`` writes one data row.
    """

    # Minimal width of a column, in characters.
    MIN_COL_WIDTH = 10

    def __init__(self, receiver, delta=True):
        """Create the transport.

        receiver -- must be false; this transport cannot receive.
        delta    -- when true, each row shows the difference from the
                    previously sent value instead of the value itself.

        Raises ValueError when ``receiver`` is true.
        """
        if receiver:
            cname = self.__class__.__name__
            raise ValueError("{0} don't allows receiving".format(cname))

        self.headers = None
        self.line_format = ""
        self.prev = {}
        self.delta = delta
        self.fd = sys.stdout

    def send(self, data):
        """Write one row for ``data``.

        ``data`` is a mapping of column name to an object with a ``.value``
        attribute. The columns are the sorted keys of the first mapping
        sent; later mappings must contain the same keys.
        """
        if self.headers is None:
            self.headers = sorted(data)

            # Right-aligned columns, wide enough for the header plus one
            # space of padding but never narrower than MIN_COL_WIDTH.
            self.line_format += "".join(
                "{%s:>%s}" % (pos,
                              max(len(header) + 1, self.MIN_COL_WIDTH))
                for pos, header in enumerate(self.headers))

            # The header goes to the same stream as the data rows, so that
            # redirecting ``self.fd`` keeps the table in one place.
            self.fd.write(self.line_format.format(*self.headers) + "\n")

        if self.delta:
            # Columns not seen before are compared against 0.
            vals = [data[header].value - self.prev.get(header, 0)
                    for header in self.headers]

            self.prev.update(dict((header, data[header].value)
                                  for header in self.headers))
        else:
            vals = [data[header].value for header in self.headers]

        self.fd.write(self.line_format.format(*vals) + "\n")

    def recv(self, timeout=None):
        """Always raises ValueError: this transport cannot receive."""
        cname = self.__class__.__name__
        raise ValueError("{0} don't allows receiving".format(cname))
| 112 | |
| 113 | |
class FileTransport(StdoutTransport):
    """Variant of ``StdoutTransport`` whose ``self.fd`` is a file on disk."""

    def __init__(self, receiver, fname, delta=True):
        """Open ``fname`` for writing and use it as the output stream.

        ``receiver`` and ``delta`` are passed to ``StdoutTransport``. The
        file is opened in "w" mode, so existing content is truncated. This
        class never closes the file itself.
        """
        super(FileTransport, self).__init__(receiver, delta=delta)
        self.fd = open(fname, "w")
koder aka kdanilov | dda86d3 | 2015-03-16 11:20:04 +0200 | [diff] [blame] | 118 | |
| 119 | |
class UDPTransport(ITransport):
    """Transport that sends each result as a single UDP datagram.

    A receiver binds to ``(ip, port)`` and keeps one serializer instance per
    sender address. A sender keeps one serializer and sends to
    ``(ip, port)``.
    """

    # Maximum number of bytes read from one datagram by ``recv``.
    RECV_BUFFER_SIZE = 10000

    def __init__(self, receiver, ip, port, packer_cls):
        """Create the socket and set up the sending or receiving side.

        receiver   -- true to bind and receive, false to send.
        ip, port   -- address to bind to (receiver) or send to (sender).
        packer_cls -- serializer class, instantiated without arguments.
        """
        self.port = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        if receiver:
            try:
                self.port.bind((ip, port))
            except Exception:
                # Do not leak the socket when the address is unusable.
                self.port.close()
                raise
            self.packer_cls = packer_cls
            self.packers = {}
        else:
            self.packer = packer_cls()
            self.dst = (ip, port)

    def send(self, data):
        """Serialize ``data`` and send it to the configured destination.

        Only usable on a transport created with a false ``receiver``.
        """
        raw_data = self.packer.pack(data)
        self.port.sendto(raw_data, self.dst)

    def recv(self, timeout=None):
        """Wait for one datagram and return ``(sender_addr, unpacked_data)``.

        ``timeout`` is passed to ``select.select`` (seconds; None blocks
        until data arrives). Raises ``Timeout`` when nothing is readable
        within that time. Only usable on a transport created with a true
        ``receiver``.
        """
        readable, _, _ = select.select([self.port], [], [], timeout)
        if not readable:
            raise Timeout()

        raw_data, addr = self.port.recvfrom(self.RECV_BUFFER_SIZE)

        # Create a serializer only for addresses not seen before, instead
        # of building a throwaway instance for every datagram.
        if addr not in self.packers:
            self.packers[addr] = self.packer_cls()
        return addr, self.packers[addr].unpack(raw_data)
| 143 | |
| 144 | |
class HugeUDPTransport(ITransport, cp_transport.Sender):
    """Transport built on ``cp_transport.Sender``.

    Sending and receiving are delegated to the ``send_by_protocol`` and
    ``recv_by_protocol`` methods inherited from ``cp_transport.Sender``.
    """

    def __init__(self, receiver, ip, port, packer_cls):
        """Initialise the underlying sender and bind when receiving.

        ``packer_cls`` is accepted so the signature matches the other UDP
        transport, but it is not used here.
        """
        cp_transport.Sender.__init__(self, port=port, host=ip)
        if receiver:
            self.bind()

    def send(self, data):
        """Send ``data`` through ``send_by_protocol``."""
        self.send_by_protocol(data)

    def recv(self, timeout=None):
        """Poll ``recv_by_protocol`` until it yields data or time runs out.

        Returns the first non-None result of ``recv_by_protocol``. Returns
        None once ``timeout`` seconds have passed without one. With
        ``timeout=None`` only a single attempt is made; this keeps the
        behaviour the previous float-versus-None comparison had on
        Python 2, without relying on that comparison.
        """
        # NOTE(review): on timeout this returns None, while the plain UDP
        # transport raises Timeout - confirm which one callers expect.
        begin = time.time()

        while True:
            try:
                # A None result means the packet is not complete yet.
                ready = self.recv_by_protocol()
            except cp_transport.Timeout:
                # No answer yet; treat it like "not ready".
                ready = None

            if ready is not None:
                return ready

            if timeout is None or time.time() - begin >= timeout:
                return None
koder aka kdanilov | dda86d3 | 2015-03-16 11:20:04 +0200 | [diff] [blame] | 173 | # -------------------------- Factory function -------------------------------- |
| 174 | |
| 175 | |
def create_protocol(uri, receiver=False):
    """Build the transport described by ``uri``.

    Supported schemes:

    stdout  -- ``StdoutTransport``
    file    -- ``FileTransport`` writing to the path part of the URI
    udp     -- ``UDPTransport`` on ``host:port`` with ``PickleSerializer``
    hugeudp -- ``HugeUDPTransport`` on ``host:port`` with
               ``MSGPackSerializer``

    ``receiver`` is passed to the transport constructor. Raises ValueError
    for any other scheme, or when the network location of a UDP URI is not
    exactly ``host:port``.
    """
    parts = urlparse(uri)
    scheme = parts.scheme

    if scheme == 'stdout':
        return StdoutTransport(receiver)

    if scheme == 'file':
        return FileTransport(receiver, parts.path)

    if scheme in ('udp', 'hugeudp'):
        # Both UDP flavours address their peer as "host:port".
        host, port_text = parts.netloc.split(":")
        if scheme == 'udp':
            return UDPTransport(receiver, ip=host, port=int(port_text),
                                packer_cls=PickleSerializer)
        return HugeUDPTransport(receiver, ip=host, port=int(port_text),
                                packer_cls=MSGPackSerializer)

    raise ValueError("Can't instantiate transport from {0!r}".format(uri))