Working on reporting; this commit represents a broken code state
diff --git a/wally/common_types.py b/wally/common_types.py
index ce497ad..a4805c3 100644
--- a/wally/common_types.py
+++ b/wally/common_types.py
@@ -1,11 +1,40 @@
-from typing import NamedTuple, Dict, Any
+import abc
+from typing import Any, Union, List, Dict, NamedTuple
 
-from .istorable import IStorable
 
 IP = str
 IPAddr = NamedTuple("IPAddr", [("host", IP), ("port", int)])
 
 
+class IStorable(metaclass=abc.ABCMeta):
+    """Interface for type, which can be stored"""
+
+    @abc.abstractmethod
+    def raw(self) -> Dict[str, Any]:
+        pass
+
+    @classmethod
+    def fromraw(cls, data: Dict[str, Any]) -> 'IStorable':
+        pass
+
+
+Basic = Union[int, str, bytes, bool, None]
+StorableType = Union[IStorable, Dict[str, Any], List[Any], int, str, bytes, bool, None]
+
+
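+# Default IStorable implementation below: raw() serializes all public (non-underscore)
+# attributes to a dict, while fromraw() rebuilds an instance without calling __init__,
+# using cls.__new__ and a plain __dict__ update.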
+class Storable(IStorable):
+    """Default implementation"""
+
+    def raw(self) -> Dict[str, Any]:
+        return {name: val for name, val in self.__dict__.items() if not name.startswith("_")}
+
+    @classmethod
+    def fromraw(cls, data: Dict[str, Any]) -> 'IStorable':
+        obj = cls.__new__(cls)
+        obj.__dict__.update(data)
+        return obj
+
+
 class ConnCreds(IStorable):
     def __init__(self, host: str, user: str, passwd: str = None, port: str = '22',
                  key_file: str = None, key: bytes = None) -> None:
diff --git a/wally/config.py b/wally/config.py
index 7554fb8..2178d0c 100644
--- a/wally/config.py
+++ b/wally/config.py
@@ -1,5 +1,6 @@
 from typing import Any, Dict, Optional
-from .result_classes import IStorable
+
+from .common_types import IStorable
 
 
 ConfigBlock = Dict[str, Any]
diff --git a/wally/hlstorage.py b/wally/hlstorage.py
new file mode 100644
index 0000000..c71159f
--- /dev/null
+++ b/wally/hlstorage.py
@@ -0,0 +1,205 @@
+import re
+import os
+import array
+import struct
+import logging
+from typing import cast, Iterator, Tuple, Type, Dict, Set, List, Optional
+
+import numpy
+
+from .result_classes import (TestSuiteConfig, TestJobConfig, TimeSeries, DataSource,
+                             StatProps, IResultStorage)
+from .storage import Storage
+from .utils import StopTestError
+from .suits.all_suits import all_suits
+
+
+logger = logging.getLogger('wally')
+
+
+class DB_re:
+    node_id = r'\d+\.\d+\.\d+\.\d+:\d+'
+    job_id = r'[-a-zA-Z0-9]+_\d+'
+    sensor = r'[a-z_]+'
+    dev = r'[-a-zA-Z0-9_]+'
+    suite_id = r'[a-z]+_\d+'
+    tag = r'[a-z_.]+'
+
+
+class DB_paths:
+    suite_cfg_r = r'results/{suite_id}_info\.yml'
+    suite_cfg = suite_cfg_r.replace("\\.", '.')
+
+    job_cfg_r = r'results/{suite_id}\.{job_id}/info\.yml'
+    job_cfg = job_cfg_r.replace("\\.", '.')
+
+    job_extra_r = r'results/{suite_id}\.{job_id}/{node_id}/{dev}\.{sensor}\.{tag}'
+    job_extra = job_extra_r.replace("\\.", '.')
+
+    ts_r = r'results/{suite_id}\.{job_id}/{node_id}/{dev}\.{sensor}\.{tag}'
+    ts = ts_r.replace("\\.", '.')
+
+    stat_r = r'results/{suite_id}\.{job_id}/{node_id}/{dev}\.{sensor}\.{tag}'
+    stat = stat_r.replace("\\.", '.')
+
+    plot_r = r'report/{suite_id}\.{job_id}/{node_id}/{dev}\.{sensor}\.{tag}'
+    plot = plot_r.replace("\\.", '.')
+
+    report = r'report/'
+
+
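+# DB_rr wraps every pattern from DB_re into a named regex group, e.g. suite_id becomes
+# r"(?P<suite_id>[a-z]+_\d+)", so iter_paths() can report path components by name.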
+DB_rr = {name: r"(?P<{}>{})".format(name, rr) for name, rr in DB_re.__dict__.items() if not name.startswith("__")}
+
+
+class ResultStorage(IResultStorage):
+    # TODO: check that all path components match required patterns
+
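+    # Binary time-series header (network byte order): three uint32 values
+    # (second_axis_size, len(data), len(times)) followed by two one-byte array
+    # typecodes for the data and times arrays; see put_ts()/load_ts() below.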
+    ts_header_format = "!IIIcc"
+    ts_arr_tag = 'bin'
+    ts_raw_tag = 'txt'
+
+    def __init__(self, storage: Storage) -> None:
+        self.storage = storage
+
+    def sync(self) -> None:
+        self.storage.sync()
+
+    def put_or_check_suite(self, suite: TestSuiteConfig) -> None:
+        path = DB_paths.suite_cfg.format(suite_id=suite.storage_id)
+        if path in self.storage:
+            db_cfg = self.storage.get(path)
+            if db_cfg != suite:
+                logger.error("Current suite %s config is not equal to found in storage at %s", suite.test_type, path)
+                raise StopTestError()
+
+        self.storage.put(suite, path)
+
+    def put_job(self, suite: TestSuiteConfig, job: TestJobConfig) -> None:
+        path = DB_paths.job_cfg.format(suite_id=suite.storage_id, job_id=job.storage_id)
+        self.storage.put(job, path)
+
+    def put_ts(self, ts: TimeSeries) -> None:
+        data = cast(List[int], ts.data)
+        times = cast(List[int], ts.times)
+
+        if len(data) % ts.second_axis_size != 0:
+            logger.error("Time series data size(%s) is not propotional to second_axis_size(%s).",
+                         len(data), ts.second_axis_size)
+            raise StopTestError()
+
+        if len(data) // ts.second_axis_size != len(times):
+            logger.error("Unbalanced data and time srray sizes. %s", ts)
+            raise StopTestError()
+
+        bin_path = DB_paths.ts.format(**ts.source(tag=self.ts_arr_tag).__dict__)
+
+        with self.storage.get_fd(bin_path, "cb") as fd:
+            header = struct.pack(self.ts_header_format,
+                                 ts.second_axis_size,
+                                 len(data),
+                                 len(times),
+                                 ts.data.typecode.encode("ascii"),
+                                 ts.times.typecode.encode("ascii"))
+            fd.write(header)
+            ts.data.tofile(fd)  # type: ignore
+            ts.times.tofile(fd)  # type: ignore
+
+        if ts.raw:
+            raw_path = DB_paths.ts.format(**ts.source(tag=self.ts_raw_tag).__dict__)
+            self.storage.put_raw(ts.raw, raw_path)
+
+    def put_extra(self, data: bytes, source: DataSource) -> None:
+        path = DB_paths.job_extra.format(**source.__dict__)
+        self.storage.put_raw(data, path)
+
+    def put_stat(self, data: StatProps, source: DataSource) -> None:
+        path = DB_paths.stat.format(**source.__dict__)
+        self.storage.put(data, path)
+
+    def get_stat(self, stat_cls: Type[StatProps], source: DataSource) -> StatProps:
+        path = DB_paths.stat.format(**source.__dict__)
+        return self.storage.load(stat_cls, path)
+
+    def iter_paths(self, path_glob) -> Iterator[Tuple[bool, str, Dict[str, str]]]:
+        path = path_glob.format(**DB_rr).split("/")
+        yield from self.storage._iter_paths("", path, {})
+
+    def iter_suite(self, suite_type: str = None) -> Iterator[TestSuiteConfig]:
+        for is_file, suite_info_path, groups in self.iter_paths(DB_paths.suite_cfg_r):
+            assert is_file
+            suite = cast(TestSuiteConfig, self.storage.load(TestSuiteConfig, suite_info_path))
+            assert suite.storage_id == groups['suite_id']
+            if not suite_type or suite.test_type == suite_type:
+                yield suite
+
+    def iter_job(self, suite: TestSuiteConfig) -> Iterator[TestJobConfig]:
+        job_glob = DB_paths.job_cfg_r.replace('{suite_id}', suite.storage_id)
+        job_config_cls = all_suits[suite.test_type].job_config_cls
+
+        for is_file, path, groups in self.iter_paths(job_glob):
+            assert is_file
+            job = cast(TestJobConfig, self.storage.load(job_config_cls, path))
+            assert job.storage_id == groups['job_id']
+            yield job
+
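+    # iter_datasource groups all found time-series files by (node_id, dev, sensor) and
+    # yields a DataSource only when the binary ('bin') tagged file is present; the
+    # tag-to-path map is yielded alongside so callers can also pick up the 'txt' raw data.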
+    def iter_datasource(self, suite: TestSuiteConfig, job: TestJobConfig) -> Iterator[Tuple[DataSource, Dict[str, str]]]:
+        ts_glob = DB_paths.ts_r.replace('{suite_id}', suite.storage_id).replace('{job_id}', job.storage_id)
+        ts_found = {}  # type: Dict[Tuple[str, str, str], Dict[str, str]]
+
+        for is_file, path, groups in self.iter_paths(ts_glob):
+            assert is_file
+            key = (groups['node_id'], groups['dev'], groups['sensor'])
+            ts_found.setdefault(key, {})[groups['tag']] = path
+
+        for (node_id, dev, sensor), tag2path in ts_found.items():
+            if self.ts_arr_tag in tag2path:
+                yield DataSource(suite_id=suite.storage_id,
+                                 job_id=job.storage_id,
+                                 node_id=node_id,
+                                 dev=dev, sensor=sensor, tag=None), tag2path
+
+    def load_ts(self, ds: DataSource, path: str) -> TimeSeries:
+        with self.storage.get_fd(path, "rb") as fd:
+            header = fd.read(struct.calcsize(self.ts_header_format))
+            second_axis_size, data_sz, time_sz, data_typecode, time_typecode = \
+                struct.unpack(self.ts_header_format, header)
+
+            data = array.array(data_typecode.decode("ascii"))
+            times = array.array(time_typecode.decode("ascii"))
+
+            data.fromfile(fd, data_sz)  # type: ignore
+            times.fromfile(fd, time_sz)  # type: ignore
+
+        return TimeSeries("{}.{}".format(ds.dev, ds.sensor),
+                          raw=None,
+                          data=numpy.array(data, dtype=numpy.dtype('float32')),
+                          times=numpy.array(times),
+                          second_axis_size=second_axis_size,
+                          source=ds)
+
+    def iter_ts(self, suite: TestSuiteConfig, job: TestJobConfig, **filters) -> Iterator[TimeSeries]:
+        for ds, tag2path in self.iter_datasource(suite, job):
+            for name, val in filters.items():
+                if val != getattr(ds, name):
+                    break
+            else:
+                ts = self.load_ts(ds, tag2path[self.ts_arr_tag])
+                if self.ts_raw_tag in tag2path:
+                    ts.raw = self.storage.get_raw(tag2path[self.ts_raw_tag])
+
+                yield ts
+
+    # return path to file to be inserted into report
+    def put_plot_file(self, data: bytes, source: DataSource) -> str:
+        path = DB_paths.plot.format(**source.__dict__)
+        return cast(str, self.storage.put_raw(data, path))
+
+    def check_plot_file(self, source: DataSource) -> Optional[str]:
+        path = DB_paths.plot.format(**source.__dict__)
+        fpath = self.storage.resolve_raw(path)
+        if os.path.exists(fpath):
+            return fpath
+        return None
+
+    def put_report(self, report: str, name: str) -> str:
+        return self.storage.put_raw(report.encode("utf8"), DB_paths.report + name)
diff --git a/wally/html.py b/wally/html.py
new file mode 100644
index 0000000..e92e7d1
--- /dev/null
+++ b/wally/html.py
@@ -0,0 +1,2 @@
+def img(link):
+    return '<img src="{}">'.format(link)
diff --git a/wally/hw_info.py b/wally/hw_info.py
index aa53f8e..9da8cb7 100644
--- a/wally/hw_info.py
+++ b/wally/hw_info.py
@@ -5,6 +5,7 @@
 from typing import List, Tuple, cast, Optional
 
 from . import utils
+from .node_utils import get_os
 from .node_interfaces import IRPCNode
 
 
@@ -130,7 +131,7 @@
 def get_sw_info(node: IRPCNode) -> SWInfo:
     res = SWInfo()
 
-    res.OS_version = utils.get_os(node)
+    res.OS_version = get_os(node)
     res.kernel_version = node.get_file_content('/proc/version').decode('utf8').strip()
     res.mtab = node.get_file_content('/etc/mtab').decode('utf8').strip()
 
@@ -159,7 +160,7 @@
     try:
         lshw_out = node.run('sudo lshw -xml 2>/dev/null')
     except Exception as exc:
-        logger.warning("lshw failed on node %s: %s", node.info.node_id(), exc)
+        logger.warning("lshw failed on node %s: %s", node.node_id, exc)
         return None
 
     res = HWInfo()
diff --git a/wally/istorable.py b/wally/istorable.py
deleted file mode 100644
index 467f901..0000000
--- a/wally/istorable.py
+++ /dev/null
@@ -1,31 +0,0 @@
-import abc
-from typing import Any, Union, List, Dict
-
-
-class IStorable(metaclass=abc.ABCMeta):
-    """Interface for type, which can be stored"""
-
-    @abc.abstractmethod
-    def raw(self) -> Dict[str, Any]:
-        pass
-
-    @abc.abstractclassmethod
-    def fromraw(cls, data: Dict[str, Any]) -> 'IStorable':
-        pass
-
-
-class Storable(IStorable):
-    """Default implementation"""
-
-    def raw(self) -> Dict[str, Any]:
-        return self.__dict__
-
-    @classmethod
-    def fromraw(cls, data: Dict[str, Any]) -> 'IStorable':
-        obj = cls.__new__(cls)
-        obj.__dict__.update(data)
-        return obj
-
-
-Basic = Union[int, str, bytes, bool, None]
-StorableType = Union[IStorable, Dict[str, Any], List[Any], int, str, bytes, bool, None]
diff --git a/wally/main.py b/wally/main.py
index 0553b4e..fd9b5a0 100644
--- a/wally/main.py
+++ b/wally/main.py
@@ -4,8 +4,10 @@
 import pprint
 import getpass
 import logging
+import tempfile
 import argparse
 import functools
+import subprocess
 import contextlib
 from typing import List, Tuple, Any, Callable, IO, cast, Optional, Iterator
 from yaml import load as _yaml_load
@@ -30,6 +32,7 @@
     faulthandler = None
 
 from . import utils, node
+from .node_utils import log_nodes_statistic
 from .storage import make_storage, Storage
 from .config import Config
 from .logger import setup_loggers
@@ -91,7 +94,7 @@
 
 
 def log_nodes_statistic_stage(ctx: TestRun) -> None:
-    utils.log_nodes_statistic(ctx.nodes)
+    log_nodes_statistic(ctx.nodes)
 
 
 def parse_args(argv):
@@ -121,6 +124,15 @@
     report_parser.add_argument("data_dir", help="folder with test results")
 
     # ---------------------------------------------------------------------
+    ipython_help = 'run ipython in prepared environment'
+    ipython_parser = subparsers.add_parser('ipython', help=ipython_help)
+    ipython_parser.add_argument("storage_dir", help="Storage path")
+    # ---------------------------------------------------------------------
+    jupyter_help = 'run jupyter notebook in prepared environment'
+    jupyter_parser = subparsers.add_parser('jupyter', help=jupyter_help)
+    jupyter_parser.add_argument("storage_dir", help="Storage path")
+
+    # ---------------------------------------------------------------------
     test_parser = subparsers.add_parser('test', help='run tests')
     test_parser.add_argument('--build-description', type=str, default="Build info")
     test_parser.add_argument('--build-id', type=str, default="id")
@@ -141,7 +153,7 @@
     test_parser.add_argument("storage_dir", help="Path to test directory")
 
     # ---------------------------------------------------------------------
-    test_parser = subparsers.add_parser('db', help='resume tests')
+    test_parser = subparsers.add_parser('db', help='Exec command on DB')
     test_parser.add_argument("cmd", choices=("show",), help="Command to execute")
     test_parser.add_argument("params", nargs='*', help="Command params")
     test_parser.add_argument("storage_dir", help="Storage path")
@@ -206,6 +218,48 @@
             PrepareNodes()]
 
 
+notebook_kern = """
+{
+ "cells": [
+  {
+   "cell_type": "code",
+   "execution_count": 1,
+   "metadata": {
+    "collapsed": true
+   },
+   "outputs": [],
+   "source": [
+    "from wally.storage import make_storage\n",
+    "from wally.hlstorage import ResultStorage\n"
+    "storage = make_storage(\"$STORAGE\", existing=True)\n",
+    "rstorage = ResultStorage(storage=storage)\n"
+   ]
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "Python 3",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.5.2"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 2
+}"""
+
+
 def main(argv: List[str]) -> int:
     if faulthandler is not None:
         faulthandler.register(signal.SIGUSR1, all_threads=True)
@@ -250,7 +304,8 @@
         config = storage.load(Config, 'config')
         stages.extend(get_run_stages())
         stages.append(LoadStoredNodesStage())
-        prev_opts = storage.get('cli')
+        prev_opts = storage.get('cli')  # type: List[str]
+
         if '--ssh-key-passwd' in prev_opts and opts.ssh_key_passwd:
             prev_opts[prev_opts.index("--ssh-key-passwd") + 1] = opts.ssh_key_passwd
 
@@ -293,11 +348,26 @@
             print("Unknown/not_implemented command {!r}".format(opts.cmd))
             return 1
         return 0
+    elif opts.subparser_name == 'ipython':
+        storage = make_storage(opts.storage_dir, existing=True)
+        from .hlstorage import ResultStorage
+        rstorage = ResultStorage(storage=storage)
+
+        import IPython
+        IPython.embed()
+
+        return 0
+    elif opts.subparser_name == 'jupyter':
+        with tempfile.NamedTemporaryFile(suffix=".ipynb") as fd:
+            fd.write(notebook_kern.replace("$STORAGE", opts.storage_dir).encode("utf8"))
+            fd.flush()
+            subprocess.call("jupyter notebook " + fd.name, shell=True)
+        return 0
+
 
     report_stages = []  # type: List[Stage]
     if not getattr(opts, "no_report", False):
-        report_stages.append(CalcStatisticStage())
-        report_stages.append(ConsoleReportStage())
+        # report_stages.append(CalcStatisticStage())
+        # report_stages.append(ConsoleReportStage())
         report_stages.append(HtmlReportStage())
 
     # log level is not a part of config
diff --git a/wally/node.py b/wally/node.py
index d4da52a..38342c6 100644
--- a/wally/node.py
+++ b/wally/node.py
@@ -1,4 +1,5 @@
 import os
+import zlib
 import time
 import json
 import socket
@@ -25,7 +26,11 @@
         self.info = info
 
     def __str__(self) -> str:
-        return self.info.node_id()
+        return self.node_id
+
+    @property
+    def node_id(self) -> str:
+        return self.info.node_id
 
     def put_to_file(self, path: Optional[str], content: bytes) -> str:
         if path is None:
@@ -163,17 +168,19 @@
     def __repr__(self) -> str:
         return str(self)
 
-    def get_file_content(self, path: str, expanduser: bool = False) -> bytes:
+    def get_file_content(self, path: str, expanduser: bool = False, compress: bool = True) -> bytes:
         logger.debug("GET %s from %s", path, self.info)
         if expanduser:
             path = self.conn.fs.expanduser(path)
-        res = self.conn.fs.get_file(path)
+        res = self.conn.fs.get_file(path, compress)
         logger.debug("Download %s bytes from remote file %s from %s", len(res), path, self.info)
+        if compress:
+            res = zlib.decompress(res)
         return res
 
     def run(self, cmd: str, timeout: int = 60, nolog: bool = False, check_timeout: float = 0.01) -> str:
         if not nolog:
-            logger.debug("Node %s - run %s", self.info.node_id(), cmd)
+            logger.debug("Node %s - run %s", self.node_id, cmd)
 
         cmd_b = cmd.encode("utf8")
         proc_id = self.conn.cli.spawn(cmd_b, timeout=timeout, merge_out=True)
@@ -188,21 +195,26 @@
 
         if code != 0:
             templ = "Node {} - cmd {!r} failed with code {}. Output: {!r}."
-            raise OSError(templ.format(self.info.node_id(), cmd, code, out))
+            raise OSError(templ.format(self.node_id, cmd, code, out))
 
         return out
 
-    def copy_file(self, local_path: str, remote_path: str = None, expanduser: bool = False) -> str:
+    def copy_file(self, local_path: str, remote_path: str = None,
+                  expanduser: bool = False,
+                  compress: bool = False) -> str:
+
         if expanduser:
             remote_path = self.conn.fs.expanduser(remote_path)
 
         data = open(local_path, 'rb').read()  # type: bytes
-        return self.put_to_file(remote_path, data)
+        return self.put_to_file(remote_path, data, compress=compress)
 
-    def put_to_file(self, path: Optional[str], content: bytes, expanduser: bool = False) -> str:
+    def put_to_file(self, path: Optional[str], content: bytes, expanduser: bool = False, compress: bool = False) -> str:
         if expanduser:
             path = self.conn.fs.expanduser(path)
-        return self.conn.fs.store_file(path, content)
+        if compress:
+            content = zlib.compress(content)
+        return self.conn.fs.store_file(path, content, compress)
 
     def stat_file(self, path: str, expanduser: bool = False) -> Dict[str, int]:
         if expanduser:
@@ -267,7 +279,7 @@
         cmd = "{} {} --log-level={} server --listen-addr={}:{} --daemon --show-settings"
         cmd = cmd.format(python_cmd, code_file, log_level, ip, port) + " --stdout-file={}".format(log_file)
         logger.info("Agent logs for node {} stored on node in file {}. Log level is {}".format(
-            node.info.node_id(), log_file, log_level))
+            node.node_id, log_file, log_level))
     else:
         cmd = "{} {} --log-level=CRITICAL server --listen-addr={}:{} --daemon --show-settings"
         cmd = cmd.format(python_cmd, code_file, ip, port)
diff --git a/wally/node_interfaces.py b/wally/node_interfaces.py
index 71efc54..935ca41 100644
--- a/wally/node_interfaces.py
+++ b/wally/node_interfaces.py
@@ -1,11 +1,13 @@
 import abc
-from typing import Any, Set, Dict, NamedTuple, Optional
+import logging
+from typing import Any, Set, Dict, Optional, NamedTuple
+
 from .ssh_utils import ConnCreds
-from .common_types import IPAddr
-from .istorable import IStorable
+from .common_types import IPAddr, IStorable
 
 
 RPCCreds = NamedTuple("RPCCreds", [("addr", IPAddr), ("key_file", str), ("cert_file", str)])
+logger = logging.getLogger("wally")
 
 
 class NodeInfo(IStorable):
@@ -23,11 +25,12 @@
         if params is not None:
             self.params = params
 
+    @property
     def node_id(self) -> str:
         return "{0.host}:{0.port}".format(self.ssh_creds.addr)
 
     def __str__(self) -> str:
-        return self.node_id()
+        return self.node_id
 
     def __repr__(self) -> str:
         return str(self)
@@ -89,6 +92,10 @@
     conn = None  # type: Any
     rpc_log_file = None  # type: str
 
+    @property
+    def node_id(self) -> str:
+        return self.info.node_id
+
     @abc.abstractmethod
     def __str__(self) -> str:
         pass
diff --git a/wally/node_utils.py b/wally/node_utils.py
new file mode 100644
index 0000000..e66ba44
--- /dev/null
+++ b/wally/node_utils.py
@@ -0,0 +1,56 @@
+import logging
+import collections
+from typing import Dict, Sequence, NamedTuple
+
+from .node_interfaces import IRPCNode
+
+logger = logging.getLogger("wally")
+
+
+def log_nodes_statistic(nodes: Sequence[IRPCNode]) -> None:
+    logger.info("Found {0} nodes total".format(len(nodes)))
+
+    per_role = collections.defaultdict(int)  # type: Dict[str, int]
+    for node in nodes:
+        for role in node.info.roles:
+            per_role[role] += 1
+
+    for role, count in sorted(per_role.items()):
+        logger.debug("Found {0} nodes with role {1}".format(count, role))
+
+
+
+OSRelease = NamedTuple("OSRelease",
+                       [("distro", str),
+                        ("release", str),
+                        ("arch", str)])
+
+
+def get_os(node: IRPCNode) -> OSRelease:
+    """return os type, release and architecture for node.
+    """
+    arch = node.run("arch", nolog=True).strip()
+
+    try:
+        node.run("ls -l /etc/redhat-release", nolog=True)
+        return OSRelease('redhat', None, arch)
+    except Exception:
+        pass
+
+    try:
+        node.run("ls -l /etc/debian_version", nolog=True)
+
+        release = None
+        for line in node.run("lsb_release -a", nolog=True).split("\n"):
+            if ':' not in line:
+                continue
+            opt, val = line.split(":", 1)
+
+            if opt == 'Codename':
+                release = val.strip()
+
+        return OSRelease('ubuntu', release, arch)
+    except Exception:
+        pass
+
+    raise RuntimeError("Unknown os")
diff --git a/wally/openstack.py b/wally/openstack.py
index 8aa9913..b7fbe31 100644
--- a/wally/openstack.py
+++ b/wally/openstack.py
@@ -155,7 +155,7 @@
                     creds = ConnCreds(host=to_ip(ip), user=user_name, key_file=private_key_path)
                     info = NodeInfo(creds, {'testnode'})
                     info.os_vm_id = vm_id
-                    nid = info.node_id()
+                    nid = info.node_id
                     if nid in ctx.nodes_info:
                         logger.error("Test VM node has the same id(%s), as existing node %s", nid, ctx.nodes_info[nid])
                         raise StopTestError()
@@ -199,7 +199,7 @@
         with ctx.get_pool() as pool:
             for info in launch_vms(ctx.os_connection, params, pool):
                 info.roles.add('testnode')
-                nid = info.node_id()
+                nid = info.node_id
                 if nid in ctx.nodes_info:
                     logger.error("Test VM node has the same id(%s), as existing node %s", nid, ctx.nodes_info[nid])
                     raise StopTestError()
diff --git a/wally/process_results.py b/wally/process_results.py
index 5ca53af..112826e 100644
--- a/wally/process_results.py
+++ b/wally/process_results.py
@@ -1,58 +1,53 @@
 # put all result preprocessing here
 # selection, aggregation
 
+from io import BytesIO
 import logging
-
+from typing import Any
 
 from .stage import Stage, StepOrder
 from .test_run_class import TestRun
 from .statistic import calc_norm_stat_props, calc_histo_stat_props
-from .result_classes import TestJobConfig
-from .suits.itest import ResultStorage
+from .result_classes import StatProps, DataSource, TimeSeries
+from .hlstorage import ResultStorage
 from .suits.io.fio_hist import get_lat_vals, expected_lat_bins
+from .suits.io.fio import FioTest
 from .utils import StopTestError
 
-logger = logging.getLogger("wally")
-
 import matplotlib
-
-# have to be before pyplot import to avoid tkinter(default graph frontend) import error
 matplotlib.use('svg')
-
 import matplotlib.pyplot as plt
 
 
+logger = logging.getLogger("wally")
+
+
 class CalcStatisticStage(Stage):
     priority = StepOrder.TEST + 1
 
     def run(self, ctx: TestRun) -> None:
-        rstorage = ResultStorage(ctx.storage, TestJobConfig)
+        rstorage = ResultStorage(ctx.storage)
 
-        for suite_cfg, path in rstorage.list_suites():
-            if suite_cfg.test_type != 'fio':
-                continue
-
-            for job_cfg, path, _ in rstorage.list_jobs_in_suite(path):
+        for suite in rstorage.iter_suite(FioTest.name):
+            for job in rstorage.iter_job(suite):
                 results = {}
-                for node_id, dev, sensor_name in rstorage.list_ts_in_job(path):
-                    ts = rstorage.load_ts(path, node_id, dev, sensor_name)
-                    if dev == 'fio' and sensor_name == 'lat':
+                for ts in rstorage.iter_ts(suite, job):
+                    if ts.source.sensor == 'lat':
                         if ts.second_axis_size != expected_lat_bins:
                             logger.error("Sensor %s.%s on node %s has" +
                                          "second_axis_size=%s. Can only process sensors with second_axis_size=%s.",
-                                         dev, sensor_name, node_id, ts.second_axis_size, expected_lat_bins)
+                                         ts.source.dev, ts.source.sensor, ts.source.node_id,
+                                         ts.second_axis_size, expected_lat_bins)
                             continue
                         ts.bins_edges = get_lat_vals(ts.second_axis_size)
-                        stat_prop = calc_histo_stat_props(ts)
+                        stat_prop = calc_histo_stat_props(ts)  # type: StatProps
 
                     elif ts.second_axis_size != 1:
                         logger.warning("Sensor %s.%s on node %s provide 2D data with " +
                                        "ts.second_axis_size=%s. Can't process it.",
-                                       dev, sensor_name, node_id, ts.second_axis_size)
+                                       ts.source.dev, ts.source.sensor, ts.source.node_id, ts.second_axis_size)
                         continue
                     else:
                         stat_prop = calc_norm_stat_props(ts)
 
-                    results[(node_id, dev, sensor_name)] = stat_prop
-
         raise StopTestError()
diff --git a/wally/report.py b/wally/report.py
index cf1289b..f8d8c5a 100644
--- a/wally/report.py
+++ b/wally/report.py
@@ -1,44 +1,90 @@
+import os
+import re
 import abc
+import bisect
 import logging
-from typing import Dict, Any, Iterator, Tuple, cast, List
+from io import BytesIO
+from functools import wraps
+from typing import Dict, Any, Iterator, Tuple, cast, List, Callable
+from collections import defaultdict
 
 import numpy
-import scipy
 import matplotlib
-
 # have to be before pyplot import to avoid tkinter(default graph frontend) import error
 matplotlib.use('svg')
-
 import matplotlib.pyplot as plt
+import scipy.stats
 
+import wally
 
-
-
-from .utils import ssize2b
+from . import html
+from .utils import b2ssize
 from .stage import Stage, StepOrder
 from .test_run_class import TestRun
-from .result_classes import NormStatProps
+from .hlstorage import ResultStorage
+from .node_interfaces import NodeInfo
+from .storage import Storage
+from .statistic import calc_norm_stat_props, calc_histo_stat_props
+from .result_classes import (StatProps, DataSource, TimeSeries, TestSuiteConfig,
+                             NormStatProps, HistoStatProps, TestJobConfig)
+from .suits.io.fio_hist import get_lat_vals, expected_lat_bins
+from .suits.io.fio import FioTest, FioJobConfig
+from .suits.io.fio_task_parser import FioTestSumm
+from .statistic import approximate_curve, average, dev
 
 
 logger = logging.getLogger("wally")
 
 
-class ConsoleReportStage(Stage):
-
-    priority = StepOrder.REPORT
-
-    def run(self, ctx: TestRun) -> None:
-        # TODO(koder): load data from storage
-        raise NotImplementedError("...")
+# ----------------  CONSTS ---------------------------------------------------------------------------------------------
 
 
-class HtmlReportStage(Stage):
+DEBUG = False
+LARGE_BLOCKS = 256
+MiB2KiB = 1024
+MS2S = 1000
 
-    priority = StepOrder.REPORT
 
-    def run(self, ctx: TestRun) -> None:
-        # TODO(koder): load data from storage
-        raise NotImplementedError("...")
+# ----------------  PROFILES  ------------------------------------------------------------------------------------------
+
+
+class ColorProfile:
+    primary_color = 'b'
+    suppl_color1 = 'teal'
+    suppl_color2 = 'magenta'
+    box_color = 'y'
+
+    noise_alpha = 0.3
+    subinfo_alpha = 0.7
+
+
+class StyleProfile:
+    grid = True
+    tight_layout = True
+    hist_boxes = 10
+    min_points_for_dev = 5
+
+    dev_range_x = 2.0
+    dev_perc = 95
+
+    avg_range = 20
+
+    curve_approx_level = 5
+    curve_approx_points = 100
+    assert avg_range >= min_points_for_dev
+
+    extra_io_spine = True
+
+    legend_for_eng = True
+
+    units = {
+        'bw': ("MiBps", MiB2KiB, "bandwith"),
+        'iops': ("IOPS", 1, "iops"),
+        'lat': ("ms", 1, "latency")
+    }
+
+
+# ----------------  STRUCTS  -------------------------------------------------------------------------------------------
 
 
 # TODO: need to be revised, have to user StatProps fields instead
@@ -63,15 +109,497 @@
         self.lat_95 = None  # type: float
 
 
+class IOSummary:
+    def __init__(self,
+                 qd: int,
+                 block_size: int,
+                 nodes_count: int,
+                 bw: NormStatProps,
+                 lat: HistoStatProps) -> None:
+
+        self.qd = qd
+        self.nodes_count = nodes_count
+        self.block_size = block_size
+
+        self.bw = bw
+        self.lat = lat
+
+
+# --------------  AGGREGATION AND STAT FUNCTIONS  ----------------------------------------------------------------------
+rexpr = {
+    'sensor': r'(?P<sensor>[-a-z]+)',
+    'dev': r'(?P<dev>[^.]+)',
+    'metric': r'(?P<metric>[a-z_]+)',
+    'node': r'(?P<node>\d+\.\d+\.\d+\.\d+:\d+)',
+}
+
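+# iter_sensors scans storage entries under "sensors/" whose names match
+# "<node>_<sensor>.<dev>.<metric>"; any component passed as None falls back to the
+# corresponding pattern from rexpr, and matches are reported via the regex group dict.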
+def iter_sensors(storage: Storage, node: str = None, sensor: str = None, dev: str = None, metric: str = None):
+    if node is None:
+        node = rexpr['node']
+    if sensor is None:
+        sensor = rexpr['sensor']
+    if dev is None:
+        dev = rexpr['dev']
+    if metric is None:
+        metric = rexpr['metric']
+
+    rr = r"{}_{}\.{}\.{}$".format(node, sensor, dev, metric)
+    sensor_name_re = re.compile(rr)
+
+    for is_file, sensor_data_name in storage.list("sensors"):
+        if is_file:
+            rr = sensor_name_re.match(sensor_data_name)
+            if rr:
+                yield 'sensors/' + sensor_data_name, rr.groupdict()
+
+
+def make_iosum(rstorage: ResultStorage, suite: TestSuiteConfig, job: FioJobConfig) -> IOSummary:
+    lat = get_aggregated(rstorage, suite, job, "lat")
+    bins_edges = numpy.array(get_lat_vals(lat.second_axis_size), dtype='float32') / 1000
+    io = get_aggregated(rstorage, suite, job, "bw")
+
+    return IOSummary(job.qd,
+                     nodes_count=len(suite.nodes_ids),
+                     block_size=job.bsize,
+                     lat=calc_histo_stat_props(lat, bins_edges, StyleProfile.hist_boxes),
+                     bw=calc_norm_stat_props(io, StyleProfile.hist_boxes))
+
+#
+# def iter_io_results(rstorage: ResultStorage,
+#                     qds: List[int] = None,
+#                     op_types: List[str] = None,
+#                     sync_types: List[str] = None,
+#                     block_sizes: List[int] = None) -> Iterator[Tuple[TestSuiteConfig, FioJobConfig]]:
+#
+#     for suite in rstorage.iter_suite(FioTest.name):
+#         for job in rstorage.iter_job(suite):
+#             fjob = cast(FioJobConfig, job)
+#             assert int(fjob.vals['numjobs']) == 1
+#
+#             if sync_types is not None and fjob.sync_mode in sync_types:
+#                 continue
+#
+#             if block_sizes is not None and fjob.bsize not in block_sizes:
+#                 continue
+#
+#             if op_types is not None and fjob.op_type not in op_types:
+#                 continue
+#
+#             if qds is not None and fjob.qd not in qds:
+#                 continue
+#
+#             yield suite, fjob
+
+
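+# get_aggregated builds a cluster-wide time series for one sensor by summing the
+# per-node series element-wise into a zero-initialized copy of the first series;
+# series whose second_axis_size does not match the expected shape are skipped with an error.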
+def get_aggregated(rstorage: ResultStorage, suite: TestSuiteConfig, job: FioJobConfig, sensor: str) -> TimeSeries:
+    tss = list(rstorage.iter_ts(suite, job, sensor=sensor))
+    ds = DataSource(suite_id=suite.storage_id,
+                    job_id=job.storage_id,
+                    node_id="__all__",
+                    dev='fio',
+                    sensor=sensor,
+                    tag=None)
+
+    agg_ts = TimeSeries(sensor,
+                        raw=None,
+                        source=ds,
+                        data=numpy.zeros(tss[0].data.shape, dtype=tss[0].data.dtype),
+                        times=tss[0].times.copy(),
+                        second_axis_size=tss[0].second_axis_size)
+
+    for ts in tss:
+        if sensor == 'lat' and ts.second_axis_size != expected_lat_bins:
+            logger.error("Sensor %s.%s on node %s has" +
+                         "second_axis_size=%s. Can only process sensors with second_axis_size=%s.",
+                         ts.source.dev, ts.source.sensor, ts.source.node_id,
+                         ts.second_axis_size, expected_lat_bins)
+            continue
+
+        if sensor != 'lat' and ts.second_axis_size != 1:
+            logger.error("Sensor %s.%s on node %s has" +
+                         "second_axis_size=%s. Can only process sensors with second_axis_size=1.",
+                         ts.source.dev, ts.source.sensor, ts.source.node_id, ts.second_axis_size)
+            continue
+
+        # TODO: match times on different ts
+        agg_ts.data += ts.data
+
+    return agg_ts
+
+
+# --------------  PLOT HELPERS FUNCTIONS  ------------------------------------------------------------------------------
+
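+# get_emb_data_svg renders the current matplotlib figure to SVG and strips everything up
+# to matplotlib's "Created with matplotlib" comment (the XML prolog and doctype), so the
+# result can be embedded directly into an HTML page.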
+def get_emb_data_svg(plt: Any) -> bytes:
+    bio = BytesIO()
+    plt.savefig(bio, format='svg')
+    img_start = "<!-- Created with matplotlib (http://matplotlib.org/) -->"
+    return bio.getvalue().decode("utf8").split(img_start, 1)[1].encode("utf8")
+
+
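+# provide_plot is a caching decorator: the wrapped plotting function is only invoked when
+# no plot file already exists in the result storage for the given DataSource; in both
+# cases the storage path of the SVG is returned for embedding into the report.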
+def provide_plot(func: Callable[..., None]) -> Callable[..., str]:
+    @wraps(func)
+    def closure1(storage: ResultStorage, path: DataSource, *args, **kwargs) -> str:
+        fpath = storage.check_plot_file(path)
+        if not fpath:
+            func(*args, **kwargs)
+            fpath = storage.put_plot_file(get_emb_data_svg(plt), path)
+            plt.clf()
+            logger.debug("Save plot for %s to %r", path, fpath)
+        return fpath
+    return closure1
+
+
+def apply_style(style: StyleProfile, eng: bool = True, no_legend: bool = False) -> None:
+    if style.grid:
+        plt.grid(True)
+
+    if (style.legend_for_eng or not eng) and not no_legend:
+        legend_location = "center left"
+        legend_bbox_to_anchor = (1.03, 0.81)
+        plt.legend(loc=legend_location, bbox_to_anchor=legend_bbox_to_anchor)
+
+
+# --------------  PLOT FUNCTIONS  --------------------------------------------------------------------------------------
+
+
+@provide_plot
+def plot_hist(title: str, units: str,
+              prop: StatProps,
+              colors: Any = ColorProfile,
+              style: Any = StyleProfile) -> None:
+
+    # TODO: unit should come from ts
+    total = sum(prop.bins_populations)
+    mids = prop.bins_mids
+    normed_bins = [population / total for population in prop.bins_populations]
+    bar_width = mids[1] - mids[0]
+    plt.bar(mids - bar_width / 2, normed_bins, color=colors.box_color, width=bar_width, label="Real data")
+
+    plt.xlabel(units)
+    plt.ylabel("Value probability")
+    plt.title(title)
+
+    dist_plotted = False
+    if isinstance(prop, NormStatProps):
+        nprop = cast(NormStatProps, prop)
+        stats = scipy.stats.norm(nprop.average, nprop.deviation)
+
+        # xpoints = numpy.linspace(mids[0], mids[-1], style.curve_approx_points)
+        # ypoints = stats.pdf(xpoints) / style.curve_approx_points
+
+        edges, step = numpy.linspace(mids[0], mids[-1], len(mids) * 10, retstep=True)
+
+        ypoints = stats.cdf(edges) * 11
+        ypoints = [next - prev for (next, prev) in zip(ypoints[1:], ypoints[:-1])]
+        xpoints = (edges[1:] + edges[:-1]) / 2
+
+        plt.plot(xpoints, ypoints, color=colors.primary_color, label="Expected from\nnormal distribution")
+        dist_plotted = True
+
+    apply_style(style, eng=True, no_legend=not dist_plotted)
+
+
+@provide_plot
+def plot_v_over_time(title: str, units: str,
+                     ts: TimeSeries,
+                     plot_avg_dev: bool = True,
+                     colors: Any = ColorProfile, style: Any = StyleProfile) -> None:
+
+    min_time = min(ts.times)
+
+    # /1000 is us to ms conversion
+    time_points = [(val_time - min_time) / 1000 for val_time in ts.times]
+
+    alpha = colors.noise_alpha if plot_avg_dev else 1.0
+    plt.plot(time_points, ts.data, "o", color=colors.primary_color, alpha=alpha, label="Data")
+
+    if plot_avg_dev:
+        avg_vals = []
+        low_vals_dev = []
+        high_vals_dev = []
+        avg_times = []
+        dev_times = []
+
+        start = (len(ts.data) % style.avg_range) // 2
+        points = list(range(start, len(ts.data) + 1, style.avg_range))
+
+        for begin, end in zip(points[:-1], points[1:]):
+            vals = ts.data[begin: end]
+
+            cavg = average(vals)
+            cdev = dev(vals)
+            tavg = average(time_points[begin: end])
+
+            avg_vals.append(cavg)
+            avg_times.append(tavg)
+
+            low_vals_dev.append(cavg - style.dev_range_x * cdev)
+            high_vals_dev.append(cavg + style.dev_range_x * cdev)
+            dev_times.append(tavg)
+
+        avg_timepoints = cast(List[float], numpy.linspace(avg_times[0], avg_times[-1], style.curve_approx_points))
+
+        low_vals_dev = approximate_curve(dev_times, low_vals_dev, avg_timepoints, style.curve_approx_level)
+        high_vals_dev = approximate_curve(dev_times, high_vals_dev, avg_timepoints, style.curve_approx_level)
+        new_vals_avg = approximate_curve(avg_times, avg_vals, avg_timepoints, style.curve_approx_level)
+
+        plt.plot(avg_timepoints, new_vals_avg, c=colors.suppl_color1,
+                 label="Average\nover {}s".format(style.avg_range))
+        plt.plot(avg_timepoints, low_vals_dev, c=colors.suppl_color2,
+                 label="Avg \xB1 {} * stdev\nover {}s".format(style.dev_range_x, style.avg_range))
+        plt.plot(avg_timepoints, high_vals_dev, c=colors.suppl_color2)
+
+    plt.xlim(-5, max(time_points) + 5)
+
+    plt.xlabel("Time, seconds from test begin")
+    plt.ylabel("{}. Average and \xB1stddev over {} points".format(units, style.avg_range))
+    plt.title(title)
+    apply_style(style, eng=True)
+
+
+@provide_plot
+def plot_lat_over_time(title: str, ts: TimeSeries, bins_vals: List[int], samples: int = 5,
+                       colors: Any = ColorProfile, style: Any = StyleProfile) -> None:
+
+    min_time = min(ts.times)
+    times = [int(tm - min_time + 500) // 1000 for tm in ts.times]
+    ts_len = len(times)
+    step = ts_len / samples
+    points = [times[int(i * step + 0.5)] for i in range(samples)]
+    points.append(times[-1])
+    bounds = list(zip(points[:-1], points[1:]))
+    data = numpy.array(ts.data, dtype='int32')
+    data.shape = [len(ts.data) // ts.second_axis_size, ts.second_axis_size]  # type: ignore
+    agg_data = []
+    positions = []
+    labels = []
+
+    min_idxs = []
+    max_idxs = []
+
+    for begin, end in bounds:
+        agg_hist = numpy.sum(data[begin:end], axis=0)
+
+        vals = numpy.empty(shape=(numpy.sum(agg_hist),), dtype='float32')
+        cidx = 0
+        non_zero = agg_hist.nonzero()[0]
+        min_idxs.append(non_zero[0])
+        max_idxs.append(non_zero[-1])
+
+        for pos in non_zero:
+            vals[cidx:cidx + agg_hist[pos]] = bins_vals[pos]
+            cidx += agg_hist[pos]
+
+        agg_data.append(vals)
+        positions.append((end + begin) / 2)
+        labels.append(str((end + begin) // 2))
+
+    min_y = bins_vals[min(min_idxs)]
+    max_y = bins_vals[max(max_idxs)]
+
+    min_y -= (max_y - min_y) * 0.05
+    max_y += (max_y - min_y) * 0.05
+
+    # draw latency distribution box plots for the sampled time ranges
+    plt.boxplot(agg_data, 0, '', positions=positions, labels=labels, widths=step / 4)
+    plt.xlim(min(times), max(times))
+    plt.ylim(min_y, max_y)
+    plt.xlabel("Time, seconds from test begin, sampled for ~{} seconds".format(int(step)))
+    plt.ylabel("Latency, ms")
+    plt.title(title)
+    apply_style(style, eng=True, no_legend=True)
+
+
+@provide_plot
+def plot_heatmap(title: str, ts: TimeSeries, bins_vals: List[int], samples: int = 5,
+                 colors: Any = ColorProfile, style: Any = StyleProfile) -> None:
+    hist_bins_count = 20
+    bin_top = [100 * 2 ** i for i in range(20)]
+    bin_ranges = [[0, 0]]
+    cborder_it = iter(bin_top)
+    cborder = next(cborder_it)
+    for bin_val in bins_vals:
+        if bin_val < cborder:
+            bin_ranges
+
+    # bins: [100us, 200us, ...., 104s]
+    # map origin bin ranges to heatmap bins
+
+@provide_plot
+def io_chart(title: str,
+             legend: str,
+             iosums: List[IOSummary],
+             iops_log_spine: bool = False,
+             lat_log_spine: bool = False,
+             colors: Any = ColorProfile,
+             style: Any = StyleProfile) -> None:
+
+    # --------------  MAGIC VALUES  ---------------------
+    # IOPS bar width
+    width = 0.35
+
+    # offset from center of bar to deviation/confidence range indicator
+    err_x_offset = 0.05
+
+    # figure size in inches
+    figsize = (12, 6)
+
+    # extra space on top and bottom, comparing to maximal tight layout
+    extra_y_space = 0.05
+
+    # additional spine for BW/IOPS on left side of plot
+    extra_io_spine_x_offset = -0.1
+
+    # extra space on left and right sides
+    extra_x_space = 0.5
+
+    # legend location settings
+    legend_location = "center left"
+    legend_bbox_to_anchor = (1.1, 0.81)
+
+    # plot box size adjust (only plot, not spines and legend)
+    plot_box_adjust = {'right': 0.66}
+    # --------------  END OF MAGIC VALUES  ---------------------
+
+    block_size = iosums[0].block_size
+    lc = len(iosums)
+    xt = list(range(1, lc + 1))
+
+    # x coordinate of middle of the bars
+    xpos = [i - width / 2 for i in xt]
+
+    # import matplotlib.gridspec as gridspec
+    # gs = gridspec.GridSpec(1, 3, width_ratios=[1, 4, 1])
+    # p1 = plt.subplot(gs[1])
+
+    fig, p1 = plt.subplots(figsize=figsize)
+
+    # plot IOPS/BW bars
+    if block_size >= LARGE_BLOCKS:
+        iops_primary = False
+        coef = MiB2KiB
+        p1.set_ylabel("BW (MiBps)")
+    else:
+        iops_primary = True
+        coef = block_size
+        p1.set_ylabel("IOPS")
+
+    p1.bar(xpos, [iosum.bw.average / coef for iosum in iosums], width=width, color=colors.box_color, label=legend)
+
+    # set correct x limits for primary IO spine
+    min_io = min(iosum.bw.average - iosum.bw.deviation * style.dev_range_x for iosum in iosums)
+    max_io = max(iosum.bw.average + iosum.bw.deviation * style.dev_range_x for iosum in iosums)
+    border = (max_io - min_io) * extra_y_space
+    io_lims = (min_io - border, max_io + border)
+
+    p1.set_ylim(io_lims[0] / coef, io_lims[-1] / coef)
+
+    # plot deviation and confidence error ranges
+    err1_legend = err2_legend = None
+    for pos, iosum in zip(xpos, iosums):
+        err1_legend = p1.errorbar(pos + width / 2 - err_x_offset,
+                                  iosum.bw.average / coef,
+                                  iosum.bw.deviation * style.dev_range_x / coef,
+                                  alpha=colors.subinfo_alpha,
+                                  color=colors.suppl_color1)  # 'teal'
+        err2_legend = p1.errorbar(pos + width / 2 + err_x_offset,
+                                  iosum.bw.average / coef,
+                                  iosum.bw.confidence / coef,
+                                  alpha=colors.subinfo_alpha,
+                                  color=colors.suppl_color2)  # 'magenta'
+
+    if style.grid:
+        p1.grid(True)
+
+    handles1, labels1 = p1.get_legend_handles_labels()
+
+    handles1 += [err1_legend, err2_legend]
+    labels1 += ["{}% dev".format(style.dev_perc),
+                "{}% conf".format(int(100 * iosums[0].bw.confidence_level))]
+
+    # extra y spine for latency on right side
+    p2 = p1.twinx()
+
+    # plot median and 95 perc latency
+    p2.plot(xt, [iosum.lat.perc_50 for iosum in iosums], label="lat med")
+    p2.plot(xt, [iosum.lat.perc_95 for iosum in iosums], label="lat 95%")
+
+    # limit and label x spine
+    plt.xlim(extra_x_space, lc + extra_x_space)
+    plt.xticks(xt, ["{0} * {1}".format(iosum.qd, iosum.nodes_count) for iosum in iosums])
+    p1.set_xlabel("QD * Test node count")
+
+    # apply log scales for X spines, if set
+    if iops_log_spine:
+        p1.set_yscale('log')
+
+    if lat_log_spine:
+        p2.set_yscale('log')
+
+    # extra y spine for BW/IOPS on left side
+    if style.extra_io_spine:
+        p3 = p1.twinx()
+        if iops_log_spine:
+            p3.set_yscale('log')
+
+        if iops_primary:
+            p3.set_ylabel("BW (MiBps)")
+            p3.set_ylim(io_lims[0] / MiB2KiB, io_lims[1] / MiB2KiB)
+        else:
+            p3.set_ylabel("IOPS")
+            p3.set_ylim(io_lims[0] / block_size, io_lims[1] / block_size)
+
+        p3.spines["left"].set_position(("axes", extra_io_spine_x_offset))
+        p3.spines["left"].set_visible(True)
+        p3.yaxis.set_label_position('left')
+        p3.yaxis.set_ticks_position('left')
+
+    p2.set_ylabel("Latency (ms)")
+
+    plt.title(title)
+
+    # legend box
+    handles2, labels2 = p2.get_legend_handles_labels()
+    plt.legend(handles1 + handles2, labels1 + labels2, loc=legend_location, bbox_to_anchor=legend_bbox_to_anchor)
+
+    # adjust central box size to fit legend
+    plt.subplots_adjust(**plot_box_adjust)
+    apply_style(style, eng=False, no_legend=True)
+
+
+#  --------------------  REPORT HELPERS --------------------------------------------------------------------------------
+
+
 class HTMLBlock:
     data = None  # type: str
     js_links = []  # type: List[str]
     css_links = []  # type: List[str]
 
 
+class Menu1st:
+    engineering = "Engineering"
+    summary = "Summary"
+
+
+class Menu2ndEng:
+    iops_time = "IOPS(time)"
+    hist = "IOPS/lat overall histogram"
+    lat_time = "Lat(time)"
+
+
+class Menu2ndSumm:
+    io_lat_qd = "IO & Lat vs QD"
+
+
+menu_1st_order = [Menu1st.summary, Menu1st.engineering]
+
+
+#  --------------------  REPORTS  --------------------------------------------------------------------------------------
+
+
 class Reporter(metaclass=abc.ABCMeta):
     @abc.abstractmethod
-    def get_divs(self, config, storage) -> Iterator[Tuple[str, str, HTMLBlock]]:
+    def get_divs(self, suite: TestSuiteConfig, storage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
         pass
 
 
@@ -81,8 +609,38 @@
 
 
 # Main performance report
-class IOPS_QD(Reporter):
+class IO_QD(Reporter):
     """Creates graph, which show how IOPS and Latency depend on QD"""
+    def get_divs(self, suite: TestSuiteConfig, rstorage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
+        ts_map = {}  # type: Dict[FioTestSumm, List[IOSummary]]
+        str_summary = {}  # type: Dict[FioTestSumm, Tuple[str, str]]
+        for job in rstorage.iter_job(suite):
+            fjob = cast(FioJobConfig, job)
+            tpl_no_qd = fjob.characterized_tuple_no_qd()
+            io_summ = make_iosum(rstorage, suite, job)
+
+            if tpl_no_qd not in ts_map:
+                ts_map[tpl_no_qd] = [io_summ]
+                str_summary[tpl_no_qd] = (fjob.summary_no_qd(), fjob.long_summary_no_qd())
+            else:
+                ts_map[tpl_no_qd].append(io_summ)
+
+        for tpl, iosums in ts_map.items():
+            iosums.sort(key=lambda x: x.qd)
+            summary, summary_long = str_summary[tpl]
+
+            ds = DataSource(suite_id=suite.storage_id,
+                            job_id="io_over_qd_".format(summary),
+                            node_id="__all__",
+                            dev='fio',
+                            sensor="io_over_qd",
+                            tag="svg")
+
+            title = "IOPS, BW, Lat vs. QD.\n" + summary_long
+            fpath = io_chart(rstorage, ds, title=title, legend="IOPS/BW", iosums=iosums)
+            yield Menu1st.summary, Menu2ndSumm.io_lat_qd, html.img(fpath)
+            if DEBUG:
+                return
 
 
 # Linearization report
@@ -91,21 +649,231 @@
 
 
 # IOPS/latency distribution
-class IOPSHist(Reporter):
+class IOHist(Reporter):
     """IOPS.latency distribution histogram"""
+    def get_divs(self, suite: TestSuiteConfig, rstorage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
+        for job in rstorage.iter_job(suite):
+            fjob = cast(FioJobConfig, job)
+            agg_lat = get_aggregated(rstorage, suite, fjob, "lat")
+            bins_edges = numpy.array(get_lat_vals(agg_lat.second_axis_size), dtype='float32') / 1000  # convert us to ms
+            lat_stat_prop = calc_histo_stat_props(agg_lat, bins_edges, bins_count=StyleProfile.hist_boxes)
+
+            title = "Latency distribution. " + fjob.long_summary
+            units = "ms"
+
+            fpath = plot_hist(rstorage, agg_lat.source(tag='hist.svg'), title, units, lat_stat_prop)
+            if DEBUG:
+                yield Menu1st.summary, Menu2ndSumm.io_lat_qd, html.img(fpath)
+            else:
+                yield Menu1st.engineering, Menu2ndEng.hist, html.img(fpath)
+
+            agg_io = get_aggregated(rstorage, suite, fjob, "bw")
+
+            if fjob.bsize >= LARGE_BLOCKS:
+                title = "BW distribution. " + fjob.long_summary
+                units = "MiBps"
+                agg_io.data /= MiB2KiB
+            else:
+                title = "IOPS distribution. " + fjob.long_summary
+                agg_io.data /= fjob.bsize
+                units = "IOPS"
+
+            io_stat_prop = calc_norm_stat_props(agg_io, bins_count=StyleProfile.hist_boxes)
+            fpath = plot_hist(rstorage, agg_io.source(tag='hist.svg'), title, units, io_stat_prop)
+            if DEBUG:
+                yield Menu1st.summary, Menu2ndSumm.io_lat_qd, html.img(fpath)
+                return
+            else:
+                yield Menu1st.engineering, Menu2ndEng.hist, html.img(fpath)
 
 
-# IOPS/latency over test time
-class IOPSTime(Reporter):
+# IOPS/latency over test time for each job
+class IOTime(Reporter):
     """IOPS/latency during test"""
-    def get_divs(self, config, storage) -> Iterator[Tuple[str, str, HTMLBlock]]:
-        pass
+    def get_divs(self, suite: TestSuiteConfig, rstorage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
+        for job in rstorage.iter_job(suite):
+            fjob = cast(FioJobConfig, job)
+            agg_lat = get_aggregated(rstorage, suite, fjob, "lat")
+            bins_edges = numpy.array(get_lat_vals(agg_lat.second_axis_size), dtype='float32') / 1000
+            title = "Latency during test. " + fjob.long_summary
+
+            fpath = plot_lat_over_time(rstorage, agg_lat.source(tag='ts.svg'), title, agg_lat, bins_edges)
+            if DEBUG:
+                yield Menu1st.summary, Menu2ndSumm.io_lat_qd, html.img(fpath)
+            else:
+                yield Menu1st.engineering, Menu2ndEng.lat_time, html.img(fpath)
+
+            fpath = plot_heatmap(rstorage, agg_lat.source(tag='hmap.svg'), title, agg_lat, bins_edges)
+            if DEBUG:
+                yield Menu1st.summary, Menu2ndSumm.io_lat_qd, html.img(fpath)
+            else:
+                yield Menu1st.engineering, Menu2ndEng.lat_time, html.img(fpath)
+
+            agg_io = get_aggregated(rstorage, suite, fjob, "bw")
+            if fjob.bsize >= LARGE_BLOCKS:
+                title = "BW during test. " + fjob.long_summary
+                units = "MiBps"
+                agg_io.data /= MiB2KiB
+            else:
+                title = "IOPS during test. " + fjob.long_summary
+                agg_io.data /= fjob.bsize
+                units = "IOPS"
+
+            fpath = plot_v_over_time(rstorage, agg_io.source(tag='ts.svg'), title, units, agg_io)
+
+            if DEBUG:
+                yield Menu1st.summary, Menu2ndSumm.io_lat_qd, html.img(fpath)
+                return
+            else:
+                yield Menu1st.engineering, Menu2ndEng.iops_time, html.img(fpath)
+
+
+def is_sensor_numarray(sensor: str, metric: str) -> bool:
+    """Returns True if sensor provides one-dimension array of numeric values. One number per one measurement."""
+    return True
+
+
+LEVEL_SENSORS = {("block-io", "io_queue"),
+                 ("system-cpu", "procs_blocked"),
+                 ("system-cpu", "procs_queue")}
+
+
+def is_level_sensor(sensor: str, metric: str) -> bool:
+    """Returns True if sensor measure level of any kind, E.g. queue depth."""
+    return (sensor, metric) in LEVEL_SENSORS
+
+
+def is_delta_sensor(sensor: str, metric: str) -> bool:
+    """Returns True if sensor provides deltas for cumulative value. E.g. io completed in given period"""
+    return not is_level_sensor(sensor, metric)
+
+
+
+def get_sensor(storage: Storage, node: str, sensor: str, dev: str, metric: str,
+               time_range: Tuple[int, int]) -> numpy.array:
+    """Return sensor values for given node for given period. Return per second estimated values array
+
+    Raise an error if required range is not full covered by data in storage.
+    First it finds range of results from sensor, which fully covers requested range.
+    ...."""
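+    # Rough sketch of the estimation below: collection_start_at marks the start of each
+    # collection interval; searchsorted() picks the intervals covering [begin, end], and
+    # the loop splits every interval's value proportionally into consecutive one-second
+    # (MICRO-wide) result cells, accumulating partial contributions in curr_summ.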
+
+    collected_at = numpy.array(storage.get_array("sensors/{}_collected_at".format(node)), dtype="int")
+    data = numpy.array(storage.get_array("sensors/{}_{}.{}.{}".format(node, sensor, dev, metric)))
+
+    # collected_at is array of pairs (collection_started_at, collection_finished_at)
+    collection_start_at = collected_at[::2]
+
+    MICRO = 1000000
+
+    # convert seconds to us
+    begin = time_range[0] * MICRO
+    end = time_range[1] * MICRO
+
+    if begin < collection_start_at[0] or end > collection_start_at[-1] or end <= begin:
+        raise AssertionError(("Incorrect data for get_sensor - time_range={!r}, collected_at=[{}, ..., {}]," +
+                              "sensor = {}_{}.{}.{}").format(time_range,
+                                                             collected_at[0] // MICRO,
+                                                             collected_at[-1] // MICRO,
+                                                             node, sensor, dev, metric))
+
+    pos1, pos2 = numpy.searchsorted(collection_start_at, (begin, end))
+    assert pos1 >= 1
+
+    time_bounds = collection_start_at[pos1 - 1: pos2]
+    edge_it = iter(time_bounds)
+    val_it = iter(data[pos1 - 1: pos2])
+
+    result = []
+    curr_summ = 0
+
+    results_cell_ends = begin + MICRO
+    curr_end = next(edge_it)
+
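+    # walk the raw collection intervals and split each measured value proportionally across
+    # the one-second result cells it overlaps; partial values are carried over in curr_summ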
+    while results_cell_ends <= end:
+        curr_start = curr_end
+        curr_end = next(edge_it)
+        curr_val = next(val_it)
+        while curr_end >= results_cell_ends and results_cell_ends <= end:
+            current_part = (results_cell_ends - curr_start) / (curr_end - curr_start) * curr_val
+            result.append(curr_summ + current_part)
+            curr_summ = 0
+            curr_val -= current_part
+            curr_start = results_cell_ends
+            results_cell_ends += MICRO
+        curr_summ += curr_val
+
+    assert len(result) == (end - begin) // MICRO
+    return numpy.array(result)
+
+
+class ResourceUsage:
+    def __init__(self, io_r_ops: int, io_w_ops: int, io_r_kb: int, io_w_kb: int) -> None:
+        self.io_w_ops = io_w_ops
+        self.io_r_ops = io_r_ops
+        self.io_w_kb = io_w_kb
+        self.io_r_kb = io_r_kb
+
+        self.cpu_used_user = None  # type: int
+        self.cpu_used_sys = None  # type: int
+        self.cpu_wait_io = None  # type: int
+
+        self.net_send_packets = None  # type: int
+        self.net_recv_packets = None  # type: int
+        self.net_send_kb = None  # type: int
+        self.net_recv_kb = None  # type: int
 
 
 # Cluster load over test time
 class ClusterLoad(Reporter):
     """IOPS/latency during test"""
 
+    storage_sensors = [
+        ('block-io', 'reads_completed', "Read ops"),
+        ('block-io', 'writes_completed', "Write ops"),
+        ('block-io', 'sectors_read', "Read kb"),
+        ('block-io', 'sectors_written', "Write kb"),
+    ]
+
+    def get_divs(self, suite: TestSuiteConfig, rstorage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
+        # split nodes on test and other
+        storage = rstorage.storage
+        nodes = storage.load_list(NodeInfo, "all_nodes")  # type: List[NodeInfo]
+
+        test_nodes = {node.node_id for node in nodes if 'testnode' in node.roles}
+        cluster_nodes = {node.node_id for node in nodes if 'testnode' not in node.roles}
+
+        for job in rstorage.iter_job(suite):
+            # convert ms to s
+            time_range = (job.reliable_info_starts_at // MS2S, job.reliable_info_stops_at // MS2S)
+            duration = time_range[1] - time_range[0]
+
+            for sensor, metric, sensor_title in self.storage_sensors:
+                sum_testnode = numpy.zeros((duration,))
+                sum_other = numpy.zeros((duration,))
+
+                for path, groups in iter_sensors(rstorage.storage, sensor=sensor, metric=metric):
+                    data = get_sensor(rstorage.storage, groups['node'], sensor, groups['dev'], metric, time_range)
+                    if groups['node'] in test_nodes:
+                        sum_testnode += data
+                    else:
+                        sum_other += data
+
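+                # NOTE: only the testnode sum is plotted below; sum_other is accumulated but not yet used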
+                ds = DataSource(suite_id=suite.storage_id, job_id=job.summary, node_id="cluster",
+                                dev=sensor, sensor=metric, tag="ts.svg")
+
+                # s to ms
+                ts = TimeSeries(name="", times=numpy.arange(*time_range) * MS2S, data=sum_testnode, raw=None)
+                fpath = plot_v_over_time(rstorage, ds, "{}.{}".format(sensor, metric), sensor_title, ts=ts)
+                yield Menu1st.engineering, Menu2ndEng.iops_time, html.img(fpath)
+
+            if DEBUG:
+                return
+
+
+# Resource consumption summary
+class ResourceConsumption(Reporter):
+    """Resource consumption report, text only"""
+
 
 # Node load over test time
 class NodeLoad(Reporter):
@@ -117,12 +885,72 @@
     """IOPS/latency during test"""
 
 
-# TODO: Resource consumption report
 # TODO: Ceph operation breakout report
 # TODO: Resource consumption for different type of test
 
 
-#
+# ------------------------------------------  REPORT STAGES  -----------------------------------------------------------
+
+
+class HtmlReportStage(Stage):
+    priority = StepOrder.REPORT
+
+    def run(self, ctx: TestRun) -> None:
+        rstorage = ResultStorage(ctx.storage)
+        reporters = [ClusterLoad()]  # type: List[Reporter]  # disabled: IO_QD(), IOTime(), IOHist()
+
+        root_dir = os.path.dirname(os.path.dirname(wally.__file__))
+        doc_templ_path = os.path.join(root_dir, "report_templates/index.html")
+        report_template = open(doc_templ_path, "rt").read()
+        css_file_src = os.path.join(root_dir, "report_templates/main.css")
+        css_file = open(css_file_src, "rt").read()
+
+        menu_block = []
+        content_block = []
+        link_idx = 0
+
+        matplotlib.rcParams.update({'font.size': 10})
+
+        items = defaultdict(lambda: defaultdict(list))  # type: Dict[str, Dict[str, list]]
+        for suite in rstorage.iter_suite(FioTest.name):
+            for reporter in reporters:
+                for block, item, html_block in reporter.get_divs(suite, rstorage):
+                    items[block][item].append(html_block)
+
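+        # build a two-level navigation: each first-level group toggles a collapsible block of
+        # second-level links, and every second-level link points to a div with the collected html blocks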
+        for idx_1st, menu_1st in enumerate(sorted(items, key=lambda x: menu_1st_order.index(x))):
+            menu_block.append(
+                '<a href="#item{}" class="nav-group" data-toggle="collapse" data-parent="#MainMenu">{}</a>'
+                .format(idx_1st, menu_1st)
+            )
+            menu_block.append('<div class="collapse" id="item{}">'.format(idx_1st))
+            for menu_2nd in sorted(items[menu_1st]):
+                menu_block.append('    <a href="#content{}" class="nav-group-item">{}</a>'
+                                  .format(link_idx, menu_2nd))
+                content_block.append('<div id="content{}">'.format(link_idx))
+                content_block.extend("    " + x for x in items[menu_1st][menu_2nd])
+                content_block.append('</div>')
+                link_idx += 1
+            menu_block.append('</div>')
+
+        report = report_template.replace("{{{menu}}}", ("\n" + " " * 16).join(menu_block))
+        report = report.replace("{{{content}}}", ("\n" + " " * 16).join(content_block))
+        report_path = rstorage.put_report(report, "index.html")
+        rstorage.put_report(css_file, "main.css")
+        logger.info("Report is stored into %r", report_path)
+
+
+class ConsoleReportStage(Stage):
+
+    priority = StepOrder.REPORT
+
+    def run(self, ctx: TestRun) -> None:
+        # TODO(koder): load data from storage
+        raise NotImplementedError("...")
+
+
+#  ---------------------------   LEGACY --------------------------------------------------------------------------------
+
+
 # # disk_info = None
 # # base = None
 # # linearity = None
diff --git a/wally/result_classes.py b/wally/result_classes.py
index 3616f47..1a148ed 100644
--- a/wally/result_classes.py
+++ b/wally/result_classes.py
@@ -1,5 +1,7 @@
+import abc
 import array
-from typing import Dict, List, Any, Optional, Tuple, cast
+from typing import Dict, List, Any, Optional, Tuple, cast, Type, Iterator
+from collections import OrderedDict
 
 
 import numpy
@@ -7,13 +9,31 @@
 
 
 from .node_interfaces import IRPCNode
-from .istorable import IStorable, Storable
+from .common_types import IStorable, Storable
 from .utils import round_digits, Number
 
 
-class TestJobConfig(Storable):
-    def __init__(self) -> None:
-        self.summary = None  # type: str
+class TestJobConfig(Storable, metaclass=abc.ABCMeta):
+    def __init__(self, idx: int) -> None:
+        self.idx = idx
+        self.reliable_info_time_range = None  # type: Tuple[int, int]
+        self.vals = OrderedDict()  # type: Dict[str, Any]
+
+    @property
+    def storage_id(self) -> str:
+        return "{}_{}".format(self.summary, self.idx)
+
+    @abc.abstractproperty
+    def characterized_tuple(self) -> Tuple:
+        pass
+
+    @abc.abstractproperty
+    def summary(self, *excluded_fields) -> str:
+        pass
+
+    @abc.abstractproperty
+    def long_summary(self, *excluded_fields) -> str:
+        pass
 
 
 class TestSuiteConfig(IStorable):
@@ -31,15 +51,22 @@
                  params: Dict[str, Any],
                  run_uuid: str,
                  nodes: List[IRPCNode],
-                 remote_dir: str) -> None:
+                 remote_dir: str,
+                 idx: int) -> None:
         self.test_type = test_type
         self.params = params
         self.run_uuid = run_uuid
         self.nodes = nodes
-        self.nodes_ids = [node.info.node_id() for node in nodes]
+        self.nodes_ids = [node.node_id for node in nodes]
         self.remote_dir = remote_dir
+        self.storage_id = "{}_{}".format(self.test_type, idx)
 
-    def __eq__(self, other: 'TestSuiteConfig') -> bool:
+    def __eq__(self, o: object) -> bool:
+        if type(o) is not self.__class__:
+            return False
+
+        other = cast(TestSuiteConfig, o)
+
         return (self.test_type == other.test_type and
                 self.params == other.params and
                 set(self.nodes_ids) == set(other.nodes_ids))
@@ -62,23 +89,50 @@
         return obj
 
 
+class DataSource:
+    def __init__(self,
+                 suite_id: str = None,
+                 job_id: str = None,
+                 node_id: str = None,
+                 dev: str = None,
+                 sensor: str = None,
+                 tag: str = None) -> None:
+        self.suite_id = suite_id
+        self.job_id = job_id
+        self.node_id = node_id
+        self.dev = dev
+        self.sensor = sensor
+        self.tag = tag
+
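+    # calling an instance returns a copy with the given fields overridden, e.g. ds(tag='ts.svg')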
+    def __call__(self, **kwargs) -> 'DataSource':
+        dct = self.__dict__.copy()
+        dct.update(kwargs)
+        return self.__class__(**dct)
+
+    def __str__(self) -> str:
+        return "{0.suite_id}.{0.job_id}/{0.node_id}/{0.dev}.{0.sensor}.{0.tag}".format(self)
+
+    def __repr__(self) -> str:
+        return str(self)
+
+
 class TimeSeries:
     """Data series from sensor - either system sensor or from load generator tool (e.g. fio)"""
 
     def __init__(self,
                  name: str,
                  raw: Optional[bytes],
-                 data: array.array,
-                 times: array.array,
+                 data: numpy.array,
+                 times: numpy.array,
                  second_axis_size: int = 1,
-                 bins_edges: List[float] = None) -> None:
+                 source: DataSource = None) -> None:
 
         # Sensor name. Typically DEV_NAME.METRIC
         self.name = name
 
         # Time series times and values. Time in ms from Unix epoch.
-        self.times = times  # type: List[int]
-        self.data = data  # type: List[int]
+        self.times = times
+        self.data = data
 
         # Not equal to 1 in case of 2d sensors, like latency, when each measurement is a histogram.
         self.second_axis_size = second_axis_size
@@ -86,8 +140,18 @@
         # Raw sensor data (is provided). Like log file for fio iops/bw/lat.
         self.raw = raw
 
-        # bin edges for historgam timeseries
-        self.bins_edges = bins_edges
+        self.source = source
+
+    def __str__(self) -> str:
+        res = "TS({}):\n".format(self.name)
+        res += "    source={}\n".format(self.source)
+        res += "    times_size={}\n".format(len(self.times))
+        res += "    data_size={}\n".format(len(self.data))
+        res += "    data_shape={}x{}\n".format(len(self.data) // self.second_axis_size, self.second_axis_size)
+        return res
+
+    def __repr__(self) -> str:
+        return str(self)
 
 
 # (node_name, source_dev, metric_name) => metric_results
@@ -96,7 +160,7 @@
 
 class StatProps(IStorable):
     "Statistic properties for timeseries with unknown data distribution"
-    def __init__(self, data: List[Number]) -> None:
+    def __init__(self, data: numpy.array) -> None:
         self.perc_99 = None  # type: float
         self.perc_95 = None  # type: float
         self.perc_90 = None  # type: float
@@ -106,8 +170,8 @@
         self.max = None  # type: Number
 
         # bin_center: bin_count
-        self.bins_populations = None  # type: List[int]
-        self.bins_edges = None  # type: List[float]
+        self.bins_populations = None  # type: numpy.array
+        self.bins_mids = None  # type: numpy.array
         self.data = data
 
     def __str__(self) -> str:
@@ -122,14 +186,16 @@
 
     def raw(self) -> Dict[str, Any]:
         data = self.__dict__.copy()
-        data['bins_edges'] = list(self.bins_edges)
+        del data['data']
+        data['bins_mids'] = list(self.bins_mids)
         data['bins_populations'] = list(self.bins_populations)
         return data
 
     @classmethod
     def fromraw(cls, data: Dict[str, Any]) -> 'StatProps':
-        data['bins_edges'] = numpy.array(data['bins_edges'])
+        data['bins_mids'] = numpy.array(data['bins_mids'])
         data['bins_populations'] = numpy.array(data['bins_populations'])
+        data['data'] = None
         res = cls.__new__(cls)
         res.__dict__.update(data)
         return res
@@ -138,14 +204,14 @@
 class HistoStatProps(StatProps):
     """Statistic properties for 2D timeseries with unknown data distribution and histogram as input value.
     Used for latency"""
-    def __init__(self, data: List[Number], second_axis_size: int) -> None:
+    def __init__(self, data: numpy.array, second_axis_size: int) -> None:
         self.second_axis_size = second_axis_size
         StatProps.__init__(self, data)
 
 
 class NormStatProps(StatProps):
     "Statistic properties for timeseries with normal data distribution. Used for iops/bw"
-    def __init__(self, data: List[Number]) -> None:
+    def __init__(self, data: numpy.array) -> None:
         StatProps.__init__(self, data)
 
         self.average = None  # type: float
@@ -153,6 +219,8 @@
         self.confidence = None  # type: float
         self.confidence_level = None  # type: float
         self.normtest = None  # type: NormaltestResult
+        self.skew = None  # type: float
+        self.kurt = None  # type: float
 
     def __str__(self) -> str:
         res = ["NormStatProps(size = {}):".format(len(self.data)),
@@ -163,13 +231,16 @@
                "    perc_95 = {}".format(round_digits(self.perc_95)),
                "    perc_99 = {}".format(round_digits(self.perc_99)),
                "    range {} {}".format(round_digits(self.min), round_digits(self.max)),
-               "    normtest = {0.normtest}".format(self)]
+               "    normtest = {0.normtest}".format(self),
+               "    skew ~ kurt = {0.skew} ~ {0.kurt}".format(self)]
         return "\n".join(res)
 
     def raw(self) -> Dict[str, Any]:
         data = self.__dict__.copy()
         data['normtest'] = (data['nortest'].statistic, data['nortest'].pvalue)
-        data['bins_edges'] = list(self.bins_edges)
+        del data['data']
+        data['bins_mids'] = list(self.bins_mids)
+        data['bins_populations'] = list(self.bins_populations)
         return data
 
     @classmethod
@@ -195,3 +266,51 @@
         self.run_interval = (begin_time, end_time)
         self.raw = raw  # type: JobMetrics
         self.processed = None  # type: JobStatMetrics
+
+
+class IResultStorage(metaclass=abc.ABCMeta):
+
+    @abc.abstractmethod
+    def sync(self) -> None:
+        pass
+
+    @abc.abstractmethod
+    def put_or_check_suite(self, suite: TestSuiteConfig) -> None:
+        pass
+
+    @abc.abstractmethod
+    def put_job(self, suite: TestSuiteConfig, job: TestJobConfig) -> None:
+        pass
+
+    @abc.abstractmethod
+    def put_ts(self, ts: TimeSeries) -> None:
+        pass
+
+    @abc.abstractmethod
+    def put_extra(self, data: bytes, source: DataSource) -> None:
+        pass
+
+    @abc.abstractmethod
+    def put_stat(self, data: StatProps, source: DataSource) -> None:
+        pass
+
+    @abc.abstractmethod
+    def get_stat(self, stat_cls: Type[StatProps], source: DataSource) -> StatProps:
+        pass
+
+    @abc.abstractmethod
+    def iter_suite(self, suite_type: str = None) -> Iterator[TestSuiteConfig]:
+        pass
+
+    @abc.abstractmethod
+    def iter_job(self, suite: TestSuiteConfig) -> Iterator[TestJobConfig]:
+        pass
+
+    @abc.abstractmethod
+    def iter_ts(self, suite: TestSuiteConfig, job: TestJobConfig) -> Iterator[TimeSeries]:
+        pass
+
+    # return path to file to be inserted into report
+    @abc.abstractmethod
+    def put_plot_file(self, data: bytes, source: DataSource) -> str:
+        pass
diff --git a/wally/run_test.py b/wally/run_test.py
index a11b43d..d3c68b6 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -10,21 +10,11 @@
 from .node_interfaces import NodeInfo, IRPCNode
 from .stage import Stage, StepOrder
 from .sensors import collect_sensors_data
-from .suits.io.fio import IOPerfTest
-from .suits.itest import TestSuiteConfig
-from .suits.mysql import MysqlTest
-from .suits.omgbench import OmgTest
-from .suits.postgres import PgBenchTest
+from .suits.all_suits import all_suits
 from .test_run_class import TestRun
 from .utils import StopTestError
-
-
-TOOL_TYPE_MAPPER = {
-    "io": IOPerfTest,
-    "pgbench": PgBenchTest,
-    "mysql": MysqlTest,
-    "omg": OmgTest,
-}
+from .result_classes import TestSuiteConfig
+from .hlstorage import ResultStorage
 
 
 logger = logging.getLogger("wally")
@@ -81,7 +71,7 @@
         if ctx.config.get("download_rpc_logs", False):
             for node in ctx.nodes:
                 if node.rpc_log_file is not None:
-                    nid = node.info.node_id()
+                    nid = node.node_id
                     path = "rpc_logs/" + nid
                     node.conn.server.flush_logs()
                     log = node.get_file_content(node.rpc_log_file)
@@ -110,7 +100,7 @@
         with ctx.get_pool() as pool:
             # can't make next RPC request until finish with previous
             for node in ctx.nodes:
-                nid = node.info.node_id()
+                nid = node.node_id
                 hw_info_path = "hw_info/{}".format(nid)
                 if hw_info_path not in ctx.storage:
                     futures[(hw_info_path, nid)] = pool.submit(hw_info.get_hw_info, node)
@@ -124,7 +114,7 @@
 
             futures.clear()
             for node in ctx.nodes:
-                nid = node.info.node_id()
+                nid = node.node_id
                 sw_info_path = "sw_info/{}".format(nid)
                 if sw_info_path not in ctx.storage:
                     futures[(sw_info_path, nid)] = pool.submit(hw_info.get_sw_info, node)
@@ -253,15 +243,17 @@
                 logger.error("No nodes found for test, skipping it.")
                 continue
 
-            test_cls = TOOL_TYPE_MAPPER[name]
+            test_cls = all_suits[name]
             remote_dir = ctx.config.default_test_local_folder.format(name=name, uuid=ctx.config.run_uuid)
-            test_cfg = TestSuiteConfig(test_cls.name,
-                                       params=params,
-                                       run_uuid=ctx.config.run_uuid,
-                                       nodes=test_nodes,
-                                       remote_dir=remote_dir)
+            suite = TestSuiteConfig(test_cls.name,
+                                    params=params,
+                                    run_uuid=ctx.config.run_uuid,
+                                    nodes=test_nodes,
+                                    remote_dir=remote_dir,
+                                    idx=suite_idx)
 
-            test_cls(storage=ctx.storage, config=test_cfg, idx=suite_idx,
+            test_cls(storage=ResultStorage(ctx.storage),
+                     suite=suite,
                      on_idle=lambda: collect_sensors_data(ctx, False)).run()
 
     @classmethod
@@ -278,6 +270,6 @@
                 logger.error("Internal error: Some nodes already stored in " +
                              "nodes_info before LoadStoredNodesStage stage")
                 raise StopTestError()
-            ctx.nodes_info = {node.node_id(): node
+            ctx.nodes_info = {node.node_id: node
                               for node in ctx.storage.load_list(NodeInfo, "all_nodes")}
             logger.info("%s nodes loaded from database", len(ctx.nodes_info))
diff --git a/wally/sensors.py b/wally/sensors.py
index 0a1bbe9..54ae1ad 100644
--- a/wally/sensors.py
+++ b/wally/sensors.py
@@ -63,7 +63,7 @@
             for role in node.info.roles:
                 node_cfg.update(per_role_config.get(role, {}))  # type: ignore
 
-            nid = node.info.node_id()
+            nid = node.node_id
             if node_cfg:
                 # ceph requires additional settings
                 if 'ceph' in node_cfg:
@@ -81,7 +81,7 @@
 
 def collect_sensors_data(ctx: TestRun, stop: bool = False):
     for node in ctx.nodes:
-        node_id = node.info.node_id()
+        node_id = node.node_id
         if node_id in ctx.sensors_run_on:
 
             if stop:
@@ -91,7 +91,7 @@
 
             # TODO: data is unpacked/repacked here with no reason
             for path, value in sensors_rpc_plugin.unpack_rpc_updates(func()):
-                ctx.storage.append(value, "metric", node_id, path)
+                ctx.storage.append(value, "sensors/{}_{}".format(node_id, path))
 
 
 class CollectSensorsStage(Stage):
diff --git a/wally/sensors_rpc_plugin.py b/wally/sensors_rpc_plugin.py
index be5e5db..7400ab5 100644
--- a/wally/sensors_rpc_plugin.py
+++ b/wally/sensors_rpc_plugin.py
@@ -2,6 +2,7 @@
 import sys
 import json
 import time
+import zlib
 import array
 import pprint
 import logging
@@ -11,6 +12,9 @@
 import collections
 
 
+import Pool  # type: ignore
+
+
 mod_name = "sensors"
 __version__ = (0, 1)
 
@@ -36,7 +40,7 @@
         pass
 
     @classmethod
-    def unpack_results(cls, device, metrics, data):
+    def unpack_results(cls, device, metrics, data, typecode):
         pass
 
     def init(self):
@@ -52,26 +56,30 @@
     def __init__(self, params, allowed=None, disallowed=None):
         Sensor.__init__(self, params, allowed, disallowed)
         self.data = collections.defaultdict(lambda: array.array(self.typecode))
+        self.prev_vals = {}
 
     def add_data(self, device, name, value):
         self.data[(device, name)].append(value)
 
+    def add_relative(self, device, name, value):
+        key = (device, name)
+        pval = self.prev_vals.get(key)
+        if pval is not None:
+            self.data[key].append(value - pval)
+        self.prev_vals[key] = value
+
     def get_updates(self):
         res = self.data
         self.data = collections.defaultdict(lambda: array.array(self.typecode))
-        packed = {
-            name: arr.typecode + arr.tostring()
-            for name, arr in res.items()
-        }
-        return packed
+        return {key: (arr.typecode, arr.tostring()) for key, arr in res.items()}
 
     @classmethod
-    def unpack_results(cls, device, metrics, packed):
-        arr = array.array(chr(packed[0]))
+    def unpack_results(cls, device, metrics, packed, typecode):
+        arr = array.array(typecode)
         if sys.version_info >= (3, 0, 0):
-            arr.frombytes(packed[1:])
+            arr.frombytes(packed)
         else:
-            arr.fromstring(packed[1:])
+            arr.fromstring(packed)
         return arr
 
     def is_dev_accepted(self, name):
@@ -86,6 +94,9 @@
         return dev_ok
 
 
+time_array_typechar = ArraysSensor.typecode
+
+
 def provides(name):
     def closure(cls):
         SensorsMap[name] = cls
@@ -157,24 +168,32 @@
 
     def __init__(self, *args, **kwargs):
         ArraysSensor.__init__(self, *args, **kwargs)
+
         if self.disallowed is None:
             self.disallowed = ('ram', 'loop')
-        self.allowed_devs = set()
 
         for line in open('/proc/diskstats'):
-            dev_name = line.split()[2]
-            if self.is_dev_accepted(dev_name) and not dev_name[-1].isdigit():
-                self.allowed_devs.add(dev_name)
-
-    def collect(self):
-        for line in open('/proc/diskstats'):
             vals = line.split()
             dev_name = vals[2]
-            if dev_name not in self.allowed_devs:
+            if self.is_dev_accepted(dev_name) and not dev_name[-1].isdigit():
+                self.allowed_names.add(dev_name)
+
+        self.collect(init_rel=True)
+
+    def collect(self, init_rel=False):
+        for line in open('/proc/diskstats'):
+            vals = line.split()
+            dev_name = vals[2]
+
+            if dev_name not in self.allowed_names:
                 continue
 
-            for pos, name, _ in self.io_values_pos:
-                self.add_data(dev_name, name, int(vals[pos]))
+            for pos, name, aggregated in self.io_values_pos:
+                vl = int(vals[pos])
+                if aggregated:
+                    self.add_relative(dev_name, name, vl)
+                elif not init_rel:
+                    self.add_data(dev_name, name, int(vals[pos]))
 
 
 @provides("net-io")
@@ -210,21 +229,26 @@
         if self.allowed is None:
             self.allowed = ('eth',)
 
-        self.allowed_devs = set()
-        for line in open('/proc/net/dev').readlines()[2:]:
-            dev_name = line.split(":", 1)[0].strip()
-            dev_ok = self.is_dev_accepted(dev_name)
-            if dev_ok and ('.' not in dev_name or not dev_name.split('.')[-1].isdigit()):
-                self.allowed_devs.add(dev_name)
+        for _, _, aggregated in self.net_values_pos:
+            assert aggregated, "Non-aggregated values is not supported in net sensor"
 
-    def collect(self):
         for line in open('/proc/net/dev').readlines()[2:]:
             dev_name, stats = line.split(":", 1)
             dev_name = dev_name.strip()
-            if dev_name in self.allowed_devs:
+            if self.is_dev_accepted(dev_name):
+                self.allowed_names.add(dev_name)
+
+        self.collect(init_rel=True)
+
+    def collect(self, init_rel=False):
+        for line in open('/proc/net/dev').readlines()[2:]:
+            dev_name, stats = line.split(":", 1)
+            dev_name = dev_name.strip()
+            if dev_name in self.allowed_names:
                 vals = stats.split()
                 for pos, name, _ in self.net_values_pos:
-                    self.add_data(dev_name, name, int(vals[pos]))
+                    vl = int(vals[pos])
+                    self.add_relative(dev_name, name, vl)
 
 
 def pid_stat(pid):
@@ -448,27 +472,28 @@
         res = super().get_updates()
 
         for osd_id, data in self.historic.items():
-            res[("osd{}".format(osd_id), "historic")] = data
+            res[("osd{}".format(osd_id), "historic")] = (None, data)
 
         self.historic = {}
 
         for osd_id, data in self.in_flight.items():
-            res[("osd{}".format(osd_id), "in_flight")] = data
+            res[("osd{}".format(osd_id), "in_flight")] = (None, data)
 
         self.in_flight = {}
 
         return res
 
     @classmethod
-    def unpack_results(cls, device, metrics, packed):
+    def unpack_results(cls, device, metrics, packed, typecode):
         if metrics in ('historic', 'in_flight'):
+            assert typecode is None
             return packed
 
-        arr = array.array(chr(packed[0]))
+        arr = array.array(typecode)
         if sys.version_info >= (3, 0, 0):
-            arr.frombytes(packed[1:])
+            arr.frombytes(packed)
         else:
-            arr.fromstring(packed[1:])
+            arr.fromstring(packed)
 
         return arr
 
@@ -476,7 +501,7 @@
 class SensorsData(object):
     def __init__(self):
         self.cond = threading.Condition()
-        self.collected_at = array.array("f")
+        self.collected_at = array.array(time_array_typechar)
         self.stop = False
         self.sensors = {}
         self.data_fd = None  # temporary file to store results
@@ -501,12 +526,23 @@
 def sensors_bg_thread(sensors_config, sdata):
     try:
         next_collect_at = time.time()
+        if "pool_sz" in sensors_config:
+            sensors_config = sensors_config.copy()
+            pool_sz = sensors_config.pop("pool_sz")
+        else:
+            pool_sz = 32
+
+        if pool_sz != 0:
+            pool = Pool(pool_sz)
+        else:
+            pool = None
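+        # if pool_sz != 0, the sensor collect() calls below are dispatched through the imported Pool,
+        # otherwise they run sequentially in this thread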
 
         # prepare sensor classes
         with sdata.cond:
             sdata.sensors = {}
             for name, config in sensors_config.items():
                 params = {'params': config}
+                logger.debug("Start sensor %r with config %r", name, config)
 
                 if "allow" in config:
                     params["allowed_prefixes"] = config["allow"]
@@ -535,11 +571,18 @@
 
             ctm = time.time()
             with sdata.cond:
-                sdata.collected_at.append(ctm)
-                for sensor in sdata.sensors.values():
-                    sensor.collect()
+                sdata.collected_at.append(int(ctm * 1000000))
+                if pool is not None:
+                    caller = lambda x: x()
+                    for ok, val in pool.map(caller, [sensor.collect for sensor in sdata.sensors.values()]):
+                        if not ok:
+                            raise val
+                else:
+                    for sensor in sdata.sensors.values():
+                        sensor.collect()
                 etm = time.time()
-                sdata.collected_at.append(etm)
+                sdata.collected_at.append(int(etm * 1000000))
+                logger.debug("Add data to collected_at - %s, %s", ctm, etm)
 
             if etm - ctm > 0.1:
                 # TODO(koder): need to signal that something in not really ok with sensor collecting
@@ -551,7 +594,6 @@
     finally:
         for sensor in sdata.sensors.values():
             sensor.stop()
-        sdata.sensors = None
 
 
 sensors_thread = None
@@ -577,39 +619,46 @@
 
 
 def unpack_rpc_updates(res_tuple):
-    data, collected_at_b = res_tuple
-    collected_at = array.array('f')
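+    # res_tuple layout (built by rpc_get_updates): a {sensor_path: (offset, size, typecode)} map,
+    # a zlib-compressed blob of all sensor arrays concatenated, and zlib-compressed collection times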
+    offset_map, compressed_blob, compressed_collected_at_b = res_tuple
+    blob = zlib.decompress(compressed_blob)
+    collected_at_b = zlib.decompress(compressed_collected_at_b)
+    collected_at = array.array(time_array_typechar)
     collected_at.frombytes(collected_at_b)
     yield 'collected_at', collected_at
 
     # TODO: data is unpacked/repacked here with no reason
-    for sensor_path, packed_data in data.items():
+    for sensor_path, (offset, size, typecode) in offset_map.items():
         sensor_path = sensor_path.decode("utf8")
         sensor_name, device, metric = sensor_path.split('.', 2)
-        data = SensorsMap[sensor_name].unpack_results(device, metric, packed_data)
-        yield sensor_path, data
+        sensor_data = SensorsMap[sensor_name].unpack_results(device,
+                                                             metric,
+                                                             blob[offset:offset + size],
+                                                             typecode.decode("ascii"))
+        yield sensor_path, sensor_data
 
 
 def rpc_get_updates():
     if sdata is None:
         raise ValueError("No sensor thread running")
 
-    res = collected_at = None
+    offset_map = collected_at = None
+    blob = ""
 
     with sdata.cond:
         if sdata.exception:
             raise Exception(sdata.exception)
 
-        res = {}
+        offset_map = {}
         for sensor_name, sensor in sdata.sensors.items():
-            for (device, metrics), val in sensor.get_updates().items():
-                res["{}.{}.{}".format(sensor_name, device, metrics)] = val
+            for (device, metrics), (typecode, val) in sensor.get_updates().items():
+                offset_map["{}.{}.{}".format(sensor_name, device, metrics)] = (len(blob), len(val), typecode)
+                blob += val
 
         collected_at = sdata.collected_at
         sdata.collected_at = array.array(sdata.collected_at.typecode)
 
-    # TODO: pack data before send
-    return res, collected_at.tostring()
+    logger.debug(str(collected_at))
+    return offset_map, zlib.compress(blob), zlib.compress(collected_at.tostring())
 
 
 def rpc_stop():
diff --git a/wally/statistic.py b/wally/statistic.py
index 259ac69..b80fb22 100644
--- a/wally/statistic.py
+++ b/wally/statistic.py
@@ -1,5 +1,4 @@
 import math
-import array
 import logging
 import itertools
 import statistics
@@ -17,17 +16,19 @@
 
 logger = logging.getLogger("wally")
 DOUBLE_DELTA = 1e-8
+MIN_VALUES_FOR_CONFIDENCE = 7
 
 
-average = statistics.mean
-dev = lambda x: math.sqrt(statistics.variance(x))
+average = numpy.mean
+dev = lambda x: math.sqrt(numpy.var(x, ddof=1))
 
 
-def calc_norm_stat_props(ts: TimeSeries, confidence: float = 0.95) -> NormStatProps:
+def calc_norm_stat_props(ts: TimeSeries, bins_count: int, confidence: float = 0.95) -> NormStatProps:
     "Calculate statistical properties of array of numbers"
 
-    data = ts.data
-    res = NormStatProps(data)
+    # array.array has only very limited numeric support, so treat the data as a plain list of ints here
+    data = cast(List[int], ts.data)
+    res = NormStatProps(data)  # type: ignore
 
     if len(data) == 0:
         raise ValueError("Input array is empty")
@@ -39,57 +40,104 @@
     res.max = data[-1]
     res.min = data[0]
 
-    res.perc_50 = numpy.percentile(data, 50)
-    res.perc_90 = numpy.percentile(data, 90)
-    res.perc_95 = numpy.percentile(data, 95)
-    res.perc_99 = numpy.percentile(data, 99)
+    res.perc_50, res.perc_90, res.perc_95, res.perc_99 = numpy.percentile(data, q=[50., 90., 95., 99.])
 
-    if len(data) >= 3:
+    if len(data) >= MIN_VALUES_FOR_CONFIDENCE:
         res.confidence = stats.sem(data) * \
                          stats.t.ppf((1 + confidence) / 2, len(data) - 1)
+        res.confidence_level = confidence
     else:
         res.confidence = None
+        res.confidence_level = None
 
-    res.bin_populations, res.bin_edges = numpy.histogram(data, 'auto')
+    res.bins_populations, bins_edges = numpy.histogram(data, bins=bins_count)
+    res.bins_mids = (bins_edges[:-1] + bins_edges[1:]) / 2
 
     try:
         res.normtest = stats.mstats.normaltest(data)
     except Exception as exc:
         logger.warning("stats.mstats.normaltest failed with error: %s", exc)
 
+    res.skew = stats.skew(data)
+    res.kurt = stats.kurtosis(data)
+
     return res
 
 
-def calc_histo_stat_props(ts: TimeSeries) -> HistoStatProps:
-    data = numpy.array(ts.data)
-    data.shape = [len(ts.data) // ts.second_axis_size, ts.second_axis_size]
+def calc_histo_stat_props(ts: TimeSeries,
+                          bins_edges: numpy.array,
+                          bins_count: int,
+                          min_valuable: float = 0.0001) -> HistoStatProps:
+    data = numpy.array(ts.data, dtype='int')
+    data.shape = [len(ts.data) // ts.second_axis_size, ts.second_axis_size]  # type: ignore
 
     res = HistoStatProps(ts.data, ts.second_axis_size)
-    aggregated = numpy.sum(data, axis=0)
 
-    full_sum = numpy.sum(aggregated)
-    expected = [full_sum * 0.5, full_sum * 0.9, full_sum * 0.95, full_sum * 0.99]
+    # sum across all series
+    aggregated = numpy.sum(data, axis=0, dtype='int')
+    total = numpy.sum(aggregated)
+
+    # minimal bin population that is still considered valuable for the histogram
+    min_val_on_histo = total * min_valuable
+
+    # percentiles levels
+    expected = [total * 0.5, total * 0.9, total * 0.95, total * 0.99]
     percentiles = []
 
-    val_min = None
-    val_max = None
+    # indexes of all bins whose population is >= min_val_on_histo
+    valuable_idxs = []
 
-    for idx, val in enumerate(aggregated):
-        while expected and full_sum + val >= expected[0]:
-            percentiles.append(idx)
+    curr_summ = 0
+    non_zero = aggregated.nonzero()[0]
+
+    # calculate percentiles and valuable_idxs
+    for idx in non_zero:
+        val = aggregated[idx]
+        while expected and curr_summ + val >= expected[0]:
+            percentiles.append(bins_edges[idx])
             del expected[0]
 
-        full_sum += val
+        curr_summ += val
 
-        if val != 0:
-            if val_min is None:
-                val_min = idx
-            val_max = idx
+        if val >= min_val_on_histo:
+            valuable_idxs.append(idx)
 
-    res.perc_50, res.perc_90, res.perc_95, res.perc_99 = map(ts.bins_edges.__getitem__, percentiles)
-    res.min = ts.bins_edges[val_min]
-    res.max = ts.bins_edges[val_max]
-    res.bin_populations = aggregated
+    res.perc_50, res.perc_90, res.perc_95, res.perc_99 = percentiles
+
+    # minimal and maximal non-zero elements
+    res.min = bins_edges[non_zero[0]]
+    res.max = bins_edges[non_zero[-1] + (1 if non_zero[-1] != len(bins_edges) else 0)]
+
+    # minimal and maximal valuable elements
+    val_idx_min = valuable_idxs[0]
+    val_idx_max = valuable_idxs[-1]
+
+    raw_bins_populations = aggregated[val_idx_min: val_idx_max + 1]
+    raw_bins_edges = bins_edges[val_idx_min: val_idx_max + 2]
+    raw_bins_mids = cast(numpy.array, (raw_bins_edges[1:] + raw_bins_edges[:-1]) / 2)
+
+    step = (raw_bins_mids[-1] - raw_bins_mids[0]) / bins_count
+    bin_right = raw_bins_mids[0]
+
+    # aggregate raw histogram with many bins into result histogram with bins_count bins
+    cidx = 0
+    bins_populations = []
+    bins_mids = []
+
+    while cidx < len(raw_bins_mids):
+        bin_right += step
+        bin_population = 0
+
+        while cidx < len(raw_bins_mids) and raw_bins_mids[cidx] <= bin_right:
+            bin_population += raw_bins_populations[cidx]
+            cidx += 1
+
+        bins_populations.append(bin_population)
+        bins_mids.append(bin_right - step / 2)
+
+    res.bins_populations = numpy.array(bins_populations, dtype='int')
+    res.bins_mids = numpy.array(bins_mids, dtype='float32')
+
     return res
 
 
diff --git a/wally/storage.py b/wally/storage.py
index e4e010c..3e8bbab 100644
--- a/wally/storage.py
+++ b/wally/storage.py
@@ -3,11 +3,12 @@
 """
 
 import os
+import re
 import abc
 import array
 import shutil
 import sqlite3
-import threading
+import logging
 from typing import Any, TypeVar, Type, IO, Tuple, cast, List, Dict, Iterable, Iterator
 
 import yaml
@@ -17,7 +18,10 @@
     from yaml import Loader, Dumper  # type: ignore
 
 
-from .result_classes import IStorable
+from .common_types import IStorable
+
+
+logger = logging.getLogger("wally")
 
 
 class ISimpleStorage(metaclass=abc.ABCMeta):
@@ -278,6 +282,10 @@
 
 class Storage:
     """interface for storage"""
+
+    typechar_pad_size = 16
+    typepad = bytes(0 for i in range(typechar_pad_size - 1))
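+    # arrays are stored with a fixed-size header: the one-char array typecode padded with
+    # zero bytes up to typechar_pad_size, followed by the raw array bytes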
+
     def __init__(self, fs_storage: ISimpleStorage, db_storage: ISimpleStorage, serializer: ISerializer) -> None:
         self.fs = fs_storage
         self.db = db_storage
@@ -287,15 +295,15 @@
         fpath = "/".join(path)
         return self.__class__(self.fs.sub_storage(fpath), self.db.sub_storage(fpath), self.serializer)
 
-    def put(self, value: IStorable, *path: str) -> None:
-        dct_value = value.raw() if isinstance(value, IStorable) else value
-        serialized = self.serializer.pack(dct_value)
+    def put(self, value: Any, *path: str) -> None:
+        dct_value = cast(IStorable, value).raw() if isinstance(value, IStorable) else value
+        serialized = self.serializer.pack(dct_value)  # type: ignore
         fpath = "/".join(path)
         self.db.put(serialized, fpath)
         self.fs.put(serialized, fpath)
 
     def put_list(self, value: Iterable[IStorable], *path: str) -> None:
-        serialized = self.serializer.pack([obj.raw() for obj in value])
+        serialized = self.serializer.pack([obj.raw() for obj in value])  # type: ignore
         fpath = "/".join(path)
         self.db.put(serialized, fpath)
         self.fs.put(serialized, fpath)
@@ -318,8 +326,14 @@
     def __contains__(self, path: str) -> bool:
         return path in self.fs or path in self.db
 
-    def put_raw(self, val: bytes, *path: str) -> None:
-        self.fs.put(val, "/".join(path))
+    def put_raw(self, val: bytes, *path: str) -> str:
+        fpath = "/".join(path)
+        self.fs.put(val, fpath)
+        # TODO: dirty hack
+        return self.resolve_raw(fpath)
+
+    def resolve_raw(self, fpath) -> str:
+        return cast(FSStorage, self.fs).j(fpath)
 
     def get_raw(self, *path: str) -> bytes:
         return self.fs.get("/".join(path))
@@ -333,35 +347,52 @@
         return self.fs.get_fd(path, mode)
 
     def put_array(self, value: array.array, *path: str) -> None:
+        typechar = value.typecode.encode('ascii')
+        assert len(typechar) == 1
         with self.get_fd("/".join(path), "wb") as fd:
+            fd.write(typechar + self.typepad)
             value.tofile(fd)  # type: ignore
 
-    def get_array(self, typecode: str, *path: str) -> array.array:
-        res = array.array(typecode)
+    def get_array(self, *path: str) -> array.array:
         path_s = "/".join(path)
         with self.get_fd(path_s, "rb") as fd:
             fd.seek(0, os.SEEK_END)
-            size = fd.tell()
+            size = fd.tell() - self.typechar_pad_size
             fd.seek(0, os.SEEK_SET)
+            typecode = chr(fd.read(self.typechar_pad_size)[0])
+            res = array.array(typecode)
             assert size % res.itemsize == 0, "Storage object at path {} contains no array of {} or corrupted."\
                 .format(path_s, typecode)
             res.fromfile(fd, size // res.itemsize)  # type: ignore
         return res
 
     def append(self, value: array.array, *path: str) -> None:
+        typechar = value.typecode.encode('ascii')
+        assert len(typechar) == 1
+        expected_typeheader = typechar + self.typepad
         with self.get_fd("/".join(path), "cb") as fd:
             fd.seek(0, os.SEEK_END)
+            if fd.tell() != 0:
+                fd.seek(0, os.SEEK_SET)
+                real_typecode = fd.read(self.typechar_pad_size)
+                if real_typecode[0] != expected_typeheader[0]:
+                    logger.error("Try to append array with typechar %r to array with typechar %r at path %r",
+                                 value.typecode, typechar, "/".join(path))
+                    raise ValueError("Array typecode mismatch at {!r}".format("/".join(path)))
+                fd.seek(0, os.SEEK_END)
+            else:
+                fd.write(expected_typeheader)
             value.tofile(fd)  # type: ignore
 
     def load_list(self, obj_class: Type[ObjClass], *path: str) -> List[ObjClass]:
         path_s = "/".join(path)
         raw_val = cast(List[Dict[str, Any]], self.get(path_s))
         assert isinstance(raw_val, list)
-        return [obj_class.fromraw(val) for val in raw_val]
+        return [cast(ObjClass, obj_class.fromraw(val)) for val in raw_val]
 
     def load(self, obj_class: Type[ObjClass], *path: str) -> ObjClass:
         path_s = "/".join(path)
-        return obj_class.fromraw(self.get(path_s))
+        return cast(ObjClass, obj_class.fromraw(self.get(path_s)))
 
     def sync(self) -> None:
         self.db.sync()
@@ -376,6 +407,33 @@
     def list(self, *path: str) -> Iterator[Tuple[bool, str]]:
         return self.fs.list("/".join(path))
 
+    def _iter_paths(self,
+                    root: str,
+                    path_parts: List[str],
+                    groups: Dict[str, str]) -> Iterator[Tuple[bool, str, Dict[str, str]]]:
+
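+        # recursively match each path component against its regex; named groups collected along
+        # the way are merged and yielded together with the final path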
+        curr = path_parts[0]
+        rest = path_parts[1:]
+
+        for is_file, name in self.list(root):
+            if rest and is_file:
+                continue
+
+            rr = re.match(pattern=curr + "$", string=name)
+            if rr:
+                if root:
+                    path = root + "/" + name
+                else:
+                    path = name
+
+                new_groups = rr.groupdict().copy()
+                new_groups.update(groups)
+
+                if rest:
+                    yield from self._iter_paths(path, rest, new_groups)
+                else:
+                    yield is_file, path, new_groups
+
 
 def make_storage(url: str, existing: bool = False) -> Storage:
     return Storage(FSStorage(url, existing),
diff --git a/wally/storage_structure.yaml b/wally/storage_structure.yaml
index e748dd8..8493e29 100644
--- a/wally/storage_structure.yaml
+++ b/wally/storage_structure.yaml
@@ -5,6 +5,7 @@
 # {dev} - device name '[^.]+'
 # {suite} - suite name '[a-z]+'
 # {profile} - profile name '[a-z_]+'
+# {sensor} - sensor name '[-a-z]+'
 
 
 config: Config                   # test input configuration
@@ -14,6 +15,7 @@
 fuel_version: List[int]          # FUEL master node version
 fuel_os_creds: OSCreds           # openstack creds, discovered from fuel (or None)
 openstack_openrc: OSCreds        # openrc used for openstack cluster
+
 info:
     comment : str               # run comment
     run_uuid : str              # run uuid
@@ -26,11 +28,11 @@
 
         # dev in next line is tool name - fio/vdbench/....
         '{node}_{dev}.{metric_name}:raw' : bytes  # raw log, where name from {'bw', 'iops', 'lat', ..}
+        '{node}_{dev}.{metric_name}:stat' : StatProps  # type of props detected by content
         '{node}_{dev}.{metric_name}': List[uint64]   # measurements data concatenated with collect times in
-                                                     # microseconds from unix epoch
-
-sensors:
-    '{node}_{dev}.{metric_name}:raw' : bytes          # raw log, where name from {'bw', 'iops', 'lat', ..}
-    '{node}_{dev}.{metric_name}': List[uint64]   # measurements data cotaneted with collect times in microseconds from unix epoch
+                                                     # microseconds from unix epoch and typechars
+'sensors/{node}_{sensor}.{dev}.{metric_name}': typechar + array[uint64]  # sensor values
+'sensors/{node}_{sensor}.{dev}.{metric_name}:stat': StatProps # statistic data
+'sensors/{node}_collected_at': typechar + array[uint64]  # collection time
 
 'rpc_logs/{node}' : bytes   # rpc server log from node
diff --git a/wally/suits/all_suits.py b/wally/suits/all_suits.py
new file mode 100644
index 0000000..310b1f5
--- /dev/null
+++ b/wally/suits/all_suits.py
@@ -0,0 +1,8 @@
+from .io.fio import FioTest
+# from .suits.itest import TestSuiteConfig
+# from .suits.mysql import MysqlTest
+# from .suits.omgbench import OmgTest
+# from .suits.postgres import PgBenchTest
+
+
+all_suits = {suite.name: suite for suite in [FioTest]}
diff --git a/wally/suits/io/defaults_qd.cfg b/wally/suits/io/defaults_qd.cfg
index fcf5c16..c3dee19 100644
--- a/wally/suits/io/defaults_qd.cfg
+++ b/wally/suits/io/defaults_qd.cfg
@@ -18,7 +18,6 @@
 filename={FILENAME}
 size={FILESIZE}
 
-write_iops_log=fio_iops_log
 write_bw_log=fio_bw_log
 log_avg_msec=1000
 write_hist_log=fio_lat_hist_log
diff --git a/wally/suits/io/fio.py b/wally/suits/io/fio.py
index 7b2c3e3..33e8343 100644
--- a/wally/suits/io/fio.py
+++ b/wally/suits/io/fio.py
@@ -1,14 +1,15 @@
 import array
 import os.path
 import logging
-from typing import cast, Any
+from typing import cast, Any, Tuple, List
 
 import wally
 
-from ...utils import StopTestError, get_os, ssize2b
+from ...utils import StopTestError, ssize2b, b2ssize
 from ...node_interfaces import IRPCNode
+from ...node_utils import get_os
 from ..itest import ThreadedTest
-from ...result_classes import TimeSeries, JobMetrics
+from ...result_classes import TimeSeries, DataSource, TestJobConfig
 from .fio_task_parser import execution_time, fio_cfg_compile, FioJobConfig, FioParams, get_log_files
 from . import rpc_plugin
 from .fio_hist import expected_lat_bins
@@ -17,7 +18,7 @@
 logger = logging.getLogger("wally")
 
 
-class IOPerfTest(ThreadedTest):
+class FioTest(ThreadedTest):
     soft_runcycle = 5 * 60
     retry_time = 30
     configs_dir = os.path.dirname(__file__)  # type: str
@@ -27,7 +28,7 @@
     def __init__(self, *args, **kwargs) -> None:
         super().__init__(*args, **kwargs)
 
-        get = self.config.params.get
+        get = self.suite.params.get
 
         self.remote_task_file = self.join_remote("task.fio")
         self.remote_output_file = self.join_remote("fio_result.json")
@@ -35,7 +36,7 @@
         self.use_sudo = get("use_sudo", True)  # type: bool
         self.force_prefill = get('force_prefill', False)  # type: bool
 
-        self.load_profile_name = self.config.params['load']  # type: str
+        self.load_profile_name = self.suite.params['load']  # type: str
 
         if os.path.isfile(self.load_profile_name):
             self.load_profile_path = self.load_profile_name   # type: str
@@ -47,16 +48,16 @@
         if self.use_system_fio:
             self.fio_path = "fio"    # type: str
         else:
-            self.fio_path = os.path.join(self.config.remote_dir, "fio")
+            self.fio_path = os.path.join(self.suite.remote_dir, "fio")
 
-        self.load_params = self.config.params['params']
+        self.load_params = self.suite.params['params']
         self.file_name = self.load_params['FILENAME']
 
         if 'FILESIZE' not in self.load_params:
             logger.debug("Getting test file sizes on all nodes")
             try:
-                sizes = {node.conn.fs.file_stat(self.file_name)['size']
-                         for node in self.config.nodes}
+                sizes = {node.conn.fs.file_stat(self.file_name)[b'size']
+                         for node in self.suite.nodes}
             except Exception:
                 logger.exception("FILESIZE is not set in config file and fail to detect it." +
                                  "Set FILESIZE or fix error and rerun test")
@@ -68,7 +69,7 @@
                 raise StopTestError()
 
             self.file_size = list(sizes)[0]
-            logger.info("Detected test file size is %s", self.file_size)
+            logger.info("Detected test file size is %sB", b2ssize(self.file_size))
             self.load_params['FILESIZE'] = self.file_size
         else:
             self.file_size = ssize2b(self.load_params['FILESIZE'])
@@ -80,31 +81,41 @@
             logger.error("Empty fio config provided")
             raise StopTestError()
 
-        self.exec_folder = self.config.remote_dir
+        self.exec_folder = self.suite.remote_dir
 
     def config_node(self, node: IRPCNode) -> None:
         plugin_code = open(rpc_plugin.__file__.rsplit(".", 1)[0] + ".py", "rb").read()  # type: bytes
         node.upload_plugin("fio", plugin_code)
 
         try:
-            node.conn.fs.rmtree(self.config.remote_dir)
+            node.conn.fs.rmtree(self.suite.remote_dir)
         except Exception:
             pass
 
         try:
-            node.conn.fs.makedirs(self.config.remote_dir)
+            node.conn.fs.makedirs(self.suite.remote_dir)
         except Exception:
-            msg = "Failed to recreate folder {} on remote {}.".format(self.config.remote_dir, node)
+            msg = "Failed to recreate folder {} on remote {}.".format(self.suite.remote_dir, node)
             logger.exception(msg)
             raise StopTestError()
 
+        # TODO: check this during config validation
+        if self.file_size % (4 * (1024 ** 2)) != 0:
+            logger.error("Test file size must be proportional to 4MiB")
+            raise StopTestError()
+
         self.install_utils(node)
 
         mb = int(self.file_size / 1024 ** 2)
-        logger.info("Filling test file %s with %sMiB of random data", self.file_name, mb)
-        fill_bw = node.conn.fio.fill_file(self.file_name, mb, force=self.force_prefill, fio_path=self.fio_path)
-        if fill_bw is not None:
-            logger.info("Initial fio fill bw is {} MiBps for {}".format(fill_bw, node))
+        logger.info("Filling test file %s on node %s with %sMiB of random data", self.file_name, node.info, mb)
+        is_prefilled, fill_bw = node.conn.fio.fill_file(self.file_name, mb,
+                                                        force=self.force_prefill,
+                                                        fio_path=self.fio_path)
+
+        if not is_prefilled:
+            logger.info("Test file on node %s is already prefilled", node.info)
+        elif fill_bw is not None:
+            logger.info("Initial fio fill bw is %s MiBps for %s", fill_bw, node.info)
 
     def install_utils(self, node: IRPCNode) -> None:
         os_info = get_os(node)
@@ -126,19 +137,19 @@
                 raise StopTestError()
 
             bz_dest = self.join_remote('fio.bz2')  # type: str
-            node.copy_file(fio_path, bz_dest)
+            node.copy_file(fio_path, bz_dest, compress=False)
             node.run("bzip2 --decompress {} ; chmod a+x {}".format(bz_dest, self.join_remote("fio")))
 
-    def get_expected_runtime(self, job_config: FioJobConfig) -> int:
+    def get_expected_runtime(self, job_config: TestJobConfig) -> int:
         return execution_time(cast(FioJobConfig, job_config))
 
-    def prepare_iteration(self, node: IRPCNode, job_config: FioJobConfig) -> None:
-        node.put_to_file(self.remote_task_file, str(job_config).encode("utf8"))
+    def prepare_iteration(self, node: IRPCNode, job: TestJobConfig) -> None:
+        node.put_to_file(self.remote_task_file, str(job).encode("utf8"))
 
     # TODO: get a link to substorage as a parameter
-    def run_iteration(self, node: IRPCNode, iter_config: FioJobConfig, job_root: str) -> JobMetrics:
-        f_iter_config = cast(FioJobConfig, iter_config)
-        exec_time = execution_time(f_iter_config)
+    def run_iteration(self, node: IRPCNode, job: TestJobConfig) -> List[TimeSeries]:
+        exec_time = execution_time(cast(FioJobConfig, job))
+
 
         fio_cmd_templ = "cd {exec_folder}; " + \
                         "{fio_path} --output-format=json --output={out_file} --alloc-size=262144 {job_file}"
@@ -152,20 +163,26 @@
         if must_be_empty:
             logger.error("Unexpected fio output: %r", must_be_empty)
 
-        res = {}  # type: JobMetrics
-
         # put fio output into storage
         fio_out = node.get_file_content(self.remote_output_file)
-        self.rstorage.put_extra(job_root, node.info.node_id(), "fio_raw", fio_out)
+
+        path = DataSource(suite_id=self.suite.storage_id,
+                          job_id=job.storage_id,
+                          node_id=node.node_id,
+                          dev='fio',
+                          sensor='stdout',
+                          tag='json')
+
+        self.storage.put_extra(fio_out, path)
         node.conn.fs.unlink(self.remote_output_file)
 
         files = [name for name in node.conn.fs.listdir(self.exec_folder)]
-
-        for name, path in get_log_files(f_iter_config):
-            log_files = [fname for fname in files if fname.startswith(path)]
+        result = []
+        for name, file_path in get_log_files(cast(FioJobConfig, job)):
+            log_files = [fname for fname in files if fname.startswith(file_path)]
             if len(log_files) != 1:
                 logger.error("Found %s files, match log pattern %s(%s) - %s",
-                             len(log_files), path, name, ",".join(log_files[10:]))
+                             len(log_files), file_path, name, ",".join(log_files[:10]))
                 raise StopTestError()
 
             fname = os.path.join(self.exec_folder, log_files[0])
@@ -203,14 +220,13 @@
                         logger.exception("Error during parse %s fio log file in line %s: %r", name, idx, line)
                         raise StopTestError()
 
-            ts = TimeSeries(name=name,
-                            raw=raw_result,
-                            second_axis_size=expected_lat_bins if name == 'lat' else 1,
-                            data=parsed,
-                            times=times)
-            res[(node.info.node_id(), 'fio', name)] = ts
-
-        return res
+            result.append(TimeSeries(name=name,
+                                     raw=raw_result,
+                                     second_axis_size=expected_lat_bins if name == 'lat' else 1,
+                                     data=parsed,
+                                     times=times,
+                                     source=path(sensor=name, tag=None)))
+        return result
 
     def format_for_console(self, data: Any) -> str:
         raise NotImplementedError()
diff --git a/wally/suits/io/fio_task_parser.py b/wally/suits/io/fio_task_parser.py
index 6940aaf..03702ae 100644
--- a/wally/suits/io/fio_task_parser.py
+++ b/wally/suits/io/fio_task_parser.py
@@ -7,13 +7,12 @@
 import os.path
 import argparse
 import itertools
-from typing import Optional, Iterator, Union, Dict, Iterable, List, TypeVar, Callable, Tuple, NamedTuple, Any
+from typing import Optional, Iterator, Union, Dict, Iterable, List, TypeVar, Callable, Tuple, NamedTuple, Any, cast
 from collections import OrderedDict
 
 
-from ...result_classes import IStorable
 from ...result_classes import TestJobConfig
-from ...utils import sec_to_str, ssize2b
+from ...utils import sec_to_str, ssize2b, b2ssize, flatmap
 
 
 SECTION = 0
@@ -29,23 +28,162 @@
                       ('tp', int),
                       ('name', str),
                       ('val', Any)])
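+
+# compact, hashable description of a single fio job; its fields feed the summary strings below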
+FioTestSumm = NamedTuple("FioTestSumm",
+                         [("oper", str),
+                          ("sync_mode", str),
+                          ("bsize", int),
+                          ("qd", Optional[int]),
+                          ("thcount", int),
+                          ("write_perc", Optional[int])])
 
-TestSumm = NamedTuple("TestSumm",
-                      [("oper", str),
-                       ("mode", str),
-                       ("bsize", int),
-                       ("iodepth", int),
-                       ("vm_count", int)])
+
+def is_fio_opt_true(vl: Union[str, int]) -> bool:
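+    """fio accepts several spellings for boolean option values"""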
+    return str(vl).lower() in ['1', 'true', 't', 'yes', 'y']
 
 
 class FioJobConfig(TestJobConfig):
-    def __init__(self, name: str) -> None:
-        TestJobConfig.__init__(self)
-        self.vals = OrderedDict()  # type: Dict[str, Any]
-        self.name = name
 
-    def __eq__(self, other: 'FioJobConfig') -> bool:
-        return self.vals == other.vals
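+    # (sync, direct) flag pair -> one-letter cache mode code, see sync2long for readable names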
+    ds2mode = {(True, True): 'x',
+               (True, False): 's',
+               (False, True): 'd',
+               (False, False): 'a'}
+
+    sync2long = {'x': "sync direct",
+                 's': "sync",
+                 'd': "direct",
+                 'a': "buffered"}
+
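+    # fio rw= values -> two-letter operation codes used in summary strings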
+    op_type2short = {"randread": "rr",
+                     "randwrite": "rw",
+                     "read": "sr",
+                     "write": "sw",
+                     "randrw": "rx"}
+
+    def __init__(self, name: str, idx: int) -> None:
+        TestJobConfig.__init__(self, idx)
+        self.name = name
+        self._sync_mode = None  # type: Optional[str]
+        self._ctuple = None  # type: Optional[FioTestSumm]
+        self._ctuple_no_qd = None  # type: Optional[FioTestSumm]
+
+    # ------------- BASIC PROPERTIES -----------------------------------------------------------------------------------
+
+    @property
+    def write_perc(self) -> Optional[int]:
+        try:
+            return int(self.vals["rwmixwrite"])
+        except (KeyError, TypeError):
+            try:
+                return 100 - int(self.vals["rwmixread"])
+            except (KeyError, TypeError):
+                return None
+
+    @property
+    def qd(self) -> int:
+        return int(self.vals['iodepth'])
+
+    @property
+    def bsize(self) -> int:
+        return ssize2b(self.vals['blocksize']) // 1024
+
+    @property
+    def oper(self) -> str:
+        return self.vals['rw']
+
+    @property
+    def op_type_short(self) -> str:
+        return self.op_type2short[self.vals['rw']]
+
+    @property
+    def thcount(self) -> int:
+        return int(self.vals.get('numjobs', 1))
+
+    @property
+    def sync_mode(self) -> str:
+        if self._sync_mode is None:
+            direct = is_fio_opt_true(self.vals.get('direct', '0')) or \
+                     not is_fio_opt_true(self.vals.get('buffered', '0'))
+            sync = is_fio_opt_true(self.vals.get('sync', '0'))
+            self._sync_mode = self.ds2mode[(sync, direct)]
+        return cast(str, self._sync_mode)
+
+    @property
+    def sync_mode_long(self) -> str:
+        return self.sync2long[self.sync_mode]
+
+    # ----------- COMPLEX PROPERTIES -----------------------------------------------------------------------------------
+
+    @property
+    def characterized_tuple(self) -> Tuple:
+        if self._ctuple is None:
+            self._ctuple = FioTestSumm(oper=self.oper,
+                                       sync_mode=self.sync_mode,
+                                       bsize=self.bsize,
+                                       qd=self.qd,
+                                       thcount=self.thcount,
+                                       write_perc=self.write_perc)
+
+        return cast(Tuple, self._ctuple)
+
+    @property
+    def characterized_tuple_no_qd(self) -> FioTestSumm:
+        if self._ctuple_no_qd is None:
+            self._ctuple_no_qd = FioTestSumm(oper=self.oper,
+                                             sync_mode=self.sync_mode,
+                                             bsize=self.bsize,
+                                             qd=None,
+                                             thcount=self.thcount,
+                                             write_perc=self.write_perc)
+
+        return cast(FioTestSumm, self._ctuple_no_qd)
+
+    @property
+    def long_summary(self) -> str:
+        res = "{0.sync_mode_long} {0.oper} {1} QD={0.qd}".format(self, b2ssize(self.bsize * 1024))
+        if self.thcount != 1:
+            res += " threads={}".format(self.thcount)
+        if self.write_perc is not None:
+            res += " write_perc={}%".format(self.write_perc)
+        return res
+
+    @property
+    def long_summary_no_qd(self) -> str:
+        res = "{0.sync_mode_long} {0.oper} {1}".format(self, b2ssize(self.bsize * 1024))
+        if self.thcount != 1:
+            res += " threads={}".format(self.thcount)
+        if self.write_perc is not None:
+            res += " write_perc={}%".format(self.write_perc)
+        return res
+
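+    # short textual job id, also used in storage paths and generated file names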
+    @property
+    def summary(self) -> str:
+        tpl = cast(FioTestSumm, self.characterized_tuple)
+        res = "{0.oper}{0.sync_mode}{0.bsize}_qd{0.qd}".format(tpl)
+
+        if tpl.thcount != 1:
+            res += "th" + str(tpl.thcount)
+        if tpl.write_perc is not None:
+            res += "wr" + str(tpl.write_perc)
+
+        return res
+
+    @property
+    def summary_no_qd(self) -> str:
+        tpl = cast(FioTestSumm, self.characterized_tuple)
+        res = "{0.oper}{0.sync_mode}{0.bsize}".format(tpl)
+
+        if tpl.thcount != 1:
+            res += "th" + str(tpl.thcount)
+        if tpl.write_perc is not None:
+            res += "wr" + str(tpl.write_perc)
+
+        return res
+    # ------------------------------------------------------------------------------------------------------------------
+
+    def __eq__(self, o: object) -> bool:
+        if not isinstance(o, FioJobConfig):
+            return False
+        return self.vals == cast(FioJobConfig, o).vals
 
     def copy(self) -> 'FioJobConfig':
         return copy.deepcopy(self)
@@ -75,17 +213,17 @@
         return str(self)
 
     def raw(self) -> Dict[str, Any]:
-        return {
-            'vals': [[key, val] for key, val in self.vals.items()],
-            'summary': self.summary,
-            'name': self.name
-        }
+        res = self.__dict__.copy()
+        # drop cached derived values - they are recomputed on demand
+        del res['_sync_mode']
+        del res['_ctuple']
+        del res['_ctuple_no_qd']
+        res['vals'] = [[key, val] for key, val in self.vals.items()]
+        return res
 
     @classmethod
     def fromraw(cls, data: Dict[str, Any]) -> 'FioJobConfig':
-        obj = cls(data['name'])
-        obj.summary = data['summary']
-        obj.vals.update(data['vals'])
+        obj = cls.__new__(cls)
+        data['vals'] = OrderedDict(data['vals'])
+        data['_sync_mode'] = None
+        data['_ctuple'] = None
+        data['_ctuple_no_qd'] = None
+        obj.__dict__.update(data)
         return obj
 
 
@@ -203,6 +341,8 @@
 
         lexed_lines = new_lines
 
+    suite_section_idx = 0
+
     for fname, lineno, oline, tp, name, val in lexed_lines:
         if tp == SECTION:
             if curr_section is not None:
@@ -215,7 +355,8 @@
                 in_globals = True
             else:
                 in_globals = False
-                curr_section = FioJobConfig(name)
+                curr_section = FioJobConfig(name, idx=suite_section_idx)
+                suite_section_idx += 1
                 curr_section.vals = glob_vals.copy()
             sections_count += 1
         else:
@@ -332,68 +473,13 @@
     params = sec.vals.copy()
     params['UNIQ'] = 'UN{0}'.format(counter[0])
     params['COUNTER'] = str(counter[0])
-    params['TEST_SUMM'] = get_test_summary(sec)
+    params['TEST_SUMM'] = sec.summary
     sec.name = sec.name.format(**params)
     counter[0] += 1
 
     return sec
 
 
-def get_test_sync_mode(sec: FioJobConfig) -> str:
-    if isinstance(sec, dict):
-        vals = sec
-    else:
-        vals = sec.vals
-
-    is_sync = str(vals.get("sync", "0")) == "1"
-    is_direct = str(vals.get("direct", "0")) == "1"
-
-    if is_sync and is_direct:
-        return 'x'
-    elif is_sync:
-        return 's'
-    elif is_direct:
-        return 'd'
-    else:
-        return 'a'
-
-
-def get_test_summary_tuple(sec: FioJobConfig, vm_count: int = None) -> TestSumm:
-    if isinstance(sec, dict):
-        vals = sec
-    else:
-        vals = sec.vals
-
-    rw = {"randread": "rr",
-          "randwrite": "rw",
-          "read": "sr",
-          "write": "sw",
-          "randrw": "rm",
-          "rw": "sm",
-          "readwrite": "sm"}[vals["rw"]]
-
-    sync_mode = get_test_sync_mode(sec)
-
-    return TestSumm(rw,
-                    sync_mode,
-                    vals['blocksize'],
-                    vals.get('iodepth', '1'),
-                    vm_count)
-
-
-def get_test_summary(sec: FioJobConfig, vm_count: int = None, noiodepth: bool = False) -> str:
-    tpl = get_test_summary_tuple(sec, vm_count)
-
-    res = "{0.oper}{0.mode}{0.bsize}".format(tpl)
-    if not noiodepth:
-        res += "qd{}".format(tpl.iodepth)
-
-    if tpl.vm_count is not None:
-        res += "vm{}".format(tpl.vm_count)
-
-    return res
-
-
 def execution_time(sec: FioJobConfig) -> int:
     return sec.vals.get('ramp_time', 0) + sec.vals.get('runtime', 0)
 
@@ -402,23 +488,18 @@
     return fio_config_parse(fio_config_lexer(source, fname))
 
 
-FM_FUNC_INPUT = TypeVar("FM_FUNC_INPUT")
-FM_FUNC_RES = TypeVar("FM_FUNC_RES")
-
-
-def flatmap(func: Callable[[FM_FUNC_INPUT], Iterable[FM_FUNC_RES]],
-            inp_iter: Iterable[FM_FUNC_INPUT]) -> Iterator[FM_FUNC_RES]:
-    for val in inp_iter:
-        for res in func(val):
-            yield res
-
-
-def get_log_files(sec: FioJobConfig) -> List[Tuple[str, str]]:
+def get_log_files(sec: FioJobConfig, iops: bool = False) -> List[Tuple[str, str]]:
     res = []  # type: List[Tuple[str, str]]
-    for key, name in (('write_iops_log', 'iops'), ('write_bw_log', 'bw'), ('write_hist_log', 'lat')):
+
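+    # iops logs are optional - presumably iops can be derived from the bw log and the block size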
+    keys = [('write_bw_log', 'bw'), ('write_hist_log', 'lat')]
+    if iops:
+        keys.append(('write_iops_log', 'iops'))
+
+    for key, name in keys:
         log = sec.vals.get(key)
         if log is not None:
             res.append((name, log))
+
     return res
 
 
@@ -427,7 +508,6 @@
     it = (apply_params(sec, test_params) for sec in it)
     it = flatmap(process_cycles, it)
     for sec in map(final_process, it):
-        sec.summary = get_test_summary(sec)
         yield sec
 
 
diff --git a/wally/suits/io/one_step.cfg b/wally/suits/io/one_step.cfg
new file mode 100644
index 0000000..3e08c8b
--- /dev/null
+++ b/wally/suits/io/one_step.cfg
@@ -0,0 +1,9 @@
+[global]
+include defaults_qd.cfg
+ramp_time=0
+runtime={RUNTIME}
+
+[test_{TEST_SUMM}]
+blocksize=60k
+rw=randread
+iodepth=1
\ No newline at end of file
diff --git a/wally/suits/io/rpc_plugin.py b/wally/suits/io/rpc_plugin.py
index 98e55f0..5f5cfb5 100644
--- a/wally/suits/io/rpc_plugin.py
+++ b/wally/suits/io/rpc_plugin.py
@@ -13,6 +13,7 @@
 logger = logging.getLogger("agent.fio")
 
 
+# TODO: fix this for the case when the file is a block device
 def check_file_prefilled(path, used_size_mb):
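+    """Return True if the file already appears to be filled with non-zero data, False otherwise"""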
     used_size = used_size_mb * 1024 ** 2
     blocks_to_check = 16
@@ -20,9 +21,9 @@
     try:
         fstats = os.stat(path)
         if stat.S_ISREG(fstats.st_mode) and fstats.st_size < used_size:
-            return True
+            return False
     except EnvironmentError:
-        return True
+        return False
 
     offsets = [random.randrange(used_size - 1024) for _ in range(blocks_to_check)]
     offsets.append(used_size - 1024)
@@ -32,15 +33,15 @@
         for offset in offsets:
             fd.seek(offset)
             if b"\x00" * 1024 == fd.read(1024):
-                return True
+                return False
 
-    return False
+    return True
 
 
 def rpc_fill_file(fname, size, force=False, fio_path='fio'):
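+    """Prefill the file with random data unless it already looks filled; return (did_fill, fill_bw or None)"""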
     if not force:
-        if not check_file_prefilled(fname, size):
-            return
+        if check_file_prefilled(fname, size):
+            return False, None
 
     assert size % 4 == 0, "File size must be proportional to 4M"
 
@@ -50,7 +51,9 @@
     subprocess.check_output(cmd_templ.format(fio_path, fname, size), shell=True)
     run_time = time.time() - run_time
 
-    return None if run_time < 1.0 else int(size / run_time)
+    prefill_bw = None if run_time < 1.0 else int(size / run_time)
+
+    return True, prefill_bw
 
 
 def rpc_install(name, binary):
diff --git a/wally/suits/io/rrd.cfg b/wally/suits/io/rrd.cfg
index ca3c613..ae2d960 100644
--- a/wally/suits/io/rrd.cfg
+++ b/wally/suits/io/rrd.cfg
@@ -1,7 +1,7 @@
 [global]
 include defaults_qd.cfg
 ramp_time=0
-runtime=10
+runtime={RUNTIME}
 
 [test_{TEST_SUMM}]
 blocksize=60k
@@ -12,3 +12,23 @@
 iodepth=16
 blocksize=60k
 rw=randread
+
+[test_{TEST_SUMM}]
+blocksize=60k
+rw=randwrite
+iodepth=1
+
+[test_{TEST_SUMM}]
+iodepth=16
+blocksize=60k
+rw=randwrite
+
+[test_{TEST_SUMM}]
+iodepth=1
+blocksize=1m
+rw=write
+
+[test_{TEST_SUMM}]
+iodepth=1
+blocksize=1m
+rw=read
diff --git a/wally/suits/io/rrd_qd_scan.cfg b/wally/suits/io/rrd_qd_scan.cfg
new file mode 100644
index 0000000..e0937c9
--- /dev/null
+++ b/wally/suits/io/rrd_qd_scan.cfg
@@ -0,0 +1,9 @@
+[global]
+include defaults_qd.cfg
+ramp_time=0
+runtime={RUNTIME}
+
+[test_{TEST_SUMM}]
+blocksize=4k
+rw=randread
+iodepth={QDS}
diff --git a/wally/suits/io/rrd_raw.cfg b/wally/suits/io/rrd_raw.cfg
new file mode 100644
index 0000000..2b0fc74
--- /dev/null
+++ b/wally/suits/io/rrd_raw.cfg
@@ -0,0 +1,21 @@
+[test]
+blocksize=4k
+rw=randread
+iodepth=1
+ramp_time=0
+runtime=120
+buffered=0
+direct=1
+sync=0
+ioengine=libaio
+group_reporting=1
+unified_rw_reporting=1
+norandommap=1
+numjobs=1
+thread=1
+time_based=1
+wait_for_previous=1
+per_job_logs=0
+randrepeat=0
+filename=/dev/sdb
+size=100G
\ No newline at end of file
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index bc6b115..ac9e1c1 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -1,19 +1,14 @@
-import re
 import abc
 import time
-import array
-import struct
 import logging
 import os.path
-import datetime
-from typing import Any, List, Optional, Callable, cast, Iterator, Tuple, Iterable
+from typing import Any, List, Optional, Callable, Tuple, Iterable, cast
 
 from concurrent.futures import ThreadPoolExecutor, wait, Future
 
-from ..utils import StopTestError, sec_to_str, get_time_interval_printable_info
+from ..utils import StopTestError, get_time_interval_printable_info
 from ..node_interfaces import IRPCNode
-from ..storage import Storage
-from ..result_classes import TestSuiteConfig, TestJobConfig, JobMetrics, TimeSeries
+from ..result_classes import TestSuiteConfig, TestJobConfig, JobMetrics, TimeSeries, IResultStorage
 
 
 logger = logging.getLogger("wally")
@@ -22,127 +17,6 @@
 __doc__ = "Contains base classes for performance tests"
 
 
-class ResultStorage:
-    ts_header_format = "!IIIcc"
-
-    def __init__(self, storage: Storage, job_config_cls: type) -> None:
-        self.storage = storage
-        self.job_config_cls = job_config_cls
-
-    def get_suite_root(self, suite_type: str, idx: int) -> str:
-        return "results/{}_{}".format(suite_type, idx)
-
-    def get_job_root(self, suite_root: str, summary: str, run_id: int) -> str:
-        return "{}/{}_{}".format(suite_root, summary, run_id)
-
-    # store
-    def put_suite_config(self, config: TestSuiteConfig, root: str) -> None:
-        self.storage.put(config, root, "config.yml")
-
-    def put_job_config(self, config: TestJobConfig, root: str) -> None:
-        self.storage.put(config, root, "config.yml")
-
-    def get_suite_config(self, suite_root: str) -> TestSuiteConfig:
-        return self.storage.load(TestSuiteConfig, suite_root, "config.yml")
-
-    def get_job_node_prefix(self, job_root_path: str, node_id: str) -> str:
-        return "{}/{}".format(job_root_path, node_id)
-
-    def get_ts_path(self, job_root_path: str, node_id: str, dev: str, sensor_name: str) -> str:
-        return "{}_{}.{}".format(self.get_job_node_prefix(job_root_path, node_id), dev, sensor_name)
-
-    def put_ts(self, ts: TimeSeries, job_root_path: str, node_id: str, dev: str, sensor_name: str) -> None:
-        # TODO: check that 'metrics', 'dev' and 'node_id' match required patterns
-        root_path = self.get_ts_path(job_root_path, node_id, dev, sensor_name)
-
-        if len(ts.data) / ts.second_axis_size != len(ts.times):
-            logger.error("Unbalanced time series data. Array size has % elements, while time size has %",
-                         len(ts.data) / ts.second_axis_size, len(ts.times))
-            raise StopTestError()
-
-        with self.storage.get_fd(root_path, "cb") as fd:
-            header = struct.pack(self.ts_header_format,
-                                 ts.second_axis_size,
-                                 len(ts.data),
-                                 len(ts.times),
-                                 cast(array.array, ts.data).typecode.encode("ascii"),
-                                 cast(array.array, ts.times).typecode.encode("ascii"))
-            fd.write(header)
-            cast(array.array, ts.data).tofile(fd)
-            cast(array.array, ts.times).tofile(fd)
-
-        if ts.raw is not None:
-            self.storage.put_raw(ts.raw, root_path + ":raw")
-
-    def put_extra(self, job_root: str, node_id: str, key: str, data: bytes) -> None:
-        self.storage.put_raw(data, job_root, node_id + "_" + key)
-
-    def list_suites(self) -> Iterator[Tuple[TestSuiteConfig, str]]:
-        """iterates over (suite_name, suite_id, suite_root_path)
-        primary this function output should be used as input into list_jobs_in_suite method
-        """
-        ts_re = re.compile(r"[a-zA-Z]+_\d+$")
-        for is_file, name in self.storage.list("results"):
-            if not is_file:
-                rr = ts_re.match(name)
-                if rr:
-                    path = "results/" + name
-                    yield self.get_suite_config(path), path
-
-    def list_jobs_in_suite(self, suite_root_path: str) -> Iterator[Tuple[TestJobConfig, str, int]]:
-        """iterates over (job_summary, job_root_path)
-        primary this function output should be used as input into list_ts_in_job method
-        """
-        ts_re = re.compile(r"(?P<job_summary>[a-zA-Z0-9]+)_(?P<id>\d+)$")
-        for is_file, name in self.storage.list(suite_root_path):
-            if is_file:
-                continue
-            rr = ts_re.match(name)
-            if rr:
-                config_path = "{}/{}/config.yml".format(suite_root_path, name)
-                if config_path in self.storage:
-                    cfg = self.storage.load(self.job_config_cls, config_path)
-                    yield cfg, "{}/{}".format(suite_root_path, name), int(rr.group("id"))
-
-    def list_ts_in_job(self, job_root_path: str) -> Iterator[Tuple[str, str, str]]:
-        """iterates over (node_id, device_name, sensor_name)
-        primary this function output should be used as input into load_ts method
-        """
-        # TODO: check that all TS files available
-        ts_re = re.compile(r"(?P<node_id>\d+\.\d+\.\d+\.\d+:\d+)_(?P<dev>[^.]+)\.(?P<sensor>[a-z_]+)$")
-        already_found = set()
-        for is_file, name in self.storage.list(job_root_path):
-            if not is_file:
-                continue
-            rr = ts_re.match(name)
-            if rr:
-                key = (rr.group("node_id"), rr.group("dev"), rr.group("sensor"))
-                if key not in already_found:
-                    already_found.add(key)
-                    yield key
-
-    def load_ts(self, root_path: str, node_id: str, dev: str, sensor_name: str) -> TimeSeries:
-        path = self.get_ts_path(root_path, node_id, dev, sensor_name)
-
-        with self.storage.get_fd(path, "rb") as fd:
-            header = fd.read(struct.calcsize(self.ts_header_format))
-            second_axis_size, data_sz, time_sz, data_typecode, time_typecode = \
-                struct.unpack(self.ts_header_format, header)
-
-            data = array.array(data_typecode.decode("ascii"))
-            times = array.array(time_typecode.decode("ascii"))
-
-            data.fromfile(fd, data_sz)
-            times.fromfile(fd, time_sz)
-
-            # calculate number of elements
-            return TimeSeries("{}.{}".format(dev, sensor_name),
-                              raw=None,
-                              data=data,
-                              times=times,
-                              second_axis_size=second_axis_size)
-
-
 class PerfTest(metaclass=abc.ABCMeta):
     """Base class for all tests"""
     name = None  # type: str
@@ -150,20 +24,18 @@
     retry_time = 30
     job_config_cls = None  # type: type
 
-    def __init__(self, storage: Storage, config: TestSuiteConfig, idx: int, on_idle: Callable[[], None] = None) -> None:
-        self.config = config
+    def __init__(self, storage: IResultStorage, suite: TestSuiteConfig, on_idle: Callable[[], None] = None) -> None:
+        self.suite = suite
         self.stop_requested = False
-        self.sorted_nodes_ids = sorted(node.info.node_id() for node in self.config.nodes)
+        self.sorted_nodes_ids = sorted(node.node_id for node in self.suite.nodes)
         self.on_idle = on_idle
         self.storage = storage
-        self.rstorage = ResultStorage(self.storage, self.job_config_cls)
-        self.idx = idx
 
     def request_stop(self) -> None:
         self.stop_requested = True
 
     def join_remote(self, path: str) -> str:
-        return os.path.join(self.config.remote_dir, path)
+        return os.path.join(self.suite.remote_dir, path)
 
     @abc.abstractmethod
     def run(self) -> None:
@@ -185,62 +57,51 @@
 
     def __init__(self, *args, **kwargs) -> None:
         PerfTest.__init__(self, *args, **kwargs)
-        self.job_configs = [None]  # type: List[Optional[TestJobConfig]]
-        self.suite_root_path = self.rstorage.get_suite_root(self.config.test_type, self.idx)
+        self.job_configs = None  # type: Optional[List[TestJobConfig]]
 
     @abc.abstractmethod
     def get_expected_runtime(self, iter_cfg: TestJobConfig) -> Optional[int]:
         pass
 
-    def get_not_done_stages(self) -> Iterable[Tuple[int, TestJobConfig]]:
-        all_jobs = dict(enumerate(self.job_configs))
-        for db_config, path, jid in self.rstorage.list_jobs_in_suite(self.suite_root_path):
-            if jid in all_jobs:
-                job_config = all_jobs[jid]
-                if job_config != db_config:
-                    logger.error("Test info at path '%s/config' is not equal to expected config for iteration %s.%s." +
+    def get_not_done_jobs(self) -> Iterable[TestJobConfig]:
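+        """Return jobs from self.job_configs which have no results in the storage yet"""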
+        jobs_map = {job.storage_id: job for job in self.job_configs}
+        already_in_storage = set()
+        for db_config in cast(List[TestJobConfig], self.storage.iter_job(self.suite)):
+            if db_config.storage_id in jobs_map:
+                job = jobs_map[db_config.storage_id]
+                if job != db_config:
+                    logger.error("Test info at '%s.%s' is not equal to expected config for iteration %s.%s." +
                                  " Maybe configuration was changed before test was restarted. " +
                                  "DB cfg is:\n    %s\nExpected cfg is:\n    %s\nFix DB or rerun test from beginning",
-                                 path, self.name, job_config.summary,
+                                 self.suite.storage_id, job.storage_id, self.name, job.summary,
                                  str(db_config).replace("\n", "\n    "),
-                                 str(job_config).replace("\n", "\n    "))
+                                 str(job).replace("\n", "\n    "))
                     raise StopTestError()
 
-                logger.info("Test iteration %s.%s found in storage and will be skipped",
-                            self.name, job_config.summary)
-                del all_jobs[jid]
-        return all_jobs.items()
+                logger.info("Test iteration %s.%s found in storage and will be skipped", self.name, job.summary)
+                already_in_storage.add(db_config.storage_id)
+
+        return [job for job in self.job_configs if job.storage_id not in already_in_storage]
 
     def run(self) -> None:
-        try:
-            cfg = self.rstorage.get_suite_config(self.suite_root_path)
-        except KeyError:
-            cfg = None
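+        # store the suite config, or verify that it matches the one already in the storage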
+        self.storage.put_or_check_suite(self.suite)
 
-        if cfg is not None and cfg != self.config:
-            logger.error("Current suite %s config is not equal to found in storage at %s",
-                         self.config.test_type, self.suite_root_path)
-            raise StopTestError()
-
-        not_in_storage = list(self.get_not_done_stages())
-
+        not_in_storage = list(self.get_not_done_jobs())
         if not not_in_storage:
             logger.info("All test iteration in storage already. Skip test")
             return
 
-        self.rstorage.put_suite_config(self.config, self.suite_root_path)
-
         logger.debug("Run test %s with profile %r on nodes %s.", self.name,
                                                                  self.load_profile_name,
                                                                  ",".join(self.sorted_nodes_ids))
         logger.debug("Prepare nodes")
 
 
-        with ThreadPoolExecutor(len(self.config.nodes)) as pool:
+        with ThreadPoolExecutor(len(self.suite.nodes)) as pool:
             # config nodes
-            list(pool.map(self.config_node, self.config.nodes))
+            list(pool.map(self.config_node, self.suite.nodes))
 
-            run_times = [self.get_expected_runtime(job_config) for _, job_config in not_in_storage]
+            run_times = list(map(self.get_expected_runtime, not_in_storage))
 
             if None not in run_times:
                 # +5% - is a rough estimation for additional operations
@@ -249,51 +110,52 @@
                 exec_time_s, end_dt_s = get_time_interval_printable_info(expected_run_time)
                 logger.info("Entire test should takes around %s and finished at %s", exec_time_s, end_dt_s)
 
-            for run_id, job_config in not_in_storage:
-                job_path = self.rstorage.get_job_root(self.suite_root_path, job_config.summary, run_id)
-
-                jfutures = []  # type: List[Future]
-                for idx in range(self.max_retry):
-                    logger.debug("Prepare job %s", job_config.summary)
+            for job in not_in_storage:
+                results = []  # type: List[TimeSeries]
+                for retry_idx in range(self.max_retry):
+                    logger.debug("Prepare job %s", job.summary)
 
                     # prepare nodes for new iterations
-                    wait([pool.submit(self.prepare_iteration, node, job_config) for node in self.config.nodes])
+                    wait([pool.submit(self.prepare_iteration, node, job) for node in self.suite.nodes])
 
-                    expected_job_time = self.get_expected_runtime(job_config)
+                    expected_job_time = self.get_expected_runtime(job)
                     exec_time_s, end_dt_s = get_time_interval_printable_info(expected_job_time)
                     logger.info("Job should takes around %s and finished at %s", exec_time_s, end_dt_s)
 
-                    try:
-                        jfutures = []
-                        for node in self.config.nodes:
-                            future = pool.submit(self.run_iteration, node, job_config, job_path)
-                            jfutures.append(future)
-                        # test completed successfully, stop retrying
-                        break
-                    except EnvironmentError:
-                        if self.max_retry - 1 == idx:
-                            logger.exception("Fio failed")
-                            raise StopTestError()
-                        logger.exception("During fio run")
-                        logger.info("Sleeping %ss and retrying job", self.retry_time)
-                        time.sleep(self.retry_time)
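+                    # run the job on all nodes in parallel; if any node fails, the whole job is retried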
+                    jfutures = [pool.submit(self.run_iteration, node, job) for node in self.suite.nodes]
+                    failed = False
+                    for future in jfutures:
+                        try:
+                            results.extend(future.result())
+                        except EnvironmentError:
+                            logger.exception("Fio run failed on one of the nodes")
+                            failed = True
 
+                    if not failed:
+                        break
+
+                    if self.max_retry - 1 == retry_idx:
+                        logger.error("Fio failed, no retries left")
+                        raise StopTestError()
+
+                    logger.error("Fio run failed on some nodes, will retry")
+                    logger.info("Sleeping %ss and retrying job", self.retry_time)
+                    time.sleep(self.retry_time)
+                    results = []
+
+                # per-node job start and stop times
                 start_times = []  # type: List[int]
                 stop_times = []  # type: List[int]
 
-                for future in jfutures:
-                    for (node_id, dev, sensor_name), ts in future.result().items():
-                        self.rstorage.put_ts(ts, job_path, node_id=node_id, dev=dev, sensor_name=sensor_name)
-
-                        if len(ts.times) >= 2:
-                            start_times.append(ts.times[0])
-                            stop_times.append(ts.times[-1])
+                for ts in results:
+                    self.storage.put_ts(ts)
+                    if len(ts.times) >= 2:  # type: ignore
+                        start_times.append(ts.times[0])
+                        stop_times.append(ts.times[-1])
 
                 if len(start_times) > 0:
                     min_start_time = min(start_times)
                     max_start_time = max(start_times)
                     min_stop_time = min(stop_times)
-                    max_stop_time = max(stop_times)
 
                     max_allowed_time_diff = int((min_stop_time - max_start_time) * self.max_rel_time_diff)
                     max_allowed_time_diff = max(max_allowed_time_diff, self.max_time_diff)
@@ -301,16 +163,19 @@
                     if min_start_time + self.max_time_diff < max_allowed_time_diff:
                         logger.warning("Too large difference in %s:%s start time - %s. " +
                                        "Max recommended difference is %s",
-                                       self.name, job_config.summary,
+                                       self.name, job.summary,
                                        max_start_time - min_start_time, self.max_time_diff)
 
                     if min_stop_time + self.max_time_diff < max_allowed_time_diff:
                         logger.warning("Too large difference in %s:%s stop time - %s. " +
                                        "Max recommended difference is %s",
-                                       self.name, job_config.summary,
+                                       self.name, job.summary,
                                        max_start_time - min_start_time, self.max_time_diff)
 
-                self.rstorage.put_job_config(job_config, job_path)
+                    job.reliable_info_starts_at = max_start_time
+                    job.reliable_info_stops_at = min_stop_time
+
+                self.storage.put_job(self.suite, job)
                 self.storage.sync()
 
                 if self.on_idle is not None:
@@ -321,24 +186,25 @@
         pass
 
     @abc.abstractmethod
-    def prepare_iteration(self, node: IRPCNode, iter_config: TestJobConfig) -> None:
+    def prepare_iteration(self, node: IRPCNode, job: TestJobConfig) -> None:
         pass
 
     @abc.abstractmethod
-    def run_iteration(self, node: IRPCNode, iter_config: TestJobConfig, stor_prefix: str) -> JobMetrics:
+    def run_iteration(self, node: IRPCNode, job: TestJobConfig) -> List[TimeSeries]:
         pass
 
 
 class TwoScriptTest(ThreadedTest, metaclass=abc.ABCMeta):
     def __init__(self, *dt, **mp) -> None:
         ThreadedTest.__init__(self, *dt, **mp)
-        self.prerun_script = self.config.params['prerun_script']
-        self.run_script = self.config.params['run_script']
-        self.prerun_tout = self.config.params.get('prerun_tout', 3600)
-        self.run_tout = self.config.params.get('run_tout', 3600)
-        self.iterations_configs = [None]
+        self.prerun_script = self.suite.params['prerun_script']
+        self.run_script = self.suite.params['run_script']
+        self.prerun_tout = self.suite.params.get('prerun_tout', 3600)
+        self.run_tout = self.suite.params.get('run_tout', 3600)
+        # TODO: fix job_configs field
+        raise NotImplementedError("Fix job configs")
 
-    def get_expected_runtime(self, iter_cfg: TestJobConfig) -> Optional[int]:
+    def get_expected_runtime(self, job: TestJobConfig) -> Optional[int]:
         return None
 
     def config_node(self, node: IRPCNode) -> None:
@@ -346,19 +212,19 @@
         node.copy_file(self.prerun_script, self.join_remote(self.prerun_script))
 
         cmd = self.join_remote(self.prerun_script)
-        cmd += ' ' + self.config.params.get('prerun_opts', '')
+        cmd += ' ' + self.suite.params.get('prerun_opts', '')
         node.run(cmd, timeout=self.prerun_tout)
 
-    def prepare_iteration(self, node: IRPCNode, iter_config: TestJobConfig) -> None:
+    def prepare_iteration(self, node: IRPCNode, job: TestJobConfig) -> None:
         pass
 
-    def run_iteration(self, node: IRPCNode, iter_config: TestJobConfig, stor_prefix: str) -> JobMetrics:
+    def run_iteration(self, node: IRPCNode, job: TestJobConfig) -> List[TimeSeries]:
         # TODO: have to store logs
         cmd = self.join_remote(self.run_script)
-        cmd += ' ' + self.config.params.get('run_opts', '')
+        cmd += ' ' + self.suite.params.get('run_opts', '')
         return self.parse_results(node.run(cmd, timeout=self.run_tout))
 
     @abc.abstractmethod
-    def parse_results(self, data: str) -> JobMetrics:
+    def parse_results(self, data: str) -> List[TimeSeries]:
         pass
 
diff --git a/wally/test_run_class.py b/wally/test_run_class.py
index a0c014f..fea846d 100644
--- a/wally/test_run_class.py
+++ b/wally/test_run_class.py
@@ -43,7 +43,7 @@
 
     def merge_node(self, creds: ConnCreds, roles: Set[str]) -> NodeInfo:
         info = NodeInfo(creds, roles)
-        nid = info.node_id()
+        nid = info.node_id
 
         if nid in self.nodes_info:
             self.nodes_info[nid].roles.update(info.roles)
diff --git a/wally/utils.py b/wally/utils.py
index 078a019..78235a8 100644
--- a/wally/utils.py
+++ b/wally/utils.py
@@ -11,11 +11,8 @@
 import threading
 import contextlib
 import subprocess
-import collections
 
-from .node_interfaces import IRPCNode
-from typing import (Any, Tuple, Union, List, Iterator, Dict, Iterable, Optional,
-                    IO, Sequence, NamedTuple, cast, TypeVar)
+from typing import Any, Tuple, Union, List, Iterator, Iterable, Optional, IO, cast, TypeVar, Callable
 
 try:
     import psutil
@@ -347,59 +344,6 @@
     return user, passwd, tenant, auth_url, insecure
 
 
-OSRelease = NamedTuple("OSRelease",
-                       [("distro", str),
-                        ("release", str),
-                        ("arch", str)])
-
-
-def get_os(node: IRPCNode) -> OSRelease:
-    """return os type, release and architecture for node.
-    """
-    arch = node.run("arch", nolog=True).strip()
-
-    try:
-        node.run("ls -l /etc/redhat-release", nolog=True)
-        return OSRelease('redhat', None, arch)
-    except:
-        pass
-
-    try:
-        node.run("ls -l /etc/debian_version", nolog=True)
-
-        release = None
-        for line in node.run("lsb_release -a", nolog=True).split("\n"):
-            if ':' not in line:
-                continue
-            opt, val = line.split(":", 1)
-
-            if opt == 'Codename':
-                release = val.strip()
-
-        return OSRelease('ubuntu', release, arch)
-    except:
-        pass
-
-    raise RuntimeError("Unknown os")
-
-
-@contextlib.contextmanager
-def empty_ctx(val: Any = None) -> Iterator[Any]:
-    yield val
-
-
-def log_nodes_statistic(nodes: Sequence[IRPCNode]) -> None:
-    logger.info("Found {0} nodes total".format(len(nodes)))
-
-    per_role = collections.defaultdict(int)  # type: Dict[str, int]
-    for node in nodes:
-        for role in node.info.roles:
-            per_role[role] += 1
-
-    for role, count in sorted(per_role.items()):
-        logger.debug("Found {0} nodes with role {1}".format(count, role))
-
-
 def which(program: str) -> Optional[str]:
     def is_exe(fpath):
         return os.path.isfile(fpath) and os.access(fpath, os.X_OK)
@@ -413,6 +357,11 @@
     return None
 
 
+@contextlib.contextmanager
+def empty_ctx(val: Any = None) -> Iterator[Any]:
+    yield val
+
+
 def get_uniq_path_uuid(path: str, max_iter: int = 10) -> Tuple[str, str]:
     for i in range(max_iter):
         run_uuid = pet_generate(2, "_")
@@ -442,3 +391,16 @@
     now_dt = datetime.datetime.now()
     end_dt = now_dt + datetime.timedelta(0, seconds)
     return exec_time_s, "{:%H:%M:%S}".format(end_dt)
+
+
+FM_FUNC_INPUT = TypeVar("FM_FUNC_INPUT")
+FM_FUNC_RES = TypeVar("FM_FUNC_RES")
+
+
+def flatmap(func: Callable[[FM_FUNC_INPUT], Iterable[FM_FUNC_RES]],
+            inp_iter: Iterable[FM_FUNC_INPUT]) -> Iterator[FM_FUNC_RES]:
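+    """Apply func to every item and yield all elements of the resulting iterables, flattened"""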
+    for val in inp_iter:
+        for res in func(val):
+            yield res
+
+