tests finally can works over pure novaclient/ssh
diff --git a/itest.py b/itest.py
index 4f631d7..d4f4f52 100644
--- a/itest.py
+++ b/itest.py
@@ -5,10 +5,14 @@
 
 from io_scenario import io
 from ssh_copy_directory import copy_paths
+from utils import run_over_ssh
 
 
 class IPerfTest(object):
     def __init__(self, on_result_cb):
+        self.set_result_cb(on_result_cb)
+
+    def set_result_cb(self, on_result_cb):
         self.on_result_cb = on_result_cb
 
     def build(self, conn):
@@ -54,9 +58,8 @@
 
     def run(self, conn):
         args = ['env', 'python2', self.io_py_remote] + self.script_opts
-        code, out, err = conn.execute(" ".join(args))
+        code, out, err = run_over_ssh(conn, " ".join(args))
         self.on_result(code, out, err)
-        return code, out, err
 
     def on_result(self, code, out, err):
         if 0 == code:
diff --git a/log.py b/log.py
new file mode 100644
index 0000000..fdef091
--- /dev/null
+++ b/log.py
@@ -0,0 +1,21 @@
+import os
+import sys
+import datetime
+
+
+def _log(x):
+    dt_str = datetime.datetime.now().strftime("%H:%M:%S")
+    pref = dt_str + " " + str(os.getpid()) + " >>>> "
+    sys.stderr.write(pref + x.replace("\n", "\n" + pref) + "\n")
+
+
+logger = _log
+
+
+def log(x):
+    logger(x)
+
+
+def setlogger(nlogger):
+    global logger
+    logger = nlogger
diff --git a/rally_runner.py b/rally_runner.py
index e9c4caa..093ae12 100644
--- a/rally_runner.py
+++ b/rally_runner.py
@@ -5,6 +5,7 @@
 import warnings
 import functools
 import contextlib
+import multiprocessing
 
 from rally import exceptions
 from rally.cmd import cliutils
@@ -13,11 +14,8 @@
 from rally.benchmark.scenarios.vm.vmtasks import VMTasks
 
 import itest
-from utils import get_barrier
-
-
-def log(x):
-    pass
+from log import log
+from utils import get_barrier, wait_on_barrier
 
 
 @contextlib.contextmanager
@@ -75,25 +73,13 @@
             raise exceptions.ScriptError("monkeypatch code fails on "
                                          "ssh._client.open_sftp()")
 
-        test_iter = itest.run_test_iter(test_obj, ssh)
+        test_iter = itest.run_test_iter(test_obj, ssh._get_client())
 
         next(test_iter)
 
         log("Start io test")
 
-        if barrier is not None:
-            if latest_start_time is not None:
-                timeout = latest_start_time - time.time()
-            else:
-                timeout = None
-
-            if timeout is not None and timeout > 0:
-                msg = "Ready and waiting on barrier. " + \
-                      "Will wait at most {0} seconds"
-                log(msg.format(int(timeout)))
-
-                if not barrier(timeout):
-                    log("Barrier timeouted")
+        wait_on_barrier(barrier, latest_start_time)
 
         try:
             code, out, err = next(test_iter)
@@ -120,10 +106,6 @@
         VMScenario.run_action = orig
 
 
-def run_rally(rally_args):
-    return cliutils.run(['rally', "--rally-debug"] + rally_args, categories)
-
-
 def prepare_files(files_dir):
 
     # we do need temporary named files
@@ -164,7 +146,7 @@
             with do_patch1(obj, barrier, max_release_time):
                 opts = ['task', 'start', yaml_file] + list(rally_extra_opts)
                 log("Start rally with opts '{0}'".format(" ".join(opts)))
-                run_rally(opts)
+                cliutils.run(['rally', "--rally-debug"] + opts, categories)
     finally:
         if not keep_temp_files:
             os.unlink(yaml_file)
@@ -176,9 +158,18 @@
                      keep_temp_files):
 
     def closure(obj):
