Fix sensor deployment code: stream sensor data to a file, deploy sensors on newly created nodes, and re-enable VM shutdown
diff --git a/run_test.py b/run_test.py
index b424b76..8086308 100755
--- a/run_test.py
+++ b/run_test.py
@@ -1,5 +1,6 @@
import os
import sys
+import time
import Queue
import pprint
import logging
@@ -18,9 +19,9 @@
from nodes import discover
from nodes.node import Node
from config import cfg_dict, load_config
-from sensors.api import start_monitoring
from tests.itest import IOPerfTest, PgBenchTest
from formatters import format_results_for_console
+from sensors.api import start_monitoring, deploy_and_start_sensors
logger = logging.getLogger("io-perf-tool")
@@ -124,15 +125,15 @@
logger.info("All nodes connected successfully")
-def save_sensors_data(q):
+def save_sensors_data(q, fd):
logger.info("Start receiving sensors data")
- sensor_data = []
while True:
val = q.get()
if val is None:
- q.put(sensor_data)
+ q.put([])
break
- sensor_data.append(val)
+ fd.write("\n" + str(time.time()) + " : ")
+ fd.write(repr(val))
logger.info("Sensors thread exits")
@@ -245,14 +246,16 @@
ctx.sensors_control_queue = ctx.sensor_cm.__enter__()
+ fd = open(cfg_dict['sensor_storage'], "w")
th = threading.Thread(None, save_sensors_data, None,
- (ctx.sensors_control_queue,))
+ (ctx.sensors_control_queue, fd))
th.daemon = True
th.start()
ctx.sensor_listen_thread = th
def remove_sensors_stage(cfg, ctx):
+ ctx.sensor_cm.__exit__(None, None, None)
ctx.sensors_control_queue.put(None)
ctx.sensor_listen_thread.join()
ctx.sensor_data = ctx.sensors_control_queue.get()
@@ -327,12 +330,26 @@
connect_all(new_nodes)
+ # deploy sensors on new nodes
+ # TODO: unify this code
+ if 'sensors' in cfg:
+ sens_cfg = []
+ sensors_str = cfg["sensors"]["roles_mapping"]['testnode']
+ sensors = [sens.strip() for sens in sensors_str.split(",")]
+
+ collect_cfg = dict((sensor, {}) for sensor in sensors)
+ for node in new_nodes:
+ sens_cfg.append((node.connection, collect_cfg))
+
+ uri = cfg["sensors"]["receiver_uri"]
+ deploy_and_start_sensors(uri, None,
+ connected_config=sens_cfg)
+
for test_group in config.get('tests', []):
ctx.results.extend(run_tests(test_group, ctx.nodes))
finally:
- # shut_down_vms_stage(cfg, ctx)
- pass
+ shut_down_vms_stage(cfg, ctx)
elif 'tests' in key:
ctx.results.extend(run_tests(config, ctx.nodes))