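tempo commit: run_single_test now delegates to the suite class via TestConfig, results are loaded from var_dir/results/<type>_<N> directories, get_creds_openrc is called from utils, web UI start/stop hooks are removed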
diff --git a/wally/run_test.py b/wally/run_test.py
index 359d917..d7e803a 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -1,15 +1,14 @@
 from __future__ import print_function
 
 import os
+import re
 import sys
 import time
-import Queue
 import pprint
 import signal
 import logging
 import argparse
 import functools
-import threading
 import contextlib
 import collections
 
@@ -36,11 +35,16 @@
 from wally.discover import discover, Node
 from wally.timeseries import SensorDatastore
 from wally import utils, report, ssh_utils, start_vms
-from wally.suits import IOPerfTest, PgBenchTest, MysqlTest
 from wally.config import (cfg_dict, load_config, setup_loggers,
                           get_test_files, save_run_params, load_run_params)
 from wally.sensors_utils import with_sensors_util, sensors_info_util
 
+from wally.suits.mysql import MysqlTest
+from wally.suits.itest import TestConfig
+from wally.suits.io.fio import IOPerfTest
+from wally.suits.postgres import PgBenchTest
+
+
 TOOL_TYPE_MAPPER = {
     "io": IOPerfTest,
     "pgbench": PgBenchTest,
@@ -137,100 +141,19 @@
     logger.debug("Raw hardware info in " + cfg['hwinfo_directory'] + " folder")
 
 
-def test_thread(test, node, barrier, res_q):
-    exc = None
-    try:
-        logger.debug("Run preparation for {0}".format(node.get_conn_id()))
-        test.pre_run()
-        logger.debug("Run test for {0}".format(node.get_conn_id()))
-        test.run(barrier)
-    except utils.StopTestError as exc:
-        pass
-    except Exception as exc:
-        msg = "In test {0} for node {1}"
-        msg = msg.format(test, node.get_conn_id())
-        logger.exception(msg)
-        exc = utils.StopTestError(msg, exc)
-
-    try:
-        test.cleanup()
-    except utils.StopTestError as exc1:
-        if exc is None:
-            exc = exc1
-    except:
-        msg = "Duringf cleanup - in test {0} for node {1}"
-        logger.exception(msg.format(test, node))
-
-    if exc is not None:
-        res_q.put(exc)
-
-
-def run_single_test(test_nodes, name, test_cls, params, log_directory,
+def run_single_test(test_nodes, name, params, log_directory,
                     test_local_folder, run_uuid):
-    logger.info("Starting {0} tests".format(name))
-    res_q = Queue.Queue()
-    threads = []
-    coord_q = Queue.Queue()
-    rem_folder = test_local_folder.format(name=name)
 
-    barrier = utils.Barrier(len(test_nodes))
-    for idx, node in enumerate(test_nodes):
-        msg = "Starting {0} test on {1} node"
-        logger.debug(msg.format(name, node.conn_url))
+    test_cls = TOOL_TYPE_MAPPER[name]
+    test_cfg = TestConfig(test_cls.__name__,
+                          params=params,
+                          test_uuid=run_uuid,
+                          nodes=test_nodes,
+                          log_directory=log_directory,
+                          remote_dir=test_local_folder.format(name=name))
 
-        params = params.copy()
-        params['testnodes_count'] = len(test_nodes)
-        test = test_cls(options=params,
-                        is_primary=(idx == 0),
-                        on_result_cb=res_q.put,
-                        test_uuid=run_uuid,
-                        node=node,
-                        remote_dir=rem_folder,
-                        log_directory=log_directory,
-                        coordination_queue=coord_q,
-                        total_nodes_count=len(test_nodes))
-        th = threading.Thread(None, test_thread,
-                              "Test:" + node.get_conn_id(),
-                              (test, node, barrier, res_q))
-        threads.append(th)
-        th.daemon = True
-        th.start()
-
-    th = threading.Thread(None, test_cls.coordination_th,
-                          "Coordination thread",
-                          (coord_q, barrier, len(threads)))
-    threads.append(th)
-    th.daemon = True
-    th.start()
-
-    results = []
-    coord_q.put(None)
-
-    while len(threads) != 0:
-        nthreads = []
-        time.sleep(0.1)
-
-        for th in threads:
-            if not th.is_alive():
-                th.join()
-            else:
-                nthreads.append(th)
-
-        threads = nthreads
-
-        while not res_q.empty():
-            val = res_q.get()
-
-            if isinstance(val, utils.StopTestError):
-                raise val
-
-            if isinstance(val, Exception):
-                msg = "Exception during test execution: {0!s}"
-                raise ValueError(msg.format(val))
-
-            results.append(val)
-
-    return results
+    test = test_cls(test_cfg)
+    return test.run()
 
 
 def suspend_vm_nodes(unused_nodes):
@@ -311,14 +234,12 @@
                              len(resumable_nodes_ids)))
                 start_vms.unpause(resumable_nodes_ids)
 
-            test_cls = TOOL_TYPE_MAPPER[name]
             try:
                 sens_nodes = curr_test_nodes + not_test_nodes
                 with sensors_info_util(cfg, sens_nodes) as sensor_data:
                     t_start = time.time()
                     res = run_single_test(curr_test_nodes,
                                           name,
-                                          test_cls,
                                           params,
                                           dir_path,
                                           cfg['default_test_local_folder'],
@@ -432,27 +353,6 @@
                 ctx.nodes.append(node)
 
 
-def get_creds_openrc(path):
-    fc = open(path).read()
-
-    echo = 'echo "$OS_TENANT_NAME:$OS_USERNAME:$OS_PASSWORD@$OS_AUTH_URL"'
-
-    msg = "Failed to get creads from openrc file"
-    with utils.log_error(msg):
-        data = utils.run_locally(['/bin/bash'],
-                                 input_data=fc + "\n" + echo)
-
-    msg = "Failed to get creads from openrc file: " + data
-    with utils.log_error(msg):
-        data = data.strip()
-        user, tenant, passwd_auth_url = data.split(':', 2)
-        passwd, auth_url = passwd_auth_url.rsplit("@", 1)
-        assert (auth_url.startswith("https://") or
-                auth_url.startswith("http://"))
-
-    return user, passwd, tenant, auth_url
-
-
 def get_OS_credentials(cfg, ctx):
     creds = None
     tenant = None
@@ -461,8 +361,7 @@
         os_cfg = cfg['clouds']['openstack']
         if 'OPENRC' in os_cfg:
             logger.info("Using OS credentials from " + os_cfg['OPENRC'])
-            user, passwd, tenant, auth_url = \
-                get_creds_openrc(os_cfg['OPENRC'])
+            user, passwd, tenant, auth_url = utils.get_creds_openrc(os_cfg['OPENRC'])
         elif 'ENV' in os_cfg:
             logger.info("Using OS credentials from shell environment")
             user, passwd, tenant, auth_url = start_vms.ostack_get_creds()
@@ -488,8 +387,7 @@
                  'tenant': tenant,
                  'auth_url': auth_url}
 
-    msg = "OS_CREDS: user={name} tenant={tenant} auth_url={auth_url}"
-    logger.debug(msg.format(**creds))
+    logger.debug("OS_CREDS: user={name} tenant={tenant} auth_url={auth_url}".format(**creds))
     return creds
 
 
@@ -516,8 +414,7 @@
 
     new_nodes = []
     try:
-        for new_node, node_id in start_vms.launch_vms(params,
-                                                      already_has_count):
+        for new_node, node_id in start_vms.launch_vms(params, already_has_count):
             new_node.roles.append('testnode')
             ctx.nodes.append(new_node)
             os_nodes_ids.append(node_id)
@@ -695,24 +592,23 @@
         ctx.results[tp] = map(cls.load, results)
 
 
+def load_data_from_path(var_dir, _, ctx):
+    ctx.results = {}
+    res_dir = os.path.join(var_dir, 'results')
+    for dir_name in os.listdir(res_dir):
+        dir_path = os.path.join(res_dir, dir_name)
+        if not os.path.isdir(dir_path):
+            continue
+        rr = re.match(r"(?P<type>[a-z]+)_\d+$", dir_name)
+        if rr is None:
+            continue
+        tp = rr.group('type')
+        arr = ctx.results.setdefault(tp, [])
+        arr.extend(TOOL_TYPE_MAPPER[tp].load(dir_path))
+
+
 def load_data_from(var_dir):
-    return functools.partial(load_data_from_file, var_dir)
-
-
-def start_web_ui(cfg, ctx):
-    if webui is None:
-        logger.error("Can't start webui. Install cherrypy module")
-        ctx.web_thread = None
-    else:
-        th = threading.Thread(None, webui.web_main_thread, "webui", (None,))
-        th.daemon = True
-        th.start()
-        ctx.web_thread = th
-
-
-def stop_web_ui(cfg, ctx):
-    webui.web_main_stop()
-    time.sleep(1)
+    return functools.partial(load_data_from_path, var_dir)
 
 
 def parse_args(argv):
@@ -899,9 +795,6 @@
     cfg_dict['no_tests'] = opts.no_tests
     cfg_dict['dont_discover_nodes'] = opts.dont_discover_nodes
 
-    if cfg_dict.get('run_web_ui', False):
-        start_web_ui(cfg_dict, ctx)
-
     for stage in stages:
         ok = False
         with log_stage(stage):
@@ -928,9 +821,6 @@
 
     logger.info("All info stored in {0} folder".format(cfg_dict['var_dir']))
 
-    if cfg_dict.get('run_web_ui', False):
-        stop_web_ui(cfg_dict, ctx)
-
     if exc is None:
         logger.info("Tests finished successfully")
         return 0