sensors: replace listener thread with context managers, add RAM transport
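
Replace the Queue/threading based listener (recv_main,
start_listener_thread) with two context managers: with_sensors() copies
the sensors package and a per-node conf.json to the target nodes and
removes them on exit; sensors_info() starts the sensor daemons and, on
exit, stops them and reads the collected csv data back. Drop the
influxdb exporter and add a RAMTransport that buffers packets in memory
and flushes them to a csv file.

A minimal usage sketch; the SensorConfig constructor arguments and the
run_workload() hook are assumptions, not part of this change:

    from wally.sensors.api import with_sensors, sensors_info

    # one config per node, carrying .conn, .url, .sensors, .source_id
    # and a .monitor_url of the form "csvfile:///tmp/sensors.csv"
    configs = [...]

    with with_sensors(configs, "/tmp/wally_sensors"):
        with sensors_info(configs, "/tmp/wally_sensors") as results:
            run_workload()  # hypothetical: whatever runs under monitoring
    # `results` is populated only after sensors_info() exits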
diff --git a/wally/sensors/api.py b/wally/sensors/api.py
index e8c6261..52d33ed 100644
--- a/wally/sensors/api.py
+++ b/wally/sensors/api.py
@@ -1,21 +1,15 @@
-import Queue
+import os
+import time
+import json
import logging
-import threading
+import contextlib
-from .deploy_sensors import (deploy_and_start_sensors,
- stop_and_remove_sensors)
-from .protocol import create_protocol, Timeout, CantUnpack
+from concurrent.futures import ThreadPoolExecutor
+
+from wally.ssh_utils import (copy_paths, run_over_ssh,
+ save_to_remote, read_from_remote)
-__all__ = ['Empty', 'recv_main',
- 'deploy_and_start_sensors',
- 'SensorConfig',
- 'stop_and_remove_sensors',
- 'start_listener_thread',
- ]
-
-
-Empty = Queue.Empty
logger = logging.getLogger("wally.sensors")
@@ -29,40 +23,71 @@
self.monitor_url = monitor_url
-def recv_main(proto, data_q, cmd_q):
- while True:
+@contextlib.contextmanager
+def with_sensors(sensor_configs, remote_path):
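+    # copy the sensors package and a per-node conf.json to every node,
+    # then remove the remote files again when the with-block exits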
+ paths = {os.path.dirname(__file__):
+ os.path.join(remote_path, "sensors")}
+ config_remote_path = os.path.join(remote_path, "conf.json")
+
+ def deploy_sensors(node_sensor_config):
+ copy_paths(node_sensor_config.conn, paths)
+ with node_sensor_config.conn.open_sftp() as sftp:
+ sensors_config = node_sensor_config.sensors.copy()
+ sensors_config['source_id'] = node_sensor_config.source_id
+ save_to_remote(sftp, config_remote_path,
+ json.dumps(sensors_config))
+
+ def remove_sensors(node_sensor_config):
+ run_over_ssh(node_sensor_config.conn,
+ "rm -rf {0}".format(remote_path),
+ node=node_sensor_config.url, timeout=10)
+
+ logger.debug("Installing sensors on {0} nodes".format(len(sensor_configs)))
+ with ThreadPoolExecutor(max_workers=32) as executor:
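+        # list() forces the lazy executor.map so any error surfaces here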
+ list(executor.map(deploy_sensors, sensor_configs))
try:
- ip, packet = proto.recv(0.1)
- if packet is not None:
- data_q.put((ip, packet))
- except AssertionError as exc:
- logger.warning("Error in sensor data " + str(exc))
- except Timeout:
- pass
- except CantUnpack as exc:
- print exc
+ yield
+ finally:
+ list(executor.map(remove_sensors, sensor_configs))
+
+@contextlib.contextmanager
+def sensors_info(sensor_configs, remote_path):
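+    # start sensor daemons on every node; on exit stop them, read the
+    # collected csv data back and fill the yielded list with it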
+ config_remote_path = os.path.join(remote_path, "conf.json")
+
+ def start_sensors(node_sensor_config):
+ cmd_templ = 'env PYTHONPATH="{0}" python -m ' + \
+ "sensors.main -d start -u {1} {2}"
+
+ cmd = cmd_templ.format(remote_path,
+ node_sensor_config.monitor_url,
+ config_remote_path)
+
+ run_over_ssh(node_sensor_config.conn, cmd,
+ node=node_sensor_config.url)
+
+ def stop_and_gather_data(node_sensor_config):
+ cmd = 'env PYTHONPATH="{0}" python -m sensors.main -d stop'
+ cmd = cmd.format(remote_path)
+ run_over_ssh(node_sensor_config.conn, cmd,
+ node=node_sensor_config.url)
+        # give the daemon a moment to flush pending results to disk
+ time.sleep(1)
+
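+        # results can only be read back for csvfile:// monitor urls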
+ assert node_sensor_config.monitor_url.startswith("csvfile://")
+
+ res_path = node_sensor_config.monitor_url.split("//", 1)[1]
+ with node_sensor_config.conn.open_sftp() as sftp:
+ res = read_from_remote(sftp, res_path)
+
+ return res
+
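+    # filled in the finally block below; empty while the with-block runs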
+ results = []
+
+ logger.debug("Starting sensors on {0} nodes".format(len(sensor_configs)))
+ with ThreadPoolExecutor(max_workers=32) as executor:
+ list(executor.map(start_sensors, sensor_configs))
try:
- val = cmd_q.get(False)
-
- if val is None:
- return
-
- except Queue.Empty:
- pass
-
-
-def start_listener_thread(uri):
- data_q = Queue.Queue()
- cmd_q = Queue.Queue()
- logger.debug("Listening for sensor data on " + uri)
- proto = create_protocol(uri, receiver=True)
- th = threading.Thread(None, recv_main, None, (proto, data_q, cmd_q))
- th.daemon = True
- th.start()
-
- def stop_thread():
- cmd_q.put(None)
- th.join()
-
- return data_q, stop_thread
+ yield results
+ finally:
+ results.extend(executor.map(stop_and_gather_data, sensor_configs))
diff --git a/wally/sensors/deploy_sensors.py b/wally/sensors/deploy_sensors.py
index 4a1c5df..82ab21a 100644
--- a/wally/sensors/deploy_sensors.py
+++ b/wally/sensors/deploy_sensors.py
@@ -3,9 +3,8 @@
import os.path
import logging
-from concurrent.futures import ThreadPoolExecutor, wait
-
-from wally.ssh_utils import copy_paths, run_over_ssh
+from wally.ssh_utils import (copy_paths, run_over_ssh,
+ save_to_remote, read_from_remote)
logger = logging.getLogger('wally.sensors')
@@ -34,25 +33,23 @@
def deploy_and_start_sensor(paths, node_sensor_config, remote_path):
try:
copy_paths(node_sensor_config.conn, paths)
- sftp = node_sensor_config.conn.open_sftp()
+ with node_sensor_config.conn.open_sftp() as sftp:
+ config_remote_path = os.path.join(remote_path, "conf.json")
- config_remote_path = os.path.join(remote_path, "conf.json")
+ sensors_config = node_sensor_config.sensors.copy()
+ sensors_config['source_id'] = node_sensor_config.source_id
+ with sftp.open(config_remote_path, "w") as fd:
+ fd.write(json.dumps(sensors_config))
- sensors_config = node_sensor_config.sensors.copy()
- sensors_config['source_id'] = node_sensor_config.source_id
- with sftp.open(config_remote_path, "w") as fd:
- fd.write(json.dumps(sensors_config))
+ cmd_templ = 'env PYTHONPATH="{0}" python -m ' + \
+ "sensors.main -d start -u {1} {2}"
- cmd_templ = 'env PYTHONPATH="{0}" python -m ' + \
- "sensors.main -d start -u {1} {2}"
+ cmd = cmd_templ.format(os.path.dirname(remote_path),
+ node_sensor_config.monitor_url,
+ config_remote_path)
- cmd = cmd_templ.format(os.path.dirname(remote_path),
- node_sensor_config.monitor_url,
- config_remote_path)
-
- run_over_ssh(node_sensor_config.conn, cmd,
- node=node_sensor_config.url)
- sftp.close()
+ run_over_ssh(node_sensor_config.conn, cmd,
+ node=node_sensor_config.url)
except:
msg = "During deploing sensors on {0}".format(node_sensor_config.url)
diff --git a/wally/sensors/influx_exporter.py b/wally/sensors/influx_exporter.py
deleted file mode 100644
index 34b3c0a..0000000
--- a/wally/sensors/influx_exporter.py
+++ /dev/null
@@ -1,31 +0,0 @@
-from urlparse import urlparse
-from influxdb import InfluxDBClient
-
-
-def connect(url):
- parsed_url = urlparse(url)
- user_passwd, host_port = parsed_url.netloc.rsplit("@", 1)
- user, passwd = user_passwd.split(":", 1)
- host, port = host_port.split(":")
- return InfluxDBClient(host, int(port), user, passwd, parsed_url.path[1:])
-
-
-def add_data(conn, hostname, data):
- per_sensor_data = {}
- for serie in data:
- serie = serie.copy()
- gtime = serie.pop('time')
- for key, val in serie.items():
- dev, sensor = key.split('.')
- data = per_sensor_data.setdefault(sensor, [])
- data.append([gtime, hostname, dev, val])
-
- infl_data = []
- columns = ['time', 'host', 'device', 'value']
- for sensor_name, points in per_sensor_data.items():
- infl_data.append(
- {'columns': columns,
- 'name': sensor_name,
- 'points': points})
-
- conn.write_points(infl_data)
diff --git a/wally/sensors/main.py b/wally/sensors/main.py
index 2d0bc81..20eedc5 100644
--- a/wally/sensors/main.py
+++ b/wally/sensors/main.py
@@ -35,7 +35,9 @@
def parse_args(args):
parser = argparse.ArgumentParser()
parser.add_argument('-d', '--daemon',
- choices=('start', 'stop', 'status'),
+ choices=('start', 'stop', 'status',
+ 'start_monitoring', 'stop_monitoring',
+ 'dump_ram_data'),
default=None)
parser.add_argument('-u', '--url', default='stdout://')
diff --git a/wally/sensors/protocol.py b/wally/sensors/protocol.py
index c053011..7c8aa0e 100644
--- a/wally/sensors/protocol.py
+++ b/wally/sensors/protocol.py
@@ -188,6 +188,7 @@
def send(self, data):
if self.headers is None:
self.headers = sorted(data)
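+            # do not print source_id as a regular data column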
+ self.headers.remove('source_id')
for pos, header in enumerate(self.headers):
self.line_format += "{%s:>%s}" % (pos,
@@ -197,6 +198,7 @@
print self.line_format.format(*self.headers)
if self.delta:
+
vals = [data[header].value - self.prev.get(header, 0)
for header in self.headers]
@@ -219,7 +221,7 @@
class CSVFileTransport(ITransport):
- required_keys = set(['time', 'source_id', 'hostname'])
+ required_keys = set(['time', 'source_id'])
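+    # hostname is no longer required in each packet; the csv header
+    # records socket.getfqdn() instead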
def __init__(self, receiver, fname):
ITransport.__init__(self, receiver)
@@ -234,10 +236,25 @@
assert self.required_keys.issubset(keys)
keys -= self.required_keys
self.field_list = sorted(keys)
- self.csv_fd.writerow([data['source_id'], data['hostname']] +
+ self.csv_fd.writerow([data['source_id'], socket.getfqdn()] +
self.field_list)
+ self.field_list = ['time'] + self.field_list
- self.csv_fd.writerow(map(data.__getitem__, ['time'] + self.field_list))
+ self.csv_fd.writerow([data[sens].value for sens in self.field_list])
+
+
+class RAMTransport(ITransport):
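+    # buffers packets in memory; flush() forwards them to the wrapped transport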
+ def __init__(self, next_tr):
+ self.next = next_tr
+ self.data = []
+
+ def send(self, data):
+ self.data.append(data)
+
+ def flush(self):
+ for data in self.data:
+ self.next.send(data)
+ self.data = []
class UDPTransport(ITransport):
@@ -269,10 +286,11 @@
def create_protocol(uri, receiver=False):
- parsed_uri = urlparse(uri)
- if parsed_uri.scheme == 'stdout':
+    # accept both the bare 'stdout' form and the 'stdout://' default url
+    if uri in ('stdout', 'stdout://'):
return StdoutTransport(receiver)
- elif parsed_uri.scheme == 'udp':
+
+ parsed_uri = urlparse(uri)
+ if parsed_uri.scheme == 'udp':
ip, port = parsed_uri.netloc.split(":")
if receiver:
@@ -286,6 +304,9 @@
return FileTransport(receiver, parsed_uri.path)
elif parsed_uri.scheme == 'csvfile':
return CSVFileTransport(receiver, parsed_uri.path)
+ elif parsed_uri.scheme == 'ram':
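+        # ram://<path>: buffer packets in memory, flush them to a csv file at <path>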
+        internal_recv = CSVFileTransport(receiver, parsed_uri.path)
+        return RAMTransport(internal_recv)
else:
templ = "Can't instantiate transport from {0!r}"
raise ValueError(templ.format(uri))