tempo commit
diff --git a/wally/run_test.py b/wally/run_test.py
index 359d917..d7e803a 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -1,15 +1,14 @@
from __future__ import print_function
import os
+import re
import sys
import time
-import Queue
import pprint
import signal
import logging
import argparse
import functools
-import threading
import contextlib
import collections
@@ -36,11 +35,16 @@
from wally.discover import discover, Node
from wally.timeseries import SensorDatastore
from wally import utils, report, ssh_utils, start_vms
-from wally.suits import IOPerfTest, PgBenchTest, MysqlTest
from wally.config import (cfg_dict, load_config, setup_loggers,
get_test_files, save_run_params, load_run_params)
from wally.sensors_utils import with_sensors_util, sensors_info_util
+from wally.suits.mysql import MysqlTest
+from wally.suits.itest import TestConfig
+from wally.suits.io.fio import IOPerfTest
+from wally.suits.postgres import PgBenchTest
+
+
TOOL_TYPE_MAPPER = {
"io": IOPerfTest,
"pgbench": PgBenchTest,
@@ -137,100 +141,19 @@
logger.debug("Raw hardware info in " + cfg['hwinfo_directory'] + " folder")
-def test_thread(test, node, barrier, res_q):
- exc = None
- try:
- logger.debug("Run preparation for {0}".format(node.get_conn_id()))
- test.pre_run()
- logger.debug("Run test for {0}".format(node.get_conn_id()))
- test.run(barrier)
- except utils.StopTestError as exc:
- pass
- except Exception as exc:
- msg = "In test {0} for node {1}"
- msg = msg.format(test, node.get_conn_id())
- logger.exception(msg)
- exc = utils.StopTestError(msg, exc)
-
- try:
- test.cleanup()
- except utils.StopTestError as exc1:
- if exc is None:
- exc = exc1
- except:
- msg = "Duringf cleanup - in test {0} for node {1}"
- logger.exception(msg.format(test, node))
-
- if exc is not None:
- res_q.put(exc)
-
-
-def run_single_test(test_nodes, name, test_cls, params, log_directory,
+def run_single_test(test_nodes, name, params, log_directory,  # test_cls is no longer a parameter
test_local_folder, run_uuid):
- logger.info("Starting {0} tests".format(name))
- res_q = Queue.Queue()
- threads = []
- coord_q = Queue.Queue()
- rem_folder = test_local_folder.format(name=name)
- barrier = utils.Barrier(len(test_nodes))
- for idx, node in enumerate(test_nodes):
- msg = "Starting {0} test on {1} node"
- logger.debug(msg.format(name, node.conn_url))
+ test_cls = TOOL_TYPE_MAPPER[name]  # test class is resolved here from the tool name
+ test_cfg = TestConfig(test_cls.__name__,  # one config object carries all settings for the run
+ params=params,
+ test_uuid=run_uuid,
+ nodes=test_nodes,  # the whole node list is handed over at once
+ log_directory=log_directory,
+ remote_dir=test_local_folder.format(name=name))  # folder template is filled with the tool name
- params = params.copy()
- params['testnodes_count'] = len(test_nodes)
- test = test_cls(options=params,
- is_primary=(idx == 0),
- on_result_cb=res_q.put,
- test_uuid=run_uuid,
- node=node,
- remote_dir=rem_folder,
- log_directory=log_directory,
- coordination_queue=coord_q,
- total_nodes_count=len(test_nodes))
- th = threading.Thread(None, test_thread,
- "Test:" + node.get_conn_id(),
- (test, node, barrier, res_q))
- threads.append(th)
- th.daemon = True
- th.start()
-
- th = threading.Thread(None, test_cls.coordination_th,
- "Coordination thread",
- (coord_q, barrier, len(threads)))
- threads.append(th)
- th.daemon = True
- th.start()
-
- results = []
- coord_q.put(None)
-
- while len(threads) != 0:
- nthreads = []
- time.sleep(0.1)
-
- for th in threads:
- if not th.is_alive():
- th.join()
- else:
- nthreads.append(th)
-
- threads = nthreads
-
- while not res_q.empty():
- val = res_q.get()
-
- if isinstance(val, utils.StopTestError):
- raise val
-
- if isinstance(val, Exception):
- msg = "Exception during test execution: {0!s}"
- raise ValueError(msg.format(val))
-
- results.append(val)
-
- return results
+ test = test_cls(test_cfg)  # a single test object replaces the per-node test objects and threads
+ return test.run()  # whatever run() returns is passed straight back to the caller
def suspend_vm_nodes(unused_nodes):
@@ -311,14 +234,12 @@
len(resumable_nodes_ids)))
start_vms.unpause(resumable_nodes_ids)
- test_cls = TOOL_TYPE_MAPPER[name]
try:
sens_nodes = curr_test_nodes + not_test_nodes
with sensors_info_util(cfg, sens_nodes) as sensor_data:
t_start = time.time()
res = run_single_test(curr_test_nodes,
name,
- test_cls,
params,
dir_path,
cfg['default_test_local_folder'],
@@ -432,27 +353,6 @@
ctx.nodes.append(node)
-def get_creds_openrc(path):
- fc = open(path).read()
-
- echo = 'echo "$OS_TENANT_NAME:$OS_USERNAME:$OS_PASSWORD@$OS_AUTH_URL"'
-
- msg = "Failed to get creads from openrc file"
- with utils.log_error(msg):
- data = utils.run_locally(['/bin/bash'],
- input_data=fc + "\n" + echo)
-
- msg = "Failed to get creads from openrc file: " + data
- with utils.log_error(msg):
- data = data.strip()
- user, tenant, passwd_auth_url = data.split(':', 2)
- passwd, auth_url = passwd_auth_url.rsplit("@", 1)
- assert (auth_url.startswith("https://") or
- auth_url.startswith("http://"))
-
- return user, passwd, tenant, auth_url
-
-
def get_OS_credentials(cfg, ctx):
creds = None
tenant = None
@@ -461,8 +361,7 @@
os_cfg = cfg['clouds']['openstack']
if 'OPENRC' in os_cfg:
logger.info("Using OS credentials from " + os_cfg['OPENRC'])
- user, passwd, tenant, auth_url = \
- get_creds_openrc(os_cfg['OPENRC'])
+ user, passwd, tenant, auth_url = utils.get_creds_openrc(os_cfg['OPENRC'])
elif 'ENV' in os_cfg:
logger.info("Using OS credentials from shell environment")
user, passwd, tenant, auth_url = start_vms.ostack_get_creds()
@@ -488,8 +387,7 @@
'tenant': tenant,
'auth_url': auth_url}
- msg = "OS_CREDS: user={name} tenant={tenant} auth_url={auth_url}"
- logger.debug(msg.format(**creds))
+ logger.debug("OS_CREDS: user={name} tenant={tenant} auth_url={auth_url}".format(**creds))
return creds
@@ -516,8 +414,7 @@
new_nodes = []
try:
- for new_node, node_id in start_vms.launch_vms(params,
- already_has_count):
+ for new_node, node_id in start_vms.launch_vms(params, already_has_count):
new_node.roles.append('testnode')
ctx.nodes.append(new_node)
os_nodes_ids.append(node_id)
@@ -695,24 +592,23 @@
ctx.results[tp] = map(cls.load, results)
+def load_data_from_path(var_dir, _, ctx):  # the second positional argument is ignored
+ ctx.results = {}  # rebuilt from scratch: test type -> list of loaded results
+ res_dir = os.path.join(var_dir, 'results')
+ for dir_name in os.listdir(res_dir):
+ dir_path = os.path.join(res_dir, dir_name)
+ if not os.path.isdir(dir_path):  # only sub-directories of 'results' are considered
+ continue
+ rr = re.match(r"(?P<type>[a-z]+)_\d+$", dir_name)  # expects "<lowercase letters>_<digits>"
+ if rr is None:  # directories with any other name are skipped silently
+ continue
+ tp = rr.group('type')  # the letters before the underscore select the loader
+ arr = ctx.results.setdefault(tp, [])
+ arr.extend(TOOL_TYPE_MAPPER[tp].load(dir_path))  # NOTE(review): KeyError if tp is not in TOOL_TYPE_MAPPER; load() presumably returns an iterable of results - confirm
+
+
def load_data_from(var_dir):
- return functools.partial(load_data_from_file, var_dir)
-
-
-def start_web_ui(cfg, ctx):
- if webui is None:
- logger.error("Can't start webui. Install cherrypy module")
- ctx.web_thread = None
- else:
- th = threading.Thread(None, webui.web_main_thread, "webui", (None,))
- th.daemon = True
- th.start()
- ctx.web_thread = th
-
-
-def stop_web_ui(cfg, ctx):
- webui.web_main_stop()
- time.sleep(1)
+ return functools.partial(load_data_from_path, var_dir)  # binds var_dir; the returned callable still takes (_, ctx)
def parse_args(argv):
@@ -899,9 +795,6 @@
cfg_dict['no_tests'] = opts.no_tests
cfg_dict['dont_discover_nodes'] = opts.dont_discover_nodes
- if cfg_dict.get('run_web_ui', False):
- start_web_ui(cfg_dict, ctx)
-
for stage in stages:
ok = False
with log_stage(stage):
@@ -928,9 +821,6 @@
logger.info("All info stored in {0} folder".format(cfg_dict['var_dir']))
- if cfg_dict.get('run_web_ui', False):
- stop_web_ui(cfg_dict, ctx)
-
if exc is None:
logger.info("Tests finished successfully")
return 0