koder aka kdanilov | 4af1c1d | 2015-05-18 15:48:58 +0300 | [diff] [blame] | 1 | import os |
| 2 | import time |
| 3 | import json |
koder aka kdanilov | 168f609 | 2015-04-19 02:33:38 +0300 | [diff] [blame] | 4 | import logging |
koder aka kdanilov | 4af1c1d | 2015-05-18 15:48:58 +0300 | [diff] [blame] | 5 | import contextlib |
koder aka kdanilov | cff7b2e | 2015-04-18 20:48:15 +0300 | [diff] [blame] | 6 | |
koder aka kdanilov | 4af1c1d | 2015-05-18 15:48:58 +0300 | [diff] [blame] | 7 | from concurrent.futures import ThreadPoolExecutor |
| 8 | |
| 9 | from wally.ssh_utils import (copy_paths, run_over_ssh, |
| 10 | save_to_remote, read_from_remote) |
koder aka kdanilov | cff7b2e | 2015-04-18 20:48:15 +0300 | [diff] [blame] | 11 | |
| 12 | |
koder aka kdanilov | 168f609 | 2015-04-19 02:33:38 +0300 | [diff] [blame] | 13 | logger = logging.getLogger("wally.sensors") |
koder aka kdanilov | cff7b2e | 2015-04-18 20:48:15 +0300 | [diff] [blame] | 14 | |
| 15 | |
| 16 | class SensorConfig(object): |
Alyona Kiseleva | 7f6de4f | 2015-04-21 01:04:20 +0300 | [diff] [blame] | 17 | def __init__(self, conn, url, sensors, source_id, |
| 18 | monitor_url=None): |
koder aka kdanilov | cff7b2e | 2015-04-18 20:48:15 +0300 | [diff] [blame] | 19 | self.conn = conn |
| 20 | self.url = url |
| 21 | self.sensors = sensors |
koder aka kdanilov | 168f609 | 2015-04-19 02:33:38 +0300 | [diff] [blame] | 22 | self.source_id = source_id |
| 23 | self.monitor_url = monitor_url |
koder aka kdanilov | cff7b2e | 2015-04-18 20:48:15 +0300 | [diff] [blame] | 24 | |
| 25 | |
koder aka kdanilov | 4af1c1d | 2015-05-18 15:48:58 +0300 | [diff] [blame] | 26 | @contextlib.contextmanager |
| 27 | def with_sensors(sensor_configs, remote_path): |
| 28 | paths = {os.path.dirname(__file__): |
| 29 | os.path.join(remote_path, "sensors")} |
| 30 | config_remote_path = os.path.join(remote_path, "conf.json") |
| 31 | |
| 32 | def deploy_sensors(node_sensor_config): |
koder aka kdanilov | bb5fe07 | 2015-05-21 02:50:23 +0300 | [diff] [blame] | 33 | # check that path already exists |
koder aka kdanilov | 4af1c1d | 2015-05-18 15:48:58 +0300 | [diff] [blame] | 34 | copy_paths(node_sensor_config.conn, paths) |
| 35 | with node_sensor_config.conn.open_sftp() as sftp: |
| 36 | sensors_config = node_sensor_config.sensors.copy() |
| 37 | sensors_config['source_id'] = node_sensor_config.source_id |
| 38 | save_to_remote(sftp, config_remote_path, |
| 39 | json.dumps(sensors_config)) |
| 40 | |
| 41 | def remove_sensors(node_sensor_config): |
| 42 | run_over_ssh(node_sensor_config.conn, |
| 43 | "rm -rf {0}".format(remote_path), |
| 44 | node=node_sensor_config.url, timeout=10) |
| 45 | |
| 46 | logger.debug("Installing sensors on {0} nodes".format(len(sensor_configs))) |
| 47 | with ThreadPoolExecutor(max_workers=32) as executor: |
| 48 | list(executor.map(deploy_sensors, sensor_configs)) |
koder aka kdanilov | cff7b2e | 2015-04-18 20:48:15 +0300 | [diff] [blame] | 49 | try: |
koder aka kdanilov | 4af1c1d | 2015-05-18 15:48:58 +0300 | [diff] [blame] | 50 | yield |
| 51 | finally: |
| 52 | list(executor.map(remove_sensors, sensor_configs)) |
koder aka kdanilov | cff7b2e | 2015-04-18 20:48:15 +0300 | [diff] [blame] | 53 | |
koder aka kdanilov | 4af1c1d | 2015-05-18 15:48:58 +0300 | [diff] [blame] | 54 | |
| 55 | @contextlib.contextmanager |
| 56 | def sensors_info(sensor_configs, remote_path): |
| 57 | config_remote_path = os.path.join(remote_path, "conf.json") |
| 58 | |
| 59 | def start_sensors(node_sensor_config): |
| 60 | cmd_templ = 'env PYTHONPATH="{0}" python -m ' + \ |
| 61 | "sensors.main -d start -u {1} {2}" |
| 62 | |
| 63 | cmd = cmd_templ.format(remote_path, |
| 64 | node_sensor_config.monitor_url, |
| 65 | config_remote_path) |
| 66 | |
| 67 | run_over_ssh(node_sensor_config.conn, cmd, |
| 68 | node=node_sensor_config.url) |
| 69 | |
| 70 | def stop_and_gather_data(node_sensor_config): |
| 71 | cmd = 'env PYTHONPATH="{0}" python -m sensors.main -d stop' |
| 72 | cmd = cmd.format(remote_path) |
| 73 | run_over_ssh(node_sensor_config.conn, cmd, |
| 74 | node=node_sensor_config.url) |
| 75 | # some magic |
| 76 | time.sleep(1) |
| 77 | |
| 78 | assert node_sensor_config.monitor_url.startswith("csvfile://") |
| 79 | |
| 80 | res_path = node_sensor_config.monitor_url.split("//", 1)[1] |
| 81 | with node_sensor_config.conn.open_sftp() as sftp: |
| 82 | res = read_from_remote(sftp, res_path) |
| 83 | |
| 84 | return res |
| 85 | |
| 86 | results = [] |
| 87 | |
| 88 | logger.debug("Starting sensors on {0} nodes".format(len(sensor_configs))) |
| 89 | with ThreadPoolExecutor(max_workers=32) as executor: |
| 90 | list(executor.map(start_sensors, sensor_configs)) |
koder aka kdanilov | cff7b2e | 2015-04-18 20:48:15 +0300 | [diff] [blame] | 91 | try: |
koder aka kdanilov | 4af1c1d | 2015-05-18 15:48:58 +0300 | [diff] [blame] | 92 | yield results |
| 93 | finally: |
| 94 | results.extend(executor.map(stop_and_gather_data, sensor_configs)) |