blob: dc34af08eed277e883259b1e6742b4c609a25f3f [file] [log] [blame]
koder aka kdanilovdda86d32015-03-16 11:20:04 +02001import Queue
2import threading
3
4from contextlib import contextmanager
5
6from deploy_sensors import (deploy_and_start_sensors,
7 stop_and_remove_sensors)
8from protocol import create_protocol, Timeout
9
10
11Empty = Queue.Empty
12
13
14def recv_main(proto, data_q, cmd_q):
15 while True:
16 try:
17 data_q.put(proto.recv(0.1))
18 except Timeout:
19 pass
koder aka kdanilov2c473092015-03-29 17:12:13 +030020
koder aka kdanilovdda86d32015-03-16 11:20:04 +020021 try:
22 val = cmd_q.get(False)
23
24 if val is None:
25 return
26
27 except Queue.Empty:
28 pass
29
30
31@contextmanager
koder aka kdanilov2c473092015-03-29 17:12:13 +030032def start_monitoring(uri, config=None, connected_config=None):
33 deploy_and_start_sensors(uri, config=config,
34 connected_config=connected_config)
koder aka kdanilovdda86d32015-03-16 11:20:04 +020035 try:
36 data_q = Queue.Queue()
37 cmd_q = Queue.Queue()
38 proto = create_protocol(uri, receiver=True)
39 th = threading.Thread(None, recv_main, None, (proto, data_q, cmd_q))
40 th.daemon = True
41 th.start()
42
43 try:
44 yield data_q
45 finally:
46 cmd_q.put(None)
47 th.join()
48 finally:
koder aka kdanilov2c473092015-03-29 17:12:13 +030049 stop_and_remove_sensors(config,
50 connected_config=connected_config)