+        result_queue = multiprocessing.Queue()
+        obj.set_result_cb(result_queue.put)
+
         run_tests_using_rally(obj,
                               files_dir,
                               max_preparation_time,
                               rally_extra_opts,
                               keep_temp_files)
+
+        test_result = []
+        while not result_queue.empty():
+            test_result.append(result_queue.get())
+
+        return test_result
     return closure
diff --git a/run_rally_test.py b/run_rally_test.py
deleted file mode 100644
index 5a4df58..0000000
--- a/run_rally_test.py
+++ /dev/null
@@ -1,137 +0,0 @@
-import os
-import sys
-import json
-import pprint
-import os.path
-import argparse
-import datetime
-import multiprocessing
-
-import io_scenario
-import rally_runner
-from itest import IOPerfTest
-
-
-def log(x):
-    dt_str = datetime.datetime.now().strftime("%H:%M:%S")
-    pref = dt_str + " " + str(os.getpid()) + " >>>> "
-    sys.stderr.write(pref + x.replace("\n", "\n" + pref) + "\n")
-
-
-def run_io_test(tool,
-                script_args,
-                test_runner,
-                keep_temp_files=False):
-
-    files_dir = os.path.dirname(io_scenario.__file__)
-
-    path = 'iozone' if 'iozone' == tool else 'fio'
-    src_testtool_path = os.path.join(files_dir, path)
-
-    result_queue = multiprocessing.Queue()
-
-    obj = IOPerfTest(script_args,
-                     src_testtool_path,
-                     result_queue.put,
-                     keep_temp_files)
-
-    test_runner(obj)
-
-    test_result = []
-    while not result_queue.empty():
-        test_result.append(result_queue.get())
-
-    return test_result
-
-
-def parse_args(argv):
-    parser = argparse.ArgumentParser(
-        description="Run rally disk io performance test")
-    parser.add_argument("tool_type", help="test tool type",
-                        choices=['iozone', 'fio'])
-    parser.add_argument("-l", dest='extra_logs',
-                        action='store_true', default=False,
-                        help="print some extra log info")
-    parser.add_argument("-o", "--io-opts", dest='io_opts',
-                        required=True,
-                        help="cmd line options for io.py")
-    parser.add_argument("-t", "--test-directory", help="directory with test",
-                        dest="test_directory", required=True)
-    parser.add_argument("--max-preparation-time", default=300,
-                        type=int, dest="max_preparation_time")
-    parser.add_argument("-k", "--keep", default=False,
-                        help="keep temporary files",
-                        dest="keep_temp_files", action='store_true')
-    parser.add_argument("--rally-extra-opts", dest="rally_extra_opts",
-                        default="", help="rally extra options")
-
-    return parser.parse_args(argv)
-
-
-def main(argv):
-    opts = parse_args(argv)
-
-    if not opts.extra_logs:
-        global log
-
-        def nolog(x):
-            pass
-
-        log = nolog
-    else:
-        rally_runner.log = log
-
-    script_args = [opt.strip()
-                   for opt in opts.io_opts.split(" ")
-                   if opt.strip() != ""]
-
-    runner = rally_runner.get_rally_runner(
-        files_dir=os.path.dirname(io_scenario.__file__),
-        rally_extra_opts=opts.rally_extra_opts.split(" "),
-        max_preparation_time=opts.max_preparation_time,
-        keep_temp_files=opts.keep_temp_files)
-
-    res = run_io_test(opts.tool_type,
-                      script_args,
-                      runner,
-                      opts.keep_temp_files)
-
-    print "=" * 80
-    print pprint.pformat(res)
-    print "=" * 80
-
-    if len(res) != 0:
-        bw_mean = 0.0
-        for measurement in res:
-            bw_mean += measurement["bw_mean"]
-
-        bw_mean /= len(res)
-
-        it = ((bw_mean - measurement["bw_mean"]) ** 2 for measurement in res)
-        bw_dev = sum(it) ** 0.5
-
-        meta = res[0]['__meta__']
-        key = "{0} {1} {2}k".format(meta['action'],
-                                    's' if meta['sync'] else 'a',
-                                    meta['blocksize'])
-
-        print
-        print "====> " + json.dumps({key: (int(bw_mean), int(bw_dev))})
-        print
-        print "=" * 80
-
-    return 0
-
-
-ostack_prepare = """
-glance image-create --name 'ubuntu' --disk-format qcow2
---container-format bare --is-public true --copy-from
-https://cloud-images.ubuntu.com/trusty/current/trusty-server-cloudimg-amd64-disk1.img
-
-nova flavor-create ceph.512 ceph.512 512 50 1
-nova server-group-create --policy anti-affinity ceph
-"""
-
-
-if __name__ == '__main__':
-    exit(main(sys.argv[1:]))
diff --git a/run_test.py b/run_test.py
new file mode 100644
index 0000000..04176b6
--- /dev/null
+++ b/run_test.py
@@ -0,0 +1,165 @@
+import os
+import sys
+import json
+import time
+import pprint
+import os.path
+import argparse
+
+import io_scenario
+from itest import IOPerfTest
+from log import setlogger
+
+import ssh_runner
+import rally_runner
+
+from starts_vms import nova_connect, create_vms_mt, clear_all
+
+
+def run_io_test(tool,
+                script_args,
+                test_runner,
+                keep_temp_files=False):
+
+    files_dir = os.path.dirname(io_scenario.__file__)
+
+    path = 'iozone' if 'iozone' == tool else 'fio'
+    src_testtool_path = os.path.join(files_dir, path)
+
+    obj = IOPerfTest(script_args,
+                     src_testtool_path,
+                     None,
+                     keep_temp_files)
+
+    return test_runner(obj)
+
+
+def parse_args(argv):
+    parser = argparse.ArgumentParser(
+        description="Run disk io performance test")
+
+    parser.add_argument("tool_type", help="test tool type",
+                        choices=['iozone', 'fio'])
+
+    parser.add_argument("-l", dest='extra_logs',
+                        action='store_true', default=False,
+                        help="print some extra log info")
+
+    parser.add_argument("-o", "--io-opts", dest='io_opts',
+                        required=True,
+                        help="cmd line options for io.py")
+
+    parser.add_argument("-t", "--test-directory", help="directory with test",
+                        dest="test_directory", required=True)
+
+    parser.add_argument("--max-preparation-time", default=300,
+                        type=int, dest="max_preparation_time")
+
+    parser.add_argument("-k", "--keep", default=False,
+                        help="keep temporary files",
+                        dest="keep_temp_files", action='store_true')
+
+    parser.add_argument("--runner", required=True,
+                        choices=["ssh", "rally"],
+                        help="runner type")
+
+    parser.add_argument("--runner-extra-opts", default="",
+                        dest="runner_opts", help="runner extra options")
+
+    return parser.parse_args(argv)
+
+
+def main(argv):
+    opts = parse_args(argv)
+
+    if not opts.extra_logs:
+        def nolog(x):
+            pass
+
+        setlogger(nolog)
+
+    script_args = [opt.strip()
+                   for opt in opts.io_opts.split(" ")
+                   if opt.strip() != ""]
+
+    if opts.runner == "rally":
+        runner = rally_runner.get_rally_runner(
+            files_dir=os.path.dirname(io_scenario.__file__),
+            rally_extra_opts=opts.runner_opts.split(" "),
+            max_preparation_time=opts.max_preparation_time,
+            keep_temp_files=opts.keep_temp_files)
+        res = run_io_test(opts.tool_type,
+                          script_args,
+                          runner,
+                          opts.keep_temp_files)
+    elif opts.runner == "ssh":
+        user, key_file = opts.runner_opts.split(" ", 1)
+
+        latest_start_time = opts.max_preparation_time + time.time()
+
+        nova = nova_connect()
+
+        # nova, amount, keypair_name, img_name,
+        # flavor_name, vol_sz=None, network_zone_name=None,
+        # flt_ip_pool=None, name_templ='ceph-test-{}',
+        # scheduler_hints=None
+
+        try:
+            ips = [i[0] for i in create_vms_mt(nova, 3,
+                                               keypair_name='ceph',
+                                               img_name='ubuntu',
+                                               flavor_name='ceph.512',
+                                               network_zone_name='net04',
+                                               flt_ip_pool='net04_ext')]
+
+            uris = ["{0}@{1}::{2}".format(user, ip, key_file) for ip in ips]
+
+            runner = ssh_runner.get_ssh_runner(uris,
+                                               latest_start_time,
+                                               opts.keep_temp_files)
+            res = run_io_test(opts.tool_type,
+                              script_args,
+                              runner,
+                              opts.keep_temp_files)
+        finally:
+            clear_all(nova)
+
+    print "=" * 80
+    print pprint.pformat(res)
+    print "=" * 80
+
+    if len(res) != 0:
+        bw_mean = 0.0
+        for measurement in res:
+            bw_mean += measurement["bw_mean"]
+
+        bw_mean /= len(res)
+
+        it = ((bw_mean - measurement["bw_mean"]) ** 2 for measurement in res)
+        bw_dev = sum(it) ** 0.5
+
+        meta = res[0]['__meta__']
+        key = "{0} {1} {2}k".format(meta['action'],
+                                    's' if meta['sync'] else 'a',
+                                    meta['blocksize'])
+
+        print
+        print "====> " + json.dumps({key: (int(bw_mean), int(bw_dev))})
+        print
+        print "=" * 80
+
+    return 0
+
+
+ostack_prepare = """
+glance image-create --name 'ubuntu' --disk-format qcow2
+--container-format bare --is-public true --copy-from
+https://cloud-images.ubuntu.com/trusty/current/trusty-server-cloudimg-amd64-disk1.img
+
+nova flavor-create ceph.512 ceph.512 512 50 1
+nova server-group-create --policy anti-affinity ceph
+"""
+
+
+if __name__ == '__main__':
+    exit(main(sys.argv[1:]))
diff --git a/scripts/run.sh b/scripts/run.sh
index 342356e..b5bc41c 100644
--- a/scripts/run.sh
+++ b/scripts/run.sh
@@ -7,12 +7,12 @@
 bsizes="1k 4k 64k 256k 1m"
 ops="write randwrite"
 osync="s a"
