blob: 7688f31f9b76da70b130706929472ccf5c670a8c [file] [log] [blame]
koder aka kdanilov2c473092015-03-29 17:12:13 +03001import sys
Ved-vampir0c7e2d42015-03-18 17:18:47 +03002import time
koder aka kdanilovdda86d32015-03-16 11:20:04 +02003import socket
4import select
5import cPickle as pickle
6from urlparse import urlparse
7
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +03008from . import cp_transport
Ved-vampir0c7e2d42015-03-18 17:18:47 +03009
koder aka kdanilovdda86d32015-03-16 11:20:04 +020010
class Timeout(Exception):
    """Signals that a receive operation found no data within its timeout."""
13
14
15# ------------------------------------- Serializers --------------------------
16
17
class ISensortResultsSerializer(object):
    """Interface for sensor-result serializers.

    Both methods are stubs that do nothing and return None; concrete
    serializers override them.
    """

    def pack(self, data):
        """Serialize ``data``. The base implementation returns None."""
        return None

    def unpack(self, data):
        """Deserialize ``data``. The base implementation returns None."""
        return None
24
25
class PickleSerializer(ISensortResultsSerializer):
    """Serializer that pickles a flattened copy of the sensor data."""

    def pack(self, data):
        """Return ``data`` pickled, with each value reduced to its payload.

        ``data`` is a dict. A value that exposes a ``value`` attribute is
        replaced by that attribute; any other value (strings, plain
        numbers, ...) is stored unchanged.
        """
        # getattr with a default leaves strings untouched (they have no
        # ``value`` attribute) and, unlike an explicit string type check,
        # does not fail with AttributeError on other plain values.
        ndata = {key: getattr(val, 'value', val)
                 for key, val in data.items()}
        return pickle.dumps(ndata)

    def unpack(self, data):
        """Return the object stored in the pickled string ``data``."""
        # NOTE(security): pickle.loads can execute arbitrary code embedded
        # in a crafted payload. When this serializer is used as the packer
        # of a network transport, ``data`` comes straight off the wire, so
        # it must only be used on trusted networks.
        return pickle.loads(data)
38
try:
    # prefer the full-function library when it can be imported
    import msgpack
except ImportError:
    # otherwise fall back to umsgpack, which offers the same two calls
    import umsgpack

    class umsgspackSerializer(ISensortResultsSerializer):
        """Serializer delegating to ``umsgpack.packb`` / ``unpackb``."""

        def pack(self, data):
            """Return ``data`` encoded by ``umsgpack.packb``."""
            return umsgpack.packb(data)

        def unpack(self, data):
            """Return the object decoded by ``umsgpack.unpackb``."""
            return umsgpack.unpackb(data)

    MSGPackSerializer = umsgspackSerializer
else:
    class mgspackSerializer(ISensortResultsSerializer):
        """Serializer delegating to ``msgpack.packb`` / ``unpackb``."""

        def pack(self, data):
            """Return ``data`` encoded by ``msgpack.packb``."""
            return msgpack.packb(data)

        def unpack(self, data):
            """Return the object decoded by ``msgpack.unpackb``."""
            return msgpack.unpackb(data)

    MSGPackSerializer = mgspackSerializer
koder aka kdanilovdda86d32015-03-16 11:20:04 +020063
64# ------------------------------------- Transports ---------------------------
65
koder aka kdanilov2c473092015-03-29 17:12:13 +030066
class ITransport(object):
    """Interface of a transport; every method here is a do-nothing stub."""

    def __init__(self, receiver):
        """Accept the ``receiver`` flag; the base class ignores it."""

    def send(self, data):
        """Send ``data``. The base implementation returns None."""
        return None

    def recv(self, timeout=None):
        """Receive data. The base implementation returns None."""
        return None
76
77
class StdoutTransport(ITransport):
    """Send-only transport that writes sensor values as a text table.

    The first ``send`` fixes the column set (the sorted keys of the data)
    and writes a header row; every ``send`` then writes one row of
    right-aligned values to ``self.fd``.
    """

    # Narrowest column that is ever produced.
    MIN_COL_WIDTH = 10

    def __init__(self, receiver, delta=True):
        """Create the transport.

        receiver -- must be false; this transport cannot receive.
        delta -- when true, each row holds the difference to the previously
                 sent values instead of the raw values.

        Raises ValueError when ``receiver`` is true.
        """
        if receiver:
            cname = self.__class__.__name__
            raise ValueError("{0} don't allows receiving".format(cname))

        self.headers = None
        self.line_format = ""
        self.prev = {}
        self.delta = delta
        self.fd = sys.stdout

    def send(self, data):
        """Write one row for ``data``, a dict of objects with ``.value``.

        Only the keys seen on the first call are written; a later ``data``
        that lacks one of them raises KeyError.
        """
        if self.headers is None:
            self.headers = sorted(data)

            # One right-aligned positional field per column, wide enough
            # for the header plus one separating space.
            self.line_format = "".join(
                "{%s:>%s}" % (pos, max(len(header) + 1, self.MIN_COL_WIDTH))
                for pos, header in enumerate(self.headers))

            # The header goes to the same stream as the rows. Printing it
            # to stdout would leave a subclass that redirects ``fd`` with
            # a table that has no header.
            self.fd.write(self.line_format.format(*self.headers) + "\n")

        current = {header: data[header].value for header in self.headers}

        if self.delta:
            # Headers not sent before count from 0, so the first row shows
            # the raw values.
            vals = [current[header] - self.prev.get(header, 0)
                    for header in self.headers]
            self.prev.update(current)
        else:
            vals = [current[header] for header in self.headers]

        self.fd.write(self.line_format.format(*vals) + "\n")

    def recv(self, timeout=None):
        """Always raise ValueError: this transport cannot receive."""
        cname = self.__class__.__name__
        raise ValueError("{0} don't allows receiving".format(cname))
