koder aka kdanilov | 4af1c1d | 2015-05-18 15:48:58 +0300 | [diff] [blame] | 1 | import os |
| 2 | import time |
| 3 | import json |
koder aka kdanilov | 168f609 | 2015-04-19 02:33:38 +0300 | [diff] [blame] | 4 | import logging |
koder aka kdanilov | 4af1c1d | 2015-05-18 15:48:58 +0300 | [diff] [blame] | 5 | import contextlib |
koder aka kdanilov | cff7b2e | 2015-04-18 20:48:15 +0300 | [diff] [blame] | 6 | |
koder aka kdanilov | 4af1c1d | 2015-05-18 15:48:58 +0300 | [diff] [blame] | 7 | from concurrent.futures import ThreadPoolExecutor |
| 8 | |
| 9 | from wally.ssh_utils import (copy_paths, run_over_ssh, |
| 10 | save_to_remote, read_from_remote) |
koder aka kdanilov | cff7b2e | 2015-04-18 20:48:15 +0300 | [diff] [blame] | 11 | |
| 12 | |
koder aka kdanilov | 168f609 | 2015-04-19 02:33:38 +0300 | [diff] [blame] | 13 | logger = logging.getLogger("wally.sensors") |
koder aka kdanilov | cff7b2e | 2015-04-18 20:48:15 +0300 | [diff] [blame] | 14 | |
| 15 | |
class SensorConfig(object):
    """Sensor settings for a single node.

    A plain value holder: the constructor stores its arguments unchanged
    under attributes of the same name.

    :param conn: connection object for the node; elsewhere in this module
        it is passed to the ssh helpers and its ``open_sftp()`` is called.
    :param url: node identifier, passed as ``node=`` to ``run_over_ssh``.
    :param sensors: dict of sensor settings; it is copied and serialized
        to JSON when sensors are deployed.
    :param source_id: value stored under the ``'source_id'`` key of the
        deployed JSON config.
    :param monitor_url: destination passed to the sensor process via
        ``-u``; data gathering in this module expects a ``csvfile://`` URL.
    """

    def __init__(self, conn, url, sensors, source_id,
                 monitor_url=None):
        # How to reach the node.
        self.conn, self.url = conn, url
        # What to collect and how the node's data is labelled.
        self.sensors, self.source_id = sensors, source_id
        # Where the sensor process should send its data.
        self.monitor_url = monitor_url
koder aka kdanilov | cff7b2e | 2015-04-18 20:48:15 +0300 | [diff] [blame] | 24 | |
| 25 | |
@contextlib.contextmanager
def with_sensors(sensor_configs, remote_path):
    """Keep sensor files installed on every node while a ``with`` body runs.

    On entry, for each item of ``sensor_configs`` the directory holding this
    module is handed to ``copy_paths`` with ``<remote_path>/sensors`` as its
    destination, and the node's ``sensors`` dict (plus a ``source_id`` key)
    is written as JSON to ``<remote_path>/conf.json``.  On exit -- normal or
    through an exception -- ``rm -rf <remote_path>`` is run on every node.

    If deployment fails on any node, removal is attempted on *all* nodes
    (best effort: removal errors are logged, not raised) and the first
    deployment error, in ``sensor_configs`` order, is re-raised.  Without
    this rollback the nodes that deployed successfully would keep their
    files, because the regular cleanup only guards the ``with`` body.

    :param sensor_configs: sequence of SensorConfig-like objects; ``conn``,
        ``sensors``, ``source_id`` and ``url`` are read from each.
    :param remote_path: directory on the nodes that receives the files and
        is deleted afterwards.
    """
    paths = {os.path.dirname(__file__):
             os.path.join(remote_path, "sensors")}
    config_remote_path = os.path.join(remote_path, "conf.json")

    def deploy_sensors(node_sensor_config):
        copy_paths(node_sensor_config.conn, paths)
        with node_sensor_config.conn.open_sftp() as sftp:
            # Copy so the caller's dict is not polluted with 'source_id'.
            sensors_config = node_sensor_config.sensors.copy()
            sensors_config['source_id'] = node_sensor_config.source_id
            save_to_remote(sftp, config_remote_path,
                           json.dumps(sensors_config))

    def remove_sensors(node_sensor_config):
        # NOTE(review): remote_path is interpolated into the shell command
        # unquoted -- it must not contain spaces or shell metacharacters.
        run_over_ssh(node_sensor_config.conn,
                     "rm -rf {0}".format(remote_path),
                     node=node_sensor_config.url, timeout=10)

    def remove_sensors_quietly(node_sensor_config):
        # Rollback variant: the error is handled inside the worker thread so
        # that it can never replace the deployment error being reported.
        try:
            remove_sensors(node_sensor_config)
        except Exception:
            logger.exception("Failed to remove sensors from %s",
                             node_sensor_config.url)

    logger.debug("Installing sensors on %s nodes", len(sensor_configs))
    with ThreadPoolExecutor(max_workers=32) as executor:
        deployments = [executor.submit(deploy_sensors, node_sensor_config)
                       for node_sensor_config in sensor_configs]

        # Future.exception() blocks until the task is done, so once this
        # list is built no deployment is still running and the rollback
        # cannot race with a copy in progress on the same node.
        failed = [fut for fut in deployments if fut.exception() is not None]
        if failed:
            list(executor.map(remove_sensors_quietly, sensor_configs))
            # Re-raises the original deployment exception.
            failed[0].result()

        try:
            yield
        finally:
            list(executor.map(remove_sensors, sensor_configs))
