temp commit
diff --git a/wally/run_test.py b/wally/run_test.py
index 359d917..d7e803a 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -1,15 +1,14 @@
 from __future__ import print_function
 
 import os
+import re
 import sys
 import time
-import Queue
 import pprint
 import signal
 import logging
 import argparse
 import functools
-import threading
 import contextlib
 import collections
 
@@ -36,11 +35,16 @@
 from wally.discover import discover, Node
 from wally.timeseries import SensorDatastore
 from wally import utils, report, ssh_utils, start_vms
-from wally.suits import IOPerfTest, PgBenchTest, MysqlTest
 from wally.config import (cfg_dict, load_config, setup_loggers,
                           get_test_files, save_run_params, load_run_params)
 from wally.sensors_utils import with_sensors_util, sensors_info_util
 
+from wally.suits.mysql import MysqlTest
+from wally.suits.itest import TestConfig
+from wally.suits.io.fio import IOPerfTest
+from wally.suits.postgres import PgBenchTest
+
+
 TOOL_TYPE_MAPPER = {
     "io": IOPerfTest,
     "pgbench": PgBenchTest,
@@ -137,100 +141,19 @@
     logger.debug("Raw hardware info in " + cfg['hwinfo_directory'] + " folder")
 
 
-def test_thread(test, node, barrier, res_q):
-    exc = None
-    try:
-        logger.debug("Run preparation for {0}".format(node.get_conn_id()))
-        test.pre_run()
-        logger.debug("Run test for {0}".format(node.get_conn_id()))
-        test.run(barrier)
-    except utils.StopTestError as exc:
-        pass
-    except Exception as exc:
-        msg = "In test {0} for node {1}"
-        msg = msg.format(test, node.get_conn_id())
-        logger.exception(msg)
-        exc = utils.StopTestError(msg, exc)
-
-    try:
-        test.cleanup()
-    except utils.StopTestError as exc1:
-        if exc is None:
-            exc = exc1
-    except:
-        msg = "Duringf cleanup - in test {0} for node {1}"
-        logger.exception(msg.format(test, node))
-
-    if exc is not None:
-        res_q.put(exc)
-
-
-def run_single_test(test_nodes, name, test_cls, params, log_directory,
+def run_single_test(test_nodes, name, params, log_directory,
                     test_local_folder, run_uuid):
-    logger.info("Starting {0} tests".format(name))
-    res_q = Queue.Queue()
-    threads = []
-    coord_q = Queue.Queue()
-    rem_folder = test_local_folder.format(name=name)
 
-    barrier = utils.Barrier(len(test_nodes))
-    for idx, node in enumerate(test_nodes):
-        msg = "Starting {0} test on {1} node"
-        logger.debug(msg.format(name, node.conn_url))
+    test_cls = TOOL_TYPE_MAPPER[name]
+    test_cfg = TestConfig(test_cls.__name__,
+                          params=params,
+                          test_uuid=run_uuid,
+                          nodes=test_nodes,
+                          log_directory=log_directory,
+                          remote_dir=test_local_folder.format(name=name))
 
-        params = params.copy()
-        params['testnodes_count'] = len(test_nodes)
-        test = test_cls(options=params,
-                        is_primary=(idx == 0),
-                        on_result_cb=res_q.put,
-                        test_uuid=run_uuid,
-                        node=node,
-                        remote_dir=rem_folder,
-                        log_directory=log_directory,
-                        coordination_queue=coord_q,
-                        total_nodes_count=len(test_nodes))
-        th = threading.Thread(None, test_thread,
-                              "Test:" + node.get_conn_id(),
-                              (test, node, barrier, res_q))
-        threads.append(th)
-        th.daemon = True
-        th.start()
-
-    th = threading.Thread(None, test_cls.coordination_th,
-                          "Coordination thread",
-                          (coord_q, barrier, len(threads)))
-    threads.append(th)
-    th.daemon = True
-    th.start()
-
-    results = []
-    coord_q.put(None)
-
-    while len(threads) != 0:
-        nthreads = []
-        time.sleep(0.1)
-
-        for th in threads:
-            if not th.is_alive():
-                th.join()
-            else:
-                nthreads.append(th)
-
-        threads = nthreads
-
-        while not res_q.empty():
-            val = res_q.get()
-
-            if isinstance(val, utils.StopTestError):
-                raise val
-
-            if isinstance(val, Exception):
-                msg = "Exception during test execution: {0!s}"
-                raise ValueError(msg.format(val))
-
-            results.append(val)
-
-    return results
+    test = test_cls(test_cfg)
+    return test.run()
 
 
 def suspend_vm_nodes(unused_nodes):
@@ -311,14 +234,12 @@
                              len(resumable_nodes_ids)))
                 start_vms.unpause(resumable_nodes_ids)
 
-            test_cls = TOOL_TYPE_MAPPER[name]
             try:
                 sens_nodes = curr_test_nodes + not_test_nodes
                 with sensors_info_util(cfg, sens_nodes) as sensor_data:
                     t_start = time.time()
                     res = run_single_test(curr_test_nodes,
                                           name,
-                                          test_cls,
                                           params,
                                           dir_path,
                                           cfg['default_test_local_folder'],
@@ -432,27 +353,6 @@
                 ctx.nodes.append(node)
 
 
-def get_creds_openrc(path):
-    fc = open(path).read()
-
-    echo = 'echo "$OS_TENANT_NAME:$OS_USERNAME:$OS_PASSWORD@$OS_AUTH_URL"'
-
-    msg = "Failed to get creads from openrc file"
-    with utils.log_error(msg):
-        data = utils.run_locally(['/bin/bash'],
-                                 input_data=fc + "\n" + echo)
-
-    msg = "Failed to get creads from openrc file: " + data
-    with utils.log_error(msg):
-        data = data.strip()
-        user, tenant, passwd_auth_url = data.split(':', 2)
-        passwd, auth_url = passwd_auth_url.rsplit("@", 1)
-        assert (auth_url.startswith("https://") or
-                auth_url.startswith("http://"))
-
-    return user, passwd, tenant, auth_url
-
-
 def get_OS_credentials(cfg, ctx):
     creds = None
     tenant = None
@@ -461,8 +361,7 @@
         os_cfg = cfg['clouds']['openstack']
         if 'OPENRC' in os_cfg:
             logger.info("Using OS credentials from " + os_cfg['OPENRC'])
-            user, passwd, tenant, auth_url = \
-                get_creds_openrc(os_cfg['OPENRC'])
+            user, passwd, tenant, auth_url = utils.get_creds_openrc(os_cfg['OPENRC'])
         elif 'ENV' in os_cfg:
             logger.info("Using OS credentials from shell environment")
             user, passwd, tenant, auth_url = start_vms.ostack_get_creds()
@@ -488,8 +387,7 @@
                  'tenant': tenant,
                  'auth_url': auth_url}
 
-    msg = "OS_CREDS: user={name} tenant={tenant} auth_url={auth_url}"
-    logger.debug(msg.format(**creds))
+    logger.debug("OS_CREDS: user={name} tenant={tenant} auth_url={auth_url}".format(**creds))
     return creds
 
 
@@ -516,8 +414,7 @@
 
     new_nodes = []
     try:
-        for new_node, node_id in start_vms.launch_vms(params,
-                                                      already_has_count):
+        for new_node, node_id in start_vms.launch_vms(params, already_has_count):
             new_node.roles.append('testnode')
             ctx.nodes.append(new_node)
             os_nodes_ids.append(node_id)
@@ -695,24 +592,23 @@
         ctx.results[tp] = map(cls.load, results)
 
 
+def load_data_from_path(var_dir, _, ctx):
+    ctx.results = {}
+    res_dir = os.path.join(var_dir, 'results')
+    for dir_name in os.listdir(res_dir):
+        dir_path = os.path.join(res_dir, dir_name)
+        if not os.path.isdir(dir_path):
+            continue
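+        # result dirs are named "<test_type>_<num>", e.g. "io_0"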
+        rr = re.match(r"(?P<type>[a-z]+)_\d+$", dir_name)
+        if rr is None:
+            continue
+        tp = rr.group('type')
+        arr = ctx.results.setdefault(tp, [])
+        arr.extend(TOOL_TYPE_MAPPER[tp].load(dir_path))
+
+
 def load_data_from(var_dir):
-    return functools.partial(load_data_from_file, var_dir)
-
-
-def start_web_ui(cfg, ctx):
-    if webui is None:
-        logger.error("Can't start webui. Install cherrypy module")
-        ctx.web_thread = None
-    else:
-        th = threading.Thread(None, webui.web_main_thread, "webui", (None,))
-        th.daemon = True
-        th.start()
-        ctx.web_thread = th
-
-
-def stop_web_ui(cfg, ctx):
-    webui.web_main_stop()
-    time.sleep(1)
+    return functools.partial(load_data_from_path, var_dir)
 
 
 def parse_args(argv):
@@ -899,9 +795,6 @@
     cfg_dict['no_tests'] = opts.no_tests
     cfg_dict['dont_discover_nodes'] = opts.dont_discover_nodes
 
-    if cfg_dict.get('run_web_ui', False):
-        start_web_ui(cfg_dict, ctx)
-
     for stage in stages:
         ok = False
         with log_stage(stage):
@@ -928,9 +821,6 @@
 
     logger.info("All info stored in {0} folder".format(cfg_dict['var_dir']))
 
-    if cfg_dict.get('run_web_ui', False):
-        stop_web_ui(cfg_dict, ctx)
-
     if exc is None:
         logger.info("Tests finished successfully")
         return 0
diff --git a/wally/suits/io/fio.py b/wally/suits/io/fio.py
new file mode 100644
index 0000000..f7fc9bc
--- /dev/null
+++ b/wally/suits/io/fio.py
@@ -0,0 +1,702 @@
+import re
+import time
+import json
+import os.path
+import logging
+import datetime
+import functools
+import subprocess
+import collections
+
+import yaml
+import paramiko
+import texttable
+from paramiko.ssh_exception import SSHException
+from concurrent.futures import ThreadPoolExecutor
+
+from wally.pretty_yaml import dumps
+from wally.statistic import round_3_digit, data_property
+from wally.utils import ssize2b, sec_to_str, StopTestError, Barrier, get_os
+from wally.ssh_utils import (save_to_remote, read_from_remote, BGSSHTask, reconnect)
+
+from .fio_task_parser import (execution_time, fio_cfg_compile,
+                              get_test_summary, get_test_sync_mode)
+from ..itest import TimeSeriesValue, PerfTest, TestResults, run_on_node
+
+logger = logging.getLogger("wally")
+
+
+# Results folder structure
+# results/
+#     {loadtype}_{num}/
+#         config.yaml
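+#         <pos>_params.yaml
+#         <pos>_task.fio
+#         <pos>_<conn_id>_<ftype>.<N>.log
+#         <pos>_<conn_id>_rawres.json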
+#         ......
+
+
+class NoData(object):
+    pass
+
+
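+# memoizing property: looks up the backing "_<name>" attribute and computes
+# the value on first access (NoData marks "not computed yet")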
+def cached_prop(func):
+    @property
+    @functools.wraps(func)
+    def closure(self):
+        val = getattr(self, "_" + func.__name__)
+        if val is NoData:
+            val = func(self)
+            setattr(self, "_" + func.__name__, val)
+        return val
+    return closure
+
+
+def load_fio_log_file(fname):
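+    # fio interval logs are CSV lines: "time_ms, value, direction, blocksize";
+    # only the time (converted to seconds) and the value are used here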
+    with open(fname) as fd:
+        it = [ln.split(',')[:2] for ln in fd]
+    vals = [(float(off) / 1000, float(val.strip())) for off, val in it]
+    return TimeSeriesValue(vals)
+
+
+def load_test_results(cls, folder, run_num):
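+    # reassemble one test run: per-connection time series from the *.log
+    # files plus the raw fio JSON output for every node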
+    res = {}
+    params = None
+
+    fn = os.path.join(folder, str(run_num) + '_params.yaml')
+    params = yaml.load(open(fn).read())
+
+    conn_ids = set()
+    for fname in os.listdir(folder):
+        rr = r"{0}_(?P<conn_id>.*?)_(?P<type>[^_.]*)\.\d+\.log$".format(run_num)
+        rm = re.match(rr, fname)
+        if rm is None:
+            continue
+
+        conn_id_s = rm.group('conn_id')
+        conn_id = conn_id_s.replace('_', ':')
+        ftype = rm.group('type')
+
+        if ftype not in ('iops', 'bw', 'lat'):
+            continue
+
+        try:
+            ts = load_fio_log_file(os.path.join(folder, fname))
+            if ftype in res:
+                assert conn_id not in res[ftype]
+
+            res.setdefault(ftype, {})[conn_id] = ts
+        except AssertionError:
+            pass
+
+        conn_ids.add(conn_id)
+
+    raw_res = {}
+    for conn_id in conn_ids:
+        conn_id_s = conn_id.replace(':', '_')
+        fn = os.path.join(folder, "{0}_{1}_rawres.json".format(run_num, conn_id_s))
+
+        # remove message hack
+        fc = "{" + open(fn).read().split('{', 1)[1]
+        raw_res[conn_id] = json.loads(fc)
+
+    return cls(params, res, raw_res)
+
+
+class Attrmapper(object):
+    def __init__(self, dct):
+        self.__dct = dct
+
+    def __getattr__(self, name):
+        try:
+            return self.__dct[name]
+        except KeyError:
+            raise AttributeError(name)
+
+
+class DiskPerfInfo(object):
+    def __init__(self, name, summary, params, testnodes_count):
+        self.name = name
+        self.bw = None
+        self.iops = None
+        self.lat = None
+        self.lat_50 = None
+        self.lat_95 = None
+
+        self.raw_bw = []
+        self.raw_iops = []
+        self.raw_lat = []
+
+        self.params = params
+        self.testnodes_count = testnodes_count
+        self.summary = summary
+        self.p = Attrmapper(self.params['vals'])
+
+        self.sync_mode = get_test_sync_mode(self.params['vals'])
+        self.concurence = self.params['vals'].get('numjobs', 1)
+
+
+def get_lat_perc_50_95(lat_mks):
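+    # lat_mks: histogram {bucket_top_us: percent_of_samples}. Walk buckets in
+    # order and linearly interpolate inside the bucket where the cumulative
+    # percentage crosses 50/95; results are returned in milliseconds.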
+    curr_perc = 0
+    perc_50 = None
+    perc_95 = None
+    pkey = None
+    for key, val in sorted(lat_mks.items()):
+        if curr_perc + val >= 50 and perc_50 is None:
+            if pkey is None or val < 1.:
+                perc_50 = key
+            else:
+                perc_50 = (50. - curr_perc) / val * (key - pkey) + pkey
+
+        if curr_perc + val >= 95:
+            if pkey is None or val < 1.:
+                perc_95 = key
+            else:
+                perc_95 = (95. - curr_perc) / val * (key - pkey) + pkey
+            break
+
+        pkey = key
+        curr_perc += val
+
+    return perc_50 / 1000., perc_95 / 1000.
+
+
+def prepare(ramp_time, data, avg_interval):
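+    # cut off ramp-up samples and resample each node's series to avg_interval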
+    if data is None:
+        return data
+
+    res = {}
+    for key, ts_data in data.items():
+        if ramp_time > 0:
+            ts_data = ts_data.skip(ramp_time)
+
+        res[key] = ts_data.derived(avg_interval)
+    return res
+
+
+class IOTestResult(TestResults):
+    """
+    Fio run results
+    config: TestConfig
+    fio_task: FioJobSection
+    ts_results: {str: TimeSeriesValue}
+    raw_result: ????
+    run_interval:(float, float) - test run time, used for sensors
+    """
+    def __init__(self, config, fio_task, ts_results, raw_result, run_interval):
+
+        self.name = fio_task.name.split("_")[0]
+        self.fio_task = fio_task
+
+        ramp_time = fio_task.vals.get('ramp_time', 0)
+
+        self.bw = prepare(ramp_time, ts_results.get('bw'), 1.0)
+        self.lat = prepare(ramp_time, ts_results.get('lat'), 1.0)
+        self.iops = prepare(ramp_time, ts_results.get('iops'), 1.0)
+        # self.slat = drop_warmup(res.get('clat', None), self.params)
+        # self.clat = drop_warmup(res.get('slat', None), self.params)
+
+        res = {"bw": self.bw, "lat": self.lat, "iops": self.iops}
+
+        self.sensors_data = None
+        self._pinfo = None
+        TestResults.__init__(self, config, res, raw_result, run_interval)
+
+    def summary(self):
+        return get_test_summary(self.fio_task) + "vm" \
+               + str(len(self.config.nodes))
+
+    def get_yamable(self):
+        return self.summary()
+
+    @property
+    def disk_perf_info(self):
+        if self._pinfo is not None:
+            return self._pinfo
+
+        lat_mks = collections.defaultdict(lambda: 0)
+        num_res = 0
+
+        for _, result in self.raw_result.items():
+            num_res += len(result['jobs'])
+            for job_info in result['jobs']:
+                for k, v in job_info['latency_ms'].items():
+                    if isinstance(k, basestring) and k.startswith('>='):
+                        lat_mks[int(k[2:]) * 1000] += v
+                    else:
+                        lat_mks[int(k) * 1000] += v
+
+                for k, v in job_info['latency_us'].items():
+                    lat_mks[int(k)] += v
+
+        for k, v in lat_mks.items():
+            lat_mks[k] = float(v) / num_res
+
+        testnodes_count = len(self.raw_result)
+
+        pinfo = DiskPerfInfo(self.name,
+                             self.summary(),
+                             self.params,
+                             testnodes_count)
+
+        pinfo.raw_bw = [res.values for res in self.bw.values()]
+        pinfo.raw_iops = [res.values for res in self.iops.values()]
+        pinfo.raw_lat = [res.values for res in self.lat.values()]
+
+        pinfo.bw = data_property(map(sum, zip(*pinfo.raw_bw)))
+        pinfo.iops = data_property(map(sum, zip(*pinfo.raw_iops)))
+        pinfo.lat = data_property(sum(pinfo.raw_lat, []))
+        pinfo.lat_50, pinfo.lat_95 = get_lat_perc_50_95(lat_mks)
+
+        self._pinfo = pinfo
+
+        return pinfo
+
+
+class IOPerfTest(PerfTest):
+    tcp_conn_timeout = 30
+    max_pig_timeout = 5
+    soft_runcycle = 5 * 60
+
+    def __init__(self, config):
+        PerfTest.__init__(self, config)
+
+        get = self.config.params.get
+        do_get = self.config.params.__getitem__
+
+        self.config_fname = do_get('cfg')
+
+        if '/' not in self.config_fname and '.' not in self.config_fname:
+            cfgs_dir = os.path.dirname(__file__)
+            self.config_fname = os.path.join(cfgs_dir,
+                                             self.config_fname + '.cfg')
+
+        self.alive_check_interval = get('alive_check_interval')
+        self.use_system_fio = get('use_system_fio', False)
+
+        self.config_params = get('params', {}).copy()
+
+        self.io_py_remote = self.join_remote("agent.py")
+        self.results_file = self.join_remote("results.json")
+        self.pid_file = self.join_remote("pid")
+        self.task_file = self.join_remote("task.cfg")
+        self.sh_file = self.join_remote("cmd.sh")
+        self.err_out_file = self.join_remote("fio_err_out")
+        self.exit_code_file = self.join_remote("exit_code")
+
+        self.use_sudo = get("use_sudo", True)
+        self.test_logging = get("test_logging", False)
+
+        self.raw_cfg = open(self.config_fname).read()
+        self.fio_configs = fio_cfg_compile(self.raw_cfg,
+                                           self.config_fname,
+                                           self.config_params,
+                                           split_on_names=self.test_logging)
+        self.fio_configs = list(self.fio_configs)
+
+    @classmethod
+    def load(cls, folder):
+        for fname in os.listdir(folder):
+            if re.match(r"\d+_params\.yaml$", fname):
+                num = int(fname.split('_')[0])
+                yield load_test_results(IOTestResult, folder, num)
+
+    def cleanup(self):
+        # delete_file(conn, self.io_py_remote)
+        # Need to remove temp files used for testing
+        pass
+
+    def prefill_test_files(self, files, rossh):
+        cmd_templ = "fio --name=xxx --filename={0} --direct=1" + \
+                    " --bs=4m --size={1}m --rw=write"
+
+        if self.use_sudo:
+            cmd_templ = "sudo " + cmd_templ
+
+        ssize = 0
+        stime = time.time()
+
+        for fname, curr_sz in files.items():
+            cmd = cmd_templ.format(fname, curr_sz)
+            ssize += curr_sz
+
+            rossh(cmd, timeout=curr_sz)
+
+        ddtime = time.time() - stime
+        fill_bw = None
+        if ddtime > 1E-3:
+            fill_bw = int(ssize / ddtime)
+            mess = "Initial fill bw is {0} MiBps for this vm"
+            logger.info(mess.format(fill_bw))
+        return fill_bw
+
+    def install_utils(self, rossh, max_retry=3, timeout=5):
+        need_install = []
+        packs = [('screen', 'screen')]
+
+        if self.use_system_fio:
+            packs.append(('fio', 'fio'))
+        else:
+            # define OS and x32/x64
+            # copy appropriate fio
+            # add fio deps
+            pass
+
+        for bin_name, package in packs:
+            if bin_name is None:
+                need_install.append(package)
+                continue
+
+            try:
+                rossh('which ' + bin_name, nolog=True)
+            except OSError:
+                need_install.append(package)
+
+        if len(need_install) == 0:
+            return
+
+        if 'redhat' == get_os(rossh):
+            cmd = "sudo yum -y install " + " ".join(need_install)
+        else:
+            cmd = "sudo apt-get -y install " + " ".join(need_install)
+
+        for _ in range(max_retry):
+            try:
+                rossh(cmd)
+                break
+            except OSError as err:
+                time.sleep(timeout)
+        else:
+            raise OSError("Can't install - " + str(err))
+
+    def pre_run(self):
+        prefill = self.config.params.get('prefill_files', True)
+
+        if prefill:
+            files = {}
+            for cfg_slice in self.fio_configs:
+                for section in cfg_slice:
+                    sz = ssize2b(section.vals['size'])
+                    msz = sz / (1024 ** 2)
+
+                    if sz % (1024 ** 2) != 0:
+                        msz += 1
+
+                    fname = section.vals['filename']
+
+                    # if already has other test with the same file name
+                    # take largest size
+                    files[fname] = max(files.get(fname, 0), msz)
+        else:
+            files = None
+            logger.warning("Prefilling of test files is disabled")
+
+        with ThreadPoolExecutor(len(self.config.nodes)) as pool:
+            fc = functools.partial(self.pre_run_th, files=files)
+            list(pool.map(fc, self.config.nodes))
+
+    def pre_run_th(self, node, files):
+        # fill files with pseudo-random data
+        rossh = run_on_node(node)
+
+        try:
+            cmd = 'mkdir -p "{0}"'.format(self.config.remote_dir)
+            if self.use_sudo:
+                cmd = "sudo " + cmd
+                cmd += " ; sudo chown {0} {1}".format(self.node.get_user(),
+                                                      self.config.remote_dir)
+
+            rossh(cmd)
+        except Exception as exc:
+            msg = "Failed to create folder {0} on remote {1}. Error: {2!s}"
+            msg = msg.format(self.config.remote_dir, node.get_conn_id(), exc)
+            logger.exception(msg)
+            raise StopTestError(msg, exc)
+
+        if files is not None:
+            self.prefill_test_files(rossh, files)
+
+        self.install_utils(rossh)
+
+    def run(self):
+        if len(self.fio_configs) > 1:
+            # +10% - is a rough estimation for additional operations
+            # like sftp, etc
+            exec_time = int(sum(map(execution_time, self.fio_configs)) * 1.1)
+            exec_time_s = sec_to_str(exec_time)
+            now_dt = datetime.datetime.now()
+            end_dt = now_dt + datetime.timedelta(0, exec_time)
+            msg = "Entire test should takes aroud: {0} and finished at {1}"
+            logger.info(msg.format(exec_time_s,
+                                   end_dt.strftime("%H:%M:%S")))
+
+        tname = os.path.basename(self.config_fname)
+        if tname.endswith('.cfg'):
+            tname = tname[:-4]
+
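+        # all nodes wait on this barrier so fio starts at the same moment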
+        barrier = Barrier(len(self.config.nodes))
+        results = []
+
+        with ThreadPoolExecutor(len(self.config.nodes)) as pool:
+            for pos, fio_cfg in enumerate(self.fio_configs):
+                logger.info("Will run {0} test".format(fio_cfg.name))
+
+                templ = "Test should takes about {0}." + \
+                        " Should finish at {1}," + \
+                        " will wait at most till {2}"
+                exec_time = execution_time(fio_cfg)
+                exec_time_str = sec_to_str(exec_time)
+                timeout = int(exec_time + max(300, exec_time))
+
+                now_dt = datetime.datetime.now()
+                end_dt = now_dt + datetime.timedelta(0, exec_time)
+                wait_till = now_dt + datetime.timedelta(0, timeout)
+
+                logger.info(templ.format(exec_time_str,
+                                         end_dt.strftime("%H:%M:%S"),
+                                         wait_till.strftime("%H:%M:%S")))
+
+                func = functools.partial(self.do_run,
+                                         barrier=barrier,
+                                         fio_cfg=fio_cfg,
+                                         pos=pos)
+
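+                # retry the whole per-config run on transient SSH failures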
+                max_retr = 3
+                for idx in range(max_retr):
+                    try:
+                        intervals = list(pool.map(func, self.config.nodes))
+                        break
+                    except (EnvironmentError, SSHException) as exc:
+                        logger.exception("During fio run")
+                        if idx == max_retr - 1:
+                            raise StopTestError("Fio failed", exc)
+
+                    logger.info("Sleeping 30s and retrying")
+                    time.sleep(30)
+
+                fname = "{0}_task.fio".format(pos)
+                with open(os.path.join(self.config.log_directory, fname), "w") as fd:
+                    fd.write(str(fio_cfg))
+
+                params = {'vm_count': len(self.config.nodes)}
+                params['name'] = fio_cfg.name
+                params['vals'] = dict(fio_cfg.vals.items())
+                params['intervals'] = intervals
+                params['nodes'] = [node.get_conn_id() for node in self.config.nodes]
+
+                fname = "{0}_params.yaml".format(pos)
+                with open(os.path.join(self.config.log_directory, fname), "w") as fd:
+                    fd.write(dumps(params))
+
+                res = load_test_results(IOTestResult, self.config.log_directory, pos)
+                results.append(res)
+        return results
+
+    def do_run(self, node, barrier, fio_cfg, pos, nolog=False):
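+        # build a small wrapper script: run fio with JSON output and keep
+        # stdout/stderr and the exit code in files for later collection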
+        exec_folder = os.path.dirname(self.task_file)
+        bash_file = "#!/bin/bash\n" + \
+                    "cd {exec_folder}\n" + \
+                    "fio --output-format=json --output={out_file} " + \
+                    "--alloc-size=262144 {job_file} " + \
+                    " >{err_out_file} 2>&1 \n" + \
+                    "echo $? >{res_code_file}\n"
+
+        bash_file = bash_file.format(out_file=self.results_file,
+                                     job_file=self.task_file,
+                                     err_out_file=self.err_out_file,
+                                     res_code_file=self.exit_code_file,
+                                     exec_folder=exec_folder)
+
+        run_on_node(node)("cd {0} ; rm -rf *".format(exec_folder), nolog=True)
+
+        with node.connection.open_sftp() as sftp:
+            print ">>>>", self.task_file
+            save_to_remote(sftp, self.task_file, str(fio_cfg))
+            save_to_remote(sftp, self.sh_file, bash_file)
+
+        exec_time = execution_time(fio_cfg)
+
+        timeout = int(exec_time + max(300, exec_time))
+        soft_tout = exec_time
+
+        begin = time.time()
+
+        if self.config.options.get("use_sudo", True):
+            sudo = "sudo "
+        else:
+            sudo = ""
+
+        fnames_before = run_on_node(node)("ls -1 " + exec_folder, nolog=True)
+
+        barrier.wait()
+
+        task = BGSSHTask(node, self.use_sudo)
+        task.start(sudo + "bash " + self.sh_file)
+
+        while True:
+            try:
+                task.wait(soft_tout, timeout)
+                break
+            except paramiko.SSHException:
+                pass
+
+            try:
+                node.connection.close()
+            except Exception:
+                pass
+
+            reconnect(node.connection, node.conn_url)
+
+        end = time.time()
+        rossh = run_on_node(node)
+        fnames_after = rossh("ls -1 " + exec_folder, nolog=True)
+
+        conn_id = node.get_conn_id().replace(":", "_")
+        if not nolog:
+            logger.debug("Test on node {0} is finished".format(conn_id))
+
+        log_files_pref = []
+        if 'write_lat_log' in fio_cfg.vals:
+            fname = fio_cfg.vals['write_lat_log']
+            log_files_pref.append(fname + '_clat')
+            log_files_pref.append(fname + '_lat')
+            log_files_pref.append(fname + '_slat')
+
+        if 'write_iops_log' in fio_cfg.vals:
+            fname = fio_cfg.vals['write_iops_log']
+            log_files_pref.append(fname + '_iops')
+
+        if 'write_bw_log' in fio_cfg.vals:
+            fname = fio_cfg.vals['write_bw_log']
+            log_files_pref.append(fname + '_bw')
+
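+        # fio names per-job logs "<prefix>_<type>.<jobnum>.log"; keep only the
+        # files that appeared during this run, bucketed by log type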
+        files = collections.defaultdict(lambda: [])
+        all_files = [os.path.basename(self.results_file)]
+        new_files = set(fnames_after.split()) - set(fnames_before.split())
+        for fname in new_files:
+            if fname.endswith('.log') and fname.split('.')[0] in log_files_pref:
+                name, _ = os.path.splitext(fname)
+                if fname.count('.') == 1:
+                    tp = name.split("_")[-1]
+                    cnt = 0
+                else:
+                    tp_cnt = name.split("_")[-1]
+                    tp, cnt = tp_cnt.split('.')
+                files[tp].append((int(cnt), fname))
+                all_files.append(fname)
+
+        arch_name = self.join_remote('wally_result.tar.gz')
+        tmp_dir = os.path.join(self.config.log_directory, 'tmp_' + conn_id)
+        os.mkdir(tmp_dir)
+        loc_arch_name = os.path.join(tmp_dir, 'wally_result.{0}.tar.gz'.format(conn_id))
+        file_full_names = " ".join(all_files)
+
+        try:
+            os.unlink(loc_arch_name)
+        except Exception:
+            pass
+
+        with node.connection.open_sftp() as sftp:
+            exit_code = read_from_remote(sftp, self.exit_code_file)
+            err_out = read_from_remote(sftp, self.err_out_file)
+            exit_code = exit_code.strip()
+
+            if exit_code != '0':
+                msg = "fio exit with code {0}: {1}".format(exit_code, err_out)
+                logger.critical(msg.strip())
+                raise StopTestError("fio failed")
+
+            rossh("rm -f {0}".format(arch_name), nolog=True)
+            cmd = "cd {0} ; tar zcvf {1} {2}".format(exec_folder, arch_name, file_full_names)
+            rossh(cmd, nolog=True)
+            sftp.get(arch_name, loc_arch_name)
+
+        cmd = "cd {0} ; tar xvzf {1} >/dev/null".format(tmp_dir, loc_arch_name)
+        subprocess.check_call(cmd, shell=True)
+        os.unlink(loc_arch_name)
+
+        for ftype, fls in files.items():
+            for idx, fname in fls:
+                cname = os.path.join(tmp_dir, fname)
+                loc_fname = "{0}_{1}_{2}.{3}.log".format(pos, conn_id, ftype, idx)
+                loc_path = os.path.join(self.config.log_directory, loc_fname)
+                os.rename(cname, loc_path)
+
+        cname = os.path.join(tmp_dir,
+                             os.path.basename(self.results_file))
+        loc_fname = "{0}_{1}_rawres.json".format(pos, conn_id)
+        loc_path = os.path.join(self.config.log_directory, loc_fname)
+        os.rename(cname, loc_path)
+
+        os.rmdir(tmp_dir)
+        return begin, end
+
+    @classmethod
+    def format_for_console(cls, data, dinfo):
+        """
+        create a table with io performance report
+        for console
+        """
+
+        def getconc(data):
+            th_count = data.params['vals'].get('numjobs')
+
+            if th_count is None:
+                th_count = data.params['vals'].get('concurence', 1)
+            return th_count
+
+        def key_func(data):
+            p = data.params['vals']
+
+            th_count = getconc(data)
+
+            return (data.name.rsplit("_", 1)[0],
+                    p['rw'],
+                    get_test_sync_mode(data.params),
+                    ssize2b(p['blocksize']),
+                    int(th_count) * data.testnodes_count)
+
+        tab = texttable.Texttable(max_width=120)
+        tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
+        tab.set_cols_align(["l", "l", "r", "r", "r", "r", "r", "r", "r"])
+
+        items = sorted(dinfo.values(), key=key_func)
+
+        prev_k = None
+        header = ["Name", "Description", "iops\ncum", "KiBps\ncum",
+                  "Cnf\n95%", "Dev%", "iops\nper vm", "KiBps\nper vm", "lat\nms"]
+
+        for data in items:
+            curr_k = key_func(data)[:4]
+
+            if prev_k is not None:
+                if prev_k != curr_k:
+                    tab.add_row(
+                        ["-------", "-----------", "-----", "------",
+                         "---", "----", "------", "---", "-----"])
+
+            prev_k = curr_k
+
+            test_dinfo = dinfo[(data.name, data.summary)]
+
+            iops, _ = test_dinfo.iops.rounded_average_conf()
+
+            bw, bw_conf = test_dinfo.bw.rounded_average_conf()
+            _, bw_dev = test_dinfo.bw.rounded_average_dev()
+            conf_perc = int(round(bw_conf * 100 / bw))
+            dev_perc = int(round(bw_dev * 100 / bw))
+
+            lat, _ = test_dinfo.lat.rounded_average_conf()
+            lat = round_3_digit(int(lat) // 1000)
+
+            iops_per_vm = round_3_digit(iops / data.testnodes_count)
+            bw_per_vm = round_3_digit(bw / data.testnodes_count)
+
+            iops = round_3_digit(iops)
+            bw = round_3_digit(bw)
+
+            params = (data.name.rsplit('_', 1)[0],
+                      data.summary, int(iops), int(bw), str(conf_perc),
+                      str(dev_perc),
+                      int(iops_per_vm), int(bw_per_vm), lat)
+            tab.add_row(params)
+
+        tab.header(header)
+
+        return tab.draw()
diff --git a/wally/suits/io/fio_task_parser.py b/wally/suits/io/fio_task_parser.py
index ade0028..e8ec6f9 100644
--- a/wally/suits/io/fio_task_parser.py
+++ b/wally/suits/io/fio_task_parser.py
@@ -305,8 +305,13 @@
 
 
 def get_test_sync_mode(sec):
-    is_sync = str(sec.vals.get("sync", "0")) == "1"
-    is_direct = str(sec.vals.get("direct", "0")) == "1"
+    if isinstance(sec, dict):
+        vals = sec
+    else:
+        vals = sec.vals
+
+    is_sync = str(vals.get("sync", "0")) == "1"
+    is_direct = str(vals.get("direct", "0")) == "1"
 
     if is_sync and is_direct:
         return 'x'
@@ -319,23 +324,28 @@
 
 
 def get_test_summary(sec):
+    if isinstance(sec, dict):
+        vals = sec
+    else:
+        vals = sec.vals
+
     rw = {"randread": "rr",
           "randwrite": "rw",
           "read": "sr",
           "write": "sw",
           "randrw": "rm",
           "rw": "sm",
-          "readwrite": "sm"}[sec.vals["rw"]]
+          "readwrite": "sm"}[vals["rw"]]
 
     sync_mode = get_test_sync_mode(sec)
-    th_count = sec.vals.get('numjobs')
+    th_count = vals.get('numjobs')
 
     if th_count is None:
-        th_count = sec.vals.get('concurence', 1)
+        th_count = vals.get('concurence', 1)
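+
+    # e.g. rw=randread, blocksize=4k, numjobs=5 -> "rr<sync_mode>4kth5"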
 
     return "{0}{1}{2}th{3}".format(rw,
                                    sync_mode,
-                                   sec.vals['blocksize'],
+                                   vals['blocksize'],
                                    th_count)
 
 
@@ -343,65 +353,6 @@
     return sec.vals.get('ramp_time', 0) + sec.vals.get('runtime', 0)
 
 
-def slice_config(sec_iter, runcycle=None, max_jobs=1000, split_on_names=False):
-    jcount = 0
-    runtime = 0
-    curr_slice = []
-    prev_name = None
-
-    for pos, sec in enumerate(sec_iter):
-
-        if prev_name is not None:
-            split_here = False
-
-            if split_on_names and prev_name != sec.name:
-                split_here = True
-
-            if split_here:
-                yield curr_slice
-                curr_slice = []
-                runtime = 0
-                jcount = 0
-
-        prev_name = sec.name
-
-        jc = sec.vals.get('numjobs', 1)
-        msg = "numjobs should be integer, not {0!r}".format(jc)
-        assert isinstance(jc, int), msg
-
-        curr_task_time = execution_time(sec)
-
-        if jc > max_jobs:
-            err_templ = "Can't process job {0!r} - too large numjobs"
-            raise ValueError(err_templ.format(sec.name))
-
-        if runcycle is not None and len(curr_slice) != 0:
-            rc_ok = curr_task_time + runtime <= runcycle
-        else:
-            rc_ok = True
-
-        if jc + jcount <= max_jobs and rc_ok:
-            runtime += curr_task_time
-            jcount += jc
-            curr_slice.append(sec)
-            continue
-
-        assert len(curr_slice) != 0
-        yield curr_slice
-
-        if '_ramp_time' in sec.vals:
-            sec.vals['ramp_time'] = sec.vals.pop('_ramp_time')
-            curr_task_time = execution_time(sec)
-
-        runtime = curr_task_time
-        jcount = jc
-        curr_slice = [sec]
-        prev_name = None
-
-    if curr_slice != []:
-        yield curr_slice
-
-
 def parse_all_in_1(source, fname=None):
     return fio_config_parse(fio_config_lexer(source, fname))
 
@@ -417,8 +368,7 @@
     it = (apply_params(sec, test_params) for sec in it)
     it = flatmap(process_cycles, it)
     it = flatmap(process_repeats, it)
-    it = itertools.imap(finall_process, it)
-    return slice_config(it, **slice_params)
+    return itertools.imap(finall_process, it)
 
 
 def parse_args(argv):
diff --git a/wally/suits/io/rrd.cfg b/wally/suits/io/rrd.cfg
index 78e1f0e..42ce09f 100644
--- a/wally/suits/io/rrd.cfg
+++ b/wally/suits/io/rrd.cfg
@@ -2,8 +2,8 @@
 include defaults.cfg
 
 size={TEST_FILE_SIZE}
-ramp_time=5
-runtime=40
+ramp_time=0
+runtime=5
 
 # ---------------------------------------------------------------------
 [rws_{TEST_SUMM}]
diff --git a/wally/suits/io/verify.cfg b/wally/suits/io/verify.cfg
index 92c78e5..35cf48a 100644
--- a/wally/suits/io/verify.cfg
+++ b/wally/suits/io/verify.cfg
@@ -2,20 +2,21 @@
 include defaults.cfg
 
 size={TEST_FILE_SIZE}
-ramp_time=5
-runtime=10
+ramp_time=90
+runtime=600
 
 # ---------------------------------------------------------------------
 [verify_{TEST_SUMM}]
 blocksize=4k
 rw=randwrite
 direct=1
+# numjobs=5
 
 # ---------------------------------------------------------------------
-# [verify_{TEST_SUMM}]
-# blocksize=4k
-# rw=randwrite
-# sync=1
+[verify_{TEST_SUMM}]
+blocksize=4k
+rw=randwrite
+sync=1
 
 # ---------------------------------------------------------------------
 # [verify_{TEST_SUMM}]
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index 1cbf88c..7564722 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -1,104 +1,56 @@
 import abc
+import time
+import logging
 import os.path
 import functools
 
+from concurrent.futures import ThreadPoolExecutor
 
+from wally.utils import Barrier, StopTestError
+from wally.statistic import data_property
 from wally.ssh_utils import run_over_ssh, copy_paths
 
 
-def cached_prop(func):
-    @property
-    @functools.wraps(func)
-    def closure(self):
-        val = getattr(self, "_" + func.__name__)
-        if val is NoData:
-            val = func(self)
-            setattr(self, "_" + func.__name__, val)
-        return val
-    return closure
+logger = logging.getLogger("wally")
 
 
-class NoData(object):
-    pass
+class TestConfig(object):
+    """
+    This class describes the test input configuration.
 
-
-class VMThData(object):
-    "store set of values for VM_COUNT * TH_COUNT"
-
-
-class IOTestResult(object):
-    def __init__(self):
-        self.run_config = None
-        self.suite_config = None
-        self.run_interval = None
-
-        self.bw = None
-        self.lat = None
-        self.iops = None
-        self.slat = None
-        self.clat = None
-
-        self.fio_section = None
-
-        self._lat_log = NoData
-        self._iops_log = NoData
-        self._bw_log = NoData
-
-        self._sensors_data = NoData
-        self._raw_resuls = NoData
-
-    def to_jsonable(self):
-        pass
-
-    @property
-    def thread_count(self):
-        pass
-
-    @property
-    def sync_mode(self):
-        pass
-
-    @property
-    def abbrev_name(self):
-        pass
-
-    @property
-    def full_name(self):
-        pass
-
-    @cached_prop
-    def lat_log(self):
-        pass
-
-    @cached_prop
-    def iops_log(self):
-        pass
-
-    @cached_prop
-    def bw_log(self):
-        pass
-
-    @cached_prop
-    def sensors_data(self):
-        pass
-
-    @cached_prop
-    def raw_resuls(self):
-        pass
+    test_type:str - test type name
+    params:{str:Any} - parameters from yaml file for this test
+    test_uuid:str - UUID used to build file names and similar identifiers
+    log_directory:str - local directory to store results
+    nodes:[Node] - nodes to run tests on
+    remote_dir:str - directory on nodes to be used for local files
+    """
+    def __init__(self, test_type, params, test_uuid, nodes,
+                 log_directory, remote_dir):
+        self.test_type = test_type
+        self.params = params
+        self.test_uuid = test_uuid
+        self.log_directory = log_directory
+        self.nodes = nodes
+        self.remote_dir = remote_dir
 
 
 class TestResults(object):
-    def __init__(self, config, params, results,
-                 raw_result, run_interval, vm_count,
-                 test_name, **attrs):
+    """
+    This class describes test results.
+
+    config:TestConfig - test config object
+    params:dict - parameters from yaml file for this test
+    results:{str:MeasurementMesh} - test results object
+    raw_result:Any - opaque object to store raw results
+    run_interval:(float, float) - test run time, used for sensors
+    """
+    def __init__(self, config, results, raw_result, run_interval):
         self.config = config
-        self.params = params
+        self.params = config.params
         self.results = results
         self.raw_result = raw_result
         self.run_interval = run_interval
-        self.vm_count = vm_count
-        self.test_name = test_name
-        self.__dict__.update(attrs)
 
     def __str__(self):
         res = "{0}({1}):\n    results:\n".format(
@@ -124,100 +76,209 @@
         pass
 
 
-class IPerfTest(object):
-    def __init__(self, options, is_primary, on_result_cb, test_uuid, node,
-                 total_nodes_count,
-                 log_directory=None,
-                 coordination_queue=None,
-                 remote_dir="/tmp/wally"):
-        self.options = options
-        self.on_result_cb = on_result_cb
-        self.log_directory = log_directory
-        self.node = node
-        self.test_uuid = test_uuid
-        self.coordination_queue = coordination_queue
-        self.remote_dir = remote_dir
-        self.is_primary = is_primary
+class MeasurementMatrix(object):
+    """
+    data:[[MeasurementResult]] - VM_COUNT x TH_COUNT matrix of MeasurementResult
+    """
+    def __init__(self, data):
+        self.data = data
+
+    def per_vm(self):
+        return self.data
+
+    def per_th(self):
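+        # flatten the VM x thread matrix into a single list of results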
+        return sum(self.data, [])
+
+
+class MeasurementResults(object):
+    def stat(self):
+        return data_property(self.data)
+
+    def __str__(self):
+        return 'TS([' + ", ".join(map(str, self.data)) + '])'
+
+
+class SimpleVals(MeasurementResults):
+    """
+    data:[float] - list of values
+    """
+    def __init__(self, data):
+        self.data = data
+
+
+class TimeSeriesValue(MeasurementResults):
+    """
+    data:[(float, float, float)] - list of (start_time, length, average_value_for_interval)
+    """
+    def __init__(self, data):
+        assert len(data) > 0
+        data = [(0, 0)] + data
+
+        # store (start, length, value) triples; an instance attribute named
+        # 'values' would clash with the property below
+        self.data = []
+        for (cstart, cval), (nstart, nval) in zip(data[:-1], data[1:]):
+            self.data.append((cstart, nstart - cstart, nval))
+
+    @property
+    def values(self):
+        return [val[2] for val in self.data]
+
+    def skip(self, seconds):
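+        # drop the first `seconds` of the series (e.g. ramp_time), clipping
+        # the interval that spans the cut point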
+        nres = []
+        for start, ln, val in self.data:
+            if start + ln < seconds:
+                continue
+            elif start > seconds:
+                nres.append([start + ln - seconds, val])
+            else:
+                nres.append([0, val])
+        return self.__class__(nres)
+
+    def derived(self, tdelta):
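+        # resample onto a regular grid with step tdelta, weighting each source
+        # interval by its overlap with the destination cell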
+        end = tdelta
+        res = [[end, 0.0]]
+        tdelta = float(tdelta)
+
+        for start, length, val in self.data:
+            if start < end:
+                ln = min(end, start + length) - start
+                res[-1][1] += val * ln / tdelta
+
+            if end <= start + length:
+                end += tdelta
+                res.append([end, 0.0])
+                while end < start + length:
+                    res[-1][1] = val
+                    res.append([end, 0.0])
+                    end += tdelta
+
+        if res[-1][1] < 1:
+            res = res[:-1]
+
+        return self.__class__(res)
+
+
+class PerfTest(object):
+    """
+    Very base class for tests
+    config:TestConfig - test configuration
+    stop_requested:bool - stop for test requested
+    """
+    def __init__(self, config):
+        self.config = config
         self.stop_requested = False
-        self.total_nodes_count = total_nodes_count
 
     def request_stop(self):
         self.stop_requested = True
 
     def join_remote(self, path):
-        return os.path.join(self.remote_dir, path)
-
-    def coordinate(self, data):
-        if self.coordination_queue is not None:
-            self.coordination_queue.put((self.node.get_conn_id(), data))
-
-    def pre_run(self):
-        pass
-
-    def cleanup(self):
-        pass
+        return os.path.join(self.config.remote_dir, path)
 
     @classmethod
     @abc.abstractmethod
-    def load(cls, data):
+    def load(cls, path):
         pass
 
     @abc.abstractmethod
-    def run(self, barrier):
+    def run(self):
         pass
 
-    @classmethod
+    @abc.abstractmethod
     def format_for_console(cls, data):
-        msg = "{0}.format_for_console".format(cls.__name__)
-        raise NotImplementedError(msg)
-
-    def run_over_ssh(self, cmd, **kwargs):
-        return run_over_ssh(self.node.connection, cmd,
-                            node=self.node.get_conn_id(), **kwargs)
-
-    @classmethod
-    def coordination_th(cls, coord_q, barrier, num_threads):
         pass
 
 
-class TwoScriptTest(IPerfTest):
-    def __init__(self, *dt, **mp):
-        IPerfTest.__init__(self, *dt, **mp)
+def run_on_node(node):
+    def closure(*args, **kwargs):
+        return run_over_ssh(node.connection,
+                            *args,
+                            node=node.get_conn_id(),
+                            **kwargs)
+    return closure
 
-        if 'scripts_path' in self.options:
-            self.root = self.options['scripts_path']
-            self.run_script = self.options['run_script']
-            self.prerun_script = self.options['prerun_script']
+
+class ThreadedTest(PerfTest):
+    """
+    Base class for tests that spawn a separate thread for each node
+    """
+
+    def run(self):
+        barrier = Barrier(len(self.config.nodes))
+        th_test_func = functools.partial(self.th_test_func, barrier)
+
+        with ThreadPoolExecutor(len(self.config.nodes)) as pool:
+            return list(pool.map(th_test_func, self.config.nodes))
+
+    @abc.abstractmethod
+    def do_test(self, node):
+        pass
+
+    def th_test_func(self, barrier, node):
+        logger.debug("Starting {0} test on {1} node".format(self.__class__.__name__,
+                                                            node.conn_url))
+
+        logger.debug("Run preparation for {0}".format(node.get_conn_id()))
+        self.pre_run(node)
+        barrier.wait()
+
+        exc = None
+        res = None
+        try:
+            logger.debug("Run test for {0}".format(node.get_conn_id()))
+            res = self.do_test(node)
+        except StopTestError as exc:
+            pass
+        except Exception as exc:
+            msg = "In test {0} for node {1}".format(self, node.get_conn_id())
+            logger.exception(msg)
+            exc = StopTestError(msg, exc)
+
+        # cleanup must run on both success and failure paths
+        try:
+            self.cleanup(node)
+        except StopTestError as exc1:
+            if exc is None:
+                exc = exc1
+        except Exception as exc1:
+            if exc is None:
+                msg = "During cleanup - in test {0} for node {1}".format(self, node)
+                logger.exception(msg)
+                exc = StopTestError(msg, exc1)
+
+        if exc is not None:
+            raise exc
+
+        return res
+
+    def pre_run(self, node):
+        pass
+
+    def cleanup(self, node):
+        pass
+
+
+class TwoScriptTest(ThreadedTest):
+    def __init__(self, *dt, **mp):
+        ThreadedTest.__init__(self, *dt, **mp)
+
+        self.prerun_script = self.config.params['prerun_script']
+        self.run_script = self.config.params['run_script']
+
+        self.prerun_tout = self.config.params.get('prerun_tout', 3600)
+        self.run_tout = self.config.params.get('run_tout', 3600)
 
     def get_remote_for_script(self, script):
-        return os.path.join(self.remote_dir, script.rpartition('/')[2])
+        return os.path.join(self.config.remote_dir,
+                            os.path.basename(script))
 
-    def pre_run(self):
-        copy_paths(self.node.connection, {self.root: self.remote_dir})
+    def pre_run(self, node):
+        copy_paths(node.connection,
+                   {
+                       self.run_script: self.get_remote_for_script(self.run_script),
+                       self.prerun_script: self.get_remote_for_script(self.prerun_script),
+                   })
+
         cmd = self.get_remote_for_script(self.pre_run_script)
-        self.run_over_ssh(cmd, timeout=2000)
+        cmd += ' ' + self.config.params.get('prerun_opts', '')
+        run_on_node(node)(cmd, timeout=self.prerun_tout)
 
-    def run(self, barrier):
-        remote_script = self.get_remote_for_script(self.run_script)
-        cmd_opts = ' '.join(["%s %s" % (key, val) for key, val
-                             in self.options.items()])
-        cmd = remote_script + ' ' + cmd_opts
-        out_err = self.run_over_ssh(cmd, timeout=6000)
-        self.on_result(out_err, cmd)
-
-    def parse_results(self, out):
-        for line in out.split("\n"):
-            key, separator, value = line.partition(":")
-            if key and value:
-                self.on_result_cb((key, float(value)))
-
-    def on_result(self, out_err, cmd):
-        try:
-            self.parse_results(out_err)
-        except Exception as exc:
-            msg_templ = "Error during postprocessing results: {0!s}. {1}"
-            raise RuntimeError(msg_templ.format(exc, out_err))
-
-    def merge_results(self, results):
-        tpcm = sum([val[1] for val in results])
-        return {"res": {"TpmC": tpcm}}
+    def do_test(self, node):
+        cmd = self.get_remote_for_script(self.run_script)
+        cmd += ' ' + self.config.params.get('run_opts', '')
+        t1 = time.time()
+        res = run_on_node(node)(cmd, timeout=self.run_tout)
+        t2 = time.time()
+        return TestResults(self.config, None, res, (t1, t2))