117
118
class FileTransport(StdoutTransport):
    """StdoutTransport whose ``fd`` attribute is a file opened for writing."""

    def __init__(self, receiver, fname, delta=True):
        """Initialise as StdoutTransport, then point ``fd`` at ``fname``.

        The file is opened in "w" mode, so existing content is truncated.
        It is opened only after the parent initialiser has accepted
        ``receiver``.
        """
        super(FileTransport, self).__init__(receiver, delta)
        self.fd = open(fname, "w")
koder aka kdanilovdda86d32015-03-16 11:20:04 +0200123
124
class UDPTransport(ITransport):
    """Transport that moves one serialized message per UDP datagram.

    A receiver binds to ``(ip, port)`` and keeps one packer instance per
    peer address; a sender keeps a single packer and the destination.
    Note that ``self.port`` holds the socket object, not a port number.
    """

    def __init__(self, receiver, ip, port, packer_cls):
        """Create the socket and set up receiver or sender state.

        receiver -- true to bind and receive, false to send.
        ip, port -- address to bind to (receiver) or send to (sender).
        packer_cls -- serializer class, called with no arguments.
        """
        self.port = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        if receiver:
            self.port.bind((ip, port))
            self.packer_cls = packer_cls
            self.packers = {}
        else:
            self.packer = packer_cls()
            self.dst = (ip, port)

    def send(self, data):
        """Pack ``data`` and send it to the configured destination.

        Only usable on an instance created with a false ``receiver``.
        """
        raw_data = self.packer.pack(data)
        self.port.sendto(raw_data, self.dst)

    def recv(self, timeout=None):
        """Wait for one datagram and return ``(addr, unpacked_data)``.

        timeout -- seconds passed to ``select``; None blocks until data
                   arrives.

        Raises Timeout when nothing became readable in time. Only usable
        on an instance created with a true ``receiver``.
        """
        readable, _, _ = select.select([self.port], [], [], timeout)
        if not readable:
            raise Timeout()

        # At most 10000 bytes of a datagram are read.
        raw_data, addr = self.port.recvfrom(10000)

        # Create a packer only for a peer that has not been seen yet;
        # setdefault(addr, packer_cls()) would instantiate a throwaway
        # packer for every datagram.
        packer = self.packers.get(addr)
        if packer is None:
            packer = self.packers[addr] = self.packer_cls()
        return addr, packer.unpack(raw_data)
148
149
class HugeUDPTransport(ITransport, cp_transport.Sender):
    """Transport built on ``cp_transport.Sender``'s send/receive protocol."""

    def __init__(self, receiver, ip, port, packer_cls):
        """Initialise the underlying ``cp_transport.Sender``.

        receiver -- when true, ``bind()`` is called on the sender.
        ip, port -- passed to ``cp_transport.Sender`` as host and port.
        packer_cls -- accepted for signature parity with the other
                      transports; it is not used here.
        """
        cp_transport.Sender.__init__(self, port=port, host=ip)
        if receiver:
            self.bind()

    def send(self, data):
        """Hand ``data`` to ``send_by_protocol``."""
        self.send_by_protocol(data)

    def recv(self, timeout=None):
        """Poll ``recv_by_protocol`` until it yields data or time runs out.

        timeout -- seconds to keep polling. None makes a single attempt.

        Returns the first non-None result of ``recv_by_protocol``, or None
        when the timeout expired first.
        """
        begin = time.time()

        while True:
            try:
                # not None as soon as a packet is ready
                ready = self.recv_by_protocol()
            except cp_transport.Timeout:
                # no answer yet - treated like "not ready"
                ready = None

            if ready is not None:
                return ready

            # The None case is explicit: Python 2 evaluates
            # ``number >= None`` as True (giving up after one attempt),
            # while Python 3 raises TypeError for that comparison.
            if timeout is None or time.time() - begin >= timeout:
                return None
koder aka kdanilovdda86d32015-03-16 11:20:04 +0200178# -------------------------- Factory function --------------------------------
179
180
def create_protocol(uri, receiver=False):
    """Build the transport described by ``uri``.

    Supported schemes:
      stdout://            -> StdoutTransport
      file://<path>        -> FileTransport writing to the URI path
      udp://<ip>:<port>    -> UDPTransport with PickleSerializer
      hugeudp://<ip>:<port> -> HugeUDPTransport with MSGPackSerializer

    ``receiver`` is passed to the transport unchanged.
    Raises ValueError for any other scheme.
    """
    parts = urlparse(uri)
    scheme = parts.scheme

    if scheme == 'stdout':
        return StdoutTransport(receiver)

    if scheme == 'file':
        return FileTransport(receiver, parts.path)

    if scheme in ('udp', 'hugeudp'):
        # both UDP flavours address their peer as "<ip>:<port>"
        ip, port = parts.netloc.split(":")
        if scheme == 'udp':
            return UDPTransport(receiver, ip=ip, port=int(port),
                                packer_cls=PickleSerializer)
        return HugeUDPTransport(receiver, ip=ip, port=int(port),
                                packer_cls=MSGPackSerializer)

    templ = "Can't instantiate transport from {0!r}"
    raise ValueError(templ.format(uri))