add local runner, fix some bugs
diff --git a/run_test.py b/run_test.py
index 2f09ef8..b1d5a59 100644
--- a/run_test.py
+++ b/run_test.py
@@ -2,30 +2,34 @@
 import sys
 import json
 import time
+import shutil
 import pprint
+import weakref
 import logging
 import os.path
 import argparse
 import traceback
+import subprocess
+import contextlib
 
-import io_scenario
-from itest import IOPerfTest
-from rest_api import add_test
 
 import ssh_runner
+import io_scenario
+from utils import log_error
+from rest_api import add_test
+from itest import IOPerfTest, run_test_iter
+from starts_vms import nova_connect, create_vms_mt, clear_all
 
 try:
     import rally_runner
 except ImportError:
     rally_runner = None
 
-from starts_vms import nova_connect, create_vms_mt, clear_all
-
 
 logger = logging.getLogger("io-perf-tool")
-logger.setLevel(logging.DEBUG)
+logger.setLevel(logging.INFO)
 ch = logging.StreamHandler()
-ch.setLevel(logging.DEBUG)
+ch.setLevel(logging.INFO)
 logger.addHandler(ch)
 
 log_format = '%(asctime)s - %(levelname)s - %(name)s - %(message)s'
@@ -52,6 +56,74 @@
     return test_runner(obj)
 
 
+class FileWrapper(object):
+    def __init__(self, fd, conn):
+        self.fd = fd
+        self.channel_wr = weakref.ref(conn)
+
+    def read(self):
+        return self.fd.read()
+
+    @property
+    def channel(self):
+        return self.channel_wr()
+
+
+class LocalConnection(object):
+    def __init__(self):
+        self.proc = None
+
+    def exec_command(self, cmd):
+        PIPE = subprocess.PIPE
+        self.proc = subprocess.Popen(cmd,
+                                     shell=True,
+                                     stdout=PIPE,
+                                     stderr=PIPE,
+                                     stdin=PIPE)
+        res = (self.proc.stdin,
+               FileWrapper(self.proc.stdout, self),
+               self.proc.stderr)
+
+        return res
+
+    def recv_exit_status(self):
+        return self.proc.wait()
+
+    def open_sftp(self):
+        return self
+
+    def close(self):
+        pass
+
+    def put(self, localfile, remfile):
+        return shutil.copy(localfile, remfile)
+
+    def mkdir(self, remotepath, mode):
+        os.mkdir(remotepath)
+        os.chmod(remotepath, mode)
+
+    def chmod(self, remotepath, mode):
+        os.chmod(remotepath, mode)
+
+    def copytree(self, src, dst):
+        shutil.copytree(src, dst)
+
+
+def get_local_runner(clear_tmp_files=True):
+    def closure(obj):
+        res = []
+        obj.set_result_cb(res.append)
+        test_iter = run_test_iter(obj,
+                                  LocalConnection())
+        next(test_iter)
+
+        with log_error("!Run test"):
+            next(test_iter)
+        return res
+
+    return closure
+
+
 def parse_args(argv):
     parser = argparse.ArgumentParser(
         description="Run disk io performance test")
@@ -85,18 +157,23 @@
     parser.add_argument("-n", "--lab-name", default=None,
                         dest="lab_name")
 
+    parser.add_argument("--create-vms-opts", default=None,
+                        help="Creating vm's before run ssh runner",
+                        dest="create_vms_opts")
+
     parser.add_argument("-k", "--keep", default=False,
                         help="keep temporary files",
                         dest="keep_temp_files", action='store_true')
 
-    choices = ["ssh"]
+    choices = ["local", "ssh"]
+
     if rally_runner is not None:
         choices.append("rally")
 
     parser.add_argument("--runner", required=True,
                         choices=choices, help="runner type")
 
