koder aka kdanilov | cff7b2e | 2015-04-18 20:48:15 +0300 | [diff] [blame] | 1 | import time |
| 2 | import json |
| 3 | import os.path |
| 4 | import logging |
| 5 | |
| 6 | from concurrent.futures import ThreadPoolExecutor, wait |
| 7 | |
| 8 | from wally.ssh_utils import copy_paths, run_over_ssh |
| 9 | |
koder aka kdanilov | 168f609 | 2015-04-19 02:33:38 +0300 | [diff] [blame] | 10 | logger = logging.getLogger('wally.sensors') |
koder aka kdanilov | cff7b2e | 2015-04-18 20:48:15 +0300 | [diff] [blame] | 11 | |
| 12 | |
def wait_all_ok(futures):
    """Return True when every future in ``futures`` yields a truthy result.

    Futures are examined in the given order and each ``result()`` call blocks
    until that future has finished.  Checking stops at the first falsy
    result, so any later futures are not waited on by this function.  An
    exception raised inside a task propagates out of ``result()``.
    """
    for task in futures:
        if not task.result():
            return False
    return True
| 15 | |
| 16 | |
def deploy_and_start_sensors(sensor_configs,
                             remote_path='/tmp/sensors/sensors'):
    """Deploy and start sensors on all nodes concurrently.

    One ``deploy_and_start_sensor`` task is submitted per entry of
    ``sensor_configs`` to a pool of up to 32 worker threads.  The directory
    containing this module is mapped to ``remote_path`` for the upload.

    Raises:
        RuntimeError: if any per-node task reports failure.
    """
    # Mapping "local directory -> remote directory" handed to every task.
    upload_map = {os.path.dirname(__file__): remote_path}

    with ThreadPoolExecutor(max_workers=32) as pool:
        pending = [
            pool.submit(deploy_and_start_sensor,
                        upload_map,
                        node_cfg,
                        remote_path)
            for node_cfg in sensor_configs
        ]

        if not wait_all_ok(pending):
            raise RuntimeError("Sensor deployment fails on some nodes")
| 32 | |
| 33 | |
def deploy_and_start_sensor(paths, node_sensor_config, remote_path):
    """Upload the sensor code to one node, write its config and start it.

    Args:
        paths: mapping passed to ``copy_paths`` describing what to upload.
        node_sensor_config: per-node settings; this function reads its
            ``conn``, ``sensors``, ``source_id``, ``monitor_url`` and ``url``
            attributes.
        remote_path: remote directory in which ``conf.json`` is written; its
            parent directory is used as ``PYTHONPATH`` for the start command.

    Returns:
        True on success.  On any error the exception is logged and False is
        returned, so a failure on one node does not raise in the worker.
    """
    try:
        copy_paths(node_sensor_config.conn, paths)
        sftp = node_sensor_config.conn.open_sftp()

        # Close the SFTP session even when a later step fails; previously it
        # was only closed on the success path and leaked on errors.
        try:
            config_remote_path = os.path.join(remote_path, "conf.json")

            # Copy so that adding 'source_id' does not mutate the caller's
            # sensors mapping.
            sensors_config = node_sensor_config.sensors.copy()
            sensors_config['source_id'] = node_sensor_config.source_id
            with sftp.open(config_remote_path, "w") as fd:
                fd.write(json.dumps(sensors_config))

            cmd_templ = 'env PYTHONPATH="{0}" python -m ' + \
                        "sensors.main -d start -u {1} {2}"

            cmd = cmd_templ.format(os.path.dirname(remote_path),
                                   node_sensor_config.monitor_url,
                                   config_remote_path)

            run_over_ssh(node_sensor_config.conn, cmd,
                         node=node_sensor_config.url)
        finally:
            sftp.close()

    except Exception:
        # 'Exception' rather than a bare except, so KeyboardInterrupt and
        # SystemExit are not swallowed and reported as a deployment failure.
        msg = "During deploing sensors on {0}".format(node_sensor_config.url)
        logger.exception(msg)
        return False
    return True
| 62 | |
| 63 | |
def stop_and_remove_sensor(conn, url, remote_path):
    """Stop the sensors on one node and delete ``remote_path`` there.

    Best effort: any ``Exception`` raised along the way is logged as an
    error and not re-raised.  If the stop command fails, the removal step is
    not attempted.
    """
    try:
        stop_cmd = 'env PYTHONPATH="{0}" python -m sensors.main -d stop'.format(
            remote_path)
        run_over_ssh(conn, stop_cmd, node=url)

        # Short pause between stopping and deleting; the original author
        # labels it "some magic" -- presumably it lets the sensor process
        # exit first (TODO confirm).
        time.sleep(0.3)

        run_over_ssh(conn, "rm -rf {0}".format(remote_path), node=url)
    except Exception as err:
        logger.error(
            "Failed to remove sensors from node {0}: {1!s}".format(url, err))
koder aka kdanilov | cff7b2e | 2015-04-18 20:48:15 +0300 | [diff] [blame] | 77 | |
| 78 | |
def stop_and_remove_sensors(configs, remote_path='/tmp/sensors'):
    """Stop and remove sensors on all nodes concurrently.

    One ``stop_and_remove_sensor`` task is submitted per entry of
    ``configs`` (using its ``conn`` and ``url`` attributes) to a pool of up
    to 32 worker threads, and this function blocks until all of them have
    finished.  Task results are not inspected.
    """
    with ThreadPoolExecutor(max_workers=32) as pool:
        pending = [
            pool.submit(stop_and_remove_sensor,
                        node_cfg.conn,
                        node_cfg.url,
                        remote_path)
            for node_cfg in configs
        ]

        wait(pending)
    logger.debug("Sensors stopped and removed")