Rework fio suite: replace agent.py with fio_task_parser, move shared settings to defaults.cfg
diff --git a/wally/suits/io/__init__.py b/wally/suits/io/__init__.py
index e69de29..4828850 100644
--- a/wally/suits/io/__init__.py
+++ b/wally/suits/io/__init__.py
@@ -0,0 +1,330 @@
+import time
+import json
+import os.path
+import logging
+import datetime
+
+from wally.utils import (ssize2b, open_for_append_or_create,
+                         sec_to_str, StopTestError)
+
+from wally.ssh_utils import save_to_remote, read_from_remote, BGSSHTask
+
+from ..itest import IPerfTest, TestResults
+from .formatter import format_results_for_console
+from .fio_task_parser import (execution_time, fio_cfg_compile,
+                              get_test_summary, FioJobSection)
+
+
+logger = logging.getLogger("wally")
+
+
+class IOTestResults(TestResults):
+    def summary(self):
+        return get_test_summary(self.config) + "vm" + str(self.vm_count)
+
+    def get_yamable(self):
+        return {
+            'type': "fio_test",
+            'params': self.params,
+            'config': (self.config.name, self.config.vals),
+            'results': self.results,
+            'raw_result': self.raw_result,
+            'run_interval': self.run_interval,
+            'vm_count': self.vm_count
+        }
+
+    @classmethod
+    def from_yaml(cls, data):
+        name, vals = data['config']
+        sec = FioJobSection(name)
+        sec.vals = vals
+
+        return cls(sec, data['params'], data['results'],
+                   data['raw_result'], data['run_interval'],
+                   data['vm_count'])
+
+
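+# Scale each section's estimated execution time so that the per-section
+# intervals exactly tile the actually measured (start, end) run interval.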
+def get_slice_parts_offset(test_slice, real_interval):
+    calc_exec_time = sum(map(execution_time, test_slice))
+    coef = (real_interval[1] - real_interval[0]) / calc_exec_time
+    curr_offset = real_interval[0]
+    for section in test_slice:
+        slen = execution_time(section) * coef
+        yield (curr_offset, curr_offset + slen)
+        curr_offset += slen
+
+
+class IOPerfTest(IPerfTest):
+    tcp_conn_timeout = 30
+    max_pig_timeout = 5
+    soft_runcycle = 5 * 60
+
+    def __init__(self, *dt, **mp):
+        IPerfTest.__init__(self, *dt, **mp)
+        self.config_fname = self.options['cfg']
+
+        if '/' not in self.config_fname and '.' not in self.config_fname:
+            cfgs_dir = os.path.dirname(__file__)
+            self.config_fname = os.path.join(cfgs_dir,
+                                             self.config_fname + '.cfg')
+
+        self.alive_check_interval = self.options.get('alive_check_interval')
+
+        self.config_params = self.options.get('params', {}).copy()
+        self.tool = self.options.get('tool', 'fio')
+
+        raw_res = os.path.join(self.log_directory, "raw_results.txt")
+        self.fio_raw_results_file = open_for_append_or_create(raw_res)
+
+        self.io_py_remote = self.join_remote("agent.py")
+        self.results_file = self.join_remote("results.json")
+        self.pid_file = self.join_remote("pid")
+        self.task_file = self.join_remote("task.cfg")
+        self.use_sudo = self.options.get("use_sudo", True)
+        self.test_logging = self.options.get("test_logging", False)
+        self.raw_cfg = open(self.config_fname).read()
+        self.fio_configs = fio_cfg_compile(self.raw_cfg,
+                                           self.config_fname,
+                                           self.config_params,
+                                           split_on_names=self.test_logging)
+        self.fio_configs = list(self.fio_configs)
+
+        cmd_log = os.path.join(self.log_directory, "task_compiled.cfg")
+        fio_command_file = open_for_append_or_create(cmd_log)
+        splitter = "\n\n" + "-" * 60 + "\n\n"
+        fio_command_file.write(splitter.join(map(str, self.fio_configs)))
+
+    def __str__(self):
+        return "{0}({1})".format(self.__class__.__name__,
+                                 self.node.get_conn_id())
+
+    @classmethod
+    def load(cls, data):
+        return IOTestResults.from_yaml(data)
+
+    def cleanup(self):
+        # delete_file(conn, self.io_py_remote)
+        # Need to remove temporary files used for testing
+        pass
+
+    def prefill_test_files(self):
+        files = {}
+        for cfg_slice in self.fio_configs:
+            for section in cfg_slice:
+                sz = ssize2b(section.vals['size'])
+                msz = sz / (1024 ** 2)
+
+                if sz % (1024 ** 2) != 0:
+                    msz += 1
+
+                fname = section.vals['filename']
+
+                # if another test already uses the same file name,
+                # take the largest size
+                files[fname] = max(files.get(fname, 0), msz)
+
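+        # Prefill each test file with dd using direct writes, so test reads
+        # hit really allocated data instead of sparse/unwritten regions.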
+        cmd_templ = "dd oflag=direct " + \
+                    "if=/dev/zero of={0} bs={1} count={2}"
+
+        if self.use_sudo:
+            cmd_templ = "sudo " + cmd_templ
+
+        ssize = 0
+        stime = time.time()
+
+        for fname, curr_sz in files.items():
+            cmd = cmd_templ.format(fname, 1024 ** 2, curr_sz)
+            ssize += curr_sz
+            self.run_over_ssh(cmd, timeout=curr_sz)
+
+        ddtime = time.time() - stime
+        if ddtime > 1E-3:
+            fill_bw = int(ssize / ddtime)
+            mess = "Initiall dd fill bw is {0} MiBps for this vm"
+            logger.info(mess.format(fill_bw))
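+            # report the measured fill bandwidth through the coordination
+            # callback (other nodes or the report code may use it)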
+            self.coordinate(('init_bw', fill_bw))
+
+    def install_utils(self, max_retry=3, timeout=5):
+        need_install = []
+        for bin_name, package in (('fio', 'fio'), ('screen', 'screen')):
+            try:
+                self.run_over_ssh('which ' + bin_name, nolog=True)
+            except OSError:
+                need_install.append(package)
+
+        if len(need_install) == 0:
+            return
+
+        cmd = "sudo apt-get -y install " + " ".join(need_install)
+
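+        # retry apt-get a few times; the for-else fires only when every
+        # attempt failed ('err' still holds the last exception)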
+        for i in range(max_retry):
+            try:
+                self.run_over_ssh(cmd)
+                break
+            except OSError as err:
+                time.sleep(timeout)
+        else:
+            raise OSError("Can't install - " + str(err))
+
+    def pre_run(self):
+        try:
+            cmd = 'mkdir -p "{0}"'.format(self.remote_dir)
+            if self.use_sudo:
+                cmd = "sudo " + cmd
+                cmd += " ; sudo chown {0} {1}".format(self.node.get_user(),
+                                                      self.remote_dir)
+
+            self.run_over_ssh(cmd)
+        except Exception as exc:
+            msg = "Failed to create folder {0} on remote {1}. Error: {2!s}"
+            msg = msg.format(self.remote_dir, self.node.get_conn_id(), exc)
+            logger.exception(msg)
+            raise StopTestError(msg, exc)
+
+        self.install_utils()
+
+        if self.options.get('prefill_files', True):
+            self.prefill_test_files()
+        elif self.is_primary:
+            logger.warning("Prefilling of test files is disabled")
+
+    def run(self, barrier):
+        try:
+            if len(self.fio_configs) > 1 and self.is_primary:
+
+                exec_time = 0
+                for test_slice in self.fio_configs:
+                    exec_time += sum(map(execution_time, test_slice))
+
+                # +10% is a rough estimate for additional overhead
+                # like sftp, etc.
+                exec_time = int(exec_time * 1.1)
+
+                exec_time_s = sec_to_str(exec_time)
+                now_dt = datetime.datetime.now()
+                end_dt = now_dt + datetime.timedelta(0, exec_time)
+                msg = "Entire test should takes aroud: {0} and finished at {1}"
+                logger.info(msg.format(exec_time_s,
+                                       end_dt.strftime("%H:%M:%S")))
+
+            for pos, fio_cfg_slice in enumerate(self.fio_configs):
+                fio_cfg_slice = list(fio_cfg_slice)
+                names = [i.name for i in fio_cfg_slice]
+                msgs = []
+                already_processed = set()
+                for name in names:
+                    if name not in already_processed:
+                        already_processed.add(name)
+
+                        if 1 == names.count(name):
+                            msgs.append(name)
+                        else:
+                            frmt = "{0} * {1}"
+                            msgs.append(frmt.format(name,
+                                                    names.count(name)))
+
+                if self.is_primary:
+                    logger.info("Will run tests: " + ", ".join(msgs))
+
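+                # only the primary node's first slice logs verbosely, to
+                # avoid duplicated messages from every test VM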
+                nolog = (pos != 0) or not self.is_primary
+                out_err, interval = self.do_run(barrier, fio_cfg_slice,
+                                                nolog=nolog)
+
+                try:
+                    full_raw_res = json.loads(out_err)
+
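+                    # fio runs with unified_rw_reporting, so per-job stats
+                    # land in the 'mixed' section regardless of rw mode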
+                    res = {"bw": [], "iops": [], "lat": [],
+                           "clat": [], "slat": []}
+
+                    for raw_result in full_raw_res['jobs']:
+                        load_data = raw_result['mixed']
+
+                        res["bw"].append(load_data["bw"])
+                        res["iops"].append(load_data["iops"])
+                        res["lat"].append(load_data["lat"]["mean"])
+                        res["clat"].append(load_data["clat"]["mean"])
+                        res["slat"].append(load_data["slat"]["mean"])
+
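+                    # all sections in a slice must be repeats of the same
+                    # job; only (_)ramp_time may differ between rounds, so
+                    # strip it before comparing the parameter sets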
+                    first = fio_cfg_slice[0]
+                    p1 = first.vals.copy()
+                    p1.pop('ramp_time', 0)
+
+                    for nxt in fio_cfg_slice[1:]:
+                        assert nxt.name == first.name
+                        p2 = nxt.vals
+                        p2.pop('_ramp_time', 0)
+
+                        assert p1 == p2
+
+                    tres = IOTestResults(first,
+                                         self.config_params, res,
+                                         full_raw_res, interval,
+                                         vm_count=self.total_nodes_count)
+                    self.on_result_cb(tres)
+                except (OSError, StopTestError):
+                    raise
+                except Exception as exc:
+                    msg_templ = "Error during postprocessing results: {0!s}"
+                    raise RuntimeError(msg_templ.format(exc))
+
+        finally:
+            barrier.exit()
+
+    def do_run(self, barrier, cfg_slice, nolog=False):
+        # return open("/tmp/lit-sunshine/io/results.json").read(), (1, 2)
+        conn_id = self.node.get_conn_id()
+
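+        # --alloc-size enlarges fio's internal shared-memory pools, which
+        # is needed for configs with many concurrent jobs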
+        cmd_templ = "fio --output-format=json --output={1} " + \
+                    "--alloc-size=262144 {0}"
+
+        if self.options.get("use_sudo", True):
+            cmd_templ = "sudo " + cmd_templ
+
+        task_fc = "\n\n".join(map(str, cfg_slice))
+        with self.node.connection.open_sftp() as sftp:
+            save_to_remote(sftp, self.task_file, task_fc)
+
+        cmd = cmd_templ.format(self.task_file, self.results_file)
+
+        exec_time = sum(map(execution_time, cfg_slice))
+        exec_time_str = sec_to_str(exec_time)
+
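+        # hard kill timeout: expected duration plus a margin of at least
+        # 300s (and at most doubling the expected duration)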
+        timeout = int(exec_time + max(300, exec_time))
+        soft_tout = exec_time
+        barrier.wait()
+
+        if self.is_primary:
+            templ = "Test should takes about {0}." + \
+                    " Should finish at {1}," + \
+                    " will wait at most till {2}"
+            now_dt = datetime.datetime.now()
+            end_dt = now_dt + datetime.timedelta(0, exec_time)
+            wait_till = now_dt + datetime.timedelta(0, timeout)
+
+            logger.info(templ.format(exec_time_str,
+                                     end_dt.strftime("%H:%M:%S"),
+                                     wait_till.strftime("%H:%M:%S")))
+
+        task = BGSSHTask(self.node, self.options.get("use_sudo", True))
+        begin = time.time()
+        task.start(cmd)
+        task.wait(soft_tout, timeout)
+        end = time.time()
+
+        if not nolog:
+            logger.debug("Test on node {0} is finished".format(conn_id))
+
+        with self.node.connection.open_sftp() as sftp:
+            return read_from_remote(sftp, self.results_file), (begin, end)
+
+    @classmethod
+    def merge_results(cls, results):
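+        # all blocks must describe the same test (identical __meta__);
+        # merging simply concatenates their per-job 'res' lists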
+        merged = results[0]
+        for block in results[1:]:
+            assert block["__meta__"] == merged["__meta__"]
+            merged['res'].extend(block['res'])
+        return merged
+
+    @classmethod
+    def format_for_console(cls, data, dinfo):
+        return format_results_for_console(dinfo)
diff --git a/wally/suits/io/agent.py b/wally/suits/io/agent.py
deleted file mode 100644
index 3c3e436..0000000
--- a/wally/suits/io/agent.py
+++ /dev/null
@@ -1,672 +0,0 @@
-import os
-import sys
-import time
-import json
-import copy
-import select
-import pprint
-import os.path
-import argparse
-import traceback
-import subprocess
-import itertools
-from collections import OrderedDict
-
-
-SECTION = 0
-SETTING = 1
-
-
-class FioJobSection(object):
-    def __init__(self, name):
-        self.name = name
-        self.vals = OrderedDict()
-        self.format_params = {}
-
-    def copy(self):
-        return copy.deepcopy(self)
-
-
-def to_bytes(sz):
-    sz = sz.lower()
-    try:
-        return int(sz)
-    except ValueError:
-        if sz[-1] == 'm':
-            return (1024 ** 2) * int(sz[:-1])
-        if sz[-1] == 'k':
-            return 1024 * int(sz[:-1])
-        if sz[-1] == 'g':
-            return (1024 ** 3) * int(sz[:-1])
-        raise
-
-
-def fio_config_lexer(fio_cfg):
-    for lineno, line in enumerate(fio_cfg.split("\n")):
-        try:
-            line = line.strip()
-
-            if line.startswith("#") or line.startswith(";"):
-                continue
-
-            if line == "":
-                continue
-
-            if line.startswith('['):
-                assert line.endswith(']'), "name should ends with ]"
-                yield lineno, SECTION, line[1:-1], None
-            elif '=' in line:
-                opt_name, opt_val = line.split('=', 1)
-                yield lineno, SETTING, opt_name.strip(), opt_val.strip()
-            else:
-                yield lineno, SETTING, line, '1'
-        except Exception as exc:
-            pref = "During parsing line number {0}\n{1!s}".format(lineno, exc)
-            raise ValueError(pref)
-
-
-def fio_config_parse(lexer_iter, format_params):
-    orig_format_params_keys = set(format_params)
-    format_params = format_params.copy()
-    in_defaults = False
-    curr_section = None
-    defaults = OrderedDict()
-
-    for lineno, tp, name, val in lexer_iter:
-        if tp == SECTION:
-            if curr_section is not None:
-                yield curr_section
-
-            if name == 'defaults':
-                in_defaults = True
-                curr_section = None
-            else:
-                in_defaults = False
-                curr_section = FioJobSection(name)
-                curr_section.format_params = format_params.copy()
-                curr_section.vals = defaults.copy()
-        else:
-            assert tp == SETTING
-            if name == name.upper():
-                msg = "Param not in default section in line " + str(lineno)
-                assert in_defaults, msg
-                if name not in orig_format_params_keys:
-                    # don't make parse_value for PARAMS
-                    # they would be parsed later
-                    # or this would breakes arrays
-                    format_params[name] = val
-            elif in_defaults:
-                defaults[name] = parse_value(val)
-            else:
-                msg = "data outside section, line " + str(lineno)
-                assert curr_section is not None, msg
-                curr_section.vals[name] = parse_value(val)
-
-    if curr_section is not None:
-        yield curr_section
-
-
-def parse_value(val):
-    try:
-        return int(val)
-    except ValueError:
-        pass
-
-    try:
-        return float(val)
-    except ValueError:
-        pass
-
-    if val.startswith('{%'):
-        assert val.endswith("%}")
-        content = val[2:-2]
-        vals = list(i.strip() for i in content.split(','))
-        return map(parse_value, vals)
-    return val
-
-
-def process_repeats(sec_iter):
-
-    for sec in sec_iter:
-        if '*' in sec.name:
-            msg = "Only one '*' allowed in section name"
-            assert sec.name.count('*') == 1, msg
-
-            name, count = sec.name.split("*")
-            sec.name = name.strip()
-            count = count.strip()
-
-            try:
-                count = int(count.strip().format(**sec.format_params))
-            except KeyError:
-                raise ValueError("No parameter {0} given".format(count[1:-1]))
-            except ValueError:
-                msg = "Parameter {0} nas non-int value {1!r}"
-                raise ValueError(msg.format(count[1:-1],
-                                 count.format(**sec.format_params)))
-
-            yield sec.copy()
-
-            if 'ramp_time' in sec.vals:
-                sec = sec.copy()
-                sec.vals['_ramp_time'] = sec.vals.pop('ramp_time')
-
-            for _ in range(count - 1):
-                yield sec.copy()
-        else:
-            yield sec
-
-
-def process_cycles(sec_iter):
-    # insert parametrized cycles
-    sec_iter = try_format_params_into_section(sec_iter)
-
-    for sec in sec_iter:
-
-        cycles_var_names = []
-        cycles_var_values = []
-
-        for name, val in sec.vals.items():
-            if isinstance(val, (list, tuple)):
-                cycles_var_names.append(name)
-                cycles_var_values.append(val)
-
-        if len(cycles_var_names) == 0:
-            yield sec
-        else:
-            for combination in itertools.product(*cycles_var_values):
-                new_sec = sec.copy()
-                new_sec.vals.update(zip(cycles_var_names, combination))
-                yield new_sec
-
-
-def try_format_params_into_section(sec_iter):
-    for sec in sec_iter:
-        params = sec.format_params
-        for name, val in sec.vals.items():
-            if isinstance(val, basestring):
-                try:
-                    sec.vals[name] = parse_value(val.format(**params))
-                except:
-                    pass
-
-        yield sec
-
-
-def format_params_into_section_finall(sec_iter, counter=[0]):
-    group_report_err_msg = "Group reporting should be set if numjobs != 1"
-
-    for sec in sec_iter:
-
-        num_jobs = int(sec.vals.get('numjobs', '1'))
-        if num_jobs != 1:
-            assert 'group_reporting' in sec.vals, group_report_err_msg
-
-        assert sec.vals.get('unified_rw_reporting', '1') in (1, '1')
-        sec.vals['unified_rw_reporting'] = '1'
-
-        params = sec.format_params.copy()
-
-        fsize = to_bytes(sec.vals['size'])
-        params['PER_TH_OFFSET'] = fsize // num_jobs
-
-        for name, val in sec.vals.items():
-            if isinstance(val, basestring):
-                sec.vals[name] = parse_value(val.format(**params))
-            else:
-                assert isinstance(val, (int, float))
-
-        params['UNIQ'] = 'UN{0}'.format(counter[0])
-        params['COUNTER'] = str(counter[0])
-        counter[0] += 1
-        params['TEST_SUMM'] = get_test_summary(sec.vals,
-                                               params.get('VM_COUNT', 1))
-        params.update(sec.vals)
-        sec.name = sec.name.format(**params)
-
-        yield sec
-
-
-def fio_config_to_str(sec_iter):
-    res = ""
-
-    for pos, sec in enumerate(sec_iter):
-        if pos != 0:
-            res += "\n"
-
-        res += "[{0}]\n".format(sec.name)
-
-        for name, val in sec.vals.items():
-            if name.startswith('_'):
-                continue
-            res += "{0}={1}\n".format(name, val)
-
-    return res
-
-
-def get_test_sync_mode(config):
-    try:
-        return config['sync_mode']
-    except KeyError:
-        pass
-
-    is_sync = str(config.get("sync", "0")) == "1"
-    is_direct = str(config.get("direct", "0")) == "1"
-
-    if is_sync and is_direct:
-        return 'x'
-    elif is_sync:
-        return 's'
-    elif is_direct:
-        return 'd'
-    else:
-        return 'a'
-
-
-def get_test_summary(params, testnodes_count):
-    rw = {"randread": "rr",
-          "randwrite": "rw",
-          "read": "sr",
-          "write": "sw"}[params["rw"]]
-
-    sync_mode = get_test_sync_mode(params)
-    th_count = params.get('numjobs')
-
-    if th_count is None:
-        th_count = params.get('concurence', 1)
-
-    return "{0}{1}{2}th{3}vm{4}".format(rw,
-                                        sync_mode,
-                                        params['blocksize'],
-                                        th_count,
-                                        testnodes_count)
-
-
-def calculate_execution_time(sec_iter):
-    time = 0
-    for sec in sec_iter:
-        time += sec.vals.get('ramp_time', 0)
-        time += sec.vals.get('runtime', 0)
-    return time
-
-
-def slice_config(sec_iter, runcycle=None, max_jobs=1000,
-                 soft_runcycle=None, split_on_names=False):
-    jcount = 0
-    runtime = 0
-    curr_slice = []
-    prev_name = None
-
-    for pos, sec in enumerate(sec_iter):
-
-        if prev_name is not None:
-            split_here = False
-
-            if soft_runcycle is not None and prev_name != sec.name:
-                split_here = (runtime > soft_runcycle)
-
-            if split_on_names and prev_name != sec.name:
-                split_here = True
-
-            if split_here:
-                yield curr_slice
-                curr_slice = []
-                runtime = 0
-                jcount = 0
-
-        prev_name = sec.name
-
-        jc = sec.vals.get('numjobs', 1)
-        msg = "numjobs should be integer, not {0!r}".format(jc)
-        assert isinstance(jc, int), msg
-
-        curr_task_time = calculate_execution_time([sec])
-
-        if jc > max_jobs:
-            err_templ = "Can't process job {0!r} - too large numjobs"
-            raise ValueError(err_templ.format(sec.name))
-
-        if runcycle is not None and len(curr_slice) != 0:
-            rc_ok = curr_task_time + runtime <= runcycle
-        else:
-            rc_ok = True
-
-        if jc + jcount <= max_jobs and rc_ok:
-            runtime += curr_task_time
-            jcount += jc
-            curr_slice.append(sec)
-            continue
-
-        assert len(curr_slice) != 0
-        yield curr_slice
-
-        if '_ramp_time' in sec.vals:
-            sec.vals['ramp_time'] = sec.vals.pop('_ramp_time')
-            curr_task_time = calculate_execution_time([sec])
-
-        runtime = curr_task_time
-        jcount = jc
-        curr_slice = [sec]
-        prev_name = None
-
-    if curr_slice != []:
-        yield curr_slice
-
-
-def parse_all_in_1(source, test_params):
-    lexer_it = fio_config_lexer(source)
-    sec_it = fio_config_parse(lexer_it, test_params)
-    sec_it = process_cycles(sec_it)
-    sec_it = process_repeats(sec_it)
-    return format_params_into_section_finall(sec_it)
-
-
-def parse_and_slice_all_in_1(source, test_params, **slice_params):
-    sec_it = parse_all_in_1(source, test_params)
-    return slice_config(sec_it, **slice_params)
-
-
-def compile_all_in_1(source, test_params, **slice_params):
-    slices_it = parse_and_slice_all_in_1(source, test_params, **slice_params)
-    for slices in slices_it:
-        yield fio_config_to_str(slices)
-
-
-def do_run_fio(config_slice):
-    benchmark_config = fio_config_to_str(config_slice)
-    cmd = ["fio", "--output-format=json", "--alloc-size=262144", "-"]
-    p = subprocess.Popen(cmd,
-                         stdin=subprocess.PIPE,
-                         stdout=subprocess.PIPE,
-                         stderr=subprocess.PIPE)
-
-    start_time = time.time()
-    # set timeout
-    raw_out, raw_err = p.communicate(benchmark_config)
-    end_time = time.time()
-
-    if 0 != p.returncode:
-        msg = "Fio failed with code: {0}\nOutput={1}"
-        raise OSError(msg.format(p.returncode, raw_err))
-
-    # HACK
-    raw_out = "{" + raw_out.split('{', 1)[1]
-
-    try:
-        parsed_out = json.loads(raw_out)["jobs"]
-    except KeyError:
-        msg = "Can't parse fio output {0!r}: no 'jobs' found"
-        raw_out = raw_out[:100]
-        raise ValueError(msg.format(raw_out))
-
-    except Exception as exc:
-        msg = "Can't parse fio output: {0!r}\nError: {1!s}"
-        raw_out = raw_out[:100]
-        raise ValueError(msg.format(raw_out, exc))
-
-    return zip(parsed_out, config_slice), (start_time, end_time)
-
-
-class FioResult(object):
-    def __init__(self, name, params, run_interval, results):
-        self.params = params.copy()
-        self.name = name
-        self.run_interval = run_interval
-        self.results = results
-
-    def json_obj(self):
-        return self.__dict__
-
-
-def make_job_results(section, job_output, slice_timings):
-    # merge by section.merge_id
-
-    raw_result = job_output['mixed']
-
-    res = {
-        "bw": raw_result["bw"],
-        "iops": raw_result["iops"],
-        "lat": raw_result["lat"]["mean"],
-        "clat": raw_result["clat"]["mean"],
-        "slat": raw_result["slat"]["mean"]
-    }
-
-    vls = section.vals.copy()
-
-    vls['sync_mode'] = get_test_sync_mode(vls)
-    vls['concurence'] = vls.get('numjobs', 1)
-
-    return FioResult(section.name, vls, slice_timings, res)
-
-
-def get_slice_parts_offset(test_slice, real_inteval):
-    calc_exec_time = calculate_execution_time(test_slice)
-    coef = (real_inteval[1] - real_inteval[0]) / calc_exec_time
-    curr_offset = real_inteval[0]
-    for section in test_slice:
-        slen = calculate_execution_time([section]) * coef
-        yield (curr_offset, curr_offset + slen)
-        curr_offset += slen
-
-
-def run_fio(sliced_it, raw_results_func=None):
-    sliced_list = list(sliced_it)
-
-    curr_test_num = 0
-    executed_tests = 0
-    result = []
-
-    for i, test_slice in enumerate(sliced_list):
-        test_slice = list(test_slice)
-
-        res_cfg_it, slice_timings = do_run_fio(test_slice)
-        sec_intervals = get_slice_parts_offset(test_slice,
-                                               slice_timings)
-        res_cfg_it = enumerate(zip(res_cfg_it, sec_intervals),
-                               curr_test_num)
-
-        section_names = []
-        for curr_test_num, ((job_output, section), interval) in res_cfg_it:
-            executed_tests += 1
-            section_names.append(section.name)
-
-            if raw_results_func is not None:
-                raw_results_func(executed_tests,
-                                 [job_output, section])
-
-            msg = "{0} != {1}".format(section.name, job_output["jobname"])
-            assert section.name == job_output["jobname"], msg
-
-            result.append(make_job_results(section, job_output, interval))
-
-        curr_test_num += 1
-        msg_template = "Done {0} tests from {1}. ETA: {2}"
-
-        rest = sliced_list[i:]
-        time_eta = sum(map(calculate_execution_time, rest))
-        test_left = sum(map(len, rest))
-        print msg_template.format(curr_test_num,
-                                  test_left,
-                                  sec_to_str(time_eta))
-
-    return result
-
-
-def run_benchmark(binary_tp, *argv, **kwargs):
-    if 'fio' == binary_tp:
-        return run_fio(*argv, **kwargs)
-    raise ValueError("Unknown behcnmark {0}".format(binary_tp))
-
-
-def read_config(fd, timeout=10):
-    job_cfg = ""
-    etime = time.time() + timeout
-    while True:
-        wtime = etime - time.time()
-        if wtime <= 0:
-            raise IOError("No config provided")
-
-        r, w, x = select.select([fd], [], [], wtime)
-        if len(r) == 0:
-            raise IOError("No config provided")
-
-        char = fd.read(1)
-        if '' == char:
-            return job_cfg
-
-        job_cfg += char
-
-
-def sec_to_str(seconds):
-    h = seconds // 3600
-    m = (seconds % 3600) // 60
-    s = seconds % 60
-    return "{0}:{1:02d}:{2:02d}".format(h, m, s)
-
-
-def parse_args(argv):
-    parser = argparse.ArgumentParser(
-        description="Run fio' and return result")
-    parser.add_argument("--type", metavar="BINARY_TYPE",
-                        choices=['fio'], default='fio',
-                        help=argparse.SUPPRESS)
-    parser.add_argument("--start-at", metavar="START_AT_UTC", type=int,
-                        help="Start execution at START_AT_UTC")
-    parser.add_argument("--json", action="store_true", default=False,
-                        help="Json output format")
-    parser.add_argument("-o", "--output", default='-', metavar="FILE_PATH",
-                        help="Store results to FILE_PATH")
-    parser.add_argument("--estimate", action="store_true", default=False,
-                        help="Only estimate task execution time")
-    parser.add_argument("--compile", action="store_true", default=False,
-                        help="Compile config file to fio config")
-    parser.add_argument("--num-tests", action="store_true", default=False,
-                        help="Show total number of tests")
-    parser.add_argument("--runcycle", type=int, default=None,
-                        metavar="MAX_CYCLE_SECONDS",
-                        help="Max cycle length in seconds")
-    parser.add_argument("--show-raw-results", action='store_true',
-                        default=False, help="Output raw input and results")
-    parser.add_argument("--params", nargs="*", metavar="PARAM=VAL",
-                        default=[],
-                        help="Provide set of pairs PARAM=VAL to" +
-                             "format into job description")
-    parser.add_argument("-p", "--pid-file", metavar="FILE_TO_STORE_PID",
-                        default=None, help="Store pid to FILE_TO_STORE_PID " +
-                        "and remove this file on exit")
-    parser.add_argument("jobfile")
-    return parser.parse_args(argv)
-
-
-def main(argv):
-    argv_obj = parse_args(argv)
-
-    if argv_obj.jobfile == '-':
-        job_cfg = read_config(sys.stdin)
-    else:
-        job_cfg = open(argv_obj.jobfile).read()
-
-    if argv_obj.output == '-':
-        out_fd = sys.stdout
-    else:
-        out_fd = open(argv_obj.output, "w")
-
-    if argv_obj.pid_file is not None:
-        with open(argv_obj.pid_file, "w") as fd:
-            fd.write(str(os.getpid()))
-
-    try:
-        params = {}
-        for param_val in argv_obj.params:
-            assert '=' in param_val
-            name, val = param_val.split("=", 1)
-            params[name] = val
-
-        slice_params = {
-            'runcycle': argv_obj.runcycle,
-        }
-
-        sliced_it = parse_and_slice_all_in_1(job_cfg, params, **slice_params)
-
-        if argv_obj.estimate:
-            it = map(calculate_execution_time, sliced_it)
-            print sec_to_str(sum(it))
-            return 0
-
-        if argv_obj.num_tests or argv_obj.compile:
-            if argv_obj.compile:
-                for test_slice in sliced_it:
-                    out_fd.write(fio_config_to_str(test_slice))
-                    out_fd.write("\n#" + "-" * 70 + "\n\n")
-
-            if argv_obj.num_tests:
-                print len(list(sliced_it))
-
-            return 0
-
-        if argv_obj.start_at is not None:
-            ctime = time.time()
-            if argv_obj.start_at >= ctime:
-                time.sleep(ctime - argv_obj.start_at)
-
-        def raw_res_func(test_num, data):
-            pref = "========= RAW_RESULTS({0}) =========\n".format(test_num)
-            out_fd.write(pref)
-            out_fd.write(json.dumps(data))
-            out_fd.write("\n========= END OF RAW_RESULTS =========\n")
-            out_fd.flush()
-
-        rrfunc = raw_res_func if argv_obj.show_raw_results else None
-
-        job_res = run_benchmark(argv_obj.type,
-                                sliced_it, rrfunc)
-
-        res = {'__meta__': {'params': params,
-                            'testnodes_count': int(params.get('VM_COUNT', 1))},
-               'res': [j.json_obj() for j in job_res]}
-
-        oformat = 'json' if argv_obj.json else 'eval'
-
-        msg = "========= RESULTS(format={0}) =========\n"
-        out_fd.write(msg.format(oformat))
-        if argv_obj.json:
-            out_fd.write(json.dumps(res))
-        else:
-            out_fd.write(pprint.pformat(res) + "\n")
-        out_fd.write("\n========= END OF RESULTS =========\n")
-
-        return 0
-    except:
-        out_fd.write("============ ERROR =============\n")
-        out_fd.write(traceback.format_exc() + "\n")
-        out_fd.write("============ END OF ERROR =============\n")
-        return 1
-    finally:
-        try:
-            if out_fd is not sys.stdout:
-                out_fd.flush()
-                os.fsync(out_fd)
-                out_fd.close()
-        except Exception:
-            traceback.print_exc()
-
-        if argv_obj.pid_file is not None:
-            if os.path.exists(argv_obj.pid_file):
-                os.unlink(argv_obj.pid_file)
-
-
-def fake_main(x):
-    import yaml
-    time.sleep(60)
-    out_fd = sys.stdout
-    fname = "/tmp/perf_tests/metempirical_alisha/raw_results.yaml"
-    res = yaml.load(open(fname).read())[0][1]
-    out_fd.write("========= RESULTS(format=json) =========\n")
-    out_fd.write(json.dumps(res))
-    out_fd.write("\n========= END OF RESULTS =========\n")
-    return 0
-
-
-if __name__ == '__main__':
-    # exit(fake_main(sys.argv[1:]))
-    exit(main(sys.argv[1:]))
diff --git a/wally/suits/io/ceph.cfg b/wally/suits/io/ceph.cfg
index f38b37c..26aa65f 100644
--- a/wally/suits/io/ceph.cfg
+++ b/wally/suits/io/ceph.cfg
@@ -1,28 +1,19 @@
-[defaults]
-wait_for_previous=1
-group_reporting=1
-time_based=1
-buffered=0
-iodepth=1
-softrandommap=1
-filename={FILENAME}
-NUM_ROUNDS=7
-thread=1
-
-# this is critical for correct results in multy-node run
-randrepeat=0
+[global]
+include defaults.cfg
 
 NUMJOBS={% 1, 5, 10, 15, 40 %}
 NUMJOBS_SHORT={% 1, 2, 3, 10 %}
+TEST_FILE_SIZE=100G
 
-size=100G
+size={TEST_FILE_SIZE}
 ramp_time=15
 runtime=60
+NUM_ROUNDS=7
 
 # ---------------------------------------------------------------------
 # check different thread count, sync mode. (latency, iops) = func(th_count)
 # ---------------------------------------------------------------------
-[ceph_test_{TEST_SUMM} * {NUM_ROUNDS}]
+[ceph_test_{TEST_SUMM}]
 blocksize=4k
 rw=randwrite
 sync=1
@@ -31,7 +22,7 @@
 # ---------------------------------------------------------------------
 # direct write
 # ---------------------------------------------------------------------
-[ceph_test_{TEST_SUMM} * {NUM_ROUNDS}]
+[ceph_test_{TEST_SUMM}]
 blocksize=4k
 rw=randwrite
 direct=1
@@ -41,7 +32,7 @@
 # check different thread count, direct read mode. (latency, iops) = func(th_count)
 # also check iops for randread
 # ---------------------------------------------------------------------
-[ceph_test_{TEST_SUMM} * {NUM_ROUNDS}]
+[ceph_test_{TEST_SUMM}]
 blocksize=4k
 rw=randread
 direct=1
@@ -51,7 +42,7 @@
 # this is essentially sequential write/read operations
 # we can't use sequential with numjobs > 1 due to caching and block merging
 # ---------------------------------------------------------------------
-[ceph_test_{TEST_SUMM} * {NUM_ROUNDS}]
+[ceph_test_{TEST_SUMM}]
 blocksize=16m
 rw={% randread, randwrite %}
 direct=1
diff --git a/wally/suits/io/check_distribution.cfg b/wally/suits/io/check_distribution.cfg
index e7cafd9..4746f37 100644
--- a/wally/suits/io/check_distribution.cfg
+++ b/wally/suits/io/check_distribution.cfg
@@ -1,19 +1,13 @@
-[defaults]
+[global]
+include defaults.cfg
 NUM_ROUNDS=301
 
-# this is critical for correct results in multy-node run
-randrepeat=0
-
-[distrubution_test_{TEST_SUMM} * {NUM_ROUNDS}]
+[distribution_test_{TEST_SUMM}]
 blocksize=4k
 rw=randwrite
 direct=1
+
 ramp_time=5
-buffered=0
-wait_for_previous
-filename={FILENAME}
-iodepth=1
-size=10G
-time_based
 runtime=30
-group_reporting
+
+size=10G
diff --git a/wally/suits/io/check_linearity.cfg b/wally/suits/io/check_linearity.cfg
index 670e8b3..f7c37fb 100644
--- a/wally/suits/io/check_linearity.cfg
+++ b/wally/suits/io/check_linearity.cfg
@@ -1,33 +1,26 @@
-[defaults]
+[global]
+
+include defaults.cfg
 NUM_ROUNDS=7
 
-# this is critical for correct results in multy-node run
-randrepeat=0
-
-ramp_time=5
-buffered=0
-wait_for_previous
-filename={FILENAME}
-iodepth=1
-size=10G
-time_based
+size={TEST_FILE_SIZE}
 ramp_time=5
 runtime=30
 
 # ---------------------------------------------------------------------
 # check read and write linearity. oper_time = func(size)
 # ---------------------------------------------------------------------
-[linearity_test_{TEST_SUMM} * {NUM_ROUNDS}]
-blocksize={% 512, 1k, 4k, 8k, 16k, 32k, 64k, 128k, 256k, 512k, 1m, 2m, 4m %}
-rw={% randwrite, randread %}
-direct=1
+# [linearity_test_{TEST_SUMM}]
+# blocksize={BLOCK_SIZES}
+# rw={% randwrite, randread %}
+# direct=1
 
 # ---------------------------------------------------------------------
 # check sync write linearity. oper_time = func(size)
 # check sync BW as well
 # ---------------------------------------------------------------------
-[linearity_test_{TEST_SUMM} * {NUM_ROUNDS}]
-blocksize={% 512, 1k, 4k, 8k, 16k, 32k, 64k, 128k, 256k, 512k, 1m, 2m, 4m %}
+[linearity_test_{TEST_SUMM}]
+blocksize={BLOCK_SIZES}
 rw=randwrite
 sync=1
 
diff --git a/wally/suits/io/defaults.cfg b/wally/suits/io/defaults.cfg
new file mode 100644
index 0000000..51a8145
--- /dev/null
+++ b/wally/suits/io/defaults.cfg
@@ -0,0 +1,14 @@
+buffered=0
+group_reporting=1
+iodepth=1
+softrandommap=1
+thread=1
+time_based=1
+wait_for_previous=1
+
+# this is critical for correct results in multi-node run
+randrepeat=0
+
+filename={FILENAME}
+
diff --git a/wally/suits/io/fio_task_parser.py b/wally/suits/io/fio_task_parser.py
new file mode 100644
index 0000000..52c4bb3
--- /dev/null
+++ b/wally/suits/io/fio_task_parser.py
@@ -0,0 +1,458 @@
+import os
+import sys
+import copy
+import os.path
+import argparse
+import itertools
+from collections import OrderedDict, namedtuple
+
+
+from wally.utils import sec_to_str
+
+
+SECTION = 0
+SETTING = 1
+INCLUDE = 2
+
+
+Var = namedtuple('Var', ('name',))
+CfgLine = namedtuple('CfgLine', ('fname', 'lineno', 'oline',
+                                 'tp', 'name', 'val'))
+
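+# Var marks an unresolved '{name}' placeholder in a config value; CfgLine
+# records the source file and line of each lexed line for error reporting.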
+
+class FioJobSection(object):
+    def __init__(self, name):
+        self.name = name
+        self.vals = OrderedDict()
+
+    def copy(self):
+        return copy.deepcopy(self)
+
+    def required_vars(self):
+        for name, val in self.vals.items():
+            if isinstance(val, Var):
+                yield name, val
+
+    def is_free(self):
+        return len(list(self.required_vars())) == 0
+
+    def __str__(self):
+        res = "[{0}]\n".format(self.name)
+
+        for name, val in self.vals.items():
+            if name.startswith('_') or name == name.upper():
+                continue
+            if isinstance(val, Var):
+                res += "{0}={{{1}}}\n".format(name, val.name)
+            else:
+                res += "{0}={1}\n".format(name, val)
+
+        return res
+
+
+def to_bytes(sz):
+    sz = sz.lower()
+    try:
+        return int(sz)
+    except ValueError:
+        if sz[-1] == 'm':
+            return (1024 ** 2) * int(sz[:-1])
+        if sz[-1] == 'k':
+            return 1024 * int(sz[:-1])
+        if sz[-1] == 'g':
+            return (1024 ** 3) * int(sz[:-1])
+        raise
+
+
+class ParseError(ValueError):
+    def __init__(self, msg, fname, lineno, line_cont=""):
+        ValueError.__init__(self, msg)
+        self.file_name = fname
+        self.lineno = lineno
+        self.line_cont = line_cont
+
+    def __str__(self):
+        msg = "In {0}:{1} ({2}) : {3}"
+        return msg.format(self.file_name,
+                          self.lineno,
+                          self.line_cont,
+                          super(ParseError, self).__str__())
+
+
+def is_name(name):
+    if len(name) == 0:
+        return False
+
+    if name[0] != '_' and not name[0].isalpha():
+        return False
+
+    for ch in name[1:]:
+        if ch != '_' and not ch.isalnum():
+            return False
+
+    return True
+
+
+def parse_value(val):
+    try:
+        return int(val)
+    except ValueError:
+        pass
+
+    try:
+        return float(val)
+    except ValueError:
+        pass
+
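+    # '{% v1, v2, ... %}' is a value cycle: it parses to a list, and
+    # process_cycles() later expands each element into a separate job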
+    if val.startswith('{%'):
+        assert val.endswith("%}")
+        content = val[2:-2]
+        vals = list(i.strip() for i in content.split(','))
+        return map(parse_value, vals)
+
+    if val.startswith('{'):
+        assert val.endswith("}")
+        assert is_name(val[1:-1])
+        return Var(val[1:-1])
+    return val
+
+
+def fio_config_lexer(fio_cfg, fname):
+    for lineno, oline in enumerate(fio_cfg.split("\n")):
+        try:
+            line = oline.strip()
+
+            if line.startswith("#") or line.startswith(";"):
+                continue
+
+            if line == "":
+                continue
+
+            if '#' in line:
+                raise ParseError("# isn't allowed inside line",
+                                 fname, lineno, oline)
+
+            if line.startswith('['):
+                yield CfgLine(fname, lineno, oline, SECTION,
+                              line[1:-1].strip(), None)
+            elif '=' in line:
+                opt_name, opt_val = line.split('=', 1)
+                yield CfgLine(fname, lineno, oline, SETTING,
+                              opt_name.strip(),
+                              parse_value(opt_val.strip()))
+            elif line.startswith("include "):
+                yield CfgLine(fname, lineno, oline, INCLUDE,
+                              line.split(" ", 1)[1], None)
+            else:
+                yield CfgLine(fname, lineno, oline, SETTING, line, '1')
+
+        except Exception as exc:
+            raise ParseError(str(exc), fname, lineno, oline)
+
+
+def fio_config_parse(lexer_iter):
+    in_globals = False
+    curr_section = None
+    glob_vals = OrderedDict()
+    sections_count = 0
+
+    lexed_lines = list(lexer_iter)
+    one_more = True
+    includes = {}
+
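+    # resolve 'include' directives by splicing the included file's lexed
+    # lines into the stream; loop until no INCLUDE entries remain, since
+    # included files may themselves contain includes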
+    while one_more:
+        new_lines = []
+        one_more = False
+        for line in lexed_lines:
+            fname, lineno, oline, tp, name, val = line
+
+            if INCLUDE == tp:
+                if not os.path.exists(fname):
+                    dirname = '.'
+                else:
+                    dirname = os.path.dirname(fname)
+
+                new_fname = os.path.join(dirname, name)
+                includes[new_fname] = (fname, lineno)
+
+                try:
+                    cont = open(new_fname).read()
+                except IOError as err:
+                    msg = "Error while including file {0}: {1}"
+                    raise ParseError(msg.format(new_fname, err),
+                                     fname, lineno, oline)
+
+                new_lines.extend(fio_config_lexer(cont, new_fname))
+                one_more = True
+            else:
+                new_lines.append(line)
+
+        lexed_lines = new_lines
+
+    for fname, lineno, oline, tp, name, val in lexed_lines:
+        if tp == SECTION:
+            if curr_section is not None:
+                yield curr_section
+                curr_section = None
+
+            if name == 'global':
+                if sections_count != 0:
+                    raise ParseError("[global] section should" +
+                                     " be only one and first",
+                                     fname, lineno, oline)
+                in_globals = True
+            else:
+                in_globals = False
+                curr_section = FioJobSection(name)
+                curr_section.vals = glob_vals.copy()
+            sections_count += 1
+        else:
+            assert tp == SETTING
+            if in_globals:
+                glob_vals[name] = val
+            elif name == name.upper():
+                raise ParseError("Param '" + name +
+                                 "' not in [global] section",
+                                 fname, lineno, oline)
+            elif curr_section is None:
+                raise ParseError("Data outside section",
+                                 fname, lineno, oline)
+            else:
+                curr_section.vals[name] = val
+
+    if curr_section is not None:
+        yield curr_section
+
+
+def process_repeats(sec):
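+    # Emit NUM_ROUNDS copies of the job; after the first copy ramp_time is
+    # renamed to _ramp_time so repeats skip the warm-up (slice_config
+    # restores it when a repeat starts a new slice).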
+    sec = sec.copy()
+    count = sec.vals.pop('NUM_ROUNDS', 1)
+    assert isinstance(count, (int, long))
+
+    for _ in range(count):
+        yield sec.copy()
+
+        if 'ramp_time' in sec.vals:
+            sec.vals['_ramp_time'] = sec.vals.pop('ramp_time')
+
+
+def process_cycles(sec):
+    cycles = OrderedDict()
+
+    for name, val in sec.vals.items():
+        if isinstance(val, list) and name.upper() != name:
+            cycles[name] = val
+
+    if len(cycles) == 0:
+        yield sec
+    else:
+        for combination in itertools.product(*cycles.values()):
+            new_sec = sec.copy()
+            new_sec.vals.update(zip(cycles.keys(), combination))
+            yield new_sec
+
+
+def apply_params(sec, params):
+    processed_vals = OrderedDict()
+    processed_vals.update(params)
+    for name, val in sec.vals.items():
+        if name in params:
+            continue
+
+        if isinstance(val, Var):
+            if val.name in params:
+                val = params[val.name]
+            elif val.name in processed_vals:
+                val = processed_vals[val.name]
+        processed_vals[name] = val
+    sec = sec.copy()
+    sec.vals = processed_vals
+    return sec
+
+
+def finall_process(sec, counter=[0]):
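+    # NB: the mutable default 'counter' is intentional - it persists across
+    # calls and provides unique UNIQ/COUNTER values for job naming.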
+    sec = sec.copy()
+
+    if sec.vals.get('numjobs', 1) != 1:
+        msg = "Group reporting should be set if numjobs != 1"
+        assert 'group_reporting' in sec.vals, msg
+
+    sec.vals['unified_rw_reporting'] = '1'
+
+    params = sec.vals.copy()
+    params['UNIQ'] = 'UN{0}'.format(counter[0])
+    params['COUNTER'] = str(counter[0])
+    params['TEST_SUMM'] = get_test_summary(sec)
+    sec.name = sec.name.format(**params)
+    counter[0] += 1
+
+    return sec
+
+
+def get_test_sync_mode(sec):
+    is_sync = str(sec.vals.get("sync", "0")) == "1"
+    is_direct = str(sec.vals.get("direct", "0")) == "1"
+
+    if is_sync and is_direct:
+        return 'x'
+    elif is_sync:
+        return 's'
+    elif is_direct:
+        return 'd'
+    else:
+        return 'a'
+
+
+def get_test_summary(sec):
+    rw = {"randread": "rr",
+          "randwrite": "rw",
+          "read": "sr",
+          "write": "sw"}[sec.vals["rw"]]
+
+    sync_mode = get_test_sync_mode(sec)
+    th_count = sec.vals.get('numjobs')
+
+    if th_count is None:
+        th_count = sec.vals.get('concurence', 1)
+
+    return "{0}{1}{2}th{3}".format(rw,
+                                   sync_mode,
+                                   sec.vals['blocksize'],
+                                   th_count)
+
+
+def execution_time(sec):
+    return sec.vals.get('ramp_time', 0) + sec.vals.get('runtime', 0)
+
+
+def slice_config(sec_iter, runcycle=None, max_jobs=1000, split_on_names=False):
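+    # Pack sections into slices, each later executed as one fio run: a
+    # slice is closed when it would exceed 'max_jobs' jobs or 'runcycle'
+    # seconds, or (with split_on_names) when the job name changes.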
+    jcount = 0
+    runtime = 0
+    curr_slice = []
+    prev_name = None
+
+    for pos, sec in enumerate(sec_iter):
+
+        if prev_name is not None:
+            split_here = False
+
+            if split_on_names and prev_name != sec.name:
+                split_here = True
+
+            if split_here:
+                yield curr_slice
+                curr_slice = []
+                runtime = 0
+                jcount = 0
+
+        prev_name = sec.name
+
+        jc = sec.vals.get('numjobs', 1)
+        msg = "numjobs should be integer, not {0!r}".format(jc)
+        assert isinstance(jc, int), msg
+
+        curr_task_time = execution_time(sec)
+
+        if jc > max_jobs:
+            err_templ = "Can't process job {0!r} - too large numjobs"
+            raise ValueError(err_templ.format(sec.name))
+
+        if runcycle is not None and len(curr_slice) != 0:
+            rc_ok = curr_task_time + runtime <= runcycle
+        else:
+            rc_ok = True
+
+        if jc + jcount <= max_jobs and rc_ok:
+            runtime += curr_task_time
+            jcount += jc
+            curr_slice.append(sec)
+            continue
+
+        assert len(curr_slice) != 0
+        yield curr_slice
+
+        if '_ramp_time' in sec.vals:
+            sec.vals['ramp_time'] = sec.vals.pop('_ramp_time')
+            curr_task_time = execution_time(sec)
+
+        runtime = curr_task_time
+        jcount = jc
+        curr_slice = [sec]
+        prev_name = None
+
+    if curr_slice != []:
+        yield curr_slice
+
+
+def parse_all_in_1(source, fname=None):
+    return fio_config_parse(fio_config_lexer(source, fname))
+
+
+def flatmap(func, inp_iter):
+    for val in inp_iter:
+        for res in func(val):
+            yield res
+
+
+def fio_cfg_compile(source, fname, test_params, **slice_params):
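+    # Compilation pipeline: lex/parse (with includes) -> substitute external
+    # params -> expand '{% ... %}' value cycles -> expand NUM_ROUNDS repeats
+    # -> finalize names and counters -> slice into runnable groups.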
+    it = parse_all_in_1(source, fname)
+    it = (apply_params(sec, test_params) for sec in it)
+    it = flatmap(process_cycles, it)
+    it = flatmap(process_repeats, it)
+    it = itertools.imap(finall_process, it)
+    return slice_config(it, **slice_params)
+
+
+def parse_args(argv):
+    parser = argparse.ArgumentParser(
+        description="Run fio' and return result")
+    parser.add_argument("--runcycle", type=int, default=None,
+                        metavar="MAX_CYCLE_SECONDS",
+                        help="Max cycle length in seconds")
+    parser.add_argument("-p", "--params", nargs="*", metavar="PARAM=VAL",
+                        default=[],
+                        help="Provide set of pairs PARAM=VAL to" +
+                             "format into job description")
+    parser.add_argument("action", choices=['estimate', 'compile', 'num_tests'])
+    parser.add_argument("jobfile")
+    return parser.parse_args(argv)
+
+
+def main(argv):
+    argv_obj = parse_args(argv)
+
+    if argv_obj.jobfile == '-':
+        job_cfg = sys.stdin.read()
+    else:
+        job_cfg = open(argv_obj.jobfile).read()
+
+    params = {}
+    for param_val in argv_obj.params:
+        assert '=' in param_val
+        name, val = param_val.split("=", 1)
+        params[name] = parse_value(val)
+
+    slice_params = {
+        'runcycle': argv_obj.runcycle,
+    }
+
+    sliced_it = fio_cfg_compile(job_cfg, argv_obj.jobfile,
+                                params, **slice_params)
+
+    if argv_obj.action == 'estimate':
+        sum_time = 0
+        for cfg_slice in sliced_it:
+            sum_time += sum(map(execution_time, cfg_slice))
+        print sec_to_str(sum_time)
+    elif argv_obj.action == 'num_tests':
+        print sum(map(len, map(list, sliced_it)))
+    elif argv_obj.action == 'compile':
+        splitter = "\n#" + "-" * 70 + "\n\n"
+        for cfg_slice in sliced_it:
+            print splitter.join(map(str, cfg_slice))
+
+    return 0
+
+
+if __name__ == '__main__':
+    exit(main(sys.argv[1:]))
diff --git a/wally/suits/io/formatter.py b/wally/suits/io/formatter.py
index 22c090f..84b0a13 100644
--- a/wally/suits/io/formatter.py
+++ b/wally/suits/io/formatter.py
@@ -2,15 +2,21 @@
 
 from wally.utils import ssize2b
 from wally.statistic import round_3_digit
-from wally.suits.io.agent import get_test_summary
+from .fio_task_parser import get_test_summary, get_test_sync_mode
 
 
 def key_func(data):
-    p = data.params
+    p = data.params.vals
+
+    th_count = data.params.vals.get('numjobs')
+
+    if th_count is None:
+        th_count = data.params.vals.get('concurence', 1)
+
     return (p['rw'],
-            p['sync_mode'],
+            get_test_sync_mode(data.params),
             ssize2b(p['blocksize']),
-            int(p['concurence']) * data.testnodes_count,
+            int(th_count) * data.testnodes_count,
             data.name)
 
 
@@ -41,8 +47,7 @@
 
         prev_k = curr_k
 
-        descr = get_test_summary(data.params, data.testnodes_count)
-        test_dinfo = dinfo[data.name]
+        test_dinfo = dinfo[(data.name, data.summary)]
 
         iops, _ = test_dinfo.iops.rounded_average_conf()
 
@@ -61,7 +66,7 @@
         bw = round_3_digit(bw)
 
         params = (data.name.rsplit('_', 1)[0],
-                  descr, int(iops), int(bw), str(conf_perc),
+                  data.summary, int(iops), int(bw), str(conf_perc),
                   str(dev_perc),
                   int(iops_per_vm), int(bw_per_vm), lat)
         tab.add_row(params)
diff --git a/wally/suits/io/hdd.cfg b/wally/suits/io/hdd.cfg
index 17d0509..21166e5 100644
--- a/wally/suits/io/hdd.cfg
+++ b/wally/suits/io/hdd.cfg
@@ -1,16 +1,11 @@
-[defaults]
-wait_for_previous
-group_reporting
-time_based
-buffered=0
-iodepth=1
-softrandommap=1
-filename={FILENAME}
-NUM_ROUNDS=7
-NUMJOBS={% 1, 5, 10, 15, 20, 30, 40, 80 %}
+[global]
+include defaults.cfg
 
-# this is critical for correct results in multy-node run
-randrepeat=0
+NUM_ROUNDS=3
+
+# NUMJOBS={% 1, 5, 10, 15, 20, 30, 40, 80 %}
+
+NUMJOBS={% 1, 5, 10 %}
 
 size=10G
 ramp_time=5
@@ -19,7 +14,7 @@
 # ---------------------------------------------------------------------
 # check different thread count, sync mode. (latency, iops) = func(th_count)
 # ---------------------------------------------------------------------
-[hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+[hdd_test_{TEST_SUMM}]
 blocksize=4k
 rw=randwrite
 sync=1
@@ -29,7 +24,7 @@
 # check different thread count, direct read mode. (latency, iops) = func(th_count)
 # also check iops for randread
 # ---------------------------------------------------------------------
-[hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+[hdd_test_{TEST_SUMM}]
 blocksize=4k
 rw=randread
 direct=1
@@ -38,7 +33,7 @@
 # ---------------------------------------------------------------------
 # check IOPS randwrite.
 # ---------------------------------------------------------------------
-[hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+[hdd_test_{TEST_SUMM}]
 blocksize=4k
 rw=randwrite
 direct=1
@@ -47,7 +42,7 @@
 # No reason for th count > 1 in case of sequantial operations
 # They became random
 # ---------------------------------------------------------------------
-[hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+[hdd_test_{TEST_SUMM}]
 blocksize=1m
 rw={% read, write %}
 direct=1
diff --git a/wally/suits/io/lat_vs_iops.cfg b/wally/suits/io/lat_vs_iops.cfg
index a587a96..dbafcbb 100644
--- a/wally/suits/io/lat_vs_iops.cfg
+++ b/wally/suits/io/lat_vs_iops.cfg
@@ -1,29 +1,40 @@
-[defaults]
-wait_for_previous=1
-filename={FILENAME}
+[global]
+include defaults.cfg
 
-# this is critical for correct results in multy-node run
-randrepeat=0
-
-iodepth=1
-size=100G
-group_reporting=1
-
-IOPS_LIMIT={% 100, 500 %}
+TEST_FILE_SIZE=100G
+size={TEST_FILE_SIZE}
 
 ramp_time=5
 runtime=30
-time_based=1
 
-buffered=0
-NUMJOBS=1
+blocksize=4k
+rw=randwrite
+sync=1
 
 # ---------------------------------------------------------------------
 # latency as function from IOPS
 # ---------------------------------------------------------------------
-[lat_vs_iops{rate_iops}_{TEST_SUMM} * {NUM_ROUNDS}]
-blocksize=4k
-rw=randread
-direct=1
-numjobs={NUMJOBS}
-rate_iops={IOPS_LIMIT}
+[lat_vs_iops{rate_iops}_{TEST_SUMM}]
+numjobs=1
+rate_iops={% 20, 40, 60, 80, 100, 120, 160, 200, 250, 300 %}
+
+# ---------------------------------------------------------------------
+# latency as a function of IOPS (numjobs=3)
+# ---------------------------------------------------------------------
+[lat_vs_iops{rate_iops}_{TEST_SUMM}]
+numjobs=3
+rate_iops={% 10, 20, 40, 60, 80, 100, 120, 160 %}
+
+# ---------------------------------------------------------------------
+# latency as a function of IOPS (numjobs=7)
+# ---------------------------------------------------------------------
+[lat_vs_iops{rate_iops}_{TEST_SUMM}]
+numjobs=7
+rate_iops={% 5, 10, 20, 40, 50, 60, 70 %}
+
+# ---------------------------------------------------------------------
+# latency as a function of IOPS (numjobs=10)
+# ---------------------------------------------------------------------
+[lat_vs_iops{rate_iops}_{TEST_SUMM}]
+numjobs=10
+rate_iops={% 5, 10, 20, 40, 50 %}
diff --git a/wally/suits/io/results_loader.py b/wally/suits/io/results_loader.py
deleted file mode 100644
index 988fe0e..0000000
--- a/wally/suits/io/results_loader.py
+++ /dev/null
@@ -1,56 +0,0 @@
-import re
-import json
-
-
-def parse_output(out_err):
-    err_start_patt = r"(?ims)=+\s+ERROR\s+=+"
-    err_end_patt = r"(?ims)=+\s+END OF ERROR\s+=+"
-
-    for block in re.split(err_start_patt, out_err)[1:]:
-        tb, garbage = re.split(err_end_patt, block)
-        msg = "Test fails with error:\n" + tb.strip() + "\n"
-        raise OSError(msg)
-
-    start_patt = r"(?ims)=+\s+RESULTS\(format=json\)\s+=+"
-    end_patt = r"(?ims)=+\s+END OF RESULTS\s+=+"
-
-    for block in re.split(start_patt, out_err)[1:]:
-        data, garbage = re.split(end_patt, block)
-        yield json.loads(data.strip())
-
-    start_patt = r"(?ims)=+\s+RESULTS\(format=eval\)\s+=+"
-    end_patt = r"(?ims)=+\s+END OF RESULTS\s+=+"
-
-    for block in re.split(start_patt, out_err)[1:]:
-        data, garbage = re.split(end_patt, block)
-        yield eval(data.strip())
-
-
-def filter_data(name_prefix, fields_to_select, **filters):
-    def closure(data):
-        for result in data:
-            if name_prefix is not None:
-                if not result['jobname'].startswith(name_prefix):
-                    continue
-
-            for k, v in filters.items():
-                if result.get(k) != v:
-                    break
-            else:
-                yield map(result.get, fields_to_select)
-    return closure
-
-
-def filter_processed_data(name_prefix, fields_to_select, **filters):
-    def closure(data):
-        for name, result in data.items():
-            if name_prefix is not None:
-                if not name.startswith(name_prefix):
-                    continue
-
-            for k, v in filters.items():
-                if result.raw.get(k) != v:
-                    break
-            else:
-                yield map(result.raw.get, fields_to_select)
-    return closure
diff --git a/wally/suits/io/verify.cfg b/wally/suits/io/verify.cfg
index 58b8450..9ebfad1 100644
--- a/wally/suits/io/verify.cfg
+++ b/wally/suits/io/verify.cfg
@@ -1,28 +1,18 @@
-[defaults]
-wait_for_previous
-group_reporting
-time_based
-buffered=0
-iodepth=1
-softrandommap=1
-filename={FILENAME}
-NUM_ROUNDS=1
-
-# this is critical for correct results in multy-node run
-randrepeat=0
-
+[global]
+include defaults.cfg
 size=50G
 ramp_time=5
 runtime=60
+NUM_ROUNDS=2
 
 # ---------------------------------------------------------------------
-[verify_{TEST_SUMM} * {NUM_ROUNDS}]
-blocksize=4k
-rw=randwrite
-direct=1
+#[verify_{TEST_SUMM}]
+#blocksize=4k
+#rw=randwrite
+#direct=1
 
 # ---------------------------------------------------------------------
-[verify_{TEST_SUMM} * {NUM_ROUNDS}]
+[verify_{TEST_SUMM}]
 blocksize=4k
 rw=randread
 direct=1