-three_times="1 2 3"
+num_times=3
 
 for bsize in $bsizes ; do
 	for op in $ops ; do 
 		for sync in $osync ; do 
-			for xxx in $three_times ; do
+			for counter in $(seq 1 $num_times) ; do
 				if [[ "$ops" == "write" && "$osync" == "s" ]] ; then
 					continue
 				fi
@@ -30,7 +30,7 @@
 				fi
 
 				io_opts="--type $type -a $op --iodepth 16 --blocksize $bsize --iosize $factor $ssync"
-				python run_rally_test.py -l -o "$io_opts" -t io-scenario $type --rally-extra-opts="--deployment $1"
+				python run_test.py --runner rally -l -o "$io_opts" -t io-scenario $type --runner-extra-opts="--deployment $1"
 			done
 		done
 	done
diff --git a/scripts/run_2.sh b/scripts/run_2.sh
index 7bd92fb..2265176 100644
--- a/scripts/run_2.sh
+++ b/scripts/run_2.sh
@@ -1,8 +1,33 @@
 #!/bin/bash
 set -x
-set -e
 
 type="iozone"
 
+# nova image-list | grep ' ubuntu ' >/dev/null
+# if [ $? -ne 0 ] ; then
+# 	url="https://cloud-images.ubuntu.com/trusty/current/trusty-server-cloudimg-amd64-disk1.img"
+# 	glance image-create --name 'ubuntu' --disk-format qcow2 --container-format bare --is-public true --copy-from $url
+# fi
+
+# nova flavor-list | grep ' ceph.512 ' >/dev/null
+# if [ $? -ne 0 ] ; then
+# 	nova flavor-create ceph.512 ceph.512 512 50 1
+# fi
+
+# nova server-group-list | grep ' ceph ' >/dev/null
+# if [ $? -ne 0 ] ; then
+# 	nova server-group-create --policy anti-affinity ceph
+# fi
+
+# nova keypair-list | grep ' ceph ' >/dev/null
+# if [ $? -ne 0 ] ; then
+# 	nova keypair-add ceph > ceph.pem
+# fi
+
+set -e
+
 io_opts="--type $type -a write --iodepth 16 --blocksize 1m --iosize x20"
