fix multiply tests in one run, update report generation
diff --git a/wally/run_test.py b/wally/run_test.py
index 22856b5..5e6b4f9 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -14,6 +14,7 @@
import collections
import yaml
+import texttable
import faulthandler
from concurrent.futures import ThreadPoolExecutor
@@ -23,7 +24,8 @@
from wally.timeseries import SensorDatastore
from wally import utils, report, ssh_utils, start_vms
from wally.suits import IOPerfTest, PgBenchTest, MysqlTest
-from wally.config import cfg_dict, load_config, setup_loggers
+from wally.config import (cfg_dict, load_config, setup_loggers,
+ get_test_files)
from wally.sensors_utils import with_sensors_util, sensors_info_util
TOOL_TYPE_MAPPER = {
@@ -150,8 +152,8 @@
res_q.put(exc)
-def run_single_test(test_nodes, name, test_cls, params,
- test_local_folder, run_uuid, counter=[0]):
+def run_single_test(test_nodes, name, test_cls, params, log_directory,
+ test_local_folder, run_uuid):
logger.info("Starting {0} tests".format(name))
res_q = Queue.Queue()
threads = []
@@ -163,13 +165,6 @@
msg = "Starting {0} test on {1} node"
logger.debug(msg.format(name, node.conn_url))
- dname = "{0}_{1}_{2}".format(name, counter[0], node.get_ip())
- counter[0] += 1
- dr = os.path.join(cfg_dict['test_log_directory'], dname)
-
- if not os.path.exists(dr):
- os.makedirs(dr)
-
params = params.copy()
params['testnodes_count'] = len(test_nodes)
test = test_cls(options=params,
@@ -178,16 +173,18 @@
test_uuid=run_uuid,
node=node,
remote_dir=rem_folder,
- log_directory=dr,
+ log_directory=log_directory,
coordination_queue=coord_q,
total_nodes_count=len(test_nodes))
- th = threading.Thread(None, test_thread, None,
+ th = threading.Thread(None, test_thread,
+ "Test:" + node.get_conn_id(),
(test, node, barrier, res_q))
threads.append(th)
th.daemon = True
th.start()
- th = threading.Thread(None, test_cls.coordination_th, None,
+ th = threading.Thread(None, test_cls.coordination_th,
+ "Coordination thread",
(coord_q, barrier, len(threads)))
threads.append(th)
th.daemon = True
@@ -198,6 +195,7 @@
while len(threads) != 0:
nthreads = []
+ time.sleep(0.1)
for th in threads:
if not th.is_alive():
@@ -271,6 +269,24 @@
if 0 == len(curr_test_nodes):
continue
+ # make a directory for results
+ all_tests_dirs = os.listdir(cfg_dict['results'])
+
+ if 'name' in params:
+ dir_name = "{0}_{1}".format(name, params['name'])
+ else:
+ for idx in range(len(all_tests_dirs) + 1):
+ dir_name = "{0}_{1}".format(name, idx)
+ if dir_name not in all_tests_dirs:
+ break
+ else:
+ raise utils.StopTestError(
+ "Can't select directory for test results")
+
+ dir_path = os.path.join(cfg_dict['results'], dir_name)
+ if not os.path.exists(dir_path):
+ os.mkdir(dir_path)
+
if cfg.get('suspend_unused_vms', True):
pausable_nodes_ids = suspend_vm_nodes(unused_nodes)
@@ -287,8 +303,11 @@
sens_nodes = curr_test_nodes + not_test_nodes
with sensors_info_util(cfg, sens_nodes) as sensor_data:
t_start = time.time()
- res = run_single_test(curr_test_nodes, name, test_cls,
+ res = run_single_test(curr_test_nodes,
+ name,
+ test_cls,
params,
+ dir_path,
cfg['default_test_local_folder'],
cfg['run_uuid'])
t_end = time.time()
@@ -379,9 +398,7 @@
def reuse_vms_stage(cfg, ctx):
- p = cfg.get('clouds', {})
- p = p.get('openstack', {})
- p = p.get('vms', [])
+ p = cfg.get('clouds', {}).get('openstack', {}).get('vms', [])
for creds in p:
vm_name_pattern, conn_pattern = creds.split(",")
@@ -425,6 +442,8 @@
def get_OS_credentials(cfg, ctx):
creds = None
+ tenant = None
+
if 'openstack' in cfg['clouds']:
os_cfg = cfg['clouds']['openstack']
if 'OPENRC' in os_cfg:
@@ -434,18 +453,18 @@
elif 'ENV' in os_cfg:
logger.info("Using OS credentials from shell environment")
user, passwd, tenant, auth_url = start_vms.ostack_get_creds()
- else:
+ elif 'OS_TENANT_NAME' in os_cfg:
logger.info("Using predefined credentials")
tenant = os_cfg['OS_TENANT_NAME'].strip()
user = os_cfg['OS_USERNAME'].strip()
passwd = os_cfg['OS_PASSWORD'].strip()
auth_url = os_cfg['OS_AUTH_URL'].strip()
- elif 'fuel' in cfg['clouds'] and \
- 'openstack_env' in cfg['clouds']['fuel']:
+ if tenant is None and 'fuel' in cfg['clouds'] and \
+ 'openstack_env' in cfg['clouds']['fuel']:
logger.info("Using fuel creds")
creds = ctx.fuel_openstack_creds
- else:
+ elif tenant is None:
logger.error("Can't found OS credentials")
raise utils.StopTestError("Can't found OS credentials", None)
@@ -472,7 +491,9 @@
start_vms.nova_connect(**os_creds)
params.update(config)
- params['keypair_file_private'] = params['keypair_name'] + ".pem"
+ if 'keypair_file_private' not in params:
+ params['keypair_file_private'] = params['keypair_name'] + ".pem"
+
params['group_name'] = cfg_dict['run_uuid']
if not config.get('skip_preparation', False):
@@ -499,7 +520,7 @@
def run_tests_stage(cfg, ctx):
- ctx.results = []
+ ctx.results = collections.defaultdict(lambda: [])
if 'tests' not in cfg:
return
@@ -532,11 +553,12 @@
with with_sensors_util(cfg_dict, ctx.nodes):
for test_group in config.get('tests', []):
- ctx.results.extend(run_tests(cfg, test_group,
- ctx.nodes))
+ for tp, res in run_tests(cfg, test_group, ctx.nodes):
+ ctx.results[tp].extend(res)
else:
with with_sensors_util(cfg_dict, ctx.nodes):
- ctx.results.extend(run_tests(cfg, group, ctx.nodes))
+ for tp, res in run_tests(cfg, group, ctx.nodes):
+ ctx.results[tp].extend(res)
def shut_down_vms_stage(cfg, ctx):
@@ -575,14 +597,14 @@
def store_raw_results_stage(cfg, ctx):
- raw_results = os.path.join(cfg_dict['var_dir'], 'raw_results.yaml')
+ raw_results = cfg_dict['raw_results']
if os.path.exists(raw_results):
cont = yaml.load(open(raw_results).read())
else:
cont = []
- cont.extend(utils.yamable(ctx.results))
+ cont.extend(utils.yamable(ctx.results).items())
raw_data = pretty_yaml.dumps(cont)
with open(raw_results, "w") as fd:
@@ -590,30 +612,33 @@
def console_report_stage(cfg, ctx):
- for tp, data in ctx.results:
- if 'io' == tp and data is not None:
- dinfo = report.process_disk_info(data)
- text_rep_fname = cfg['text_report_file']
- rep = IOPerfTest.format_for_console(data, dinfo)
+ first_report = True
+ text_rep_fname = cfg['text_report_file']
+ with open(text_rep_fname, "w") as fd:
+ for tp, data in ctx.results.items():
+ if 'io' == tp and data is not None:
+ dinfo = report.process_disk_info(data)
+ rep = IOPerfTest.format_for_console(data, dinfo)
+ elif tp in ['mysql', 'pgbench'] and data is not None:
+ rep = MysqlTest.format_for_console(data)
+ else:
+ logger.warning("Can't generate text report for " + tp)
+ continue
- with open(text_rep_fname, "w") as fd:
- fd.write(rep)
- fd.write("\n")
- fd.flush()
+ fd.write(rep)
+ fd.write("\n")
- logger.info("Text report were stored in " + text_rep_fname)
+ if first_report:
+ logger.info("Text report were stored in " + text_rep_fname)
+ first_report = False
+
print("\n" + rep + "\n")
- if tp in ['mysql', 'pgbench'] and data is not None:
- print("\n")
- print(MysqlTest.format_for_console(data))
- print("\n")
-
def html_report_stage(cfg, ctx):
html_rep_fname = cfg['html_report_file']
found = False
- for tp, data in ctx.results:
+ for tp, data in ctx.results.items():
if 'io' == tp and data is not None:
if found:
logger.error("Making reports for more than one " +
@@ -635,10 +660,10 @@
def load_data_from(var_dir):
def load_data_from_file(_, ctx):
raw_results = os.path.join(var_dir, 'raw_results.yaml')
- ctx.results = []
+ ctx.results = {}
for tp, results in yaml.load(open(raw_results).read()):
cls = TOOL_TYPE_MAPPER[tp]
- ctx.results.append((tp, map(cls.load, results)))
+ ctx.results[tp] = map(cls.load, results)
return load_data_from_file
@@ -684,6 +709,7 @@
help="Skip html report", default=False)
parser.add_argument("--params", metavar="testname.paramname",
help="Test params", default=[])
+ parser.add_argument("--ls", action='store_true', default=False)
parser.add_argument("config_file")
return parser.parse_args(argv[1:])
@@ -696,7 +722,52 @@
return func.__name__ + " stage"
+def list_results(path):
+ results = []
+
+ for dname in os.listdir(path):
+
+ cfg = get_test_files(os.path.join(path, dname))
+
+ if not os.path.isfile(cfg['raw_results']):
+ continue
+
+ res_mtime = time.ctime(os.path.getmtime(cfg['raw_results']))
+ cfg = yaml.load(open(cfg['saved_config_file']).read())
+
+ test_names = []
+
+ for block in cfg['tests']:
+ assert len(block.items()) == 1
+ name, data = block.items()[0]
+ if name == 'start_test_nodes':
+ for in_blk in data['tests']:
+ assert len(in_blk.items()) == 1
+ in_name, _ = in_blk.items()[0]
+ test_names.append(in_name)
+ else:
+ test_names.append(name)
+ results.append((dname, test_names, res_mtime))
+
+ tab = texttable.Texttable(max_width=120)
+ tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
+ tab.set_cols_align(["l", "l", "l"])
+ results.sort(key=lambda x: x[2])
+
+ for data in results:
+ dname, tests, mtime = data
+ tab.add_row([dname, ', '.join(tests), mtime])
+
+ tab.header(["Name", "Tests", "etime"])
+
+ print(tab.draw())
+
+
def main(argv):
+ if '--ls' in argv:
+ list_results(argv[-1])
+ exit(0)
+
faulthandler.register(signal.SIGUSR1, all_threads=True)
opts = parse_args(argv)
load_config(opts.config_file, opts.post_process_only)
@@ -708,6 +779,10 @@
setup_loggers(level, cfg_dict['log_file'])
+ if not os.path.exists(cfg_dict['saved_config_file']):
+ with open(cfg_dict['saved_config_file'], 'w') as fd:
+ fd.write(open(opts.config_file).read())
+
if opts.post_process_only is not None:
stages = [
load_data_from(opts.post_process_only)