fixing fio runner
diff --git a/configs-examples/default.yaml b/configs-examples/default.yaml
index af4ca5f..adb114d 100644
--- a/configs-examples/default.yaml
+++ b/configs-examples/default.yaml
@@ -5,6 +5,7 @@
 connect_timeout: 30
 download_rpc_logs: true
 rpc_log_level: DEBUG
+default_test_local_folder: "/tmp/wally_{uuid}_{name}"
 
 logging:
     level: DEBUG
@@ -40,7 +41,7 @@
 sensors:
    online: true
    roles_mapping:
-       testnode: system-cpu, block-io, net-io
+#       testnode: system-cpu, block-io, net-io
        ceph-osd:
             system-cpu: ".*"
             block-io: ".*"
diff --git a/configs-examples/full.yaml b/configs-examples/full.yaml
index 5c336ff..7f07516 100644
--- a/configs-examples/full.yaml
+++ b/configs-examples/full.yaml
@@ -32,18 +32,18 @@
         aa_group_name: wally-aa-{0}
         security_group: wally_ssh_to_everyone
 
-clouds
-    fuel:
-        url: http://172.16.44.13:8000/
-        creds: admin:admin@admin
-        ssh_creds: root:r00tme
-        openstack_env: test
 
-    openstack:
-        OPENRC: /home/koder/workspace/scale_openrc
-        auth: USER:PASSWD:KEY_FILE
-        vms:
-            - "wally-phytographic-sharla,ubuntu,wally_vm_key.pem"
+fuel:
+    url: http://172.16.44.13:8000/
+    creds: admin:admin@admin
+    ssh_creds: root:r00tme
+    openstack_env: test
+
+openstack:
+    OPENRC: /home/koder/workspace/scale_openrc
+    auth: USER:PASSWD:KEY_FILE
+    vms:
+        - "wally-phytographic-sharla,ubuntu,wally_vm_key.pem"
 
 discover: fuel,openstack,fuel_openrc_only
 
@@ -69,6 +69,7 @@
         tests:
             - io:
                 node_limit: 2
+                use_system_fio: bool
                 load: ceph
                 params:
                     FILENAME: /dev/vdb
diff --git a/configs-examples/local_lxc_ceph.yaml b/configs-examples/local_lxc_ceph.yaml
index 4e90082..9312fc5 100644
--- a/configs-examples/local_lxc_ceph.yaml
+++ b/configs-examples/local_lxc_ceph.yaml
@@ -1,7 +1,16 @@
 include: default.yaml
 collect_info: false
 
-ceph:
-    root_node: localhost
+#ceph:
+#    root_node: localhost
 
-sleep: 0
+nodes:
+    koder@localhost: testnode
+
+tests:
+    - io:
+        load: rrd
+        params:
+            FILENAME: /tmp/fl.bin
+            FILESIZE: 16M
+
diff --git a/v2_plans.md b/v2_plans.md
index 1cda71d..02c798a 100644
--- a/v2_plans.md
+++ b/v2_plans.md
@@ -16,6 +16,7 @@
     * Result-to-yaml for UT
     * Flexible SSH connection creds - use agent, default ssh settings or part of config
     * RPC reconnect in case of errors
+    * Remove created temporary files
 
 * Infra:
     * Add script to download fio from git and build it
diff --git a/wally/ceph.py b/wally/ceph.py
index 592d595..a374ed6 100644
--- a/wally/ceph.py
+++ b/wally/ceph.py
@@ -11,7 +11,7 @@
 from .test_run_class import TestRun
 from .ssh_utils import parse_ssh_uri
 from .node import connect, setup_rpc
-from .utils import StopTestError
+from .utils import StopTestError, to_ip
 
 
 logger = logging.getLogger("wally")
@@ -140,7 +140,7 @@
                 ips = set()
                 for ip, osds_info in get_osds_info(node, conf, key).items():
                     ips.add(ip)
-                    creds = ConnCreds(cast(str, ip), user="root", key=ssh_key)
+                    creds = ConnCreds(to_ip(cast(str, ip)), user="root", key=ssh_key)
                     info = ctx.merge_node(creds, {'ceph-osd'})
                     info.params.setdefault('ceph-osds', []).extend(osds_info)
                     assert 'ceph' not in info.params or info.params['ceph'] == ceph_params
@@ -157,7 +157,7 @@
             try:
                 counter = 0
                 for counter, ip in enumerate(get_mons_ips(node, conf, key)):