-python run_rally_test.py -l -o "$io_opts" -t io-scenario $type --rally-extra-opts="--deployment $1"
+python run_test.py --runner ssh -l -o "$io_opts" -t io-scenario $type --runner-extra-opts="ubuntu ceph.pem"
+
+# io_opts="--type $type -a write --iodepth 16 --blocksize 1m --iosize x20"
+# python run_test.py --runner rally -l -o "$io_opts" -t io-scenario $type --runner-extra-opts="--deployment perf-1"
diff --git a/ssh_copy_directory.py b/ssh_copy_directory.py
index 5843c18..d074fcf 100644
--- a/ssh_copy_directory.py
+++ b/ssh_copy_directory.py
@@ -69,7 +69,7 @@
 
 
 def copy_paths(conn, paths):
-    sftp = conn._client.open_sftp()
+    sftp = conn.open_sftp()
     try:
         for src, dst in paths.items():
             try:
diff --git a/ssh_runner.py b/ssh_runner.py
new file mode 100644
index 0000000..76fc6b5
--- /dev/null
+++ b/ssh_runner.py
@@ -0,0 +1,116 @@
+import re
+import Queue
+import traceback
+import threading
+
+from utils import ssh_connect
+
+import itest
+from utils import get_barrier, log_error
+
+conn_uri_attrs = ("user", "passwd", "host", "port", "path")
+
+
+class ConnCreds(object):
+    def __init__(self):
+        for name in conn_uri_attrs:
+            setattr(self, name, None)
+
+
+uri_reg_exprs = []
+
+
+class URIsNamespace(object):
+    class ReParts(object):
+        user_rr = "[^:]*?"
+        host_rr = "[^:]*?"
+        port_rr = "\\d+"
+        key_file_rr = ".*"
+
+    re_dct = ReParts.__dict__
+
+    for attr_name, val in re_dct.items():
+        if attr_name.endswith('_rr'):
+            new_rr = "(?P<{0}>{1})".format(attr_name[:-3], val)
+            setattr(ReParts, attr_name, new_rr)
+
+    re_dct = ReParts.__dict__
+
+    templs = [
+        "^{user_rr}@{host_rr}::{key_file_rr}$"
+    ]
+
+    for templ in templs:
+        uri_reg_exprs.append(templ.format(**re_dct))
+
+
+def parse_ssh_uri(uri):
+    # user:passwd@ip_host:port
+    # user:passwd@ip_host
+    # user@ip_host:port
+    # user@ip_host
+    # ip_host:port
+    # ip_host
+    # user@ip_host:port:path_to_key_file
+    # user@ip_host::path_to_key_file
+    # ip_host:port:path_to_key_file
+    # ip_host::path_to_key_file
+
+    res = ConnCreds()
+
+    for rr in uri_reg_exprs:
+        rrm = re.match(rr, uri)
+        if rrm is not None:
+            res.__dict__.update(rrm.groupdict())
+            return res
+    raise ValueError("Can't parse {0!r} as ssh uri value".format(uri))
+
+
+def connect(uri):
+    creds = parse_ssh_uri(uri)
+    return ssh_connect(creds.host, creds.user, creds.key_file)
+
+
+def conn_func(obj, barrier, latest_start_time, conn):
+    try:
+        test_iter = itest.run_test_iter(obj, conn)
+        next(test_iter)
+
+        with log_error("!Run test"):
+            return next(test_iter)
+    except:
+        print traceback.format_exc()
+        raise
+
+
+def get_ssh_runner(uris,
+                   latest_start_time=None,
+                   keep_temp_files=False):
+
+    connections = [connect(uri) for uri in uris]
+    result_queue = Queue.Queue()
+    barrier = get_barrier(len(uris), threaded=True)
+
+    def closure(obj):
+        ths = []
+        obj.set_result_cb(result_queue.put)
+
+        params = (obj, barrier, latest_start_time)
+
+        for conn in connections:
+            th = threading.Thread(None, conn_func, None,
+                                  params + (conn,))
+            th.daemon = True
+            th.start()
+            ths.append(th)
+
+        for th in ths:
+            th.join()
+
+        test_result = []
+        while not result_queue.empty():
+            test_result.append(result_queue.get())
+
+        return test_result
+
+    return closure
diff --git a/scripts/starts_vms.py b/starts_vms.py
similarity index 62%
rename from scripts/starts_vms.py
rename to starts_vms.py
index b52c2d5..9a00b34 100644
--- a/scripts/starts_vms.py
+++ b/starts_vms.py
@@ -2,7 +2,8 @@
 import os
 import time
 
