blob: f66bb36e8c439880f3f28691d152d0c9bb9c0b8f [file] [log] [blame]
import Queue
import threading
from contextlib import contextmanager
from .deploy_sensors import (deploy_and_start_sensors,
stop_and_remove_sensors)
from .protocol import create_protocol, Timeout
__all__ = ['Empty', 'recv_main', 'start_monitoring',
'deploy_and_start_sensors', 'SensorConfig']
Empty = Queue.Empty
class SensorConfig(object):
def __init__(self, conn, url, sensors):
self.conn = conn
self.url = url
self.sensors = sensors
def recv_main(proto, data_q, cmd_q):
while True:
try:
data_q.put(proto.recv(0.1))
except Timeout:
pass
try:
val = cmd_q.get(False)
if val is None:
return
except Queue.Empty:
pass
@contextmanager
def start_monitoring(uri, configs):
deploy_and_start_sensors(uri, configs)
try:
data_q = Queue.Queue()
cmd_q = Queue.Queue()
proto = create_protocol(uri, receiver=True)
th = threading.Thread(None, recv_main, None, (proto, data_q, cmd_q))
th.daemon = True
th.start()
try:
yield data_q
finally:
cmd_q.put(None)
th.join()
finally:
stop_and_remove_sensors(configs)