blob: 02b661a5c2f2f503e8f5f15ad7c6f9c9c7e14e03 [file] [log] [blame]
koder aka kdanilov2c473092015-03-29 17:12:13 +03001import sys
Ved-vampir0c7e2d42015-03-18 17:18:47 +03002import time
koder aka kdanilovdda86d32015-03-16 11:20:04 +02003import socket
4import select
5import cPickle as pickle
6from urlparse import urlparse
7
Ved-vampir0c7e2d42015-03-18 17:18:47 +03008import cp_transport
9
koder aka kdanilovdda86d32015-03-16 11:20:04 +020010
class Timeout(Exception):
    """Raised by a transport's ``recv`` when no data became available in time."""
13
14
# ------------------------------------- Serializers --------------------------
16
17
class ISensortResultsSerializer(object):
    """Interface for objects that pack and unpack sensor results.

    Both methods are placeholders here: they do nothing and return None.
    Concrete serializers in this module override them.
    """

    def pack(self, data):
        """Convert ``data`` to its transferable form (no-op in the interface)."""
        return None

    def unpack(self, data):
        """Restore data from its transferable form (no-op in the interface)."""
        return None
24
25
class PickleSerializer(ISensortResultsSerializer):
    """Serializer that pickles a plain ``{name: value}`` mapping."""

    def pack(self, data):
        """Pickle ``data`` after replacing every item by its ``.value``.

        ``data`` is a mapping whose values expose a ``value`` attribute;
        only those attribute values are serialized, keyed by the same names.
        """
        plain = dict((name, item.value) for name, item in data.items())
        return pickle.dumps(plain)

    def unpack(self, data):
        """Return the object stored in the pickled string ``data``."""
        # NOTE(review): pickle.loads can execute arbitrary code. This is
        # only safe if ``data`` always originates from a trusted sender.
        return pickle.loads(data)
33
try:
    # Prefer the full-featured msgpack library when it can be imported.
    import msgpack
except ImportError:
    # Import failed: fall back to the local umsgpack module instead.
    import umsgpack

    class umsgspackSerializer(ISensortResultsSerializer):
        """Serializer that delegates to ``umsgpack``."""

        def pack(self, data):
            """Return ``data`` encoded by ``umsgpack.packb``."""
            return umsgpack.packb(data)

        def unpack(self, data):
            """Return the object decoded by ``umsgpack.unpackb``."""
            return umsgpack.unpackb(data)

    MSGPackSerializer = umsgspackSerializer
else:
    class mgspackSerializer(ISensortResultsSerializer):
        """Serializer that delegates to ``msgpack``."""

        def pack(self, data):
            """Return ``data`` encoded by ``msgpack.packb``."""
            return msgpack.packb(data)

        def unpack(self, data):
            """Return the object decoded by ``msgpack.unpackb``."""
            return msgpack.unpackb(data)

    # Public alias: callers use MSGPackSerializer regardless of the backend.
    MSGPackSerializer = mgspackSerializer
koder aka kdanilovdda86d32015-03-16 11:20:04 +020058
# ------------------------------------- Transports ---------------------------
60
koder aka kdanilov2c473092015-03-29 17:12:13 +030061
class ITransport(object):
    """Interface shared by the transports defined in this module.

    The methods are placeholders: they do nothing and return None.
    """

    def __init__(self, receiver):
        """Accept the ``receiver`` flag; the interface itself stores nothing."""

    def send(self, data):
        """Send ``data`` (no-op in the interface)."""
        return None

    def recv(self, timeout=None):
        """Receive data, optionally bounded by ``timeout`` (no-op here)."""
        return None
71
72
class StdoutTransport(ITransport):
    """Send-only transport that writes sensor values as a text table.

    The column layout is fixed by the first ``send`` call: the sorted keys
    of that first ``data`` mapping become the headers, and each column is
    right-aligned with a width of ``max(len(header) + 1, MIN_COL_WIDTH)``.
    All output, header row included, goes to ``self.fd``.
    """

    MIN_COL_WIDTH = 10

    def __init__(self, receiver, delta=True):
        """Create the transport.

        receiver -- must be false; this transport cannot receive.
        delta    -- when true, ``send`` writes the difference to the value
                    previously sent for each header instead of the raw value.

        Raises ValueError when ``receiver`` is true.
        """
        if receiver:
            cname = self.__class__.__name__
            raise ValueError("{0} don't allows receiving".format(cname))

        self.headers = None
        self.line_format = ""
        self.prev = {}
        self.delta = delta
        self.fd = sys.stdout

    def send(self, data):
        """Write one table row for ``data``.

        ``data`` maps header names to objects exposing a ``value``
        attribute. The first call also writes the header row.
        """
        if self.headers is None:
            self.headers = sorted(data)
            self.line_format = "".join(
                "{%s:>%s}" % (pos, max(len(header) + 1, self.MIN_COL_WIDTH))
                for pos, header in enumerate(self.headers))

            # The header goes to the same stream as the values, so that
            # subclasses which redirect self.fd get a complete table there
            # instead of having the header end up on stdout.
            self.fd.write(self.line_format.format(*self.headers) + "\n")

        current = [data[header].value for header in self.headers]

        if self.delta:
            # A header that was never seen before is compared against 0.
            vals = [value - self.prev.get(header, 0)
                    for header, value in zip(self.headers, current)]
            self.prev.update(zip(self.headers, current))
        else:
            vals = current

        self.fd.write(self.line_format.format(*vals) + "\n")

    def recv(self, timeout=None):
        """Always raise ValueError: this transport is send-only."""
        cname = self.__class__.__name__
        raise ValueError("{0} don't allows receiving".format(cname))
