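Refactor run_test.py: run sensors via context managers, extract suspend_vm_nodes/get_stage_name helpers, set up logging earlier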
diff --git a/wally/run_test.py b/wally/run_test.py
index 5322432..22856b5 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -10,7 +10,6 @@
import argparse
import functools
import threading
-import subprocess
import contextlib
import collections
@@ -23,9 +22,15 @@
from wally.discover import discover, Node
from wally.timeseries import SensorDatastore
from wally import utils, report, ssh_utils, start_vms
+from wally.suits import IOPerfTest, PgBenchTest, MysqlTest
from wally.config import cfg_dict, load_config, setup_loggers
-from wally.suits.itest import IOPerfTest, PgBenchTest, MysqlTest
-from wally.sensors_utils import deploy_sensors_stage, gather_sensors_stage
+from wally.sensors_utils import with_sensors_util, sensors_info_util
+
+TOOL_TYPE_MAPPER = {
+ "io": IOPerfTest,
+ "pgbench": PgBenchTest,
+ "mysql": MysqlTest,
+}
try:
@@ -174,7 +179,8 @@
node=node,
remote_dir=rem_folder,
log_directory=dr,
- coordination_queue=coord_q)
+ coordination_queue=coord_q,
+ total_nodes_count=len(test_nodes))
th = threading.Thread(None, test_thread, None,
(test, node, barrier, res_q))
threads.append(th)
@@ -213,19 +219,33 @@
results.append(val)
- results = test_cls.merge_results(results)
return results
-def run_tests(cfg, test_block, nodes):
- tool_type_mapper = {
- "io": IOPerfTest,
- "pgbench": PgBenchTest,
- "mysql": MysqlTest,
- }
+def suspend_vm_nodes(unused_nodes):
+ pausable_nodes_ids = [node.os_vm_id for node in unused_nodes
+ if node.os_vm_id is not None]
+ non_pausable = len(unused_nodes) - len(pausable_nodes_ids)
+ if 0 != non_pausable:
+ logger.warning("Can't pause {0} nodes".format(
+ non_pausable))
+
+ if len(pausable_nodes_ids) != 0:
+ logger.debug("Try to pause {0} unused nodes".format(
+ len(pausable_nodes_ids)))
+ start_vms.pause(pausable_nodes_ids)
+
+ return pausable_nodes_ids
+
+
+def run_tests(cfg, test_block, nodes):
test_nodes = [node for node in nodes
if 'testnode' in node.roles]
+
+ not_test_nodes = [node for node in nodes
+ if 'testnode' not in node.roles]
+
if len(test_nodes) == 0:
logger.error("No test nodes found")
return
@@ -252,18 +272,7 @@
continue
if cfg.get('suspend_unused_vms', True):
- pausable_nodes_ids = [node.os_vm_id for node in unused_nodes
- if node.os_vm_id is not None]
- non_pausable = len(unused_nodes) - len(pausable_nodes_ids)
-
- if 0 != non_pausable:
- logger.warning("Can't pause {0} nodes".format(
- non_pausable))
-
- if len(pausable_nodes_ids) != 0:
- logger.debug("Try to pause {0} unused nodes".format(
- len(pausable_nodes_ids)))
- start_vms.pause(pausable_nodes_ids)
+ pausable_nodes_ids = suspend_vm_nodes(unused_nodes)
resumable_nodes_ids = [node.os_vm_id for node in curr_test_nodes
if node.os_vm_id is not None]
@@ -273,12 +282,16 @@
len(resumable_nodes_ids)))
start_vms.unpause(resumable_nodes_ids)
- test_cls = tool_type_mapper[name]
+ test_cls = TOOL_TYPE_MAPPER[name]
try:
- res = run_single_test(curr_test_nodes, name, test_cls,
- params,
- cfg['default_test_local_folder'],
- cfg['run_uuid'])
+ sens_nodes = curr_test_nodes + not_test_nodes
+ with sensors_info_util(cfg, sens_nodes) as sensor_data:
+ t_start = time.time()
+ res = run_single_test(curr_test_nodes, name, test_cls,
+ params,
+ cfg['default_test_local_folder'],
+ cfg['run_uuid'])
+ t_end = time.time()
finally:
if cfg.get('suspend_unused_vms', True):
if len(pausable_nodes_ids) != 0:
@@ -286,7 +299,14 @@
len(pausable_nodes_ids)))
start_vms.unpause(pausable_nodes_ids)
- results.append(res)
+ if sensor_data is not None:
+ fname = "{0}_{1}.csv".format(int(t_start), int(t_end))
+ fpath = os.path.join(cfg['sensor_storage'], fname)
+
+ with open(fpath, "w") as fd:
+ fd.write("\n\n".join(sensor_data))
+
+ results.extend(res)
yield name, results
@@ -365,7 +385,8 @@
for creds in p:
vm_name_pattern, conn_pattern = creds.split(",")
- try:
+ msg = "Vm like {0} lookup failed".format(vm_name_pattern)
+ with utils.log_error(msg):
msg = "Looking for vm with name like {0}".format(vm_name_pattern)
logger.debug(msg)
@@ -379,12 +400,6 @@
node = Node(conn_pattern.format(ip=ip), ['testnode'])
node.os_vm_id = vm_id
ctx.nodes.append(node)
- except utils.StopTestError:
- raise
- except Exception as exc:
- msg = "Vm like {0} lookup failed".format(vm_name_pattern)
- logger.exception(msg)
- raise utils.StopTestError(msg, exc)
def get_creds_openrc(path):
@@ -392,24 +407,19 @@
echo = 'echo "$OS_TENANT_NAME:$OS_USERNAME:$OS_PASSWORD@$OS_AUTH_URL"'
- try:
+ msg = "Failed to get creads from openrc file"
+ with utils.log_error(msg):
data = utils.run_locally(['/bin/bash'],
input_data=fc + "\n" + echo)
- except subprocess.CalledProcessError as exc:
- msg = "Failed to get creads from openrc file: " + data
- logger.exception(msg)
- raise utils.StopTestError(msg, exc)
- try:
+ msg = "Failed to get creads from openrc file: " + data
+ with utils.log_error(msg):
data = data.strip()
user, tenant, passwd_auth_url = data.split(':', 2)
passwd, auth_url = passwd_auth_url.rsplit("@", 1)
assert (auth_url.startswith("https://") or
auth_url.startswith("http://"))
- except Exception as exc:
- msg = "Failed to get creads from openrc file: " + data
- logger.exception(msg)
- raise utils.StopTestError(msg, exc)
+
return user, passwd, tenant, auth_url
@@ -512,6 +522,7 @@
num_test_nodes)
with vm_ctx as new_nodes:
if len(new_nodes) != 0:
+ logger.debug("Connecting to new nodes")
connect_all(new_nodes, True)
for node in new_nodes:
@@ -519,17 +530,13 @@
msg = "Failed to connect to vm {0}"
raise RuntimeError(msg.format(node.get_conn_id()))
- deploy_sensors_stage(cfg_dict,
- ctx,
- nodes=new_nodes,
- undeploy=False)
-
- for test_group in config.get('tests', []):
- test_res = run_tests(cfg, test_group, ctx.nodes)
- ctx.results.extend(test_res)
+ with with_sensors_util(cfg_dict, ctx.nodes):
+ for test_group in config.get('tests', []):
+ ctx.results.extend(run_tests(cfg, test_group,
+ ctx.nodes))
else:
- test_res = run_tests(cfg, group, ctx.nodes)
- ctx.results.extend(test_res)
+ with with_sensors_util(cfg_dict, ctx.nodes):
+ ctx.results.extend(run_tests(cfg, group, ctx.nodes))
def shut_down_vms_stage(cfg, ctx):
@@ -595,9 +602,7 @@
fd.flush()
logger.info("Text report were stored in " + text_rep_fname)
- print("\n")
- print(IOPerfTest.format_for_console(data, dinfo))
- print("\n")
+ print("\n" + rep + "\n")
if tp in ['mysql', 'pgbench'] and data is not None:
print("\n")
@@ -618,7 +623,6 @@
found = True
dinfo = report.process_disk_info(data)
report.make_io_report(dinfo, data, html_rep_fname,
- cfg['charts_img_path'],
lab_info=ctx.hw_info)
@@ -629,9 +633,13 @@
def load_data_from(var_dir):
- def load_data_from_file(cfg, ctx):
+ def load_data_from_file(_, ctx):
raw_results = os.path.join(var_dir, 'raw_results.yaml')
- ctx.results = yaml.load(open(raw_results).read())
+ ctx.results = []
+ for tp, results in yaml.load(open(raw_results).read()):
+ cls = TOOL_TYPE_MAPPER[tp]
+ ctx.results.append((tp, map(cls.load, results)))
+
return load_data_from_file
@@ -681,17 +689,25 @@
return parser.parse_args(argv[1:])
-# from plop.collector import Collector
+def get_stage_name(func):
+ if func.__name__.endswith("stage"):
+ return func.__name__
+ else:
+ return func.__name__ + " stage"
def main(argv):
- # collector = Collector()
- # collector.start()
-
faulthandler.register(signal.SIGUSR1, all_threads=True)
opts = parse_args(argv)
load_config(opts.config_file, opts.post_process_only)
+ if cfg_dict.get('logging', {}).get("extra_logs", False) or opts.extra_logs:
+ level = logging.DEBUG
+ else:
+ level = logging.WARNING
+
+ setup_loggers(level, cfg_dict['log_file'])
+
if opts.post_process_only is not None:
stages = [
load_data_from(opts.post_process_only)
@@ -711,10 +727,10 @@
stages.append(collect_hw_info_stage)
stages.extend([
- deploy_sensors_stage,
+ # deploy_sensors_stage,
run_tests_stage,
store_raw_results_stage,
- gather_sensors_stage
+ # gather_sensors_stage
])
report_stages = [
@@ -724,13 +740,6 @@
if not opts.no_html_report:
report_stages.append(html_report_stage)
- if cfg_dict.get('logging', {}).get("extra_logs", False) or opts.extra_logs:
- level = logging.DEBUG
- else:
- level = logging.WARNING
-
- setup_loggers(level, cfg_dict['log_file'])
-
logger.info("All info would be stored into {0}".format(
cfg_dict['var_dir']))
@@ -753,10 +762,7 @@
try:
for stage in stages:
- if stage.__name__.endswith("stage"):
- logger.info("Start {0.__name__}".format(stage))
- else:
- logger.info("Start {0.__name__} stage".format(stage))
+ logger.info("Start " + get_stage_name(stage))
stage(cfg_dict, ctx)
except utils.StopTestError as exc:
logger.error(msg_templ.format(stage, exc))
@@ -766,10 +772,7 @@
exc, cls, tb = sys.exc_info()
for stage in ctx.clear_calls_stack[::-1]:
try:
- if stage.__name__.endswith("stage"):
- logger.info("Start {0.__name__}".format(stage))
- else:
- logger.info("Start {0.__name__} stage".format(stage))
+ logger.info("Start " + get_stage_name(stage))
stage(cfg_dict, ctx)
except utils.StopTestError as cleanup_exc:
logger.error(msg_templ.format(stage, cleanup_exc))
@@ -779,14 +782,16 @@
logger.debug("Start utils.cleanup")
for clean_func, args, kwargs in utils.iter_clean_func():
try:
+ logger.info("Start " + get_stage_name(clean_func))
clean_func(*args, **kwargs)
except utils.StopTestError as cleanup_exc:
- logger.error(msg_templ.format(stage, cleanup_exc))
+ logger.error(msg_templ.format(clean_func, cleanup_exc))
except Exception:
- logger.exception(msg_templ_no_exc.format(stage))
+ logger.exception(msg_templ_no_exc.format(clean_func))
if exc is None:
for report_stage in report_stages:
+ logger.info("Start " + get_stage_name(report_stage))
report_stage(cfg_dict, ctx)
logger.info("All info stored in {0} folder".format(cfg_dict['var_dir']))
@@ -794,9 +799,6 @@
if cfg_dict.get('run_web_ui', False):
stop_web_ui(cfg_dict, ctx)
- # collector.stop()
- # open("plop.out", "w").write(repr(dict(collector.stack_counts)))
-
if exc is None:
logger.info("Tests finished successfully")
return 0