fixing, improve sersors installation code
diff --git a/wally/sensors/api.py b/wally/sensors/api.py
index f66bb36..6eed90a 100644
--- a/wally/sensors/api.py
+++ b/wally/sensors/api.py
@@ -1,24 +1,31 @@
import Queue
+import logging
import threading
-from contextlib import contextmanager
from .deploy_sensors import (deploy_and_start_sensors,
stop_and_remove_sensors)
from .protocol import create_protocol, Timeout
-__all__ = ['Empty', 'recv_main', 'start_monitoring',
- 'deploy_and_start_sensors', 'SensorConfig']
+__all__ = ['Empty', 'recv_main',
+ 'deploy_and_start_sensors',
+ 'SensorConfig',
+ 'stop_and_remove_sensors',
+ 'start_listener_thread',
+ ]
Empty = Queue.Empty
+logger = logging.getLogger("wally.sensors")
class SensorConfig(object):
- def __init__(self, conn, url, sensors):
+ def __init__(self, conn, url, sensors, source_id, monitor_url=None):
self.conn = conn
self.url = url
self.sensors = sensors
+ self.source_id = source_id
+ self.monitor_url = monitor_url
def recv_main(proto, data_q, cmd_q):
@@ -38,21 +45,17 @@
pass
-@contextmanager
-def start_monitoring(uri, configs):
- deploy_and_start_sensors(uri, configs)
- try:
- data_q = Queue.Queue()
- cmd_q = Queue.Queue()
- proto = create_protocol(uri, receiver=True)
- th = threading.Thread(None, recv_main, None, (proto, data_q, cmd_q))
- th.daemon = True
- th.start()
+def start_listener_thread(uri):
+ data_q = Queue.Queue()
+ cmd_q = Queue.Queue()
+ logger.debug("Listening for sensor data on " + uri)
+ proto = create_protocol(uri, receiver=True)
+ th = threading.Thread(None, recv_main, None, (proto, data_q, cmd_q))
+ th.daemon = True
+ th.start()
- try:
- yield data_q
- finally:
- cmd_q.put(None)
- th.join()
- finally:
- stop_and_remove_sensors(configs)
+ def stop_thread():
+ cmd_q.put(None)
+ th.join()
+
+ return data_q, stop_thread
diff --git a/wally/sensors/cp_protocol.py b/wally/sensors/cp_protocol.py
index 4e96afe..1b6993c 100644
--- a/wally/sensors/cp_protocol.py
+++ b/wally/sensors/cp_protocol.py
@@ -8,7 +8,7 @@
import binascii
-logger = logging.getLogger("wally")
+logger = logging.getLogger("wally.sensors")
# protocol contains 2 type of packet:
diff --git a/wally/sensors/deploy_sensors.py b/wally/sensors/deploy_sensors.py
index 249adfb..73e7902 100644
--- a/wally/sensors/deploy_sensors.py
+++ b/wally/sensors/deploy_sensors.py
@@ -7,14 +7,14 @@
from wally.ssh_utils import copy_paths, run_over_ssh
-logger = logging.getLogger('wally')
+logger = logging.getLogger('wally.sensors')
def wait_all_ok(futures):
return all(future.result() for future in futures)
-def deploy_and_start_sensors(monitor_uri, sensor_configs,
+def deploy_and_start_sensors(sensor_configs,
remote_path='/tmp/sensors/sensors'):
paths = {os.path.dirname(__file__): remote_path}
@@ -25,29 +25,29 @@
futures.append(executor.submit(deploy_and_start_sensor,
paths,
node_sensor_config,
- monitor_uri,
remote_path))
if not wait_all_ok(futures):
raise RuntimeError("Sensor deployment fails on some nodes")
-def deploy_and_start_sensor(paths, node_sensor_config,
- monitor_uri, remote_path):
+def deploy_and_start_sensor(paths, node_sensor_config, remote_path):
try:
copy_paths(node_sensor_config.conn, paths)
sftp = node_sensor_config.conn.open_sftp()
config_remote_path = os.path.join(remote_path, "conf.json")
+ sensors_config = node_sensor_config.sensors.copy()
+ sensors_config['source_id'] = node_sensor_config.source_id
with sftp.open(config_remote_path, "w") as fd:
- fd.write(json.dumps(node_sensor_config.sensors))
+ fd.write(json.dumps(sensors_config))
cmd_templ = 'env PYTHONPATH="{0}" python -m ' + \
"sensors.main -d start -u {1} {2}"
cmd = cmd_templ.format(os.path.dirname(remote_path),
- monitor_uri,
+ node_sensor_config.monitor_url,
config_remote_path)
run_over_ssh(node_sensor_config.conn, cmd,
@@ -55,7 +55,7 @@
sftp.close()
except:
- msg = "During deploing sensors in {0}".format(node_sensor_config.url)
+ msg = "During deploing sensors on {0}".format(node_sensor_config.url)
logger.exception(msg)
return False
return True
@@ -69,9 +69,8 @@
# some magic
time.sleep(0.3)
- conn.exec_command("rm -rf {0}".format(remote_path))
-
- logger.debug("Sensors stopped and removed")
+ # logger.warning("Sensors don't removed")
+ run_over_ssh(conn, "rm -rf {0}".format(remote_path), node=url)
def stop_and_remove_sensors(configs, remote_path='/tmp/sensors'):
@@ -85,3 +84,4 @@
remote_path))
wait(futures)
+ logger.debug("Sensors stopped and removed")
diff --git a/wally/sensors/main.py b/wally/sensors/main.py
index 3753e7c..e86bbed 100644
--- a/wally/sensors/main.py
+++ b/wally/sensors/main.py
@@ -51,6 +51,11 @@
prev = {}
while True:
+ try:
+ source_id = str(required_sensors.pop('source_id'))
+ except KeyError:
+ source_id = None
+
gtime, data = get_values(required_sensors.items())
curr = {'time': SensorInfo(gtime, True)}
for name, val in data.items():
@@ -60,6 +65,10 @@
prev[name] = val.value
else:
curr[name] = SensorInfo(val.value, False)
+
+ if source_id is not None:
+ curr['source_id'] = source_id
+
sender.send(curr)
time.sleep(opts.timeout)
diff --git a/wally/sensors/protocol.py b/wally/sensors/protocol.py
index c2ace01..7688f31 100644
--- a/wally/sensors/protocol.py
+++ b/wally/sensors/protocol.py
@@ -25,7 +25,12 @@
class PickleSerializer(ISensortResultsSerializer):
def pack(self, data):
- ndata = {key: val.value for key, val in data.items()}
+ ndata = {}
+ for key, val in data.items():
+ if isinstance(val, basestring):
+ ndata[key] = val
+ else:
+ ndata[key] = val.value
return pickle.dumps(ndata)
def unpack(self, data):