Get test code working: run the RPC agent with a detected python2.7, parse fio logs into TimeSerie results, add a PrepareNodes stage for ceph scrub flags
diff --git a/wally/hw_info.py b/wally/hw_info.py
index 2a5c1e5..e81a5c1 100644
--- a/wally/hw_info.py
+++ b/wally/hw_info.py
@@ -129,6 +129,12 @@
     except OSError:
         res.libvirt_version = None
 
+    # try:
+    #     # dpkg -l ??
+    #     res.libvirt_version = node.run("virsh -v", nolog=True).strip()
+    # except OSError:
+    #     res.libvirt_version = None
+
     try:
         res.qemu_version = node.run("qemu-system-x86_64 --version", nolog=True).strip()
     except OSError:
diff --git a/wally/main.py b/wally/main.py
index f57e1a5..6396b6b 100644
--- a/wally/main.py
+++ b/wally/main.py
@@ -40,7 +40,8 @@
 from .ceph import DiscoverCephStage
 from .openstack import DiscoverOSStage
 from .fuel import DiscoverFuelStage
-from .run_test import CollectInfoStage, ExplicitNodesStage, SaveNodesStage, RunTestsStage, ConnectStage, SleepStage
+from .run_test import (CollectInfoStage, ExplicitNodesStage, SaveNodesStage,
+                       RunTestsStage, ConnectStage, SleepStage, PrepareNodes)
 from .report import ConsoleReportStage, HtmlReportStage
 from .sensors import StartSensorsStage, CollectSensorsStage
 
@@ -219,6 +220,7 @@
         stages.append(CollectSensorsStage())
         stages.append(ConnectStage())
         stages.append(SleepStage())
+        stages.append(PrepareNodes())
 
         if not opts.dont_collect:
             stages.append(CollectInfoStage())
diff --git a/wally/node.py b/wally/node.py
index 2a57c65..6624bdc 100644
--- a/wally/node.py
+++ b/wally/node.py
@@ -164,12 +164,12 @@
         return str(self)
 
     def get_file_content(self, path: str) -> bytes:
-        logger.debug("GET %s", path)
-        res = self.conn.fs.get_file(path, expanduser=True)
-        logger.debug("Receive %s bytes from %s", len(res), path)
+        logger.debug("GET %s from %s", path, self.info)
+        res = self.conn.fs.get_file(self.conn.fs.expanduser(path))
+        logger.debug("Download %s bytes from remote file %s from %s", len(res), path, self.info)
         return res
 
-    def run(self, cmd: str, timeout: int = 60, nolog: bool = False) -> str:
+    def run(self, cmd: str, timeout: int = 60, nolog: bool = False, check_timeout: float = 0.01) -> str:
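+        # check_timeout sets the sleep interval (in seconds) between polls
+        # of the remote command exit status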
         if not nolog:
             logger.debug("Node %s - run %s", self.info.node_id(), cmd)
 
@@ -182,7 +182,7 @@
             out += outb.decode("utf8")
             if code is not None:
                 break
-            time.sleep(0.01)
+            time.sleep(check_timeout)
 
         if code != 0:
             templ = "Node {} - cmd {!r} failed with code {}. Output: {!r}."
@@ -190,15 +190,15 @@
 
         return out
 
-    def copy_file(self, local_path: str, remote_path: str = None, expand_user: bool = False) -> str:
+    def copy_file(self, local_path: str, remote_path: str = None) -> str:
         data = open(local_path, 'rb').read()
-        return self.put_to_file(remote_path, data, expand_user)
+        return self.put_to_file(remote_path, data)
 
-    def put_to_file(self, path: Optional[str], content: bytes, expand_user: bool = False) -> str:
-        return self.conn.fs.store_file(path, content, expand_user)
+    def put_to_file(self, path: Optional[str], content: bytes) -> str:
+        return self.conn.fs.store_file(path, content)
 
-    def stat_file(self, path: str, expand_user: bool = False) -> Dict[str, int]:
-        return self.conn.fs.file_stat(path, expand_user)
+    def stat_file(self, path: str) -> Dict[str, int]:
+        return self.conn.fs.file_stat(path)
 
     def __exit__(self, x, y, z) -> bool:
         self.disconnect(stop=True)
@@ -217,6 +217,23 @@
         self.conn = None
 
 
+def get_node_python_27(node: ISSHHost) -> Optional[str]:
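+    # prefer an explicit python2.7 binary; fall back to plain 'python'
+    # when it reports version 2.7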
+    python_cmd = None  # type: Optional[str]
+    try:
+        python_cmd = node.run('which python2.7').strip()
+    except Exception:
+        pass
+
+    if python_cmd is None:
+        try:
+            if '2.7' in node.run('python --version'):
+                python_cmd = node.run('which python').strip()
+        except Exception:
+            pass
+
+    return python_cmd
+
+
 def setup_rpc(node: ISSHHost,
               rpc_server_code: bytes,
               plugins: Dict[str, bytes] = None,
@@ -224,19 +241,27 @@
               log_level: str = None) -> IRPCNode:
 
     logger.debug("Setting up RPC connection to {}".format(node.info))
+    python_cmd = get_node_python_27(node)
+    if python_cmd:
+        logger.debug("python2.7 on node {} path is {}".format(node.info, python_cmd))
+    else:
+        logger.error(("Can't find python2.7 on node {}. " +
+                      "Install python2.7 and rerun test").format(node.info))
+        raise ValueError("Python not found")
+
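+    # NOTE: put_to_file() with path=None is expected to make the remote side
+    # pick a temporary file name and return it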
     code_file = node.put_to_file(None, rpc_server_code)
     ip = node.info.ssh_creds.addr.host
 
     log_file = None  # type: Optional[str]
     if log_level:
         log_file = node.run("mktemp", nolog=True).strip()
-        cmd = "python {} --log-level={} server --listen-addr={}:{} --daemon --show-settings"
-        cmd = cmd.format(code_file, log_level, ip, port) + " --stdout-file={}".format(log_file)
+        cmd = "{} {} --log-level={} server --listen-addr={}:{} --daemon --show-settings"
+        cmd = cmd.format(python_cmd, code_file, log_level, ip, port) + " --stdout-file={}".format(log_file)
         logger.info("Agent logs for node {} stored on node in file {}. Log level is {}".format(
             node.info.node_id(), log_file, log_level))
     else:
-        cmd = "python {} --log-level=CRITICAL server --listen-addr={}:{} --daemon --show-settings"
-        cmd = cmd.format(code_file, ip, port)
+        cmd = "{} {} --log-level=CRITICAL server --listen-addr={}:{} --daemon --show-settings"
+        cmd = cmd.format(python_cmd, code_file, ip, port)
 
     params_js = node.run(cmd).strip()
     params = json.loads(params_js)
diff --git a/wally/node_interfaces.py b/wally/node_interfaces.py
index c494b8d..caca8cc 100644
--- a/wally/node_interfaces.py
+++ b/wally/node_interfaces.py
@@ -70,7 +70,7 @@
         pass
 
     @abc.abstractmethod
-    def run(self, cmd: str, timeout: int = 60, nolog: bool = False) -> str:
+    def run(self, cmd: str, timeout: int = 60, nolog: bool = False, check_timeout: float = 0.01) -> str:
         pass
 
     @abc.abstractmethod
@@ -90,7 +90,7 @@
         pass
 
     @abc.abstractmethod
-    def disconnect(self) -> str:
+    def disconnect(self) -> None:
         pass
 
     @abc.abstractmethod
diff --git a/wally/openstack.py b/wally/openstack.py
index 5541d4c..3b5ced5 100644
--- a/wally/openstack.py
+++ b/wally/openstack.py
@@ -1,7 +1,7 @@
 import os.path
 import socket
 import logging
-from typing import Dict, Any, List, Tuple, cast
+from typing import Dict, Any, List, Tuple, cast, Optional
 
 from .node_interfaces import NodeInfo
 from .config import ConfigBlock, Config
diff --git a/wally/result_classes.py b/wally/result_classes.py
index 9b488b7..19925f2 100644
--- a/wally/result_classes.py
+++ b/wally/result_classes.py
@@ -1,19 +1,30 @@
-from typing import Union, Dict, List, Any, Tuple
-
-# Stores test result for integral value, which
-# can be expressed as a single value for given time period,
-# like IO, BW, etc.
-TimeSeriesIntegral = List[float]
+import array
+from typing import Dict, List, Any, Tuple, Optional
 
 
-# Stores test result for value, which
-# requires distribution to be stored for any time period,
-# like latency.
-TimeSeriesHistogram = List[List[float]]
+class TimeSerie:
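+    # Fixed-step series of measurements. For histogram-like series (e.g.
+    # latency) each step carries second_axis_size values; plain series
+    # use second_axis_size == 1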
+    name = None  # type: str
+    start_at = None  # type: int
+    step = None  # type: int
+    data = None  # type: List[int]
+    second_axis_size = None  # type: int
+    raw = None  # type: Optional[bytes]
 
+    def __init__(self, name: str, raw: Optional[bytes], second_axis_size: int,
+                 start_at: int, step: int, data: array.array) -> None:
+        self.name = name
+        self.start_at = start_at
+        self.step = step
+        self.second_axis_size = second_axis_size
+        self.data = data  # type: ignore
+        self.raw = raw
 
-TimeSeries = Union[TimeSeriesIntegral, TimeSeriesHistogram]
-RawTestResults = Dict[str, TimeSeries]
+    def meta(self) -> Dict[str, Any]:
+        return {
+            "start_at": self.start_at,
+            "step": self.step,
+            "second_axis_size": self.second_axis_size
+        }
 
 
 class SensorInfo:
@@ -23,7 +34,7 @@
     sensor_name = None  # type: str
     begin_time = None  # type: int
     end_time = None  # type: int
-    data = None  # type: TimeSeries
+    data = None  # type: List[int]
 
     def __init__(self, node_id: str, source_id: str, sensor_name: str) -> None:
         self.node_id = node_id
@@ -43,12 +54,30 @@
     node_ids = None # type: List[str]
 
 
+class NodeTestResults:
+    name = None  # type: str
+    node_id = None  # type: str
+    summary = None  # type: str
+
+    load_start_at = None  # type: int
+    load_stop_at = None  # type: int
+
+    series = None  # type: Dict[str, TimeSerie]
+
+    def __init__(self, name: str, node_id: str, summary: str) -> None:
+        self.name = name
+        self.node_id = node_id
+        self.summary = summary
+        self.series = {}
+        self.extra_logs = {}  # type: Dict[str, bytes]
+
+
 class FullTestResult:
     test_info = None  # type: TestInfo
 
     # TODO(koder): array.array or numpy.array?
     # {(node_id, perf_metrics_name): values}
-    performance_data = None  # type: Dict[Tuple[str, str], TimeSeries]
+    performance_data = None  # type: Dict[Tuple[str, str], List[int]]
 
     # {(node_id, perf_metrics_name): values}
     sensors_data = None  # type: Dict[Tuple[str, str, str], SensorInfo]
diff --git a/wally/run_test.py b/wally/run_test.py
index 891df5a..52803d1 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -1,4 +1,5 @@
 import time
+import json
 import logging
 from concurrent.futures import Future
 from typing import List, Dict, Tuple, Optional, Union, cast
@@ -6,7 +7,7 @@
 from . import utils, ssh_utils, hw_info
 from .config import ConfigBlock
 from .node import setup_rpc, connect
-from .node_interfaces import NodeInfo, IRPCNode
+from .node_interfaces import NodeInfo, IRPCNode, ISSHHost
 from .stage import Stage, StepOrder
 from .suits.io.fio import IOPerfTest
 from .suits.itest import TestInputConfig
@@ -39,6 +40,7 @@
             def connect_ext(node_info: NodeInfo) -> Tuple[bool, Union[IRPCNode, NodeInfo]]:
                 try:
                     ssh_node = connect(node_info, conn_timeout=ctx.config.connect_timeout)
+
                     return True, setup_rpc(ssh_node,
                                            ctx.rpc_code,
                                            ctx.default_rpc_plugins,
@@ -66,7 +68,7 @@
                 logger.warning(msg.format(", ".join(map(str, failed_nodes))))
 
             if failed_testnodes:
-                msg = "Can't connect to testnode(s) " + ",".join(map(str, failed_testnodes))
+                msg = "Can't start RPC on testnode(s) " + ",".join(map(str, failed_testnodes))
                 logger.error(msg)
                 raise utils.StopTestError(msg)
 
@@ -74,9 +76,6 @@
                 logger.info("All nodes connected successfully")
 
     def cleanup(self, ctx: TestRun) -> None:
-        # TODO(koder): what next line was for?
-        # ssh_utils.close_all_sessions()
-
         if ctx.config.get("download_rpc_logs", False):
             for node in ctx.nodes:
                 if node.rpc_log_file is not None:
@@ -94,7 +93,7 @@
 class CollectInfoStage(Stage):
     """Collect node info"""
 
-    priority = StepOrder.START_SENSORS - 1
+    priority = StepOrder.START_SENSORS - 2
     config_block = 'collect_info'
 
     def run(self, ctx: TestRun) -> None:
@@ -169,6 +168,52 @@
         time.sleep(ctx.config.sleep)
 
 
+class PrepareNodes(Stage):
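+    """Apply pre-test cluster settings (ceph noscrub/nodeepscrub flags) and revert them in cleanup"""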
+    priority = StepOrder.START_SENSORS - 1
+
+    def __init__(self):
+        Stage.__init__(self)
+        self.nodeepscrub_updated = False
+        self.noscrub_updated = False
+
+    def run(self, ctx: TestRun) -> None:
+        ceph_sett = ctx.config.get('ceph_settings', "").split()
+        if ceph_sett:
+            for node in ctx.nodes:
+                if "ceph-mon" in node.info.roles or "ceph-osd" in node.info.roles:
+                    health = json.loads(node.run("ceph health --format json"))
+                    state = " ".join(msg["summary"] for msg in health.get("summary", []))
+                    if 'noscrub' in ceph_sett:
+                        if 'noscrub' in state:
+                            logger.debug("noscrub already set on cluster")
+                        else:
+                            logger.info("Applying noscrub settings to ceph cluster")
+                            node.run("ceph osd set noscrub")
+                            self.noscrub_updated = True
+
+                    if 'nodeepscrub' in ceph_sett:
+                        if 'nodeepscrub' in state:
+                            logger.debug("nodeepscrub already set on cluster")
+                        else:
+                            logger.info("Applying nodeepscrub settings to ceph cluster")
+                            node.run("ceph osd set nodeepscrub")
+                            self.nodeepscrub_updated = True
+                    break
+
+    def cleanup(self, ctx: TestRun) -> None:
+        if self.nodeepscrub_updated or self.noscrub_updated:
+            for node in ctx.nodes:
+                if "ceph-mon" in node.info.roles or "ceph-osd" in node.info.roles :
+                    if self.noscrub_updated:
+                        logger.info("Reverting noscrub setting for ceph cluster")
+                        node.run("ceph osd unset noscrub")
+                        self.noscrub_updated = False
+
+                    if self.nodeepscrub_updated:
+                        logger.info("Reverting noscrub setting for ceph cluster")
+                        node.run("ceph osd unset nodeepscrub")
+                        self.nodeepscrub_updated = False
+
+
 class RunTestsStage(Stage):
 
     priority = StepOrder.TEST
diff --git a/wally/sensors.py b/wally/sensors.py
index eef7864..0a1bbe9 100644
--- a/wally/sensors.py
+++ b/wally/sensors.py
@@ -9,7 +9,7 @@
 from .stage import Stage, StepOrder
 
 plugin_fname = sensors_rpc_plugin.__file__.rsplit(".", 1)[0] + ".py"
-SENSORS_PLUGIN_CODE = open(plugin_fname, "rb").read()
+SENSORS_PLUGIN_CODE = open(plugin_fname, "rb").read()  # type: bytes
 
 
 logger = logging.getLogger("wally")
@@ -27,6 +27,16 @@
             logger.critical(message.format(array.array('L').itemsize))
             raise utils.StopTestError()
 
+        # TODO: this needs a careful fix
+        # sensors config is:
+        #   role:
+        #     sensor: [str]
+        # or
+        #  role:
+        #     sensor:
+        #        allowed: [str]
+        #        disallowed: [str]
+        #        params: Any
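+        # An illustrative example (sensor names/params are hypothetical):
+        #   ceph-osd:
+        #     block-io: ['*']
+        #     ceph:
+        #        allowed: ['osd.*']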
         per_role_config = {}  # type: Dict[str, Dict[str, str]]
 
         for name, val in ctx.config.sensors.roles_mapping.raw().items():
@@ -41,7 +51,7 @@
             all_roles = set(per_role_config)
 
             for node in ctx.nodes:
-                all_roles.update(node.info.roles)
+                all_roles.update(node.info.roles)  # type: ignore
 
             for name, vals in list(per_role_config.items()):
                 new_vals = all_vl.copy()
@@ -51,14 +61,14 @@
         for node in ctx.nodes:
             node_cfg = {}  # type: Dict[str, Dict[str, str]]
             for role in node.info.roles:
-                node_cfg.update(per_role_config.get(role, {}))
+                node_cfg.update(per_role_config.get(role, {}))  # type: ignore
 
             nid = node.info.node_id()
             if node_cfg:
                 # ceph requires additional settings
                 if 'ceph' in node_cfg:
                     node_cfg['ceph'].update(node.info.params['ceph'])
-                    node_cfg['ceph']['osds'] = [osd.id for osd in node.info.params['ceph-osds']]
+                    node_cfg['ceph']['osds'] = [osd.id for osd in node.info.params['ceph-osds']]  # type: ignore
 
                 logger.debug("Setting up sensort RPC plugin for node %s", nid)
                 node.upload_plugin("sensors", SENSORS_PLUGIN_CODE)
diff --git a/wally/sensors_rpc_plugin.py b/wally/sensors_rpc_plugin.py
index 1c508ee..55d8b84 100644
--- a/wally/sensors_rpc_plugin.py
+++ b/wally/sensors_rpc_plugin.py
@@ -157,7 +157,7 @@
         for line in open('/proc/diskstats'):
             dev_name = line.split()[2]
             if self.is_dev_accepted(dev_name) and not dev_name[-1].isdigit():
-                self.accepted_devs.add(dev_name)
+                self.allowed_devs.add(dev_name)
 
     def collect(self):
         for line in open('/proc/diskstats'):
@@ -353,7 +353,7 @@
 
 
 @provides("system-ram")
-class SystemCPUSensor(ArraysSensor):
+class SystemRAMSensor(ArraysSensor):
     # return this values or setted in allowed
     ram_fields = ['MemTotal', 'MemFree', 'Buffers', 'Cached', 'SwapCached',
                   'Dirty', 'Writeback', 'SwapTotal', 'SwapFree']
@@ -378,6 +378,20 @@
                 self.add_data("ram", field, int(vals[1]))
 
 
+try:
+    from ceph_daemon import admin_socket
+except ImportError:
+    admin_socket = None
+
+
+def run_ceph_daemon_cmd(cluster, osd_id, *args):
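+    # use the in-process ceph admin socket API when the ceph_daemon module
+    # is importable, otherwise shell out to the ceph CLI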
+    asok = "/var/run/ceph/{}-osd.{}.asok".format(cluster, osd_id)
+    if admin_socket:
+        return admin_socket(asok, args)
+    else:
+        return subprocess.check_output("ceph daemon {} {}".format(asok, " ".join(args)), shell=True)
+
+
 @provides("ceph")
 class CephSensor(ArraysSensor):
     def collect(self):
@@ -388,9 +402,7 @@
             return dct[path]
 
         for osd_id in self.params['osds']:
-            asok = '/var/run/ceph/{}-osd.{}.asok'.format(self.params['cluster'], osd_id)
-            out = subprocess.check_output('ceph daemon {} perf dump'.format(asok), shell=True)
-            data = json.loads(out)
+            data = json.loads(run_ceph_daemon_cmd(self.params['cluster'], osd_id, 'perf', 'dump'))
             for key_name in self.params['counters']:
                 self.add_data("osd{}".format(osd_id), key_name.replace("/", "."), get_val(data, key_name))
 
@@ -513,6 +525,8 @@
     if sdata is None:
         raise ValueError("No sensor thread running")
 
+    res = collected_at = None
+
     with sdata.cond:
         if sdata.exception:
             raise Exception(sdata.exception)
diff --git a/wally/ssh.py b/wally/ssh.py
index a577ffb..6bba020 100644
--- a/wally/ssh.py
+++ b/wally/ssh.py
@@ -24,7 +24,7 @@
 
 def set_key_for_node(host_port: IPAddr, key: bytes) -> None:
     with StringIO(key.decode("utf8")) as sio:
-        NODE_KEYS[host_port] = paramiko.RSAKey.from_private_key(sio)
+        NODE_KEYS[host_port] = paramiko.RSAKey.from_private_key(sio)  # type: ignore
 
 
 def connect(creds: ConnCreds,
@@ -63,7 +63,7 @@
                 ssh.connect(creds.addr.host,
                             username=creds.user,
                             timeout=c_tcp_timeout,
-                            pkey=paramiko.RSAKey.from_private_key(creds.key_file, password=SSH_KEY_PASSWD),
+                            pkey=paramiko.RSAKey.from_private_key_file(creds.key_file, password=SSH_KEY_PASSWD),
                             look_for_keys=False,
                             port=creds.addr.port,
                             **banner_timeout_arg)
@@ -72,7 +72,7 @@
                     ssh.connect(creds.addr.host,
                                 username=creds.user,
                                 timeout=c_tcp_timeout,
-                                pkey=paramiko.RSAKey.from_private_key(sio, password=SSH_KEY_PASSWD),
+                                pkey=paramiko.RSAKey.from_private_key(sio, password=SSH_KEY_PASSWD),  # type: ignore
                                 look_for_keys=False,
                                 port=creds.addr.port,
                                 **banner_timeout_arg)
diff --git a/wally/ssh_utils.py b/wally/ssh_utils.py
index 9b3c074..c224bf4 100644
--- a/wally/ssh_utils.py
+++ b/wally/ssh_utils.py
@@ -47,7 +47,7 @@
         uri_reg_exprs.append(templ.format(**re_dct))
 
 
-class ConnCreds(yaml.YAMLObject):
+class ConnCreds(yaml.YAMLObject):  # type: ignore
     yaml_tag = '!ConnCreds'
 
     def __init__(self, host: str, user: str, passwd: str = None, port: str = '22',
diff --git a/wally/storage.py b/wally/storage.py
index a17e3c0..6879dcf 100644
--- a/wally/storage.py
+++ b/wally/storage.py
@@ -5,6 +5,7 @@
 import os
 import abc
 import array
+import shutil
 from typing import Any, Iterator, TypeVar, Type, IO, Tuple, cast, List, Dict, Union, Iterable
 
 
@@ -59,6 +60,10 @@
     def sub_storage(self, path: str) -> 'ISimpleStorage':
         pass
 
+    @abc.abstractmethod
+    def clear(self, path: str) -> None:
+        pass
+
 
 class ISerializer(metaclass=abc.ABCMeta):
     """Interface for serialization class"""
@@ -135,6 +140,10 @@
     def sub_storage(self, path: str) -> 'FSStorage':
         return self.__class__(self.j(path), self.existing)
 
+    def clear(self, path: str) -> None:
+        dpath = self.j(path)
+        if os.path.exists(dpath):
+            shutil.rmtree(dpath)
+
 
 class YAMLSerializer(ISerializer):
     """Serialize data to yaml"""
@@ -175,7 +184,6 @@
     def __delitem__(self, path: Union[str, Iterable[str]]) -> None:
         if not isinstance(path, str):
             path = "/".join(path)
-
         del self.storage[path]
 
     def __contains__(self, path: Union[str, Iterable[str]]) -> bool:
@@ -184,14 +192,13 @@
         return path in self.storage
 
     def store_raw(self, val: bytes, *path: str) -> None:
-        if not isinstance(path, str):
-            path = "/".join(path)
-        self.storage[path] = val
+        self.storage["/".join(path)] = val
+
+    def clear(self, *path: str) -> None:
+        self.storage.clear("/".join(path))
 
     def get_raw(self, *path: str) -> bytes:
-        if not isinstance(path, str):
-            path = "/".join(path)
-        return self.storage[path]
+        return self.storage["/".join(path)]
 
     def list(self, *path: str) -> Iterator[Tuple[bool, str]]:
         return self.storage.list("/".join(path))
diff --git a/wally/storage_structure.txt b/wally/storage_structure.txt
index 82ba4e8..307c069 100644
--- a/wally/storage_structure.txt
+++ b/wally/storage_structure.txt
@@ -13,10 +13,10 @@
 info/run_time : float - run unix time
 
 # test results
-result/{id}/info : TestInfo - test information: name, cluster config, test parameters, etc.
-result/{id}/measurement/{node}/{name} : List[float] - measurements data. E.g.:
-    result/{id}/measurement/node-12/iops   - for BW uses iops * block_sz
-    result/{id}/measurement/node-12/lat_histo
+result/{descr}_{id}/info : TestInfo - test information: name, cluster config, test parameters, etc.
+result/{descr}_{id}/measurement/{node}/{name}_raw : bytes - raw log
+result/{descr}_{id}/measurement/{node}/{name}_data : List[uint64] - measurements data.
+result/{descr}_{id}/measurement/{node}/{name}_meta - Dict[str, Any] - measurements metadata.
 
 metric/{node_name}/{dev}/{metric_name} : List[float] - node metrics data. E.g.:
     metric/node-22/cpu/load
diff --git a/wally/suits/io/defaults_qd.cfg b/wally/suits/io/defaults_qd.cfg
index 0418e8a..fcf5c16 100644
--- a/wally/suits/io/defaults_qd.cfg
+++ b/wally/suits/io/defaults_qd.cfg
@@ -19,7 +19,7 @@
 size={FILESIZE}
 
 write_iops_log=fio_iops_log
-write_bw_log=fio_ibw_log
+write_bw_log=fio_bw_log
 log_avg_msec=1000
 write_hist_log=fio_lat_hist_log
 log_hist_msec=1000
diff --git a/wally/suits/io/fio.py b/wally/suits/io/fio.py
index e055d98..ee34af4 100644
--- a/wally/suits/io/fio.py
+++ b/wally/suits/io/fio.py
@@ -1,14 +1,18 @@
+import array
 import os.path
 import logging
-from typing import cast
+from typing import cast, Dict
 
 import wally
 
 from ...utils import StopTestError, get_os, ssize2b
 from ...node_interfaces import IRPCNode
-from ..itest import ThreadedTest, IterationConfig, RunTestRes
-from .fio_task_parser import execution_time, fio_cfg_compile, FioJobSection, FioParams, get_log_files
+from ..itest import ThreadedTest, IterationConfig, NodeTestResults
+from ...result_classes import TimeSerie
+from .fio_task_parser import execution_time, fio_cfg_compile, FioJobSection, FioParams, get_log_files, get_test_summary
 from . import rpc_plugin
+from .fio_hist import expected_lat_bins
+from ...storage import Storage
 
 logger = logging.getLogger("wally")
 
@@ -79,8 +83,8 @@
         self.exec_folder = self.config.remote_dir
 
     def config_node(self, node: IRPCNode) -> None:
-        plugin_code = open(rpc_plugin.__file__.rsplit(".", 1)[0] + ".py", "rb").read()
-        node.upload_plugin(code=plugin_code, name="fio")
+        plugin_code = open(rpc_plugin.__file__.rsplit(".", 1)[0] + ".py", "rb").read()  # type: bytes
+        node.upload_plugin("fio", plugin_code)
 
         try:
             node.conn.fs.rmtree(self.config.remote_dir)
@@ -102,9 +106,6 @@
         if fill_bw is not None:
             logger.info("Initial fio fill bw is {} MiBps for {}".format(fill_bw, node))
 
-        fio_config = "\n".join(map(str, self.iterations_configs))
-        node.put_to_file(self.remote_task_file, fio_config.encode("utf8"))
-
     def install_utils(self, node: IRPCNode) -> None:
         os_info = get_os(node)
         if self.use_system_fio:
@@ -131,22 +132,97 @@
     def get_expected_runtime(self, iteration_info: IterationConfig) -> int:
         return execution_time(cast(FioJobSection, iteration_info))
 
-    def do_test(self, node: IRPCNode, iter_config: IterationConfig) -> RunTestRes:
+    def prepare_iteration(self, node: IRPCNode, iter_config: IterationConfig) -> None:
+        node.put_to_file(self.remote_task_file, str(iter_config).encode("utf8"))
+
+    # TODO: get a link to substorage as a parameter
+    def run_iteration(self, node: IRPCNode, iter_config: IterationConfig, substorage: Storage) -> NodeTestResults:
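+        # run fio on the node, fetch its json output and per-metric log
+        # files, parse them into TimeSerie objects and persist both raw and
+        # parsed data into the iteration substorage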
         exec_time = execution_time(cast(FioJobSection, iter_config))
+
         fio_cmd_templ = "cd {exec_folder}; " + \
                         "{fio_path} --output-format=json --output={out_file} --alloc-size=262144 {job_file}"
 
-        bw_log, iops_log, lat_hist_log = get_log_files(iter_config)
-
         cmd = fio_cmd_templ.format(exec_folder=self.exec_folder,
                                    fio_path=self.fio_path,
                                    out_file=self.remote_output_file,
                                    job_file=self.remote_task_file)
-        raw_res = node.run(cmd, timeout=exec_time + max(300, exec_time))
-        
-        return
+        must_be_empty = node.run(cmd, timeout=exec_time + max(300, exec_time), check_timeout=1).strip()
 
-        # TODO(koder): fix next error
-        # raise NotImplementedError("Need to extract time from test result")
-        # return raw_res, (0, 0)
+        if must_be_empty:
+            logger.error("Unexpected fio output: %r", must_be_empty)
 
+        res = NodeTestResults(self.__class__.__name__,
+                              node.info.node_id(),
+                              get_test_summary(cast(FioJobSection, iter_config)))
+
+        res.extra_logs['fio'] = node.get_file_content(self.remote_output_file)
+        substorage.store_raw(res.extra_logs['fio'], "fio_raw")
+        node.conn.fs.unlink(self.remote_output_file)
+
+        files = list(node.conn.fs.listdir(self.exec_folder))
+
+        expected_time_delta = 1000  # 1000ms == 1s
+        max_time_diff = 50  # 50ms - 5%
+
+        for name, path in get_log_files(cast(FioJobSection, iter_config)):
+            log_files = [fname for fname in files if fname.startswith(path)]
+            if len(log_files) != 1:
+                logger.error("Found %s files, match log pattern %s(%s) - %s",
+                             len(log_files), path, name, ",".join(log_files[10:]))
+                raise StopTestError()
+
+            fname = os.path.join(self.exec_folder, log_files[0])
+            raw_result = node.get_file_content(fname)  # type: bytes
+            substorage.store_raw(raw_result, "{}_raw".format(name))
+            node.conn.fs.unlink(fname)
+
+            try:
+                log_data = raw_result.decode("utf8").split("\n")
+            except UnicodeDecodeError:
+                logger.exception("Error during parse of %s fio log file - can't decode using UTF8", name)
+                raise StopTestError()
+
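+            # latency histogram bins fit into 'L' (unsigned long); plain
+            # iops/bw samples are stored as 'Q' (unsigned long long)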
+            parsed = array.array('L' if name == 'lat' else 'Q')
+            prev_ts = None
+            load_start_at = None
+
+            # TODO: the values still need to be adjusted for the time delta
+            for idx, line in enumerate(log_data):
+                line = line.strip()
+                if line:
+                    try:
+                        time_ms_s, val_s, _, *rest = line.split(",")
+                        time_ms = int(time_ms_s.strip())
+                        if prev_ts is None:
+                            load_start_at = time_ms
+                        elif abs(time_ms - prev_ts - expected_time_delta) > max_time_diff:
+                            logger.warning("Too large gap in {} log at {} - {}ms"
+                                           .format(name, time_ms, time_ms - prev_ts))
+                        if name == 'lat':
+                            vals = [int(i.strip()) for i in rest]
+
+                            if len(vals) != expected_lat_bins:
+                                logger.error("Expect {} bins in latency histogram, but found {} at time {}"
+                                             .format(expected_lat_bins, len(vals), time_ms_s))
+                                raise StopTestError()
+
+                            parsed.extend(vals)
+                        else:
+                            parsed.append(int(val_s.strip()))
+                    except ValueError:
+                        logger.exception("Error during parse %s fio log file in line %s: %r", name, idx, line)
+                        raise StopTestError()
+                    prev_ts = time_ms
+
+            res.series[name] = TimeSerie(name=name,
+                                         raw=raw_result,
+                                         second_axis_size=expected_lat_bins if name == 'lat' else 1,
+                                         start_at=load_start_at,
+                                         step=expected_time_delta,
+                                         data=parsed)
+
+            substorage.set_array(parsed, "{}_data".format(name))  # type: ignore
+            substorage["{}_meta".format(name)] = res.series[name].meta()  # type: ignore
+
+        return res
diff --git a/wally/suits/io/fio_hist.py b/wally/suits/io/fio_hist.py
new file mode 100644
index 0000000..eb5d9ee
--- /dev/null
+++ b/wally/suits/io/fio_hist.py
@@ -0,0 +1,57 @@
+from typing import List
+
+
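+# fio histogram log lines carry FIO_IO_U_PLAT_GROUP_NR (19) groups of
+# FIO_IO_U_PLAT_VAL (64) bins each: 19 * 64 == 1216 columns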
+expected_lat_bins = 1216
+
+
+#----------------------------  FIO HIST LOG PARSE CODE -----------------------------------------------------------------
+
+# Copied verbatim from fio/tools/hist/fiologparser_hist.py,
+# since that code is easier to copy than to understand or improve.
+
+def _plat_idx_to_val(idx: int, edge: float = 0.5, FIO_IO_U_PLAT_BITS: int = 6, FIO_IO_U_PLAT_VAL: int = 64) -> float:
+    """ Taken from fio's stat.c for calculating the latency value of a bin
+        from that bin's index.
+
+            idx  : the value of the index into the histogram bins
+            edge : fractional value in the range [0,1]** indicating how far into
+            the bin we wish to compute the latency value of.
+
+        ** edge = 0.0 and 1.0 computes the lower and upper latency bounds
+           respectively of the given bin index. """
+
+    # MSB <= (FIO_IO_U_PLAT_BITS-1), cannot be rounded off. Use
+    # all bits of the sample as index
+    if (idx < (FIO_IO_U_PLAT_VAL << 1)):
+        return idx
+
+    # Find the group and compute the minimum value of that group
+    error_bits = (idx >> FIO_IO_U_PLAT_BITS) - 1
+    base = 1 << (error_bits + FIO_IO_U_PLAT_BITS)
+
+    # Find its bucket number of the group
+    k = idx % FIO_IO_U_PLAT_VAL
+
+    # Return the mean (if edge=0.5) of the range of the bucket
+    return base + ((k + edge) * (1 << error_bits))
+
+
+def plat_idx_to_val_coarse(idx: int, coarseness: int, edge: float = 0.5) -> float:
+    """ Converts the given *coarse* index into a non-coarse index as used by fio
+        in stat.h:plat_idx_to_val(), subsequently computing the appropriate
+        latency value for that bin.
+        """
+
+    # Multiply the index by the power of 2 coarseness to get the bin
+    # bin index with a max of 1536 bins (FIO_IO_U_PLAT_GROUP_NR = 24 in stat.h)
+    stride = 1 << coarseness
+    idx = idx * stride
+    lower = _plat_idx_to_val(idx, edge=0.0)
+    upper = _plat_idx_to_val(idx + stride, edge=1.0)
+    return lower + (upper - lower) * edge
+
+
+def get_lat_vals(columns: int = 1216, coarseness: int = 0) -> List[float]:
+    return [plat_idx_to_val_coarse(val, coarseness) for val in range(columns)]
+
diff --git a/wally/suits/io/fio_task_parser.py b/wally/suits/io/fio_task_parser.py
index aaf4b36..8390e3a 100644
--- a/wally/suits/io/fio_task_parser.py
+++ b/wally/suits/io/fio_task_parser.py
@@ -41,6 +41,7 @@
     def __init__(self, name: str) -> None:
         self.name = name
         self.vals = OrderedDict()  # type: Dict[str, Any]
+        self.summary = None  # type: Optional[str]
 
     def copy(self) -> 'FioJobSection':
         return copy.deepcopy(self)
@@ -398,15 +399,22 @@
             yield res
 
 
-def get_log_files(sec: FioJobSection) -> Tuple[Optional[str], Optional[str], Optional[str]]:
-    return sec.vals.get('write_iops_log'), sec.vals.get('write_bw_log'), sec.vals.get('write_hist_log')
+def get_log_files(sec: FioJobSection) -> List[Tuple[str, str]]:
+    res = []  # type: List[Tuple[str, str]]
+    for key, name in (('write_iops_log', 'iops'), ('write_bw_log', 'bw'), ('write_hist_log', 'lat')):
+        log = sec.vals.get(key)
+        if log is not None:
+            res.append((name, log))
+    return res
 
 
 def fio_cfg_compile(source: str, fname: str, test_params: FioParams) -> Iterator[FioJobSection]:
     it = parse_all_in_1(source, fname)
     it = (apply_params(sec, test_params) for sec in it)
     it = flatmap(process_cycles, it)
-    return map(final_process, it)
+    for sec in map(final_process, it):
+        sec.summary = get_test_summary(sec)
+        yield sec
 
 
 def parse_args(argv):
diff --git a/wally/suits/io/rpc_plugin.py b/wally/suits/io/rpc_plugin.py
index 306af28..98e55f0 100644
--- a/wally/suits/io/rpc_plugin.py
+++ b/wally/suits/io/rpc_plugin.py
@@ -11,7 +11,6 @@
 
 
 logger = logging.getLogger("agent.fio")
-SensorsMap = {}
 
 
 def check_file_prefilled(path, used_size_mb):
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index f328e13..aae475c 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -5,12 +5,12 @@
 import datetime
 from typing import Dict, Any, List, Optional, Tuple, cast
 
-from concurrent.futures import ThreadPoolExecutor
+from concurrent.futures import ThreadPoolExecutor, wait
 
 from ..utils import Barrier, StopTestError, sec_to_str
 from ..node_interfaces import IRPCNode
 from ..storage import Storage
-from ..result_classes import RawTestResults
+from ..result_classes import NodeTestResults, TimeSerie
 
 
 logger = logging.getLogger("wally")
@@ -47,6 +47,7 @@
 
 class IterationConfig:
     name = None  # type: str
+    summary = None  # type: str
 
 
 class PerfTest:
@@ -76,9 +77,6 @@
         pass
 
 
-RunTestRes = Tuple[RawTestResults, Tuple[int, int]]
-
-
 class ThreadedTest(PerfTest, metaclass=abc.ABCMeta):
     """Base class for tests, which spawn separated thread for each node"""
 
@@ -96,11 +94,13 @@
         pass
 
     def get_not_done_stages(self, storage: Storage) -> Dict[int, IterationConfig]:
-        done_stages = list(storage.list('result'))
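+        # result subdirectories are named "{summary}_{run_id}"; recover the
+        # numeric run id from the suffix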
+        done_stages = [int(name_id.rsplit("_", 1)[1])
+                       for is_leaf, name_id in storage.list('result')
+                       if not is_leaf]
         if len(done_stages) == 0:
             start_run_id = 0
         else:
-            start_run_id = max(int(name) for _, name in done_stages) + 1
+            start_run_id = max(done_stages) + 1
 
         not_in_storage = {}  # type: Dict[int, IterationConfig]
 
@@ -143,10 +143,7 @@
             return
 
         logger.debug("Run test {} on nodes {}.".format(self.name, ",".join(self.sorted_nodes_ids)))
-
-        barrier = Barrier(len(self.nodes))
-
-        logger.debug("Run preparation")
+        logger.debug("Prepare nodes")
 
         with ThreadPoolExecutor(len(self.nodes)) as pool:
             list(pool.map(self.config_node, self.nodes))
@@ -163,31 +160,50 @@
 
             for run_id, iteration_config in sorted(not_in_storage.items()):
                 iter_name = "Unnamed" if iteration_config is None else iteration_config.name
-                logger.info("Run test iteration {} ".format(iter_name))
+                logger.info("Run test iteration %s", iter_name)
 
-                results = []  # type: List[RunTestRes]
+                results = []  # type: List[NodeTestResults]
                 for idx in range(self.max_retry):
-                    barrier.wait()
-                    try:
-                        futures = [pool.submit(self.do_test, node, iteration_config) for node in self.nodes]
-                        results = [fut.result() for fut in futures]
-                    except EnvironmentError as exc:
-                        if self.max_retry - 1 == idx:
-                            raise StopTestError("Fio failed") from exc
-                        logger.exception("During fio run")
+                    logger.debug("Prepare iteration %s", iter_name)
 
-                    logger.info("Sleeping %ss and retrying", self.retry_time)
-                    time.sleep(self.retry_time)
+                    # prepare nodes for new iterations
+                    futures = [pool.submit(self.prepare_iteration, node, iteration_config) for node in self.nodes]
+                    wait(futures)
+
+                    # run iteration
+                    logger.debug("Run iteration %s", iter_name)
+                    try:
+                        futures = []
+                        for node in self.nodes:
+                            path = "result/{}_{}/measurement/{}".format(iteration_config.summary,
+                                                                        run_id,
+                                                                        node.info.node_id())
+                            self.config.storage.clear(path)
+                            mstorage = self.config.storage.sub_storage(path)
+                            futures.append(pool.submit(self.run_iteration, node, iteration_config, mstorage))
+
+                        results = [fut.result() for fut in futures]
+                        break
+                    except EnvironmentError:
+                        if self.max_retry - 1 == idx:
+                            logger.exception("Fio failed")
+                            raise StopTestError()
+                        logger.exception("During fio run")
+                        logger.info("Sleeping %ss and retrying", self.retry_time)
+                        time.sleep(self.retry_time)
 
                 start_times = []  # type: List[int]
                 stop_times = []  # type: List[int]
 
-                mstorage = self.config.storage.sub_storage("result", str(run_id), "measurement")
-                for (result, (t_start, t_stop)), node in zip(results, self.config.nodes):
-                    for metrics_name, data in result.items():
-                        mstorage[node.info.node_id(), metrics_name] = data  # type: ignore
-                    start_times.append(t_start)
-                    stop_times.append(t_stop)
+                # TODO: FIX result processing - NodeTestResults
+                result = None  # type: NodeTestResults
+                for result in results:
+                    mstorage = self.config.storage.sub_storage("result/{}_{}/measurement/{}"
+                                                               .format(result.summary, run_id, result.node_id))
+                    serie = None   # type: TimeSerie
+                    for name, serie in result.series.items():
+                        start_times.append(serie.start_at)
+                        stop_times.append(serie.start_at + serie.step * (len(serie.data) // serie.second_axis_size))
 
                 min_start_time = min(start_times)
                 max_start_time = max(start_times)
@@ -222,7 +238,11 @@
         pass
 
     @abc.abstractmethod
-    def do_test(self, node: IRPCNode, iter_config: IterationConfig) -> RunTestRes:
+    def prepare_iteration(self, node: IRPCNode, iter_config: IterationConfig) -> None:
+        pass
+
+    @abc.abstractmethod
+    def run_iteration(self, node: IRPCNode, iter_config: IterationConfig, substorage: Storage) -> NodeTestResults:
         pass
 
 
@@ -246,15 +266,16 @@
         cmd += ' ' + self.config.params.get('prerun_opts', '')
         node.run(cmd, timeout=self.prerun_tout)
 
-    def do_test(self, node: IRPCNode, iter_config: IterationConfig) -> RunTestRes:
+    def prepare_iteration(self, node: IRPCNode, iter_config: IterationConfig) -> None:
+        pass
+
+    def run_iteration(self, node: IRPCNode, iter_config: IterationConfig, substorage: Storage) -> NodeTestResults:
+        # TODO: have to store logs
         cmd = self.join_remote(self.run_script)
         cmd += ' ' + self.config.params.get('run_opts', '')
-        t1 = time.time()
-        res = self.parse_results(node.run(cmd, timeout=self.run_tout))
-        t2 = time.time()
-        return res, (int(t1), int(t2))
+        return self.parse_results(node.run(cmd, timeout=self.run_tout))
 
     @abc.abstractmethod
-    def parse_results(self, data: str) -> RawTestResults:
+    def parse_results(self, data: str) -> NodeTestResults:
         pass