koder aka kdanilov | dda86d3 | 2015-03-16 11:20:04 +0200 | [diff] [blame] | 1 | import time |
| 2 | import json |
| 3 | import os.path |
| 4 | |
Yulia Portnova | 0e64ea2 | 2015-03-20 17:27:22 +0200 | [diff] [blame] | 5 | from ssh_copy_directory import copy_paths |
| 6 | from ssh_runner import connect |
koder aka kdanilov | dda86d3 | 2015-03-16 11:20:04 +0200 | [diff] [blame] | 7 | |
| 8 | from concurrent.futures import ThreadPoolExecutor, wait |
| 9 | |
| 10 | |
def wait_all_ok(futures):
    """Tell whether every future in *futures* produced a truthy result.

    ``result()`` is called on the futures one by one, in the order given.
    Checking stops at the first falsy result, so later futures are not
    consulted in that case. An exception raised by ``result()`` is not
    caught here.
    """
    for pending in futures:
        outcome = pending.result()
        if not outcome:
            return False
    return True
| 13 | |
| 14 | |
def deploy_and_start_sensors(monitor_uri, config, remote_path='/tmp/sensors'):
    """Deploy and start the sensors on every node listed in *config*.

    *config* is iterated with ``.items()``: each key is handed to
    ``deploy_and_start_sensor`` as the node URI and each value as that
    node's configuration. The directory containing this module is the
    local path that gets mapped to *remote_path* on the node.

    Deployments are submitted to a pool of up to 32 worker threads.

    Raises:
        RuntimeError: if ``wait_all_ok`` reports that at least one
            deployment did not succeed.
    """
    local_dir = os.path.dirname(__file__)
    paths = {local_dir: remote_path}

    with ThreadPoolExecutor(max_workers=32) as pool:
        # One task per node; distinct names keep the `config` argument
        # from being rebound while it is being iterated.
        pending = [
            pool.submit(deploy_and_start_sensor,
                        paths, node_uri, monitor_uri,
                        node_config, remote_path)
            for node_uri, node_config in config.items()
        ]

        deployed_everywhere = wait_all_ok(pending)
        if not deployed_everywhere:
            raise RuntimeError("Sensor deployment fails on some nodes")
| 27 | |
| 28 | |
def deploy_and_start_sensor(paths, uri, monitor_uri, config, remote_path):
    """Deploy the sensor code to a single node and start it.

    Steps, in order:
      1. open a connection to *uri* with ``connect``;
      2. copy *paths* to the node with ``copy_paths``;
      3. write *config*, serialized as JSON, to ``<remote_path>/conf.json``
         through an SFTP session;
      4. run ``python <remote_path>/main.py -d start -u <monitor_uri>
         <remote_path>/conf.json`` on the node.

    The SFTP session and the connection are closed on every path, including
    when one of the steps fails.

    Returns:
        True if every step (including closing the connection) completed
        without raising, False otherwise. The reason for a failure is not
        reported to the caller.
    """
    try:
        conn = connect(uri)
        try:
            copy_paths(conn, paths)
            sftp = conn.open_sftp()
            try:
                config_remote_path = os.path.join(remote_path, "conf.json")
                main_remote_path = os.path.join(remote_path, "main.py")

                with sftp.open(config_remote_path, "w") as fd:
                    fd.write(json.dumps(config))

                cmd_templ = "python {0} -d start -u {1} {2}"
                cmd = cmd_templ.format(main_remote_path,
                                       monitor_uri,
                                       config_remote_path)
                conn.exec_command(cmd)
            finally:
                # Release the SFTP session even if writing the config or
                # launching the command failed.
                sftp.close()
        finally:
            # Never leave the connection open behind a failed deployment.
            conn.close()
    except Exception:
        # Only ordinary errors are turned into a False result;
        # KeyboardInterrupt and SystemExit are allowed to propagate.
        return False
    return True
| 51 | |
| 52 | |
def stop_and_remove_sensor(uri, remote_path):
    """Stop the sensor on a single node and delete its remote directory.

    Connects to *uri*, runs ``python <remote_path>/main.py -d stop``, waits
    0.3 seconds, then runs ``rm -rf <remote_path>``. The connection is
    closed on every path, including when one of the commands raises.

    Any exception from connecting or from issuing the commands propagates
    to the caller.
    """
    conn = connect(uri)
    try:
        main_remote_path = os.path.join(remote_path, "main.py")

        cmd_templ = "python {0} -d stop"
        conn.exec_command(cmd_templ.format(main_remote_path))

        # Short pause between issuing the stop command and removing the
        # files. The original author's note is just "some magic";
        # presumably it gives the sensor process time to exit - TODO confirm.
        time.sleep(0.3)

        conn.exec_command("rm -rf {0}".format(remote_path))
    finally:
        # Close the connection even if one of the commands failed.
        conn.close()
| 66 | |
| 67 | |
def stop_and_remove_sensors(config, remote_path='/tmp/sensors'):
    """Stop and remove the sensors on every node listed in *config*.

    *config* is iterated with ``.items()``; only the keys are used, each
    one being passed to ``stop_and_remove_sensor`` as the node URI together
    with *remote_path*. Tasks are submitted to a pool of up to 32 worker
    threads and the function returns once all of them have finished.

    Results of the individual tasks are not inspected, so an exception
    raised inside a task is not re-raised here.
    """
    with ThreadPoolExecutor(max_workers=32) as pool:
        pending = [
            pool.submit(stop_and_remove_sensor, node_uri, remote_path)
            for node_uri, _node_config in config.items()
        ]

        # Block until every submitted task has completed.
        wait(pending)