-import paramiko
+from concurrent.futures import ThreadPoolExecutor
+
 from novaclient.client import Client as n_client
 from cinderclient.v1.client import Client as c_client
 
@@ -25,10 +26,8 @@
         return nova.keypairs.create(name, key.read())
 
 
-def create_volume(size, name=None, volid=[0]):
+def create_volume(size, name):
     cinder = c_client(*ostack_get_creds())
-    name = 'ceph-test-{0}'.format(volid[0])
-    volid[0] = volid[0] + 1
     vol = cinder.volumes.create(size=size, display_name=name)
     err_count = 0
     while vol.status != 'available':
@@ -67,83 +66,99 @@
         server = nova.servers.get(server)
 
 
-def get_or_create_floating_ip(nova, pool, used_ip):
+class Allocate(object):
+    pass
+
+
+def get_floating_ips(nova, pool, amount):
     ip_list = nova.floating_ips.list()
 
     if pool is not None:
         ip_list = [ip for ip in ip_list if ip.pool == pool]
 
-    ip_list = [ip for ip in ip_list if ip.instance_id is None]
-    ip_list = [ip for ip in ip_list if ip.ip not in used_ip]
+    return [ip for ip in ip_list if ip.instance_id is None][:amount]
 
-    if len(ip_list) > 0:
-        return ip_list[0]
+
+def create_vms_mt(nova, amount, keypair_name, img_name,
+                  flavor_name, vol_sz=None, network_zone_name=None,
+                  flt_ip_pool=None, name_templ='ceph-test-{}',
+                  scheduler_hints=None):
+
+    if network_zone_name is not None:
+        network = nova.networks.find(label=network_zone_name)
+        nics = [{'net-id': network.id}]
     else:
