blob: 249adfb3fe5aaa9142e8c64660537bf403b2062b [file] [log] [blame]
koder aka kdanilovcff7b2e2015-04-18 20:48:15 +03001import time
2import json
3import os.path
4import logging
5
6from concurrent.futures import ThreadPoolExecutor, wait
7
8from wally.ssh_utils import copy_paths, run_over_ssh
9
10logger = logging.getLogger('wally')
11
12
def wait_all_ok(futures):
    """Wait for every future and report whether all of them succeeded.

    Each future's ``result()`` is retrieved before the verdict is computed,
    so the call does not return while later futures are still pending, even
    when an earlier one has already reported failure.

    :param futures: iterable of objects exposing a blocking ``result()``
    :returns: ``True`` when every result is truthy (also for an empty
        iterable), ``False`` otherwise
    :raises: any exception raised by a future's ``result()`` propagates
    """
    # Materialise the results first: passing a generator straight to all()
    # short-circuits on the first falsy value and would skip waiting for
    # the remaining futures.
    results = [future.result() for future in futures]
    return all(results)
15
16
def deploy_and_start_sensors(monitor_uri, sensor_configs,
                             remote_path='/tmp/sensors/sensors'):
    """Run ``deploy_and_start_sensor`` for every node config in parallel.

    A mapping from this module's directory to ``remote_path`` is handed to
    each per-node job, together with the node config, ``monitor_uri`` and
    ``remote_path``. Jobs run on a pool of up to 32 threads.

    :param monitor_uri: passed through unchanged to each per-node job
    :param sensor_configs: iterable of per-node sensor configurations
    :param remote_path: destination path used on the nodes
    :raises RuntimeError: if ``wait_all_ok`` reports a failed job
    """
    local_dir = os.path.dirname(__file__)
    paths = {local_dir: remote_path}

    with ThreadPoolExecutor(max_workers=32) as pool:
        jobs = [
            pool.submit(deploy_and_start_sensor,
                        paths,
                        node_cfg,
                        monitor_uri,
                        remote_path)
            for node_cfg in sensor_configs
        ]

        if wait_all_ok(jobs):
            return

        raise RuntimeError("Sensor deployment fails on some nodes")
33
34
def deploy_and_start_sensor(paths, node_sensor_config,
                            monitor_uri, remote_path):
    """Copy the sensor code to one node, upload its config and start it.

    Steps, all performed over ``node_sensor_config.conn``:

    1. ``copy_paths`` transfers ``paths`` to the node.
    2. ``node_sensor_config.sensors`` is serialised to JSON and written to
       ``<remote_path>/conf.json`` through an SFTP session.
    3. ``sensors.main -d start`` is launched with ``PYTHONPATH`` set to the
       parent directory of ``remote_path``.

    :param paths: mapping handed to ``copy_paths``
    :param node_sensor_config: object providing ``conn``, ``sensors`` and
        ``url`` attributes
    :param monitor_uri: value passed to the sensors' ``-u`` option
    :param remote_path: directory on the node that receives ``conf.json``
    :returns: ``True`` on success, ``False`` if any step raised (the
        failure is logged with its traceback)
    """
    try:
        copy_paths(node_sensor_config.conn, paths)
        config_remote_path = os.path.join(remote_path, "conf.json")

        sftp = node_sensor_config.conn.open_sftp()
        try:
            with sftp.open(config_remote_path, "w") as fd:
                fd.write(json.dumps(node_sensor_config.sensors))

            cmd_templ = ('env PYTHONPATH="{0}" python -m '
                         "sensors.main -d start -u {1} {2}")

            cmd = cmd_templ.format(os.path.dirname(remote_path),
                                   monitor_uri,
                                   config_remote_path)

            run_over_ssh(node_sensor_config.conn, cmd,
                         node=node_sensor_config.url)
        finally:
            # Release the SFTP session even when the upload or the remote
            # command fails; previously it was closed only on success.
            sftp.close()

    except Exception:
        # Deliberately not a bare ``except``: SystemExit and
        # KeyboardInterrupt must not be reported as a deployment failure.
        msg = "During deploing sensors in {0}".format(node_sensor_config.url)
        logger.exception(msg)
        return False
    return True
62
63
def stop_and_remove_sensor(conn, url, remote_path):
    """Stop the sensors on one node and delete their remote directory.

    :param conn: connection object passed to ``run_over_ssh`` and used for
        ``exec_command``
    :param url: node identifier forwarded to ``run_over_ssh`` as ``node``
    :param remote_path: used as ``PYTHONPATH`` for the stop command and
        then removed with ``rm -rf``
    """
    stop_cmd = 'env PYTHONPATH="{0}" python -m sensors.main -d stop'.format(
        remote_path)
    run_over_ssh(conn, stop_cmd, node=url)

    # Short pause between stopping and deleting. The original author calls
    # it "some magic"; presumably it lets the sensor process exit before
    # its files disappear -- TODO confirm.
    time.sleep(0.3)

    cleanup_cmd = "rm -rf {0}".format(remote_path)
    conn.exec_command(cleanup_cmd)

    logger.debug("Sensors stopped and removed")
75
76
def stop_and_remove_sensors(configs, remote_path='/tmp/sensors'):
    """Stop and remove the sensors on every node in parallel (best effort).

    One ``stop_and_remove_sensor`` job per config is run on a pool of up to
    32 threads. A failure on one node does not abort the others and is not
    re-raised; it is logged so that it no longer disappears silently.

    :param configs: iterable of objects providing ``conn`` and ``url``
    :param remote_path: remote directory handed to each per-node job
    """
    with ThreadPoolExecutor(max_workers=32) as executor:
        jobs = [
            (node_sensor_config,
             executor.submit(stop_and_remove_sensor,
                             node_sensor_config.conn,
                             node_sensor_config.url,
                             remote_path))
            for node_sensor_config in configs
        ]

        wait([future for _, future in jobs])

    # wait() only blocks until completion; it never surfaces worker
    # exceptions, so inspect each finished future explicitly.
    for node_sensor_config, future in jobs:
        error = future.exception()
        if error is not None:
            logger.error("Failed to stop sensors on %s: %r",
                         node_sensor_config.url, error)