-                    creds = ConnCreds(cast(str, ip), user="root", key=ssh_key)
+                    creds = ConnCreds(to_ip(cast(str, ip)), user="root", key=ssh_key)
                     info = ctx.merge_node(creds, {'ceph-mon'})
                     assert 'ceph' not in info.params or info.params['ceph'] == ceph_params
                     info.params['ceph'] = ceph_params
diff --git a/wally/fuel.py b/wally/fuel.py
index bcb76e8..902752a 100644
--- a/wally/fuel.py
+++ b/wally/fuel.py
@@ -4,9 +4,8 @@
 from paramiko.ssh_exception import AuthenticationException
 
 from .fuel_rest_api import get_cluster_id, reflect_cluster, FuelInfo, KeystoneAuth
-from .node_interfaces import NodeInfo
-from .ssh_utils import ConnCreds, parse_ssh_uri
-from .utils import check_input_param, StopTestError, parse_creds
+from .ssh_utils import ConnCreds
+from .utils import StopTestError, parse_creds, to_ip
 from .stage import Stage, StepOrder
 from .test_run_class import TestRun
 from .node import connect, setup_rpc
@@ -114,6 +113,6 @@
         count = 0
         for count, fuel_node in enumerate(list(cluster.get_nodes())):
             ip = str(fuel_node.get_ip(network))
-            ctx.merge_node(ConnCreds(ip, "root", key=fuel_key), set(fuel_node.get_roles()))
+            ctx.merge_node(ConnCreds(to_ip(ip), "root", key=fuel_key), set(fuel_node.get_roles()))
 
         logger.debug("Found {} FUEL nodes for env {}".format(count, fuel.openstack_env))
diff --git a/wally/node.py b/wally/node.py
index 9f4250b..2a57c65 100644
--- a/wally/node.py
+++ b/wally/node.py
@@ -158,7 +158,7 @@
         self.conn = conn
 
     def __str__(self) -> str:
-        return "Node(url={!r}, roles={!r})".format(self.info.ssh_creds, ",".join(self.info.roles))
+        return "Node({!r})".format(self.info.node_id())
 
     def __repr__(self) -> str:
         return str(self)
@@ -190,17 +190,15 @@
 
         return out
 
-    def copy_file(self, local_path: str, remote_path: str = None) -> str:
-        raise NotImplementedError()
+    def copy_file(self, local_path: str, remote_path: str = None, expand_user: bool = False) -> str:
+        data = open(local_path, 'rb').read()
+        return self.put_to_file(remote_path, data, expand_user)
 
-    def put_to_file(self, path: Optional[str], content: bytes) -> str:
-        raise NotImplementedError()
+    def put_to_file(self, path: Optional[str], content: bytes, expand_user: bool = False) -> str:
+        return self.conn.fs.store_file(path, content, expand_user)
 
-    def get_interface(self, ip: str) -> str:
-        raise NotImplementedError()
-
-    def stat_file(self, path: str) -> Any:
-        raise NotImplementedError()
+    def stat_file(self, path: str, expand_user: bool = False) -> Dict[str, int]:
+        return self.conn.fs.file_stat(path, expand_user)
 
     def __exit__(self, x, y, z) -> bool:
         self.disconnect(stop=True)
diff --git a/wally/node_interfaces.py b/wally/node_interfaces.py
index 8552a3d..c494b8d 100644
--- a/wally/node_interfaces.py
+++ b/wally/node_interfaces.py
@@ -66,6 +66,10 @@
     rpc_log_file = None  # type: str
 
     @abc.abstractmethod
+    def __str__(self) -> str:
+        pass
+
+    @abc.abstractmethod
     def run(self, cmd: str, timeout: int = 60, nolog: bool = False) -> str:
         pass
 
@@ -82,10 +86,6 @@
         pass
 
     @abc.abstractmethod
-    def get_interface(self, ip: str) -> str:
-        pass
-
-    @abc.abstractmethod
     def stat_file(self, path:str) -> Any:
         pass
 
diff --git a/wally/openstack.py b/wally/openstack.py
index 5e09d6d..5541d4c 100644
--- a/wally/openstack.py
+++ b/wally/openstack.py
@@ -1,7 +1,7 @@
 import os.path
 import socket
 import logging
-from typing import Dict, Any, List, Tuple, cast, Optional
+from typing import Dict, Any, List, Tuple, cast
 
 from .node_interfaces import NodeInfo
 from .config import ConfigBlock, Config