-        return nova.floating_ips.create(pool)
+        nics = None
 
-
-def create_vms(nova, amount, keypair_name, img_name,
-               flavor_name, vol_sz, network_zone_name=None):
-
-    network = nova.networks.find(label=network_zone_name)
-    nics = [{'net-id': network.id}]
     fl = nova.flavors.find(name=flavor_name)
     img = nova.images.find(name=img_name)
-    srvs = []
-    counter = 0
 
+    if flt_ip_pool is not None:
+        ips = get_floating_ips(nova, flt_ip_pool, amount)
+        ips += [Allocate] * (amount - len(ips))
+    else:
+        ips = [None] * amount
+
+    print "Try to start {0} servers".format(amount)
+    names = map(name_templ.format, range(amount))
+
+    with ThreadPoolExecutor(max_workers=16) as executor:
+        futures = []
+        for name, flt_ip in zip(names, ips):
+            params = (nova, name, keypair_name, img, fl,
+                      nics, vol_sz, flt_ip, scheduler_hints,
+                      flt_ip_pool)
+
+            futures.append(executor.submit(create_vm, *params))
+        return [future.result() for future in futures]
+
+
+def create_vm(nova, name, keypair_name, img,
+              fl, nics, vol_sz=None,
+              flt_ip=False,
+              scheduler_hints=None,
+              pool=None):
     for i in range(3):
