resume working
diff --git a/wally/common_types.py b/wally/common_types.py
index 884cd44..ef809ee 100644
--- a/wally/common_types.py
+++ b/wally/common_types.py
@@ -1,4 +1,5 @@
-from typing import NamedTuple
+import abc
+from typing import NamedTuple, Dict, Any
 
 IP = str
 IPAddr = NamedTuple("IPAddr", [("host", IP), ("port", int)])
diff --git a/wally/config.py b/wally/config.py
index 46669f0..cb47567 100644
--- a/wally/config.py
+++ b/wally/config.py
@@ -1,10 +1,13 @@
 from typing import Any, Dict, Optional
-from .storage import IStorable
+from .result_classes import IStorable
 
-ConfigBlock = Any
+
+ConfigBlock = Dict[str, Any]
 
 
 class Config(IStorable):
+    yaml_tag = 'config'
+
     def __init__(self, dct: ConfigBlock) -> None:
         # make mypy happy, set fake dict
         self.__dict__['_dct'] = {}
@@ -25,17 +28,21 @@
         # None, disabled, enabled, metadata, ignore_errors
         self.discovery = None  # type: Optional[str]
 
-        self.logging = None  # type: ConfigBlock
-        self.ceph = None  # type: ConfigBlock
-        self.openstack = None  # type: ConfigBlock
-        self.fuel = None  # type: ConfigBlock
-        self.test = None  # type: ConfigBlock
-        self.sensors = None  # type: ConfigBlock
+        self.logging = None  # type: 'Config'
+        self.ceph = None  # type: 'Config'
+        self.openstack = None  # type: 'Config'
+        self.fuel = None  # type: 'Config'
+        self.test = None  # type: 'Config'
+        self.sensors = None  # type: 'Config'
 
         self._dct.clear()
         self._dct.update(dct)
 
-    def raw(self) -> ConfigBlock:
+    @classmethod
+    def fromraw(cls, data: Dict[str, Any]) -> 'Config':
+        return cls(data)
+
+    def raw(self) -> Dict[str, Any]:
         return self._dct
 
     def get(self, path: str, default: Any = None) -> Any:
diff --git a/wally/fuel.py b/wally/fuel.py
index 902752a..37a5f5e 100644
--- a/wally/fuel.py
+++ b/wally/fuel.py
@@ -42,47 +42,49 @@
             logger.info("Skip FUEL discovery due to config setting")
             return
 
-        if 'all_nodes' in ctx.storage:
-            logger.debug("Skip FUEL discovery, use previously discovered nodes")
-            ctx.fuel_openstack_creds = ctx.storage['fuel_os_creds']  # type: ignore
-            ctx.fuel_version = ctx.storage['fuel_version']  # type: ignore
-            return
+        if "fuel_os_creds" in ctx.storage and 'fuel_version' in ctx.storage:
+            logger.debug("Skip FUEL credentials discovery, use previously discovered info")
+            ctx.fuel_openstack_creds = OSCreds(*cast(List, ctx.storage.get('fuel_os_creds')))
+            ctx.fuel_version = ctx.storage.get('fuel_version')
+            if 'all_nodes' in ctx.storage:
+                logger.debug("Skip FUEL nodes discovery, use data from DB")
+                return
+            elif discovery == 'metadata':
+                logger.debug("Skip FUEL nodes discovery due to discovery settings")
+                return
 
         fuel = ctx.config.fuel
         fuel_node_info = ctx.merge_node(fuel.ssh_creds, {'fuel_master'})
         creds = dict(zip(("user", "passwd", "tenant"), parse_creds(fuel.creds)))
         fuel_conn = KeystoneAuth(fuel.url, creds)
 
-        # get cluster information from REST API
-        if "fuel_os_creds" in ctx.storage and 'fuel_version' in ctx.storage:
-            ctx.fuel_openstack_creds = ctx.storage['fuel_os_creds']  # type: ignore
-            ctx.fuel_version = ctx.storage['fuel_version']  # type: ignore
-            return
-
         cluster_id = get_cluster_id(fuel_conn, fuel.openstack_env)
         cluster = reflect_cluster(fuel_conn, cluster_id)
-        ctx.fuel_version = FuelInfo(fuel_conn).get_version()
-        ctx.storage["fuel_version"] = ctx.fuel_version
 
-        logger.info("Found FUEL {0}".format(".".join(map(str, ctx.fuel_version))))
-        openrc = cluster.get_openrc()
+        if ctx.fuel_version is None:
+            ctx.fuel_version = FuelInfo(fuel_conn).get_version()
+            ctx.storage.put(ctx.fuel_version, "fuel_version")
 
-        if openrc:
-            auth_url = cast(str, openrc['os_auth_url'])
-            if ctx.fuel_version >= [8, 0] and auth_url.startswith("https://"):
-                logger.warning("Fixing FUEL 8.0 AUTH url - replace https://->http://")
-                auth_url = auth_url.replace("https", "http", 1)
+            logger.info("Found FUEL {0}".format(".".join(map(str, ctx.fuel_version))))
+            openrc = cluster.get_openrc()
 
-            os_creds = OSCreds(name=cast(str, openrc['username']),
-                               passwd=cast(str, openrc['password']),
-                               tenant=cast(str, openrc['tenant_name']),
-                               auth_url=cast(str, auth_url),
-                               insecure=cast(bool, openrc['insecure']))
+            if openrc:
+                auth_url = cast(str, openrc['os_auth_url'])
+                if ctx.fuel_version >= [8, 0] and auth_url.startswith("https://"):
+                    logger.warning("Fixing FUEL 8.0 AUTH url - replace https://->http://")
+                    auth_url = auth_url.replace("https", "http", 1)
 
-            ctx.fuel_openstack_creds = os_creds
-        else:
-            ctx.fuel_openstack_creds = None
-        ctx.storage["fuel_os_creds"] = ctx.fuel_openstack_creds
+                os_creds = OSCreds(name=cast(str, openrc['username']),
+                                   passwd=cast(str, openrc['password']),
+                                   tenant=cast(str, openrc['tenant_name']),
+                                   auth_url=cast(str, auth_url),
+                                   insecure=cast(bool, openrc['insecure']))
+
+                ctx.fuel_openstack_creds = os_creds
+            else:
+                ctx.fuel_openstack_creds = None
+
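+            # OSCreds is a namedtuple - store it as a plain list (or None) so the YAML serializer can handle it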
+            creds_to_store = None if ctx.fuel_openstack_creds is None else list(ctx.fuel_openstack_creds)
+            ctx.storage.put(creds_to_store, "fuel_os_creds")
 
         if discovery == 'metadata':
             logger.debug("Skip FUEL nodes  discovery due to discovery settings")
diff --git a/wally/hw_info.py b/wally/hw_info.py
index e81a5c1..aa53f8e 100644
--- a/wally/hw_info.py
+++ b/wally/hw_info.py
@@ -107,6 +107,11 @@
         return str(self.hostname) + ":\n" + "\n".join("    " + i for i in res)
 
 
+class CephInfo:
+    def __init__(self) -> None:
+        pass
+
+
 class SWInfo:
     def __init__(self) -> None:
         self.mtab = None  # type: str
@@ -114,7 +119,12 @@
         self.libvirt_version = None  # type: Optional[str]
         self.qemu_version = None  # type: Optional[str]
         self.OS_version = None  # type: utils.OSRelease
-        self.ceph_version = None  # type: Optional[str]
+        self.ceph_info = None  # type: Optional[CephInfo]
+
+
+def get_ceph_services_info(node: IRPCNode) -> CephInfo:
+    # TODO: use ceph-monitoring module
+    return CephInfo()
 
 
 def get_sw_info(node: IRPCNode) -> SWInfo:
@@ -129,21 +139,17 @@
     except OSError:
         res.libvirt_version = None
 
-    # try:
-    #     # dpkg -l ??
-    #     res.libvirt_version = node.run("virsh -v", nolog=True).strip()
-    # except OSError:
-    #     res.libvirt_version = None
+    # dpkg -l ??
 
     try:
         res.qemu_version = node.run("qemu-system-x86_64 --version", nolog=True).strip()
     except OSError:
         res.qemu_version = None
 
-    try:
-        res.ceph_version = node.run("ceph --version", nolog=True).strip()
-    except OSError:
-        res.ceph_version = None
+    for role in ('ceph-osd', 'ceph-mon', 'ceph-mds'):
+        if role in node.info.roles:
+            res.ceph_info = get_ceph_services_info(node)
+            break
 
     return res
 
diff --git a/wally/main.py b/wally/main.py
index 6396b6b..16d884f 100644
--- a/wally/main.py
+++ b/wally/main.py
@@ -1,6 +1,7 @@
 import os
 import time
 import signal
+import pprint
 import logging
 import argparse
 import functools
@@ -41,7 +42,8 @@
 from .openstack import DiscoverOSStage
 from .fuel import DiscoverFuelStage
 from .run_test import (CollectInfoStage, ExplicitNodesStage, SaveNodesStage,
-                       RunTestsStage, ConnectStage, SleepStage, PrepareNodes)
+                       RunTestsStage, ConnectStage, SleepStage, PrepareNodes,
+                       LoadStoredNodesStage)
 from .report import ConsoleReportStage, HtmlReportStage
 from .sensors import StartSensorsStage, CollectSensorsStage
 
@@ -72,9 +74,9 @@
         except Exception as exc:
             logger.warning("Can't load folder {}. Error {}".format(full_path, exc))
 
-        comment = cast(str, stor['info/comment'])
-        run_uuid = cast(str, stor['info/run_uuid'])
-        run_time = cast(float, stor['info/run_time'])
+        comment = cast(str, stor.get('info/comment'))
+        run_uuid = cast(str, stor.get('info/run_uuid'))
+        run_time = cast(float, stor.get('info/run_time'))
         test_types = ""
         results.append((run_time,
                         run_uuid,
@@ -135,6 +137,12 @@
     test_parser = subparsers.add_parser('resume', help='resume tests')
     test_parser.add_argument("storage_dir", help="Path to test directory")
 
+    # ---------------------------------------------------------------------
+    test_parser = subparsers.add_parser('db', help='show data from storage')
+    test_parser.add_argument("cmd", choices=("show",), help="Command to execute")
+    test_parser.add_argument("params", nargs='*', help="Command params")
+    test_parser.add_argument("storage_dir", help="Storage path")
+
     return parser.parse_args(argv[1:])
 
 
@@ -182,6 +190,19 @@
     return Config(cfg_dict)
 
 
+def get_run_stages() -> List[Stage]:
+    return [DiscoverCephStage(),
+            DiscoverOSStage(),
+            DiscoverFuelStage(),
+            ExplicitNodesStage(),
+            StartSensorsStage(),
+            RunTestsStage(),
+            CollectSensorsStage(),
+            ConnectStage(),
+            SleepStage(),
+            PrepareNodes()]
+
+
 def main(argv: List[str]) -> int:
     if faulthandler is not None:
         faulthandler.register(signal.SIGUSR1, all_threads=True)
@@ -195,7 +216,6 @@
 
     if opts.subparser_name == 'test':
         config = load_config(opts.config_file)
-
         config.storage_url, config.run_uuid = utils.get_uniq_path_uuid(config.results_dir)
         config.comment = opts.comment
         config.keep_vm = opts.keep_vm
@@ -207,20 +227,9 @@
         config.settings_dir = get_config_path(config, opts.settings_dir)
 
         storage = make_storage(config.storage_url)
-
-        storage['config'] = config  # type: ignore
-
-        stages.append(DiscoverCephStage())
-        stages.append(DiscoverOSStage())
-        stages.append(DiscoverFuelStage())
-        stages.append(ExplicitNodesStage())
-        stages.append(SaveNodesStage())
-        stages.append(StartSensorsStage())
-        stages.append(RunTestsStage())
-        stages.append(CollectSensorsStage())
-        stages.append(ConnectStage())
-        stages.append(SleepStage())
-        stages.append(PrepareNodes())
+        storage.put(config, 'config')
+        stages.extend(get_run_stages())
+        stages.append(SaveNodesStage())
 
         if not opts.dont_collect:
             stages.append(CollectInfoStage())
@@ -229,15 +238,21 @@
         if '--ssh-key-passwd' in argv2:
             # don't save ssh key password to storage
             argv2[argv2.index("--ssh-key-passwd") + 1] = "<removed from output>"
-        storage['cli'] = argv2
+        storage.put(argv2, 'cli')
 
     elif opts.subparser_name == 'resume':
+        opts.resumed = True
         storage = make_storage(opts.storage_dir, existing=True)
         config = storage.load(Config, 'config')
-        # TODO: fix this
-        # TODO: add node loading from storage
-        # TODO: fill nodes conncreds with keys
-        raise NotImplementedError("Resume in not fully implemented")
+        stages.extend(get_run_stages())
+        stages.append(LoadStoredNodesStage())
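+        # Re-parse the CLI options saved from the original run, so the resumed run uses the same settings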
+        prev_opts = storage.get('cli')
+        if '--ssh-key-passwd' in prev_opts and opts.ssh_key_passwd:
+            prev_opts[prev_opts.index("--ssh-key-passwd") + 1] = opts.ssh_key_passwd
+
+        restored_opts = parse_args(prev_opts)
+        opts.__dict__.update(restored_opts.__dict__)
+        opts.subparser_name = 'resume'
 
     elif opts.subparser_name == 'ls':
         tab = texttable.Texttable(max_width=200)
@@ -259,6 +274,19 @@
         #     [x['io'][0], y['io'][0]]))
         return 0
 
+    elif opts.subparser_name == 'db':
+        storage = make_storage(opts.storage_dir, existing=True)
+        if opts.cmd == 'show':
+            if len(opts.params) != 1:
+                print("'show' command requires parameter - key to show")
+                return 1
+            pprint.pprint(storage.get(opts.params[0]))
+        else:
+            print("Unknown/not_implemented command {!r}".format(opts.cmd))
+            return 1
+        return 0
+
     report_stages = []  # type: List[Stage]
     if not getattr(opts, "no_report", False):
         report_stages.append(ConsoleReportStage())
@@ -276,7 +304,7 @@
         log_config_file = find_cfg_file(log_config_file, opts.config_file)
 
     setup_loggers(getattr(logging, str_level),
-                  log_fd=storage.get_stream('log', "w"),
+                  log_fd=storage.get_fd('log', "w"),
                   config_file=log_config_file)
 
     logger.info("All info would be stored into %r", config.storage_url)
@@ -292,6 +320,7 @@
     # TODO: run only stages, which have config
     failed = False
     cleanup_stages = []
+
     for stage in stages:
         if stage.config_block is not None:
             if stage.config_block not in ctx.config:
@@ -304,6 +333,8 @@
         except (Exception, KeyboardInterrupt):
             failed = True
             break
+        ctx.storage.sync()
+    ctx.storage.sync()
 
     logger.debug("Start cleanup")
     cleanup_failed = False
@@ -313,12 +344,15 @@
                 stage.cleanup(ctx)
         except:
             cleanup_failed = True
+        ctx.storage.sync()
 
     if not failed:
         for report_stage in report_stages:
             with log_stage(report_stage):
                 report_stage.run(ctx)
 
+    ctx.storage.sync()
+
     logger.info("All info is stored into %r", config.storage_url)
 
     if failed or cleanup_failed:
diff --git a/wally/node.py b/wally/node.py
index 6624bdc..e85ded0 100644
--- a/wally/node.py
+++ b/wally/node.py
@@ -163,9 +163,11 @@
     def __repr__(self) -> str:
         return str(self)
 
-    def get_file_content(self, path: str) -> bytes:
+    def get_file_content(self, path: str, expanduser: bool = False) -> bytes:
         logger.debug("GET %s from %s", path, self.info)
-        res = self.conn.fs.get_file(self.conn.fs.expanduser(path))
+        if expanduser:
+            path = self.conn.fs.expanduser(path)
+        res = self.conn.fs.get_file(path)
         logger.debug("Download %s bytes from remote file %s from %s", len(res), path, self.info)
         return res
 
@@ -190,14 +192,21 @@
 
         return out
 
-    def copy_file(self, local_path: str, remote_path: str = None) -> str:
+    def copy_file(self, local_path: str, remote_path: str = None, expanduser: bool = False) -> str:
+        if expanduser:
+            remote_path = self.conn.fs.expanduser(remote_path)
+
         data = open(local_path, 'rb').read()
         return self.put_to_file(remote_path, data)
 
-    def put_to_file(self, path: Optional[str], content: bytes) -> str:
+    def put_to_file(self, path: Optional[str], content: bytes, expanduser: bool = False) -> str:
+        if expanduser:
+            path = self.conn.fs.expanduser(path)
         return self.conn.fs.store_file(path, content)
 
-    def stat_file(self, path: str) -> Dict[str, int]:
+    def stat_file(self, path: str, expanduser: bool = False) -> Dict[str, int]:
+        if expanduser:
+            path = self.conn.fs.expanduser(path)
         return self.conn.fs.file_stat(path)
 
     def __exit__(self, x, y, z) -> bool:
diff --git a/wally/node_interfaces.py b/wally/node_interfaces.py
index caca8cc..d0fabac 100644
--- a/wally/node_interfaces.py
+++ b/wally/node_interfaces.py
@@ -1,20 +1,25 @@
 import abc
-from typing import Any, Set, Optional, Dict, NamedTuple, Optional
+from typing import Any, Set, Dict, NamedTuple, Optional
 from .ssh_utils import ConnCreds
 from .common_types import IPAddr
+from .result_classes import IStorable
 
 
 RPCCreds = NamedTuple("RPCCreds", [("addr", IPAddr), ("key_file", str), ("cert_file", str)])
 
 
-class NodeInfo:
+class NodeInfo(IStorable):
     """Node information object, result of discovery process or config parsing"""
-    def __init__(self, ssh_creds: ConnCreds, roles: Set[str], params: Dict[str, Any] = None) -> None:
 
+    yaml_tag = 'node_info'
+
+    def __init__(self, ssh_creds: ConnCreds, roles: Set[str], params: Dict[str, Any] = None) -> None:
         # ssh credentials
         self.ssh_creds = ssh_creds
+
         # credentials for RPC connection
         self.rpc_creds = None  # type: Optional[RPCCreds]
+
         self.roles = roles
         self.os_vm_id = None  # type: Optional[int]
         self.params = {}  # type: Dict[str, Any]
@@ -30,6 +35,28 @@
     def __repr__(self) -> str:
         return str(self)
 
+    def raw(self) -> Dict[str, Any]:
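+        # Plain-dict representation: nested objects are converted to lists/dicts so YAML can store them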
+        dct = self.__dict__.copy()
+
+        if self.rpc_creds is not None:
+            dct['rpc_creds'] = list(self.rpc_creds)
+
+        dct['ssh_creds'] = self.ssh_creds.raw()
+        dct['roles'] = list(self.roles)
+        return dct
+
+    @classmethod
+    def fromraw(cls, data: Dict[str, Any]) -> 'NodeInfo':
+        data = data.copy()
+        if data['rpc_creds'] is not None:
+            data['rpc_creds'] = RPCCreds(*data['rpc_creds'])
+
+        data['ssh_creds'] = ConnCreds.fromraw(data['ssh_creds'])
+        data['roles'] = set(data['roles'])
+        obj = cls.__new__(cls)
+        obj.__dict__.update(data)
+        return obj
+
 
 class ISSHHost(metaclass=abc.ABCMeta):
     """Minimal interface, required to setup RPC connection"""
diff --git a/wally/openstack.py b/wally/openstack.py
index 3b5ced5..264cfbb 100644
--- a/wally/openstack.py
+++ b/wally/openstack.py
@@ -36,10 +36,10 @@
 
 def get_OS_credentials(ctx: TestRun) -> OSCreds:
     if "openstack_openrc" in ctx.storage:
-        return ctx.storage.load(OSCreds, "openstack_openrc")
+        return OSCreds(*cast(List, ctx.storage.get("openstack_openrc")))
 
-    creds = None
-    os_creds = None
+    creds = None  # type: OSCreds
+    os_creds = None  # type: OSCreds
     force_insecure = False
     cfg = ctx.config
 
@@ -80,7 +80,7 @@
     logger.debug(("OS_CREDS: user={0.name} tenant={0.tenant} " +
                   "auth_url={0.auth_url} insecure={0.insecure}").format(creds))
 
-    ctx.storage["openstack_openrc"] = creds  # type: ignore
+    ctx.storage.put(list(creds), "openstack_openrc")
     return creds
 
 
@@ -169,7 +169,7 @@
 
     def run(self, ctx: TestRun) -> None:
         if 'all_nodes' in ctx.storage:
-            ctx.os_spawned_nodes_ids = ctx.storage['os_spawned_nodes_ids']  # type: ignore
+            ctx.os_spawned_nodes_ids = ctx.storage.get('os_spawned_nodes_ids')
             logger.info("Skipping OS VMS discovery/spawn as all data found in storage")
             return
 
@@ -205,7 +205,7 @@
                 ctx.nodes_info[nid] = info
                 ctx.os_spawned_nodes_ids.append(info.os_vm_id)
 
-        ctx.storage['os_spawned_nodes_ids'] = ctx.os_spawned_nodes_ids  # type: ignore
+        ctx.storage.put(ctx.os_spawned_nodes_ids, 'os_spawned_nodes_ids')
 
     def cleanup(self, ctx: TestRun) -> None:
         # keep nodes in case of error for future test restart
@@ -213,7 +213,7 @@
             logger.info("Removing nodes")
 
             clear_nodes(ctx.os_connection, ctx.os_spawned_nodes_ids)
-            del ctx.storage['spawned_os_nodes']
+            ctx.storage.rm('spawned_os_nodes')
 
             logger.info("OS spawned nodes has been successfully removed")
 
diff --git a/wally/process_results.py b/wally/process_results.py
new file mode 100644
index 0000000..f01b14e
--- /dev/null
+++ b/wally/process_results.py
@@ -0,0 +1,2 @@
+# put all result preprocessing here
+# selection, aggregation
diff --git a/wally/report.py b/wally/report.py
index d2bef7e..0c96280 100644
--- a/wally/report.py
+++ b/wally/report.py
@@ -1,11 +1,12 @@
 import os
 import csv
+import abc
 import bisect
 import logging
 import itertools
 import collections
 from io import StringIO
-from typing import Dict, Any, Iterator, Tuple, cast
+from typing import Dict, Any, Iterator, Tuple, cast, List
 
 try:
     import numpy
@@ -18,7 +19,6 @@
 
 import wally
 from .utils import ssize2b
-from .statistic import round_3_digit
 from .storage import Storage
 from .stage import Stage, StepOrder
 from .test_run_class import TestRun
@@ -33,30 +33,31 @@
 
 
 def load_test_results(storage: Storage) -> Iterator[FullTestResult]:
-    sensors_data = {}  # type: Dict[Tuple[str, str, str], SensorInfo]
-
-    mstorage = storage.sub_storage("metric")
-    for _, node_id in mstorage.list():
-        for _, dev_name in mstorage.list(node_id):
-            for _, sensor_name in mstorage.list(node_id, dev_name):
-                key = (node_id, dev_name, sensor_name)
-                si = SensorInfo(*key)
-                si.begin_time, si.end_time, si.data = storage[node_id, dev_name, sensor_name]  # type: ignore
-                sensors_data[key] = si
-
-    rstorage = storage.sub_storage("result")
-    for _, run_id in rstorage.list():
-        ftr = FullTestResult()
-        ftr.test_info = rstorage.load(TestInfo, run_id, "info")
-        ftr.performance_data = {}
-
-        p1 = "{}/measurement".format(run_id)
-        for _, node_id in rstorage.list(p1):
-            for _, measurement_name in rstorage.list(p1, node_id):
-                perf_key = (node_id, measurement_name)
-                ftr.performance_data[perf_key] = rstorage["{}/{}/{}".format(p1, *perf_key)]  # type: ignore
-
-        yield ftr
+    raise NotImplementedError()
+    # sensors_data = {}  # type: Dict[Tuple[str, str, str], SensorInfo]
+    #
+    # mstorage = storage.sub_storage("metric")
+    # for _, node_id in mstorage.list():
+    #     for _, dev_name in mstorage.list(node_id):
+    #         for _, sensor_name in mstorage.list(node_id, dev_name):
+    #             key = (node_id, dev_name, sensor_name)
+    #             si = SensorInfo(*key)
+    #             si.begin_time, si.end_time, si.data = storage[node_id, dev_name, sensor_name]  # type: ignore
+    #             sensors_data[key] = si
+    #
+    # rstorage = storage.sub_storage("result")
+    # for _, run_id in rstorage.list():
+    #     ftr = FullTestResult()
+    #     ftr.test_info = rstorage.load(TestInfo, run_id, "info")
+    #     ftr.performance_data = {}
+    #
+    #     p1 = "{}/measurement".format(run_id)
+    #     for _, node_id in rstorage.list(p1):
+    #         for _, measurement_name in rstorage.list(p1, node_id):
+    #             perf_key = (node_id, measurement_name)
+    #             ftr.performance_data[perf_key] = rstorage["{}/{}/{}".format(p1, *perf_key)]  # type: ignore
+    #
+    #     yield ftr
 
 
 class ConsoleReportStage(Stage):
@@ -67,6 +68,7 @@
         # TODO(koder): load data from storage
         raise NotImplementedError("...")
 
+
 class HtmlReportStage(Stage):
 
     priority = StepOrder.REPORT
@@ -75,26 +77,86 @@
         # TODO(koder): load data from storage
         raise NotImplementedError("...")
 
-# class StoragePerfInfo:
-#     def __init__(self, name: str, summary: Any, params, testnodes_count) -> None:
-#         self.direct_iops_r_max = 0  # type: int
-#         self.direct_iops_w_max = 0  # type: int
-#
-#         # 64 used instead of 4k to faster feed caches
-#         self.direct_iops_w64_max = 0  # type: int
-#
-#         self.rws4k_10ms = 0  # type: int
-#         self.rws4k_30ms = 0  # type: int
-#         self.rws4k_100ms = 0  # type: int
-#         self.bw_write_max = 0  # type: int
-#         self.bw_read_max = 0  # type: int
-#
-#         self.bw = None  #
-#         self.iops = None
-#         self.lat = None
-#         self.lat_50 = None
-#         self.lat_95 = None
-#
+
+# TODO: needs to be revised, should use StatProps fields instead
+class StoragePerfSummary:
+    def __init__(self, name: str) -> None:
+        self.direct_iops_r_max = 0  # type: int
+        self.direct_iops_w_max = 0  # type: int
+
+        # 64 used instead of 4k to faster feed caches
+        self.direct_iops_w64_max = 0  # type: int
+
+        self.rws4k_10ms = 0  # type: int
+        self.rws4k_30ms = 0  # type: int
+        self.rws4k_100ms = 0  # type: int
+        self.bw_write_max = 0  # type: int
+        self.bw_read_max = 0  # type: int
+
+        self.bw = None  # type: float
+        self.iops = None  # type: float
+        self.lat = None  # type: float
+        self.lat_50 = None  # type: float
+        self.lat_95 = None  # type: float
+
+
+class HTMLBlock:
+    data = None  # type: str
+    js_links = []  # type: List[str]
+    css_links = []  # type: List[str]
+
+
+class Reporter(metaclass=abc.ABCMeta):
+    @abc.abstractmethod
+    def get_divs(self, config, storage) -> Iterator[Tuple[str, str, HTMLBlock]]:
+        pass
+
+
+# Main performance report
+class PerformanceSummary(Reporter):
+    """Creates graph, which show how IOPS and Latency depend on QD"""
+
+
+# IOPS/latency vs. queue depth
+class IOPS_QD(Reporter):
+    """Creates a graph showing how IOPS and latency depend on QD"""
+
+
+# Linearization report
+class IOPS_Bsize(Reporter):
+    """Creates graphs, which show how IOPS and Latency depend on block size"""
+
+
+# IOPS/latency distribution
+class IOPSHist(Reporter):
+    """IOPS.latency distribution histogram"""
+
+
+# IOPS/latency over test time
+class IOPSTime(Reporter):
+    """IOPS/latency during test"""
+
+
+# Cluster load over test time
+class ClusterLoad(Reporter):
+    """IOPS/latency during test"""
+
+
+# Node load over test time
+class NodeLoad(Reporter):
+    """IOPS/latency during test"""
+
+
+# Ceph cluster summary
+class CephClusterSummary(Reporter):
+    """IOPS/latency during test"""
+
+
+# TODO: Resource consumption report
+# TODO: Ceph operation breakout report
+# TODO: Resource consumption for different type of test
+
+
 #
 # # disk_info = None
 # # base = None
diff --git a/wally/result_classes.py b/wally/result_classes.py
index 19925f2..7244225 100644
--- a/wally/result_classes.py
+++ b/wally/result_classes.py
@@ -1,5 +1,6 @@
+import abc
 import array
-from typing import Dict, List, Any, Tuple, Optional
+from typing import Dict, List, Any, Tuple, Optional, Union, Type
 
 
 class TimeSerie:
@@ -81,3 +82,20 @@
 
     # {(node_id, perf_metrics_name): values}
     sensors_data = None  # type: Dict[Tuple[str, str, str], SensorInfo]
+
+
+class IStorable(metaclass=abc.ABCMeta):
+    """Interface for type, which can be stored"""
+
+    @abc.abstractmethod
+    def raw(self) -> Dict[str, Any]:
+        pass
+
+    @abc.abstractclassmethod
+    def fromraw(cls, data: Dict[str, Any]) -> 'IStorable':
+        pass
+
+
+Basic = Union[int, str, bytes, bool, None]
+Storable = Union[IStorable, Dict[str, Any], List[Any], int, str, bytes, bool, None]
+
diff --git a/wally/run_test.py b/wally/run_test.py
index 52803d1..8e8a4e9 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -7,14 +7,17 @@
 from . import utils, ssh_utils, hw_info
 from .config import ConfigBlock
 from .node import setup_rpc, connect
-from .node_interfaces import NodeInfo, IRPCNode, ISSHHost
+from .node_interfaces import NodeInfo, IRPCNode
 from .stage import Stage, StepOrder
+from .sensors import collect_sensors_data
 from .suits.io.fio import IOPerfTest
 from .suits.itest import TestInputConfig
 from .suits.mysql import MysqlTest
 from .suits.omgbench import OmgTest
 from .suits.postgres import PgBenchTest
 from .test_run_class import TestRun
+from .statistic import calc_stat_props
+from .utils import StopTestError
 
 
 TOOL_TYPE_MAPPER = {
@@ -83,7 +86,11 @@
                     path = "rpc_logs/" + nid
                     node.conn.server.flush_logs()
                     log = node.get_file_content(node.rpc_log_file)
-                    ctx.storage.store_raw(log, path)
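+                    # Append to any log already stored for this node, since storage has no append primitive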
+                    if path in ctx.storage:
+                        previous = ctx.storage.get_raw(path)
+                    else:
+                        previous = b""
+                    ctx.storage.put_raw(previous + log, path)
                     logger.debug("RPC log from node {} stored into storage::{}".format(nid, path))
 
         with ctx.get_pool() as pool:
@@ -112,7 +119,7 @@
 
             for (path, nid), future in futures.items():
                 try:
-                    ctx.storage[path] = future.result()
+                    ctx.storage.put(future.result(), path)
                 except Exception:
                     logger.exception("During collecting hardware info from %s", nid)
                     raise utils.StopTestError()
@@ -126,7 +133,7 @@
 
             for (path, nid), future in futures.items():
                 try:
-                    ctx.storage[path] = future.result()
+                    ctx.storage.put(future.result(), path)
                 except Exception:
                     logger.exception("During collecting software info from %s", nid)
                     raise utils.StopTestError()
@@ -154,7 +161,7 @@
     priority = StepOrder.CONNECT
 
     def run(self, ctx: TestRun) -> None:
-        ctx.storage['all_nodes'] = list(ctx.nodes_info.values())   # type: ignore
+        ctx.storage.put_list(ctx.nodes_info.values(), 'all_nodes')
 
 
 class SleepStage(Stage):
@@ -226,7 +233,7 @@
 
                 if not test_nodes:
                     logger.error("No test nodes found")
-                    return
+                    raise StopTestError()
 
                 for name, params in test_group.items():
                     vm_count = params.get('node_limit', None)  # type: Optional[int]
@@ -250,8 +257,46 @@
                                                storage=ctx.storage,
                                                remote_dir=remote_dir)
 
-                    test_cls(test_cfg).run()
+                    test_cls(test_cfg,
+                             on_idle=lambda: collect_sensors_data(ctx, False)).run()
 
     @classmethod
     def validate_config(cls, cfg: ConfigBlock) -> None:
         pass
+
+
+class CalcStatisticStage(Stage):
+    priority = StepOrder.TEST + 1
+
+    def run(self, ctx: TestRun) -> None:
+        results = {}
+        for name, summary, stor_path in ctx.storage.get("all_results"):
+            if name == 'fio':
+                test_info = ctx.storage.get(stor_path, "info")
+                for node in test_info['nodes']:
+                    iops = ctx.storage.get_array(stor_path, node, 'iops_data')
+                    bw = ctx.storage.get_array(stor_path, node, 'bw_data')
+                    lat = ctx.storage.get_array(stor_path, node, 'lat_data')
+                    results[summary] = (iops, bw, lat)
+
+        for name, (iops, bw, lat) in results.items():
+            print(" -------------------  IOPS -------------------")
+            print(calc_stat_props(iops))  # type: ignore
+            print(" -------------------  BW -------------------")
+            print(calc_stat_props(bw))  # type: ignore
+            # print(" -------------------  LAT -------------------")
+            # print(calc_stat_props(lat))
+
+
+class LoadStoredNodesStage(Stage):
+    priority = StepOrder.DISCOVER
+
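+    # On resume, rebuild nodes_info from the snapshot written by SaveNodesStage instead of re-discovering nodes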
+    def run(self, ctx: TestRun) -> None:
+        if 'all_nodes' in ctx.storage:
+            if ctx.nodes_info:
+                logger.error("Internal error: Some nodes already stored in " +
+                             "nodes_info before LoadStoredNodesStage stage")
+                raise StopTestError()
+            ctx.nodes_info = {node.node_id(): node
+                              for node in ctx.storage.load_list(NodeInfo, "all_nodes")}
+            logger.info("%s nodes loaded from database", len(ctx.nodes_info))
diff --git a/wally/sensors_rpc_plugin.py b/wally/sensors_rpc_plugin.py
index 55d8b84..be5e5db 100644
--- a/wally/sensors_rpc_plugin.py
+++ b/wally/sensors_rpc_plugin.py
@@ -3,6 +3,7 @@
 import json
 import time
 import array
+import pprint
 import logging
 import threading
 import traceback
@@ -38,6 +39,12 @@
     def unpack_results(cls, device, metrics, data):
         pass
 
+    def init(self):
+        pass
+
+    def stop(self):
+        pass
+
 
 class ArraysSensor(Sensor):
     typecode = 'L'
@@ -384,16 +391,21 @@
     admin_socket = None
 
 
-def run_ceph_daemon_cmd(cluster, osd_id, *args):
-    asok = "/var/run/ceph/{}-osd.{}.asok".format(cluster, osd_id)
-    if admin_socket:
-        return admin_socket(asok, args)
-    else:
-        return subprocess.check_output("ceph daemon {} {}".format(asok, " ".join(args)), shell=True)
-
-
 @provides("ceph")
 class CephSensor(ArraysSensor):
+
+    historic_duration = 2
+    historic_size = 200
+
+    def run_ceph_daemon_cmd(self, osd_id, *args):
+        asok = "/var/run/ceph/{}-osd.{}.asok".format(self.cluster, osd_id)
+        if admin_socket:
+            res = admin_socket(asok, args)
+        else:
+            res = subprocess.check_output("ceph daemon {} {}".format(asok, " ".join(args)), shell=True)
+
+        return res
+
     def collect(self):
         def get_val(dct, path):
             if '/' in path:
@@ -402,10 +414,64 @@
             return dct[path]
 
         for osd_id in self.params['osds']:
-            data = json.loads(run_ceph_daemon_cmd(self.params['cluster'], osd_id, 'perf', 'dump'))
+            data = json.loads(self.run_ceph_daemon_cmd(osd_id, 'perf', 'dump'))
             for key_name in self.params['counters']:
                 self.add_data("osd{}".format(osd_id), key_name.replace("/", "."), get_val(data, key_name))
 
+            if 'historic' in self.params.get('sources', {}):
+                self.historic.setdefault(osd_id, []).append(self.run_ceph_daemon_cmd(osd_id, "dump_historic_ops"))
+
+            if 'in_flight' in self.params.get('sources', {}):
+                self.in_flight.setdefault(osd_id, []).append(self.run_ceph_daemon_cmd(osd_id, "dump_ops_in_flight"))
+
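+    # Set new osd_op_history duration/size for one OSD and return the values that were in effect before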
+    def set_osd_historic(self, duration, keep, osd_id):
+        data = json.loads(self.run_ceph_daemon_cmd(osd_id, "dump_historic_ops"))
+        self.run_ceph_daemon_cmd(osd_id, "config set osd_op_history_duration {}".format(duration))
+        self.run_ceph_daemon_cmd(osd_id, "config set osd_op_history_size {}".format(keep))
+        return (data["duration to keep"], data["num to keep"])
+
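+    # init() remembers each OSD's current historic-ops settings so stop() can restore them after the test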
+    def init(self):
+        self.cluster = self.params['cluster']
+        self.prev_vals = {}
+        self.historic = {}
+        self.in_flight = {}
+
+        if 'historic' in self.params.get('sources', {}):
+            for osd_id in self.params['osds']:
+                self.prev_vals[osd_id] = self.set_osd_historic(self.historic_duration, self.historic_size, osd_id)
+
+    def stop(self):
+        for osd_id, (duration, keep) in self.prev_vals.items():
+            self.prev_vals[osd_id] = self.set_osd_historic(duration, keep, osd_id)
+
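+    # Ship accumulated historic/in-flight op dumps along with the numeric counters, then reset the buffers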
+    def get_updates(self):
+        res = super().get_updates()
+
+        for osd_id, data in self.historic.items():
+            res[("osd{}".format(osd_id), "historic")] = data
+
+        self.historic = {}
+
+        for osd_id, data in self.in_flight.items():
+            res[("osd{}".format(osd_id), "in_flight")] = data
+
+        self.in_flight = {}
+
+        return res
+
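+    # 'historic'/'in_flight' payloads are raw JSON strings; other metrics are packed arrays (typecode byte + data)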
+    @classmethod
+    def unpack_results(cls, device, metrics, packed):
+        if metrics in ('historic', 'in_flight'):
+            return packed
+
+        arr = array.array(chr(packed[0]))
+        if sys.version_info >= (3, 0, 0):
+            arr.frombytes(packed[1:])
+        else:
+            arr.fromstring(packed[1:])
+
+        return arr
+
 
 class SensorsData(object):
     def __init__(self):
@@ -432,7 +498,6 @@
     return curr
 
 
-# TODO(koder): a lot code here can be optimized and cached, but nobody cares (c)
 def sensors_bg_thread(sensors_config, sdata):
     try:
         next_collect_at = time.time()
@@ -450,8 +515,8 @@
                     params["disallowed_prefixes"] = config["disallow"]
 
                 sdata.sensors[name] = SensorsMap[name](**params)
+                sdata.sensors[name].init()
 
-            import pprint
             logger.debug("sensors.config = %s", pprint.pformat(sensors_config))
             logger.debug("Sensors map keys %s", ", ".join(sdata.sensors.keys()))
 
@@ -483,6 +548,10 @@
     except Exception:
         logger.exception("In sensor BG thread")
         sdata.exception = traceback.format_exc()
+    finally:
+        for sensor in sdata.sensors.values():
+            sensor.stop()
+        sdata.sensors = None
 
 
 sensors_thread = None
@@ -539,6 +608,7 @@
         collected_at = sdata.collected_at
         sdata.collected_at = array.array(sdata.collected_at.typecode)
 
+    # TODO: pack data before send
     return res, collected_at.tostring()
 
 
diff --git a/wally/ssh_utils.py b/wally/ssh_utils.py
index c224bf4..e9e118d 100644
--- a/wally/ssh_utils.py
+++ b/wally/ssh_utils.py
@@ -1,5 +1,4 @@
 import re
-import yaml
 import getpass
 import logging
 from typing import List, Dict, Any
@@ -7,6 +6,7 @@
 
 from . import utils
 from .common_types import IPAddr
+from .result_classes import IStorable
 
 
 logger = logging.getLogger("wally")
@@ -47,9 +47,7 @@
         uri_reg_exprs.append(templ.format(**re_dct))
 
 
-class ConnCreds(yaml.YAMLObject):  # type: ignore
-    yaml_tag = '!ConnCreds'
-
+class ConnCreds(IStorable):
     def __init__(self, host: str, user: str, passwd: str = None, port: str = '22',
                  key_file: str = None, key: bytes = None) -> None:
         self.user = user
@@ -64,21 +62,18 @@
     def __repr__(self) -> str:
         return str(self)
 
-    @classmethod
-    def to_yaml(cls, dumper: Any, data: 'ConnCreds') -> Any:
-        dict_representation = {
-            'user': data.user,
-            'host': data.addr.host,
-            'port': data.addr.port,
-            'passwd': data.passwd,
-            'key_file': data.key_file
+    def raw(self) -> Dict[str, Any]:
+        return {
+            'user': self.user,
+            'host': self.addr.host,
+            'port': self.addr.port,
+            'passwd': self.passwd,
+            'key_file': self.key_file
         }
-        return dumper.represent_mapping(data.yaml_tag, dict_representation)
 
     @classmethod
-    def from_yaml(cls, loader: Any, node: Any) -> 'ConnCreds':
-        dct = loader.construct_mapping(node)
-        return cls(**dct)
+    def fromraw(cls, data) -> 'ConnCreds':
+        return cls(**data)
 
 
 def parse_ssh_uri(uri: str) -> ConnCreds:
diff --git a/wally/statistic.py b/wally/statistic.py
index e2021e1..2263788 100644
--- a/wally/statistic.py
+++ b/wally/statistic.py
@@ -1,48 +1,119 @@
 import math
 import itertools
+import statistics
+from typing import Union, List, TypeVar, Callable, Iterable, Tuple, Any, cast, Dict
 
-try:
-    from scipy import stats
-    from numpy import array, linalg
-    from scipy.optimize import leastsq
-    from numpy.polynomial.chebyshev import chebfit, chebval
-    no_numpy = False
-except ImportError:
-    no_numpy = True
+import numpy
+from scipy import stats, optimize
+from numpy import linalg
+from numpy.polynomial.chebyshev import chebfit, chebval
 
 
-def average(data):
-    return sum(data) / len(data)
+from .result_classes import IStorable
 
 
-def med_dev(vals):
-    if len(vals) == 1:
-        return vals[0], 0.0
-
-    med = sum(vals) / len(vals)
-    dev = ((sum(abs(med - i) ** 2.0 for i in vals) / (len(vals) - 1)) ** 0.5)
-    return med, dev
+Number = Union[int, float]
+TNumber = TypeVar('TNumber', int, float)
 
 
-def round_3_digit(val):
-    return round_deviation((val, val / 10.0))[0]
+DOUBLE_DELTA = 1e-8
 
 
-def round_deviation(med_dev):
-    med, dev = med_dev
-
-    if dev < 1E-7:
-        return med_dev
-
-    dev_div = 10.0 ** (math.floor(math.log10(dev)) - 1)
-    dev = int(dev / dev_div) * dev_div
-    med = int(med / dev_div) * dev_div
-    return [type(med_dev[0])(med),
-            type(med_dev[1])(dev)]
+average = statistics.mean
+dev = statistics.variance
 
 
-def groupby_globally(data, key_func):
-    grouped = {}
+class StatProps(IStorable):
+    "Statistic properties for timeserie"
+
+    yaml_tag = 'stat'
+
+    def __init__(self, data: List[Number]) -> None:
+        self.average = None  # type: float
+        self.deviation = None  # type: float
+        self.confidence = None  # type: float
+        self.confidence_level = None  # type: float
+
+        self.perc_99 = None  # type: float
+        self.perc_95 = None  # type: float
+        self.perc_90 = None  # type: float
+        self.perc_50 = None   # type: float
+
+        self.min = None  # type: Number
+        self.max = None  # type: Number
+
+        # bin_center: bin_count
+        self.histo = None  # type: Tuple[List[int], List[float]]
+        self.data = data
+
+        self.normtest = None  # type: Any
+
+    def __str__(self) -> str:
+        res = ["StatProps(num_el={}):".format(len(self.data)),
+               "    distr = {0.average} ~ {0.deviation}".format(self),
+               "    confidence({0.confidence_level}) = {0.confidence}".format(self),
+               "    perc50={0.perc50}".format(self),
+               "    perc90={0.perc90}".format(self),
+               "    perc95={0.perc95}".format(self),
+               "    perc95={0.perc99}".format(self),
+               "    range {0.min} {0.max}".format(self),
+               "    nurmtest = {0.nortest}".format(self)]
+        return "\n".join(res)
+
+    def __repr__(self) -> str:
+        return str(self)
+
+    def raw(self) -> Dict[str, Any]:
+        return self.__dict__.copy()
+
+    @classmethod
+    def fromraw(cls, data: Dict[str, Any]) -> 'StatProps':
+        res = cls.__new__(cls)
+        res.__dict__.update(data)
+        return res
+
+
+def greater_digit_pos(val: Number) -> int:
+    return int(math.floor(math.log10(val))) + 1
+
+
+def round_digits(val: TNumber, num_digits: int = 3) -> TNumber:
+    pow = 10 ** (greater_digit_pos(val) - num_digits)
+    return type(val)(int(val / pow) * pow)
+
+
+def calc_stat_props(data: List[Number], confidence: float = 0.95) -> StatProps:
+    "Calculate statistical properties of array of numbers"
+
+    res = StatProps(data)
+
+    if len(data) == 0:
+        raise ValueError("Input array is empty")
+
+    data = sorted(data)
+    res.average = average(data)
+    res.deviation = dev(data)
+    res.max = data[-1]
+    res.min = data[0]
+
+    res.perc_50 = numpy.percentile(data, 50)
+    res.perc_90 = numpy.percentile(data, 90)
+    res.perc_95 = numpy.percentile(data, 95)
+    res.perc_99 = numpy.percentile(data, 99)
+
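+    # Half-width of the confidence interval for the mean, based on the Student's t distribution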
+    if len(data) >= 3:
+        res.confidence = stats.sem(data) * \
+                         stats.t.ppf((1 + confidence) / 2, len(data) - 1)
+    else:
+        res.confidence = None
+
+    res.histo = numpy.histogram(data, 'auto')
+    res.normtest = stats.mstats.normaltest(data)
+    return res
+
+
+def groupby_globally(data: Iterable, key_func: Callable):
+    grouped = {}  # type: ignore
     grouped_iter = itertools.groupby(data, key_func)
 
     for (bs, cache_tp, act, conc), curr_data_it in grouped_iter:
@@ -52,25 +123,19 @@
     return grouped
 
 
-def approximate_curve(x, y, xnew, curved_coef):
+def approximate_curve(x: List[Number], y: List[float], xnew: List[Number], curved_coef: int) -> List[float]:
     """returns ynew - y values of some curve approximation"""
-    if no_numpy:
-        return None
-
-    return chebval(xnew, chebfit(x, y, curved_coef))
+    return cast(List[float], chebval(xnew, chebfit(x, y, curved_coef)))
 
 
-def approximate_line(x, y, xnew, relative_dist=False):
-    """ x, y - test data, xnew - dots, where we want find approximation
-        if not relative_dist distance = y - newy
-        returns ynew - y values of linear approximation"""
-
-    if no_numpy:
-        return None
-
-    # convert to numpy.array (don't work without it)
-    ox = array(x)
-    oy = array(y)
+def approximate_line(x: List[Number], y: List[float], xnew: List[Number], relative_dist: bool = False) -> List[float]:
+    """
+    x, y - test data, xnew - dots, where we want find approximation
+    if not relative_dist distance = y - newy
+    returns ynew - y values of linear approximation
+    """
+    ox = numpy.array(x)
+    oy = numpy.array(y)
 
     # set approximation function
     def func_line(tpl, x):
@@ -89,108 +154,41 @@
                                      oy[:2]))
 
     # find line
-    tpl_final, success = leastsq(error_func,
-                                 tpl_initial[:],
-                                 args=(ox, oy))
+    tpl_final, success = optimize.leastsq(error_func, tpl_initial[:], args=(ox, oy))
 
     # if error
     if success not in range(1, 5):
         raise ValueError("No line for this dots")
 
     # return new dots
-    return func_line(tpl_final, array(xnew))
+    return func_line(tpl_final, numpy.array(xnew))
 
 
-def difference(y, ynew):
-    """returns average and maximum relative and
-       absolute differences between y and ynew
-       result may contain None values for y = 0
-       return value - tuple:
-       [(abs dif, rel dif) * len(y)],
-       (abs average, abs max),
-       (rel average, rel max)"""
-
-    abs_dlist = []
-    rel_dlist = []
-
-    for y1, y2 in zip(y, ynew):
-        # absolute
-        abs_dlist.append(y1 - y2)
-
-        if y1 > 1E-6:
-            rel_dlist.append(abs(abs_dlist[-1] / y1))
-        else:
-            raise ZeroDivisionError("{0!r} is too small".format(y1))
-
-    da_avg = sum(abs_dlist) / len(abs_dlist)
-    dr_avg = sum(rel_dlist) / len(rel_dlist)
-
-    return (zip(abs_dlist, rel_dlist),
-            (da_avg, max(abs_dlist)), (dr_avg, max(rel_dlist))
-            )
-
-
-def calculate_distribution_properties(data):
-    """chi, etc"""
-
-
-def minimal_measurement_count(data, max_diff, req_probability):
-    """
-    should returns amount of measurements to get results (avg and deviation)
-    with error less, that max_diff in at least req_probability% cases
-    """
-
-
-class StatProps(object):
-    def __init__(self):
-        self.average = None
-        self.mediana = None
-        self.perc_95 = None
-        self.perc_5 = None
-        self.deviation = None
-        self.confidence = None
-        self.min = None
-        self.max = None
-        self.raw = None
-
-    def rounded_average_conf(self):
-        return round_deviation((self.average, self.confidence))
-
-    def rounded_average_dev(self):
-        return round_deviation((self.average, self.deviation))
-
-    def __str__(self):
-        return "StatProps({0} ~ {1})".format(round_3_digit(self.average),
-                                             round_3_digit(self.deviation))
-
-    def __repr__(self):
-        return str(self)
-
-
-def data_property(data, confidence=0.95):
-    res = StatProps()
-    if len(data) == 0:
-        return res
-
-    data = sorted(data)
-    res.average, res.deviation = med_dev(data)
-    res.max = data[-1]
-    res.min = data[0]
-
-    ln = len(data)
-    if ln % 2 == 0:
-        res.mediana = (data[ln / 2] + data[ln / 2 - 1]) / 2
-    else:
-        res.mediana = data[ln / 2]
-
-    res.perc_95 = data[int((ln - 1) * 0.95)]
-    res.perc_5 = data[int((ln - 1) * 0.05)]
-
-    if not no_numpy and ln >= 3:
-        res.confidence = stats.sem(data) * \
-                         stats.t.ppf((1 + confidence) / 2, ln - 1)
-    else:
-        res.confidence = res.deviation
-
-    res.raw = data[:]
-    return res
+# TODO: revise next
+# def difference(y, ynew):
+#     """returns average and maximum relative and
+#        absolute differences between y and ynew
+#        result may contain None values for y = 0
+#        return value - tuple:
+#        [(abs dif, rel dif) * len(y)],
+#        (abs average, abs max),
+#        (rel average, rel max)"""
+#
+#     abs_dlist = []
+#     rel_dlist = []
+#
+#     for y1, y2 in zip(y, ynew):
+#         # absolute
+#         abs_dlist.append(y1 - y2)
+#
+#         if y1 > 1E-6:
+#             rel_dlist.append(abs(abs_dlist[-1] / y1))
+#         else:
+#             raise ZeroDivisionError("{0!r} is too small".format(y1))
+#
+#     da_avg = sum(abs_dlist) / len(abs_dlist)
+#     dr_avg = sum(rel_dlist) / len(rel_dlist)
+#
+#     return (zip(abs_dlist, rel_dlist),
+#             (da_avg, max(abs_dlist)), (dr_avg, max(rel_dlist))
+#             )
diff --git a/wally/storage.py b/wally/storage.py
index 6879dcf..2c0a26b 100644
--- a/wally/storage.py
+++ b/wally/storage.py
@@ -6,8 +6,8 @@
 import abc
 import array
 import shutil
-from typing import Any, Iterator, TypeVar, Type, IO, Tuple, cast, List, Dict, Union, Iterable
-
+import sqlite3
+from typing import Any, TypeVar, Type, IO, Tuple, cast, List, Dict, Iterable
 
 import yaml
 try:
@@ -16,16 +16,7 @@
     from yaml import Loader, Dumper  # type: ignore
 
 
-class IStorable(metaclass=abc.ABCMeta):
-    """Interface for type, which can be stored"""
-
-basic_types = {list, dict, tuple, set, type(None), int, str, bytes, bool, float}
-for btype in basic_types:
-    # pylint: disable=E1101
-    IStorable.register(btype)  # type: ignore
-
-
-ObjClass = TypeVar('ObjClass')
+from .result_classes import Storable, IStorable
 
 
 class ISimpleStorage(metaclass=abc.ABCMeta):
@@ -33,15 +24,19 @@
     and can operate only on bytes"""
 
     @abc.abstractmethod
-    def __setitem__(self, path: str, value: bytes) -> None:
+    def put(self, value: bytes, path: str) -> None:
         pass
 
     @abc.abstractmethod
-    def __getitem__(self, path: str) -> bytes:
+    def get(self, path: str) -> bytes:
         pass
 
     @abc.abstractmethod
-    def __delitem__(self, path: str) -> None:
+    def rm(self, path: str) -> None:
+        pass
+
+    @abc.abstractmethod
+    def sync(self) -> None:
         pass
 
     @abc.abstractmethod
@@ -49,75 +44,153 @@
         pass
 
     @abc.abstractmethod
-    def list(self, path: str) -> Iterator[Tuple[bool, str]]:
-        pass
-
-    @abc.abstractmethod
-    def get_stream(self, path: str, mode: str = "rb+") -> IO:
+    def get_fd(self, path: str, mode: str = "rb+") -> IO:
         pass
 
     @abc.abstractmethod
     def sub_storage(self, path: str) -> 'ISimpleStorage':
         pass
 
-    @abc.abstractmethod
-    def clear(self, path: str) -> None:
-        pass
-
 
 class ISerializer(metaclass=abc.ABCMeta):
     """Interface for serialization class"""
     @abc.abstractmethod
-    def pack(self, value: IStorable) -> bytes:
+    def pack(self, value: Storable) -> bytes:
         pass
 
     @abc.abstractmethod
-    def unpack(self, data: bytes) -> IStorable:
+    def unpack(self, data: bytes) -> Any:
         pass
 
 
+class DBStorage(ISimpleStorage):
+
+    create_tb_sql = "CREATE TABLE IF NOT EXISTS wally_storage (key text, data blob, type text)"
+    insert_sql = "INSERT INTO wally_storage VALUES (?, ?, ?)"
+    update_sql = "UPDATE wally_storage SET data=?, type=? WHERE key=?"
+    select_sql = "SELECT data, type FROM wally_storage WHERE key=?"
+    contains_sql = "SELECT 1 FROM wally_storage WHERE key=?"
+    rm_sql = "DELETE FROM wally_storage WHERE key LIKE '{}%'"
+    list2_sql = "SELECT key, length(data), type FROM wally_storage"
+
+    def __init__(self, db_path: str = None, existing: bool = False,
+                 prefix: str = None, db: sqlite3.Connection = None) -> None:
+
+        assert not prefix or "'" not in prefix, "Broken sql prefix {!r}".format(prefix)
+
+        if db_path:
+            self.existing = existing
+            if existing:
+                if not os.path.isfile(db_path):
+                    raise IOError("No storage found at {!r}".format(db_path))
+
+            os.makedirs(os.path.dirname(db_path), exist_ok=True)
+            try:
+                self.db = sqlite3.connect(db_path)
+            except sqlite3.OperationalError as exc:
+                raise IOError("Can't open database at {!r}".format(db_path)) from exc
+
+            self.db.execute(self.create_tb_sql)
+        else:
+            if db is None:
+                raise ValueError("Either db or db_path parameter must be passed")
+            self.db = db
+
+        if prefix is None:
+            self.prefix = ""
+        elif not prefix.endswith('/'):
+            self.prefix = prefix + '/'
+        else:
+            self.prefix = prefix
+
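+    # Upsert: update the existing row when the key is already present, otherwise insert a new one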
+    def put(self, value: bytes, path: str) -> None:
+        c = self.db.cursor()
+        fpath = self.prefix + path
+        c.execute(self.contains_sql, (fpath,))
+        if len(c.fetchall()) == 0:
+            c.execute(self.insert_sql, (fpath, value, 'yaml'))
+        else:
+            c.execute(self.update_sql, (value, 'yaml', fpath))
+
+    def get(self, path: str) -> bytes:
+        c = self.db.cursor()
+        c.execute(self.select_sql, (self.prefix + path,))
+        res = cast(List[Tuple[bytes, str]], c.fetchall())
+        if not res:
+            raise KeyError(path)
+        assert len(res) == 1
+        val, tp = res[0]
+        assert tp == 'yaml'
+        return val
+
+    def rm(self, path: str) -> None:
+        c = self.db.cursor()
+        path = self.prefix + path
+        assert "'" not in path, "Broken sql path {!r}".format(path)
+        c.execute(self.rm_sql.format(path))
+
+    def __contains__(self, path: str) -> bool:
+        c = self.db.cursor()
+        path = self.prefix + path
+        c.execute(self.contains_sql, (path,))
+        return len(c.fetchall()) != 0
+
+    def print_tree(self):
+        c = self.db.cursor()
+        c.execute(self.list2_sql)
+        data = list(c.fetchall())
+        data.sort()
+        print("------------------ DB ---------------------")
+        for key, data_ln, type in data:
+            print(key, data_ln, type)
+        print("------------------ END --------------------")
+
+    def get_fd(self, path: str, mode: str = "rb+") -> IO[bytes]:
+        raise NotImplementedError("SQLITE3 doesn't provide fd-like interface")
+
+    def sub_storage(self, path: str) -> 'DBStorage':
+        return self.__class__(prefix=self.prefix + path, db=self.db)
+
+    def sync(self):
+        self.db.commit()
+
+
+DB_REL_PATH = "__db__.db"
+
+
 class FSStorage(ISimpleStorage):
     """Store all data in files on FS"""
 
     def __init__(self, root_path: str, existing: bool) -> None:
         self.root_path = root_path
         self.existing = existing
-        if existing:
-            if not os.path.isdir(self.root_path):
-                raise IOError("No storage found at {!r}".format(root_path))
 
     def j(self, path: str) -> str:
         return os.path.join(self.root_path, path)
 
-    def __setitem__(self, path: str, value: bytes) -> None:
+    def put(self, value: bytes, path: str) -> None:
         jpath = self.j(path)
         os.makedirs(os.path.dirname(jpath), exist_ok=True)
         with open(jpath, "wb") as fd:
             fd.write(value)
 
-    def __delitem__(self, path: str) -> None:
+    def get(self, path: str) -> bytes:
         try:
-            os.unlink(path)
-        except FileNotFoundError:
-            pass
+            with open(self.j(path), "rb") as fd:
+                return fd.read()
+        except FileNotFoundError as exc:
+            raise KeyError(path) from exc
 
-    def __getitem__(self, path: str) -> bytes:
-        with open(self.j(path), "rb") as fd:
-            return fd.read()
+    def rm(self, path: str) -> None:
+        jpath = self.j(path)
+        if os.path.isdir(jpath):
+            shutil.rmtree(jpath, ignore_errors=True)
+        elif os.path.exists(jpath):
+            os.unlink(jpath)
 
     def __contains__(self, path: str) -> bool:
         return os.path.exists(self.j(path))
 
-    def list(self, path: str = "") -> Iterator[Tuple[bool, str]]:
-        jpath = self.j(path)
-        if not os.path.exists(jpath):
-            return
-
-        for entry in os.scandir(jpath):
-            if not entry.name in ('..', '.'):
-                yield entry.is_file(), entry.name
-
-    def get_stream(self, path: str, mode: str = "rb+") -> IO[bytes]:
+    def get_fd(self, path: str, mode: str = "rb+") -> IO[bytes]:
         jpath = self.j(path)
 
         if "cb" == mode:
@@ -140,77 +213,89 @@
     def sub_storage(self, path: str) -> 'FSStorage':
         return self.__class__(self.j(path), self.existing)
 
-    def clear(self, path: str) -> None:
-        if os.path.exists(path):
-            shutil.rmtree(self.j(path))
+    def sync(self) -> None:
+        pass
 
 
 class YAMLSerializer(ISerializer):
     """Serialize data to yaml"""
-    def pack(self, value: Any) -> bytes:
-        if type(value) not in basic_types:
-            # for name, val in value.__dict__.items():
-            #     if type(val) not in basic_types:
-            #         raise ValueError(("Can't pack {!r}. Attribute {} has value {!r} (type: {}), but only" +
-            #                           " basic types accepted as attributes").format(value, name, val, type(val)))
-            value = value.__dict__
-        return yaml.dump(value, Dumper=Dumper, encoding="utf8")
+    def pack(self, value: Storable) -> bytes:
+        try:
+            return yaml.dump(value, Dumper=Dumper, encoding="utf8")
+        except Exception as exc:
+            raise ValueError("Can't pickle object {!r} to yaml".format(type(value))) from exc
 
-    def unpack(self, data: bytes) -> IStorable:
+    def unpack(self, data: bytes) -> Any:
         return yaml.load(data, Loader=Loader)
 
 
+class SAFEYAMLSerializer(ISerializer):
+    """Serialize data to yaml"""
+    def pack(self, value: Storable) -> bytes:
+        try:
+            return yaml.safe_dump(value, encoding="utf8")
+        except Exception as exc:
+            raise ValueError("Can't pickle object {!r} to yaml".format(type(value))) from exc
+
+    def unpack(self, data: bytes) -> Any:
+        return yaml.safe_load(data)
+
+
+ObjClass = TypeVar('ObjClass', bound=IStorable)
+
+
 class Storage:
     """interface for storage"""
-    def __init__(self, storage: ISimpleStorage, serializer: ISerializer) -> None:
-        self.storage = storage
+    def __init__(self, fs_storage: ISimpleStorage, db_storage: ISimpleStorage, serializer: ISerializer) -> None:
+        self.fs = fs_storage
+        self.db = db_storage
         self.serializer = serializer
 
     def sub_storage(self, *path: str) -> 'Storage':
-        return self.__class__(self.storage.sub_storage("/".join(path)), self.serializer)
+        fpath = "/".join(path)
+        return self.__class__(self.fs.sub_storage(fpath), self.db.sub_storage(fpath), self.serializer)
 
-    def __setitem__(self, path: Union[str, Iterable[str]], value: Any) -> None:
-        if not isinstance(path, str):
-            path = "/".join(path)
+    def put(self, value: Storable, *path: str) -> None:
+        dct_value = value.raw() if isinstance(value, IStorable) else value
+        serialized = self.serializer.pack(dct_value)
+        fpath = "/".join(path)
+        self.db.put(serialized, fpath)
+        self.fs.put(serialized, fpath)
 
-        self.storage[path] = self.serializer.pack(cast(IStorable, value))
+    def put_list(self, value: Iterable[IStorable], *path: str) -> None:
+        serialized = self.serializer.pack([obj.raw() for obj in value])
+        fpath = "/".join(path)
+        self.db.put(serialized, fpath)
+        self.fs.put(serialized, fpath)
 
-    def __getitem__(self, path: Union[str, Iterable[str]]) -> IStorable:
-        if not isinstance(path, str):
-            path = "/".join(path)
+    def get(self, *path: str) -> Any:
+        return self.serializer.unpack(self.db.get("/".join(path)))
 
-        return self.serializer.unpack(self.storage[path])
+    def rm(self, *path: str) -> None:
+        fpath = "/".join(path)
+        self.fs.rm(fpath)
+        self.db.rm(fpath)
 
-    def __delitem__(self, path: Union[str, Iterable[str]]) -> None:
-        if not isinstance(path, str):
-            path = "/".join(path)
-        del self.storage[path]
+    def __contains__(self, path: str) -> bool:
+        return path in self.fs or path in self.db
 
-    def __contains__(self, path: Union[str, Iterable[str]]) -> bool:
-        if not isinstance(path, str):
-            path = "/".join(path)
-        return path in self.storage
-
-    def store_raw(self, val: bytes, *path: str) -> None:
-        self.storage["/".join(path)] = val
-
-    def clear(self, *path: str) -> None:
-        self.storage.clear("/".join(path))
+    def put_raw(self, val: bytes, *path: str) -> None:
+        self.fs.put(val, "/".join(path))
 
     def get_raw(self, *path: str) -> bytes:
-        return self.storage["/".join(path)]
+        return self.fs.get("/".join(path))
 
-    def list(self, *path: str) -> Iterator[Tuple[bool, str]]:
-        return self.storage.list("/".join(path))
+    def get_fd(self, path: str, mode: str = "r") -> IO:
+        return self.fs.get_fd(path, mode)
 
-    def set_array(self, value: array.array, *path: str) -> None:
-        with self.get_stream("/".join(path), "wb") as fd:
+    def put_array(self, value: array.array, *path: str) -> None:
+        with self.get_fd("/".join(path), "wb") as fd:
             value.tofile(fd)  # type: ignore
 
     def get_array(self, typecode: str, *path: str) -> array.array:
         res = array.array(typecode)
         path_s = "/".join(path)
-        with self.get_stream(path_s, "rb") as fd:
+        with self.get_fd(path_s, "rb") as fd:
             fd.seek(0, os.SEEK_END)
             size = fd.tell()
             fd.seek(0, os.SEEK_SET)
@@ -220,55 +305,33 @@
         return res
 
     def append(self, value: array.array, *path: str) -> None:
-        with self.get_stream("/".join(path), "cb") as fd:
+        with self.get_fd("/".join(path), "cb") as fd:
             fd.seek(0, os.SEEK_END)
             value.tofile(fd)  # type: ignore
 
-    def construct(self, path: str, raw_val: Dict, obj_class: Type[ObjClass]) -> ObjClass:
-        "Internal function, used to construct user type from raw unpacked value"
-        if obj_class in (int, str, dict, list, None):
-            raise ValueError("Can't load into build-in value - {!r} into type {}")
-
-        if not isinstance(raw_val, dict):
-            raise ValueError("Can't load path {!r} into python type. Raw value not dict".format(path))
-
-        if not all(isinstance(key, str) for key in raw_val.keys()):
-            raise ValueError("Can't load path {!r} into python type.".format(path) +
-                             "Raw not all keys in raw value is strings")
-
-        obj = obj_class.__new__(obj_class)  # type: ObjClass
-        obj.__dict__.update(raw_val)
-        return obj
-
     def load_list(self, obj_class: Type[ObjClass], *path: str) -> List[ObjClass]:
         path_s = "/".join(path)
-        raw_val = self[path_s]
+        raw_val = cast(List[Dict[str, Any]], self.get(path_s))
         assert isinstance(raw_val, list)
-        return [self.construct(path_s, val, obj_class) for val in cast(list, raw_val)]
+        return [obj_class.fromraw(val) for val in raw_val]
 
     def load(self, obj_class: Type[ObjClass], *path: str) -> ObjClass:
         path_s = "/".join(path)
-        return self.construct(path_s, cast(Dict, self[path_s]), obj_class)
+        return obj_class.fromraw(self.get(path_s))
 
-    def get_stream(self, path: str, mode: str = "r") -> IO:
-        return self.storage.get_stream(path, mode)
-
-    def get(self, path: Union[str, Iterable[str]], default: Any = None) -> Any:
-        if not isinstance(path, str):
-            path = "/".join(path)
-
-        try:
-            return self[path]
-        except Exception:
-            return default
+    def sync(self) -> None:
+        self.db.sync()
+        self.fs.sync()
 
     def __enter__(self) -> 'Storage':
         return self
 
     def __exit__(self, x: Any, y: Any, z: Any) -> None:
-        return
+        self.sync()
 
 
 def make_storage(url: str, existing: bool = False) -> Storage:
-    return Storage(FSStorage(url, existing), YAMLSerializer())
+    return Storage(FSStorage(url, existing),
+                   DBStorage(os.path.join(url, DB_REL_PATH)),
+                   SAFEYAMLSerializer())
 
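Quick usage sketch of the reworked Storage API above; illustrative only (the run path and the stored values are made up, not part of the patch):

    from wally.storage import make_storage

    with make_storage("/tmp/wally_run", existing=False) as storage:
        # plain python values go through the YAML serializer into both backends
        storage.put({"name": "fio", "nodes": ["node-1"]}, "result", "0", "info")
        info = storage.get("result", "0", "info")

        # raw bytes bypass the serializer and go to the FS backend only
        storage.put_raw(b"fio raw output", "result", "0", "fio_raw")
        raw = storage.get_raw("result", "0", "fio_raw")
    # __exit__ calls sync(), which commits the sqlite database
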
diff --git a/wally/storage_structure.txt b/wally/storage_structure.txt
index 307c069..460214c 100644
--- a/wally/storage_structure.txt
+++ b/wally/storage_structure.txt
@@ -1,5 +1,6 @@
 config: Config - full configuration
 all_nodes: List[NodeInfo] - all nodes
+all_results: List[Tuple[str, str, str]] - (test_type, test_summary, result_path)
 cli: List[str] - cli options
 spawned_nodes_ids: List[int] - list of openstack VM, spawned for test
 
@@ -14,7 +15,7 @@
 
 # test results
 result/{descr}_{id}/info : TestInfo - test information: name, cluster config, test parameters, etc.
-result/{descr}_{id}/measurement/{node}/{name}_raw : bytes - raw log
+result/{descr}_{id}/measurement/{node}/{name}_raw : bytes - raw log; name is one of {'bw', 'iops', 'lat'}
 result/{descr}_{id}/measurement/{node}/{name}_data - List[uint64] - measurements data.
 result/{descr}_{id}/measurement/{node}/{name}_meta - Dict[str, Any] - measurements metadata.
 
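A hedged read-back sketch following the layout above (the node id 'node-1' is made up; 'Q' is assumed as the array typecode matching List[uint64]):

    from wally.storage import make_storage

    storage = make_storage("/path/to/run", existing=True)

    # all_results holds (test_type, test_summary, result_path) triples
    test_type, summary, result_path = storage.get("all_results")[0]

    info = storage.get(result_path, "info")
    bw_meta = storage.get(result_path, "measurement", "node-1", "bw_meta")
    bw_data = storage.get_array("Q", result_path, "measurement", "node-1", "bw_data")
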
diff --git a/wally/suits/io/fio.py b/wally/suits/io/fio.py
index ee34af4..fb18165 100644
--- a/wally/suits/io/fio.py
+++ b/wally/suits/io/fio.py
@@ -1,7 +1,7 @@
 import array
 import os.path
 import logging
-from typing import cast, Dict
+from typing import cast, Any
 
 import wally
 
@@ -12,7 +12,7 @@
 from .fio_task_parser import execution_time, fio_cfg_compile, FioJobSection, FioParams, get_log_files, get_test_summary
 from . import rpc_plugin
 from .fio_hist import expected_lat_bins
-from ...storage import Storage
+
 
 logger = logging.getLogger("wally")
 
@@ -21,6 +21,7 @@
     soft_runcycle = 5 * 60
     retry_time = 30
     configs_dir = os.path.dirname(__file__)  # type: str
+    name = 'fio'
 
     def __init__(self, *args, **kwargs) -> None:
         super().__init__(*args, **kwargs)
@@ -136,8 +137,9 @@
         node.put_to_file(self.remote_task_file, str(iter_config).encode("utf8"))
 
     # TODO: get a link to substorage as a parameter
-    def run_iteration(self, node: IRPCNode, iter_config: IterationConfig, substorage: Storage) -> NodeTestResults:
-        exec_time = execution_time(cast(FioJobSection, iter_config))
+    def run_iteration(self, node: IRPCNode, iter_config: IterationConfig, stor_prefix: str) -> NodeTestResults:
+        f_iter_config = cast(FioJobSection, iter_config)
+        exec_time = execution_time(f_iter_config)
 
         fio_cmd_templ = "cd {exec_folder}; " + \
                         "{fio_path} --output-format=json --output={out_file} --alloc-size=262144 {job_file}"
@@ -151,12 +153,10 @@
         if must_be_empty:
             logger.error("Unexpected fio output: %r", must_be_empty)
 
-        res = NodeTestResults(self.__class__.__name__,
-                              node.info.node_id(),
-                              get_test_summary(cast(FioJobSection, iter_config)))
+        res = NodeTestResults(self.__class__.__name__, node.info.node_id(), get_test_summary(f_iter_config))
 
         res.extra_logs['fio'] = node.get_file_content(self.remote_output_file)
-        substorage.store_raw(res.extra_logs['fio'], "fio_raw")
+        self.store_data(res.extra_logs['fio'], "raw", stor_prefix, "fio_raw")
         node.conn.fs.unlink(self.remote_output_file)
 
         files = [name for name in node.conn.fs.listdir(self.exec_folder)]
@@ -164,7 +164,7 @@
         expected_time_delta = 1000  # 1000ms == 1s
         max_time_diff = 50  # 50ms - 5%
 
-        for name, path in get_log_files(cast(FioJobSection, iter_config)):
+        for name, path in get_log_files(f_iter_config):
             log_files = [fname for fname in files if fname.startswith(path)]
             if len(log_files) != 1:
                 logger.error("Found %s files, match log pattern %s(%s) - %s",
@@ -173,7 +173,7 @@
 
             fname = os.path.join(self.exec_folder, log_files[0])
             raw_result = node.get_file_content(fname)  # type: bytes
-            substorage.store_raw(raw_result, "{}_raw".format(name))
+            self.store_data(raw_result, "raw", stor_prefix, "{}_raw".format(name))
             node.conn.fs.unlink(fname)
 
             try:
@@ -222,7 +222,10 @@
                                          step=expected_time_delta,
                                          data=parsed)
 
-            substorage.set_array(parsed, "{}_data".format(name))  # type: ignore
-            substorage["{}_meta".format(name)] = res.series[name].meta()  # type: ignore
+            self.store_data(parsed, "array", stor_prefix, "{}_data".format(name))
+            self.store_data(res.series[name].meta(), "yaml", stor_prefix, "{}_meta".format(name))
 
         return res
+
+    def format_for_console(self, data: Any) -> str:
+        raise NotImplementedError()
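
The store_data() calls above do no storage I/O from the worker threads: they only enqueue items, and the queue is drained on the main thread by process_storage_queue() (see the itest.py hunk below), presumably to keep all sqlite/FS writes out of the thread pool. A minimal standalone sketch of that pattern with illustrative names:

    from queue import Queue

    storage_q = Queue()

    def store_data(val, val_type, prefix, *path):
        # worker thread: enqueue only, no storage access
        storage_q.put((val, val_type, prefix, path))

    def process_storage_queue(storage):
        # main thread, after the iteration futures have completed
        while not storage_q.empty():
            value, val_type, prefix, path = storage_q.get()
            if val_type == "raw":
                storage.put_raw(value, prefix, *path)
            elif val_type == "yaml":
                storage.put(value, prefix, *path)
            elif val_type == "array":
                storage.put_array(value, prefix, *path)
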
diff --git a/wally/suits/io/fio_task_parser.py b/wally/suits/io/fio_task_parser.py
index 8390e3a..6790c97 100644
--- a/wally/suits/io/fio_task_parser.py
+++ b/wally/suits/io/fio_task_parser.py
@@ -11,6 +11,7 @@
 from collections import OrderedDict
 
 
+from ...result_classes import IStorable
 from ..itest import IterationConfig
 from ...utils import sec_to_str, ssize2b
 
@@ -37,7 +38,9 @@
                        ("vm_count", int)])
 
 
-class FioJobSection(IterationConfig):
+class FioJobSection(IterationConfig, IStorable):
+    yaml_tag = 'fio_job'
+
     def __init__(self, name: str) -> None:
         self.name = name
         self.vals = OrderedDict()  # type: Dict[str, Any]
@@ -67,6 +70,20 @@
 
         return res
 
+    def raw(self) -> Dict[str, Any]:
+        return {
+            'name': self.name,
+            'vals': list(map(list, self.vals.items())),
+            'summary': self.summary
+        }
+
+    @classmethod
+    def fromraw(cls, data: Dict[str, Any]) -> 'FioJobSection':
+        obj = cls(data['name'])
+        obj.summary = data['summary']
+        obj.vals.update(data['vals'])
+        return obj
+
 
 class ParseError(ValueError):
     def __init__(self, msg: str, fname: str, lineno: int, line_cont:Optional[str] = "") -> None:
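
Round-trip sketch for the new FioJobSection serialization above; the job name and option values are made up:

    from wally.suits.io.fio_task_parser import FioJobSection

    sec = FioJobSection("test_rrd")
    sec.vals["rw"] = "randread"
    sec.vals["blocksize"] = "60k"
    sec.summary = "rrd60k"

    data = sec.raw()                       # only basic types, safe_dump-friendly
    restored = FioJobSection.fromraw(data)
    assert restored.vals == sec.vals and restored.summary == sec.summary
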
diff --git a/wally/suits/io/rrd.cfg b/wally/suits/io/rrd.cfg
index 1075aea..d32d6a8 100644
--- a/wally/suits/io/rrd.cfg
+++ b/wally/suits/io/rrd.cfg
@@ -1,8 +1,14 @@
 [global]
 include defaults_qd.cfg
-ramp_time=5
-runtime=5
+ramp_time=0
+runtime=4
 
 [test_{TEST_SUMM}]
 blocksize=60k
 rw=randread
+iodepth=1
+
+[test_{TEST_SUMM}]
+iodepth=16
+blocksize=60k
+rw=randread
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index aae475c..78986f6 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -3,14 +3,15 @@
 import logging
 import os.path
 import datetime
-from typing import Dict, Any, List, Optional, Tuple, cast
+from typing import Dict, Any, List, Optional, Callable, cast
 
 from concurrent.futures import ThreadPoolExecutor, wait
 
 from ..utils import Barrier, StopTestError, sec_to_str
 from ..node_interfaces import IRPCNode
 from ..storage import Storage
-from ..result_classes import NodeTestResults, TimeSerie
+from ..result_classes import NodeTestResults, IStorable
+from queue import Queue
 
 
 logger = logging.getLogger("wally")
@@ -45,22 +46,23 @@
         self.remote_dir = remote_dir
 
 
-class IterationConfig:
+class IterationConfig(IStorable):
     name = None  # type: str
     summary = None  # type: str
 
 
-class PerfTest:
+class PerfTest(metaclass=abc.ABCMeta):
     """Base class for all tests"""
     name = None  # type: str
     max_retry = 3
     retry_time = 30
 
-    def __init__(self, config: TestInputConfig) -> None:
+    def __init__(self, config: TestInputConfig, on_idle: Optional[Callable[[], None]] = None) -> None:
         self.config = config
         self.stop_requested = False
         self.nodes = self.config.nodes  # type: List[IRPCNode]
         self.sorted_nodes_ids = sorted(node.info.node_id() for node in self.nodes)
+        self.on_idle = on_idle
 
     def request_stop(self) -> None:
         self.stop_requested = True
@@ -85,29 +87,22 @@
     max_time_diff = 5
     max_rel_time_diff = 0.05
 
-    def __init__(self, config: TestInputConfig) -> None:
-        PerfTest.__init__(self, config)
+    def __init__(self, *args, **kwargs) -> None:
+        PerfTest.__init__(self, *args, **kwargs)
         self.iterations_configs = [None]  # type: List[Optional[IterationConfig]]
+        self.storage_q = Queue()  # type: Any
 
     @abc.abstractmethod
     def get_expected_runtime(self, iter_cfg: IterationConfig) -> Optional[int]:
         pass
 
     def get_not_done_stages(self, storage: Storage) -> Dict[int, IterationConfig]:
-        done_stages = [int(name_id.split("_")[1])
-                       for is_leaf, name_id in storage.list('result')
-                       if not is_leaf]
-        if len(done_stages) == 0:
-            start_run_id = 0
-        else:
-            start_run_id = max(done_stages) + 1
+        not_done = {}  # type: Dict[int, IterationConfig]
 
-        not_in_storage = {}  # type: Dict[int, IterationConfig]
-
-        for run_id, iteration_config in enumerate(self.iterations_configs[start_run_id:], start_run_id):
+        for run_id, iteration_config in enumerate(self.iterations_configs):
             info_path = "result/{}/info".format(run_id)
             if info_path in storage:
-                info = cast(Dict[str, Any], storage[info_path]) # type: Dict[str, Any]
+                info = cast(Dict[str, Any], storage.get(info_path)) # type: Dict[str, Any]
 
                 assert isinstance(info, dict), \
                     "Broken storage at path {}. Expect test info dict, obtain {!r}".format(info_path, info)
@@ -120,7 +115,7 @@
                 expected_config = {
                     'name': self.name,
                     'iteration_name': iter_name,
-                    'iteration_config': iteration_config,
+                    'iteration_config': iteration_config.raw(),
                     'params': self.config.params,
                     'nodes': self.sorted_nodes_ids
                 }
@@ -132,8 +127,9 @@
 
                 logger.info("Test iteration {} found in storage and will be skipped".format(iter_name))
             else:
-                not_in_storage[run_id] = iteration_config
-        return not_in_storage
+                not_done[run_id] = iteration_config
+
+        return not_done
 
     def run(self) -> None:
         not_in_storage = self.get_not_done_stages(self.config.storage)
@@ -162,6 +158,7 @@
                 iter_name = "Unnamed" if iteration_config is None else iteration_config.name
                 logger.info("Run test iteration %s", iter_name)
 
+                current_result_path = "result/{}_{}".format(iteration_config.summary, run_id)
                 results = []  # type: List[NodeTestResults]
                 for idx in range(self.max_retry):
                     logger.debug("Prepare iteration %s", iter_name)
@@ -175,12 +172,8 @@
                     try:
                         futures = []
                         for node in self.nodes:
-                            path = "result/{}_{}/measurement/{}".format(iteration_config.summary,
-                                                                        run_id,
-                                                                        node.info.node_id())
-                            self.config.storage.clear(path)
-                            mstorage = self.config.storage.sub_storage(path)
-                            futures.append(pool.submit(self.run_iteration, node, iteration_config, mstorage))
+                            path = "{}/measurement/{}".format(current_result_path, node.info.node_id())
+                            futures.append(pool.submit(self.run_iteration, node, iteration_config, path))
 
                         results = [fut.result() for fut in futures]
                         break
@@ -196,11 +189,7 @@
                 stop_times = []  # type: List[int]
 
                 # TODO: FIX result processing - NodeTestResults
-                result = None  # type: NodeTestResults
                 for result in results:
-                    mstorage = self.config.storage.sub_storage("result/{}_{}/measurement/{}"
-                                                               .format(result.summary, run_id, result.node_id))
-                    serie = None   # type: TimeSerie
                     for name, serie in result.series.items():
                         start_times.append(serie.start_at)
                         stop_times.append(serie.step * len(serie.data))
@@ -224,14 +213,43 @@
                 test_config = {
                     'name': self.name,
                     'iteration_name': iter_name,
-                    'iteration_config': iteration_config,
+                    'iteration_config': iteration_config.raw(),
                     'params': self.config.params,
                     'nodes': self.sorted_nodes_ids,
                     'begin_time': min_start_time,
                     'end_time': max_stop_time
                 }
 
-                self.config.storage["result", str(run_id), "info"] = test_config  # type: ignore
+                self.process_storage_queue()
+                self.config.storage.put(test_config, "result", str(run_id), "info")
+
+                if "all_results" in self.config.storage:
+                    all_results = self.config.storage.get("all_results")
+                else:
+                    all_results = []
+
+                all_results.append([self.name, iteration_config.summary, current_result_path])
+                self.config.storage.put(all_results, "all_results")
+                self.config.storage.sync()
+
+                if self.on_idle is not None:
+                    self.on_idle()
+
+    def store_data(self, val: Any, val_type: str, prefix: str, *path: str) -> None:
+        self.storage_q.put((val, val_type, prefix, path))
+
+    def process_storage_queue(self) -> None:
+        while not self.storage_q.empty():
+            value, val_type, subpath, val_path = self.storage_q.get()
+            if val_type == 'raw':
+                self.config.storage.put_raw(value, subpath, *val_path)
+            elif val_type == 'yaml':
+                self.config.storage.put(value, subpath, *val_path)
+            elif val_type == 'array':
+                self.config.storage.put_array(value, subpath, *val_path)
+            else:
+                logger.error("Internal logic error - unknown data stop type {!r}".format(val_path))
+                raise StopTestError()
 
     @abc.abstractmethod
     def config_node(self, node: IRPCNode) -> None:
@@ -242,7 +260,7 @@
         pass
 
     @abc.abstractmethod
-    def run_iteration(self, node: IRPCNode, iter_config: IterationConfig, substorage: Storage) -> NodeTestResults:
+    def run_iteration(self, node: IRPCNode, iter_config: IterationConfig, stor_prefix: str) -> NodeTestResults:
         pass
 
 
@@ -269,7 +287,7 @@
     def prepare_iteration(self, node: IRPCNode, iter_config: IterationConfig) -> None:
         pass
 
-    def run_iteration(self, node: IRPCNode, iter_config: IterationConfig, substorage: Storage) -> NodeTestResults:
+    def run_iteration(self, node: IRPCNode, iter_config: IterationConfig, stor_prefix: str) -> NodeTestResults:
         # TODO: have to store logs
         cmd = self.join_remote(self.run_script)
         cmd += ' ' + self.config.params.get('run_opts', '')