Run IO tests on nodes in offline mode (detached screen sessions)
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index 0f8afd4..5166fdd 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -1,12 +1,17 @@
 import abc
 import time
+import random
 import os.path
 import logging
 import datetime
 
-from wally.ssh_utils import copy_paths, run_over_ssh, delete_file
 from wally.utils import ssize_to_b, open_for_append_or_create, sec_to_str
 
+from wally.ssh_utils import (copy_paths, run_over_ssh,
+                             save_to_remote, ssh_mkdir,
+                             # delete_file,
+                             connect, read_from_remote, Local)
+
 from . import postgres
 from .io import agent as io_agent
 from .io import formatter as io_formatter
@@ -17,19 +22,20 @@
 
 
 class IPerfTest(object):
-    def __init__(self, on_result_cb, log_directory=None, node=None):
+    def __init__(self, on_result_cb, test_uuid, node, log_directory=None):
         self.on_result_cb = on_result_cb
         self.log_directory = log_directory
         self.node = node
+        self.test_uuid = test_uuid
 
-    def pre_run(self, conn):
+    def pre_run(self):
         pass
 
-    def cleanup(self, conn):
+    def cleanup(self):
         pass
 
     @abc.abstractmethod
-    def run(self, conn, barrier):
+    def run(self, barrier):
         pass
 
     @classmethod
@@ -37,12 +43,16 @@
         msg = "{0}.format_for_console".format(cls.__name__)
         raise NotImplementedError(msg)
 
+    def run_over_ssh(self, cmd, **kwargs):
+        return run_over_ssh(self.node.connection, cmd,
+                            node=self.node.get_conn_id(), **kwargs)
+
 
 class TwoScriptTest(IPerfTest):
     remote_tmp_dir = '/tmp'
 
-    def __init__(self, opts, on_result_cb, log_directory=None, node=None):
-        IPerfTest.__init__(self, on_result_cb, log_directory, node=node)
+    def __init__(self, opts, *args, **kwargs):
+        IPerfTest.__init__(self, *args, **kwargs)
         self.opts = opts
 
         if 'run_script' in self.opts:
@@ -52,22 +62,23 @@
     def get_remote_for_script(self, script):
         return os.path.join(self.tmp_dir, script.rpartition('/')[2])
 
-    def copy_script(self, conn, src):
+    def copy_script(self, src):
         remote_path = self.get_remote_for_script(src)
-        copy_paths(conn, {src: remote_path})
+        copy_paths(self.node.connection, {src: remote_path})
         return remote_path
 
-    def pre_run(self, conn):
-        remote_script = self.copy_script(conn, self.pre_run_script)
+    def pre_run(self):
+        # copy_script() uploads over self.node.connection; pass only the path
+        remote_script = self.copy_script(self.pre_run_script)
         cmd = remote_script
-        run_over_ssh(conn, cmd, node=self.node)
+        self.run_over_ssh(cmd)
 
-    def run(self, conn, barrier):
-        remote_script = self.copy_script(conn, self.run_script)
+    def run(self, barrier):
+        remote_script = self.copy_script(self.run_script)
         cmd_opts = ' '.join(["%s %s" % (key, val) for key, val
                              in self.opts.items()])
         cmd = remote_script + ' ' + cmd_opts
-        out_err = run_over_ssh(conn, cmd, node=self.node)
+        out_err = self.run_over_ssh(cmd)
         self.on_result(out_err, cmd)
 
     def parse_results(self, out):
@@ -91,11 +102,13 @@
 
 
 class IOPerfTest(IPerfTest):
-    io_py_remote = "/tmp/disk_test_agent.py"
+    io_py_remote_templ = "/tmp/test/{0}/io/disk_test_agent.py"
+    log_fl_templ = "/tmp/test/{0}/io/disk_test_agent_log.txt"
+    pid_file_templ = "/tmp/test/{0}/io/disk_test_agent_pid_file"
+    task_file_templ = "/tmp/test/{0}/io/io_task.cfg"
 
