rework test runner: drive io tests from a fio config file, add per-role sensor deployment and monitoring, support explicit test nodes and local ceph discovery; remove io_scenario module (net line count drops)
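
New flow, roughly: discover/explicit nodes -> connect -> deploy sensors -> run tests.
An illustrative invocation (the stage names are the argparse choices added in
run_test.py; this particular combination and ordering is an assumption):

    python run_test.py -l discover connect deploy_sensors run_tests

IO tests are now described by a fio job file (io_task.cfg) referenced from the
"tests" section of config.yaml, and sensor data is streamed to the receiver_uri
configured under "sensors".
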
diff --git a/config.yaml b/config.yaml
index 3c27c90..b365042 100644
--- a/config.yaml
+++ b/config.yaml
@@ -12,7 +12,8 @@
     #             service: all
     #             auth: cirros:cubswin:)
 
-    ceph: root@172.16.53.2::/home/koder/.ssh/id_rsa.pub
+    # ceph: root@172.16.53.2::/home/koder/.ssh/id_rsa.pub
+    ceph: local
 
     # fuel:
     #     connection:
@@ -20,31 +21,39 @@
     #         creds: admin:admin@admin
     #     discover: controller
 
-start:
-    count: x1
-    img_name: TestVM
-    flavor_name: m1.micro
-    keypair_name: test
-    network_zone_name: net04
-    flt_ip_pool: net04_ext
-    key_file: path/to/
-    user: username
+explicit_nodes:
+    "ssh://192.168.152.43": testnode
+
+# start:
+#     count: x1
+#     img_name: TestVM
+#     flavor_name: m1.micro
+#     keypair_name: test
+#     network_zone_name: net04
+#     flt_ip_pool: net04_ext
+#     key_file: path/to/
+#     user: username
 
 
 # sensors to be installed, according to role
 sensors:
-    ceph-osd: ceph-io, ceph-cpu, ceph-ram, ceph-net
-    ceph-mon: ceph-io, ceph-cpu, ceph-ram, ceph-net
-    os-compute: io, net
-    test-vm: system-cpu
+    receiver_uri: udp://192.168.0.108:5699
+    roles_mapping:
+        ceph-osd: block-io #ceph-io, ceph-cpu, ceph-ram, ceph-net
+        # ceph-mon: ceph-io, ceph-cpu, ceph-ram, ceph-net
+        os-compute: io, net
+        test-vm: system-cpu
 
 
 # tests to run
 tests:
-    pgbench:
-        opts:
-            num_clients: [4, 8, 12]
-            transactions: [1, 2, 3]
+    # pgbench:
+    #     opts:
+    #         num_clients: [4, 8, 12]
+    #         transactions: [1, 2, 3]
+    io:
+        tool: fio
+        config_file: io_task.cfg
 
 # where to store results
 results:
diff --git a/fake_run_test.py b/fake_run_test.py
index 7797cfc..8dc772c 100644
--- a/fake_run_test.py
+++ b/fake_run_test.py
@@ -91,7 +91,6 @@
                  stdout=None,
                  stderr=None,
                  stdin=None):
-        print "Running subprocess command: %s" % cmd
         self.stdin, self.stdout, self.stderr = get_fake_out(cmd)
 
     def wait(self):
diff --git a/io_scenario/fio b/io_scenario/fio
deleted file mode 100755
index 4686a1f..0000000
--- a/io_scenario/fio
+++ /dev/null
Binary files differ
diff --git a/io_scenario/io.py b/io_scenario/io.py
deleted file mode 100644
index d698546..0000000
--- a/io_scenario/io.py
+++ /dev/null
@@ -1,617 +0,0 @@
-import re
-import os
-import sys
-import stat
-import time
-import json
-import Queue
-import os.path
-import argparse
-import warnings
-import subprocess
-
-
-class BenchmarkOption(object):
-    def __init__(self, concurence, iodepth, action, blocksize, size):
-        self.iodepth = iodepth
-        self.action = action
-        self.blocksize = blocksize
-        self.concurence = concurence
-        self.size = size
-        self.direct_io = False
-        self.use_hight_io_priority = True
-        self.sync = False
-
-
-def which(program):
-    def is_exe(fpath):
-        return os.path.isfile(fpath) and os.access(fpath, os.X_OK)
-
-    fpath, fname = os.path.split(program)
-    if fpath:
-        if is_exe(program):
-            return program
-    else:
-        for path in os.environ["PATH"].split(os.pathsep):
-            path = path.strip('"')
-            exe_file = os.path.join(path, program)
-            if is_exe(exe_file):
-                return exe_file
-
-    return None
-
-
-# ------------------------------ IOZONE SUPPORT ------------------------------
-
-
-class IOZoneParser(object):
-    "class to parse iozone results"
-
-    start_tests = re.compile(r"^\s+KB\s+reclen\s+")
-    resuts = re.compile(r"[\s0-9]+")
-    mt_iozone_re = re.compile(r"\s+Children see throughput " +
-                              r"for\s+\d+\s+(?P<cmd>.*?)\s+=\s+" +
-                              r"(?P<perf>[\d.]+)\s+KB/sec")
-
-    cmap = {'initial writers': 'write',
-            'rewriters': 'rewrite',
-            'initial readers': 'read',
-            're-readers': 'reread',
-            'random readers': 'random read',
-            'random writers': 'random write',
-            'readers': 'read'}
-
-    string1 = "                           " + \
-              "                   random  random    " + \
-              "bkwd   record   stride                                   "
-
-    string2 = "KB  reclen   write rewrite    " + \
-              "read    reread    read   write    " + \
-              "read  rewrite     read   fwrite frewrite   fread  freread"
-
-    @classmethod
-    def apply_parts(cls, parts, string, sep=' \t\n'):
-        add_offset = 0
-        for part in parts:
-            _, start, stop = part
-            start += add_offset
-            add_offset = 0
-
-            # condition splited to make pylint happy
-            while stop + add_offset < len(string):
-
-                # condition splited to make pylint happy
-                if not (string[stop + add_offset] not in sep):
-                    break
-
-                add_offset += 1
-
-            yield part, string[start:stop + add_offset]
-
-    @classmethod
-    def make_positions(cls):
-        items = [i for i in cls.string2.split() if i]
-
-        pos = 0
-        cls.positions = []
-
-        for item in items:
-            npos = cls.string2.index(item, 0 if pos == 0 else pos + 1)
-            cls.positions.append([item, pos, npos + len(item)])
-            pos = npos + len(item)
-
-        for itm, val in cls.apply_parts(cls.positions, cls.string1):
-            if val.strip():
-                itm[0] = val.strip() + " " + itm[0]
-
-    @classmethod
-    def parse_iozone_res(cls, res, mthreads=False):
-        parsed_res = None
-
-        sres = res.split('\n')
-
-        if not mthreads:
-            for pos, line in enumerate(sres[1:]):
-                if line.strip() == cls.string2 and \
-                            sres[pos].strip() == cls.string1.strip():
-                    add_pos = line.index(cls.string2)
-                    parsed_res = {}
-
-                    npos = [(name, start + add_pos, stop + add_pos)
-                            for name, start, stop in cls.positions]
-
-                    for itm, res in cls.apply_parts(npos, sres[pos + 2]):
-                        if res.strip() != '':
-                            parsed_res[itm[0]] = int(res.strip())
-
-                    del parsed_res['KB']
-                    del parsed_res['reclen']
-        else:
-            parsed_res = {}
-            for line in sres:
-                rr = cls.mt_iozone_re.match(line)
-                if rr is not None:
-                    cmd = rr.group('cmd')
-                    key = cls.cmap.get(cmd, cmd)
-                    perf = int(float(rr.group('perf')))
-                    parsed_res[key] = perf
-        return parsed_res
-
-
-IOZoneParser.make_positions()
-
-
-def iozone_do_prepare(params, filename, pattern_base):
-    all_files = []
-    threads = int(params.concurence)
-    if 1 != threads:
-        filename = filename + "_{0}"
-        all_files.extend(filename .format(i) for i in range(threads))
-    else:
-        all_files.append(filename)
-
-    bsz = 1024 if params.size > 1024 else params.size
-    if params.size % bsz != 0:
-        fsz = (params.size // bsz + 1) * bsz
-    else:
-        fsz = params.size
-
-    for fname in all_files:
-        with open(fname, "wb") as fd:
-            if fsz > 1024:
-                pattern = pattern_base * 1024 * 1024
-                for _ in range(int(fsz / 1024) + 1):
-                    fd.write(pattern)
-            else:
-                fd.write(pattern_base * 1024 * fsz)
-            fd.flush()
-    return all_files
-
-
-VERIFY_PATTERN = "\x6d"
-
-
-def prepare_iozone(params, filename):
-    return iozone_do_prepare(params, filename, VERIFY_PATTERN)
-
-
-def do_run_iozone(params, timeout, all_files,
-                  iozone_path='iozone',
-                  microsecond_mode=False,
-                  prepare_only=False):
-
-    cmd = [iozone_path, "-V", str(ord(VERIFY_PATTERN))]
-
-    if params.sync:
-        cmd.append('-o')
-
-    if params.direct_io:
-        cmd.append('-I')
-
-    if microsecond_mode:
-        cmd.append('-N')
-
-    threads = int(params.concurence)
-    if 1 != threads:
-        cmd.extend(('-t', str(threads), '-F'))
-        cmd.extend(all_files)
-    else:
-        cmd.extend(('-f', all_files[0]))
-
-    cmd.append('-i')
-
-    if params.action == 'write':
-        cmd.append("0")
-    elif params.action == 'read':
-        cmd.append("1")
-    elif params.action == 'randwrite' or params.action == 'randread':
-        cmd.append("2")
-    else:
-        raise ValueError("Unknown action {0!r}".format(params.action))
-
-    cmd.extend(('-s', str(params.size)))
-    cmd.extend(('-r', str(params.blocksize)))
-
-    # no retest
-    cmd.append('-+n')
-
-    raw_res = subprocess.check_output(cmd)
-
-    try:
-        parsed_res = IOZoneParser.parse_iozone_res(raw_res, threads > 1)
-
-        res = {}
-
-        if params.action == 'write':
-            res['bw_mean'] = parsed_res['write']
-        elif params.action == 'randwrite':
-            res['bw_mean'] = parsed_res['random write']
-        elif params.action == 'read':
-            res['bw_mean'] = parsed_res['read']
-        elif params.action == 'randread':
-            res['bw_mean'] = parsed_res['random read']
-    except:
-        raise
-
-    return res, " ".join(cmd)
-
-
-def run_iozone(benchmark, binary_path, all_files,
-               timeout=None):
-
-    if timeout is not None:
-        benchmark.size = benchmark.blocksize * 50
-        res_time = do_run_iozone(benchmark, timeout, all_files,
-                                 iozone_path=binary_path,
-                                 microsecond_mode=True)[0]
-
-        size = (benchmark.blocksize * timeout * 1000000)
-        size /= res_time["bw_mean"]
-        size = (size // benchmark.blocksize + 1) * benchmark.blocksize
-        benchmark.size = size
-
-    return do_run_iozone(benchmark, timeout, all_files,
-                         iozone_path=binary_path)
-
-
-def install_iozone_package():
-    if which('iozone'):
-        return
-
-    is_redhat = os.path.exists('/etc/centos-release')
-    is_redhat = is_redhat or os.path.exists('/etc/fedora-release')
-    is_redhat = is_redhat or os.path.exists('/etc/redhat-release')
-
-    if is_redhat:
-        subprocess.check_output(["yum", "install", 'iozone3'])
-        return
-
-    try:
-        os_release_cont = open('/etc/os-release').read()
-
-        is_ubuntu = "Ubuntu" in os_release_cont
-
-        if is_ubuntu or "Debian GNU/Linux" in os_release_cont:
-            subprocess.check_output(["apt-get", "install", "iozone3"])
-            return
-    except (IOError, OSError):
-        pass
-
-    raise RuntimeError("Unknown host OS.")
-
-
-def install_iozone_static(iozone_url, dst):
-    if not os.path.isfile(dst):
-        import urllib
-        urllib.urlretrieve(iozone_url, dst)
-
-    st = os.stat(dst)
-    os.chmod(dst, st.st_mode | stat.S_IEXEC)
-
-
-def locate_iozone():
-    binary_path = which('iozone')
-
-    if binary_path is None:
-        binary_path = which('iozone3')
-
-    if binary_path is None:
-        sys.stderr.write("Can't found neither iozone not iozone3 binary"
-                         "Provide --binary-path or --binary-url option")
-        return None
-
-    return binary_path
-
-# ------------------------------ FIO SUPPORT ---------------------------------
-
-
-def run_fio_once(benchmark, fio_path, tmpname, timeout=None):
-    if benchmark.size is None:
-        raise ValueError("You need to specify file size for fio")
-
-    cmd_line = [fio_path,
-                "--name=%s" % benchmark.action,
-                "--rw=%s" % benchmark.action,
-                "--blocksize=%sk" % benchmark.blocksize,
-                "--iodepth=%d" % benchmark.iodepth,
-                "--filename=%s" % tmpname,
-                "--size={0}k".format(benchmark.size),
-                "--numjobs={0}".format(benchmark.concurence),
-                "--output-format=json",
-                "--sync=" + ('1' if benchmark.sync else '0')]
-
-    if timeout is not None:
-        cmd_line.append("--timeout=%d" % timeout)
-        cmd_line.append("--runtime=%d" % timeout)
-
-    if benchmark.direct_io:
-        cmd_line.append("--direct=1")
-
-    if benchmark.use_hight_io_priority:
-        cmd_line.append("--prio=0")
-
-    raw_out = subprocess.check_output(cmd_line)
-    return json.loads(raw_out)["jobs"][0], " ".join(cmd_line)
-
-
-def run_fio(benchmark, binary_path, all_files, timeout=None):
-    job_output, cmd_line = run_fio_once(benchmark, binary_path,
-                                        all_files[0], timeout)
-
-    if benchmark.action in ('write', 'randwrite'):
-        raw_result = job_output['write']
-    else:
-        raw_result = job_output['read']
-
-    res = {}
-
-    # 'bw_dev bw_mean bw_max bw_min'.split()
-    for field in ["bw_mean", "iops"]:
-        res[field] = raw_result[field]
-    res["lat"] = raw_result["lat"]["mean"]
-    res["clat"] = raw_result["clat"]["mean"]
-    res["slat"] = raw_result["slat"]["mean"]
-
-    return res, cmd_line
-
-
-def locate_fio():
-    return which('fio')
-
-
-# ----------------------------------------------------------------------------
-
-
-def locate_binary(binary_tp, binary_path):
-    if binary_path is not None:
-        if os.path.isfile(binary_path):
-            if not os.access(binary_path, os.X_OK):
-                st = os.stat(binary_path)
-                os.chmod(binary_path, st.st_mode | stat.S_IEXEC)
-        else:
-            binary_path = None
-
-    if binary_path is not None:
-        return binary_path
-
-    if 'iozone' == binary_tp:
-        return locate_iozone()
-    else:
-        return locate_fio()
-
-
-def run_benchmark(binary_tp, *argv, **kwargs):
-    if 'iozone' == binary_tp:
-        return run_iozone(*argv, **kwargs)
-    else:
-        return run_fio(*argv, **kwargs)
-
-
-def prepare_benchmark(binary_tp, *argv, **kwargs):
-    if 'iozone' == binary_tp:
-        return {'all_files': prepare_iozone(*argv, **kwargs)}
-    else:
-        return {}
-
-
-def type_size(string):
-    try:
-        return re.match("\d+[KGBM]?", string, re.I).group(0)
-    except:
-        msg = "{0!r} don't looks like size-description string".format(string)
-        raise ValueError(msg)
-
-
-def type_size_ext(string):
-    if string.startswith("x"):
-        int(string[1:])
-        return string
-
-    if string.startswith("r"):
-        int(string[1:])
-        return string
-
-    try:
-        return re.match("\d+[KGBM]?", string, re.I).group(0)
-    except:
-        msg = "{0!r} don't looks like size-description string".format(string)
-        raise ValueError(msg)
-
-
-def ssize_to_kb(ssize):
-    try:
-        smap = dict(k=1, K=1, M=1024, m=1024, G=1024**2, g=1024**2)
-        for ext, coef in smap.items():
-            if ssize.endswith(ext):
-                return int(ssize[:-1]) * coef
-
-        if int(ssize) % 1024 != 0:
-            raise ValueError()
-
-        return int(ssize) / 1024
-
-    except (ValueError, TypeError, AttributeError):
-        tmpl = "Unknow size format {0!r} (or size not multiples 1024)"
-        raise ValueError(tmpl.format(ssize))
-
-
-def get_ram_size():
-    try:
-        with open("/proc/meminfo") as fd:
-            for ln in fd:
-                if "MemTotal:" in ln:
-                    sz, kb = ln.split(':')[1].strip().split(" ")
-                    assert kb == 'kB'
-                    return int(sz)
-    except (ValueError, TypeError, AssertionError):
-        raise
-        # return None
-
-
-def parse_args(argv):
-    parser = argparse.ArgumentParser(
-        description="Run 'iozone' or 'fio' and return result")
-    parser.add_argument(
-        "--type", metavar="BINARY_TYPE",
-        choices=['iozone', 'fio'], required=True)
-    parser.add_argument(
-        "--iodepth", metavar="IODEPTH", type=int,
-        help="I/O requests queue depths", required=True)
-    parser.add_argument(
-        '-a', "--action", metavar="ACTION", type=str,
-        help="actions to run", required=True,
-        choices=["read", "write", "randread", "randwrite"])
-    parser.add_argument(
-        "--blocksize", metavar="BLOCKSIZE", type=type_size,
-        help="single operation block size", required=True)
-    parser.add_argument(
-        "--timeout", metavar="TIMEOUT", type=int,
-        help="runtime of a single run", default=None)
-    parser.add_argument(
-        "--iosize", metavar="SIZE", type=type_size_ext,
-        help="file size", default=None)
-    parser.add_argument(
-        "-s", "--sync", default=False, action="store_true",
-        help="exec sync after each write")
-    parser.add_argument(
-        "-d", "--direct-io", default=False, action="store_true",
-        help="use O_DIRECT", dest='directio')
-    parser.add_argument(
-        "-t", "--sync-time", default=None, type=int, metavar="UTC_TIME",
-        help="sleep till sime utc time", dest='sync_time')
-    parser.add_argument(
-        "--test-file", help="file path to run test on",
-        default=None, dest='test_file')
-    parser.add_argument(
-        "--binary-path", help="path to binary, used for testing",
-        default=None, dest='binary_path')
-    parser.add_argument(
-        "--prepare-only", default=False, action="store_true")
-    parser.add_argument("--concurrency", default=1, type=int)
-
-    parser.add_argument("--preparation-results", default="{}")
-
-    return parser.parse_args(argv)
-
-
-def sensor_thread(sensor_list, cmd_q, data_q):
-    while True:
-        try:
-            cmd_q.get(timeout=0.5)
-            data_q.put([])
-            return
-        except Queue.Empty:
-            pass
-
-
-def clean_prep(preparation_results):
-    if 'all_files' in preparation_results:
-        for fname in preparation_results['all_files']:
-            if os.path.isfile(fname):
-                os.unlink(fname)
-
-
-def main(argv):
-    if argv[0] == '--clean':
-        clean_prep(json.loads(argv[1]))
-        return 0
-
-    argv_obj = parse_args(argv)
-    argv_obj.blocksize = ssize_to_kb(argv_obj.blocksize)
-
-    if argv_obj.iosize is not None:
-        if argv_obj.iosize.startswith('x'):
-            argv_obj.iosize = argv_obj.blocksize * int(argv_obj.iosize[1:])
-        elif argv_obj.iosize.startswith('r'):
-            rs = get_ram_size()
-            if rs is None:
-                sys.stderr.write("Can't determine ram size\n")
-                exit(1)
-            argv_obj.iosize = rs * int(argv_obj.iosize[1:])
-        else:
-            argv_obj.iosize = ssize_to_kb(argv_obj.iosize)
-
-    benchmark = BenchmarkOption(argv_obj.concurrency,
-                                argv_obj.iodepth,
-                                argv_obj.action,
-                                argv_obj.blocksize,
-                                argv_obj.iosize)
-
-    benchmark.direct_io = argv_obj.directio
-
-    if argv_obj.sync:
-        benchmark.sync = True
-
-    if argv_obj.prepare_only:
-        test_file_name = argv_obj.test_file
-        if test_file_name is None:
-            with warnings.catch_warnings():
-                warnings.simplefilter("ignore")
-                test_file_name = os.tmpnam()
-
-        fnames = prepare_benchmark(argv_obj.type,
-                                   benchmark,
-                                   test_file_name)
-        print json.dumps(fnames)
-    else:
-        preparation_results = json.loads(argv_obj.preparation_results)
-
-        if not isinstance(preparation_results, dict):
-            raise ValueError("preparation_results should be a dict" +
-                             " with all-string keys")
-
-        for key in preparation_results:
-            if not isinstance(key, basestring):
-                raise ValueError("preparation_results should be a dict" +
-                                 " with all-string keys")
-
-        if argv_obj.test_file and 'all_files' in preparation_results:
-            raise ValueError("Either --test-file or --preparation-results" +
-                             " options should be provided, not both")
-
-        if argv_obj.test_file is not None:
-            preparation_results['all_files'] = [argv_obj.test_file]
-
-        autoremove = False
-        if 'all_files' not in preparation_results:
-            with warnings.catch_warnings():
-                warnings.simplefilter("ignore")
-                preparation_results['all_files'] = [os.tmpnam()]
-                autoremove = True
-
-        binary_path = locate_binary(argv_obj.type,
-                                    argv_obj.binary_path)
-
-        if binary_path is None:
-            sys.stderr.write("Can't locate binary {0}\n".format(argv_obj.type))
-            return 1
-
-        try:
-            if argv_obj.sync_time is not None:
-                dt = argv_obj.sync_time - time.time()
-                if dt > 0:
-                    time.sleep(dt)
-
-            res, cmd = run_benchmark(argv_obj.type,
-                                     benchmark=benchmark,
-                                     binary_path=binary_path,
-                                     timeout=argv_obj.timeout,
-                                     **preparation_results)
-
-            res['__meta__'] = benchmark.__dict__.copy()
-            res['__meta__']['cmdline'] = cmd
-
-            import pprint
-            sys.stdout.write(pprint.pformat(res))
-
-            # sys.stdout.write(json.dumps(res))
-            if not argv_obj.prepare_only:
-                sys.stdout.write("\n")
-
-        finally:
-            if autoremove:
-                clean_prep(preparation_results)
-
-
-if __name__ == '__main__':
-    exit(main(sys.argv[1:]))
diff --git a/io_scenario/iozone b/io_scenario/iozone
deleted file mode 100755
index 5edce95..0000000
--- a/io_scenario/iozone
+++ /dev/null
Binary files differ
diff --git a/io_task.cfg b/io_task.cfg
new file mode 100644
index 0000000..cc3d07d
--- /dev/null
+++ b/io_task.cfg
@@ -0,0 +1,10 @@
+[writetest]
+blocksize=4k
+filename=/tmp/xxx.bin
+rw=randwrite
+direct=1
+buffered=0
+ioengine=libaio
+iodepth=1
+size=1Gb
+runtime=5
diff --git a/itest.py b/itest.py
index 2e782ee..daebd1a 100644
--- a/itest.py
+++ b/itest.py
@@ -1,12 +1,13 @@
 import abc
 import json
-import types
 import os.path
 import logging
+from StringIO import StringIO
+from ConfigParser import RawConfigParser
 
-from io_scenario import io
+from tests import io
 from ssh_utils import copy_paths
-from utils import run_over_ssh
+from utils import run_over_ssh, ssize_to_b
 
 
 logger = logging.getLogger("io-perf-tool")
@@ -14,19 +15,13 @@
 
 class IPerfTest(object):
     def __init__(self, on_result_cb):
-        self.set_result_cb(on_result_cb)
-
-    def set_result_cb(self, on_result_cb):
         self.on_result_cb = on_result_cb
 
-    def build(self, conn):
-        self.pre_run(conn)
-
     def pre_run(self, conn):
         pass
 
     @abc.abstractmethod
-    def run(self, conn):
+    def run(self, conn, barrier):
         pass
 
 
@@ -57,15 +52,15 @@
     def pre_run(self, conn):
         remote_script = self.copy_script(conn, self.pre_run_script)
         cmd = remote_script
-        code, out, err = run_over_ssh(conn, cmd)
+        code, out_err = run_over_ssh(conn, cmd)
         if code != 0:
-            raise Exception("Pre run failed. %s" % err)
+            raise Exception("Pre run failed. %s" % out_err)
 
-    def run(self, conn):
+    def run(self, conn, barrier):
         remote_script = self.copy_script(conn, self.run_script)
         cmd = remote_script + ' ' + ' '.join(self.opts)
-        code, out, err = run_over_ssh(conn, cmd)
-        self.on_result(code, out, err, cmd)
+        code, out_err = run_over_ssh(conn, cmd)
+        self.on_result(code, out_err, cmd)
 
     def parse_results(self, out):
         for line in out.split("\n"):
@@ -73,16 +68,16 @@
             if key and value:
                 self.on_result_cb((key, float(value)))
 
-    def on_result(self, code, out, err, cmd):
+    def on_result(self, code, out_err, cmd):
         if 0 == code:
             try:
-                self.parse_results(out)
-            except Exception as err:
+                self.parse_results(out_err)
+            except Exception as exc:
                 msg_templ = "Error during postprocessing results: {0!r}"
-                raise RuntimeError(msg_templ.format(err.message))
+                raise RuntimeError(msg_templ.format(exc.message))
         else:
             templ = "Command {0!r} failed with code {1}. Error output is:\n{2}"
-            logger.error(templ.format(cmd, code, err))
+            logger.error(templ.format(cmd, code, out_err))
 
 
 class PgBenchTest(TwoScriptTest):
@@ -94,68 +89,89 @@
         self.run_script = "hl_tests/postgres/run.sh"
 
 
-def run_test_iter(obj, conn):
-    logger.debug("Run preparation")
-    yield obj.pre_run(conn)
-    logger.debug("Run test")
-    res = obj.run(conn)
-    if isinstance(res, types.GeneratorType):
-        for vl in res:
-            yield vl
-    else:
-        yield res
-
-
 class IOPerfTest(IPerfTest):
+    io_py_remote = "/tmp/io.py"
+
     def __init__(self,
-                 script_opts,
-                 testtool_local,
-                 on_result_cb,
-                 keep_tmp_files):
-
+                 test_options,
+                 on_result_cb):
         IPerfTest.__init__(self, on_result_cb)
+        self.options = test_options
+        self.config_fname = test_options['config_file']
+        self.tool = test_options['tool']
+        self.configs = []
 
-        dst_testtool_path = '/tmp/io_tool'
-        self.script_opts = script_opts + ["--binary-path", dst_testtool_path]
-        io_py_local = os.path.join(os.path.dirname(io.__file__), "io.py")
-        self.io_py_remote = "/tmp/io.py"
+        cp = RawConfigParser()
+        cp.readfp(open(self.config_fname))
 
-        self.files_to_copy = {testtool_local: dst_testtool_path,
-                              io_py_local: self.io_py_remote}
+        for secname in cp.sections():
+            params = dict(cp.items(secname))
+            self.configs.append((secname, params))
 
     def pre_run(self, conn):
+        local_fname = io.__file__.rsplit('.', 1)[0] + ".py"
+        self.files_to_copy = {local_fname: self.io_py_remote}
         copy_paths(conn, self.files_to_copy)
 
-        args = ['env', 'python2', self.io_py_remote] + \
-            self.script_opts + ['--prepare-only']
+        cmd_templ = "dd if=/dev/zero of={0} bs={1} count={2}"
+        for secname, params in self.configs:
+            sz = ssize_to_b(params['size'])
+            msz = sz / (1024 ** 2)
+            if sz % (1024 ** 2) != 0:
+                msz += 1
 
-        code, self.prep_results, err = run_over_ssh(conn, " ".join(args))
+            cmd = cmd_templ.format(params['filename'], 1024 ** 2, msz)
+            code, out_err = run_over_ssh(conn, cmd)
+
         if code != 0:
-            raise RuntimeError("Preparation failed " + err)
+            raise RuntimeError("Preparation failed " + out_err)
 
-    def run(self, conn):
-        args = ['env', 'python2', self.io_py_remote] + self.script_opts
-        args.append('--preparation-results')
-        args.append("'{0}'".format(self.prep_results))
-        cmd = " ".join(args)
-        code, out, err = run_over_ssh(conn, cmd)
-        self.on_result(code, out, err, cmd)
-        args = ['env', 'python2', self.io_py_remote, '--clean',
-                "'{0}'".format(self.prep_results)]
-        logger.debug(" ".join(args))
-        code, _, err = run_over_ssh(conn, " ".join(args))
-        if 0 != code:
-            logger.error("Cleaning failed: " + err)
+    def run(self, conn, barrier):
+        cmd_templ = "env python2 {0} --type {1} --json -"
+        cmd = cmd_templ.format(self.io_py_remote, self.tool)
+        try:
+            for secname, _params in self.configs:
+                params = _params.copy()
+                count = params.pop('count', 1)
 
-    def on_result(self, code, out, err, cmd):
+                config = RawConfigParser()
+                config.add_section(secname)
+
+                for k, v in params.items():
+                    config.set(secname, k, v)
+
+                cfg = StringIO()
+                config.write(cfg)
+
+                # FIX python config parser-fio incompatibility
+                # remove spaces around '='
+                new_cfg = []
+                config_data = cfg.getvalue()
+                for line in config_data.split("\n"):
+                    if '=' in line:
+                        name, val = line.split('=', 1)
+                        name = name.strip()
+                        val = val.strip()
+                        line = "{0}={1}".format(name, val)
+                    new_cfg.append(line)
+
+                for _ in range(count):
+                    barrier.wait()
+                    code, out_err = run_over_ssh(conn, cmd,
+                                                 stdin_data="\n".join(new_cfg))
+                    self.on_result(code, out_err, cmd)
+        finally:
+            barrier.exit()
+
+    def on_result(self, code, out_err, cmd):
         if 0 == code:
             try:
-                for line in out.split("\n"):
+                for line in out_err.split("\n"):
                     if line.strip() != "":
                         self.on_result_cb(json.loads(line))
-            except Exception as err:
+            except Exception as exc:
                 msg_templ = "Error during postprocessing results: {0!r}"
-                raise RuntimeError(msg_templ.format(err.message))
+                raise RuntimeError(msg_templ.format(exc.message))
         else:
-            templ = "Command {0!r} failed with code {1}. Error output is:\n{2}"
-            logger.error(templ.format(cmd, code, err))
+            templ = "Command {0!r} failed with code {1}. Output is:\n{2}"
+            logger.error(templ.format(cmd, code, out_err))
diff --git a/keystone.py b/keystone.py
index e6308b5..3ae5e7a 100644
--- a/keystone.py
+++ b/keystone.py
@@ -75,7 +75,7 @@
             self.keystone.authenticate()
             self.headers['X-Auth-Token'] = self.keystone.auth_token
         except exceptions.AuthorizationFailure:
-            print 'asdfsf'
+            raise
 
     def do(self, method, path, params=None):
         """Do request. If gets 401 refresh token"""
diff --git a/nodes/ceph.py b/nodes/ceph.py
index 38ce177..793c021 100644
--- a/nodes/ceph.py
+++ b/nodes/ceph.py
@@ -1,6 +1,7 @@
 """ Collect data about ceph nodes"""
 import json
 import logging
+import subprocess
 
 
 from node import Node
@@ -10,14 +11,29 @@
 logger = logging.getLogger("io-perf-tool")
 
 
-def discover_ceph_node(ip):
+def local_execute(cmd):
+    return subprocess.check_output(cmd, stderr=subprocess.STDOUT, shell=True)
+
+
+def ssh_execute(ssh):
+    def closure(cmd):
+        _, chan, _ = ssh.exec_command(cmd)
+        return chan.read()
+    return closure
+
+
+def discover_ceph_nodes(ip):
     """ Return list of ceph's nodes ips """
     ips = {}
-    ssh = connect(ip)
 
-    osd_ips = get_osds_ips(ssh, get_osds_list(ssh))
-    mon_ips = get_mons_or_mds_ips(ssh, "mon")
-    mds_ips = get_mons_or_mds_ips(ssh, "mds")
+    if ip != 'local':
+        executor = ssh_execute(connect(ip))
+    else:
+        executor = local_execute
+
+    osd_ips = get_osds_ips(executor, get_osds_list(executor))
+    mon_ips = get_mons_or_mds_ips(executor, "mon")
+    mds_ips = get_mons_or_mds_ips(executor, "mds")
 
     for ip in osd_ips:
         url = "ssh://%s" % ip
@@ -31,29 +47,24 @@
         url = "ssh://%s" % ip
         ips.setdefault(url, []).append("ceph-mds")
 
-    return [Node(ip=url, roles=list(roles)) for url, roles in ips.items()]
+    return [Node(url, list(roles)) for url, roles in ips.items()]
 
 
-def get_osds_list(ssh):
+def get_osds_list(executor):
     """ Get list of osds id"""
-    _, chan, _ = ssh.exec_command("ceph osd ls")
-    return filter(None, chan.read().split("\n"))
+    return filter(None, executor("ceph osd ls").split("\n"))
 
 
-def get_mons_or_mds_ips(ssh, who):
+def get_mons_or_mds_ips(executor, who):
     """ Return mon ip list
         :param who - "mon" or "mds" """
-    if who == "mon":
-        _, chan, _ = ssh.exec_command("ceph mon dump")
-    elif who == "mds":
-        _, chan, _ = ssh.exec_command("ceph mds dump")
-    else:
+    if who not in ("mon", "mds"):
         raise ValueError(("'%s' in get_mons_or_mds_ips instead" +
                           "of mon/mds") % who)
 
-    line_res = chan.read().split("\n")
-    ips = set()
+    line_res = executor("ceph {0} dump".format(who)).split("\n")
 
+    ips = set()
     for line in line_res:
         fields = line.split()
 
@@ -67,12 +78,12 @@
     return ips
 
 
-def get_osds_ips(ssh, osd_list):
+def get_osds_ips(executor, osd_list):
     """ Get osd's ips
         :param osd_list - list of osd names from osd ls command"""
     ips = set()
     for osd_id in osd_list:
-        _, chan, _ = ssh.exec_command("ceph osd find {0}".format(osd_id))
-        ip = json.loads(chan.read())["ip"]
-        ips.add(ip.split(":")[0])
+        out = executor("ceph osd find {0}".format(osd_id))
+        ip = json.loads(out)["ip"]
+        ips.add(str(ip.split(":")[0]))
     return ips
diff --git a/nodes/discover.py b/nodes/discover.py
index ec98890..b95d306 100644
--- a/nodes/discover.py
+++ b/nodes/discover.py
@@ -50,5 +50,6 @@
             nodes_to_run.extend(fuel.discover_fuel_nodes(url, creads, roles))
 
         if cluster == "ceph":
-            nodes_to_run.extend(ceph.discover_ceph_node(cluster_info["ip"]))
+            nodes_to_run.extend(ceph.discover_ceph_nodes(cluster_info))
+
     return nodes_to_run
diff --git a/nodes/node.py b/nodes/node.py
index d7cae40..138d123 100644
--- a/nodes/node.py
+++ b/nodes/node.py
@@ -1,41 +1,16 @@
 class Node(object):
 
-    def __init__(self, ip, roles, username=None,
-                 password=None, key_path=None, port=None):
+    def __init__(self, conn_url, roles):
         self.roles = roles
-        self.ip = ip
-        self.username = username
-        self.password = password
-        self.port = port
-        self.key_path = key_path
+        self.conn_url = conn_url
         self.connection = None
 
     def __str__(self):
-        return "<Node: url={0!r} roles={1} >".format(self.ip,
-                                                     ", ".join(self.roles))
+        templ = "<Node: url={conn_url!r} roles={roles}" + \
+                " connected={is_connected}>"
+        return templ.format(conn_url=self.conn_url,
+                            roles=", ".join(self.roles),
+                            is_connected=self.connection is not None)
 
     def __repr__(self):
         return str(self)
-
-    def set_conn_attr(self, name, value):
-        setattr(self, name, value)
-
-    @property
-    def connection_url(self):
-        connection = []
-
-        if self.username:
-            connection.append(self.username)
-            if self.password:
-                connection.extend([":", self.password, "@"])
-            connection.append("@")
-
-        connection.append(self.ip)
-        if self.port:
-            connection.extend([":", self.port])
-            if self.key_path:
-                connection.extend([":", self.key_path])
-        else:
-            if self.key_path:
-                connection.extend([":", ":", self.key_path])
-        return "".join(connection)
diff --git a/report.py b/report.py
index 4d23b60..aaaaa53 100644
--- a/report.py
+++ b/report.py
@@ -2,7 +2,7 @@
 from collections import OrderedDict
 
 from chart import charts
-from utils import ssize_to_kb
+from utils import ssize_to_b
 
 
 OPERATIONS = (('async', ('randwrite asynchronous', 'randread asynchronous',
@@ -87,7 +87,7 @@
 
             OD = OrderedDict
             ordered_build_results = OD(sorted(build_results.items(),
-                                       key=lambda t: ssize_to_kb(t[0])))
+                                       key=lambda t: ssize_to_b(t[0])))
 
             if not scale_x:
                 scale_x = ordered_build_results.keys()
diff --git a/run_test.py b/run_test.py
index 25da0ab..d1a9f4f 100755
--- a/run_test.py
+++ b/run_test.py
@@ -1,120 +1,36 @@
-import os
 import sys
-import json
-import time
+import Queue
 import pprint
 import logging
-import os.path
 import argparse
+import threading
+import collections
 
+from concurrent.futures import ThreadPoolExecutor
+
+import utils
 import ssh_utils
-import io_scenario
 from nodes import discover
+from nodes.node import Node
 from config import cfg_dict
-from utils import log_error
-from rest_api import add_test
-from formatters import get_formatter
 from itest import IOPerfTest, PgBenchTest
 
+from sensors.api import start_monitoring
+
+
 logger = logging.getLogger("io-perf-tool")
-logger.setLevel(logging.DEBUG)
-ch = logging.StreamHandler()
-ch.setLevel(logging.DEBUG)
-logger.addHandler(ch)
-
-log_format = '%(asctime)s - %(levelname)s - %(name)s - %(message)s'
-formatter = logging.Formatter(log_format,
-                              "%H:%M:%S")
-ch.setFormatter(formatter)
 
 
-tool_type_mapper = {
-    "iozone": IOPerfTest,
-    "fio": IOPerfTest,
-    "pgbench": PgBenchTest,
-}
+def setup_logger(logger, level=logging.DEBUG):
+    logger.setLevel(level)
+    ch = logging.StreamHandler()
+    ch.setLevel(level)
+    logger.addHandler(ch)
 
-
-def run_io_test(tool,
-                script_args,
-                test_runner,
-                keep_temp_files=False):
-
-    files_dir = os.path.dirname(io_scenario.__file__)
-
-    path = 'iozone' if 'iozone' == tool else 'fio'
-    src_testtool_path = os.path.join(files_dir, path)
-
-    obj_cls = tool_type_mapper[tool]
-    obj = obj_cls(script_args,
-                  src_testtool_path,
-                  None,
-                  keep_temp_files)
-
-    return test_runner(obj)
-
-
-def conn_func(obj, barrier, latest_start_time, conn):
-    try:
-        test_iter = itest.run_test_iter(obj, conn)
-        next(test_iter)
-
-        wait_on_barrier(barrier, latest_start_time)
-
-        with log_error("!Run test"):
-            return next(test_iter)
-    except:
-        print traceback.format_exc()
-        raise
-
-
-def parse_args(argv):
-    parser = argparse.ArgumentParser(
-        description="Run disk io performance test")
-
-    parser.add_argument("-l", dest='extra_logs',
-                        action='store_true', default=False,
-                        help="print some extra log info")
-
-    parser.add_argument('stages', nargs="+",
-                        choices=["discover", "connect", "start_new_nodes",
-                                 "deploy_sensors"])
-
-    # THIS ALL MOVE TO CONFIG FILE
-    # parser.add_argument("-o", "--test-opts", dest='opts',
-    #                     help="cmd line options for test")
-
-    # parser.add_argument("-f", "--test-opts-file", dest='opts_file',
-    #                     type=argparse.FileType('r'), default=None,
-    #                     help="file with cmd line options for test")
-
-    # parser.add_argument("--max-preparation-time", default=300,
-    #                     type=int, dest="max_preparation_time")
-
-    # parser.add_argument("-b", "--build-info", default=None,
-    #                     dest="build_name")
-
-    # parser.add_argument("-d", "--data-server-url", default=None,
-    #                     dest="data_server_url")
-
-    # parser.add_argument("-n", "--lab-name", default=None,
-    #                     dest="lab_name")
-
-    # parser.add_argument("--create-vms-opts", default=None,
-    #                     help="Creating vm's before run ssh runner",
-    #                     dest="create_vms_opts")
-
-    # parser.add_argument("-k", "--keep", default=False,
-    #                     help="keep temporary files",
-    #                     dest="keep_temp_files", action='store_true')
-
-    # parser.add_argument("--runner", required=True,
-    #                     choices=["local", "ssh"], help="runner type")
-
-    # parser.add_argument("--runner-extra-opts", default=None,
-    #                     dest="runner_opts", help="runner extra options")
-
-    return parser.parse_args(argv[1:])
+    log_format = '%(asctime)s - %(levelname)-6s - %(name)s - %(message)s'
+    formatter = logging.Formatter(log_format,
+                                  "%H:%M:%S")
+    ch.setFormatter(formatter)
 
 
 def format_result(res, formatter):
@@ -127,59 +43,167 @@
 
 def connect_one(node):
     try:
-        node.connection = ssh_utils.connect(node.connection_url)
+        ssh_pref = "ssh://"
+        if node.conn_url.startswith(ssh_pref):
+            url = node.conn_url[len(ssh_pref):]
+            node.connection = ssh_utils.connect(url)
+        else:
+            raise ValueError("Unknown url type {0}".format(node.conn_url))
     except Exception:
-        logger.exception()
+        logger.exception("Error connecting to {0}".format(node))
 
 
 def connect_all(nodes):
+    logger.info("Connecting to nodes")
+    with ThreadPoolExecutor(32) as pool:
+        list(pool.map(connect_one, nodes))
+
+
+def save_sensors_data(q):
+    logger.info("Start receiving sensors data")
+    while True:
+        val = q.get()
+        if val is None:
+            break
+        # logger.debug("Sensors -> {0!r}".format(val))
+    logger.info("Sensors thread exits")
+
+
+def test_thread(test, node, barrier):
+    try:
+        logger.debug("Run preparation for {0}".format(node.conn_url))
+        test.pre_run(node.connection)
+        logger.debug("Run test for {0}".format(node.conn_url))
+        test.run(node.connection, barrier)
+    except:
+        logger.exception("In test {0} for node {1}".format(test, node))
+
+
+def run_tests(config, nodes):
+    tool_type_mapper = {
+        "io": IOPerfTest,
+        "pgbench": PgBenchTest,
+    }
+
+    test_nodes = [node for node in nodes
+                  if 'testnode' in node.roles]
+
+    res_q = Queue.Queue()
+
+    for name, params in config['tests'].items():
+        logger.info("Starting {0} tests".format(name))
+
+        threads = []
+        barrier = utils.Barrier(len(test_nodes))
+        for node in test_nodes:
+            msg = "Starting {0} test on {1} node"
+            logger.debug(msg.format(name, node.conn_url))
+            test = tool_type_mapper[name](params, res_q.put)
+            th = threading.Thread(None, test_thread, None,
+                                  (test, node, barrier))
+            threads.append(th)
+            th.daemon = True
+            th.start()
+
+        for th in threads:
+            th.join()
+
+        while not res_q.empty():
+            logger.info("Got test result {0!r}".format(res_q.get()))
+
+
+def parse_args(argv):
+    parser = argparse.ArgumentParser(
+        description="Run disk io performance test")
+
+    parser.add_argument("-l", dest='extra_logs',
+                        action='store_true', default=False,
+                        help="print some extra log info")
+
+    parser.add_argument('stages', nargs="+",
+                        choices=["discover", "connect", "start_new_nodes",
+                                 "deploy_sensors", "run_tests"])
+
+    return parser.parse_args(argv[1:])
+
+
+def log_nodes_statistic(nodes):
+    logger.info("Found {0} nodes total".format(len(nodes)))
+    per_role = collections.defaultdict(lambda: 0)
+    for node in nodes:
+        for role in node.roles:
+            per_role[role] += 1
+
+    for role, count in sorted(per_role.items()):
+        logger.debug("Found {0} nodes with role {1}".format(count, role))
+
+
+def log_sensors_config(cfg):
     pass
 
 
 def main(argv):
-    logging_conf = cfg_dict.get('logging')
-    if logging_conf:
-        if logging_conf.get('extra_logs'):
-            logger.setLevel(logging.DEBUG)
-            ch.setLevel(logging.DEBUG)
-
     opts = parse_args(argv)
+
+    level = logging.DEBUG if opts.extra_logs else logging.WARNING
+    setup_logger(logger, level)
+
+    nodes = []
+
     if 'discover' in opts.stages:
-        current_data = discover.discover(cfg_dict.get('discover'))
+        logger.info("Start node discovery")
+        nodes = discover.discover(cfg_dict.get('discover'))
+
+    if 'explicit_nodes' in cfg_dict:
+        for url, roles in cfg_dict['explicit_nodes'].items():
+            nodes.append(Node(url, roles.split(",")))
+
+    log_nodes_statistic(nodes)
 
     if 'connect' in opts.stages:
-        for node in current_data:
-            pass
+        connect_all(nodes)
 
-    print "\n".join(map(str, current_data))
+    if 'deploy_sensors' in opts.stages:
+        logger.info("Deploying sensors")
+        cfg = cfg_dict.get('sensors')
+        sens_cfg = []
+
+        for role, sensors_str in cfg["roles_mapping"].items():
+            sensors = [sens.strip() for sens in sensors_str.split(",")]
+
+            collect_cfg = dict((sensor, {}) for sensor in sensors)
+
+            for node in nodes:
+                if role in node.roles:
+                    sens_cfg.append((node.connection, collect_cfg))
+
+        log_sensors_config(sens_cfg)
+
+        sensor_cm = start_monitoring(cfg["receiver_uri"], None,
+                                     connected_config=sens_cfg)
+
+        with sensor_cm as sensors_control_queue:
+            th = threading.Thread(None, save_sensors_data, None,
+                                  (sensors_control_queue,))
+            th.daemon = True
+            th.start()
+
+            # TODO: wait till all nodes start to send sensors data
+
+            if 'run_tests' in opts.stages:
+                run_tests(cfg_dict, nodes)
+
+            sensors_control_queue.put(None)
+            th.join()
+    elif 'run_tests' in opts.stages:
+        run_tests(cfg_dict, nodes)
+
+    logger.info("Disconnecting")
+    for node in nodes:
+        node.connection.close()
+
     return 0
 
-    # tests = cfg_dict.get("tests", [])
-
-    # Deploy and start sensors
-    # deploy_and_start_sensors(cfg_dict.get('sensors'), nodes_to_run)
-
-    # for test_name, opts in tests.items():
-    #     cmd_line = " ".join(opts['opts'])
-    #     logger.debug("Run test with {0!r} params".format(cmd_line))
-    #     latest_start_time = 300 + time.time()
-    #     uris = [node.connection_url for node in nodes_to_run]
-    #     runner = ssh_runner.get_ssh_runner(uris, conn_func,
-    #                                        latest_start_time,
-    #                                        opts.get('keep_temp_files'))
-    #     res = run_io_test(test_name,
-    #                       opts['opts'],
-    #                       runner,
-    #                       opts.get('keep_temp_files'))
-    #     logger.debug(format_result(res, get_formatter(test_name)))
-
-    # if cfg_dict.get('data_server_url'):
-    #     result = json.loads(get_formatter(opts.tool_type)(res))
-    #     result['name'] = opts.build_name
-    #     add_test(opts.build_name, result, opts.data_server_url)
-
-    # return 0
-
 
 if __name__ == '__main__':
     exit(main(sys.argv))
diff --git a/sensors/api.py b/sensors/api.py
index f78e6a9..dc34af0 100644
--- a/sensors/api.py
+++ b/sensors/api.py
@@ -17,6 +17,7 @@
             data_q.put(proto.recv(0.1))
         except Timeout:
             pass
+
         try:
             val = cmd_q.get(False)
 
@@ -28,8 +29,9 @@
 
 
 @contextmanager
-def start_monitoring(uri, config):
-    deploy_and_start_sensors(uri, config)
+def start_monitoring(uri, config=None, connected_config=None):
+    deploy_and_start_sensors(uri, config=config,
+                             connected_config=connected_config)
     try:
         data_q = Queue.Queue()
         cmd_q = Queue.Queue()
@@ -44,4 +46,5 @@
             cmd_q.put(None)
             th.join()
     finally:
-        stop_and_remove_sensors(config)
+        stop_and_remove_sensors(config,
+                                connected_config=connected_config)
diff --git a/sensors/deploy_sensors.py b/sensors/deploy_sensors.py
index b2dc3f1..e0428d9 100644
--- a/sensors/deploy_sensors.py
+++ b/sensors/deploy_sensors.py
@@ -1,33 +1,50 @@
 import time
 import json
 import os.path
+import logging
 
 from concurrent.futures import ThreadPoolExecutor, wait
 
 from disk_perf_test_tool.ssh_utils import connect, copy_paths
 
+logger = logging.getLogger('io-perf-tool')
+
 
 def wait_all_ok(futures):
     return all(future.result() for future in futures)
 
 
-def deploy_and_start_sensors(monitor_uri, config, remote_path='/tmp/sensors'):
+def deploy_and_start_sensors(monitor_uri, config,
+                             remote_path='/tmp/sensors',
+                             connected_config=None):
     paths = {os.path.dirname(__file__): remote_path}
     with ThreadPoolExecutor(max_workers=32) as executor:
         futures = []
 
-        for uri, config in config.items():
+        if connected_config is not None:
+            assert config is None
+            node_iter = connected_config
+        else:
+            node_iter = config.items()
+
+        for uri_or_conn, config in node_iter:
             futures.append(executor.submit(deploy_and_start_sensor,
-                                           paths, uri, monitor_uri,
+                                           paths, uri_or_conn,
+                                           monitor_uri,
                                            config, remote_path))
 
         if not wait_all_ok(futures):
             raise RuntimeError("Sensor deployment fails on some nodes")
 
 
-def deploy_and_start_sensor(paths, uri, monitor_uri, config, remote_path):
+def deploy_and_start_sensor(paths, uri_or_conn, monitor_uri, config,
+                            remote_path):
     try:
-        conn = connect(uri)
+        if isinstance(uri_or_conn, basestring):
+            conn = connect(uri_or_conn)
+        else:
+            conn = uri_or_conn
+
         copy_paths(conn, paths)
         sftp = conn.open_sftp()
 
@@ -41,17 +58,23 @@
         cmd = cmd_templ.format(main_remote_path,
                                monitor_uri,
                                config_remote_path)
-        print "Executing", cmd
         conn.exec_command(cmd)
         sftp.close()
-        conn.close()
+
+        if isinstance(uri_or_conn, basestring):
+            conn.close()
     except:
+        logger.exception("Error deploying sensors on {0}".format(uri_or_conn))
         return False
     return True
 
 
-def stop_and_remove_sensor(uri, remote_path):
-    conn = connect(uri)
+def stop_and_remove_sensor(uri_or_conn, remote_path):
+    if isinstance(uri_or_conn, basestring):
+        conn = connect(uri_or_conn)
+    else:
+        conn = uri_or_conn
+
     main_remote_path = os.path.join(remote_path, "main.py")
 
     cmd_templ = "python {0} -d stop"
@@ -62,15 +85,23 @@
 
     conn.exec_command("rm -rf {0}".format(remote_path))
 
-    conn.close()
+    if isinstance(uri_or_conn, basestring):
+        conn.close()
 
 
-def stop_and_remove_sensors(config, remote_path='/tmp/sensors'):
+def stop_and_remove_sensors(config, remote_path='/tmp/sensors',
+                            connected_config=None):
     with ThreadPoolExecutor(max_workers=32) as executor:
         futures = []
 
-        for uri, config in config.items():
+        if connected_config is not None:
+            assert config is None
+            conf_iter = connected_config
+        else:
+            conf_iter = config.items()
+
+        for uri_or_conn, config in conf_iter:
             futures.append(executor.submit(stop_and_remove_sensor,
-                                           uri, remote_path))
+                                           uri_or_conn, remote_path))
 
         wait(futures)
diff --git a/sensors/main.py b/sensors/main.py
index fea46a3..3c953fa 100644
--- a/sensors/main.py
+++ b/sensors/main.py
@@ -64,6 +64,10 @@
         time.sleep(opts.timeout)
 
 
+def pid_running(pid):
+    return os.path.exists("/proc/" + str(pid))
+
+
 def main(argv):
     opts = parse_args(argv)
 
@@ -86,12 +90,12 @@
         elif opts.daemon == 'stop':
             if os.path.isfile(pid_file):
                 pid = int(open(pid_file).read())
-                if os.path.exists("/proc/" + str(pid)):
+                if pid_running(pid):
                     os.kill(pid, signal.SIGTERM)
 
                 time.sleep(0.1)
 
-                if os.path.exists("/proc/" + str(pid)):
+                if pid_running(pid):
                     os.kill(pid, signal.SIGKILL)
 
                 if os.path.isfile(pid_file):
@@ -99,7 +103,7 @@
         elif opts.daemon == 'status':
             if os.path.isfile(pid_file):
                 pid = int(open(pid_file).read())
-                if os.path.exists("/proc/" + str(pid)):
+                if pid_running(pid):
                     print "running"
                     return
             print "stopped"
diff --git a/sensors/protocol.py b/sensors/protocol.py
index cfdd93e..02b661a 100644
--- a/sensors/protocol.py
+++ b/sensors/protocol.py
@@ -1,3 +1,4 @@
+import sys
 import time
 import socket
 import select
@@ -57,6 +58,7 @@
 
 # ------------------------------------- Transports ---------------------------
 
+
 class ITransport(object):
     def __init__(self, receiver):
         pass
@@ -73,12 +75,14 @@
 
     def __init__(self, receiver, delta=True):
         if receiver:
-            raise ValueError("StdoutTransport don't allows receiving")
+            cname = self.__class__.__name__
+            raise ValueError("{0} doesn't allow receiving".format(cname))
 
         self.headers = None
         self.line_format = ""
         self.prev = {}
         self.delta = delta
+        self.fd = sys.stdout
 
     def send(self, data):
         if self.headers is None:
@@ -100,10 +104,17 @@
         else:
             vals = [data[header].value for header in self.headers]
 
-        print self.line_format.format(*vals)
+        self.fd.write(self.line_format.format(*vals) + "\n")
 
     def recv(self, timeout=None):
-        raise ValueError("StdoutTransport don't allows receiving")
+        cname = self.__class__.__name__
+        raise ValueError("{0} doesn't allow receiving".format(cname))
+
+
+class FileTransport(StdoutTransport):
+    def __init__(self, receiver, fname, delta=True):
+        StdoutTransport.__init__(self, receiver, delta)
+        self.fd = open(fname, "w")
 
 
 class UDPTransport(ITransport):
@@ -170,10 +181,12 @@
         ip, port = parsed_uri.netloc.split(":")
         return UDPTransport(receiver, ip=ip, port=int(port),
                             packer_cls=PickleSerializer)
+    elif parsed_uri.scheme == 'file':
+        return FileTransport(receiver, parsed_uri.path)
     elif parsed_uri.scheme == 'hugeudp':
         ip, port = parsed_uri.netloc.split(":")
         return HugeUDPTransport(receiver, ip=ip, port=int(port),
-                            packer_cls=MSGPackSerializer)
+                                packer_cls=MSGPackSerializer)
     else:
         templ = "Can't instantiate transport from {0!r}"
         raise ValueError(templ.format(uri))
diff --git a/ssh_utils.py b/ssh_utils.py
index 7c859cf..e546b72 100644
--- a/ssh_utils.py
+++ b/ssh_utils.py
@@ -1,19 +1,14 @@
 import re
 import time
-import Queue
 import logging
 import os.path
 import getpass
-import threading
 
 import socket
 import paramiko
-from concurrent.futures import ThreadPoolExecutor
 
-from utils import get_barrier
 
 logger = logging.getLogger("io-perf-tool")
-conn_uri_attrs = ("user", "passwd", "host", "port", "path")
 
 
 def ssh_connect(creds, retry_count=60, timeout=1):
@@ -67,7 +62,10 @@
     return dirpath
 
 
-def ssh_mkdir(sftp, remotepath, mode=0777, intermediate=False):
+ALL_RWX_MODE = ((1 << 9) - 1)
+
+
+def ssh_mkdir(sftp, remotepath, mode=ALL_RWX_MODE, intermediate=False):
     remotepath = normalize_dirpath(remotepath)
     if intermediate:
         try:
@@ -83,7 +81,7 @@
 def ssh_copy_file(sftp, localfile, remfile, preserve_perm=True):
     sftp.put(localfile, remfile)
     if preserve_perm:
-        sftp.chmod(remfile, os.stat(localfile).st_mode & 0777)
+        sftp.chmod(remfile, os.stat(localfile).st_mode & ALL_RWX_MODE)
 
 
 def put_dir_recursively(sftp, localpath, remotepath, preserve_perm=True):
@@ -121,9 +119,9 @@
             sftp.chdir(remroot)
         except IOError:
             if preserve_perm:
-                mode = os.stat(root).st_mode & 0777
+                mode = os.stat(root).st_mode & ALL_RWX_MODE
             else:
-                mode = 0777
+                mode = ALL_RWX_MODE
             ssh_mkdir(sftp, remroot, mode=mode, intermediate=True)
             sftp.chdir(remroot)
 
@@ -156,8 +154,10 @@
 
 
 class ConnCreds(object):
+    conn_uri_attrs = ("user", "passwd", "host", "port", "path")
+
     def __init__(self):
-        for name in conn_uri_attrs:
+        for name in self.conn_uri_attrs:
             setattr(self, name, None)
 
 
@@ -224,43 +224,42 @@
     return ssh_connect(creds)
 
 
-def get_ssh_runner(uris,
-                   conn_func,
-                   latest_start_time=None,
-                   keep_temp_files=False):
-    logger.debug("Connecting to servers")
+# def get_ssh_runner(uris,
+#                    conn_func,
+#                    latest_start_time=None,
+#                    keep_temp_files=False):
+#     logger.debug("Connecting to servers")
 
-    with ThreadPoolExecutor(max_workers=16) as executor:
-        connections = list(executor.map(connect, uris))
+#     with ThreadPoolExecutor(max_workers=16) as executor:
+#         connections = list(executor.map(connect, uris))
 
-    result_queue = Queue.Queue()
-    barrier = get_barrier(len(uris), threaded=True)
+#     result_queue = Queue.Queue()
+#     barrier = get_barrier(len(uris), threaded=True)
 
-    def closure(obj):
-        ths = []
-        obj.set_result_cb(result_queue.put)
+#     def closure(obj):
+#         ths = []
+#         obj.set_result_cb(result_queue.put)
 
-        params = (obj, barrier, latest_start_time)
+#         params = (obj, barrier, latest_start_time)
 
-        logger.debug("Start tests")
-        for conn in connections:
-            th = threading.Thread(None, conn_func, None,
-                                  params + (conn,))
-            th.daemon = True
-            th.start()
-            ths.append(th)
+#         logger.debug("Start tests")
+#         for conn in connections:
+#             th = threading.Thread(None, conn_func, None,
+#                                   params + (conn,))
+#             th.daemon = True
+#             th.start()
+#             ths.append(th)
 
-        for th in ths:
-            th.join()
+#         for th in ths:
+#             th.join()
 
-        test_result = []
-        while not result_queue.empty():
-            test_result.append(result_queue.get())
+#         test_result = []
+#         while not result_queue.empty():
+#             test_result.append(result_queue.get())
 
-        logger.debug("Done. Closing connection")
-        for conn in connections:
-            conn.close()
+#         logger.debug("Done. Closing connection")
+#         for conn in connections:
+#             conn.close()
 
-        return test_result
-
-    return closure
+#         return test_result
+#     return closure
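The ALL_RWX_MODE constant introduced in ssh_utils.py above is simply the rwxrwxrwx permission mask written without the Python-2-only 0777 octal literal; a quick illustrative check:

    # (1 << 9) - 1 == 511 == 0o777: read/write/execute for user, group, other.
    ALL_RWX_MODE = (1 << 9) - 1
    assert ALL_RWX_MODE == 511 == int("777", 8)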
diff --git a/starts_vms.py b/starts_vms.py
index 62bff8c..16e7d08 100644
--- a/starts_vms.py
+++ b/starts_vms.py
@@ -199,7 +199,7 @@
         if isinstance(vol.display_name, basestring):
             if re.match(name_templ.format("\\d+"), vol.display_name):
                 if vol.status in ('available', 'error'):
-                    print "Deleting volume", vol.display_name
+                    logger.debug("Deleting volume " + vol.display_name)
                     cinder.volumes.delete(vol)
 
     logger.debug("Clearing done (yet some volumes may still deleting)")
diff --git a/io_scenario/__init__.py b/tests/__init__.py
similarity index 100%
rename from io_scenario/__init__.py
rename to tests/__init__.py
diff --git a/tests/io.py b/tests/io.py
new file mode 100644
index 0000000..2405f53
--- /dev/null
+++ b/tests/io.py
@@ -0,0 +1,97 @@
+import sys
+import time
+import json
+import select
+import pprint
+import argparse
+import subprocess
+from StringIO import StringIO
+from ConfigParser import RawConfigParser
+
+
+def run_fio(benchmark_config):
+    cmd = ["fio", "--output-format=json", "-"]
+    p = subprocess.Popen(cmd, stdin=subprocess.PIPE,
+                         stdout=subprocess.PIPE,
+                         stderr=subprocess.STDOUT)
+    raw_out, _ = p.communicate(benchmark_config)
+    job_output = json.loads(raw_out)["jobs"][0]
+
+    if job_output['write']['iops'] != 0:
+        raw_result = job_output['write']
+    else:
+        raw_result = job_output['read']
+
+    res = {}
+
+    # 'bw_dev bw_mean bw_max bw_min'.split()
+    for field in ["bw_mean", "iops"]:
+        res[field] = raw_result[field]
+
+    res["lat"] = raw_result["lat"]["mean"]
+    res["clat"] = raw_result["clat"]["mean"]
+    res["slat"] = raw_result["slat"]["mean"]
+    res["util"] = json.loads(raw_out)["disk_util"][0]
+
+    res["util"] = dict((str(k), v) for k, v in res["util"].items())
+
+    return res
+
+
+def run_benchmark(binary_tp, *argv, **kwargs):
+    if 'fio' == binary_tp:
+        return run_fio(*argv, **kwargs)
+    raise ValueError("Unknown behcnmark {0}".format(binary_tp))
+
+
+def parse_args(argv):
+    parser = argparse.ArgumentParser(
+        description="Run fio' and return result")
+    parser.add_argument(
+        "--type", metavar="BINARY_TYPE",
+        choices=['fio'], required=True)
+    parser.add_argument("--start-at", metavar="START_TIME", type=int)
+    parser.add_argument("--json", action="store_true", default=False)
+    parser.add_argument("jobfile")
+    return parser.parse_args(argv)
+
+
+def main(argv):
+    argv_obj = parse_args(argv)
+    if argv_obj.jobfile == '-':
+        job_cfg = ""
+        dtime = 10
+        while True:
+            r, w, x = select.select([sys.stdin], [], [], dtime)
+            if len(r) == 0:
+                raise IOError("No config provided")
+            char = sys.stdin.read(1)
+            if '' == char:
+                break
+            job_cfg += char
+            dtime = 1
+    else:
+        job_cfg = open(argv_obj.jobfile).read()
+
+    rcp = RawConfigParser()
+    rcp.readfp(StringIO(job_cfg))
+    assert len(rcp.sections()) == 1
+
+    if argv_obj.start_at is not None:
+        ctime = time.time()
+        if argv_obj.start_at >= ctime:
+            time.sleep(argv_obj.start_at - ctime)
+
+    res = run_benchmark(argv_obj.type, job_cfg)
+    res['__meta__'] = dict(rcp.items(rcp.sections()[0]))
+    res['__meta__']['raw'] = job_cfg
+
+    if argv_obj.json:
+        sys.stdout.write(json.dumps(res))
+    else:
+        sys.stdout.write(pprint.pformat(res))
+        sys.stdout.write("\n")
+    return 0
+
+if __name__ == '__main__':
+    exit(main(sys.argv[1:]))
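tests/io.py pipes a single-section fio job file into fio with --output-format=json and reduces the result to bandwidth, IOPS, latency and utilisation fields. A hedged usage sketch (the job options below are made up for illustration, and fio must be installed on the node):

    # Sketch: drive tests/io.py programmatically with an illustrative job file.
    job = ("[randread_4k]\n"
           "rw=randread\n"
           "blocksize=4k\n"
           "size=64m\n"
           "directory=/tmp\n")

    with open("/tmp/io_task.cfg", "w") as f:
        f.write(job)

    # Equivalent to: python tests/io.py --type fio --json /tmp/io_task.cfg
    from tests.io import main
    main(["--type", "fio", "--json", "/tmp/io_task.cfg"])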
diff --git a/io_scenario/__init__.py b/tests/postgres/__init__.py
similarity index 100%
copy from io_scenario/__init__.py
copy to tests/postgres/__init__.py
diff --git a/hl_tests/postgres/prepare.sh b/tests/postgres/prepare.sh
similarity index 100%
rename from hl_tests/postgres/prepare.sh
rename to tests/postgres/prepare.sh
diff --git a/hl_tests/postgres/run.sh b/tests/postgres/run.sh
similarity index 100%
rename from hl_tests/postgres/run.sh
rename to tests/postgres/run.sh
diff --git a/utils.py b/utils.py
index 059101a..4a784ec 100644
--- a/utils.py
+++ b/utils.py
@@ -1,10 +1,8 @@
 import time
 import socket
-import os.path
 import logging
 import threading
 import contextlib
-import multiprocessing
 
 
 logger = logging.getLogger("io-perf-tool")
@@ -22,43 +20,32 @@
     return user, passwd, host
 
 
-def get_barrier(count, threaded=False):
-    if threaded:
-        class val(object):
-            value = count
-        cond = threading.Condition()
-    else:
-        val = multiprocessing.Value('i', count)
-        cond = multiprocessing.Condition()
+class TaksFinished(Exception):
+    pass
 
-    def closure(timeout):
-        with cond:
-            val.value -= 1
-            if val.value == 0:
-                cond.notify_all()
+
+class Barrier(object):
+    def __init__(self, count):
+        self.count = count
+        self.curr_count = 0
+        self.cond = threading.Condition()
+        self.exited = False
+
+    def wait(self, timeout=None):
+        with self.cond:
+            if self.exited:
+                raise TaksFinished()
+
+            self.curr_count += 1
+            if self.curr_count == self.count:
+                self.curr_count = 0
+                self.cond.notify_all()
             else:
-                cond.wait(timeout)
-            return val.value == 0
+                self.cond.wait(timeout=timeout)
 
-    return closure
-
-
-def wait_on_barrier(barrier, latest_start_time):
-    if barrier is not None:
-        if latest_start_time is not None:
-            timeout = latest_start_time - time.time()
-        else:
-            timeout = None
-
-        if timeout is not None and timeout > 0:
-            msg = "Ready and waiting on barrier. " + \
-                  "Will wait at most {0} seconds"
-            logger.debug(msg.format(int(timeout)))
-
-            if not barrier(timeout):
-                logger.debug("Barrier timeouted")
-            else:
-                logger.debug("Passing barrier, starting test")
+    def exit(self):
+        with self.cond:
+            self.exited = True
 
 
 @contextlib.contextmanager
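The Barrier class above replaces the closure returned by get_barrier: wait() blocks until count callers have arrived (or the timeout expires), and exit() makes any later wait() raise TaksFinished so stuck workers can bail out. A minimal usage sketch (the worker body is illustrative):

    # Sketch: three workers synchronise the start of their measurement phase.
    import threading
    from utils import Barrier

    barrier = Barrier(3)

    def worker(idx):
        # per-worker setup would happen here
        barrier.wait(timeout=30)              # returns once all three arrive
        print "worker %d past barrier" % idx  # Python 2, as in the repo

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
    for th in threads:
        th.start()
    for th in threads:
        th.join()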
@@ -77,56 +64,51 @@
         raise
 
 
-def run_over_ssh(conn, cmd):
+def run_over_ssh(conn, cmd, stdin_data=None, exec_timeout=60):
     "should be replaces by normal implementation, with select"
-
-    stdin, stdout, stderr = conn.exec_command(cmd)
-    out = stdout.read()
-    err = stderr.read()
-    code = stdout.channel.recv_exit_status()
-    return code, out, err
-
-
-def kb_to_ssize(ssize):
-    size_ext = {
-        4: 'P',
-        3: 'T',
-        2: 'G',
-        1: 'M',
-        0: 'K'
-    }
-
-    for idx in reversed(sorted(size_ext)):
-        if ssize > 1024 ** idx:
-            ext = size_ext[idx]
-            return "{0}{1}".format(int(ssize / 1024 ** idx), ext)
-    raise ValueError("Can't convert {0} to kb".format(ssize))
-
-
-def ssize_to_kb(ssize):
+    transport = conn.get_transport()
+    session = transport.open_session()
     try:
-        smap = dict(k=1, K=1, M=1024, m=1024, G=1024**2, g=1024**2)
-        for ext, coef in smap.items():
-            if ssize.endswith(ext):
-                return int(ssize[:-1]) * coef
+        session.set_combine_stderr(True)
 
-        if int(ssize) % 1024 != 0:
-            raise ValueError()
+        stime = time.time()
+        session.exec_command(cmd)
 
-        return int(ssize) / 1024
+        if stdin_data is not None:
+            session.sendall(stdin_data)
 
-    except (ValueError, TypeError, AttributeError):
-        tmpl = "Unknow size format {0!r} (or size not multiples 1024)"
-        raise ValueError(tmpl.format(ssize))
+        session.settimeout(1)
+        session.shutdown_write()
+        output = ""
+
+        while True:
+            try:
+                ndata = session.recv(1024)
+                output += ndata
+                if "" == ndata:
+                    break
+            except socket.timeout:
+                pass
+            if time.time() - stime > exec_timeout:
+                return 1, output + "\nExecution timeout"
+        code = session.recv_exit_status()
+    finally:
+        session.close()
+
+    return code, output
+
+
+SMAP = dict(k=1024, m=1024 ** 2, g=1024 ** 3, t=1024 ** 4)
 
 
 def ssize_to_b(ssize):
     try:
-        smap = dict(k=1, K=1, M=1024, m=1024, G=1024**2, g=1024**2)
-        for ext, coef in smap.items():
-            if ssize.endswith(ext):
-                return int(ssize[:-1]) * coef * 1024
+        ssize = ssize.lower()
 
+        if ssize.endswith("b"):
+            ssize = ssize[:-1]
+        if ssize[-1] in SMAP:
+            return int(ssize[:-1]) * SMAP[ssize[-1]]
         return int(ssize)
     except (ValueError, TypeError, AttributeError):
         tmpl = "Unknow size format {0!r} (or size not multiples 1024)"