blob: 5d3bf65940ddc824818af069d9ae91a5fbc2e436 [file] [log] [blame]
Ved-vampir0c7e2d42015-03-18 17:18:47 +03001import time
koder aka kdanilovdda86d32015-03-16 11:20:04 +02002import socket
3import select
4import cPickle as pickle
5from urlparse import urlparse
6
Ved-vampir0c7e2d42015-03-18 17:18:47 +03007import cp_transport
8
koder aka kdanilovdda86d32015-03-16 11:20:04 +02009
class Timeout(Exception):
    """Signal that a receive operation ran out of time without data.

    Raised by ``UDPTransport.recv`` when ``select`` reports no readable
    socket within the requested timeout.
    """
12
13
# ------------------------------------- Serializers --------------------------
15
16
class ISensortResultsSerializer(object):
    """Interface for objects that pack and unpack sensor results.

    Both methods here are placeholders that do nothing and return ``None``;
    concrete serializers override them.
    """

    def pack(self, data):
        """Turn *data* into its serialized form (no-op in the base class)."""
        return None

    def unpack(self, data):
        """Rebuild results from serialized *data* (no-op in the base class)."""
        return None
23
24
class PickleSerializer(ISensortResultsSerializer):
    """Serializer that ships sensor readings as a pickled ``dict``."""

    def pack(self, data):
        """Pickle the ``.value`` attribute of every entry in *data*.

        *data* is a mapping whose values expose a ``value`` attribute; only
        that attribute is kept, keyed by the original key.  Returns the
        pickled mapping.
        """
        plain_values = dict((name, sensor.value)
                            for name, sensor in data.items())
        return pickle.dumps(plain_values)

    def unpack(self, data):
        """Return the object stored in the pickled string *data*."""
        # NOTE(review): pickle.loads can execute arbitrary code when fed a
        # crafted payload.  UDPTransport.recv passes datagrams read from the
        # network to this method, so only use it on a trusted network.
        return pickle.loads(data)
32
33
# ------------------------------------- Transports ---------------------------
35
class ITransport(object):
    """Interface shared by the transports defined in this module.

    The base implementation is inert: the constructor ignores its argument
    and both ``send`` and ``recv`` return ``None``.
    """

    def __init__(self, receiver):
        """Accept the *receiver* flag; the base class does not store it."""

    def send(self, data):
        """Deliver *data* to the peer (no-op in the base class)."""
        return None

    def recv(self, timeout=None):
        """Wait up to *timeout* for incoming data (no-op in the base class)."""
        return None
45
46
class StdoutTransport(ITransport):
    """Send-only transport that prints sensor values as a text table.

    The first ``send`` call fixes the column set (the sorted keys of that
    first mapping) and prints a header row; every call prints one row of
    values, right-aligned in their columns.
    """

    # Smallest column width, in characters, used for any header.
    MIN_COL_WIDTH = 10

    def __init__(self, receiver, delta=True):
        """Create the transport.

        receiver -- must be falsy; this transport cannot receive.
        delta    -- when true, print the difference from the previously
                    sent value of each column instead of the raw value.

        Raises ValueError if *receiver* is truthy.
        """
        if receiver:
            raise ValueError("StdoutTransport don't allows receiving")

        self.delta = delta
        self.prev = {}
        self.headers = None
        self.line_format = ""

    def send(self, data):
        """Print one table row for *data*.

        *data* maps column names to objects exposing a ``value`` attribute.
        Every column chosen on the first call must be present in *data*.
        """
        if self.headers is None:
            self.headers = sorted(data)

            # One right-aligned positional field per column, wide enough for
            # the header plus one separating space.
            fields = []
            for index, title in enumerate(self.headers):
                width = max(len(title) + 1, self.MIN_COL_WIDTH)
                fields.append("{%s:>%s}" % (index, width))
            self.line_format += "".join(fields)

            # Single-argument parenthesised print behaves the same whether
            # print is a statement or a function.
            print(self.line_format.format(*self.headers))

        if not self.delta:
            row = [data[title].value for title in self.headers]
        else:
            # Columns never seen before are diffed against 0.
            row = [data[title].value - self.prev.get(title, 0)
                   for title in self.headers]
            latest = dict((title, data[title].value)
                          for title in self.headers)
            self.prev.update(latest)

        print(self.line_format.format(*row))

    def recv(self, timeout=None):
        """Always raise ValueError: this transport is send-only."""
        raise ValueError("StdoutTransport don't allows receiving")