@@ -10,7 +10,7 @@
                             OSCreds, get_openstack_credentials, prepare_os, launch_vms, clear_nodes)
 from .test_run_class import TestRun
 from .stage import Stage, StepOrder
-from .utils import LogError, StopTestError, get_creds_openrc
+from .utils import LogError, StopTestError, get_creds_openrc, to_ip
 
 
 logger = logging.getLogger("wally")
@@ -131,7 +131,7 @@
             logger.debug("Found %s openstack service nodes" % len(host_services_mapping))
 
             for host, services in host_services_mapping.items():
-                creds = ConnCreds(host=host, user=user, passwd=password, key_file=key_file)
+                creds = ConnCreds(host=to_ip(host), user=user, passwd=password, key_file=key_file)
                 ctx.merge_node(creds, set(services))
             # TODO: log OS nodes discovery results
         else:
@@ -151,7 +151,7 @@
                 ensure_connected_to_openstack(ctx)
 
                 for ip, vm_id in find_vms(ctx.os_connection, vm_name_pattern):
-                    creds = ConnCreds(host=ip, user=user_name, key_file=private_key_path)
+                    creds = ConnCreds(host=to_ip(ip), user=user_name, key_file=private_key_path)
                     info = NodeInfo(creds, {'testnode'})
                     info.os_vm_id = vm_id
                     nid = info.node_id()
diff --git a/wally/openstack_api.py b/wally/openstack_api.py
index aa9d7f3..0f6c6fc 100644
--- a/wally/openstack_api.py
+++ b/wally/openstack_api.py
@@ -16,9 +16,8 @@
 from cinderclient.client import Client as CinderClient
 from glanceclient import Client as GlanceClient
 
-from .utils import Timeout
+from .utils import Timeout, to_ip
 from .node_interfaces import NodeInfo
-from .storage import IStorable
 from .ssh_utils import ConnCreds
 
 
@@ -454,7 +453,7 @@
     user = params['image']['user']
 
     for ip, os_node in create_vms_mt(conn, count, executor, **vm_params):
-        info = NodeInfo(ConnCreds(ip, user, key_file=private_key_path), set())
+        info = NodeInfo(ConnCreds(to_ip(ip), user, key_file=private_key_path), set())
         info.os_vm_id = os_node.id
         yield info
 
diff --git a/wally/run_test.py b/wally/run_test.py
index c72e456..891df5a 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -144,8 +144,9 @@
             logger.info("Skip explicid nodes filling, as all_nodes all ready in storage")
             return
 
-        for url, roles in ctx.config.get('explicit_nodes', {}).items():
+        for url, roles in ctx.config.get('nodes', {}).raw().items():
             ctx.merge_node(ssh_utils.parse_ssh_uri(url), set(roles.split(",")))
+            logger.debug("Add node %s with roles %s", url, roles)
 
 
 class SaveNodesStage(Stage):
diff --git a/wally/ssh_utils.py b/wally/ssh_utils.py
index efb67b3..9b3c074 100644
--- a/wally/ssh_utils.py
+++ b/wally/ssh_utils.py
@@ -1,12 +1,17 @@
 import re
 import yaml
 import getpass
+import logging
 from typing import List, Dict, Any
 
 
+from . import utils
 from .common_types import IPAddr
 
 
+logger = logging.getLogger("wally")
+
+
 class URIsNamespace:
     class ReParts:
         user_rr = "[^:]*?"
@@ -90,6 +95,7 @@
         if rrm is not None:
             params = {"user": getpass.getuser()}  # type: Dict[str, str]
             params.update(rrm.groupdict())
+            params['host'] = utils.to_ip(params['host'])
             return ConnCreds(**params)  # type: ignore
 
     raise ValueError("Can't parse {0!r} as ssh uri value".format(uri))
diff --git a/wally/storage.py b/wally/storage.py
index 93a7cdd..a17e3c0 100644
--- a/wally/storage.py
+++ b/wally/storage.py
@@ -105,6 +105,9 @@
 
     def list(self, path: str = "") -> Iterator[Tuple[bool, str]]:
         jpath = self.j(path)
+        if not os.path.exists(jpath):
+            return
+
         for entry in os.scandir(jpath):
             if not entry.name in ('..', '.'):
                 yield entry.is_file(), entry.name
