blob: b7e29b3897f5932ab4fab8374ba83a8c2097ccf3 [file] [log] [blame]
import time
import json
import os.path
from disk_perf_test_tool.ssh_copy_directory import copy_paths
from disk_perf_test_tool.ssh_runner import connect
from concurrent.futures import ThreadPoolExecutor, wait
def wait_all_ok(futures):
return all(future.result() for future in futures)
def deploy_and_start_sensors(monitor_uri, config, remote_path='/tmp/sensors'):
paths = {os.path.dirname(__file__): remote_path}
with ThreadPoolExecutor(max_workers=32) as executor:
futures = []
for uri, config in config.items():
futures.append(executor.submit(deploy_and_start_sensor,
paths, uri, monitor_uri,
config, remote_path))
if not wait_all_ok(futures):
raise RuntimeError("Sensor deployment fails on some nodes")
def deploy_and_start_sensor(paths, uri, monitor_uri, config, remote_path):
try:
conn = connect(uri)
copy_paths(conn, paths)
sftp = conn.open_sftp()
config_remote_path = os.path.join(remote_path, "conf.json")
main_remote_path = os.path.join(remote_path, "main.py")
with sftp.open(config_remote_path, "w") as fd:
fd.write(json.dumps(config))
cmd_templ = "python {0} -d start -u {1} {2}"
cmd = cmd_templ.format(main_remote_path,
monitor_uri,
config_remote_path)
conn.exec_command(cmd)
sftp.close()
conn.close()
except Exception as exc:
print exc
return False
return True
def stop_and_remove_sensor(uri, remote_path):
conn = connect(uri)
main_remote_path = os.path.join(remote_path, "main.py")
cmd_templ = "python {0} -d stop"
conn.exec_command(cmd_templ.format(main_remote_path))
time.sleep(0.3)
# print out.read(), err.read()
conn.exec_command("rm -rf {0}".format(remote_path))
conn.close()
def stop_and_remove_sensors(config, remote_path='/tmp/sensors'):
with ThreadPoolExecutor(max_workers=32) as executor:
futures = []
for uri, config in config.items():
futures.append(executor.submit(stop_and_remove_sensor,
uri, remote_path))
wait(futures)