very large refactoring
diff --git a/wally/suits/__init__.py b/wally/suits/__init__.py
new file mode 100644
index 0000000..7b6610e
--- /dev/null
+++ b/wally/suits/__init__.py
@@ -0,0 +1,3 @@
+from .itest import TwoScriptTest, PgBenchTest, IOPerfTest
+
+__all__ = ["TwoScriptTest", "PgBenchTest", "IOPerfTest"]
diff --git a/wally/suits/io/__init__.py b/wally/suits/io/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/wally/suits/io/__init__.py
diff --git a/wally/suits/io/agent.py b/wally/suits/io/agent.py
new file mode 100644
index 0000000..7589346
--- /dev/null
+++ b/wally/suits/io/agent.py
@@ -0,0 +1,688 @@
+import sys
+import time
+import json
+import random
+import select
+import pprint
+import argparse
+import traceback
+import subprocess
+import itertools
+from collections import OrderedDict
+
+
+SECTION = 0
+SETTING = 1
+
+
+def get_test_sync_mode(config):
+    try:
+        return config['sync_mode']
+    except KeyError:
+        pass
+
+    is_sync = config.get("sync", "0") == "1"
+    is_direct = config.get("direct", "0") == "1"
+
+    if is_sync and is_direct:
+        return 'x'
+    elif is_sync:
+        return 's'
+    elif is_direct:
+        return 'd'
+    else:
+        return 'a'
+
+
+def get_test_summary(params):
+    rw = {"randread": "rr",
+          "randwrite": "rw",
+          "read": "sr",
+          "write": "sw"}[params["rw"]]
+
+    sync_mode = get_test_sync_mode(params)
+    th_count = params.get('numjobs')
+    if th_count is None:
+        th_count = params.get('concurence', '1')
+    th_count = int(th_count)
+
+    return "{0}{1}{2}th{3}".format(rw,
+                                   sync_mode,
+                                   params['blocksize'],
+                                   th_count)
+
+
+counter = [0]
+
+
+def process_section(name, vals, defaults, format_params):
+    vals = vals.copy()
+    params = format_params.copy()
+
+    if '*' in name:
+        name, repeat = name.split('*')
+        name = name.strip()
+        repeat = int(repeat.format(**params))
+    else:
+        repeat = 1
+
+    # this code can be optimized
+    iterable_names = []
+    iterable_values = []
+    processed_vals = {}
+
+    for val_name, val in vals.items():
+        if val is None:
+            processed_vals[val_name] = val
+        # remove hardcode
+        elif val.startswith('{%'):
+            assert val.endswith("%}")
+            content = val[2:-2].format(**params)
+            iterable_names.append(val_name)
+            iterable_values.append(list(i.strip() for i in content.split(',')))
+        else:
+            processed_vals[val_name] = val.format(**params)
+
+    group_report_err_msg = "Group reporting should be set if numjobs != 1"
+
+    if iterable_values == []:
+        params['UNIQ'] = 'UN{0}'.format(counter[0])
+        counter[0] += 1
+        params['TEST_SUMM'] = get_test_summary(processed_vals)
+
+        if processed_vals.get('numjobs', '1') != '1':
+            assert 'group_reporting' in processed_vals, group_report_err_msg
+
+        ramp_time = processed_vals.get('ramp_time')
+        for i in range(repeat):
+            yield name.format(**params), processed_vals.copy()
+
+            if 'ramp_time' in processed_vals:
+                del processed_vals['ramp_time']
+
+        if ramp_time is not None:
+            processed_vals['ramp_time'] = ramp_time
+    else:
+        for it_vals in itertools.product(*iterable_values):
+            processed_vals.update(dict(zip(iterable_names, it_vals)))
+            params['UNIQ'] = 'UN{0}'.format(counter[0])
+            counter[0] += 1
+            params['TEST_SUMM'] = get_test_summary(processed_vals)
+
+            if processed_vals.get('numjobs', '1') != '1':
+                assert 'group_reporting' in processed_vals,\
+                    group_report_err_msg
+
+            ramp_time = processed_vals.get('ramp_time')
+
+            for i in range(repeat):
+                yield name.format(**params), processed_vals.copy()
+                if 'ramp_time' in processed_vals:
+                    processed_vals['_ramp_time'] = ramp_time
+                    processed_vals.pop('ramp_time')
+
+            if ramp_time is not None:
+                processed_vals['ramp_time'] = ramp_time
+                processed_vals.pop('_ramp_time')
+
+
+def calculate_execution_time(combinations):
+    """Sum ramp_time and runtime (seconds) over (name, params) pairs."""
+    total = 0
+    for _, params in combinations:
+        total += sum(int(params.get(k, 0)) for k in ('ramp_time', 'runtime'))
+    return total
+
+
+def parse_fio_config_full(fio_cfg, params=None):
+    defaults = {}
+    format_params = {}
+
+    if params is None:
+        ext_params = {}
+    else:
+        ext_params = params.copy()
+
+    curr_section = None
+    curr_section_name = None
+
+    for tp, name, val in parse_fio_config_iter(fio_cfg):
+        if tp == SECTION:
+            non_def = curr_section_name != 'defaults'
+            if curr_section_name is not None and non_def:
+                format_params.update(ext_params)
+                for sec in process_section(curr_section_name,
+                                           curr_section,
+                                           defaults,
+                                           format_params):
+                    yield sec
+
+            if name == 'defaults':
+                curr_section = defaults
+            else:
+                curr_section = OrderedDict()
+                curr_section.update(defaults)
+            curr_section_name = name
+
+        else:
+            assert tp == SETTING
+            assert curr_section_name is not None, "no section name"
+            if name == name.upper():
+                assert curr_section_name == 'defaults'
+                format_params[name] = val
+            else:
+                curr_section[name] = val
+
+    if curr_section_name is not None and curr_section_name != 'defaults':
+        format_params.update(ext_params)
+        for sec in process_section(curr_section_name,
+                                   curr_section,
+                                   defaults,
+                                   format_params):
+            yield sec
+
+
+def parse_fio_config_iter(fio_cfg):
+    """Yield (SECTION|SETTING, name, value) tuples for each config line."""
+    for lineno, line in enumerate(fio_cfg.split("\n"), 1):
+        try:
+            line = line.strip()
+            if line.startswith("#") or line.startswith(";"):
+                continue
+
+            if line == "":
+                continue
+
+            if line.startswith('['):
+                assert line.endswith(']'), "name should ends with ]"
+                yield SECTION, line[1:-1], None
+            elif '=' in line:
+                opt_name, opt_val = line.split('=', 1)
+                yield SETTING, opt_name.strip(), opt_val.strip()
+            else:
+                yield SETTING, line, None
+        except Exception as exc:
+            pref = "During parsing line number {0}\n".format(lineno)
+            raise ValueError(pref + str(exc))
+
+
+def format_fio_config(fio_cfg):
+    res = ""
+    for pos, (name, section) in enumerate(fio_cfg):
+        if name.startswith('_'):
+            continue
+
+        if pos != 0:
+            res += "\n"
+
+        res += "[{0}]\n".format(name)
+        for opt_name, opt_val in section.items():
+            if opt_val is None:
+                res += opt_name + "\n"
+            else:
+                res += "{0}={1}\n".format(opt_name, opt_val)
+    return res
+
+
+count = 0
+
+
+def to_bytes(sz):
+    """Convert a size string ('4096', '4k', '1m', '2g') to bytes."""
+    sz = sz.lower()
+    try:
+        return int(sz)
+    except ValueError:
+        scale = {'k': 1024, 'm': 1024 ** 2, 'g': 1024 ** 3}
+        if sz and sz[-1] in scale:
+            return scale[sz[-1]] * int(sz[:-1])
+        raise
+
+
+def do_run_fio_fake(bconf):
+    def estimate_iops(sz, bw, lat):
+        return 1 / (lat + float(sz) / bw)
+    global count
+    count += 1
+    parsed_out = []
+
+    BW = 120.0 * (1024 ** 2)
+    LAT = 0.003
+
+    for name, cfg in bconf:
+        sz = to_bytes(cfg['blocksize'])
+        curr_lat = LAT * ((random.random() - 0.5) * 0.1 + 1)
+        curr_ulat = curr_lat * 1000000
+        curr_bw = BW * ((random.random() - 0.5) * 0.1 + 1)
+        iops = estimate_iops(sz, curr_bw, curr_lat)
+        bw = iops * sz
+
+        res = {'ctx': 10683,
+               'error': 0,
+               'groupid': 0,
+               'jobname': name,
+               'majf': 0,
+               'minf': 30,
+               'read': {'bw': 0,
+                        'bw_agg': 0.0,
+                        'bw_dev': 0.0,
+                        'bw_max': 0,
+                        'bw_mean': 0.0,
+                        'bw_min': 0,
+                        'clat': {'max': 0,
+                                 'mean': 0.0,
+                                 'min': 0,
+                                 'stddev': 0.0},
+                        'io_bytes': 0,
+                        'iops': 0,
+                        'lat': {'max': 0, 'mean': 0.0,
+                                'min': 0, 'stddev': 0.0},
+                        'runtime': 0,
+                        'slat': {'max': 0, 'mean': 0.0,
+                                 'min': 0, 'stddev': 0.0}
+                        },
+               'sys_cpu': 0.64,
+               'trim': {'bw': 0,
+                        'bw_agg': 0.0,
+                        'bw_dev': 0.0,
+                        'bw_max': 0,
+                        'bw_mean': 0.0,
+                        'bw_min': 0,
+                        'clat': {'max': 0,
+                                 'mean': 0.0,
+                                 'min': 0,
+                                 'stddev': 0.0},
+                        'io_bytes': 0,
+                        'iops': 0,
+                        'lat': {'max': 0, 'mean': 0.0,
+                                'min': 0, 'stddev': 0.0},
+                        'runtime': 0,
+                        'slat': {'max': 0, 'mean': 0.0,
+                                 'min': 0, 'stddev': 0.0}
+                        },
+               'usr_cpu': 0.23,
+               'write': {'bw': 0,
+                         'bw_agg': 0,
+                         'bw_dev': 0,
+                         'bw_max': 0,
+                         'bw_mean': 0,
+                         'bw_min': 0,
+                         'clat': {'max': 0, 'mean': 0,
+                                  'min': 0, 'stddev': 0},
+                         'io_bytes': 0,
+                         'iops': 0,
+                         'lat': {'max': 0, 'mean': 0,
+                                 'min': 0, 'stddev': 0},
+                         'runtime': 0,
+                         'slat': {'max': 0, 'mean': 0.0,
+                                  'min': 0, 'stddev': 0.0}
+                         }
+               }
+
+        if cfg['rw'] in ('read', 'randread'):
+            key = 'read'
+        elif cfg['rw'] in ('write', 'randwrite'):
+            key = 'write'
+        else:
+            raise ValueError("Unknown op type {0}".format(cfg['rw']))
+
+        res[key]['bw'] = bw
+        res[key]['iops'] = iops
+        res[key]['runtime'] = 30
+        res[key]['io_bytes'] = res[key]['runtime'] * bw
+        res[key]['bw_agg'] = bw
+        res[key]['bw_dev'] = bw / 30
+        res[key]['bw_max'] = bw * 1.5
+        res[key]['bw_min'] = bw / 1.5
+        res[key]['bw_mean'] = bw
+        res[key]['clat'] = {'max': curr_ulat * 10, 'mean': curr_ulat,
+                            'min': curr_ulat / 2, 'stddev': curr_ulat}
+        res[key]['lat'] = res[key]['clat'].copy()
+        res[key]['slat'] = res[key]['clat'].copy()
+
+        parsed_out.append(res)
+
+    return zip(parsed_out, bconf)
+
+
+def do_run_fio(bconf):
+    benchmark_config = format_fio_config(bconf)
+    cmd = ["fio", "--output-format=json", "--alloc-size=262144", "-"]
+    p = subprocess.Popen(cmd, stdin=subprocess.PIPE,
+                         stdout=subprocess.PIPE,
+                         stderr=subprocess.STDOUT)
+
+    # set timeout
+    raw_out, _ = p.communicate(benchmark_config)
+
+    try:
+        parsed_out = json.loads(raw_out)["jobs"]
+    except KeyError:
+        msg = "Can't parse fio output {0!r}: no 'jobs' found"
+        raw_out = raw_out[:100]
+        raise ValueError(msg.format(raw_out))
+
+    except Exception as exc:
+        msg = "Can't parse fio output: {0!r}\nError: {1}"
+        raw_out = raw_out[:100]
+        raise ValueError(msg.format(raw_out, exc.message))
+
+    return zip(parsed_out, bconf)
+
+# limited by fio
+MAX_JOBS = 1000
+
+
+def next_test_portion(whole_conf, runcycle):
+    jcount = 0
+    runtime = 0
+    bconf = []
+
+    for pos, (name, sec) in enumerate(whole_conf):
+        jc = int(sec.get('numjobs', '1'))
+
+        if runcycle is not None:
+            curr_task_time = calculate_execution_time([(name, sec)])
+        else:
+            curr_task_time = 0
+
+        if jc > MAX_JOBS:
+            err_templ = "Can't process job {0!r} - too large numjobs"
+            raise ValueError(err_templ.format(name))
+
+        if runcycle is not None and len(bconf) != 0:
+            rc_ok = curr_task_time + runtime <= runcycle
+        else:
+            rc_ok = True
+
+        if jc + jcount <= MAX_JOBS and rc_ok:
+            runtime += curr_task_time
+            jcount += jc
+            bconf.append((name, sec))
+            if '_ramp_time' in sec:
+                del sec['_ramp_time']
+            continue
+
+        assert len(bconf) != 0
+        yield bconf
+
+        if '_ramp_time' in sec:
+            sec['ramp_time'] = sec.pop('_ramp_time')
+            curr_task_time = calculate_execution_time([(name, sec)])
+
+        runtime = curr_task_time
+        jcount = jc
+        bconf = [(name, sec)]
+
+    if bconf != []:
+        yield bconf
+
+
+def add_job_results(jname, job_output, jconfig, res):
+    if job_output['write']['iops'] != 0:
+        raw_result = job_output['write']
+    else:
+        raw_result = job_output['read']
+
+    if jname not in res:
+        j_res = {}
+        j_res["rw"] = jconfig["rw"]
+        j_res["sync_mode"] = get_test_sync_mode(jconfig)
+        j_res["concurence"] = int(jconfig.get("numjobs", 1))
+        j_res["blocksize"] = jconfig["blocksize"]
+        j_res["jobname"] = job_output["jobname"]
+        j_res["timings"] = [int(jconfig.get("runtime", 0)),
+                            int(jconfig.get("ramp_time", 0))]
+    else:
+        j_res = res[jname]
+        assert j_res["rw"] == jconfig["rw"]
+        assert j_res["rw"] == jconfig["rw"]
+        assert j_res["sync_mode"] == get_test_sync_mode(jconfig)
+        assert j_res["concurence"] == int(jconfig.get("numjobs", 1))
+        assert j_res["blocksize"] == jconfig["blocksize"]
+        assert j_res["jobname"] == job_output["jobname"]
+
+        # ramp part is skipped for all tests, except first
+        # assert j_res["timings"] == (jconfig.get("runtime"),
+        #                             jconfig.get("ramp_time"))
+
+    def j_app(name, x):
+        j_res.setdefault(name, []).append(x)
+
+    j_app("bw", raw_result["bw"])
+    j_app("iops", raw_result["iops"])
+    j_app("lat", raw_result["lat"]["mean"])
+    j_app("clat", raw_result["clat"]["mean"])
+    j_app("slat", raw_result["slat"]["mean"])
+
+    res[jname] = j_res
+
+
+def compile(benchmark_config, params, runcycle=None):
+    whole_conf = list(parse_fio_config_full(benchmark_config, params))
+    res = ""
+
+    for bconf in next_test_portion(whole_conf, runcycle):
+        res += format_fio_config(bconf)
+
+    return res
+
+
+def run_fio(benchmark_config,
+            params,
+            runcycle=None,
+            raw_results_func=None,
+            skip_tests=0,
+            fake_fio=False):
+
+    whole_conf = list(parse_fio_config_full(benchmark_config, params))
+    whole_conf = whole_conf[skip_tests:]
+    res = {}
+    curr_test_num = skip_tests
+    executed_tests = 0
+    ok = True
+    try:
+        for bconf in next_test_portion(whole_conf, runcycle):
+
+            if fake_fio:
+                res_cfg_it = do_run_fio_fake(bconf)
+            else:
+                res_cfg_it = do_run_fio(bconf)
+
+            res_cfg_it = enumerate(res_cfg_it, curr_test_num)
+
+            for curr_test_num, (job_output, (jname, jconfig)) in res_cfg_it:
+                executed_tests += 1
+                if raw_results_func is not None:
+                    raw_results_func(executed_tests,
+                                     [job_output, jname, jconfig])
+
+                assert jname == job_output["jobname"], \
+                    "{0} != {1}".format(jname, job_output["jobname"])
+
+                if jname.startswith('_'):
+                    continue
+
+                add_job_results(jname, job_output, jconfig, res)
+
+            msg_template = "Done {0} tests from {1}. ETA: {2}"
+            exec_time = estimate_cfg(benchmark_config, params, curr_test_num)
+
+            print msg_template.format(curr_test_num - skip_tests,
+                                      len(whole_conf),
+                                      sec_to_str(exec_time))
+
+    except (SystemExit, KeyboardInterrupt):
+        raise
+
+    except Exception:
+        print "=========== ERROR ============="
+        traceback.print_exc()
+        print "======== END OF ERROR ========="
+        ok = False
+
+    return res, executed_tests, ok
+
+
+def run_benchmark(binary_tp, *argv, **kwargs):
+    if 'fio' == binary_tp:
+        return run_fio(*argv, **kwargs)
+    raise ValueError("Unknown behcnmark {0}".format(binary_tp))
+
+
+def read_config(fd, timeout=10):
+    job_cfg = ""
+    etime = time.time() + timeout
+    while True:
+        wtime = etime - time.time()
+        if wtime <= 0:
+            raise IOError("No config provided")
+
+        r, w, x = select.select([fd], [], [], wtime)
+        if len(r) == 0:
+            raise IOError("No config provided")
+
+        char = fd.read(1)
+        if '' == char:
+            return job_cfg
+
+        job_cfg += char
+
+
+def estimate_cfg(job_cfg, params, skip_tests=0):
+    bconf = list(parse_fio_config_full(job_cfg, params))[skip_tests:]
+    return calculate_execution_time(bconf)
+
+
+def sec_to_str(seconds):
+    h = seconds // 3600
+    m = (seconds % 3600) // 60
+    s = seconds % 60
+    return "{0}:{1:02d}:{2:02d}".format(h, m, s)
+
+
+def parse_args(argv):
+    parser = argparse.ArgumentParser(
+        description="Run fio' and return result")
+    parser.add_argument("--type", metavar="BINARY_TYPE",
+                        choices=['fio'], default='fio',
+                        help=argparse.SUPPRESS)
+    parser.add_argument("--start-at", metavar="START_AT_UTC", type=int,
+                        help="Start execution at START_AT_UTC")
+    parser.add_argument("--json", action="store_true", default=False,
+                        help="Json output format")
+    parser.add_argument("--output", default='-', metavar="FILE_PATH",
+                        help="Store results to FILE_PATH")
+    parser.add_argument("--estimate", action="store_true", default=False,
+                        help="Only estimate task execution time")
+    parser.add_argument("--compile", action="store_true", default=False,
+                        help="Compile config file to fio config")
+    parser.add_argument("--num-tests", action="store_true", default=False,
+                        help="Show total number of tests")
+    parser.add_argument("--runcycle", type=int, default=None,
+                        metavar="MAX_CYCLE_SECONDS",
+                        help="Max cycle length in seconds")
+    parser.add_argument("--show-raw-results", action='store_true',
+                        default=False, help="Output raw input and results")
+    parser.add_argument("--skip-tests", type=int, default=0, metavar="NUM",
+                        help="Skip NUM tests")
+    parser.add_argument("--faked-fio", action='store_true',
+                        default=False, help="Emulate fio with 0 test time")
+    parser.add_argument("--params", nargs="*", metavar="PARAM=VAL",
+                        default=[],
+                        help="Provide set of pairs PARAM=VAL to" +
+                             "format into job description")
+    parser.add_argument("jobfile")
+    return parser.parse_args(argv)
+
+
+def main(argv):
+    """CLI entry point: run/estimate/compile a fio job; return exit code."""
+    argv_obj = parse_args(argv)
+    if argv_obj.jobfile == '-':
+        job_cfg = read_config(sys.stdin)
+    else:
+        job_cfg = open(argv_obj.jobfile).read()
+
+    if argv_obj.output == '-':
+        out_fd = sys.stdout
+    else:
+        out_fd = open(argv_obj.output, "w")
+
+    params = {}
+    for param_val in argv_obj.params:
+        assert '=' in param_val
+        name, val = param_val.split("=", 1)
+        params[name] = val
+
+    if argv_obj.estimate:
+        print sec_to_str(estimate_cfg(job_cfg, params))
+        return 0
+
+    if argv_obj.num_tests or argv_obj.compile:
+        bconf = list(parse_fio_config_full(job_cfg, params))
+        bconf = bconf[argv_obj.skip_tests:]
+
+        if argv_obj.compile:
+            out_fd.write(format_fio_config(bconf))
+            out_fd.write("\n")
+
+        if argv_obj.num_tests:
+            print len(bconf)
+
+        return 0
+
+    if argv_obj.start_at is not None:
+        ctime = time.time()
+        if argv_obj.start_at >= ctime:
+            time.sleep(argv_obj.start_at - ctime)
+
+    def raw_res_func(test_num, data):
+        pref = "========= RAW_RESULTS({0}) =========\n".format(test_num)
+        out_fd.write(pref)
+        out_fd.write(json.dumps(data))
+        out_fd.write("\n========= END OF RAW_RESULTS =========\n")
+        out_fd.flush()
+
+    rrfunc = raw_res_func if argv_obj.show_raw_results else None
+
+    stime = time.time()
+    job_res, num_tests, ok = run_benchmark(argv_obj.type,
+                                           job_cfg,
+                                           params,
+                                           argv_obj.runcycle,
+                                           rrfunc,
+                                           argv_obj.skip_tests,
+                                           argv_obj.faked_fio)
+    etime = time.time()
+
+    res = {'__meta__': {'raw_cfg': job_cfg, 'params': params}, 'res': job_res}
+
+    oformat = 'json' if argv_obj.json else 'eval'
+    out_fd.write("\nRun {0} tests in {1} seconds\n".format(num_tests,
+                                                           int(etime - stime)))
+    out_fd.write("========= RESULTS(format={0}) =========\n".format(oformat))
+    if argv_obj.json:
+        out_fd.write(json.dumps(res))
+    else:
+        out_fd.write(pprint.pformat(res) + "\n")
+    out_fd.write("\n========= END OF RESULTS =========\n")
+
+    return 0 if ok else 1
+
+
+def fake_main(x):
+    import yaml
+    time.sleep(60)
+    out_fd = sys.stdout
+    fname = "/tmp/perf_tests/metempirical_alisha/raw_results.yaml"
+    res = yaml.load(open(fname).read())[0][1]
+    out_fd.write("========= RESULTS(format=json) =========\n")
+    out_fd.write(json.dumps(res))
+    out_fd.write("\n========= END OF RESULTS =========\n")
+    return 0
+
+
+if __name__ == '__main__':
+    # exit(fake_main(sys.argv[1:]))
+    exit(main(sys.argv[1:]))
diff --git a/wally/suits/io/formatter.py b/wally/suits/io/formatter.py
new file mode 100644
index 0000000..529b78a
--- /dev/null
+++ b/wally/suits/io/formatter.py
@@ -0,0 +1,53 @@
+import texttable
+
+from wally.utils import ssize_to_b
+from wally.statistic import med_dev
+from wally.suits.io.agent import get_test_summary
+
+
+def key_func(k_data):
+    _, data = k_data
+
+    return (data['rw'],
+            data['sync_mode'],
+            ssize_to_b(data['blocksize']),
+            data['concurence'])
+
+
+def format_results_for_console(test_set):
+    """
+    create a table with io performance report
+    for console
+    """
+    tab = texttable.Texttable()
+    tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
+    tab.set_cols_align(["l", "r", "r", "r", "r"])
+
+    prev_k = None
+    items = sorted(test_set['res'].items(), key=key_func)
+
+    for test_name, data in items:
+        curr_k = key_func((test_name, data))[:3]
+
+        if prev_k is not None:
+            if prev_k != curr_k:
+                tab.add_row(["---"] * 5)
+
+        prev_k = curr_k
+
+        descr = get_test_summary(data)
+
+        iops, _ = med_dev(data['iops'])
+        bw, bwdev = med_dev(data['bw'])
+
+        # 3 * sigma
+        dev_perc = int((bwdev * 300) / bw)
+
+        params = (descr, int(iops), int(bw), dev_perc,
+                  int(med_dev(data['lat'])[0]) // 1000)
+        tab.add_row(params)
+
+    header = ["Description", "IOPS", "BW KiBps", "Dev * 3 %", "clat ms"]
+    tab.header(header)
+
+    return tab.draw()
diff --git a/wally/suits/io/io_scenario_check_distribution.cfg b/wally/suits/io/io_scenario_check_distribution.cfg
new file mode 100644
index 0000000..6ba3f9f
--- /dev/null
+++ b/wally/suits/io/io_scenario_check_distribution.cfg
@@ -0,0 +1,13 @@
+[distrubution_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=4k
+rw=randwrite
+direct=1
+ramp_time=5
+buffered=0
+wait_for_previous
+filename={FILENAME}
+iodepth=1
+size=10Gb
+time_based
+runtime=30
+group_reporting
diff --git a/wally/suits/io/io_scenario_check_linearity.cfg b/wally/suits/io/io_scenario_check_linearity.cfg
new file mode 100644
index 0000000..4017cf3
--- /dev/null
+++ b/wally/suits/io/io_scenario_check_linearity.cfg
@@ -0,0 +1,29 @@
+[defaults]
+NUM_ROUNDS=7
+
+ramp_time=5
+buffered=0
+wait_for_previous
+filename={FILENAME}
+iodepth=1
+size=10Gb
+time_based
+runtime=30
+
+# ---------------------------------------------------------------------
+# check read and write linearity. oper_time = func(size)
+# ---------------------------------------------------------------------
+[linearity_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize={% 512, 1k, 4k, 8k, 16k, 32k, 64k, 128k, 256k, 512k, 1m, 2m, 4m %}
+rw={% randwrite, randread %}
+direct=1
+
+# ---------------------------------------------------------------------
+# check sync write linearity. oper_time = func(size)
+# check sync BW as well
+# ---------------------------------------------------------------------
+[linearity_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize={% 512, 1k, 4k, 8k, 16k, 32k, 64k, 128k, 256k, 512k, 1m, 2m, 4m %}
+rw=randwrite
+sync=1
+
diff --git a/wally/suits/io/io_scenario_check_th_count.cfg b/wally/suits/io/io_scenario_check_th_count.cfg
new file mode 100644
index 0000000..3d57154
--- /dev/null
+++ b/wally/suits/io/io_scenario_check_th_count.cfg
@@ -0,0 +1,46 @@
+[defaults]
+NUM_ROUNDS=7
+ramp_time=5
+buffered=0
+wait_for_previous
+filename={FILENAME}
+iodepth=1
+size=10Gb
+time_based
+runtime=30
+group_reporting
+numjobs={% 1, 2, 5, 10, 15, 20, 25, 30, 35, 40 %}
+
+# ---------------------------------------------------------------------
+# check different thread count. (latency, bw) = func(th_count)
+#
+#    RANDOM R IOPS, DIRECT, should act same as AS (4k + randread + sync)
+#    just faster. Not sure, that we need it
+# 4k + randread  + direct
+#
+#     RANDOM R/W IOPS
+# 4k + randread  + sync
+# 4k + randwrite + sync
+#
+#     LINEAR BW
+# 1m + write     + direct
+# 1m + read      + direct
+#
+# ---------------------------------------------------------------------
+[concurrence_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=4k
+rw={% randread %}
+direct=1
+sync=0
+
+[concurrence_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=4k
+rw=randwrite
+direct=0
+sync=1
+
+[concurrence_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=1m
+rw={% write, read %}
+direct=1
+sync=0
diff --git a/wally/suits/io/io_scenario_check_vm_count_ec2.cfg b/wally/suits/io/io_scenario_check_vm_count_ec2.cfg
new file mode 100644
index 0000000..19c9e50
--- /dev/null
+++ b/wally/suits/io/io_scenario_check_vm_count_ec2.cfg
@@ -0,0 +1,29 @@
+[defaults]
+NUM_ROUNDS=7
+ramp_time=5
+buffered=0
+wait_for_previous
+filename={FILENAME}
+iodepth=1
+size=10Gb
+time_based
+runtime=30
+group_reporting
+rate={BW_LIMIT}
+rate_iops={IOPS_LIMIT}
+
+# ---------------------------------------------------------------------
+# check different thread count. (latency, bw) = func(th_count)
+# ---------------------------------------------------------------------
+[vm_count_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize={% 4k, 1m %}
+rw={% randwrite, randread %}
+direct=1
+numjobs={% 1, 2, 5, 10, 15, 20, 25, 30, 35, 40 %}
+
+[vm_count_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize={% 4k, 1m %}
+rw=randwrite
+direct=0
+sync=1
+numjobs={% 1, 2, 5, 10, 15, 20, 25, 30, 35, 40 %}
diff --git a/wally/suits/io/io_scenario_check_warmup.cfg b/wally/suits/io/io_scenario_check_warmup.cfg
new file mode 100644
index 0000000..6a9c622
--- /dev/null
+++ b/wally/suits/io/io_scenario_check_warmup.cfg
@@ -0,0 +1,33 @@
+[defaults]
+NUM_ROUNDS=7
+
+ramp_time=5
+buffered=0
+wait_for_previous
+filename={FILENAME}
+iodepth=1
+size=10Gb
+time_based
+runtime=30
+
+# ---------------------------------------------------------------------
+# check test time, no warmup time. iops = func(run_time)
+# ---------------------------------------------------------------------
+[runtime_test_wo_wu_{TEST_SUMM}_{UNIQ} * {NUM_ROUNDS}]
+startdelay=10
+blocksize={% 4k, 1m %}
+rw=randwrite
+direct=1
+ramp_time=0
+runtime={% 10, 15, 20, 30, 60, 120 %}
+
+# ---------------------------------------------------------------------
+# check test time, with warmup time. iops = func(run_time)
+# ---------------------------------------------------------------------
+[runtime_test_w_wu_{TEST_SUMM}_{UNIQ} * {NUM_ROUNDS}]
+startdelay=10
+blocksize={% 4k, 1m %}
+rw=randwrite
+direct=1
+ramp_time={% 5, 10, 15 %}
+runtime={% 15, 30 %}
diff --git a/wally/suits/io/io_scenario_hdd.cfg b/wally/suits/io/io_scenario_hdd.cfg
new file mode 100644
index 0000000..5e24009
--- /dev/null
+++ b/wally/suits/io/io_scenario_hdd.cfg
@@ -0,0 +1,50 @@
+[defaults]
+wait_for_previous
+group_reporting
+time_based
+buffered=0
+iodepth=1
+softrandommap=1
+filename={FILENAME}
+NUM_ROUNDS=7
+
+ramp_time=5
+size=10Gb
+runtime=30
+
+# ---------------------------------------------------------------------
+# check different thread count, sync mode. (latency, iops) = func(th_count)
+# ---------------------------------------------------------------------
+[hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=4k
+rw=randwrite
+sync=1
+numjobs={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
+
+# ---------------------------------------------------------------------
+# check different thread count, direct read mode. (latency, iops) = func(th_count)
+# also check iops for randread
+# ---------------------------------------------------------------------
+[hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=4k
+rw=randread
+direct=1
+numjobs={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
+
+# ---------------------------------------------------------------------
+# check different thread count, direct read/write mode. (bw, iops) = func(th_count)
+# also check BW for seq read/write.
+# ---------------------------------------------------------------------
+[hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=1m
+rw={% read, write %}
+direct=1
+numjobs={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
+
+# ---------------------------------------------------------------------
+# check IOPS randwrite.
+# ---------------------------------------------------------------------
+[hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=4k
+rw=randwrite
+direct=1
diff --git a/wally/suits/io/io_scenario_long_test.cfg b/wally/suits/io/io_scenario_long_test.cfg
new file mode 100644
index 0000000..4b0a79d
--- /dev/null
+++ b/wally/suits/io/io_scenario_long_test.cfg
@@ -0,0 +1,28 @@
+[defaults]
+
+# 24h test: 270 rounds * 30s + 261 rounds * 300s = 86400s of runtime
+NUM_ROUNDS1=270
+NUM_ROUNDS2=261
+
+buffered=0
+wait_for_previous
+filename={FILENAME}
+iodepth=1
+size=50Gb
+time_based
+runtime=300
+
+# ---------------------------------------------------------------------
+# sustained 128k randwrite: 270 rounds of 30s, then 261 rounds of 300s
+# ---------------------------------------------------------------------
+[24h_test * {NUM_ROUNDS1}]
+blocksize=128k
+rw=randwrite
+direct=1
+runtime=30
+
+[24h_test * {NUM_ROUNDS2}]
+blocksize=128k
+rw=randwrite
+direct=1
+
diff --git a/wally/suits/io/results_loader.py b/wally/suits/io/results_loader.py
new file mode 100644
index 0000000..25721eb
--- /dev/null
+++ b/wally/suits/io/results_loader.py
@@ -0,0 +1,55 @@
+import re
+import json
+
+
+from wally.utils import ssize_to_b
+from wally.statistic import med_dev
+
+
+def parse_output(out_err):
+    """Yield every result object embedded in ``out_err``.
+
+    A result block starts at a ``RESULTS(format=json)`` or
+    ``RESULTS(format=eval)`` banner and ends at the next
+    ``END OF RESULTS`` banner.  All json blocks are yielded first (decoded
+    with json.loads), then all eval blocks (decoded with eval).
+
+    Raises ValueError when a block has no closing banner or when a json
+    payload can not be decoded.
+    """
+    end_patt = r"(?ims)=+\s+END OF RESULTS\s+=+"
+
+    # NOTE(review): eval() executes whatever text is found in out_err.  It is
+    # kept for compatibility with the eval output format, but it must only
+    # ever be fed output that is trusted.
+    formats = [
+        (r"(?ims)=+\s+RESULTS\(format=json\)\s+=+", json.loads),
+        (r"(?ims)=+\s+RESULTS\(format=eval\)\s+=+", eval),
+    ]
+
+    for start_patt, loader in formats:
+        for block in re.split(start_patt, out_err)[1:]:
+            # Split on the first end banner only: the text that follows a
+            # start banner can contain further end banners (e.g. a json
+            # block followed by an eval block), which made a plain
+            # two-value unpacking of the split fail.
+            parts = re.split(end_patt, block, maxsplit=1)
+            if len(parts) != 2:
+                raise ValueError("RESULTS block without END OF RESULTS marker")
+            yield loader(parts[0].strip())
+
+
+def filter_data(name_prefix, fields_to_select, **filters):
+    """Build a selector over an iterable of result dicts.
+
+    The returned callable takes the results and yields, for every result
+    that passes the checks, the values of ``fields_to_select`` (looked up
+    with ``result.get``, so missing fields come back as None).
+
+    A result passes when its 'jobname' starts with ``name_prefix`` (check
+    skipped if ``name_prefix`` is None) and ``result.get(k)`` equals ``v``
+    for every ``k=v`` given in ``filters``.
+    """
+    def closure(data):
+        for result in data:
+            wrong_name = (name_prefix is not None and
+                          not result['jobname'].startswith(name_prefix))
+            if wrong_name:
+                continue
+
+            mismatch = any(result.get(field) != expected
+                           for field, expected in filters.items())
+            if not mismatch:
+                yield map(result.get, fields_to_select)
+    return closure
+
+
+def load_data(raw_data):
+    """Yield the per-test dicts of the first result block in ``raw_data``.
+
+    Only the first object produced by parse_output is used.  Every value of
+    its 'res' mapping is extended in place with 'blocksize_b' (the
+    'blocksize' string converted by ssize_to_b) and with the
+    '<name>_mediana' / '<name>_stddev' pair returned by med_dev for the
+    'iops', 'bw' and 'lat' fields, then yielded.
+    """
+    first_block = list(parse_output(raw_data))[0]
+
+    stat_fields = (
+        ('iops', 'iops_mediana', 'iops_stddev'),
+        ('bw', 'bw_mediana', 'bw_stddev'),
+        ('lat', 'lat_mediana', 'lat_stddev'),
+    )
+
+    for test_res in first_block['res'].values():
+        test_res['blocksize_b'] = ssize_to_b(test_res['blocksize'])
+
+        for src, med_key, dev_key in stat_fields:
+            test_res[med_key], test_res[dev_key] = med_dev(test_res[src])
+
+        yield test_res
+
+
+def load_files(*fnames):
+    """Yield the results that load_data extracts from each file in ``fnames``.
+
+    Files are processed in the order given; each one is read completely
+    and closed before its content is parsed.
+    """
+    for fname in fnames:
+        # Read eagerly inside ``with`` so the descriptor is released even
+        # if the consumer stops iterating this generator half-way.
+        with open(fname) as fd:
+            raw_data = fd.read()
+
+        for result in load_data(raw_data):
+            yield result
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
new file mode 100644
index 0000000..91e9dd5
--- /dev/null
+++ b/wally/suits/itest.py
@@ -0,0 +1,237 @@
+import abc
+import time
+import os.path
+import logging
+
+from wally.ssh_utils import copy_paths, run_over_ssh, delete_file
+from wally.utils import ssize_to_b, open_for_append_or_create, sec_to_str
+
+from . import postgres
+from .io import agent as io_agent
+from .io import formatter as io_formatter
+from .io.results_loader import parse_output
+
+
+logger = logging.getLogger("wally")
+
+
+class IPerfTest(object):
+    """Base class of the tests defined in this module.
+
+    Stores the result callback, the log directory and the node, and
+    provides no-op ``pre_run`` / ``cleanup`` / ``run`` methods for
+    subclasses to override.
+
+    NOTE(review): ``abc.abstractmethod`` is only enforced on classes whose
+    metaclass is ``abc.ABCMeta``; this class uses the default metaclass, so
+    it can still be instantiated and ``run`` simply returns None.
+    """
+
+    def __init__(self, on_result_cb, log_directory=None, node=None):
+        self.node = node
+        self.log_directory = log_directory
+        self.on_result_cb = on_result_cb
+
+    def pre_run(self, conn):
+        """Do nothing; subclasses override this."""
+
+    def cleanup(self, conn):
+        """Do nothing; subclasses override this."""
+
+    @abc.abstractmethod
+    def run(self, conn, barrier):
+        """Do nothing; subclasses provide the actual test run."""
+
+    @classmethod
+    def format_for_console(cls, data):
+        """Always raise NotImplementedError naming the concrete class."""
+        raise NotImplementedError(
+            "{0}.format_for_console".format(cls.__name__))
+
+
+class TwoScriptTest(IPerfTest):
+    """Test implemented by two scripts: ``prepare_script`` and ``run_script``.
+
+    Each script is sent to ``remote_tmp_dir`` (keeping its base name) with
+    copy_paths and started with run_over_ssh.  The run script gets every
+    item of ``opts`` as a ``key value`` pair on its command line; every
+    ``key:value`` line of its output is passed to ``on_result_cb`` as a
+    ``(key, float(value))`` tuple.
+    """
+    remote_tmp_dir = '/tmp'
+
+    def __init__(self, opts, on_result_cb, log_directory=None, node=None):
+        IPerfTest.__init__(self, on_result_cb, log_directory, node=node)
+        self.opts = opts
+
+        # scripts named in opts take precedence over class-level attributes
+        if 'run_script' in self.opts:
+            self.run_script = self.opts['run_script']
+            self.prepare_script = self.opts['prepare_script']
+
+    def get_remote_for_script(self, script):
+        """Return the remote path for ``script``: its base name in
+        ``remote_tmp_dir``."""
+        # was ``self.tmp_dir``, an attribute that is never defined
+        return os.path.join(self.remote_tmp_dir, script.rpartition('/')[2])
+
+    def copy_script(self, conn, src):
+        """Copy local script ``src`` over ``conn``; return its remote path."""
+        remote_path = self.get_remote_for_script(src)
+        copy_paths(conn, {src: remote_path})
+        return remote_path
+
+    def pre_run(self, conn):
+        """Copy the prepare script to the node and execute it."""
+        # was ``self.pre_run_script``; the attribute set by __init__ and by
+        # subclasses is ``prepare_script``
+        remote_script = self.copy_script(conn, self.prepare_script)
+        run_over_ssh(conn, remote_script, node=self.node)
+
+    def run(self, conn, barrier):
+        """Copy and execute the run script, then process its output.
+
+        ``barrier`` is accepted for interface compatibility and not used.
+        """
+        remote_script = self.copy_script(conn, self.run_script)
+        cmd_opts = ' '.join("%s %s" % (key, val)
+                            for key, val in self.opts.items())
+        cmd = remote_script + ' ' + cmd_opts
+        out_err = run_over_ssh(conn, cmd, node=self.node)
+        self.on_result(out_err, cmd)
+
+    def parse_results(self, out):
+        """Report every ``key:value`` line of ``out`` to ``on_result_cb``.
+
+        Lines with an empty key or an empty value are skipped; a value that
+        is not a valid float makes float() raise ValueError.
+        """
+        for line in out.split("\n"):
+            key, _, value = line.partition(":")
+            if key and value:
+                self.on_result_cb((key, float(value)))
+
+    def on_result(self, out_err, cmd):
+        """Parse ``out_err``; wrap any failure into a RuntimeError that
+        carries the raw output."""
+        try:
+            self.parse_results(out_err)
+        except Exception as exc:
+            msg_templ = "Error during postprocessing results: {0!r}. {1}"
+            # str(exc) instead of the deprecated, python2-only exc.message
+            raise RuntimeError(msg_templ.format(str(exc), out_err))
+
+
+class PgBenchTest(TwoScriptTest):
+    """TwoScriptTest that uses the prepare.sh / run.sh scripts located in
+    the directory of the ``postgres`` package."""
+    # directory that holds the postgres package's files
+    root = os.path.dirname(postgres.__file__)
+    prepare_script = os.path.join(root, "prepare.sh")
+    run_script = os.path.join(root, "run.sh")
+
+
+class IOPerfTest(IPerfTest):
+    """IO test that runs the io agent script on the tested node.
+
+    The fio-style config named by ``test_options['cfg']`` is read locally.
+    ``pre_run`` copies the agent source to ``io_py_remote`` and pre-creates
+    the test files with dd.  ``run`` starts the agent with the raw config
+    on its stdin and passes every parsed result block to ``on_result_cb``.
+    """
+    io_py_remote = "/tmp/disk_test_agent.py"
+
+    def __init__(self, test_options, on_result_cb,
+                 log_directory=None, node=None):
+        """Read the config and write its compiled form to the log directory.
+
+        ``test_options`` must contain 'cfg' (config file path); 'params',
+        'tool' (default 'fio') and 'alive_check_interval' are optional.
+        ``log_directory`` is joined with file names here, so it has to be a
+        real path despite its None default.
+        """
+        IPerfTest.__init__(self, on_result_cb, log_directory, node=node)
+        self.options = test_options
+        self.config_fname = test_options['cfg']
+        self.alive_check_interval = test_options.get('alive_check_interval')
+        self.config_params = test_options.get('params', {})
+        self.tool = test_options.get('tool', 'fio')
+
+        with open(self.config_fname) as cfg_fd:
+            self.raw_cfg = cfg_fd.read()
+
+        self.configs = list(io_agent.parse_fio_config_full(self.raw_cfg,
+                                                           self.config_params))
+
+        cmd_log = os.path.join(self.log_directory, "task_compiled.cfg")
+        raw_res = os.path.join(self.log_directory, "raw_results.txt")
+
+        # close the compiled-config log right away so its content is
+        # flushed to disk instead of waiting for garbage collection
+        fio_command_file = open_for_append_or_create(cmd_log)
+        try:
+            fio_command_file.write(io_agent.compile(self.raw_cfg,
+                                                    self.config_params,
+                                                    None))
+        finally:
+            fio_command_file.close()
+
+        self.fio_raw_results_file = open_for_append_or_create(raw_res)
+
+    def cleanup(self, conn):
+        """Delete the agent script from the node."""
+        delete_file(conn, self.io_py_remote)
+        # TODO: also remove the temporary files used for testing
+
+    def pre_run(self, conn):
+        """Make sure fio is available, upload the agent, create test files.
+
+        Raises OSError when fio is missing and three installation attempts
+        fail.
+        """
+        try:
+            run_over_ssh(conn, 'which fio', node=self.node)
+        except OSError:
+            # TODO: install fio, if not installed
+            cmd = "sudo apt-get -y install fio"
+            last_err = None
+
+            for _ in range(3):
+                try:
+                    run_over_ssh(conn, cmd, node=self.node)
+                    break
+                except OSError as err:
+                    # keep the error: the ``as`` name is not reliably
+                    # available after the except block ends
+                    last_err = err
+                    time.sleep(3)
+            else:
+                raise OSError("Can't install fio - " + str(last_err))
+
+        # io_agent.__file__ may point to a compiled file; only the last
+        # extension is replaced (rsplit('.')[0] cut at the FIRST dot of the
+        # whole path, e.g. inside ".../python2.7/...")
+        local_fname = os.path.splitext(io_agent.__file__)[0] + ".py"
+        self.files_to_copy = {local_fname: self.io_py_remote}
+        copy_paths(conn, self.files_to_copy)
+
+        mib = 1024 ** 2
+        files = {}
+
+        for _, params in self.configs:
+            # size in whole MiB, rounded up
+            msz = (ssize_to_b(params['size']) + mib - 1) // mib
+            fname = params['filename']
+            # a file shared by several sections gets the largest size
+            files[fname] = max(files.get(fname, 0), msz)
+
+        cmd_templ = "sudo dd if=/dev/zero of={0} bs={1} count={2}"
+        for fname, file_msz in files.items():
+            # use this file's own size; the loop variable left over from
+            # the sizing loop above was used for every file before
+            cmd = cmd_templ.format(fname, mib, file_msz)
+            run_over_ssh(conn, cmd, timeout=file_msz, node=self.node)
+
+    def run(self, conn, barrier):
+        """Run the agent on the node and hand its output to on_result.
+
+        ``barrier.wait()`` is called before the command is started and
+        ``barrier.exit()`` is always called afterwards.  The command may
+        run for the estimated config time plus 10% plus 300 seconds.
+        """
+        cmd_templ = "sudo env python2 {0} --type {1} {2} --json -"
+
+        params = " ".join("{0}={1}".format(k, v)
+                          for k, v in self.config_params.items())
+
+        if "" != params:
+            params = "--params " + params
+
+        cmd = cmd_templ.format(self.io_py_remote, self.tool, params)
+        logger.debug("Waiting on barrier")
+
+        exec_time = io_agent.estimate_cfg(self.raw_cfg, self.config_params)
+        exec_time_str = sec_to_str(exec_time)
+        # computed unconditionally: it used to be assigned only when
+        # barrier.wait() returned a true value, leaving it unbound otherwise
+        timeout = int(exec_time * 1.1 + 300)
+
+        try:
+            if barrier.wait():
+                templ = "Test should takes about {0}. Will wait at most {1}"
+                logger.info(templ.format(exec_time_str, sec_to_str(timeout)))
+
+            out_err = run_over_ssh(conn, cmd,
+                                   stdin_data=self.raw_cfg,
+                                   timeout=timeout,
+                                   node=self.node)
+            logger.info("Done")
+        finally:
+            barrier.exit()
+
+        self.on_result(out_err, cmd)
+
+    def on_result(self, out_err, cmd):
+        """Pass every result block found in ``out_err`` to on_result_cb.
+
+        Any exception raised while doing so is re-raised as RuntimeError.
+        """
+        try:
+            for data in parse_output(out_err):
+                self.on_result_cb(data)
+        except Exception as exc:
+            msg_templ = "Error during postprocessing results: {0!r}"
+            # str(exc) instead of the deprecated, python2-only exc.message
+            raise RuntimeError(msg_templ.format(str(exc)))
+
+    def merge_results(self, results):
+        """Merge several result dicts into the first one and return it.
+
+        Returns None for an empty ``results``.  ``results[0]`` is modified
+        in place: for every test the 'bw', 'clat', 'iops', 'lat' and 'slat'
+        lists of the other results are appended to its lists.  All other
+        fields, the '__meta__' entries and the sets of test names / field
+        names must be identical, otherwise AssertionError is raised.
+        """
+        if len(results) == 0:
+            return None
+
+        merged_result = results[0]
+        merged_data = merged_result['res']
+        expected_keys = set(merged_data.keys())
+        mergable_fields = ['bw', 'clat', 'iops', 'lat', 'slat']
+
+        for res in results[1:]:
+            assert res['__meta__'] == merged_result['__meta__']
+
+            data = res['res']
+            diff = set(data.keys()).symmetric_difference(expected_keys)
+
+            msg = "Difference: {0}".format(",".join(diff))
+            assert len(diff) == 0, msg
+
+            for testname, test_data in data.items():
+                res_test_data = merged_data[testname]
+
+                diff = set(test_data.keys()).symmetric_difference(
+                            res_test_data.keys())
+
+                msg = "Difference: {0}".format(",".join(diff))
+                assert len(diff) == 0, msg
+
+                for k, v in test_data.items():
+                    if k in mergable_fields:
+                        res_test_data[k].extend(v)
+                    else:
+                        msg = "{0!r} != {1!r}".format(res_test_data[k], v)
+                        assert res_test_data[k] == v, msg
+
+        return merged_result
+
+    @classmethod
+    def format_for_console(cls, data):
+        """Format ``data`` with io_formatter.format_results_for_console."""
+        return io_formatter.format_results_for_console(data)
diff --git a/wally/suits/postgres/__init__.py b/wally/suits/postgres/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/wally/suits/postgres/__init__.py
diff --git a/wally/suits/postgres/prepare.sh b/wally/suits/postgres/prepare.sh
new file mode 100755
index 0000000..e7ca3bc
--- /dev/null
+++ b/wally/suits/postgres/prepare.sh
@@ -0,0 +1,19 @@
+#!/bin/bash
+# Prepare a node for the pgbench test: when /etc/postgresql is absent,
+# install PostgreSQL and create + start the 9.3 "main" cluster; then switch
+# local authentication to "trust", listen on all addresses and restart the
+# service.
+set -e
+
+if [ ! -d /etc/postgresql ]; then
+    apt-get update
+    apt-get install -y postgresql postgresql-contrib
+    # Test the command inside "if": under "set -e" a failing plain
+    # assignment aborts the script before a later "$?" check can run.
+    # (The former "2>&1  /dev/null" also handed /dev/null to
+    # pg_createcluster as an extra argument.)
+    if ! err=$(pg_createcluster 9.3 main --start 2>&1); then
+        echo "There was an error while creating cluster"
+        echo "$err" >&2
+        exit 1
+    fi
+fi
+
+# replace "peer" authentication of local connections with "trust"
+sed -i 's/^local\s\+all\s\+all\s\+peer/local all all trust/g' /etc/postgresql/9.3/main/pg_hba.conf
+# enable listening on every address
+sudo sed -i "s/#listen_addresses = 'localhost'/listen_addresses = '*'/g" /etc/postgresql/9.3/main/postgresql.conf
+
+service postgresql restart
+
+exit 0
\ No newline at end of file
diff --git a/wally/suits/postgres/run.sh b/wally/suits/postgres/run.sh
new file mode 100755
index 0000000..daad499
--- /dev/null
+++ b/wally/suits/postgres/run.sh
@@ -0,0 +1,47 @@
+#!/bin/bash
+# Run pgbench for every combination of client count and transactions per
+# client.  Arguments are "key value" pairs:
+#   num_clients <n1,n2,...>  transactions_per_client <t1,t2,...>
+# Each combination is measured 10 times; every measurement prints one line
+#   "<num_clients> <transactions_per_client>:<tps>"
+set -e
+
+# -gt compares numerically; ">" inside [[ ]] is a string comparison
+while [[ $# -gt 1 ]]
+do
+key="$1"
+
+case $key in
+    num_clients)
+    CLIENTS="$2"
+    shift
+    ;;
+    transactions_per_client)
+    TRANSACTINOS_PER_CLIENT="$2"
+    shift
+    ;;
+    *)
+    echo "Unknown option $key"
+    exit 1
+    ;;
+esac
+shift
+done
+
+# turn the comma separated lists into whitespace separated words; the
+# variables are expanded unquoted on purpose so the for loops split them
+CLIENTS=$(echo $CLIENTS | tr ',' '\n')
+TRANSACTINOS_PER_CLIENT=$(echo $TRANSACTINOS_PER_CLIENT | tr ',' '\n')
+
+
+sudo -u postgres createdb -O postgres pgbench &> /dev/null
+sudo -u postgres pgbench -i -U postgres pgbench &> /dev/null
+
+
+for num_clients in $CLIENTS; do
+    for trans_per_cl in $TRANSACTINOS_PER_CLIENT; do
+        for i in 1 2 3 4 5 6 7 8 9 10; do
+            echo -n "$num_clients $trans_per_cl:"
+            # keep the third field of pgbench's "excluding connections
+            # establishing" line
+            sudo -u postgres pgbench -c "$num_clients" -n -t "$trans_per_cl" -j 4 -r -U postgres pgbench |
+            grep "(excluding connections establishing)" | awk '{print $3}'
+        done
+    done
+done
+
+sudo -u postgres dropdb pgbench &> /dev/null
+
+exit 0
+