diff --git a/wally/suits/io/ceph.cfg b/wally/suits/io/ceph.cfg
index 9287030..a44c749 100644
--- a/wally/suits/io/ceph.cfg
+++ b/wally/suits/io/ceph.cfg
@@ -1,9 +1,10 @@
 [global]
-include defaults.cfg
+include defaults_qd.cfg
 
-NUMJOBS_R={% 1, 5, 10, 15, 25, 40, 80, 120 %}
-NUMJOBS_W={% 1, 5, 10, 15, 25, 40 %}
-NUMJOBS_SEQ_OPS={% 1, 3, 10 %}
+QD_R={% 1, 5, 10, 15, 25, 40, 80, 120 %}
+QD_W={% 1, 5, 10, 15, 25, 40 %}
+QD_SEQ_R={% 1, 3, 10 %}
+QD_SEQ_W={% 1, 2, 4 %}
 
 ramp_time=30
 runtime=180
@@ -14,8 +15,7 @@
 [ceph_{TEST_SUMM}]
 blocksize=4k
 rw=randwrite
-sync=1
-numjobs={NUMJOBS_W}
+iodepth={QD_W}
 
 # ---------------------------------------------------------------------
 # check different thread count, direct read mode. (latency, iops) = func(th_count)
@@ -25,24 +25,34 @@
 blocksize=4k
 rw=randread
 direct=1
-numjobs={NUMJOBS_R}
+iodepth={QD_R}
 
 # ---------------------------------------------------------------------
-# direct write
+# sync write
 # ---------------------------------------------------------------------
 [ceph_{TEST_SUMM}]
 blocksize=4k
 rw=randwrite
 direct=1
+sync=1
 numjobs=1
 
 # ---------------------------------------------------------------------
-# this is essentially sequential write/read operations
+# this is essentially sequential write operations
 # we can't use sequential with numjobs > 1 due to caching and block merging
 # ---------------------------------------------------------------------
 [ceph_{TEST_SUMM}]
 blocksize=16m
-rw={% randread, randwrite %}
+rw=randwrite
 direct=1
-numjobs={NUMJOBS_SEQ_OPS}
+iodepth={QD_SEQ_W}
 
+# ---------------------------------------------------------------------
+# this is essentially sequential read operations
+# we can't use sequential with numjobs > 1 due to caching and block merging
+# ---------------------------------------------------------------------
+[ceph_{TEST_SUMM}]
+blocksize=16m
+rw=randread
+direct=1
+iodepth={QD_SEQ_R}
diff --git a/wally/suits/io/defaults_qd.cfg b/wally/suits/io/defaults_qd.cfg
index 873e6b7..0418e8a 100644
--- a/wally/suits/io/defaults_qd.cfg
+++ b/wally/suits/io/defaults_qd.cfg
@@ -1,5 +1,6 @@
 buffered=0
 direct=1
+sync=0
 ioengine=libaio
 
 group_reporting=1
@@ -9,16 +10,17 @@
 thread=1
 time_based=1
 wait_for_previous=1
+per_job_logs=0
 
 # this is critical for correct results in multy-node run
 randrepeat=0
 
 filename={FILENAME}
-size={TEST_FILE_SIZE}
-iodepth={QD}
+size={FILESIZE}
 
 write_iops_log=fio_iops_log
+write_bw_log=fio_ibw_log
 log_avg_msec=1000
-write_hist_log=fio_log_h
+write_hist_log=fio_lat_hist_log
 log_hist_msec=1000
 log_unix_epoch=1
diff --git a/wally/suits/io/fio.py b/wally/suits/io/fio.py
index 1b5f38e..e055d98 100644
--- a/wally/suits/io/fio.py
+++ b/wally/suits/io/fio.py
@@ -1,14 +1,14 @@
 import os.path
 import logging
-from typing import Dict, List, Union, cast
+from typing import cast
 
 import wally
 
-from ...utils import ssize2b, StopTestError, get_os
+from ...utils import StopTestError, get_os, ssize2b
 from ...node_interfaces import IRPCNode
 from ..itest import ThreadedTest, IterationConfig, RunTestRes
-from .fio_task_parser import execution_time, fio_cfg_compile, FioJobSection, FioParams
-
+from .fio_task_parser import execution_time, fio_cfg_compile, FioJobSection, FioParams, get_log_files
+from . import rpc_plugin
 
 logger = logging.getLogger("wally")
 
@@ -23,82 +23,97 @@
 
         get = self.config.params.get
 
