blob: e8c62617cfae9a14f9ae2abd3362c0b920e87746 [file] [log] [blame]
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +03001import Queue
koder aka kdanilov168f6092015-04-19 02:33:38 +03002import logging
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +03003import threading
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +03004
5from .deploy_sensors import (deploy_and_start_sensors,
6 stop_and_remove_sensors)
koder aka kdanilovafd98742015-04-24 01:27:22 +03007from .protocol import create_protocol, Timeout, CantUnpack
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +03008
9
koder aka kdanilov168f6092015-04-19 02:33:38 +030010__all__ = ['Empty', 'recv_main',
11 'deploy_and_start_sensors',
12 'SensorConfig',
13 'stop_and_remove_sensors',
14 'start_listener_thread',
15 ]
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +030016
17
18Empty = Queue.Empty
koder aka kdanilov168f6092015-04-19 02:33:38 +030019logger = logging.getLogger("wally.sensors")
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +030020
21
22class SensorConfig(object):
Alyona Kiseleva7f6de4f2015-04-21 01:04:20 +030023 def __init__(self, conn, url, sensors, source_id,
24 monitor_url=None):
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +030025 self.conn = conn
26 self.url = url
27 self.sensors = sensors
koder aka kdanilov168f6092015-04-19 02:33:38 +030028 self.source_id = source_id
29 self.monitor_url = monitor_url
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +030030
31
32def recv_main(proto, data_q, cmd_q):
33 while True:
34 try:
koder aka kdanilovafd98742015-04-24 01:27:22 +030035 ip, packet = proto.recv(0.1)
36 if packet is not None:
37 data_q.put((ip, packet))
38 except AssertionError as exc:
39 logger.warning("Error in sensor data " + str(exc))
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +030040 except Timeout:
41 pass
koder aka kdanilovafd98742015-04-24 01:27:22 +030042 except CantUnpack as exc:
43 print exc
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +030044
45 try:
46 val = cmd_q.get(False)
47
48 if val is None:
49 return
50
51 except Queue.Empty:
52 pass
53
54
koder aka kdanilov168f6092015-04-19 02:33:38 +030055def start_listener_thread(uri):
56 data_q = Queue.Queue()
57 cmd_q = Queue.Queue()
58 logger.debug("Listening for sensor data on " + uri)
59 proto = create_protocol(uri, receiver=True)
60 th = threading.Thread(None, recv_main, None, (proto, data_q, cmd_q))
61 th.daemon = True
62 th.start()
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +030063
koder aka kdanilov168f6092015-04-19 02:33:38 +030064 def stop_thread():
65 cmd_q.put(None)
66 th.join()
67
68 return data_q, stop_thread