-        amount_left = amount - len(srvs)
+        srv = nova.servers.create(name,
+                                  flavor=fl, image=img, nics=nics,
+                                  key_name=keypair_name,
+                                  scheduler_hints=scheduler_hints)
 
-        new_srvs = []
-        for i in range(amount_left):
-            print "creating server"
-            srv = nova.servers.create("ceph-test-{0}".format(counter),
-                                      flavor=fl, image=img, nics=nics,
-                                      key_name=keypair_name)
-            counter += 1
-            new_srvs.append(srv)
-            print srv
+        if not wait_for_server_active(nova, srv):
+            msg = "Server {0} fails to start. Kill it and try again"
+            print msg.format(srv.name)
+            nova.servers.delete(srv)
 
-        deleted_servers = []
-        for srv in new_srvs:
-            if not wait_for_server_active(nova, srv):
-                print "Server", srv.name, "fails to start. Kill it and",
-                print " try again"
+            while True:
+                print "wait till server deleted"
+                all_id = set(alive_srv.id for alive_srv in nova.servers.list())
+                if srv.id not in all_id:
+                    break
+                time.sleep(1)
+        else:
+            break
 
-                nova.servers.delete(srv)
-                deleted_servers.append(srv)
-            else:
-                srvs.append(srv)
-
-        if len(deleted_servers) != 0:
-            time.sleep(5)
-
-    if len(srvs) != amount:
-        print "ERROR: can't start required amount of servers. Exit"
-        raise RuntimeError("Fail to create {0} servers".format(amount))
-
-    result = {}
-    for srv in srvs:
-        print "wait till server be ready"
-        wait_for_server_active(nova, srv)
+    if vol_sz is not None:
         print "creating volume"
-        vol = create_volume(vol_sz)
+        vol = create_volume(vol_sz, name)
         print "attach volume to server"
         nova.volumes.create_server_volume(srv.id, vol.id, None)
-        print "create floating ip"
-        flt_ip = get_or_create_floating_ip(nova, 'net04_ext', result.keys())
+
+    if flt_ip is Allocate:
+        flt_ip = nova.floating_ips.create(pool)
+
+    if flt_ip is not None:
         print "attaching ip to server"
         srv.add_floating_ip(flt_ip)
-        result[flt_ip.ip] = srv
-
-    return result
+        return (flt_ip.ip, srv)
+    else:
+        return (None, srv)
 
 
-def clear_all(nova):
+def clear_all(nova, name_templ="ceph-test-{}"):
     deleted_srvs = set()
     for srv in nova.servers.list():
-        if re.match(r"ceph-test-\d+", srv.name):
+        if re.match(name_templ.format("\\d+"), srv.name):
             print "Deleting server", srv.name
             nova.servers.delete(srv)
             deleted_srvs.add(srv.id)
@@ -161,7 +176,7 @@
     cinder = c_client(*ostack_get_creds())
     for vol in cinder.volumes.list():
         if isinstance(vol.display_name, basestring):
-            if re.match(r'ceph-test-\d+', vol.display_name):
+            if re.match(name_templ.format("\\d+"), vol.display_name):
                 if vol.status in ('available', 'error'):
                     print "Deleting volume", vol.display_name
                     cinder.volumes.delete(vol)