+        self.remote_task_file = self.join_remote("task.fio")
+        self.remote_output_file = self.join_remote("fio_result.json")
+        self.use_system_fio = get('use_system_fio', False)  # type: bool
+        self.use_sudo = get("use_sudo", True)  # type: bool
+        self.force_prefill = get('force_prefill', False)  # type: bool
+
         self.load_profile_name = self.config.params['load']  # type: str
         self.name = "io." + self.load_profile_name
 
         if os.path.isfile(self.load_profile_name):
-            self.load_profile_path = os.path.join(self.configs_dir, self.load_profile_name+ '.cfg')  # type: str
+            self.load_profile_path = self.load_profile_name   # type: str
         else:
-            self.load_profile_path = self.load_profile_name
+            self.load_profile_path = os.path.join(self.configs_dir, self.load_profile_name+ '.cfg')
 
         self.load_profile = open(self.load_profile_path, 'rt').read()  # type: str
 
-        self.use_system_fio = get('use_system_fio', False)  # type: bool
-
         if self.use_system_fio:
             self.fio_path = "fio"    # type: str
         else:
             self.fio_path = os.path.join(self.config.remote_dir, "fio")
 
-        self.force_prefill = get('force_prefill', False)  # type: bool
+        self.load_params = self.config.params['params']
+        self.file_name = self.load_params['FILENAME']
 
-        if 'FILESIZE' not in self.config.params:
-            raise NotImplementedError("File size detection is not implemented")
+        if 'FILESIZE' not in self.load_params:
+            logger.debug("Getting test file sizes on all nodes")
+            try:
+                sizes = {node.conn.fs.file_stat(self.file_name)['size']
+                         for node in self.config.nodes}
+            except Exception:
+                logger.exception("FILESIZE is not set in config file and fail to detect it." +
+                                 "Set FILESIZE or fix error and rerun test")
+                raise StopTestError()
 
-        # self.max_latency = get("max_lat")  # type: Optional[int]
-        # self.min_bw_per_thread = get("min_bw")   # type: Optional[int]
+            if len(sizes) != 1:
+                logger.error("IO target file %r has different sizes on test nodes - %r",
+                             self.file_name, sizes)
+                raise StopTestError()
 
-        self.use_sudo = get("use_sudo", True)  # type: bool
+            self.file_size = list(sizes)[0]
+            logger.info("Detected test file size is %s", self.file_size)
+            self.load_params['FILESIZE'] = self.file_size
+        else:
+            self.file_size = ssize2b(self.load_params['FILESIZE'])
 
-        self.fio_configs = list(fio_cfg_compile(self.load_profile,
-                                                self.load_profile_path,
-                                                cast(FioParams, self.config.params)))
+        self.fio_configs = list(fio_cfg_compile(self.load_profile, self.load_profile_path,
+                                                cast(FioParams, self.load_params)))
 
         if len(self.fio_configs) == 0:
-            logger.exception("Empty fio config provided")
-            raise StopTestError("Empty fio config provided")
+            logger.error("Empty fio config provided")
+            raise StopTestError()
 
         self.iterations_configs = self.fio_configs  # type: ignore
-        self.files_sizes = self.get_file_sizes()
-
         self.exec_folder = self.config.remote_dir
-        self.fio_path = "" if self.use_system_fio else self.exec_folder
-
-    def get_file_sizes(self) -> Dict[str, int]:
-        files_sizes = {}  # type: Dict[str, int]
-
-        for section in self.fio_configs:
-            sz = ssize2b(section.vals['size'])
-            msz = sz // (1024 ** 2) + (1 if sz % (1024 ** 2) != 0 else 0)
-            fname = section.vals['filename']  # type: str
-
-            # if already has other test with the same file name
-            # take largest size
-            files_sizes[fname] = max(files_sizes.get(fname, 0), msz)
-
-        return files_sizes
 
     def config_node(self, node: IRPCNode) -> None:
+        plugin_code = open(rpc_plugin.__file__.rsplit(".", 1)[0] + ".py", "rb").read()
+        node.upload_plugin(code=plugin_code, name="fio")
+
         try:
-            node.conn.rmdir(self.config.remote_dir, recursive=True, ignore_missing=True)
-            node.conn.mkdir(self.config.remote_dir)
-        except Exception as exc:
-            msg = "Failed to create folder {} on remote {}.".format(self.config.remote_dir, node)
+            node.conn.fs.rmtree(self.config.remote_dir)
+        except Exception:
+            pass
+
+        try:
+            node.conn.fs.makedirs(self.config.remote_dir)
+        except Exception:
+            msg = "Failed to recreate folder {} on remote {}.".format(self.config.remote_dir, node)
             logger.exception(msg)
