large refactoring, ready to move away from rally
diff --git a/TODO b/TODO
new file mode 100644
index 0000000..9f383d3
--- /dev/null
+++ b/TODO
@@ -0,0 +1,15 @@
+drop all mentions of googledoc
+we need a backlog
+I'll be the product owner and will file tickets on the board
+
+move the site code out of __init__.py into a separate file
+move the lint config out of the tests
+run everything through the linter
+move ssize_to_kb to utils
+unittests
+switch to proper logging
+
+zmq/tcp client should imitate ssh
+rpyc???
+detect VMs by the local IPs we get from nova
+write a roadmap
\ No newline at end of file
diff --git a/data.py b/data.py
deleted file mode 100644
index 5569616..0000000
--- a/data.py
+++ /dev/null
@@ -1,113 +0,0 @@
-import re
-import sys
-
-
-splitter_rr = "(?ms)=====+\n"
-
-
-def get_data_from_output(fname):
-    results = {}
-    fc = open(fname).read()
-
-    for block in re.split(splitter_rr, fc):
-        block = block.strip()
-        if not block.startswith("[{u'__meta__':"):
-            continue
-        for val in eval(block):
-            meta = val['__meta__']
-            meta['sync'] = 's' if meta['sync'] else 'a'
-            key = "{action} {sync} {blocksize}k".format(**meta)
-            results.setdefault(key, []).append(val['bw_mean'])
-
-    processed_res = {}
-
-    for k, v in results.items():
-        v.sort()
-        med = float(sum(v)) / len(v)
-        ran = sum(abs(x - med) for x in v) / len(v)
-        processed_res[k] = (int(med), int(ran))
-
-    return processed_res
-
-
-def ksort(x):
-    op, sync, sz = x.split(" ")
-    return (op, sync, int(sz[:-1]))
-
-
-def show_data(path1, path2=None):
-    templ_1 = "  {:>10}  {:>5}  {:>5}     {:>6} ~ {:>5} {:>2}% {:>5}"
-    templ_2 = templ_1 + "      {:>6} ~ {:>5} {:>2}% {:>5} ----  {:>6}%  "
-
-    ln_1 = templ_1.replace("<", "^").replace(">", "^")
-    ln_1 = ln_1.format("Oper", "Sync", "BSZ", "BW1", "DEV1", "%", "IOPS1")
-
-    ln_2 = templ_2.replace("<", "^").replace(">", "^")
-    ln_2 = ln_2.format("Oper", "Sync", "BSZ", "BW1", "DEV1", "%",
-                       "IOPS1", "BW2", "DEV2", "%", "IOPS2", "DIFF %")
-
-    sep_1 = '-' * len(ln_1)
-    sep_2 = '-' * len(ln_2)
-
-    res_1 = get_data_from_output(path1)
-
-    if path2 is None:
-        res_2 = None
-        sep = sep_1
-        ln = ln_1
-        templ = templ_1
-    else:
-        res_2 = get_data_from_output(path2)
-        sep = sep_2
-        ln = ln_2
-        templ = templ_2
-
-    print sep
-    print ln
-    print sep
-
-    prev_tp = None
-
-    common_keys = set(res_1.keys())
-
-    if res_2 is not None:
-        common_keys &= set(res_2.keys())
-
-    for k in sorted(common_keys, key=ksort):
-        tp = k.rsplit(" ", 1)[0]
-        op, s, sz = k.split(" ")
-        s = 'sync' if s == 's' else 'async'
-
-        if tp != prev_tp and prev_tp is not None:
-            print sep
-
-        prev_tp = tp
-
-        m1, d1 = res_1[k]
-        iops1 = m1 / int(sz[:-1])
-        perc1 = int(d1 * 100.0 / m1 + 0.5)
-
-        if res_2 is not None:
-            m2, d2 = res_2[k]
-            iops2 = m2 / int(sz[:-1])
-            perc2 = int(d2 * 100.0 / m2 + 0.5)
-            avg_diff = int(((m2 - m1) * 100.) / m2 + 0.5)
-
-        if res_2 is not None:
-            print templ.format(op, s, sz, m1, d1, perc1, iops1,
-                               m2, d2, perc2, iops2, avg_diff)
-        else:
-            print templ.format(op, s, sz, m1, d1, perc1, iops1)
-
-    print sep
-
-
-def main(argv):
-    path1 = argv[0]
-    path2 = argv[1] if len(argv) > 1 else None
-    show_data(path1, path2)
-    return 0
-
-if __name__ == "__main__":
-    exit(main(sys.argv[1:]))
-# print " ", results[k]
diff --git a/data_extractor.py b/data_extractor.py
deleted file mode 100644
index d4e6ed1..0000000
--- a/data_extractor.py
+++ /dev/null
@@ -1,116 +0,0 @@
-import sys
-import json
-import sqlite3
-import contextlib
-
-
-def connect(url):
-    return sqlite3.connect(url)
-
-
-create_db_sql_templ = """
-CREATE TABLE build (id integer primary key,
-                    build text,
-                    type text,
-                    md5 text);
-
-CREATE TABLE params_combination (id integer primary key, {params});
-
-CREATE TABLE result (build_id integer,
-                     params_combination integer,
-                     bandwith float,
-                     deviation float);
-"""
-
-
-PARAM_COUNT = 20
-
-
-def get_all_tables(conn):
-    cursor = conn.cursor()
-    cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
-    return cursor.fetchall()
-
-
-def drop_database(conn):
-    cursor = conn.cursor()
-    cursor.execute("drop table result")
-    cursor.execute("drop table params_combination")
-    cursor.execute("drop table build")
-
-
-def init_database(conn):
-    cursor = conn.cursor()
-
-    params = ["param_{0} text".format(i) for i in range(PARAM_COUNT)]
-    create_db_sql = create_db_sql_templ.format(params=",".join(params))
-
-    for sql in create_db_sql.split(";"):
-        cursor.execute(sql)
-
-
-def insert_build(cursor, build_id, build_type, iso_md5):
-    cursor.execute("insert into build (build, type, md5) values (?, ?, ?)",
-                   (build_id, build_type, iso_md5))
-    return cursor.lastrowid
-
-
-def insert_params(cursor, *param_vals):
-    param_vals = param_vals + ("",) * (PARAM_COUNT - len(param_vals))
-
-    params = ",".join(['?'] * PARAM_COUNT)
-    select_templ = "select id from params_combination where {params_where}"
-
-    params_where = ["param_{0}=?".format(i) for i in range(PARAM_COUNT)]
-    req = select_templ.format(params_where=" AND ".join(params_where))
-    cursor.execute(req, param_vals)
-    res = cursor.fetchall()
-    if [] != res:
-        return res[0][0]
-
-    params = ",".join(['?'] * PARAM_COUNT)
-    param_insert_templ = "insert into params_combination ({0}) values ({1})"
-    param_names = ",".join("param_{0}".format(i) for i in range(PARAM_COUNT))
-    req = param_insert_templ.format(param_names, params)
-    cursor.execute(req, param_vals)
-    return cursor.lastrowid
-
-
-def insert_results(cursor, build_id, params_id, bw, dev):
-    req = "insert into result values (?, ?, ?, ?)"
-    cursor.execute(req, (build_id, params_id, bw, dev))
-
-
-@contextlib.contextmanager
-def transaction(conn):
-    try:
-        cursor = conn.cursor()
-        yield cursor
-    except:
-        conn.rollback()
-        raise
-    else:
-        conn.commit()
-
-
-def json_to_db(json_data, conn):
-    data = json.loads(json_data)
-    with transaction(conn) as cursor:
-        for build_data in data:
-            build_id = insert_build(cursor,
-                                    build_data.pop("build_id"),
-                                    build_data.pop("type"),
-                                    build_data.pop("iso_md5"))
-
-            for params, (bw, dev) in build_data.items():
-                param_id = insert_params(cursor, *params.split(" "))
-                insert_results(cursor, build_id, param_id, bw, dev)
-
-
-conn = sqlite3.connect(sys.argv[1])
-json_data = open(sys.argv[2]).read()
-
-if len(get_all_tables(conn)) == 0:
-    init_database(conn)
-
-json_to_db(json_data, conn)
diff --git a/envs/cz-perf-1.json b/envs/cz-perf-1.json
new file mode 100644
index 0000000..ca6b54c
--- /dev/null
+++ b/envs/cz-perf-1.json
@@ -0,0 +1,12 @@
+{
+    "type": "ExistingCloud",
+    "auth_url": "http://172.16.53.2:5000/v2.0/",
+    "region_name": "RegionOne",
+    "endpoint_type": "public",
+    "endpoint": "http://172.16.53.2:35357/v2.0/",
+    "admin": {
+        "username": "admin",
+        "password": "admin",
+        "tenant_name": "admin"
+    }
+}
\ No newline at end of file
diff --git a/envs/cz-perf-2.json b/envs/cz-perf-2.json
new file mode 100644
index 0000000..81eb1e4
--- /dev/null
+++ b/envs/cz-perf-2.json
@@ -0,0 +1,12 @@
+{
+    "type": "ExistingCloud",
+    "auth_url": "http://172.16.53.66:5000/v2.0/",
+    "region_name": "RegionOne",
+    "endpoint_type": "public",
+    "endpoint": "http://172.16.53.66:35357/v2.0/",
+    "admin": {
+        "username": "admin",
+        "password": "admin",
+        "tenant_name": "admin"
+    }
+}
\ No newline at end of file
diff --git a/io_scenario/__init__.py b/io_scenario/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/io_scenario/__init__.py
diff --git a/io-scenario/fio b/io_scenario/fio
similarity index 100%
rename from io-scenario/fio
rename to io_scenario/fio
Binary files differ
diff --git a/io-scenario/io.py b/io_scenario/io.py
similarity index 92%
rename from io-scenario/io.py
rename to io_scenario/io.py
index 8a3509a..e1756f6 100644
--- a/io-scenario/io.py
+++ b/io_scenario/io.py
@@ -139,31 +139,13 @@
 IOZoneParser.make_positions()
 
 
-def do_run_iozone(params, filename, timeout, iozone_path='iozone',
-                  microsecond_mode=False):
-
-    PATTERN = "\x6d"
-
-    cmd = [iozone_path, "-V", "109"]
-
-    if params.sync:
-        cmd.append('-o')
-
-    if params.direct_io:
-        cmd.append('-I')
-
-    if microsecond_mode:
-        cmd.append('-N')
-
+def iozone_do_prepare(params, filename, pattern):
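+    # Pre-create the data files iozone will work on: one file per
+    # thread, filled with the verification pattern byte.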
     all_files = []
     threads = int(params.concurence)
     if 1 != threads:
-        cmd.extend(('-t', str(threads), '-F'))
         filename = filename + "_{}"
-        cmd.extend(filename % i for i in range(threads))
         all_files.extend(filename % i for i in range(threads))
     else:
-        cmd.extend(('-f', filename))
         all_files.append(filename)
 
     bsz = 1024 if params.size > 1024 else params.size
@@ -175,12 +157,41 @@
     for fname in all_files:
         with open(fname, "wb") as fd:
             if fsz > 1024:
-                pattern = PATTERN * 1024 * 1024
+                buff = pattern * 1024 * 1024
                 for _ in range(int(fsz / 1024) + 1):
-                    fd.write(pattern)
+                    fd.write(buff)
             else:
-                fd.write(PATTERN * 1024 * fsz)
+                fd.write(pattern * 1024 * fsz)
             fd.flush()
+    return all_files
+
+
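+# Byte written into the prepared files; iozone's -V flag receives the
+# same value as its ordinal (109 == 0x6d) so reads can be verified.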
+VERIFY_PATTERN = "\x6d"
+
+
+def do_run_iozone(params, filename, timeout, iozone_path='iozone',
+                  microsecond_mode=False,
+                  prepare_only=False):
+
+    cmd = [iozone_path, "-V", str(ord(VERIFY_PATTERN))]
+
+    if params.sync:
+        cmd.append('-o')
+
+    if params.direct_io:
+        cmd.append('-I')
+
+    if microsecond_mode:
+        cmd.append('-N')
+
+    all_files = iozone_do_prepare(params, filename, VERIFY_PATTERN)
+
+    threads = int(params.concurence)
+    if 1 != threads:
+        cmd.extend(('-t', str(threads), '-F'))
+        cmd.extend(all_files)
+    else:
+        cmd.extend(('-f', all_files[0]))
 
     cmd.append('-i')
 
@@ -217,10 +228,6 @@
     except:
         raise
 
-    # res['bw_dev'] = 0
-    # res['bw_max'] = res["bw_mean"]
-    # res['bw_min'] = res["bw_mean"]
-
     return res, " ".join(cmd)
 
 
@@ -318,7 +325,10 @@
     return json.loads(raw_out)["jobs"][0], " ".join(cmd_line)
 
 
-def run_fio(benchmark, fio_path, tmpname, timeout=None):
+def run_fio(benchmark, fio_path, tmpname, timeout=None, prepare_only=False):
+    if prepare_only:
+        return {}, ""
+
     job_output, cmd_line = run_fio_once(benchmark, fio_path, tmpname, timeout)
 
     if benchmark.action in ('write', 'randwrite'):
@@ -473,6 +483,9 @@
     parser.add_argument(
         "--binary-path", help="binary path",
         default=None, dest='binary_path')
+    parser.add_argument(
+        "--prepare-only", default=False, dest='prepare_only',
+        action="store_true",
+        help="only prepare the test files, don't run the benchmark")
     return parser.parse_args(argv)
 
 
@@ -526,31 +539,24 @@
         res, cmd = run_benchmark(argv_obj.type,
                                  benchmark,
                                  binary_path,
-                                 test_file_name)
-        res['__meta__'] = benchmark.__dict__.copy()
-        res['__meta__']['cmdline'] = cmd
-        sys.stdout.write(json.dumps(res) + "\n")
+                                 test_file_name,
+                                 argv_obj.prepare_only)
+        if not argv_obj.prepare_only:
+            res['__meta__'] = benchmark.__dict__.copy()
+            res['__meta__']['cmdline'] = cmd
+
+        sys.stdout.write(json.dumps(res))
+
+        if not argv_obj.prepare_only:
+            sys.stdout.write("\n")
+
     finally:
         if remove_binary:
             os.unlink(binary_path)
 
-        if os.path.isfile(test_file_name):
+        if os.path.isfile(test_file_name) and not argv_obj.prepare_only:
             os.unlink(test_file_name)
 
 
-# function-marker for patching, don't 'optimize' it
-def INSERT_TOOL_ARGS(x):
-    return [x]
-
-
 if __name__ == '__main__':
-    # this line would be patched in case of run under rally
-    # don't modify it!
-    argvs = INSERT_TOOL_ARGS(sys.argv[1:])
-    code = 0
-    for argv in argvs:
-        tcode = main(argv)
-        if tcode != 0:
-            code = tcode
-
-    exit(code)
+    exit(main(sys.argv[1:]))
diff --git a/io-scenario/io.yaml b/io_scenario/io.yaml
similarity index 100%
rename from io-scenario/io.yaml
rename to io_scenario/io.yaml
diff --git a/io-scenario/iozone b/io_scenario/iozone
similarity index 100%
rename from io-scenario/iozone
rename to io_scenario/iozone
Binary files differ
diff --git a/itest.py b/itest.py
new file mode 100644
index 0000000..6457aa8
--- /dev/null
+++ b/itest.py
@@ -0,0 +1,69 @@
+import abc
+import json
+import types
+import os.path
+
+from io_scenario import io
+from ssh_copy_directory import copy_paths
+
+
+class IPerfTest(object):
+    __metaclass__ = abc.ABCMeta
+
+    def __init__(self, on_result_cb):
+        self.on_result_cb = on_result_cb
+
+    def build(self, conn):
+        self.pre_run(conn)
+
+    def pre_run(self, conn):
+        pass
+
+    @abc.abstractmethod
+    def run(self, conn):
+        pass
+
+
+def run_test_iter(obj, conn):
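+    # Two-phase protocol: the first next() runs the preparation step,
+    # the second one executes the test itself and yields its result.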
+    yield obj.pre_run(conn)
+    res = obj.run(conn)
+    if isinstance(res, types.GeneratorType):
+        for vl in res:
+            yield vl
+    else:
+        yield res
+
+
+class IOPerfTest(IPerfTest):
+    def __init__(self,
+                 script_opts,
+                 testtool_local,
+                 on_result_cb,
+                 keep_tmp_files):
+
+        IPerfTest.__init__(self, on_result_cb)
+        dst_testtool_path = '/tmp/io_tool'
+        self.files_to_copy = {testtool_local: dst_testtool_path}
+        self.script_opts = script_opts + ["--binary-path", dst_testtool_path]
+
+    def pre_run(self, conn):
+        copy_paths(conn, self.files_to_copy)
+
+    def run(self, conn):
+        io_py = io.__file__
+
+        if io_py.endswith('.pyc'):
+            io_py = io_py[:-1]
+
+        args = ['env', 'python2', io_py] + self.script_opts
+        code, out, err = conn.execute(args)
+        self.on_result(code, out, err)
+        return code, out, err
+
+    def on_result(self, code, out, err):
+        if 0 == code:
+            try:
+                for line in out.split("\n"):
+                    if line.strip() != "":
+                        self.on_result_cb(json.loads(line))
+            except Exception as err:
+                msg = "Error during postprocessing results: {0!r}".format(err)
+                raise RuntimeError(msg)
diff --git a/rally_runner.py b/rally_runner.py
new file mode 100644
index 0000000..cc95309
--- /dev/null
+++ b/rally_runner.py
@@ -0,0 +1,189 @@
+import os
+import json
+import time
+import yaml
+import warnings
+import functools
+import contextlib
+
+from rally import exceptions
+from rally.cmd import cliutils
+from rally.cmd.main import categories
+from rally.benchmark.scenarios.vm.utils import VMScenario
+from rally.benchmark.scenarios.vm.vmtasks import VMTasks
+
+import itest
+from utils import get_barrier
+
+
+def log(x):
+    pass
+
+
+@contextlib.contextmanager
+def patch_VMTasks_boot_runcommand_delete():
+
+    try:
+        orig = VMTasks.boot_runcommand_delete
+    except AttributeError:
+        # rally code was changed
+        log("VMTasks class was changed and have no boot_runcommand_delete"
+            " method anymore. Update patch code.")
+        raise exceptions.ScriptError("monkeypatch code fails on "
+                                     "VMTasks.boot_runcommand_delete")
+
+    @functools.wraps(orig)
+    def new_boot_runcommand_delete(self, *args, **kwargs):
+        if 'rally_affinity_group' in os.environ:
+            group_id = os.environ['rally_affinity_group']
+            kwargs['scheduler_hints'] = {'group': group_id}
+        return orig(self, *args, **kwargs)
+
+    VMTasks.boot_runcommand_delete = new_boot_runcommand_delete
+
+    try:
+        yield
+    finally:
+        VMTasks.boot_runcommand_delete = orig
+
+
+# should actually use the mock module for this,
+# but don't want to add a new dependency
+@contextlib.contextmanager
+def patch_VMScenario_run_command_over_ssh(test_obj,
+                                          barrier=None,
+                                          latest_start_time=None):
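+    # Completely replaces VMScenario.run_action: instead of rally's own
+    # script it runs test_obj over the ssh connection rally has already
+    # established, using the barrier to synchronize the start across VMs.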
+
+    try:
+        orig = VMScenario.run_action
+    except AttributeError:
+        # rally code was changed
+        log("VMScenario class was changed and have no run_action"
+            " method anymore. Update patch code.")
+        raise exceptions.ScriptError("monkeypatch code fails on "
+                                     "VMScenario.run_action")
+
+    @functools.wraps(orig)
+    def closure(self, ssh, *args, **kwargs):
+        try:
+            ssh._client.open_sftp
+        except AttributeError:
+            # rally code was changed
+            log("Prototype of VMScenario.run_command_over_ssh "
+                "was changed. Update patch code.")
+            raise exceptions.ScriptError("monkeypatch code fails on "
+                                         "ssh._client.open_sftp()")
+
+        test_iter = itest.run_test_iter(test_obj, ssh)
+
+        next(test_iter)
+
+        log("Start io test")
+
+        if barrier is not None:
+            if latest_start_time is not None:
+                timeout = latest_start_time - time.time()
+            else:
+                timeout = None
+
+            if timeout is not None and timeout > 0:
+                msg = "Ready and waiting on barrier. " + \
+                      "Will wait at most {0} seconds"
+                log(msg.format(int(timeout)))
+
+                if not barrier(timeout):
+                    log("Barrier timeouted")
+
+        try:
+            code, out, err = next(test_iter)
+        except Exception as exc:
+            log("Rally raises exception {0}".format(exc.message))
+            raise
+
+        if 0 != code:
+            templ = "Script returns error! code={0}\n {1}"
+            log(templ.format(code, err.rstrip()))
+        else:
+            log("Test finished")
+
+            # result = {"rally": 0, "srally": 1}
+            result = {"rally": 0}
+            out = json.dumps(result)
+
+        return code, out, err
+
+    VMScenario.run_action = closure
+
+    try:
+        yield
+    finally:
+        VMScenario.run_action = orig
+
+
+def run_rally(rally_args):
+    return cliutils.run(['rally'] + rally_args, categories)
+
+
+def prepare_files(files_dir):
+
+    # we really do need a temporary file *name* here, hence os.tmpnam;
+    # silence its deprecation warning
+    with warnings.catch_warnings():
+        warnings.simplefilter("ignore")
+        yaml_file = os.tmpnam()
+
+    yaml_src_cont = open(os.path.join(files_dir, "io.yaml")).read()
+    task_params = yaml.load(yaml_src_cont)
+    rcd_params = task_params['VMTasks.boot_runcommand_delete']
+    rcd_params[0]['args']['script'] = os.path.join(files_dir, "io.py")
+    yaml_dst_cont = yaml.dump(task_params)
+
+    open(yaml_file, "w").write(yaml_dst_cont)
+
+    return yaml_file
+
+
+def run_tests_using_rally(obj,
+                          files_dir,
+                          max_preparation_time,
+                          rally_extra_opts,
+                          keep_temp_files):
+
+    yaml_file = prepare_files(files_dir)
+
+    try:
+        do_patch1 = patch_VMScenario_run_command_over_ssh
+        config = yaml.load(open(yaml_file).read())
+
+        vm_sec = 'VMTasks.boot_runcommand_delete'
+        concurrency = config[vm_sec][0]['runner']['concurrency']
+
+        barrier = get_barrier(concurrency)
+        max_release_time = time.time() + max_preparation_time
+
+        with patch_VMTasks_boot_runcommand_delete():
+            with do_patch1(obj, barrier, max_release_time):
+                opts = ['task', 'start', yaml_file] + list(rally_extra_opts)
+                log("Start rally with opts '{0}'".format(" ".join(opts)))
+                run_rally(opts)
+    finally:
+        if not keep_temp_files:
+            os.unlink(yaml_file)
+
+
+def get_rally_runner(files_dir,
+                     max_preparation_time,
+                     rally_extra_opts,
+                     keep_temp_files):
+
+    def closure(obj):
+        run_tests_using_rally(obj,
+                              files_dir,
+                              max_preparation_time,
+                              rally_extra_opts,
+                              keep_temp_files)
+    return closure
diff --git a/run_2.sh b/run_2.sh
deleted file mode 100644
index 566b1d5..0000000
--- a/run_2.sh
+++ /dev/null
@@ -1,43 +0,0 @@
-#!/bin/bash
-set -x
-set -e
-
-type="iozone"
-
-bsizes="1k 4k 64k 256k 1m"
-ops="randwrite write"
-osync="s a"
-three_times="1 2 3"
-
-for bsize in $bsizes ; do
-	for op in $ops ; do 
-		for sync in $osync ; do 
-			for xxx in $three_times ; do
-				if [[ "$sync" == "s" ]] ; then
-					ssync="-s"
-					factor="x500"
-				else
-					if [[ "$bsize" == "1k" || "$bsize" == "4k" ]] ; then
-						continue
-					fi
-
-					ssync=
-					factor="r2"
-				fi
-
-				python run_rally_test.py -l -o "--type $type -a $op --iodepth 16 --blocksize $bsize --iosize $factor $ssync" -t io-scenario $type --rally-extra-opts="--deployment perf-2"
-			done
-		done
-	done
-done
-
-bsizes="4k 64k 256k 1m"
-ops="randread read"
-
-for bsize in $bsizes ; do
-	for op in $ops ; do 
-		for xxx in $three_times ; do
-			python run_rally_test.py -l -o "--type $type -a $op --iodepth 16 --blocksize $bsize --iosize r2" -t io-scenario $type --rally-extra-opts="--deployment perf-2"
-		done
-	done
-done
diff --git a/run_rally_test.py b/run_rally_test.py
index ffad0ff..5a4df58 100644
--- a/run_rally_test.py
+++ b/run_rally_test.py
@@ -1,25 +1,15 @@
 import os
-import re
 import sys
-import time
-import yaml
 import json
 import pprint
 import os.path
 import argparse
 import datetime
-import warnings
-import functools
-import contextlib
 import multiprocessing
 
-from rally import exceptions
-from rally.cmd import cliutils
-from rally.cmd.main import categories
-from rally.benchmark.scenarios.vm.utils import VMScenario
-from rally.benchmark.scenarios.vm.vmtasks import VMTasks
-
-from ssh_copy_directory import put_dir_recursively, ssh_copy_file
+import io_scenario
+import rally_runner
+from itest import IOPerfTest
 
 
 def log(x):
@@ -28,229 +18,30 @@
     sys.stderr.write(pref + x.replace("\n", "\n" + pref) + "\n")
 
 
-@contextlib.contextmanager
-def patch_VMTasks_boot_runcommand_delete():
+def run_io_test(tool,
+                script_args,
+                test_runner,
+                keep_temp_files=False):
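+    # Wrap io.py invocation in an IOPerfTest, hand it to the supplied
+    # test_runner (rally-based for now) and drain the results queue.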
 
-    try:
-        orig = VMTasks.boot_runcommand_delete
-    except AttributeError:
-        # rally code was changed
-        log("VMTasks class was changed and have no boot_runcommand_delete"
-            " method anymore. Update patch code.")
-        raise exceptions.ScriptError("monkeypatch code fails on "
-                                     "VMTasks.boot_runcommand_delete")
-
-    @functools.wraps(orig)
-    def new_boot_runcommand_delete(self, *args, **kwargs):
-        if 'rally_affinity_group' in os.environ:
-            group_id = os.environ['rally_affinity_group']
-            kwargs['scheduler_hints'] = {'group': group_id}
-        return orig(self, *args, **kwargs)
-
-    VMTasks.boot_runcommand_delete = new_boot_runcommand_delete
-
-    try:
-        yield
-    finally:
-        VMTasks.boot_runcommand_delete = orig
-
-
-def get_barrier(count):
-    val = multiprocessing.Value('i', count)
-    cond = multiprocessing.Condition()
-
-    def closure(timeout):
-        me_released = False
-        with cond:
-            val.value -= 1
-            if val.value == 0:
-                me_released = True
-                cond.notify_all()
-            else:
-                cond.wait(timeout)
-            return val.value == 0
-
-        if me_released:
-            log("Test begins!")
-
-    return closure
-
-
-# should actually use mock module for this,
-# but don't wanna to add new dependency
-@contextlib.contextmanager
-def patch_VMScenario_run_command_over_ssh(paths,
-                                          on_result_cb,
-                                          barrier=None,
-                                          latest_start_time=None):
-
-    try:
-        orig = VMScenario.run_action
-    except AttributeError:
-        # rally code was changed
-        log("VMScenario class was changed and have no run_action"
-            " method anymore. Update patch code.")
-        raise exceptions.ScriptError("monkeypatch code fails on "
-                                     "VMScenario.run_action")
-
-    @functools.wraps(orig)
-    def closure(self, ssh, *args, **kwargs):
-        try:
-            sftp = ssh._client.open_sftp()
-        except AttributeError:
-            # rally code was changed
-            log("Prototype of VMScenario.run_command_over_ssh "
-                "was changed. Update patch code.")
-            raise exceptions.ScriptError("monkeypatch code fails on "
-                                         "ssh._client.open_sftp()")
-        try:
-            for src, dst in paths.items():
-                try:
-                    if os.path.isfile(src):
-                        ssh_copy_file(sftp, src, dst)
-                    elif os.path.isdir(src):
-                        put_dir_recursively(sftp, src, dst)
-                    else:
-                        templ = "Can't copy {0!r} - " + \
-                                "it neither a file not a directory"
-                        msg = templ.format(src)
-                        log(msg)
-                        raise exceptions.ScriptError(msg)
-                except exceptions.ScriptError:
-                    raise
-                except Exception as exc:
-                    tmpl = "Scp {0!r} => {1!r} failed - {2!r}"
-                    msg = tmpl.format(src, dst, exc)
-                    log(msg)
-                    raise exceptions.ScriptError(msg)
-        finally:
-            sftp.close()
-
-        log("Start io test")
-
-        if barrier is not None:
-            if latest_start_time is not None:
-                timeout = latest_start_time - time.time()
-            else:
-                timeout = None
-
-            if timeout is not None and timeout > 0:
-                msg = "Ready and waiting on barrier. " + \
-                      "Will wait at most {0} seconds"
-                log(msg.format(int(timeout)))
-
-                if not barrier(timeout):
-                    log("Barrier timeouted")
-
-        try:
-            code, out, err = orig(self, ssh, *args, **kwargs)
-        except Exception as exc:
-            log("Rally raises exception {0}".format(exc.message))
-            raise
-
-        if 0 != code:
-            templ = "Script returns error! code={0}\n {1}"
-            log(templ.format(code, err.rstrip()))
-        else:
-            log("Test finished")
-
-            try:
-                for line in out.split("\n"):
-                    if line.strip() != "":
-                        result = json.loads(line)
-                        on_result_cb(result)
-            except Exception as err:
-                log("Error during postprocessing results: {0!r}".format(err))
-
-            # result = {"rally": 0, "srally": 1}
-            result = {"rally": 0}
-            out = json.dumps(result)
-
-        return code, out, err
-
-    VMScenario.run_action = closure
-
-    try:
-        yield
-    finally:
-        VMScenario.run_action = orig
-
-
-def run_rally(rally_args):
-    return cliutils.run(['rally'] + rally_args, categories)
-
-
-def prepare_files(testtool_py_args_v, dst_testtool_path, files_dir):
-
-    # we do need temporary named files
-    with warnings.catch_warnings():
-        warnings.simplefilter("ignore")
-        py_file = os.tmpnam()
-        yaml_file = os.tmpnam()
-
-    testtool_py_inp_path = os.path.join(files_dir, "io.py")
-    py_src_cont = open(testtool_py_inp_path).read()
-    args_repl_rr = r'INSERT_TOOL_ARGS\(sys\.argv.*?\)'
-    py_dst_cont = re.sub(args_repl_rr, repr(testtool_py_args_v), py_src_cont)
-
-    if py_dst_cont == args_repl_rr:
-        templ = "Can't find replace marker in file {0}"
-        msg = templ.format(testtool_py_inp_path)
-        log(msg)
-        raise ValueError(msg)
-
-    yaml_src_cont = open(os.path.join(files_dir, "io.yaml")).read()
-    task_params = yaml.load(yaml_src_cont)
-    rcd_params = task_params['VMTasks.boot_runcommand_delete']
-    rcd_params[0]['args']['script'] = py_file
-    yaml_dst_cont = yaml.dump(task_params)
-
-    open(py_file, "w").write(py_dst_cont)
-    open(yaml_file, "w").write(yaml_dst_cont)
-
-    return yaml_file, py_file
-
-
-def run_test(tool, testtool_py_args_v, dst_testtool_path, files_dir,
-             rally_extra_opts, max_preparation_time=300,
-             keet_temp_files=False):
+    files_dir = os.path.dirname(io_scenario.__file__)
 
     path = 'iozone' if 'iozone' == tool else 'fio'
-    testtool_local = os.path.join(files_dir, path)
-    yaml_file, py_file = prepare_files(testtool_py_args_v,
-                                       dst_testtool_path,
-                                       files_dir)
-    try:
-        config = yaml.load(open(yaml_file).read())
+    src_testtool_path = os.path.join(files_dir, path)
 
-        vm_sec = 'VMTasks.boot_runcommand_delete'
-        concurrency = config[vm_sec][0]['runner']['concurrency']
-        copy_files = {testtool_local: dst_testtool_path}
+    result_queue = multiprocessing.Queue()
 
-        result_queue = multiprocessing.Queue()
-        results_cb = result_queue.put
+    obj = IOPerfTest(script_args,
+                     src_testtool_path,
+                     result_queue.put,
+                     keep_temp_files)
 
-        do_patch1 = patch_VMScenario_run_command_over_ssh
+    test_runner(obj)
 
-        barrier = get_barrier(concurrency)
-        max_release_time = time.time() + max_preparation_time
+    test_result = []
+    while not result_queue.empty():
+        test_result.append(result_queue.get())
 
-        with patch_VMTasks_boot_runcommand_delete():
-            with do_patch1(copy_files, results_cb, barrier, max_release_time):
-                opts = ['task', 'start', yaml_file] + list(rally_extra_opts)
-                log("Start rally with opts '{0}'".format(" ".join(opts)))
-                run_rally(opts)
-
-        rally_result = []
-        while not result_queue.empty():
-            rally_result.append(result_queue.get())
-
-        return rally_result
-
-    finally:
-        if not keet_temp_files:
-            os.unlink(yaml_file)
-            os.unlink(py_file)
+    return test_result
 
 
 def parse_args(argv):
@@ -262,7 +53,7 @@
                         action='store_true', default=False,
                         help="print some extra log info")
     parser.add_argument("-o", "--io-opts", dest='io_opts',
-                        nargs="*", default=[],
+                        required=True,
                         help="cmd line options for io.py")
     parser.add_argument("-t", "--test-directory", help="directory with test",
                         dest="test_directory", required=True)
@@ -270,7 +61,7 @@
                         type=int, dest="max_preparation_time")
     parser.add_argument("-k", "--keep", default=False,
                         help="keep temporary files",
-                        dest="keet_temp_files", action='store_true')
+                        dest="keep_temp_files", action='store_true')
     parser.add_argument("--rally-extra-opts", dest="rally_extra_opts",
                         default="", help="rally extra options")
 
@@ -279,7 +70,6 @@
 
 def main(argv):
     opts = parse_args(argv)
-    dst_testtool_path = '/tmp/io_tool'
 
     if not opts.extra_logs:
         global log
@@ -288,43 +78,23 @@
             pass
 
         log = nolog
-
-    if opts.io_opts == []:
-        testtool_py_args_v = []
-
-        block_sizes = ["4k", "64k"]
-        ops = ['randwrite']
-        iodepths = ['8']
-        syncs = [True]
-
-        for block_size in block_sizes:
-            for op in ops:
-                for iodepth in iodepths:
-                    for sync in syncs:
-                        tt_argv = ['--type', opts.tool_type,
-                                   '-a', op,
-                                   '--iodepth', iodepth,
-                                   '--blocksize', block_size,
-                                   '--iosize', '20M']
-                        if sync:
-                            tt_argv.append('-s')
-            testtool_py_args_v.append(tt_argv)
     else:
-        testtool_py_args_v = []
-        for o in opts.io_opts:
-            ttopts = [opt.strip() for opt in o.split(" ") if opt.strip() != ""]
-            testtool_py_args_v.append(ttopts)
+        rally_runner.log = log
 
-    for io_argv_list in testtool_py_args_v:
-        io_argv_list.extend(['--binary-path', dst_testtool_path])
+    script_args = [opt.strip()
+                   for opt in opts.io_opts.split(" ")
+                   if opt.strip() != ""]
 
-    res = run_test(opts.tool_type,
-                   testtool_py_args_v,
-                   dst_testtool_path,
-                   files_dir=opts.test_directory,
-                   rally_extra_opts=opts.rally_extra_opts.split(" "),
-                   max_preparation_time=opts.max_preparation_time,
-                   keet_temp_files=opts.keet_temp_files)
+    runner = rally_runner.get_rally_runner(
+        files_dir=os.path.dirname(io_scenario.__file__),
+        rally_extra_opts=opts.rally_extra_opts.split(" "),
+        max_preparation_time=opts.max_preparation_time,
+        keep_temp_files=opts.keep_temp_files)
+
+    res = run_io_test(opts.tool_type,
+                      script_args,
+                      runner,
+                      opts.keep_temp_files)
 
     print "=" * 80
     print pprint.pformat(res)
@@ -352,14 +122,16 @@
 
     return 0
 
-# ubuntu cloud image
-# https://cloud-images.ubuntu.com/trusty/current/trusty-server-cloudimg-amd64-disk1.img
 
-# glance image-create --name 'ubuntu' --disk-format qcow2
-# --container-format bare --is-public true --copy-from
-# https://cloud-images.ubuntu.com/trusty/current/trusty-server-cloudimg-amd64-disk1.img
-# nova flavor-create ceph.512 ceph.512 512 50 1
-# nova server-group-create --policy anti-affinity ceph
+ostack_prepare = """
+glance image-create --name 'ubuntu' --disk-format qcow2
+--container-format bare --is-public true --copy-from
+https://cloud-images.ubuntu.com/trusty/current/trusty-server-cloudimg-amd64-disk1.img
+
+nova flavor-create ceph.512 ceph.512 512 50 1
+nova server-group-create --policy anti-affinity ceph
+"""
+
 
 if __name__ == '__main__':
     exit(main(sys.argv[1:]))
diff --git a/scripts/data.py b/scripts/data.py
new file mode 100644
index 0000000..dfe97a2
--- /dev/null
+++ b/scripts/data.py
@@ -0,0 +1,115 @@
+import re
+import sys
+import json
+
+splitter_rr = "(?ms)=====+\n"
+
+
+def get_data_from_output(fname):
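+    # Parse a test-run log: blocks are separated by lines of '=' signs,
+    # each block is a repr() of a list of result dicts. Group bw_mean
+    # values by "action sync blocksize" and reduce each group to
+    # (mean, mean absolute deviation).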
+    results = {}
+    fc = open(fname).read()
+
+    for block in re.split(splitter_rr, fc):
+        block = block.strip()
+        if not block.startswith("[{u'__meta__':"):
+            continue
+        for val in eval(block):
+            meta = val['__meta__']
+            meta['sync'] = 's' if meta['sync'] else 'a'
+            key = "{action} {sync} {blocksize}k".format(**meta)
+            results.setdefault(key, []).append(val['bw_mean'])
+
+    processed_res = {}
+
+    for k, v in results.items():
+        v.sort()
+        med = float(sum(v)) / len(v)
+        ran = sum(abs(x - med) for x in v) / len(v)
+        processed_res[k] = (int(med), int(ran))
+
+    return meta, processed_res
+
+
+def ksort(x):
+    op, sync, sz = x.split(" ")
+    return (op, sync, int(sz[:-1]))
+
+
+def create_json_results(meta, file_data):
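+    # Emit a row in the format scripts/data_extractor.py expects;
+    # the build identification fields are left blank for now.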
+    row = {"build_id": "",
+           "type": "",
+           "iso_md5": ""}
+    row.update(file_data)
+    return json.dumps(row)
+
+
+def show_data(*pathes):
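+    # Print a comparison table: the first file is the baseline, every
+    # additional file contributes BW/DEV/%/IOPS columns plus a DIFF %
+    # column comparing it with the first file.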
+    begin = "|  {:>10}  {:>5}  {:>5}"
+    first_file_templ = "  |  {:>6} ~ {:>5} {:>2}% {:>5}"
+    other_file_templ = "  |  {:>6} ~ {:>5} {:>2}% {:>5} ----  {:>6}%"
+
+    line_templ = begin + first_file_templ + \
+        other_file_templ * (len(pathes) - 1) + "  |"
+
+    header_ln = line_templ.replace("<", "^").replace(">", "^")
+
+    params = ["Oper", "Sync", "BSZ", "BW1", "DEV1", "%", "IOPS1"]
+    for pos in range(1, len(pathes)):
+        params += "BW{0}+DEV{0}+%+IOPS{0}+DIFF %".format(pos).split("+")
+
+    header_ln = header_ln.format(*params)
+
+    sep = '-' * len(header_ln)
+
+    results = [get_data_from_output(path)[1] for path in pathes]
+
+    print sep
+    print header_ln
+    print sep
+
+    prev_tp = None
+
+    common_keys = set(results[0].keys())
+    for result in results[1:]:
+        common_keys &= set(result.keys())
+
+    for k in sorted(common_keys, key=ksort):
+        tp = k.rsplit(" ", 1)[0]
+        op, s, sz = k.split(" ")
+        s = 'sync' if s == 's' else 'async'
+
+        if tp != prev_tp and prev_tp is not None:
+            print sep
+
+        prev_tp = tp
+
+        results0 = results[0]
+        m0, d0 = results0[k]
+        iops0 = m0 / int(sz[:-1])
+        perc0 = int(d0 * 100.0 / m0 + 0.5)
+
+        data = [op, s, sz, m0, d0, perc0, iops0]
+
+        for result in results[1:]:
+            m, d = result[k]
+            iops = m / int(sz[:-1])
+            perc = int(d * 100.0 / m + 0.5)
+            avg_diff = int(((m - m0) * 100.) / m + 0.5)
+            data.extend([m, d, perc, iops, avg_diff])
+
+        print line_templ.format(*data)
+
+    print sep
+
+
+def main(argv):
+    path1 = argv[0]
+    if path1 == '--json':
+        print create_json_results(*get_data_from_output(argv[1]))
+    else:
+        show_data(*argv)
+    return 0
+
+if __name__ == "__main__":
+    exit(main(sys.argv[1:]))
+# print " ", results[k]
diff --git a/scripts/data_extractor.py b/scripts/data_extractor.py
new file mode 100644
index 0000000..d655f17
--- /dev/null
+++ b/scripts/data_extractor.py
@@ -0,0 +1,196 @@
+import sys
+import json
+import sqlite3
+import contextlib
+
+
+def connect(url):
+    return sqlite3.connect(url)
+
+
+create_db_sql_templ = """
+CREATE TABLE build (id integer primary key,
+                    build text,
+                    type text,
+                    md5 text);
+
+CREATE TABLE params_combination (id integer primary key, {params});
+CREATE TABLE param (id integer primary key, name text, type text);
+
+CREATE TABLE result (build_id integer,
+                     params_combination integer,
+                     bandwith float,
+                     deviation float);
+"""
+
+
+PARAM_COUNT = 20
+
+
+def get_all_tables(conn):
+    cursor = conn.cursor()
+    cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
+    return cursor.fetchall()
+
+
+def drop_database(conn):
+    cursor = conn.cursor()
+    cursor.execute("drop table result")
+    cursor.execute("drop table params_combination")
+    cursor.execute("drop table build")
+    cursor.execute("drop table param")
+
+
+def init_database(conn):
+    cursor = conn.cursor()
+
+    params = ["param_{0} text".format(i) for i in range(PARAM_COUNT)]
+    create_db_sql = create_db_sql_templ.format(params=",".join(params))
+
+    for sql in create_db_sql.split(";"):
+        cursor.execute(sql)
+
+
+def insert_io_params(conn):
+    sql = """insert into param (name, type) values ('operation',
+                '{write,randwrite,read,randread}');
+             insert into param (name, type) values ('sync', '{a,s}');
+             insert into param (name, type) values ('block_size', 'size_kmg');
+          """
+
+    for insert in sql.split(";"):
+        conn.execute(insert)
+
+
+def insert_build(cursor, build_id, build_type, iso_md5):
+    cursor.execute("insert into build (build, type, md5) values (?, ?, ?)",
+                   (build_id, build_type, iso_md5))
+    return cursor.lastrowid
+
+
+def insert_params(cursor, *param_vals):
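+    # Pad the combination out to PARAM_COUNT columns; reuse the id of an
+    # identical existing combination, otherwise insert a new row.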
+    param_vals = param_vals + ("",) * (PARAM_COUNT - len(param_vals))
+
+    params = ",".join(['?'] * PARAM_COUNT)
+    select_templ = "select id from params_combination where {params_where}"
+
+    params_where = ["param_{0}=?".format(i) for i in range(PARAM_COUNT)]
+    req = select_templ.format(params_where=" AND ".join(params_where))
+    cursor.execute(req, param_vals)
+    res = cursor.fetchall()
+    if [] != res:
+        return res[0][0]
+
+    params = ",".join(['?'] * PARAM_COUNT)
+    param_insert_templ = "insert into params_combination ({0}) values ({1})"
+    param_names = ",".join("param_{0}".format(i) for i in range(PARAM_COUNT))
+    req = param_insert_templ.format(param_names, params)
+    cursor.execute(req, param_vals)
+    return cursor.lastrowid
+
+
+def insert_results(cursor, build_id, params_id, bw, dev):
+    req = "insert into result values (?, ?, ?, ?)"
+    cursor.execute(req, (build_id, params_id, bw, dev))
+
+
+@contextlib.contextmanager
+def transaction(conn):
+    try:
+        cursor = conn.cursor()
+        yield cursor
+    except:
+        conn.rollback()
+        raise
+    else:
+        conn.commit()
+
+
+def json_to_db(json_data, conn):
+    data = json.loads(json_data)
+    with transaction(conn) as cursor:
+        for build_data in data:
+            build_id = insert_build(cursor,
+                                    build_data.pop("build_id"),
+                                    build_data.pop("type"),
+                                    build_data.pop("iso_md5"))
+
+            for params, (bw, dev) in build_data.items():
+                param_id = insert_params(cursor, *params.split(" "))
+                insert_results(cursor, build_id, param_id, bw, dev)
+
+
+def to_db():
+    conn = sqlite3.connect(sys.argv[1])
+    json_data = open(sys.argv[2]).read()
+
+    if len(get_all_tables(conn)) == 0:
+        init_database(conn)
+
+    json_to_db(json_data, conn)
+
+
+def ssize_to_kb(ssize):
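+    # Convert sizes like '4k', '2M' or '1g' (or a plain byte count that
+    # is a multiple of 1024) to kilobytes.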
+    try:
+        smap = dict(k=1, K=1, M=1024, m=1024, G=1024**2, g=1024**2)
+        for ext, coef in smap.items():
+            if ssize.endswith(ext):
+                return int(ssize[:-1]) * coef
+
+        if int(ssize) % 1024 != 0:
+            raise ValueError()
+
+        return int(ssize) / 1024
+
+    except (ValueError, TypeError, AttributeError):
+        tmpl = "Unknow size format {0!r} (or size not multiples 1024)"
+        raise ValueError(tmpl.format(ssize))
+
+
+def load_slice(cursor, build_id, y_param, **params):
+    params_id = {}
+    for param in list(params) + [y_param]:
+        cursor.execute("select id from param where name=?", (param,))
+        params_id[param] = cursor.fetchone()[0]
+
+    sql = """select params_combination.param_{0}, result.bandwith
+             from params_combination, result
+             where result.build_id=?""".format(params_id[y_param])
+
+    for param, val in params.items():
+        pid = params_id[param]
+        sql += " and params_combination.param_{0}='{1}'".format(pid, val)
+
+    cursor.execute(sql)
+
+
+def from_db():
+    conn = sqlite3.connect(sys.argv[1])
+    # sql = sys.argv[2]
+    cursor = conn.cursor()
+
+    sql = """select params_combination.param_2, result.bandwith
+    from params_combination, result
+    where params_combination.param_0="write"
+          and params_combination.param_1="s"
+          and params_combination.id=result.params_combination
+          and result.build_id=60"""
+
+    cursor.execute(sql)
+    data = []
+
+    for (sz, bw) in cursor.fetchall():
+        data.append((ssize_to_kb(sz), sz, bw))
+
+    data.sort()
+
+    import matplotlib.pyplot as plt
+    xvals = range(len(data))
+    plt.plot(xvals, [dt[2] for dt in data])
+    plt.ylabel('bandwith')
+    plt.xlabel('block size')
+    plt.xticks(xvals, [dt[1] for dt in data])
+    plt.show()
+
+
+from_db()
diff --git a/data_generator.py b/scripts/data_generator.py
similarity index 100%
rename from data_generator.py
rename to scripts/data_generator.py
diff --git a/run.sh b/scripts/run.sh
old mode 100755
new mode 100644
similarity index 100%
rename from run.sh
rename to scripts/run.sh
diff --git a/scripts/run_2.sh b/scripts/run_2.sh
new file mode 100644
index 0000000..7bd92fb
--- /dev/null
+++ b/scripts/run_2.sh
@@ -0,0 +1,8 @@
+#!/bin/bash
+set -x
+set -e
+
+type="iozone"
+
+io_opts="--type $type -a write --iodepth 16 --blocksize 1m --iosize x20"
+python run_rally_test.py -l -o "$io_opts" -t io-scenario $type --rally-extra-opts="--deployment $1"
diff --git a/run_tests.sh b/scripts/run_tests.sh
similarity index 100%
rename from run_tests.sh
rename to scripts/run_tests.sh
diff --git a/scripts/starts_vms.py b/scripts/starts_vms.py
new file mode 100644
index 0000000..b52c2d5
--- /dev/null
+++ b/scripts/starts_vms.py
@@ -0,0 +1,240 @@
+import re
+import os
+import time
+
+import paramiko
+from novaclient.client import Client as n_client
+from cinderclient.v1.client import Client as c_client
+
+
+def ostack_get_creds():
+    env = os.environ.get
+    name = env('OS_USERNAME')
+    passwd = env('OS_PASSWORD')
+    tenant = env('OS_TENANT_NAME')
+    auth_url = env('OS_AUTH_URL')
+    return name, passwd, tenant, auth_url
+
+
+def nova_connect():
+    return n_client('1.1', *ostack_get_creds())
+
+
+def create_keypair(nova, name, key_path):
+    with open(key_path) as key:
+        return nova.keypairs.create(name, key.read())
+
+
+def create_volume(size, name=None, volid=[0]):
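+    # volid is a mutable default argument used as a per-process counter,
+    # giving every volume a unique ceph-test-N name.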
+    cinder = c_client(*ostack_get_creds())
+    if name is None:
+        name = 'ceph-test-{0}'.format(volid[0])
+    volid[0] = volid[0] + 1
+    vol = cinder.volumes.create(size=size, display_name=name)
+    err_count = 0
+    while vol.status != 'available':
+        if vol.status == 'error':
+            if err_count == 3:
+                print "Fail to create volume"
+                raise RuntimeError("Fail to create volume")
+            else:
+                err_count += 1
+                cinder.volumes.delete(vol)
+                time.sleep(1)
+                vol = cinder.volumes.create(size=size, display_name=name)
+                continue
+        time.sleep(1)
+        vol = cinder.volumes.get(vol.id)
+    return vol
+
+
+def wait_for_server_active(nova, server, timeout=240):
+    t = time.time()
+    while True:
+        time.sleep(5)
+        sstate = getattr(server, 'OS-EXT-STS:vm_state').lower()
+
+        if sstate == 'active':
+            return True
+
+        print "Curr state is", sstate, "waiting for active"
+
+        if sstate == 'error':
+            return False
+
+        if time.time() - t > timeout:
+            return False
+
+        server = nova.servers.get(server)
+
+
+def get_or_create_floating_ip(nova, pool, used_ip):
+    ip_list = nova.floating_ips.list()
+
+    if pool is not None:
+        ip_list = [ip for ip in ip_list if ip.pool == pool]
+
+    ip_list = [ip for ip in ip_list if ip.instance_id is None]
+    ip_list = [ip for ip in ip_list if ip.ip not in used_ip]
+
+    if len(ip_list) > 0:
+        return ip_list[0]
+    else:
+        return nova.floating_ips.create(pool)
+
+
+def create_vms(nova, amount, keypair_name, img_name,
+               flavor_name, vol_sz, network_zone_name=None):
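+    # Make up to three attempts to boot `amount` servers, deleting and
+    # replacing any that fail to reach the active state.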
+
+    network = nova.networks.find(label=network_zone_name)
+    nics = [{'net-id': network.id}]
+    fl = nova.flavors.find(name=flavor_name)
+    img = nova.images.find(name=img_name)
+    srvs = []
+    counter = 0
+
+    for i in range(3):
+        amount_left = amount - len(srvs)
+
+        new_srvs = []
+        for i in range(amount_left):
+            print "creating server"
+            srv = nova.servers.create("ceph-test-{0}".format(counter),
+                                      flavor=fl, image=img, nics=nics,
+                                      key_name=keypair_name)
+            counter += 1
+            new_srvs.append(srv)
+            print srv
+
+        deleted_servers = []
+        for srv in new_srvs:
+            if not wait_for_server_active(nova, srv):
+                print "Server", srv.name, "fails to start. Kill it and",
+                print " try again"
+
+                nova.servers.delete(srv)
+                deleted_servers.append(srv)
+            else:
+                srvs.append(srv)
+
+        if len(deleted_servers) != 0:
+            time.sleep(5)
+
+    if len(srvs) != amount:
+        print "ERROR: can't start required amount of servers. Exit"
+        raise RuntimeError("Fail to create {0} servers".format(amount))
+
+    result = {}
+    for srv in srvs:
+        print "wait till server be ready"
+        wait_for_server_active(nova, srv)
+        print "creating volume"
+        vol = create_volume(vol_sz)
+        print "attach volume to server"
+        nova.volumes.create_server_volume(srv.id, vol.id, None)
+        print "create floating ip"
+        flt_ip = get_or_create_floating_ip(nova, 'net04_ext', result.keys())
+        print "attaching ip to server"
+        srv.add_floating_ip(flt_ip)
+        result[flt_ip.ip] = srv
+
+    return result
+
+
+def clear_all(nova):
+    deleted_srvs = set()
+    for srv in nova.servers.list():
+        if re.match(r"ceph-test-\d+", srv.name):
+            print "Deleting server", srv.name
+            nova.servers.delete(srv)
+            deleted_srvs.add(srv.id)
+
+    while deleted_srvs != set():
+        print "Waiting till all servers are actually deleted"
+        all_id = set(srv.id for srv in nova.servers.list())
+        if all_id.intersection(deleted_srvs) == set():
+            print "Done, deleting volumes"
+            break
+        time.sleep(1)
+
+    # wait till vm actually deleted
+
+    cinder = c_client(*ostack_get_creds())
+    for vol in cinder.volumes.list():
+        if isinstance(vol.display_name, basestring):
+            if re.match(r'ceph-test-\d+', vol.display_name):
+                if vol.status in ('available', 'error'):
+                    print "Deleting volume", vol.display_name
+                    cinder.volumes.delete(vol)
+
+    print "Clearing done (yet some volumes may still deleting)"
+
+
+def wait_ssh_ready(host, user, key_file, retry_count=10, timeout=5):
+    ssh = paramiko.SSHClient()
+    ssh.load_host_keys('/dev/null')
+    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+    ssh.known_hosts = None
+
+    for i in range(retry_count):
+        try:
+            ssh.connect(host, username=user, key_filename=key_file,
+                        look_for_keys=False)
+            break
+        except:
+            if i == retry_count - 1:
+                raise
+            time.sleep(timeout)
+
+
+# def prepare_host(key_file, ip, fio_path, dst_fio_path, user='cirros'):
+#     print "Wait till ssh ready...."
+#     wait_ssh_ready(ip, user, key_file)
+
+#     print "Preparing host >"
+#     print "    Coping fio"
+#     copy_fio(key_file, ip, fio_path, user, dst_fio_path)
+
+#     key_opts = '-o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no'
+#     args = (key_file, user, ip, key_opts)
+#     cmd_format = "ssh {3} -i {0} {1}@{2} '{{0}}'".format(*args).format
+
+#     def exec_on_host(cmd):
+#         print "    " + cmd
+#         subprocess.check_call(cmd_format(cmd), shell=True)
+
+#     exec_on_host("sudo /usr/sbin/mkfs.ext4 /dev/vdb")
+#     exec_on_host("sudo /bin/mkdir /media/ceph")
+#     exec_on_host("sudo /bin/mount /dev/vdb /media/ceph")
+#     exec_on_host("sudo /bin/chmod a+rwx /media/ceph")
+
+
+def main():
+    image_name = 'TestVM'
+    flavor_name = 'ceph'
+    vol_sz = 50
+    network_zone_name = 'net04'
+    amount = 10
+    keypair_name = 'ceph-test'
+
+    nova = nova_connect()
+    clear_all(nova)
+
+    try:
+        ips = []
+        params = dict(vol_sz=vol_sz)
+        params['image_name'] = image_name
+        params['flavor_name'] = flavor_name
+        params['network_zone_name'] = network_zone_name
+        params['amount'] = amount
+        params['keypair_name'] = keypair_name
+
+        for ip, host in create_vms(nova, **params).items():
+            ips.append(ip)
+
+        print "All setup done! Ips =", " ".join(ips)
+        print "Starting tests"
+    finally:
+        clear_all(nova)
+
+if __name__ == "__main__":
+    exit(main())
diff --git a/ssh_copy_directory.py b/ssh_copy_directory.py
index dca0843..d074fcf 100644
--- a/ssh_copy_directory.py
+++ b/ssh_copy_directory.py
@@ -66,3 +66,25 @@
             remfile = os.path.join(remroot, f)
             localfile = os.path.join(root, f)
             ssh_copy_file(sftp, localfile, remfile, preserve_perm)
+
+
+def copy_paths(conn, paths):
+    sftp = conn.open_sftp()
+    try:
+        for src, dst in paths.items():
+            try:
+                if os.path.isfile(src):
+                    ssh_copy_file(sftp, src, dst)
+                elif os.path.isdir(src):
+                    put_dir_recursively(sftp, src, dst)
+                else:
+                    templ = "Can't copy {0!r} - " + \
+                            "it neither a file not a directory"
+                    msg = templ.format(src)
+                    raise OSError(msg)
+            except Exception as exc:
+                tmpl = "Scp {0!r} => {1!r} failed - {2!r}"
+                msg = tmpl.format(src, dst, exc)
+                raise OSError(msg)
+    finally:
+        sftp.close()
diff --git a/utils.py b/utils.py
new file mode 100644
index 0000000..8941ca5
--- /dev/null
+++ b/utils.py
@@ -0,0 +1,17 @@
+import multiprocessing
+
+
+def get_barrier(count):
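+    # One-shot countdown barrier on top of a shared counter: every caller
+    # decrements it and blocks until it hits zero or the timeout expires;
+    # returns True only if all `count` participants have arrived.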
+    val = multiprocessing.Value('i', count)
+    cond = multiprocessing.Condition()
+
+    def closure(timeout):
+        with cond:
+            val.value -= 1
+            if val.value == 0:
+                cond.notify_all()
+            else:
+                cond.wait(timeout)
+            return val.value == 0
+
+    return closure