Move to new sensor selector, fix some bugs
diff --git a/configs-examples/default.yaml b/configs-examples/default.yaml
index ef62590..c1ee515 100644
--- a/configs-examples/default.yaml
+++ b/configs-examples/default.yaml
@@ -127,3 +127,23 @@
     openstack_ceph: OS_1_to_1 + ceph_vdb
     openstack_cinder: OS_1_to_1 + ceph_iscsi_vdb
     openstack_nova: OS_1_to_1 + nova_io
+
+
+default_dev_roles:
+    - role=testnode:
+        - type=cpu: client_cpu
+        - type=block: client_disk
+        - type=eth: client_net
+        - type=weth: client_net
+
+    - role=storage:
+        - type=cpu: storage_cpu
+        - type=block: storage_disk
+        - type=eth: storage_net
+        - type=weth: storage_net
+
+    - role=compute:
+        - type=cpu: compute_cpu
+        - type=block: compute_disk
+        - type=eth: compute_net
+        - type=weth: compute_net
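+
+# NB (assumed semantics): each entry above maps a node selector
+# (role=<node role>) to device selectors (type=<device kind>, or a
+# device-name regex as in perf_lab.yml) and the role(s) assigned to
+# every matching device; the exact matching rules live in the storage
+# selector code.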
diff --git a/configs-examples/perf_lab.yml b/configs-examples/perf_lab.yml
index 086d11c..3059aa9 100644
--- a/configs-examples/perf_lab.yml
+++ b/configs-examples/perf_lab.yml
@@ -20,7 +20,7 @@
     root@cz7626: testnode
     root@cz7627: testnode
 
-sleep: 300
+# sleep: 5
 
 tests:
   - fio:
@@ -28,4 +28,15 @@
       params:
           FILENAME: /dev/rbd0
           FILESIZE: 700G
+          RUNTIME: 600
 
+
+dev_roles:
+    - role=testnode:
+        - rbd0: client_disk
+    - role=ceph-osd:
+        - sd[g-z]: [storage_disk, ceph_storage]
+        - sd[c-f]: [storage_disk, ceph_journal]
+    - role=compute:
+        - type=hdd: compute_disk
+        - type=eth: compute_net
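+
+# Device selectors can be literal names (rbd0), device-name regexes
+# (sd[g-z]), or type=<kind> matchers; one device may carry several
+# roles, e.g. sd[g-z] above is both storage_disk and ceph_storage.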
diff --git a/v2_plans.md b/v2_plans.md
index bc73d8e..c5c0a4d 100644
--- a/v2_plans.md
+++ b/v2_plans.md
@@ -1,25 +1,22 @@
 TODO today:
 -----------
+* Think over the gtod_cpu option
+* Rework the resources code to use the new selectors
+* Not all roles are covered by the classifier
+* The regex-based lookup model does not work for large clusters.
+  The problem is the excessive use of re.match. We need to pre-filter
+  by the known fields (how to do that generically?). The task is equivalent
+  to an index over a multi-dimensional table. In the simplest case, build
+  several deeply nested dicts with different field orders. Iteration happens
+  over node and device, but not over sensor and metric:
+  {(sensor, metric): [(dev, node), ...]} or
+  {sensor: {metric: [(dev, node), ...]}}
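+
+  A minimal sketch of the pre-indexing idea (iter_all_series_keys is a
+  hypothetical helper, named only to show the shape):
+
+      idx = defaultdict(list)  # (sensor, metric) -> [(dev, node), ...]
+      for node, dev, sensor, metric in iter_all_series_keys():
+          idx[(sensor, metric)].append((dev, node))
+      # lookup: the regexes then scan only one bucket of (dev, node) pairs
+      candidates = idx[(sensor, metric)]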
 
-* array.array based TimeSeries to store in SensorStorage without numpy
-  no load, only store
-* Make a summary report
-
-* Fix ls for test results
-* Interpolate and cache sensors only for the requested intervals,
-  fail if a piece of the interval is missing
-* Cluster summary
-* Move the raw fio data out of meta
-* Store csv data aggregated per device
-* In the text report, print skew/kurt to two decimal places
-* In the text report, drop the separator lines between jobs that
-  differ only in QD
-* Resource consumption vs QD: add a caption and move it to the
-  Engineering section
+* Extend the device locator using node roles and hw_info
+* Build one table for all loads: service provided over X seconds vs. resources consumed
+* Cluster summary
+* Store csv data aggregated per device???
 * for reads, the 'storage read block size' heatmap has to be built, not the write one
-* Resource consumption per service provided - plot captions
-* Check png generation, add a report option that selects which
-  images to render
 * Storage nodes cpu non-idle heatmap
 * Check that ceph fs flush time is larger than test time
 * Calculate cache hit rate for reads
@@ -44,7 +41,6 @@
 
 * Investigate why the Dickey-Fuller test works so poorly
 * What to do with the IDLE load?
-* What to do with the gap in sensor data when a test is restarted
 
 Wally consists of parts that should be split out and unified with other tools:
 ----------------------------------------------------------------------------------
@@ -85,30 +81,12 @@
 
 TODO next
 ---------
-* Merge FSStorage and serializer into ObjStorage, separate TSStorage.
-* Build WallyStorage on top of it, use only WallyStorage in code
 * check that the OS key matches what is stored on disk
-* unit tests for math functions
 * CEPH PERFORMANCE COUNTERS
-* Sync storage_structure
-* fix fio job summary
-* Use disk max QD as qd limit?
-* Cumulative statistic table for all jobs
-* Add column for job params, which show how many cluster resource consumed
-* show extra outliers with arrows
 * Hide cluster load if no nodes available
-* Show latency skew and curtosis
-* Sort engineering report by result tuple
-* Name engineering reports by long summary
-* Latency heatmap and violin aren't consistent
-* profile violint plot
-* Fix plot layout, there to much unused space around typical plot
 * iops boxplot as function from QD
-* collect device types mapping from nodes - device should be block/net/...
 * Optimize sensor communication with ceph, can run first OSD request for data validation only on start.
 * Update Storage test, add tests for stat and plot module
-* Aggregated sensors boxplot
-* Hitmap for aggregated sensors
 * automatically find what to plot from storage data (but also allow to select via config)
 
 Have to think:
diff --git a/wally/ceph.py b/wally/ceph.py
index 5c2a4f0..ad3f461 100644
--- a/wally/ceph.py
+++ b/wally/ceph.py
@@ -35,12 +35,6 @@
 
     def run(self, ctx: TestRun) -> None:
         """Return list of ceph's nodes NodeInfo"""
-
-        if 'ceph' not in ctx.config.discover:
-            print(ctx.config.discover)
-            logger.debug("Skip ceph discovery due to config setting")
-            return
-
         if 'all_nodes' in ctx.storage:
             logger.debug("Skip ceph discovery, use previously discovered nodes")
             return
@@ -82,20 +76,16 @@
         with setup_rpc(connect(info), ctx.rpc_code, ctx.default_rpc_plugins,
                        log_level=ctx.config.rpc_log_level) as node:
 
-            # ssh_key = node.get_file_content("~/.ssh/id_rsa", expanduser=True)
-
             try:
                 ips = set()
                 for ip, osds_info in get_osds_info(node, ceph_extra_args, thcount=16).items():
                     ip = ip_remap.get(ip, ip)
                     ips.add(ip)
-                    # creds = ConnCreds(to_ip(cast(str, ip)), user="root", key=ssh_key)
                     creds = ConnCreds(to_ip(cast(str, ip)), user="root")
                     info = ctx.merge_node(creds, {'ceph-osd'})
                     info.params.setdefault('ceph-osds', []).extend(info.__dict__.copy() for info in osds_info)
                     assert 'ceph' not in info.params or info.params['ceph'] == ceph_params
                     info.params['ceph'] = ceph_params
-
                 logger.debug("Found %s nodes with ceph-osd role", len(ips))
             except Exception as exc:
                 if not ignore_errors:
@@ -108,7 +98,6 @@
                 counter = 0
                 for counter, ip in enumerate(get_mons_ips(node, ceph_extra_args)):
                     ip = ip_remap.get(ip, ip)
-                    # creds = ConnCreds(to_ip(cast(str, ip)), user="root", key=ssh_key)
                     creds = ConnCreds(to_ip(cast(str, ip)), user="root")
                     info = ctx.merge_node(creds, {'ceph-mon'})
                     assert 'ceph' not in info.params or info.params['ceph'] == ceph_params
diff --git a/wally/console_report.py b/wally/console_report.py
index 6490329..381d9ff 100644
--- a/wally/console_report.py
+++ b/wally/console_report.py
@@ -1,52 +1,73 @@
 import logging
-
+from typing import cast, Iterator, List, Union
 
 import numpy
 
 from cephlib.common import float2str
-from cephlib import texttable
+from cephlib.texttable import Texttable
 from cephlib.statistic import calc_norm_stat_props, calc_histo_stat_props
 
 from .stage import Stage, StepOrder
 from .test_run_class import TestRun
+from .result_classes import SuiteConfig
 from .suits.io.fio import FioTest
+from .suits.io.fio_job import FioJobParams
 from .suits.io.fio_hist import get_lat_vals
 from .data_selectors import get_aggregated
+from .result_storage import IWallyStorage
 
 
 logger = logging.getLogger("wally")
 
 
+console_report_headers = ["Description", "IOPS ~ Dev", "BW, MiBps", 'Skew/Kurt', 'lat med, ms', 'lat 95, ms']
+console_report_align = ['l', 'r', 'r', 'r', 'r', 'r']
+
+
+def get_console_report_table(suite: SuiteConfig, rstorage: IWallyStorage) -> List[Union[List[str], Texttable.HLINE]]:
+    table = []  # type: List[Union[List[str], Texttable.HLINE]]
+    prev_params = None
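+    # Group jobs by everything except queue depth: qd is cleared below and an
+    # HLINE is emitted whenever the characteristic tuple changes.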
+    for job in sorted(rstorage.iter_job(suite), key=lambda job: job.params):
+        fparams = cast(FioJobParams, job.params)
+        fparams['qd'] = None
+
+        if prev_params is not None and fparams.char_tpl != prev_params:
+            table.append(Texttable.HLINE)
+
+        prev_params = fparams.char_tpl
+
+        bw_ts = get_aggregated(rstorage, suite.storage_id, job.storage_id, metric='bw',
+                               trange=job.reliable_info_range_s)
+        props = calc_norm_stat_props(bw_ts)
+        avg_iops = props.average // job.params.params['bsize']
+        iops_dev = props.deviation // job.params.params['bsize']
+
+        lat_ts = get_aggregated(rstorage, suite.storage_id, job.storage_id, metric='lat',
+                                trange=job.reliable_info_range_s)
+        bins_edges = numpy.array(get_lat_vals(lat_ts.data.shape[1]), dtype='float32') / 1000  # convert us to ms
+        lat_props = calc_histo_stat_props(lat_ts, bins_edges)
+        table.append([job.params.summary,
+                      "{:>6s} ~ {:>6s}".format(float2str(avg_iops), float2str(iops_dev)),
+                      float2str(props.average / 1024),  # Ki -> Mi
+                      "{:>5.1f}/{:>5.1f}".format(props.skew, props.kurt),
+                      float2str(lat_props.perc_50), float2str(lat_props.perc_95)])
+    return table
+
+
 class ConsoleReportStage(Stage):
 
     priority = StepOrder.REPORT
 
     def run(self, ctx: TestRun) -> None:
         for suite in ctx.rstorage.iter_suite(FioTest.name):
-            table = texttable.Texttable(max_width=200)
-
+            table = Texttable(max_width=200)
+            table.set_deco(Texttable.VLINES | Texttable.BORDER | Texttable.HEADER)
             tbl = ctx.rstorage.get_txt_report(suite)
             if tbl is None:
-                table.header(["Description", "IOPS ~ Dev", "BW, MiBps", 'Skew/Kurt', 'lat med, ms', 'lat 95, ms'])
-                table.set_cols_align(('l', 'r', 'r', 'r', 'r', 'r'))
-
-                for job in sorted(ctx.rstorage.iter_job(suite), key=lambda job: job.params):
-                    bw_ts = get_aggregated(ctx.rstorage, suite.storage_id, job.storage_id, metric='bw',
-                                           trange=job.reliable_info_range_s)
-                    props = calc_norm_stat_props(bw_ts)
-                    avg_iops = props.average // job.params.params['bsize']
-                    iops_dev = props.deviation // job.params.params['bsize']
-
-                    lat_ts = get_aggregated(ctx.rstorage, suite.storage_id, job.storage_id, metric='lat',
-                                            trange=job.reliable_info_range_s)
-                    bins_edges = numpy.array(get_lat_vals(lat_ts.data.shape[1]), dtype='float32') / 1000  # convert us to ms
-                    lat_props = calc_histo_stat_props(lat_ts, bins_edges)
-                    table.add_row([job.params.summary,
-                                   "{} ~ {}".format(float2str(avg_iops), float2str(iops_dev)),
-                                   float2str(props.average / 1024),  # Ki -> Mi
-                                   "{}/{}".format(float2str(props.skew), float2str(props.kurt)),
-                                   float2str(lat_props.perc_50), float2str(lat_props.perc_95)])
-
+                table.header(console_report_headers)
+                table.set_cols_align(console_report_align)
+                for line in get_console_report_table(suite, ctx.rstorage):
+                    table.add_row(line)
                 tbl = table.draw()
                 ctx.rstorage.put_txt_report(suite, tbl)
             print(tbl)
diff --git a/wally/data_selectors.py b/wally/data_selectors.py
index 795c66b..1950e95 100644
--- a/wally/data_selectors.py
+++ b/wally/data_selectors.py
@@ -1,10 +1,11 @@
 import logging
-from typing import Tuple, Iterator
+from typing import Tuple, Iterator, List, Iterable, Dict, Union, Callable, Set
 
 import numpy
 
 from cephlib.numeric_types import DataSource, TimeSeries
 from cephlib.storage_selectors import c_interpolate_ts_on_seconds_border
+from cephlib.node import NodeInfo
 
 from .result_classes import IWallyStorage
 from .suits.io.fio_hist import expected_lat_bins
@@ -40,18 +41,23 @@
 
 def get_aggregated(rstorage: IWallyStorage, suite_id: str, job_id: str, metric: str,
                    trange: Tuple[int, int]) -> TimeSeries:
-    "Sum selected metric for all nodes for given Suite/job"
+    "Sum selected fio metric for all nodes for given Suite/job"
+
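+    # Cache aggregation results per storage instance (id(rstorage) is part of
+    # the key); both paths return a copy so callers may mutate the data.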
+    key = (id(rstorage), suite_id, job_id, metric, trange)
+    aggregated_cache = rstorage.storage.other_caches['aggregated']
+    if key in aggregated_cache:
+        return aggregated_cache[key].copy()
 
     tss = list(find_all_series(rstorage, suite_id, job_id, metric))
 
     if len(tss) == 0:
         raise NameError("Can't find any TS for {},{},{}".format(suite_id, job_id, metric))
 
-    ds = DataSource(suite_id=suite_id, job_id=job_id, node_id=AGG_TAG, sensor='fio',
-                    dev=AGG_TAG, metric=metric, tag='csv')
+    c_intp = c_interpolate_ts_on_seconds_border
+    tss_inp = [c_intp(ts.select(trange), tp='fio', allow_broken_step=(metric == 'lat')) for ts in tss]
 
-    tss_inp = [c_interpolate_ts_on_seconds_border(ts, tp='fio', allow_broken_step=(metric == 'lat')) for ts in tss]
     res = None
+    res_times = None
 
     for ts in tss_inp:
         if ts.time_units != 's':
@@ -82,16 +88,23 @@
 
         dt = ts.data[idx1: idx2]
         if res is None:
-            res = dt
+            res = dt.copy()
+            res_times = ts.times[idx1: idx2].copy()
         else:
             assert res.shape == dt.shape, "res.shape(={}) != dt.shape(={})".format(res.shape, dt.shape)
             res += dt
 
+    ds = DataSource(suite_id=suite_id, job_id=job_id, node_id=AGG_TAG, sensor='fio',
+                    dev=AGG_TAG, metric=metric, tag='csv')
     agg_ts = TimeSeries(res, source=ds,
-                        times=tss_inp[0].times.copy(),
+                        times=res_times,
                         units=tss_inp[0].units,
                         histo_bins=tss_inp[0].histo_bins,
                         time_units=tss_inp[0].time_units)
+    aggregated_cache[key] = agg_ts
+    return agg_ts.copy()
 
-    return agg_ts
+
+def get_nodes(storage: IWallyStorage, roles: Iterable[str]) -> List[NodeInfo]:
+    return [node for node in storage.load_nodes() if node.roles.intersection(roles)]
 
diff --git a/wally/main.py b/wally/main.py
index 66e39db..497bdac 100644
--- a/wally/main.py
+++ b/wally/main.py
@@ -10,11 +10,9 @@
 from typing import List, Tuple, Any, Callable, IO, cast, Optional, Iterator
 from yaml import load as _yaml_load
 
-
 YLoader = Callable[[IO], Any]
 yaml_load = None  # type: YLoader
 
-
 try:
     from yaml import CLoader
     yaml_load = cast(YLoader,  functools.partial(_yaml_load, Loader=CLoader))
@@ -31,6 +29,7 @@
 
 from cephlib.common import setup_logging
 from cephlib.storage import make_storage
+from cephlib.wally_storage import WallyDB
 from cephlib.ssh import set_ssh_key_passwd
 from cephlib.node import log_nodes_statistic
 from cephlib.node_impl import get_rpc_server_code
@@ -69,29 +68,53 @@
         raise
 
 
-def list_results(path: str) -> List[Tuple[str, str, str, str]]:
-    results = []  # type: List[Tuple[float, str, str, str, str]]
-
+def list_results(path: str, limit: Optional[int] = None) -> List[Tuple[str, str, str, str, str]]:
+    dirs = []
     for dir_name in os.listdir(path):
         full_path = os.path.join(path, dir_name)
+        dirs.append((os.stat(full_path).st_ctime, full_path))
 
+    dirs.sort()
+    results = []  # type: List[Tuple[str, str, str, str, str]]
+    for _, full_path in dirs[::-1]:
         try:
             stor = make_storage(full_path, existing=True)
         except Exception as exc:
             logger.warning("Can't load folder {}. Error {}".format(full_path, exc))
+            continue
 
-        comment = cast(str, stor.get('info/comment'))
-        run_uuid = cast(str, stor.get('info/run_uuid'))
-        run_time = cast(float, stor.get('info/run_time'))
-        test_types = ""
-        results.append((run_time,
-                        run_uuid,
-                        test_types,
-                        time.ctime(run_time),
-                        '-' if comment is None else comment))
+        try:
+            try:
+                cfg = stor.load(Config, WallyDB.config)
+            except KeyError:
+                cfg = stor.load(Config, "config")
+        except Exception as exc:
+            print("Fail to load {}. {}".format(os.path.basename(full_path), exc))
+            continue
 
-    results.sort()
-    return [i[1:] for i in results]
+        if WallyDB.run_interval in stor:
+            run_time = stor.get(WallyDB.run_interval)[0]
+        else:
+            run_time = os.stat(full_path).st_ctime
+
+        ftime = time.strftime("%d %b %H:%M", time.localtime(run_time))
+
+        test_types = []
+        for suite_cfg in cfg.get('tests', []):
+            for suite_name, params in suite_cfg.items():
+                if suite_name == 'fio':
+                    test_types.append("{}.{}".format(suite_name, params['load']))
+                else:
+                    test_types.append(suite_name)
+        results.append((cfg.run_uuid,
+                        ",".join(test_types),
+                        ftime,
+                        '-' if cfg.comment is None else cfg.comment,
+                        '-'))
+
+        if limit and len(results) >= limit:
+            break
+
+    return results
 
 
 def log_nodes_statistic_stage(ctx: TestRun) -> None:
@@ -103,7 +126,8 @@
     parser = argparse.ArgumentParser(prog='wally', description=descr)
     parser.add_argument("-l", '--log-level', help="print some extra log info")
     parser.add_argument("--ssh-key-passwd", default=None, help="Pass ssh key password")
-    parser.add_argument("--ssh-key-passwd-kbd", action="store_true", help="Enter ssh key password interactivelly")
+    parser.add_argument("--ssh-key-passwd-kbd", action="store_true", help="Enter ssh key password interactively")
+    parser.add_argument("--profile", action="store_true", help="Profile execution")
     parser.add_argument("-s", '--settings-dir', default=None,
                         help="Folder to store key/settings/history files")
 
@@ -111,6 +135,8 @@
 
     # ---------------------------------------------------------------------
     report_parser = subparsers.add_parser('ls', help='list all results')
+    report_parser.add_argument("-l", "--limit", metavar='LIMIT', help="Show only LIMIT last results",
+                               default=None, type=int)
     report_parser.add_argument("result_storage", help="Folder with test results")
 
     # ---------------------------------------------------------------------
@@ -236,6 +262,13 @@
     config = None  # type: Config
     storage = None  # type: IStorage
 
+    if opts.profile:
+        import cProfile
+        pr = cProfile.Profile()
+        pr.enable()
+    else:
+        pr = None
+
     if opts.subparser_name == 'test':
         config = load_config(opts.config_file)
         config.storage_url, config.run_uuid = utils.get_uniq_path_uuid(config.results_storage)
@@ -250,7 +283,7 @@
         config.discover = set(name for name in config.get('discover', '').split(",") if name)
 
         storage = make_storage(config.storage_url)
-        storage.put(config, 'config')
+        storage.put(config, WallyDB.config)
 
         stages.extend(get_run_stages())
         stages.append(SaveNodesStage())
@@ -267,7 +300,7 @@
     elif opts.subparser_name == 'resume':
         opts.resumed = True
         storage = make_storage(opts.storage_dir, existing=True)
-        config = storage.load(Config, 'config')
+        config = storage.load(Config, WallyDB.config)
         stages.extend(get_run_stages())
         stages.append(LoadStoredNodesStage())
         prev_opts = storage.get('cli')  # type: List[str]
@@ -281,9 +314,10 @@
 
     elif opts.subparser_name == 'ls':
         tab = Texttable(max_width=200)
-        tab.set_cols_align(["l", "l", "l", "l"])
-        tab.header(["Name", "Tests", "Run at", "Comment"])
-        tab.add_rows(list_results(opts.result_storage))
+        tab.set_cols_align(["l", "l", "l", "l", 'c'])
+        tab.set_deco(Texttable.VLINES | Texttable.BORDER | Texttable.HEADER)
+        tab.header(["Name", "Tests", "Started at", "Comment", "Result"])
+        tab.add_rows(list_results(opts.result_storage, opts.limit), header=False)
         print(tab.draw())
         return 0
 
@@ -292,7 +326,7 @@
             print(" --no-report option can't be used with 'report' cmd")
             return 1
         storage = make_storage(opts.data_dir, existing=True)
-        config = storage.load(Config, 'config')
+        config = storage.load(Config, WallyDB.config)
         report_profiles.default_format = opts.format
         report.default_format = opts.format
         stages.append(LoadStoredNodesStage())
@@ -327,6 +361,8 @@
         print("Subparser {!r} is not supported".format(opts.subparser_name))
         return 1
 
+    start_time = int(time.time())
+
     report_stages = []  # type: List[Stage]
     if not getattr(opts, "no_report", False):
         reporters = opts.reporters.split(",")
@@ -346,6 +382,9 @@
     ctx = TestRun(config, storage, WallyStorage(storage))
     ctx.rpc_code, ctx.default_rpc_plugins = get_rpc_server_code()
 
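+    # Device->role selectors from the config seed ctx.devs_locator
+    # (see dev_roles / default_dev_roles in configs-examples).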
+    if 'dev_roles' in ctx.config:
+        ctx.devs_locator = ctx.config.dev_roles
+
     if opts.ssh_key_passwd is not None:
         set_ssh_key_passwd(opts.ssh_key_passwd)
     elif opts.ssh_key_passwd_kbd:
@@ -396,16 +435,30 @@
     ctx.storage.sync()
 
     logger.info("All info is stored into %r", config.storage_url)
+    end_time = int(time.time())
+    storage.put([start_time, end_time], WallyDB.run_interval)
 
     if failed or cleanup_failed:
         if opts.subparser_name == 'report':
             logger.error("Report generation failed. See error details in log above")
         else:
             logger.error("Tests are failed. See error details in log above")
-        return 1
+        code = 1
     else:
         if opts.subparser_name == 'report':
             logger.info("Report successfully generated")
         else:
             logger.info("Tests finished successfully")
-        return 0
+        code = 0
+
+    if opts.profile:
+        assert pr is not None
+        pr.disable()
+        import pstats
+        pstats.Stats(pr).sort_stats('tottime').print_stats(30)
+
+    if opts.subparser_name == 'test':
+        storage.put(code, WallyDB.res_code)
+
+    storage.sync()
+    return code
diff --git a/wally/report.py b/wally/report.py
index 9615461..f42c7c9 100644
--- a/wally/report.py
+++ b/wally/report.py
@@ -1,23 +1,30 @@
 import os
 import abc
 import logging
+import collections
 from collections import defaultdict
-from typing import Dict, Any, Iterator, Tuple, cast, List, Set, Optional, Union, Type
+from typing import Dict, Any, Iterator, Tuple, cast, List, Set, Optional, Union, Type, Iterable
 
 import numpy
+import scipy.stats
 from statsmodels.tsa.stattools import adfuller
 
 import xmlbuilder3
 
 import wally
 
+# import matplotlib
+# matplotlib.use('GTKAgg')
+
 from cephlib import html
 from cephlib.units import b2ssize, b2ssize_10, unit_conversion_coef, unit_conversion_coef_f
 from cephlib.statistic import calc_norm_stat_props
-from cephlib.storage_selectors import summ_sensors, find_sensors_to_2d
+from cephlib.storage_selectors import sum_sensors, find_sensors_to_2d, update_storage_selector, DevRoles
 from cephlib.wally_storage import find_nodes_by_roles
 from cephlib.plot import (plot_simple_bars, plot_hmap_from_2d, plot_lat_over_time, plot_simple_over_time,
-                          plot_histo_heatmap, plot_v_over_time, plot_hist)
+                          plot_histo_heatmap, plot_v_over_time, plot_hist, plot_dots_with_regression)
+from cephlib.numeric_types import ndarray2d
+from cephlib.node import NodeRole
 
 from .utils import STORAGE_ROLES
 from .stage import Stage, StepOrder
@@ -31,7 +38,8 @@
 from .report_profiles import (DefStyleProfile, DefColorProfile, StyleProfile, ColorProfile,
                               default_format, io_chart_format)
 from .plot import io_chart
-from .resources import ResourceNames, get_resources_usage, make_iosum, IOSummary, get_cluster_cpu_load
+from .resources import ResourceNames, get_resources_usage, make_iosum, get_cluster_cpu_load
+from .console_report import get_console_report_table, console_report_headers, console_report_align, Texttable
 
 
 logger = logging.getLogger("wally")
@@ -43,31 +51,6 @@
 DEBUG = False
 
 
-# ----------------  STRUCTS  -------------------------------------------------------------------------------------------
-
-
-# TODO: need to be revised, have to user StatProps fields instead
-class StoragePerfSummary:
-    def __init__(self) -> None:
-        self.direct_iops_r_max = 0  # type: int
-        self.direct_iops_w_max = 0  # type: int
-
-        # 64 used instead of 4k to faster feed caches
-        self.direct_iops_w64_max = 0  # type: int
-
-        self.rws4k_10ms = 0  # type: int
-        self.rws4k_30ms = 0  # type: int
-        self.rws4k_100ms = 0  # type: int
-        self.bw_write_max = 0  # type: int
-        self.bw_read_max = 0  # type: int
-
-        self.bw = None  # type: float
-        self.iops = None  # type: float
-        self.lat = None  # type: float
-        self.lat_50 = None  # type: float
-        self.lat_95 = None  # type: float
-
-
 # --------------  AGGREGATION AND STAT FUNCTIONS  ----------------------------------------------------------------------
 
 LEVEL_SENSORS = {("block-io", "io_queue"), ("system-cpu", "procs_blocked"), ("system-cpu", "procs_queue")}
@@ -124,23 +107,30 @@
 
 
 class Menu1st:
-    engineering = "Engineering"
     summary = "Summary"
     per_job = "Per Job"
+    engineering = "Engineering"
+    engineering_per_job = "Engineering per job"
+    order = [summary, per_job, engineering, engineering_per_job]
 
 
 class Menu2ndEng:
+    summary = "Summary"
     iops_time = "IOPS(time)"
     hist = "IOPS/lat overall histogram"
     lat_time = "Lat(time)"
+    resource_regression = "Resource usage LR"
+    order = [summary, iops_time, hist, lat_time, resource_regression]
 
 
 class Menu2ndSumm:
+    summary = "Summary"
     io_lat_qd = "IO & Lat vs QD"
-    cpu_usage_qd = "CPU usage"
+    resources_usage_qd = "Resource usage"
+    order = [summary, io_lat_qd, resources_usage_qd]
 
 
-menu_1st_order = [Menu1st.summary, Menu1st.engineering, Menu1st.per_job]
+menu_1st_order = [Menu1st.summary, Menu1st.engineering, Menu1st.per_job, Menu1st.engineering_per_job]
 
 
 #  --------------------  REPORTS  --------------------------------------------------------------------------------------
@@ -176,9 +166,120 @@
 #     """Creates graphs, which show how IOPS and Latency depend on block size"""
 #
 #
-# # Main performance report
-# class PerformanceSummary(SuiteReporter):
-#     """Aggregated summary fro storage"""
+
+
+class StoragePerfSummary:
+    iops_units = "KiBps"
+    bw_units = "Bps"
+    NO_VAL = -1
+
+    def __init__(self) -> None:
+        self.rw_iops_10ms = self.NO_VAL  # type: int
+        self.rw_iops_30ms = self.NO_VAL  # type: int
+        self.rw_iops_100ms = self.NO_VAL  # type: int
+
+        self.rr_iops_10ms = self.NO_VAL  # type: int
+        self.rr_iops_30ms = self.NO_VAL  # type: int
+        self.rr_iops_100ms = self.NO_VAL  # type: int
+
+        self.bw_write_max = self.NO_VAL  # type: int
+        self.bw_read_max = self.NO_VAL  # type: int
+
+        self.bw = None  # type: Optional[float]
+        self.read_iops = None  # type: Optional[float]
+        self.write_iops = None  # type: Optional[float]
+
+
+def get_performance_summary(storage: IWallyStorage, suite: SuiteConfig,
+                            hboxes: int, large_blocks: int) -> Tuple[StoragePerfSummary, StoragePerfSummary]:
+
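+    # For each small-block fio job keep, per latency ceiling (10/30/100 ms),
+    # the best IOPS whose 95th (psum95) / 50th (psum50) latency percentile
+    # stays under that ceiling; large-block jobs feed the peak read/write BW.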
+    psum95 = StoragePerfSummary()
+    psum50 = StoragePerfSummary()
+
+    for job in storage.iter_job(suite):
+        if isinstance(job, FioJobConfig):
+            fjob = cast(FioJobConfig, job)
+            io_sum = make_iosum(storage, suite, job, hboxes)
+
+            bw_avg = io_sum.bw.average * unit_conversion_coef(io_sum.bw.units, StoragePerfSummary.bw_units)
+
+            if fjob.bsize < large_blocks:
+                lat_95_ms = io_sum.lat.perc_95 * unit_conversion_coef(io_sum.lat.units, 'ms')
+                lat_50_ms = io_sum.lat.perc_50 * unit_conversion_coef(io_sum.lat.units, 'ms')
+
+                iops_avg = io_sum.bw.average * unit_conversion_coef(io_sum.bw.units, StoragePerfSummary.iops_units)
+                iops_avg /= fjob.bsize
+
+                if fjob.oper == 'randwrite' and fjob.sync_mode == 'd':
+                    for lat, field in [(10, 'rw_iops_10ms'), (30, 'rw_iops_30ms'), (100, 'rw_iops_100ms')]:
+                        if lat_95_ms <= lat:
+                            setattr(psum95, field, max(getattr(psum95, field), iops_avg))
+                        if lat_50_ms <= lat:
+                            setattr(psum50, field, max(getattr(psum50, field), iops_avg))
+
+                if fjob.oper == 'randread' and fjob.sync_mode == 'd':
+                    for lat, field in [(10, 'rr_iops_10ms'), (30, 'rr_iops_30ms'), (100, 'rr_iops_100ms')]:
+                        if lat_95_ms <= lat:
+                            setattr(psum95, field, max(getattr(psum95, field), iops_avg))
+                        if lat_50_ms <= lat:
+                            setattr(psum50, field, max(getattr(psum50, field), iops_avg))
+            elif fjob.sync_mode == 'd':
+                if fjob.oper in ('randwrite', 'write'):
+                    psum50.bw_write_max = max(psum50.bw_write_max, bw_avg)
+                elif fjob.oper in ('randread', 'read'):
+                    psum50.bw_read_max = max(psum50.bw_read_max, bw_avg)
+
+    return psum50, psum95
+
+
+# Main performance report
+class PerformanceSummary(SuiteReporter):
+    """Aggregated summary for storage"""
+    def get_divs(self, suite: SuiteConfig) -> Iterator[Tuple[str, str, HTMLBlock]]:
+        psum50, psum95 = get_performance_summary(self.rstorage, suite, self.style.hist_boxes, self.style.large_blocks)
+
+        caption = "Storage summary report"
+        res = html.H3(html.center(caption))
+
+        headers = ["Mode", "Stats", "Explanation"]
+        align = ['left', 'right', "left"]
+        data = []
+
+        if psum95.rr_iops_10ms != psum95.NO_VAL or psum95.rr_iops_30ms != psum95.NO_VAL or \
+                psum95.rr_iops_100ms != psum95.NO_VAL:
+            data.append("Average random read IOPS for small blocks")
+
+        if psum95.rr_iops_10ms != psum95.NO_VAL:
+            data.append(("Database", b2ssize_10(psum95.rr_iops_10ms), "Latency 95th percentile < 10ms"))
+        if psum95.rr_iops_30ms != psum95.NO_VAL:
+            data.append(("File system", b2ssize_10(psum95.rr_iops_30ms), "Latency 95th percentile < 30ms"))
+        if psum95.rr_iops_100ms != psum95.NO_VAL:
+            data.append(("File server", b2ssize_10(psum95.rr_iops_100ms), "Latency 95th percentile < 100ms"))
+
+        if psum95.rw_iops_10ms != psum95.NO_VAL or psum95.rw_iops_30ms != psum95.NO_VAL or \
+                psum95.rw_iops_100ms != psum95.NO_VAL:
+            data.append("Average random write IOPS for small blocks")
+
+        if psum95.rw_iops_10ms != psum95.NO_VAL:
+            data.append(("Database", b2ssize_10(psum95.rw_iops_10ms), "Latency 95th percentile < 10ms"))
+        if psum95.rw_iops_30ms != psum95.NO_VAL:
+            data.append(("File system", b2ssize_10(psum95.rw_iops_30ms), "Latency 95th percentile < 30ms"))
+        if psum95.rw_iops_100ms != psum95.NO_VAL:
+            data.append(("File server", b2ssize_10(psum95.rw_iops_100ms), "Latency 95th percentile < 100ms"))
+
+        if psum50.bw_write_max != psum50.NO_VAL or psum50.bw_read_max != psum50.NO_VAL:
+            data.append("Average sequention IO")
+
+        if psum50.bw_write_max != psum50.NO_VAL:
+            data.append(("Write", b2ssize(psum50.bw_write_max) + psum50.bw_units,
+                         "Large blocks (>={}KiB)".format(self.style.large_blocks)))
+        if psum50.bw_read_max != psum50.NO_VAL:
+            data.append(("Read", b2ssize(psum50.bw_read_max) + psum50.bw_units,
+                         "Large blocks (>={}KiB)".format(self.style.large_blocks)))
+
+        res += html.center(html.table("Performance", headers, data, align=align))
+        yield Menu1st.summary, Menu2ndSumm.summary, HTMLBlock(res)
+
 
 # # Node load over test time
 # class NodeLoad(SuiteReporter):
@@ -186,7 +287,6 @@
 
 # # Ceph operation breakout report
 # class CephClusterSummary(SuiteReporter):
-#     """IOPS/latency during test"""
 
 
 # Main performance report
@@ -204,15 +304,15 @@
             str_summary[fjob_no_qd] = (fjob_no_qd.summary, fjob_no_qd.long_summary)
             ts_map[fjob_no_qd].append((suite, fjob))
 
+        caption = "IOPS, bandwith, and latency as function of parallel IO request count (QD)"
+        yield Menu1st.summary, Menu2ndSumm.io_lat_qd, HTMLBlock(html.H3(html.center(caption)))
+
         for tpl, suites_jobs in ts_map.items():
             if len(suites_jobs) >= self.style.min_iops_vs_qd_jobs:
                 iosums = [make_iosum(self.rstorage, suite, job, self.style.hist_boxes) for suite, job in suites_jobs]
                 iosums.sort(key=lambda x: x.qd)
                 summary, summary_long = str_summary[tpl]
 
-                yield Menu1st.summary, Menu2ndSumm.io_lat_qd, \
-                    HTMLBlock(html.H2(html.center("IOPS, BW, Lat = func(QD). " + summary_long)))
-
                 ds = DataSource(suite_id=suite.storage_id,
                                 job_id=summary,
                                 node_id=AGG_TAG,
@@ -221,8 +321,8 @@
                                 metric="io_over_qd",
                                 tag=io_chart_format)
 
-                fpath = self.plt(io_chart, ds, title="", legend="IOPS/BW", iosums=iosums)
-                yield Menu1st.summary, Menu2ndSumm.io_lat_qd, HTMLBlock(html.center(html.img(fpath)))
+                fpath = self.plt(io_chart, ds, title=summary_long, legend="IOPS/BW", iosums=iosums)
+                yield Menu1st.summary, Menu2ndSumm.io_lat_qd, HTMLBlock(html.img(fpath))
 
 
 class ResourceQD(SuiteReporter):
@@ -239,6 +339,8 @@
             fjob_no_qd = cast(FioJobParams, fjob.params.copy(qd=None))
             qd_grouped_jobs.setdefault(fjob_no_qd, []).append(fjob)
 
+        yield Menu1st.summary, Menu2ndSumm.resources_usage_qd, HTMLBlock(html.center(html.H3("Resource usage summary")))
+
         for jc_no_qd, jobs in sorted(qd_grouped_jobs.items()):
             cpu_usage2qd = {}
             for job in jobs:
@@ -266,12 +368,94 @@
                             metric="cpu_for_iop",
                             tag=io_chart_format)
 
-            fpath = self.plt(plot_simple_bars, ds, jc_no_qd.long_summary, labels, vals, errs,
-                             xlabel="CPU core time per IOP", ylabel="QD * Test nodes" if test_nc != 1 else "QD",
+            title = "CPU time per IOP, " + jc_no_qd.long_summary
+            fpath = self.plt(plot_simple_bars, ds, title, labels, vals, errs,
+                             xlabel="CPU core time per IOP",
+                             ylabel="QD * Test nodes" if test_nc != 1 else "QD",
                              x_formatter=(lambda x, pos: b2ssize_10(x) + 's'),
                              one_point_zero_line=False)
 
-            yield Menu1st.summary, Menu2ndSumm.cpu_usage_qd, HTMLBlock(html.center(html.img(fpath)))
+            yield Menu1st.summary, Menu2ndSumm.resources_usage_qd, HTMLBlock(html.img(fpath))
+
+
+def get_resources_usage2(suite: SuiteConfig, job: JobConfig, rstorage: IWallyStorage,
+                         roles: Iterable[str], sensor: str, metric: str, test_metric: str,
+                         agg_window: int = 5) -> List[Tuple[float, float]]:
+    assert test_metric == 'iops'
+    fjob = cast(FioJobConfig, job)
+    bw = get_aggregated(rstorage, suite.storage_id, job.storage_id, "bw", job.reliable_info_range_s)
+    io_transfered = bw.data * unit_conversion_coef_f(bw.units, "Bps")
+    ops_done = io_transfered / (fjob.bsize * unit_conversion_coef_f("KiBps", "Bps"))
+    nodes = [node for node in rstorage.load_nodes() if node.roles.intersection(roles)]
+
+    if sensor == 'system-cpu':
+        assert metric == 'used'
+        core_count = None
+        for node in nodes:
+            if core_count is None:
+                core_count = sum(cores for _, cores in node.hw_info.cpus)
+            else:
+                assert core_count == sum(cores for _, cores in node.hw_info.cpus)
+        cpu_ts = get_cluster_cpu_load(rstorage, roles, job.reliable_info_range_s)
+        metric_data = (1.0 - (cpu_ts['idle'].data + cpu_ts['iowait'].data) / cpu_ts['total'].data) * core_count
+    else:
+        metric_data = sum_sensors(rstorage, job.reliable_info_range_s,
+                                  node_id=[node.node_id for node in nodes], sensor=sensor, metric=metric).data
+
+    res = []
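+    # Average both series over agg_window-sample buckets; each bucket yields
+    # one (operations done, resource used) point for the regression plot.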
+    for pos in range(0, len(ops_done) - agg_window, agg_window):
+        pe = pos + agg_window
+        res.append((numpy.average(ops_done[pos: pe]), numpy.average(metric_data[pos: pe])))
+
+    return res
+
+
+class ResourceConsumptionSummary(SuiteReporter):
+    suite_types = {'fio'}
+
+    def get_divs(self, suite: SuiteConfig) -> Iterator[Tuple[str, str, HTMLBlock]]:
+        vs = 'iops'
+        for job_tp in ('rwd4', 'rrd4'):
+            for sensor_metric in ('net-io.send_packets', 'system-cpu.used'):
+                sensor, metric = sensor_metric.split(".")
+                usage = []
+                for job in self.rstorage.iter_job(suite):
+                    if job_tp in job.summary:
+                        usage.extend(get_resources_usage2(suite, job, self.rstorage, STORAGE_ROLES,
+                                                          sensor=sensor, metric=metric, test_metric=vs))
+
+                if not usage:
+                    continue
+
+                iops, cpu = zip(*usage)
+                slope, intercept, r_value, p_value, std_err = scipy.stats.linregress(iops, cpu)
+                x = numpy.array([0.0, max(iops) * 1.1])
+
+                ds = DataSource(suite_id=suite.storage_id,
+                                job_id=job_tp,
+                                node_id="storage",
+                                sensor='usage-regression',
+                                dev=AGG_TAG,
+                                metric=sensor_metric + '.VS.' + vs,
+                                tag=default_format)
+
+                fname = self.plt(plot_dots_with_regression, ds,
+                                 "{}::{}.{}".format(job_tp, sensor_metric, vs),
+                                 x=iops, y=cpu,
+                                 xlabel=vs,
+                                 ylabel=sensor_metric,
+                                 x_approx=x, y_approx=intercept + slope * x)
+
+                yield Menu1st.engineering, Menu2ndEng.resource_regression, HTMLBlock(html.img(fname))
+
+
+class EngineeringSummary(SuiteReporter):
+    suite_types = {'fio'}
+
+    def get_divs(self, suite: SuiteConfig) -> Iterator[Tuple[str, str, HTMLBlock]]:
+        tbl = [line for line in get_console_report_table(suite, self.rstorage) if line is not Texttable.HLINE]
+        align = [{'l': 'left', 'r': 'right'}[al] for al in console_report_align]
+        res = html.center(html.table("Test results", console_report_headers, tbl, align=align))
+        yield Menu1st.engineering, Menu2ndEng.summary, HTMLBlock(res)
 
 
 class StatInfo(JobReporter):
@@ -288,7 +472,7 @@
         if test_nc > 1:
             caption += " * {} nodes".format(test_nc)
 
-        res = html.H2(html.center(caption))
+        res = html.H3(html.center(caption))
         stat_data_headers = ["Name",
                              "Total done",
                              "Average ~ Dev",
@@ -360,7 +544,7 @@
             # sensor usage
             stat_data.extend([iops_data, lat_data])
 
-        res += html.center(html.table("Load stats info", stat_data_headers, stat_data, align=align))
+        res += html.center(html.table("Test results", stat_data_headers, stat_data, align=align))
         yield Menu1st.per_job, job.summary, HTMLBlock(res)
 
 
@@ -404,7 +588,7 @@
                     table_structure2.append((line[1],))
             table_structure = table_structure2
 
-        yield Menu1st.per_job, job.summary, HTMLBlock(html.H2(html.center("Resources usage")))
+        yield Menu1st.per_job, job.summary, HTMLBlock(html.H3(html.center("Resources usage")))
 
         doc = xmlbuilder3.XMLBuilder("table",
                                      **{"class": "table table-bordered table-striped table-condensed table-hover",
@@ -498,7 +682,7 @@
             pairs.append(('Net packets per IOP', net_pkt_names))
 
         yield Menu1st.per_job, job.summary, \
-            HTMLBlock(html.H2(html.center("Resource consumption per service provided")))
+            HTMLBlock(html.H3(html.center("Resource consumption per service provided")))
 
         for tp, names in pairs:
             vals = []  # type: List[float]
@@ -548,7 +732,7 @@
         for node_id in nodes:
             bn = 0
             tot = 0
-            for _, ds in self.rstorage.iter_sensors(node_id=node_id, sensor=sensor, metric=metric):
+            for ds in self.rstorage.iter_sensors(node_id=node_id, sensor=sensor, metric=metric):
                 if ds.dev in ('sdb', 'sdc', 'sdd', 'sde'):
                     ts = self.rstorage.get_sensor(ds, job.reliable_info_range_s)
                     bn += (ts.data > bn_val).sum()
@@ -580,70 +764,116 @@
             yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fname))
 
 
+class DevRoles:
+    client_disk = 'client_disk'
+    client_net = 'client_net'
+    client_cpu = 'client_cpu'
+
+    storage_disk = 'storage_disk'
+    storage_client_net = 'storage_client_net'
+    storage_replication_net = 'storage_replication_net'
+    storage_cpu = 'storage_cpu'
+    ceph_storage = 'ceph_storage'
+    ceph_journal = 'ceph_journal'
+
+    compute_disk = 'compute_disk'
+    compute_net = 'compute_net'
+    compute_cpu = 'compute_cpu'
+
+
+def roles_for_sensors(storage: IWallyStorage) -> Dict[str, List[DataSource]]:
+    role2ds = defaultdict(list)
+
+    for node in storage.load_nodes():
+        ds = DataSource(node_id=node.node_id)
+        if 'ceph-osd' in node.roles:
+            for jdev in node.params.get('ceph_journal_devs', []):
+                role2ds[DevRoles.ceph_journal].append(ds(dev=jdev))
+                role2ds[DevRoles.storage_disk].append(ds(dev=jdev))
+
+            for sdev in node.params.get('ceph_storage_devs', []):
+                role2ds[DevRoles.ceph_storage].append(ds(dev=sdev))
+                role2ds[DevRoles.storage_disk].append(ds(dev=sdev))
+
+            if node.hw_info:
+                for dev in node.hw_info.disks_info:
+                    role2ds[DevRoles.storage_disk].append(ds(dev=dev))
+
+        if 'testnode' in node.roles:
+            role2ds[DevRoles.client_disk].append(ds(dev='rbd0'))
+
+    return role2ds
+
+
+def get_sources_for_roles(roles: Iterable[str]) -> List[DataSource]:
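+    # TODO: stub - per-role DataSource resolution is not implemented yet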
+    return []
+
+
 # IO time and QD
 class QDIOTimeHeatmap(JobReporter):
     def get_divs(self, suite: SuiteConfig, job: JobConfig) -> Iterator[Tuple[str, str, HTMLBlock]]:
 
-        # TODO: fix this hardcode, need to track what devices are actually used on test and storage nodes
-        # use saved storage info in nodes
-
-        journal_devs = None
-        storage_devs = None
-        test_nodes_devs = ['rbd0']
-
-        for node in self.rstorage.load_nodes():
-            if node.roles.intersection(STORAGE_ROLES):
-                cjd = set(node.params['ceph_journal_devs'])
-                if journal_devs is None:
-                    journal_devs = cjd
-                else:
-                    assert journal_devs == cjd, "{!r} != {!r}".format(journal_devs, cjd)
-
-                csd = set(node.params['ceph_storage_devs'])
-                if storage_devs is None:
-                    storage_devs = csd
-                else:
-                    assert storage_devs == csd, "{!r} != {!r}".format(storage_devs, csd)
+        # journal_devs = None
+        # storage_devs = None
+        # test_nodes_devs = ['rbd0']
+        #
+        # for node in self.rstorage.load_nodes():
+        #     if node.roles.intersection(STORAGE_ROLES):
+        #         cjd = set(node.params['ceph_journal_devs'])
+        #         if journal_devs is None:
+        #             journal_devs = cjd
+        #         else:
+        #             assert journal_devs == cjd, "{!r} != {!r}".format(journal_devs, cjd)
+        #
+        #         csd = set(node.params['ceph_storage_devs'])
+        #         if storage_devs is None:
+        #             storage_devs = csd
+        #         else:
+        #             assert storage_devs == csd, "{!r} != {!r}".format(storage_devs, csd)
+        #
 
         trange = (job.reliable_info_range[0] // 1000, job.reliable_info_range[1] // 1000)
+        test_nc = len(list(find_nodes_by_roles(self.rstorage.storage, ['testnode'])))
 
-        for name, devs, roles in [('storage', storage_devs, STORAGE_ROLES),
-                                  ('journal', journal_devs, STORAGE_ROLES),
-                                  ('test', test_nodes_devs, ['testnode'])]:
+        for dev_role in (DevRoles.ceph_storage, DevRoles.ceph_journal, DevRoles.client_disk):
 
-            yield Menu1st.per_job, job.summary, \
-                HTMLBlock(html.H2(html.center("{} IO heatmaps".format(name.capitalize()))))
+            caption = "{} IO heatmaps - {}".format(dev_role.capitalize(), cast(FioJobParams, job).params.long_summary)
+            if test_nc != 1:
+                caption += " * {} nodes".format(test_nc)
+
+            yield Menu1st.engineering_per_job, job.summary, HTMLBlock(html.H3(html.center(caption)))
 
             # QD heatmap
-            nodes = find_nodes_by_roles(self.rstorage.storage, roles)
-            ioq2d = find_sensors_to_2d(self.rstorage, trange, sensor='block-io', dev=devs,
-                                       node_id=nodes, metric='io_queue')
+            # nodes = find_nodes_by_roles(self.rstorage.storage, roles)
 
-            ds = DataSource(suite.storage_id, job.storage_id, AGG_TAG, 'block-io', name, tag="hmap." + default_format)
+            ioq2d = find_sensors_to_2d(self.rstorage, trange, dev_role=dev_role, sensor='block-io', metric='io_queue')
+
+            ds = DataSource(suite.storage_id, job.storage_id, AGG_TAG, 'block-io', dev_role,
+                            tag="hmap." + default_format)
 
             fname = self.plt(plot_hmap_from_2d, ds(metric='io_queue'), data2d=ioq2d, xlabel='Time', ylabel="IO QD",
-                             title=name.capitalize() + " devs QD", bins=StyleProfile.qd_bins)
-            yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fname))
+                             title=dev_role.capitalize() + " devs QD", bins=StyleProfile.qd_bins)
+            yield Menu1st.engineering_per_job, job.summary, HTMLBlock(html.img(fname))
 
             # Block size heatmap
-            wc2d = find_sensors_to_2d(self.rstorage, trange, node_id=nodes, sensor='block-io', dev=devs,
+            wc2d = find_sensors_to_2d(self.rstorage, trange, dev_role=dev_role, sensor='block-io',
                                       metric='writes_completed')
             wc2d[wc2d < 1E-3] = 1
-            sw2d = find_sensors_to_2d(self.rstorage, trange, node_id=nodes, sensor='block-io', dev=devs,
+            sw2d = find_sensors_to_2d(self.rstorage, trange, dev_role=dev_role, sensor='block-io',
                                       metric='sectors_written')
             data2d = sw2d / wc2d / 1024
             fname = self.plt(plot_hmap_from_2d, ds(metric='wr_block_size'),
-                             data2d=data2d, title=name.capitalize() + " write block size",
+                             data2d=data2d, title=dev_role.capitalize() + " write block size",
                              ylabel="IO bsize, KiB", xlabel='Time', bins=StyleProfile.block_size_bins)
-            yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fname))
+            yield Menu1st.engineering_per_job, job.summary, HTMLBlock(html.img(fname))
 
             # iotime heatmap
-            wtime2d = find_sensors_to_2d(self.rstorage, trange, node_id=nodes, sensor='block-io', dev=devs,
+            wtime2d = find_sensors_to_2d(self.rstorage, trange, dev_role=dev_role, sensor='block-io',
                                          metric='io_time')
             fname = self.plt(plot_hmap_from_2d, ds(metric='io_time'), data2d=wtime2d,
                              xlabel='Time', ylabel="IO time (ms) per second",
-                             title=name.capitalize() + " iotime", bins=StyleProfile.iotime_bins)
-            yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fname))
+                             title=dev_role.capitalize() + " iotime", bins=StyleProfile.iotime_bins)
+            yield Menu1st.engineering_per_job, job.summary, HTMLBlock(html.img(fname))
 
 
 # IOPS/latency over test time for each job
@@ -652,14 +882,16 @@
     suite_types = {'fio'}
 
     def get_divs(self, suite: SuiteConfig, job: JobConfig) -> Iterator[Tuple[str, str, HTMLBlock]]:
-
         fjob = cast(FioJobConfig, job)
 
-        yield Menu1st.per_job, job.summary, HTMLBlock(html.H2(html.center("Load tool results")))
+        # caption = "Load tool results, " + job.params.long_summary
+        caption = "Load tool results"
+        yield Menu1st.per_job, job.summary, HTMLBlock(html.H3(html.center(caption)))
 
         agg_io = get_aggregated(self.rstorage, suite.storage_id, fjob.storage_id, "bw", job.reliable_info_range_s)
+
         if fjob.bsize >= DefStyleProfile.large_blocks:
-            title = "Fio measured Bandwidth over time"
+            title = "Fio measured bandwidth over time"
             units = "MiBps"
             agg_io.data //= int(unit_conversion_coef_f(units, agg_io.units))
         else:
@@ -670,6 +902,11 @@
         fpath = self.plt(plot_v_over_time, agg_io.source(tag='ts.' + default_format), title, units, agg_io)
         yield Menu1st.per_job, fjob.summary, HTMLBlock(html.img(fpath))
 
+        title = "BW distribution" if fjob.bsize >= DefStyleProfile.large_blocks else "IOPS distribution"
+        io_stat_prop = calc_norm_stat_props(agg_io, bins_count=StyleProfile.hist_boxes)
+        fpath = self.plt(plot_hist, agg_io.source(tag='hist.' + default_format), title, units, io_stat_prop)
+        yield Menu1st.per_job, fjob.summary, HTMLBlock(html.img(fpath))
+
         if fjob.bsize < DefStyleProfile.large_blocks:
             agg_lat = get_aggregated(self.rstorage, suite.storage_id, fjob.storage_id, "lat",
                                      job.reliable_info_range_s)
@@ -687,23 +924,6 @@
 
             yield Menu1st.per_job, fjob.summary, HTMLBlock(html.img(fpath))
 
-        fjob = cast(FioJobConfig, job)
-
-        agg_io = get_aggregated(self.rstorage, suite.storage_id, fjob.storage_id, "bw", job.reliable_info_range_s)
-
-        if fjob.bsize >= DefStyleProfile.large_blocks:
-            title = "BW distribution"
-            units = "MiBps"
-            agg_io.data //= int(unit_conversion_coef_f(units, agg_io.units))
-        else:
-            title = "IOPS distribution"
-            agg_io.data //= (int(unit_conversion_coef_f("KiBps", agg_io.units)) * fjob.bsize)
-            units = "IOPS"
-
-        io_stat_prop = calc_norm_stat_props(agg_io, bins_count=StyleProfile.hist_boxes)
-        fpath = self.plt(plot_hist, agg_io.source(tag='hist.' + default_format), title, units, io_stat_prop)
-        yield Menu1st.per_job, fjob.summary, HTMLBlock(html.img(fpath))
-
 
 # Cluster load over test time
 class ClusterLoad(JobReporter):
@@ -719,15 +939,14 @@
 
     def get_divs(self, suite: SuiteConfig, job: JobConfig) -> Iterator[Tuple[str, str, HTMLBlock]]:
 
-        yield Menu1st.per_job, job.summary, HTMLBlock(html.H2(html.center("Cluster load")))
+        yield Menu1st.per_job, job.summary, HTMLBlock(html.H3(html.center("Cluster load")))
 
         sensors = []
         max_iop = 0
         max_bytes = 0
         stor_nodes = find_nodes_by_roles(self.rstorage.storage, STORAGE_ROLES)
         for sensor, metric, op, units in self.storage_sensors:
-            ts = summ_sensors(self.rstorage, job.reliable_info_range_s, node_id=stor_nodes, sensor=sensor,
-                              metric=metric)
+            ts = sum_sensors(self.rstorage, job.reliable_info_range_s, node_id=stor_nodes, sensor=sensor, metric=metric)
             if ts is not None:
                 ds = DataSource(suite_id=suite.storage_id,
                                 job_id=job.storage_id,
@@ -758,22 +977,60 @@
                 fpath = self.plt(plot_v_over_time, ds, title, units, ts=ts)
                 yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fpath))
             else:
-                logger.info("Hide '%s' plot for %s, as it's cum load is less then %s%%",
+                logger.info("Hide '%s' plot for %s, as its load is less than %s%% of the maximum",
                             title, job.summary, int(DefStyleProfile.min_load_diff * 100))
 
 
 # ------------------------------------------  REPORT STAGES  -----------------------------------------------------------
 
 
+def add_devroles(ctx: TestRun):
+    # TODO: need to detect all devices for node on this stage using hw info
+    detected_selectors = collections.defaultdict(
+        lambda: collections.defaultdict(list))  # type: Dict[str, Dict[str, List[str]]]
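+    # detected_selectors: hostname -> "dev1|dev2" name selector -> [device roles]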
+
+    for node in ctx.nodes:
+        if NodeRole.osd in node.info.roles:
+            all_devs = set()
+
+            jdevs = node.info.params.get('ceph_journal_devs')
+            if jdevs:
+                all_devs.update(jdevs)
+                detected_selectors[node.info.hostname]["|".join(jdevs)].append(DevRoles.osd_journal)
+
+            sdevs = node.info.params.get('ceph_storage_devs')
+            if sdevs:
+                all_devs.update(sdevs)
+                detected_selectors[node.info.hostname]["|".join(sdevs)].append(DevRoles.osd_storage)
+
+            if all_devs:
+                detected_selectors[node.info.hostname]["|".join(all_devs)].append(DevRoles.storage_block)
+
+    for hostname, dev_rules in detected_selectors.items():
+        dev_locs = []  # type: List[Dict[str, List[str]]]
+        ctx.devs_locator.append({hostname: dev_locs})
+        for dev_names, roles in dev_rules.items():
+            dev_locs.append({dev_names: roles})
+
+
 class HtmlReportStage(Stage):
     priority = StepOrder.REPORT
 
     def run(self, ctx: TestRun) -> None:
-        job_reporters_cls = [StatInfo, Resources, LoadToolResults, ClusterLoad, CPULoadPlot, QDIOTimeHeatmap]
+        nodes = ctx.rstorage.load_nodes()
+        update_storage_selector(ctx.rstorage, ctx.devs_locator, nodes)
+
+        job_reporters_cls = [StatInfo, LoadToolResults, Resources, ClusterLoad, CPULoadPlot, QDIOTimeHeatmap]
+        # job_reporters_cls = [QDIOTimeHeatmap]
         job_reporters = [rcls(ctx.rstorage, DefStyleProfile, DefColorProfile)
                          for rcls in job_reporters_cls] # type: ignore
 
-        suite_reporters_cls = [IOQD, ResourceQD]  # type: List[Type[SuiteReporter]]
+        suite_reporters_cls = [IOQD,
+                               ResourceQD,
+                               PerformanceSummary,
+                               EngineeringSummary,
+                               ResourceConsumptionSummary]  # type: List[Type[SuiteReporter]]
+        # suite_reporters_cls = []  # type: List[Type[SuiteReporter]]
         suite_reporters = [rcls(ctx.rstorage, DefStyleProfile, DefColorProfile)
                            for rcls in suite_reporters_cls]  # type: ignore
 
@@ -823,7 +1080,7 @@
                     for block, item, html in sreporter.get_divs(suite):
                         items[block][item].append(html)
                 except Exception:
-                    logger.exception("Failed to generate report for suite %s", suite)
+                    logger.exception("Failed to generate report for suite %s", suite.storage_id)
 
             if DEBUG:
                 break
@@ -837,10 +1094,16 @@
             )
             menu_block.append('<div class="collapse" id="item{}">'.format(idx_1st))
 
-            if menu_1st == Menu1st.per_job:
-                in_order = sorted(items[menu_1st], key=job_summ_sort_order.index)
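+            # each top-level menu defines its own ordering for second-level items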
+            if menu_1st in (Menu1st.per_job, Menu1st.engineering_per_job):
+                key = job_summ_sort_order.index
+            elif menu_1st == Menu1st.engineering:
+                key = Menu2ndEng.order.index
+            elif menu_1st == Menu1st.summary:
+                key = Menu2ndSumm.order.index
             else:
-                in_order = sorted(items[menu_1st])
+                key = lambda x: x
+
+            in_order = sorted(items[menu_1st], key=key)
 
             for menu_2nd in in_order:
                 menu_block.append('    <a href="#content{}" class="nav-group-item">{}</a>'
diff --git a/wally/report_profiles.py b/wally/report_profiles.py
index 920c320..e83907e 100644
--- a/wally/report_profiles.py
+++ b/wally/report_profiles.py
@@ -9,6 +9,7 @@
     suppl_color3 = 'orange'
     box_color = 'y'
     err_color = 'red'
+    super_outlier_color = 'orange'
 
     noise_alpha = 0.3
     subinfo_alpha = 0.7
@@ -43,6 +44,9 @@
 
     point_shape = 'o'
     err_point_shape = '*'
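+    # up/down triangles mark extreme ('super') outliers clipped at the plot borders;
+    # at most max_hidden_outliers_fraction of points may be hidden entirely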
+    max_hidden_outliers_fraction = 0.05
+    super_outlier_point_shape_up = '^'
+    super_outlier_point_shape_down = 'v'
 
     avg_range = 20
     approx_average = True
@@ -71,9 +75,9 @@
     # heatmap_interpolation_points = 300
 
     heatmap_colorbar = False
-    outliers_q_nd = 3.0
-    outliers_hide_q_nd = 4.0
-    outliers_lat = (0.01, 0.9)
+    outliers_q_nd = 3
+    outliers_hide_q_nd = 4
+    outliers_lat = (0.01, 0.95)
 
     violin_instead_of_box = True
     violin_point_count = 30000
diff --git a/wally/resources.py b/wally/resources.py
index dca9dcc..a08993d 100644
--- a/wally/resources.py
+++ b/wally/resources.py
@@ -7,9 +7,8 @@
 from cephlib.units import b2ssize_10, b2ssize, unit_conversion_coef_f
 from cephlib.statistic import NormStatProps, HistoStatProps, calc_norm_stat_props, calc_histo_stat_props
 from cephlib.numeric_types import TimeSeries
-from cephlib.wally_storage import find_nodes_by_roles
-from cephlib.storage_selectors import summ_sensors
-from cephlib.node import HWInfo
+from cephlib.wally_storage import find_nodes_by_roles, WallyDB
+from cephlib.storage_selectors import sum_sensors
 
 from .result_classes import IWallyStorage, SuiteConfig
 from .utils import STORAGE_ROLES
@@ -120,8 +119,13 @@
     cpu_ts = {}
     cpu_metrics = "idle guest iowait sirq nice irq steal sys user".split()
     nodes = find_nodes_by_roles(rstorage.storage, roles)
+
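+    # total CPU core count per node, summed over all (cpu, core_count) entries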
+    cores_per_node = {}
+    for node in rstorage.load_nodes():
+        cores_per_node[node.node_id] = sum(cores for _, cores in node.hw_info.cpus)
+
     for name in cpu_metrics:
-        cpu_ts[name] = summ_sensors(rstorage, time_range, node_id=nodes, sensor='system-cpu', metric=name)
+        cpu_ts[name] = sum_sensors(rstorage, time_range, node_id=nodes, sensor='system-cpu', metric=name)
 
     it = iter(cpu_ts.values())
     total_over_time = next(it).data.copy()  # type: numpy.ndarray
@@ -148,7 +152,7 @@
 
     records = {}  # type: Dict[str, Tuple[str, float, float]]
     if not nc:
-        records = rstorage.get_job_info(suite, job, "resource_usage")
+        records = rstorage.get_job_info(suite, job, WallyDB.resource_usage_rel)
         if records is not None:
             records = records.copy()
             iops_ok = records.pop('iops_ok')
@@ -201,7 +205,7 @@
             continue
 
         nodes = find_nodes_by_roles(rstorage.storage, roles)
-        res_ts = summ_sensors(rstorage, job.reliable_info_range_s, node_id=nodes, sensor=sensor, metric=metric)
+        res_ts = sum_sensors(rstorage, job.reliable_info_range_s, node_id=nodes, sensor=sensor, metric=metric)
         if res_ts is None:
             continue
 
@@ -216,17 +220,15 @@
         all_agg[vname] = data
 
     # cpu usage
-    stor_cores_count = 0
-    all_stor_nodes = list(find_nodes_by_roles(rstorage.storage, STORAGE_ROLES))
-    for node in all_stor_nodes:
-        try:
-            node_hw_info = rstorage.storage.load(HWInfo, 'hw_info', node)
-        except KeyError:
-            logger.warning("No hw_info available for node %s. Using 'NODE time' instead of " +
-                           "CPU core time for CPU consumption metrics", node)
-            stor_cores_count = len(all_stor_nodes)
-            break
-        stor_cores_count += sum(cores for _, cores in node_hw_info.cores)
+    stor_cores_count = None
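+    # storage nodes are expected to be homogeneous - verify every one reports the same core count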
+    for node in rstorage.load_nodes():
+        if node.roles.intersection(STORAGE_ROLES):
+            if stor_cores_count is None:
+                stor_cores_count = sum(cores for _, cores in node.hw_info.cpus)
+            else:
+                assert stor_cores_count == sum(cores for _, cores in node.hw_info.cpus)
+
+    assert stor_cores_count, "no storage nodes found or zero CPU cores reported"
 
     cpu_ts = get_cluster_cpu_load(rstorage, STORAGE_ROLES, job.reliable_info_range_s)
     cpus_used_sec = (1.0 - (cpu_ts['idle'].data + cpu_ts['iowait'].data) / cpu_ts['total'].data) * stor_cores_count
@@ -275,6 +277,6 @@
 
         srecords = records.copy()
         srecords['iops_ok'] = iops_ok
-        rstorage.put_job_info(suite, job, "resource_usage", srecords)
+        rstorage.put_job_info(suite, job, WallyDB.resource_usage_rel, srecords)
 
     return records, iops_ok
diff --git a/wally/result_classes.py b/wally/result_classes.py
index d1fd104..9d59d42 100644
--- a/wally/result_classes.py
+++ b/wally/result_classes.py
@@ -76,6 +76,10 @@
 class IWallyStorage(ISensorStorage, IImagesStorage, metaclass=abc.ABCMeta):
 
     @abc.abstractmethod
+    def flush(self) -> None:
+        pass
+
+    @abc.abstractmethod
     def put_or_check_suite(self, suite: SuiteConfig) -> None:
         pass
 
diff --git a/wally/result_storage.py b/wally/result_storage.py
index 70c46d7..de2f86d 100644
--- a/wally/result_storage.py
+++ b/wally/result_storage.py
@@ -7,7 +7,7 @@
 import numpy
 
 from cephlib.wally_storage import WallyDB
-from cephlib.sensor_storage import SensorStorageBase
+from cephlib.sensor_storage import SensorStorage
 from cephlib.statistic import StatProps
 from cephlib.numeric_types import DataSource, TimeSeries
 from cephlib.node import NodeInfo
@@ -29,10 +29,12 @@
     return path
 
 
-class WallyStorage(IWallyStorage, SensorStorageBase):
+class WallyStorage(IWallyStorage, SensorStorage):
     def __init__(self, storage: Storage) -> None:
-        SensorStorageBase.__init__(self, storage, WallyDB)
-        self.cached_nodes = None
+        SensorStorage.__init__(self, storage, WallyDB)
+
+    def flush(self) -> None:
+        self.storage.flush()
 
     # -------------  CHECK DATA IN STORAGE  ----------------------------------------------------------------------------
     def check_plot_file(self, source: DataSource) -> Optional[str]:
@@ -76,7 +78,7 @@
         self.storage.put_raw(report.encode('utf8'), path)
 
     def put_job_info(self, suite: SuiteConfig, job: JobConfig, key: str, data: Any) -> None:
-        path = self.db_paths.job_extra.format(suite_id=suite.storage_id, job_id=job.storage_id, tag=key)
+        path = self.db_paths.job_extra.format(suite_id=suite.storage_id, job_id=job.storage_id, name=key)
         if isinstance(data, bytes):
             self.storage.put_raw(data, path)
         else:
@@ -94,7 +96,7 @@
         return None
 
     def get_job_info(self, suite: SuiteConfig, job: JobConfig, key: str) -> Any:
-        path = self.db_paths.job_extra.format(suite_id=suite.storage_id, job_id=job.storage_id, tag=key)
+        path = self.db_paths.job_extra.format(suite_id=suite.storage_id, job_id=job.storage_id, name=key)
         return self.storage.get(path, None)
     # -------------   ITER OVER STORAGE   ------------------------------------------------------------------------------
 
@@ -102,7 +104,6 @@
         for is_file, suite_info_path, groups in self.iter_paths(self.db_paths.suite_cfg_r):
             assert is_file
             suite = self.storage.load(SuiteConfig, suite_info_path)
-            # suite = cast(SuiteConfig, self.storage.load(SuiteConfig, suite_info_path))
             assert suite.storage_id == groups['suite_id']
             if not suite_type or suite.test_type == suite_type:
                 yield suite
@@ -117,13 +118,16 @@
             yield job
 
     def load_nodes(self) -> List[NodeInfo]:
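+        # the parsed node list is cached on the storage object itself, so every
+        # WallyStorage instance wrapping the same storage shares one copy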
-        if not self.cached_nodes:
-            self.cached_nodes = self.storage.load_list(NodeInfo, WallyDB.all_nodes)
+        try:
+            return self.storage.other_caches['wally']['nodes']
+        except KeyError:
+            nodes = self.storage.load_list(NodeInfo, WallyDB.all_nodes)
             if WallyDB.nodes_params in self.storage:
                 params = json.loads(self.storage.get_raw(WallyDB.nodes_params).decode('utf8'))
-                for node in self.cached_nodes:
+                for node in nodes:
                     node.params = params.get(node.node_id, {})
-        return self.cached_nodes
+            self.storage.other_caches['wally']['nodes'] = nodes
+            return nodes
 
     #  -----------------  TS  ------------------------------------------------------------------------------------------
     def get_ts(self, ds: DataSource) -> TimeSeries:
@@ -160,5 +164,5 @@
         result = numpy.concatenate((tv, dv), axis=1)
         self.storage.put_array(csv_path, result, header, header2=ts.histo_bins, append_on_exists=False)
 
-    def iter_ts(self, **ds_parts) -> Iterator[DataSource]:
-        return self.iter_objs(self.db_paths.ts_r, **ds_parts)
\ No newline at end of file
+    def iter_ts(self, **ds_parts: str) -> Iterator[DataSource]:
+        return self.iter_objs(self.db_paths.ts_r, **ds_parts)
diff --git a/wally/run_test.py b/wally/run_test.py
index 6fbefc4..578a65b 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -2,11 +2,10 @@
 import json
 import copy
 import logging
-from concurrent.futures import Future
-from typing import List, Dict, Tuple, Optional, Union, cast
+from typing import List, Tuple, Optional, Union, cast
 
 from cephlib.wally_storage import WallyDB
-from cephlib.node import NodeInfo, IRPCNode, get_hw_info, get_sw_info
+from cephlib.node import NodeInfo, IRPCNode, get_hw_info, get_sw_info, get_hostname
 from cephlib.ssh import parse_ssh_uri
 from cephlib.node_impl import setup_rpc, connect
 
@@ -80,7 +79,7 @@
                 delta = 0
                 if val > t_end:
                     delta = val - t_end
-                elif t_start > val:
+                elif val < t_start:
                     delta = t_start - val
 
                 if delta > ctx.config.max_time_diff_ms:
@@ -91,9 +90,9 @@
                                                                                ctx.config.max_time_diff_ms)
                     logger.error(msg)
                     raise utils.StopTestError(msg)
-                if delta > 0:
-                    logger.warning("Node %s has time shift at least %s ms", node, delta)
 
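+                # sub-millisecond skew is likely within measurement accuracy - skip the warning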
+                if delta > 1:
+                    logger.warning("Node %s has time shift at least %s ms", node, int(delta))
 
     def cleanup(self, ctx: TestRun) -> None:
         if ctx.config.get("download_rpc_logs", False):
@@ -123,43 +122,25 @@
 class CollectInfoStage(Stage):
     """Collect node info"""
 
-    priority = StepOrder.START_SENSORS - 2
+    priority = StepOrder.UPDATE_NODES_INFO
     config_block = 'collect_info'
 
     def run(self, ctx: TestRun) -> None:
-        if not ctx.config.collect_info:
-            return
-
-        futures = {}  # type: Dict[Tuple[str, str], Future]
-
         with ctx.get_pool() as pool:
-            # can't make next RPC request until finish with previous
-            for node in ctx.nodes:
-                nid = node.node_id
-                hw_info_path = WallyDB.hw_info.format(node_id=nid)
-                if hw_info_path not in ctx.storage:
-                    futures[(hw_info_path, nid)] = pool.submit(get_hw_info, node)
+            try:
+                # can't issue the next RPC request to a node until the previous one completes
+                for node, hw_info in zip(ctx.nodes, pool.map(get_hw_info, ctx.nodes)):
+                    node.info.hw_info = hw_info
+                for node, sw_info in zip(ctx.nodes, pool.map(get_sw_info, ctx.nodes)):
+                    node.info.sw_info = sw_info
+            except Exception as exc:
+                logger.exception("During collecting cluster info")
+                raise utils.StopTestError() from exc
 
-            for (path, nid), future in futures.items():
-                try:
-                    ctx.storage.put(future.result(), path)
-                except Exception:
-                    logger.exception("During collecting hardware info from %s", nid)
-                    raise utils.StopTestError()
-
-            futures.clear()
-            for node in ctx.nodes:
-                nid = node.node_id
-                sw_info_path = WallyDB.sw_info.format(node_id=nid)
-                if sw_info_path not in ctx.storage:
-                    futures[(sw_info_path, nid)] = pool.submit(get_sw_info, node)
-
-            for (path, nid), future in futures.items():
-                try:
-                    ctx.storage.put(future.result(), path)
-                except Exception:
-                    logger.exception("During collecting software info from %s", nid)
-                    raise utils.StopTestError()
+            logger.debug("Collecting hostnames")
+            hostnames = pool.map(get_hostname, ctx.nodes)
+            for node, hostname in zip(ctx.nodes, hostnames):
+                node.info.hostname = hostname
 
 
 class ExplicitNodesStage(Stage):
diff --git a/wally/suits/io/fio_job.py b/wally/suits/io/fio_job.py
index 3d4e886..6676895 100644
--- a/wally/suits/io/fio_job.py
+++ b/wally/suits/io/fio_job.py
@@ -116,11 +116,12 @@
 
     @property
     def oper(self) -> str:
-        return self.vals['rw']
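+        # fio allows modifiers in rw values (e.g. 'randread:16'); keep only the op name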
+        return self.vals['rw'].split(':')[0]
 
     @property
     def op_type_short(self) -> str:
-        return self.op_type2short[self.vals['rw']]
+        return self.op_type2short[self.oper]
 
     @property
     def thcount(self) -> int:
diff --git a/wally/suits/io/fio_task_parser.py b/wally/suits/io/fio_task_parser.py
index 9ffeed8..5b91885 100644
--- a/wally/suits/io/fio_task_parser.py
+++ b/wally/suits/io/fio_task_parser.py
@@ -210,6 +210,7 @@
 def apply_params(sec: FioJobConfig, params: FioParams) -> FioJobConfig:
     processed_vals = OrderedDict()  # type: Dict[str, Any]
     processed_vals.update(params)
+
     for name, val in sec.vals.items():
         if name in params:
             continue
@@ -300,6 +301,12 @@
 
 
 def fio_cfg_compile(source: str, fname: str, test_params: FioParams) -> Iterator[FioJobConfig]:
+    test_params = test_params.copy()
+
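+    # if no explicit RAMPTIME is given, derive it as ~5% of RUNTIME, clamped to 5..30 seconds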
+    if 'RAMPTIME' not in test_params and 'RUNTIME' in test_params:
+        ramp = int(int(test_params['RUNTIME']) * 0.05)
+        test_params['RAMPTIME'] = min(30, max(5, ramp))
+
     it = parse_all_in_1(source, fname)
     it = (apply_params(sec, test_params) for sec in it)
     it = flatmap(process_cycles, it)
diff --git a/wally/suits/io/verify.cfg b/wally/suits/io/verify.cfg
index 9433361..250fade 100644
--- a/wally/suits/io/verify.cfg
+++ b/wally/suits/io/verify.cfg
@@ -6,9 +6,9 @@
 LQDW={% 1, 4, 16, 64 %}
 LQDR={% 1, 4, 16, 64 %}
 
-runtime=600
+runtime={RUNTIME}
 direct=1
-ramp_time=30
+ramp_time={RAMPTIME}
 
 # ---------------------------------------------------------------------
 
@@ -19,24 +19,21 @@
 
 [verify_{TEST_SUMM}]
 blocksize=1m
-rw=read
+rw=randread:16
 iodepth={LQDR}
 
 [verify_{TEST_SUMM}]
 blocksize=4k
 rw=randwrite
-direct=1
 iodepth={QDW}
 
 [verify_{TEST_SUMM}]
 blocksize=4k
 rw=randread
-direct=1
 iodepth={QDR}
 
 [verify_{TEST_SUMM}]
 blocksize=4k
 rw=randwrite
 sync=1
-direct=1
 iodepth=1
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index b2b3a54..a997b02 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -105,8 +105,8 @@
             run_times = list(map(self.get_expected_runtime, not_in_storage))
 
             if None not in run_times:
-                # +5% - is a rough estimation for additional operations
-                expected_run_time = int(sum(run_times) * 1.05)
+                # +10s per iteration is a rough estimate for additional setup operations
+                expected_run_time = int(sum(run_times) + 10 * len(not_in_storage))
 
                 exec_time_s, end_dt_s = get_time_interval_printable_info(expected_run_time)
                 logger.info("Entire test should takes around %s and finish at %s", exec_time_s, end_dt_s)
diff --git a/wally/test_run_class.py b/wally/test_run_class.py
index a7721e6..7aed795 100644
--- a/wally/test_run_class.py
+++ b/wally/test_run_class.py
@@ -1,9 +1,11 @@
+import collections
 from typing import List, Callable, Any, Dict, Optional, Set
 from concurrent.futures import ThreadPoolExecutor
 
 from cephlib.istorage import IStorage
 from cephlib.node import NodeInfo, IRPCNode
 from cephlib.ssh import ConnCreds
+from cephlib.storage_selectors import DevRolesConfig
 
 from .openstack_api import OSCreds, OSConnection
 from .config import Config
@@ -37,6 +39,7 @@
         self.config = config
         self.sensors_run_on = set()  # type: Set[str]
         self.os_spawned_nodes_ids = None  # type: List[int]
+        self.devs_locator = []  # type: DevRolesConfig
 
     def get_pool(self):
         return ThreadPoolExecutor(self.config.get('worker_pool_sz', 32))