Run tests on nodes in offline mode
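
Run the io agent detached inside a screen session on the target node and
drop the controlling SSH connection while the test runs, so a lost
network link no longer kills a long test. The controller polls a per-run
pid file over short-lived connections and, once the file disappears,
reconnects and reads the results back from a remote log file.

Remote paths are namespaced by the run uuid:

    /tmp/test/<run_uuid>/io/disk_test_agent.py
    /tmp/test/<run_uuid>/io/disk_test_agent_pid_file
    /tmp/test/<run_uuid>/io/disk_test_agent_log.txt
    /tmp/test/<run_uuid>/io/io_task.cfg

The agent gains -o/--output and -p/--pid-file options; the task config
is uploaded over sftp instead of being piped through stdin. Also: accept
ssh:// prefixed connection urls, create missing parent directories in
Local.put, and bump the ceph test file size to 30G.
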
diff --git a/wally/run_test.py b/wally/run_test.py
index d578349..d4b33bf 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -2,7 +2,6 @@
 
 import os
 import sys
-import time
 import Queue
 import pprint
 import logging
@@ -78,23 +77,26 @@
 
 
 def test_thread(test, node, barrier, res_q):
+    exc = None
     try:
         logger.debug("Run preparation for {0}".format(node.get_conn_id()))
-        test.pre_run(node.connection)
+        test.pre_run()
         logger.debug("Run test for {0}".format(node.get_conn_id()))
-        test.run(node.connection, barrier)
+        test.run(barrier)
     except Exception as exc:
         logger.exception("In test {0} for node {1}".format(test, node))
-        res_q.put(exc)
 
     try:
-        test.cleanup(node.connection)
+        test.cleanup()
     except:
         msg = "Duringf cleanup - in test {0} for node {1}"
         logger.exception(msg.format(test, node))
 
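+    # report the failure only after cleanup has run; in Python 2 the name
+    # bound by "except ... as exc" is still visible here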
+    if exc is not None:
+        res_q.put(exc)
 
-def run_tests(test_block, nodes):
+
+def run_tests(test_block, nodes, test_uuid):
     tool_type_mapper = {
         "io": IOPerfTest,
         "pgbench": PgBenchTest,
@@ -124,8 +126,8 @@
             if not os.path.exists(dr):
                 os.makedirs(dr)
 
-            test = tool_type_mapper[name](params, res_q.put, dr,
-                                          node=node.get_conn_id())
+            test = tool_type_mapper[name](params, res_q.put, test_uuid, node,
+                                          log_directory=dr)
             th = threading.Thread(None, test_thread, None,
                                   (test, node, barrier, res_q))
             threads.append(th)
@@ -313,10 +315,13 @@
 
                 if not cfg['no_tests']:
                     for test_group in config.get('tests', []):
-                        ctx.results.extend(run_tests(test_group, ctx.nodes))
+                        test_res = run_tests(test_group, ctx.nodes,
+                                             cfg['run_uuid'])
+                        ctx.results.extend(test_res)
         else:
             if not cfg['no_tests']:
-                ctx.results.extend(run_tests(group, ctx.nodes))
+                test_res = run_tests(group, ctx.nodes, cfg['run_uuid'])
+                ctx.results.extend(test_res)
 
 
 def shut_down_vms_stage(cfg, ctx):
diff --git a/wally/sensors/daemonize.py b/wally/sensors/daemonize.py
index a4fa157..bc4ab81 100644
--- a/wally/sensors/daemonize.py
+++ b/wally/sensors/daemonize.py
@@ -1,15 +1,14 @@
 # #!/usr/bin/python
 
-import fcntl
 import os
 import pwd
 import grp
 import sys
+import fcntl
 import signal
-import resource
-import logging
 import atexit
-from logging import handlers
+import logging.handlers
+import resource
 
 
 class Daemonize(object):
@@ -155,7 +154,7 @@
             # actually have such capabilities
             # on the machine we are running this.
             if os.path.isfile(syslog_address):
-                syslog = handlers.SysLogHandler(syslog_address)
+                syslog = logging.handlers.SysLogHandler(syslog_address)
                 if self.verbose:
                     syslog.setLevel(logging.DEBUG)
                 else:
diff --git a/wally/ssh_utils.py b/wally/ssh_utils.py
index f4de3b6..d3d6547 100644
--- a/wally/ssh_utils.py
+++ b/wally/ssh_utils.py
@@ -28,6 +28,9 @@
 
     @classmethod
     def put(cls, localfile, remfile):
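+        # mimic sftp put: create any missing parent directories first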
+        dirname = os.path.dirname(remfile)
+        if not os.path.exists(dirname):
+            os.makedirs(dirname)
         shutil.copyfile(localfile, remfile)
 
     @classmethod
@@ -105,6 +108,16 @@
             time.sleep(1)
 
 
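+# sftp helpers for the offline runner: push the task config to the node
+# and pull results/pid data back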
+def save_to_remote(sftp, path, content):
+    with sftp.open(path, "wb") as fd:
+        fd.write(content)
+
+
+def read_from_remote(sftp, path):
+    with sftp.open(path, "rb") as fd:
+        return fd.read()
+
+
 def normalize_dirpath(dirpath):
     while dirpath.endswith("/"):
         dirpath = dirpath[:-1]
@@ -119,7 +132,7 @@
     if intermediate:
         try:
             sftp.mkdir(remotepath, mode=mode)
-        except IOError:
+        except (IOError, OSError):
             upper_dir = remotepath.rsplit("/", 1)[0]
 
             if upper_dir == '' or upper_dir == '/':
@@ -265,6 +278,9 @@
     # ip_host:port:path_to_key_file
     # ip_host::path_to_key_file
 
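+    # accept and strip an optional ssh:// scheme prefix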
+    if uri.startswith("ssh://"):
+        uri = uri[len("ssh://"):]
+
     res = ConnCreds()
     res.port = "22"
     res.key_file = None
diff --git a/wally/suits/io/agent.py b/wally/suits/io/agent.py
index b11f20e..25910b4 100644
--- a/wally/suits/io/agent.py
+++ b/wally/suits/io/agent.py
@@ -1,9 +1,11 @@
+import os
 import sys
 import time
 import json
 import copy
 import select
 import pprint
+import os.path
 import argparse
 import traceback
 import subprocess
@@ -516,7 +518,7 @@
                         help="Start execution at START_AT_UTC")
     parser.add_argument("--json", action="store_true", default=False,
                         help="Json output format")
-    parser.add_argument("--output", default='-', metavar="FILE_PATH",
+    parser.add_argument("-o", "--output", default='-', metavar="FILE_PATH",
                         help="Store results to FILE_PATH")
     parser.add_argument("--estimate", action="store_true", default=False,
                         help="Only estimate task execution time")
@@ -533,6 +535,9 @@
                         default=[],
                         help="Provide set of pairs PARAM=VAL to" +
                              "format into job description")
+    parser.add_argument("-p", "--pid-file", metavar="FILE_TO_STORE_PID",
+                        default=None, help="Store pid to FILE_TO_STORE_PID " +
+                        "and remove this file on exit")
     parser.add_argument("jobfile")
     return parser.parse_args(argv)
 
@@ -550,65 +555,78 @@
     else:
         out_fd = open(argv_obj.output, "w")
 
-    params = {}
-    for param_val in argv_obj.params:
-        assert '=' in param_val
-        name, val = param_val.split("=", 1)
-        params[name] = val
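+    # record our pid so the controller can poll this file to detect when
+    # the agent has started and, once it disappears, when it has finished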
+    if argv_obj.pid_file is not None:
+        with open(argv_obj.pid_file, "w") as fd:
+            fd.write(str(os.getpid()))
 
-    slice_params = {
-        'runcycle': argv_obj.runcycle,
-    }
+    try:
+        params = {}
+        for param_val in argv_obj.params:
+            assert '=' in param_val
+            name, val = param_val.split("=", 1)
+            params[name] = val
 
-    sliced_it = parse_and_slice_all_in_1(job_cfg, params, **slice_params)
+        slice_params = {
+            'runcycle': argv_obj.runcycle,
+        }
 
-    if argv_obj.estimate:
-        it = map(calculate_execution_time, sliced_it)
-        print sec_to_str(sum(it))
-        return 0
+        sliced_it = parse_and_slice_all_in_1(job_cfg, params, **slice_params)
 
-    if argv_obj.num_tests or argv_obj.compile:
-        if argv_obj.compile:
-            for test_slice in sliced_it:
-                out_fd.write(fio_config_to_str(test_slice))
-                out_fd.write("\n#" + "-" * 70 + "\n\n")
+        if argv_obj.estimate:
+            it = map(calculate_execution_time, sliced_it)
+            print sec_to_str(sum(it))
+            return 0
 
-        if argv_obj.num_tests:
-            print len(list(sliced_it))
+        if argv_obj.num_tests or argv_obj.compile:
+            if argv_obj.compile:
+                for test_slice in sliced_it:
+                    out_fd.write(fio_config_to_str(test_slice))
+                    out_fd.write("\n#" + "-" * 70 + "\n\n")
 
-        return 0
+            if argv_obj.num_tests:
+                print len(list(sliced_it))
 
-    if argv_obj.start_at is not None:
-        ctime = time.time()
-        if argv_obj.start_at >= ctime:
-            time.sleep(ctime - argv_obj.start_at)
+            return 0
 
-    def raw_res_func(test_num, data):
-        pref = "========= RAW_RESULTS({0}) =========\n".format(test_num)
-        out_fd.write(pref)
-        out_fd.write(json.dumps(data))
-        out_fd.write("\n========= END OF RAW_RESULTS =========\n")
-        out_fd.flush()
+        if argv_obj.start_at is not None:
+            ctime = time.time()
+            if argv_obj.start_at >= ctime:
+                time.sleep(argv_obj.start_at - ctime)
 
-    rrfunc = raw_res_func if argv_obj.show_raw_results else None
+        def raw_res_func(test_num, data):
+            pref = "========= RAW_RESULTS({0}) =========\n".format(test_num)
+            out_fd.write(pref)
+            out_fd.write(json.dumps(data))
+            out_fd.write("\n========= END OF RAW_RESULTS =========\n")
+            out_fd.flush()
 
-    stime = time.time()
-    job_res, num_tests, ok = run_benchmark(argv_obj.type, sliced_it, rrfunc)
-    etime = time.time()
+        rrfunc = raw_res_func if argv_obj.show_raw_results else None
 
-    res = {'__meta__': {'raw_cfg': job_cfg, 'params': params}, 'res': job_res}
+        stime = time.time()
+        job_res, num_tests, ok = run_benchmark(argv_obj.type,
+                                               sliced_it, rrfunc)
+        etime = time.time()
 
-    oformat = 'json' if argv_obj.json else 'eval'
-    out_fd.write("\nRun {0} tests in {1} seconds\n".format(num_tests,
-                                                           int(etime - stime)))
-    out_fd.write("========= RESULTS(format={0}) =========\n".format(oformat))
-    if argv_obj.json:
-        out_fd.write(json.dumps(res))
-    else:
-        out_fd.write(pprint.pformat(res) + "\n")
-    out_fd.write("\n========= END OF RESULTS =========\n")
+        res = {'__meta__': {'raw_cfg': job_cfg, 'params': params},
+               'res': job_res}
 
-    return 0 if ok else 1
+        oformat = 'json' if argv_obj.json else 'eval'
+        msg = "\nRun {0} tests in {1} seconds\n"
+        out_fd.write(msg.format(num_tests, int(etime - stime)))
+
+        msg = "========= RESULTS(format={0}) =========\n"
+        out_fd.write(msg.format(oformat))
+        if argv_obj.json:
+            out_fd.write(json.dumps(res))
+        else:
+            out_fd.write(pprint.pformat(res) + "\n")
+        out_fd.write("\n========= END OF RESULTS =========\n")
+
+        return 0 if ok else 1
+    finally:
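+        # removing the pid file tells the controller the agent has finished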
+        if argv_obj.pid_file is not None:
+            if os.path.exists(argv_obj.pid_file):
+                os.unlink(argv_obj.pid_file)
 
 
 def fake_main(x):
diff --git a/wally/suits/io/ceph.cfg b/wally/suits/io/ceph.cfg
index cc8e411..1352b1e 100644
--- a/wally/suits/io/ceph.cfg
+++ b/wally/suits/io/ceph.cfg
@@ -9,7 +9,7 @@
 NUM_ROUNDS=7
 NUMJOBS={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
 
-size=5G
+size=30G
 ramp_time=30
 runtime=60
 
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index 0f8afd4..5166fdd 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -1,12 +1,17 @@
 import abc
 import time
+import random
 import os.path
 import logging
 import datetime
 
-from wally.ssh_utils import copy_paths, run_over_ssh, delete_file
 from wally.utils import ssize_to_b, open_for_append_or_create, sec_to_str
 
+from wally.ssh_utils import (copy_paths, run_over_ssh,
+                             save_to_remote, ssh_mkdir,
+                             # delete_file,
+                             connect, read_from_remote, Local)
+
 from . import postgres
 from .io import agent as io_agent
 from .io import formatter as io_formatter
@@ -17,19 +22,20 @@
 
 
 class IPerfTest(object):
-    def __init__(self, on_result_cb, log_directory=None, node=None):
+    def __init__(self, on_result_cb, test_uuid, node, log_directory=None):
         self.on_result_cb = on_result_cb
         self.log_directory = log_directory
         self.node = node
+        self.test_uuid = test_uuid
 
-    def pre_run(self, conn):
+    def pre_run(self):
         pass
 
-    def cleanup(self, conn):
+    def cleanup(self):
         pass
 
     @abc.abstractmethod
-    def run(self, conn, barrier):
+    def run(self, barrier):
         pass
 
     @classmethod
@@ -37,12 +43,16 @@
         msg = "{0}.format_for_console".format(cls.__name__)
         raise NotImplementedError(msg)
 
+    def run_over_ssh(self, cmd, **kwargs):
+        return run_over_ssh(self.node.connection, cmd,
+                            node=self.node.get_conn_id(), **kwargs)
+
 
 class TwoScriptTest(IPerfTest):
     remote_tmp_dir = '/tmp'
 
-    def __init__(self, opts, on_result_cb, log_directory=None, node=None):
-        IPerfTest.__init__(self, on_result_cb, log_directory, node=node)
+    def __init__(self, opts, *dt, **mp):
+        IPerfTest.__init__(self, *dt, **mp)
         self.opts = opts
 
         if 'run_script' in self.opts:
@@ -52,22 +62,23 @@
     def get_remote_for_script(self, script):
         return os.path.join(self.tmp_dir, script.rpartition('/')[2])
 
-    def copy_script(self, conn, src):
+    def copy_script(self, src):
         remote_path = self.get_remote_for_script(src)
-        copy_paths(conn, {src: remote_path})
+        copy_paths(self.node.connection, {src: remote_path})
         return remote_path
 
-    def pre_run(self, conn):
-        remote_script = self.copy_script(conn, self.pre_run_script)
+    def pre_run(self):
+        remote_script = self.copy_script(self.pre_run_script)
         cmd = remote_script
-        run_over_ssh(conn, cmd, node=self.node)
+        self.run_over_ssh(cmd)
 
-    def run(self, conn, barrier):
-        remote_script = self.copy_script(conn, self.run_script)
+    def run(self, barrier):
+        remote_script = self.copy_script(self.run_script)
         cmd_opts = ' '.join(["%s %s" % (key, val) for key, val
                              in self.opts.items()])
         cmd = remote_script + ' ' + cmd_opts
-        out_err = run_over_ssh(conn, cmd, node=self.node)
+        out_err = self.run_over_ssh(cmd)
         self.on_result(out_err, cmd)
 
     def parse_results(self, out):
@@ -91,11 +102,13 @@
 
 
 class IOPerfTest(IPerfTest):
-    io_py_remote = "/tmp/disk_test_agent.py"
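+    # remote paths are namespaced by the run uuid so concurrent or stale
+    # runs don't clash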
+    io_py_remote_templ = "/tmp/test/{0}/io/disk_test_agent.py"
+    log_fl_templ = "/tmp/test/{0}/io/disk_test_agent_log.txt"
+    pid_file_templ = "/tmp/test/{0}/io/disk_test_agent_pid_file"
+    task_file_templ = "/tmp/test/{0}/io/io_task.cfg"
 
-    def __init__(self, test_options, on_result_cb,
-                 log_directory=None, node=None):
-        IPerfTest.__init__(self, on_result_cb, log_directory, node=node)
+    def __init__(self, test_options, *dt, **mp):
+        IPerfTest.__init__(self, *dt, **mp)
         self.options = test_options
         self.config_fname = test_options['cfg']
         self.alive_check_interval = test_options.get('alive_check_interval')
@@ -108,6 +121,11 @@
         cmd_log = os.path.join(self.log_directory, "task_compiled.cfg")
         raw_res = os.path.join(self.log_directory, "raw_results.txt")
 
+        self.io_py_remote = self.io_py_remote_templ.format(self.test_uuid)
+        self.log_fl = self.log_fl_templ.format(self.test_uuid)
+        self.pid_file = self.pid_file_templ.format(self.test_uuid)
+        self.task_file = self.task_file_templ.format(self.test_uuid)
+
         fio_command_file = open_for_append_or_create(cmd_log)
 
         cfg_s_it = io_agent.compile_all_in_1(self.raw_cfg, self.config_params)
@@ -115,29 +133,35 @@
         fio_command_file.write(splitter.join(cfg_s_it))
         self.fio_raw_results_file = open_for_append_or_create(raw_res)
 
-    def cleanup(self, conn):
-        delete_file(conn, self.io_py_remote)
+    def cleanup(self):
+        # delete_file(conn, self.io_py_remote)
         # Need to remove temporary files used for testing
+        pass
 
-    def pre_run(self, conn):
+    def pre_run(self):
+        ssh_mkdir(self.node.connection.open_sftp(),
+                  os.path.dirname(self.io_py_remote),
+                  intermediate=True)
         try:
-            run_over_ssh(conn, 'which fio', node=self.node)
+            self.run_over_ssh('which fio')
         except OSError:
             # TODO: install fio, if not installed
-            cmd = "sudo apt-get -y install fio"
-
             for i in range(3):
                 try:
-                    run_over_ssh(conn, cmd, node=self.node)
+                    self.run_over_ssh("sudo apt-get -y install fio")
                     break
                 except OSError as err:
                     time.sleep(3)
             else:
                 raise OSError("Can't install fio - " + err.message)
 
-        local_fname = io_agent.__file__.rsplit('.')[0] + ".py"
-        self.files_to_copy = {local_fname: self.io_py_remote}
-        copy_paths(conn, self.files_to_copy)
+        local_fname = os.path.splitext(io_agent.__file__)[0] + ".py"
+
+        self.files_to_copy = {
+            local_fname: self.io_py_remote,
+        }
+
+        copy_paths(self.node.connection, self.files_to_copy)
 
         if self.options.get('prefill_files', True):
             files = {}
@@ -155,17 +179,19 @@
                 # take largest size
                 files[fname] = max(files.get(fname, 0), msz)
 
-            # logger.warning("dd run DISABLED")
-            # cmd_templ = "dd if=/dev/zero of={0} bs={1} count={2}"
+            cmd_templ = "dd oflag=direct " + \
+                        "if=/dev/zero of={0} bs={1} count={2}"
 
-            cmd_templ = "sudo dd if=/dev/zero of={0} bs={1} count={2}"
+            if self.options.get("use_sudo", True):
+                cmd_templ = "sudo " + cmd_templ
+
             ssize = 0
             stime = time.time()
 
             for fname, curr_sz in files.items():
                 cmd = cmd_templ.format(fname, 1024 ** 2, curr_sz)
                 ssize += curr_sz
-                run_over_ssh(conn, cmd, timeout=curr_sz, node=self.node)
+                self.run_over_ssh(cmd, timeout=curr_sz)
 
             ddtime = time.time() - stime
             if ddtime > 1E-3:
@@ -175,9 +201,13 @@
         else:
             logger.warning("Test files prefill disabled")
 
-    def run(self, conn, barrier):
-        cmd_templ = "sudo env python2 {0} --type {1} {2} --json -"
-        # cmd_templ = "env python2 {0} --type {1} {2} --json -"
+    def run(self, barrier):
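+        # run the agent detached in a screen session so it survives the
+        # ssh connection being dropped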
+        cmd_templ = "screen -S {screen_name} -d -m " + \
+                    "env python2 {0} -p {pid_file} -o {results_file} " + \
+                    "--type {1} {2} --json {3}"
+
+        if self.options.get("use_sudo", True):
+            cmd_templ = "sudo " + cmd_templ
 
         params = " ".join("{0}={1}".format(k, v)
                           for k, v in self.config_params.items())
@@ -185,14 +215,24 @@
         if "" != params:
             params = "--params " + params
 
-        cmd = cmd_templ.format(self.io_py_remote, self.tool, params)
+        save_to_remote(self.node.connection.open_sftp(),
+                       self.task_file, self.raw_cfg)
+
+        screen_name = self.test_uuid
+        cmd = cmd_templ.format(self.io_py_remote,
+                               self.tool,
+                               params,
+                               self.task_file,
+                               pid_file=self.pid_file,
+                               results_file=self.log_fl,
+                               screen_name=screen_name)
         logger.debug("Waiting on barrier")
 
         exec_time = io_agent.calculate_execution_time(self.configs)
         exec_time_str = sec_to_str(exec_time)
 
         try:
-            timeout = int(exec_time * 1.2 + 300)
+            timeout = int(exec_time * 2 + 300)
             if barrier.wait():
                 templ = "Test should takes about {0}." + \
                         " Should finish at {1}," + \
@@ -205,11 +245,67 @@
                                          end_dt.strftime("%H:%M:%S"),
                                          wait_till.strftime("%H:%M:%S")))
 
-            out_err = run_over_ssh(conn, cmd,
-                                   stdin_data=self.raw_cfg,
-                                   timeout=timeout,
-                                   node=self.node)
-            logger.info("Done")
+            self.run_over_ssh(cmd)
+            logger.debug("Test started in screen {0}".format(screen_name))
+
+            end_of_wait_time = timeout + time.time()
+
+            # poll interval; could be random.randint(30, 90) to spread load
+            time_till_check = 1
+
+            pid = None
+            no_pid_file = True
+            tcp_conn_timeout = 30
+            pid_get_timeout = 30 + time.time()
+
+            # TODO: add monitoring socket
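+            # drop the controlling connection: from here on the test runs
+            # on its own and we only poll for the pid file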
+            if self.node.connection is not Local:
+                self.node.connection.close()
+
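+            # the pid file appears when the agent starts and is removed
+            # when it exits (see the finally block in agent main)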
+            while end_of_wait_time > time.time():
+                conn = None
+                time.sleep(time_till_check)
+
+                try:
+                    if self.node.connection is not Local:
+                        conn = connect(self.node.conn_url,
+                                       conn_timeout=tcp_conn_timeout)
+                    else:
+                        conn = self.node.connection
+                except:
+                    logger.exception("Error during reconnect")
+                    continue
+
+                try:
+                    pid = read_from_remote(conn.open_sftp(), self.pid_file)
+                    no_pid_file = False
+                except (NameError, IOError):
+                    no_pid_file = True
+
+                if conn is not Local:
+                    conn.close()
+
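+                # pid stays None until the pid file is first seen; if the
+                # file has been seen and then vanishes, the agent has exited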
+                if no_pid_file:
+                    if pid is None:
+                        if time.time() > pid_get_timeout:
+                            msg = "On node {0} pid file doesn't " + \
+                                  "appears in time"
+                            logging.error(msg.format(self.node.get_conn_id()))
+                            raise RuntimeError("Start timeout")
+                    else:
+                        # execution finished
+                        break
+
+            logger.debug("Done")
+
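+            # re-establish the primary connection, with extra slack in
+            # case the node is still recovering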
+            if self.node.connection is not Local:
+                timeout = tcp_conn_timeout * 3
+                self.node.connection = connect(self.node.conn_url,
+                                               conn_timeout=timeout)
+
+            # fetch the results the agent wrote to the remote log file
+            out_err = read_from_remote(self.node.connection.open_sftp(),
+                                       self.log_fl)
         finally:
             barrier.exit()