-            raise StopTestError(msg) from exc
+            raise StopTestError()
 
         self.install_utils(node)
-        logger.info("Prefilling test files with random data")
-        fill_bw = node.conn.prefill_test_files(self.files_sizes, force=self.force_prefill, fio_path=self.fio_path)
+
+        mb = int(self.file_size / 1024 ** 2)
+        logger.info("Filling test file %s with %sMiB of random data", self.file_name, mb)
+        fill_bw = node.conn.fio.fill_file(self.file_name, mb, force=self.force_prefill, fio_path=self.fio_path)
         if fill_bw is not None:
-            logger.info("Initial fio fill bw is {} MiBps for {}".format(fill_bw, node.info.node_id()))
+            logger.info("Initial fio fill bw is {} MiBps for {}".format(fill_bw, node))
+
+        fio_config = "\n".join(map(str, self.iterations_configs))
+        node.put_to_file(self.remote_task_file, fio_config.encode("utf8"))
 
     def install_utils(self, node: IRPCNode) -> None:
+        os_info = get_os(node)
         if self.use_system_fio:
-            node.conn.install('fio', binary='fio')
-
-        if not self.use_system_fio:
-            os_info = get_os(node)
+            if os_info.distro != 'ubuntu':
+                logger.error("Only ubuntu supported on test VM")
+                raise StopTestError()
+            node.conn.fio.install('fio', binary='fio')
+        else:
+            node.conn.fio.install('bzip2', binary='bzip2')
             fio_dir = os.path.dirname(os.path.dirname(wally.__file__))  # type: str
             fio_dir = os.path.join(os.getcwd(), fio_dir)
             fio_dir = os.path.join(fio_dir, 'fio_binaries')
@@ -106,23 +121,32 @@
             fio_path = os.path.join(fio_dir, fname)  # type: str
 
             if not os.path.exists(fio_path):
-                raise RuntimeError("No prebuild fio binary available for {0}".format(os_info))
+                logger.error("No prebuild fio binary available for {0}".format(os_info))
+                raise StopTestError()
 
             bz_dest = self.join_remote('fio.bz2')  # type: str
             node.copy_file(fio_path, bz_dest)
-            node.run("bzip2 --decompress {}" + bz_dest)
-            node.run("chmod a+x " + self.join_remote("fio"))
+            node.run("bzip2 --decompress {} ; chmod a+x {}".format(bz_dest, self.join_remote("fio")))
 
     def get_expected_runtime(self, iteration_info: IterationConfig) -> int:
         return execution_time(cast(FioJobSection, iteration_info))
 
     def do_test(self, node: IRPCNode, iter_config: IterationConfig) -> RunTestRes:
         exec_time = execution_time(cast(FioJobSection, iter_config))
-        raw_res = node.conn.fio.run_fio(self.fio_path,
-                                        self.exec_folder,
-                                        str(cast(FioJobSection, iter_config)),
-                                        exec_time + max(300, exec_time))
+        fio_cmd_templ = "cd {exec_folder}; " + \
+                        "{fio_path} --output-format=json --output={out_file} --alloc-size=262144 {job_file}"
+
+        bw_log, iops_log, lat_hist_log = get_log_files(iter_config)
+
+        cmd = fio_cmd_templ.format(exec_folder=self.exec_folder,
+                                   fio_path=self.fio_path,
+                                   out_file=self.remote_output_file,
+                                   job_file=self.remote_task_file)
+        raw_res = node.run(cmd, timeout=exec_time + max(300, exec_time))
+        
+        return
+
         # TODO(koder): fix next error
-        raise NotImplementedError("Need to extract time from test result")
-        return raw_res, (0, 0)
+        # raise NotImplementedError("Need to extract time from test result")
+        # return raw_res, (0, 0)
 
diff --git a/wally/suits/io/fio_task_parser.py b/wally/suits/io/fio_task_parser.py
index 1bdbb15..aaf4b36 100644
--- a/wally/suits/io/fio_task_parser.py
+++ b/wally/suits/io/fio_task_parser.py
@@ -291,7 +291,7 @@
 MAGIC_OFFSET = 0.1885
 
 
