2.0 ready
diff --git a/wally/run_test.py b/wally/run_test.py
index 4484d45..43e4e43 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -1,13 +1,7 @@
-from __future__ import print_function
-
import os
import re
-import sys
import time
-import pprint
-import signal
import logging
-import argparse
import functools
import contextlib
import collections
@@ -20,23 +14,12 @@
except ImportError:
yaml_load = _yaml_load
-
-import texttable
-
-try:
- import faulthandler
-except ImportError:
- faulthandler = None
-
from concurrent.futures import ThreadPoolExecutor
-from wally import pretty_yaml
from wally.hw_info import get_hw_info
+from wally.config import get_test_files
from wally.discover import discover, Node
-from wally.timeseries import SensorDatastore
-from wally import utils, report, ssh_utils, start_vms
-from wally.config import (cfg_dict, load_config, setup_loggers,
- get_test_files, save_run_params, load_run_params)
+from wally import pretty_yaml, utils, report, ssh_utils, start_vms
from wally.sensors_utils import with_sensors_util, sensors_info_util
from wally.suits.mysql import MysqlTest
@@ -52,65 +35,65 @@
}
-try:
- from wally import webui
-except ImportError:
- webui = None
-
-
logger = logging.getLogger("wally")
-def format_result(res, formatter):
- data = "\n{0}\n".format("=" * 80)
- data += pprint.pformat(res) + "\n"
- data += "{0}\n".format("=" * 80)
- templ = "{0}\n\n====> {1}\n\n{2}\n\n"
- return templ.format(data, formatter(res), "=" * 80)
+def connect_all(nodes, spawned_node=False):
+ """
+ Connect to all nodes, log errors
+ nodes:[Node] - list of nodes
+ spawned_node:bool - whenever nodes is newly spawned VM
+ """
-
-class Context(object):
- def __init__(self):
- self.build_meta = {}
- self.nodes = []
- self.clear_calls_stack = []
- self.openstack_nodes_ids = []
- self.sensors_mon_q = None
- self.hw_info = []
-
-
-def connect_one(node, vm=False):
- if node.conn_url == 'local':
- node.connection = ssh_utils.connect(node.conn_url)
- return
-
- try:
- ssh_pref = "ssh://"
- if node.conn_url.startswith(ssh_pref):
- url = node.conn_url[len(ssh_pref):]
-
- if vm:
- conn_timeout = 240
- else:
- conn_timeout = 30
-
- node.connection = ssh_utils.connect(url,
- conn_timeout=conn_timeout)
- else:
- raise ValueError("Unknown url type {0}".format(node.conn_url))
- except Exception as exc:
- # logger.exception("During connect to " + node.get_conn_id())
- msg = "During connect to {0}: {1!s}".format(node.get_conn_id(),
- exc)
- logger.error(msg)
- node.connection = None
-
-
-def connect_all(nodes, vm=False):
logger.info("Connecting to nodes")
+
+ conn_timeout = 240 if spawned_node else 30
+
+ def connect_ext(conn_url):
+ try:
+ return ssh_utils.connect(conn_url, conn_timeout=conn_timeout)
+ except Exception as exc:
+ logger.error("During connect to {0}: {1!s}".format(conn_url, exc))
+ return None
+
+ urls = []
+ ssh_pref = "ssh://"
+
+ for node in nodes:
+ if node.conn_url == 'local':
+ urls.append(node.conn_url)
+ elif node.conn_url.startswith(ssh_pref):
+ urls.append(node.conn_url[len(ssh_pref):])
+ else:
+ msg = "Unknown url type {0}".format(node.conn_url)
+ logger.error(msg)
+ raise utils.StopTestError(msg)
+
with ThreadPoolExecutor(32) as pool:
- connect_one_f = functools.partial(connect_one, vm=vm)
- list(pool.map(connect_one_f, nodes))
+ for node, conn in zip(nodes, pool.map(connect_ext, urls)):
+ node.connection = conn
+
+ failed_testnodes = []
+ failed_nodes = []
+
+ for node in nodes:
+ if node.connection is None:
+ if 'testnode' in node.roles:
+ failed_testnodes.append(node.get_conn_id())
+ else:
+ failed_nodes.append(node.get_conn_id())
+
+ if failed_nodes != []:
+ msg = "Node(s) {0} would be excluded - can't connect"
+ logger.warning(msg.format(",".join(failed_nodes)))
+
+ if failed_testnodes != []:
+ msg = "Can't connect to testnode(s) " + ",".join(failed_testnodes)
+ logger.error(msg)
+ raise utils.StopTestError(msg)
+
+ if len(failed_nodes) == 0:
+ logger.info("All nodes connected successfully")
def collect_hw_info_stage(cfg, ctx):
@@ -132,7 +115,7 @@
if info.hostname is not None:
fname = os.path.join(
- cfg_dict['hwinfo_directory'],
+ cfg.hwinfo_directory,
info.hostname + "_lshw.xml")
with open(fname, "w") as fd:
@@ -141,22 +124,8 @@
logger.debug("Raw hardware info in " + cfg['hwinfo_directory'] + " folder")
-def run_single_test(test_nodes, name, params, log_directory,
- test_local_folder, run_uuid):
-
- test_cls = TOOL_TYPE_MAPPER[name]
- test_cfg = TestConfig(test_cls.__name__,
- params=params,
- test_uuid=run_uuid,
- nodes=test_nodes,
- log_directory=log_directory,
- remote_dir=test_local_folder.format(name=name))
-
- test = test_cls(test_cfg)
- return test.run()
-
-
-def suspend_vm_nodes(unused_nodes):
+@contextlib.contextmanager
+def suspend_vm_nodes_ctx(unused_nodes):
pausable_nodes_ids = [node.os_vm_id for node in unused_nodes
if node.os_vm_id is not None]
non_pausable = len(unused_nodes) - len(pausable_nodes_ids)
@@ -170,15 +139,38 @@
len(pausable_nodes_ids)))
start_vms.pause(pausable_nodes_ids)
- return pausable_nodes_ids
+ try:
+ yield pausable_nodes_ids
+ finally:
+ if len(pausable_nodes_ids) != 0:
+ logger.debug("Unpausing {0} nodes".format(
+ len(pausable_nodes_ids)))
+ start_vms.unpause(pausable_nodes_ids)
+
+
+def generate_result_dir_name(results, name, params):
+ # make a directory for results
+ all_tests_dirs = os.listdir(results)
+
+ if 'name' in params:
+ dir_name = "{0}_{1}".format(name, params['name'])
+ else:
+ for idx in range(len(all_tests_dirs) + 1):
+ dir_name = "{0}_{1}".format(name, idx)
+ if dir_name not in all_tests_dirs:
+ break
+ else:
+ raise utils.StopTestError("Can't select directory for test results")
+
+ return os.path.join(results, dir_name)
def run_tests(cfg, test_block, nodes):
- test_nodes = [node for node in nodes
- if 'testnode' in node.roles]
-
- not_test_nodes = [node for node in nodes
- if 'testnode' not in node.roles]
+ """
+ Run test from test block
+ """
+ test_nodes = [node for node in nodes if 'testnode' in node.roles]
+ not_test_nodes = [node for node in nodes if 'testnode' not in node.roles]
if len(test_nodes) == 0:
logger.error("No test nodes found")
@@ -186,15 +178,23 @@
for name, params in test_block.items():
results = []
- limit = params.get('node_limit')
+
+ # iterate over all node counts
+ limit = params.get('node_limit', len(test_nodes))
if isinstance(limit, (int, long)):
vm_limits = [limit]
- elif limit is None:
- vm_limits = [len(test_nodes)]
else:
+ list_or_tpl = isinstance(limit, (tuple, list))
+ all_ints = list_or_tpl and all(isinstance(climit, (int, long))
+ for climit in limit)
+ if not all_ints:
+ msg = "'node_limit' parameter ion config should" + \
+ "be either int or list if integers, not {0!r}".format(limit)
+ raise ValueError(msg)
vm_limits = limit
for vm_count in vm_limits:
+ # select test nodes
if vm_count == 'all':
curr_test_nodes = test_nodes
unused_nodes = []
@@ -205,111 +205,72 @@
if 0 == len(curr_test_nodes):
continue
- # make a directory for results
- all_tests_dirs = os.listdir(cfg_dict['results'])
+ results_path = generate_result_dir_name(cfg.results_storage, name, params)
+ utils.mkdirs_if_unxists(results_path)
- if 'name' in params:
- dir_name = "{0}_{1}".format(name, params['name'])
+ # suspend all unused virtual nodes
+ if cfg.settings.get('suspend_unused_vms', True):
+ suspend_ctx = suspend_vm_nodes_ctx(unused_nodes)
else:
- for idx in range(len(all_tests_dirs) + 1):
- dir_name = "{0}_{1}".format(name, idx)
- if dir_name not in all_tests_dirs:
- break
- else:
- raise utils.StopTestError(
- "Can't select directory for test results")
+ suspend_ctx = utils.empty_ctx()
- dir_path = os.path.join(cfg_dict['results'], dir_name)
- if not os.path.exists(dir_path):
- os.mkdir(dir_path)
+ with suspend_ctx:
+ resumable_nodes_ids = [node.os_vm_id for node in curr_test_nodes
+ if node.os_vm_id is not None]
- if cfg.get('suspend_unused_vms', True):
- pausable_nodes_ids = suspend_vm_nodes(unused_nodes)
+ if len(resumable_nodes_ids) != 0:
+ logger.debug("Check and unpause {0} nodes".format(
+ len(resumable_nodes_ids)))
+ start_vms.unpause(resumable_nodes_ids)
- resumable_nodes_ids = [node.os_vm_id for node in curr_test_nodes
- if node.os_vm_id is not None]
-
- if len(resumable_nodes_ids) != 0:
- logger.debug("Check and unpause {0} nodes".format(
- len(resumable_nodes_ids)))
- start_vms.unpause(resumable_nodes_ids)
-
- try:
sens_nodes = curr_test_nodes + not_test_nodes
with sensors_info_util(cfg, sens_nodes) as sensor_data:
- t_start = time.time()
- res = run_single_test(curr_test_nodes,
- name,
- params,
- dir_path,
- cfg['default_test_local_folder'],
- cfg['run_uuid'])
- t_end = time.time()
- finally:
- if cfg.get('suspend_unused_vms', True):
- if len(pausable_nodes_ids) != 0:
- logger.debug("Unpausing {0} nodes".format(
- len(pausable_nodes_ids)))
- start_vms.unpause(pausable_nodes_ids)
+ test_cls = TOOL_TYPE_MAPPER[name]
+ remote_dir = cfg.default_test_local_folder.format(name=name)
+
+ test_cfg = TestConfig(test_cls.__name__,
+ params=params,
+ test_uuid=cfg.run_uuid,
+ nodes=test_nodes,
+ log_directory=results_path,
+ remote_dir=remote_dir)
+
+ t_start = time.time()
+ res = test_cls(test_cfg).run()
+ t_end = time.time()
+
+ # save sensor data
if sensor_data is not None:
fname = "{0}_{1}.csv".format(int(t_start), int(t_end))
- fpath = os.path.join(cfg['sensor_storage'], fname)
+ fpath = os.path.join(cfg.sensor_storage, fname)
with open(fpath, "w") as fd:
fd.write("\n\n".join(sensor_data))
- results.extend(res)
+ results.append(res)
yield name, results
-def log_nodes_statistic(_, ctx):
- nodes = ctx.nodes
- logger.info("Found {0} nodes total".format(len(nodes)))
- per_role = collections.defaultdict(lambda: 0)
- for node in nodes:
- for role in node.roles:
- per_role[role] += 1
-
- for role, count in sorted(per_role.items()):
- logger.debug("Found {0} nodes with role {1}".format(count, role))
-
-
def connect_stage(cfg, ctx):
ctx.clear_calls_stack.append(disconnect_stage)
connect_all(ctx.nodes)
-
- all_ok = True
-
- for node in ctx.nodes:
- if node.connection is None:
- if 'testnode' in node.roles:
- msg = "Can't connect to testnode {0}"
- msg = msg.format(node.get_conn_id())
- logger.error(msg)
- raise utils.StopTestError(msg)
- else:
- msg = "Node {0} would be excluded - can't connect"
- logger.warning(msg.format(node.get_conn_id()))
- all_ok = False
-
- if all_ok:
- logger.info("All nodes connected successfully")
-
- ctx.nodes = [node for node in ctx.nodes
- if node.connection is not None]
+ ctx.nodes = [node for node in ctx.nodes if node.connection is not None]
def discover_stage(cfg, ctx):
+ """
+ discover clusters and nodes stage
+ """
if cfg.get('discover') is not None:
- discover_objs = [i.strip() for i in cfg['discover'].strip().split(",")]
+ discover_objs = [i.strip() for i in cfg.discover.strip().split(",")]
nodes = discover(ctx,
discover_objs,
- cfg['clouds'],
- cfg['var_dir'],
- not cfg['dont_discover_nodes'])
+ cfg.clouds,
+ cfg.results_storage,
+ not cfg.dont_discover_nodes)
ctx.nodes.extend(nodes)
@@ -327,16 +288,18 @@
if len(roles) != 0:
cluster[node.conn_url] = roles
- with open(cfg['nodes_report_file'], "w") as fd:
+ with open(cfg.nodes_report_file, "w") as fd:
fd.write(pretty_yaml.dumps(cluster))
def reuse_vms_stage(cfg, ctx):
- p = cfg.get('clouds', {}).get('openstack', {}).get('vms', [])
+ vms_patterns = cfg.get('clouds', {}).get('openstack', {}).get('vms', [])
+ private_key_path = get_vm_keypair(cfg)['keypair_file_private']
- for creds in p:
- vm_name_pattern, conn_pattern = creds.split(",")
+ for creds in vms_patterns:
+ user_name, vm_name_pattern = creds.split("@", 1)
msg = "Vm like {0} lookup failed".format(vm_name_pattern)
+
with utils.log_error(msg):
msg = "Looking for vm with name like {0}".format(vm_name_pattern)
logger.debug(msg)
@@ -348,7 +311,10 @@
conn = start_vms.nova_connect(**os_creds)
for ip, vm_id in start_vms.find_vms(conn, vm_name_pattern):
- node = Node(conn_pattern.format(ip=ip), ['testnode'])
+ conn_url = "ssh://{user}@{ip}::{key}".format(user=user_name,
+ ip=ip,
+ key=private_key_path)
+ node = Node(conn_url, ['testnode'])
node.os_vm_id = vm_id
ctx.nodes.append(node)
@@ -357,8 +323,8 @@
creds = None
tenant = None
- if 'openstack' in cfg['clouds']:
- os_cfg = cfg['clouds']['openstack']
+ if 'openstack' in cfg.clouds:
+ os_cfg = cfg.clouds['openstack']
if 'OPENRC' in os_cfg:
logger.info("Using OS credentials from " + os_cfg['OPENRC'])
user, passwd, tenant, auth_url = utils.get_creds_openrc(os_cfg['OPENRC'])
@@ -372,8 +338,8 @@
passwd = os_cfg['OS_PASSWORD'].strip()
auth_url = os_cfg['OS_AUTH_URL'].strip()
- if tenant is None and 'fuel' in cfg['clouds'] and \
- 'openstack_env' in cfg['clouds']['fuel'] and \
+ if tenant is None and 'fuel' in cfg.clouds and \
+ 'openstack_env' in cfg.clouds['fuel'] and \
ctx.fuel_openstack_creds is not None:
logger.info("Using fuel creds")
creds = ctx.fuel_openstack_creds
@@ -388,31 +354,57 @@
'auth_url': auth_url}
logger.debug("OS_CREDS: user={name} tenant={tenant} auth_url={auth_url}".format(**creds))
+
return creds
+def get_vm_keypair(cfg):
+ res = {}
+ for field, ext in (('keypair_file_private', 'pem'),
+ ('keypair_file_public', 'pub')):
+ fpath = cfg.vm_configs.get(field)
+
+ if fpath is None:
+ fpath = cfg.vm_configs['keypair_name'] + "." + ext
+
+ if os.path.isabs(fpath):
+ res[field] = fpath
+ else:
+ res[field] = os.path.join(cfg.config_folder, fpath)
+ return res
+
+
@contextlib.contextmanager
def create_vms_ctx(ctx, cfg, config, already_has_count=0):
- params = cfg['vm_configs'][config['cfg_name']].copy()
+ if config['count'].startswith('='):
+ count = int(config['count'][1:])
+ if count <= already_has_count:
+ logger.debug("Not need new vms")
+ yield []
+ return
+
+ params = cfg.vm_configs[config['cfg_name']].copy()
os_nodes_ids = []
if not start_vms.is_connected():
os_creds = get_OS_credentials(cfg, ctx)
else:
os_creds = {}
- start_vms.nova_connect(**os_creds)
+
+ nova = start_vms.nova_connect(**os_creds)
params.update(config)
- if 'keypair_file_private' not in params:
- params['keypair_file_private'] = params['keypair_name'] + ".pem"
+ params.update(get_vm_keypair(cfg))
- params['group_name'] = cfg_dict['run_uuid']
+ params['group_name'] = cfg.run_uuid
+ params['keypair_name'] = cfg.vm_configs['keypair_name']
if not config.get('skip_preparation', False):
logger.info("Preparing openstack")
- start_vms.prepare_os_subpr(params=params, **os_creds)
+ start_vms.prepare_os_subpr(nova, params=params, **os_creds)
new_nodes = []
+ old_nodes = ctx.nodes[:]
try:
for new_node, node_id in start_vms.launch_vms(params, already_has_count):
new_node.roles.append('testnode')
@@ -426,17 +418,15 @@
yield new_nodes
finally:
- if not cfg['keep_vm']:
+ if not cfg.keep_vm:
shut_down_vms_stage(cfg, ctx)
+ ctx.nodes = old_nodes
def run_tests_stage(cfg, ctx):
ctx.results = collections.defaultdict(lambda: [])
- if 'tests' not in cfg:
- return
-
- for group in cfg['tests']:
+ for group in cfg.get('tests', []):
if len(group.items()) != 1:
msg = "Items in tests section should have len == 1"
@@ -451,33 +441,36 @@
logger.error(msg)
raise utils.StopTestError(msg)
- num_test_nodes = sum(1 for node in ctx.nodes
- if 'testnode' in node.roles)
+ num_test_nodes = 0
+ for node in ctx.nodes:
+ if 'testnode' in node.roles:
+ num_test_nodes += 1
vm_ctx = create_vms_ctx(ctx, cfg, config['openstack'],
num_test_nodes)
- with vm_ctx as new_nodes:
- if len(new_nodes) != 0:
- logger.debug("Connecting to new nodes")
- connect_all(new_nodes, True)
+ tests = config.get('tests', [])
+ else:
+ vm_ctx = utils.empty_ctx([])
+ tests = [group]
- for node in new_nodes:
- if node.connection is None:
- msg = "Failed to connect to vm {0}"
- raise RuntimeError(msg.format(node.get_conn_id()))
+ if cfg.get('sensors') is None:
+ sensor_ctx = utils.empty_ctx()
+ else:
+ sensor_ctx = with_sensors_util(cfg.get('sensors'), ctx.nodes)
- with with_sensors_util(cfg_dict, ctx.nodes):
- for test_group in config.get('tests', []):
+ with vm_ctx as new_nodes:
+ if len(new_nodes) != 0:
+ connect_all(new_nodes, True)
+
+ if not cfg.no_tests:
+ for test_group in tests:
+ with sensor_ctx:
for tp, res in run_tests(cfg, test_group, ctx.nodes):
ctx.results[tp].extend(res)
- else:
- with with_sensors_util(cfg_dict, ctx.nodes):
- for tp, res in run_tests(cfg, group, ctx.nodes):
- ctx.results[tp].extend(res)
def shut_down_vms_stage(cfg, ctx):
- vm_ids_fname = cfg_dict['vm_ids_fname']
+ vm_ids_fname = cfg.vm_ids_fname
if ctx.openstack_nodes_ids is None:
nodes_ids = open(vm_ids_fname).read().split()
else:
@@ -493,12 +486,12 @@
def store_nodes_in_log(cfg, nodes_ids):
- with open(cfg['vm_ids_fname'], 'w') as fd:
+ with open(cfg.vm_ids_fname, 'w') as fd:
fd.write("\n".join(nodes_ids))
def clear_enviroment(cfg, ctx):
- if os.path.exists(cfg_dict['vm_ids_fname']):
+ if os.path.exists(cfg.vm_ids_fname):
shut_down_vms_stage(cfg, ctx)
@@ -511,28 +504,29 @@
def store_raw_results_stage(cfg, ctx):
-
- raw_results = cfg_dict['raw_results']
-
- if os.path.exists(raw_results):
- cont = yaml_load(open(raw_results).read())
+ if os.path.exists(cfg.raw_results):
+ cont = yaml_load(open(cfg.raw_results).read())
else:
cont = []
cont.extend(utils.yamable(ctx.results).items())
raw_data = pretty_yaml.dumps(cont)
- with open(raw_results, "w") as fd:
+ with open(cfg.raw_results, "w") as fd:
fd.write(raw_data)
def console_report_stage(cfg, ctx):
first_report = True
- text_rep_fname = cfg['text_report_file']
+ text_rep_fname = cfg.text_report_file
with open(text_rep_fname, "w") as fd:
for tp, data in ctx.results.items():
if 'io' == tp and data is not None:
- rep = IOPerfTest.format_for_console(data)
+ rep_lst = []
+ for result in data:
+ rep_lst.append(
+ IOPerfTest.format_for_console(list(result)))
+ rep = "\n\n".join(rep_lst)
elif tp in ['mysql', 'pgbench'] and data is not None:
rep = MysqlTest.format_for_console(data)
else:
@@ -550,7 +544,7 @@
def test_load_report_stage(cfg, ctx):
- load_rep_fname = cfg['load_report_file']
+ load_rep_fname = cfg.load_report_file
found = False
for idx, (tp, data) in enumerate(ctx.results.items()):
if 'io' == tp and data is not None:
@@ -564,49 +558,33 @@
def html_report_stage(cfg, ctx):
- html_rep_fname = cfg['html_report_file']
+ html_rep_fname = cfg.html_report_file
found = False
for tp, data in ctx.results.items():
if 'io' == tp and data is not None:
- if found:
+ if found or len(data) > 1:
logger.error("Making reports for more than one " +
"io block isn't supported! All " +
"report, except first are skipped")
continue
found = True
- report.make_io_report(data,
+ report.make_io_report(list(data[0]),
cfg.get('comment', ''),
html_rep_fname,
lab_info=ctx.hw_info)
-def complete_log_nodes_statistic(cfg, ctx):
- nodes = ctx.nodes
- for node in nodes:
- logger.debug(str(node))
+def load_data_from_path(test_res_dir):
+ files = get_test_files(test_res_dir)
+ raw_res = yaml_load(open(files['raw_results']).read())
+ res = collections.defaultdict(lambda: [])
+ for tp, test_lists in raw_res:
+ for tests in test_lists:
+ for suite_name, suite_data in tests.items():
+ result_folder = suite_data[0]
+ res[tp].append(TOOL_TYPE_MAPPER[tp].load(suite_name, result_folder))
-def load_data_from_file(var_dir, _, ctx):
- raw_results = os.path.join(var_dir, 'raw_results.yaml')
- ctx.results = {}
- for tp, results in yaml_load(open(raw_results).read()):
- cls = TOOL_TYPE_MAPPER[tp]
- ctx.results[tp] = map(cls.load, results)
-
-
-def load_data_from_path(var_dir):
- res_dir = os.path.join(var_dir, 'results')
- res = {}
- for dir_name in os.listdir(res_dir):
- dir_path = os.path.join(res_dir, dir_name)
- if not os.path.isdir(dir_path):
- continue
- rr = re.match(r"(?P<type>[a-z]+)_\d+$", dir_name)
- if rr is None:
- continue
- tp = rr.group('type')
- arr = res.setdefault(tp, [])
- arr.extend(TOOL_TYPE_MAPPER[tp].load(dir_path))
return res
@@ -617,230 +595,3 @@
def load_data_from(var_dir):
return functools.partial(load_data_from_path_stage, var_dir)
-
-
-def parse_args(argv):
- descr = "Disk io performance test suite"
- parser = argparse.ArgumentParser(prog='wally', description=descr)
-
- # subparsers = parser.add_subparsers()
- # test_parser = subparsers.add_parser('test', help='run tests')
-
- parser.add_argument("-l", dest='extra_logs',
- action='store_true', default=False,
- help="print some extra log info")
- parser.add_argument("-b", '--build_description',
- type=str, default="Build info")
- parser.add_argument("-i", '--build_id', type=str, default="id")
- parser.add_argument("-t", '--build_type', type=str, default="GA")
- parser.add_argument("-u", '--username', type=str, default="admin")
- parser.add_argument("-n", '--no-tests', action='store_true',
- help="Don't run tests", default=False)
- parser.add_argument("-p", '--post-process-only', metavar="VAR_DIR",
- help="Only process data from previour run")
- parser.add_argument("-x", '--xxx', action='store_true')
- parser.add_argument("-k", '--keep-vm', action='store_true',
- help="Don't remove test vm's", default=False)
- parser.add_argument("-d", '--dont-discover-nodes', action='store_true',
- help="Don't connect/discover fuel nodes",
- default=False)
- parser.add_argument("-r", '--no-html-report', action='store_true',
- help="Skip html report", default=False)
- parser.add_argument("--params", metavar="testname.paramname",
- help="Test params", default=[])
- parser.add_argument("--ls", action='store_true', default=False)
- parser.add_argument("-c", "--comment", default="")
- parser.add_argument("config_file")
-
- return parser.parse_args(argv[1:])
-
-
-def get_stage_name(func):
- nm = get_func_name(func)
- if nm.endswith("stage"):
- return nm
- else:
- return nm + " stage"
-
-
-def get_test_names(raw_res):
- res = set()
- for tp, data in raw_res:
- for block in data:
- res.add("{0}({1})".format(tp, block.get('test_name', '-')))
- return res
-
-
-def list_results(path):
- results = []
-
- for dname in os.listdir(path):
-
- files_cfg = get_test_files(os.path.join(path, dname))
-
- if not os.path.isfile(files_cfg['raw_results']):
- continue
-
- mt = os.path.getmtime(files_cfg['raw_results'])
- res_mtime = time.ctime(mt)
-
- raw_res = yaml_load(open(files_cfg['raw_results']).read())
- test_names = ",".join(sorted(get_test_names(raw_res)))
-
- params = load_run_params(files_cfg['run_params_file'])
-
- comm = params.get('comment')
- results.append((mt, dname, test_names, res_mtime,
- '-' if comm is None else comm))
-
- tab = texttable.Texttable(max_width=200)
- tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
- tab.set_cols_align(["l", "l", "l", "l"])
- results.sort()
-
- for data in results[::-1]:
- tab.add_row(data[1:])
-
- tab.header(["Name", "Tests", "etime", "Comment"])
-
- print(tab.draw())
-
-
-def get_func_name(obj):
- if hasattr(obj, '__name__'):
- return obj.__name__
- if hasattr(obj, 'func_name'):
- return obj.func_name
- return obj.func.func_name
-
-
-@contextlib.contextmanager
-def log_stage(func):
- msg_templ = "Exception during {0}: {1!s}"
- msg_templ_no_exc = "During {0}"
-
- logger.info("Start " + get_stage_name(func))
-
- try:
- yield
- except utils.StopTestError as exc:
- logger.error(msg_templ.format(
- get_func_name(func), exc))
- except Exception:
- logger.exception(msg_templ_no_exc.format(
- get_func_name(func)))
-
-
-def main(argv):
- if faulthandler is not None:
- faulthandler.register(signal.SIGUSR1, all_threads=True)
-
- opts = parse_args(argv)
-
- # x = load_data_from_path("/var/wally_results/uncorroborant_dinah")
- # y = load_data_from_path("/var/wally_results/nonmelting_jamal")
- # print(IOPerfTest.format_diff_for_console([x['io'], y['io']]))
- # exit(1)
-
- if opts.ls:
- list_results(opts.config_file)
- exit(0)
-
- data_dir = load_config(opts.config_file, opts.post_process_only)
-
- if opts.post_process_only is None:
- cfg_dict['comment'] = opts.comment
- save_run_params()
-
- if cfg_dict.get('logging', {}).get("extra_logs", False) or opts.extra_logs:
- # level = logging.DEBUG
- level = logging.INFO
- else:
- level = logging.WARNING
-
- setup_loggers(level, cfg_dict['log_file'])
-
- if not os.path.exists(cfg_dict['saved_config_file']):
- with open(cfg_dict['saved_config_file'], 'w') as fd:
- fd.write(open(opts.config_file).read())
-
- if opts.post_process_only is not None:
- stages = [
- load_data_from(data_dir)
- ]
- else:
- stages = [
- discover_stage
- ]
-
- stages.extend([
- reuse_vms_stage,
- log_nodes_statistic,
- save_nodes_stage,
- connect_stage])
-
- if cfg_dict.get('collect_info', True):
- stages.append(collect_hw_info_stage)
-
- stages.extend([
- # deploy_sensors_stage,
- run_tests_stage,
- store_raw_results_stage,
- # gather_sensors_stage
- ])
-
- report_stages = [
- console_report_stage,
- ]
-
- if opts.xxx:
- report_stages.append(test_load_report_stage)
- elif not opts.no_html_report:
- report_stages.append(html_report_stage)
-
- logger.info("All info would be stored into {0}".format(
- cfg_dict['var_dir']))
-
- ctx = Context()
- ctx.results = {}
- ctx.build_meta['build_id'] = opts.build_id
- ctx.build_meta['build_descrption'] = opts.build_description
- ctx.build_meta['build_type'] = opts.build_type
- ctx.build_meta['username'] = opts.username
- ctx.sensors_data = SensorDatastore()
-
- cfg_dict['keep_vm'] = opts.keep_vm
- cfg_dict['no_tests'] = opts.no_tests
- cfg_dict['dont_discover_nodes'] = opts.dont_discover_nodes
-
- for stage in stages:
- ok = False
- with log_stage(stage):
- stage(cfg_dict, ctx)
- ok = True
- if not ok:
- break
-
- exc, cls, tb = sys.exc_info()
- for stage in ctx.clear_calls_stack[::-1]:
- with log_stage(stage):
- stage(cfg_dict, ctx)
-
- logger.debug("Start utils.cleanup")
- for clean_func, args, kwargs in utils.iter_clean_func():
- with log_stage(clean_func):
- clean_func(*args, **kwargs)
-
- if exc is None:
- for report_stage in report_stages:
- with log_stage(report_stage):
- report_stage(cfg_dict, ctx)
-
- logger.info("All info stored in {0} folder".format(cfg_dict['var_dir']))
-
- if exc is None:
- logger.info("Tests finished successfully")
- return 0
- else:
- logger.error("Tests are failed. See detailed error above")
- return 1