-    parser.add_argument("--runner-extra-opts", default="",
+    parser.add_argument("--runner-extra-opts", default=None,
                         dest="runner_opts", help="runner extra options")
 
     return parser.parse_args(argv)
@@ -179,17 +256,58 @@
     return templ.format(data, format_measurements_stat(res), "=" * 80)
 
 
+@contextlib.contextmanager
+def start_test_vms(opts):
+    create_vms_opts = {}
+    for opt in opts.split(","):
+        name, val = opt.split("=", 1)
+        create_vms_opts[name] = val
+
+    user = create_vms_opts.pop("user")
+    key_file = create_vms_opts.pop("key_file")
+    aff_group = create_vms_opts.pop("aff_group", None)
+    raw_count = create_vms_opts.pop("count", "x1")
+
+    logger.debug("Connection to nova")
+    nova = nova_connect()
+
+    if raw_count.startswith("x"):
+        logger.debug("Getting amount of compute services")
+        count = len(nova.services.list(binary="nova-compute"))
+        count *= int(raw_count[1:])
+    else:
+        count = int(raw_count)
+
+    if aff_group is not None:
+        scheduler_hints = {'group': aff_group}
+    else:
+        scheduler_hints = None
+
+    create_vms_opts['scheduler_hints'] = scheduler_hints
+
+    logger.debug("Will start {0} vms".format(count))
+
+    try:
+        ips = [i[0] for i in create_vms_mt(nova, count, **create_vms_opts)]
+
+        uris = ["{0}@{1}::{2}".format(user, ip, key_file) for ip in ips]
+
+        yield uris
+    except:
+        traceback.print_exc()
+    finally:
+        logger.debug("Clearing")
+        clear_all(nova)
+
+
 def main(argv):
     opts = parse_args(argv)
 
-    if not opts.extra_logs:
-        def nolog(x):
-            pass
+    if opts.extra_logs:
+        logger.setLevel(logging.DEBUG)
+        ch.setLevel(logging.DEBUG)
 
     io_opts = get_io_opts(opts.io_opts_file, opts.io_opts)
-    data_server_url = opts.data_server_url
-    # lab_name = opts.lab_name
-    build_name = opts.build_name
 
     if opts.runner == "rally":
         logger.debug("Use rally runner")
@@ -210,47 +328,42 @@
                               opts.keep_temp_files)
             logger.debug(format_result(res))
 
+    elif opts.runner == "local":
+        logger.debug("Run on local computer")
+        try:
+            for script_args in io_opts:
+                cmd_line = " ".join(script_args)
+                logger.debug("Run test with {0!r} params".format(cmd_line))
+                runner = get_local_runner(opts.keep_temp_files)
+                res = run_io_test(opts.tool_type,
+                                  script_args,
+                                  runner,
+                                  opts.keep_temp_files)
+                logger.debug(format_result(res))
+        except:
+            traceback.print_exc()
+            return 1
+
     elif opts.runner == "ssh":
         logger.debug("Use ssh runner")
-        create_vms_opts = {}
-        for opt in opts.runner_opts.split(","):
-            name, val = opt.split("=", 1)
-            create_vms_opts[name] = val
 
-        user = create_vms_opts.pop("user")
-        key_file = create_vms_opts.pop("key_file")
-        aff_group = create_vms_opts.pop("aff_group", None)
-        raw_count = create_vms_opts.pop("count", "x1")
+        uris = []
 
-        logger.debug("Connection to nova")
-        nova = nova_connect()
-
-        if raw_count.startswith("x"):
-            logger.debug("Getting amount of compute services")
-            count = len(nova.services.list(binary="nova-compute"))
-            count *= int(raw_count[1:])
+        if opts.create_vms_opts is not None:
+            vm_context = start_test_vms(opts.create_vms_opts)
+            uris += vm_context.__enter__()
         else:
-            count = int(raw_count)
+            vm_context = None
 
-        if aff_group is not None:
-            scheduler_hints = {'group': aff_group}
-        else:
-            scheduler_hints = None
+        if opts.runner_opts is not None:
+            uris += opts.runner_opts.split(";")
 
-        create_vms_opts['scheduler_hints'] = scheduler_hints
-
-        # nova, amount, keypair_name, img_name,
-        # flavor_name, vol_sz=None, network_zone_name=None,
-        # flt_ip_pool=None, name_templ='ceph-test-{0}',
-        # scheduler_hints=None
-
-        logger.debug("Will start {0} vms".format(count))
+        if len(uris) == 0:
+            logger.critical("You need to provide at least" +
+                            " vm spawn params or ssh params")
+            return 1
 
         try:
-            ips = [i[0] for i in create_vms_mt(nova, count, **create_vms_opts)]
-
-            uris = ["{0}@{1}::{2}".format(user, ip, key_file) for ip in ips]
-
             for script_args in io_opts:
                 cmd_line = " ".join(script_args)
                 logger.debug("Run test with {0!r} params".format(cmd_line))
@@ -266,13 +379,17 @@
 
         except:
             traceback.print_exc()
+            return 1
         finally:
-            logger.debug("Clearing")
-            clear_all(nova)
+            if vm_context is not None:
+                vm_context.__exit__()
+                logger.debug("Clearing")
 
-    result = json.loads(format_measurements_stat(res))
-    result['name'] = build_name
-    add_test(build_name, result, data_server_url)
+    if opts.data_server_url:
+        result = json.loads(format_measurements_stat(res))
+        result['name'] = opts.build_name
+        add_test(opts.build_name, result, opts.data_server_url)
+
     return 0