Interpolation fixes and a fio-specific interpolator, report rework (resource usage table, ADF stationarity check, caching), plus config and test updates
diff --git a/clib/interpolate.cpp b/clib/interpolate.cpp
index a388ec1..c7f2b50 100644
--- a/clib/interpolate.cpp
+++ b/clib/interpolate.cpp
@@ -1,24 +1,33 @@
 #include <algorithm>
 #include <cstdint>
-#include <cstdio>
+//#include <cstdio>
 
 
-extern "C" void interpolate_ts_on_seconds_border(unsigned int input_size,
-                                                 unsigned int output_size,
-                                                 const uint64_t * times,
-                                                 const uint64_t * values,
-                                                 unsigned int time_step,
-                                                 uint64_t * output)
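+// round a non-negative double to the nearest uint64 (half-up)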
+uint64_t round(double x) {
+    return (uint64_t)(x + 0.5);
+}
+
+
+extern "C"
+unsigned int interpolate_ts_on_seconds_border(unsigned int input_size,
+                                              unsigned int output_size,
+                                              const uint64_t * times,
+                                              const uint64_t * values,
+                                              unsigned int time_step,
+                                              uint64_t * output)
 {
-    auto output_end = (*times / time_step) * time_step;
-    auto output_begin = output_end - time_step;
+    auto output_begin = *times - time_step;
+    auto output_end = *times;
+
+    auto input_begin = *times - time_step;
+    auto input_end = *times;
+
     auto output_cell = output;
 
     auto input_cell = values;
-    auto input_time = times;
     auto input_val = *input_cell;
-    auto input_begin = *input_time - time_step;
-    auto input_end = *input_time;
+
+    auto input_time = times;
     auto rate = ((double)*input_cell) / (input_end - input_begin);
 
     // output array must fully cover input array
@@ -28,7 +37,7 @@
 
         // add intersection slice to output array
         if(intersection > 0) {
-            auto slice = (uint64_t)(intersection * rate);
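+            // clamp the slice to the remaining input value so rounding cannot overdraw it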
+            auto slice = std::min(input_val, round(intersection * rate));
             *output_cell += slice;
             input_val -= slice;
         }
@@ -41,7 +50,13 @@
             ++input_time;
 
             if(input_time == times + input_size)
-                return;
+                return output_cell - output + 1;
+
+            if (output_end == input_end) {
+                ++output_cell;
+                output_begin = output_end;
+                output_end += time_step;
+            }
 
             input_val = *input_cell;
             input_begin = input_end;
@@ -53,6 +68,7 @@
             output_end += time_step;
         }
     }
+    return output_size;
 }
 
 
@@ -78,3 +94,44 @@
 
     return output_size;
 }
+
+
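+// For every whole-second output cell store the index of the input sample that
+// covers it; cells inside a data gap get empty_cell_placeholder. Returns the
+// number of cells filled, or the negated input index when the time step is irregular.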
+extern "C" int interpolate_ts_on_seconds_border_fio(unsigned int input_size,
+                                                    unsigned int output_size,
+                                                    const uint64_t * times,
+                                                    unsigned int time_step,
+                                                    uint64_t * output_idx,
+                                                    uint64_t empty_cell_placeholder)
+{
+    auto input_end = times + input_size;
+    auto output_end = output_idx + output_size;
+    float no_step = time_step * 0.1;
+    float more_then_step = time_step * 1.9;
+    float step_min = time_step * 0.9;
+    float step_max = time_step * 1.1;
+
+    auto cinput = times;
+    auto ctime_val = *cinput - time_step;
+
+    for(; output_idx < output_end; ++output_idx) {
+
+        // skip repetition of same time
+        while(*cinput - ctime_val <= no_step and cinput < input_end)
+            ++cinput;
+
+        if (cinput == input_end)
+            break;
+
+        auto dt = *cinput - ctime_val;
+        if (dt <= step_max and dt > step_min) {
+            *output_idx = cinput - times;
+            ctime_val += time_step;
+        } else if (dt >= more_then_step) {
+            *output_idx = empty_cell_placeholder;
+            ctime_val += time_step;
+        } else
+            return -(int)(cinput - times);
+    }
+
+    return output_size - (output_end - output_idx);
+}
\ No newline at end of file
diff --git a/configs-examples/default.yaml b/configs-examples/default.yaml
index 01551ee..843a672 100644
--- a/configs-examples/default.yaml
+++ b/configs-examples/default.yaml
@@ -37,6 +37,7 @@
 include: logging.yaml
 default_test_local_folder: "/tmp/wally_{name}_{uuid}"
 keep_raw_files: true
+download_rpc_logs: true
 
 vm_configs:
     keypair_file_private: wally_vm_key_perf3.pem
diff --git a/configs-examples/perf_lab.yml b/configs-examples/perf_lab.yml
index ddb05be..eae4c5b 100644
--- a/configs-examples/perf_lab.yml
+++ b/configs-examples/perf_lab.yml
@@ -8,12 +8,14 @@
 
 nodes:
     root@cz7625: testnode
+    root@cz7626: testnode
+    root@cz7627: testnode
 
-sleep: 5
+# sleep: 5
 
 tests:
   - fio:
       load: verify
       params:
           FILENAME: /dev/rbd0
-          FILESIZE: 100G
+          FILESIZE: 700G
diff --git a/requirements.txt b/requirements.txt
index 2162299..38c8b79 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -23,4 +23,4 @@
 psutil
 seaborn
 pytest
-
+statsmodels
diff --git a/tests/test_math.py b/tests/test_math.py
index 9298792..9cebffd 100644
--- a/tests/test_math.py
+++ b/tests/test_math.py
@@ -1,4 +1,7 @@
 import numpy
+import pytest
+
+
 from wally.statistic import rebin_histogram
 from wally.result_classes import DataSource, TimeSeries
 from wally.data_selectors import c_interpolate_ts_on_seconds_border
@@ -105,6 +108,26 @@
             assert ts2_min <= ts_sum <= ts2_max, "NOT {} <= {} <= {}".format(ts2_min, ts_sum, ts2_max)
 
 
+def test_interpolate2():
+    ds = DataSource(node_id=NODE_ID, sensor=SENSOR, dev=DEV, metric=METRIC)
+    samples = 5
+    ms_coef = 1000
+
+    source_times = numpy.arange(samples, dtype='uint64') * ms_coef + ms_coef + 347
+    source_values = numpy.random.randint(10, 1000, size=samples, dtype='uint64')
+
+    ts = TimeSeries("test", raw=None, data=source_values, times=source_times, units=DATA_UNITS,
+                    source=ds, time_units=TIME_UNITS)
+
+    ts2 = c_interpolate_ts_on_seconds_border(ts, nc=True)
+
+    assert ts.time_units == 'ms'
+    assert ts2.time_units == 's'
+    assert ts2.times.dtype == ts.times.dtype
+    assert ts2.data.dtype == ts.data.dtype
+    assert (ts2.data == ts.data).all()
+
+
 def test_interpolate_qd():
     ds = DataSource(node_id=NODE_ID, sensor=SENSOR, dev=DEV, metric=METRIC)
     samples = 200
@@ -120,14 +143,13 @@
         ts = TimeSeries("test", raw=None, data=source_values, times=source_times, units=DATA_UNITS,
                         source=ds, time_units=TIME_UNITS)
 
-        ts2 = c_interpolate_ts_on_seconds_border(ts, nc=True, qd=True)
+        ts2 = c_interpolate_ts_on_seconds_border(ts, nc=True, tp='qd')
 
         assert ts.time_units == 'ms'
         assert ts2.time_units == 's'
         assert ts2.times.dtype == ts.times.dtype
         assert ts2.data.dtype == ts.data.dtype
         assert ts2.data.size == ts2.times.size
-        assert abs(ts2.data.size - ts.data.size) <= 1
 
         coef = unit_conversion_coef(ts2.time_units, ts.time_units)
         assert isinstance(coef, int)
@@ -136,3 +158,52 @@
 
         idxs = numpy.searchsorted(ts.times, ts2.times * coef - dtime)
         assert (ts2.data == ts.data[idxs]).all()
+
+
+def test_interpolate_fio():
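+    # 'fio' interpolation maps each sample to its whole-second cell and fills gap seconds with zeros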
+    ds = DataSource(node_id=NODE_ID, sensor=SENSOR, dev=DEV, metric=METRIC)
+    ms_coef = 1000
+    s_offset = 377 * ms_coef
+    gap_start = 5
+    gap_size = 5
+    full_size = 15
+
+    times = list(range(gap_start)) + list(range(gap_start + gap_size, full_size))
+    src_times = numpy.array(times, dtype='uint64') * ms_coef + s_offset
+    src_values = numpy.random.randint(10, 100, size=len(src_times), dtype='uint64')
+
+    ts = TimeSeries("test", raw=None, data=src_values, times=src_times, units=DATA_UNITS,
+                    source=ds, time_units=TIME_UNITS)
+
+    ts2 = c_interpolate_ts_on_seconds_border(ts, nc=True, tp='fio')
+
+    assert ts.time_units == 'ms'
+    assert ts2.time_units == 's'
+    assert ts2.times.dtype == ts.times.dtype
+    assert ts2.data.dtype == ts.data.dtype
+    assert ts2.times[0] == ts.times[0] // ms_coef
+    assert ts2.times[-1] == ts.times[-1] // ms_coef
+    assert ts2.data.size == ts2.times.size
+
+    expected_times = numpy.arange(ts.times[0] // ms_coef, ts.times[-1] // ms_coef + 1, dtype='uint64')
+    assert ts2.times.size == expected_times.size
+    assert (ts2.times == expected_times).all()
+
+    assert (ts2.data[:gap_start] == ts.data[:gap_start]).all()
+    assert (ts2.data[gap_start:gap_start + gap_size] == 0).all()
+    assert (ts2.data[gap_start + gap_size:] == ts.data[gap_start:]).all()
+
+
+def test_interpolate_fio_negative():
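+    # a sample that is not on a ~1s step (4.5s here) must make the 'fio' interpolator raise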
+    ds = DataSource(node_id=NODE_ID, sensor=SENSOR, dev=DEV, metric=METRIC)
+    ms_coef = 1000
+    s_offset = 377 * ms_coef
+
+    src_times = (numpy.array([1, 2, 3, 4.5, 5, 6, 7]) * ms_coef + s_offset).astype('uint64')
+    src_values = numpy.random.randint(10, 100, size=len(src_times), dtype='uint64')
+
+    ts = TimeSeries("test", raw=None, data=src_values, times=src_times, units=DATA_UNITS,
+                    source=ds, time_units=TIME_UNITS)
+
+    with pytest.raises(ValueError):
+        c_interpolate_ts_on_seconds_border(ts, nc=True, tp='fio')
diff --git a/v2_plans.md b/v2_plans.md
index 2e69de3..a2d5244 100644
--- a/v2_plans.md
+++ b/v2_plans.md
@@ -1,14 +1,41 @@
 TODO today:
 -----------
 
-* Add logging of sensor downloads, including the size of the downloaded data
-* Clean up the agent logs
-* Check/unify image generation
-* Add a Dickey-Fuller test
-* Generate a summary report
-* Clean up the resource consumption table, add the Dickey-Fuller test to it
-* Build a resource consumption chart
-* Change the interpolation for iotime so that 100% is not hidden for the journal
+* Store the OSD config separately as json to speed up dump/load
+* Fix heatmap boundaries - for QD they should fall on odd numbers.
+  The upper one should include part of the previous interval, etc.
+* Mark devices on nodes by role during node diagnostics
+* Collect idle load. A separate sleep-like stage after the sensors are
+  installed, which records the start and end time of the sleep
+* Check and unify all caches. A separate TS lookup that always returns
+  Union[DataSource, Iterable[DataSource]]. A separate single unified
+  load/postprocessing function that holds all the basic caches.
+  On top of that a postprocessing layer (aggregation by roles, devices,
+  CPU) with its own caches. Keep the original in the TS.
+* Try converting 16m randwr into a linear operation with
+  precomputed offsets? offset, offset_increment
+* The text report is broken
+* Check the debug options on ceph / reduce them for the test
+* The IOPS vs QD labels do not fit when there is a lot of data, they need to be slanted
+* Plot resource consumption vs. QD
+* Do not show the deviation for values that are too small
+* Add io_wait to the CPU charts
+* Sort the reports by type and queue depth
+* The report is generated incorrectly
+* Decide what to do with the gap in sensor data when a test is restarted
+* Kill agent background processes when the run is stopped. Add an extra
+  function to the agent that finds and kills all child processes when a
+  stop is requested
+* Fix the text report
+* bottleneck table
+* Consider pandas.dataframe as a universal intermediary for the
+  visualization functions
+* Format chart ticks with b2ssize/b2ssize_10
+* Compare clocks on the nodes and raise an error when they disagree
+* scipy.stats.probplot - QQ plot
+* Set the grid explicitly for the selected charts
+* Investigate why the Dickey-Fuller test works so poorly
+* Generate a summary report -
 
 Wally consists of parts that should be split out and unified with other tools:
 ----------------------------------------------------------------------------------
@@ -57,7 +84,6 @@
 
 TODO next
 ---------
-* Dickey-Fuller test for the results
 * Merge FSStorage and serializer into ObjStorage, separate TSStorage.
 * Build WallyStorage on top of it, use only WallyStorage in code
 * check that OS key match what is stored on disk 
diff --git a/wally/data_selectors.py b/wally/data_selectors.py
index 66a6ee5..02d5075 100644
--- a/wally/data_selectors.py
+++ b/wally/data_selectors.py
@@ -1,7 +1,7 @@
 import ctypes
 import logging
 import os.path
-from typing import Tuple, List, Iterable, Iterator, Optional, Union
+from typing import Tuple, List, Iterable, Iterator, Optional, Union, Dict
 from fractions import Fraction
 
 import numpy
@@ -72,6 +72,20 @@
 
     tss = list(find_all_series(rstorage, suite, job, metric))
 
+    # TODO replace this with universal interpolator
+    # for ts in tss:
+    #     from_s = float(unit_conversion_coef('s', ts.time_units))
+    #     prev_time = ts.times[0]
+    #     res = [ts.data[0]]
+    #
+    #     for ln, (tm, val) in enumerate(zip(ts.times[1:], ts.data[1:]), 1):
+    #         assert tm > prev_time, "Failed tm > prev_time, src={}, ln={}".format(ts.source, ln)
+    #         while tm - prev_time > from_s * 1.2:
+    #             res.append(0)
+    #             prev_time += from_s
+    #         res.append(val)
+    #         prev_time = tm
+
     if len(tss) == 0:
         raise NameError("Can't found any TS for {},{},{}".format(suite, job, metric))
 
@@ -106,7 +120,15 @@
             raise ValueError(msg)
 
         # TODO: match times on different ts
-        agg_ts.data += ts.data
+        assert abs(len(agg_ts.data) - len(ts.data)) <= 1, \
+            "len(agg_ts.data)={}, len(ts.data)={}, need to be almost equals".format(len(agg_ts.data), len(ts.data))
+
+        mlen = min(len(agg_ts.data), len(ts.data))
+        agg_ts.data[:mlen] += ts.data[:mlen]
 
     return agg_ts
 
@@ -193,13 +215,14 @@
     return res_ts
 
 
-c_interp_func = None
+c_interp_func_agg = None
 c_interp_func_qd = None
+c_interp_func_fio = None
 
 
-def c_interpolate_ts_on_seconds_border(ts: TimeSeries, nc: bool = False, qd: bool = False) -> TimeSeries:
+def c_interpolate_ts_on_seconds_border(ts: TimeSeries, nc: bool = False, tp: str = 'agg') -> TimeSeries:
     "Interpolate time series to values on seconds borders"
-    key = (ts.source.tpl, qd)
+    key = (ts.source.tpl, tp)
     if not nc and key in interpolated_cache:
         return interpolated_cache[key].copy()
 
@@ -221,64 +244,86 @@
     assert rcoef >= 1 and isinstance(rcoef, int), "Incorrect conversion coef {!r}".format(rcoef)
     coef = int(rcoef)   # make typechecker happy
 
-    global c_interp_func
+    global c_interp_func_agg
     global c_interp_func_qd
+    global c_interp_func_fio
 
     uint64_p = ctypes.POINTER(ctypes.c_uint64)
 
-    if c_interp_func is None:
+    if c_interp_func_agg is None:
         dirname = os.path.dirname(os.path.dirname(wally.__file__))
         path = os.path.join(dirname, 'clib', 'libwally.so')
         cdll = ctypes.CDLL(path)
 
-        c_interp_func = cdll.interpolate_ts_on_seconds_border
-        c_interp_func.argtypes = [
-            ctypes.c_uint,  # input_size
-            ctypes.c_uint,  # output_size
-            uint64_p,  # times
-            uint64_p,  # values
-            ctypes.c_uint,  # time_scale_coef
-            uint64_p,  # output
-        ]
-        c_interp_func.restype = None
-
+        c_interp_func_agg = cdll.interpolate_ts_on_seconds_border
         c_interp_func_qd = cdll.interpolate_ts_on_seconds_border_qd
-        c_interp_func_qd.argtypes = [
-            ctypes.c_uint,  # input_size
-            ctypes.c_uint,  # output_size
-            uint64_p,  # times
-            uint64_p,  # values
-            ctypes.c_uint,  # time_scale_coef
-            uint64_p,  # output
-        ]
-        c_interp_func_qd.restype = ctypes.c_uint
+
+        for func in (c_interp_func_agg, c_interp_func_qd):
+            func.argtypes = [
+                ctypes.c_uint,  # input_size
+                ctypes.c_uint,  # output_size
+                uint64_p,  # times
+                uint64_p,  # values
+                ctypes.c_uint,  # time_scale_coef
+                uint64_p,  # output
+            ]
+            func.restype = ctypes.c_uint  # output array used size
+
+        c_interp_func_fio = cdll.interpolate_ts_on_seconds_border_fio
+        c_interp_func_fio.restype = ctypes.c_int
+        c_interp_func_fio.argtypes = [
+                ctypes.c_uint,  # input_size
+                ctypes.c_uint,  # output_size
+                uint64_p,  # times
+                ctypes.c_uint,  # time_scale_coef
+                uint64_p,  # output indexes
+                ctypes.c_uint64,  # empty placeholder
+            ]
 
     assert ts.data.dtype.name == 'uint64', "Data dtype for {}=={} != uint64".format(ts.source, ts.data.dtype.name)
     assert ts.times.dtype.name == 'uint64', "Time dtype for {}=={} != uint64".format(ts.source, ts.times.dtype.name)
 
     output_sz = int(ts.times[-1]) // coef - int(ts.times[0]) // coef + 2
-    # print("output_sz =", output_sz, "coef =", coef)
     result = numpy.zeros(output_sz, dtype=ts.data.dtype.name)
 
-    if qd:
-        func = c_interp_func_qd
-    else:
-        func = c_interp_func
+    if tp in ('qd', 'agg'):
+        func = c_interp_func_qd if tp == 'qd' else c_interp_func_agg
+        sz = func(ts.data.size,
+                  output_sz,
+                  ts.times.ctypes.data_as(uint64_p),
+                  ts.data.ctypes.data_as(uint64_p),
+                  coef,
+                  result.ctypes.data_as(uint64_p))
 
-    sz = func(ts.data.size,
-              output_sz,
-              ts.times.ctypes.data_as(uint64_p),
-              ts.data.ctypes.data_as(uint64_p),
-              coef,
-              result.ctypes.data_as(uint64_p))
-
-    if qd:
         result = result[:sz]
         output_sz = sz
-    else:
-        assert sz is None
 
-    rtimes = int(ts.times[0] // coef) + numpy.arange(output_sz, dtype=ts.times.dtype)
+        rtimes = int(ts.times[0] // coef) + numpy.arange(output_sz, dtype=ts.times.dtype)
+    else:
+        assert tp == 'fio'
+        ridx = numpy.zeros(output_sz, dtype=ts.times.dtype)
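+        # placeholder value the C code writes for seconds that have no input sample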
+        no_data = (output_sz + 1)
+        sz_or_err = c_interp_func_fio(ts.times.size,
+                                      output_sz,
+                                      ts.times.ctypes.data_as(uint64_p),
+                                      coef,
+                                      ridx.ctypes.data_as(uint64_p),
+                                      no_data)
+
+        if sz_or_err <= 0:
+            raise ValueError("Error in input array at index {}. {}".format(-sz_or_err, ts.source))
+
+        rtimes = int(ts.times[0] // coef) + numpy.arange(sz_or_err, dtype=ts.times.dtype)
+
+        empty = numpy.zeros(len(ts.histo_bins), dtype=ts.data.dtype) if ts.source.metric == 'lat' else 0
+        res = []
+        for idx in ridx[:sz_or_err]:
+            if idx == no_data:
+                res.append(empty)
+            else:
+                res.append(ts.data[idx])
+        result = numpy.array(res, dtype=ts.data.dtype)
+
     res_ts = TimeSeries(ts.name, None, result,
                         times=rtimes,
                         units=ts.units,
@@ -405,26 +450,38 @@
 
 
 qd_metrics = {'io_queue'}
+summ_sensors_cache = {}  # type: Dict[Tuple[Tuple[str, ...], str, str, Tuple[int, int], int], Optional[TimeSeries]]
 
 
 def summ_sensors(rstorage: ResultStorage,
                  roles: List[str],
                  sensor: str,
                  metric: str,
-                 time_range: Tuple[int, int]) -> Optional[TimeSeries]:
+                 time_range: Tuple[int, int],
+                 nc: bool = False) -> Optional[TimeSeries]:
+
+    key = (tuple(roles), sensor, metric, time_range, id(rstorage))
+    if not nc and key in summ_sensors_cache:
+        return summ_sensors_cache[key].copy()
 
     res = None  # type: Optional[TimeSeries]
     for node in find_nodes_by_roles(rstorage, roles):
         for _, ds in rstorage.iter_sensors(node_id=node.node_id, sensor=sensor, metric=metric):
             data = rstorage.load_sensor(ds)
-            data = c_interpolate_ts_on_seconds_border(data, qd=metric in qd_metrics)
+            data = c_interpolate_ts_on_seconds_border(data, tp='qd' if metric in qd_metrics else 'agg')
             data = get_ts_for_time_range(data, time_range)
             if res is None:
                 res = data
                 res.data = res.data.copy()
             else:
                 res.data += data.data
-    return res
+
+    if not nc:
+        summ_sensors_cache[key] = res
+        if len(summ_sensors_cache) > 1024:
+            logger.warning("summ_sensors_cache cache too large %s > 1024", len(summ_sensors_cache))
+
+    return res if res is None else res.copy()
 
 
 def find_sensors_to_2d(rstorage: ResultStorage,
@@ -439,7 +496,7 @@
         for dev in devs:
             for _, ds in rstorage.iter_sensors(node_id=node.node_id, sensor=sensor, dev=dev, metric=metric):
                 data = rstorage.load_sensor(ds)
-                data = c_interpolate_ts_on_seconds_border(data, qd=metric in qd_metrics)
+                data = c_interpolate_ts_on_seconds_border(data, tp='qd' if metric in qd_metrics else 'agg')
                 data = get_ts_for_time_range(data, time_range)
                 res.append(data.data)
     res2d = numpy.concatenate(res)
diff --git a/wally/html.py b/wally/html.py
index 553aff5..a1db5b3 100644
--- a/wally/html.py
+++ b/wally/html.py
@@ -22,7 +22,7 @@
     return '<img src="{}">'.format(link)
 
 
-def table(caption: str, headers: Optional[List[str]], data: List[List[str]]) -> str:
+def table(caption: str, headers: Optional[List[str]], data: List[List[str]], align: List[str] = None) -> str:
     doc = xmlbuilder3.XMLBuilder("table",
                                  **{"class": "table table-bordered table-striped table-condensed table-hover",
                                     "style": "width: auto;"})
@@ -35,10 +35,20 @@
                 for header in headers:
                     doc.th(header)
 
+    max_cols = max(len(line) for line in data if not isinstance(line, str))
+
     with doc.tbody:
         for line in data:
             with doc.tr:
-                for vl in line:
-                    doc.td(vl)
+                if isinstance(line, str):
+                    with doc.td(colspan=str(max_cols)):
+                        doc.center.b(line)
+                else:
+                    if align:
+                        for vl, col_align in zip(line, align):
+                            doc.td(vl, align=col_align)
+                    else:
+                        for vl in line:
+                            doc.td(vl)
 
     return xmlbuilder3.tostr(doc).split("\n", 1)[1]
\ No newline at end of file
diff --git a/wally/main.py b/wally/main.py
index 93922dd..a20ade5 100644
--- a/wally/main.py
+++ b/wally/main.py
@@ -292,6 +292,7 @@
         storage = make_storage(opts.data_dir, existing=True)
         config = storage.load(Config, 'config')
         stages.append(LoadStoredNodesStage())
+        stages.append(SaveNodesStage())
 
     elif opts.subparser_name == 'compare':
         # x = run_test.load_data_from_path(opts.data_path1)
@@ -320,11 +321,6 @@
         IPython.embed()
 
         return 0
-    # elif opts.subparser_name == 'jupyter':
-    #     with tempfile.NamedTemporaryFile() as fd:
-    #         fd.write(notebook_kern.replace("$STORAGE", opts.storage_dir))
-    #         subprocess.call("jupyter notebook ", shell=True)
-    #     return 0
     else:
         print("Subparser {!r} is not supported".format(opts.subparser_name))
         return 1
diff --git a/wally/report.py b/wally/report.py
index da81ab5..8c2c181 100644
--- a/wally/report.py
+++ b/wally/report.py
@@ -5,20 +5,19 @@
 from io import BytesIO
 from functools import wraps
 from collections import defaultdict
-from typing import Dict, Any, Iterator, Tuple, cast, List, Callable, Set
+from typing import Dict, Any, Iterator, Tuple, cast, List, Callable, Set, Optional, Union
 
 import numpy
 import scipy.stats
-
-# import matplotlib
-# matplotlib.use('GTKAgg')
-
+import matplotlib.style
 from matplotlib.figure import Figure
 import matplotlib.pyplot as plt
 from matplotlib import gridspec
+from statsmodels.tsa.stattools import adfuller
 
 from cephlib.common import float2str
 from cephlib.plot import plot_hmap_with_y_histo, hmap_from_2d
+import xmlbuilder3
 
 import wally
 
@@ -26,7 +25,6 @@
 from .stage import Stage, StepOrder
 from .test_run_class import TestRun
 from .hlstorage import ResultStorage
-from .node_interfaces import NodeInfo
 from .utils import b2ssize, b2ssize_10, STORAGE_ROLES, unit_conversion_coef
 from .statistic import (calc_norm_stat_props, calc_histo_stat_props, moving_average, moving_dev,
                         hist_outliers_perc, find_ouliers_ts, approximate_curve)
@@ -34,7 +32,8 @@
 from .suits.io.fio import FioTest, FioJobConfig
 from .suits.io.fio_job import FioJobParams
 from .suits.job import JobConfig
-from .data_selectors import get_aggregated, AGG_TAG, summ_sensors, find_sensors_to_2d, find_nodes_by_roles
+from .data_selectors import (get_aggregated, AGG_TAG, summ_sensors, find_sensors_to_2d, find_nodes_by_roles,
+                             get_ts_for_time_range)
 
 
 with warnings.catch_warnings():
@@ -49,7 +48,6 @@
 
 
 DEBUG = False
-LARGE_BLOCKS = 256
 
 
 # ----------------  PROFILES  ------------------------------------------------------------------------------------------
@@ -69,12 +67,17 @@
     subinfo_alpha = 0.7
 
     imshow_colormap = None  # type: str
+    hmap_cmap = "Blues"
 
 
 default_format = 'svg'
+io_chart_format = 'svg'
 
 
 class StyleProfile:
+    default_style = 'seaborn-white'
+    io_chart_style = 'classic'
+
     dpi = 80
     grid = True
     tide_layout = False
@@ -84,6 +87,8 @@
     hm_x_slots = 25
     min_points_for_dev = 5
 
+    x_label_rotation = 35
+
     dev_range_x = 2.0
     dev_perc = 95
 
@@ -99,11 +104,12 @@
 
     # figure size in inches
     figsize = (8, 4)
-    figsize_long = (5.5, 3)
+    figsize_long = (8, 4)
+    qd_chart_inches = (16, 9)
 
     subplot_adjust_r = 0.75
-    subplot_adjust_r_no_leg = 0.9
-    title_font_size = 10
+    subplot_adjust_r_no_legend = 0.9
+    title_font_size = 12
 
     extra_io_spine = True
 
@@ -125,6 +131,7 @@
     qd_bins = [0, 1, 2, 4, 6, 8, 12, 16, 20, 26, 32, 40, 48, 56, 64, 96, 128]
     iotime_bins = list(range(0, 1030, 50))
     block_size_bins = [0, 2, 4, 8, 16, 32, 48, 64, 96, 128, 192, 256, 384, 512, 1024, 2048]
+    large_blocks = 256
 
 
 DefColorProfile = ColorProfile()
@@ -174,15 +181,27 @@
 
 # --------------  AGGREGATION AND STAT FUNCTIONS  ----------------------------------------------------------------------
 
-def make_iosum(rstorage: ResultStorage, suite: SuiteConfig, job: FioJobConfig) -> IOSummary:
+iosum_cache = {}  # type: Dict[Tuple[str, str]]
+
+
+def make_iosum(rstorage: ResultStorage, suite: SuiteConfig, job: FioJobConfig, nc: bool = False) -> IOSummary:
+    key = (suite.storage_id, job.storage_id)
+    if not nc and key in iosum_cache:
+        return iosum_cache[key]
+
     lat = get_aggregated(rstorage, suite, job, "lat")
     io = get_aggregated(rstorage, suite, job, "bw")
 
-    return IOSummary(job.qd,
-                     nodes_count=len(suite.nodes_ids),
-                     block_size=job.bsize,
-                     lat=calc_histo_stat_props(lat, rebins_count=StyleProfile.hist_boxes),
-                     bw=calc_norm_stat_props(io, StyleProfile.hist_boxes))
+    res = IOSummary(job.qd,
+                    nodes_count=len(suite.nodes_ids),
+                    block_size=job.bsize,
+                    lat=calc_histo_stat_props(lat, rebins_count=StyleProfile.hist_boxes),
+                    bw=calc_norm_stat_props(io, StyleProfile.hist_boxes))
+
+    if not nc:
+        iosum_cache[key] = res
+
+    return res
 
 
 def is_sensor_numarray(sensor: str, metric: str) -> bool:
@@ -204,6 +223,38 @@
     """Returns True if sensor provides deltas for cumulative value. E.g. io completed in given period"""
     return not is_level_sensor(sensor, metric)
 
+
+cpu_load_cache = {}  # type: Dict[Tuple[int, Tuple[str, ...], Tuple[int, int]], Dict[str, TimeSeries]]
+
+
+def get_cluster_cpu_load(rstorage: ResultStorage, roles: List[str],
+                         time_range: Tuple[int, int], nc: bool = False) -> Dict[str, TimeSeries]:
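+    """Sum every system-cpu metric over all nodes with the given roles and add a computed 'total' series."""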
+
+    key = (id(rstorage), tuple(roles), time_range)
+    if not nc and key in cpu_load_cache:
+        return cpu_load_cache[key]
+
+    cpu_ts = {}
+    cpu_metrics = "idle guest iowait sirq nice irq steal sys user".split()
+    for name in cpu_metrics:
+        cpu_ts[name] = summ_sensors(rstorage, roles, sensor='system-cpu', metric=name, time_range=time_range)
+
+    it = iter(cpu_ts.values())
+    total_over_time = next(it).data.copy()  # type: numpy.ndarray
+    for ts in it:
+        if ts is not None:
+            total_over_time += ts.data
+
+    total = cpu_ts['idle'].copy(no_data=True)
+    total.data = total_over_time
+    cpu_ts['total'] = total
+
+    if not nc:
+        cpu_load_cache[key] = cpu_ts
+
+    return cpu_ts
+
+
 # --------------  PLOT HELPERS FUNCTIONS  ------------------------------------------------------------------------------
 
 def get_emb_image(fig: Figure, format: str, **opts) -> bytes:
@@ -226,6 +277,7 @@
         if not fpath:
             format = path.tag.split(".")[-1]
             fig = plt.figure(figsize=StyleProfile.figsize)
+            plt.style.use(StyleProfile.default_style)
             func(fig, *args, **kwargs)
             fpath = storage.put_plot_file(get_emb_image(fig, format=format, dpi=DefStyleProfile.dpi), path)
             logger.debug("Plot %s saved to %r", path, fpath)
@@ -234,7 +286,8 @@
     return closure1
 
 
-def apply_style(fig: Figure, title: str, style: StyleProfile, eng: bool = True, no_legend: bool = False) -> None:
+def apply_style(fig: Figure, title: str, style: StyleProfile, eng: bool = True,
+                no_legend: bool = False) -> None:
 
     for ax in fig.axes:
         ax.grid(style.grid)
@@ -246,7 +299,7 @@
         for ax in fig.axes:
             ax.legend(loc=legend_location, bbox_to_anchor=legend_bbox_to_anchor)
     else:
-        fig.subplots_adjust(right=StyleProfile.subplot_adjust_r_no_leg)
+        fig.subplots_adjust(right=StyleProfile.subplot_adjust_r_no_legend)
 
     if style.tide_layout:
         fig.set_tight_layout(True)
@@ -317,13 +370,36 @@
 
 
 @provide_plot
+def plot_simple_bars(fig: Figure,
+                     title: str,
+                     names: List[str],
+                     values: List[float],
+                     errs: List[float] = None,
+                     colors: ColorProfile = DefColorProfile,
+                     style: StyleProfile = DefStyleProfile) -> None:
+
+    ax = fig.add_subplot(111)
+    ind = numpy.arange(len(names))
+    width = 0.35
+    ax.barh(ind, values, width, xerr=errs)
+
+    ax.set_yticks(ind + width / 2)
+    ax.set_yticklabels(names)
+    ax.set_xlim(0, max(val + err for val, err in zip(values, errs)) * 1.1)
+
+    apply_style(fig, title, style, no_legend=True)
+    ax.axvline(x=1.0, color='r', linestyle='--', linewidth=1, alpha=0.5)
+    fig.subplots_adjust(left=0.2)
+
+
+@provide_plot
 def plot_hmap_from_2d(fig: Figure,
                       data2d: numpy.ndarray,
                       title: str, ylabel: str, xlabel: str = 'time, s', bins: numpy.ndarray = None,
                       colors: ColorProfile = DefColorProfile, style: StyleProfile = DefStyleProfile) -> None:
     fig.set_size_inches(*style.figsize_long)
     ioq1d, ranges = hmap_from_2d(data2d)
-    ax, _ = plot_hmap_with_y_histo(fig, ioq1d, ranges, bins=bins)
+    ax, _ = plot_hmap_with_y_histo(fig, ioq1d, ranges, bins=bins, cmap=colors.hmap_cmap)
     ax.set(ylabel=ylabel, xlabel=xlabel)
     apply_style(fig, title, style, no_legend=True)
 
@@ -468,15 +544,15 @@
         if style.legend_for_eng:
             legend_location = "center left"
             legend_bbox_to_anchor = (1.03, 0.81)
-            plt.legend([patches['cmeans'], patches['cmedians']], ["mean", "median"],
-                       loc=legend_location, bbox_to_anchor=legend_bbox_to_anchor)
+            ax.legend([patches['cmeans'], patches['cmedians']], ["mean", "median"],
+                      loc=legend_location, bbox_to_anchor=legend_bbox_to_anchor)
     else:
         ax.boxplot(agg_data, 0, '', positions=positions, labels=labels, widths=step / 4)
 
     ax.set_xlim(min(times), max(times))
     ax.set(ylabel=ylabel, xlabel="Time, seconds from test begin, sampled for ~{} seconds".format(int(step)))
-
     apply_style(fig, title, style, eng=True, no_legend=True)
+    fig.subplots_adjust(right=style.subplot_adjust_r)
 
 
 @provide_plot
@@ -588,10 +664,10 @@
 
     # --------------  MAGIC VALUES  ---------------------
     # IOPS bar width
-    width = 0.35
+    width = 0.2
 
     # offset from center of bar to deviation/confidence range indicator
-    err_x_offset = 0.05
+    err_x_offset = 0.03
 
     # extra space on top and bottom, comparing to maximal tight layout
     extra_y_space = 0.05
@@ -606,38 +682,25 @@
     legend_location = "center left"
     legend_bbox_to_anchor = (1.1, 0.81)
 
-    # plot box size adjust (only plot, not spines and legend)
-    plot_box_adjust = {'right': 0.66}
     # --------------  END OF MAGIC VALUES  ---------------------
 
+    matplotlib.style.use(style.io_chart_style)
+
     block_size = iosums[0].block_size
-    lc = len(iosums)
-    xt = list(range(1, lc + 1))
+    xpos = numpy.arange(1, len(iosums) + 1, dtype='uint')
 
-    # x coordinate of middle of the bars
-    xpos = [i - width / 2 for i in xt]
-
-    # import matplotlib.gridspec as gridspec
-    # gs = gridspec.GridSpec(1, 3, width_ratios=[1, 4, 1])
-    # p1 = plt.subplot(gs[1])
-
-    logger.warning("Check coef usage!")
     ax = fig.add_subplot(111)
 
-    # plot IOPS/BW bars
-    if block_size >= LARGE_BLOCKS:
-        iops_primary = False
-        coef = float(unit_conversion_coef(iosums[0].bw.units, "MiBps"))
-        ax.set_ylabel("BW (MiBps)")
-    else:
-        iops_primary = True
-        coef = float(unit_conversion_coef(iosums[0].bw.units, "MiBps")) / block_size
-        ax.set_ylabel("IOPS")
+    coef_mb = float(unit_conversion_coef(iosums[0].bw.units, "MiBps"))
+    coef_iops = float(unit_conversion_coef(iosums[0].bw.units, "KiBps")) / block_size
+
+    iops_primary = block_size < style.large_blocks
+
+    coef = coef_iops if iops_primary else coef_mb
+    ax.set_ylabel("IOPS" if iops_primary else "BW (MiBps)")
 
     vals = [iosum.bw.average * coef for iosum in iosums]
 
-    ax.bar(xpos, vals, width=width, color=colors.box_color, label=legend)
-
     # set correct x limits for primary IO spine
     min_io = min(iosum.bw.average - iosum.bw.deviation * style.dev_range_x for iosum in iosums)
     max_io = max(iosum.bw.average + iosum.bw.deviation * style.dev_range_x for iosum in iosums)
@@ -645,16 +708,20 @@
     io_lims = (min_io - border, max_io + border)
 
     ax.set_ylim(io_lims[0] * coef, io_lims[-1] * coef)
+    ax.bar(xpos - width / 2, vals, width=width, color=colors.box_color, label=legend)
 
     # plot deviation and confidence error ranges
     err1_legend = err2_legend = None
     for pos, iosum in zip(xpos, iosums):
-        err1_legend = ax.errorbar(pos + width / 2 - err_x_offset,
+        dev_bar_pos = pos - err_x_offset
+        err1_legend = ax.errorbar(dev_bar_pos,
                                   iosum.bw.average * coef,
                                   iosum.bw.deviation * style.dev_range_x * coef,
                                   alpha=colors.subinfo_alpha,
                                   color=colors.suppl_color1)  # 'magenta'
-        err2_legend = ax.errorbar(pos + width / 2 + err_x_offset,
+
+        conf_bar_pos = pos + err_x_offset
+        err2_legend = ax.errorbar(conf_bar_pos,
                                   iosum.bw.average * coef,
                                   iosum.bw.confidence * coef,
                                   alpha=colors.subinfo_alpha,
@@ -673,13 +740,43 @@
     ax2 = ax.twinx()
 
     # plot median and 95 perc latency
-    ax2.plot(xt, [iosum.lat.perc_50 for iosum in iosums], label="lat med")
-    ax2.plot(xt, [iosum.lat.perc_95 for iosum in iosums], label="lat 95%")
+    lat_coef_ms = float(unit_conversion_coef(iosums[0].lat.units, "ms"))
+    ax2.plot(xpos, [iosum.lat.perc_50 * lat_coef_ms for iosum in iosums], label="lat med")
+    ax2.plot(xpos, [iosum.lat.perc_95 * lat_coef_ms for iosum in iosums], label="lat 95%")
+
+    for grid_line in ax2.get_ygridlines():
+        grid_line.set_linestyle(":")
+
+    # extra y spine for BW/IOPS on left side
+    if style.extra_io_spine:
+        ax3 = ax.twinx()
+        if iops_log_spine:
+            ax3.set_yscale('log')
+
+        ax3.set_ylabel("BW (MiBps)" if iops_primary else "IOPS")
+        secondary_coef = coef_mb if iops_primary else coef_iops
+        ax3.set_ylim(io_lims[0] * secondary_coef, io_lims[1] * secondary_coef)
+        ax3.spines["left"].set_position(("axes", extra_io_spine_x_offset))
+        ax3.spines["left"].set_visible(True)
+        ax3.yaxis.set_label_position('left')
+        ax3.yaxis.set_ticks_position('left')
+    else:
+        ax3 = None
+
+    ax2.set_ylabel("Latency (ms)")
+
+    # legend box
+    handles2, labels2 = ax2.get_legend_handles_labels()
+    ax.legend(handles1 + handles2, labels1 + labels2,
+              loc=legend_location,
+              bbox_to_anchor=legend_bbox_to_anchor)
 
     # limit and label x spine
-    plt.xlim(extra_x_space, lc + extra_x_space)
-    plt.xticks(xt, ["{0} * {1}".format(iosum.qd, iosum.nodes_count) for iosum in iosums])
-    ax.set_xlabel("QD * Test node count")
+    ax.set_xlim(extra_x_space, len(iosums) + extra_x_space)
+    ax.set_xticks(xpos)
+    ax.set_xticklabels(["{0} * {1} = {2}".format(iosum.qd, iosum.nodes_count, iosum.qd * iosum.nodes_count)
+                       for iosum in iosums])
+    ax.set_xlabel("IO queue depth * test node count = total parallel requests")
 
     # apply log scales for X spines, if set
     if iops_log_spine:
@@ -688,36 +785,16 @@
     if lat_log_spine:
         ax2.set_yscale('log')
 
-    # extra y spine for BW/IOPS on left side
-    if style.extra_io_spine:
-        ax3 = ax.twinx()
-        if iops_log_spine:
-            ax3.set_yscale('log')
-
-        if iops_primary:
-            ax3.set_ylabel("BW (MiBps)")
-            ax3.set_ylim(io_lims[0] * coef, io_lims[1] * coef)
-        else:
-            ax3.set_ylabel("IOPS")
-            ax3.set_ylim(io_lims[0] * coef, io_lims[1] * coef)
-
-        ax3.spines["left"].set_position(("axes", extra_io_spine_x_offset))
-        ax3.spines["left"].set_visible(True)
-        ax3.yaxis.set_label_position('left')
-        ax3.yaxis.set_ticks_position('left')
-
-    ax2.set_ylabel("Latency (ms)")
-
-    # legend box
-    handles2, labels2 = ax2.get_legend_handles_labels()
-    plt.legend(handles1 + handles2, labels1 + labels2,
-               loc=legend_location,
-               bbox_to_anchor=legend_bbox_to_anchor)
-
     # adjust central box size to fit legend
-    # plt.subplots_adjust(**plot_box_adjust)
     apply_style(fig, title, style, eng=False, no_legend=True)
 
+    # override some styles
+    fig.set_size_inches(*style.qd_chart_inches)
+    fig.subplots_adjust(right=StyleProfile.subplot_adjust_r)
+
+    if style.extra_io_spine:
+        ax3.grid(False)
+
 
 #  --------------------  REPORT HELPERS --------------------------------------------------------------------------------
 
@@ -812,21 +889,25 @@
             ts_map[fjob_no_qd].append((suite, fjob))
 
         for tpl, suites_jobs in ts_map.items():
-            if len(suites_jobs) > StyleProfile.min_iops_vs_qd_jobs:
+            if len(suites_jobs) >= StyleProfile.min_iops_vs_qd_jobs:
+
                 iosums = [make_iosum(rstorage, suite, job) for suite, job in suites_jobs]
                 iosums.sort(key=lambda x: x.qd)
                 summary, summary_long = str_summary[tpl]
+
+                yield Menu1st.summary, Menu2ndSumm.io_lat_qd, \
+                    HTMLBlock(html.H2(html.center("IOPS, BW, Lat = func(QD). " + summary_long)))
+
                 ds = DataSource(suite_id=suite.storage_id,
                                 job_id=summary,
                                 node_id=AGG_TAG,
                                 sensor="fio",
                                 dev=AGG_TAG,
                                 metric="io_over_qd",
-                                tag="svg")
+                                tag=io_chart_format)
 
-                title = "IOPS, BW, Lat vs. QD.\n" + summary_long
-                fpath = io_chart(rstorage, ds, title=title, legend="IOPS/BW", iosums=iosums)  # type: str
-                yield Menu1st.summary, Menu2ndSumm.io_lat_qd, HTMLBlock(html.img(fpath))
+                fpath = io_chart(rstorage, ds, title="", legend="IOPS/BW", iosums=iosums)  # type: str
+                yield Menu1st.summary, Menu2ndSumm.io_lat_qd, HTMLBlock(html.center(html.img(fpath)))
 
 
 # Linearization report
@@ -834,7 +915,6 @@
     """Creates graphs, which show how IOPS and Latency depend on block size"""
 
 
-# IOPS/latency distribution
 class StatInfo(JobReporter):
     """Statistic info for job results"""
     suite_types = {'fio'}
@@ -845,17 +925,22 @@
         fjob = cast(FioJobConfig, job)
         io_sum = make_iosum(rstorage, suite, fjob)
 
-        summary_data = [
-            ["Summary", job.params.long_summary],
-        ]
-
-        res = html.H2(html.center("Test summary"))
-        res += html.table("Test info", None, summary_data)
-        stat_data_headers = ["Name", "Average ~ Dev", "Conf interval", "Mediana", "Mode", "Kurt / Skew", "95%", "99%"]
+        res = html.H2(html.center("Test summary - " + job.params.long_summary))
+        stat_data_headers = ["Name", "Average ~ Dev", "Conf interval", "Median", "Mode", "Kurt / Skew", "95%", "99%",
+                             "ADF test"]
 
         bw_target_units = 'Bps'
         bw_coef = float(unit_conversion_coef(io_sum.bw.units, bw_target_units))
 
+        adf_v, *_1, stats, _2 = adfuller(io_sum.bw.data)
+
+        for v in ("1%", "5%", "10%"):
+            if adf_v <= stats[v]:
+                ad_test = v
+                break
+        else:
+            ad_test = "Failed"
+
         bw_data = ["Bandwidth",
                    "{}{} ~ {}{}".format(b2ssize(io_sum.bw.average * bw_coef), bw_target_units,
                                         b2ssize(io_sum.bw.deviation * bw_coef), bw_target_units),
@@ -864,7 +949,8 @@
                    "-",
                    "{:.2f} / {:.2f}".format(io_sum.bw.kurt, io_sum.bw.skew),
                    b2ssize(io_sum.bw.perc_5 * bw_coef) + bw_target_units,
-                   b2ssize(io_sum.bw.perc_1 * bw_coef) + bw_target_units]
+                   b2ssize(io_sum.bw.perc_1 * bw_coef) + bw_target_units,
+                   ad_test]
 
         iops_coef = float(unit_conversion_coef(io_sum.bw.units, 'KiBps')) / fjob.bsize
         iops_data = ["IOPS",
@@ -875,7 +961,8 @@
                      "-",
                      "{:.2f} / {:.2f}".format(io_sum.bw.kurt, io_sum.bw.skew),
                      b2ssize_10(io_sum.bw.perc_5 * iops_coef) + "IOPS",
-                     b2ssize_10(io_sum.bw.perc_1 * iops_coef) + "IOPS"]
+                     b2ssize_10(io_sum.bw.perc_1 * iops_coef) + "IOPS",
+                     ad_test]
 
         lat_target_unit = 's'
         lat_coef = unit_conversion_coef(io_sum.lat.units, lat_target_unit)
@@ -887,76 +974,304 @@
                     "-",
                     "-",
                     b2ssize_10(io_sum.lat.perc_95 * lat_coef) + lat_target_unit,
-                    b2ssize_10(io_sum.lat.perc_99 * lat_coef) + lat_target_unit]
+                    b2ssize_10(io_sum.lat.perc_99 * lat_coef) + lat_target_unit,
+                    '-']
 
         # sensor usage
         stat_data = [iops_data, bw_data, lat_data]
-        res += html.table("Load stats info", stat_data_headers, stat_data)
+        res += html.center(html.table("Load stats info", stat_data_headers, stat_data))
+        yield Menu1st.per_job, job.summary, HTMLBlock(res)
 
-        resource_headers = ["Resource", "Usage count", "Proportional to work done"]
 
-        tot_io_coef = float(unit_conversion_coef(io_sum.bw.units, "KiBps"))
-        tot_ops_coef = tot_io_coef / fjob.bsize
+def avg_dev_div(vec: numpy.ndarray, denom: numpy.ndarray, avg_ranges: int = 10) -> Tuple[float, float]:
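+    """Return (vec.sum() / denom.sum(), std-dev of per-chunk ratios).
+
+    Both arrays are split into avg_ranges chunks; chunks whose denom sum is
+    below half of an average chunk are ignored for the deviation estimate.
+    """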
+    step = min(vec.size, denom.size) // avg_ranges
+    assert step >= 1
+    vals = []
 
-        io_transfered = io_sum.bw.data.sum() * tot_io_coef
-        resource_data = [
-            ["IO made", b2ssize_10(io_transfered * tot_ops_coef) + "OP", "-"],
-            ["Data transfered", b2ssize(io_transfered) + "B", "-"]
-        ]
+    whole_sum = denom.sum() / denom.size * step * 0.5
+    for i in range(0, avg_ranges):
+        s1 = denom[i * step: (i + 1) * step].sum()
+        if s1 >= whole_sum:
+            vals.append(vec[i * step: (i + 1) * step].sum() / s1)
 
-        storage = rstorage.storage
-        nodes = storage.load_list(NodeInfo, 'all_nodes')  # type: List[NodeInfo]
+    assert len(vals) > 1
+    return vec.sum() / denom.sum(), numpy.std(vals, ddof=1)
 
-        ops_done = io_transfered * tot_ops_coef
+
+class Resources(JobReporter):
+    """Statistic info for job results"""
+    suite_types = {'fio'}
+
+    def get_divs(self, suite: SuiteConfig, job: JobConfig,
+                 rstorage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
+
+        fjob = cast(FioJobConfig, job)
+        io_sum = make_iosum(rstorage, suite, fjob)
+
+        tot_io_coef = float(unit_conversion_coef(io_sum.bw.units, "Bps"))
+        io_transfered = io_sum.bw.data * tot_io_coef
+        ops_done = io_transfered / (fjob.bsize * float(unit_conversion_coef("KiBps", "Bps")))
+
+        io_made = "Client IOP made"
+        data_tr = "Client data transfered"
+
+        records = {
+            io_made: (b2ssize_10(ops_done.sum()) + "OP", None, None),
+            data_tr: (b2ssize(io_transfered.sum()) + "B", None, None)
+        }  # type: Dict[str, Tuple[str, float, float]]
+
+        test_send = "Test nodes net send"
+        test_recv = "Test nodes net recv"
+        test_net = "Test nodes net total"
+        test_send_pkt = "Test nodes send pkt"
+        test_recv_pkt = "Test nodes recv pkt"
+        test_net_pkt = "Test nodes total pkt"
+
+        test_write = "Test nodes disk write"
+        test_read = "Test nodes disk read"
+        test_write_iop = "Test nodes write IOP"
+        test_read_iop = "Test nodes read IOP"
+        test_iop = "Test nodes IOP"
+        test_rw = "Test nodes disk IO"
+
+        storage_send = "Storage nodes net send"
+        storage_recv = "Storage nodes net recv"
+        storage_send_pkt = "Storage nodes send pkt"
+        storage_recv_pkt = "Storage nodes recv pkt"
+        storage_net = "Storage nodes net total"
+        storage_net_pkt = "Storage nodes total pkt"
+
+        storage_write = "Storage nodes disk write"
+        storage_read = "Storage nodes disk read"
+        storage_write_iop = "Storage nodes write IOP"
+        storage_read_iop = "Storage nodes read IOP"
+        storage_iop = "Storage nodes IOP"
+        storage_rw = "Storage nodes disk IO"
+
+        storage_cpu = "Storage nodes CPU"
+        storage_cpu_s = "Storage nodes CPU s/IOP"
+        storage_cpu_s_b = "Storage nodes CPU s/B"
 
         all_metrics = [
-            ("Test nodes net send", 'net-io', 'send_bytes', b2ssize, ['testnode'], "B", io_transfered),
-            ("Test nodes net recv", 'net-io', 'recv_bytes', b2ssize, ['testnode'], "B", io_transfered),
+            (test_send, 'net-io', 'send_bytes', b2ssize, ['testnode'], "B", io_transfered),
+            (test_recv, 'net-io', 'recv_bytes', b2ssize, ['testnode'], "B", io_transfered),
+            (test_send_pkt, 'net-io', 'send_packets', b2ssize_10, ['testnode'], "pkt", ops_done),
+            (test_recv_pkt, 'net-io', 'recv_packets', b2ssize_10, ['testnode'], "pkt", ops_done),
 
-            ("Test nodes disk write", 'block-io', 'sectors_written', b2ssize, ['testnode'], "B", io_transfered),
-            ("Test nodes disk read", 'block-io', 'sectors_read', b2ssize, ['testnode'], "B", io_transfered),
-            ("Test nodes writes", 'block-io', 'writes_completed', b2ssize_10, ['testnode'], "OP", ops_done),
-            ("Test nodes reads", 'block-io', 'reads_completed', b2ssize_10, ['testnode'], "OP", ops_done),
+            (test_write, 'block-io', 'sectors_written', b2ssize, ['testnode'], "B", io_transfered),
+            (test_read, 'block-io', 'sectors_read', b2ssize, ['testnode'], "B", io_transfered),
+            (test_write_iop, 'block-io', 'writes_completed', b2ssize_10, ['testnode'], "OP", ops_done),
+            (test_read_iop, 'block-io', 'reads_completed', b2ssize_10, ['testnode'], "OP", ops_done),
 
-            ("Storage nodes net send", 'net-io', 'send_bytes', b2ssize, STORAGE_ROLES, "B", io_transfered),
-            ("Storage nodes net recv", 'net-io', 'recv_bytes', b2ssize, STORAGE_ROLES, "B", io_transfered),
+            (storage_send, 'net-io', 'send_bytes', b2ssize, STORAGE_ROLES, "B", io_transfered),
+            (storage_recv, 'net-io', 'recv_bytes', b2ssize, STORAGE_ROLES, "B", io_transfered),
+            (storage_send_pkt, 'net-io', 'send_packets', b2ssize_10, STORAGE_ROLES, "OP", ops_done),
+            (storage_recv_pkt, 'net-io', 'recv_packets', b2ssize_10, STORAGE_ROLES, "OP", ops_done),
 
-            ("Storage nodes disk write", 'block-io', 'sectors_written', b2ssize, STORAGE_ROLES, "B", io_transfered),
-            ("Storage nodes disk read", 'block-io', 'sectors_read', b2ssize, STORAGE_ROLES, "B", io_transfered),
-            ("Storage nodes writes", 'block-io', 'writes_completed', b2ssize_10, STORAGE_ROLES, "OP", ops_done),
-            ("Storage nodes reads", 'block-io', 'reads_completed', b2ssize_10, STORAGE_ROLES, "OP", ops_done),
+            (storage_write, 'block-io', 'sectors_written', b2ssize, STORAGE_ROLES, "B", io_transfered),
+            (storage_read, 'block-io', 'sectors_read', b2ssize, STORAGE_ROLES, "B", io_transfered),
+            (storage_write_iop, 'block-io', 'writes_completed', b2ssize_10, STORAGE_ROLES, "OP", ops_done),
+            (storage_read_iop, 'block-io', 'reads_completed', b2ssize_10, STORAGE_ROLES, "OP", ops_done),
         ]
 
         all_agg = {}
 
-        for descr, sensor, metric, ffunc, roles, units, denom in all_metrics:
-            if not nodes:
-                continue
-
+        for vname, sensor, metric, ffunc, roles, units, service_provided_count in all_metrics:
             res_ts = summ_sensors(rstorage, roles, sensor=sensor, metric=metric, time_range=job.reliable_info_range_s)
             if res_ts is None:
                 continue
 
-            agg = res_ts.data.sum()
-            resource_data.append([descr, ffunc(agg) + units, "{:.1f}".format(agg / denom)])
-            all_agg[descr] = agg
+            data = res_ts.data
+            if units == "B":
+                data = data * float(unit_conversion_coef(res_ts.units, "B"))
+
+            records[vname] = (ffunc(data.sum()) + units, *avg_dev_div(data, service_provided_count))
+            all_agg[vname] = data
+
+        # cpu usage
+        nodes_count = len(list(find_nodes_by_roles(rstorage, STORAGE_ROLES)))
+        cpu_ts = get_cluster_cpu_load(rstorage, STORAGE_ROLES, job.reliable_info_range_s)
+
+        cpus_used_sec = (1.0 - cpu_ts['idle'].data / cpu_ts['total'].data) * nodes_count
+        used_s = b2ssize_10(cpus_used_sec.sum()) + 's'
+
+        all_agg[storage_cpu] = cpus_used_sec
+        records[storage_cpu_s] = (used_s, *avg_dev_div(cpus_used_sec, ops_done))
+        records[storage_cpu_s_b] = (used_s, *avg_dev_div(cpus_used_sec, io_transfered))
 
         cums = [
-            ("Test nodes writes", "Test nodes reads", "Total test ops", b2ssize_10, "OP", ops_done),
-            ("Storage nodes writes", "Storage nodes reads", "Total storage ops", b2ssize_10, "OP", ops_done),
-            ("Storage nodes disk write", "Storage nodes disk read", "Total storage IO size", b2ssize,
-             "B", io_transfered),
-            ("Test nodes disk write", "Test nodes disk read", "Total test nodes IO size", b2ssize, "B", io_transfered),
+            (test_iop, test_read_iop, test_write_iop, b2ssize_10, "OP", ops_done),
+            (test_rw, test_read, test_write, b2ssize, "B", io_transfered),
+            (test_net, test_send, test_recv, b2ssize, "B", io_transfered),
+            (test_net_pkt, test_send_pkt, test_recv_pkt, b2ssize_10, "pkt", ops_done),
+
+            (storage_iop, storage_read_iop, storage_write_iop, b2ssize_10, "OP", ops_done),
+            (storage_rw, storage_read, storage_write, b2ssize, "B", io_transfered),
+            (storage_net, storage_send, storage_recv, b2ssize, "B", io_transfered),
+            (storage_net_pkt, storage_send_pkt, storage_recv_pkt, b2ssize_10, "pkt", ops_done),
         ]
 
-        for name1, name2, descr, ffunc, units, denom in cums:
+        for vname, name1, name2, ffunc, units, service_provided_masked in cums:
             if name1 in all_agg and name2 in all_agg:
                 agg = all_agg[name1] + all_agg[name2]
-                resource_data.append([descr, ffunc(agg) + units, "{:.1f}".format(agg / denom)])
+                records[vname] = (ffunc(agg.sum()) + units, *avg_dev_div(agg, service_provided_masked))
 
-        res += html.table("Resources usage", resource_headers, resource_data)
+        table_structure = [
+            "Service provided",
+            (io_made, data_tr),
+            "Test nodes total load",
+            (test_send_pkt, test_send),
+            (test_recv_pkt, test_recv),
+            (test_net_pkt, test_net),
+            (test_write_iop, test_write),
+            (test_read_iop, test_read),
+            (test_iop, test_rw),
+            "Storage nodes resource consumed",
+            (storage_send_pkt, storage_send),
+            (storage_recv_pkt, storage_recv),
+            (storage_net_pkt, storage_net),
+            (storage_write_iop, storage_write),
+            (storage_read_iop, storage_read),
+            (storage_iop, storage_rw),
+            (storage_cpu_s, storage_cpu_s_b),
+        ]  # type: List[Union[str, Tuple[Optional[str], Optional[str]]]]
 
-        yield Menu1st.per_job, job.summary, HTMLBlock(res)
+        yield Menu1st.per_job, job.summary, HTMLBlock(html.H2(html.center("Resources usage")))
+
+        doc = xmlbuilder3.XMLBuilder("table",
+                                     **{"class": "table table-bordered table-striped table-condensed table-hover",
+                                        "style": "width: auto;"})
+
+        with doc.thead:
+            with doc.tr:
+                [doc.th(header) for header in ["Resource", "Usage count", "To service"] * 2]
+
+        cols = 6
+
+        short_name = {
+            name: (name if name in {io_made, data_tr} else " ".join(name.split()[2:]).capitalize())
+            for name in records.keys()
+        }
+
+        short_name[storage_cpu_s] = "CPU (s/IOP)"
+        short_name[storage_cpu_s_b] = "CPU (s/B)"
+
+        with doc.tbody:
+            with doc.tr:
+                doc.td(colspan=str(cols // 2)).center.b("Operations")
+                doc.td(colspan=str(cols // 2)).center.b("Bytes")
+
+            for line in table_structure:
+                with doc.tr:
+                    if isinstance(line, str):
+                        with doc.td(colspan=str(cols)):
+                            doc.center.b(line)
+                    else:
+                        for name in line:
+                            if name is None:
+                                doc.td("-", colspan=str(cols // 2))
+                                continue
+
+                            amount_s, avg, dev = records[name]
+
+                            if name in (storage_cpu_s, storage_cpu_s_b) and avg is not None:
+                                dev_s = str(int(dev * 100 / avg)) + "%" if avg > 1E-9 else b2ssize_10(dev) + 's'
+                                rel_val_s = "{}s ~ {}".format(b2ssize_10(avg), dev_s)
+                            else:
+                                if avg is None:
+                                    rel_val_s = '-'
+                                else:
+                                    avg_s = int(avg) if avg > 10 else '{:.1f}'.format(avg)
+                                    if avg > 1E-5:
+                                        dev_s = str(int(dev * 100 / avg)) + "%"
+                                    else:
+                                        dev_s = int(dev) if dev > 10 else '{:.1f}'.format(dev)
+                                    rel_val_s = "{} ~ {}".format(avg_s, dev_s)
+
+                            doc.td(short_name[name], align="left")
+                            doc.td(amount_s, align="right")
+
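+                            # colour-code the per-service ratio: below 0.9 or unknown
+                            # stays plain, <2 is green, <5 is orange, otherwise red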
+                            if avg is None or avg < 0.9:
+                                doc.td(rel_val_s, align="right")
+                            elif avg < 2.0:
+                                doc.td(align="right").font(rel_val_s, color='green')
+                            elif avg < 5.0:
+                                doc.td(align="right").font(rel_val_s, color='orange')
+                            else:
+                                doc.td(align="right").font(rel_val_s, color='red')
+
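+        # drop the first line of the serialized output (the XML declaration),
+        # keeping only the <table> markup for embedding in the report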
+        res = xmlbuilder3.tostr(doc).split("\n", 1)[1]
+        yield Menu1st.per_job, job.summary, HTMLBlock(html.center(res))
+
+        iop_names = [test_write_iop, test_read_iop, test_iop,
+                     storage_write_iop, storage_read_iop, storage_iop]
+
+        bytes_names = [test_write, test_read, test_rw,
+                       test_send, test_recv, test_net,
+                       storage_write, storage_read, storage_rw,
+                       storage_send, storage_recv, storage_net]
+
+        net_pkt_names = [test_send_pkt, test_recv_pkt, test_net_pkt,
+                         storage_send_pkt, storage_recv_pkt, storage_net_pkt]
+
+        for tp, names in [('iop', iop_names), ("bytes", bytes_names), ('Net packets per IOP', net_pkt_names)]:
+            vals = []
+            devs = []
+            avail_names = []
+            for name in names:
+                if name in records:
+                    avail_names.append(name)
+                    _, avg, dev = records[name]
+                    vals.append(avg)
+                    devs.append(dev)
+
+            # sort values, names and deviations together, using the values as the key
+            vals, names, devs = map(list, zip(*sorted(zip(vals, avail_names, devs))))
+
+            ds = DataSource(suite_id=suite.storage_id,
+                            job_id=job.storage_id,
+                            node_id=AGG_TAG,
+                            sensor='resources',
+                            dev=AGG_TAG,
+                            metric=tp.replace(' ', "_") + '2service_bar',
+                            tag=default_format)
+
+            fname = plot_simple_bars(rstorage, ds,
+                                     "Resource consumption / service provided, " + tp,
+                                     [name.replace(" nodes", "") for name in names],
+                                     vals, devs)
+
+            yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fname))
+
+
+class BottleNeck(JobReporter):
+    """Statistic info for job results"""
+    suite_types = {'fio'}
+
+    def get_divs(self, suite: SuiteConfig, job: JobConfig, rstorage: ResultStorage) -> \
+            Iterator[Tuple[str, str, HTMLBlock]]:
+
+        nodes = list(find_nodes_by_roles(rstorage, STORAGE_ROLES))
+
+        sensor = 'block-io'
+        metric = 'io_queue'
+        bn_val = 16
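+        # count sensor samples where a device's IO queue depth exceeds bn_val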
+
+        for node in nodes:
+            bn = 0
+            tot = 0
+            for _, ds in rstorage.iter_sensors(node_id=node.node_id, sensor=sensor, metric=metric):
+                if ds.dev in ('sdb', 'sdc', 'sdd', 'sde'):
+                    data = rstorage.load_sensor(ds)
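+                    # convert the reliable-info interval from seconds to the sensor's time units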
+                    p1 = job.reliable_info_range_s[0] * unit_conversion_coef('s', data.time_units)
+                    p2 = job.reliable_info_range_s[1] * unit_conversion_coef('s', data.time_units)
+                    idx1, idx2 = numpy.searchsorted(data.times, (p1, p2))
+                    bn += (data.data[idx1: idx2] > bn_val).sum()
+                    tot += idx2 - idx1
+            logger.debug("Node %s: io_queue > %s in %s of %s samples", node.node_id, bn_val, bn, tot)
+
+        yield Menu1st.per_job, job.summary, HTMLBlock("")
 
 
 # CPU load
@@ -968,22 +1283,15 @@
 
         # plot CPU time
         for rt, roles in [('storage', STORAGE_ROLES), ('test', ['testnode'])]:
-            cpu_ts = {}
-            cpu_metrics = "idle guest iowait irq nice sirq steal sys user".split()
-            for name in cpu_metrics:
-                cpu_ts[name] = summ_sensors(rstorage, roles, sensor='system-cpu', metric=name,
-                                            time_range=job.reliable_info_range_s)
-
-            it = iter(cpu_ts.values())
-            total_over_time = next(it).data.copy()
-            for ts in it:
-                total_over_time += ts.data
-
+            cpu_ts = get_cluster_cpu_load(rstorage, roles, job.reliable_info_range_s)
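+            # plot only user/sys/irq/idle, each as a percentage of total CPU time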
+            tss = [(name, ts.data * 100 / cpu_ts['total'].data)
+                   for name, ts in cpu_ts.items()
+                   if name in {'user', 'sys', 'irq', 'idle'}]
             fname = plot_simple_over_time(rstorage,
                                           cpu_ts['idle'].source(job_id=job.storage_id,
                                                                 suite_id=suite.storage_id,
                                                                 metric='allcpu', tag=rt + '.plt.' + default_format),
-                                          tss=[(name, ts.data * 100 / total_over_time) for name, ts in cpu_ts.items()],
+                                          tss=tss,
                                           average=True,
                                           ylabel="CPU time %",
                                           title="{} nodes CPU usage".format(rt.capitalize()))
@@ -1023,19 +1331,23 @@
         for name, devs, roles in [('storage', storage_devs, STORAGE_ROLES),
                                   ('journal', journal_devs, STORAGE_ROLES),
                                   ('test', test_nodes_devs, ['testnode'])]:
+
+            yield Menu1st.per_job, job.summary, \
+                HTMLBlock(html.H2(html.center("{} IO heatmaps".format(name.capitalize()))))
+
             # QD heatmap
             ioq2d = find_sensors_to_2d(rstorage, roles, sensor='block-io', devs=devs,
                                        metric='io_queue', time_range=trange)
-            fname = plot_hmap_from_2d(rstorage, DataSource(suite.storage_id,
-                                                           job.storage_id,
-                                                           AGG_TAG,
-                                                           'block-io',
-                                                           name,
-                                                           metric='io_queue',
-                                                           tag="hmap." + default_format),
-                                      ioq2d, ylabel="IO QD", title=name.capitalize() + " devs QD",
-                                      bins=StyleProfile.qd_bins,
-                                      xlabel='Time')  # type: str
+
+            ds = DataSource(suite.storage_id, job.storage_id, AGG_TAG, 'block-io', name, tag="hmap." + default_format)
+
+            fname = plot_hmap_from_2d(rstorage,
+                                      ds(metric='io_queue'),
+                                      ioq2d,
+                                      ylabel="IO QD",
+                                      title=name.capitalize() + " devs QD",
+                                      xlabel='Time',
+                                      bins=StyleProfile.qd_bins)  # type: str
             yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fname))
 
             # Block size heatmap
@@ -1045,14 +1357,11 @@
             sw2d = find_sensors_to_2d(rstorage, roles, sensor='block-io', devs=devs,
                                       metric='sectors_written', time_range=trange)
             data2d = sw2d / wc2d / 1024
-            fname = plot_hmap_from_2d(rstorage, DataSource(suite.storage_id,
-                                                           job.storage_id,
-                                                           AGG_TAG,
-                                                           'block-io',
-                                                           name,
-                                                           metric='wr_block_size',
-                                                           tag="hmap." + default_format),
-                                      data2d, ylabel="IO bsize, KiB", title=name.capitalize() + " write block size",
+            fname = plot_hmap_from_2d(rstorage,
+                                      ds(metric='wr_block_size'),
+                                      data2d,
+                                      ylabel="IO bsize, KiB",
+                                      title=name.capitalize() + " write block size",
                                       xlabel='Time',
                                       bins=StyleProfile.block_size_bins)  # type: str
             yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fname))
@@ -1060,65 +1369,18 @@
             # iotime heatmap
             wtime2d = find_sensors_to_2d(rstorage, roles, sensor='block-io', devs=devs,
                                          metric='io_time', time_range=trange)
-            fname = plot_hmap_from_2d(rstorage, DataSource(suite.storage_id,
-                                                           job.storage_id,
-                                                           AGG_TAG,
-                                                           'block-io',
-                                                           name,
-                                                           metric='io_time',
-                                                           tag="hmap." + default_format),
-                                      wtime2d, ylabel="IO time (ms) per second",
+            fname = plot_hmap_from_2d(rstorage,
+                                      ds(metric='io_time'),
+                                      wtime2d,
+                                      ylabel="IO time (ms) per second",
                                       title=name.capitalize() + " iotime",
                                       xlabel='Time',
                                       bins=StyleProfile.iotime_bins)  # type: str
             yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fname))
 
 
-# IOPS/latency distribution
-class IOHist(JobReporter):
-    """IOPS.latency distribution histogram"""
-    suite_types = {'fio'}
-
-    def get_divs(self,
-                 suite: SuiteConfig,
-                 job: JobConfig,
-                 rstorage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
-
-        fjob = cast(FioJobConfig, job)
-
-        yield Menu1st.per_job, fjob.summary, HTMLBlock(html.H2(html.center("Load histograms")))
-
-        # agg_lat = get_aggregated(rstorage, suite, fjob, "lat")
-        # # bins_edges = numpy.array(get_lat_vals(agg_lat.data.shape[1]), dtype='float32') / 1000  # convert us to ms
-        # lat_stat_prop = calc_histo_stat_props(agg_lat, bins_edges=None, rebins_count=StyleProfile.hist_lat_boxes)
-        #
-        # long_summary = cast(FioJobParams, fjob.params).long_summary
-        #
-        # title = "Latency distribution"
-        # units = "ms"
-        #
-        # fpath = plot_hist(rstorage, agg_lat.source(tag='hist.svg'), title, units, lat_stat_prop)  # type: str
-        # yield Menu1st.per_job, fjob.summary, HTMLBlock(html.img(fpath))
-
-        agg_io = get_aggregated(rstorage, suite, fjob, "bw")
-
-        if fjob.bsize >= LARGE_BLOCKS:
-            title = "BW distribution"
-            units = "MiBps"
-            agg_io.data //= int(unit_conversion_coef(units, agg_io.units))
-        else:
-            title = "IOPS distribution"
-            agg_io.data //= (int(unit_conversion_coef("KiBps", agg_io.units)) * fjob.bsize)
-            units = "IOPS"
-
-        io_stat_prop = calc_norm_stat_props(agg_io, bins_count=StyleProfile.hist_boxes)
-        fpath = plot_hist(rstorage, agg_io.source(tag='hist.' + default_format),
-                          title, units, io_stat_prop)  # type: str
-        yield Menu1st.per_job, fjob.summary, HTMLBlock(html.img(fpath))
-
-
 # IOPS/latency over test time for each job
-class IOTime(JobReporter):
+class LoadToolResults(JobReporter):
     """IOPS/latency during test"""
     suite_types = {'fio'}
 
@@ -1129,8 +1391,10 @@
 
         fjob = cast(FioJobConfig, job)
 
+        yield Menu1st.per_job, job.summary, HTMLBlock(html.H2(html.center("Load tool results")))
+
         agg_io = get_aggregated(rstorage, suite, fjob, "bw")
-        if fjob.bsize >= LARGE_BLOCKS:
+        if fjob.bsize >= DefStyleProfile.large_blocks:
             title = "Fio measured Bandwidth over time"
             units = "MiBps"
             agg_io.data //= int(unit_conversion_coef(units, agg_io.units))
@@ -1161,22 +1425,35 @@
 
         yield Menu1st.per_job, fjob.summary, HTMLBlock(html.img(fpath))
 
+        fjob = cast(FioJobConfig, job)
 
-class ResourceUsage:
-    def __init__(self, io_r_ops: int, io_w_ops: int, io_r_kb: int, io_w_kb: int) -> None:
-        self.io_w_ops = io_w_ops
-        self.io_r_ops = io_r_ops
-        self.io_w_kb = io_w_kb
-        self.io_r_kb = io_r_kb
+        # agg_lat = get_aggregated(rstorage, suite, fjob, "lat")
+        # # bins_edges = numpy.array(get_lat_vals(agg_lat.data.shape[1]), dtype='float32') / 1000  # convert us to ms
+        # lat_stat_prop = calc_histo_stat_props(agg_lat, bins_edges=None, rebins_count=StyleProfile.hist_lat_boxes)
+        #
+        # long_summary = cast(FioJobParams, fjob.params).long_summary
+        #
+        # title = "Latency distribution"
+        # units = "ms"
+        #
+        # fpath = plot_hist(rstorage, agg_lat.source(tag='hist.svg'), title, units, lat_stat_prop)  # type: str
+        # yield Menu1st.per_job, fjob.summary, HTMLBlock(html.img(fpath))
 
-        self.cpu_used_user = None  # type: int
-        self.cpu_used_sys = None  # type: int
-        self.cpu_wait_io = None  # type: int
+        agg_io = get_aggregated(rstorage, suite, fjob, "bw")
 
-        self.net_send_packets = None  # type: int
-        self.net_recv_packets = None  # type: int
-        self.net_send_kb = None  # type: int
-        self.net_recv_kb = None  # type: int
+        if fjob.bsize >= DefStyleProfile.large_blocks:
+            title = "BW distribution"
+            units = "MiBps"
+            agg_io.data //= int(unit_conversion_coef(units, agg_io.units))
+        else:
+            title = "IOPS distribution"
+            agg_io.data //= (int(unit_conversion_coef("KiBps", agg_io.units)) * fjob.bsize)
+            units = "IOPS"
+
+        io_stat_prop = calc_norm_stat_props(agg_io, bins_count=StyleProfile.hist_boxes)
+        fpath = plot_hist(rstorage, agg_io.source(tag='hist.' + default_format),
+                          title, units, io_stat_prop)  # type: str
+        yield Menu1st.per_job, fjob.summary, HTMLBlock(html.img(fpath))
 
 
 # Cluster load over test time
@@ -1187,8 +1464,8 @@
     storage_sensors = [
         ('block-io', 'reads_completed', "Read", 'iop'),
         ('block-io', 'writes_completed', "Write", 'iop'),
-        ('block-io', 'sectors_read', "Read", 'KiB'),
-        ('block-io', 'sectors_written', "Write", 'KiB'),
+        ('block-io', 'sectors_read', "Read", 'MiB'),
+        ('block-io', 'sectors_written', "Write", 'MiB'),
     ]
 
     def get_divs(self,
@@ -1198,16 +1475,16 @@
         yield Menu1st.per_job, job.summary, HTMLBlock(html.H2(html.center("Cluster load")))
 
         for sensor, metric, op, units in self.storage_sensors:
-            ts = summ_sensors(rstorage, ['testnode'], sensor, metric, job.reliable_info_range_s)
+            ts = summ_sensors(rstorage, STORAGE_ROLES, sensor, metric, job.reliable_info_range_s)
             ds = DataSource(suite_id=suite.storage_id,
                             job_id=job.storage_id,
-                            node_id="test_nodes",
+                            node_id="storage",
                             sensor=sensor,
                             dev=AGG_TAG,
                             metric=metric,
                             tag="ts." + default_format)
 
-            data = ts.data if units != 'KiB' else ts.data * float(unit_conversion_coef(ts.units, 'KiB'))
+            data = ts.data if units != 'MiB' else ts.data * float(unit_conversion_coef(ts.units, 'MiB'))
             ts = TimeSeries(name="",
                             times=numpy.arange(*job.reliable_info_range_s),
                             data=data,
@@ -1218,14 +1495,10 @@
                             histo_bins=ts.histo_bins)
 
             sensor_title = "{} {}".format(op, units)
-            fpath = plot_v_over_time(rstorage, ds, sensor_title, sensor_title, ts=ts)  # type: str
+            fpath = plot_v_over_time(rstorage, ds, sensor_title, units, ts=ts)  # type: str
             yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fpath))
 
 
-# Ceph cluster summary
-class ResourceConsumption(Reporter):
-    """Resources consumption report, only text"""
-
 
 # Node load over test time
 class NodeLoad(Reporter):
@@ -1250,12 +1523,12 @@
     def run(self, ctx: TestRun) -> None:
         rstorage = ResultStorage(ctx.storage)
 
-        job_reporters = [StatInfo(), IOTime(), IOHist(), ClusterLoad(), CPULoadPlot(),
+        job_reporters = [StatInfo(), Resources(), LoadToolResults(), ClusterLoad(), CPULoadPlot(),
                          QDIOTimeHeatmap()] # type: List[JobReporter]
-        reporters = []
-
-        # reporters = [IO_QD()]  # type: List[Reporter]
-        # job_reporters = [ClusterLoad()]
+        # job_reporters = [QDIOTimeHeatmap()] # type: List[JobReporter]
+        # job_reporters = []
+        reporters = [IO_QD()]  # type: List[Reporter]
+        # reporters = []  # type: List[Reporter]
 
         root_dir = os.path.dirname(os.path.dirname(wally.__file__))
         doc_templ_path = os.path.join(root_dir, "report_templates/index.html")
@@ -1278,18 +1551,25 @@
             all_jobs = list(rstorage.iter_job(suite))
             all_jobs.sort(key=lambda job: job.params)
             for job in all_jobs:
-                for reporter in job_reporters:
-                    logger.debug("Start reporter %s on job %s suite %s",
-                                 reporter.__class__.__name__, job.summary, suite.test_type)
-                    for block, item, html in reporter.get_divs(suite, job, rstorage):
-                        items[block][item].append(html)
-                if DEBUG:
-                    break
+                if 'rwd16384_qd1' == job.summary:
+                    try:
+                        for reporter in job_reporters:
+                            logger.debug("Start reporter %s on job %s suite %s",
+                                         reporter.__class__.__name__, job.summary, suite.test_type)
+                            for block, item, html in reporter.get_divs(suite, job, rstorage):
+                                items[block][item].append(html)
+                        if DEBUG:
+                            break
+                    except Exception:
+                        logger.exception("Failed to generate report for %s", job)
 
             for reporter in reporters:
-                logger.debug("Start reporter %s on suite %s", reporter.__class__.__name__, suite.test_type)
-                for block, item, html in reporter.get_divs(suite, rstorage):
-                    items[block][item].append(html)
+                try:
+                    logger.debug("Start reporter %s on suite %s", reporter.__class__.__name__, suite.test_type)
+                    for block, item, html in reporter.get_divs(suite, rstorage):
+                        items[block][item].append(html)
+                except Exception:
+                    logger.exception("Failed to generate report")
 
             if DEBUG:
                 break
diff --git a/wally/result_classes.py b/wally/result_classes.py
index 9d2e83e..60a0a55 100644
--- a/wally/result_classes.py
+++ b/wally/result_classes.py
@@ -144,10 +144,13 @@
     def __repr__(self) -> str:
         return str(self)
 
-    def copy(self) -> 'TimeSeries':
+    def copy(self, no_data: bool = False) -> 'TimeSeries':
         cp = copy.copy(self)
-        cp.times = self.times.copy()
-        cp.data = self.data.copy()
+
+        if not no_data:
+            cp.times = self.times.copy()
+            cp.data = self.data.copy()
+
         cp.source = self.source()
         return cp
 
diff --git a/wally/run_test.py b/wally/run_test.py
index f8dc036..c4581d3 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -1,5 +1,6 @@
 import time
 import json
+import copy
 import logging
 from concurrent.futures import Future
 from typing import List, Dict, Tuple, Optional, Union, cast
@@ -143,15 +144,6 @@
             logger.debug("Add node %s with roles %s", url, roles)
 
 
-class SaveNodesStage(Stage):
-    """Save nodes list to file"""
-
-    priority = StepOrder.UPDATE_NODES_INFO + 1
-
-    def run(self, ctx: TestRun) -> None:
-        ctx.storage.put_list(ctx.nodes_info.values(), 'all_nodes')
-
-
 class SleepStage(Stage):
     """Save nodes list to file"""
 
@@ -266,15 +258,38 @@
         pass
 
 
+class SaveNodesStage(Stage):
+    """Save nodes list to file"""
+    nodes_path = 'all_nodes'
+    params_path = 'all_nodes_params.js'
+    priority = StepOrder.UPDATE_NODES_INFO + 1
+
+    def run(self, ctx: TestRun) -> None:
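+        # node params are dumped separately as raw JSON; the copies stored in the
+        # node list keep only a placeholder string pointing at that file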
+        infos = list(ctx.nodes_info.values())
+        params = {node.node_id: node.params for node in infos}
+        ninfos = [copy.copy(node) for node in infos]
+        for node in ninfos:
+            node.params = "in {!r} file".format(self.params_path)
+        ctx.storage.put_list(ninfos, self.nodes_path)
+        ctx.storage.put_raw(json.dumps(params).encode('utf8'), self.params_path)
+
+
 class LoadStoredNodesStage(Stage):
     priority = StepOrder.DISCOVER
 
     def run(self, ctx: TestRun) -> None:
-        if 'all_nodes' in ctx.storage:
+        if SaveNodesStage.nodes_path in ctx.storage:
             if ctx.nodes_info:
                 logger.error("Internal error: Some nodes already stored in " +
                              "nodes_info before LoadStoredNodesStage stage")
                 raise StopTestError()
-            ctx.nodes_info = {node.node_id: node
-                              for node in ctx.storage.load_list(NodeInfo, "all_nodes")}
+
+            nodes = {node.node_id: node for node in ctx.storage.load_list(NodeInfo, SaveNodesStage.nodes_path)}
+
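+            # re-attach the full node params that SaveNodesStage stored separately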
+            if SaveNodesStage.params_path in ctx.storage:
+                params = json.loads(ctx.storage.get_raw(SaveNodesStage.params_path).decode('utf8'))
+                for node_id, node in nodes.items():
+                    node.params = params.get(node_id, {})
+
+            ctx.nodes_info = nodes
             logger.info("%s nodes loaded from database", len(ctx.nodes_info))
diff --git a/wally/sensors.py b/wally/sensors.py
index 60830a1..89bc224 100644
--- a/wally/sensors.py
+++ b/wally/sensors.py
@@ -1,7 +1,7 @@
 import bz2
 import array
 import logging
-from typing import List, Dict, Tuple
+from typing import Dict
 
 import numpy
 
@@ -21,37 +21,6 @@
 logger = logging.getLogger("wally")
 
 
-sensor_units = {
-    "system-cpu.idle": "",
-    "system-cpu.nice": "",
-    "system-cpu.user": "",
-    "system-cpu.sys": "",
-    "system-cpu.iowait": "",
-    "system-cpu.irq": "",
-    "system-cpu.sirq": "",
-    "system-cpu.steal": "",
-    "system-cpu.guest": "",
-
-    "system-cpu.procs_blocked": "",
-    "system-cpu.procs_queue_x10": "",
-
-    "net-io.recv_bytes": "B",
-    "net-io.recv_packets": "",
-    "net-io.send_bytes": "B",
-    "net-io.send_packets": "",
-
-    "block-io.io_queue": "",
-    "block-io.io_time": "ms",
-    "block-io.reads_completed": "",
-    "block-io.rtime": "ms",
-    "block-io.sectors_read": "B",
-    "block-io.sectors_written": "B",
-    "block-io.writes_completed": "",
-    "block-io.wtime": "ms",
-    "block-io.weighted_io_time": "ms"
-}
-
-
 # TODO(koder): in case if node has more than one role sensor settings might be incorrect
 class StartSensorsStage(Stage):
     priority = StepOrder.START_SENSORS
@@ -118,27 +87,29 @@
 
 def collect_sensors_data(ctx: TestRun, stop: bool = False):
     rstorage = ResultStorage(ctx.storage)
+    total_sz = 0
+
+    logger.info("Start loading sensors")
     for node in ctx.nodes:
         node_id = node.node_id
         if node_id in ctx.sensors_run_on:
+            func = node.conn.sensors.stop if stop else node.conn.sensors.get_updates
 
-            if stop:
-                func = node.conn.sensors.stop
-            else:
-                func = node.conn.sensors.get_updates
+            # hack to calculate total transferred size
+            offset_map, compressed_blob, compressed_collected_at_b = func()
+            data_tpl = (offset_map, compressed_blob, compressed_collected_at_b)
 
-            # TODO: units should came along with data
-            # TODO: process raw sensors data
+            total_sz += len(compressed_blob) + len(compressed_collected_at_b) + sum(map(len, offset_map)) + \
+                16 * len(offset_map)
 
-            for path, value, is_array in sensors_rpc_plugin.unpack_rpc_updates(func()):
+            for path, value, is_array, units in sensors_rpc_plugin.unpack_rpc_updates(data_tpl):
                 if path == 'collected_at':
                     ds = DataSource(node_id=node_id, metric='collected_at', tag='csv')
-                    rstorage.append_sensor(numpy.array(value), ds, 'us')
+                    rstorage.append_sensor(numpy.array(value), ds, units)
                 else:
                     sensor, dev, metric = path.split(".")
                     ds = DataSource(node_id=node_id, metric=metric, dev=dev, sensor=sensor, tag='csv')
                     if is_array:
-                        units = sensor_units["{}.{}".format(sensor, metric)]
                         rstorage.append_sensor(numpy.array(value), ds, units)
                     else:
                         if metric == 'historic':
@@ -146,6 +117,7 @@
                         else:
                             assert metric in ('perf_dump', 'historic_js')
                             rstorage.put_sensor_raw(value, ds(tag='js'))
+    logger.info("Download %sB of sensors data", utils.b2ssize(total_sz))
 
 
 
@@ -156,33 +128,3 @@
     def run(self, ctx: TestRun) -> None:
         collect_sensors_data(ctx, True)
 
-
-# def delta(func, only_upd=True):
-#     prev = {}
-#     while True:
-#         for dev_name, vals in func():
-#             if dev_name not in prev:
-#                 prev[dev_name] = {}
-#                 for name, (val, _) in vals.items():
-#                     prev[dev_name][name] = val
-#             else:
-#                 dev_prev = prev[dev_name]
-#                 res = {}
-#                 for stat_name, (val, accum_val) in vals.items():
-#                     if accum_val:
-#                         if stat_name in dev_prev:
-#                             delta = int(val) - int(dev_prev[stat_name])
-#                             if not only_upd or 0 != delta:
-#                                 res[stat_name] = str(delta)
-#                         dev_prev[stat_name] = val
-#                     elif not only_upd or '0' != val:
-#                         res[stat_name] = val
-#
-#                 if only_upd and len(res) == 0:
-#                     continue
-#                 yield dev_name, res
-#         yield None, None
-#
-#
-
-
diff --git a/wally/storage.py b/wally/storage.py
index aa90ac9..e07d9b3 100644
--- a/wally/storage.py
+++ b/wally/storage.py
@@ -6,7 +6,6 @@
 import re
 import abc
 import shutil
-import sqlite3
 import logging
 from typing import Any, TypeVar, Type, IO, Tuple, cast, List, Dict, Iterable, Iterator
 
@@ -15,7 +14,6 @@
     from yaml import CLoader as Loader, CDumper as Dumper  # type: ignore
 except ImportError:
     from yaml import Loader, Dumper  # type: ignore
-import numpy
 
 from .common_types import IStorable
 
diff --git a/wally/suits/io/ceph.cfg b/wally/suits/io/ceph.cfg
index 22a4937..e134e4c 100644
--- a/wally/suits/io/ceph.cfg
+++ b/wally/suits/io/ceph.cfg
@@ -11,7 +11,7 @@
 runtime=180
 
 # ---------------------------------------------------------------------
-# check different thread count, sync mode. (latency, iops) = func(th_count)
+# check different QD, direct mode. (latency, iops) = func(QD)
 # ---------------------------------------------------------------------
 [ceph_{TEST_SUMM}]
 blocksize=4k
@@ -19,17 +19,16 @@
 iodepth={QD_W}
 
 # ---------------------------------------------------------------------
-# check different thread count, direct read mode. (latency, iops) = func(th_count)
+# check different QD, direct read mode. (latency, iops) = func(QD)
 # also check iops for randread
 # ---------------------------------------------------------------------
 [ceph_{TEST_SUMM}]
 blocksize=4k
 rw=randread
-direct=1
 iodepth={QD_R}
 
 # ---------------------------------------------------------------------
-# sync write
+# sync write - disabled for now
 # ---------------------------------------------------------------------
 #[ceph_{TEST_SUMM}]
 #blocksize=4k
@@ -43,10 +42,15 @@
 # we can't use sequential with numjobs > 1 due to caching and block merging
 # ---------------------------------------------------------------------
 [ceph_{TEST_SUMM}]
-blocksize=16m
-rw=randwrite
-direct=1
-iodepth={QD_SEQ_W}
+blocksize=1m
+rw=write
+iodepth=1
+# offset_increment={OFFSET_INC}
+
+#[ceph_{TEST_SUMM}]
+#blocksize=16m
+#rw=randwrite
+#iodepth={QD_SEQ_W}
 
 # ---------------------------------------------------------------------
 # this is essentially sequential read operations
@@ -55,5 +59,4 @@
 [ceph_{TEST_SUMM}]
 blocksize=16m
 rw=randread
-direct=1
 iodepth={QD_SEQ_R}
diff --git a/wally/suits/io/verify.cfg b/wally/suits/io/verify.cfg
index 40055d7..b8d1076 100644
--- a/wally/suits/io/verify.cfg
+++ b/wally/suits/io/verify.cfg
@@ -1,18 +1,23 @@
 [global]
 include defaults_qd.cfg
-# QD={% 32, 64, 128, 256 %}
-QD={% 32, 256 %}
-runtime=120
+# QD={% 16, 32, 64, 128 %}
+# QD={% 32, 256 %}
+QD=1
+runtime=10
 direct=1
-ramp_time=15
+ramp_time=0
 
 # ---------------------------------------------------------------------
 
 [verify_{TEST_SUMM}]
-blocksize=4k
-rw=randwrite
-direct=1
-iodepth={QD}
+blocksize=1m
+rw=write
+iodepth=1
+
+#[verify_{TEST_SUMM}]
+#blocksize=16m
+#rw=randwrite
+#iodepth={% 1, 4 %}
 
 # [verify_{TEST_SUMM}]
 # blocksize=4k
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index 9aa4ce6..36c349e 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -108,7 +108,7 @@
                 expected_run_time = int(sum(run_times) * 1.05)
 
                 exec_time_s, end_dt_s = get_time_interval_printable_info(expected_run_time)
-                logger.info("Entire test should takes around %s and finished at %s", exec_time_s, end_dt_s)
+                logger.info("Entire test should takes around %s and finish at %s", exec_time_s, end_dt_s)
 
             for job in not_in_storage:
                 results = []  # type: List[TimeSeries]
@@ -120,7 +120,7 @@
 
                     expected_job_time = self.get_expected_runtime(job)
                     exec_time_s, end_dt_s = get_time_interval_printable_info(expected_job_time)
-                    logger.info("Job should takes around %s and finished at %s", exec_time_s, end_dt_s)
+                    logger.info("Job should takes around %s and finish at %s", exec_time_s, end_dt_s)
 
                     jfutures = [pool.submit(self.run_iteration, node, job) for node in self.suite.nodes]
                     failed = False