blob: cfdd93ec82dc51644c520f676158ae1cfea4c6de [file] [log] [blame]
Ved-vampir0c7e2d42015-03-18 17:18:47 +03001import time
koder aka kdanilovdda86d32015-03-16 11:20:04 +02002import socket
3import select
4import cPickle as pickle
5from urlparse import urlparse
6
Ved-vampir0c7e2d42015-03-18 17:18:47 +03007import cp_transport
8
koder aka kdanilovdda86d32015-03-16 11:20:04 +02009
10class Timeout(Exception):
11 pass
12
13
14# ------------------------------------- Serializers --------------------------
15
16
17class ISensortResultsSerializer(object):
18 def pack(self, data):
19 pass
20
21 def unpack(self, data):
22 pass
23
24
25class PickleSerializer(ISensortResultsSerializer):
26 def pack(self, data):
27 ndata = {key: val.value for key, val in data.items()}
28 return pickle.dumps(ndata)
29
30 def unpack(self, data):
31 return pickle.loads(data)
32
Ved-vampir2c2f2e92015-03-18 18:02:25 +030033try:
34 # try to use full-function lib
35 import msgpack
36
37 class mgspackSerializer(ISensortResultsSerializer):
38 def pack(self, data):
39 return msgpack.packb(data)
40
41 def unpack(self, data):
42 return msgpack.unpackb(data)
43
44 MSGPackSerializer = mgspackSerializer
45except ImportError:
46 # use local lib, if failed import
47 import umsgpack
48
49 class umsgspackSerializer(ISensortResultsSerializer):
50 def pack(self, data):
51 return umsgpack.packb(data)
52
53 def unpack(self, data):
54 return umsgpack.unpackb(data)
55
56 MSGPackSerializer = umsgspackSerializer
koder aka kdanilovdda86d32015-03-16 11:20:04 +020057
58# ------------------------------------- Transports ---------------------------
59
60class ITransport(object):
61 def __init__(self, receiver):
62 pass
63
64 def send(self, data):
65 pass
66
67 def recv(self, timeout=None):
68 pass
69
70
71class StdoutTransport(ITransport):
72 MIN_COL_WIDTH = 10
73
74 def __init__(self, receiver, delta=True):
75 if receiver:
76 raise ValueError("StdoutTransport don't allows receiving")
77
78 self.headers = None
79 self.line_format = ""
80 self.prev = {}
81 self.delta = delta
82
83 def send(self, data):
84 if self.headers is None:
85 self.headers = sorted(data)
86
87 for pos, header in enumerate(self.headers):
88 self.line_format += "{%s:>%s}" % (pos,
89 max(len(header) + 1,
90 self.MIN_COL_WIDTH))
91
92 print self.line_format.format(*self.headers)
93
94 if self.delta:
95 vals = [data[header].value - self.prev.get(header, 0)
96 for header in self.headers]
97
98 self.prev.update({header: data[header].value
99 for header in self.headers})
100 else:
101 vals = [data[header].value for header in self.headers]
102
103 print self.line_format.format(*vals)
104
105 def recv(self, timeout=None):
106 raise ValueError("StdoutTransport don't allows receiving")
107
108
109class UDPTransport(ITransport):
110 def __init__(self, receiver, ip, port, packer_cls):
111 self.port = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
112 if receiver:
113 self.port.bind((ip, port))
114 self.packer_cls = packer_cls
115 self.packers = {}
116 else:
117 self.packer = packer_cls()
118 self.dst = (ip, port)
119
120 def send(self, data):
121 raw_data = self.packer.pack(data)
122 self.port.sendto(raw_data, self.dst)
123
124 def recv(self, timeout=None):
125 r, _, _ = select.select([self.port], [], [], timeout)
126 if len(r) != 0:
127 raw_data, addr = self.port.recvfrom(10000)
128 packer = self.packers.setdefault(addr, self.packer_cls())
129 return addr, packer.unpack(raw_data)
130 else:
131 raise Timeout()
132
133
Ved-vampir0c7e2d42015-03-18 17:18:47 +0300134class HugeUDPTransport(ITransport, cp_transport.Sender):
Ved-vampir2c2f2e92015-03-18 18:02:25 +0300135 def __init__(self, receiver, ip, port, packer_cls):
Ved-vampir0c7e2d42015-03-18 17:18:47 +0300136 cp_transport.Sender.__init__(self, port=port, host=ip)
137 if receiver:
138 self.bind()
139
140 def send(self, data):
141 self.send_by_protocol(data)
142
143 def recv(self, timeout=None):
144 begin = time.time()
145
146 while True:
147
148 try:
149 # return not None, if packet is ready
150 ready = self.recv_by_protocol()
151 # if data ready - return it
152 if ready is not None:
153 return ready
154 # if data not ready - check if it's time to die
155 if time.time() - begin >= timeout:
156 break
157
158 except cp_transport.Timeout:
159 # no answer yet - check, if timeout end
160 if time.time() - begin >= timeout:
161 break
koder aka kdanilovdda86d32015-03-16 11:20:04 +0200162# -------------------------- Factory function --------------------------------
163
164
165def create_protocol(uri, receiver=False):
166 parsed_uri = urlparse(uri)
167 if parsed_uri.scheme == 'stdout':
168 return StdoutTransport(receiver)
169 elif parsed_uri.scheme == 'udp':
170 ip, port = parsed_uri.netloc.split(":")
171 return UDPTransport(receiver, ip=ip, port=int(port),
172 packer_cls=PickleSerializer)
Ved-vampir0c7e2d42015-03-18 17:18:47 +0300173 elif parsed_uri.scheme == 'hugeudp':
174 ip, port = parsed_uri.netloc.split(":")
Ved-vampir2c2f2e92015-03-18 18:02:25 +0300175 return HugeUDPTransport(receiver, ip=ip, port=int(port),
176 packer_cls=MSGPackSerializer)
koder aka kdanilovdda86d32015-03-16 11:20:04 +0200177 else:
178 templ = "Can't instantiate transport from {0!r}"
179 raise ValueError(templ.format(uri))