add local runner, fix some bugs
diff --git a/io_scenario/io.py b/io_scenario/io.py
index 7d5516d..7256f02 100644
--- a/io_scenario/io.py
+++ b/io_scenario/io.py
@@ -297,9 +297,9 @@
     if binary_path is None:
         sys.stderr.write("Can't found neither iozone not iozone3 binary"
                          "Provide --bonary-path or --binary-url option")
-        return False, None
+        return None
 
-    return False, binary_path
+    return binary_path
 
 # ------------------------------ FIO SUPPORT ---------------------------------
 
@@ -354,26 +354,14 @@
 
 
 def locate_fio():
-    return False, which('fio')
+    return which('fio')
 
 
 # ----------------------------------------------------------------------------
 
 
-def locate_binary(binary_tp, binary_url, binary_path):
-    remove_binary = False
-
-    if binary_url is not None:
-        if binary_path is not None:
-            sys.stderr.write("At most one option from --binary-path and "
-                             "--binary-url should be provided")
-            return False, None
-
-        binary_path = os.tmpnam()
-        install_iozone_static(binary_url, binary_path)
-        remove_binary = True
-
-    elif binary_path is not None:
+def locate_binary(binary_tp, binary_path):
+    if binary_path is not None:
         if os.path.isfile(binary_path):
             if not os.access(binary_path, os.X_OK):
                 st = os.stat(binary_path)
@@ -382,7 +370,7 @@
             binary_path = None
 
     if binary_path is not None:
-        return remove_binary, binary_path
+        return binary_path
 
     if 'iozone' == binary_tp:
         return locate_iozone()