112
113
class FileTransport(StdoutTransport):
    """StdoutTransport variant whose ``self.fd`` is a file opened for writing.

    The inherited ``send`` writes its value rows through ``self.fd``.
    """

    def __init__(self, receiver, fname, delta=True):
        """Open ``fname`` for writing (truncating it) and use it as ``self.fd``.

        The base initializer runs first, so a true ``receiver`` raises
        ValueError before any file is opened.
        """
        StdoutTransport.__init__(self, receiver, delta)
        self.fd = open(fname, "w")

    def close(self):
        """Flush and close the output file.

        Without this the owner had no way to release the handle opened in
        ``__init__``. Closing an already closed file is a no-op.
        """
        self.fd.close()
koder aka kdanilovdda86d32015-03-16 11:20:04 +0200118
119
class UDPTransport(ITransport):
    """Transport that exchanges serialized data as single UDP datagrams.

    A receiver binds to ``(ip, port)`` and keeps one serializer instance
    per sender address. A sender keeps one serializer and the destination
    address.
    """

    # Largest number of bytes read from the socket per datagram.
    MAX_DATAGRAM_SIZE = 10000

    def __init__(self, receiver, ip, port, packer_cls):
        """Create the socket and set up receiver or sender state.

        receiver   -- true to bind and receive, false to send.
        ip, port   -- address to bind to (receiver) or to send to (sender).
        packer_cls -- serializer class, instantiated without arguments.
        """
        # NOTE: despite its name, ``self.port`` holds the socket object.
        self.port = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        if receiver:
            self.port.bind((ip, port))
            self.packer_cls = packer_cls
            self.packers = {}
        else:
            self.packer = packer_cls()
            self.dst = (ip, port)

    def send(self, data):
        """Pack ``data`` and send it to the configured destination."""
        raw_data = self.packer.pack(data)
        self.port.sendto(raw_data, self.dst)

    def recv(self, timeout=None):
        """Wait for one datagram and return ``(addr, unpacked_data)``.

        ``timeout`` is passed to ``select.select`` unchanged. Raises
        Timeout when the socket does not become readable.
        """
        readable, _, _ = select.select([self.port], [], [], timeout)
        if not readable:
            raise Timeout()

        raw_data, addr = self.port.recvfrom(self.MAX_DATAGRAM_SIZE)

        # Create a serializer only for addresses not seen before;
        # dict.setdefault would build a throwaway one on every datagram.
        packer = self.packers.get(addr)
        if packer is None:
            packer = self.packers[addr] = self.packer_cls()
        return addr, packer.unpack(raw_data)
143
144
class HugeUDPTransport(ITransport, cp_transport.Sender):
    """Transport built on top of ``cp_transport.Sender``.

    Sending and receiving are delegated to the ``send_by_protocol`` and
    ``recv_by_protocol`` methods inherited from ``cp_transport.Sender``.
    """

    def __init__(self, receiver, ip, port, packer_cls):
        """Initialize the underlying sender and bind it when receiving.

        ``packer_cls`` is accepted so the signature matches the other UDP
        transport, but it is not used by this class.
        """
        cp_transport.Sender.__init__(self, port=port, host=ip)
        if receiver:
            self.bind()

    def send(self, data):
        """Send ``data`` through ``send_by_protocol``."""
        self.send_by_protocol(data)

    def recv(self, timeout=None):
        """Return the next complete packet from ``recv_by_protocol``.

        timeout -- seconds to keep trying; ``None`` means no deadline.

        Raises Timeout once ``timeout`` seconds have passed without a
        complete packet. This matches how UDPTransport.recv reports expiry;
        the loop used to fall through and return None implicitly.
        """
        if timeout is None:
            deadline = None
        else:
            deadline = time.time() + timeout

        while True:
            try:
                # A non-None result means a packet is ready.
                ready = self.recv_by_protocol()
            except cp_transport.Timeout:
                # No answer yet; treated like "packet not ready".
                ready = None

            if ready is not None:
                return ready

            if deadline is not None and time.time() >= deadline:
                raise Timeout()
# -------------------------- Factory function --------------------------------
174
175
def create_protocol(uri, receiver=False):
    """Build the transport selected by the scheme of ``uri``.

    Supported schemes:
      stdout  -- StdoutTransport
      file    -- FileTransport writing to the URI path
      udp     -- UDPTransport with PickleSerializer, netloc is "ip:port"
      hugeudp -- HugeUDPTransport with MSGPackSerializer, netloc is "ip:port"

    ``receiver`` is forwarded to the transport constructor.
    Raises ValueError for any other scheme.
    """
    parsed = urlparse(uri)
    scheme = parsed.scheme

    if scheme == 'stdout':
        return StdoutTransport(receiver)

    if scheme == 'file':
        return FileTransport(receiver, parsed.path)

    # Both UDP flavours parse the netloc the same way; they differ only in
    # the transport class and the serializer paired with it.
    udp_flavours = {
        'udp': (UDPTransport, PickleSerializer),
        'hugeudp': (HugeUDPTransport, MSGPackSerializer),
    }
    if scheme in udp_flavours:
        transport_cls, packer_cls = udp_flavours[scheme]
        ip, port = parsed.netloc.split(":")
        return transport_cls(receiver, ip=ip, port=int(port),
                             packer_cls=packer_cls)

    templ = "Can't instantiate transport from {0!r}"
    raise ValueError(templ.format(uri))