a lot of changes
diff --git a/TODO b/TODO
index ae60fcb..6271e18 100644
--- a/TODO
+++ b/TODO
@@ -3,15 +3,11 @@
 offline сенсоры
 dd запускать в фоне и чекать периодически
 починить все подвисания во всех потоках - дампить стеки при подвисании
-варьирование количества виртуалок
-поправить графики
 
 
 Finding bottlenecks (алена) - починить процессор
-unified_rw_reporting=1
 fadvise_hint=0
 Изменить с репорте сенсоров все на %
-Добавить в репорт количество операций
 посмотреть что с сетевыми картами
 Resource consumption:
 	добавить процессор,
diff --git a/report_templates/report_ceph.html b/report_templates/report_ceph.html
index d597ced..4ac903b 100644
--- a/report_templates/report_ceph.html
+++ b/report_templates/report_ceph.html
@@ -10,10 +10,33 @@
 <div class="page-header text-center">
   <h2>Performance Report</h2>
 </div>
+
+<!--
+0) Menu
+1) Lab very short performance: max IOPS, max BW, EC2 VM count
+2) Engineering report
+3) boxplots
+4) BW/lat/IOPS = f(time) report
+5) Bottleneck/consumption reports
+6) Excessive lab info
+7) Report description
+-->
+
 <div class="container-fluid text-center">
+
     <div class="row" style="margin-bottom: 40px">
         <div class="col-md-12">
             <center>
+
+            <h4>Summary</h4>
+            <table style="width: auto;" class="table table-bordered table-striped">
+                <tr>
+                    <td>Compute count</td><td>computes</td>
+                    <td>OSD count</td><td>OSD count</td>
+                    <td>Total Ceph disks count</td><td>OSD_hdd_count</td>
+                </tr>
+            </table>
+
             <table><tr><td>
                 <H4>Random direct performance,<br>4KiB blocks</H4>
                 <table style="width: auto;" class="table table-bordered table-striped">
@@ -70,11 +93,11 @@
             </div>
             <center><br>
             <table><tr>
-                <td><img src="charts/rand_read_4k.{img_ext}" /></td>
-                <td><img src="charts/rand_write_4k.{img_ext}" /></td>
+                <td>{rand_read_4k}</td>
+                <td>{rand_write_4k}</td>
             </tr><tr>
-                <td><img src="charts/rand_read_16m.{img_ext}" /></td>
-                <td><img src="charts/rand_write_16m.{img_ext}" /></td>
+                <td>{rand_read_16m}</td>
+                <td>{rand_write_16m}</td>
             </tr></table>
             </center>
             </center>
diff --git a/report_templates/report_linearity.html b/report_templates/report_linearity.html
new file mode 100644
index 0000000..0fa7862
--- /dev/null
+++ b/report_templates/report_linearity.html
@@ -0,0 +1,29 @@
+<!DOCTYPE html>
+<html>
+<head>
+    <title>Report</title>
+    <link rel="stylesheet"
+          href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/css/bootstrap.min.css">
+</head>
+
+<body>
+<div class="page-header text-center">
+  <h2>IOPS vs Block size</h2>
+</div>
+<div class="container-fluid text-center">
+
+    <div class="row" style="margin-bottom: 40px">
+        <div class="col-md-12">
+            <center>
+            <H3>{descr[oper_descr]} VM_COUNT:{descr[vm_count]} Thread per vm:{descr[concurence]}</H3> <br>
+            <table><tr>
+                <td>{iops_vs_size}</td>
+                <td>{iotime_vs_size}</td>
+            </tr>
+            </center>
+        </div>
+    </div>
+</div>
+</body>
+
+</html>
\ No newline at end of file
diff --git a/wally/sensors/influx_exporter.py b/scripts/influx_exporter.py
similarity index 100%
rename from wally/sensors/influx_exporter.py
rename to scripts/influx_exporter.py
diff --git a/scripts/postprocessing/bottleneck.py b/scripts/postprocessing/bottleneck.py
index d8d4b4c..8507041 100644
--- a/scripts/postprocessing/bottleneck.py
+++ b/scripts/postprocessing/bottleneck.py
@@ -1,5 +1,6 @@
 """ Analize test results for finding bottlenecks """
 
+import re
 import sys
 import csv
 import time
@@ -17,17 +18,10 @@
 except ImportError:
     pgv = None
 
+from wally.run_test import load_data_from
 from wally.utils import b2ssize, b2ssize_10
 
 
-class SensorsData(object):
-    def __init__(self, source_id, hostname, ctime, values):
-        self.source_id = source_id
-        self.hostname = hostname
-        self.ctime = ctime
-        self.values = values  # [((dev, sensor), value)]
-
-
 class SensorInfo(object):
     def __init__(self, name, print_name, native_ext, to_bytes_coef):
         self.name = name
@@ -51,99 +45,68 @@
                 if sinfo.to_bytes_coef is not None)
 
 
-def load_results(fd):
-    data = fd.read(100)
-    fd.seek(0, os.SEEK_SET)
+class NodeSensorsData(object):
+    def __init__(self, source_id, hostname, headers, values):
+        self.source_id = source_id
+        self.hostname = hostname
+        self.headers = headers
+        self.values = values
+        self.times = None
 
-    # t = time.time()
-    if '(' in data or '{' in data:
-        res, source_id2nostname = load_results_eval(fd)
-    else:
-        res, source_id2nostname = load_results_csv(fd)
+    def finalize(self):
+        self.times = [v[0] for v in self.values]
 
-    # print int(((time.time() - t) * 1000000) / len(res)), len(res)
+    def get_data_for_interval(self, beg, end):
+        p1 = bisect.bisect_left(self.times, beg)
+        p2 = bisect.bisect_right(self.times, end)
 
-    return res, source_id2nostname
+        obj = self.__class__(self.source_id,
+                             self.hostname,
+                             self.headers,
+                             self.values[p1:p2])
+        obj.times = self.times[p1:p2]
+        return obj
+
+    def __getitem__(self, name):
+        idx = self.headers.index(name.split('.'))
+        # +1 as first is a time
+        return [val[idx] for val in self.values]
 
 
 def load_results_csv(fd):
-
-    fields = {}
-    res = []
-    source_id2nostname = {}
-    coefs = {}
-
-    # cached for performance
-    ii = int
-    zz = zip
-    SD = SensorsData
-    ra = res.append
-
-    for row in csv.reader(fd):
-        if len(row) == 0:
-            continue
-        ip, port = row[:2]
-        ip_port = (ip, ii(port))
-
-        if ip_port not in fields:
-            sensors = [i.split('.') for i in row[4:]]
-            fields[ip_port] = row[2:4] + sensors
-            source_id2nostname[row[2]] = row[3]
-            coefs[ip_port] = [to_bytes.get(s[1], 1) for s in sensors]
-        else:
-            fld = fields[ip_port]
-            processed_data = []
-            a = processed_data.append
-
-            # this cycle is critical for performance
-            # don't "refactor" it, unles you are confident
-            # in what you are doing
-            for dev_sensor, val, coef in zz(fld[2:], row[3:], coefs[ip_port]):
-                a((dev_sensor, ii(val) * coef))
-
-            ctime = ii(row[2])
-            sd = SD(fld[0], fld[1], ctime, processed_data)
-            ra((ctime, sd))
-
-    res.sort(key=lambda x: x[0])
-    return res, source_id2nostname
-
-
-def load_results_eval(fd):
-    res = []
-    source_id2nostname = {}
-
-    for line in fd:
-        if line.strip() == "":
+    data = fd.read()
+    results = {}
+    for block in data.split("NEW_DATA"):
+        block = block.strip()
+        if len(block) == 0:
             continue
 
-        _, data = eval(line)
-        ctime = data.pop('time')
-        source_id = data.pop('source_id')
-        hostname = data.pop('hostname')
+        it = csv.reader(block.split("\n"))
+        headers = next(it)
+        sens_data = [map(int, vals) for vals in it]
+        source_id, hostname = headers[:2]
+        headers = [(None, 'time')] + \
+                  [header.split('.') for header in headers[2:]]
+        assert set(map(len, headers)) == set([2])
 
-        processed_data = []
-        for k, v in data.items():
-            dev, sensor = k.split('.')
-            processed_data.append(((dev, sensor),
-                                   v * to_bytes.get(sensor, 1)))
+        results[source_id] = NodeSensorsData(source_id, hostname,
+                                             headers, sens_data)
 
-        sd = SensorsData(source_id, hostname, ctime, processed_data)
-        res.append((ctime, sd))
-        source_id2nostname[source_id] = hostname
-
-    res.sort(key=lambda x: x[0])
-    return res, source_id2nostname
+    return results
 
 
-def load_test_timings(fd, max_diff=1000):
+def load_test_timings(fname, max_diff=1000):
     raw_map = collections.defaultdict(lambda: [])
-    data = yaml.load(fd.read())
-    for test_type, test_results in data:
+
+    class data(object):
+        pass
+
+    load_data_from(fname)(None, data)
+
+    for test_type, test_results in data.results:
         if test_type == 'io':
             for tests_res in test_results:
-                for test_res in tests_res['res']:
-                    raw_map[test_res['name']].append(test_res['run_interval'])
+                raw_map[tests_res.config.name].append(tests_res.run_interval)
 
     result = {}
     for name, intervals in raw_map.items():
@@ -208,15 +171,28 @@
 def total_consumption(sensors_data, roles_map):
     result = {}
 
-    for _, item in sensors_data:
-        for (dev, sensor), val in item.values:
+    for name, sensor_data in sensors_data.items():
+        for pos, (dev, sensor) in enumerate(sensor_data.headers):
+            if 'time' == sensor:
+                continue
 
             try:
                 ad = result[sensor]
             except KeyError:
                 ad = result[sensor] = AggregatedData(sensor)
 
-            ad.per_device[(item.hostname, dev)] += val
+            val = sum(vals[pos] for vals in sensor_data.values)
+
+            ad.per_device[(sensor_data.hostname, dev)] += val
+
+    # vals1 = sensors_data['localhost:22']['sdc.sectors_read']
+    # vals2 = sensors_data['localhost:22']['sdb.sectors_written']
+
+    # from matplotlib import pyplot as plt
+    # plt.plot(range(len(vals1)), vals1)
+    # plt.plot(range(len(vals2)), vals2)
+    # plt.show()
+    # exit(1)
 
     for ad in result.values():
         for (hostname, dev), val in ad.per_device.items():
@@ -299,25 +275,6 @@
     return "\n".join(table)
 
 
-def parse_args(args):
-    parser = argparse.ArgumentParser()
-    parser.add_argument('-t', '--time_period', nargs=2,
-                        type=int, default=None,
-                        help="Begin and end time for tests")
-    parser.add_argument('-m', '--max-bottlenek', type=int,
-                        default=15, help="Max bottlenek to show")
-    parser.add_argument('-x', '--max-diff', type=int,
-                        default=10, help="Max bottlenek to show in" +
-                        "0.1% from test nodes summ load")
-    parser.add_argument('-d', '--debug-ver', action='store_true',
-                        help="Full report with original data")
-    parser.add_argument('-u', '--user-ver', action='store_true',
-                        default=True, help="Avg load report")
-    parser.add_argument('-s', '--select-loads', nargs='*', default=[])
-    parser.add_argument('results_folder')
-    return parser.parse_args(args[1:])
-
-
 def make_roles_mapping(source_id_mapping, source_id2hostname):
     result = {}
     for ssh_url, roles in source_id_mapping.items():
@@ -349,7 +306,8 @@
         if sens.to_bytes_coef is not None:
             agg = consumption.get(name)
             if agg is not None:
-                max_data = max(max_data, agg.per_role.get('testnode', 0))
+                cdt = agg.per_role.get('testnode', 0) * sens.to_bytes_coef
+                max_data = max(max_data, cdt)
     return max_data
 
 
@@ -364,12 +322,11 @@
 
 
 def get_data_for_intervals(data, intervals):
-    res = []
+    res = {}
     for begin, end in intervals:
-        times = [ctime for ctime, _ in data]
-        b_p = bisect.bisect_left(times, begin)
-        e_p = bisect.bisect_right(times, end)
-        res.extend(data[b_p:e_p])
+        for name, node_data in data.items():
+            ndata = node_data.get_data_for_interval(begin, end)
+            res[name] = ndata
     return res
 
 
@@ -380,92 +337,181 @@
         self.net_devs = None
 
 
-# def plot_consumption(per_consumer_table, fields):
-#     hosts = {}
-#     storage_sensors = ('sectors_written', 'sectors_read')
+def plot_consumption(per_consumer_table, fields, refload):
+    if pgv is None:
+        return
 
-#     for (hostname, dev), consumption in per_consumer_table.items():
-#         if dev != '*':
-#             continue
+    hosts = {}
+    storage_sensors = ('sectors_written', 'sectors_read')
 
-#         if hostname not in hosts:
-#             hosts[hostname] = Host(hostname)
+    for (hostname, dev), consumption in per_consumer_table.items():
+        if hostname not in hosts:
+            hosts[hostname] = Host(hostname)
 
-#         cons_map = map(zip(fields, consumption))
+        host = hosts[hostname]
+        cons_map = dict(zip(fields, consumption))
 
-#         for sn in storage_sensors:
-#             vl = cons_map.get(sn, 0)
-#             if vl > 0:
-#                 pass
+        for sn in storage_sensors:
+            vl = cons_map.get(sn, 0)
+            if vl > 0:
+                host.hdd_devs.setdefault(dev, {})[sn] = vl
+
+    p = pgv.AGraph(name='system', directed=True)
+
+    net = "Network"
+    p.add_node(net)
+
+    in_color = 'red'
+    out_color = 'green'
+
+    for host in hosts.values():
+        g = p.subgraph(name="cluster_" + host.name, label=host.name,
+                       color="blue")
+        g.add_node(host.name, shape="diamond")
+        p.add_edge(host.name, net)
+        p.add_edge(net, host.name)
+
+        for dev_name, values in host.hdd_devs.items():
+            if dev_name == '*':
+                continue
+
+            to = values.get('sectors_written', 0)
+            frm = values.get('sectors_read', 0)
+            to_pw = 7 * to / refload
+            frm_pw = 7 * frm / refload
+            min_with = 0.1
+
+            if to_pw > min_with or frm_pw > min_with:
+                dev_fqn = host.name + "." + dev_name
+                g.add_node(dev_fqn)
+
+                if to_pw > min_with:
+                    g.add_edge(host.name, dev_fqn,
+                               label=b2ssize(to) + "B",
+                               penwidth=to_pw,
+                               fontcolor=out_color,
+                               color=out_color)
+
+                if frm_pw > min_with:
+                    g.add_edge(dev_fqn, host.name,
+                               label=b2ssize(frm) + "B",
+                               penwidth=frm_pw,
+                               color=in_color,
+                               fontcolor=in_color)
+
+    return p.string()
+
+
+def parse_args(args):
+    parser = argparse.ArgumentParser()
+    parser.add_argument('-t', '--time_period', nargs=2,
+                        type=int, default=None,
+                        help="Begin and end time for tests")
+    parser.add_argument('-m', '--max-bottlenek', type=int,
+                        default=15, help="Max bottlenek to show")
+    parser.add_argument('-x', '--max-diff', type=int,
+                        default=10, help="Max bottlenek to show in" +
+                        "0.1% from test nodes summ load")
+    parser.add_argument('-d', '--debug-ver', action='store_true',
+                        help="Full report with original data")
+    parser.add_argument('-u', '--user-ver', action='store_true',
+                        default=True, help="Avg load report")
+    parser.add_argument('-s', '--select-loads', nargs='*', default=[])
+    parser.add_argument('-f', '--fields', nargs='*', default=[])
+    parser.add_argument('results_folder')
+    return parser.parse_args(args[1:])
 
 
 def main(argv):
     opts = parse_args(argv)
 
-    sensors_data_fname = os.path.join(opts.results_folder,
-                                      'sensor_storage.txt')
+    stor_dir = os.path.join(opts.results_folder, 'sensor_storage')
+    data = {}
+    source_id2hostname = {}
+
+    csv_files = os.listdir(stor_dir)
+    for fname in csv_files:
+        assert re.match(r"\d+_\d+.csv$", fname)
+
+    csv_files.sort(key=lambda x: int(x.split('_')[0]))
+
+    for fname in csv_files:
+        with open(os.path.join(stor_dir, fname)) as fd:
+            for name, node_sens_data in load_results_csv(fd).items():
+                if name in data:
+                    assert data[name].hostname == node_sens_data.hostname
+                    assert data[name].source_id == node_sens_data.source_id
+                    assert data[name].headers == node_sens_data.headers
+                    data[name].values.extend(node_sens_data.values)
+                else:
+                    data[name] = node_sens_data
+
+    for nd in data.values():
+        assert nd.source_id not in source_id2hostname
+        source_id2hostname[nd.source_id] = nd.hostname
+        nd.finalize()
 
     roles_file = os.path.join(opts.results_folder,
                               'nodes.yaml')
 
-    raw_results_file = os.path.join(opts.results_folder,
-                                    'raw_results.yaml')
-
     src2roles = yaml.load(open(roles_file))
-    timings = load_test_timings(open(raw_results_file))
-    with open(sensors_data_fname) as fd:
-        data, source_id2hostname = load_results(fd)
+
+    timings = load_test_timings(opts.results_folder)
 
     roles_map = make_roles_mapping(src2roles, source_id2hostname)
     max_diff = float(opts.max_diff) / 1000
 
-    # print print_bottlenecks(data, opts.max_bottlenek)
-    # print print_bottlenecks(data, opts.max_bottlenek)
+    fields = ('recv_bytes', 'send_bytes',
+              'sectors_read', 'sectors_written',
+              'reads_completed', 'writes_completed')
 
-    for name, intervals in sorted(timings.items()):
+    if opts.fields != []:
+        fields = [field for field in fields if field in opts.fields]
+
+    for test_name, intervals in sorted(timings.items()):
         if opts.select_loads != []:
-            if name not in opts.select_loads:
+            if test_name not in opts.select_loads:
                 continue
 
-        print
-        print
-        print "-" * 30 + " " + name + " " + "-" * 30
-        print
+        data_chunks = get_data_for_intervals(data, intervals)
 
-        data_chunk = get_data_for_intervals(data, intervals)
-
-        consumption = total_consumption(data_chunk, roles_map)
+        consumption = total_consumption(data_chunks, roles_map)
 
         testdata_sz = get_testdata_size(consumption) * max_diff
         testop_count = get_testop_cout(consumption) * max_diff
 
-        fields = ('recv_bytes', 'send_bytes',
-                  'sectors_read', 'sectors_written',
-                  'reads_completed', 'writes_completed')
         per_consumer_table = {}
+        per_consumer_table_str = {}
 
         all_consumers = set(consumption.values()[0].all_together)
+        fields = [field for field in fields if field in consumption]
         all_consumers_sum = []
 
         for consumer in all_consumers:
+            tb_str = per_consumer_table_str[consumer] = []
             tb = per_consumer_table[consumer] = []
             vl = 0
             for name in fields:
                 val = consumption[name].all_together[consumer]
                 if SINFO_MAP[name].to_bytes_coef is None:
                     if val < testop_count:
-                        val = 0
-                    tb.append(b2ssize_10(int(val)))
+                        tb_str.append('0')
+                    else:
+                        tb_str.append(b2ssize_10(int(val)))
                 else:
+                    val = int(val) * SINFO_MAP[name].to_bytes_coef
                     if val < testdata_sz:
-                        val = 0
-                    tb.append(b2ssize(int(val)) + "B")
+                        tb_str.append('-')
+                    else:
+                        tb_str.append(b2ssize(val) + "B")
+                tb.append(int(val))
                 vl += int(val)
             all_consumers_sum.append((vl, consumer))
 
         all_consumers_sum.sort(reverse=True)
-        # plot_consumption(per_consumer_table, fields)
-        # continue
+
+        plot_consumption(per_consumer_table, fields,
+                         testdata_sz / max_diff)
 
         tt = texttable.Texttable(max_width=130)
         tt.set_cols_align(["l"] + ["r"] * len(fields))
@@ -481,11 +527,13 @@
         for summ, consumer in all_consumers_sum:
             if summ > 0:
                 tt.add_row([":".join(consumer)] +
-                           [v if v not in ('0B', '0') else '-'
-                            for v in per_consumer_table[consumer]])
+                           per_consumer_table_str[consumer])
 
         tt.set_deco(texttable.Texttable.VLINES | texttable.Texttable.HEADER)
-        print tt.draw()
+        res = tt.draw()
+        max_len = max(map(len, res.split("\n")))
+        print test_name.center(max_len)
+        print res
 
 
 if __name__ == "__main__":
diff --git a/wally/config.py b/wally/config.py
index 90fde3c..2a09d47 100644
--- a/wally/config.py
+++ b/wally/config.py
@@ -56,16 +56,17 @@
         with open(run_params_file, 'w') as fd:
             fd.write(dumps({'run_uuid': cfg_dict['run_uuid']}))
 