-    def __init__(self, test_options, on_result_cb,
-                 log_directory=None, node=None):
-        IPerfTest.__init__(self, on_result_cb, log_directory, node=node)
+    def __init__(self, test_options, *args, **kwargs):
+        IPerfTest.__init__(self, *args, **kwargs)
         self.options = test_options
         self.config_fname = test_options['cfg']
         self.alive_check_interval = test_options.get('alive_check_interval')
@@ -108,6 +121,11 @@
         cmd_log = os.path.join(self.log_directory, "task_compiled.cfg")
         raw_res = os.path.join(self.log_directory, "raw_results.txt")
 
+        self.io_py_remote = self.io_py_remote_templ.format(self.test_uuid)
+        self.log_fl = self.log_fl_templ.format(self.test_uuid)
+        self.pid_file = self.pid_file_templ.format(self.test_uuid)
+        self.task_file = self.task_file_templ.format(self.test_uuid)
+
         fio_command_file = open_for_append_or_create(cmd_log)
 
         cfg_s_it = io_agent.compile_all_in_1(self.raw_cfg, self.config_params)
@@ -115,29 +133,35 @@
         fio_command_file.write(splitter.join(cfg_s_it))
         self.fio_raw_results_file = open_for_append_or_create(raw_res)
 
-    def cleanup(self, conn):
-        delete_file(conn, self.io_py_remote)
+    def cleanup(self):
+        # TODO: delete self.io_py_remote (delete_file import is disabled)
         # Need to remove tempo files, used for testing
+        pass
 
-    def pre_run(self, conn):
+    def pre_run(self):
+        ssh_mkdir(self.node.connection.open_sftp(),
+                  os.path.dirname(self.io_py_remote),
+                  intermediate=True)
         try:
-            run_over_ssh(conn, 'which fio', node=self.node)
+            self.run_over_ssh('which fio')
         except OSError:
             # TODO: install fio, if not installed
-            cmd = "sudo apt-get -y install fio"
-
             for i in range(3):
                 try:
-                    run_over_ssh(conn, cmd, node=self.node)
+                    self.run_over_ssh("sudo apt-get -y install fio")
                     break
                 except OSError as err:
                     time.sleep(3)
             else:
                 raise OSError("Can't install fio - " + err.message)
 
-        local_fname = io_agent.__file__.rsplit('.')[0] + ".py"
-        self.files_to_copy = {local_fname: self.io_py_remote}
-        copy_paths(conn, self.files_to_copy)
+        local_fname = os.path.splitext(io_agent.__file__)[0] + ".py"
+
+        self.files_to_copy = {
+            local_fname: self.io_py_remote,
+        }
+
+        copy_paths(self.node.connection, self.files_to_copy)
 
         if self.options.get('prefill_files', True):
             files = {}
@@ -155,17 +179,19 @@
                 # take largest size
                 files[fname] = max(files.get(fname, 0), msz)
 
-            # logger.warning("dd run DISABLED")
-            # cmd_templ = "dd if=/dev/zero of={0} bs={1} count={2}"
+            # oflag=direct: dd writes with direct I/O, avoiding the buffer cache
+            cmd_templ = "dd oflag=direct if=/dev/zero of={0} bs={1} count={2}"
 
-            cmd_templ = "sudo dd if=/dev/zero of={0} bs={1} count={2}"
+            if self.options.get("use_sudo", True):
+                cmd_templ = "sudo " + cmd_templ
+
             ssize = 0
             stime = time.time()
 
             for fname, curr_sz in files.items():
                 cmd = cmd_templ.format(fname, 1024 ** 2, curr_sz)
                 ssize += curr_sz
-                run_over_ssh(conn, cmd, timeout=curr_sz, node=self.node)
+                self.run_over_ssh(cmd, timeout=curr_sz)
 
             ddtime = time.time() - stime
             if ddtime > 1E-3:
