Large commit: refactoring; fio test code completely reworked; big improvements in test time and results.
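
Replaces tests/io.py with tests/disk_test_agent.py. The new agent accepts a
templated fio job file: sections may declare a repeat count ("[name * N]"),
values may cycle over alternatives ("opt={% a, b %}"), and UPPERCASE settings
from the [defaults] section or from --params NAME=VAL are substituted into
"{NAME}" placeholders. Jobs are batched so that a single fio invocation stays
within MAX_JOBS and the optional --runcycle time budget, and --estimate,
--num-tests and --compile inspect a job file without running it.
tests/itest.py adds IOPerfTest, which pushes the agent to remote nodes over
ssh.

Rough usage sketch (paths are illustrative only):

    python2 tests/disk_test_agent.py --estimate tests/fio_configs/1.cfg
    python2 tests/disk_test_agent.py --json --params FILENAME=/tmp/xxx.bin \
        tests/fio_configs/2.cfg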
diff --git a/tests/disk_test_agent.py b/tests/disk_test_agent.py
new file mode 100644
index 0000000..7537438
--- /dev/null
+++ b/tests/disk_test_agent.py
@@ -0,0 +1,457 @@
+import sys
+import time
+import json
+import select
+import pprint
+import argparse
+import traceback
+import subprocess
+import itertools
+from collections import OrderedDict
+
+
+SECTION = 0
+SETTING = 1
+
+
+def get_test_summary(params):
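+    # Short id for a job, e.g. rw=randwrite, direct=1, blocksize=4k,
+    # numjobs=1 -> "rwd4kth1"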
+    rw = {"randread": "rr",
+          "randwrite": "rw",
+          "read": "sr",
+          "write": "sw"}[params["rw"]]
+
+    if params.get("direct") == '1':
+        sync_mode = 'd'
+    elif params.get("sync") == '1':
+        sync_mode = 's'
+    else:
+        sync_mode = 'a'
+
+    th_count = int(params.get('numjobs', '1'))
+
+    return "{0}{1}{2}th{3}".format(rw, sync_mode,
+                                   params['blocksize'], th_count)
+
+
+counter = [0]
+
+
+def process_section(name, vals, defaults, format_params):
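+    # A section name may carry a repeat count ("[name * N]") and a value may
+    # cycle over alternatives ("opt={% a, b %}"); every resulting combination
+    # is yielded as a separate (formatted_name, params) pair.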
+    vals = vals.copy()
+    params = format_params.copy()
+
+    if '*' in name:
+        name, repeat = name.split('*')
+        name = name.strip()
+        repeat = int(repeat.format(**params))
+    else:
+        repeat = 1
+
+    # this code can be optimized
+    for i in range(repeat):
+        iterable_names = []
+        iterable_values = []
+        processed_vals = {}
+
+        for val_name, val in vals.items():
+            if val is None:
+                processed_vals[val_name] = val
+            # TODO: don't hardcode the '{%' / '%}' template markers
+            elif val.startswith('{%'):
+                assert val.endswith("%}")
+                content = val[2:-2].format(**params)
+                iterable_names.append(val_name)
+                iterable_values.append([i.strip() for i in content.split(',')])
+            else:
+                processed_vals[val_name] = val.format(**params)
+
+        if iterable_values == []:
+            params['UNIQ'] = 'UN{0}'.format(counter[0])
+            counter[0] += 1
+            params['TEST_SUMM'] = get_test_summary(processed_vals)
+            yield name.format(**params), processed_vals
+        else:
+            for it_vals in itertools.product(*iterable_values):
+                processed_vals.update(dict(zip(iterable_names, it_vals)))
+                params['UNIQ'] = 'UN{0}'.format(counter[0])
+                counter[0] += 1
+                params['TEST_SUMM'] = get_test_summary(processed_vals)
+                yield name.format(**params), processed_vals
+
+
+def calculate_execution_time(combinations):
+    exec_time = 0
+    for _, params in combinations:
+        exec_time += int(params.get('ramp_time', 0))
+        exec_time += int(params.get('runtime', 0))
+    return exec_time
+
+
+def parse_fio_config_full(fio_cfg, params=None):
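+    # The [defaults] section seeds every other section; UPPERCASE settings in
+    # it (and entries from `params`) become format parameters for "{NAME}"
+    # placeholders instead of fio options.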
+    defaults = {}
+    format_params = {}
+
+    if params is None:
+        ext_params = {}
+    else:
+        ext_params = params.copy()
+
+    curr_section = None
+    curr_section_name = None
+
+    for tp, name, val in parse_fio_config_iter(fio_cfg):
+        if tp == SECTION:
+            non_def = curr_section_name != 'defaults'
+            if curr_section_name is not None and non_def:
+                format_params.update(ext_params)
+                for sec in process_section(curr_section_name,
+                                           curr_section,
+                                           defaults,
+                                           format_params):
+                    yield sec
+
+            if name == 'defaults':
+                curr_section = defaults
+            else:
+                curr_section = OrderedDict()
+                curr_section.update(defaults)
+            curr_section_name = name
+
+        else:
+            assert tp == SETTING
+            assert curr_section_name is not None, "no section name"
+            if name == name.upper():
+                assert curr_section_name == 'defaults'
+                format_params[name] = val
+            else:
+                curr_section[name] = val
+
+    if curr_section_name is not None and curr_section_name != 'defaults':
+        format_params.update(ext_params)
+        for sec in process_section(curr_section_name,
+                                   curr_section,
+                                   defaults,
+                                   format_params):
+            yield sec
+
+
+def parse_fio_config_iter(fio_cfg):
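+    # Yields (SECTION, name, None) for "[name]" lines and (SETTING, opt, val)
+    # for "opt=val" or bare "opt" lines; comments and blanks are skipped.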
+    for lineno, line in enumerate(fio_cfg.split("\n"), 1):
+        try:
+            line = line.strip()
+
+            if line.startswith("#") or line.startswith(";"):
+                continue
+
+            if line == "":
+                continue
+
+            if line.startswith('['):
+                assert line.endswith(']'), "section name should end with ']'"
+                yield SECTION, line[1:-1], None
+            elif '=' in line:
+                opt_name, opt_val = line.split('=', 1)
+                yield SETTING, opt_name.strip(), opt_val.strip()
+            else:
+                yield SETTING, line, None
+        except Exception as exc:
+            pref = "Error parsing line {0}:\n".format(lineno)
+            raise ValueError(pref + exc.message)
+
+
+def format_fio_config(fio_cfg):
+    res = ""
+    for pos, (name, section) in enumerate(fio_cfg):
+        if pos != 0:
+            res += "\n"
+
+        res += "[{0}]\n".format(name)
+        for opt_name, opt_val in section.items():
+            if opt_val is None:
+                res += opt_name + "\n"
+            else:
+                res += "{0}={1}\n".format(opt_name, opt_val)
+    return res
+
+
+def do_run_fio(bconf):
+    benchmark_config = format_fio_config(bconf)
+    cmd = ["fio", "--output-format=json", "-"]
+    p = subprocess.Popen(cmd, stdin=subprocess.PIPE,
+                         stdout=subprocess.PIPE,
+                         stderr=subprocess.STDOUT)
+
+    # TODO: set a timeout on the fio subprocess
+    raw_out, _ = p.communicate(benchmark_config)
+
+    try:
+        parsed_out = json.loads(raw_out)["jobs"]
+    except Exception:
+        msg = "Can't parse fio output: {0!r}\nError: {1}"
+        raise ValueError(msg.format(raw_out, traceback.format_exc()))
+
+    return zip(parsed_out, bconf)
+
+
+# limited by fio
+MAX_JOBS = 1000
+
+
+def next_test_portion(whole_conf, runcycle):
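+    # Split the job list into portions so that one fio invocation never uses
+    # more than MAX_JOBS jobs nor (roughly) more than `runcycle` seconds.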
+    jcount = 0
+    runtime = 0
+    bconf = []
+
+    for pos, (name, sec) in enumerate(whole_conf):
+        jc = int(sec.get('numjobs', '1'))
+
+        if runcycle is not None:
+            curr_task_time = calculate_execution_time([(name, sec)])
+        else:
+            curr_task_time = 0
+
+        if jc > MAX_JOBS:
+            err_templ = "Can't process job {0!r} - numjobs exceeds MAX_JOBS"
+            raise ValueError(err_templ.format(name))
+
+        if runcycle is not None and len(bconf) != 0:
+            rc_ok = curr_task_time + runtime <= runcycle
+        else:
+            rc_ok = True
+
+        if jc + jcount <= MAX_JOBS and rc_ok:
+            runtime += curr_task_time
+            jcount += jc
+            bconf.append((name, sec))
+            continue
+
+        assert len(bconf) != 0
+        yield bconf
+
+        runtime = curr_task_time
+        jcount = jc
+        bconf = [(name, sec)]
+
+    if bconf != []:
+        yield bconf
+
+
+def add_job_results(jname, job_output, jconfig, res):
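+    # Merge one fio job result into res[jname]; repeated runs of the same
+    # section accumulate their bw/iops/latency samples as lists.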
+    if job_output['write']['iops'] != 0:
+        raw_result = job_output['write']
+    else:
+        raw_result = job_output['read']
+
+    if jname not in res:
+        j_res = {}
+        j_res["action"] = jconfig["rw"]
+        j_res["direct_io"] = jconfig.get("direct", "0") == "1"
+        j_res["sync"] = jconfig.get("sync", "0") == "1"
+        j_res["concurence"] = int(jconfig.get("numjobs", 1))
+        j_res["size"] = jconfig["size"]
+        j_res["jobname"] = job_output["jobname"]
+        j_res["timings"] = (jconfig.get("runtime"),
+                            jconfig.get("ramp_time"))
+    else:
+        j_res = res[jname]
+        assert j_res["action"] == jconfig["rw"]
+
+        assert j_res["direct_io"] == \
+            (jconfig.get("direct", "0") == "1")
+
+        assert j_res["sync"] == (jconfig.get("sync", "0") == "1")
+        assert j_res["concurence"] == int(jconfig.get("numjobs", 1))
+        assert j_res["size"] == jconfig["size"]
+        assert j_res["jobname"] == job_output["jobname"]
+        assert j_res["timings"] == (jconfig.get("runtime"),
+                                    jconfig.get("ramp_time"))
+
+    def j_app(name, x):
+        j_res.setdefault(name, []).append(x)
+
+    # 'bw_dev bw_mean bw_max bw_min'.split()
+    j_app("bw_mean", raw_result["bw_mean"])
+    j_app("iops", raw_result["iops"])
+    j_app("lat", raw_result["lat"]["mean"])
+    j_app("clat", raw_result["clat"]["mean"])
+    j_app("slat", raw_result["slat"]["mean"])
+
+    res[jname] = j_res
+
+
+def run_fio(benchmark_config,
+            params,
+            runcycle=None,
+            raw_results_func=None,
+            skip_tests=0):
+
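+    # Run the parsed job file portion by portion (see next_test_portion),
+    # passing each raw fio result to raw_results_func when it is given.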
+    whole_conf = list(parse_fio_config_full(benchmark_config, params))
+    whole_conf = whole_conf[skip_tests:]
+    res = {}
+    curr_test_num = skip_tests
+    executed_tests = 0
+    try:
+        for bconf in next_test_portion(whole_conf, runcycle):
+            res_cfg_it = do_run_fio(bconf)
+            res_cfg_it = enumerate(res_cfg_it, curr_test_num)
+
+            for curr_test_num, (job_output, (jname, jconfig)) in res_cfg_it:
+                executed_tests += 1
+                if raw_results_func is not None:
+                    raw_results_func(curr_test_num,
+                                     [job_output, jname, jconfig])
+
+                assert jname == job_output["jobname"]
+
+                if jname.startswith('_'):
+                    continue
+
+                add_job_results(jname, job_output, jconfig, res)
+
+    except (SystemExit, KeyboardInterrupt):
+        pass
+
+    except Exception:
+        traceback.print_exc()
+
+    return res, executed_tests
+
+
+def run_benchmark(binary_tp, *argv, **kwargs):
+    if 'fio' == binary_tp:
+        return run_fio(*argv, **kwargs)
+    raise ValueError("Unknown benchmark {0}".format(binary_tp))
+
+
+def parse_args(argv):
+    parser = argparse.ArgumentParser(
+        description="Run fio and return the results")
+    parser.add_argument("--type", metavar="BINARY_TYPE",
+                        choices=['fio'], default='fio',
+                        help=argparse.SUPPRESS)
+    parser.add_argument("--start-at", metavar="START_AT_UTC", type=int,
+                        help="Start execution at START_AT_UTC")
+    parser.add_argument("--json", action="store_true", default=False,
+                        help="Json output format")
+    parser.add_argument("--output", default='-', metavar="FILE_PATH",
+                        help="Store results to FILE_PATH")
+    parser.add_argument("--estimate", action="store_true", default=False,
+                        help="Only estimate task execution time")
+    parser.add_argument("--compile", action="store_true", default=False,
+                        help="Compile config file to fio config")
+    parser.add_argument("--num-tests", action="store_true", default=False,
+                        help="Show total number of tests")
+    parser.add_argument("--runcycle", type=int, default=None,
+                        metavar="MAX_CYCLE_SECONDS",
+                        help="Max cycle length in seconds")
+    parser.add_argument("--show-raw-results", action='store_true',
+                        default=False, help="Output raw input and results")
+    parser.add_argument("--skip-tests", type=int, default=0, metavar="NUM",
+                        help="Skip NUM tests")
+    parser.add_argument("--params", nargs="*", metavar="PARAM=VAL",
+                        default=[],
+                        help="Provide a set of PARAM=VAL pairs to " +
+                             "format into the job description")
+    parser.add_argument("jobfile")
+    return parser.parse_args(argv)
+
+
+def read_config(fd, timeout=10):
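+    # Read the job config from fd character by character, raising IOError if
+    # the whole config has not arrived within `timeout` seconds.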
+    job_cfg = ""
+    etime = time.time() + timeout
+    while True:
+        wtime = etime - time.time()
+        if wtime <= 0:
+            raise IOError("No config provided")
+
+        r, w, x = select.select([fd], [], [], wtime)
+        if len(r) == 0:
+            raise IOError("No config provided")
+
+        char = fd.read(1)
+        if '' == char:
+            return job_cfg
+
+        job_cfg += char
+
+
+def main(argv):
+    argv_obj = parse_args(argv)
+
+    if argv_obj.jobfile == '-':
+        job_cfg = read_config(sys.stdin)
+    else:
+        job_cfg = open(argv_obj.jobfile).read()
+
+    if argv_obj.output == '-':
+        out_fd = sys.stdout
+    else:
+        out_fd = open(argv_obj.output, "w")
+
+    params = {}
+    for param_val in argv_obj.params:
+        assert '=' in param_val
+        name, val = param_val.split("=", 1)
+        params[name] = val
+
+    if argv_obj.num_tests or argv_obj.compile or argv_obj.estimate:
+        bconf = list(parse_fio_config_full(job_cfg, params))
+        bconf = bconf[argv_obj.skip_tests:]
+
+        if argv_obj.compile:
+            out_fd.write(format_fio_config(bconf))
+            out_fd.write("\n")
+
+        if argv_obj.num_tests:
+            print len(bconf)
+
+        if argv_obj.estimate:
+            seconds = calculate_execution_time(bconf)
+
+            h = seconds // 3600
+            m = (seconds % 3600) // 60
+            s = seconds % 60
+
+            print "{0}:{1}:{2}".format(h, m, s)
+        return 0
+
+    if argv_obj.start_at is not None:
+        ctime = time.time()
+        if argv_obj.start_at >= ctime:
+            time.sleep(argv_obj.start_at - ctime)
+
+    def raw_res_func(test_num, data):
+        pref = "========= RAW_RESULTS({0}) =========\n".format(test_num)
+        out_fd.write(pref)
+        out_fd.write(json.dumps(data))
+        out_fd.write("\n========= END OF RAW_RESULTS =========\n")
+        out_fd.flush()
+
+    rrfunc = raw_res_func if argv_obj.show_raw_results else None
+
+    stime = time.time()
+    job_res, num_tests = run_benchmark(argv_obj.type,
+                                       job_cfg,
+                                       params,
+                                       argv_obj.runcycle,
+                                       rrfunc,
+                                       argv_obj.skip_tests)
+    etime = time.time()
+
+    res = {'__meta__': {'raw_cfg': job_cfg}, 'res': job_res}
+
+    oformat = 'json' if argv_obj.json else 'eval'
+    out_fd.write("\nRun {} tests in {} seconds\n".format(num_tests,
+                                                         int(etime - stime)))
+    out_fd.write("========= RESULTS(format={0}) =========\n".format(oformat))
+    if argv_obj.json:
+        out_fd.write(json.dumps(res))
+    else:
+        out_fd.write(pprint.pformat(res) + "\n")
+    out_fd.write("\n========= END OF RESULTS =========\n".format(oformat))
+
+    return 0
+
+
+if __name__ == '__main__':
+    exit(main(sys.argv[1:]))
diff --git a/tests/fio_configs/1.cfg b/tests/fio_configs/1.cfg
new file mode 100644
index 0000000..d5240cd
--- /dev/null
+++ b/tests/fio_configs/1.cfg
@@ -0,0 +1,100 @@
+[writetest_10 * 55]
+startdelay=10
+numjobs=1
+blocksize=4k
+filename=/media/koder/a5230078-4c27-4c3b-99aa-26148e78b2e7/xxx.bin
+rw=randwrite
+direct=1
+buffered=0
+iodepth=1
+size=1Gb
+runtime=10
+time_based
+wait_for_previous
+
+[writetest_20 * 55]
+startdelay=10
+numjobs=1
+blocksize=4k
+filename=/media/koder/a5230078-4c27-4c3b-99aa-26148e78b2e7/xxx.bin
+rw=randwrite
+direct=1
+buffered=0
+iodepth=1
+size=1Gb
+runtime=20
+time_based
+wait_for_previous
+
+[writetest_30 * 55]
+startdelay=10
+numjobs=1
+blocksize=4k
+filename=/media/koder/a5230078-4c27-4c3b-99aa-26148e78b2e7/xxx.bin
+rw=randwrite
+direct=1
+buffered=0
+iodepth=1
+size=1Gb
+runtime=30
+time_based
+wait_for_previous
+
+[writetest_120 * 55]
+startdelay=10
+numjobs=1
+blocksize=4k
+filename=/media/koder/a5230078-4c27-4c3b-99aa-26148e78b2e7/xxx.bin
+rw=randwrite
+direct=1
+buffered=0
+iodepth=1
+size=1Gb
+runtime=120
+time_based
+wait_for_previous
+
+[writetest_30_5 * 55]
+ramp_time=5
+startdelay=10
+numjobs=1
+blocksize=4k
+filename=/media/koder/a5230078-4c27-4c3b-99aa-26148e78b2e7/xxx.bin
+rw=randwrite
+direct=1
+buffered=0
+iodepth=1
+size=1Gb
+runtime=30
+time_based
+wait_for_previous
+
+[writetest_30_10 * 55]
+ramp_time=10
+startdelay=10
+numjobs=1
+blocksize=4k
+filename=/media/koder/a5230078-4c27-4c3b-99aa-26148e78b2e7/xxx.bin
+rw=randwrite
+direct=1
+buffered=0
+iodepth=1
+size=1Gb
+runtime=30
+time_based
+wait_for_previous
+
+[writetest_30_15 * 55]
+ramp_time=15
+startdelay=10
+numjobs=1
+blocksize=4k
+filename=/media/koder/a5230078-4c27-4c3b-99aa-26148e78b2e7/xxx.bin
+rw=randwrite
+direct=1
+buffered=0
+iodepth=1
+size=1Gb
+runtime=30
+time_based
+wait_for_previous
diff --git a/tests/fio_configs/2.cfg b/tests/fio_configs/2.cfg
new file mode 100644
index 0000000..050e477
--- /dev/null
+++ b/tests/fio_configs/2.cfg
@@ -0,0 +1,13 @@
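+# {FILENAME} is substituted at run time, e.g. via "--params FILENAME=<path>"
+# when this config is fed to disk_test_agent.py.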
+[writetest_10_20 * 3]
+ramp_time=5
+numjobs=1
+blocksize=4k
+filename={FILENAME}
+rw=randwrite
+direct=1
+buffered=0
+iodepth=1
+size=1Gb
+runtime=5
+time_based
+wait_for_previous
diff --git a/tests/io.py b/tests/io.py
deleted file mode 100644
index 2405f53..0000000
--- a/tests/io.py
+++ /dev/null
@@ -1,97 +0,0 @@
-import sys
-import time
-import json
-import select
-import pprint
-import argparse
-import subprocess
-from StringIO import StringIO
-from ConfigParser import RawConfigParser
-
-
-def run_fio(benchmark_config):
-    cmd = ["fio", "--output-format=json", "-"]
-    p = subprocess.Popen(cmd, stdin=subprocess.PIPE,
-                         stdout=subprocess.PIPE,
-                         stderr=subprocess.STDOUT)
-    raw_out, _ = p.communicate(benchmark_config)
-    job_output = json.loads(raw_out)["jobs"][0]
-
-    if job_output['write']['iops'] != 0:
-        raw_result = job_output['write']
-    else:
-        raw_result = job_output['read']
-
-    res = {}
-
-    # 'bw_dev bw_mean bw_max bw_min'.split()
-    for field in ["bw_mean", "iops"]:
-        res[field] = raw_result[field]
-
-    res["lat"] = raw_result["lat"]["mean"]
-    res["clat"] = raw_result["clat"]["mean"]
-    res["slat"] = raw_result["slat"]["mean"]
-    res["util"] = json.loads(raw_out)["disk_util"][0]
-
-    res["util"] = dict((str(k), v) for k, v in res["util"].items())
-
-    return res
-
-
-def run_benchmark(binary_tp, *argv, **kwargs):
-    if 'fio' == binary_tp:
-        return run_fio(*argv, **kwargs)
-    raise ValueError("Unknown behcnmark {0}".format(binary_tp))
-
-
-def parse_args(argv):
-    parser = argparse.ArgumentParser(
-        description="Run fio' and return result")
-    parser.add_argument(
-        "--type", metavar="BINARY_TYPE",
-        choices=['fio'], required=True)
-    parser.add_argument("--start-at", metavar="START_TIME", type=int)
-    parser.add_argument("--json", action="store_true", default=False)
-    parser.add_argument("jobfile")
-    return parser.parse_args(argv)
-
-
-def main(argv):
-    argv_obj = parse_args(argv)
-    if argv_obj.jobfile == '-':
-        job_cfg = ""
-        dtime = 10
-        while True:
-            r, w, x = select.select([sys.stdin], [], [], dtime)
-            if len(r) == 0:
-                raise IOError("No config provided")
-            char = sys.stdin.read(1)
-            if '' == char:
-                break
-            job_cfg += char
-            dtime = 1
-    else:
-        job_cfg = open(argv_obj.jobfile).read()
-
-    rcp = RawConfigParser()
-    rcp.readfp(StringIO(job_cfg))
-    assert len(rcp.sections()) == 1
-
-    if argv_obj.start_at is not None:
-        ctime = time.time()
-        if argv_obj.start_at >= ctime:
-            time.sleep(ctime - argv_obj.start_at)
-
-    res = run_benchmark(argv_obj.type, job_cfg)
-    res['__meta__'] = dict(rcp.items(rcp.sections()[0]))
-    res['__meta__']['raw'] = job_cfg
-
-    if argv_obj.json:
-        sys.stdout.write(json.dumps(res))
-    else:
-        sys.stdout.write(pprint.pformat(res))
-        sys.stdout.write("\n")
-    return 0
-
-if __name__ == '__main__':
-    exit(main(sys.argv[1:]))
diff --git a/tests/io_scenario_check_assumptions.cfg b/tests/io_scenario_check_assumptions.cfg
new file mode 100644
index 0000000..25d99bc
--- /dev/null
+++ b/tests/io_scenario_check_assumptions.cfg
@@ -0,0 +1,59 @@
+[defaults]
+NUM_ROUNDS=7
+
+ramp_time=5
+buffered=0
+wait_for_previous
+filename=/media/koder/a5230078-4c27-4c3b-99aa-26148e78b2e7/xxx.bin
+iodepth=1
+size=10Gb
+time_based
+runtime=30
+
+# ---------------------------------------------------------------------
+# check test time, no warmup time. iops = func(run_time)
+# ---------------------------------------------------------------------
+[runtime_test_wo_wu_{TEST_SUMM}_{UNIQ} * {NUM_ROUNDS}]
+startdelay=10
+blocksize={% 4k, 1m %}
+rw=randwrite
+direct=1
+ramp_time=0
+runtime={% 10, 15, 20, 30, 60, 120 %}
+
+# ---------------------------------------------------------------------
+# check test time, with warmup time. iops = func(run_time)
+# ---------------------------------------------------------------------
+[runtime_test_w_wu_{TEST_SUMM}_{UNIQ} * {NUM_ROUNDS}]
+startdelay=10
+blocksize={% 4k, 1m %}
+rw=randwrite
+direct=1
+ramp_time={% 5, 10, 15 %}
+runtime={% 15, 30 %}
+
+# ---------------------------------------------------------------------
+# check read and write linearity. oper_time = func(size)
+# ---------------------------------------------------------------------
+[linearity_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize={% 512, 1k, 4k, 8k, 16k, 32k, 64k, 128k, 256k, 512k, 1m, 2m, 4m %}
+rw={% randwrite, randread %}
+direct=1
+
+# ---------------------------------------------------------------------
+# check sync write linearity. oper_time = func(size)
+# check sync BW as well
+# ---------------------------------------------------------------------
+[linearity_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize={% 512, 1k, 4k, 8k, 16k, 32k, 64k, 128k, 256k, 512k, 1m, 2m, 4m %}
+rw=randread
+sync=1
+
+# ---------------------------------------------------------------------
+# check different thread count. (latency, bw) = func(th_count)
+# ---------------------------------------------------------------------
+[concurrence_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize={% 4k, 1m %}
+rw=randwrite
+direct=1
+numjobs={% 1, 2, 5, 10, 15, 20, 25, 30, 35, 40 %}
diff --git a/tests/io_task.cfg b/tests/io_task.cfg
new file mode 100644
index 0000000..cc3d07d
--- /dev/null
+++ b/tests/io_task.cfg
@@ -0,0 +1,10 @@
+[writetest]
+blocksize=4k
+filename=/tmp/xxx.bin
+rw=randwrite
+direct=1
+buffered=0
+ioengine=libaio
+iodepth=1
+size=1Gb
+runtime=5
diff --git a/tests/io_task_test.cfg b/tests/io_task_test.cfg
new file mode 100644
index 0000000..a319fa0
--- /dev/null
+++ b/tests/io_task_test.cfg
@@ -0,0 +1,24 @@
+# [__warmup]
+# blocksize=4k
+# filename=/tmp/xxx.bin
+# rw=randwrite
+# direct=1
+# buffered=0
+# iodepth=1
+# size=1Gb
+# runtime=5
+# time_based
+
+[writetest * 3]
+numjobs=4
+wait_for_previous
+ramp_time=5
+blocksize=4k
+filename=/tmp/xxx.bin
+rw=randwrite
+direct=1
+buffered=0
+iodepth=1
+size=100Mb
+runtime=10
+time_based
diff --git a/tests/itest.py b/tests/itest.py
new file mode 100644
index 0000000..e7fd3eb
--- /dev/null
+++ b/tests/itest.py
@@ -0,0 +1,155 @@
+import re
+import abc
+import json
+import os.path
+import logging
+from StringIO import StringIO
+from ConfigParser import RawConfigParser
+
+from tests import disk_test_agent
+from ssh_utils import copy_paths
+from utils import run_over_ssh, ssize_to_b
+
+
+logger = logging.getLogger("io-perf-tool")
+
+
+class IPerfTest(object):
+    __metaclass__ = abc.ABCMeta
+
+    def __init__(self, on_result_cb):
+        self.on_result_cb = on_result_cb
+
+    def pre_run(self, conn):
+        pass
+
+    @abc.abstractmethod
+    def run(self, conn, barrier):
+        pass
+
+
+class TwoScriptTest(IPerfTest):
+    def __init__(self, opts, testtool, on_result_cb, keep_tmp_files):
+        super(TwoScriptTest, self).__init__(on_result_cb)
+        self.opts = opts
+        self.pre_run_script = None
+        self.run_script = None
+        self.tmp_dir = "/tmp/"
+        self.set_run_script()
+        self.set_pre_run_script()
+
+    def set_run_script(self):
+        self.run_script = self.opts.run_script
+
+    def set_pre_run_script(self):
+        self.pre_run_script = self.opts.pre_run_script
+
+    def get_remote_for_script(self, script):
+        return os.path.join(self.tmp_dir, script.rpartition('/')[2])
+
+    def copy_script(self, conn, src):
+        remote_path = self.get_remote_for_script(src)
+        copy_paths(conn, {src: remote_path})
+        return remote_path
+
+    def pre_run(self, conn):
+        remote_script = self.copy_script(conn, self.pre_run_script)
+        cmd = remote_script
+        code, out_err = run_over_ssh(conn, cmd)
+        if code != 0:
+            raise Exception("Pre run failed. %s" % out_err)
+
+    def run(self, conn, barrier):
+        remote_script = self.copy_script(conn, self.run_script)
+        cmd = remote_script + ' ' + ' '.join(self.opts)
+        code, out_err = run_over_ssh(conn, cmd)
+        self.on_result(code, out_err, cmd)
+
+    def parse_results(self, out):
+        for line in out.split("\n"):
+            key, separator, value = line.partition(":")
+            if key and value:
+                self.on_result_cb((key, float(value)))
+
+    def on_result(self, code, out_err, cmd):
+        if 0 == code:
+            try:
+                self.parse_results(out_err)
+            except Exception as exc:
+                msg_templ = "Error during postprocessing results: {0!r}"
+                raise RuntimeError(msg_templ.format(exc.message))
+        else:
+            templ = "Command {0!r} failed with code {1}. Error output is:\n{2}"
+            logger.error(templ.format(cmd, code, out_err))
+
+
+class PgBenchTest(TwoScriptTest):
+
+    def set_run_script(self):
+        self.run_script = "hl_tests/postgres/run.sh"
+
+    def set_pre_run_script(self):
+        self.pre_run_script = "hl_tests/postgres/prepare.sh"
+
+
+class IOPerfTest(IPerfTest):
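+    # Copies disk_test_agent.py to the remote node, prepares the test files
+    # with dd in pre_run() and then runs the agent over ssh, feeding the raw
+    # fio config on stdin.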
+    io_py_remote = "/tmp/disk_test_agent.py"
+
+    def __init__(self,
+                 test_options,
+                 on_result_cb):
+        IPerfTest.__init__(self, on_result_cb)
+        self.options = test_options
+        self.config_fname = test_options['cfg']
+        self.config_params = test_options.get('params', {})
+        self.tool = test_options.get('tool', 'fio')
+        self.raw_cfg = open(self.config_fname).read()
+
+        parse_func = disk_test_agent.parse_fio_config_full
+        self.configs = parse_func(self.raw_cfg, self.config_params)
+
+    def pre_run(self, conn):
+
+        # TODO: only install fio when it is not already present
+        run_over_ssh(conn, "apt-get -y install fio")
+
+        local_fname = disk_test_agent.__file__.rsplit('.', 1)[0] + ".py"
+        self.files_to_copy = {local_fname: self.io_py_remote}
+        copy_paths(conn, self.files_to_copy)
+
+        cmd_templ = "dd if=/dev/zero of={0} bs={1} count={2}"
+        for secname, params in self.configs:
+            sz = ssize_to_b(params['size'])
+            msz = sz / (1024 ** 2)
+            if sz % (1024 ** 2) != 0:
+                msz += 1
+
+            cmd = cmd_templ.format(params['filename'], 1024 ** 2, msz)
+            code, out_err = run_over_ssh(conn, cmd)
+
+            if code != 0:
+                raise RuntimeError("Preparation failed: " + out_err)
+
+    def run(self, conn, barrier):
+        cmd_templ = "env python2 {0} --type {1} --json -"
+        cmd = cmd_templ.format(self.io_py_remote, self.tool)
+        logger.debug("Run {0}".format(cmd))
+        try:
+            barrier.wait()
+            code, out_err = run_over_ssh(conn, cmd, stdin_data=self.raw_cfg)
+            self.on_result(code, out_err, cmd)
+        finally:
+            barrier.exit()
+
+    def on_result(self, code, out_err, cmd):
+        if 0 == code:
+            try:
+                start_patt = r"(?ims)=+\s+RESULTS\(format=json\)\s+=+"
+                end_patt = r"(?ims)=+\s+END OF RESULTS\s+=+"
+                for block in re.split(start_patt, out_err)[1:]:
+                    data, garbage = re.split(end_patt, block)
+                    self.on_result_cb(json.loads(data.strip()))
+            except Exception as exc:
+                msg_templ = "Error during postprocessing results: {0!r}"
+                raise RuntimeError(msg_templ.format(exc.message))
+        else:
+            templ = "Command {0!r} failed with code {1}. Output is:\n{2}"
+            logger.error(templ.format(cmd, code, out_err))