wally/run_test.py: refactor test execution and OpenStack credential handling

- extract run_single_test() out of run_tests() and add a per-test 'node_limit'
  option that runs each test with several node counts, pausing unused test VMs
  (and unpausing them afterwards) when suspend_unused_vms is enabled
- drive reuse_vms_stage from the clouds/openstack/vms config section instead of
  the removed --reuse-vms command line option
- add get_creds_openrc() and let get_OS_credentials() pick credentials from an
  OPENRC file, the shell environment, inline config values or the fuel section;
  connect to nova only when not already connected
- write the text report from the console_report stage instead of the html
  report stage and add gather_sensors_stage to the pipeline
- register faulthandler on SIGUSR1 to dump thread stacks of a hung run
diff --git a/wally/run_test.py b/wally/run_test.py
index dc6637f..5322432 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -5,6 +5,7 @@
import time
import Queue
import pprint
+import signal
import logging
import argparse
import functools
@@ -14,6 +15,7 @@
import collections
import yaml
+import faulthandler
from concurrent.futures import ThreadPoolExecutor
from wally import pretty_yaml
@@ -21,9 +23,9 @@
from wally.discover import discover, Node
from wally.timeseries import SensorDatastore
from wally import utils, report, ssh_utils, start_vms
-from wally.suits.itest import IOPerfTest, PgBenchTest, MysqlTest
-from wally.sensors_utils import deploy_sensors_stage
from wally.config import cfg_dict, load_config, setup_loggers
+from wally.suits.itest import IOPerfTest, PgBenchTest, MysqlTest
+from wally.sensors_utils import deploy_sensors_stage, gather_sensors_stage
try:
@@ -143,6 +145,78 @@
res_q.put(exc)
+def run_single_test(test_nodes, name, test_cls, params,
+ test_local_folder, run_uuid, counter=[0]):
+ logger.info("Starting {0} tests".format(name))
+ res_q = Queue.Queue()
+ threads = []
+ coord_q = Queue.Queue()
+ rem_folder = test_local_folder.format(name=name)
+
+ barrier = utils.Barrier(len(test_nodes))
+ for idx, node in enumerate(test_nodes):
+ msg = "Starting {0} test on {1} node"
+ logger.debug(msg.format(name, node.conn_url))
+
+ dname = "{0}_{1}_{2}".format(name, counter[0], node.get_ip())
+ counter[0] += 1
+ dr = os.path.join(cfg_dict['test_log_directory'], dname)
+
+ if not os.path.exists(dr):
+ os.makedirs(dr)
+
+ params = params.copy()
+ params['testnodes_count'] = len(test_nodes)
+ test = test_cls(options=params,
+ is_primary=(idx == 0),
+ on_result_cb=res_q.put,
+ test_uuid=run_uuid,
+ node=node,
+ remote_dir=rem_folder,
+ log_directory=dr,
+ coordination_queue=coord_q)
+ th = threading.Thread(None, test_thread, None,
+ (test, node, barrier, res_q))
+ threads.append(th)
+ th.daemon = True
+ th.start()
+
+ th = threading.Thread(None, test_cls.coordination_th, None,
+ (coord_q, barrier, len(threads)))
+ threads.append(th)
+ th.daemon = True
+ th.start()
+
+ results = []
+ coord_q.put(None)
+
+ while len(threads) != 0:
+ nthreads = []
+
+ for th in threads:
+ if not th.is_alive():
+ th.join()
+ else:
+ nthreads.append(th)
+
+ threads = nthreads
+
+ while not res_q.empty():
+ val = res_q.get()
+
+ if isinstance(val, utils.StopTestError):
+ raise val
+
+ if isinstance(val, Exception):
+ msg = "Exception during test execution: {0!s}"
+ raise ValueError(msg.format(val))
+
+ results.append(val)
+
+ results = test_cls.merge_results(results)
+ return results
+
+
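
run_single_test() above fans one worker thread out per test node, uses a Barrier so all nodes start together, and funnels results and exceptions through a single queue. A minimal standalone sketch of that fan-out/collect pattern, using only threading and Queue (the wally test classes and utils.Barrier are left out):

    import Queue
    import threading

    def worker(res_q, payload):
        try:
            res_q.put(payload * 2)              # stand-in for running one test
        except Exception as exc:
            res_q.put(exc)                      # failures travel through the same queue

    def run_workers(payloads):
        res_q = Queue.Queue()
        threads = [threading.Thread(target=worker, args=(res_q, p))
                   for p in payloads]
        for th in threads:
            th.daemon = True
            th.start()
        for th in threads:
            th.join()

        results = []
        while not res_q.empty():
            val = res_q.get()
            if isinstance(val, Exception):      # re-raise what a worker reported
                raise val
            results.append(val)
        return results

    # run_workers([1, 2, 3]) -> [2, 4, 6] (order may vary)
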
def run_tests(cfg, test_block, nodes):
tool_type_mapper = {
"io": IOPerfTest,
@@ -156,87 +230,65 @@
logger.error("No test nodes found")
return
- test_number_per_type = {}
- res_q = Queue.Queue()
-
for name, params in test_block.items():
- logger.info("Starting {0} tests".format(name))
- test_num = test_number_per_type.get(name, 0)
- test_number_per_type[name] = test_num + 1
- threads = []
- barrier = utils.Barrier(len(test_nodes))
- coord_q = Queue.Queue()
- test_cls = tool_type_mapper[name]
- rem_folder = cfg['default_test_local_folder'].format(name=name)
-
- for idx, node in enumerate(test_nodes):
- msg = "Starting {0} test on {1} node"
- logger.debug(msg.format(name, node.conn_url))
-
- dr = os.path.join(
- cfg_dict['test_log_directory'],
- "{0}_{1}_{2}".format(name, test_num, node.get_ip())
- )
-
- if not os.path.exists(dr):
- os.makedirs(dr)
-
- test = test_cls(options=params,
- is_primary=(idx == 0),
- on_result_cb=res_q.put,
- test_uuid=cfg['run_uuid'],
- node=node,
- remote_dir=rem_folder,
- log_directory=dr,
- coordination_queue=coord_q)
- th = threading.Thread(None, test_thread, None,
- (test, node, barrier, res_q))
- threads.append(th)
- th.daemon = True
- th.start()
-
- th = threading.Thread(None, test_cls.coordination_th, None,
- (coord_q, barrier, len(threads)))
- threads.append(th)
- th.daemon = True
- th.start()
-
- def gather_results(res_q, results):
- while not res_q.empty():
- val = res_q.get()
-
- if isinstance(val, utils.StopTestError):
- raise val
-
- if isinstance(val, Exception):
- msg = "Exception during test execution: {0!s}"
- raise ValueError(msg.format(val))
-
- results.append(val)
-
results = []
+ limit = params.get('node_limit')
+ if isinstance(limit, (int, long)):
+ vm_limits = [limit]
+ elif limit is None:
+ vm_limits = [len(test_nodes)]
+ else:
+ vm_limits = limit
- # MAX_WAIT_TIME = 10
- # end_time = time.time() + MAX_WAIT_TIME
+ for vm_count in vm_limits:
+ if vm_count == 'all':
+ curr_test_nodes = test_nodes
+ unused_nodes = []
+ else:
+ curr_test_nodes = test_nodes[:vm_count]
+ unused_nodes = test_nodes[vm_count:]
- # while time.time() < end_time:
- while True:
- for th in threads:
- th.join(1)
- gather_results(res_q, results)
- # if time.time() > end_time:
- # break
+ if 0 == len(curr_test_nodes):
+ continue
- if all(not th.is_alive() for th in threads):
- break
+ if cfg.get('suspend_unused_vms', True):
+ pausable_nodes_ids = [node.os_vm_id for node in unused_nodes
+ if node.os_vm_id is not None]
+ non_pausable = len(unused_nodes) - len(pausable_nodes_ids)
- # if any(th.is_alive() for th in threads):
- # logger.warning("Some test threads still running")
+ if 0 != non_pausable:
+ logger.warning("Can't pause {0} nodes".format(
+ non_pausable))
- gather_results(res_q, results)
- result = test_cls.merge_results(results)
- result['__test_meta__'] = {'testnodes_count': len(test_nodes)}
- yield name, result
+ if len(pausable_nodes_ids) != 0:
+ logger.debug("Try to pause {0} unused nodes".format(
+ len(pausable_nodes_ids)))
+ start_vms.pause(pausable_nodes_ids)
+
+ resumable_nodes_ids = [node.os_vm_id for node in curr_test_nodes
+ if node.os_vm_id is not None]
+
+ if len(resumable_nodes_ids) != 0:
+ logger.debug("Check and unpause {0} nodes".format(
+ len(resumable_nodes_ids)))
+ start_vms.unpause(resumable_nodes_ids)
+
+ test_cls = tool_type_mapper[name]
+ try:
+ res = run_single_test(curr_test_nodes, name, test_cls,
+ params,
+ cfg['default_test_local_folder'],
+ cfg['run_uuid'])
+ finally:
+ if cfg.get('suspend_unused_vms', True):
+ if len(pausable_nodes_ids) != 0:
+ logger.debug("Unpausing {0} nodes".format(
+ len(pausable_nodes_ids)))
+ start_vms.unpause(pausable_nodes_ids)
+
+ results.append(res)
+
+ yield name, results
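
The loop above first normalizes the per-test 'node_limit' option into a list of VM counts. A hedged sketch of just that normalization (the helper name is invented; here 'all' is resolved to the total node count up front, which the patch instead handles inside the loop):

    def normalize_node_limit(limit, total_nodes):
        if isinstance(limit, (int, long)):      # a single count
            return [limit]
        if limit is None:                       # not set: one run over every node
            return [total_nodes]
        # a list: one run per entry, 'all' meaning every test node
        return [total_nodes if v == 'all' else v for v in limit]

    # normalize_node_limit(None, 5)          -> [5]
    # normalize_node_limit(2, 5)             -> [2]
    # normalize_node_limit([1, 2, 'all'], 5) -> [1, 2, 5]
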
def log_nodes_statistic(_, ctx):
@@ -306,13 +358,22 @@
fd.write(pretty_yaml.dumps(cluster))
-def reuse_vms_stage(vm_name_pattern, conn_pattern):
- def reuse_vms(cfg, ctx):
+def reuse_vms_stage(cfg, ctx):
+ p = cfg.get('clouds', {})
+ p = p.get('openstack', {})
+ p = p.get('vms', [])
+
+ for creds in p:
+ vm_name_pattern, conn_pattern = creds.split(",")
try:
msg = "Looking for vm with name like {0}".format(vm_name_pattern)
logger.debug(msg)
- os_creds = get_OS_credentials(cfg, ctx, "clouds")
+ if not start_vms.is_connected():
+ os_creds = get_OS_credentials(cfg, ctx)
+ else:
+ os_creds = {}
+
conn = start_vms.nova_connect(**os_creds)
for ip, vm_id in start_vms.find_vms(conn, vm_name_pattern):
node = Node(conn_pattern.format(ip=ip), ['testnode'])
@@ -325,56 +386,58 @@
logger.exception(msg)
raise utils.StopTestError(msg, exc)
- return reuse_vms
+
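
reuse_vms_stage() is now driven by the config rather than the removed --reuse-vms option: every entry under clouds/openstack/vms is a single 'vm_name_pattern,conn_pattern' string. A small sketch of that lookup and split (the patterns shown are made-up examples):

    cfg = {'clouds': {'openstack': {
        'vms': ['wally-vm-*,ssh://root@{ip}:22'],   # hypothetical entry
    }}}

    entries = cfg.get('clouds', {}).get('openstack', {}).get('vms', [])
    for entry in entries:
        vm_name_pattern, conn_pattern = entry.split(",")
        print(vm_name_pattern + " -> " + conn_pattern.format(ip="10.0.0.5"))
    # wally-vm-* -> ssh://root@10.0.0.5:22
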
+def get_creds_openrc(path):
+ fc = open(path).read()
+
+ echo = 'echo "$OS_TENANT_NAME:$OS_USERNAME:$OS_PASSWORD@$OS_AUTH_URL"'
+
+ try:
+ data = utils.run_locally(['/bin/bash'],
+ input_data=fc + "\n" + echo)
+ except subprocess.CalledProcessError as exc:
+ msg = "Failed to get creads from openrc file: " + data
+ logger.exception(msg)
+ raise utils.StopTestError(msg, exc)
+
+ try:
+ data = data.strip()
+ tenant, user, passwd_auth_url = data.split(':', 2)
+ passwd, auth_url = passwd_auth_url.rsplit("@", 1)
+ assert (auth_url.startswith("https://") or
+ auth_url.startswith("http://"))
+ except Exception as exc:
+ msg = "Failed to get creads from openrc file: " + data
+ logger.exception(msg)
+ raise utils.StopTestError(msg, exc)
+ return user, passwd, tenant, auth_url
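
get_creds_openrc() reads credentials by feeding the openrc file plus an echo line to a bash subprocess and parsing the single echoed line. A standalone sketch of the same idea with subprocess.Popen in place of the project's utils.run_locally (the openrc path is whatever the config points at):

    import subprocess

    ECHO = 'echo "$OS_TENANT_NAME:$OS_USERNAME:$OS_PASSWORD@$OS_AUTH_URL"'

    def creds_from_openrc(path):
        script = open(path).read() + "\n" + ECHO
        proc = subprocess.Popen(['/bin/bash'],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE)
        out, _ = proc.communicate(script)
        if proc.returncode != 0:
            raise RuntimeError("sourcing {0} failed".format(path))

        # echoed as tenant:user:password@auth_url
        tenant, user, passwd_auth_url = out.strip().split(':', 2)
        passwd, auth_url = passwd_auth_url.rsplit('@', 1)
        return user, passwd, tenant, auth_url
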
-def get_OS_credentials(cfg, ctx, creds_type):
+def get_OS_credentials(cfg, ctx):
creds = None
-
- if creds_type == 'clouds':
- logger.info("Using OS credentials from 'cloud' section")
- if 'openstack' in cfg['clouds']:
- os_cfg = cfg['clouds']['openstack']
-
+ if 'openstack' in cfg['clouds']:
+ os_cfg = cfg['clouds']['openstack']
+ if 'OPENRC' in os_cfg:
+ logger.info("Using OS credentials from " + os_cfg['OPENRC'])
+ user, passwd, tenant, auth_url = \
+ get_creds_openrc(os_cfg['OPENRC'])
+ elif 'ENV' in os_cfg:
+ logger.info("Using OS credentials from shell environment")
+ user, passwd, tenant, auth_url = start_vms.ostack_get_creds()
+ else:
+ logger.info("Using predefined credentials")
tenant = os_cfg['OS_TENANT_NAME'].strip()
user = os_cfg['OS_USERNAME'].strip()
passwd = os_cfg['OS_PASSWORD'].strip()
auth_url = os_cfg['OS_AUTH_URL'].strip()
- elif 'fuel' in cfg['clouds'] and \
- 'openstack_env' in cfg['clouds']['fuel']:
- creds = ctx.fuel_openstack_creds
-
- elif creds_type == 'ENV':
- logger.info("Using OS credentials from shell environment")
- user, passwd, tenant, auth_url = start_vms.ostack_get_creds()
- elif os.path.isfile(creds_type):
- logger.info("Using OS credentials from " + creds_type)
- fc = open(creds_type).read()
-
- echo = 'echo "$OS_TENANT_NAME:$OS_USERNAME:$OS_PASSWORD@$OS_AUTH_URL"'
-
- try:
- data = utils.run_locally(['/bin/bash'], input=fc + "\n" + echo)
- except subprocess.CalledProcessError as exc:
- msg = "Failed to get creads from openrc file: " + data
- logger.exception(msg)
- raise utils.StopTestError(msg, exc)
-
- try:
- user, tenant, passwd_auth_url = data.split(':', 2)
- passwd, auth_url = passwd_auth_url.rsplit("@", 1)
- assert (auth_url.startswith("https://") or
- auth_url.startswith("http://"))
- except Exception as exc:
- msg = "Failed to get creads from openrc file: " + data
- logger.exception(msg)
- raise utils.StopTestError(msg, exc)
-
+ elif 'fuel' in cfg['clouds'] and \
+ 'openstack_env' in cfg['clouds']['fuel']:
+ logger.info("Using fuel creds")
+ creds = ctx.fuel_openstack_creds
else:
- msg = "Creds {0!r} isn't supported".format(creds_type)
- logger.error(msg)
- raise utils.StopTestError(msg, None)
+ logger.error("Can't found OS credentials")
+ raise utils.StopTestError("Can't found OS credentials", None)
if creds is None:
creds = {'name': user,
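
get_OS_credentials() now resolves the credential source in a fixed order: an OPENRC file, the shell environment, values inlined in the openstack section, and finally the fuel section. A compact sketch of just that precedence check (the helper is illustrative, not part of the patch, and the ENV value shown is a placeholder):

    def pick_creds_source(cfg):
        os_cfg = cfg.get('clouds', {}).get('openstack')
        if os_cfg is not None:
            if 'OPENRC' in os_cfg:
                return 'openrc file'
            if 'ENV' in os_cfg:
                return 'shell environment'
            return 'inline config values'
        if 'openstack_env' in cfg.get('clouds', {}).get('fuel', {}):
            return 'fuel'
        raise ValueError("no OpenStack credentials configured")

    # pick_creds_source({'clouds': {'openstack': {'ENV': True}}}) -> 'shell environment'
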
@@ -392,16 +455,18 @@
params = cfg['vm_configs'][config['cfg_name']].copy()
os_nodes_ids = []
- os_creds_type = config['creds']
- os_creds = get_OS_credentials(cfg, ctx, os_creds_type)
+ if not start_vms.is_connected():
+ os_creds = get_OS_credentials(cfg, ctx)
+ else:
+ os_creds = {}
start_vms.nova_connect(**os_creds)
- logger.info("Preparing openstack")
params.update(config)
params['keypair_file_private'] = params['keypair_name'] + ".pem"
params['group_name'] = cfg_dict['run_uuid']
if not config.get('skip_preparation', False):
+ logger.info("Preparing openstack")
start_vms.prepare_os_subpr(params=params, **os_creds)
new_nodes = []
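
Both reuse_vms_stage() and create_vms_ctx() now wrap the credential lookup in a start_vms.is_connected() check, so credentials are only resolved for the first nova connection and later calls pass no arguments (presumably reusing the cached connection). A hedged sketch of that guard as a helper; the helper itself does not exist in the patch and relies on the module's own start_vms and get_OS_credentials:

    def nova_connect_once(cfg, ctx):
        if not start_vms.is_connected():
            os_creds = get_OS_credentials(cfg, ctx)   # first call: real credentials
        else:
            os_creds = {}                             # already connected: no kwargs
        return start_vms.nova_connect(**os_creds)
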
@@ -446,26 +511,25 @@
vm_ctx = create_vms_ctx(ctx, cfg, config['openstack'],
num_test_nodes)
with vm_ctx as new_nodes:
- connect_all(new_nodes, True)
+ if len(new_nodes) != 0:
+ connect_all(new_nodes, True)
- for node in new_nodes:
- if node.connection is None:
- msg = "Failed to connect to vm {0}"
- raise RuntimeError(msg.format(node.get_conn_id()))
+ for node in new_nodes:
+ if node.connection is None:
+ msg = "Failed to connect to vm {0}"
+ raise RuntimeError(msg.format(node.get_conn_id()))
- deploy_sensors_stage(cfg_dict,
- ctx,
- nodes=new_nodes,
- undeploy=False)
+ deploy_sensors_stage(cfg_dict,
+ ctx,
+ nodes=new_nodes,
+ undeploy=False)
- if not cfg['no_tests']:
- for test_group in config.get('tests', []):
- test_res = run_tests(cfg, test_group, ctx.nodes)
- ctx.results.extend(test_res)
+ for test_group in config.get('tests', []):
+ test_res = run_tests(cfg, test_group, ctx.nodes)
+ ctx.results.extend(test_res)
else:
- if not cfg['no_tests']:
- test_res = run_tests(cfg, group, ctx.nodes)
- ctx.results.extend(test_res)
+ test_res = run_tests(cfg, group, ctx.nodes)
+ ctx.results.extend(test_res)
def shut_down_vms_stage(cfg, ctx):
@@ -522,9 +586,19 @@
for tp, data in ctx.results:
if 'io' == tp and data is not None:
dinfo = report.process_disk_info(data)
+ text_rep_fname = cfg['text_report_file']
+ rep = IOPerfTest.format_for_console(data, dinfo)
+
+ with open(text_rep_fname, "w") as fd:
+ fd.write(rep)
+ fd.write("\n")
+ fd.flush()
+
+ logger.info("Text report were stored in " + text_rep_fname)
print("\n")
print(IOPerfTest.format_for_console(data, dinfo))
print("\n")
+
if tp in ['mysql', 'pgbench'] and data is not None:
print("\n")
print(MysqlTest.format_for_console(data))
@@ -547,14 +621,6 @@
cfg['charts_img_path'],
lab_info=ctx.hw_info)
- text_rep_fname = cfg_dict['text_report_file']
- with open(text_rep_fname, "w") as fd:
- fd.write(IOPerfTest.format_for_console(data, dinfo))
- fd.write("\n")
- fd.flush()
-
- logger.info("Text report were stored in " + text_rep_fname)
-
def complete_log_nodes_statistic(cfg, ctx):
nodes = ctx.nodes
@@ -610,13 +676,19 @@
help="Skip html report", default=False)
parser.add_argument("--params", metavar="testname.paramname",
help="Test params", default=[])
- parser.add_argument("--reuse-vms", default=[], nargs='*')
parser.add_argument("config_file")
return parser.parse_args(argv[1:])
+# from plop.collector import Collector
+
+
def main(argv):
+ # collector = Collector()
+ # collector.start()
+
+ faulthandler.register(signal.SIGUSR1, all_threads=True)
opts = parse_args(argv)
load_config(opts.config_file, opts.post_process_only)
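
faulthandler.register() installs a signal handler that dumps every thread's Python traceback to stderr, which makes a hung run inspectable with a plain kill -USR1. A minimal demonstration using the same faulthandler module the patch imports:

    import os
    import signal
    import faulthandler

    faulthandler.register(signal.SIGUSR1, all_threads=True)

    # From another shell: kill -USR1 <pid>, or from inside the process:
    os.kill(os.getpid(), signal.SIGUSR1)   # tracebacks of all threads go to stderr
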
@@ -629,11 +701,8 @@
discover_stage
]
- for reuse_param in opts.reuse_vms:
- pref, ssh_templ = reuse_param.split(',', 1)
- stages.append(reuse_vms_stage(pref, ssh_templ))
-
stages.extend([
+ reuse_vms_stage,
log_nodes_statistic,
save_nodes_stage,
connect_stage])
@@ -644,7 +713,8 @@
stages.extend([
deploy_sensors_stage,
run_tests_stage,
- store_raw_results_stage
+ store_raw_results_stage,
+ gather_sensors_stage
])
report_stages = [
@@ -683,7 +753,10 @@
try:
for stage in stages:
- logger.info("Start {0.__name__} stage".format(stage))
+ if stage.__name__.endswith("stage"):
+ logger.info("Start {0.__name__}".format(stage))
+ else:
+ logger.info("Start {0.__name__} stage".format(stage))
stage(cfg_dict, ctx)
except utils.StopTestError as exc:
logger.error(msg_templ.format(stage, exc))
@@ -693,7 +766,10 @@
exc, cls, tb = sys.exc_info()
for stage in ctx.clear_calls_stack[::-1]:
try:
- logger.info("Start {0.__name__} stage".format(stage))
+ if stage.__name__.endswith("stage"):
+ logger.info("Start {0.__name__}".format(stage))
+ else:
+ logger.info("Start {0.__name__} stage".format(stage))
stage(cfg_dict, ctx)
except utils.StopTestError as cleanup_exc:
logger.error(msg_templ.format(stage, cleanup_exc))
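
The stage loops above treat every stage as a callable taking (cfg_dict, ctx) and, on failure, unwind ctx.clear_calls_stack in reverse to run the cleanup stages. A simplified sketch of that pipeline shape; the real code additionally special-cases utils.StopTestError and keeps the first exception for the exit code:

    def run_pipeline(stages, cfg, ctx):
        try:
            for stage in stages:
                logger.info("Start {0.__name__}".format(stage))
                stage(cfg, ctx)
        finally:
            # cleanup stages registered during the run, newest first
            for stage in ctx.clear_calls_stack[::-1]:
                try:
                    stage(cfg, ctx)
                except Exception:
                    logger.exception("Cleanup stage {0.__name__} failed".format(stage))
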
@@ -718,6 +794,9 @@
if cfg_dict.get('run_web_ui', False):
stop_web_ui(cfg_dict, ctx)
+ # collector.stop()
+ # open("plop.out", "w").write(repr(dict(collector.stack_counts)))
+
if exc is None:
logger.info("Tests finished successfully")
return 0