blob: f66bb36e8c439880f3f28691d152d0c9bb9c0b8f [file] [log] [blame]
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +03001import Queue
2import threading
3from contextlib import contextmanager
4
5from .deploy_sensors import (deploy_and_start_sensors,
6 stop_and_remove_sensors)
7from .protocol import create_protocol, Timeout
8
9
# Names exported by a star-import of this module.  Note that
# 'deploy_and_start_sensors' is re-exported from .deploy_sensors.
__all__ = ['Empty', 'recv_main', 'start_monitoring',
           'deploy_and_start_sensors', 'SensorConfig']


# Alias of Queue.Empty (the exception raised by a non-blocking get on an
# empty queue).  Presumably exposed so that readers of the queue yielded by
# start_monitoring can catch it without importing Queue -- TODO confirm.
Empty = Queue.Empty
15
16
class SensorConfig(object):
    """Plain holder for one sensor configuration entry.

    The three constructor arguments are stored unchanged as the
    attributes ``conn``, ``url`` and ``sensors``.
    """

    def __init__(self, conn, url, sensors):
        # No validation or copying: the values are kept exactly as given.
        self.conn, self.url, self.sensors = conn, url, sensors
22
23
def recv_main(proto, data_q, cmd_q):
    """Forward everything received on ``proto`` to ``data_q`` until stopped.

    Loops forever; every pass does two things:

    1. Calls ``proto.recv(0.1)`` and puts the result on ``data_q``.  A
       ``Timeout`` raised during this step is ignored.
    2. Polls ``cmd_q`` without blocking.  A ``None`` item ends the loop,
       any other item is dropped, and an empty queue is ignored.

    Returns ``None`` once the stop marker has been read.
    """
    while True:
        try:
            # 0.1 is handed to recv as its only argument -- presumably a
            # timeout in seconds; confirm against the protocol class.
            packet = proto.recv(0.1)
            data_q.put(packet)
        except Timeout:
            pass

        try:
            command = cmd_q.get(False)
        except Queue.Empty:
            # Nothing requested; go back to receiving.
            continue

        if command is None:
            break
39
40
@contextmanager
def start_monitoring(uri, configs):
    """Context manager that runs sensor monitoring for the ``with`` body.

    On entry it calls ``deploy_and_start_sensors(uri, configs)``, creates
    a receiving protocol for ``uri`` and starts a daemon thread running
    ``recv_main``.  The value bound by ``with ... as`` is the queue that
    thread fills with received data.

    On exit (normal or via exception) a ``None`` stop marker is put on the
    thread's command queue and the thread is joined.  After that, and also
    when setup fails after the sensors were deployed,
    ``stop_and_remove_sensors(configs)`` is called.
    """
    deploy_and_start_sensors(uri, configs)
    try:
        samples = Queue.Queue()
        commands = Queue.Queue()
        receiver = create_protocol(uri, receiver=True)

        worker = threading.Thread(target=recv_main,
                                  args=(receiver, samples, commands))
        worker.daemon = True
        worker.start()

        try:
            yield samples
        finally:
            # None is the stop marker recv_main looks for on its command
            # queue; wait for the thread to finish before tearing down.
            commands.put(None)
            worker.join()
    finally:
        stop_and_remove_sensors(configs)
58 stop_and_remove_sensors(configs)