-def finall_process(sec: FioJobSection, counter: List[int] = [0]) -> FioJobSection:
+def final_process(sec: FioJobSection, counter: List[int] = [0]) -> FioJobSection:
     sec = sec.copy()
 
     sec.vals['unified_rw_reporting'] = '1'
@@ -362,7 +362,7 @@
     return TestSumm(rw,
                     sync_mode,
                     vals['blocksize'],
-                    vals['iodepth'],
+                    vals.get('iodepth', '1'),
                     vm_count)
 
 
@@ -398,11 +398,15 @@
             yield res
 
 
+def get_log_files(sec: FioJobSection) -> Tuple[Optional[str], Optional[str], Optional[str]]:
+    return sec.vals.get('write_iops_log'), sec.vals.get('write_bw_log'), sec.vals.get('write_hist_log')
+
+
 def fio_cfg_compile(source: str, fname: str, test_params: FioParams) -> Iterator[FioJobSection]:
     it = parse_all_in_1(source, fname)
     it = (apply_params(sec, test_params) for sec in it)
     it = flatmap(process_cycles, it)
-    return map(finall_process, it)
+    return map(final_process, it)
 
 
 def parse_args(argv):
diff --git a/wally/suits/io/rpc_plugin.py b/wally/suits/io/rpc_plugin.py
index 8e2e09f..306af28 100644
--- a/wally/suits/io/rpc_plugin.py
+++ b/wally/suits/io/rpc_plugin.py
@@ -2,29 +2,19 @@
 import time
 import stat
 import random
+import logging
 import subprocess
 
 
-def rpc_run_fio(cfg):
-    fio_cmd_templ = "cd {exec_folder}; {fio_path}fio --output-format=json " + \
-                    "--output={out_file} --alloc-size=262144 {job_file}"
+mod_name = "fio"
+__version__ = (0, 1)
 
-    result = {
-        "name": [float],
-        "lat_name": [[float]]
-    }
 
-    return result
-    # fnames_before = node.run("ls -1 " + exec_folder, nolog=True)
-    #
-    # timeout = int(exec_time + max(300, exec_time))
-    # soft_end_time = time.time() + exec_time
-    # logger.error("Fio timeouted on node {}. Killing it".format(node))
-    # end = time.time()
-    # fnames_after = node.run("ls -1 " + exec_folder, nolog=True)
-    #
+logger = logging.getLogger("agent.fio")
+SensorsMap = {}
 
-def rpc_check_file_prefilled(path, used_size_mb):
+
+def check_file_prefilled(path, used_size_mb):
     used_size = used_size_mb * 1024 ** 2
     blocks_to_check = 16
 
@@ -48,42 +38,24 @@
     return False
 
 
-def rpc_prefill_test_files(files, force=False, fio_path='fio'):
-    cmd_templ = "{0} --name=xxx --filename={1} --direct=1" + \
-                " --bs=4m --size={2}m --rw=write"
+def rpc_fill_file(fname, size, force=False, fio_path='fio'):
+    if not force:
+        if not check_file_prefilled(fname, size):
+            return
 
-    ssize = 0
-    ddtime = 0.0
+    assert size % 4 == 0, "File size must be proportional to 4M"
 
-    for fname, curr_sz in files.items():
-        if not force:
-            if not rpc_check_file_prefilled(fname, curr_sz):
-                continue
+    cmd_templ = "{} --name=xxx --filename={} --direct=1 --bs=4m --size={}m --rw=write"
 
-        cmd = cmd_templ.format(fio_path, fname, curr_sz)
-        ssize += curr_sz
+    run_time = time.time()
+    subprocess.check_output(cmd_templ.format(fio_path, fname, size), shell=True)
+    run_time = time.time() - run_time
 
-        stime = time.time()
-        subprocess.check_call(cmd)
-        ddtime += time.time() - stime
-
-    if ddtime > 1.0:
-        return int(ssize / ddtime)
-
-    return None
+    return None if run_time < 1.0 else int(size / run_time)
 
 
-def load_fio_log_file(fname):
-    with open(fname) as fd:
-        it = [ln.split(',')[:2] for ln in fd]
-
-    return [(float(off) / 1000,  # convert us to ms
-            float(val.strip()) + 0.5)  # add 0.5 to compemsate average value
-                                       # as fio trimm all values in log to integer
-            for off, val in it]
-
-
-
-
-
-
+def rpc_install(name, binary):
+    try:
+        subprocess.check_output("which {}".format(binary), shell=True)
+    except:
+        subprocess.check_output("apt-get install -y {}".format(name), shell=True)
diff --git a/wally/suits/io/rrd.cfg b/wally/suits/io/rrd.cfg
index 86de738..1075aea 100644
--- a/wally/suits/io/rrd.cfg
+++ b/wally/suits/io/rrd.cfg
@@ -1,6 +1,5 @@
 [global]
