sensors: replace listener-thread API with ssh-based deploy/collect context managers
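
The Queue-based listener thread and the sensor protocol receiver are
gone. Sensors are now handled over ssh by two context managers:
with_sensors() copies the package and a per-node config to each node
and cleans up on exit; sensors_info() starts the daemons, yields a
results list, and fills it with the gathered csv data when the block
exits. A minimal usage sketch (SensorConfig's exact signature,
run_test() and handle_csv() are illustrative assumptions, not part of
this diff):

    cfgs = [SensorConfig(conn, "ssh://node1", {"io": {}},
                         source_id="node1",
                         monitor_url="csvfile:///tmp/sensors.csv")]
    with with_sensors(cfgs, "/tmp/wally"):
        with sensors_info(cfgs, "/tmp/wally") as results:
            run_test()
        # results is only populated after the inner block exits
        for csv_data in results:
            handle_csv(csv_data)
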
diff --git a/wally/sensors/api.py b/wally/sensors/api.py
index e8c6261..52d33ed 100644
--- a/wally/sensors/api.py
+++ b/wally/sensors/api.py
@@ -1,21 +1,15 @@
-import Queue
+import os
+import time
+import json
import logging
-import threading
+import contextlib
-from .deploy_sensors import (deploy_and_start_sensors,
-                             stop_and_remove_sensors)
-from .protocol import create_protocol, Timeout, CantUnpack
+from concurrent.futures import ThreadPoolExecutor
+
+from wally.ssh_utils import (copy_paths, run_over_ssh,
+                             save_to_remote, read_from_remote)
-__all__ = ['Empty', 'recv_main',
-           'deploy_and_start_sensors',
-           'SensorConfig',
-           'stop_and_remove_sensors',
-           'start_listener_thread',
-           ]
-
-
-Empty = Queue.Empty
logger = logging.getLogger("wally.sensors")
@@ -29,40 +23,71 @@
         self.monitor_url = monitor_url
-def recv_main(proto, data_q, cmd_q):
-    while True:
+@contextlib.contextmanager
+def with_sensors(sensor_configs, remote_path):
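+    """Deploy the sensors package and a per-node json config to every
+    node over ssh; remove the remote files again when the block exits.
+    """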
+    paths = {os.path.dirname(__file__):
+             os.path.join(remote_path, "sensors")}
+    config_remote_path = os.path.join(remote_path, "conf.json")
+
+    def deploy_sensors(node_sensor_config):
+        copy_paths(node_sensor_config.conn, paths)
+        with node_sensor_config.conn.open_sftp() as sftp:
+            sensors_config = node_sensor_config.sensors.copy()
+            sensors_config['source_id'] = node_sensor_config.source_id
+            save_to_remote(sftp, config_remote_path,
+                           json.dumps(sensors_config))
+
+    def remove_sensors(node_sensor_config):
+        run_over_ssh(node_sensor_config.conn,
+                     "rm -rf {0}".format(remote_path),
+                     node=node_sensor_config.url, timeout=10)
+
+    logger.debug("Installing sensors on {0} nodes".format(len(sensor_configs)))
+    with ThreadPoolExecutor(max_workers=32) as executor:
+        list(executor.map(deploy_sensors, sensor_configs))
         try:
-            ip, packet = proto.recv(0.1)
-            if packet is not None:
-                data_q.put((ip, packet))
-        except AssertionError as exc:
-            logger.warning("Error in sensor data " + str(exc))
-        except Timeout:
-            pass
-        except CantUnpack as exc:
-            print exc
+            yield
+        finally:
+            list(executor.map(remove_sensors, sensor_configs))
+
+@contextlib.contextmanager
+def sensors_info(sensor_configs, remote_path):
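+    """Start sensors on every node, yield a list which is filled with
+    the collected per-node csv data once the block exits.
+    """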
+    config_remote_path = os.path.join(remote_path, "conf.json")
+
+    def start_sensors(node_sensor_config):
+        cmd_templ = 'env PYTHONPATH="{0}" python -m ' + \
+                    "sensors.main -d start -u {1} {2}"
+
+        cmd = cmd_templ.format(remote_path,
+                               node_sensor_config.monitor_url,
+                               config_remote_path)
+
+        run_over_ssh(node_sensor_config.conn, cmd,
+                     node=node_sensor_config.url)
+
+    def stop_and_gather_data(node_sensor_config):
+        cmd = 'env PYTHONPATH="{0}" python -m sensors.main -d stop'
+        cmd = cmd.format(remote_path)
+        run_over_ssh(node_sensor_config.conn, cmd,
+                     node=node_sensor_config.url)
+        # give the daemon a moment to stop and flush its csv results
+        time.sleep(1)
+
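+        # results can only be fetched back when the sensors were
+        # writing to a csv file local to the node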
+        assert node_sensor_config.monitor_url.startswith("csvfile://")
+
+        res_path = node_sensor_config.monitor_url.split("//", 1)[1]
+        with node_sensor_config.conn.open_sftp() as sftp:
+            res = read_from_remote(sftp, res_path)
+
+        return res
+
+    results = []
+
+    logger.debug("Starting sensors on {0} nodes".format(len(sensor_configs)))
+    with ThreadPoolExecutor(max_workers=32) as executor:
+        list(executor.map(start_sensors, sensor_configs))
         try:
-            val = cmd_q.get(False)
-
-            if val is None:
-                return
-
-        except Queue.Empty:
-            pass
-
-
-def start_listener_thread(uri):
-    data_q = Queue.Queue()
-    cmd_q = Queue.Queue()
-    logger.debug("Listening for sensor data on " + uri)
-    proto = create_protocol(uri, receiver=True)
-    th = threading.Thread(None, recv_main, None, (proto, data_q, cmd_q))
-    th.daemon = True
-    th.start()
-
-    def stop_thread():
-        cmd_q.put(None)
-        th.join()
-
-    return data_q, stop_thread
+            yield results
+        finally:
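+            # the yielded list is filled only after the managed block
+            # exits, so callers read it once the with-statement ends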
+            results.extend(executor.map(stop_and_gather_data, sensor_configs))