Pre-release bug fixes

- add colored console log output via a ColoredFormatter
- pass the date format through ColoredFormatter to logging.Formatter
- re-raise connect errors instead of only logging them
- ship test-thread exceptions to the main thread through the result queue
- poll test threads with join(1), draining results while they run
- pass only the 'tests' config section into run_tests and simplify iteration
- register the sensors cleanup stage only when sensors are configured
- skip VM removal when there are no node ids to remove
- close all ssh sessions on disconnect
- write results as a text report; html report generation is disabled for now
- drop unused json and traceback imports
diff --git a/run_test.py b/run_test.py
index 2e01394..60ffbbc 100755
--- a/run_test.py
+++ b/run_test.py
@@ -1,11 +1,9 @@
import os
import sys
-import json
import Queue
import pprint
import logging
import argparse
-import traceback
import threading
import collections
@@ -20,27 +18,65 @@
from nodes import discover
from nodes.node import Node
from config import cfg_dict, load_config
-from tests.itest import IOPerfTest, PgBenchTest
from sensors.api import start_monitoring
+from tests.itest import IOPerfTest, PgBenchTest
from formatters import format_results_for_console
logger = logging.getLogger("io-perf-tool")
+def color_me(color):
+ RESET_SEQ = "\033[0m"
+ COLOR_SEQ = "\033[1;%dm"
+
+ color_seq = COLOR_SEQ % (30 + color)
+
+ def closure(msg):
+ return color_seq + msg + RESET_SEQ
+ return closure
+
+
+class ColoredFormatter(logging.Formatter):
+ BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)
+
+ colors = {
+ 'WARNING': color_me(YELLOW),
+ 'DEBUG': color_me(BLUE),
+ 'CRITICAL': color_me(YELLOW),
+ 'ERROR': color_me(RED)
+ }
+
+    def __init__(self, msg, datefmt=None, use_color=True):
+        logging.Formatter.__init__(self, msg, datefmt)
+ self.use_color = use_color
+
+    def format(self, record):
+        levelname = record.levelname
+
+        if levelname in self.colors:
+            record.levelname = self.colors[levelname](levelname)
+
+        res = logging.Formatter.format(self, record)
+
+        # restore the plain levelname so later handlers
+        # (e.g. the file handler) don't log ANSI escapes
+        record.levelname = levelname
+        return res
+
+
def setup_logger(logger, level=logging.DEBUG, log_fname=None):
- # logger.setLevel(level)
+ logger.setLevel(logging.DEBUG)
sh = logging.StreamHandler()
sh.setLevel(level)
log_format = '%(asctime)s - %(levelname)-6s - %(name)s - %(message)s'
+ colored_formatter = ColoredFormatter(log_format,
+ "%H:%M:%S")
+
formatter = logging.Formatter(log_format,
"%H:%M:%S")
- sh.setFormatter(formatter)
+ sh.setFormatter(colored_formatter)
logger.addHandler(sh)
if log_fname is not None:
fh = logging.FileHandler(log_fname)
+ fh.setFormatter(formatter)
fh.setLevel(logging.DEBUG)
logger.addHandler(fh)
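
For context, once setup_logger runs, the two handlers format the same record
differently: the stream handler colors the level name while the file handler
keeps it plain. A minimal usage sketch (the log path is hypothetical, output
lines are illustrative):

    log = logging.getLogger("io-perf-tool")
    setup_logger(log, logging.INFO, "/tmp/io-perf.log")

    log.warning("node is slow")
    # console: 12:00:01 - \033[1;33mWARNING\033[0m - io-perf-tool - node is slow
    # file:    12:00:01 - WARNING - io-perf-tool - node is slow
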
@@ -71,6 +107,7 @@
raise ValueError("Unknown url type {0}".format(node.conn_url))
except Exception:
logger.exception("During connect to {0}".format(node))
+ raise
def connect_all(nodes):
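
The added raise turns a connect failure from a logged-and-ignored event into
a hard error: logger.exception records the traceback, and the bare raise
re-raises the original exception for the caller. The pattern, reduced to its
core (connect() is a hypothetical stand-in for the real ssh setup):

    try:
        node.connection = connect(node.conn_url)   # hypothetical helper
    except Exception:
        logger.exception("During connect to {0}".format(node))
        raise   # bare raise keeps the original traceback
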
@@ -86,21 +123,21 @@
while True:
val = q.get()
if val is None:
- print sensor_data
q.put(sensor_data)
break
sensor_data.append(val)
logger.info("Sensors thread exits")
-def test_thread(test, node, barrier):
+def test_thread(test, node, barrier, res_q):
try:
logger.debug("Run preparation for {0}".format(node.conn_url))
test.pre_run(node.connection)
logger.debug("Run test for {0}".format(node.conn_url))
test.run(node.connection, barrier)
- except:
+ except Exception as exc:
logger.exception("In test {0} for node {1}".format(test, node))
+ res_q.put(exc)
def run_tests(config, nodes):
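
Exceptions raised inside a thread never reach the main thread on their own,
so test_thread now ships them through res_q alongside normal results, and the
consumer turns any Exception instance into a failed run. A self-contained
sketch of the pattern (Python 2 stdlib only; do_work is a hypothetical
workload):

    import Queue
    import threading

    res_q = Queue.Queue()

    def do_work():
        raise RuntimeError("boom")     # hypothetical failing workload

    def worker():
        try:
            res_q.put(do_work())
        except Exception as exc:
            res_q.put(exc)             # ship the failure to the consumer

    th = threading.Thread(target=worker)
    th.daemon = True
    th.start()
    th.join()

    val = res_q.get()
    if isinstance(val, Exception):
        raise ValueError("worker failed: {0}".format(val))
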
@@ -114,32 +151,44 @@
res_q = Queue.Queue()
- for test in config['tests']:
- for test in config['tests'][test]['tests']:
- for name, params in test.items():
- logger.info("Starting {0} tests".format(name))
+ for test_block in config:
+ for name, params in test_block.items():
+ logger.info("Starting {0} tests".format(name))
- threads = []
- barrier = utils.Barrier(len(test_nodes))
- for node in test_nodes:
- msg = "Starting {0} test on {1} node"
- logger.debug(msg.format(name, node.conn_url))
- test = tool_type_mapper[name](params, res_q.put)
- th = threading.Thread(None, test_thread, None,
- (test, node, barrier))
- threads.append(th)
- th.daemon = True
- th.start()
+ threads = []
+ barrier = utils.Barrier(len(test_nodes))
+ for node in test_nodes:
+ msg = "Starting {0} test on {1} node"
+ logger.debug(msg.format(name, node.conn_url))
+ test = tool_type_mapper[name](params, res_q.put)
+ th = threading.Thread(None, test_thread, None,
+ (test, node, barrier, res_q))
+ threads.append(th)
+ th.daemon = True
+ th.start()
- for th in threads:
- th.join()
-
- results = []
-
+ def gather_results(res_q, results):
while not res_q.empty():
- results.append(res_q.get())
+ val = res_q.get()
- yield name, test.merge_results(results)
+ if isinstance(val, Exception):
+ msg = "Exception during test execution: {0}"
+            raise ValueError(msg.format(val))
+
+ results.append(val)
+
+ results = []
+
+ while True:
+ for th in threads:
+ th.join(1)
+ gather_results(res_q, results)
+
+ if all(not th.is_alive() for th in threads):
+ break
+
+ gather_results(res_q, results)
+ yield name, test.merge_results(results)
def parse_args(argv):
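
The unconditional th.join() is replaced by a poll loop: join each thread with
a one-second timeout, drain res_q on every pass, and stop once no thread is
alive. Draining while tests run keeps the queue bounded, and a result queued
between the last drain and the liveness check is picked up by the final
gather_results call. Condensed, with the intent spelled out:

    while True:
        for th in threads:
            th.join(1)                   # wake up roughly once a second
        gather_results(res_q, results)   # drain, raising on shipped exceptions

        if all(not th.is_alive() for th in threads):
            break                        # every worker has finished

    gather_results(res_q, results)       # results queued after the last drain
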
@@ -184,7 +233,7 @@
def discover_stage(cfg, ctx):
- if 'discover' in cfg:
+ if cfg.get('discover') is not None:
discover_objs = [i.strip() for i in cfg['discover'].strip().split(",")]
ctx.nodes.extend(discover.discover(discover_objs, cfg['clouds']))
@@ -193,10 +242,10 @@
def deploy_sensors_stage(cfg_dict, ctx):
- ctx.clear_calls_stack.append(remove_sensors_stage)
if 'sensors' not in cfg_dict:
return
+ ctx.clear_calls_stack.append(remove_sensors_stage)
cfg = cfg_dict.get('sensors')
sens_cfg = []
@@ -272,7 +321,7 @@
connect_all(new_nodes)
if 'tests' in cfg:
- ctx.results.extend(run_tests(cfg_dict, ctx.nodes))
+ ctx.results.extend(run_tests(cfg['tests'], ctx.nodes))
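
run_tests now receives the 'tests' config section directly and iterates it as
a flat list of one-entry {tool_name: params} blocks, replacing the old
triple-nested lookup. The shape the new loop expects, inferred from the code
(param values here are purely illustrative; 'io' is the name the report stage
keys on, 'pgbench' is assumed from PgBenchTest):

    cfg['tests'] = [
        {'io': {'cfg': 'io_task_spec'}},      # dispatched to IOPerfTest
        {'pgbench': {'opts': 'some_opts'}},   # dispatched to PgBenchTest
    ]
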
def shut_down_vms_stage(cfg, ctx):
@@ -282,9 +331,10 @@
else:
nodes_ids = ctx.openstack_nodes_ids
- logger.info("Removing nodes")
- start_vms.clear_nodes(nodes_ids)
- logger.info("Nodes has been removed")
+        if nodes_ids:
+            logger.info("Removing nodes")
+            start_vms.clear_nodes(nodes_ids)
+            logger.info("Nodes have been removed")
if os.path.exists(vm_ids_fname):
os.remove(vm_ids_fname)
@@ -308,6 +358,8 @@
def disconnect_stage(cfg, ctx):
+ ssh_utils.close_all_sessions()
+
for node in ctx.nodes:
if node.connection is not None:
node.connection.close()
@@ -352,21 +404,23 @@
def report_stage(cfg, ctx):
- output_dest = cfg.get('output_dest')
+ # html_report = report.make_io_report(ctx.results)
+ # html_rep_fname = cfg['html_report_file']
- if output_dest is not None:
- if output_dest.endswith(".html"):
- report.render_html_results(ctx, output_dest)
- logger.info("Results were stored in %s" % output_dest)
- else:
- with open(output_dest, "w") as fd:
- data = {"sensor_data": ctx.sensor_data,
- "results": ctx.results}
- fd.write(json.dumps(data))
- else:
- print "=" * 20 + " RESULTS " + "=" * 20
- pprint.pprint(ctx.results)
- print "=" * 60
+ # with open(html_rep_fname, "w") as fd:
+ # fd.write(html_report)
+
+    # logger.info("Html report was stored in " + html_rep_fname)
+
+ text_rep_fname = cfg_dict['text_report_file']
+ with open(text_rep_fname, "w") as fd:
+ for tp, data in ctx.results:
+            if tp == 'io':
+ fd.write(format_results_for_console(data))
+ fd.write("\n")
+ fd.flush()
+
+ logger.info("Text report were stored in " + text_rep_fname)
def complete_log_nodes_statistic(cfg, ctx):
@@ -389,15 +443,13 @@
stages = [
load_data_from(opts.post_process_only),
console_report_stage,
- # report_stage
+ report_stage
]
else:
stages = [
discover_stage,
log_nodes_statistic,
- complete_log_nodes_statistic,
connect_stage,
- complete_log_nodes_statistic,
deploy_sensors_stage,
run_tests_stage,
store_raw_results_stage,
@@ -410,7 +462,8 @@
level = logging.DEBUG if opts.extra_logs else logging.WARNING
setup_logger(logger, level, cfg_dict['log_file'])
- logger.info("Store all info into {0}".format(cfg_dict['var_dir']))
+ logger.info("All info would be stored into {0}".format(
+ cfg_dict['var_dir']))
ctx = Context()
ctx.build_meta['build_id'] = opts.build_id
@@ -434,6 +487,7 @@
if exc is not None:
raise exc, cls, tb
+ logger.info("All info stotored into {0}".format(cfg_dict['var_dir']))
return 0