Assorted fixes, improve visualization speed, add C++ interpolation code
diff --git a/clib/Makefile b/clib/Makefile
new file mode 100644
index 0000000..9786d22
--- /dev/null
+++ b/clib/Makefile
@@ -0,0 +1,6 @@
+libwally.so: interpolate.o Makefile
+	g++ interpolate.o -shared -o libwally.so
+
+interpolate.o: interpolate.cpp
+	g++ -c -Wall -std=c++11 -fPIC -O3 interpolate.cpp -o interpolate.o
+
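
The shared library is loaded at runtime through ctypes (see wally/data_selectors.py below), so after "make -C clib" the file clib/libwally.so must sit next to the wally package directory. A minimal sanity check of the build (a sketch; it assumes it is run from the repository root):

    import ctypes
    import os.path

    # assumed layout: repository root containing both clib/ and wally/
    lib = ctypes.CDLL(os.path.join("clib", "libwally.so"))

    # both entry points are declared extern "C" in clib/interpolate.cpp
    for name in ("interpolate_ts_on_seconds_border", "interpolate_ts_on_seconds_border_v2"):
        assert hasattr(lib, name), "symbol {} is not exported".format(name)
    print("libwally.so loads and exports the expected symbols")
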
diff --git a/clib/interpolate.cpp b/clib/interpolate.cpp
new file mode 100644
index 0000000..478eb88
--- /dev/null
+++ b/clib/interpolate.cpp
@@ -0,0 +1,120 @@
+#include <algorithm>
+#include <cstdint>
+#include <cstdio>
+
+extern "C"
+void interpolate_ts_on_seconds_border(
+    unsigned int input_size,
+    unsigned int output_size,
+    uint64_t * times,
+    uint64_t * values,
+    unsigned int time_scale_coef,
+    uint64_t * output)
+{
+    auto first_output_cell = (*times / time_scale_coef) * time_scale_coef;
+    auto input_cell_end = *times - time_scale_coef;  // hack to unify loop
+    uint64_t input_cell_begin;
+
+//    std::printf("first_output_cell = %ld\n", (long)first_output_cell);
+
+    for(auto curr_times=times, curr_data_ptr=values;
+        curr_times < times + input_size ; ++curr_times, ++curr_data_ptr)
+    {
+        // take next cell from input array and calculate data rate in it
+        auto data_left = *curr_data_ptr;
+        input_cell_begin = input_cell_end;
+        input_cell_end = *curr_times;
+
+        auto rate = data_left / double(input_cell_end - input_cell_begin);
+
+//        std::printf("input_cell_begin=%ld input_cell_end=%ld\n", (long)input_cell_begin, (long)input_cell_end);
+//        std::printf("rate = %lf data_left=%ld\n", rate, (long)data_left);
+
+        uint32_t first_output_cell_idx;
+        if (input_cell_begin <= first_output_cell)
+            first_output_cell_idx = 0;
+        else
+            // +1 because first_output_cell is actually the end of first cell
+            first_output_cell_idx = (input_cell_begin - first_output_cell) / time_scale_coef + 1;
+
+        uint32_t last_output_cell_idx = (input_cell_end - first_output_cell) / time_scale_coef;
+
+        if ((input_cell_end - first_output_cell) % time_scale_coef != 0)
+            ++last_output_cell_idx;
+
+        last_output_cell_idx = std::min(last_output_cell_idx, output_size - 1);
+
+//        std::printf("fidx=%d lidx=%d\n", (int)first_output_cell_idx, (int)last_output_cell_idx);
+
+        for(auto output_idx = first_output_cell_idx; output_idx <= last_output_cell_idx ; ++output_idx)
+        {
+            // current output cell time slot
+            auto out_cell_begin = output_idx * time_scale_coef + first_output_cell - time_scale_coef;
+            auto out_cell_end = out_cell_begin + time_scale_coef;
+            auto slot = std::min(out_cell_end, input_cell_end) - std::max(out_cell_begin, input_cell_begin);
+
+            auto slice = uint64_t(rate * slot);
+
+//            std::printf("slot=%ld slice=%lf output_idx=%ld\n", (long)slot, (double)slice, (long)output_idx);
+
+            data_left -= slice;
+            output[output_idx] += slice;
+        }
+        output[last_output_cell_idx] += data_left;
+    }
+}
+
+
+extern "C"
+void interpolate_ts_on_seconds_border_v2(
+    unsigned int input_size,
+    unsigned int output_size,
+    uint64_t * times,
+    uint64_t * values,
+    unsigned int time_step,
+    uint64_t * output)
+{
+    auto output_end = (*times / time_step) * time_step;
+    auto output_begin = output_end - time_step;
+    auto output_cell = output;
+
+    auto input_cell = values;
+    auto input_time = times;
+    auto input_val = *input_cell;
+    auto input_begin = *input_time - time_step;
+    auto input_end = *input_time;
+    auto rate = ((double)*input_cell) / (input_end - input_begin);
+
+    // output array must fully cover input array
+    while(output_cell < output + output_size) {
+        // check if cells intersect
+        auto intersection = ((int64_t)std::min(output_end, input_end)) - std::max(output_begin, input_begin);
+
+        // add intersection slice to output array
+        if(intersection > 0) {
+            auto slice = (uint64_t)(intersection * rate);
+            *output_cell += slice;
+            input_val -= slice;
+        }
+
+        // switch to next input or output cell
+        if (output_end >= input_end){
+            *output_cell += input_val;
+
+            ++input_cell;
+            ++input_time;
+
+            if(input_time == times + input_size)
+                return;
+
+            input_val = *input_cell;
+            input_begin = input_end;
+            input_end = *input_time;
+            rate = ((double)*input_cell) / (input_end - input_begin);
+        } else {
+            ++output_cell;
+            output_begin = output_end;
+            output_end += time_step;
+        }
+    }
+}
\ No newline at end of file
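
Both functions spread each input value uniformly over its input time interval and add the overlapping fractions into second-aligned output cells, so the total stays the same. A tiny pure-Python illustration of the same idea (a sketch only, not the shipped binding; it assumes millisecond timestamps and time_step=1000):

    def interpolate_on_borders(times_ms, values, step=1000):
        # output cell k covers [out_begin + k*step, out_begin + (k+1)*step); the first input
        # interval is assumed to start one step before its end timestamp, as in the C++ code above
        out_begin = (times_ms[0] // step) * step - step
        out_len = (times_ms[-1] - out_begin) // step + 1
        out = [0.0] * out_len
        prev_end = times_ms[0] - step
        for t, v in zip(times_ms, values):
            begin, end = prev_end, t
            rate = v / float(end - begin)   # data "speed" inside this input interval
            for idx in range((begin - out_begin) // step, (end - out_begin) // step + 1):
                cell_b = out_begin + idx * step
                cell_e = cell_b + step
                overlap = min(cell_e, end) - max(cell_b, begin)
                if overlap > 0:
                    out[idx] += rate * overlap
            prev_end = end
        return out

    # total is preserved (up to float rounding): sum(result) == sum(values)
    print(interpolate_on_borders([1500, 2500, 3500], [10, 20, 30]))
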
diff --git a/configs-examples/default.yaml b/configs-examples/default.yaml
index c3614ba..da50150 100644
--- a/configs-examples/default.yaml
+++ b/configs-examples/default.yaml
@@ -1,22 +1,30 @@
 #  ------------------------------------    CONFIGS   -------------------------------------------------------------------
-fuel:
-    url: http://172.16.44.13:8000/
-    creds: admin:admin@admin
-    ssh_creds: root:r00tme
-    openstack_env: test
-
-openstack:
-    skip_preparation: false
-    openrc: /home/koder/workspace/scale_openrc
-    openrc:
-        user: USER
-        passwd: PASSWD
-        tenant: KEY_FILE
-        auth_url: URL
-        SOME_OTHER_OPTS: OPTIONAL
-    vms:
-        - "USERNAME[:PASSWD]@VM_NAME_PREFIX[::KEY_FILE]"
-
+#fuel:
+#    url: http://172.16.44.13:8000/
+#    creds: admin:admin@admin
+#    ssh_creds: root:r00tme
+#    openstack_env: test
+#
+#openstack:
+#    skip_preparation: false
+#    openrc: /home/koder/workspace/scale_openrc
+#    openrc:
+#        user: USER
+#        passwd: PASSWD
+#        tenant: KEY_FILE
+#        auth_url: URL
+#        SOME_OTHER_OPTS: OPTIONAL
+#    vms:
+#        - "USERNAME[:PASSWD]@VM_NAME_PREFIX[::KEY_FILE]"
+#
+#ceph:
+#    cluster: ceph   << Optional
+#    config: PATH    << Optional
+#    keyfile: PATH   << Optional
+#    key: KEY   << not supported for now
+#    root_node: NODE_NAME
+#
+#
 # nodes: - map of explicit nodes URLS to node roles
 # in format
 #    USERNAME[:PASSWD]@VM_NAME_PREFIX[::KEY_FILE] or localhost: role1, role2, role3....
diff --git a/configs-examples/local_vm_ceph.yml b/configs-examples/local_vm_ceph.yml
new file mode 100644
index 0000000..2c75e31
--- /dev/null
+++ b/configs-examples/local_vm_ceph.yml
@@ -0,0 +1,19 @@
+include: default.yaml
+run_sensors: true
+results_storage: /var/wally_results
+discover: ceph
+
+ceph:
+    root_node: ceph-client
+
+sleep: 0
+
+nodes:
+    koder@ceph-client: testnode
+
+tests:
+  - fio:
+      load: verify
+      params:
+          FILENAME: /dev/rbd0
+          FILESIZE: 4G
diff --git a/configs-examples/perf_lab.yml b/configs-examples/perf_lab.yml
new file mode 100644
index 0000000..b53dfa4
--- /dev/null
+++ b/configs-examples/perf_lab.yml
@@ -0,0 +1,17 @@
+include: default.yaml
+run_sensors: true
+results_storage: /var/wally_results
+discover: ceph
+
+ceph:
+    root_node: root@cz7625
+
+nodes:
+    root@cz7625: testnode
+
+tests:
+  - fio:
+      load: ceph
+      params:
+          FILENAME: /dev/rbd0
+          FILESIZE: 100G
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index 004d52d..2162299 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -15,3 +15,12 @@
 python-cinderclient
 python-keystoneclient
 python-glanceclient
+oktest
+iso8601==0.1.10
+scipy
+numpy
+matplotlib
+psutil
+seaborn
+pytest
+
diff --git a/requirements_extra.txt b/requirements_extra.txt
index 6551654..e69de29 100644
--- a/requirements_extra.txt
+++ b/requirements_extra.txt
@@ -1,6 +0,0 @@
-oktest
-iso8601==0.1.10
-scipy
-numpy
-matplotlib
-psutil
diff --git a/run.py b/run.py
new file mode 100644
index 0000000..a8c0e3d
--- /dev/null
+++ b/run.py
@@ -0,0 +1,5 @@
+import sys
+from wally.main import main
+
+sys.exit(main(sys.argv))
+
diff --git a/tests/test_hlstorage.py b/tests/test_hlstorage.py
new file mode 100644
index 0000000..dcc3f6e
--- /dev/null
+++ b/tests/test_hlstorage.py
@@ -0,0 +1,128 @@
+import shutil
+import tempfile
+import contextlib
+from typing import Tuple, Union, Dict, Any
+
+import numpy
+from oktest import ok
+
+
+from wally.result_classes import DataSource, TimeSeries, SuiteConfig
+from wally.suits.job import JobConfig, JobParams
+from wally.storage import make_storage
+from wally.hlstorage import ResultStorage
+
+
+@contextlib.contextmanager
+def in_temp_dir():
+    dname = tempfile.mkdtemp()
+    try:
+        yield dname
+    finally:
+        shutil.rmtree(dname)
+
+
+SUITE_ID = "suite1"
+JOB_ID = "job1"
+NODE_ID = "node1"
+SENSOR = "sensor"
+DEV = "dev"
+METRIC = "metric"
+TAG = "csv"
+DATA_UNITS = "x"
+TIME_UNITS = "us"
+
+
+class TestJobParams(JobParams):
+    def __init__(self) -> None:
+        JobParams.__init__(self)
+
+    @property
+    def summary(self) -> str:
+        return "UT_Job_CFG"
+
+    @property
+    def long_summary(self) -> str:
+        return "UT_Job_Config"
+
+    def copy(self, **updated) -> 'JobParams':
+        return self.__class__()
+
+    @property
+    def char_tpl(self) -> Tuple[Union[str, int, float, bool], ...]:
+        return (1, 2, 3)
+
+
+class TestJobConfig(JobConfig):
+    @property
+    def storage_id(self) -> str:
+        return JOB_ID
+
+    @property
+    def params(self) -> JobParams:
+        return TestJobParams()
+
+    def raw(self) -> Dict[str, Any]:
+        return {}
+
+    @classmethod
+    def fromraw(cls, data: Dict[str, Any]) -> 'TestJobConfig':
+        return cls()
+
+
+class TestSuiteConfig(SuiteConfig):
+    def __init__(self):
+        SuiteConfig.__init__(self, "UT", {}, "run_uuid", [], "/tmp", 0, False)
+        self.storage_id = SUITE_ID
+
+
+
+def test_sensor_ts():
+    with in_temp_dir() as root:
+        sensor_data = numpy.arange(5)
+        collected_at = numpy.arange(5) + 100
+
+        ds = DataSource(node_id=NODE_ID, sensor=SENSOR, dev=DEV, metric=METRIC)
+        cds = DataSource(node_id=NODE_ID, metric='collected_at')
+
+        with make_storage(root, existing=False) as storage:
+            rstorage = ResultStorage(storage)
+
+            rstorage.append_sensor(sensor_data, ds, units=DATA_UNITS, histo_bins=None)
+            rstorage.append_sensor(sensor_data, ds, units=DATA_UNITS, histo_bins=None)
+
+            rstorage.append_sensor(collected_at, cds, units=TIME_UNITS, histo_bins=None)
+            rstorage.append_sensor(collected_at + 5, cds, units=TIME_UNITS, histo_bins=None)
+
+        with make_storage(root, existing=True) as storage2:
+            rstorage2 = ResultStorage(storage2)
+            ts = rstorage2.load_sensor(ds)
+            assert (ts.data == numpy.array([0, 1, 2, 3, 4, 0, 1, 2, 3, 4])).all()
+            assert (ts.times == numpy.arange(10) + 100).all()
+
+
+def test_result_ts():
+    with in_temp_dir() as root:
+        sensor_data = numpy.arange(5, dtype=numpy.uint32)
+        collected_at = numpy.arange(5, dtype=numpy.uint32) + 100
+        ds = DataSource(suite_id=SUITE_ID, job_id=JOB_ID,
+                        node_id=NODE_ID, sensor=SENSOR, dev=DEV, metric=METRIC, tag=TAG)
+
+        ts = TimeSeries("xxxx", None, sensor_data, collected_at, DATA_UNITS, ds, TIME_UNITS)
+
+        suite = TestSuiteConfig()
+        job = TestJobConfig(1)
+
+        with make_storage(root, existing=False) as storage:
+            rstorage = ResultStorage(storage)
+            rstorage.put_or_check_suite(suite)
+            rstorage.put_job(suite, job)
+            rstorage.put_ts(ts)
+
+        with make_storage(root, existing=True) as storage2:
+            rstorage2 = ResultStorage(storage2)
+            suits = list(rstorage2.iter_suite('UT'))
+            suits2 = list(rstorage2.iter_suite())
+            ok(len(suits)) == 1
+            ok(len(suits2)) == 1
+            
diff --git a/tests/test_math.py b/tests/test_math.py
index 5a75858..d9c9688 100644
--- a/tests/test_math.py
+++ b/tests/test_math.py
@@ -1,5 +1,7 @@
 import numpy
 from wally.statistic import rebin_histogram
+from wally.result_classes import DataSource, TimeSeries
+from wally.data_selectors import interpolate_ts_on_seconds_border, c_interpolate_ts_on_seconds_border
 
 
 def array_eq(x: numpy.array, y: numpy.array, max_diff: float = 1E-3) -> bool:
@@ -30,3 +32,63 @@
     assert array_eq(new_edges, numpy.array([20, 30, 40]))
     assert new_histo.sum() == curr_histo.sum()
     assert list(new_histo) == [30, 10, 60]
+
+
+SUITE_ID = "suite1"
+JOB_ID = "job1"
+NODE_ID = "node1"
+SENSOR = "sensor"
+DEV = "dev"
+METRIC = "metric"
+TAG = "csv"
+DATA_UNITS = "x"
+TIME_UNITS = "ms"
+
+
+def test_interpolate():
+    ds = DataSource(node_id=NODE_ID, sensor=SENSOR, dev=DEV, metric=METRIC)
+    samples = 200
+    ms_coef = 1000
+    s_offset = 377 * ms_coef
+    ms_offset = 300 + s_offset
+
+    for i in range(16):
+        source_times = numpy.random.randint(100, size=samples, dtype='uint64') + \
+            ms_coef * numpy.arange(samples, dtype='uint64') + s_offset + ms_offset
+        source_values = numpy.random.randint(30, 60, size=samples, dtype='uint64')
+
+        ts = TimeSeries("test", raw=None, data=source_values, times=source_times, units=DATA_UNITS,
+                        source=ds, time_units=TIME_UNITS)
+
+        # ts2 = interpolate_ts_on_seconds_border(ts)
+        ts2 = c_interpolate_ts_on_seconds_border(ts, nc=True)
+
+        # print()
+        # print(ts.times)
+        # print(ts.data, ts.data.sum())
+        # print(ts2.times)
+        # print(ts2.data, ts2.data.sum())
+
+        assert ts.time_units == 'ms'
+        assert ts2.time_units == 's'
+        assert ts2.times.dtype == ts.times.dtype
+        assert ts2.data.dtype == ts.data.dtype
+
+        assert ts.data.sum() == ts2.data.sum()
+
+        borders = 5
+        block_size = samples // 10
+        for begin_idx in numpy.random.randint(borders, samples - borders, size=20):
+            begin_idx = int(begin_idx)
+            end_idx = min(begin_idx + block_size, ts.times.size - 1)
+
+            first_cell_begin_time = ts.times[begin_idx - 1]
+            last_cell_end_time = ts.times[end_idx]
+            ts_sum = ts.data[begin_idx:end_idx].sum()
+
+            ts2_begin_idx = numpy.searchsorted(ts2.times, first_cell_begin_time // ms_coef)
+            ts2_end_idx = numpy.searchsorted(ts2.times, last_cell_end_time // ms_coef) + 1
+            ts2_max = ts2.data[ts2_begin_idx: ts2_end_idx].sum()
+            ts2_min = ts2.data[ts2_begin_idx + 1: ts2_end_idx - 1].sum()
+
+            assert ts2_min <= ts_sum <= ts2_max, "NOT {} <= {} <= {}".format(ts2_min, ts_sum, ts2_max)
diff --git a/v2_plans.md b/v2_plans.md
index 70a0caf..90bb288 100644
--- a/v2_plans.md
+++ b/v2_plans.md
@@ -46,7 +46,7 @@
 TODO next
 ---------
 
-* Remove DBStorage, merge FSStorage and serializer into
+* Merge FSStorage and serializer into
   ObjStorage, separate TSStorage.
 * Build WallyStorage on top of it, use only WallyStorage in code
 * check that OS key match what is stored on disk 
@@ -58,7 +58,6 @@
 * Cumulative statistic table for all jobs
 * Add column for job params, which show how many cluster resource consumed
 * show extra outliers with arrows
-* More X = func(QD) plots. Eg. - kurt/skew, etc.
 * Hide cluster load if no nodes available
 * Show latency skew and curtosis
 * Sort engineering report by result tuple
diff --git a/wally/ceph.py b/wally/ceph.py
index 36d6b15..3527db3 100644
--- a/wally/ceph.py
+++ b/wally/ceph.py
@@ -1,7 +1,7 @@
 """ Collect data about ceph nodes"""
-import json
+import os
 import logging
-from typing import Dict, cast, List, Set, Optional
+from typing import Dict, cast, List, Set
 
 
 from .node_interfaces import NodeInfo, IRPCNode
@@ -21,16 +21,16 @@
 logger = logging.getLogger("wally")
 
 
-def get_osds_info(node: IRPCNode, ceph_extra_args: str = "") -> Dict[IP, List[OSDInfo]]:
+def get_osds_info(node: IRPCNode, ceph_extra_args: str = "", thcount: int = 8) -> Dict[IP, List[OSDInfo]]:
     """Get set of osd's ip"""
     res = {}  # type: Dict[IP, List[OSDInfo]]
     return {IP(ip): osd_info_list
-            for ip, osd_info_list in discover.get_osds_nodes(node.run, ceph_extra_args)}
+            for ip, osd_info_list in discover.get_osds_nodes(node.run, ceph_extra_args, thcount=thcount).items()}
 
 
 def get_mons_ips(node: IRPCNode, ceph_extra_args: str = "") -> Set[IP]:
     """Return mon ip set"""
-    return {IP(ip) for ip in discover.get_mons_nodes(node.run, ceph_extra_args).values()}
+    return {IP(ip) for ip, _ in discover.get_mons_nodes(node.run, ceph_extra_args).values()}
 
 
 class DiscoverCephStage(Stage):
@@ -40,7 +40,8 @@
     def run(self, ctx: TestRun) -> None:
         """Return list of ceph's nodes NodeInfo"""
 
-        if 'ceph' not in ctx.config.discovery:
+        if 'ceph' not in ctx.config.discover:
+            print(ctx.config.discover)
             logger.debug("Skip ceph discovery due to config setting")
             return
 
@@ -48,11 +49,11 @@
             logger.debug("Skip ceph discovery, use previously discovered nodes")
             return
 
-        if 'metadata' in ctx.config.discovery:
+        if 'metadata' in ctx.config.discover:
             logger.exception("Ceph metadata discovery is not implemented")
             raise StopTestError()
 
-        ignore_errors = 'ignore_errors' in ctx.config.discovery
+        ignore_errors = 'ignore_errors' in ctx.config.discover
         ceph = ctx.config.ceph
         root_node_uri = cast(str, ceph.root_node)
         cluster = ceph.get("cluster", "ceph")
@@ -81,18 +82,31 @@
 
         ceph_params = {"cluster": cluster, "conf": conf, "key": key}
 
+        ip_remap = {
+            '10.8.0.4': '172.16.164.71',
+            '10.8.0.3': '172.16.164.72',
+            '10.8.0.2': '172.16.164.73',
+            '10.8.0.5': '172.16.164.74',
+            '10.8.0.6': '172.16.164.75',
+            '10.8.0.7': '172.16.164.76',
+            '10.8.0.8': '172.16.164.77',
+            '10.8.0.9': '172.16.164.78',
+        }
+
         with setup_rpc(connect(info), ctx.rpc_code, ctx.default_rpc_plugins,
                        log_level=ctx.config.rpc_log_level) as node:
 
-            ssh_key = node.get_file_content("~/.ssh/id_rsa")
+            # ssh_key = node.get_file_content("~/.ssh/id_rsa", expanduser=True)
 
             try:
                 ips = set()
-                for ip, osds_info in get_osds_info(node, ceph_extra_args).items():
+                for ip, osds_info in get_osds_info(node, ceph_extra_args, thcount=16).items():
+                    ip = ip_remap[ip]
                     ips.add(ip)
-                    creds = ConnCreds(to_ip(cast(str, ip)), user="root", key=ssh_key)
+                    # creds = ConnCreds(to_ip(cast(str, ip)), user="root", key=ssh_key)
+                    creds = ConnCreds(to_ip(cast(str, ip)), user="root")
                     info = ctx.merge_node(creds, {'ceph-osd'})
-                    info.params.setdefault('ceph-osds', []).extend(osds_info)
+                    info.params.setdefault('ceph-osds', []).extend(osd_info.__dict__.copy() for osd_info in osds_info)
                     assert 'ceph' not in info.params or info.params['ceph'] == ceph_params
                     info.params['ceph'] = ceph_params
 
@@ -107,7 +121,9 @@
             try:
                 counter = 0
                 for counter, ip in enumerate(get_mons_ips(node, ceph_extra_args)):
-                    creds = ConnCreds(to_ip(cast(str, ip)), user="root", key=ssh_key)
+                    ip = ip_remap[ip]
+                    # creds = ConnCreds(to_ip(cast(str, ip)), user="root", key=ssh_key)
+                    creds = ConnCreds(to_ip(cast(str, ip)), user="root")
                     info = ctx.merge_node(creds, {'ceph-mon'})
                     assert 'ceph' not in info.params or info.params['ceph'] == ceph_params
                     info.params['ceph'] = ceph_params
@@ -118,3 +134,33 @@
                     raise StopTestError()
                 else:
                     logger.warning("MON discovery failed %s", exc)
+
+
+def raw_dev_name(path):
+    if path.startswith("/dev/"):
+        path = path[5:]
+    while path and path[-1].isdigit():
+        path = path[:-1]
+    return path
+
+
+class FillCephInfoStage(Stage):
+    config_block = 'ceph'
+    priority = StepOrder.UPDATE_NODES_INFO
+
+    def run(self, ctx: TestRun) -> None:
+        for node in ctx.nodes:
+            if 'ceph_storage_devs' not in node.info.params:
+                if 'ceph-osd' in node.info.roles:
+                    jdevs = set()
+                    sdevs = set()
+                    for osd_info in node.info.params['ceph-osds']:
+                        for key, sset in [('journal', jdevs), ('storage', sdevs)]:
+                            path = osd_info.get(key)
+                            if path:
+                                dpath = node.conn.fs.get_dev_for_file(path)
+                                if isinstance(dpath, bytes):
+                                    dpath = dpath.decode('utf8')
+                                sset.add(raw_dev_name(dpath))
+                    node.info.params['ceph_storage_devs'] = list(sdevs)
+                    node.info.params['ceph_journal_devs'] = list(jdevs)
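
A quick usage note for the raw_dev_name helper above (behavior follows directly from the code: it strips the /dev/ prefix and any trailing digits):

    from wally.ceph import raw_dev_name

    assert raw_dev_name("/dev/sda2") == "sda"
    assert raw_dev_name("sdb") == "sdb"
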
diff --git a/wally/config.py b/wally/config.py
index add4df5..95ae511 100644
--- a/wally/config.py
+++ b/wally/config.py
@@ -25,7 +25,7 @@
         self.debug_agents = False  # type: bool
 
         # None, disabled, enabled, metadata, ignore_errors
-        self.discovery = None  # type: Optional[str]
+        self.discover = None  # type: Optional[str]
 
         self.logging = None  # type: 'Config'
         self.ceph = None  # type: 'Config'
@@ -33,7 +33,7 @@
         self.fuel = None  # type: 'Config'
         self.test = None  # type: 'Config'
         self.sensors = None  # type: 'Config'
-        self.discovery = None  # type: Set[str]
+        self.discover = None  # type: Set[str]
 
         self._dct.clear()
         self._dct.update(dct)
diff --git a/wally/console_report.py b/wally/console_report.py
index bfa8881..3733de1 100644
--- a/wally/console_report.py
+++ b/wally/console_report.py
@@ -19,8 +19,8 @@
         for suite in rstorage.iter_suite(FioTest.name):
             table = texttable.Texttable(max_width=200)
 
-            table.header(["Description", "IOPS ~ Dev", 'Skew/Kurt', 'lat med', 'lat 95'])
-            table.set_cols_align(('l', 'r', 'r', 'r', 'r'))
+            table.header(["Description", "IOPS ~ Dev", "BW, MiBps", 'Skew/Kurt', 'lat med, ms', 'lat 95, ms'])
+            table.set_cols_align(('l', 'r', 'r', 'r', 'r', 'r'))
 
             for job in sorted(rstorage.iter_job(suite), key=lambda job: job.params):
                 bw_ts, = list(rstorage.iter_ts(suite, job, metric='bw'))
@@ -31,9 +31,9 @@
                 lat_ts, = list(rstorage.iter_ts(suite, job, metric='lat'))
                 bins_edges = numpy.array(get_lat_vals(lat_ts.data.shape[1]), dtype='float32') / 1000  # convert us to ms
                 lat_props = calc_histo_stat_props(lat_ts, bins_edges)
-
                 table.add_row([job.params.summary,
                                "{} ~ {}".format(float2str(avg_iops), float2str(iops_dev)),
+                               float2str(props.average / 1024),  # Ki -> Mi
                                "{}/{}".format(float2str(props.skew), float2str(props.kurt)),
                                float2str(lat_props.perc_50), float2str(lat_props.perc_95)])
 
diff --git a/wally/data_selectors.py b/wally/data_selectors.py
new file mode 100644
index 0000000..53b822b
--- /dev/null
+++ b/wally/data_selectors.py
@@ -0,0 +1,411 @@
+import ctypes
+import logging
+import os.path
+from typing import Tuple, List, Iterable, Iterator, Optional, Union
+from fractions import Fraction
+
+import numpy
+
+from cephlib.numeric import auto_edges2
+
+import wally
+from .hlstorage import ResultStorage
+from .node_interfaces import NodeInfo
+from .result_classes import DataSource, TimeSeries, SuiteConfig, JobConfig
+from .suits.io.fio import FioJobConfig
+from .suits.io.fio_hist import expected_lat_bins
+from .utils import unit_conversion_coef
+
+
+logger = logging.getLogger("wally")
+
+# Separately for each test heatmaps & agg across whole time histos:
+#   * fio latency heatmap for all instances
+#   * data dev iops across all osd
+#   * data dev bw across all osd
+#   * data dev qd across all osd
+#   * journal dev iops across all osd
+#   * journal dev bw across all osd
+#   * journal dev qd across all osd
+#   * net dev pps across all hosts
+#   * net dev bps across all hosts
+
+# Main API's
+#   get sensors by pattern
+#   align values to seconds
+#   cut ranges for particular test
+#   transform into 2d histos (either make histos or rebin them) and clip outliers at the same time
+
+
+AGG_TAG = 'ALL'
+
+
+def find_nodes_by_roles(rstorage: ResultStorage, node_roles: Iterable[str]) -> List[NodeInfo]:
+    nodes = rstorage.storage.load_list(NodeInfo, 'all_nodes')  # type: List[NodeInfo]
+    node_roles_s = set(node_roles)
+    return [node for node in nodes if node.roles.intersection(node_roles_s)]
+
+
+def find_all_sensors(rstorage: ResultStorage,
+                     node_roles: Iterable[str],
+                     sensor: str,
+                     metric: str) -> Iterator[TimeSeries]:
+    all_nodes_rr = "|".join(node.node_id for node in find_nodes_by_roles(rstorage, node_roles))
+    all_nodes_rr = "(?P<node>{})".format(all_nodes_rr)
+
+    for path, ds in rstorage.iter_sensors(all_nodes_rr, sensor=sensor, metric=metric):
+        ts = rstorage.load_sensor(ds)
+
+        # for sensors ts.times is an array of pairs - collection_start_at, collection_finished_at
+        # to make this array consistent with the times in the load data, the second item of each pair is dropped
+        ts.times = ts.times[::2]
+        yield ts
+
+
+def find_all_series(rstorage: ResultStorage, suite: SuiteConfig, job: JobConfig, metric: str) -> Iterator[TimeSeries]:
+    "Iterated over selected metric for all nodes for given Suite/job"
+    return rstorage.iter_ts(suite, job, metric=metric)
+
+
+def get_aggregated(rstorage: ResultStorage, suite: SuiteConfig, job: FioJobConfig, metric: str) -> TimeSeries:
+    "Sum selected metric for all nodes for given Suite/job"
+
+    tss = list(find_all_series(rstorage, suite, job, metric))
+
+    if len(tss) == 0:
+        raise NameError("Can't found any TS for {},{},{}".format(suite, job, metric))
+
+    ds = DataSource(suite_id=suite.storage_id,
+                    job_id=job.storage_id,
+                    node_id=AGG_TAG,
+                    sensor='fio',
+                    dev=AGG_TAG,
+                    metric=metric,
+                    tag='csv')
+
+    agg_ts = TimeSeries(metric,
+                        raw=None,
+                        source=ds,
+                        data=numpy.zeros(tss[0].data.shape, dtype=tss[0].data.dtype),
+                        times=tss[0].times.copy(),
+                        units=tss[0].units,
+                        histo_bins=tss[0].histo_bins,
+                        time_units=tss[0].time_units)
+
+    for ts in tss:
+        if metric == 'lat' and (len(ts.data.shape) != 2 or ts.data.shape[1] != expected_lat_bins):
+            msg = "Sensor {}.{} on node %s has shape={}. Can only process sensors with shape=[X, {}].".format(
+                         ts.source.dev, ts.source.sensor, ts.source.node_id, ts.data.shape, expected_lat_bins)
+            logger.error(msg)
+            raise ValueError(msg)
+
+        if metric != 'lat' and len(ts.data.shape) != 1:
+            msg = "Sensor {}.{} on node {} has shape={}. Can only process 1D sensors.".format(
+                         ts.source.dev, ts.source.sensor, ts.source.node_id, ts.data.shape)
+            logger.error(msg)
+            raise ValueError(msg)
+
+        # TODO: match times on different ts
+        agg_ts.data += ts.data
+
+    return agg_ts
+
+
+interpolated_cache = {}
+
+
+def interpolate_ts_on_seconds_border(ts: TimeSeries, nc: bool = False) -> TimeSeries:
+    "Interpolate time series to values on seconds borders"
+
+    if not nc and ts.source.tpl in interpolated_cache:
+        return interpolated_cache[ts.source.tpl]
+
+    assert len(ts.times) == len(ts.data), "Time(={}) and data(={}) sizes doesn't equal for {!s}"\
+            .format(len(ts.times), len(ts.data), ts.source)
+
+    rcoef = 1 / unit_conversion_coef(ts.time_units, 's')  # type: Union[int, Fraction]
+
+    if isinstance(rcoef, Fraction):
+        assert rcoef.denominator == 1, "Incorrect conversion coef {!r}".format(rcoef)
+        rcoef = rcoef.numerator
+
+    assert rcoef >= 1 and isinstance(rcoef, int), "Incorrect conversion coef {!r}".format(rcoef)
+    coef = int(rcoef)   # make typechecker happy
+
+    # round to seconds border
+    begin = int(ts.times[0] / coef + 1) * coef
+    end = int(ts.times[-1] / coef) * coef
+
+    # current real data time chunk begin time
+    edge_it = iter(ts.times)
+
+    # current real data value
+    val_it = iter(ts.data)
+
+    # result array, cumulative value per second
+    result = numpy.empty([(end - begin) // coef], dtype=ts.data.dtype)
+    idx = 0
+    curr_summ = 0
+
+    # end of current time slot
+    results_cell_ends = begin + coef
+
+    # hack to unify looping
+    real_data_end = next(edge_it)
+    while results_cell_ends <= end:
+        real_data_start = real_data_end
+        real_data_end = next(edge_it)
+        real_val_left = next(val_it)
+
+        # real data "speed" for interval [real_data_start, real_data_end]
+        real_val_ps = float(real_val_left) / (real_data_end - real_data_start)
+
+        while real_data_end >= results_cell_ends and results_cell_ends <= end:
+            # part of current real value, which is fit into current result cell
+            curr_real_chunk = int((results_cell_ends - real_data_start) * real_val_ps)
+
+            # calculate rest of real data for next result cell
+            real_val_left -= curr_real_chunk
+            result[idx] = curr_summ + curr_real_chunk
+            idx += 1
+            curr_summ = 0
+
+            # adjust real data start time
+            real_data_start = results_cell_ends
+            results_cell_ends += coef
+
+        # don't lose any real data
+        curr_summ += real_val_left
+
+    assert idx == len(result), "Wrong output array size - idx(={}) != len(result)(={})".format(idx, len(result))
+
+    res_ts = TimeSeries(ts.name, None, result,
+                        times=int(begin // coef) + numpy.arange(idx, dtype=ts.times.dtype),
+                        units=ts.units,
+                        time_units='s',
+                        source=ts.source(),
+                        histo_bins=ts.histo_bins)
+
+    if not nc:
+        interpolated_cache[ts.source.tpl] = res_ts
+
+    return res_ts
+
+
+c_interp_func = None
+cdll = None
+
+
+def c_interpolate_ts_on_seconds_border(ts: TimeSeries, nc: bool = False) -> TimeSeries:
+    "Interpolate time series to values on seconds borders"
+
+    if not nc and ts.source.tpl in interpolated_cache:
+        return interpolated_cache[ts.source.tpl]
+
+    assert len(ts.times) == len(ts.data), "Time(={}) and data(={}) sizes doesn't equal for {!s}"\
+            .format(len(ts.times), len(ts.data), ts.source)
+
+    rcoef = 1 / unit_conversion_coef(ts.time_units, 's')  # type: Union[int, Fraction]
+
+    if isinstance(rcoef, Fraction):
+        assert rcoef.denominator == 1, "Incorrect conversion coef {!r}".format(rcoef)
+        rcoef = rcoef.numerator
+
+    assert rcoef >= 1 and isinstance(rcoef, int), "Incorrect conversion coef {!r}".format(rcoef)
+    coef = int(rcoef)   # make typechecker happy
+
+    global cdll
+    global c_interp_func
+    uint64_p = ctypes.POINTER(ctypes.c_uint64)
+
+    if c_interp_func is None:
+        dirname = os.path.dirname(os.path.dirname(wally.__file__))
+        path = os.path.join(dirname, 'clib', 'libwally.so')
+        cdll = ctypes.CDLL(path)
+        c_interp_func = cdll.interpolate_ts_on_seconds_border_v2
+        c_interp_func.argtypes = [
+            ctypes.c_uint,  # input_size
+            ctypes.c_uint,  # output_size
+            uint64_p,  # times
+            uint64_p,  # values
+            ctypes.c_uint,  # time_scale_coef
+            uint64_p,  # output
+        ]
+        c_interp_func.restype = None
+
+    assert ts.data.dtype.name == 'uint64', "Data dtype for {}=={} != uint64".format(ts.source, ts.data.dtype.name)
+    assert ts.times.dtype.name == 'uint64', "Time dtype for {}=={} != uint64".format(ts.source, ts.times.dtype.name)
+
+    output_sz = int(ts.times[-1]) // coef - int(ts.times[0]) // coef + 2
+    # print("output_sz =", output_sz, "coef =", coef)
+    result = numpy.zeros(output_sz, dtype=ts.data.dtype.name)
+
+    c_interp_func(ts.data.size,
+                  output_sz,
+                  ts.times.ctypes.data_as(uint64_p),
+                  ts.data.ctypes.data_as(uint64_p),
+                  coef,
+                  result.ctypes.data_as(uint64_p))
+
+    res_ts = TimeSeries(ts.name, None, result,
+                        times=int(ts.times[0] // coef) + numpy.arange(output_sz, dtype=ts.times.dtype),
+                        units=ts.units,
+                        time_units='s',
+                        source=ts.source(),
+                        histo_bins=ts.histo_bins)
+
+    if not nc:
+        interpolated_cache[ts.source.tpl] = res_ts
+    return res_ts
+
+
+def get_ts_for_time_range(ts: TimeSeries, time_range: Tuple[int, int]) -> TimeSeries:
+    """Return sensor values for given node for given period. Return per second estimated values array
+    Raise an error if required range is not full covered by data in storage"""
+
+    assert ts.time_units == 's', "{} != s for {!s}".format(ts.time_units, ts.source)
+    assert len(ts.times) == len(ts.data), "Time(={}) and data(={}) sizes doesn't equal for {!s}"\
+            .format(len(ts.times), len(ts.data), ts.source)
+
+    if time_range[0] < ts.times[0] or time_range[1] > ts.times[-1]:
+        raise AssertionError(("Incorrect data for get_sensor - time_range={!r}, collected_at=[{}, ..., {}]," +
+                              "sensor = {}_{}.{}.{}").format(time_range, ts.times[0], ts.times[-1],
+                                                             ts.source.node_id, ts.source.sensor, ts.source.dev,
+                                                             ts.source.metric))
+    idx1, idx2 = numpy.searchsorted(ts.times, time_range)
+    return TimeSeries(ts.name, None,
+                      ts.data[idx1:idx2],
+                      times=ts.times[idx1:idx2],
+                      units=ts.units,
+                      time_units=ts.time_units,
+                      source=ts.source,
+                      histo_bins=ts.histo_bins)
+
+
+def make_2d_histo(tss: List[TimeSeries],
+                  outliers_range: Tuple[float, float] = (0.02, 0.98),
+                  bins_count: int = 20,
+                  log_bins: bool = False) -> TimeSeries:
+
+    # validate input data
+    for ts in tss:
+        assert len(ts.times) == len(ts.data), "Time(={}) and data(={}) sizes doesn't equal for {!s}"\
+                .format(len(ts.times), len(ts.data), ts.source)
+        assert ts.time_units == 's', "All arrays should have the same data units"
+        assert ts.units == tss[0].units, "All arrays should have the same data units"
+        assert ts.data.shape == tss[0].data.shape, "All arrays should have the same data size"
+        assert len(ts.data.shape) == 1, "All arrays should be 1d"
+
+    whole_arr = numpy.concatenate([ts.data for ts in tss])
+    whole_arr.shape = [len(tss), -1]
+
+    if outliers_range is not None:
+        max_vl, begin, end, min_vl = numpy.percentile(whole_arr,
+                                                      [0, outliers_range[0] * 100, outliers_range[1] * 100, 100])
+        bins_edges = auto_edges2(begin, end, bins=bins_count, log_space=log_bins)
+        fixed_bins_edges = bins_edges.copy()
+        fixed_bins_edges[0] = begin
+        fixed_bins_edges[-1] = end
+    else:
+        begin, end = numpy.percentile(whole_arr, [0, 100])
+        bins_edges = auto_edges2(begin, end, bins=bins_count, log_space=log_bins)
+        fixed_bins_edges = bins_edges
+
+    res_data = numpy.concatenate([numpy.histogram(column, fixed_bins_edges)[0] for column in whole_arr.T])
+    res_data.shape = (len(tss), -1)
+    res = TimeSeries(name=tss[0].name,
+                     raw=None,
+                     data=res_data,
+                     times=tss[0].times,
+                     units=tss[0].units,
+                     source=tss[0].source,
+                     time_units=tss[0].time_units,
+                     histo_bins=bins_edges)
+    return res
+
+
+def aggregate_histograms(tss: List[TimeSeries],
+                         outliers_range: Tuple[float, float] = (0.02, 0.98),
+                         bins_count: int = 20,
+                         log_bins: bool = False) -> TimeSeries:
+
+    # validate input data
+    for ts in tss:
+        assert len(ts.times) == len(ts.data), "Need to use stripped time"
+        assert ts.time_units == 's', "All arrays should have the same data units"
+        assert ts.units == tss[0].units, "All arrays should have the same data units"
+        assert ts.data.shape == tss[0].data.shape, "All arrays should have the same data size"
+        assert len(ts.data.shape) == 2, "All arrays should be 2d"
+        assert ts.histo_bins is not None, "All arrays should be 2d"
+
+    whole_arr = numpy.concatenate([ts.data for ts in tss])
+    whole_arr.shape = [len(tss), -1]
+
+    max_val = whole_arr.max()
+    min_val = whole_arr.min()
+
+    if outliers_range is not None:
+        begin, end = numpy.percentile(whole_arr, [outliers_range[0] * 100, outliers_range[1] * 100])
+    else:
+        begin = min_val
+        end = max_val
+
+    bins_edges = auto_edges2(begin, end, bins=bins_count, log_space=log_bins)
+
+    if outliers_range is not None:
+        fixed_bins_edges = bins_edges.copy()
+        fixed_bins_edges[0] = begin
+        fixed_bins_edges[-1] = end
+    else:
+        fixed_bins_edges = bins_edges
+
+    res_data = numpy.concatenate([numpy.histogram(column, fixed_bins_edges)[0] for column in whole_arr.T])
+    res_data.shape = (len(tss), -1)
+    return TimeSeries(name=tss[0].name,
+                      raw=None,
+                      data=res_data,
+                      times=tss[0].times,
+                      units=tss[0].units,
+                      source=tss[0].source,
+                      time_units=tss[0].time_units,
+                      histo_bins=fixed_bins_edges)
+
+
+def summ_sensors(rstorage: ResultStorage,
+                 roles: List[str],
+                 sensor: str,
+                 metric: str,
+                 time_range: Tuple[int, int]) -> Optional[TimeSeries]:
+
+    res = None  # type: Optional[TimeSeries]
+    for node in find_nodes_by_roles(rstorage, roles):
+        for _, ds in rstorage.iter_sensors(node_id=node.node_id, sensor=sensor, metric=metric):
+            data = rstorage.load_sensor(ds)
+            data = c_interpolate_ts_on_seconds_border(data)
+            data = get_ts_for_time_range(data, time_range)
+            if res is None:
+                res = data
+                res.data = res.data.copy()
+            else:
+                res.data += data.data
+    return res
+
+
+def find_sensors_to_2d(rstorage: ResultStorage,
+                       roles: List[str],
+                       sensor: str,
+                       devs: List[str],
+                       metric: str,
+                       time_range: Tuple[int, int]) -> numpy.ndarray:
+
+    res = []  # type: List[TimeSeries]
+    for node in find_nodes_by_roles(rstorage, roles):
+        for dev in devs:
+            for _, ds in rstorage.iter_sensors(node_id=node.node_id, sensor=sensor, dev=dev, metric=metric):
+                data = rstorage.load_sensor(ds)
+                data = c_interpolate_ts_on_seconds_border(data)
+                data = get_ts_for_time_range(data, time_range)
+                res.append(data.data)
+    res2d = numpy.concatenate(res)
+    res2d.shape = ((len(res), -1))
+    return res2d
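
The ctypes-backed interpolation above is what summ_sensors and find_sensors_to_2d call on every sensor series, which is where the visualization speed-up comes from. A rough standalone sketch of driving it directly on synthetic data (assumes the wally package is importable and clib/libwally.so is built; the data layout mirrors tests/test_math.py):

    import time
    import numpy
    from wally.result_classes import DataSource, TimeSeries
    from wally.data_selectors import c_interpolate_ts_on_seconds_border

    ds = DataSource(node_id="node1", sensor="sensor", dev="dev", metric="metric")
    n = 100000
    # millisecond timestamps roughly one second apart with jitter, offset away from zero
    times = 1000 * numpy.arange(n, dtype='uint64') + \
        numpy.random.randint(100, size=n, dtype='uint64') + 500 * 1000
    data = numpy.random.randint(30, 60, size=n, dtype='uint64')
    ts = TimeSeries("bench", raw=None, data=data, times=times, units="x", source=ds, time_units="ms")

    t0 = time.perf_counter()
    ts2 = c_interpolate_ts_on_seconds_border(ts, nc=True)   # nc=True bypasses interpolated_cache
    print("interpolated {} points in {:.3f}s, sum preserved: {}".format(
          n, time.perf_counter() - t0, int(ts2.data.sum()) == int(data.sum())))
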
diff --git a/wally/fuel.py b/wally/fuel.py
index a565805..3a11403 100644
--- a/wally/fuel.py
+++ b/wally/fuel.py
@@ -37,9 +37,9 @@
         pass
 
     def run(self, ctx: TestRun) -> None:
-        full_discovery = 'fuel' in ctx.config.discovery
-        metadata_only = (not full_discovery) and ('metadata' in ctx.config.discovery)
-        ignore_errors = 'ignore_errors' in ctx.config.discovery
+        full_discovery = 'fuel' in ctx.config.discover
+        metadata_only = (not full_discovery) and ('metadata' in ctx.config.discover)
+        ignore_errors = 'ignore_errors' in ctx.config.discover
 
         if not (metadata_only or full_discovery):
             logger.debug("Skip ceph discovery due to config setting")
diff --git a/wally/hlstorage.py b/wally/hlstorage.py
index 411b515..733eab0 100644
--- a/wally/hlstorage.py
+++ b/wally/hlstorage.py
@@ -1,12 +1,12 @@
 import os
 import pprint
 import logging
-from typing import cast, Iterator, Tuple, Type, Dict, Optional, Any, List
+from typing import cast, Iterator, Tuple, Type, Dict, Optional, List
 
 import numpy
 
 from .suits.job import JobConfig
-from .result_classes import SuiteConfig, TimeSeries, DataSource, StatProps, IResultStorage
+from .result_classes import SuiteConfig, TimeSeries, DataSource, StatProps, IResultStorage, ArrayData
 from .storage import Storage
 from .utils import StopTestError
 from .suits.all_suits import all_suits
@@ -75,87 +75,140 @@
 
     def __init__(self, storage: Storage) -> None:
         self.storage = storage
-        self.cache = {}  # type: Dict[str, Tuple[int, int, Any, List[str]]]
+        self.cache = {}  # type: Dict[str, Tuple[int, int, ArrayData]]
 
     def sync(self) -> None:
         self.storage.sync()
 
     #  -----------------  SERIALIZATION / DESERIALIZATION  -------------------------------------------------------------
-    def load_array(self, path: str, skip_shape: bool = False) -> Tuple[numpy.array, Tuple[str, ...]]:
+    def read_headers(self, fd) -> Tuple[str, List[str], List[str], Optional[numpy.ndarray]]:
+        header = fd.readline().decode(self.csv_file_encoding).rstrip().split(",")
+        dtype, has_header2, header2_dtype, *ext_header = header
+
+        if has_header2 == 'true':
+            ln = fd.readline().decode(self.csv_file_encoding).strip()
+            header2 = numpy.fromstring(ln, sep=',', dtype=header2_dtype)
+        else:
+            assert has_header2 == 'false', \
+                "In file {} has_header2 is not true/false, but {!r}".format(fd.name, has_header2)
+            header2 = None
+        return dtype, ext_header, header, header2
+
+    def load_array(self, path: str) -> ArrayData:
+        """
+        Load array from file, should not be called directly
+        :param path: file path
+        :return: ArrayData
+        """
         with self.storage.get_fd(path, "rb") as fd:
+            fd.seek(0, os.SEEK_SET)
+
             stats = os.fstat(fd.fileno())
             if path in self.cache:
-                size, atime, obj, header = self.cache[path]
+                size, atime, arr_info = self.cache[path]
                 if size == stats.st_size and atime == stats.st_atime_ns:
-                    return obj, header
+                    return arr_info
 
-            header = fd.readline().decode(self.csv_file_encoding).strip().split(",")
+            data_dtype, header, _, header2 = self.read_headers(fd)
+            assert data_dtype == 'uint64', path
+            dt = fd.read().decode(self.csv_file_encoding).strip()
 
-            if skip_shape:
-                header = header[1:]
-            dt = fd.read().decode("utf-8").strip()
+        if len(dt) != 0:
+            arr = numpy.fromstring(dt.replace("\n", ','), sep=',', dtype=data_dtype)
+            lines = dt.count("\n") + 1
+            assert len(set(ln.count(',') for ln in dt.split("\n"))) == 1, \
+                "Data lines in {!r} have different element count".format(path)
+            arr.shape = [lines] if lines == arr.size else [lines, -1]
+        else:
+            arr = None
 
-            arr = numpy.fromstring(dt.replace("\n", ','), sep=',', dtype=header[0])
-            if len(dt) != 0:
-                lines = dt.count("\n") + 1
-                columns = dt.split("\n", 1)[0].count(",") + 1
-                assert lines * columns == len(arr)
-                if columns == 1:
-                    arr.shape = (lines,)
-                else:
-                    arr.shape = (lines, columns)
+        arr_data = ArrayData(header, header2, arr)
+        self.cache[path] = (stats.st_size, stats.st_atime_ns, arr_data)
+        return arr_data
 
-        self.cache[path] = (stats.st_size, stats.st_atime_ns, arr, header[1:])
-        return arr, header[1:]
+    def put_array(self, path: str, data: numpy.array, header: List[str], header2: numpy.ndarray = None,
+                  append_on_exists: bool = False) -> None:
 
-    def put_array(self, path:str, data: numpy.array, header: List[str], append_on_exists: bool = False) -> None:
-        header = [data.dtype.name] + header
+        header = [data.dtype.name] + \
+                 (['false', ''] if header2 is None else ['true', header2.dtype.name]) + \
+                 header
 
         exists = append_on_exists and path in self.storage
-        if len(data.shape) == 1:
-            # make array vertical to simplify reading
-            vw = data.view().reshape((data.shape[0], 1))
-        else:
-            vw = data
+        vw = data.view().reshape((data.shape[0], 1)) if len(data.shape) == 1 else data
+        mode = "cb" if not exists else "rb+"
 
-        with self.storage.get_fd(path, "cb" if not exists else "rb+") as fd:
+        with self.storage.get_fd(path, mode) as fd:
             if exists:
-                curr_header = fd.readline().decode(self.csv_file_encoding).rstrip().split(",")
-                assert header == curr_header, \
-                    "Path {!r}. Expected header ({!r}) and current header ({!r}) don't match"\
-                        .format(path, header, curr_header)
+                data_dtype, _, full_header, curr_header2 = self.read_headers(fd)
+
+                assert data_dtype == data.dtype.name, \
+                    "Path {!r}. Passed data type ({!r}) and current data type ({!r}) doesn't match"\
+                        .format(path, data.dtype.name, data_dtype)
+
+                assert header == full_header, \
+                    "Path {!r}. Passed header ({!r}) and current header ({!r}) doesn't match"\
+                        .format(path, header, full_header)
+
+                assert header2 == curr_header2, \
+                    "Path {!r}. Passed header2 != current header2: {!r}\n{!r}"\
+                        .format(path, header2, curr_header2)
+
                 fd.seek(0, os.SEEK_END)
             else:
                 fd.write((",".join(header) + "\n").encode(self.csv_file_encoding))
+                if header2 is not None:
+                    fd.write((",".join(map(str, header2)) + "\n").encode(self.csv_file_encoding))
 
             numpy.savetxt(fd, vw, delimiter=',', newline="\n", fmt="%lu")
 
     def load_ts(self, ds: DataSource, path: str) -> TimeSeries:
-        arr, header = self.load_array(path, skip_shape=True)
-        units, time_units = header
+        """
+        Load time series, generated by fio or other tool, should not be called directly,
+        use iter_ts instead.
+        :param ds: data source path
+        :param path: path in data storage
+        :return: TimeSeries
+        """
+        (units, time_units), header2, data = self.load_array(path)
+        times = data[:,0]
+        ts_data = data[:,1:]
 
-        data = arr[:,1:]
-        if data.shape[1] == 1:
-            data = data.reshape((-1,))
+        if ts_data.shape[1] == 1:
+            ts_data.shape = (ts_data.shape[0],)
 
         return TimeSeries("{}.{}".format(ds.dev, ds.sensor),
                           raw=None,
-                          data=data,
-                          times=arr[:,0],
+                          data=ts_data,
+                          times=times,
                           source=ds,
                           units=units,
-                          time_units=time_units)
+                          time_units=time_units,
+                          histo_bins=header2)
 
     def load_sensor(self, ds: DataSource) -> TimeSeries:
-        collected_at, collect_header = self.load_array(DB_paths.sensor_time.format(**ds.__dict__))
-        assert collect_header == [ds.node_id, 'collected_at', 'us'], repr(collect_header)
-        data, data_header = self.load_array(DB_paths.sensor_data.format(**ds.__dict__))
+        # sensors have no shape
+        path = DB_paths.sensor_time.format(**ds.__dict__)
+        collect_header, must_be_none, collected_at = self.load_array(path)
+
+        # cut 'collection end' time
+        collected_at = collected_at[::2]
+
+        # there must be no histogram for collected_at
+        assert must_be_none is None, "Extra header2 {!r} in collect_at file at {!r}".format(must_be_none, path)
+        assert collect_header == [ds.node_id, 'collected_at', 'us'],\
+            "Unexpected collect_at header {!r} at {!r}".format(collect_header, path)
+        assert len(collected_at.shape) == 1, "Collected_at must be 1D at {!r}".format(path)
+
+        data_path = DB_paths.sensor_data.format(**ds.__dict__)
+        data_header, must_be_none, data  = self.load_array(data_path)
+
+        # there must be no histogram for any sensors
+        assert must_be_none is None, "Extra header2 {!r} in sensor data file {!r}".format(must_be_none, data_path)
 
         data_units = data_header[2]
-        assert data_header == [ds.node_id, ds.metric_fqdn, data_units]
-
-        assert len(data.shape) == 1
-        assert len(collected_at.shape) == 1
+        assert data_header == [ds.node_id, ds.metric_fqdn, data_units], \
+            "Unexpected data header {!r} at {!r}".format(data_header, data_path)
+        assert len(data.shape) == 1, "Sensor data must be 1D at {!r}".format(data_path)
 
         return TimeSeries(ds.metric_fqdn,
                           raw=None,
@@ -190,12 +243,12 @@
         self.storage.put(job, path)
 
     def put_ts(self, ts: TimeSeries) -> None:
-        assert ts.data.dtype == ts.times.dtype
-        assert ts.data.dtype.kind == 'u'
-        assert ts.source.tag == self.ts_arr_tag
-
+        assert ts.data.dtype == ts.times.dtype, "Data type {!r} != time type {!r}".format(ts.data.dtype, ts.times.dtype)
+        assert ts.data.dtype.kind == 'u', "Only unsigned ints are accepted"
+        assert ts.source.tag == self.ts_arr_tag, "Incorrect source tag == {!r}, must be {!r}".format(ts.source.tag,
+                                                                                                     self.ts_arr_tag)
         csv_path = DB_paths.ts.format(**ts.source.__dict__)
-        header = [ts.data.dtype.name, ts.units, ts.time_units]
+        header = [ts.units, ts.time_units]
 
         tv = ts.times.view().reshape((-1, 1))
         if len(ts.data.shape) == 1:
@@ -204,7 +257,10 @@
             dv = ts.data
 
         result = numpy.concatenate((tv, dv), axis=1)
-        self.put_array(csv_path, result, header)
+        if ts.histo_bins is not None:
+            self.put_array(csv_path, result, header, header2=ts.histo_bins)
+        else:
+            self.put_array(csv_path, result, header)
 
         if ts.raw:
             raw_path = DB_paths.ts.format(**ts.source(tag=ts.raw_tag).__dict__)
@@ -225,14 +281,20 @@
     def put_report(self, report: str, name: str) -> str:
         return self.storage.put_raw(report.encode(self.csv_file_encoding), DB_paths.report_root + name)
 
-    def append_sensor(self, data: numpy.array, ds: DataSource, units: str) -> None:
+    def append_sensor(self, data: numpy.array, ds: DataSource, units: str, histo_bins: numpy.ndarray = None) -> None:
         if ds.metric == 'collected_at':
             path = DB_paths.sensor_time
             metrics_fqn = 'collected_at'
         else:
             path = DB_paths.sensor_data
             metrics_fqn = ds.metric_fqdn
-        self.put_array(path.format(**ds.__dict__), data, [ds.node_id, metrics_fqn, units], append_on_exists=True)
+
+        if ds.metric == 'lat':
+            assert len(data.shape) == 2, "Latency should be histo array"
+            assert histo_bins is not None, "Latency should have histo bins"
+
+        path = path.format(**ds.__dict__)
+        self.put_array(path, data, [ds.node_id, metrics_fqn, units], header2=histo_bins, append_on_exists=True)
 
     # -------------   GET DATA FROM STORAGE   --------------------------------------------------------------------------
 
@@ -284,11 +346,12 @@
             yield self.load_ts(ds, path)
 
     def iter_sensors(self, node_id: str = None, sensor: str = None, dev: str = None, metric: str = None) -> \
-            Iterator[Tuple[str, Dict[str, str]]]:
-
-        path = fill_path(DB_paths.sensor_data_r, node_id=node_id, sensor=sensor, dev=dev, metric=metric)
+            Iterator[Tuple[str, DataSource]]:
+        vls = dict(node_id=node_id, sensor=sensor, dev=dev, metric=metric)
+        path = fill_path(DB_paths.sensor_data_r, **vls)
         for is_file, path, groups in self.iter_paths(path):
-            assert is_file
-            yield path, groups
+            cvls = vls.copy()
+            cvls.update(groups)
+            yield path, DataSource(**cvls)
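
For reference, the array files that put_array()/read_headers() above agree on look like this (a sketch with illustrative header fields, not an API): line 1 is <data dtype>,<true|false flag for a second header>,<header2 dtype or empty>,<the caller's header fields>; line 2 carries the header2 values (histogram bin edges) only when the flag is true; the remaining lines are comma-separated data rows written by numpy.savetxt:

    import io
    import numpy

    bins = numpy.array([1.0, 2.0, 4.0], dtype='float64')                  # header2: histo bin edges
    rows = numpy.array([[100, 1, 2, 3], [101, 2, 3, 4]], dtype='uint64')  # first column is time

    buf = io.BytesIO()
    buf.write(b"uint64,true,float64,node1,sensor.metric,x\n")             # illustrative header fields
    buf.write((",".join(map(str, bins)) + "\n").encode("utf8"))
    numpy.savetxt(buf, rows, delimiter=',', newline="\n", fmt="%lu")
    print(buf.getvalue().decode("utf8"))
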
 
 
diff --git a/wally/main.py b/wally/main.py
index b931929..93922dd 100644
--- a/wally/main.py
+++ b/wally/main.py
@@ -4,10 +4,8 @@
 import pprint
 import getpass
 import logging
-import tempfile
 import argparse
 import functools
-import subprocess
 import contextlib
 from typing import List, Tuple, Any, Callable, IO, cast, Optional, Iterator
 from yaml import load as _yaml_load
@@ -43,7 +41,7 @@
 
 
 # stages
-from .ceph import DiscoverCephStage
+from .ceph import DiscoverCephStage, FillCephInfoStage
 from .openstack import DiscoverOSStage
 from .fuel import DiscoverFuelStage
 from .run_test import (CollectInfoStage, ExplicitNodesStage, SaveNodesStage,
@@ -212,6 +210,7 @@
 
 def get_run_stages() -> List[Stage]:
     return [DiscoverCephStage(),
+            FillCephInfoStage(),
             DiscoverOSStage(),
             DiscoverFuelStage(),
             ExplicitNodesStage(),
@@ -245,7 +244,7 @@
         config.build_description = opts.build_description
         config.build_type = opts.build_type
         config.settings_dir = get_config_path(config, opts.settings_dir)
-        config.discovery = set(config.get('discovery', '').split(","))
+        config.discover = set(name for name in config.get('discover', '').split(",") if name)
 
         storage = make_storage(config.storage_url)
         storage.put(config, 'config')
diff --git a/wally/node_interfaces.py b/wally/node_interfaces.py
index 3992048..b10f83d 100644
--- a/wally/node_interfaces.py
+++ b/wally/node_interfaces.py
@@ -109,19 +109,20 @@
         pass
 
     @abc.abstractmethod
-    def copy_file(self, local_path: str, remote_path: str = None, compress: bool = False) -> str:
+    def copy_file(self, local_path: str, remote_path: str = None,
+                  expanduser: bool = False, compress: bool = False) -> str:
         pass
 
     @abc.abstractmethod
-    def get_file_content(self, path: str, compress: bool = False) -> bytes:
+    def get_file_content(self, path: str, expanduser: bool = False, compress: bool = False) -> bytes:
         pass
 
     @abc.abstractmethod
-    def put_to_file(self, path: Optional[str], content: bytes, compress: bool = False) -> str:
+    def put_to_file(self, path: Optional[str], content: bytes, expanduser: bool = False, compress: bool = False) -> str:
         pass
 
     @abc.abstractmethod
-    def stat_file(self, path:str) -> Any:
+    def stat_file(self, path:str, expanduser: bool = False) -> Any:
         pass
 
     @abc.abstractmethod
diff --git a/wally/openstack.py b/wally/openstack.py
index b046656..88189f5 100644
--- a/wally/openstack.py
+++ b/wally/openstack.py
@@ -105,7 +105,7 @@
         pass
 
     def run(self, ctx: TestRun) -> None:
-        if 'openstack' not in ctx.config.discovery:
+        if 'openstack' not in ctx.config.discover:
             logger.debug("Skip openstack discovery due to settings")
             return
 
@@ -125,7 +125,7 @@
             user, password = os_nodes_auth.split(":")
             key_file = None
 
-        if 'metadata' not in ctx.config.discovery:
+        if 'metadata' not in ctx.config.discover:
             services = ctx.os_connection.nova.services.list()  # type: List[Any]
             host_services_mapping = {}  # type: Dict[str, List[str]]
 
diff --git a/wally/report.py b/wally/report.py
index 75b8028..b8a7713 100644
--- a/wally/report.py
+++ b/wally/report.py
@@ -1,14 +1,23 @@
 import os
 import abc
 import logging
+import warnings
 from io import BytesIO
 from functools import wraps
-from typing import Dict, Any, Iterator, Tuple, cast, List, Callable, Set, Optional
 from collections import defaultdict
+from typing import Dict, Any, Iterator, Tuple, cast, List, Callable, Set
 
 import numpy
 import scipy.stats
+
+import matplotlib
+# matplotlib.use('GTKAgg')
+
 import matplotlib.pyplot as plt
+from matplotlib import gridspec
+
+from cephlib.common import float2str
+from cephlib.plot import plot_hmap_with_y_histo, hmap_from_2d
 
 import wally
 
@@ -17,15 +26,19 @@
 from .test_run_class import TestRun
 from .hlstorage import ResultStorage
 from .node_interfaces import NodeInfo
-from .utils import b2ssize, b2ssize_10, STORAGE_ROLES
+from .utils import b2ssize, b2ssize_10, STORAGE_ROLES, unit_conversion_coef
 from .statistic import (calc_norm_stat_props, calc_histo_stat_props, moving_average, moving_dev,
-                        hist_outliers_perc, ts_hist_outliers_perc, find_ouliers_ts, approximate_curve)
-from .result_classes import (StatProps, DataSource, TimeSeries, NormStatProps, HistoStatProps, SuiteConfig,
-                             IResultStorage)
-from .suits.io.fio_hist import get_lat_vals, expected_lat_bins
+                        hist_outliers_perc, find_ouliers_ts, approximate_curve)
+from .result_classes import (StatProps, DataSource, TimeSeries, NormStatProps, HistoStatProps, SuiteConfig)
 from .suits.io.fio import FioTest, FioJobConfig
 from .suits.io.fio_job import FioJobParams
 from .suits.job import JobConfig
+from .data_selectors import get_aggregated, AGG_TAG, summ_sensors, find_sensors_to_2d, find_nodes_by_roles
+
+
+with warnings.catch_warnings():
+    warnings.simplefilter("ignore")
+    import seaborn
 
 
 logger = logging.getLogger("wally")
@@ -65,6 +78,7 @@
     hist_boxes = 10
     hist_lat_boxes = 25
     hm_hist_bins_count = 25
+    hm_x_slots = 25
     min_points_for_dev = 5
 
     dev_range_x = 2.0
@@ -86,11 +100,12 @@
     extra_io_spine = True
 
     legend_for_eng = True
-    heatmap_interpolation = '1d'
+    # heatmap_interpolation = '1d'
+    heatmap_interpolation = None
     heatmap_interpolation_points = 300
     outliers_q_nd = 3.0
     outliers_hide_q_nd = 4.0
-    outliers_lat = (0.01, 0.995)
+    outliers_lat = (0.01, 0.9)
 
     violin_instead_of_box = True
     violin_point_count = 30000
@@ -105,6 +120,10 @@
         'lat': ("ms", 1, "latency")
     }
 
+    qd_bins = [0, 1, 2, 4, 6, 8, 12, 16, 20, 26, 32, 40, 48, 56, 64, 96, 128]
+    iotime_bins = list(range(0, 1030, 50))
+    block_size_bins = [0, 2, 4, 8, 16, 32, 48, 64, 96, 128, 192, 256, 384, 512, 1024, 2048]
+
 
 # ----------------  STRUCTS  -------------------------------------------------------------------------------------------
 
@@ -151,81 +170,14 @@
 
 def make_iosum(rstorage: ResultStorage, suite: SuiteConfig, job: FioJobConfig) -> IOSummary:
     lat = get_aggregated(rstorage, suite, job, "lat")
-    bins_edges = numpy.array(get_lat_vals(lat.data.shape[1]), dtype='float32') / 1000
     io = get_aggregated(rstorage, suite, job, "bw")
 
     return IOSummary(job.qd,
                      nodes_count=len(suite.nodes_ids),
                      block_size=job.bsize,
-                     lat=calc_histo_stat_props(lat, bins_edges, rebins_count=StyleProfile.hist_boxes),
+                     lat=calc_histo_stat_props(lat, rebins_count=StyleProfile.hist_boxes),
                      bw=calc_norm_stat_props(io, StyleProfile.hist_boxes))
 
-#
-# def iter_io_results(rstorage: ResultStorage,
-#                     qds: List[int] = None,
-#                     op_types: List[str] = None,
-#                     sync_types: List[str] = None,
-#                     block_sizes: List[int] = None) -> Iterator[Tuple[TestSuiteConfig, FioJobConfig]]:
-#
-#     for suite in rstorage.iter_suite(FioTest.name):
-#         for job in rstorage.iter_job(suite):
-#             fjob = cast(FioJobConfig, job)
-#             assert int(fjob.vals['numjobs']) == 1
-#
-#             if sync_types is not None and fjob.sync_mode in sync_types:
-#                 continue
-#
-#             if block_sizes is not None and fjob.bsize not in block_sizes:
-#                 continue
-#
-#             if op_types is not None and fjob.op_type not in op_types:
-#                 continue
-#
-#             if qds is not None and fjob.qd not in qds:
-#                 continue
-#
-#             yield suite, fjob
-
-
-AGG_TAG = 'ALL'
-
-
-def get_aggregated(rstorage: ResultStorage, suite: SuiteConfig, job: FioJobConfig, metric: str) -> TimeSeries:
-    tss = list(rstorage.iter_ts(suite, job, metric=metric))
-    ds = DataSource(suite_id=suite.storage_id,
-                    job_id=job.storage_id,
-                    node_id=AGG_TAG,
-                    sensor='fio',
-                    dev=AGG_TAG,
-                    metric=metric,
-                    tag='csv')
-
-    agg_ts = TimeSeries(metric,
-                        raw=None,
-                        source=ds,
-                        data=numpy.zeros(tss[0].data.shape, dtype=tss[0].data.dtype),
-                        times=tss[0].times.copy(),
-                        units=tss[0].units)
-
-    for ts in tss:
-        if metric == 'lat' and (len(ts.data.shape) != 2 or ts.data.shape[1] != expected_lat_bins):
-            logger.error("Sensor %s.%s on node %s has" +
-                         "shape=%s. Can only process sensors with shape=[X, %s].",
-                         ts.source.dev, ts.source.sensor, ts.source.node_id,
-                         ts.data.shape, expected_lat_bins)
-            raise ValueError()
-
-        if metric != 'lat' and len(ts.data.shape) != 1:
-            logger.error("Sensor %s.%s on node %s has " +
-                         "shape=%s. Can only process 1D sensors.",
-                         ts.source.dev, ts.source.sensor, ts.source.node_id, ts.data.shape)
-            raise ValueError()
-
-        # TODO: match times on different ts
-        agg_ts.data += ts.data
-
-    return agg_ts
-
 
 def is_sensor_numarray(sensor: str, metric: str) -> bool:
     """Returns True if sensor provides one-dimension array of numeric values. One number per one measurement."""
@@ -246,86 +198,6 @@
     """Returns True if sensor provides deltas for cumulative value. E.g. io completed in given period"""
     return not is_level_sensor(sensor, metric)
 
-
-def get_sensor_for_time_range(storage: IResultStorage,
-                              node_id: str,
-                              sensor: str,
-                              dev: str,
-                              metric: str,
-                              time_range: Tuple[int, int]) -> numpy.array:
-    """Return sensor values for given node for given period. Return per second estimated values array
-
-    Raise an error if required range is not full covered by data in storage.
-    First it finds range of results from sensor, which fully covers requested range.
-    ...."""
-
-    ds = DataSource(node_id=node_id, sensor=sensor, dev=dev, metric=metric)
-    sensor_data = storage.load_sensor(ds)
-    assert sensor_data.time_units == 'us'
-
-    # collected_at is array of pairs (collection_started_at, collection_finished_at)
-    # extract start time from each pair
-    collection_start_at = sensor_data.times[::2]  # type: numpy.array
-
-    MICRO = 1000000
-
-    # convert seconds to us
-    begin = time_range[0] * MICRO
-    end = time_range[1] * MICRO
-
-    if begin < collection_start_at[0] or end > collection_start_at[-1] or end <= begin:
-        raise AssertionError(("Incorrect data for get_sensor - time_range={!r}, collected_at=[{}, ..., {}]," +
-                              "sensor = {}_{}.{}.{}").format(time_range,
-                                                             sensor_data.times[0] // MICRO,
-                                                             sensor_data.times[-1] // MICRO,
-                                                             node_id, sensor, dev, metric))
-
-    pos1, pos2 = numpy.searchsorted(collection_start_at, (begin, end))
-
-    # current real data time chunk begin time
-    edge_it = iter(collection_start_at[pos1 - 1: pos2 + 1])
-
-    # current real data value
-    val_it = iter(sensor_data.data[pos1 - 1: pos2 + 1])
-
-    # result array, cumulative value per second
-    result = numpy.zeros(int(end - begin) // MICRO)
-    idx = 0
-    curr_summ = 0
-
-    # end of current time slot
-    results_cell_ends = begin + MICRO
-
-    # hack to unify looping
-    real_data_end = next(edge_it)
-    while results_cell_ends <= end:
-        real_data_start = real_data_end
-        real_data_end = next(edge_it)
-        real_val_left = next(val_it)
-
-        # real data "speed" for interval [real_data_start, real_data_end]
-        real_val_ps = float(real_val_left) / (real_data_end - real_data_start)
-
-        while real_data_end >= results_cell_ends and results_cell_ends <= end:
-            # part of current real value, which is fit into current result cell
-            curr_real_chunk = int((results_cell_ends - real_data_start) * real_val_ps)
-
-            # calculate rest of real data for next result cell
-            real_val_left -= curr_real_chunk
-            result[idx] = curr_summ + curr_real_chunk
-            idx += 1
-            curr_summ = 0
-
-            # adjust real data start time
-            real_data_start = results_cell_ends
-            results_cell_ends += MICRO
-
-        # don't lost any real data
-        curr_summ += real_val_left
-
-    return result
-
-
 # --------------  PLOT HELPERS FUNCTIONS  ------------------------------------------------------------------------------
 
 def get_emb_data_svg(plt: Any, format: str = 'svg') -> bytes:
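
The removed get_sensor_for_time_range spread each irregularly-timed sensor delta across the whole seconds its collection interval covers. A compact numpy sketch of that resampling idea, under assumed array shapes (one delta per interval, times in microseconds):

    import numpy as np

    def per_second(times_us: np.ndarray, deltas: np.ndarray,
                   begin_s: int, end_s: int) -> np.ndarray:
        """Distribute every delta uniformly over the seconds its interval overlaps;
        the result has one cell per second in [begin_s, end_s)."""
        out = np.zeros(end_s - begin_s)
        starts, stops = times_us[:-1] / 1e6, times_us[1:] / 1e6
        for start, stop, val in zip(starts, stops, deltas[1:]):
            lo, hi = max(start, begin_s), min(stop, end_s)
            if hi <= lo or stop <= start:
                continue
            rate = val / (stop - start)          # per-second rate inside the interval
            s = int(np.floor(lo))
            while s < hi:
                overlap = min(s + 1, hi) - max(s, lo)
                out[s - begin_s] += rate * overlap
                s += 1
        return out

    # one delta of 100 collected over [0.5s, 2.5s) lands as 25 / 50 / 25 in seconds 0..2
    print(per_second(np.array([0.5e6, 2.5e6]), np.array([0, 100]), 0, 3))
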
@@ -411,38 +283,76 @@
 
 
 @provide_plot
-def plot_v_over_time(title: str, units: str,
+def plot_simple_over_time(tss: List[Tuple[str, numpy.ndarray]],
+                          title: str,
+                          ylabel: str,
+                          xlabel: str = "time, s",
+                          average: bool = False,
+                          colors: Any = ColorProfile,
+                          style: Any = StyleProfile) -> None:
+    fig, ax = plt.subplots(figsize=(12, 6))
+    for name, arr in tss:
+        if average:
+            avg_vals = moving_average(arr, style.avg_range)
+            if style.approx_average:
+                time_points = numpy.arange(len(avg_vals))
+                avg_vals = approximate_curve(time_points, avg_vals, time_points, style.curve_approx_level)
+            arr = avg_vals
+        ax.plot(arr, label=name)
+    ax.set_title(title)
+    ax.set_ylabel(ylabel)
+    ax.set_xlabel(xlabel)
+    apply_style(style, eng=True)
+
+
+@provide_plot
+def plot_hmap_from_2d(data2d: numpy.ndarray,
+                      title: str, ylabel: str, xlabel: str = 'time, s', bins: numpy.ndarray = None,
+                      colors: Any = ColorProfile, style: Any = StyleProfile) -> None:
+    ioq1d, ranges = hmap_from_2d(data2d)
+    ax, _ = plot_hmap_with_y_histo(ioq1d, ranges, bins=bins)
+    ax.set_ylabel(ylabel)
+    ax.set_xlabel(xlabel)
+    ax.set_title(title)
+
+
+@provide_plot
+def plot_v_over_time(title: str,
+                     units: str,
                      ts: TimeSeries,
                      plot_avg_dev: bool = True,
+                     plot_points: bool = True,
                      colors: Any = ColorProfile, style: Any = StyleProfile) -> None:
 
     min_time = min(ts.times)
 
-    # /1000 is us to ms conversion
-    time_points = numpy.array([(val_time - min_time) / 1000 for val_time in ts.times])
+    # convert time to ms
+    coef = float(unit_conversion_coef(ts.time_units, 's'))
+    time_points = numpy.array([(val_time - min_time) * coef for val_time in ts.times])
 
     outliers_idxs = find_ouliers_ts(ts.data, cut_range=style.outliers_q_nd)
     outliers_4q_idxs = find_ouliers_ts(ts.data, cut_range=style.outliers_hide_q_nd)
     normal_idxs = numpy.logical_not(outliers_idxs)
     outliers_idxs = outliers_idxs & numpy.logical_not(outliers_4q_idxs)
-    hidden_outliers_count = numpy.count_nonzero(outliers_4q_idxs)
+    # hidden_outliers_count = numpy.count_nonzero(outliers_4q_idxs)
 
     data = ts.data[normal_idxs]
     data_times = time_points[normal_idxs]
     outliers = ts.data[outliers_idxs]
     outliers_times = time_points[outliers_idxs]
 
-    alpha = colors.noise_alpha if plot_avg_dev else 1.0
-    plt.plot(data_times, data, style.point_shape,
-             color=colors.primary_color, alpha=alpha, label="Data")
-    plt.plot(outliers_times, outliers, style.err_point_shape,
-             color=colors.err_color, label="Outliers")
+    if plot_points:
+        alpha = colors.noise_alpha if plot_avg_dev else 1.0
+        plt.plot(data_times, data, style.point_shape,
+                 color=colors.primary_color, alpha=alpha, label="Data")
+        plt.plot(outliers_times, outliers, style.err_point_shape,
+                 color=colors.err_color, label="Outliers")
 
     has_negative_dev = False
     plus_minus = "\xb1"
 
     if plot_avg_dev and len(data) < style.avg_range * 2:
-            logger.warning("Array %r to small to plot average over %s points", title, style.avg_range)
+        logger.warning("Array %r too small to plot average over %s points", title, style.avg_range)
     elif plot_avg_dev:
         avg_vals = moving_average(data, style.avg_range)
         dev_vals = moving_dev(data, style.avg_range)
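
Both plot_simple_over_time and plot_v_over_time smooth with moving_average from wally.statistic; a generic boxcar smoother is shown here only as an assumed stand-in for that call:

    import numpy as np

    def boxcar_average(data: np.ndarray, window: int) -> np.ndarray:
        """Plain moving average over `window` samples (result is shorter by window - 1)."""
        return np.convolve(data, np.ones(window) / window, mode="valid")

    print(boxcar_average(np.array([1.0, 2.0, 3.0, 4.0, 5.0]), 3))   # -> [2. 3. 4.]
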
@@ -467,7 +377,12 @@
 
     plt.xlim(-5, max(time_points) + 5)
     plt.xlabel("Time, seconds from test begin")
-    plt.ylabel("{}. Average and {}stddev over {} points".format(units, plus_minus, style.avg_range))
+
+    if plot_avg_dev:
+        plt.ylabel("{}. Average and {}stddev over {} points".format(units, plus_minus, style.avg_range))
+    else:
+        plt.ylabel(units)
+
     plt.title(title)
 
     if has_negative_dev:
@@ -477,7 +392,9 @@
 
 
 @provide_plot
-def plot_lat_over_time(title: str, ts: TimeSeries, bins_vals: List[int], samples: int = 5,
+def plot_lat_over_time(title: str, ts: TimeSeries,
+                       ylabel: str,
+                       samples: int = 5,
                        colors: Any = ColorProfile,
                        style: Any = StyleProfile) -> None:
 
@@ -499,16 +416,16 @@
             # cut outliers
             idx1, idx2 = hist_outliers_perc(agg_hist, style.outliers_lat)
             agg_hist = agg_hist[idx1:idx2]
-            curr_bins_vals = bins_vals[idx1:idx2]
+            curr_bins_vals = ts.histo_bins[idx1:idx2]
 
             correct_coef = style.violin_point_count / sum(agg_hist)
             if correct_coef > 1:
                 correct_coef = 1
         else:
-            curr_bins_vals = bins_vals
+            curr_bins_vals = ts.histo_bins
             correct_coef = 1
 
-        vals = numpy.empty(shape=(numpy.sum(agg_hist),), dtype='float32')
+        vals = numpy.empty(shape=[numpy.sum(agg_hist)], dtype='float32')
         cidx = 0
 
         non_zero, = agg_hist.nonzero()
@@ -541,36 +458,64 @@
         plt.boxplot(agg_data, 0, '', positions=positions, labels=labels, widths=step / 4)
 
     plt.xlim(min(times), max(times))
+    plt.ylabel(ylabel)
     plt.xlabel("Time, seconds from test begin, sampled for ~{} seconds".format(int(step)))
-    plt.ylabel("Latency, ms")
     plt.title(title)
     apply_style(style, eng=True, no_legend=True)
 
 
 @provide_plot
-def plot_heatmap(title: str,
-                 ts: TimeSeries,
-                 bins_vals: List[int],
-                 colors: Any = ColorProfile,
-                 style: Any = StyleProfile) -> None:
+def plot_histo_heatmap(title: str,
+                       ts: TimeSeries,
+                       ylabel: str,
+                       xlabel: str = "time, s",
+                       colors: Any = ColorProfile,
+                       style: Any = StyleProfile) -> None:
 
+    # only histogram-based ts can be plotted
     assert len(ts.data.shape) == 2
-    assert ts.data.shape[1] == len(bins_vals)
 
+    # Find global outliers. As the load is expected to be stable during one job,
+    # the outlier range can be detected globally.
     total_hist = ts.data.sum(axis=0)
+    idx1, idx2 = hist_outliers_perc(total_hist,
+                                    bounds_perc=style.outliers_lat,
+                                    min_bins_left=style.hm_hist_bins_count)
 
-    # idx1, idx2 = hist_outliers_perc(total_hist, style.outliers_lat)
-    idx1, idx2 = ts_hist_outliers_perc(ts.data, bounds_perc=style.outliers_lat)
+    # merge outliers into the closest non-outlier cells
+    orig_data = ts.data[:, idx1:idx2].copy()
+    if idx1 > 0:
+        orig_data[:, 0] += ts.data[:, :idx1].sum(axis=1)
 
-    # don't cut too many bins
-    min_bins_left = style.hm_hist_bins_count
-    if idx2 - idx1 < min_bins_left:
-        missed = min_bins_left - (idx2 - idx1) // 2
-        idx2 = min(len(total_hist), idx2 + missed)
-        idx1 = max(0, idx1 - missed)
+    if idx2 < ts.data.shape[1]:
+        orig_data[:, -1] += ts.data[:, idx2:].sum(axis=1)
 
-    data = ts.data[:, idx1:idx2]
-    bins_vals = bins_vals[idx1:idx2]
+    bins_vals = ts.histo_bins[idx1:idx2]
+
+    # rebin over X axis
+    # aggregate rows of ts.data so that no more than style.hm_x_slots X-axis slots are plotted
+    agg_idx = float(len(orig_data)) / style.hm_x_slots
+    if agg_idx >= 2:
+        data = numpy.zeros([style.hm_x_slots, orig_data.shape[1]], dtype=numpy.float32)  # type: numpy.ndarray
+        next = agg_idx
+        count = 0
+        data_idx = 0
+        for idx, arr in enumerate(orig_data):
+            if idx >= next:
+                data[data_idx] /= count
+                data_idx += 1
+                next += agg_idx
+                count = 0
+            data[data_idx] += arr
+            count += 1
+
+        if count > 1:
+            data[-1] /= count
+    else:
+        data = orig_data
+
+    # rebin over Y axis
+    # =================
 
     # don't using rebin_histogram here, as we need apply same bins for many arrays
     step = (bins_vals[-1] - bins_vals[0]) / style.hm_hist_bins_count
@@ -586,35 +531,38 @@
         cmap.append(curr_bins)
     ncmap = numpy.array(cmap)
 
-    xmin = 0
-    xmax = (ts.times[-1] - ts.times[0]) / 1000 + 1
-    ymin = new_bins_edges[0]
-    ymax = new_bins_edges[-1]
+    # plot data
+    # =========
 
-    fig, ax = plt.subplots(figsize=style.figsize)
+    fig = plt.figure(figsize=(12, 6))
+    boxes = 3
+    gs = gridspec.GridSpec(1, boxes)
+    ax = fig.add_subplot(gs[0, :boxes - 1])
 
-    if style.heatmap_interpolation == '1d':
-        interpolation = 'none'
-        res = []
-        for column in ncmap:
-            new_x = numpy.linspace(0, len(column), style.heatmap_interpolation_points)
-            old_x = numpy.arange(len(column)) + 0.5
-            new_vals = numpy.interp(new_x, old_x, column)
-            res.append(new_vals)
-        ncmap = numpy.array(res)
-    else:
-        interpolation = style.heatmap_interpolation
+    labels = list(map(float2str, (new_bins_edges[:-1] + new_bins_edges[1:]) / 2)) + \
+        [float2str(new_bins_edges[-1]) + "+"]
+    seaborn.heatmap(ncmap[:,::-1].T, xticklabels=False, cmap="Blues", ax=ax)
+    ax.set_yticklabels(labels, rotation='horizontal')
+    ax.set_xticklabels([])
 
-    ax.imshow(ncmap[:,::-1].T,
-              interpolation=interpolation,
-              extent=(xmin, xmax, ymin, ymax),
-              cmap=colors.imshow_colormap)
+    # plot overall histogram
+    # =======================
 
-    ax.set_aspect((xmax - xmin) / (ymax - ymin) * (6 / 9))
-    ax.set_ylabel("Latency, ms")
-    ax.set_xlabel("Test time, s")
+    ax2 = fig.add_subplot(gs[0, boxes - 1])
+    ax2.set_yticklabels([])
+    ax2.set_xticklabels([])
 
-    plt.title(title)
+    histo = ncmap.sum(axis=0).reshape((-1,))
+    ax2.set_ylim(top=histo.size, bottom=0)
+    plt.barh(numpy.arange(histo.size) + 0.5, width=histo, axes=ax2)
+
+    # Set labels
+    # ==========
+
+    ax.set_title(title)
+    ax.set_ylabel(ylabel)
+    ax.set_xlabel(xlabel)
+
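
The X-axis rebinning loop above averages consecutive rows so at most hm_x_slots rows remain. An equivalent and shorter alternative (not the code above) using numpy.array_split:

    import numpy as np

    def rebin_rows(data: np.ndarray, max_rows: int) -> np.ndarray:
        """Average consecutive rows so the result has at most max_rows rows."""
        if len(data) <= max_rows:
            return data
        # array_split distributes the remainder when len(data) % max_rows != 0
        return np.vstack([chunk.mean(axis=0) for chunk in np.array_split(data, max_rows)])

    print(rebin_rows(np.arange(10.0).reshape(10, 1), 4).ravel())   # -> [1.  4.  6.5 8.5]
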
 
 
 @provide_plot
@@ -873,28 +821,6 @@
     """Creates graphs, which show how IOPS and Latency depend on block size"""
 
 
-def summ_sensors(rstorage: ResultStorage,
-                 nodes: List[str],
-                 sensor: str,
-                 metric: str,
-                 time_range: Tuple[int, int]) -> Optional[numpy.array]:
-
-    res = None  # type: Optional[numpy.array]
-    for node_id in nodes:
-        for _, groups in rstorage.iter_sensors(node_id=node_id, sensor=sensor, metric=metric):
-            data = get_sensor_for_time_range(rstorage,
-                                             node_id=node_id,
-                                             sensor=sensor,
-                                             dev=groups['dev'],
-                                             metric=metric,
-                                             time_range=time_range)
-            if res is None:
-                res = data
-            else:
-                res += data
-    return res
-
-
 # IOPS/latency distribution
 class StatInfo(JobReporter):
     """Statistic info for job results"""
@@ -957,49 +883,44 @@
             ["Data transfered", b2ssize(io_transfered) + "B", "-"]
         ]
 
-
         storage = rstorage.storage
         nodes = storage.load_list(NodeInfo, 'all_nodes')  # type: List[NodeInfo]
 
-        storage_nodes = [node.node_id for node in nodes if node.roles.intersection(STORAGE_ROLES)]
-        test_nodes = [node.node_id for node in nodes if "testnode" in node.roles]
-
-        trange = (job.reliable_info_range[0] / 1000, job.reliable_info_range[1] / 1000)
+        trange = (job.reliable_info_range[0] // 1000, job.reliable_info_range[1] // 1000)
         ops_done = io_transfered / fjob.bsize / KB
 
         all_metrics = [
-            ("Test nodes net send", 'net-io', 'send_bytes', b2ssize, test_nodes, "B", io_transfered),
-            ("Test nodes net recv", 'net-io', 'recv_bytes', b2ssize, test_nodes, "B", io_transfered),
+            ("Test nodes net send", 'net-io', 'send_bytes', b2ssize, ['testnode'], "B", io_transfered),
+            ("Test nodes net recv", 'net-io', 'recv_bytes', b2ssize, ['testnode'], "B", io_transfered),
 
-            ("Test nodes disk write", 'block-io', 'sectors_written', b2ssize, test_nodes, "B", io_transfered),
-            ("Test nodes disk read", 'block-io', 'sectors_read', b2ssize, test_nodes, "B", io_transfered),
-            ("Test nodes writes", 'block-io', 'writes_completed', b2ssize_10, test_nodes, "OP", ops_done),
-            ("Test nodes reads", 'block-io', 'reads_completed', b2ssize_10, test_nodes, "OP", ops_done),
+            ("Test nodes disk write", 'block-io', 'sectors_written', b2ssize, ['testnode'], "B", io_transfered),
+            ("Test nodes disk read", 'block-io', 'sectors_read', b2ssize, ['testnode'], "B", io_transfered),
+            ("Test nodes writes", 'block-io', 'writes_completed', b2ssize_10, ['testnode'], "OP", ops_done),
+            ("Test nodes reads", 'block-io', 'reads_completed', b2ssize_10, ['testnode'], "OP", ops_done),
 
-            ("Storage nodes net send", 'net-io', 'send_bytes', b2ssize, storage_nodes, "B", io_transfered),
-            ("Storage nodes net recv", 'net-io', 'recv_bytes', b2ssize, storage_nodes, "B", io_transfered),
+            ("Storage nodes net send", 'net-io', 'send_bytes', b2ssize, STORAGE_ROLES, "B", io_transfered),
+            ("Storage nodes net recv", 'net-io', 'recv_bytes', b2ssize, STORAGE_ROLES, "B", io_transfered),
 
-            ("Storage nodes disk write", 'block-io', 'sectors_written', b2ssize, storage_nodes, "B", io_transfered),
-            ("Storage nodes disk read", 'block-io', 'sectors_read', b2ssize, storage_nodes, "B", io_transfered),
-            ("Storage nodes writes", 'block-io', 'writes_completed', b2ssize_10, storage_nodes, "OP", ops_done),
-            ("Storage nodes reads", 'block-io', 'reads_completed', b2ssize_10, storage_nodes, "OP", ops_done),
+            ("Storage nodes disk write", 'block-io', 'sectors_written', b2ssize, STORAGE_ROLES, "B", io_transfered),
+            ("Storage nodes disk read", 'block-io', 'sectors_read', b2ssize, STORAGE_ROLES, "B", io_transfered),
+            ("Storage nodes writes", 'block-io', 'writes_completed', b2ssize_10, STORAGE_ROLES, "OP", ops_done),
+            ("Storage nodes reads", 'block-io', 'reads_completed', b2ssize_10, STORAGE_ROLES, "OP", ops_done),
         ]
 
         all_agg = {}
 
-        for descr, sensor, metric, ffunc, nodes, units, denom in all_metrics:
+        for descr, sensor, metric, ffunc, roles, units, denom in all_metrics:
             if not nodes:
                 continue
 
-            res_arr = summ_sensors(rstorage, nodes=nodes, sensor=sensor, metric=metric, time_range=trange)
-            if res_arr is None:
+            res_ts = summ_sensors(rstorage, roles, sensor=sensor, metric=metric, time_range=trange)
+            if res_ts is None:
                 continue
 
-            agg = res_arr.sum()
+            agg = res_ts.data.sum()
             resource_data.append([descr, ffunc(agg) + units, "{:.1f}".format(agg / denom)])
             all_agg[descr] = agg
 
-
         cums = [
             ("Test nodes writes", "Test nodes reads", "Total test ops", b2ssize_10, "OP", ops_done),
             ("Storage nodes writes", "Storage nodes reads", "Total storage ops", b2ssize_10, "OP", ops_done),
@@ -1018,6 +939,120 @@
         yield Menu1st.per_job, job.summary, HTMLBlock(res)
 
 
+# CPU load
+class CPULoadPlot(JobReporter):
+    def get_divs(self,
+                 suite: SuiteConfig,
+                 job: JobConfig,
+                 rstorage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
+
+        trange = (job.reliable_info_range[0] // 1000, job.reliable_info_range[1] // 1000)
+
+        # plot CPU time
+        for rt, roles in [('storage', STORAGE_ROLES), ('test', ['testnode'])]:
+            cpu_ts = {}
+            cpu_metrics = "idle guest iowait irq nice sirq steal sys user".split()
+            for name in cpu_metrics:
+                cpu_ts[name] = summ_sensors(rstorage, roles, sensor='system-cpu', metric=name, time_range=trange)
+
+            it = iter(cpu_ts.values())
+            total_over_time = next(it).data.copy()
+            for ts in it:
+                total_over_time += ts.data
+
+            fname = plot_simple_over_time(rstorage, cpu_ts['idle'].source(metric='allcpu', tag=rt + '.plt.svg'),
+                                          tss=[(name, ts.data * 100 / total_over_time) for name, ts in cpu_ts.items()],
+                                          average=True,
+                                          ylabel="CPU time %",
+                                          title="{} nodes CPU usage".format(rt.capitalize()))
+
+            yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fname))
+
+
+# IO time and QD
+class QDIOTimeHeatmap(JobReporter):
+    def get_divs(self,
+                 suite: SuiteConfig,
+                 job: JobConfig,
+                 rstorage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
+
+        # TODO: fix this hardcode, need to track what devices are actually used on test and storage nodes
+        # use saved storage info in nodes
+
+        journal_devs = None
+        storage_devs = None
+        test_nodes_devs = ['rbd0']
+
+        for node in find_nodes_by_roles(rstorage, STORAGE_ROLES):
+            cjd = set(node.params['ceph_journal_devs'])
+            if journal_devs is None:
+                journal_devs = cjd
+            else:
+                assert journal_devs == cjd, "{!r} != {!r}".format(journal_devs, cjd)
+
+            csd = set(node.params['ceph_storage_devs'])
+            if storage_devs is None:
+                storage_devs = csd
+            else:
+                assert storage_devs == csd, "{!r} != {!r}".format(storage_devs, csd)
+
+        storage_nodes_devs = list(journal_devs) + list(storage_devs)
+        trange = (job.reliable_info_range[0] // 1000, job.reliable_info_range[1] // 1000)
+
+        for name, devs, roles in [('storage', storage_devs, STORAGE_ROLES),
+                                  ('journal', journal_devs, STORAGE_ROLES),
+                                  ('test', test_nodes_devs, ['testnode'])]:
+            # QD heatmap
+            ioq2d = find_sensors_to_2d(rstorage, roles, sensor='block-io', devs=devs,
+                                       metric='io_queue', time_range=trange)
+            fname = plot_hmap_from_2d(rstorage, DataSource(suite.storage_id,
+                                                           job.storage_id,
+                                                           AGG_TAG,
+                                                           'block-io',
+                                                           name,
+                                                           metric='io_queue',
+                                                           tag="hmap.svg"),
+                                      ioq2d, ylabel="IO QD", title=name.capitalize() + " devs QD",
+                                      bins=StyleProfile.qd_bins,
+                                      xlabel='Time')  # type: str
+            yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fname))
+
+            # Block size heatmap
+            wc2d = find_sensors_to_2d(rstorage, roles, sensor='block-io', devs=devs,
+                                      metric='writes_completed', time_range=trange)
+            wc2d[wc2d < 1E-3] = 1
+            sw2d = find_sensors_to_2d(rstorage, roles, sensor='block-io', devs=devs,
+                                      metric='sectors_written', time_range=trange)
+            data2d = sw2d / wc2d / 1024
+            fname = plot_hmap_from_2d(rstorage, DataSource(suite.storage_id,
+                                                           job.storage_id,
+                                                           AGG_TAG,
+                                                           'block-io',
+                                                           name,
+                                                           metric='wr_block_size',
+                                                           tag="hmap.svg"),
+                                      data2d, ylabel="IO bsize, KiB", title=name.capitalize() + " write block size",
+                                      xlabel='Time',
+                                      bins=StyleProfile.block_size_bins)  # type: str
+            yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fname))
+
+            # iotime heatmap
+            wtime2d = find_sensors_to_2d(rstorage, roles, sensor='block-io', devs=devs,
+                                         metric='io_time', time_range=trange)
+            fname = plot_hmap_from_2d(rstorage, DataSource(suite.storage_id,
+                                                           job.storage_id,
+                                                           AGG_TAG,
+                                                           'block-io',
+                                                           name,
+                                                           metric='io_time',
+                                                           tag="hmap.svg"),
+                                      wtime2d, ylabel="IO time (ms) per second",
+                                      title=name.capitalize() + " iotime",
+                                      xlabel='Time',
+                                      bins=StyleProfile.iotime_bins)  # type: str
+            yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fname))
+
+
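
CPULoadPlot above turns the per-metric CPU counters into percentages by dividing each series by the element-wise total. A tiny sketch with hypothetical data:

    import numpy as np

    # hypothetical per-metric CPU time counters, one value per sample
    cpu_ts = {"user": np.array([50., 60., 55.]),
              "sys":  np.array([20., 15., 25.]),
              "idle": np.array([30., 25., 20.])}

    total = sum(cpu_ts.values())                       # element-wise total per sample
    percents = {name: vals * 100 / total for name, vals in cpu_ts.items()}
    print({name: vals.round(1).tolist() for name, vals in percents.items()})
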
 # IOPS/latency distribution
 class IOHist(JobReporter):
     """IOPS.latency distribution histogram"""
@@ -1032,20 +1067,17 @@
 
         yield Menu1st.per_job, fjob.summary, HTMLBlock(html.H2(html.center("Load histograms")))
 
-        agg_lat = get_aggregated(rstorage, suite, fjob, "lat")
-        bins_edges = numpy.array(get_lat_vals(agg_lat.data.shape[1]), dtype='float32') / 1000  # convert us to ms
-        lat_stat_prop = calc_histo_stat_props(agg_lat, bins_edges, rebins_count=StyleProfile.hist_lat_boxes)
-
-        # import IPython
-        # IPython.embed()
-
-        long_summary = cast(FioJobParams, fjob.params).long_summary
-
-        title = "Latency distribution"
-        units = "ms"
-
-        fpath = plot_hist(rstorage, agg_lat.source(tag='hist.svg'), title, units, lat_stat_prop)  # type: str
-        yield Menu1st.per_job, fjob.summary, HTMLBlock(html.img(fpath))
+        # agg_lat = get_aggregated(rstorage, suite, fjob, "lat")
+        # # bins_edges = numpy.array(get_lat_vals(agg_lat.data.shape[1]), dtype='float32') / 1000  # convert us to ms
+        # lat_stat_prop = calc_histo_stat_props(agg_lat, bins_edges=None, rebins_count=StyleProfile.hist_lat_boxes)
+        #
+        # long_summary = cast(FioJobParams, fjob.params).long_summary
+        #
+        # title = "Latency distribution"
+        # units = "ms"
+        #
+        # fpath = plot_hist(rstorage, agg_lat.source(tag='hist.svg'), title, units, lat_stat_prop)  # type: str
+        # yield Menu1st.per_job, fjob.summary, HTMLBlock(html.img(fpath))
 
         agg_io = get_aggregated(rstorage, suite, fjob, "bw")
 
@@ -1075,30 +1107,35 @@
 
         fjob = cast(FioJobConfig, job)
 
-        yield Menu1st.per_job, fjob.summary, HTMLBlock(html.H2(html.center("Load over time")))
-
         agg_io = get_aggregated(rstorage, suite, fjob, "bw")
         if fjob.bsize >= LARGE_BLOCKS:
-            title = "Bandwidth"
+            title = "Fio measured Bandwidth over time"
             units = "MiBps"
             agg_io.data //= MiB2KiB
         else:
-            title = "IOPS"
+            title = "Fio measured IOPS over time"
             agg_io.data //= fjob.bsize
             units = "IOPS"
 
         fpath = plot_v_over_time(rstorage, agg_io.source(tag='ts.svg'), title, units, agg_io)  # type: str
         yield Menu1st.per_job, fjob.summary, HTMLBlock(html.img(fpath))
 
-        agg_lat = get_aggregated(rstorage, suite, fjob, "lat")
-        bins_edges = numpy.array(get_lat_vals(agg_lat.data.shape[1]), dtype='float32') / 1000
-        title = "Latency"
+        agg_lat = get_aggregated(rstorage, suite, fjob, "lat").copy()
+        TARGET_UNITS = 'ms'
+        coef = unit_conversion_coef(agg_lat.units, TARGET_UNITS)
+        agg_lat.histo_bins = agg_lat.histo_bins.copy() * float(coef)
+        agg_lat.units = TARGET_UNITS
 
-        fpath = plot_lat_over_time(rstorage, agg_lat.source(tag='ts.svg'), title, agg_lat, bins_edges)  # type: str
+        fpath = plot_lat_over_time(rstorage, agg_lat.source(tag='ts.svg'), "Latency",
+                                   agg_lat, ylabel="Latency, " + agg_lat.units)  # type: str
         yield Menu1st.per_job, fjob.summary, HTMLBlock(html.img(fpath))
 
-        title = "Latency heatmap"
-        fpath = plot_heatmap(rstorage, agg_lat.source(tag='hmap.png'), title, agg_lat, bins_edges)  # type: str
+        fpath = plot_histo_heatmap(rstorage,
+                                   agg_lat.source(tag='hmap.svg'),
+                                   "Latency heatmap",
+                                   agg_lat,
+                                   ylabel="Latency, " + agg_lat.units,
+                                   xlabel='Test time')  # type: str
 
         yield Menu1st.per_job, fjob.summary, HTMLBlock(html.img(fpath))
 
@@ -1128,40 +1165,21 @@
     storage_sensors = [
         ('block-io', 'reads_completed', "Read ops", 'iops'),
         ('block-io', 'writes_completed', "Write ops", 'iops'),
-        ('block-io', 'sectors_read', "Read kb", 'kb'),
-        ('block-io', 'sectors_written', "Write kb", 'kb'),
+        ('block-io', 'sectors_read', "Read kb", 'KB'),
+        ('block-io', 'sectors_written', "Write kb", 'KB'),
     ]
 
     def get_divs(self,
                  suite: SuiteConfig,
                  job: JobConfig,
                  rstorage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
-        # split nodes on test and other
-        storage = rstorage.storage
-        nodes = storage.load_list(NodeInfo, "all_nodes")  # type: List[NodeInfo]
-
         yield Menu1st.per_job, job.summary, HTMLBlock(html.H2(html.center("Cluster load")))
-        test_nodes = {node.node_id for node in nodes if 'testnode' in node.roles}
-        cluster_nodes = {node.node_id for node in nodes if 'testnode' not in node.roles}
 
         # convert ms to s
         time_range = (job.reliable_info_range[0] // MS2S, job.reliable_info_range[1] // MS2S)
-        len = time_range[1] - time_range[0]
-        for sensor, metric, sensor_title, units in self.storage_sensors:
-            sum_testnode = numpy.zeros((len,))
-            sum_other = numpy.zeros((len,))
-            for path, groups in rstorage.iter_sensors(sensor=sensor, metric=metric):
-                # todo: should return sensor units
-                data = get_sensor_for_time_range(rstorage,
-                                                 groups['node_id'],
-                                                 sensor,
-                                                 groups['dev'],
-                                                 metric, time_range)
-                if groups['node_id'] in test_nodes:
-                    sum_testnode += data
-                else:
-                    sum_other += data
 
+        for sensor, metric, sensor_title, units in self.storage_sensors:
+            ts = summ_sensors(rstorage, ['testnode'], sensor, metric, time_range)
             ds = DataSource(suite_id=suite.storage_id,
                             job_id=job.storage_id,
                             node_id="test_nodes",
@@ -1170,14 +1188,16 @@
                             metric=metric,
                             tag="ts.svg")
 
-            # s to ms
+            data = ts.data if units != 'KB' else ts.data * float(unit_conversion_coef(ts.units, 'KB'))
+
             ts = TimeSeries(name="",
-                            times=numpy.arange(*time_range) * MS2S,
-                            data=sum_testnode,
+                            times=numpy.arange(*time_range),
+                            data=data,
                             raw=None,
-                            units=units,
-                            time_units="us",
-                            source=ds)
+                            units=units if ts.units is None else ts.units,
+                            time_units=ts.time_units,
+                            source=ds,
+                            histo_bins=ts.histo_bins)
 
             fpath = plot_v_over_time(rstorage, ds, sensor_title, sensor_title, ts=ts)  # type: str
             yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fpath))
@@ -1211,11 +1231,12 @@
     def run(self, ctx: TestRun) -> None:
         rstorage = ResultStorage(ctx.storage)
 
-        job_reporters = [StatInfo(), IOTime(), IOHist(), ClusterLoad()] # type: List[JobReporter]
-        reporters = [IO_QD()]  # type: List[Reporter]
+        job_reporters = [StatInfo(), IOTime(), IOHist(), ClusterLoad(), CPULoadPlot(),
+                         QDIOTimeHeatmap()] # type: List[JobReporter]
+        reporters = []
 
+        # reporters = [IO_QD()]  # type: List[Reporter]
         # job_reporters = [ClusterLoad()]
-        # reporters = []
 
         root_dir = os.path.dirname(os.path.dirname(wally.__file__))
         doc_templ_path = os.path.join(root_dir, "report_templates/index.html")
@@ -1232,25 +1253,30 @@
         # StyleProfile.__dict__.update(ctx.config.reporting.style.raw())
 
         items = defaultdict(lambda: defaultdict(list))  # type: Dict[str, Dict[str, List[HTMLBlock]]]
-
+        DEBUG = False
         # TODO: filter reporters
         for suite in rstorage.iter_suite(FioTest.name):
             all_jobs = list(rstorage.iter_job(suite))
             all_jobs.sort(key=lambda job: job.params)
             for job in all_jobs:
                 for reporter in job_reporters:
+                    logger.debug("Start reporter %s on job %s suite %s",
+                                 reporter.__class__.__name__, job.summary, suite.test_type)
                     for block, item, html in reporter.get_divs(suite, job, rstorage):
                         items[block][item].append(html)
                 if DEBUG:
                     break
 
             for reporter in reporters:
+                logger.debug("Start reporter %s on suite %s", reporter.__class__.__name__, suite.test_type)
                 for block, item, html in reporter.get_divs(suite, rstorage):
                     items[block][item].append(html)
 
             if DEBUG:
                 break
 
+        logger.debug("Generating result html")
+
         for idx_1st, menu_1st in enumerate(sorted(items, key=lambda x: menu_1st_order.index(x))):
             menu_block.append(
                 '<a href="#item{}" class="nav-group" data-toggle="collapse" data-parent="#MainMenu">{}</a>'
diff --git a/wally/result_classes.py b/wally/result_classes.py
index 43ae721..1fbd094 100644
--- a/wally/result_classes.py
+++ b/wally/result_classes.py
@@ -1,5 +1,6 @@
 import abc
-from typing import Dict, List, Any, Optional, Tuple, cast, Type, Iterator
+import copy
+from typing import Dict, List, Any, Optional, Tuple, cast, Type, Iterator, NamedTuple
 
 
 import numpy
@@ -106,12 +107,13 @@
     def __init__(self,
                  name: str,
                  raw: Optional[bytes],
-                 data: numpy.array,
-                 times: numpy.array,
+                 data: numpy.ndarray,
+                 times: numpy.ndarray,
                  units: str,
                  source: DataSource,
                  time_units: str = 'us',
-                 raw_tag: str = 'txt') -> None:
+                 raw_tag: str = 'txt',
+                 histo_bins: numpy.ndarray = None) -> None:
 
         # Sensor name. Typically DEV_NAME.METRIC
         self.name = name
@@ -130,6 +132,7 @@
         self.raw = raw
         self.raw_tag = raw_tag
         self.source = source
+        self.histo_bins = histo_bins
 
     def __str__(self) -> str:
         res = "TS({}):\n".format(self.name)
@@ -141,6 +144,9 @@
     def __repr__(self) -> str:
         return str(self)
 
+    def copy(self) -> 'TimeSeries':
+        return copy.copy(self)
+
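
copy() above is a shallow copy, which is enough for the report code: callers replace arrays (e.g. histo_bins) on the copy rather than mutating them in place, so the original series stays intact. A toy illustration (not wally's TimeSeries):

    import copy
    import numpy as np

    class TS:                                   # toy stand-in for TimeSeries
        def __init__(self, data, histo_bins):
            self.data = data
            self.histo_bins = histo_bins
        def copy(self):
            return copy.copy(self)              # shallow: arrays shared until reassigned

    orig = TS(np.zeros(3), np.array([1.0, 2.0, 4.0]))
    scaled = orig.copy()
    scaled.histo_bins = scaled.histo_bins * 1000   # new array; orig.histo_bins untouched
    assert orig.histo_bins[0] == 1.0 and scaled.histo_bins[0] == 1000.0
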
 
 # (node_name, source_dev, metric_name) => metric_results
 JobMetrics = Dict[Tuple[str, str, str], TimeSeries]
@@ -259,6 +265,12 @@
         self.processed = None  # type: JobStatMetrics
 
 
+ArrayData = NamedTuple("ArrayData",
+                       [('header', List[str]),
+                        ('histo_bins', Optional[numpy.ndarray]),
+                        ('data', Optional[numpy.ndarray])])
+
+
 class IResultStorage(metaclass=abc.ABCMeta):
 
     @abc.abstractmethod
@@ -266,10 +278,18 @@
         pass
 
     @abc.abstractmethod
+    def append_sensor(self, data: numpy.array, ds: DataSource, units: str, histo_bins: numpy.ndarray = None) -> None:
+        pass
+
+    @abc.abstractmethod
     def load_sensor(self, ds: DataSource) -> TimeSeries:
         pass
 
     @abc.abstractmethod
+    def iter_sensors(self, ds: DataSource) -> Iterator[TimeSeries]:
+        pass
+
+    @abc.abstractmethod
     def put_or_check_suite(self, suite: SuiteConfig) -> None:
         pass
 
diff --git a/wally/run_test.py b/wally/run_test.py
index d5f7d19..f8dc036 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -146,7 +146,7 @@
 class SaveNodesStage(Stage):
     """Save nodes list to file"""
 
-    priority = StepOrder.CONNECT
+    priority = StepOrder.UPDATE_NODES_INFO + 1
 
     def run(self, ctx: TestRun) -> None:
         ctx.storage.put_list(ctx.nodes_info.values(), 'all_nodes')
diff --git a/wally/sensors.py b/wally/sensors.py
index c19c350..1faa03b 100644
--- a/wally/sensors.py
+++ b/wally/sensors.py
@@ -4,14 +4,14 @@
 
 import numpy
 
+from cephlib import sensors_rpc_plugin
+
 from . import utils
 from .test_run_class import TestRun
 from .result_classes import DataSource
 from .stage import Stage, StepOrder
 from .hlstorage import ResultStorage
 
-from cephlib import sensors_rpc_plugin
-
 
 plugin_fname = sensors_rpc_plugin.__file__.rsplit(".", 1)[0] + ".py"
 SENSORS_PLUGIN_CODE = open(plugin_fname, "rb").read()  # type: bytes
@@ -21,16 +21,24 @@
 
 
 sensor_units = {
-    "system-cpu.idle_time": "ms",
-    "system-cpu.nice_processes": "",
+    "system-cpu.idle": "",
+    "system-cpu.nice": "",
+    "system-cpu.user": "",
+    "system-cpu.sys": "",
+    "system-cpu.iowait": "",
+    "system-cpu.irq": "",
+    "system-cpu.sirq": "",
+    "system-cpu.steal": "",
+    "system-cpu.guest": "",
+
     "system-cpu.procs_blocked": "",
     "system-cpu.procs_queue_x10": "",
-    "system-cpu.system_processes": "",
-    "system-cpu.user_processes": "",
+
     "net-io.recv_bytes": "B",
     "net-io.recv_packets": "",
     "net-io.send_bytes": "B",
     "net-io.send_packets": "",
+
     "block-io.io_queue": "",
     "block-io.io_time": "ms",
     "block-io.reads_completed": "",
@@ -69,9 +77,9 @@
 
         for name, val in ctx.config.sensors.roles_mapping.raw().items():
             if isinstance(val, str):
-                val = {vl.strip(): ".*" for vl in val.split(",")}
+                val = {vl.strip(): (".*" if vl.strip() != 'ceph' else {}) for vl in val.split(",")}
             elif isinstance(val, list):
-                val = {vl: ".*" for vl in val}
+                val = {vl: (".*" if vl != 'ceph' else {}) for vl in val}
             per_role_config[name] = val
 
         if 'all' in per_role_config:
@@ -96,7 +104,7 @@
                 # ceph requires additional settings
                 if 'ceph' in node_cfg:
                     node_cfg['ceph'].update(node.info.params['ceph'])
-                    node_cfg['ceph']['osds'] = [osd.id for osd in node.info.params['ceph-osds']]  # type: ignore
+                    node_cfg['ceph']['osds'] = [osd['id'] for osd in node.info.params['ceph-osds']]  # type: ignore
 
                 logger.debug("Setting up sensors RPC plugin for node %s", nid)
                 node.upload_plugin("sensors", SENSORS_PLUGIN_CODE)
@@ -128,6 +136,7 @@
                         logger.warning("Raw sensors data at path %r and, maybe, others are skipped", path)
                     raw_skipped = True
                     continue
+
                 if path == 'collected_at':
                     ds = DataSource(node_id=node_id, metric='collected_at')
                     units = 'us'
@@ -135,6 +144,7 @@
                     sensor, dev, metric = path.split(".")
                     ds = DataSource(node_id=node_id, metric=metric, dev=dev, sensor=sensor)
                     units = sensor_units["{}.{}".format(sensor, metric)]
+
                 rstorage.append_sensor(numpy.array(value), ds, units)
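
The roles_mapping normalization earlier in this file turns comma-separated strings or lists into {sensor: pattern} dicts, with 'ceph' mapped to a dict because it takes structured settings instead of a device regex. A small sketch of that transform:

    def normalize(val):
        """Turn 'a, b, ceph' or ['a', 'b', 'ceph'] into {'a': '.*', 'b': '.*', 'ceph': {}}."""
        if isinstance(val, str):
            val = [vl.strip() for vl in val.split(",")]
        return {vl: (".*" if vl != "ceph" else {}) for vl in val}

    print(normalize("block-io, net-io, ceph"))
    print(normalize(["block-io", "ceph"]))
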
 
 
diff --git a/wally/stage.py b/wally/stage.py
index 95542f3..14e80ae 100644
--- a/wally/stage.py
+++ b/wally/stage.py
@@ -9,10 +9,11 @@
     DISCOVER = 0
     SPAWN = 10
     CONNECT = 20
-    START_SENSORS = 30
-    TEST = 40
-    COLLECT_SENSORS = 50
-    REPORT = 60
+    UPDATE_NODES_INFO = 30
+    START_SENSORS = 40
+    TEST = 50
+    COLLECT_SENSORS = 60
+    REPORT = 70
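
The widened priority constants leave room for stages that must run between phases; SaveNodesStage above uses UPDATE_NODES_INFO + 1 to run right after node info is collected. A minimal ordering sketch:

    from enum import IntEnum

    class Order(IntEnum):
        DISCOVER = 0
        SPAWN = 10
        CONNECT = 20
        UPDATE_NODES_INFO = 30
        START_SENSORS = 40

    stages = [("save_nodes", Order.UPDATE_NODES_INFO + 1),
              ("start_sensors", Order.START_SENSORS),
              ("connect", Order.CONNECT)]
    print([name for name, _ in sorted(stages, key=lambda s: s[1])])
    # -> ['connect', 'save_nodes', 'start_sensors']
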
 
 
 class Stage(metaclass=abc.ABCMeta):
diff --git a/wally/statistic.py b/wally/statistic.py
index a256587..8543e0f 100644
--- a/wally/statistic.py
+++ b/wally/statistic.py
@@ -123,10 +123,12 @@
 
 
 def calc_histo_stat_props(ts: TimeSeries,
-                          bins_edges: numpy.array,
+                          bins_edges: numpy.array = None,
                           rebins_count: int = None,
                           tail: float = 0.005) -> HistoStatProps:
-    log_bins = False
+    if bins_edges is None:
+        bins_edges = ts.histo_bins
+
     res = HistoStatProps(ts.data)
 
     # summ across all series
@@ -286,27 +288,37 @@
 
 
 def hist_outliers_perc(bin_populations: numpy.array,
-                       bounds_perc: Tuple[float, float] = (0.01, 0.99)) -> Tuple[int, int]:
+                       bounds_perc: Tuple[float, float] = (0.01, 0.99),
+                       min_bins_left: int = None) -> Tuple[int, int]:
     assert len(bin_populations.shape) == 1
     total_count = bin_populations.sum()
     lower_perc = total_count * bounds_perc[0]
     upper_perc = total_count * bounds_perc[1]
-    return numpy.searchsorted(numpy.cumsum(bin_populations), [lower_perc, upper_perc])
+    idx1, idx2 = numpy.searchsorted(numpy.cumsum(bin_populations), [lower_perc, upper_perc])
+
+    # don't cut too many bins: at least min_bins_left bins must be left
+    if min_bins_left is not None and idx2 - idx1 < min_bins_left:
+        missed = min_bins_left - (idx2 - idx1) // 2
+        idx2 = min(len(bin_populations), idx2 + missed)
+        idx1 = max(0, idx1 - missed)
+
+    return idx1, idx2
 
 
 def ts_hist_outliers_perc(bin_populations: numpy.array,
                           window_size: int = 10,
-                          bounds_perc: Tuple[float, float] = (0.01, 0.99)) -> Tuple[int, int]:
+                          bounds_perc: Tuple[float, float] = (0.01, 0.99),
+                          min_bins_left: int = None) -> Tuple[int, int]:
     assert len(bin_populations.shape) == 2
 
     points = list(range(0, len(bin_populations), window_size))
     if len(bin_populations) % window_size != 0:
         points.append(points[-1] + window_size)
 
-    ranges = []
+    ranges = []  # type: List[List[int]]
     for begin, end in zip(points[:-1], points[1:]):
         window_hist = bin_populations[begin:end].sum(axis=0)
-        ranges.append(hist_outliers_perc(window_hist, bounds_perc=bounds_perc))
+        ranges.append(hist_outliers_perc(window_hist, bounds_perc=bounds_perc, min_bins_left=min_bins_left))
 
     return min(i[0] for i in ranges), max(i[1] for i in ranges)
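
A standalone sketch of the widened hist_outliers_perc above: keep the bins holding the central mass, then grow the window when fewer than min_bins_left bins survive (same arithmetic as the hunk):

    import numpy as np

    def outlier_bounds(populations, bounds_perc=(0.01, 0.99), min_bins_left=None):
        cum = np.cumsum(populations)
        idx1, idx2 = np.searchsorted(cum, [cum[-1] * bounds_perc[0], cum[-1] * bounds_perc[1]])
        if min_bins_left is not None and idx2 - idx1 < min_bins_left:
            missed = min_bins_left - (idx2 - idx1) // 2
            idx2 = min(len(populations), idx2 + missed)
            idx1 = max(0, idx1 - missed)
        return idx1, idx2

    hist = np.array([1, 1, 5, 90, 5, 1, 1])
    print(outlier_bounds(hist, min_bins_left=5))   # -> (0, 7)
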
 
diff --git a/wally/storage.py b/wally/storage.py
index 0459305..aa90ac9 100644
--- a/wally/storage.py
+++ b/wally/storage.py
@@ -235,6 +235,7 @@
     def __init__(self, sstorage: ISimpleStorage, serializer: ISerializer) -> None:
         self.sstorage = sstorage
         self.serializer = serializer
+        self.cache = {}
 
     def sub_storage(self, *path: str) -> 'Storage':
         fpath = "/".join(path)
@@ -293,13 +294,17 @@
 
     def load_list(self, obj_class: Type[ObjClass], *path: str) -> List[ObjClass]:
         path_s = "/".join(path)
-        raw_val = cast(List[Dict[str, Any]], self.get(path_s))
-        assert isinstance(raw_val, list)
-        return [cast(ObjClass, obj_class.fromraw(val)) for val in raw_val]
+        if path_s not in self.cache:
+            raw_val = cast(List[Dict[str, Any]], self.get(path_s))
+            assert isinstance(raw_val, list)
+            self.cache[path_s] = [cast(ObjClass, obj_class.fromraw(val)) for val in raw_val]
+        return self.cache[path_s]
 
     def load(self, obj_class: Type[ObjClass], *path: str) -> ObjClass:
         path_s = "/".join(path)
-        return cast(ObjClass, obj_class.fromraw(self.get(path_s)))
+        if path_s not in self.cache:
+            self.cache[path_s] = cast(ObjClass, obj_class.fromraw(self.get(path_s)))
+        return self.cache[path_s]
 
     def sync(self) -> None:
         self.sstorage.sync()
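
The cache added above means load() and load_list() deserialize each path once and then hand back the same object; callers should treat the result as read-only, since it is shared. A toy memoizing store (assumed shape, not wally's Storage):

    class CachedStore:
        def __init__(self, raw):
            self.raw = raw            # path -> already-parsed dict
            self.cache = {}           # path -> constructed object, built once

        def load(self, cls, *path):
            key = "/".join(path)
            if key not in self.cache:
                self.cache[key] = cls(**self.raw[key])
            return self.cache[key]

    class Node:
        def __init__(self, node_id):
            self.node_id = node_id

    store = CachedStore({"all_nodes/0": {"node_id": "n1"}})
    assert store.load(Node, "all_nodes", "0") is store.load(Node, "all_nodes", "0")
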
diff --git a/wally/suits/io/ceph.cfg b/wally/suits/io/ceph.cfg
index a44c749..22a4937 100644
--- a/wally/suits/io/ceph.cfg
+++ b/wally/suits/io/ceph.cfg
@@ -1,12 +1,13 @@
 [global]
 include defaults_qd.cfg
 
-QD_R={% 1, 5, 10, 15, 25, 40, 80, 120 %}
-QD_W={% 1, 5, 10, 15, 25, 40 %}
-QD_SEQ_R={% 1, 3, 10 %}
-QD_SEQ_W={% 1, 2, 4 %}
+# QD_R={% 1, 5, 10, 15, 25, 40, 80, 120 %}
+QD_R={% 1, 5, 10, 20, 40, 60, 100, 150, 200, 500 %}
+QD_W={% 1, 5, 10, 20, 40, 60, 100, 150, 200 %}
+QD_SEQ_R=1
+QD_SEQ_W=1
 
-ramp_time=30
+ramp_time=15
 runtime=180
 
 # ---------------------------------------------------------------------
@@ -30,12 +31,12 @@
 # ---------------------------------------------------------------------
 # sync write
 # ---------------------------------------------------------------------
-[ceph_{TEST_SUMM}]
-blocksize=4k
-rw=randwrite
-direct=1
-sync=1
-numjobs=1
+#[ceph_{TEST_SUMM}]
+#blocksize=4k
+#rw=randwrite
+#direct=1
+#sync=1
+#numjobs=1
 
 # ---------------------------------------------------------------------
 # this is essentially sequential write operations
diff --git a/wally/suits/io/fio.py b/wally/suits/io/fio.py
index 16da091..1895b6c 100644
--- a/wally/suits/io/fio.py
+++ b/wally/suits/io/fio.py
@@ -13,7 +13,7 @@
 from ..job import JobConfig
 from .fio_task_parser import execution_time, fio_cfg_compile, FioJobConfig, FioParams, get_log_files
 from . import rpc_plugin
-from .fio_hist import expected_lat_bins
+from .fio_hist import get_lat_vals, expected_lat_bins
 
 
 logger = logging.getLogger("wally")
@@ -223,9 +223,10 @@
                             vals = [int(i.strip()) for i in rest]
 
                             if len(vals) != expected_lat_bins:
-                                logger.error("Expect {} bins in latency histogram, but found {} at time {}"
-                                             .format(expected_lat_bins, len(vals), time_ms_s))
-                                raise StopTestError()
+                                msg = "Expect {} bins in latency histogram, but found {} at time {}" \
+                                             .format(expected_lat_bins, len(vals), time_ms_s)
+                                logger.error(msg)
+                                raise StopTestError(msg)
 
                             parsed.append(vals)
                         else:
@@ -237,13 +238,16 @@
             if not self.suite.keep_raw_files:
                 raw_result = None
 
+            histo_bins = None if name != 'lat' else numpy.array(get_lat_vals())
+
             result.append(TimeSeries(name=name,
                                      raw=raw_result,
                                      data=numpy.array(parsed, dtype='uint64'),
                                      units=units,
                                      times=numpy.array(times, dtype='uint64'),
                                      time_units='ms',
-                                     source=path(metric=name, tag='csv')))
+                                     source=path(metric=name, tag='csv'),
+                                     histo_bins=histo_bins))
         return result
 
     def format_for_console(self, data: Any) -> str:
diff --git a/wally/suits/io/verify.cfg b/wally/suits/io/verify.cfg
index 58a94b0..0d85f94 100644
--- a/wally/suits/io/verify.cfg
+++ b/wally/suits/io/verify.cfg
@@ -1,8 +1,9 @@
 [global]
 include defaults_qd.cfg
-QD={% 1, 2, 4 %}
-runtime=30
+QD={% 32, 64, 128, 256 %}
+runtime=600
 direct=1
+ramp_time=30
 
 # ---------------------------------------------------------------------
 
diff --git a/wally/test_run_class.py b/wally/test_run_class.py
index fea846d..c4b5bc5 100644
--- a/wally/test_run_class.py
+++ b/wally/test_run_class.py
@@ -41,12 +41,13 @@
     def get_pool(self):
         return ThreadPoolExecutor(self.config.get('worker_pool_sz', 32))
 
-    def merge_node(self, creds: ConnCreds, roles: Set[str]) -> NodeInfo:
-        info = NodeInfo(creds, roles)
+    def merge_node(self, creds: ConnCreds, roles: Set[str], **params) -> NodeInfo:
+        info = NodeInfo(creds, roles, params)
         nid = info.node_id
 
         if nid in self.nodes_info:
             self.nodes_info[nid].roles.update(info.roles)
+            self.nodes_info[nid].params.update(info.params)
             return self.nodes_info[nid]
         else:
             self.nodes_info[nid] = info
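
merge_node now accumulates params as well as roles when the same node is discovered twice. A dict-based sketch of that merge semantics (toy structures, not wally's NodeInfo):

    nodes = {}     # node_id -> {"roles": set, "params": dict}

    def merge(nid, roles, **params):
        if nid in nodes:
            nodes[nid]["roles"].update(roles)
            nodes[nid]["params"].update(params)
        else:
            nodes[nid] = {"roles": set(roles), "params": dict(params)}
        return nodes[nid]

    merge("n1", {"ceph-osd"}, ceph_storage_devs=["sdb"])
    merge("n1", {"testnode"}, rbd_dev="rbd0")
    print(nodes["n1"])   # both roles and both params survive
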
diff --git a/wally/texttable.py b/wally/texttable.py
index 1917663..504b04d 100644
--- a/wally/texttable.py
+++ b/wally/texttable.py
@@ -306,6 +306,7 @@
         cells = []
         for i, x in enumerate(array):
             cells.append(self._str(i, x))
+
         self._rows.append(cells)
 
     def add_rows(self, rows, header=True):
@@ -366,6 +367,9 @@
             i - index of the cell datatype in self._dtype
             x - cell data to format
         """
+        if isinstance(x, str):
+            return x
+
         try:
             f = float(x)
         except:
diff --git a/wally/utils.py b/wally/utils.py
index 5904aa7..151ed5f 100644
--- a/wally/utils.py
+++ b/wally/utils.py
@@ -462,7 +462,7 @@
     if len(units) > 1 and units[0] in _coefs:
         return _coefs[units[0]], units[1:]
     else:
-        return Fraction(1), units
+        return 1, units
 
 
 def unit_conversion_coef(from_unit: str, to_unit: str) -> Union[Fraction, int]:
@@ -471,10 +471,18 @@
 
     assert u1 == u2, "Can't convert {!r} to {!r}".format(from_unit, to_unit)
 
-    if isinstance(f1, int) and isinstance(f2, int) and f1 % f2 != 0:
-        return Fraction(f1, f2)
+    if isinstance(f1, int) and isinstance(f2, int):
+        if f1 % f2 != 0:
+            return Fraction(f1, f2)
+        else:
+            return f1 // f2
 
-    return f1 // f2
+    res = f1 / f2
+
+    if isinstance(res, Fraction) and cast(Fraction, res).denominator == 1:
+        return cast(Fraction, res).numerator
+
+    return res
 
 
 def shape2str(shape: Iterable[int]) -> str:
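
unit_conversion_coef now collapses exact ratios to plain ints and only falls back to Fraction when the division is inexact. A self-contained sketch of just that branch logic (unit-prefix parsing omitted):

    from fractions import Fraction

    def conversion_coef(f1, f2):
        """Return f1 / f2 as an int when it divides evenly, else as an exact Fraction."""
        if isinstance(f1, int) and isinstance(f2, int):
            return f1 // f2 if f1 % f2 == 0 else Fraction(f1, f2)
        res = Fraction(f1) / Fraction(f2)
        return res.numerator if res.denominator == 1 else res

    assert conversion_coef(1024, 1) == 1024                 # e.g. KiB -> B
    assert conversion_coef(1, 1000) == Fraction(1, 1000)    # e.g. ms -> s
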