blob: 6eed90a8cd29b9d91fa6105e6003472ec954c20f [file] [log] [blame]
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +03001import Queue
koder aka kdanilov168f6092015-04-19 02:33:38 +03002import logging
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +03003import threading
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +03004
5from .deploy_sensors import (deploy_and_start_sensors,
6 stop_and_remove_sensors)
7from .protocol import create_protocol, Timeout
8
9
# Public names exported by a star-import of this package.
__all__ = [
    'Empty',
    'recv_main',
    'deploy_and_start_sensors',
    'SensorConfig',
    'stop_and_remove_sensors',
    'start_listener_thread',
]
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +030016
17
# Logger used by this module for sensor-related messages.
logger = logging.getLogger("wally.sensors")

# Alias of Queue.Empty; it is listed in __all__ so it is part of the
# package's exported names.
Empty = Queue.Empty
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +030020
21
class SensorConfig(object):
    """Simple value holder for one sensor configuration.

    Every constructor argument is stored unchanged on the instance under
    an attribute of the same name; no validation or conversion is done.
    ``monitor_url`` is optional and defaults to ``None``.
    """

    def __init__(self, conn, url, sensors, source_id, monitor_url=None):
        # Required settings.
        self.conn, self.url = conn, url
        self.sensors, self.source_id = sensors, source_id

        # Optional setting; stays None when the caller does not supply it.
        self.monitor_url = monitor_url
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +030029
30
def recv_main(proto, data_q, cmd_q):
    """Receive loop: forward received values to ``data_q`` until told to stop.

    Each iteration:

    * calls ``proto.recv(0.1)`` and puts the result on ``data_q``; a
      ``Timeout`` raised while doing so is ignored;
    * polls ``cmd_q`` without blocking; a ``None`` command ends the loop,
      any other command is discarded, and an empty queue is ignored.

    Returns ``None`` once the ``None`` command has been read.
    """
    while True:
        try:
            packet = proto.recv(0.1)
            data_q.put(packet)
        except Timeout:
            # Nothing arrived within the receive window; fall through to
            # the command check.
            pass

        try:
            command = cmd_q.get(False)
        except Queue.Empty:
            # No pending command, keep receiving.
            continue

        if command is None:
            # None is the stop request.
            return
46
47
def start_listener_thread(uri):
    """Start a background thread that receives sensor data for ``uri``.

    A receiving protocol object is created with
    ``create_protocol(uri, receiver=True)`` and handed, together with two
    fresh queues, to ``recv_main`` running in a daemon thread.

    Returns a ``(data_q, stop_thread)`` pair:

    * ``data_q`` - the queue onto which the thread puts received values;
    * ``stop_thread`` - a zero-argument callable that asks the thread to
      exit and blocks until it has finished.
    """
    data_q = Queue.Queue()
    cmd_q = Queue.Queue()

    # Pass uri as a logger argument instead of concatenating: formatting is
    # skipped when DEBUG is disabled and a non-str uri cannot raise TypeError.
    logger.debug("Listening for sensor data on %s", uri)

    proto = create_protocol(uri, receiver=True)
    th = threading.Thread(target=recv_main, args=(proto, data_q, cmd_q))
    # Daemon thread: it does not keep the interpreter alive on exit.
    th.daemon = True
    th.start()

    def stop_thread():
        # None is the command recv_main treats as "stop".
        cmd_q.put(None)
        th.join()

    return data_q, stop_thread