-    cfg_dict['charts_img_path'] = in_var_dir('charts')
-    mkdirs_if_unxists(cfg_dict['charts_img_path'])
-
     cfg_dict['vm_ids_fname'] = in_var_dir('os_vm_ids')
     cfg_dict['html_report_file'] = in_var_dir('{0}_report.html')
     cfg_dict['text_report_file'] = in_var_dir('report.txt')
     cfg_dict['log_file'] = in_var_dir('log.txt')
-    cfg_dict['sensor_storage'] = in_var_dir('sensor_storage.txt')
+    cfg_dict['sensor_storage'] = in_var_dir('sensor_storage')
+    mkdirs_if_unxists(cfg_dict['sensor_storage'])
     cfg_dict['nodes_report_file'] = in_var_dir('nodes.yaml')
 
+    if 'sensors_remote_path' not in cfg_dict:
+        cfg_dict['sensors_remote_path'] = '/tmp/sensors'
+
     testnode_log_root = cfg_dict.get('testnode_log_root', '/var/wally')
     testnode_log_dir = os.path.join(testnode_log_root, "{0}/{{name}}")
     cfg_dict['default_test_local_folder'] = \
diff --git a/wally/discover/fuel.py b/wally/discover/fuel.py
index be10116..34adc07 100644
--- a/wally/discover/fuel.py
+++ b/wally/discover/fuel.py
@@ -68,7 +68,7 @@
     nodes = []
     ips_ports = []
 
-    logger.info("Forwarding ssh ports from FUEL nodes localhost")
+    logger.info("Forwarding ssh ports from FUEL nodes to localhost")
     fuel_usr, fuel_passwd = ssh_creds.split(":", 1)
     ips = [str(fuel_node.get_ip(network)) for fuel_node in fuel_nodes]
     port_fw = forward_ssh_ports(fuel_host, fuel_usr, fuel_passwd, ips)
diff --git a/wally/hw_info.py b/wally/hw_info.py
index 9d38913..5819eed 100644
--- a/wally/hw_info.py
+++ b/wally/hw_info.py
@@ -27,11 +27,18 @@
         self.raw = None
         self.storage_controllers = []
 
+    def get_HDD_count(self):
+        # SATA HDD COUNT, SAS 10k HDD COUNT, SAS SSD count, PCI-E SSD count
+        return []
+
     def get_summary(self):
         cores = sum(count for _, count in self.cores)
         disks = sum(size for _, size in self.disks_info.values())
 
-        return {'cores': cores, 'ram': self.ram_size, 'storage': disks}
+        return {'cores': cores,
+                'ram': self.ram_size,
+                'storage': disks,
+                'disk_count': len(self.disks_info)}
 
     def __str__(self):
         res = []
@@ -92,7 +99,6 @@
 
 class SWInfo(object):
     def __init__(self):
-        self.os = None
         self.partitions = None
         self.kernel_version = None
         self.fio_version = None
@@ -105,6 +111,28 @@
 
 def get_sw_info(conn):
     res = SWInfo()
+
+    with conn.open_sftp() as sftp:
+        def get(fname):
+            try:
+                return ssh_utils.read_from_remote(sftp, fname)
+            except:
+                return None
+
+        res.kernel_version = get('/proc/version')
+        res.partitions = get('/etc/mtab')
+        res.OS_version = get('/etc/lsb-release')
+
+    def rr(cmd):
+        try:
+            return ssh_utils.run_over_ssh(conn, cmd, nolog=True)
+        except:
+            return None
+
+    res.libvirt_version = rr("virsh -v")
+    res.qemu_version = rr("qemu-system-x86_64 --version")
+    res.ceph_version = rr("ceph --version")
+
     return res
 
 
diff --git a/wally/pretty_yaml.py b/wally/pretty_yaml.py
index ff1f3bc..699af7e 100644
--- a/wally/pretty_yaml.py
+++ b/wally/pretty_yaml.py
@@ -3,9 +3,12 @@
 
 
 def dumps_simple(val):
-    bad_symbols = set(" \r\t\n,':")
+    bad_symbols = set(" \r\t\n,':{}[]><;")
 
     if isinstance(val, basestring):
+        if isinstance(val, unicode):
+            val = val.encode('utf8')
+
         if len(bad_symbols & set(val)) != 0:
             return repr(val)
         return val
@@ -28,7 +31,7 @@
     return all(isinstance(val, (int, float, long)) for val in vals)
 
 
-def dumpv(data, tab_sz=4, width=120, min_width=40):
+def dumpv(data, tab_sz=4, width=160, min_width=40):
     tab = ' ' * tab_sz
 
     if width < min_width:
@@ -44,6 +47,8 @@
                 one_line = "[{0}]".format(", ".join(map(dumps_simple, data)))
             else:
                 one_line = "[{0}]".format(",".join(map(dumps_simple, data)))
+        elif len(data) == 0:
+            one_line = "[]"
         else:
             one_line = None
 
@@ -58,21 +63,40 @@
         else:
             res.append(one_line)
     elif isinstance(data, dict):
-        assert all(map(is_simple, data.keys()))
+        if len(data) == 0:
+            res.append("{}")
+        else:
+            assert all(map(is_simple, data.keys()))
 
-        for k, v in data.items():
-            key_str = dumps_simple(k) + ": "
-            val_res = dumpv(v, tab_sz, width - tab_sz, min_width)
+            one_line = None
+            if all(map(is_simple, data.values())):
+                one_line = ", ".join(
+                    "{0}: {1}".format(dumps_simple(k), dumps_simple(v))
+                    for k, v in sorted(data.items()))
+                one_line = "{" + one_line + "}"
+                if len(one_line) > width:
+                    one_line = None
 
-            if len(val_res) == 1 and \
-               len(key_str + val_res[0]) < width and \
-               not isinstance(v, dict):
-                res.append(key_str + val_res[0])
+            if one_line is None:
+                for k, v in data.items():
+                    key_str = dumps_simple(k) + ": "
+                    val_res = dumpv(v, tab_sz, width - tab_sz, min_width)
+
+                    if len(val_res) == 1 and \
+                       len(key_str + val_res[0]) < width and \
+                       not isinstance(v, dict):
+                        res.append(key_str + val_res[0])
+                    else:
+                        res.append(key_str)
+                        res.extend(tab + i for i in val_res)
             else:
-                res.append(key_str)
-                res.extend(tab + i for i in val_res)
+                res.append(one_line)
     else:
-        raise ValueError("Can't pack {0!r}".format(data))
+        try:
+            get_yamable = data.get_yamable
+        except AttributeError:
+            raise ValueError("Can't pack {0!r}".format(data))
+        res = dumpv(get_yamable(), tab_sz, width, min_width)
 
     return res
 
diff --git a/wally/report.py b/wally/report.py
index f46352d..04577a9 100644
--- a/wally/report.py
+++ b/wally/report.py
@@ -2,16 +2,19 @@
 import bisect
 import logging
 import collections
+from cStringIO import StringIO
 
 try:
+    import numpy
+    import scipy
     import matplotlib.pyplot as plt
 except ImportError:
     plt = None
 
 import wally
-from wally import charts
 from wally.utils import ssize2b
 from wally.statistic import round_3_digit, data_property
+from wally.suits.io.fio_task_parser import get_test_sync_mode
 
 
 logger = logging.getLogger("wally.report")
@@ -31,35 +34,43 @@
 report_funcs = []
 
 
+class Attrmapper(object):
+    def __init__(self, dct):
+        self.__dct = dct
+
+    def __getattr__(self, name):
+        try:
+            return self.__dct[name]
+        except KeyError:
+            raise AttributeError(name)
+
+
 class PerfInfo(object):
-    def __init__(self, name, intervals, params, testnodes_count):
+    def __init__(self, name, summary, intervals, params, testnodes_count):
         self.name = name
         self.bw = None
         self.iops = None
         self.lat = None
+
+        self.raw_bw = []
+        self.raw_iops = []
+        self.raw_lat = []
+
         self.params = params
         self.intervals = intervals
         self.testnodes_count = testnodes_count
+        self.summary = summary
+        self.p = Attrmapper(self.params.vals)
 
-
-def split_and_add(data, block_size):
-    assert len(data) % block_size == 0
-    res = [0] * block_size
-
-    for idx, val in enumerate(data):
-        res[idx % block_size] += val
-
-    return res
+        self.sync_mode = get_test_sync_mode(self.params)
+        self.concurence = self.params.vals.get('numjobs', 1)
 
 
 def group_by_name(test_data):
     name_map = collections.defaultdict(lambda: [])
 
-    for block in test_data:
-        for data in block['res']:
-            data = data.copy()
-            data['__meta__'] = block['__meta__']
-            name_map[data['name']].append(data)
+    for data in test_data:
+        name_map[(data.config.name, data.summary())].append(data)
 
     return name_map
 
@@ -67,37 +78,27 @@
 def process_disk_info(test_data):
     name_map = group_by_name(test_data)
     data = {}
-    for name, results in name_map.items():
-        testnodes_count_set = set(dt['__meta__']['testnodes_count']
-                                  for dt in results)
+    for (name, summary), results in name_map.items():
+        testnodes_count_set = set(dt.vm_count for dt in results)
 
         assert len(testnodes_count_set) == 1
         testnodes_count, = testnodes_count_set
         assert len(results) % testnodes_count == 0
 
-        block_count = len(results) // testnodes_count
-        intervals = [result['run_interval'] for result in results]
+        intervals = [result.run_interval for result in results]
+        p = results[0].config
+        pinfo = PerfInfo(p.name, result.summary(), intervals,
+                         p, testnodes_count)
 
-        p = results[0]['params'].copy()
-        rt = p.pop('ramp_time', 0)
+        pinfo.raw_bw = [result.results['bw'] for result in results]
+        pinfo.raw_iops = [result.results['iops'] for result in results]
+        pinfo.raw_lat = [result.results['lat'] for result in results]
 
-        for result in results[1:]:
-            tp = result['params'].copy()
-            tp.pop('ramp_time', None)
-            assert tp == p
+        pinfo.bw = data_property(map(sum, zip(*pinfo.raw_bw)))
+        pinfo.iops = data_property(map(sum, zip(*pinfo.raw_iops)))
+        pinfo.lat = data_property(sum(pinfo.raw_lat, []))
 
-        p['ramp_time'] = rt
-        pinfo = PerfInfo(name, intervals, p, testnodes_count)
-
-        bw = [result['results']['bw'] for result in results]
-        iops = [result['results']['iops'] for result in results]
-        lat = [result['results']['lat'] for result in results]
-
-        pinfo.bw = data_property(split_and_add(bw, block_count))
-        pinfo.iops = data_property(split_and_add(iops, block_count))
-        pinfo.lat = data_property(lat)
-
-        data[name] = pinfo
+        data[(p.name, summary)] = pinfo
     return data
 
 
@@ -108,70 +109,138 @@
     return closure
 
 
-def linearity_report(processed_results, path, lab_info):
-    names = {}
-    for tp1 in ('rand', 'seq'):
-        for oper in ('read', 'write'):
-            for sync in ('sync', 'direct', 'async'):
-                sq = (tp1, oper, sync)
-                name = "{0} {1} {2}".format(*sq)
-                names["".join(word[0] for word in sq)] = name
+def get_test_lcheck_params(pinfo):
+    res = [{
+        's': 'sync',
+        'd': 'direct',
+        'a': 'async',
+        'x': 'sync direct'
+    }[pinfo.sync_mode]]
 
-    colors = ['red', 'green', 'blue', 'cyan',
-              'magenta', 'black', 'yellow', 'burlywood']
-    markers = ['*', '^', 'x', 'o', '+', '.']
-    color = 0
-    marker = 0
+    res.append(pinfo.p.rw)
 
-    plot_data = {}
-
-    name_pref = 'linearity_test_rrd'
-
-    for res in processed_results.values():
-        if res.name.startswith(name_pref):
-            iotime = 1000000. / res.iops
-            iotime_max = iotime * (1 + res.dev * 3)
-            bsize = ssize2b(res.raw['blocksize'])
-            plot_data[bsize] = (iotime, iotime_max)
-
-    min_sz = min(plot_data)
-    min_iotime, _ = plot_data.pop(min_sz)
-
-    x = []
-    y = []
-    e = []
-
-    for k, (v, vmax) in sorted(plot_data.items()):
-        y.append(v - min_iotime)
-        x.append(k)
-        e.append(y[-1] - (vmax - min_iotime))
-
-    tp = 'rrd'
-    plt.errorbar(x, y, e, linestyle='None', label=names[tp],
-                 color=colors[color], ecolor="black",
-                 marker=markers[marker])
-    plt.yscale('log')
-    plt.xscale('log')
-    # plt.show()
-
-    # ynew = approximate_line(ax, ay, ax, True)
-    # plt.plot(ax, ynew, color=colors[color])
-    # color += 1
-    # marker += 1
-    # plt.legend(loc=2)
-    # plt.title("Linearity test by %i dots" % (len(vals)))
+    return " ".join(res)
 
 
-if plt:
-    linearity_report = report('linearity', 'linearity_test')(linearity_report)
+def get_emb_data_svg(plt):
+    sio = StringIO()
+    plt.savefig(sio, format='svg')
+    img_start = "<!-- Created with matplotlib (http://matplotlib.org/) -->"
+    return sio.getvalue().split(img_start, 1)[1]
 
 
-def render_all_html(dest, info, lab_description, img_ext, templ_name):
+def get_template(templ_name):
     very_root_dir = os.path.dirname(os.path.dirname(wally.__file__))
     templ_dir = os.path.join(very_root_dir, 'report_templates')
     templ_file = os.path.join(templ_dir, templ_name)
-    templ = open(templ_file, 'r').read()
+    return open(templ_file, 'r').read()
 
+
+@report('linearity', 'linearity_test')
+def linearity_report(processed_results, path, lab_info):
+    labels_and_data = []
+
+    vls = processed_results.values()[0].params.vals.copy()
+    del vls['blocksize']
+
+    for res in processed_results.values():
+        if res.name.startswith('linearity_test'):
+            iotimes = [1000. / val for val in res.iops.raw]
+            labels_and_data.append([res.p.blocksize, res.iops.raw, iotimes])
+            cvls = res.params.vals.copy()
+            del cvls['blocksize']
+            assert cvls == vls
+
+    labels_and_data.sort(key=lambda x: ssize2b(x[0]))
+    _, ax1 = plt.subplots()
+
+    labels, data, iotimes = zip(*labels_and_data)
+    plt.boxplot(iotimes)
+
+    if len(labels_and_data) > 2 and ssize2b(labels_and_data[-2][0]) >= 4096:
+        xt = range(1, len(labels) + 1)
+
+        def io_time(sz, bw, initial_lat):
+            return sz / bw + initial_lat
+
+        x = numpy.array(map(ssize2b, labels))
+        y = numpy.array([sum(dt) / len(dt) for dt in iotimes])
+        popt, _ = scipy.optimize.curve_fit(io_time, x, y, p0=(100., 1.))
+
+        y1 = io_time(x, *popt)
+        plt.plot(xt, y1, linestyle='--', label='LS linear approxomation')
+
+        for idx, (sz, _, _) in enumerate(labels_and_data):
+            if ssize2b(sz) >= 4096:
+                break
+
+        bw = (x[-1] - x[idx]) / (y[-1] - y[idx])
+        lat = y[-1] - x[-1] / bw
+        y2 = io_time(x, bw, lat)
+
+        plt.plot(xt, y2, linestyle='--',
+                 label='(4k & max) linear approxomation')
+
+    plt.setp(ax1, xticklabels=labels)
+
+    plt.xlabel("Block size")
+    plt.ylabel("IO time, ms")
+
+    plt.legend(loc=0)
+    plt.grid()
+    iotime_plot = get_emb_data_svg(plt)
+
+    _, ax1 = plt.subplots()
+    plt.boxplot(data)
+    plt.setp(ax1, xticklabels=labels)
+
+    plt.xlabel("Block size")
+    plt.ylabel("IOPS")
+    plt.grid()
+
+    iops_plot = get_emb_data_svg(plt)
+
+    res1 = processed_results.values()[0]
+    descr = {
+        'vm_count': res1.testnodes_count,
+        'concurence': res1.concurence,
+        'oper_descr': get_test_lcheck_params(res1).capitalize()
+    }
+
+    params_map = {'iotime_vs_size': iotime_plot,
+                  'iops_vs_size': iops_plot,
+                  'descr': descr}
+
+    with open(path, 'w') as fd:
+        fd.write(get_template('report_linearity.html').format(**params_map))
+
+
+@report('lat_vs_iops', 'lat_vs_iops')
+def lat_vs_iops(processed_results, path, lab_info):
+    lat_iops = collections.defaultdict(lambda: [])
+    for res in processed_results.values():
+        if res.name.startswith('lat_vs_iops'):
+            lat_iops[res.concurence].append((res.lat.average / 1000.0,
+                                             res.lat.deviation / 1000.0,
+                                             res.iops.average,
+                                             res.iops.deviation))
+
+    colors = ['red', 'green', 'blue', 'orange', 'magenta', "teal"][::-1]
+    for conc, lat_iops in sorted(lat_iops.items()):
+        lat, dev, iops, iops_dev = zip(*lat_iops)
+        plt.errorbar(iops, lat, xerr=iops_dev, yerr=dev, fmt='ro',
+                     label=str(conc) + " threads",
+                     color=colors.pop())
+
+    plt.xlabel("IOPS")
+    plt.ylabel("Latency, ms")
+    plt.grid()
+    plt.legend(loc=0)
+    plt.show()
+    exit(1)
+
+
+def render_all_html(dest, info, lab_description, images, templ_name):
     data = info.__dict__.copy()
     for name, val in data.items():
         if not name.startswith('__'):
@@ -185,62 +254,25 @@
     data['bw_write_max'] = (data['bw_write_max'][0] // 1024,
                             data['bw_write_max'][1])
 
-    report = templ.format(lab_info=lab_description, img_ext=img_ext,
-                          **data)
-    open(dest, 'w').write(report)
+    images.update(data)
+    report = get_template(templ_name).format(lab_info=lab_description,
+                                             **images)
 
-
-def render_hdd_html(dest, info, lab_description, img_ext):
-    render_all_html(dest, info, lab_description, img_ext,
-                    "report_hdd.html")
-
-
-def render_ceph_html(dest, info, lab_description, img_ext):
-    render_all_html(dest, info, lab_description, img_ext,
-                    "report_ceph.html")
+    with open(dest, 'w') as fd:
+        fd.write(report)
 
 
 def io_chart(title, concurence,
              latv, latv_min, latv_max,
-             iops_or_bw, iops_or_bw_dev,
-             legend, fname):
-    bar_data = iops_or_bw
-    bar_dev = iops_or_bw_dev
-    legend = [legend]
-
-    iops_or_bw_per_vm = []
-    for iops, conc in zip(iops_or_bw, concurence):
-        iops_or_bw_per_vm.append(iops / conc)
-
-    bar_dev_bottom = []
-    bar_dev_top = []
-    for val, err in zip(bar_data, bar_dev):
-        bar_dev_top.append(val + err)
-        bar_dev_bottom.append(val - err)
-
-    charts.render_vertical_bar(title, legend, [bar_data], [bar_dev_top],
-                               [bar_dev_bottom], file_name=fname,
-                               scale_x=concurence, label_x="clients",
-                               label_y=legend[0],
-                               lines=[
-                                    (latv, "msec", "rr", "lat"),
-                                    # (latv_min, None, None, "lat_min"),
-                                    # (latv_max, None, None, "lat_max"),
-                                    (iops_or_bw_per_vm, None, None,
-                                     legend[0] + " per client")
-                                ])
-
-
-def io_chart_mpl(title, concurence,
-                 latv, latv_min, latv_max,
-                 iops_or_bw, iops_or_bw_err,
-                 legend, fname, log=False):
+             iops_or_bw, iops_or_bw_err,
+             legend, log=False,
+             boxplots=False):
     points = " MiBps" if legend == 'BW' else ""
     lc = len(concurence)
     width = 0.35
     xt = range(1, lc + 1)
 
-    op_per_vm = [v / c for v, c in zip(iops_or_bw, concurence)]
+    op_per_vm = [v / (vm * th) for v, (vm, th) in zip(iops_or_bw, concurence)]
     fig, p1 = plt.subplots()
     xpos = [i - width / 2 for i in xt]
 
@@ -252,7 +284,7 @@
            label=legend)
 
     p1.grid(True)
-    p1.plot(xt, op_per_vm, '--', label=legend + "/vm", color='black')
+    p1.plot(xt, op_per_vm, '--', label=legend + "/thread", color='black')
     handles1, labels1 = p1.get_legend_handles_labels()
 
     p2 = p1.twinx()
@@ -261,8 +293,8 @@
     p2.plot(xt, latv_min, label="lat min")
 
     plt.xlim(0.5, lc + 0.5)
-    plt.xticks(xt, map(str, concurence))
-    p1.set_xlabel("Threads")
+    plt.xticks(xt, ["{0} * {1}".format(vm, th) for (vm, th) in concurence])
+    p1.set_xlabel("VM Count * Thread per VM")
     p1.set_ylabel(legend + points)
     p2.set_ylabel("Latency ms")
     plt.title(title)
@@ -270,39 +302,17 @@
 
     plt.legend(handles1 + handles2, labels1 + labels2,
                loc='center left', bbox_to_anchor=(1.1, 0.81))