83
84
class UDPTransport(ITransport):
    """Transport that exchanges serialized sensor data over UDP datagrams.

    An instance is either a receiver (bound to ``(ip, port)``, keeps one
    packer per remote address) or a sender (keeps a single packer and the
    destination address).  Only the attributes for the chosen mode are set,
    so ``send`` is usable only on senders and ``recv`` only on receivers.
    """

    # Maximum number of bytes read from the socket per datagram.
    RECV_BUF_SIZE = 10000

    def __init__(self, receiver, ip, port, packer_cls):
        """Open the UDP socket and set up the mode-specific state.

        receiver   -- true to bind and receive, false to send.
        ip, port   -- address to bind to (receiver) or send to (sender).
        packer_cls -- zero-argument callable returning an object with
                      ``pack``/``unpack`` methods.
        """
        # NOTE: despite its name, ``self.port`` holds the socket object.
        self.port = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        if receiver:
            try:
                self.port.bind((ip, port))
            except Exception:
                # Do not leak the descriptor when the address is unusable.
                self.port.close()
                raise
            self.packer_cls = packer_cls
            self.packers = {}
        else:
            self.packer = packer_cls()
            self.dst = (ip, port)

    def send(self, data):
        """Pack *data* and send it as one datagram to the destination."""
        raw_data = self.packer.pack(data)
        self.port.sendto(raw_data, self.dst)

    def recv(self, timeout=None):
        """Wait for one datagram and return ``(addr, unpacked_data)``.

        timeout -- seconds to wait, passed straight to ``select.select``
                   (``None`` blocks until data arrives).

        Raises Timeout if nothing becomes readable within *timeout*.
        """
        readable, _, _ = select.select([self.port], [], [], timeout)
        if not readable:
            raise Timeout()

        raw_data, addr = self.port.recvfrom(self.RECV_BUF_SIZE)

        # Look the packer up first: dict.setdefault would evaluate
        # self.packer_cls() on every datagram, even for known senders.
        packer = self.packers.get(addr)
        if packer is None:
            packer = self.packers[addr] = self.packer_cls()
        return addr, packer.unpack(raw_data)
108
109
class HugeUDPTransport(ITransport, cp_transport.Sender):
    """Transport built on ``cp_transport.Sender``.

    Sending and receiving are delegated to the ``send_by_protocol`` and
    ``recv_by_protocol`` methods.
    NOTE(review): those are presumably inherited from cp_transport.Sender,
    which is not visible here - confirm.
    """

    def __init__(self, receiver, ip, port):
        """Initialise the underlying sender and bind when receiving.

        receiver -- true to call ``bind()`` so the instance can receive.
        ip, port -- forwarded to ``cp_transport.Sender`` as ``host``/``port``.
        """
        cp_transport.Sender.__init__(self, port=port, host=ip)
        if receiver:
            self.bind()

    def send(self, data):
        """Send *data* through ``send_by_protocol``."""
        self.send_by_protocol(data)

    def recv(self, timeout=None):
        """Poll ``recv_by_protocol`` until it yields data or time runs out.

        timeout -- seconds to keep polling.  ``None`` means a single
                   receive attempt; this is what the previous
                   ``float >= None`` comparison amounted to under Python 2,
                   now made explicit so it no longer depends on mixed-type
                   ordering (a TypeError on Python 3).

        Returns the first non-``None`` result of ``recv_by_protocol``, or
        ``None`` when the time is up.  Unlike ``UDPTransport.recv`` this
        method does not raise ``Timeout`` on expiry.
        """
        begin = time.time()

        while True:
            try:
                # Non-None means a complete packet is ready.
                ready = self.recv_by_protocol()
            except cp_transport.Timeout:
                # No answer yet; fall through to the deadline check.
                ready = None

            if ready is not None:
                return ready

            if timeout is None or time.time() - begin >= timeout:
                return None
# -------------------------- Factory function --------------------------------
139
140
def create_protocol(uri, receiver=False):
    """Build the transport selected by the scheme of *uri*.

    Supported schemes:
      stdout  -- ``StdoutTransport``
      udp     -- ``UDPTransport`` with ``PickleSerializer``; netloc must be
                 ``ip:port``
      hugeudp -- ``HugeUDPTransport``; netloc must be ``ip:port``

    *receiver* is passed through to the transport constructor.

    Raises ValueError for an unknown scheme, and also when the netloc of a
    UDP-style URI does not split into exactly an ip and an integer port.
    """
    parts = urlparse(uri)
    scheme = parts.scheme

    if scheme == 'stdout':
        return StdoutTransport(receiver)

    if scheme in ('udp', 'hugeudp'):
        host, port_text = parts.netloc.split(":")
        port_number = int(port_text)
        if scheme == 'udp':
            return UDPTransport(receiver, ip=host, port=port_number,
                                packer_cls=PickleSerializer)
        return HugeUDPTransport(receiver, ip=host, port=port_number)

    message = "Can't instantiate transport from {0!r}"
    raise ValueError(message.format(uri))