Large commit: refactoring; fio code totally reworked; huge improvement in test time and results, etc.
diff --git a/run_test.py b/run_test.py
index d1a9f4f..992c7eb 100755
--- a/run_test.py
+++ b/run_test.py
@@ -1,4 +1,5 @@
import sys
+import json
import Queue
import pprint
import logging
@@ -10,10 +11,11 @@
import utils
import ssh_utils
+import start_vms
from nodes import discover
from nodes.node import Node
from config import cfg_dict
-from itest import IOPerfTest, PgBenchTest
+from tests.itest import IOPerfTest, PgBenchTest
from sensors.api import start_monitoring
@@ -57,15 +59,19 @@
logger.info("Connecting to nodes")
with ThreadPoolExecutor(32) as pool:
list(pool.map(connect_one, nodes))
+ logger.info("All nodes connected successfully")
def save_sensors_data(q):
logger.info("Start receiving sensors data")
+ sensor_data = []
while True:
val = q.get()
if val is None:
+ print sensor_data
+ q.put(sensor_data)
break
- # logger.debug("Sensors -> {0!r}".format(val))
+ sensor_data.append(val)
logger.info("Sensors thread exits")
@@ -90,26 +96,30 @@
res_q = Queue.Queue()
- for name, params in config['tests'].items():
- logger.info("Starting {0} tests".format(name))
+ for test in config['tests']:
+ for name, params in test.items():
+ logger.info("Starting {0} tests".format(name))
- threads = []
- barrier = utils.Barrier(len(test_nodes))
- for node in test_nodes:
- msg = "Starting {0} test on {1} node"
- logger.debug(msg.format(name, node.conn_url))
- test = tool_type_mapper[name](params, res_q.put)
- th = threading.Thread(None, test_thread, None,
- (test, node, barrier))
- threads.append(th)
- th.daemon = True
- th.start()
+ threads = []
+ barrier = utils.Barrier(len(test_nodes))
+ for node in test_nodes:
+ msg = "Starting {0} test on {1} node"
+ logger.debug(msg.format(name, node.conn_url))
+ test = tool_type_mapper[name](params, res_q.put)
+ th = threading.Thread(None, test_thread, None,
+ (test, node, barrier))
+ threads.append(th)
+ th.daemon = True
+ th.start()
- for th in threads:
- th.join()
+ for th in threads:
+ th.join()
- while not res_q.empty():
- logger.info("Get test result {0!r}".format(res_q.get()))
+ results = []
+ while not res_q.empty():
+ results.append(res_q.get())
+ # logger.info("Get test result {0!r}".format(results[-1]))
+ yield name, results
def parse_args(argv):
@@ -120,14 +130,14 @@
action='store_true', default=False,
help="print some extra log info")
- parser.add_argument('stages', nargs="+",
- choices=["discover", "connect", "start_new_nodes",
- "deploy_sensors", "run_tests"])
+ parser.add_argument("-o", '--output-dest', nargs="*")
+ parser.add_argument("config_file", nargs="?", default="config.yaml")
return parser.parse_args(argv[1:])
-def log_nodes_statistic(nodes):
+def log_nodes_statistic(_, ctx):
+ nodes = ctx.nodes
logger.info("Found {0} nodes total".format(len(nodes)))
per_role = collections.defaultdict(lambda: 0)
for node in nodes:
@@ -142,65 +152,132 @@
pass
+def connect_stage(cfg, ctx):
+ ctx.clear_calls_stack.append(disconnect_stage)
+ connect_all(ctx.nodes)
+
+
+def discover_stage(cfg, ctx):
+ if 'discover' in cfg:
+ discover_objs = [i.strip() for i in cfg['discover'].strip().split(",")]
+ ctx.nodes.extend(discover.discover(discover_objs, cfg['clouds']))
+
+ for url, roles in cfg.get('explicit_nodes', {}).items():
+ ctx.nodes.append(Node(url, roles.split(",")))
+
+
+def deploy_sensors_stage(cfg_dict, ctx):
+ ctx.clear_calls_stack.append(remove_sensors_stage)
+ if 'sensors' not in cfg_dict:
+ return
+
+ cfg = cfg_dict.get('sensors')
+ sens_cfg = []
+
+ for role, sensors_str in cfg["roles_mapping"].items():
+ sensors = [sens.strip() for sens in sensors_str.split(",")]
+
+ collect_cfg = dict((sensor, {}) for sensor in sensors)
+
+ for node in ctx.nodes:
+ if role in node.roles:
+ sens_cfg.append((node.connection, collect_cfg))
+
+ log_sensors_config(sens_cfg)
+
+ ctx.sensor_cm = start_monitoring(cfg["receiver_uri"], None,
+ connected_config=sens_cfg)
+
+ ctx.sensors_control_queue = ctx.sensor_cm.__enter__()
+
+ th = threading.Thread(None, save_sensors_data, None,
+ (ctx.sensors_control_queue,))
+ th.daemon = True
+ th.start()
+ ctx.sensor_listen_thread = th
+
+
+def remove_sensors_stage(cfg, ctx):
+ ctx.sensors_control_queue.put(None)
+ ctx.sensor_listen_thread.join()
+ ctx.sensor_data = ctx.sensors_control_queue.get()
+
+
+def run_tests_stage(cfg, ctx):
+ ctx.results = []
+
+ if 'tests' in cfg:
+ ctx.results.extend(run_tests(cfg_dict, ctx.nodes))
+
+ # if 'start_test_nodes' in opts.stages:
+ # params = cfg_dict['start_test_nodes']['openstack']
+ # for new_node in start_vms.launch_vms(params):
+ # new_node.roles.append('testnode')
+ # nodes.append(new_node)
+
+
+def disconnect_stage(cfg, ctx):
+ for node in ctx.nodes:
+ if node.connection is not None:
+ node.connection.close()
+
+
+def report_stage(cfg, ctx):
+ output_dest = cfg.get('output_dest')
+ if output_dest is not None:
+ with open(output_dest, "w") as fd:
+ data = {"sensor_data": ctx.sensor_data,
+ "results": ctx.results}
+ fd.write(json.dumps(data))
+ else:
+ print "=" * 20 + " RESULTS " + "=" * 20
+ pprint.pprint(ctx.results)
+ print "=" * 60
+
+
+def complete_log_nodes_statistic(cfg, ctx):
+ nodes = ctx.nodes
+ for node in nodes:
+ logger.debug(str(node))
+
+
+class Context(object):
+ def __init__(self):
+ self.nodes = []
+ self.clear_calls_stack = []
+
+
def main(argv):
opts = parse_args(argv)
level = logging.DEBUG if opts.extra_logs else logging.WARNING
setup_logger(logger, level)
- nodes = []
+ stages = [
+ discover_stage,
+ connect_stage,
+ complete_log_nodes_statistic,
+ # deploy_sensors_stage,
+ run_tests_stage,
+ report_stage
+ ]
- if 'discover' in opts.stages:
- logger.info("Start node discovery")
- nodes = discover.discover(cfg_dict.get('discover'))
+ ctx = Context()
+ try:
+ for stage in stages:
+ logger.info("Start {0.__name__} stage".format(stage))
+ stage(cfg_dict, ctx)
+ finally:
+ exc, cls, tb = sys.exc_info()
+ for stage in ctx.clear_calls_stack[::-1]:
+ try:
+ logger.info("Start {0.__name__} stage".format(stage))
+ stage(cfg_dict, ctx)
+ except:
+ pass
- if 'explicit_nodes' in cfg_dict:
- for url, roles in cfg_dict['explicit_nodes'].items():
- nodes.append(Node(url, roles.split(",")))
-
- log_nodes_statistic(nodes)
-
- if 'connect' in opts.stages:
- connect_all(nodes)
-
- if 'deploy_sensors' in opts.stages:
- logger.info("Deploing sensors")
- cfg = cfg_dict.get('sensors')
- sens_cfg = []
-
- for role, sensors_str in cfg["roles_mapping"].items():
- sensors = [sens.strip() for sens in sensors_str.split(",")]
-
- collect_cfg = dict((sensor, {}) for sensor in sensors)
-
- for node in nodes:
- if role in node.roles:
- sens_cfg.append((node.connection, collect_cfg))
-
- log_sensors_config(sens_cfg)
-
- sensor_cm = start_monitoring(cfg["receiver_uri"], None,
- connected_config=sens_cfg)
-
- with sensor_cm as sensors_control_queue:
- th = threading.Thread(None, save_sensors_data, None,
- (sensors_control_queue,))
- th.daemon = True
- th.start()
-
- # TODO: wait till all nodes start to send sensors data
-
- if 'run_tests' in opts.stages:
- run_tests(cfg_dict, nodes)
-
- sensors_control_queue.put(None)
- th.join()
- elif 'run_tests' in opts.stages:
- run_tests(cfg_dict, nodes)
-
- logger.info("Disconnecting")
- for node in nodes:
- node.connection.close()
+ if exc is not None:
+ raise exc, cls, tb
return 0