-    # fontsize='small')
 
     if log:
         p1.set_yscale('log')
         p2.set_yscale('log')
-    plt.subplots_adjust(right=0.7)
-    # plt.show()  # bbox_extra_artists=(leg,), bbox_inches='tight')
-    # exit(1)
-    plt.savefig(fname, format=fname.split('.')[-1])
+    plt.subplots_adjust(right=0.68)
+
+    return get_emb_data_svg(plt)
 
 
-def make_hdd_plots(processed_results, charts_dir):
-    plots = [
-        ('hdd_test_rrd4k', 'rand_read_4k', 'Random read 4k direct IOPS'),
-        ('hdd_test_rws4k', 'rand_write_4k', 'Random write 4k sync IOPS')
-    ]
-    return make_plots(processed_results, charts_dir, plots)
-
-
-def make_ceph_plots(processed_results, charts_dir):
-    plots = [
-        ('ceph_test_rrd4k', 'rand_read_4k', 'Random read 4k direct IOPS'),
-        ('ceph_test_rws4k', 'rand_write_4k', 'Random write 4k sync IOPS'),
-        ('ceph_test_rrd16m', 'rand_read_16m',
-         'Random read 16m direct MiBps'),
-        ('ceph_test_rwd16m', 'rand_write_16m',
-            'Random write 16m direct MiBps'),
-    ]
-    return make_plots(processed_results, charts_dir, plots)
-
-
-def make_plots(processed_results, charts_dir, plots):
-    file_ext = None
+def make_plots(processed_results, plots):
+    files = {}
     for name_pref, fname, desc in plots:
         chart_data = []
 
@@ -313,9 +323,9 @@
         if len(chart_data) == 0:
             raise ValueError("Can't found any date for " + name_pref)
 
-        use_bw = ssize2b(chart_data[0].params['blocksize']) > 16 * 1024
+        use_bw = ssize2b(chart_data[0].p.blocksize) > 16 * 1024
 
-        chart_data.sort(key=lambda x: x.params['concurence'])
+        chart_data.sort(key=lambda x: x.concurence)
 
         #  if x.lat.average < max_lat]
         lat = [x.lat.average / 1000 for x in chart_data]
@@ -323,7 +333,7 @@
         lat_max = [x.lat.max / 1000 for x in chart_data]
 
         testnodes_count = x.testnodes_count
-        concurence = [x.params['concurence'] * testnodes_count
+        concurence = [(testnodes_count, x.concurence)
                       for x in chart_data]
 
         if use_bw:
@@ -335,25 +345,24 @@
             data_dev = [x.iops.confidence for x in chart_data]
             name = "IOPS"
 
-        fname = os.path.join(charts_dir, fname)
-        if plt is not None:
-            io_chart_mpl(desc, concurence, lat, lat_min, lat_max,
-                         data, data_dev, name, fname + '.svg')
-            file_ext = 'svg'
-        else:
-            io_chart(desc, concurence, lat, lat_min, lat_max,
-                     data, data_dev, name, fname + '.png')
-            file_ext = 'png'
-    return file_ext
+        fc = io_chart(title=desc,
+                      concurence=concurence,
+                      latv=lat, latv_min=lat_min, latv_max=lat_max,
+                      iops_or_bw=data,
+                      iops_or_bw_err=data_dev,
+                      legend=name)
+        files[fname] = fc
+
+    return files
 
 
 def find_max_where(processed_results, sync_mode, blocksize, rw, iops=True):
     result = None
     attr = 'iops' if iops else 'bw'
     for measurement in processed_results.values():
-        ok = measurement.params['sync_mode'] == sync_mode
-        ok = ok and (measurement.params['blocksize'] == blocksize)
-        ok = ok and (measurement.params['rw'] == rw)
+        ok = measurement.sync_mode == sync_mode
+        ok = ok and (measurement.p.blocksize == blocksize)
+        ok = ok and (measurement.p.rw == rw)
 
         if ok:
             field = getattr(measurement, attr)
@@ -388,12 +397,12 @@
                                         'd', '1m', 'read', False)
 
     for res in processed_results.values():
-        if res.params['sync_mode'] == 's' and res.params['blocksize'] == '4k':
-            if res.params['rw'] != 'randwrite':
+        if res.sync_mode == 's' and res.p.blocksize == '4k':
+            if res.p.rw != 'randwrite':
                 continue
             rws4k_iops_lat_th.append((res.iops.average,
                                       res.lat.average,
-                                      res.params['concurence']))
+                                      res.concurence))
 
     rws4k_iops_lat_th.sort(key=lambda (_1, _2, conc): conc)
 
@@ -438,21 +447,33 @@
     return hdi
 
 
-@report('HDD', 'hdd_test_rrd4k,hdd_test_rws4k')
-def make_hdd_report(processed_results, path, charts_path, lab_info):
-    img_ext = make_hdd_plots(processed_results, charts_path)
+@report('HDD', 'hdd_test')
+def make_hdd_report(processed_results, path, lab_info):
+    plots = [
+        ('hdd_test_rrd4k', 'rand_read_4k', 'Random read 4k direct IOPS'),
+        ('hdd_test_rws4k', 'rand_write_4k', 'Random write 4k sync IOPS')
+    ]
+    images = make_plots(processed_results, plots)
     di = get_disk_info(processed_results)
-    render_hdd_html(path, di, lab_info, img_ext)
+    render_all_html(path, di, lab_info, images, "report_hdd.html")
 
 
 @report('Ceph', 'ceph_test')
-def make_ceph_report(processed_results, path, charts_path, lab_info):
-    img_ext = make_ceph_plots(processed_results, charts_path)
+def make_ceph_report(processed_results, path, lab_info):
+    plots = [
+        ('ceph_test_rrd4k', 'rand_read_4k', 'Random read 4k direct IOPS'),
+        ('ceph_test_rws4k', 'rand_write_4k', 'Random write 4k sync IOPS'),
+        ('ceph_test_rrd16m', 'rand_read_16m', 'Random read 16m direct MiBps'),
+        ('ceph_test_rwd16m', 'rand_write_16m',
+         'Random write 16m direct MiBps'),
+    ]
+
+    images = make_plots(processed_results, plots)
     di = get_disk_info(processed_results)
-    render_ceph_html(path, di, lab_info, img_ext)
+    render_all_html(path, di, lab_info, images, "report_ceph.html")
 
 
-def make_io_report(dinfo, results, path, charts_path, lab_info=None):
+def make_io_report(dinfo, results, path, lab_info=None):
     lab_info = {
         "total_disk": "None",
         "total_memory": "None",
@@ -461,7 +482,8 @@
     }
 
     try:
-        res_fields = sorted(dinfo.keys())
+        res_fields = sorted(v.name for v in dinfo.values())
+
         for fields, name, func in report_funcs:
             for field in fields:
                 pos = bisect.bisect_left(res_fields, field)
@@ -474,7 +496,7 @@
             else:
                 hpath = path.format(name)
                 logger.debug("Generatins report " + name + " into " + hpath)
-                func(dinfo, hpath, charts_path, lab_info)
+                func(dinfo, hpath, lab_info)
                 break
         else:
             logger.warning("No report generator found for this load")
diff --git a/wally/run_test.py b/wally/run_test.py
index 5322432..22856b5 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -10,7 +10,6 @@
 import argparse
 import functools
 import threading
-import subprocess
 import contextlib
 import collections
 
@@ -23,9 +22,15 @@
 from wally.discover import discover, Node
 from wally.timeseries import SensorDatastore
 from wally import utils, report, ssh_utils, start_vms
+from wally.suits import IOPerfTest, PgBenchTest, MysqlTest
 from wally.config import cfg_dict, load_config, setup_loggers
-from wally.suits.itest import IOPerfTest, PgBenchTest, MysqlTest
-from wally.sensors_utils import deploy_sensors_stage, gather_sensors_stage
+from wally.sensors_utils import with_sensors_util, sensors_info_util
+
+TOOL_TYPE_MAPPER = {
+    "io": IOPerfTest,
+    "pgbench": PgBenchTest,
+    "mysql": MysqlTest,
+}
 
 
 try:
@@ -174,7 +179,8 @@
                         node=node,
                         remote_dir=rem_folder,
                         log_directory=dr,
-                        coordination_queue=coord_q)
+                        coordination_queue=coord_q,
+                        total_nodes_count=len(test_nodes))
         th = threading.Thread(None, test_thread, None,
                               (test, node, barrier, res_q))
         threads.append(th)
@@ -213,19 +219,33 @@
 
             results.append(val)
 
-    results = test_cls.merge_results(results)
     return results
 
 
-def run_tests(cfg, test_block, nodes):
-    tool_type_mapper = {
-        "io": IOPerfTest,
-        "pgbench": PgBenchTest,
-        "mysql": MysqlTest,
-    }
+def suspend_vm_nodes(unused_nodes):
+    pausable_nodes_ids = [node.os_vm_id for node in unused_nodes
+                          if node.os_vm_id is not None]
+    non_pausable = len(unused_nodes) - len(pausable_nodes_ids)
 
+    if 0 != non_pausable:
+        logger.warning("Can't pause {0} nodes".format(
+                       non_pausable))
+
+    if len(pausable_nodes_ids) != 0:
+        logger.debug("Try to pause {0} unused nodes".format(
+                     len(pausable_nodes_ids)))
+        start_vms.pause(pausable_nodes_ids)
+
+    return pausable_nodes_ids
+
+
+def run_tests(cfg, test_block, nodes):
     test_nodes = [node for node in nodes
                   if 'testnode' in node.roles]
+
+    not_test_nodes = [node for node in nodes
+                      if 'testnode' not in node.roles]
+
     if len(test_nodes) == 0:
         logger.error("No test nodes found")
         return
@@ -252,18 +272,7 @@
                 continue
 
             if cfg.get('suspend_unused_vms', True):
-                pausable_nodes_ids = [node.os_vm_id for node in unused_nodes
-                                      if node.os_vm_id is not None]
-                non_pausable = len(unused_nodes) - len(pausable_nodes_ids)
-
-                if 0 != non_pausable:
-                    logger.warning("Can't pause {0} nodes".format(
-                                   non_pausable))
-
-                if len(pausable_nodes_ids) != 0:
-                    logger.debug("Try to pause {0} unused nodes".format(
-                                 len(pausable_nodes_ids)))
-                    start_vms.pause(pausable_nodes_ids)
+                pausable_nodes_ids = suspend_vm_nodes(unused_nodes)
 
             resumable_nodes_ids = [node.os_vm_id for node in curr_test_nodes
                                    if node.os_vm_id is not None]
@@ -273,12 +282,16 @@
                              len(resumable_nodes_ids)))
                 start_vms.unpause(resumable_nodes_ids)
 
-            test_cls = tool_type_mapper[name]
+            test_cls = TOOL_TYPE_MAPPER[name]
             try:
-                res = run_single_test(curr_test_nodes, name, test_cls,
-                                      params,
-                                      cfg['default_test_local_folder'],
-                                      cfg['run_uuid'])
+                sens_nodes = curr_test_nodes + not_test_nodes
+                with sensors_info_util(cfg, sens_nodes) as sensor_data:
+                    t_start = time.time()
+                    res = run_single_test(curr_test_nodes, name, test_cls,
+                                          params,
+                                          cfg['default_test_local_folder'],
+                                          cfg['run_uuid'])
+                    t_end = time.time()
             finally:
                 if cfg.get('suspend_unused_vms', True):
                     if len(pausable_nodes_ids) != 0:
@@ -286,7 +299,14 @@
                                      len(pausable_nodes_ids)))
                         start_vms.unpause(pausable_nodes_ids)
 
-            results.append(res)
+            if sensor_data is not None:
+                fname = "{0}_{1}.csv".format(int(t_start), int(t_end))
+                fpath = os.path.join(cfg['sensor_storage'], fname)
+
+                with open(fpath, "w") as fd:
+                    fd.write("\n\n".join(sensor_data))
+
+            results.extend(res)
 
         yield name, results
 
@@ -365,7 +385,8 @@
 
     for creds in p:
         vm_name_pattern, conn_pattern = creds.split(",")
-        try:
+        msg = "Vm like {0} lookup failed".format(vm_name_pattern)
+        with utils.log_error(msg):
             msg = "Looking for vm with name like {0}".format(vm_name_pattern)
             logger.debug(msg)
 
@@ -379,12 +400,6 @@
                 node = Node(conn_pattern.format(ip=ip), ['testnode'])
                 node.os_vm_id = vm_id
                 ctx.nodes.append(node)
-        except utils.StopTestError:
-            raise
-        except Exception as exc:
-            msg = "Vm like {0} lookup failed".format(vm_name_pattern)
-            logger.exception(msg)
-            raise utils.StopTestError(msg, exc)
 
 
 def get_creds_openrc(path):
@@ -392,24 +407,19 @@
 
     echo = 'echo "$OS_TENANT_NAME:$OS_USERNAME:$OS_PASSWORD@$OS_AUTH_URL"'
 
-    try:
+    msg = "Failed to get creads from openrc file"
+    with utils.log_error(msg):
         data = utils.run_locally(['/bin/bash'],
                                  input_data=fc + "\n" + echo)
-    except subprocess.CalledProcessError as exc:
-        msg = "Failed to get creads from openrc file: " + data
-        logger.exception(msg)
-        raise utils.StopTestError(msg, exc)
 
-    try:
+    msg = "Failed to get creads from openrc file: " + data
+    with utils.log_error(msg):
         data = data.strip()
         user, tenant, passwd_auth_url = data.split(':', 2)
         passwd, auth_url = passwd_auth_url.rsplit("@", 1)
         assert (auth_url.startswith("https://") or
                 auth_url.startswith("http://"))
-    except Exception as exc:
-        msg = "Failed to get creads from openrc file: " + data
-        logger.exception(msg)
-        raise utils.StopTestError(msg, exc)
+
     return user, passwd, tenant, auth_url
 
 
@@ -512,6 +522,7 @@
                                     num_test_nodes)
             with vm_ctx as new_nodes:
                 if len(new_nodes) != 0:
+                    logger.debug("Connecting to new nodes")
                     connect_all(new_nodes, True)
 
                     for node in new_nodes:
@@ -519,17 +530,13 @@
                             msg = "Failed to connect to vm {0}"
                             raise RuntimeError(msg.format(node.get_conn_id()))
 
-                    deploy_sensors_stage(cfg_dict,
-                                         ctx,
-                                         nodes=new_nodes,
-                                         undeploy=False)
-
-                for test_group in config.get('tests', []):
-                    test_res = run_tests(cfg, test_group, ctx.nodes)
-                    ctx.results.extend(test_res)
+                with with_sensors_util(cfg_dict, ctx.nodes):
+                    for test_group in config.get('tests', []):
+                        ctx.results.extend(run_tests(cfg, test_group,
+                                                     ctx.nodes))
         else:
-            test_res = run_tests(cfg, group, ctx.nodes)
-            ctx.results.extend(test_res)
+            with with_sensors_util(cfg_dict, ctx.nodes):
+                ctx.results.extend(run_tests(cfg, group, ctx.nodes))
 
 
 def shut_down_vms_stage(cfg, ctx):
@@ -595,9 +602,7 @@
                 fd.flush()
 
             logger.info("Text report were stored in " + text_rep_fname)
-            print("\n")
-            print(IOPerfTest.format_for_console(data, dinfo))
-            print("\n")
+            print("\n" + rep + "\n")
 
         if tp in ['mysql', 'pgbench'] and data is not None:
             print("\n")
@@ -618,7 +623,6 @@
             found = True
             dinfo = report.process_disk_info(data)
             report.make_io_report(dinfo, data, html_rep_fname,
-                                  cfg['charts_img_path'],
                                   lab_info=ctx.hw_info)
 
 
@@ -629,9 +633,13 @@
 
 
 def load_data_from(var_dir):
-    def load_data_from_file(cfg, ctx):
+    def load_data_from_file(_, ctx):
         raw_results = os.path.join(var_dir, 'raw_results.yaml')
-        ctx.results = yaml.load(open(raw_results).read())
+        ctx.results = []
+        for tp, results in yaml.load(open(raw_results).read()):
+            cls = TOOL_TYPE_MAPPER[tp]
+            ctx.results.append((tp, map(cls.load, results)))
+
     return load_data_from_file
 
 
@@ -681,17 +689,25 @@
     return parser.parse_args(argv[1:])
 
 
-# from plop.collector import Collector
+def get_stage_name(func):
+    if func.__name__.endswith("stage"):
+        return func.__name__
+    else:
+        return func.__name__ + " stage"
 
 
 def main(argv):
-    # collector = Collector()
-    # collector.start()
-
     faulthandler.register(signal.SIGUSR1, all_threads=True)
     opts = parse_args(argv)
     load_config(opts.config_file, opts.post_process_only)
 