-include defaults.cfg
-NUMJOBS=8
+include defaults_qd.cfg
 ramp_time=5
 runtime=5
 
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index 8636596..f328e13 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -12,8 +12,6 @@
 from ..storage import Storage
 from ..result_classes import RawTestResults
 
-import agent
-
 
 logger = logging.getLogger("wally")
 
@@ -70,7 +68,7 @@
         return os.path.join(self.config.remote_dir, path)
 
     @abc.abstractmethod
-    def run(self, storage: Storage) -> None:
+    def run(self) -> None:
         pass
 
     @abc.abstractmethod
@@ -98,9 +96,15 @@
         pass
 
     def get_not_done_stages(self, storage: Storage) -> Dict[int, IterationConfig]:
-        start_run_id = max(int(name) for _, name in storage.list('result')) + 1
+        done_stages = list(storage.list('result'))
+        if len(done_stages) == 0:
+            start_run_id = 0
+        else:
+            start_run_id = max(int(name) for _, name in done_stages) + 1
+
         not_in_storage = {}  # type: Dict[int, IterationConfig]
-        for run_id, iteration_config in enumerate(self.iterations_configs, start_run_id):
+
+        for run_id, iteration_config in enumerate(self.iterations_configs[start_run_id:], start_run_id):
             info_path = "result/{}/info".format(run_id)
             if info_path in storage:
                 info = cast(Dict[str, Any], storage[info_path]) # type: Dict[str, Any]
@@ -131,8 +135,8 @@
                 not_in_storage[run_id] = iteration_config
         return not_in_storage
 
-    def run(self, storage: Storage) -> None:
-        not_in_storage = self.get_not_done_stages(storage)
+    def run(self) -> None:
+        not_in_storage = self.get_not_done_stages(self.config.storage)
 
         if not not_in_storage:
             logger.info("All test iteration in storage already. Skip test")
@@ -171,9 +175,6 @@
                         if self.max_retry - 1 == idx:
                             raise StopTestError("Fio failed") from exc
                         logger.exception("During fio run")
-                    else:
-                        if all(results):
-                            break
 
                     logger.info("Sleeping %ss and retrying", self.retry_time)
                     time.sleep(self.retry_time)
@@ -181,7 +182,7 @@
                 start_times = []  # type: List[int]
                 stop_times = []  # type: List[int]
 
-                mstorage = storage.sub_storage("result", str(run_id), "measurement")
+                mstorage = self.config.storage.sub_storage("result", str(run_id), "measurement")
                 for (result, (t_start, t_stop)), node in zip(results, self.config.nodes):
                     for metrics_name, data in result.items():
                         mstorage[node.info.node_id(), metrics_name] = data  # type: ignore
@@ -214,7 +215,7 @@
                     'end_time': max_stop_time
                 }
 
-                storage["result", str(run_id), "info"] = test_config  # type: ignore
+                self.config.storage["result", str(run_id), "info"] = test_config  # type: ignore
 
     @abc.abstractmethod
     def config_node(self, node: IRPCNode) -> None:
diff --git a/wally/test_run_class.py b/wally/test_run_class.py
index 40ce395..a0c014f 100644
--- a/wally/test_run_class.py
+++ b/wally/test_run_class.py
@@ -1,4 +1,4 @@
-from typing import List, Callable, Any, Dict, Optional, Set, Union
+from typing import List, Callable, Any, Dict, Optional, Set
 from concurrent.futures import ThreadPoolExecutor
 
 
diff --git a/wally/utils.py b/wally/utils.py
index 45b67b4..dd9cdd5 100644
--- a/wally/utils.py
+++ b/wally/utils.py
@@ -437,3 +437,14 @@
         if not self.tick():
             raise StopIteration()
         return self.end_time - time.time()
+
+
+def to_ip(host_or_ip: str) -> str:
+    # translate hostname to address
+    try:
+        ipaddress.ip_address(host_or_ip)
+        return host_or_ip
+    except ValueError:
+        ip_addr = socket.gethostbyname(host_or_ip)
+        logger.info("Will use ip_addr %r instead of hostname %r", ip_addr, host_or_ip)
+        return ip_addr