@@ -459,7 +447,7 @@
         choices=['iozone', 'fio'], required=True)
     parser.add_argument(
         "--iodepth", metavar="IODEPTH", type=int,
-        help="I/O depths to test in kb", required=True)
+        help="I/O requests queue depths", required=True)
     parser.add_argument(
         '-a', "--action", metavar="ACTION", type=str,
         help="actions to run", required=True,
@@ -480,16 +468,13 @@
         "-d", "--direct-io", default=False, action="store_true",
         help="use O_DIRECT", dest='directio')
     parser.add_argument(
-        "-t", "--sync-time", default=None, type=int,
+        "-t", "--sync-time", default=None, type=int, metavar="UTC_TIME",
         help="sleep till sime utc time", dest='sync_time')
     parser.add_argument(
-        "--binary-url", help="static binary url",
-        dest="binary_url", default=None)
-    parser.add_argument(
         "--test-file", help="file path to run test on",
         default=None, dest='test_file')
     parser.add_argument(
-        "--binary-path", help="binary path",
+        "--binary-path", help="path to binary, used for testing",
         default=None, dest='binary_path')
     parser.add_argument(
         "--prepare-only", default=False, dest='prepare_only',
@@ -545,9 +530,8 @@
             warnings.simplefilter("ignore")
             test_file_name = os.tmpnam()
 
-    remove_binary, binary_path = locate_binary(argv_obj.type,
-                                               argv_obj.binary_url,
-                                               argv_obj.binary_path)
+    binary_path = locate_binary(argv_obj.type,
+                                argv_obj.binary_path)
 
     if binary_path is None:
         sys.stderr.write("Can't locate binary {0}\n".format(argv_obj.type))
@@ -592,9 +576,6 @@
             sys.stdout.write("\n")
 
     finally:
-        if remove_binary:
-            os.unlink(binary_path)
-
         if os.path.isfile(test_file_name) and not argv_obj.prepare_only:
             os.unlink(test_file_name)
 
diff --git a/run_test.py b/run_test.py
index 2f09ef8..b1d5a59 100644
--- a/run_test.py
+++ b/run_test.py
@@ -2,30 +2,34 @@
 import sys
 import json
 import time
+import shutil
 import pprint
+import weakref
 import logging
 import os.path
 import argparse
 import traceback
+import subprocess
+import contextlib
 
-import io_scenario
-from itest import IOPerfTest
-from rest_api import add_test
 
 import ssh_runner
+import io_scenario
+from utils import log_error
+from rest_api import add_test
+from itest import IOPerfTest, run_test_iter
+from starts_vms import nova_connect, create_vms_mt, clear_all
 
 try:
     import rally_runner
 except ImportError:
     rally_runner = None
 
-from starts_vms import nova_connect, create_vms_mt, clear_all
-
 
 logger = logging.getLogger("io-perf-tool")
-logger.setLevel(logging.DEBUG)
+logger.setLevel(logging.INFO)
 ch = logging.StreamHandler()
-ch.setLevel(logging.DEBUG)
+ch.setLevel(logging.INFO)
 logger.addHandler(ch)
 
 log_format = '%(asctime)s - %(levelname)s - %(name)s - %(message)s'
@@ -52,6 +56,74 @@
     return test_runner(obj)
 
 
+class FileWrapper(object):
+    def __init__(self, fd, conn):
+        self.fd = fd
+        self.channel_wr = weakref.ref(conn)
+
+    def read(self):
+        return self.fd.read()
+
+    @property
+    def channel(self):
+        return self.channel_wr()
+
+
+class LocalConnection(object):
+    def __init__(self):
+        self.proc = None
+
+    def exec_command(self, cmd):
+        PIPE = subprocess.PIPE
+        self.proc = subprocess.Popen(cmd,
+                                     shell=True,
+                                     stdout=PIPE,
+                                     stderr=PIPE,
+                                     stdin=PIPE)
+        res = (self.proc.stdin,
+               FileWrapper(self.proc.stdout, self),
+               self.proc.stderr)
+
+        return res
+
+    def recv_exit_status(self):
+        return self.proc.wait()
+
+    def open_sftp(self):
+        return self
+
+    def close(self):
+        pass
+
+    def put(self, localfile, remfile):
+        return shutil.copy(localfile, remfile)
+
+    def mkdir(self, remotepath, mode):
+        os.mkdir(remotepath)
+        os.chmod(remotepath, mode)
+
+    def chmod(self, remotepath, mode):
+        os.chmod(remotepath, mode)
+
+    def copytree(self, src, dst):
+        shutil.copytree(src, dst)
+
+
+def get_local_runner(clear_tmp_files=True):
+    def closure(obj):
+        res = []
+        obj.set_result_cb(res.append)
+        test_iter = run_test_iter(obj,
+                                  LocalConnection())
+        next(test_iter)
+
+        with log_error("!Run test"):
+            next(test_iter)
+        return res
+
+    return closure
+
+
 def parse_args(argv):
     parser = argparse.ArgumentParser(
         description="Run disk io performance test")
@@ -85,18 +157,23 @@
     parser.add_argument("-n", "--lab-name", default=None,
                         dest="lab_name")
 
+    parser.add_argument("--create-vms-opts", default=None,
+                        help="Creating vm's before run ssh runner",
+                        dest="create_vms_opts")
+
     parser.add_argument("-k", "--keep", default=False,
                         help="keep temporary files",
                         dest="keep_temp_files", action='store_true')
 
-    choices = ["ssh"]
+    choices = ["local", "ssh"]
+
     if rally_runner is not None:
         choices.append("rally")
 
     parser.add_argument("--runner", required=True,
                         choices=choices, help="runner type")
 
-    parser.add_argument("--runner-extra-opts", default="",
+    parser.add_argument("--runner-extra-opts", default=None,
                         dest="runner_opts", help="runner extra options")
 
     return parser.parse_args(argv)
@@ -179,17 +256,58 @@
     return templ.format(data, format_measurements_stat(res), "=" * 80)
 
 
+@contextlib.contextmanager
+def start_test_vms(opts):
+    create_vms_opts = {}
+    for opt in opts.split(","):
+        name, val = opt.split("=", 1)
+        create_vms_opts[name] = val
+
+    user = create_vms_opts.pop("user")
+    key_file = create_vms_opts.pop("key_file")
+    aff_group = create_vms_opts.pop("aff_group", None)
+    raw_count = create_vms_opts.pop("count", "x1")
+
+    logger.debug("Connection to nova")
+    nova = nova_connect()
+
+    if raw_count.startswith("x"):
+        logger.debug("Getting amount of compute services")
+        count = len(nova.services.list(binary="nova-compute"))
+        count *= int(raw_count[1:])
+    else:
+        count = int(raw_count)
+
+    if aff_group is not None:
+        scheduler_hints = {'group': aff_group}
+    else:
+        scheduler_hints = None
+
+    create_vms_opts['scheduler_hints'] = scheduler_hints
+
+    logger.debug("Will start {0} vms".format(count))
+
+    try:
+        ips = [i[0] for i in create_vms_mt(nova, count, **create_vms_opts)]
+
+        uris = ["{0}@{1}::{2}".format(user, ip, key_file) for ip in ips]
+
+        yield uris
+    except:
+        traceback.print_exc()
+    finally:
+        logger.debug("Clearing")
+        clear_all(nova)
+
+
 def main(argv):
     opts = parse_args(argv)
 
-    if not opts.extra_logs:
-        def nolog(x):
-            pass
+    if opts.extra_logs:
+        logger.setLevel(logging.DEBUG)
+        ch.setLevel(logging.DEBUG)
 
     io_opts = get_io_opts(opts.io_opts_file, opts.io_opts)
-    data_server_url = opts.data_server_url
-    # lab_name = opts.lab_name
-    build_name = opts.build_name
 
     if opts.runner == "rally":
         logger.debug("Use rally runner")
@@ -210,47 +328,42 @@
                               opts.keep_temp_files)
             logger.debug(format_result(res))
 
+    elif opts.runner == "local":
+        logger.debug("Run on local computer")
+        try:
+            for script_args in io_opts:
+                cmd_line = " ".join(script_args)
+                logger.debug("Run test with {0!r} params".format(cmd_line))
+                runner = get_local_runner(opts.keep_temp_files)
+                res = run_io_test(opts.tool_type,
+                                  script_args,
+                                  runner,
+                                  opts.keep_temp_files)
+                logger.debug(format_result(res))
+        except:
+            traceback.print_exc()
+            return 1
+
     elif opts.runner == "ssh":
         logger.debug("Use ssh runner")
-        create_vms_opts = {}
-        for opt in opts.runner_opts.split(","):
-            name, val = opt.split("=", 1)
-            create_vms_opts[name] = val
 
-        user = create_vms_opts.pop("user")
-        key_file = create_vms_opts.pop("key_file")
-        aff_group = create_vms_opts.pop("aff_group", None)
-        raw_count = create_vms_opts.pop("count", "x1")
+        uris = []
 
-        logger.debug("Connection to nova")
-        nova = nova_connect()
-
-        if raw_count.startswith("x"):
-            logger.debug("Getting amount of compute services")
-            count = len(nova.services.list(binary="nova-compute"))
-            count *= int(raw_count[1:])
+        if opts.create_vms_opts is not None:
+            vm_context = start_test_vms(opts.create_vms_opts)
+            uris += vm_context.__enter__()
         else:
-            count = int(raw_count)
+            vm_context = None
 
-        if aff_group is not None:
-            scheduler_hints = {'group': aff_group}
-        else:
-            scheduler_hints = None
+        if opts.runner_opts is not None:
+            uris += opts.runner_opts.split(";")
 
-        create_vms_opts['scheduler_hints'] = scheduler_hints
-
-        # nova, amount, keypair_name, img_name,
-        # flavor_name, vol_sz=None, network_zone_name=None,
-        # flt_ip_pool=None, name_templ='ceph-test-{0}',
-        # scheduler_hints=None
-
-        logger.debug("Will start {0} vms".format(count))
+        if len(uris) == 0:
+            logger.critical("You need to provide at least" +
+                            " vm spawn params or ssh params")
+            return 1
 
         try:
-            ips = [i[0] for i in create_vms_mt(nova, count, **create_vms_opts)]
-
-            uris = ["{0}@{1}::{2}".format(user, ip, key_file) for ip in ips]
-
             for script_args in io_opts:
                 cmd_line = " ".join(script_args)
                 logger.debug("Run test with {0!r} params".format(cmd_line))
@@ -266,13 +379,17 @@
 
         except:
             traceback.print_exc()
+            return 1
         finally:
-            logger.debug("Clearing")
-            clear_all(nova)
+            if vm_context is not None:
+                vm_context.__exit__()
+                logger.debug("Clearing")
 
-    result = json.loads(format_measurements_stat(res))
-    result['name'] = build_name
-    add_test(build_name, result, data_server_url)
+    if opts.data_server_url:
+        result = json.loads(format_measurements_stat(res))
+        result['name'] = opts.build_name
+        add_test(opts.build_name, result, opts.data_server_url)
+
     return 0
 
 
diff --git a/ssh_copy_directory.py b/ssh_copy_directory.py
index d074fcf..265133c 100644
--- a/ssh_copy_directory.py
+++ b/ssh_copy_directory.py
@@ -29,6 +29,11 @@
 def put_dir_recursively(sftp, localpath, remotepath, preserve_perm=True):
     "upload local directory to remote recursively"
 
+    # hack for localhost connection
+    if hasattr(sftp, "copytree"):
+        sftp.copytree(localpath, remotepath)
+        return
+
     assert remotepath.startswith("/"), "%s must be absolute path" % remotepath
 
     # normalize
diff --git a/ssh_runner.py b/ssh_runner.py
index d5d7aca..ad3f82e 100644
--- a/ssh_runner.py
+++ b/ssh_runner.py
@@ -30,7 +30,8 @@
         user_rr = "[^:]*?"
         host_rr = "[^:]*?"
         port_rr = "\\d+"
-        key_file_rr = ".*"
+        key_file_rr = "[^:@]*"
+        passwd_rr = ".*?"
 
     re_dct = ReParts.__dict__
 
@@ -42,7 +43,11 @@
     re_dct = ReParts.__dict__
 
     templs = [
-        "^{user_rr}@{host_rr}::{key_file_rr}$"
+        "^{host_rr}$",
+        "^{user_rr}@{host_rr}::{key_file_rr}$",
+        "^{user_rr}@{host_rr}:{port_rr}:{key_file_rr}$",
+        "^{user_rr}:{passwd_rr}@@{host_rr}$",
+        "^{user_rr}:{passwd_rr}@@{host_rr}:{port_rr}$",
     ]
 
     for templ in templs:
@@ -62,6 +67,7 @@
     # ip_host::path_to_key_file
 
     res = ConnCreds()
+    res.port = "22"
 
     for rr in uri_reg_exprs:
         rrm = re.match(rr, uri)
@@ -73,7 +79,8 @@
 
 def connect(uri):
     creds = parse_ssh_uri(uri)
-    return ssh_connect(creds.host, creds.user, creds.key_file)
+    creds.port = int(creds.port)
+    return ssh_connect(creds)
 
 
 def conn_func(obj, barrier, latest_start_time, conn):
diff --git a/utils.py b/utils.py
index 78a8df6..f73090a 100644
--- a/utils.py
+++ b/utils.py
@@ -1,4 +1,5 @@
 import time
+import socket
 import logging
 import threading
 import contextlib
@@ -31,18 +32,33 @@
     return closure
 
 
-def ssh_connect(host, user, key_file, retry_count=60, timeout=1):
+def ssh_connect(creds, retry_count=60, timeout=1):
     ssh = paramiko.SSHClient()
     ssh.load_host_keys('/dev/null')
     ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
     ssh.known_hosts = None
-
     for i in range(retry_count):
         try:
-            ssh.connect(host, username=user, key_filename=key_file,
-                        look_for_keys=False)
-            return ssh
-        except:
+            if creds.passwd is not None:
+                ssh.connect(creds.host,
+                            username=creds.user,
+                            password=creds.passwd,
+                            port=creds.port,
+                            allow_agent=False,
+                            look_for_keys=False)
+                return ssh
+
+            if creds.key_file is not None:
+                ssh.connect(creds.host,
+                            username=creds.user,
+                            key_filename=creds.key_file,
+                            look_for_keys=False,
+                            port=creds.port)
+                return ssh
+            raise ValueError("Wrong credentials {0}".format(creds.__dict__))
+        except paramiko.PasswordRequiredException:
+            raise
+        except socket.error:
             if i == retry_count - 1:
                 raise
             time.sleep(timeout)
@@ -62,6 +78,8 @@
 
             if not barrier(timeout):
                 logger.debug("Barrier timeouted")
+            else:
+                logger.debug("Passing barrier, starting test")
 
 
 @contextlib.contextmanager