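wally: run tests per node_limit, OS creds from OPENRC/ENV/config, offline sensors, CSV sensor transport, report rework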
diff --git a/wally/discover/fuel.py b/wally/discover/fuel.py
index 7a14f94..be10116 100644
--- a/wally/discover/fuel.py
+++ b/wally/discover/fuel.py
@@ -43,7 +43,7 @@
 
     fuel_nodes = list(cluster.get_nodes())
 
-    logger.debug("Found FUEL {0}".format(".".join(map(str, version))))
+    logger.info("Found FUEL {0}".format(".".join(map(str, version))))
 
     network = 'fuelweb_admin' if version >= [6, 0] else 'admin'
 
diff --git a/wally/report.py b/wally/report.py
index b334fa5..f46352d 100644
--- a/wally/report.py
+++ b/wally/report.py
@@ -1,6 +1,7 @@
 import os
 import bisect
 import logging
+import collections
 
 try:
     import matplotlib.pyplot as plt
@@ -31,13 +32,14 @@
 
 
 class PerfInfo(object):
-    def __init__(self, name, raw, meta):
+    def __init__(self, name, intervals, params, testnodes_count):
         self.name = name
         self.bw = None
         self.iops = None
         self.lat = None
-        self.raw = raw
-        self.meta = meta
+        self.params = params
+        self.intervals = intervals
+        self.testnodes_count = testnodes_count
 
 
 def split_and_add(data, block_size):
@@ -50,19 +52,51 @@
     return res
 
 
+def group_by_name(test_data):
+    """Group results of all blocks by 'name'; copies carry block '__meta__'."""
+    name_map = collections.defaultdict(list)
+    for block in test_data:
+        for data in block['res']:
+            data = data.copy()
+            data['__meta__'] = block['__meta__']
+            name_map[data['name']].append(data)
+
+    return name_map
+
+
 def process_disk_info(test_data):
+    name_map = group_by_name(test_data)
     data = {}
-    vm_count = test_data['__test_meta__']['testnodes_count']
-    for name, results in test_data['res'].items():
-        assert len(results['bw']) % vm_count == 0
-        block_count = len(results['bw']) // vm_count
+    for name, results in name_map.items():
+        testnodes_count_set = set(dt['__meta__']['testnodes_count']
+                                  for dt in results)
 
-        pinfo = PerfInfo(name, results, test_data['__test_meta__'])
-        pinfo.bw = data_property(split_and_add(results['bw'], block_count))
-        pinfo.iops = data_property(split_and_add(results['iops'],
-                                                 block_count))
+        assert len(testnodes_count_set) == 1
+        testnodes_count, = testnodes_count_set
+        assert len(results) % testnodes_count == 0
 
-        pinfo.lat = data_property(results['lat'])
+        block_count = len(results) // testnodes_count
+        intervals = [result['run_interval'] for result in results]
+
+        p = results[0]['params'].copy()
+        rt = p.pop('ramp_time', 0)
+
+        for result in results[1:]:
+            tp = result['params'].copy()
+            tp.pop('ramp_time', None)
+            assert tp == p
+
+        p['ramp_time'] = rt
+        pinfo = PerfInfo(name, intervals, p, testnodes_count)
+
+        bw = [result['results']['bw'] for result in results]
+        iops = [result['results']['iops'] for result in results]
+        lat = [result['results']['lat'] for result in results]
+
+        pinfo.bw = data_property(split_and_add(bw, block_count))
+        pinfo.iops = data_property(split_and_add(iops, block_count))
+        pinfo.lat = data_property(lat)
+
         data[name] = pinfo
     return data
 
@@ -200,7 +234,7 @@
 def io_chart_mpl(title, concurence,
                  latv, latv_min, latv_max,
                  iops_or_bw, iops_or_bw_err,
-                 legend, fname):
+                 legend, fname, log=False):
     points = " MiBps" if legend == 'BW' else ""
     lc = len(concurence)
     width = 0.35
@@ -210,20 +244,21 @@
     fig, p1 = plt.subplots()
     xpos = [i - width / 2 for i in xt]
 
-    p1.bar(xpos, iops_or_bw, width=width, yerr=iops_or_bw_err,
+    p1.bar(xpos, iops_or_bw,
+           width=width,
+           yerr=iops_or_bw_err,
+           ecolor='m',
            color='y',
            label=legend)
 
-    p1.set_yscale('log')
     p1.grid(True)
-    p1.plot(xt, op_per_vm, label=legend + " per vm")
-    p1.legend()
+    p1.plot(xt, op_per_vm, '--', label=legend + "/vm", color='black')
+    handles1, labels1 = p1.get_legend_handles_labels()
 
     p2 = p1.twinx()
-    p2.set_yscale('log')
-    p2.plot(xt, latv_max, label="latency max")
-    p2.plot(xt, latv, label="latency avg")
-    p2.plot(xt, latv_min, label="latency min")
+    p2.plot(xt, latv_max, label="lat max")
+    p2.plot(xt, latv, label="lat avg")
+    p2.plot(xt, latv_min, label="lat min")
 
     plt.xlim(0.5, lc + 0.5)
     plt.xticks(xt, map(str, concurence))
@@ -231,9 +266,18 @@
     p1.set_ylabel(legend + points)
     p2.set_ylabel("Latency ms")
     plt.title(title)
-    # plt.legend(, loc=2, borderaxespad=0.)
-    # plt.legend(bbox_to_anchor=(1.05, 1), loc=2)
-    plt.legend(loc=2)
+    handles2, labels2 = p2.get_legend_handles_labels()
+
+    plt.legend(handles1 + handles2, labels1 + labels2,
+               loc='center left', bbox_to_anchor=(1.1, 0.81))
+    # fontsize='small')
+
+    if log:
+        p1.set_yscale('log')
+        p2.set_yscale('log')
+    plt.subplots_adjust(right=0.7)
+    # plt.show()  # bbox_extra_artists=(leg,), bbox_inches='tight')
+    # exit(1)
     plt.savefig(fname, format=fname.split('.')[-1])
 
 
@@ -269,17 +313,18 @@
         if len(chart_data) == 0:
             raise ValueError("Can't found any date for " + name_pref)
 
-        use_bw = ssize2b(chart_data[0].raw['blocksize']) > 16 * 1024
+        use_bw = ssize2b(chart_data[0].params['blocksize']) > 16 * 1024
 
-        chart_data.sort(key=lambda x: x.raw['concurence'])
+        chart_data.sort(key=lambda x: x.params['concurence'])
 
         #  if x.lat.average < max_lat]
         lat = [x.lat.average / 1000 for x in chart_data]
         lat_min = [x.lat.min / 1000 for x in chart_data]
         lat_max = [x.lat.max / 1000 for x in chart_data]
 
-        vm_count = x.meta['testnodes_count']
-        concurence = [x.raw['concurence'] * vm_count for x in chart_data]
+        testnodes_count = x.testnodes_count
+        concurence = [x.params['concurence'] * testnodes_count
+                      for x in chart_data]
 
         if use_bw:
             data = [x.bw.average / 1000 for x in chart_data]
@@ -306,9 +351,9 @@
     result = None
     attr = 'iops' if iops else 'bw'
     for measurement in processed_results.values():
-        ok = measurement.raw['sync_mode'] == sync_mode
-        ok = ok and (measurement.raw['blocksize'] == blocksize)
-        ok = ok and (measurement.raw['rw'] == rw)
+        ok = measurement.params['sync_mode'] == sync_mode
+        ok = ok and (measurement.params['blocksize'] == blocksize)
+        ok = ok and (measurement.params['rw'] == rw)
 
         if ok:
             field = getattr(measurement, attr)
@@ -343,12 +388,12 @@
                                         'd', '1m', 'read', False)
 
     for res in processed_results.values():
-        if res.raw['sync_mode'] == 's' and res.raw['blocksize'] == '4k':
-            if res.raw['rw'] != 'randwrite':
+        if res.params['sync_mode'] == 's' and res.params['blocksize'] == '4k':
+            if res.params['rw'] != 'randwrite':
                 continue
             rws4k_iops_lat_th.append((res.iops.average,
                                       res.lat.average,
-                                      res.raw['concurence']))
+                                      res.params['concurence']))
 
     rws4k_iops_lat_th.sort(key=lambda (_1, _2, conc): conc)
 
diff --git a/wally/run_test.py b/wally/run_test.py
index dc6637f..5322432 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -5,6 +5,7 @@
 import time
 import Queue
 import pprint
+import signal
 import logging
 import argparse
 import functools
@@ -14,6 +15,7 @@
 import collections
 
 import yaml
+import faulthandler
 from concurrent.futures import ThreadPoolExecutor
 
 from wally import pretty_yaml
@@ -21,9 +23,9 @@
 from wally.discover import discover, Node
 from wally.timeseries import SensorDatastore
 from wally import utils, report, ssh_utils, start_vms
-from wally.suits.itest import IOPerfTest, PgBenchTest, MysqlTest
-from wally.sensors_utils import deploy_sensors_stage
 from wally.config import cfg_dict, load_config, setup_loggers
+from wally.suits.itest import IOPerfTest, PgBenchTest, MysqlTest
+from wally.sensors_utils import deploy_sensors_stage, gather_sensors_stage
 
 
 try:
@@ -143,6 +145,78 @@
         res_q.put(exc)
 
 
+def run_single_test(test_nodes, name, test_cls, params,
+                    test_local_folder, run_uuid, counter=[0]):
+    """Run test_cls on test_nodes in parallel; return merged results."""
+    logger.info("Starting {0} tests".format(name))
+    res_q = Queue.Queue()
+    threads = []
+    coord_q = Queue.Queue()
+    rem_folder = test_local_folder.format(name=name)
+    barrier = utils.Barrier(len(test_nodes))
+    for idx, node in enumerate(test_nodes):
+        msg = "Starting {0} test on {1} node"
+        logger.debug(msg.format(name, node.conn_url))
+        # shared mutable default: keeps log dir names unique across calls
+        dname = "{0}_{1}_{2}".format(name, counter[0], node.get_ip())
+        counter[0] += 1
+        dr = os.path.join(cfg_dict['test_log_directory'], dname)
+
+        if not os.path.exists(dr):
+            os.makedirs(dr)
+
+        params = params.copy()
+        params['testnodes_count'] = len(test_nodes)
+        test = test_cls(options=params,
+                        is_primary=(idx == 0),
+                        on_result_cb=res_q.put,
+                        test_uuid=run_uuid,
+                        node=node,
+                        remote_dir=rem_folder,
+                        log_directory=dr,
+                        coordination_queue=coord_q)
+        th = threading.Thread(None, test_thread, None,
+                              (test, node, barrier, res_q))
+        threads.append(th)
+        th.daemon = True
+        th.start()
+
+    th = threading.Thread(None, test_cls.coordination_th, None,
+                          (coord_q, barrier, len(threads)))
+    threads.append(th)
+    th.daemon = True
+    th.start()
+
+    results = []
+    coord_q.put(None)
+
+    while len(threads) != 0:
+        nthreads = []
+
+        for th in threads:
+            # block briefly instead of busy-polling is_alive() at full CPU
+            th.join(0.1)
+            if th.is_alive():
+                nthreads.append(th)
+
+        threads = nthreads
+
+        while not res_q.empty():
+            val = res_q.get()
+
+            if isinstance(val, utils.StopTestError):
+                raise val
+
+            if isinstance(val, Exception):
+                msg = "Exception during test execution: {0!s}"
+                raise ValueError(msg.format(val))
+
+            results.append(val)
+
+    results = test_cls.merge_results(results)
+    return results
+
+
 def run_tests(cfg, test_block, nodes):
     tool_type_mapper = {
         "io": IOPerfTest,
@@ -156,87 +230,65 @@
         logger.error("No test nodes found")
         return
 
-    test_number_per_type = {}
-    res_q = Queue.Queue()
-
     for name, params in test_block.items():
-        logger.info("Starting {0} tests".format(name))
-        test_num = test_number_per_type.get(name, 0)
-        test_number_per_type[name] = test_num + 1
-        threads = []
-        barrier = utils.Barrier(len(test_nodes))
-        coord_q = Queue.Queue()
-        test_cls = tool_type_mapper[name]
-        rem_folder = cfg['default_test_local_folder'].format(name=name)
-
-        for idx, node in enumerate(test_nodes):
-            msg = "Starting {0} test on {1} node"
-            logger.debug(msg.format(name, node.conn_url))
-
-            dr = os.path.join(
-                    cfg_dict['test_log_directory'],
-                    "{0}_{1}_{2}".format(name, test_num, node.get_ip())
-                )
-
-            if not os.path.exists(dr):
-                os.makedirs(dr)
-
-            test = test_cls(options=params,
-                            is_primary=(idx == 0),
-                            on_result_cb=res_q.put,
-                            test_uuid=cfg['run_uuid'],
-                            node=node,
-                            remote_dir=rem_folder,
-                            log_directory=dr,
-                            coordination_queue=coord_q)
-            th = threading.Thread(None, test_thread, None,
-                                  (test, node, barrier, res_q))
-            threads.append(th)
-            th.daemon = True
-            th.start()
-
-        th = threading.Thread(None, test_cls.coordination_th, None,
-                              (coord_q, barrier, len(threads)))
-        threads.append(th)
-        th.daemon = True
-        th.start()
-
-        def gather_results(res_q, results):
-            while not res_q.empty():
-                val = res_q.get()
-
-                if isinstance(val, utils.StopTestError):
-                    raise val
-
-                if isinstance(val, Exception):
-                    msg = "Exception during test execution: {0!s}"
-                    raise ValueError(msg.format(val))
-
-                results.append(val)
-
         results = []
+        limit = params.get('node_limit')
+        if limit == 'all' or isinstance(limit, (int, long)):
+            vm_limits = [limit]
+        elif limit is None:
+            vm_limits = [len(test_nodes)]
+        else:
+            vm_limits = limit  # a list of node counts and/or 'all'
 
-        # MAX_WAIT_TIME = 10
-        # end_time = time.time() + MAX_WAIT_TIME
+        for vm_count in vm_limits:
+            if vm_count == 'all':
+                curr_test_nodes = test_nodes
+                unused_nodes = []
+            else:
+                curr_test_nodes = test_nodes[:vm_count]
+                unused_nodes = test_nodes[vm_count:]
 
-        # while time.time() < end_time:
-        while True:
-            for th in threads:
-                th.join(1)
-                gather_results(res_q, results)
-                # if time.time() > end_time:
-                #     break
+            if 0 == len(curr_test_nodes):
+                continue
 
-            if all(not th.is_alive() for th in threads):
-                break
+            if cfg.get('suspend_unused_vms', True):
+                pausable_nodes_ids = [node.os_vm_id for node in unused_nodes
+                                      if node.os_vm_id is not None]
+                non_pausable = len(unused_nodes) - len(pausable_nodes_ids)
 
-        # if any(th.is_alive() for th in threads):
-        #     logger.warning("Some test threads still running")
+                if 0 != non_pausable:
+                    logger.warning("Can't pause {0} nodes".format(
+                                   non_pausable))
 
-        gather_results(res_q, results)
-        result = test_cls.merge_results(results)
-        result['__test_meta__'] = {'testnodes_count': len(test_nodes)}
-        yield name, result
+                if len(pausable_nodes_ids) != 0:
+                    logger.debug("Try to pause {0} unused nodes".format(
+                                 len(pausable_nodes_ids)))
+                    start_vms.pause(pausable_nodes_ids)
+
+            resumable_nodes_ids = [node.os_vm_id for node in curr_test_nodes
+                                   if node.os_vm_id is not None]
+
+            if len(resumable_nodes_ids) != 0:
+                logger.debug("Check and unpause {0} nodes".format(
+                             len(resumable_nodes_ids)))
+                start_vms.unpause(resumable_nodes_ids)
+
+            test_cls = tool_type_mapper[name]
+            try:
+                res = run_single_test(curr_test_nodes, name, test_cls,
+                                      params,
+                                      cfg['default_test_local_folder'],
+                                      cfg['run_uuid'])
+            finally:
+                if cfg.get('suspend_unused_vms', True):
+                    if len(pausable_nodes_ids) != 0:
+                        logger.debug("Unpausing {0} nodes".format(
+                                     len(pausable_nodes_ids)))
+                        start_vms.unpause(pausable_nodes_ids)
+
+            results.append(res)
+
+        yield name, results
 
 
 def log_nodes_statistic(_, ctx):
@@ -306,13 +358,22 @@
         fd.write(pretty_yaml.dumps(cluster))
 
 
-def reuse_vms_stage(vm_name_pattern, conn_pattern):
-    def reuse_vms(cfg, ctx):
+def reuse_vms_stage(cfg, ctx):
+    p = cfg.get('clouds', {})
+    p = p.get('openstack', {})
+    p = p.get('vms', [])
+
+    for creds in p:
+        vm_name_pattern, conn_pattern = creds.split(",", 1)
         try:
             msg = "Looking for vm with name like {0}".format(vm_name_pattern)
             logger.debug(msg)
 
-            os_creds = get_OS_credentials(cfg, ctx, "clouds")
+            if not start_vms.is_connected():
+                os_creds = get_OS_credentials(cfg, ctx)
+            else:
+                os_creds = {}
+
             conn = start_vms.nova_connect(**os_creds)
             for ip, vm_id in start_vms.find_vms(conn, vm_name_pattern):
                 node = Node(conn_pattern.format(ip=ip), ['testnode'])
@@ -325,56 +386,58 @@
             logger.exception(msg)
             raise utils.StopTestError(msg, exc)
 
-    return reuse_vms
+
+def get_creds_openrc(path):
+    """Read OS creds from openrc file: (user, passwd, tenant, auth_url)."""
+    with open(path) as fd:
+        fc = fd.read()
+    echo = 'echo "$OS_TENANT_NAME:$OS_USERNAME:$OS_PASSWORD@$OS_AUTH_URL"'
+    try:
+        data = utils.run_locally(['/bin/bash'],
+                                 input_data=fc + "\n" + echo)
+    except subprocess.CalledProcessError as exc:
+        msg = "Failed to get creads from openrc file: " + path
+        logger.exception(msg)
+        raise utils.StopTestError(msg, exc)
+
+    try:
+        # the echo above prints the tenant first, then the user name
+        tenant, user, passwd_auth_url = data.strip().split(':', 2)
+        passwd, auth_url = passwd_auth_url.rsplit("@", 1)
+        if not auth_url.startswith(("https://", "http://")):
+            raise ValueError("Unsupported OS_AUTH_URL: " + auth_url)
+    except Exception as exc:
+        msg = "Failed to get creads from openrc file: " + path
+        logger.exception(msg)
+        raise utils.StopTestError(msg, exc)
+    return user, passwd, tenant, auth_url
 
 
-def get_OS_credentials(cfg, ctx, creds_type):
+def get_OS_credentials(cfg, ctx):
     creds = None
-
-    if creds_type == 'clouds':
-        logger.info("Using OS credentials from 'cloud' section")
-        if 'openstack' in cfg['clouds']:
-            os_cfg = cfg['clouds']['openstack']
-
+    if 'openstack' in cfg['clouds']:
+        os_cfg = cfg['clouds']['openstack']
+        if 'OPENRC' in os_cfg:
+            logger.info("Using OS credentials from " + os_cfg['OPENRC'])
+            user, passwd, tenant, auth_url = \
+                get_creds_openrc(os_cfg['OPENRC'])
+        elif 'ENV' in os_cfg:
+            logger.info("Using OS credentials from shell environment")
+            user, passwd, tenant, auth_url = start_vms.ostack_get_creds()
+        else:
+            logger.info("Using predefined credentials")
             tenant = os_cfg['OS_TENANT_NAME'].strip()
             user = os_cfg['OS_USERNAME'].strip()
             passwd = os_cfg['OS_PASSWORD'].strip()
             auth_url = os_cfg['OS_AUTH_URL'].strip()
 
-        elif 'fuel' in cfg['clouds'] and \
-             'openstack_env' in cfg['clouds']['fuel']:
-            creds = ctx.fuel_openstack_creds
-
-    elif creds_type == 'ENV':
-        logger.info("Using OS credentials from shell environment")
-        user, passwd, tenant, auth_url = start_vms.ostack_get_creds()
-    elif os.path.isfile(creds_type):
-        logger.info("Using OS credentials from " + creds_type)
-        fc = open(creds_type).read()
-
-        echo = 'echo "$OS_TENANT_NAME:$OS_USERNAME:$OS_PASSWORD@$OS_AUTH_URL"'
-
-        try:
-            data = utils.run_locally(['/bin/bash'], input=fc + "\n" + echo)
-        except subprocess.CalledProcessError as exc:
-            msg = "Failed to get creads from openrc file: " + data
-            logger.exception(msg)
-            raise utils.StopTestError(msg, exc)
-
-        try:
-            user, tenant, passwd_auth_url = data.split(':', 2)
-            passwd, auth_url = passwd_auth_url.rsplit("@", 1)
-            assert (auth_url.startswith("https://") or
-                    auth_url.startswith("http://"))
-        except Exception as exc:
-            msg = "Failed to get creads from openrc file: " + data
-            logger.exception(msg)
-            raise utils.StopTestError(msg, exc)
-
+    elif 'fuel' in cfg['clouds'] and \
+         'openstack_env' in cfg['clouds']['fuel']:
+        logger.info("Using fuel creds")
+        creds = ctx.fuel_openstack_creds
     else:
-        msg = "Creds {0!r} isn't supported".format(creds_type)
-        logger.error(msg)
-        raise utils.StopTestError(msg, None)
+        logger.error("Can't found OS credentials")
+        raise utils.StopTestError("Can't found OS credentials", None)
 
     if creds is None:
         creds = {'name': user,
@@ -392,16 +455,18 @@
     params = cfg['vm_configs'][config['cfg_name']].copy()
     os_nodes_ids = []
 
-    os_creds_type = config['creds']
-    os_creds = get_OS_credentials(cfg, ctx, os_creds_type)
+    if not start_vms.is_connected():
+        os_creds = get_OS_credentials(cfg, ctx)
+    else:
+        os_creds = {}
     start_vms.nova_connect(**os_creds)
 
-    logger.info("Preparing openstack")
     params.update(config)
     params['keypair_file_private'] = params['keypair_name'] + ".pem"
     params['group_name'] = cfg_dict['run_uuid']
 
     if not config.get('skip_preparation', False):
+        logger.info("Preparing openstack")
         start_vms.prepare_os_subpr(params=params, **os_creds)
 
     new_nodes = []
@@ -446,26 +511,25 @@
             vm_ctx = create_vms_ctx(ctx, cfg, config['openstack'],
                                     num_test_nodes)
             with vm_ctx as new_nodes:
-                connect_all(new_nodes, True)
+                if len(new_nodes) != 0:
+                    connect_all(new_nodes, True)
 
-                for node in new_nodes:
-                    if node.connection is None:
-                        msg = "Failed to connect to vm {0}"
-                        raise RuntimeError(msg.format(node.get_conn_id()))
+                    for node in new_nodes:
+                        if node.connection is None:
+                            msg = "Failed to connect to vm {0}"
+                            raise RuntimeError(msg.format(node.get_conn_id()))
 
-                deploy_sensors_stage(cfg_dict,
-                                     ctx,
-                                     nodes=new_nodes,
-                                     undeploy=False)
+                    deploy_sensors_stage(cfg_dict,
+                                         ctx,
+                                         nodes=new_nodes,
+                                         undeploy=False)
 
-                if not cfg['no_tests']:
-                    for test_group in config.get('tests', []):
-                        test_res = run_tests(cfg, test_group, ctx.nodes)
-                        ctx.results.extend(test_res)
+                for test_group in config.get('tests', []):
+                    test_res = run_tests(cfg, test_group, ctx.nodes)
+                    ctx.results.extend(test_res)
         else:
-            if not cfg['no_tests']:
-                test_res = run_tests(cfg, group, ctx.nodes)
-                ctx.results.extend(test_res)
+            test_res = run_tests(cfg, group, ctx.nodes)
+            ctx.results.extend(test_res)
 
 
 def shut_down_vms_stage(cfg, ctx):
@@ -522,9 +586,19 @@
     for tp, data in ctx.results:
         if 'io' == tp and data is not None:
             dinfo = report.process_disk_info(data)
+            text_rep_fname = cfg['text_report_file']
+            rep = IOPerfTest.format_for_console(data, dinfo)
+
+            with open(text_rep_fname, "w") as fd:
+                fd.write(rep)
+                fd.write("\n")
+                fd.flush()
+
+            logger.info("Text report were stored in " + text_rep_fname)
             print("\n")
             print(IOPerfTest.format_for_console(data, dinfo))
             print("\n")
+
         if tp in ['mysql', 'pgbench'] and data is not None:
             print("\n")
             print(MysqlTest.format_for_console(data))
@@ -547,14 +621,6 @@
                                   cfg['charts_img_path'],
                                   lab_info=ctx.hw_info)
 
-            text_rep_fname = cfg_dict['text_report_file']
-            with open(text_rep_fname, "w") as fd:
-                fd.write(IOPerfTest.format_for_console(data, dinfo))
-                fd.write("\n")
-                fd.flush()
-
-            logger.info("Text report were stored in " + text_rep_fname)
-
 
 def complete_log_nodes_statistic(cfg, ctx):
     nodes = ctx.nodes
@@ -610,13 +676,19 @@
                         help="Skip html report", default=False)
     parser.add_argument("--params", metavar="testname.paramname",
                         help="Test params", default=[])
-    parser.add_argument("--reuse-vms", default=[], nargs='*')
     parser.add_argument("config_file")
 
     return parser.parse_args(argv[1:])
 
 
+# from plop.collector import Collector
+
+
 def main(argv):
+    # collector = Collector()
+    # collector.start()
+
+    faulthandler.register(signal.SIGUSR1, all_threads=True)
     opts = parse_args(argv)
     load_config(opts.config_file, opts.post_process_only)
 
@@ -629,11 +701,8 @@
             discover_stage
         ]
 
-        for reuse_param in opts.reuse_vms:
-            pref, ssh_templ = reuse_param.split(',', 1)
-            stages.append(reuse_vms_stage(pref, ssh_templ))
-
         stages.extend([
+            reuse_vms_stage,
             log_nodes_statistic,
             save_nodes_stage,
             connect_stage])
@@ -644,7 +713,8 @@
         stages.extend([
             deploy_sensors_stage,
             run_tests_stage,
-            store_raw_results_stage
+            store_raw_results_stage,
+            gather_sensors_stage
         ])
 
     report_stages = [
@@ -683,7 +753,10 @@
 
     try:
         for stage in stages:
-            logger.info("Start {0.__name__} stage".format(stage))
+            if stage.__name__.endswith("stage"):
+                logger.info("Start {0.__name__}".format(stage))
+            else:
+                logger.info("Start {0.__name__} stage".format(stage))
             stage(cfg_dict, ctx)
     except utils.StopTestError as exc:
         logger.error(msg_templ.format(stage, exc))
@@ -693,7 +766,10 @@
         exc, cls, tb = sys.exc_info()
         for stage in ctx.clear_calls_stack[::-1]:
             try:
-                logger.info("Start {0.__name__} stage".format(stage))
+                if stage.__name__.endswith("stage"):
+                    logger.info("Start {0.__name__}".format(stage))
+                else:
+                    logger.info("Start {0.__name__} stage".format(stage))
                 stage(cfg_dict, ctx)
             except utils.StopTestError as cleanup_exc:
                 logger.error(msg_templ.format(stage, cleanup_exc))
@@ -718,6 +794,9 @@
     if cfg_dict.get('run_web_ui', False):
         stop_web_ui(cfg_dict, ctx)
 
+    # collector.stop()
+    # open("plop.out", "w").write(repr(dict(collector.stack_counts)))
+
     if exc is None:
         logger.info("Tests finished successfully")
         return 0
diff --git a/wally/sensors/deploy_sensors.py b/wally/sensors/deploy_sensors.py
index 6fff833..4a1c5df 100644
--- a/wally/sensors/deploy_sensors.py
+++ b/wally/sensors/deploy_sensors.py
@@ -70,7 +70,8 @@
         time.sleep(0.3)
 
         # logger.warning("Sensors don't removed")
-        run_over_ssh(conn, "rm -rf {0}".format(remote_path), node=url)
+        run_over_ssh(conn, "rm -rf {0}".format(remote_path),
+                     node=url, timeout=10)
     except Exception as exc:
         msg = "Failed to remove sensors from node {0}: {1!s}"
         logger.error(msg.format(url, exc))
diff --git a/wally/sensors/protocol.py b/wally/sensors/protocol.py
index 67aef2a..c053011 100644
--- a/wally/sensors/protocol.py
+++ b/wally/sensors/protocol.py
@@ -1,4 +1,5 @@
 import sys
+import csv
 import time
 import struct
 import socket
@@ -217,6 +218,28 @@
         self.fd = open(fname, "w")
 
 
+class CSVFileTransport(ITransport):
+    required_keys = set(['time', 'source_id', 'hostname'])
+
+    def __init__(self, receiver, fname):
+        ITransport.__init__(self, receiver)
+        self.fd = open(fname, "w")
+        self.csv_fd = csv.writer(self.fd)
+        self.field_list = None  # extra (non-required) keys, fixed on 1st send
+        self.csv_fd.writerow(['NEW_DATA'])
+
+    def send(self, data):
+        if self.field_list is None:  # first sample: emit the header row
+            keys = set(data)
+            assert self.required_keys.issubset(keys)
+            keys -= self.required_keys
+            self.field_list = sorted(keys)
+            self.csv_fd.writerow([data['source_id'], data['hostname']] +
+                                 self.field_list)
+
+        self.csv_fd.writerow(map(data.__getitem__, ['time'] + self.field_list))
+
+
 class UDPTransport(ITransport):
     def __init__(self, receiver, ip, port, packer_cls):
         self.port = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
@@ -261,6 +284,8 @@
                             packer_cls=packer_cls)
     elif parsed_uri.scheme == 'file':
         return FileTransport(receiver, parsed_uri.path)
+    elif parsed_uri.scheme == 'csvfile':
+        return CSVFileTransport(receiver, parsed_uri.path)
     else:
         templ = "Can't instantiate transport from {0!r}"
         raise ValueError(templ.format(uri))
diff --git a/wally/sensors_utils.py b/wally/sensors_utils.py
index 9350349..65de0ef 100644
--- a/wally/sensors_utils.py
+++ b/wally/sensors_utils.py
@@ -119,10 +119,10 @@
 
 def deploy_sensors_stage(cfg, ctx, nodes=None, undeploy=True,
                          recv_timeout=10, ignore_nodata=False):
-    if 'sensors' not in cfg:
-        return
 
     cfg = cfg.get('sensors')
+    if cfg is None:
+        return
 
     if nodes is None:
         nodes = ctx.nodes
@@ -134,11 +134,14 @@
         logger.info("Nothing to monitor, no sensors would be installed")
         return
 
-    if ctx.sensors_mon_q is None:
-        logger.info("Start sensors data receiving thread")
-        ctx.sensors_mon_q = start_sensor_process_thread(ctx, cfg,
-                                                        sensors_configs,
-                                                        source2roles_map)
+    is_online = cfg.get('online', False)
+
+    if is_online:
+        if ctx.sensors_mon_q is None:
+            logger.info("Start sensors data receiving thread")
+            ctx.sensors_mon_q = start_sensor_process_thread(ctx, cfg,
+                                                            sensors_configs,
+                                                            source2roles_map)
 
     if undeploy:
         def remove_sensors_stage(cfg, ctx):
@@ -153,8 +156,30 @@
                                 num_monitoref_nodes))
 
     deploy_and_start_sensors(sensors_configs)
-    wait_for_new_sensors_data(ctx, monitored_nodes, recv_timeout,
-                              ignore_nodata)
+
+    if is_online:
+        wait_for_new_sensors_data(ctx, monitored_nodes, recv_timeout,
+                                  ignore_nodata)
+
+
+def gather_sensors_stage(cfg, ctx, nodes=None):
+    cfg = cfg.get('sensors')
+    if cfg is None:
+        return
+
+    is_online = cfg.get('online', False)
+    if is_online:
+        return
+
+    if nodes is None:
+        nodes = ctx.nodes
+
+    _, sensors_configs, _ = get_sensors_config_for_nodes(cfg, nodes)
+    gather_sensors_info(sensors_configs)
+
+
+def gather_sensors_info(sensors_configs):
+    pass  # TODO: not implemented yet, offline sensor data is not collected
 
 
 def wait_for_new_sensors_data(ctx, monitored_nodes, recv_timeout,
diff --git a/wally/ssh_utils.py b/wally/ssh_utils.py
index 7b6d593..45ca892 100644
--- a/wally/ssh_utils.py
+++ b/wally/ssh_utils.py
@@ -1,5 +1,6 @@
 import re
 import time
+import errno
 import socket
 import shutil
 import logging
@@ -68,6 +69,18 @@
 NODE_KEYS = {}
 
 
+def exists(sftp, path):
+    """os.path.exists analogue for a paramiko SFTP client (uses sftp.stat).
+    Only ENOENT maps to False; any other IOError is re-raised."""
+    try:
+        sftp.stat(path)
+        return True
+    except IOError as e:
+        if e.errno == errno.ENOENT:
+            return False
+        raise
+
+
 def set_key_for_node(host_port, key):
     sio = StringIO.StringIO(key)
     NODE_KEYS[host_port] = paramiko.RSAKey.from_private_key(sio)
@@ -76,7 +89,7 @@
 
 def ssh_connect(creds, conn_timeout=60):
     if creds == 'local':
-        return Local
+        return Local()
 
     tcp_timeout = 15
     banner_timeout = 30
@@ -286,6 +299,8 @@
     templs = [
         "^{host_rr}$",
         "^{host_rr}:{port_rr}$",
+        "^{host_rr}::{key_file_rr}$",
+        "^{host_rr}:{port_rr}:{key_file_rr}$",
         "^{user_rr}@{host_rr}$",
         "^{user_rr}@{host_rr}:{port_rr}$",
         "^{user_rr}@{host_rr}::{key_file_rr}$",
@@ -330,7 +345,7 @@
 
 def connect(uri, **params):
     if uri == 'local':
-        return Local
+        return Local()
 
     creds = parse_ssh_uri(uri)
     creds.port = int(creds.port)
@@ -338,14 +353,28 @@
 
 
 all_sessions_lock = threading.Lock()
-all_sessions = []
+all_sessions = {}
+
+
+def start_in_bg(conn, cmd, capture_out=False, **params):
+    assert not capture_out
+    bg_cmd = "nohup {0} >/dev/null 2>&1 </dev/null & echo $!".format(cmd)
+    return int(run_over_ssh(conn, bg_cmd, timeout=10, **params)), None, None
+
+
+def check_running(conn, pid):
+    try:
+        run_over_ssh(conn, "ls /proc/{0}".format(pid), timeout=10, nolog=True)
+    except OSError:  # ls failed -> no such pid
+        return False
+    return True
 
 
 def run_over_ssh(conn, cmd, stdin_data=None, timeout=60,
                  nolog=False, node=None):
     "should be replaces by normal implementation, with select"
 
-    if conn is Local:
+    if isinstance(conn, Local):
         if not nolog:
             logger.debug("SSH:local Exec {0!r}".format(cmd))
         proc = subprocess.Popen(cmd, shell=True,
@@ -367,7 +396,7 @@
         node = ""
 
     with all_sessions_lock:
-        all_sessions.append(session)
+        all_sessions[id(session)] = session
 
     try:
         session.set_combine_stderr(True)
@@ -400,6 +429,8 @@
 
         code = session.recv_exit_status()
     finally:
+        with all_sessions_lock:
+            all_sessions.pop(id(session), None)  # may be cleared already
         session.close()
 
     if code != 0:
@@ -411,9 +442,10 @@
 
 def close_all_sessions():
     with all_sessions_lock:
-        for session in all_sessions:
+        for session in all_sessions.values():
             try:
                 session.sendall('\x03')
                 session.close()
             except:
                 pass
+        all_sessions.clear()
diff --git a/wally/start_vms.py b/wally/start_vms.py
index 0bd2b9a..7e1c687 100644
--- a/wally/start_vms.py
+++ b/wally/start_vms.py
@@ -5,7 +5,7 @@
 import logging
 import subprocess
 
-from concurrent.futures import ThreadPoolExecutor
+from concurrent.futures import ThreadPoolExecutor, wait
 
 from novaclient.exceptions import NotFound
 from novaclient.client import Client as n_client
@@ -23,6 +23,10 @@
 CINDER_CONNECTION = None
 
 
+def is_connected():
+    return NOVA_CONNECTION is not None
+
+
 def ostack_get_creds():
     if STORED_OPENSTACK_CREDS is None:
         env = os.environ.get
@@ -119,6 +123,42 @@
                         break
 
 
+def pause(ids):
+    def pause_vm(conn, vm_id):
+        vm = conn.servers.get(vm_id)
+        if vm.status == 'ACTIVE':
+            vm.pause()
+
+    conn = nova_connect()
+    with ThreadPoolExecutor(max_workers=16) as executor:
+        futures = [executor.submit(pause_vm, conn, vm_id)
+                   for vm_id in ids]
+        for future in futures:
+            future.result()
+
+
+def unpause(ids, max_resume_time=10):
+    def unpause_vm(conn, vm_id):
+        vm = conn.servers.get(vm_id)
+        if vm.status == 'PAUSED':
+            vm.unpause()
+
+        # poll every 0.1s, up to max_resume_time seconds in total
+        for _ in range(max_resume_time * 10):
+            vm = conn.servers.get(vm_id)
+            if vm.status != 'PAUSED':
+                return
+            time.sleep(0.1)
+        raise RuntimeError("Can't unpause vm {0}".format(vm_id))
+
+    conn = nova_connect()
+    with ThreadPoolExecutor(max_workers=16) as executor:
+        futures = [executor.submit(unpause_vm, conn, vm_id)
+                   for vm_id in ids]
+        for future in futures:
+            future.result()  # re-raises a worker's exception, if any
+
+
 def prepare_os(nova, params):
     allow_ssh(nova, params['security_group'])
 
@@ -253,7 +293,8 @@
 def launch_vms(params, already_has_count=0):
     logger.debug("Starting new nodes on openstack")
     count = params['count']
-    lst = NOVA_CONNECTION.services.list(binary='nova-compute')
+    nova = nova_connect()
+    lst = nova.services.list(binary='nova-compute')
     srv_count = len([srv for srv in lst if srv.status == 'enabled'])
 
     if isinstance(count, basestring):
diff --git a/wally/statistic.py b/wally/statistic.py
index 5a9d163..8180619 100644
--- a/wally/statistic.py
+++ b/wally/statistic.py
@@ -148,6 +148,16 @@
     def rounded_average_conf(self):
         return round_deviation((self.average, self.confidence))
 
+    def rounded_average_dev(self):
+        return round_deviation((self.average, self.deviation))
+
+    def __str__(self):
+        return "StatProps({0} ~ {1})".format(round_3_digit(self.average),
+                                             round_3_digit(self.deviation))
+
+    def __repr__(self):
+        return str(self)
+
 
 def data_property(data, confidence=0.95):
     res = StatProps()
diff --git a/wally/suits/io/agent.py b/wally/suits/io/agent.py
index 57ba229..3c3e436 100644
--- a/wally/suits/io/agent.py
+++ b/wally/suits/io/agent.py
@@ -59,7 +59,7 @@
                 opt_name, opt_val = line.split('=', 1)
                 yield lineno, SETTING, opt_name.strip(), opt_val.strip()
             else:
-                yield lineno, SETTING, line, None
+                yield lineno, SETTING, line, '1'
         except Exception as exc:
             pref = "During parsing line number {0}\n{1!s}".format(lineno, exc)
             raise ValueError(pref)
@@ -107,9 +107,6 @@
 
 
 def parse_value(val):
-    if val is None:
-        return None
-
     try:
         return int(val)
     except ValueError:
@@ -170,7 +167,7 @@
         cycles_var_values = []
 
         for name, val in sec.vals.items():
-            if isinstance(val, list):
+            if isinstance(val, (list, tuple)):
                 cycles_var_names.append(name)
                 cycles_var_values.append(val)
 
@@ -205,7 +202,10 @@
         if num_jobs != 1:
             assert 'group_reporting' in sec.vals, group_report_err_msg
 
-        params = sec.format_params
+        assert sec.vals.get('unified_rw_reporting', '1') in (1, '1')
+        sec.vals['unified_rw_reporting'] = '1'
+
+        params = sec.format_params.copy()
 
         fsize = to_bytes(sec.vals['size'])
         params['PER_TH_OFFSET'] = fsize // num_jobs
@@ -214,13 +214,14 @@
             if isinstance(val, basestring):
                 sec.vals[name] = parse_value(val.format(**params))
             else:
-                assert isinstance(val, (int, float)) or val is None
+                assert isinstance(val, (int, float))
 
         params['UNIQ'] = 'UN{0}'.format(counter[0])
         params['COUNTER'] = str(counter[0])
         counter[0] += 1
-        params['TEST_SUMM'] = get_test_summary(sec.vals)
-
+        params['TEST_SUMM'] = get_test_summary(sec.vals,
+                                               params.get('VM_COUNT', 1))
+        params.update(sec.vals)
         sec.name = sec.name.format(**params)
 
         yield sec
@@ -238,11 +239,7 @@
         for name, val in sec.vals.items():
             if name.startswith('_'):
                 continue
-
-            if val is None:
-                res += name + "\n"
-            else:
-                res += "{0}={1}\n".format(name, val)
+            res += "{0}={1}\n".format(name, val)
 
     return res
 
@@ -266,7 +263,7 @@
         return 'a'
 
 
-def get_test_summary(params):
+def get_test_summary(params, testnodes_count):
     rw = {"randread": "rr",
           "randwrite": "rw",
           "read": "sr",
@@ -278,10 +275,11 @@
     if th_count is None:
         th_count = params.get('concurence', 1)
 
-    return "{0}{1}{2}th{3}".format(rw,
-                                   sync_mode,
-                                   params['blocksize'],
-                                   th_count)
+    return "{0}{1}{2}th{3}vm{4}".format(rw,
+                                        sync_mode,
+                                        params['blocksize'],
+                                        th_count,
+                                        testnodes_count)
 
 
 def calculate_execution_time(sec_iter):
@@ -387,13 +385,13 @@
     raw_out, raw_err = p.communicate(benchmark_config)
     end_time = time.time()
 
-    # HACK
-    raw_out = "{" + raw_out.split('{', 1)[1]
-
     if 0 != p.returncode:
         msg = "Fio failed with code: {0}\nOutput={1}"
         raise OSError(msg.format(p.returncode, raw_err))
 
+    # HACK
+    raw_out = "{" + raw_out.split('{', 1)[1]
+
     try:
         parsed_out = json.loads(raw_out)["jobs"]
     except KeyError:
@@ -409,45 +407,46 @@
     return zip(parsed_out, config_slice), (start_time, end_time)
 
 
-def add_job_results(section, job_output, res):
-    if job_output['write']['iops'] != 0:
-        raw_result = job_output['write']
-    else:
-        raw_result = job_output['read']
+class FioResult(object):
+    def __init__(self, name, params, run_interval, results):
+        self.params = params.copy()
+        self.name = name
+        self.run_interval = run_interval
+        self.results = results
 
-    vals = section.vals
-    if section.name not in res:
-        j_res = {}
-        j_res["rw"] = vals["rw"]
-        j_res["sync_mode"] = get_test_sync_mode(vals)
-        j_res["concurence"] = int(vals.get("numjobs", 1))
-        j_res["blocksize"] = vals["blocksize"]
-        j_res["jobname"] = job_output["jobname"]
-        j_res["timings"] = [int(vals.get("runtime", 0)),
-                            int(vals.get("ramp_time", 0))]
-    else:
-        j_res = res[section.name]
-        assert j_res["rw"] == vals["rw"]
-        assert j_res["rw"] == vals["rw"]
-        assert j_res["sync_mode"] == get_test_sync_mode(vals)
-        assert j_res["concurence"] == int(vals.get("numjobs", 1))
-        assert j_res["blocksize"] == vals["blocksize"]
-        assert j_res["jobname"] == job_output["jobname"]
+    def json_obj(self):
+        return self.__dict__
 
-        # ramp part is skipped for all tests, except first
-        # assert j_res["timings"] == (vals.get("runtime"),
-        #                             vals.get("ramp_time"))
 
-    def j_app(name, x):
-        j_res.setdefault(name, []).append(x)
+def make_job_results(section, job_output, slice_timings):
+    # merge by section.merge_id
 
-    j_app("bw", raw_result["bw"])
-    j_app("iops", raw_result["iops"])
-    j_app("lat", raw_result["lat"]["mean"])
-    j_app("clat", raw_result["clat"]["mean"])
-    j_app("slat", raw_result["slat"]["mean"])
+    raw_result = job_output['mixed']
 
-    res[section.name] = j_res
+    res = {
+        "bw": raw_result["bw"],
+        "iops": raw_result["iops"],
+        "lat": raw_result["lat"]["mean"],
+        "clat": raw_result["clat"]["mean"],
+        "slat": raw_result["slat"]["mean"]
+    }
+
+    vls = section.vals.copy()
+
+    vls['sync_mode'] = get_test_sync_mode(vls)
+    vls['concurence'] = vls.get('numjobs', 1)
+
+    return FioResult(section.name, vls, slice_timings, res)
+
+
+def get_slice_parts_offset(test_slice, real_inteval):
+    calc_exec_time = calculate_execution_time(test_slice)
+    coef = (real_inteval[1] - real_inteval[0]) / calc_exec_time
+    curr_offset = real_inteval[0]
+    for section in test_slice:
+        slen = calculate_execution_time([section]) * coef
+        yield (curr_offset, curr_offset + slen)
+        curr_offset += slen
 
 
 def run_fio(sliced_it, raw_results_func=None):
@@ -455,15 +454,19 @@
 
     curr_test_num = 0
     executed_tests = 0
-    result = {}
-    timings = []
+    result = []
 
     for i, test_slice in enumerate(sliced_list):
+        test_slice = list(test_slice)
+
         res_cfg_it, slice_timings = do_run_fio(test_slice)
-        res_cfg_it = enumerate(res_cfg_it, curr_test_num)
+        sec_intervals = get_slice_parts_offset(test_slice,
+                                               slice_timings)
+        res_cfg_it = enumerate(zip(res_cfg_it, sec_intervals),
+                               curr_test_num)
 
         section_names = []
-        for curr_test_num, (job_output, section) in res_cfg_it:
+        for curr_test_num, ((job_output, section), interval) in res_cfg_it:
             executed_tests += 1
             section_names.append(section.name)
 
@@ -474,12 +477,8 @@
             msg = "{0} != {1}".format(section.name, job_output["jobname"])
             assert section.name == job_output["jobname"], msg
 
-            if section.name.startswith('_'):
-                continue
+            result.append(make_job_results(section, job_output, interval))
 
-            add_job_results(section, job_output, result)
-
-        timings.append((section_names, slice_timings))
         curr_test_num += 1
         msg_template = "Done {0} tests from {1}. ETA: {2}"
 
@@ -490,7 +489,7 @@
                                   test_left,
                                   sec_to_str(time_eta))
 
-    return result, executed_tests, timings
+    return result
 
 
 def run_benchmark(binary_tp, *argv, **kwargs):
@@ -619,19 +618,14 @@
 
         rrfunc = raw_res_func if argv_obj.show_raw_results else None
 
-        stime = time.time()
-        job_res, num_tests, timings = run_benchmark(argv_obj.type,
-                                                    sliced_it, rrfunc)
-        etime = time.time()
+        job_res = run_benchmark(argv_obj.type,
+                                sliced_it, rrfunc)
 
-        res = {'__meta__': {'raw_cfg': job_cfg,
-                            'params': params,
-                            'timings': timings},
-               'res': job_res}
+        res = {'__meta__': {'params': params,
+                            'testnodes_count': int(params.get('VM_COUNT', 1))},
+               'res': [j.json_obj() for j in job_res]}
 
         oformat = 'json' if argv_obj.json else 'eval'
-        msg = "\nRun {0} tests in {1} seconds\n"
-        out_fd.write(msg.format(num_tests, int(etime - stime)))
 
         msg = "========= RESULTS(format={0}) =========\n"
         out_fd.write(msg.format(oformat))
diff --git a/wally/suits/io/check_distribution.cfg b/wally/suits/io/check_distribution.cfg
index d08e52b..e7cafd9 100644
--- a/wally/suits/io/check_distribution.cfg
+++ b/wally/suits/io/check_distribution.cfg
@@ -1,6 +1,9 @@
 [defaults]
 NUM_ROUNDS=301
 
+# this is critical for correct results in multi-node runs
+randrepeat=0
+
 [distrubution_test_{TEST_SUMM} * {NUM_ROUNDS}]
 blocksize=4k
 rw=randwrite
diff --git a/wally/suits/io/check_linearity.cfg b/wally/suits/io/check_linearity.cfg
index a75d937..670e8b3 100644
--- a/wally/suits/io/check_linearity.cfg
+++ b/wally/suits/io/check_linearity.cfg
@@ -1,6 +1,9 @@
 [defaults]
 NUM_ROUNDS=7
 
+# this is critical for correct results in multi-node runs
+randrepeat=0
+
 ramp_time=5
 buffered=0
 wait_for_previous
@@ -8,6 +11,7 @@
 iodepth=1
 size=10G
 time_based
+ramp_time=5
 runtime=30
 
 # ---------------------------------------------------------------------
diff --git a/wally/suits/io/check_th_count.cfg b/wally/suits/io/check_th_count.cfg
index 3f9dd19..1607634 100644
--- a/wally/suits/io/check_th_count.cfg
+++ b/wally/suits/io/check_th_count.cfg
@@ -1,4 +1,8 @@
 [defaults]
+
+# this is critical for correct results in multi-node runs
+randrepeat=0
+
 NUM_ROUNDS=7
 ramp_time=5
 buffered=0
diff --git a/wally/suits/io/check_warmup.cfg b/wally/suits/io/check_warmup.cfg
index 37ee083..415e3ce 100644
--- a/wally/suits/io/check_warmup.cfg
+++ b/wally/suits/io/check_warmup.cfg
@@ -1,6 +1,9 @@
 [defaults]
 NUM_ROUNDS=7
 
+# this is critical for correct results in multi-node runs
+randrepeat=0
+
 ramp_time=5
 buffered=0
 wait_for_previous
diff --git a/wally/suits/io/formatter.py b/wally/suits/io/formatter.py
index 09565be..22c090f 100644
--- a/wally/suits/io/formatter.py
+++ b/wally/suits/io/formatter.py
@@ -5,61 +5,64 @@
 from wally.suits.io.agent import get_test_summary
 
 
-def key_func(k_data):
-    name, data = k_data
-    return (data['rw'],
-            data['sync_mode'],
-            ssize2b(data['blocksize']),
-            data['concurence'],
-            name)
+def key_func(data):
+    p = data.params
+    return (p['rw'],
+            p['sync_mode'],
+            ssize2b(p['blocksize']),
+            int(p['concurence']) * data.testnodes_count,
+            data.name)
 
 
-def format_results_for_console(test_set, dinfo):
+def format_results_for_console(dinfo):
     """
     create a table with io performance report
     for console
     """
     tab = texttable.Texttable(max_width=120)
     tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
-    tab.set_cols_align(["l", "l", "r", "r", "r", "r", "r", "r"])
+    tab.set_cols_align(["l", "l", "r", "r", "r", "r", "r", "r", "r"])
 
-    items = sorted(test_set['res'].items(), key=key_func)
+    items = sorted(dinfo.values(), key=key_func)
 
     prev_k = None
-    vm_count = test_set['__test_meta__']['testnodes_count']
     header = ["Name", "Description", "iops\ncum", "KiBps\ncum",
-              "Cnf\n95%", "iops\nper vm", "KiBps\nper vm", "lat\nms"]
+              "Cnf\n95%", "Dev%", "iops\nper vm", "KiBps\nper vm", "lat\nms"]
 
-    for test_name, data in items:
+    for data in items:
 
-        curr_k = key_func((test_name, data))[:3]
+        curr_k = key_func(data)[:3]
 
         if prev_k is not None:
             if prev_k != curr_k:
                 tab.add_row(
-                    ["-------", "--------", "-----", "------",
-                     "---", "------", "---", "-----"])
+                    ["-------", "-----------", "-----", "------",
+                     "---", "----", "------", "---", "-----"])
 
         prev_k = curr_k
 
-        descr = get_test_summary(data)
-        test_dinfo = dinfo[test_name]
+        descr = get_test_summary(data.params, data.testnodes_count)
+        test_dinfo = dinfo[data.name]
 
         iops, _ = test_dinfo.iops.rounded_average_conf()
+
         bw, bw_conf = test_dinfo.bw.rounded_average_conf()
+        _, bw_dev = test_dinfo.bw.rounded_average_dev()
         conf_perc = int(round(bw_conf * 100 / bw))
+        dev_perc = int(round(bw_dev * 100 / bw))
 
         lat, _ = test_dinfo.lat.rounded_average_conf()
         lat = round_3_digit(int(lat) // 1000)
 
-        iops_per_vm = round_3_digit(iops / float(vm_count))
-        bw_per_vm = round_3_digit(bw / float(vm_count))
+        iops_per_vm = round_3_digit(iops / float(data.testnodes_count))
+        bw_per_vm = round_3_digit(bw / float(data.testnodes_count))
 
         iops = round_3_digit(iops)
         bw = round_3_digit(bw)
 
-        params = (test_name.split('_', 1)[0],
+        params = (data.name.rsplit('_', 1)[0],
                   descr, int(iops), int(bw), str(conf_perc),
+                  str(dev_perc),
                   int(iops_per_vm), int(bw_per_vm), lat)
         tab.add_row(params)
 
diff --git a/wally/suits/io/hdd.cfg b/wally/suits/io/hdd.cfg
index 4156171..17d0509 100644
--- a/wally/suits/io/hdd.cfg
+++ b/wally/suits/io/hdd.cfg
@@ -9,6 +9,9 @@
 NUM_ROUNDS=7
 NUMJOBS={% 1, 5, 10, 15, 20, 30, 40, 80 %}
 
+# this is critical for correct results in multi-node runs
+randrepeat=0
+
 size=10G
 ramp_time=5
 runtime=30
diff --git a/wally/suits/io/lat_vs_iops.cfg b/wally/suits/io/lat_vs_iops.cfg
new file mode 100644
index 0000000..a587a96
--- /dev/null
+++ b/wally/suits/io/lat_vs_iops.cfg
@@ -0,0 +1,29 @@
+[defaults]
+wait_for_previous=1
+filename={FILENAME}
+
+# this is critical for correct results in multi-node runs
+randrepeat=0
+
+iodepth=1
+size=100G
+group_reporting=1
+
+IOPS_LIMIT={% 100, 500 %}
+
+ramp_time=5
+runtime=30
+time_based=1
+
+buffered=0
+NUMJOBS=1
+
+# ---------------------------------------------------------------------
+# latency as a function of IOPS
+# ---------------------------------------------------------------------
+[lat_vs_iops{rate_iops}_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=4k
+rw=randread
+direct=1
+numjobs={NUMJOBS}
+rate_iops={IOPS_LIMIT}
diff --git a/wally/suits/io/long_test.cfg b/wally/suits/io/long_test.cfg
index a59c360..fd420d8 100644
--- a/wally/suits/io/long_test.cfg
+++ b/wally/suits/io/long_test.cfg
@@ -1,5 +1,8 @@
 [defaults]
 
+# this is critical for correct results in multi-node runs
+randrepeat=0
+
 # 24h test
 NUM_ROUNDS1=270
 NUM_ROUNDS2=261
diff --git a/wally/suits/io/rrd.cfg b/wally/suits/io/rrd.cfg
index 3f4c074..89833b9 100644
--- a/wally/suits/io/rrd.cfg
+++ b/wally/suits/io/rrd.cfg
@@ -8,6 +8,9 @@
 filename={FILENAME}
 NUM_ROUNDS=35
 
+# this is critical for correct results in multi-node runs
+randrepeat=0
+
 size=30G
 ramp_time=15
 runtime=60
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index dd52f33..36d3fcf 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -15,7 +15,8 @@
 from wally.ssh_utils import (copy_paths, run_over_ssh,
                              save_to_remote,
                              # delete_file,
-                             connect, read_from_remote, Local)
+                             connect, read_from_remote, Local,
+                             exists)
 
 from . import postgres
 from . import mysql
@@ -124,7 +125,6 @@
     pre_run_script = os.path.join(root, "prepare.sh")
     run_script = os.path.join(root, "run.sh")
 
-
     @classmethod
     def format_for_console(cls, data):
         tab = texttable.Texttable(max_width=120)
@@ -163,7 +163,14 @@
                                              self.config_fname + '.cfg')
 
         self.alive_check_interval = self.options.get('alive_check_interval')
-        self.config_params = self.options.get('params', {})
+
+        self.config_params = {}
+        for name, val in self.options.get('params', {}).items():
+            if isinstance(val, (list, tuple)):
+                val = "{%" + ','.join(map(str, val)) + "%}"
+            self.config_params[name] = val
+
+        self.config_params['VM_COUNT'] = self.options['testnodes_count']
         self.tool = self.options.get('tool', 'fio')
         self.raw_cfg = open(self.config_fname).read()
         self.configs = list(io_agent.parse_all_in_1(self.raw_cfg,
@@ -230,6 +237,8 @@
         cmd_templ = "dd oflag=direct " + \
                     "if=/dev/zero of={0} bs={1} count={2}"
 
+        # cmd_templ = "fio --rw=write --bs={1} --direct=1 --size={2} "
+
         if self.use_sudo:
             cmd_templ = "sudo " + cmd_templ
 
@@ -282,7 +291,7 @@
         except Exception as exc:
             msg = "Failed to create folder {0} on remote {1}. Error: {2!s}"
             msg = msg.format(self.remote_dir, self.node.get_conn_id(), exc)
-            logger.error(msg)
+            logger.exception(msg)
             raise StopTestError(msg, exc)
 
         self.install_utils()
@@ -318,31 +327,36 @@
         except OSError:
             return False
 
-    def get_test_status(self, die_timeout=3):
+    def get_test_status(self, res_file=None):
+        found_res_file = False
         is_connected = None
         is_running = None
         pid = None
         err = None
 
         try:
-            conn = connect(self.node.conn_url,
-                           conn_timeout=self.tcp_conn_timeout)
-            with conn:
-                with conn.open_sftp() as sftp:
-                    try:
-                        pid = read_from_remote(sftp, self.pid_file)
-                        is_running = True
-                    except (NameError, IOError, OSError) as exc:
-                        pid = None
+            # conn = connect(self.node.conn_url,
+            #                conn_timeout=self.tcp_conn_timeout)
+            # with conn:
+            conn = self.node.connection
+            with conn.open_sftp() as sftp:
+                try:
+                    pid = read_from_remote(sftp, self.pid_file)
+                    is_running = True
+                except (NameError, IOError, OSError) as exc:
+                    pid = None
+                    is_running = False
+
+                if is_running:
+                    if not self.check_process_is_running(sftp, pid):
+                        try:
+                            sftp.remove(self.pid_file)
+                        except (IOError, NameError, OSError):
+                            pass
                         is_running = False
 
-                    if is_running:
-                        if not self.check_process_is_running(sftp, pid):
-                            try:
-                                sftp.remove(self.pid_file)
-                            except (IOError, NameError, OSError):
-                                pass
-                            is_running = False
+                if res_file is not None:
+                    found_res_file = exists(sftp, res_file)
 
             is_connected = True
 
@@ -350,9 +364,9 @@
             err = str(exc)
             is_connected = False
 
-        return is_connected, is_running, pid, err
+        return found_res_file, is_connected, is_running, pid, err
 
-    def wait_till_finished(self, soft_timeout, timeout):
+    def wait_till_finished(self, soft_timeout, timeout, res_fname=None):
         conn_id = self.node.get_conn_id()
         end_of_wait_time = timeout + time.time()
         soft_end_of_wait_time = soft_timeout + time.time()
@@ -366,7 +380,11 @@
         while end_of_wait_time > time.time():
             time.sleep(time_till_check)
 
-            is_connected, is_running, npid, err = self.get_test_status()
+            found_res_file, is_connected, is_running, npid, err = \
+                self.get_test_status(res_fname)
+
+            if found_res_file and not is_running:
+                return
 
             if is_connected and not is_running:
                 if pid is None:
@@ -437,7 +455,6 @@
 
                 try:
                     for data in parse_output(out_err):
-                        data['__meta__']['raw_cfg'] = self.raw_cfg
                         self.on_result_cb(data)
                 except (OSError, StopTestError):
                     raise
@@ -458,11 +475,14 @@
         if self.options.get("use_sudo", True):
             cmd_templ = "sudo " + cmd_templ
 
-        params = " ".join("{0}={1}".format(k, v)
-                          for k, v in self.config_params.items())
+        params = []
+        for k, v in self.config_params.items():
+            if isinstance(v, basestring) and v.startswith("{%"):
+                continue
+            params.append("{0}={1}".format(k, v))
 
-        if "" != params:
-            params = "--params " + params
+        # an empty list must become "", not the literal text "[]"
+        params = ("--params " + " ".join(params)) if params else ""
 
         with self.node.connection.open_sftp() as sftp:
             save_to_remote(sftp, self.task_file,
@@ -501,59 +521,67 @@
                 logger.debug(msg.format(conn_id, screen_name))
 
         # TODO: add monitoring socket
-        if self.node.connection is not Local:
-            self.node.connection.close()
+        # if not isinstance(self.node.connection, Local):
+        #     self.node.connection.close()
 
-        self.wait_till_finished(soft_tout, timeout)
+        self.wait_till_finished(soft_tout, timeout, self.log_fl)
         if not nolog:
             logger.debug("Test on node {0} is finished".format(conn_id))
 
-        if self.node.connection is not Local:
-            conn_timeout = self.tcp_conn_timeout * 3
-            self.node.connection = connect(self.node.conn_url,
-                                           conn_timeout=conn_timeout)
+        # if self.node.connection is not Local:
+        #     conn_timeout = self.tcp_conn_timeout * 3
+        #     self.node.connection = connect(self.node.conn_url,
+        #                                    conn_timeout=conn_timeout)
 
         with self.node.connection.open_sftp() as sftp:
             return read_from_remote(sftp, self.log_fl)
 
     @classmethod
     def merge_results(cls, results):
-        if len(results) == 0:
-            return None
+        merged = results[0] if results else None  # None if empty
+        for block in results[1:]:
+            assert block["__meta__"] == merged["__meta__"]
+            merged['res'].extend(block['res'])
+        return merged
 
-        merged_result = results[0]
-        merged_data = merged_result['res']
-        mergable_fields = ['bw', 'clat', 'iops', 'lat', 'slat']
+    # @classmethod
+    # def merge_results(cls, results):
+    #     if len(results) == 0:
+    #         return None
 
-        for res in results[1:]:
-            mm = merged_result['__meta__']
-            assert mm['raw_cfg'] == res['__meta__']['raw_cfg']
-            assert mm['params'] == res['__meta__']['params']
-            mm['timings'].extend(res['__meta__']['timings'])
+    #     merged_result = results[0]
+    #     merged_data = merged_result['res']
+    #     mergable_fields = ['bw', 'clat', 'iops', 'lat', 'slat']
 
-            data = res['res']
-            for testname, test_data in data.items():
-                if testname not in merged_data:
-                    merged_data[testname] = test_data
-                    continue
+    #     for res in results[1:]:
+    #         mm = merged_result['__meta__']
+    #         assert mm['raw_cfg'] == res['__meta__']['raw_cfg']
+    #         assert mm['params'] == res['__meta__']['params']
+    #         mm['timings'].extend(res['__meta__']['timings'])
 
-                res_test_data = merged_data[testname]
+    #         data = res['res']
+    #         for testname, test_data in data.items():
+    #             if testname not in merged_data:
+    #                 merged_data[testname] = test_data
+    #                 continue
 
-                diff = set(test_data.keys()).symmetric_difference(
-                            res_test_data.keys())
+    #             res_test_data = merged_data[testname]
 
-                msg = "Difference: {0}".format(",".join(diff))
-                assert len(diff) == 0, msg
+    #             diff = set(test_data.keys()).symmetric_difference(
+    #                         res_test_data.keys())
 
-                for k, v in test_data.items():
-                    if k in mergable_fields:
-                        res_test_data[k].extend(v)
-                    else:
-                        msg = "{0!r} != {1!r}".format(res_test_data[k], v)
-                        assert res_test_data[k] == v, msg
+    #             msg = "Difference: {0}".format(",".join(diff))
+    #             assert len(diff) == 0, msg
 
-        return merged_result
+    #             for k, v in test_data.items():
+    #                 if k in mergable_fields:
+    #                     res_test_data[k].extend(v)
+    #                 else:
+    #                     msg = "{0!r} != {1!r}".format(res_test_data[k], v)
+    #                     assert res_test_data[k] == v, msg
+
+    #     return merged_result
 
     @classmethod
     def format_for_console(cls, data, dinfo):
-        return io_formatter.format_results_for_console(data, dinfo)
+        return io_formatter.format_results_for_console(dinfo)
diff --git a/wally/suits/mysql/__init__.py b/wally/suits/mysql/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/wally/suits/mysql/__init__.py
diff --git a/wally/utils.py b/wally/utils.py
index 3792ba4..1fa74c5 100644
--- a/wally/utils.py
+++ b/wally/utils.py
@@ -1,6 +1,7 @@
 import re
 import os
 import time
+import psutil
 import socket
 import logging
 import threading
@@ -130,24 +131,57 @@
     return "{0}{1}i".format(size // scale, name)
 
 
+RSMAP_10 = [('k', 1000),
+            ('m', 1000 ** 2),
+            ('g', 1000 ** 3),
+            ('t', 1000 ** 4)]
+
+
+def b2ssize_10(size):
+    if size < 1000:
+        return str(size)
+
+    for name, scale in RSMAP_10:
+        if size < 1000 * scale:
+            if size % scale == 0:
+                return "{0} {1}".format(size // scale, name)
+            else:
+                return "{0:.1f} {1}".format(float(size) / scale, name)
+
+    return "{0}{1}".format(size // scale, name)
+
+
 def run_locally(cmd, input_data="", timeout=20):
     shell = isinstance(cmd, basestring)
-
     proc = subprocess.Popen(cmd,
                             shell=shell,
+                            stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
+    res = []
 
-    end_time = time.time() + timeout
+    def thread_func():
+        rr = proc.communicate(input_data)
+        res.extend(rr)
 
-    while end_time > time.time():
-        if proc.poll() is None:
-            time.sleep(1)
+    thread = threading.Thread(target=thread_func)
+    thread.daemon = True
+    thread.start()
+    thread.join(timeout)
 
-    out, err = proc.communicate()
+    if thread.is_alive():
 
+        parent = psutil.Process(proc.pid)
+        for child in parent.children(recursive=True):
+            child.kill()
+        parent.kill()
+        thread.join()
+        raise RuntimeError("Local process timeout: " + str(cmd))
+
+    out, err = res
     if 0 != proc.returncode:
-        raise subprocess.CalledProcessError(proc.returncode, cmd, out + err)
+        raise subprocess.CalledProcessError(proc.returncode,
+                                            cmd, out + err)
 
     return out