@@ -169,23 +184,6 @@
     print "Clearing done (yet some volumes may still deleting)"
 
 
-def wait_ssh_ready(host, user, key_file, retry_count=10, timeout=5):
-    ssh = paramiko.SSHClient()
-    ssh.load_host_keys('/dev/null')
-    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
-    ssh.known_hosts = None
-
-    for i in range(retry_count):
-        try:
-            ssh.connect(host, username=user, key_filename=key_file,
-                        look_for_keys=False)
-            break
-        except:
-            if i == retry_count - 1:
-                raise
-            time.sleep(timeout)
-
-
 # def prepare_host(key_file, ip, fio_path, dst_fio_path, user='cirros'):
 #     print "Wait till ssh ready...."
 #     wait_ssh_ready(ip, user, key_file)
@@ -228,7 +226,7 @@
         params['amount'] = amount
         params['keypair_name'] = keypair_name
 
-        for ip, host in create_vms(nova, **params).items():
+        for ip, host in create_vms(nova, **params):
             ips.append(ip)
 
         print "All setup done! Ips =", " ".join(ips)
diff --git a/storage_api.py b/storage_api.py
index 5578984..6f1b714 100644
--- a/storage_api.py
+++ b/storage_api.py
@@ -15,7 +15,7 @@
 
     def __str__(self):
         return self.build + " " + self.build_type + " " + \
-        self.md5 + " " + str(self.results)
+            self.md5 + " " + str(self.results)
 
 
 def create_storage(url, email=None, password=None):
diff --git a/utils.py b/utils.py
index 8941ca5..0a38ad5 100644
--- a/utils.py
+++ b/utils.py
@@ -1,9 +1,21 @@
+import time
+import threading
+import contextlib
 import multiprocessing
 
+import paramiko
 
-def get_barrier(count):
-    val = multiprocessing.Value('i', count)
-    cond = multiprocessing.Condition()
+from log import log
+
+
+def get_barrier(count, threaded=False):
+    if threaded:
+        class val(object):
+            value = count
+        cond = threading.Condition()
+    else:
+        val = multiprocessing.Value('i', count)
+        cond = multiprocessing.Condition()
 
     def closure(timeout):
         with cond:
@@ -15,3 +27,62 @@
             return val.value == 0
 
     return closure
+
+
+def ssh_connect(host, user, key_file, retry_count=10, timeout=5):
+    ssh = paramiko.SSHClient()
+    ssh.load_host_keys('/dev/null')
+    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+    ssh.known_hosts = None
+
+    for i in range(retry_count):
+        try:
+            ssh.connect(host, username=user, key_filename=key_file,
+                        look_for_keys=False)
+            return ssh
+        except:
+            if i == retry_count - 1:
+                raise
+            time.sleep(timeout)
+
+
+def wait_on_barrier(barrier, latest_start_time):
+    if barrier is not None:
+        if latest_start_time is not None:
+            timeout = latest_start_time - time.time()
+        else:
+            timeout = None
+
+        if timeout is not None and timeout > 0:
+            msg = "Ready and waiting on barrier. " + \
+                  "Will wait at most {0} seconds"
+            log(msg.format(int(timeout)))
+
+            if not barrier(timeout):
+                log("Barrier timeouted")
+
+
+@contextlib.contextmanager
+def log_error(action, types=(Exception,)):
+    if not action.startswith("!"):
+        log("Starts : " + action)
+    else:
+        action = action[1:]
+
+    try:
+        yield
+    except Exception as exc:
+        if isinstance(exc, types) and not isinstance(exc, StopIteration):
+            templ = "Error during {0} stage: {1}"
+            log(templ.format(action, exc.message))
+        raise
+
+
+def run_over_ssh(conn, cmd):
+    "should be replaces by normal implementation, with select"
+
+    stdin, stdout, stderr = conn.exec_command(cmd)
+    out = stdout.read()
+    err = stderr.read()
+    code = stdout.channel.recv_exit_status()
+    return code, out, err