+    if cfg_dict.get('logging', {}).get("extra_logs", False) or opts.extra_logs:
+        level = logging.DEBUG
+    else:
+        level = logging.WARNING
+
+    setup_loggers(level, cfg_dict['log_file'])
+
     if opts.post_process_only is not None:
         stages = [
             load_data_from(opts.post_process_only)
@@ -711,10 +727,10 @@
             stages.append(collect_hw_info_stage)
 
         stages.extend([
-            deploy_sensors_stage,
+            # deploy_sensors_stage,
             run_tests_stage,
             store_raw_results_stage,
-            gather_sensors_stage
+            # gather_sensors_stage
         ])
 
     report_stages = [
@@ -724,13 +740,6 @@
     if not opts.no_html_report:
         report_stages.append(html_report_stage)
 
-    if cfg_dict.get('logging', {}).get("extra_logs", False) or opts.extra_logs:
-        level = logging.DEBUG
-    else:
-        level = logging.WARNING
-
-    setup_loggers(level, cfg_dict['log_file'])
-
     logger.info("All info would be stored into {0}".format(
         cfg_dict['var_dir']))
 
@@ -753,10 +762,7 @@
 
     try:
         for stage in stages:
-            if stage.__name__.endswith("stage"):
-                logger.info("Start {0.__name__}".format(stage))
-            else:
-                logger.info("Start {0.__name__} stage".format(stage))
+            logger.info("Start " + get_stage_name(stage))
             stage(cfg_dict, ctx)
     except utils.StopTestError as exc:
         logger.error(msg_templ.format(stage, exc))
@@ -766,10 +772,7 @@
         exc, cls, tb = sys.exc_info()
         for stage in ctx.clear_calls_stack[::-1]:
             try:
-                if stage.__name__.endswith("stage"):
-                    logger.info("Start {0.__name__}".format(stage))
-                else:
-                    logger.info("Start {0.__name__} stage".format(stage))
+                logger.info("Start " + get_stage_name(stage))
                 stage(cfg_dict, ctx)
             except utils.StopTestError as cleanup_exc:
                 logger.error(msg_templ.format(stage, cleanup_exc))
@@ -779,14 +782,16 @@
         logger.debug("Start utils.cleanup")
         for clean_func, args, kwargs in utils.iter_clean_func():
             try:
+                logger.info("Start " + get_stage_name(clean_func))
                 clean_func(*args, **kwargs)
             except utils.StopTestError as cleanup_exc:
-                logger.error(msg_templ.format(stage, cleanup_exc))
+                logger.error(msg_templ.format(clean_func, cleanup_exc))
             except Exception:
-                logger.exception(msg_templ_no_exc.format(stage))
+                logger.exception(msg_templ_no_exc.format(clean_func))
 
     if exc is None:
         for report_stage in report_stages:
+            logger.info("Start " + get_stage_name(report_stage))
             report_stage(cfg_dict, ctx)
 
     logger.info("All info stored in {0} folder".format(cfg_dict['var_dir']))
@@ -794,9 +799,6 @@
     if cfg_dict.get('run_web_ui', False):
         stop_web_ui(cfg_dict, ctx)
 
-    # collector.stop()
-    # open("plop.out", "w").write(repr(dict(collector.stack_counts)))
-
     if exc is None:
         logger.info("Tests finished successfully")
         return 0
diff --git a/wally/sensors/api.py b/wally/sensors/api.py
index e8c6261..52d33ed 100644
--- a/wally/sensors/api.py
+++ b/wally/sensors/api.py
@@ -1,21 +1,15 @@
-import Queue
+import os
+import time
+import json
 import logging
-import threading
+import contextlib
 
-from .deploy_sensors import (deploy_and_start_sensors,
-                             stop_and_remove_sensors)
-from .protocol import create_protocol, Timeout, CantUnpack
+from concurrent.futures import ThreadPoolExecutor
+
+from wally.ssh_utils import (copy_paths, run_over_ssh,
+                             save_to_remote, read_from_remote)
 
 
-__all__ = ['Empty', 'recv_main',
-           'deploy_and_start_sensors',
-           'SensorConfig',
-           'stop_and_remove_sensors',
-           'start_listener_thread',
-           ]
-
-
-Empty = Queue.Empty
 logger = logging.getLogger("wally.sensors")
 
 
@@ -29,40 +23,71 @@
         self.monitor_url = monitor_url
 
 
-def recv_main(proto, data_q, cmd_q):
-    while True:
+@contextlib.contextmanager
+def with_sensors(sensor_configs, remote_path):
+    paths = {os.path.dirname(__file__):
+             os.path.join(remote_path, "sensors")}
+    config_remote_path = os.path.join(remote_path, "conf.json")
+
+    def deploy_sensors(node_sensor_config):
+        copy_paths(node_sensor_config.conn, paths)
+        with node_sensor_config.conn.open_sftp() as sftp:
+            sensors_config = node_sensor_config.sensors.copy()
+            sensors_config['source_id'] = node_sensor_config.source_id
+            save_to_remote(sftp, config_remote_path,
+                           json.dumps(sensors_config))
+
+    def remove_sensors(node_sensor_config):
+        run_over_ssh(node_sensor_config.conn,
+                     "rm -rf {0}".format(remote_path),
+                     node=node_sensor_config.url, timeout=10)
+
+    logger.debug("Installing sensors on {0} nodes".format(len(sensor_configs)))
+    with ThreadPoolExecutor(max_workers=32) as executor:
+        list(executor.map(deploy_sensors, sensor_configs))
         try:
-            ip, packet = proto.recv(0.1)
-            if packet is not None:
-                data_q.put((ip, packet))
-        except AssertionError as exc:
-            logger.warning("Error in sensor data " + str(exc))
-        except Timeout:
-            pass
-        except CantUnpack as exc:
-            print exc
+            yield
+        finally:
+            list(executor.map(remove_sensors, sensor_configs))
 
+
+@contextlib.contextmanager
+def sensors_info(sensor_configs, remote_path):
+    config_remote_path = os.path.join(remote_path, "conf.json")
+
+    def start_sensors(node_sensor_config):
+        cmd_templ = 'env PYTHONPATH="{0}" python -m ' + \
+                    "sensors.main -d start -u {1} {2}"
+
+        cmd = cmd_templ.format(remote_path,
+                               node_sensor_config.monitor_url,
+                               config_remote_path)
+
+        run_over_ssh(node_sensor_config.conn, cmd,
+                     node=node_sensor_config.url)
+
+    def stop_and_gather_data(node_sensor_config):
+        cmd = 'env PYTHONPATH="{0}" python -m sensors.main -d stop'
+        cmd = cmd.format(remote_path)
+        run_over_ssh(node_sensor_config.conn, cmd,
+                     node=node_sensor_config.url)
+        # some magic
+        time.sleep(1)
+
+        assert node_sensor_config.monitor_url.startswith("csvfile://")
+
+        res_path = node_sensor_config.monitor_url.split("//", 1)[1]
+        with node_sensor_config.conn.open_sftp() as sftp:
+            res = read_from_remote(sftp, res_path)
+
+        return res
+
+    results = []
+
+    logger.debug("Starting sensors on {0} nodes".format(len(sensor_configs)))
+    with ThreadPoolExecutor(max_workers=32) as executor:
+        list(executor.map(start_sensors, sensor_configs))
         try:
-            val = cmd_q.get(False)
-
-            if val is None:
-                return
-
-        except Queue.Empty:
-            pass
-
-
-def start_listener_thread(uri):
-    data_q = Queue.Queue()
-    cmd_q = Queue.Queue()
-    logger.debug("Listening for sensor data on " + uri)
-    proto = create_protocol(uri, receiver=True)
-    th = threading.Thread(None, recv_main, None, (proto, data_q, cmd_q))
-    th.daemon = True
-    th.start()
-
-    def stop_thread():
-        cmd_q.put(None)
-        th.join()
-
-    return data_q, stop_thread
+            yield results
+        finally:
+            results.extend(executor.map(stop_and_gather_data, sensor_configs))
diff --git a/wally/sensors/deploy_sensors.py b/wally/sensors/deploy_sensors.py
index 4a1c5df..82ab21a 100644
--- a/wally/sensors/deploy_sensors.py
+++ b/wally/sensors/deploy_sensors.py
@@ -3,9 +3,8 @@
 import os.path
 import logging
 
-from concurrent.futures import ThreadPoolExecutor, wait
-
-from wally.ssh_utils import copy_paths, run_over_ssh
+from wally.ssh_utils import (copy_paths, run_over_ssh,
+                             save_to_remote, read_from_remote)
 
 logger = logging.getLogger('wally.sensors')
 
@@ -34,25 +33,23 @@
 def deploy_and_start_sensor(paths, node_sensor_config, remote_path):
     try:
         copy_paths(node_sensor_config.conn, paths)
-        sftp = node_sensor_config.conn.open_sftp()
+        with node_sensor_config.conn.open_sftp() as sftp:
+            config_remote_path = os.path.join(remote_path, "conf.json")
 
-        config_remote_path = os.path.join(remote_path, "conf.json")
+            sensors_config = node_sensor_config.sensors.copy()
+            sensors_config['source_id'] = node_sensor_config.source_id
+            with sftp.open(config_remote_path, "w") as fd:
+                fd.write(json.dumps(sensors_config))
 
-        sensors_config = node_sensor_config.sensors.copy()
-        sensors_config['source_id'] = node_sensor_config.source_id
-        with sftp.open(config_remote_path, "w") as fd:
-            fd.write(json.dumps(sensors_config))
+            cmd_templ = 'env PYTHONPATH="{0}" python -m ' + \
+                        "sensors.main -d start -u {1} {2}"
 
-        cmd_templ = 'env PYTHONPATH="{0}" python -m ' + \
-                    "sensors.main -d start -u {1} {2}"
+            cmd = cmd_templ.format(os.path.dirname(remote_path),
+                                   node_sensor_config.monitor_url,
+                                   config_remote_path)
 
-        cmd = cmd_templ.format(os.path.dirname(remote_path),
-                               node_sensor_config.monitor_url,
-                               config_remote_path)
-
-        run_over_ssh(node_sensor_config.conn, cmd,
-                     node=node_sensor_config.url)
-        sftp.close()
+            run_over_ssh(node_sensor_config.conn, cmd,
+                         node=node_sensor_config.url)
 
     except:
         msg = "During deploing sensors on {0}".format(node_sensor_config.url)
diff --git a/wally/sensors/main.py b/wally/sensors/main.py
index 2d0bc81..20eedc5 100644
--- a/wally/sensors/main.py
+++ b/wally/sensors/main.py
@@ -35,7 +35,9 @@
 def parse_args(args):
     parser = argparse.ArgumentParser()
     parser.add_argument('-d', '--daemon',
-                        choices=('start', 'stop', 'status'),
+                        choices=('start', 'stop', 'status',
+                                 'start_monitoring', 'stop_monitoring',
+                                 'dump_ram_data'),
                         default=None)
 
     parser.add_argument('-u', '--url', default='stdout://')
diff --git a/wally/sensors/protocol.py b/wally/sensors/protocol.py
index c053011..7c8aa0e 100644
--- a/wally/sensors/protocol.py
+++ b/wally/sensors/protocol.py
@@ -188,6 +188,7 @@
     def send(self, data):
         if self.headers is None:
             self.headers = sorted(data)
+            self.headers.remove('source_id')
 
             for pos, header in enumerate(self.headers):
                 self.line_format += "{%s:>%s}" % (pos,
@@ -197,6 +198,7 @@
             print self.line_format.format(*self.headers)
 
         if self.delta:
+
             vals = [data[header].value - self.prev.get(header, 0)
                     for header in self.headers]
 
@@ -219,7 +221,7 @@
 
 
 class CSVFileTransport(ITransport):
-    required_keys = set(['time', 'source_id', 'hostname'])
+    required_keys = set(['time', 'source_id'])
 
     def __init__(self, receiver, fname):
         ITransport.__init__(self, receiver)
@@ -234,10 +236,25 @@
             assert self.required_keys.issubset(keys)
             keys -= self.required_keys
             self.field_list = sorted(keys)
-            self.csv_fd.writerow([data['source_id'], data['hostname']] +
+            self.csv_fd.writerow([data['source_id'], socket.getfqdn()] +
                                  self.field_list)
+            self.field_list = ['time'] + self.field_list
 
-        self.csv_fd.writerow(map(data.__getitem__, ['time'] + self.field_list))
+        self.csv_fd.writerow([data[sens].value for sens in self.field_list])
+
+
+class RAMTransport(ITransport):
+    def __init__(self, next_tr):
+        self.next = next_tr
+        self.data = []
+
+    def send(self, data):
+        self.data.append(data)
+
+    def flush(self):
+        for data in self.data:
+            self.next.send(data)
+        self.data = []
 
 
 class UDPTransport(ITransport):
@@ -269,10 +286,11 @@
 
 
 def create_protocol(uri, receiver=False):
-    parsed_uri = urlparse(uri)
-    if parsed_uri.scheme == 'stdout':
+    if uri == 'stdout':
         return StdoutTransport(receiver)
-    elif parsed_uri.scheme == 'udp':
+
+    parsed_uri = urlparse(uri)
+    if parsed_uri.scheme == 'udp':
         ip, port = parsed_uri.netloc.split(":")
 
         if receiver:
@@ -286,6 +304,9 @@
         return FileTransport(receiver, parsed_uri.path)
     elif parsed_uri.scheme == 'csvfile':
         return CSVFileTransport(receiver, parsed_uri.path)
+    elif parsed_uri.scheme == 'ram':
+        internal_recv = CSVFileTransport(receiver, parsed_uri.path)
+        return RAMTransport(internal_recv)
     else:
         templ = "Can't instantiate transport from {0!r}"
         raise ValueError(templ.format(uri))
diff --git a/wally/sensors_utils.py b/wally/sensors_utils.py
index 65de0ef..61a5c08 100644
--- a/wally/sensors_utils.py
+++ b/wally/sensors_utils.py
@@ -1,71 +1,19 @@
-import csv
-import time
-import Queue
+import os.path
 import logging
-import threading
+import contextlib
 
-from wally import utils
-from wally.config import cfg_dict
-from wally.sensors.api import (start_listener_thread,
-                               deploy_and_start_sensors,
-                               SensorConfig,
-                               stop_and_remove_sensors)
+from wally.sensors.api import (with_sensors, sensors_info, SensorConfig)
+
 
 logger = logging.getLogger("wally.sensors")
-DEFAULT_RECEIVER_URL = "udp://{ip}:5699"
 
 
-def save_sensors_data(data_q, mon_q, fd, data_store, source2roles_map):
-    fd.write("\n")
-
-    observed_nodes = set()
-    fields_list_for_nodes = {}
-    required_keys = set(['time', 'source_id', 'hostname'])
-
-    try:
-        csv_fd = csv.writer(fd)
-        while True:
-            val = data_q.get()
-            if val is None:
-                break
-
-            addr, data = val
-            if addr not in observed_nodes:
-                mon_q.put(addr + (data['source_id'],))
-                observed_nodes.add(addr)
-                keys = set(data)
-                assert required_keys.issubset(keys)
-                keys -= required_keys
-
-                fields_list_for_nodes[addr] = sorted(keys)
-                csv_fd.writerow([addr[0], addr[1],
-                                 data['source_id'], data['hostname']] +
-                                fields_list_for_nodes[addr])
-
-            csv_fd.writerow([addr[0], addr[1]] +
-                            map(data.__getitem__,
-                                ['time'] + fields_list_for_nodes[addr]))
-
-            # fd.write(repr((addr, data)) + "\n")
-            # source_id = data.pop('source_id')
-            # rep_time = data.pop('time')
-            # if 'testnode' in source2roles_map.get(source_id, []):
-            #     sum_io_q = 0
-            #     data_store.update_values(rep_time,
-            #                              {"testnodes:io": sum_io_q},
-            #                              add=True)
-    except Exception:
-        logger.exception("Error in sensors thread")
-    logger.info("Sensors thread exits")
-
-
-def get_sensors_config_for_nodes(cfg, nodes):
+def get_sensors_config_for_nodes(cfg, nodes, remote_path):
     monitored_nodes = []
     sensors_configs = []
     source2roles_map = {}
 
-    receiver_url = cfg.get("receiver_url", DEFAULT_RECEIVER_URL)
-    assert '{ip}' in receiver_url
+    receiver_url = "csvfile://" + os.path.join(remote_path, "results.csv")
 
     for role, sensors_str in cfg["roles_mapping"].items():
         sensors = [sens.strip() for sens in sensors_str.split(",")]
@@ -74,137 +22,41 @@
 
         for node in nodes:
             if role in node.roles:
-
-                if node.monitor_ip is not None:
-                    monitor_url = receiver_url.format(ip=node.monitor_ip)
-                else:
-                    ip = node.get_ip()
-                    ext_ip = utils.get_ip_for_target(ip)
-                    monitor_url = receiver_url.format(ip=ext_ip)
-
                 source2roles_map[node.get_conn_id()] = node.roles
                 monitored_nodes.append(node)
                 sens_cfg = SensorConfig(node.connection,
                                         node.get_conn_id(),
                                         collect_cfg,
                                         source_id=node.get_conn_id(),
-                                        monitor_url=monitor_url)
+                                        monitor_url=receiver_url)
                 sensors_configs.append(sens_cfg)
 
     return monitored_nodes, sensors_configs, source2roles_map
 
 
-def start_sensor_process_thread(ctx, cfg, sensors_configs, source2roles_map):
-    receiver_url = cfg.get('receiver_url', DEFAULT_RECEIVER_URL)
-    sensors_data_q, stop_sensors_loop = \
-        start_listener_thread(receiver_url.format(ip='0.0.0.0'))
-
-    mon_q = Queue.Queue()
-    fd = open(cfg_dict['sensor_storage'], "w")
-
-    params = sensors_data_q, mon_q, fd, ctx.sensors_data, source2roles_map
-    sensor_listen_th = threading.Thread(None, save_sensors_data, None,
-                                        params)
-    sensor_listen_th.daemon = True
-    sensor_listen_th.start()
-
-    def stop_sensors_receiver(cfg, ctx):
-        stop_sensors_loop()
-        sensors_data_q.put(None)
-        sensor_listen_th.join()
-
-    ctx.clear_calls_stack.append(stop_sensors_receiver)
-    return mon_q
-
-
-def deploy_sensors_stage(cfg, ctx, nodes=None, undeploy=True,
-                         recv_timeout=10, ignore_nodata=False):
-
-    cfg = cfg.get('sensors')
-    if cfg is None:
+@contextlib.contextmanager
+def with_sensors_util(cfg, nodes):
+    if 'sensors' not in cfg:
+        yield
         return
 
-    if nodes is None:
-        nodes = ctx.nodes
-
     monitored_nodes, sensors_configs, source2roles_map = \
-        get_sensors_config_for_nodes(cfg, nodes)
+        get_sensors_config_for_nodes(cfg['sensors'], nodes,
+                                     cfg['sensors_remote_path'])
 
-    if len(monitored_nodes) == 0:
-        logger.info("Nothing to monitor, no sensors would be installed")
+    with with_sensors(sensors_configs, cfg['sensors_remote_path']):
+        yield source2roles_map
+
+
+@contextlib.contextmanager
+def sensors_info_util(cfg, nodes):
+    if 'sensors' not in cfg:
+        yield None
         return
 
-    is_online = cfg.get('online', False)
+    _, sensors_configs, _ = \
+        get_sensors_config_for_nodes(cfg['sensors'], nodes,
+                                     cfg['sensors_remote_path'])
 
-    if is_online:
-        if ctx.sensors_mon_q is None:
-            logger.info("Start sensors data receiving thread")
-            ctx.sensors_mon_q = start_sensor_process_thread(ctx, cfg,
-                                                            sensors_configs,
-                                                            source2roles_map)
-
-    if undeploy:
-        def remove_sensors_stage(cfg, ctx):
-            _, sensors_configs, _ = \
-                get_sensors_config_for_nodes(cfg['sensors'], nodes)
-            stop_and_remove_sensors(sensors_configs)
-
-        ctx.clear_calls_stack.append(remove_sensors_stage)
-
-    num_monitoref_nodes = len(sensors_configs)
-    logger.info("Deploing new sensors on {0} node(s)".format(
-                                num_monitoref_nodes))
-
-    deploy_and_start_sensors(sensors_configs)
-
-    if is_online:
-        wait_for_new_sensors_data(ctx, monitored_nodes, recv_timeout,
-                                  ignore_nodata)
-
-
-def gather_sensors_stage(cfg, ctx, nodes=None):
-    cfg = cfg.get('sensors')
-    if cfg is None:
-        return
-
-    is_online = cfg.get('online', False)
-    if is_online:
-        return
-
-    if nodes is None:
-        nodes = ctx.nodes
-
-    _, sensors_configs, _ = get_sensors_config_for_nodes(cfg, nodes)
-    gather_sensors_info(sensors_configs)
-
-
-def gather_sensors_info(sensors_configs):
-    pass
-
-
-def wait_for_new_sensors_data(ctx, monitored_nodes, recv_timeout,
-                              ignore_nodata):
-    etime = time.time() + recv_timeout
-
-    msg = "Waiting at most {0}s till all {1} nodes starts report sensors data"
-    nodes_ids = set(node.get_conn_id() for node in monitored_nodes)
-    logger.debug(msg.format(recv_timeout, len(nodes_ids)))
-
-    # wait till all nodes start sending data
-    while len(nodes_ids) != 0:
-        tleft = etime - time.time()
-        try:
-            source_id = ctx.sensors_mon_q.get(True, tleft)[2]
-        except Queue.Empty:
-            if not ignore_nodata:
-                msg = "Node(s) {0} not sending any sensor data in {1}s"
-                msg = msg.format(", ".join(nodes_ids), recv_timeout)
-                raise RuntimeError(msg)
-            else:
-                return
-
-        if source_id not in nodes_ids:
-            msg = "Receive sensors from extra node: {0}".format(source_id)
-            logger.warning(msg)
-
-        nodes_ids.remove(source_id)
+    with sensors_info(sensors_configs, cfg['sensors_remote_path']) as res:
+        yield res
diff --git a/wally/ssh_utils.py b/wally/ssh_utils.py
index 45ca892..1e8b647 100644
--- a/wally/ssh_utils.py
+++ b/wally/ssh_utils.py
@@ -1,6 +1,7 @@
 import re
 import time
 import errno
+import random
 import socket
 import shutil
 import logging
@@ -356,18 +357,78 @@
 all_sessions = {}
 
 
-def start_in_bg(conn, cmd, capture_out=False, **params):
-    assert not capture_out
-    pid = run_over_ssh(conn, "nohup {0} 2>&1 >/dev/null & echo $!",
-                       timeout=10, **params)
-    return int(pid.strip()), None, None
+class BGSSHTask(object):
+    def __init__(self, node, use_sudo):
+        self.node = node
+        self.pid = None
+        self.use_sudo = use_sudo
 
+    def start(self, orig_cmd, **params):
+        uniq_name = 'test'
+        cmd = "screen -S {0} -d -m {1}".format(uniq_name, orig_cmd)
+        run_over_ssh(self.node.connection, cmd,
+                     timeout=10, node=self.node.get_conn_id(),
+                     **params)
+        processes = run_over_ssh(self.node.connection, "ps aux", nolog=True)
 
-def check_running(conn, pid):
-    try:
-        run_over_ssh(conn, "ls /proc/{0}", timeout=10, nolog=True)
-    except OSError:
-        return False
+        for proc in processes.split("\n"):
+            if orig_cmd in proc and "SCREEN" not in proc:
+                self.pid = proc.split()[1]
+                break
+        else:
+            self.pid = -1
+
+    def check_running(self):
+        assert self.pid is not None
+        try:
+            run_over_ssh(self.node.connection,
+                         "ls /proc/{0}".format(self.pid),
+                         timeout=10, nolog=True)
+            return True
+        except OSError:
+            return False
+        # try:
+        #     sftp.stat("/proc/{0}".format(pid))
+        #     return True
+        # except (OSError, IOError, NameError):
+        #     return False
+
+    def kill(self, soft=True, use_sudo=True):
+        assert self.pid is not None
+        try:
+            if soft:
+                cmd = "kill {0}"
+            else:
+                cmd = "kill -9 {0}"
+
+            if self.use_sudo:
+                cmd = "sudo " + cmd
+
+            run_over_ssh(self.node.connection,
+                         cmd.format(self.pid), nolog=True)
+            return True
+        except OSError:
+            return False
+
+    def wait(self, soft_timeout, timeout):
+        end_of_wait_time = timeout + time.time()
+        soft_end_of_wait_time = soft_timeout + time.time()
+        time_till_check = random.randint(5, 10)
+
+        while self.check_running() and time.time() < soft_end_of_wait_time:
+            time.sleep(soft_end_of_wait_time - time.time())
+
+        while end_of_wait_time > time.time():
+            time.sleep(time_till_check)
+            if not self.check_running():
+                break
+        else:
+            self.kill()
+            time.sleep(3)
+            if self.check_running():
+                self.kill(soft=False)
+            return False
+        return True
 
 
 def run_over_ssh(conn, cmd, stdin_data=None, timeout=60,
@@ -429,9 +490,14 @@
 
         code = session.recv_exit_status()
     finally:
+        found = False
         with all_sessions_lock:
-            del all_sessions[id(session)]
-        session.close()
+            if id(session) in all_sessions:
+                found = True
+                del all_sessions[id(session)]
+
+        if found:
+            session.close()
 
     if code != 0:
         templ = "SSH:{0} Cmd {1!r} failed with code {2}. Output: {3}"
diff --git a/wally/start_vms.py b/wally/start_vms.py
index 7e1c687..3ab4383 100644
--- a/wally/start_vms.py
+++ b/wally/start_vms.py
@@ -259,7 +259,7 @@
     return vol
 
 
-def wait_for_server_active(nova, server, timeout=240):
+def wait_for_server_active(nova, server, timeout=300):
     t = time.time()
     while True:
         time.sleep(1)
@@ -291,7 +291,7 @@
 
 
 def launch_vms(params, already_has_count=0):
-    logger.debug("Starting new nodes on openstack")
+    logger.debug("Calculating new vm count")
     count = params['count']
     nova = nova_connect()
     lst = nova.services.list(binary='nova-compute')
@@ -305,8 +305,11 @@
             count = int(count[1:]) - already_has_count
 
     if count <= 0:
+        logger.debug("No new VMs are needed")
         return
 
+    logger.debug("Starting new nodes on openstack")
+
     assert isinstance(count, (int, long))
 
     srv_params = "img: {image[name]}, flavor: {flavor[name]}".format(**params)
diff --git a/wally/statistic.py b/wally/statistic.py
index 8180619..74ce572 100644
--- a/wally/statistic.py
+++ b/wally/statistic.py
@@ -144,6 +144,7 @@
         self.confidence = None
         self.min = None
         self.max = None
+        self.raw = None
 
     def rounded_average_conf(self):
         return round_deviation((self.average, self.confidence))
@@ -184,4 +185,5 @@
     else:
         res.confidence = res.deviation
 
+    res.raw = data[:]
     return res
diff --git a/wally/suits/__init__.py b/wally/suits/__init__.py
index 7b6610e..c4e8854 100644
--- a/wally/suits/__init__.py
+++ b/wally/suits/__init__.py
@@ -1,3 +1,5 @@
-from .itest import TwoScriptTest, PgBenchTest, IOPerfTest
+from .io import IOPerfTest
+from .mysql import MysqlTest
+from .postgres import PgBenchTest
 
-__all__ = ["TwoScriptTest", "PgBenchTest", "IOPerfTest"]
+__all__ = ["MysqlTest", "PgBenchTest", "IOPerfTest"]
diff --git a/wally/suits/io/__init__.py b/wally/suits/io/__init__.py
index e69de29..4828850 100644
--- a/wally/suits/io/__init__.py
+++ b/wally/suits/io/__init__.py
@@ -0,0 +1,330 @@
+import time
+import json
+import os.path
+import logging
+import datetime
+
+from wally.utils import (ssize2b, open_for_append_or_create,
+                         sec_to_str, StopTestError)
+
+from wally.ssh_utils import save_to_remote, read_from_remote, BGSSHTask
+
+from ..itest import IPerfTest, TestResults
+from .formatter import format_results_for_console
+from .fio_task_parser import (execution_time, fio_cfg_compile,
+                              get_test_summary, FioJobSection)
+
+
+logger = logging.getLogger("wally")
+
+
+class IOTestResults(TestResults):
+    def summary(self):
+        return get_test_summary(self.config) + "vm" + str(self.vm_count)
+
+    def get_yamable(self):
+        return {
+            'type': "fio_test",
+            'params': self.params,
+            'config': (self.config.name, self.config.vals),
+            'results': self.results,
+            'raw_result': self.raw_result,
+            'run_interval': self.run_interval,
+            'vm_count': self.vm_count
+        }
+
+    @classmethod
+    def from_yaml(cls, data):
+        name, vals = data['config']
+        sec = FioJobSection(name)
+        sec.vals = vals
+
+        return cls(sec, data['params'], data['results'],
+                   data['raw_result'], data['run_interval'],
+                   data['vm_count'])
+
+
+def get_slice_parts_offset(test_slice, real_inteval):
+    calc_exec_time = sum(map(execution_time, test_slice))
+    coef = (real_inteval[1] - real_inteval[0]) / calc_exec_time
+    curr_offset = real_inteval[0]
+    for section in test_slice:
+        slen = execution_time(section) * coef
+        yield (curr_offset, curr_offset + slen)
+        curr_offset += slen
+
+
+class IOPerfTest(IPerfTest):
+    tcp_conn_timeout = 30
+    max_pig_timeout = 5
+    soft_runcycle = 5 * 60
+
+    def __init__(self, *dt, **mp):
+        IPerfTest.__init__(self, *dt, **mp)
+        self.config_fname = self.options['cfg']
+
+        if '/' not in self.config_fname and '.' not in self.config_fname:
+            cfgs_dir = os.path.dirname(__file__)
+            self.config_fname = os.path.join(cfgs_dir,
+                                             self.config_fname + '.cfg')
+
+        self.alive_check_interval = self.options.get('alive_check_interval')
+
+        self.config_params = self.options.get('params', {}).copy()
+        self.tool = self.options.get('tool', 'fio')
+
+        raw_res = os.path.join(self.log_directory, "raw_results.txt")
+        self.fio_raw_results_file = open_for_append_or_create(raw_res)
+
+        self.io_py_remote = self.join_remote("agent.py")
+        self.results_file = self.join_remote("results.json")
+        self.pid_file = self.join_remote("pid")
+        self.task_file = self.join_remote("task.cfg")
+        self.use_sudo = self.options.get("use_sudo", True)
+        self.test_logging = self.options.get("test_logging", False)
+        self.raw_cfg = open(self.config_fname).read()
+        self.fio_configs = fio_cfg_compile(self.raw_cfg,
+                                           self.config_fname,
+                                           self.config_params,
+                                           split_on_names=self.test_logging)
+        self.fio_configs = list(self.fio_configs)
+
+        cmd_log = os.path.join(self.log_directory, "task_compiled.cfg")
+        fio_command_file = open_for_append_or_create(cmd_log)
+        splitter = "\n\n" + "-" * 60 + "\n\n"
+        fio_command_file.write(splitter.join(map(str, self.fio_configs)))
+
+    def __str__(self):
+        return "{0}({1})".format(self.__class__.__name__,
+                                 self.node.get_conn_id())
+
+    @classmethod
+    def load(cls, data):
+        return IOTestResults.from_yaml(data)
+
+    def cleanup(self):
+        # delete_file(conn, self.io_py_remote)
+        # Need to remove temp files used for testing
+        pass
+
+    def prefill_test_files(self):
+        files = {}
+        for cfg_slice in self.fio_configs:
+            for section in cfg_slice:
+                sz = ssize2b(section.vals['size'])
+                msz = sz / (1024 ** 2)
+
+                if sz % (1024 ** 2) != 0:
+                    msz += 1
+
+                fname = section.vals['filename']
+
+                # if already has other test with the same file name
+                # take largest size
+                files[fname] = max(files.get(fname, 0), msz)
+
+        cmd_templ = "dd oflag=direct " + \
+                    "if=/dev/zero of={0} bs={1} count={2}"
+
+        if self.use_sudo:
+            cmd_templ = "sudo " + cmd_templ
+
+        ssize = 0
+        stime = time.time()
+
+        for fname, curr_sz in files.items():
+            cmd = cmd_templ.format(fname, 1024 ** 2, curr_sz)
+            ssize += curr_sz
+            self.run_over_ssh(cmd, timeout=curr_sz)
+
+        ddtime = time.time() - stime
+        if ddtime > 1E-3:
+            fill_bw = int(ssize / ddtime)
+            mess = "Initial dd fill bw is {0} MiBps for this vm"
+            logger.info(mess.format(fill_bw))
+            self.coordinate(('init_bw', fill_bw))
+
+    def install_utils(self, max_retry=3, timeout=5):
+        need_install = []
+        for bin_name, package in (('fio', 'fio'), ('screen', 'screen')):
+            try:
+                self.run_over_ssh('which ' + bin_name, nolog=True)
+            except OSError:
+                need_install.append(package)
+
+        if len(need_install) == 0:
+            return
+
+        cmd = "sudo apt-get -y install " + " ".join(need_install)
+
+        for i in range(max_retry):
+            try:
+                self.run_over_ssh(cmd)
+                break
+            except OSError as err:
+                time.sleep(timeout)
+        else:
+            raise OSError("Can't install - " + str(err))
+
+    def pre_run(self):
+        try:
+            cmd = 'mkdir -p "{0}"'.format(self.remote_dir)
+            if self.use_sudo:
+                cmd = "sudo " + cmd
+                cmd += " ; sudo chown {0} {1}".format(self.node.get_user(),
+                                                      self.remote_dir)
+
+            self.run_over_ssh(cmd)
+        except Exception as exc:
+            msg = "Failed to create folder {0} on remote {1}. Error: {2!s}"
+            msg = msg.format(self.remote_dir, self.node.get_conn_id(), exc)
+            logger.exception(msg)
+            raise StopTestError(msg, exc)
+
+        self.install_utils()
+
+        if self.options.get('prefill_files', True):
+            self.prefill_test_files()
+        elif self.is_primary:
+            logger.warning("Prefilling of test files is disabled")
+
+    def run(self, barrier):
+        try:
+            if len(self.fio_configs) > 1 and self.is_primary:
+
+                exec_time = 0
+                for test_slice in self.fio_configs:
+                    exec_time += sum(map(execution_time, test_slice))
+
+                # +10% - is a rough estimation for additional operations
+                # like sftp, etc
+                exec_time = int(exec_time * 1.1)
+
+                exec_time_s = sec_to_str(exec_time)
+                now_dt = datetime.datetime.now()
+                end_dt = now_dt + datetime.timedelta(0, exec_time)
+                msg = "Entire test should take around {0} and finish at {1}"
+                logger.info(msg.format(exec_time_s,
+                                       end_dt.strftime("%H:%M:%S")))
+
+            for pos, fio_cfg_slice in enumerate(self.fio_configs):
+                fio_cfg_slice = list(fio_cfg_slice)
+                names = [i.name for i in fio_cfg_slice]
+                msgs = []
+                already_processed = set()
+                for name in names:
+                    if name not in already_processed:
+                        already_processed.add(name)
+
+                        if 1 == names.count(name):
+                            msgs.append(name)
+                        else:
+                            frmt = "{0} * {1}"
+                            msgs.append(frmt.format(name,
+                                                    names.count(name)))
+
+                if self.is_primary:
+                    logger.info("Will run tests: " + ", ".join(msgs))
+
+                nolog = (pos != 0) or not self.is_primary
+                out_err, interval = self.do_run(barrier, fio_cfg_slice,
+                                                nolog=nolog)
+
+                try:
+                    full_raw_res = json.loads(out_err)
+
+                    res = {"bw": [], "iops": [], "lat": [],
+                           "clat": [], "slat": []}
+
+                    for raw_result in full_raw_res['jobs']:
+                        load_data = raw_result['mixed']
+
+                        res["bw"].append(load_data["bw"])
+                        res["iops"].append(load_data["iops"])
+                        res["lat"].append(load_data["lat"]["mean"])
+                        res["clat"].append(load_data["clat"]["mean"])
+                        res["slat"].append(load_data["slat"]["mean"])
+
+                    first = fio_cfg_slice[0]
+                    p1 = first.vals.copy()
+                    p1.pop('ramp_time', 0)
+
+                    for nxt in fio_cfg_slice[1:]:
+                        assert nxt.name == first.name
+                        p2 = nxt.vals
+                        p2.pop('_ramp_time', 0)
+
+                        assert p1 == p2
+
+                    tres = IOTestResults(first,
+                                         self.config_params, res,
+                                         full_raw_res, interval,
+                                         vm_count=self.total_nodes_count)
+                    self.on_result_cb(tres)
+                except (OSError, StopTestError):
+                    raise
+                except Exception as exc:
+                    msg_templ = "Error during postprocessing results: {0!s}"
+                    raise RuntimeError(msg_templ.format(exc))
+
+        finally:
+            barrier.exit()
+
+    def do_run(self, barrier, cfg_slice, nolog=False):
+        # return open("/tmp/lit-sunshine/io/results.json").read(), (1, 2)
+        conn_id = self.node.get_conn_id()
+
+        cmd_templ = "fio --output-format=json --output={1} " + \
+                    "--alloc-size=262144 {0}"
+
+        if self.options.get("use_sudo", True):
+            cmd_templ = "sudo " + cmd_templ
+
+        task_fc = "\n\n".join(map(str, cfg_slice))
+        with self.node.connection.open_sftp() as sftp:
+            save_to_remote(sftp, self.task_file, task_fc)
+
+        cmd = cmd_templ.format(self.task_file, self.results_file)
+
+        exec_time = sum(map(execution_time, cfg_slice))
+        exec_time_str = sec_to_str(exec_time)
+
+        timeout = int(exec_time + max(300, exec_time))
+        soft_tout = exec_time
+        barrier.wait()
+
+        if self.is_primary:
+            templ = "Test should take about {0}." + \
+                    " Should finish at {1}," + \
+                    " will wait at most till {2}"
+            now_dt = datetime.datetime.now()
+            end_dt = now_dt + datetime.timedelta(0, exec_time)
+            wait_till = now_dt + datetime.timedelta(0, timeout)
+
+            logger.info(templ.format(exec_time_str,
+                                     end_dt.strftime("%H:%M:%S"),
+                                     wait_till.strftime("%H:%M:%S")))
+
+        task = BGSSHTask(self.node, self.options.get("use_sudo", True))
+        begin = time.time()
+        task.start(cmd)
+        task.wait(soft_tout, timeout)
+        end = time.time()
+
+        if not nolog:
+            logger.debug("Test on node {0} is finished".format(conn_id))
+
+        with self.node.connection.open_sftp() as sftp:
+            return read_from_remote(sftp, self.results_file), (begin, end)
+
+    @classmethod
+    def merge_results(cls, results):
+        merged = results[0]
+        for block in results[1:]:
+            assert block["__meta__"] == merged["__meta__"]
+            merged['res'].extend(block['res'])
+        return merged
+
+    @classmethod
+    def format_for_console(cls, data, dinfo):
+        return format_results_for_console(dinfo)
diff --git a/wally/suits/io/agent.py b/wally/suits/io/agent.py
deleted file mode 100644
index 3c3e436..0000000
--- a/wally/suits/io/agent.py
+++ /dev/null
@@ -1,672 +0,0 @@
-import os
-import sys
-import time
-import json
-import copy
-import select
-import pprint
-import os.path
-import argparse
-import traceback
-import subprocess
-import itertools
-from collections import OrderedDict
-
-
-SECTION = 0
-SETTING = 1
-
-
-class FioJobSection(object):
-    def __init__(self, name):
-        self.name = name
-        self.vals = OrderedDict()
-        self.format_params = {}
-
-    def copy(self):
-        return copy.deepcopy(self)
-
-
-def to_bytes(sz):
-    sz = sz.lower()
-    try:
-        return int(sz)
-    except ValueError:
-        if sz[-1] == 'm':
-            return (1024 ** 2) * int(sz[:-1])
-        if sz[-1] == 'k':
-            return 1024 * int(sz[:-1])
-        if sz[-1] == 'g':
-            return (1024 ** 3) * int(sz[:-1])
-        raise
-
-
-def fio_config_lexer(fio_cfg):
-    for lineno, line in enumerate(fio_cfg.split("\n")):
-        try:
-            line = line.strip()
-
-            if line.startswith("#") or line.startswith(";"):
-                continue
-
-            if line == "":
-                continue
-
-            if line.startswith('['):
-                assert line.endswith(']'), "name should ends with ]"
-                yield lineno, SECTION, line[1:-1], None
-            elif '=' in line:
-                opt_name, opt_val = line.split('=', 1)
-                yield lineno, SETTING, opt_name.strip(), opt_val.strip()
-            else:
-                yield lineno, SETTING, line, '1'
-        except Exception as exc:
-            pref = "During parsing line number {0}\n{1!s}".format(lineno, exc)
-            raise ValueError(pref)
-
-
-def fio_config_parse(lexer_iter, format_params):
-    orig_format_params_keys = set(format_params)
-    format_params = format_params.copy()
-    in_defaults = False
-    curr_section = None
-    defaults = OrderedDict()
-
-    for lineno, tp, name, val in lexer_iter:
-        if tp == SECTION:
-            if curr_section is not None:
-                yield curr_section
-
-            if name == 'defaults':
-                in_defaults = True
-                curr_section = None
-            else:
-                in_defaults = False
-                curr_section = FioJobSection(name)
-                curr_section.format_params = format_params.copy()
-                curr_section.vals = defaults.copy()
-        else:
-            assert tp == SETTING
-            if name == name.upper():
-                msg = "Param not in default section in line " + str(lineno)
-                assert in_defaults, msg
-                if name not in orig_format_params_keys:
-                    # don't make parse_value for PARAMS
-                    # they would be parsed later
-                    # or this would breakes arrays
-                    format_params[name] = val
-            elif in_defaults:
-                defaults[name] = parse_value(val)
-            else:
-                msg = "data outside section, line " + str(lineno)
-                assert curr_section is not None, msg
-                curr_section.vals[name] = parse_value(val)
-
-    if curr_section is not None:
-        yield curr_section
-
-
-def parse_value(val):
-    try:
-        return int(val)
-    except ValueError:
-        pass
-
-    try:
-        return float(val)
-    except ValueError:
-        pass
-
-    if val.startswith('{%'):
-        assert val.endswith("%}")
-        content = val[2:-2]
-        vals = list(i.strip() for i in content.split(','))
-        return map(parse_value, vals)
-    return val
-
-
-def process_repeats(sec_iter):
-
-    for sec in sec_iter:
-        if '*' in sec.name:
-            msg = "Only one '*' allowed in section name"
-            assert sec.name.count('*') == 1, msg
-
-            name, count = sec.name.split("*")
-            sec.name = name.strip()
-            count = count.strip()
-
-            try:
-                count = int(count.strip().format(**sec.format_params))
-            except KeyError:
-                raise ValueError("No parameter {0} given".format(count[1:-1]))
-            except ValueError:
-                msg = "Parameter {0} nas non-int value {1!r}"
-                raise ValueError(msg.format(count[1:-1],
-                                 count.format(**sec.format_params)))
-
-            yield sec.copy()
-
-            if 'ramp_time' in sec.vals:
-                sec = sec.copy()
-                sec.vals['_ramp_time'] = sec.vals.pop('ramp_time')
-
-            for _ in range(count - 1):
-                yield sec.copy()
-        else:
-            yield sec
-
-
-def process_cycles(sec_iter):
-    # insert parametrized cycles
-    sec_iter = try_format_params_into_section(sec_iter)
-
-    for sec in sec_iter:
-
-        cycles_var_names = []
-        cycles_var_values = []
-
-        for name, val in sec.vals.items():
-            if isinstance(val, (list, tuple)):
-                cycles_var_names.append(name)
-                cycles_var_values.append(val)
-
-        if len(cycles_var_names) == 0:
-            yield sec
-        else:
-            for combination in itertools.product(*cycles_var_values):
-                new_sec = sec.copy()
-                new_sec.vals.update(zip(cycles_var_names, combination))
-                yield new_sec
-
-
-def try_format_params_into_section(sec_iter):
-    for sec in sec_iter:
-        params = sec.format_params
-        for name, val in sec.vals.items():
-            if isinstance(val, basestring):
-                try:
-                    sec.vals[name] = parse_value(val.format(**params))
-                except:
-                    pass
-
-        yield sec
-
-
-def format_params_into_section_finall(sec_iter, counter=[0]):
-    group_report_err_msg = "Group reporting should be set if numjobs != 1"
-
-    for sec in sec_iter:
-
-        num_jobs = int(sec.vals.get('numjobs', '1'))
-        if num_jobs != 1:
-            assert 'group_reporting' in sec.vals, group_report_err_msg
-
-        assert sec.vals.get('unified_rw_reporting', '1') in (1, '1')
-        sec.vals['unified_rw_reporting'] = '1'
-
-        params = sec.format_params.copy()
-
-        fsize = to_bytes(sec.vals['size'])
-        params['PER_TH_OFFSET'] = fsize // num_jobs
-
-        for name, val in sec.vals.items():
-            if isinstance(val, basestring):
-                sec.vals[name] = parse_value(val.format(**params))
-            else:
-                assert isinstance(val, (int, float))
-
-        params['UNIQ'] = 'UN{0}'.format(counter[0])
-        params['COUNTER'] = str(counter[0])
-        counter[0] += 1
-        params['TEST_SUMM'] = get_test_summary(sec.vals,
-                                               params.get('VM_COUNT', 1))
-        params.update(sec.vals)
-        sec.name = sec.name.format(**params)
-
-        yield sec
-
-
-def fio_config_to_str(sec_iter):
-    res = ""
-
-    for pos, sec in enumerate(sec_iter):
-        if pos != 0:
-            res += "\n"
-
-        res += "[{0}]\n".format(sec.name)
-
-        for name, val in sec.vals.items():
-            if name.startswith('_'):
-                continue
-            res += "{0}={1}\n".format(name, val)
-
-    return res
-
-
-def get_test_sync_mode(config):
-    try:
-        return config['sync_mode']
-    except KeyError:
-        pass
-
-    is_sync = str(config.get("sync", "0")) == "1"
-    is_direct = str(config.get("direct", "0")) == "1"
-
-    if is_sync and is_direct:
-        return 'x'
-    elif is_sync:
-        return 's'
-    elif is_direct:
-        return 'd'
-    else:
-        return 'a'
-
-
-def get_test_summary(params, testnodes_count):
-    rw = {"randread": "rr",
-          "randwrite": "rw",
-          "read": "sr",
-          "write": "sw"}[params["rw"]]
-
-    sync_mode = get_test_sync_mode(params)
-    th_count = params.get('numjobs')
-
-    if th_count is None:
-        th_count = params.get('concurence', 1)
-
-    return "{0}{1}{2}th{3}vm{4}".format(rw,
-                                        sync_mode,
-                                        params['blocksize'],
-                                        th_count,
-                                        testnodes_count)
-
-
-def calculate_execution_time(sec_iter):
-    time = 0
-    for sec in sec_iter:
-        time += sec.vals.get('ramp_time', 0)
-        time += sec.vals.get('runtime', 0)
-    return time
-
-
-def slice_config(sec_iter, runcycle=None, max_jobs=1000,
-                 soft_runcycle=None, split_on_names=False):
-    jcount = 0
-    runtime = 0
-    curr_slice = []
-    prev_name = None
-
-    for pos, sec in enumerate(sec_iter):
-
-        if prev_name is not None:
-            split_here = False
-
-            if soft_runcycle is not None and prev_name != sec.name:
-                split_here = (runtime > soft_runcycle)
-
-            if split_on_names and prev_name != sec.name:
-                split_here = True
-
-            if split_here:
-                yield curr_slice
-                curr_slice = []
-                runtime = 0
-                jcount = 0
-
-        prev_name = sec.name
-
-        jc = sec.vals.get('numjobs', 1)
-        msg = "numjobs should be integer, not {0!r}".format(jc)
-        assert isinstance(jc, int), msg
-
-        curr_task_time = calculate_execution_time([sec])
-
-        if jc > max_jobs:
-            err_templ = "Can't process job {0!r} - too large numjobs"
-            raise ValueError(err_templ.format(sec.name))
-
-        if runcycle is not None and len(curr_slice) != 0:
-            rc_ok = curr_task_time + runtime <= runcycle
-        else:
-            rc_ok = True
-
-        if jc + jcount <= max_jobs and rc_ok:
-            runtime += curr_task_time
-            jcount += jc
-            curr_slice.append(sec)
-            continue
-
-        assert len(curr_slice) != 0
-        yield curr_slice
-
-        if '_ramp_time' in sec.vals:
-            sec.vals['ramp_time'] = sec.vals.pop('_ramp_time')
-            curr_task_time = calculate_execution_time([sec])
-
-        runtime = curr_task_time
-        jcount = jc
-        curr_slice = [sec]
-        prev_name = None
-
-    if curr_slice != []:
-        yield curr_slice
-
-
-def parse_all_in_1(source, test_params):
-    lexer_it = fio_config_lexer(source)
-    sec_it = fio_config_parse(lexer_it, test_params)
-    sec_it = process_cycles(sec_it)
-    sec_it = process_repeats(sec_it)
-    return format_params_into_section_finall(sec_it)
-
-
-def parse_and_slice_all_in_1(source, test_params, **slice_params):
-    sec_it = parse_all_in_1(source, test_params)
-    return slice_config(sec_it, **slice_params)
-
-
-def compile_all_in_1(source, test_params, **slice_params):
-    slices_it = parse_and_slice_all_in_1(source, test_params, **slice_params)
-    for slices in slices_it:
-        yield fio_config_to_str(slices)
-
-
-def do_run_fio(config_slice):
-    benchmark_config = fio_config_to_str(config_slice)
-    cmd = ["fio", "--output-format=json", "--alloc-size=262144", "-"]
-    p = subprocess.Popen(cmd,
-                         stdin=subprocess.PIPE,
-                         stdout=subprocess.PIPE,
-                         stderr=subprocess.PIPE)
-
-    start_time = time.time()
-    # set timeout
-    raw_out, raw_err = p.communicate(benchmark_config)
-    end_time = time.time()
-
-    if 0 != p.returncode:
-        msg = "Fio failed with code: {0}\nOutput={1}"
-        raise OSError(msg.format(p.returncode, raw_err))
-
-    # HACK
-    raw_out = "{" + raw_out.split('{', 1)[1]
-
-    try:
-        parsed_out = json.loads(raw_out)["jobs"]
-    except KeyError:
-        msg = "Can't parse fio output {0!r}: no 'jobs' found"
-        raw_out = raw_out[:100]
-        raise ValueError(msg.format(raw_out))
-
-    except Exception as exc:
-        msg = "Can't parse fio output: {0!r}\nError: {1!s}"
-        raw_out = raw_out[:100]
-        raise ValueError(msg.format(raw_out, exc))
-
-    return zip(parsed_out, config_slice), (start_time, end_time)
-
-
-class FioResult(object):
-    def __init__(self, name, params, run_interval, results):
-        self.params = params.copy()
-        self.name = name
-        self.run_interval = run_interval
-        self.results = results
-
-    def json_obj(self):
-        return self.__dict__
-
-
-def make_job_results(section, job_output, slice_timings):
-    # merge by section.merge_id
-
-    raw_result = job_output['mixed']
-
-    res = {
-        "bw": raw_result["bw"],
-        "iops": raw_result["iops"],
-        "lat": raw_result["lat"]["mean"],
-        "clat": raw_result["clat"]["mean"],
-        "slat": raw_result["slat"]["mean"]
-    }
-
-    vls = section.vals.copy()
-
-    vls['sync_mode'] = get_test_sync_mode(vls)
-    vls['concurence'] = vls.get('numjobs', 1)
-
-    return FioResult(section.name, vls, slice_timings, res)
-
-
-def get_slice_parts_offset(test_slice, real_inteval):
-    calc_exec_time = calculate_execution_time(test_slice)
-    coef = (real_inteval[1] - real_inteval[0]) / calc_exec_time
-    curr_offset = real_inteval[0]
-    for section in test_slice:
-        slen = calculate_execution_time([section]) * coef
-        yield (curr_offset, curr_offset + slen)
-        curr_offset += slen
-
-
-def run_fio(sliced_it, raw_results_func=None):
-    sliced_list = list(sliced_it)
-
-    curr_test_num = 0
-    executed_tests = 0
-    result = []
-
-    for i, test_slice in enumerate(sliced_list):
-        test_slice = list(test_slice)
-
-        res_cfg_it, slice_timings = do_run_fio(test_slice)
-        sec_intervals = get_slice_parts_offset(test_slice,
-                                               slice_timings)
-        res_cfg_it = enumerate(zip(res_cfg_it, sec_intervals),
-                               curr_test_num)
-
-        section_names = []
-        for curr_test_num, ((job_output, section), interval) in res_cfg_it:
-            executed_tests += 1
-            section_names.append(section.name)
-
-            if raw_results_func is not None:
-                raw_results_func(executed_tests,
-                                 [job_output, section])
-
-            msg = "{0} != {1}".format(section.name, job_output["jobname"])
-            assert section.name == job_output["jobname"], msg
-
-            result.append(make_job_results(section, job_output, interval))
-
-        curr_test_num += 1
-        msg_template = "Done {0} tests from {1}. ETA: {2}"
-
-        rest = sliced_list[i:]
-        time_eta = sum(map(calculate_execution_time, rest))
-        test_left = sum(map(len, rest))
-        print msg_template.format(curr_test_num,
-                                  test_left,
-                                  sec_to_str(time_eta))
-
-    return result
-
-
-def run_benchmark(binary_tp, *argv, **kwargs):
-    if 'fio' == binary_tp:
-        return run_fio(*argv, **kwargs)
-    raise ValueError("Unknown behcnmark {0}".format(binary_tp))
-
-
-def read_config(fd, timeout=10):
-    job_cfg = ""
-    etime = time.time() + timeout
-    while True:
-        wtime = etime - time.time()
-        if wtime <= 0:
-            raise IOError("No config provided")
-
-        r, w, x = select.select([fd], [], [], wtime)
-        if len(r) == 0:
-            raise IOError("No config provided")
-
-        char = fd.read(1)
-        if '' == char:
-            return job_cfg
-
-        job_cfg += char
-
-
-def sec_to_str(seconds):
-    h = seconds // 3600
-    m = (seconds % 3600) // 60
-    s = seconds % 60
-    return "{0}:{1:02d}:{2:02d}".format(h, m, s)
-
-
-def parse_args(argv):
-    parser = argparse.ArgumentParser(
-        description="Run fio' and return result")
-    parser.add_argument("--type", metavar="BINARY_TYPE",
-                        choices=['fio'], default='fio',
-                        help=argparse.SUPPRESS)
-    parser.add_argument("--start-at", metavar="START_AT_UTC", type=int,
-                        help="Start execution at START_AT_UTC")
-    parser.add_argument("--json", action="store_true", default=False,
-                        help="Json output format")
-    parser.add_argument("-o", "--output", default='-', metavar="FILE_PATH",
-                        help="Store results to FILE_PATH")
-    parser.add_argument("--estimate", action="store_true", default=False,
-                        help="Only estimate task execution time")
-    parser.add_argument("--compile", action="store_true", default=False,
-                        help="Compile config file to fio config")
-    parser.add_argument("--num-tests", action="store_true", default=False,
-                        help="Show total number of tests")
-    parser.add_argument("--runcycle", type=int, default=None,
-                        metavar="MAX_CYCLE_SECONDS",
-                        help="Max cycle length in seconds")
-    parser.add_argument("--show-raw-results", action='store_true',
-                        default=False, help="Output raw input and results")
-    parser.add_argument("--params", nargs="*", metavar="PARAM=VAL",
-                        default=[],
-                        help="Provide set of pairs PARAM=VAL to" +
-                             "format into job description")
-    parser.add_argument("-p", "--pid-file", metavar="FILE_TO_STORE_PID",
-                        default=None, help="Store pid to FILE_TO_STORE_PID " +
-                        "and remove this file on exit")
-    parser.add_argument("jobfile")
-    return parser.parse_args(argv)
-
-
-def main(argv):
-    argv_obj = parse_args(argv)
-
-    if argv_obj.jobfile == '-':
-        job_cfg = read_config(sys.stdin)
-    else:
-        job_cfg = open(argv_obj.jobfile).read()
-
-    if argv_obj.output == '-':
-        out_fd = sys.stdout
-    else:
-        out_fd = open(argv_obj.output, "w")
-
-    if argv_obj.pid_file is not None:
-        with open(argv_obj.pid_file, "w") as fd:
-            fd.write(str(os.getpid()))
-
-    try:
-        params = {}
-        for param_val in argv_obj.params:
-            assert '=' in param_val
-            name, val = param_val.split("=", 1)
-            params[name] = val
-
-        slice_params = {
-            'runcycle': argv_obj.runcycle,
-        }
-
-        sliced_it = parse_and_slice_all_in_1(job_cfg, params, **slice_params)
-
-        if argv_obj.estimate:
-            it = map(calculate_execution_time, sliced_it)
-            print sec_to_str(sum(it))
-            return 0
-
-        if argv_obj.num_tests or argv_obj.compile:
-            if argv_obj.compile:
-                for test_slice in sliced_it:
-                    out_fd.write(fio_config_to_str(test_slice))
-                    out_fd.write("\n#" + "-" * 70 + "\n\n")
-
-            if argv_obj.num_tests:
-                print len(list(sliced_it))
-
-            return 0
-
-        if argv_obj.start_at is not None:
-            ctime = time.time()
-            if argv_obj.start_at >= ctime:
-                time.sleep(ctime - argv_obj.start_at)
-
-        def raw_res_func(test_num, data):
-            pref = "========= RAW_RESULTS({0}) =========\n".format(test_num)
-            out_fd.write(pref)
-            out_fd.write(json.dumps(data))
-            out_fd.write("\n========= END OF RAW_RESULTS =========\n")
-            out_fd.flush()
-
-        rrfunc = raw_res_func if argv_obj.show_raw_results else None
-
-        job_res = run_benchmark(argv_obj.type,
-                                sliced_it, rrfunc)
-
-        res = {'__meta__': {'params': params,
-                            'testnodes_count': int(params.get('VM_COUNT', 1))},
-               'res': [j.json_obj() for j in job_res]}
-
-        oformat = 'json' if argv_obj.json else 'eval'
-
-        msg = "========= RESULTS(format={0}) =========\n"
-        out_fd.write(msg.format(oformat))
-        if argv_obj.json:
-            out_fd.write(json.dumps(res))
-        else:
-            out_fd.write(pprint.pformat(res) + "\n")
-        out_fd.write("\n========= END OF RESULTS =========\n")
-
-        return 0
-    except:
-        out_fd.write("============ ERROR =============\n")
-        out_fd.write(traceback.format_exc() + "\n")
-        out_fd.write("============ END OF ERROR =============\n")
-        return 1
-    finally:
-        try:
-            if out_fd is not sys.stdout:
-                out_fd.flush()
-                os.fsync(out_fd)
-                out_fd.close()
-        except Exception:
-            traceback.print_exc()
-
-        if argv_obj.pid_file is not None:
-            if os.path.exists(argv_obj.pid_file):
-                os.unlink(argv_obj.pid_file)
-
-
-def fake_main(x):
-    import yaml
-    time.sleep(60)
-    out_fd = sys.stdout
-    fname = "/tmp/perf_tests/metempirical_alisha/raw_results.yaml"
-    res = yaml.load(open(fname).read())[0][1]
-    out_fd.write("========= RESULTS(format=json) =========\n")
-    out_fd.write(json.dumps(res))
-    out_fd.write("\n========= END OF RESULTS =========\n")
-    return 0
-
-
-if __name__ == '__main__':
-    # exit(fake_main(sys.argv[1:]))
-    exit(main(sys.argv[1:]))
diff --git a/wally/suits/io/ceph.cfg b/wally/suits/io/ceph.cfg
index f38b37c..26aa65f 100644
--- a/wally/suits/io/ceph.cfg
+++ b/wally/suits/io/ceph.cfg
@@ -1,28 +1,19 @@
-[defaults]
-wait_for_previous=1
-group_reporting=1
-time_based=1
-buffered=0
-iodepth=1
-softrandommap=1
-filename={FILENAME}
-NUM_ROUNDS=7
-thread=1
-
-# this is critical for correct results in multy-node run
-randrepeat=0
+[global]
+include defaults.cfg
 
 NUMJOBS={% 1, 5, 10, 15, 40 %}
 NUMJOBS_SHORT={% 1, 2, 3, 10 %}
+TEST_FILE_SIZE=100G
 
-size=100G
+size={TEST_FILE_SIZE}
 ramp_time=15
 runtime=60
+NUM_ROUNDS=7
 
 # ---------------------------------------------------------------------
 # check different thread count, sync mode. (latency, iops) = func(th_count)
 # ---------------------------------------------------------------------
-[ceph_test_{TEST_SUMM} * {NUM_ROUNDS}]
+[ceph_test_{TEST_SUMM}]
 blocksize=4k
 rw=randwrite
 sync=1
@@ -31,7 +22,7 @@
 # ---------------------------------------------------------------------
 # direct write
 # ---------------------------------------------------------------------
-[ceph_test_{TEST_SUMM} * {NUM_ROUNDS}]
+[ceph_test_{TEST_SUMM}]
 blocksize=4k
 rw=randwrite
 direct=1
@@ -41,7 +32,7 @@
 # check different thread count, direct read mode. (latency, iops) = func(th_count)
 # also check iops for randread
 # ---------------------------------------------------------------------
-[ceph_test_{TEST_SUMM} * {NUM_ROUNDS}]
+[ceph_test_{TEST_SUMM}]
 blocksize=4k
 rw=randread
 direct=1
@@ -51,7 +42,7 @@
 # this is essentially sequential write/read operations
 # we can't use sequential with numjobs > 1 due to caching and block merging
 # ---------------------------------------------------------------------
-[ceph_test_{TEST_SUMM} * {NUM_ROUNDS}]
+[ceph_test_{TEST_SUMM}]
 blocksize=16m
 rw={% randread, randwrite %}
 direct=1
diff --git a/wally/suits/io/check_distribution.cfg b/wally/suits/io/check_distribution.cfg
index e7cafd9..4746f37 100644
--- a/wally/suits/io/check_distribution.cfg
+++ b/wally/suits/io/check_distribution.cfg
@@ -1,19 +1,13 @@
-[defaults]
+[global]
+include defaults.cfg
 NUM_ROUNDS=301
 
-# this is critical for correct results in multy-node run
-randrepeat=0
-
-[distrubution_test_{TEST_SUMM} * {NUM_ROUNDS}]
+[distrubution_test_{TEST_SUMM}]
 blocksize=4k
 rw=randwrite
 direct=1
+
 ramp_time=5
-buffered=0
-wait_for_previous
-filename={FILENAME}
-iodepth=1
-size=10G
-time_based
 runtime=30
-group_reporting
+
+size=10G
diff --git a/wally/suits/io/check_linearity.cfg b/wally/suits/io/check_linearity.cfg
index 670e8b3..f7c37fb 100644
--- a/wally/suits/io/check_linearity.cfg
+++ b/wally/suits/io/check_linearity.cfg
@@ -1,33 +1,26 @@
-[defaults]
+[global]
+
+include defaults.cfg
 NUM_ROUNDS=7
 
-# this is critical for correct results in multy-node run
-randrepeat=0
-
-ramp_time=5
-buffered=0
-wait_for_previous
-filename={FILENAME}
-iodepth=1
-size=10G
-time_based
+size={TEST_FILE_SIZE}
 ramp_time=5
 runtime=30
 
 # ---------------------------------------------------------------------
 # check read and write linearity. oper_time = func(size)
 # ---------------------------------------------------------------------
-[linearity_test_{TEST_SUMM} * {NUM_ROUNDS}]
-blocksize={% 512, 1k, 4k, 8k, 16k, 32k, 64k, 128k, 256k, 512k, 1m, 2m, 4m %}
-rw={% randwrite, randread %}
-direct=1
+# [linearity_test_{TEST_SUMM}]
+# blocksize={BLOCK_SIZES}
+# rw={% randwrite, randread %}
+# direct=1
 
 # ---------------------------------------------------------------------
 # check sync write linearity. oper_time = func(size)
 # check sync BW as well
 # ---------------------------------------------------------------------
-[linearity_test_{TEST_SUMM} * {NUM_ROUNDS}]
-blocksize={% 512, 1k, 4k, 8k, 16k, 32k, 64k, 128k, 256k, 512k, 1m, 2m, 4m %}
+[linearity_test_{TEST_SUMM}]
+blocksize={BLOCK_SIZES}
 rw=randwrite
 sync=1
 
diff --git a/wally/suits/io/defaults.cfg b/wally/suits/io/defaults.cfg
new file mode 100644
index 0000000..51a8145
--- /dev/null
+++ b/wally/suits/io/defaults.cfg
@@ -0,0 +1,14 @@
+buffered=0
+group_reporting=1
+iodepth=1
+softrandommap=1
+thread=1
+time_based=1
+wait_for_previous=1
+
+# this is critical for correct results in multi-node run
+randrepeat=0
+
+filename={FILENAME}
+
+
diff --git a/wally/suits/io/fio_task_parser.py b/wally/suits/io/fio_task_parser.py
new file mode 100644
index 0000000..52c4bb3
--- /dev/null
+++ b/wally/suits/io/fio_task_parser.py
@@ -0,0 +1,458 @@
+import os
+import sys
+import copy
+import os.path
+import argparse
+import itertools
+from collections import OrderedDict, namedtuple
+
+
+from wally.utils import sec_to_str
+
+
+SECTION = 0
+SETTING = 1
+INCLUDE = 2
+
+
+Var = namedtuple('Var', ('name',))
+CfgLine = namedtuple('CfgLine', ('fname', 'lineno', 'oline',
+                                 'tp', 'name', 'val'))
+
+
+class FioJobSection(object):
+    def __init__(self, name):
+        self.name = name
+        self.vals = OrderedDict()
+
+    def copy(self):
+        return copy.deepcopy(self)
+
+    def required_vars(self):
+        for name, val in self.vals.items():
+            if isinstance(val, Var):
+                yield name, val
+
+    def is_free(self):
+        return len(list(self.required_vars())) == 0
+
+    def __str__(self):
+        res = "[{0}]\n".format(self.name)
+
+        for name, val in self.vals.items():
+            if name.startswith('_') or name == name.upper():
+                continue
+            if isinstance(val, Var):
+                res += "{0}={{{1}}}\n".format(name, val.name)
+            else:
+                res += "{0}={1}\n".format(name, val)
+
+        return res
+
+
+def to_bytes(sz):
+    sz = sz.lower()
+    try:
+        return int(sz)
+    except ValueError:
+        if sz[-1] == 'm':
+            return (1024 ** 2) * int(sz[:-1])
+        if sz[-1] == 'k':
+            return 1024 * int(sz[:-1])
+        if sz[-1] == 'g':
+            return (1024 ** 3) * int(sz[:-1])
+        raise
+
+
+class ParseError(ValueError):
+    def __init__(self, msg, fname, lineno, line_cont=""):
+        ValueError.__init__(self, msg)
+        self.file_name = fname
+        self.lineno = lineno
+        self.line_cont = line_cont
+
+    def __str__(self):
+        msg = "In {0}:{1} ({2}) : {3}"
+        return msg.format(self.file_name,
+                          self.lineno,
+                          self.line_cont,
+                          super(ParseError, self).__str__())
+
+
+def is_name(name):
+    if len(name) == 0:
+        return False
+
+    if name[0] != '_' and not name[0].isalpha():
+        return False
+
+    for ch in name[1:]:
+        if name[0] != '_' and not name[0].isalnum():
+            return False
+
+    return True
+
+
+def parse_value(val):
+    try:
+        return int(val)
+    except ValueError:
+        pass
+
+    try:
+        return float(val)
+    except ValueError:
+        pass
+
+    if val.startswith('{%'):
+        assert val.endswith("%}")
+        content = val[2:-2]
+        vals = list(i.strip() for i in content.split(','))
+        return map(parse_value, vals)
+
+    if val.startswith('{'):
+        assert val.endswith("}")
+        assert is_name(val[1:-1])
+        return Var(val[1:-1])
+    return val
+
+
+def fio_config_lexer(fio_cfg, fname):
+    for lineno, oline in enumerate(fio_cfg.split("\n")):
+        try:
+            line = oline.strip()
+
+            if line.startswith("#") or line.startswith(";"):
+                continue
+
+            if line == "":
+                continue
+
+            if '#' in line:
+                raise ParseError("# isn't allowed inside line",
+                                 fname, lineno, oline)
+
+            if line.startswith('['):
+                yield CfgLine(fname, lineno, oline, SECTION,
+                              line[1:-1].strip(), None)
+            elif '=' in line:
+                opt_name, opt_val = line.split('=', 1)
+                yield CfgLine(fname, lineno, oline, SETTING,
+                              opt_name.strip(),
+                              parse_value(opt_val.strip()))
+            elif line.startswith("include "):
+                yield CfgLine(fname, lineno, oline, INCLUDE,
+                              line.split(" ", 1)[1], None)
+            else:
+                yield CfgLine(fname, lineno, oline, SETTING, line, '1')
+
+        except Exception as exc:
+            raise ParseError(str(exc), fname, lineno, oline)
+
+
+def fio_config_parse(lexer_iter):
+    in_globals = False
+    curr_section = None
+    glob_vals = OrderedDict()
+    sections_count = 0
+
+    lexed_lines = list(lexer_iter)
+    one_more = True
+    includes = {}
+
+    while one_more:
+        new_lines = []
+        one_more = False
+        for line in lexed_lines:
+            fname, lineno, oline, tp, name, val = line
+
+            if INCLUDE == tp:
+                if not os.path.exists(fname):
+                    dirname = '.'
+                else:
+                    dirname = os.path.dirname(fname)
+
+                new_fname = os.path.join(dirname, name)
+                includes[new_fname] = (fname, lineno)
+
+                try:
+                    cont = open(new_fname).read()
+                except IOError as err:
+                    msg = "Error while including file {0}: {1}"
+                    raise ParseError(msg.format(new_fname, err),
+                                     fname, lineno, oline)
+
+                new_lines.extend(fio_config_lexer(cont, new_fname))
+                one_more = True
+            else:
+                new_lines.append(line)
+
+        lexed_lines = new_lines
+
+    for fname, lineno, oline, tp, name, val in lexed_lines:
+        if tp == SECTION:
+            if curr_section is not None:
+                yield curr_section
+                curr_section = None
+
+            if name == 'global':
+                if sections_count != 0:
+                    raise ParseError("[global] section should" +
+                                     " be only one and first",
+                                     fname, lineno, oline)
+                in_globals = True
+            else:
+                in_globals = False
+                curr_section = FioJobSection(name)
+                curr_section.vals = glob_vals.copy()
+            sections_count += 1
+        else:
+            assert tp == SETTING
+            if in_globals:
+                glob_vals[name] = val
+            elif name == name.upper():
+                raise ParseError("Param '" + name +
+                                 "' not in [global] section",
+                                 fname, lineno, oline)
+            elif curr_section is None:
+                    raise ParseError("Data outside section",
+                                     fname, lineno, oline)
+            else:
+                curr_section.vals[name] = val
+
+    if curr_section is not None:
+        yield curr_section
+
+
+def process_repeats(sec):
+    sec = sec.copy()
+    count = sec.vals.pop('NUM_ROUNDS', 1)
+    assert isinstance(count, (int, long))
+
+    for _ in range(count):
+        yield sec.copy()
+
+        if 'ramp_time' in sec.vals:
+            sec.vals['_ramp_time'] = sec.vals.pop('ramp_time')
+
+
+def process_cycles(sec):
+    cycles = OrderedDict()
+
+    for name, val in sec.vals.items():
+        if isinstance(val, list) and name.upper() != name:
+            cycles[name] = val
+
+    if len(cycles) == 0:
+        yield sec
+    else:
+        for combination in itertools.product(*cycles.values()):
+            new_sec = sec.copy()
+            new_sec.vals.update(zip(cycles.keys(), combination))
+            yield new_sec
+
+
+def apply_params(sec, params):
+    processed_vals = OrderedDict()
+    processed_vals.update(params)
+    for name, val in sec.vals.items():
+        if name in params:
+            continue
+
+        if isinstance(val, Var):
+            if val.name in params:
+                val = params[val.name]
+            elif val.name in processed_vals:
+                val = processed_vals[val.name]
+        processed_vals[name] = val
+    sec = sec.copy()
+    sec.vals = processed_vals
+    return sec
+
+
+def finall_process(sec, counter=[0]):
+    sec = sec.copy()
+
+    if sec.vals.get('numjobs', '1') != 1:
+        msg = "Group reporting should be set if numjobs != 1"
+        assert 'group_reporting' in sec.vals, msg
+
+    sec.vals['unified_rw_reporting'] = '1'
+
+    params = sec.vals.copy()
+    params['UNIQ'] = 'UN{0}'.format(counter[0])
+    params['COUNTER'] = str(counter[0])
+    params['TEST_SUMM'] = get_test_summary(sec)
+    sec.name = sec.name.format(**params)
+    counter[0] += 1
+
+    return sec
+
+
+def get_test_sync_mode(sec):
+    is_sync = str(sec.vals.get("sync", "0")) == "1"
+    is_direct = str(sec.vals.get("direct", "0")) == "1"
+
+    if is_sync and is_direct:
+        return 'x'
+    elif is_sync:
+        return 's'
+    elif is_direct:
+        return 'd'
+    else:
+        return 'a'
+
+
+def get_test_summary(sec):
+    rw = {"randread": "rr",
+          "randwrite": "rw",
+          "read": "sr",
+          "write": "sw"}[sec.vals["rw"]]
+
+    sync_mode = get_test_sync_mode(sec)
+    th_count = sec.vals.get('numjobs')
+
+    if th_count is None:
+        th_count = sec.vals.get('concurence', 1)
+
+    return "{0}{1}{2}th{3}".format(rw,
+                                   sync_mode,
+                                   sec.vals['blocksize'],
+                                   th_count)
+
+
+def execution_time(sec):
+    return sec.vals.get('ramp_time', 0) + sec.vals.get('runtime', 0)
+
+
+def slice_config(sec_iter, runcycle=None, max_jobs=1000, split_on_names=False):
+    jcount = 0
+    runtime = 0
+    curr_slice = []
+    prev_name = None
+
+    for pos, sec in enumerate(sec_iter):
+
+        if prev_name is not None:
+            split_here = False
+
+            if split_on_names and prev_name != sec.name:
+                split_here = True
+
+            if split_here:
+                yield curr_slice
+                curr_slice = []
+                runtime = 0
+                jcount = 0
+
+        prev_name = sec.name
+
+        jc = sec.vals.get('numjobs', 1)
+        msg = "numjobs should be integer, not {0!r}".format(jc)
+        assert isinstance(jc, int), msg
+
+        curr_task_time = execution_time(sec)
+
+        if jc > max_jobs:
+            err_templ = "Can't process job {0!r} - too large numjobs"
+            raise ValueError(err_templ.format(sec.name))
+
+        if runcycle is not None and len(curr_slice) != 0:
+            rc_ok = curr_task_time + runtime <= runcycle
+        else:
+            rc_ok = True
+
+        if jc + jcount <= max_jobs and rc_ok:
+            runtime += curr_task_time
+            jcount += jc
+            curr_slice.append(sec)
+            continue
+
+        assert len(curr_slice) != 0
+        yield curr_slice
+
+        if '_ramp_time' in sec.vals:
+            sec.vals['ramp_time'] = sec.vals.pop('_ramp_time')
+            curr_task_time = execution_time(sec)
+
+        runtime = curr_task_time
+        jcount = jc
+        curr_slice = [sec]
+        prev_name = None
+
+    if curr_slice != []:
+        yield curr_slice
+
+
+def parse_all_in_1(source, fname=None):
+    return fio_config_parse(fio_config_lexer(source, fname))
+
+
+def flatmap(func, inp_iter):
+    for val in inp_iter:
+        for res in func(val):
+            yield res
+
+
+def fio_cfg_compile(source, fname, test_params, **slice_params):
+    it = parse_all_in_1(source, fname)
+    it = (apply_params(sec, test_params) for sec in it)
+    it = flatmap(process_cycles, it)
+    it = flatmap(process_repeats, it)
+    it = itertools.imap(finall_process, it)
+    return slice_config(it, **slice_params)
+
+
+def parse_args(argv):
+    parser = argparse.ArgumentParser(
+        description="Run fio' and return result")
+    parser.add_argument("--runcycle", type=int, default=None,
+                        metavar="MAX_CYCLE_SECONDS",
+                        help="Max cycle length in seconds")
+    parser.add_argument("-p", "--params", nargs="*", metavar="PARAM=VAL",
+                        default=[],
+                        help="Provide set of pairs PARAM=VAL to " +
+                             "format into job description")
+    parser.add_argument("action", choices=['estimate', 'compile', 'num_tests'])
+    parser.add_argument("jobfile")
+    return parser.parse_args(argv)
+
+
+def main(argv):
+    argv_obj = parse_args(argv)
+
+    if argv_obj.jobfile == '-':
+        job_cfg = sys.stdin.read()
+    else:
+        job_cfg = open(argv_obj.jobfile).read()
+
+    params = {}
+    for param_val in argv_obj.params:
+        assert '=' in param_val
+        name, val = param_val.split("=", 1)
+        params[name] = parse_value(val)
+
+    slice_params = {
+        'runcycle': argv_obj.runcycle,
+    }
+
+    sliced_it = fio_cfg_compile(job_cfg, argv_obj.jobfile,
+                                params, **slice_params)
+
+    if argv_obj.action == 'estimate':
+        sum_time = 0
+        for cfg_slice in sliced_it:
+            sum_time += sum(map(execution_time, cfg_slice))
+        print sec_to_str(sum_time)
+    elif argv_obj.action == 'num_tests':
+        print sum(map(len, map(list, sliced_it)))
+    elif argv_obj.action == 'compile':
+        splitter = "\n#" + "-" * 70 + "\n\n"
+        for cfg_slice in sliced_it:
+            print splitter.join(map(str, cfg_slice))
+
+    return 0
+
+
+if __name__ == '__main__':
+    exit(main(sys.argv[1:]))
diff --git a/wally/suits/io/formatter.py b/wally/suits/io/formatter.py
index 22c090f..84b0a13 100644
--- a/wally/suits/io/formatter.py
+++ b/wally/suits/io/formatter.py
@@ -2,15 +2,21 @@
 
 from wally.utils import ssize2b
 from wally.statistic import round_3_digit
-from wally.suits.io.agent import get_test_summary
+from .fio_task_parser import get_test_summary, get_test_sync_mode
 
 
 def key_func(data):
-    p = data.params
+    p = data.params.vals
+
+    th_count = data.params.vals.get('numjobs')
+
+    if th_count is None:
+        th_count = data.params.vals.get('concurence', 1)
+
     return (p['rw'],
-            p['sync_mode'],
+            get_test_sync_mode(data.params),
             ssize2b(p['blocksize']),
-            int(p['concurence']) * data.testnodes_count,
+            int(th_count) * data.testnodes_count,
             data.name)
 
 
@@ -41,8 +47,7 @@
 
         prev_k = curr_k
 
-        descr = get_test_summary(data.params, data.testnodes_count)
-        test_dinfo = dinfo[data.name]
+        test_dinfo = dinfo[(data.name, data.summary)]
 
         iops, _ = test_dinfo.iops.rounded_average_conf()
 
@@ -61,7 +66,7 @@
         bw = round_3_digit(bw)
 
         params = (data.name.rsplit('_', 1)[0],
-                  descr, int(iops), int(bw), str(conf_perc),
+                  data.summary, int(iops), int(bw), str(conf_perc),
                   str(dev_perc),
                   int(iops_per_vm), int(bw_per_vm), lat)
         tab.add_row(params)
diff --git a/wally/suits/io/hdd.cfg b/wally/suits/io/hdd.cfg
index 17d0509..21166e5 100644
--- a/wally/suits/io/hdd.cfg
+++ b/wally/suits/io/hdd.cfg
@@ -1,16 +1,11 @@
-[defaults]
-wait_for_previous
-group_reporting
-time_based
-buffered=0
-iodepth=1
-softrandommap=1
-filename={FILENAME}
-NUM_ROUNDS=7
-NUMJOBS={% 1, 5, 10, 15, 20, 30, 40, 80 %}
+[global]
+include defaults.cfg
 
-# this is critical for correct results in multy-node run
-randrepeat=0
+NUM_ROUNDS=3
+
+# NUMJOBS={% 1, 5, 10, 15, 20, 30, 40, 80 %}
+
+NUMJOBS={% 1, 5, 10 %}
 
 size=10G
 ramp_time=5
@@ -19,7 +14,7 @@
 # ---------------------------------------------------------------------
 # check different thread count, sync mode. (latency, iops) = func(th_count)
 # ---------------------------------------------------------------------
-[hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+[hdd_test_{TEST_SUMM}]
 blocksize=4k
 rw=randwrite
 sync=1
@@ -29,7 +24,7 @@
 # check different thread count, direct read mode. (latency, iops) = func(th_count)
 # also check iops for randread
 # ---------------------------------------------------------------------
-[hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+[hdd_test_{TEST_SUMM}]
 blocksize=4k
 rw=randread
 direct=1
@@ -38,7 +33,7 @@
 # ---------------------------------------------------------------------
 # check IOPS randwrite.
 # ---------------------------------------------------------------------
-[hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+[hdd_test_{TEST_SUMM}]
 blocksize=4k
 rw=randwrite
 direct=1
@@ -47,7 +42,7 @@
 # No reason for th count > 1 in case of sequantial operations
 # They became random
 # ---------------------------------------------------------------------
-[hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+[hdd_test_{TEST_SUMM}]
 blocksize=1m
 rw={% read, write %}
 direct=1
diff --git a/wally/suits/io/lat_vs_iops.cfg b/wally/suits/io/lat_vs_iops.cfg
index a587a96..dbafcbb 100644
--- a/wally/suits/io/lat_vs_iops.cfg
+++ b/wally/suits/io/lat_vs_iops.cfg
@@ -1,29 +1,40 @@
-[defaults]
-wait_for_previous=1
-filename={FILENAME}
+[global]
+include defaults.cfg
 
-# this is critical for correct results in multy-node run
-randrepeat=0
-
-iodepth=1
-size=100G
-group_reporting=1
-
-IOPS_LIMIT={% 100, 500 %}
+TEST_FILE_SIZE=100G
+size={TEST_FILE_SIZE}
 
 ramp_time=5
 runtime=30
-time_based=1
 
-buffered=0
-NUMJOBS=1
+blocksize=4k
+rw=randwrite
+sync=1
 
 # ---------------------------------------------------------------------
 # latency as function from IOPS
 # ---------------------------------------------------------------------
-[lat_vs_iops{rate_iops}_{TEST_SUMM} * {NUM_ROUNDS}]
-blocksize=4k
-rw=randread
-direct=1
-numjobs={NUMJOBS}
-rate_iops={IOPS_LIMIT}
+[lat_vs_iops{rate_iops}_{TEST_SUMM}]
+numjobs=1
+rate_iops={% 20, 40, 60, 80, 100, 120, 160, 200, 250, 300 %}
+
+# ---------------------------------------------------------------------
+# latency as a function of IOPS
+# ---------------------------------------------------------------------
+[lat_vs_iops{rate_iops}_{TEST_SUMM}]
+numjobs=3
+rate_iops={% 10, 20, 40, 60, 80, 100, 120, 160 %}
+
+# ---------------------------------------------------------------------
+# latency as a function of IOPS
+# ---------------------------------------------------------------------
+[lat_vs_iops{rate_iops}_{TEST_SUMM}]
+numjobs=7
+rate_iops={% 5, 10, 20, 40, 50, 60, 70 %}
+
+# ---------------------------------------------------------------------
+# latency as a function of IOPS
+# ---------------------------------------------------------------------
+[lat_vs_iops{rate_iops}_{TEST_SUMM}]
+numjobs=10
+rate_iops={% 5, 10, 20, 40, 50 %}
diff --git a/wally/suits/io/results_loader.py b/wally/suits/io/results_loader.py
deleted file mode 100644
index 988fe0e..0000000
--- a/wally/suits/io/results_loader.py
+++ /dev/null
@@ -1,56 +0,0 @@
-import re
-import json
-
-
-def parse_output(out_err):
-    err_start_patt = r"(?ims)=+\s+ERROR\s+=+"
-    err_end_patt = r"(?ims)=+\s+END OF ERROR\s+=+"
-
-    for block in re.split(err_start_patt, out_err)[1:]:
-        tb, garbage = re.split(err_end_patt, block)
-        msg = "Test fails with error:\n" + tb.strip() + "\n"
-        raise OSError(msg)
-
-    start_patt = r"(?ims)=+\s+RESULTS\(format=json\)\s+=+"
-    end_patt = r"(?ims)=+\s+END OF RESULTS\s+=+"
-
-    for block in re.split(start_patt, out_err)[1:]:
-        data, garbage = re.split(end_patt, block)
-        yield json.loads(data.strip())
-
-    start_patt = r"(?ims)=+\s+RESULTS\(format=eval\)\s+=+"
-    end_patt = r"(?ims)=+\s+END OF RESULTS\s+=+"
-
-    for block in re.split(start_patt, out_err)[1:]:
-        data, garbage = re.split(end_patt, block)
-        yield eval(data.strip())
-
-
-def filter_data(name_prefix, fields_to_select, **filters):
-    def closure(data):
-        for result in data:
-            if name_prefix is not None:
-                if not result['jobname'].startswith(name_prefix):
-                    continue
-
-            for k, v in filters.items():
-                if result.get(k) != v:
-                    break
-            else:
-                yield map(result.get, fields_to_select)
-    return closure
-
-
-def filter_processed_data(name_prefix, fields_to_select, **filters):
-    def closure(data):
-        for name, result in data.items():
-            if name_prefix is not None:
-                if not name.startswith(name_prefix):
-                    continue
-
-            for k, v in filters.items():
-                if result.raw.get(k) != v:
-                    break
-            else:
-                yield map(result.raw.get, fields_to_select)
-    return closure
diff --git a/wally/suits/io/verify.cfg b/wally/suits/io/verify.cfg
index 58b8450..9ebfad1 100644
--- a/wally/suits/io/verify.cfg
+++ b/wally/suits/io/verify.cfg
@@ -1,28 +1,18 @@
-[defaults]
-wait_for_previous
-group_reporting
-time_based
-buffered=0
-iodepth=1
-softrandommap=1
-filename={FILENAME}
-NUM_ROUNDS=1
-
-# this is critical for correct results in multy-node run
-randrepeat=0
-
+[global]
+include defaults.cfg
 size=50G
 ramp_time=5
 runtime=60
+NUM_ROUNDS=2
 
 # ---------------------------------------------------------------------
-[verify_{TEST_SUMM} * {NUM_ROUNDS}]
-blocksize=4k
-rw=randwrite
-direct=1
+#[verify_{TEST_SUMM}]
+#blocksize=4k
+#rw=randwrite
+#direct=1
 
 # ---------------------------------------------------------------------
-[verify_{TEST_SUMM} * {NUM_ROUNDS}]
+[verify_{TEST_SUMM}]
 blocksize=4k
 rw=randread
 direct=1
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index 36d3fcf..09e93f0 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -1,35 +1,47 @@
 import abc
-import time
-import socket
-import random
 import os.path
-import logging
-import datetime
-
-from paramiko import SSHException, SFTPError
-import texttable
-
-from wally.utils import (ssize2b, open_for_append_or_create,
-                         sec_to_str, StopTestError)
-
-from wally.ssh_utils import (copy_paths, run_over_ssh,
-                             save_to_remote,
-                             # delete_file,
-                             connect, read_from_remote, Local,
-                             exists)
-
-from . import postgres
-from . import mysql
-from .io import agent as io_agent
-from .io import formatter as io_formatter
-from .io.results_loader import parse_output
 
 
-logger = logging.getLogger("wally")
+from wally.ssh_utils import run_over_ssh, copy_paths
+
+
+class TestResults(object):
+    def __init__(self, config, params, results,
+                 raw_result, run_interval, vm_count):
+        self.config = config
+        self.params = params
+        self.results = results
+        self.raw_result = raw_result
+        self.run_interval = run_interval
+        self.vm_count = vm_count
+
+    def __str__(self):
+        res = "{0}({1}):\n    results:\n".format(
+                    self.__class__.__name__,
+                    self.summary())
+
+        for name, val in self.results.items():
+            res += "        {0}={1}\n".format(name, val)
+
+        res += "    params:\n"
+
+        for name, val in self.params.items():
+            res += "        {0}={1}\n".format(name, val)
+
+        return res
+
+    @abc.abstractmethod
+    def summary(self):
+        pass
+
+    @abc.abstractmethod
+    def get_yamable(self):
+        pass
 
 
 class IPerfTest(object):
     def __init__(self, options, is_primary, on_result_cb, test_uuid, node,
+                 total_nodes_count,
                  log_directory=None,
                  coordination_queue=None,
                  remote_dir="/tmp/wally"):
@@ -42,6 +54,7 @@
         self.remote_dir = remote_dir
         self.is_primary = is_primary
         self.stop_requested = False
+        self.total_nodes_count = total_nodes_count
 
     def request_stop(self):
         self.stop_requested = True
@@ -59,6 +72,11 @@
     def cleanup(self):
         pass
 
+    @classmethod
+    @abc.abstractmethod
+    def load(cls, data):
+        pass
+
     @abc.abstractmethod
     def run(self, barrier):
         pass
@@ -118,470 +136,3 @@
     def merge_results(self, results):
         tpcm = sum([val[1] for val in results])
         return {"res": {"TpmC": tpcm}}
-
-
-class PgBenchTest(TwoScriptTest):
-    root = os.path.dirname(postgres.__file__)
-    pre_run_script = os.path.join(root, "prepare.sh")
-    run_script = os.path.join(root, "run.sh")
-
-    @classmethod
-    def format_for_console(cls, data):
-        tab = texttable.Texttable(max_width=120)
-        tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
-        tab.header(["TpmC"])
-        tab.add_row([data['res']['TpmC']])
-        return tab.draw()
-
-
-class MysqlTest(TwoScriptTest):
-    root = os.path.dirname(mysql.__file__)
-    pre_run_script = os.path.join(root, "prepare.sh")
-    run_script = os.path.join(root, "run.sh")
-
-    @classmethod
-    def format_for_console(cls, data):
-        tab = texttable.Texttable(max_width=120)
-        tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
-        tab.header(["TpmC"])
-        tab.add_row([data['res']['TpmC']])
-        return tab.draw()
-
-
-class IOPerfTest(IPerfTest):
-    tcp_conn_timeout = 30
-    max_pig_timeout = 5
-    soft_runcycle = 5 * 60
-
-    def __init__(self, *dt, **mp):
-        IPerfTest.__init__(self, *dt, **mp)
-        self.config_fname = self.options['cfg']
-
-        if '/' not in self.config_fname and '.' not in self.config_fname:
-            cfgs_dir = os.path.dirname(io_agent.__file__)
-            self.config_fname = os.path.join(cfgs_dir,
-                                             self.config_fname + '.cfg')
-
-        self.alive_check_interval = self.options.get('alive_check_interval')
-
-        self.config_params = {}
-        for name, val in self.options.get('params', {}).items():
-            if isinstance(val, (list, tuple)):
-                val = "{%" + ','.join(map(str, val)) + "%}"
-            self.config_params[name] = val
-
-        self.config_params['VM_COUNT'] = self.options['testnodes_count']
-        self.tool = self.options.get('tool', 'fio')
-        self.raw_cfg = open(self.config_fname).read()
-        self.configs = list(io_agent.parse_all_in_1(self.raw_cfg,
-                                                    self.config_params))
-
-        cmd_log = os.path.join(self.log_directory, "task_compiled.cfg")
-        raw_res = os.path.join(self.log_directory, "raw_results.txt")
-
-        self.io_py_remote = self.join_remote("agent.py")
-        self.log_fl = self.join_remote("log.txt")
-        self.pid_file = self.join_remote("pid")
-        self.task_file = self.join_remote("task.cfg")
-        self.use_sudo = self.options.get("use_sudo", True)
-        self.test_logging = self.options.get("test_logging", False)
-
-        fio_command_file = open_for_append_or_create(cmd_log)
-
-        if self.test_logging:
-            soft_runcycle = self.soft_runcycle
-        else:
-            soft_runcycle = None
-
-        self.fio_configs = io_agent.parse_and_slice_all_in_1(
-                        self.raw_cfg,
-                        self.config_params,
-                        soft_runcycle=soft_runcycle,
-                        split_on_names=self.test_logging)
-
-        self.fio_configs = list(self.fio_configs)
-        splitter = "\n\n" + "-" * 60 + "\n\n"
-
-        cfg = splitter.join(
-                map(io_agent.fio_config_to_str,
-                    self.fio_configs))
-
-        fio_command_file.write(cfg)
-        self.fio_raw_results_file = open_for_append_or_create(raw_res)
-
-    def __str__(self):
-        return "{0}({1})".format(self.__class__.__name__,
-                                 self.node.get_conn_id())
-
-    def cleanup(self):
-        # delete_file(conn, self.io_py_remote)
-        # Need to remove tempo files, used for testing
-        pass
-
-    def prefill_test_files(self):
-        files = {}
-
-        for section in self.configs:
-            sz = ssize2b(section.vals['size'])
-            msz = sz / (1024 ** 2)
-
-            if sz % (1024 ** 2) != 0:
-                msz += 1
-
-            fname = section.vals['filename']
-
-            # if already has other test with the same file name
-            # take largest size
-            files[fname] = max(files.get(fname, 0), msz)
-
-        cmd_templ = "dd oflag=direct " + \
-                    "if=/dev/zero of={0} bs={1} count={2}"
-
-        # cmd_templ = "fio --rw=write --bs={1} --direct=1 --size={2} "
-
-        if self.use_sudo:
-            cmd_templ = "sudo " + cmd_templ
-
-        ssize = 0
-        stime = time.time()
-
-        for fname, curr_sz in files.items():
-            cmd = cmd_templ.format(fname, 1024 ** 2, curr_sz)
-            ssize += curr_sz
-            self.run_over_ssh(cmd, timeout=curr_sz)
-
-        ddtime = time.time() - stime
-        if ddtime > 1E-3:
-            fill_bw = int(ssize / ddtime)
-            mess = "Initiall dd fill bw is {0} MiBps for this vm"
-            logger.info(mess.format(fill_bw))
-            self.coordinate(('init_bw', fill_bw))
-
-    def install_utils(self, max_retry=3, timeout=5):
-        need_install = []
-        for bin_name, package in (('fio', 'fio'), ('screen', 'screen')):
-            try:
-                self.run_over_ssh('which ' + bin_name, nolog=True)
-            except OSError:
-                need_install.append(package)
-
-        if len(need_install) == 0:
-            return
-
-        cmd = "sudo apt-get -y install " + " ".join(need_install)
-
-        for i in range(max_retry):
-            try:
-                self.run_over_ssh(cmd)
-                break
-            except OSError as err:
-                time.sleep(timeout)
-        else:
-            raise OSError("Can't install - " + str(err))
-
-    def pre_run(self):
-        try:
-            cmd = 'mkdir -p "{0}"'.format(self.remote_dir)
-            if self.use_sudo:
-                cmd = "sudo " + cmd
-                cmd += " ; sudo chown {0} {1}".format(self.node.get_user(),
-                                                      self.remote_dir)
-
-            self.run_over_ssh(cmd)
-        except Exception as exc:
-            msg = "Failed to create folder {0} on remote {1}. Error: {2!s}"
-            msg = msg.format(self.remote_dir, self.node.get_conn_id(), exc)
-            logger.exception(msg)
-            raise StopTestError(msg, exc)
-
-        self.install_utils()
-
-        local_fname = os.path.splitext(io_agent.__file__)[0] + ".py"
-        files_to_copy = {local_fname: self.io_py_remote}
-        copy_paths(self.node.connection, files_to_copy)
-
-        if self.options.get('prefill_files', True):
-            self.prefill_test_files()
-        elif self.is_primary:
-            logger.warning("Prefilling of test files is disabled")
-
-    def check_process_is_running(self, sftp, pid):
-        try:
-            sftp.stat("/proc/{0}".format(pid))
-            return True
-        except (OSError, IOError, NameError):
-            return False
-
-    def kill_remote_process(self, conn, pid, soft=True):
-        try:
-            if soft:
-                cmd = "kill {0}"
-            else:
-                cmd = "kill -9 {0}"
-
-            if self.use_sudo:
-                cmd = "sudo " + cmd
-
-            self.run_over_ssh(cmd.format(pid))
-            return True
-        except OSError:
-            return False
-
-    def get_test_status(self, res_file=None):
-        found_res_file = False
-        is_connected = None
-        is_running = None
-        pid = None
-        err = None
-
-        try:
-            # conn = connect(self.node.conn_url,
-            #                conn_timeout=self.tcp_conn_timeout)
-            # with conn:
-            conn = self.node.connection
-            with conn.open_sftp() as sftp:
-                try:
-                    pid = read_from_remote(sftp, self.pid_file)
-                    is_running = True
-                except (NameError, IOError, OSError) as exc:
-                    pid = None
-                    is_running = False
-
-                if is_running:
-                    if not self.check_process_is_running(sftp, pid):
-                        try:
-                            sftp.remove(self.pid_file)
-                        except (IOError, NameError, OSError):
-                            pass
-                        is_running = False
-
-                if res_file is not None:
-                    found_res_file = exists(sftp, res_file)
-
-            is_connected = True
-
-        except (socket.error, SSHException, EOFError, SFTPError) as exc:
-            err = str(exc)
-            is_connected = False
-
-        return found_res_file, is_connected, is_running, pid, err
-
-    def wait_till_finished(self, soft_timeout, timeout, res_fname=None):
-        conn_id = self.node.get_conn_id()
-        end_of_wait_time = timeout + time.time()
-        soft_end_of_wait_time = soft_timeout + time.time()
-
-        time_till_check = random.randint(5, 10)
-        pid = None
-        is_running = False
-        pid_get_timeout = self.max_pig_timeout + time.time()
-        curr_connected = True
-
-        while end_of_wait_time > time.time():
-            time.sleep(time_till_check)
-
-            found_res_file, is_connected, is_running, npid, err = \
-                self.get_test_status(res_fname)
-
-            if found_res_file and not is_running:
-                return
-
-            if is_connected and not is_running:
-                if pid is None:
-                    if time.time() > pid_get_timeout:
-                        msg = ("On node {0} pid file doesn't " +
-                               "appears in time")
-                        logger.error(msg.format(conn_id))
-                        raise StopTestError("Start timeout")
-                else:
-                    # execution finished
-                    break
-
-            if npid is not None:
-                pid = npid
-
-            if is_connected and pid is not None and is_running:
-                if time.time() < soft_end_of_wait_time:
-                    time.sleep(soft_end_of_wait_time - time.time())
-
-            if is_connected and not curr_connected:
-                msg = "Connection with {0} is restored"
-                logger.debug(msg.format(conn_id))
-            elif not is_connected and curr_connected:
-                msg = "Lost connection with " + conn_id + ". Error: " + err
-                logger.debug(msg)
-
-            curr_connected = is_connected
-
-    def run(self, barrier):
-        try:
-            if len(self.fio_configs) > 1 and self.is_primary:
-
-                exec_time = 0
-                for test in self.fio_configs:
-                    exec_time += io_agent.calculate_execution_time(test)
-
-                # +5% - is a rough estimation for additional operations
-                # like sftp, etc
-                exec_time = int(exec_time * 1.05)
-
-                exec_time_s = sec_to_str(exec_time)
-                now_dt = datetime.datetime.now()
-                end_dt = now_dt + datetime.timedelta(0, exec_time)
-                msg = "Entire test should takes aroud: {0} and finished at {1}"
-                logger.info(msg.format(exec_time_s,
-                                       end_dt.strftime("%H:%M:%S")))
-
-            for pos, fio_cfg_slice in enumerate(self.fio_configs):
-                names = [i.name for i in fio_cfg_slice]
-                msgs = []
-                already_processed = set()
-                for name in names:
-                    if name not in already_processed:
-                        already_processed.add(name)
-
-                        if 1 == names.count(name):
-                            msgs.append(name)
-                        else:
-                            frmt = "{0} * {1}"
-                            msgs.append(frmt.format(name,
-                                                    names.count(name)))
-
-                if self.is_primary:
-                    logger.info("Will run tests: " + ", ".join(msgs))
-
-                nolog = (pos != 0) or not self.is_primary
-                out_err = self.do_run(barrier, fio_cfg_slice, nolog=nolog)
-
-                try:
-                    for data in parse_output(out_err):
-                        self.on_result_cb(data)
-                except (OSError, StopTestError):
-                    raise
-                except Exception as exc:
-                    msg_templ = "Error during postprocessing results: {0!s}"
-                    raise RuntimeError(msg_templ.format(exc))
-
-        finally:
-            barrier.exit()
-
-    def do_run(self, barrier, cfg, nolog=False):
-        conn_id = self.node.get_conn_id()
-
-        cmd_templ = "screen -S {screen_name} -d -m " + \
-                    "env python2 {0} -p {pid_file} -o {results_file} " + \
-                    "--type {1} {2} --json {3}"
-
-        if self.options.get("use_sudo", True):
-            cmd_templ = "sudo " + cmd_templ
-
-        params = []
-        for k, v in self.config_params.items():
-            if isinstance(v, basestring) and v.startswith("{%"):
-                continue
-            params.append("{0}={1}".format(k, v))
-
-        if [] != params:
-            params = "--params " + " ".join(params)
-
-        with self.node.connection.open_sftp() as sftp:
-            save_to_remote(sftp, self.task_file,
-                           io_agent.fio_config_to_str(cfg))
-
-        screen_name = self.test_uuid
-        cmd = cmd_templ.format(self.io_py_remote,
-                               self.tool,
-                               params,
-                               self.task_file,
-                               pid_file=self.pid_file,
-                               results_file=self.log_fl,
-                               screen_name=screen_name)
-
-        exec_time = io_agent.calculate_execution_time(cfg)
-        exec_time_str = sec_to_str(exec_time)
-
-        timeout = int(exec_time + max(300, exec_time))
-        soft_tout = exec_time
-        barrier.wait()
-        self.run_over_ssh(cmd, nolog=nolog)
-        if self.is_primary:
-            templ = "Test should takes about {0}." + \
-                    " Should finish at {1}," + \
-                    " will wait at most till {2}"
-            now_dt = datetime.datetime.now()
-            end_dt = now_dt + datetime.timedelta(0, exec_time)
-            wait_till = now_dt + datetime.timedelta(0, timeout)
-
-            logger.info(templ.format(exec_time_str,
-                                     end_dt.strftime("%H:%M:%S"),
-                                     wait_till.strftime("%H:%M:%S")))
-
-            if not nolog:
-                msg = "Tests started in screen {1} on each testnode"
-                logger.debug(msg.format(conn_id, screen_name))
-
-        # TODO: add monitoring socket
-        # if not isinstance(self.node.connection, Local):
-        #     self.node.connection.close()
-
-        self.wait_till_finished(soft_tout, timeout, self.log_fl)
-        if not nolog:
-            logger.debug("Test on node {0} is finished".format(conn_id))
-
-        # if self.node.connection is not Local:
-        #     conn_timeout = self.tcp_conn_timeout * 3
-        #     self.node.connection = connect(self.node.conn_url,
-        #                                    conn_timeout=conn_timeout)
-
-        with self.node.connection.open_sftp() as sftp:
-            return read_from_remote(sftp, self.log_fl)
-
-    @classmethod
-    def merge_results(cls, results):
-        merged = results[0]
-        for block in results[1:]:
-            assert block["__meta__"] == merged["__meta__"]
-            merged['res'].extend(block['res'])
-        return merged
-
-    # @classmethod
-    # def merge_results(cls, results):
-    #     if len(results) == 0:
-    #         return None
-
-    #     merged_result = results[0]
-    #     merged_data = merged_result['res']
-    #     mergable_fields = ['bw', 'clat', 'iops', 'lat', 'slat']
-
-    #     for res in results[1:]:
-    #         mm = merged_result['__meta__']
-    #         assert mm['raw_cfg'] == res['__meta__']['raw_cfg']
-    #         assert mm['params'] == res['__meta__']['params']
-    #         mm['timings'].extend(res['__meta__']['timings'])
-
-    #         data = res['res']
-    #         for testname, test_data in data.items():
-    #             if testname not in merged_data:
-    #                 merged_data[testname] = test_data
-    #                 continue
-
-    #             res_test_data = merged_data[testname]
-
-    #             diff = set(test_data.keys()).symmetric_difference(
-    #                         res_test_data.keys())
-
-    #             msg = "Difference: {0}".format(",".join(diff))
-    #             assert len(diff) == 0, msg
-
-    #             for k, v in test_data.items():
-    #                 if k in mergable_fields:
-    #                     res_test_data[k].extend(v)
-    #                 else:
-    #                     msg = "{0!r} != {1!r}".format(res_test_data[k], v)
-    #                     assert res_test_data[k] == v, msg
-
-    #     return merged_result
-
-    @classmethod
-    def format_for_console(cls, data, dinfo):
-        return io_formatter.format_results_for_console(dinfo)
diff --git a/wally/suits/mysql/__init__.py b/wally/suits/mysql/__init__.py
index e69de29..6c3a982 100644
--- a/wally/suits/mysql/__init__.py
+++ b/wally/suits/mysql/__init__.py
@@ -0,0 +1,19 @@
+import os.path
+
+import texttable
+
+from ..itest import TwoScriptTest
+
+
+class MysqlTest(TwoScriptTest):
+    root = os.path.dirname(__file__)
+    pre_run_script = os.path.join(root, "prepare.sh")
+    run_script = os.path.join(root, "run.sh")
+
+    @classmethod
+    def format_for_console(cls, data):
+        tab = texttable.Texttable(max_width=120)
+        tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
+        tab.header(["TpmC"])
+        tab.add_row([data['res']['TpmC']])
+        return tab.draw()
diff --git a/wally/suits/postgres/__init__.py b/wally/suits/postgres/__init__.py
index e69de29..06fdd21 100644
--- a/wally/suits/postgres/__init__.py
+++ b/wally/suits/postgres/__init__.py
@@ -0,0 +1,21 @@
+import os.path
+
+
+import texttable
+
+
+from ..itest import TwoScriptTest
+
+
+class PgBenchTest(TwoScriptTest):
+    root = os.path.dirname(__file__)
+    pre_run_script = os.path.join(root, "prepare.sh")
+    run_script = os.path.join(root, "run.sh")
+
+    @classmethod
+    def format_for_console(cls, data):
+        tab = texttable.Texttable(max_width=120)
+        tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
+        tab.header(["TpmC"])
+        tab.add_row([data['res']['TpmC']])
+        return tab.draw()
diff --git a/wally/utils.py b/wally/utils.py
index 1fa74c5..cdee319 100644
--- a/wally/utils.py
+++ b/wally/utils.py
@@ -1,5 +1,6 @@
 import re
 import os
+import sys
 import time
 import psutil
 import socket
@@ -31,6 +32,41 @@
         self.orig_exc = orig_exc
 
 
+@contextlib.contextmanager
+def log_block(message, exc_logger=None):
+    logger.debug("Starts : " + message)
+    with log_error(message, exc_logger):
+        yield
+    # try:
+    #     yield
+    # except Exception as exc:
+    #     if isinstance(exc, types) and not isinstance(exc, StopIteration):
+    #         templ = "Error during {0} stage: {1!s}"
+    #         logger.debug(templ.format(action, exc))
+    #     raise
+
+
+class log_error(object):
+    def __init__(self, message, exc_logger=None):
+        self.message = message
+        self.exc_logger = exc_logger
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, tp, value, traceback):
+        if value is None or isinstance(value, StopTestError):
+            return
+
+        if self.exc_logger is None:
+            exc_logger = sys._getframe(1).f_globals.get('logger', logger)
+        else:
+            exc_logger = self.exc_logger
+
+        exc_logger.exception(self.message, exc_info=(tp, value, traceback))
+        raise StopTestError(self.message, value)
+
+
 def check_input_param(is_ok, message):
     if not is_ok:
         logger.error(message)
@@ -79,22 +115,6 @@
             self.exited = True
 
 
-@contextlib.contextmanager
-def log_error(action, types=(Exception,)):
-    if not action.startswith("!"):
-        logger.debug("Starts : " + action)
-    else:
-        action = action[1:]
-
-    try:
-        yield
-    except Exception as exc:
-        if isinstance(exc, types) and not isinstance(exc, StopIteration):
-            templ = "Error during {0} stage: {1!s}"
-            logger.debug(templ.format(action, exc))
-        raise
-
-
 SMAP = dict(k=1024, m=1024 ** 2, g=1024 ** 3, t=1024 ** 4)
 
 
@@ -259,3 +279,13 @@
 def iter_clean_func():
     while CLEANING != []:
         yield CLEANING.pop()
+
+
+def flatten(data):
+    res = []
+    for i in data:
+        if isinstance(i, (list, tuple, set)):
+            res.extend(flatten(i))
+        else:
+            res.append(i)
+    return res