koder aka kdanilov | cff7b2e | 2015-04-18 20:48:15 +0300 | [diff] [blame] | 52 | |
koder aka kdanilov | 4af1c1d | 2015-05-18 15:48:58 +0300 | [diff] [blame] | 53 | |
@contextlib.contextmanager
def sensors_info(sensor_configs, remote_path):
    """Run sensors on every node during a ``with`` body and collect results.

    On entry, ``sensors.main -d start`` is launched on each node with
    ``PYTHONPATH`` set to ``remote_path``, the node's ``monitor_url`` as the
    ``-u`` argument and ``<remote_path>/conf.json`` as the config file.  The
    context manager yields a list that is empty while the body runs.  On
    exit -- normal or through an exception -- ``sensors.main -d stop`` is
    run on each node, the file named by the node's ``csvfile://`` monitor
    URL is read back over SFTP, and the per-node contents are appended to
    the yielded list in ``sensor_configs`` order.

    :param sensor_configs: sequence of SensorConfig-like objects; ``conn``,
        ``url`` and ``monitor_url`` are read from each.
    :param remote_path: directory on the nodes that holds the ``sensors``
        package and ``conf.json``.
    :raises AssertionError: while gathering, if a node's ``monitor_url`` is
        missing or does not start with ``csvfile://``.
    """
    config_remote_path = os.path.join(remote_path, "conf.json")
    csv_prefix = "csvfile://"

    def start_sensors(node_sensor_config):
        cmd_templ = 'env PYTHONPATH="{0}" python -m ' + \
                    "sensors.main -d start -u {1} {2}"

        cmd = cmd_templ.format(remote_path,
                               node_sensor_config.monitor_url,
                               config_remote_path)

        run_over_ssh(node_sensor_config.conn, cmd,
                     node=node_sensor_config.url)

    def stop_and_gather_data(node_sensor_config):
        cmd = 'env PYTHONPATH="{0}" python -m sensors.main -d stop'
        cmd = cmd.format(remote_path)
        run_over_ssh(node_sensor_config.conn, cmd,
                     node=node_sensor_config.url)
        # Original comment: "some magic".  Presumably gives the stopped
        # sensor process time to finish writing its output -- TODO confirm.
        time.sleep(1)

        monitor_url = node_sensor_config.monitor_url
        # Raised explicitly instead of via ``assert`` so the check is not
        # stripped under ``python -O`` (which would turn a non-csvfile URL
        # into a bogus remote path) and so a missing URL gets a clear
        # message.  AssertionError is kept for existing callers.
        if monitor_url is None or not monitor_url.startswith(csv_prefix):
            raise AssertionError(
                "Can't gather sensor data from {0}: monitor_url {1!r} "
                "is not a csvfile:// url".format(node_sensor_config.url,
                                                 monitor_url))

        # Everything after the scheme is the path of the file on the node.
        res_path = monitor_url[len(csv_prefix):]
        with node_sensor_config.conn.open_sftp() as sftp:
            res = read_from_remote(sftp, res_path)

        return res

    results = []

    logger.debug("Starting sensors on %s nodes", len(sensor_configs))
    with ThreadPoolExecutor(max_workers=32) as executor:
        list(executor.map(start_sensors, sensor_configs))
        try:
            yield results
        finally:
            # The same list object that was yielded is filled in place, so
            # the caller sees the data once the ``with`` block has exited.
            results.extend(executor.map(stop_and_gather_data, sensor_configs))