blob: 52d33ed87acdb91ddccc4abbc105240cd22b2512 [file] [log] [blame]
koder aka kdanilov4af1c1d2015-05-18 15:48:58 +03001import os
2import time
3import json
koder aka kdanilov168f6092015-04-19 02:33:38 +03004import logging
koder aka kdanilov4af1c1d2015-05-18 15:48:58 +03005import contextlib
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +03006
koder aka kdanilov4af1c1d2015-05-18 15:48:58 +03007from concurrent.futures import ThreadPoolExecutor
8
9from wally.ssh_utils import (copy_paths, run_over_ssh,
10 save_to_remote, read_from_remote)
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +030011
12
koder aka kdanilov168f6092015-04-19 02:33:38 +030013logger = logging.getLogger("wally.sensors")
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +030014
15
16class SensorConfig(object):
Alyona Kiseleva7f6de4f2015-04-21 01:04:20 +030017 def __init__(self, conn, url, sensors, source_id,
18 monitor_url=None):
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +030019 self.conn = conn
20 self.url = url
21 self.sensors = sensors
koder aka kdanilov168f6092015-04-19 02:33:38 +030022 self.source_id = source_id
23 self.monitor_url = monitor_url
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +030024
25
koder aka kdanilov4af1c1d2015-05-18 15:48:58 +030026@contextlib.contextmanager
27def with_sensors(sensor_configs, remote_path):
28 paths = {os.path.dirname(__file__):
29 os.path.join(remote_path, "sensors")}
30 config_remote_path = os.path.join(remote_path, "conf.json")
31
32 def deploy_sensors(node_sensor_config):
33 copy_paths(node_sensor_config.conn, paths)
34 with node_sensor_config.conn.open_sftp() as sftp:
35 sensors_config = node_sensor_config.sensors.copy()
36 sensors_config['source_id'] = node_sensor_config.source_id
37 save_to_remote(sftp, config_remote_path,
38 json.dumps(sensors_config))
39
40 def remove_sensors(node_sensor_config):
41 run_over_ssh(node_sensor_config.conn,
42 "rm -rf {0}".format(remote_path),
43 node=node_sensor_config.url, timeout=10)
44
45 logger.debug("Installing sensors on {0} nodes".format(len(sensor_configs)))
46 with ThreadPoolExecutor(max_workers=32) as executor:
47 list(executor.map(deploy_sensors, sensor_configs))
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +030048 try:
koder aka kdanilov4af1c1d2015-05-18 15:48:58 +030049 yield
50 finally:
51 list(executor.map(remove_sensors, sensor_configs))
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +030052
koder aka kdanilov4af1c1d2015-05-18 15:48:58 +030053
54@contextlib.contextmanager
55def sensors_info(sensor_configs, remote_path):
56 config_remote_path = os.path.join(remote_path, "conf.json")
57
58 def start_sensors(node_sensor_config):
59 cmd_templ = 'env PYTHONPATH="{0}" python -m ' + \
60 "sensors.main -d start -u {1} {2}"
61
62 cmd = cmd_templ.format(remote_path,
63 node_sensor_config.monitor_url,
64 config_remote_path)
65
66 run_over_ssh(node_sensor_config.conn, cmd,
67 node=node_sensor_config.url)
68
69 def stop_and_gather_data(node_sensor_config):
70 cmd = 'env PYTHONPATH="{0}" python -m sensors.main -d stop'
71 cmd = cmd.format(remote_path)
72 run_over_ssh(node_sensor_config.conn, cmd,
73 node=node_sensor_config.url)
74 # some magic
75 time.sleep(1)
76
77 assert node_sensor_config.monitor_url.startswith("csvfile://")
78
79 res_path = node_sensor_config.monitor_url.split("//", 1)[1]
80 with node_sensor_config.conn.open_sftp() as sftp:
81 res = read_from_remote(sftp, res_path)
82
83 return res
84
85 results = []
86
87 logger.debug("Starting sensors on {0} nodes".format(len(sensor_configs)))
88 with ThreadPoolExecutor(max_workers=32) as executor:
89 list(executor.map(start_sensors, sensor_configs))
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +030090 try:
koder aka kdanilov4af1c1d2015-05-18 15:48:58 +030091 yield results
92 finally:
93 results.extend(executor.map(stop_and_gather_data, sensor_configs))