blob: 68b2e7dfa9ba75d4f267b6714f6e999b3247cba6 [file] [log] [blame]
koder aka kdanilov4af1c1d2015-05-18 15:48:58 +03001import os
2import time
3import json
koder aka kdanilov168f6092015-04-19 02:33:38 +03004import logging
koder aka kdanilov4af1c1d2015-05-18 15:48:58 +03005import contextlib
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +03006
koder aka kdanilov4af1c1d2015-05-18 15:48:58 +03007from concurrent.futures import ThreadPoolExecutor
8
9from wally.ssh_utils import (copy_paths, run_over_ssh,
10 save_to_remote, read_from_remote)
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +030011
12
koder aka kdanilov168f6092015-04-19 02:33:38 +030013logger = logging.getLogger("wally.sensors")
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +030014
15
class SensorConfig(object):
    """Sensor settings for a single node, bundled with its connection.

    The constructor stores its arguments unchanged; nothing is validated
    or copied here.

    Attributes:
        conn: connection object for the node (elsewhere in this module it
            is passed to the ssh helpers and used via ``open_sftp()``).
        url: node identifier passed as ``node=`` to the ssh helpers.
        sensors: sensor settings; this module calls ``.copy()`` on it and
            serializes the result to JSON.
        source_id: value written into the remote config as ``source_id``.
        monitor_url: destination for collected sensor data, ``None`` when
            not provided.
    """

    def __init__(self, conn, url, sensors, source_id,
                 monitor_url=None):
        # How to reach the node.
        self.conn, self.url = conn, url
        # What to collect and how the data is labelled.
        self.sensors, self.source_id = sensors, source_id
        # Where the collected data is sent.
        self.monitor_url = monitor_url
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +030024
25
@contextlib.contextmanager
def with_sensors(sensor_configs, remote_path):
    """Keep the sensors code deployed on every node while the block runs.

    On entry, the directory containing this module is copied to
    ``<remote_path>/sensors`` on each node and a per-node ``conf.json``
    (the node's sensor settings plus its ``source_id``) is written to
    ``remote_path``. On exit ``remote_path`` is removed from every node
    with ``rm -rf``.

    Cleanup also runs when deployment itself fails, so nodes that were
    deployed successfully are not left with stale files.

    Args:
        sensor_configs: sized, re-iterable collection of per-node config
            objects exposing ``conn``, ``url``, ``sensors`` and
            ``source_id``.
        remote_path: directory on the remote nodes to deploy into.

    Yields:
        None.
    """
    paths = {os.path.dirname(__file__):
             os.path.join(remote_path, "sensors")}
    config_remote_path = os.path.join(remote_path, "conf.json")

    def deploy_sensors(node_sensor_config):
        # NOTE(review): the original comment here read "check that path
        # already exists" - presumably a TODO; the copy is unconditional.
        copy_paths(node_sensor_config.conn, paths)
        with node_sensor_config.conn.open_sftp() as sftp:
            # Copy so that adding source_id does not mutate the caller's
            # sensors mapping.
            sensors_config = node_sensor_config.sensors.copy()
            sensors_config['source_id'] = node_sensor_config.source_id
            save_to_remote(sftp, config_remote_path,
                           json.dumps(sensors_config))

    def remove_sensors(node_sensor_config):
        run_over_ssh(node_sensor_config.conn,
                     "rm -rf {0}".format(remote_path),
                     node=node_sensor_config.url, timeout=10)

    logger.debug("Installing sensors on %s nodes", len(sensor_configs))
    with ThreadPoolExecutor(max_workers=32) as executor:
        try:
            # Deployment is inside the try block on purpose: if it fails on
            # some node, the finally clause still cleans up the nodes where
            # it succeeded. "rm -rf" does not fail on a missing path, so
            # running it on a node that was never deployed is harmless.
            list(executor.map(deploy_sensors, sensor_configs))
            yield
        finally:
            # list() forces the lazy map so that errors from the workers
            # are re-raised here instead of being dropped.
            list(executor.map(remove_sensors, sensor_configs))
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +030053
koder aka kdanilov4af1c1d2015-05-18 15:48:58 +030054
@contextlib.contextmanager
def sensors_info(sensor_configs, remote_path):
    """Run the deployed sensors on every node while the block executes.

    On entry ``sensors.main -d start`` is launched over ssh on each node.
    On exit ``sensors.main -d stop`` is run on each node and the file named
    by the node's ``csvfile://`` monitor URL is read back over sftp.

    Args:
        sensor_configs: sized, re-iterable collection of per-node config
            objects exposing ``conn``, ``url`` and ``monitor_url``.
        remote_path: remote directory used as ``PYTHONPATH`` for the
            sensors package; ``conf.json`` is expected directly inside it.

    Yields:
        A list that is empty while the block runs and is filled on exit
        with one entry per node (the value returned by
        ``read_from_remote``), in the order of ``sensor_configs``.

    Raises:
        ValueError: on entry, before any sensor is started, if some
            node's ``monitor_url`` is missing or is not a ``csvfile://``
            URL, since results could not be collected from it.
    """
    csv_scheme = "csvfile://"
    config_remote_path = os.path.join(remote_path, "conf.json")

    # Validate up front with an explicit raise. The previous assert ran
    # only at teardown, after sensors had already been started and
    # stopped, and is stripped entirely under "python -O".
    for node_sensor_config in sensor_configs:
        monitor_url = node_sensor_config.monitor_url
        if monitor_url is None or not monitor_url.startswith(csv_scheme):
            raise ValueError(
                "monitor_url must start with {0!r} to collect sensor "
                "data, got {1!r} for node {2}".format(
                    csv_scheme, monitor_url, node_sensor_config.url))

    def start_sensors(node_sensor_config):
        cmd_templ = 'env PYTHONPATH="{0}" python -m ' + \
                    "sensors.main -d start -u {1} {2}"

        cmd = cmd_templ.format(remote_path,
                               node_sensor_config.monitor_url,
                               config_remote_path)

        run_over_ssh(node_sensor_config.conn, cmd,
                     node=node_sensor_config.url)

    def stop_and_gather_data(node_sensor_config):
        cmd = 'env PYTHONPATH="{0}" python -m sensors.main -d stop'
        cmd = cmd.format(remote_path)
        run_over_ssh(node_sensor_config.conn, cmd,
                     node=node_sensor_config.url)
        # Originally commented as "some magic"; presumably gives the
        # stopped sensor process time to finish writing its output file
        # before it is read - TODO confirm.
        time.sleep(1)

        # Everything after the scheme separator is the remote file path.
        res_path = node_sensor_config.monitor_url.split("//", 1)[1]
        with node_sensor_config.conn.open_sftp() as sftp:
            res = read_from_remote(sftp, res_path)

        return res

    results = []

    logger.debug("Starting sensors on %s nodes", len(sensor_configs))
    with ThreadPoolExecutor(max_workers=32) as executor:
        # NOTE(review): starting stays outside the try block, so a failed
        # start on one node does not stop sensors already started on
        # others. It is unclear whether stop/gather is safe on a node
        # where start never succeeded - confirm before changing.
        list(executor.map(start_sensors, sensor_configs))
        try:
            yield results
        finally:
            # Fill the same list object that was yielded, so the caller
            # sees the data after leaving the with-block.
            results.extend(executor.map(stop_and_gather_data, sensor_configs))
94 results.extend(executor.map(stop_and_gather_data, sensor_configs))