@@ -175,9 +201,13 @@
         else:
             logger.warning("Test files prefill disabled")
 
-    def run(self, conn, barrier):
-        cmd_templ = "sudo env python2 {0} --type {1} {2} --json -"
-        # cmd_templ = "env python2 {0} --type {1} {2} --json -"
+    def run(self, barrier):
+        cmd_templ = "screen -S {screen_name} -d -m " + \
+                    "env python2 {0} -p {pid_file} -o {results_file} " + \
+                    "--type {1} {2} --json {3}"
+
+        if self.options.get("use_sudo", True):
+            cmd_templ = "sudo " + cmd_templ
 
         params = " ".join("{0}={1}".format(k, v)
                           for k, v in self.config_params.items())
@@ -185,14 +215,24 @@
         if "" != params:
             params = "--params " + params
 
-        cmd = cmd_templ.format(self.io_py_remote, self.tool, params)
+        save_to_remote(self.node.connection.open_sftp(),
+                       self.task_file, self.raw_cfg)
+
+        screen_name = self.test_uuid
+        cmd = cmd_templ.format(self.io_py_remote,
+                               self.tool,
+                               params,
+                               self.task_file,
+                               pid_file=self.pid_file,
+                               results_file=self.log_fl,
+                               screen_name=screen_name)
         logger.debug("Waiting on barrier")
 
         exec_time = io_agent.calculate_execution_time(self.configs)
         exec_time_str = sec_to_str(exec_time)
 
         try:
-            timeout = int(exec_time * 1.2 + 300)
+            timeout = int(exec_time * 2 + 300)
             if barrier.wait():
                 templ = "Test should takes about {0}." + \
                         " Should finish at {1}," + \
@@ -205,11 +245,67 @@
                                          end_dt.strftime("%H:%M:%S"),
                                          wait_till.strftime("%H:%M:%S")))
 
-            out_err = run_over_ssh(conn, cmd,
-                                   stdin_data=self.raw_cfg,
-                                   timeout=timeout,
-                                   node=self.node)
-            logger.info("Done")
+            self.run_over_ssh(cmd)
+            logger.debug("Test started in screen {0}".format(screen_name))
+
+            end_of_wait_time = timeout + time.time()
+
+            # time_till_check = random.randint(30, 90)
+            time_till_check = 1
+
+            pid = None
+            no_pid_file = True
+            tcp_conn_timeout = 30
+            pid_get_timeout = 30 + time.time()
+
+            # TODO: add monitoring socket
+            if self.node.connection is not Local:
+                self.node.connection.close()
+
+            while end_of_wait_time > time.time():
+                conn = None
+                time.sleep(time_till_check)
+
+                try:
+                    if self.node.connection is not Local:
+                        conn = connect(self.node.conn_url,
+                                       conn_timeout=tcp_conn_timeout)
+                    else:
+                        conn = self.node.connection
+                except:
+                    logging.exception("During connect")
+                    continue
+
+                try:
+                    pid = read_from_remote(conn.open_sftp(), self.pid_file)
+                    no_pid_file = False
+                except (NameError, IOError):
+                    no_pid_file = True
+
+                if conn is not Local:
+                    conn.close()
+
+                if no_pid_file:
+                    if pid is None:
+                        if time.time() > pid_get_timeout:
+                            msg = "On node {0} pid file doesn't " + \
+                                  "appears in time"
+                            logging.error(msg.format(self.node.get_conn_id()))
+                            raise RuntimeError("Start timeout")
+                    else:
+                        # execution finished
+                        break
+
+            logger.debug("Done")
+
+            if self.node.connection is not Local:
+                timeout = tcp_conn_timeout * 3
+                self.node.connection = connect(self.node.conn_url,
+                                               conn_timeout=timeout)
+
+            # try to reboot and then connect
+            out_err = read_from_remote(self.node.connection.open_sftp(),
+                                       self.log_fl)
         finally:
             barrier.exit()