blob: ea7378925310a188d1ab413d8a9aac53b3a1d92d [file] [log] [blame]
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +03001import Queue
koder aka kdanilov168f6092015-04-19 02:33:38 +03002import logging
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +03003import threading
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +03004
5from .deploy_sensors import (deploy_and_start_sensors,
6 stop_and_remove_sensors)
7from .protocol import create_protocol, Timeout
8
9
koder aka kdanilov168f6092015-04-19 02:33:38 +030010__all__ = ['Empty', 'recv_main',
11 'deploy_and_start_sensors',
12 'SensorConfig',
13 'stop_and_remove_sensors',
14 'start_listener_thread',
15 ]
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +030016
17
18Empty = Queue.Empty
koder aka kdanilov168f6092015-04-19 02:33:38 +030019logger = logging.getLogger("wally.sensors")
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +030020
21
22class SensorConfig(object):
Alyona Kiseleva7f6de4f2015-04-21 01:04:20 +030023 def __init__(self, conn, url, sensors, source_id,
24 monitor_url=None):
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +030025 self.conn = conn
26 self.url = url
27 self.sensors = sensors
koder aka kdanilov168f6092015-04-19 02:33:38 +030028 self.source_id = source_id
29 self.monitor_url = monitor_url
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +030030
31
32def recv_main(proto, data_q, cmd_q):
33 while True:
34 try:
35 data_q.put(proto.recv(0.1))
36 except Timeout:
37 pass
38
39 try:
40 val = cmd_q.get(False)
41
42 if val is None:
43 return
44
45 except Queue.Empty:
46 pass
47
48
koder aka kdanilov168f6092015-04-19 02:33:38 +030049def start_listener_thread(uri):
50 data_q = Queue.Queue()
51 cmd_q = Queue.Queue()
52 logger.debug("Listening for sensor data on " + uri)
53 proto = create_protocol(uri, receiver=True)
54 th = threading.Thread(None, recv_main, None, (proto, data_q, cmd_q))
55 th.daemon = True
56 th.start()
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +030057
koder aka kdanilov168f6092015-04-19 02:33:38 +030058 def stop_thread():
59 cmd_q.put(None)
60 th.join()
61
62 return data_q, stop_thread