Move common storage, plotting and statistics code to cephlib
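
The duplicated storage, statistics and plotting helpers now live in cephlib, so the
modules touched below mostly just swap their local imports for the shared ones.
A representative before/after sketch of the import migration (illustrative only;
all names are taken from the diff itself):

    # before: local copies kept inside wally
    # from .common_types import IStorable
    # from .statistic import calc_norm_stat_props, calc_histo_stat_props
    # from .hlstorage import ResultStorage

    # after: shared implementations imported from cephlib
    from cephlib.storage import IStorable
    from cephlib.statistic import calc_norm_stat_props, calc_histo_stat_props
    from cephlib.storage_selectors import c_interpolate_ts_on_seconds_border
    from cephlib.plot import PlotParams, provide_plot

    # wally keeps only its own thin layer, e.g. the result-storage wrapper
    from wally.result_storage import ResultStorage
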
diff --git a/wally/ceph.py b/wally/ceph.py
index 3527db3..89324c5 100644
--- a/wally/ceph.py
+++ b/wally/ceph.py
@@ -144,7 +144,7 @@
     return path
 
 
-class FillCephInfoStage(Stage):
+class CollectCephInfoStage(Stage):
     config_block = 'ceph'
     priority = StepOrder.UPDATE_NODES_INFO
 
diff --git a/wally/common_types.py b/wally/common_types.py
index 470a325..4aedfaa 100644
--- a/wally/common_types.py
+++ b/wally/common_types.py
@@ -1,47 +1,12 @@
-import abc
-from typing import Any, Union, List, Dict, NamedTuple
+from typing import Any, Dict, NamedTuple
+
+from cephlib.storage import IStorable
 
 
 IP = str
 IPAddr = NamedTuple("IPAddr", [("host", IP), ("port", int)])
 
 
-class IStorable(metaclass=abc.ABCMeta):
-    """Interface for type, which can be stored"""
-
-    @abc.abstractmethod
-    def raw(self) -> Dict[str, Any]:
-        pass
-
-    @classmethod
-    def fromraw(cls, data: Dict[str, Any]) -> 'IStorable':
-        pass
-
-
-Basic = Union[int, str, bytes, bool, None]
-StorableType = Union[IStorable, Dict[str, Any], List[Any], int, str, bytes, bool, None]
-
-
-class Storable(IStorable):
-    """Default implementation"""
-
-    __ignore_fields__ = []  # type: List[str]
-
-    def raw(self) -> Dict[str, Any]:
-        return {name: val
-                for name, val in self.__dict__.items()
-                if not name.startswith("_") and name not in self.__ignore_fields__}
-
-    @classmethod
-    def fromraw(cls, data: Dict[str, Any]) -> 'IStorable':
-        obj = cls.__new__(cls)
-        if cls.__ignore_fields__:
-            data = data.copy()
-            data.update(dict.fromkeys(cls.__ignore_fields__))
-        obj.__dict__.update(data)
-        return obj
-
-
 class ConnCreds(IStorable):
     def __init__(self, host: str, user: str, passwd: str = None, port: str = '22',
                  key_file: str = None, key: bytes = None) -> None:
diff --git a/wally/config.py b/wally/config.py
index 95ae511..2ec7ba5 100644
--- a/wally/config.py
+++ b/wally/config.py
@@ -1,6 +1,6 @@
 from typing import Any, Dict, Optional, Set
 
-from .common_types import IStorable
+from cephlib.storage import IStorable
 
 
 ConfigBlock = Dict[str, Any]
diff --git a/wally/console_report.py b/wally/console_report.py
index ac54965..1528089 100644
--- a/wally/console_report.py
+++ b/wally/console_report.py
@@ -4,13 +4,13 @@
 import numpy
 
 from cephlib.common import float2str
+from cephlib import texttable
+from cephlib.statistic import calc_norm_stat_props, calc_histo_stat_props
 
-from . import texttable
-from .hlstorage import ResultStorage
+from .result_classes import IResultStorage
 from .stage import Stage, StepOrder
 from .test_run_class import TestRun
 from .suits.io.fio import FioTest
-from .statistic import calc_norm_stat_props, calc_histo_stat_props
 from .suits.io.fio_hist import get_lat_vals
 from .data_selectors import get_aggregated
 
@@ -23,22 +23,23 @@
     priority = StepOrder.REPORT
 
     def run(self, ctx: TestRun) -> None:
-        rstorage = ResultStorage(ctx.storage)
-        for suite in rstorage.iter_suite(FioTest.name):
+        for suite in ctx.rstorage.iter_suite(FioTest.name):
             table = texttable.Texttable(max_width=200)
 
-            tbl = rstorage.get_txt_report(suite)
+            tbl = ctx.rstorage.get_txt_report(suite)
             if tbl is None:
                 table.header(["Description", "IOPS ~ Dev", "BW, MiBps", 'Skew/Kurt', 'lat med, ms', 'lat 95, ms'])
                 table.set_cols_align(('l', 'r', 'r', 'r', 'r', 'r'))
 
-                for job in sorted(rstorage.iter_job(suite), key=lambda job: job.params):
-                    bw_ts = get_aggregated(rstorage, suite, job, metric='bw')
+                for job in sorted(ctx.rstorage.iter_job(suite), key=lambda job: job.params):
+                    bw_ts = get_aggregated(ctx.rstorage, suite.storage_id, job.storage_id, metric='bw',
+                                           trange=job.reliable_info_range_s)
                     props = calc_norm_stat_props(bw_ts)
                     avg_iops = props.average // job.params.params['bsize']
                     iops_dev = props.deviation // job.params.params['bsize']
 
-                    lat_ts = get_aggregated(rstorage, suite, job, metric='lat')
+                    lat_ts = get_aggregated(ctx.rstorage, suite.storage_id, job.storage_id, metric='lat',
+                                            trange=job.reliable_info_range_s)
                     bins_edges = numpy.array(get_lat_vals(lat_ts.data.shape[1]), dtype='float32') / 1000  # convert us to ms
                     lat_props = calc_histo_stat_props(lat_ts, bins_edges)
                     table.add_row([job.params.summary,
@@ -48,5 +49,5 @@
                                    float2str(lat_props.perc_50), float2str(lat_props.perc_95)])
 
                 tbl = table.draw()
-                rstorage.put_txt_report(suite, tbl)
+                ctx.rstorage.put_txt_report(suite, tbl)
             print(tbl)
diff --git a/wally/data_selectors.py b/wally/data_selectors.py
index a5ac400..3e6bc3e 100644
--- a/wally/data_selectors.py
+++ b/wally/data_selectors.py
@@ -1,20 +1,13 @@
-import ctypes
 import logging
-import os.path
-from typing import Tuple, List, Iterable, Iterator, Optional, Union, Dict
-from fractions import Fraction
+from typing import Tuple, Iterator
 
 import numpy
 
-from cephlib.numeric import auto_edges2
+from cephlib.numeric_types import DataSource, TimeSeries
+from cephlib.storage_selectors import c_interpolate_ts_on_seconds_border
 
-import wally
-from .hlstorage import ResultStorage
-from .node_interfaces import NodeInfo
-from .result_classes import DataSource, TimeSeries, SuiteConfig, JobConfig
-from .suits.io.fio import FioJobConfig
+from .result_classes import IResultStorage
 from .suits.io.fio_hist import expected_lat_bins
-from .utils import unit_conversion_coef
 
 
 logger = logging.getLogger("wally")
@@ -40,52 +33,25 @@
 AGG_TAG = 'ALL'
 
 
-def find_nodes_by_roles(rstorage: ResultStorage, node_roles: Iterable[str]) -> List[NodeInfo]:
-    nodes = rstorage.storage.load_list(NodeInfo, 'all_nodes')  # type: List[NodeInfo]
-    node_roles_s = set(node_roles)
-    return [node for node in nodes if node.roles.intersection(node_roles_s)]
-
-
-def find_all_sensors(rstorage: ResultStorage,
-                     node_roles: Iterable[str],
-                     sensor: str,
-                     metric: str) -> Iterator[TimeSeries]:
-    all_nodes_rr = "|".join(node.node_id for node in find_nodes_by_roles(rstorage, node_roles))
-    all_nodes_rr = "(?P<node>{})".format(all_nodes_rr)
-
-    for path, ds in rstorage.iter_sensors(all_nodes_rr, sensor=sensor, metric=metric):
-        ts = rstorage.load_sensor(ds)
-
-        # for sensors ts.times is array of pairs - collection_start_at, colelction_finished_at
-        # to make this array consistent with times in load data second item if each pair is dropped
-        ts.times = ts.times[::2]
-        yield ts
-
-
-def find_all_series(rstorage: ResultStorage, suite: SuiteConfig, job: JobConfig, metric: str) -> Iterator[TimeSeries]:
+def find_all_series(rstorage: IResultStorage, suite_id: str, job_id: str, metric: str) -> Iterator[TimeSeries]:
     "Iterated over selected metric for all nodes for given Suite/job"
-    return rstorage.iter_ts(suite, job, metric=metric)
+    return (rstorage.get_ts(ds) for ds in rstorage.iter_ts(suite_id=suite_id, job_id=job_id, metric=metric))
 
 
-def get_aggregated(rstorage: ResultStorage, suite: SuiteConfig, job: FioJobConfig, metric: str) -> TimeSeries:
+def get_aggregated(rstorage: IResultStorage, suite_id: str, job_id: str, metric: str,
+                   trange: Tuple[int, int]) -> TimeSeries:
     "Sum selected metric for all nodes for given Suite/job"
 
-    tss = list(find_all_series(rstorage, suite, job, metric))
+    tss = list(find_all_series(rstorage, suite_id, job_id, metric))
 
     if len(tss) == 0:
-        raise NameError("Can't found any TS for {},{},{}".format(suite, job, metric))
+        raise NameError("Can't found any TS for {},{},{}".format(suite_id, job_id, metric))
 
-    ds = DataSource(suite_id=suite.storage_id,
-                    job_id=job.storage_id,
-                    node_id=AGG_TAG,
-                    sensor='fio',
-                    dev=AGG_TAG,
-                    metric=metric,
-                    tag='csv')
+    ds = DataSource(suite_id=suite_id, job_id=job_id, node_id=AGG_TAG, sensor='fio',
+                    dev=AGG_TAG, metric=metric, tag='csv')
 
     tss_inp = [c_interpolate_ts_on_seconds_border(ts, tp='fio', allow_broken_step=(metric == 'lat')) for ts in tss]
     res = None
-    trange = job.reliable_info_range_s
 
     for ts in tss_inp:
         if ts.time_units != 's':
@@ -121,10 +87,7 @@
             assert res.shape == dt.shape, "res.shape(={}) != dt.shape(={})".format(res.shape, dt.shape)
             res += dt
 
-    agg_ts = TimeSeries(metric,
-                        raw=None,
-                        source=ds,
-                        data=res,
+    agg_ts = TimeSeries(res, source=ds,
                         times=tss_inp[0].times.copy(),
                         units=tss_inp[0].units,
                         histo_bins=tss_inp[0].histo_bins,
@@ -132,299 +95,3 @@
 
     return agg_ts
 
-
-interpolated_cache = {}
-
-
-c_interp_func_agg = None
-c_interp_func_qd = None
-c_interp_func_fio = None
-
-
-def c_interpolate_ts_on_seconds_border(ts: TimeSeries, nc: bool = False, tp: str = 'agg',
-                                       allow_broken_step: bool = False) -> TimeSeries:
-    "Interpolate time series to values on seconds borders"
-    key = (ts.source.tpl, tp)
-    if not nc and key in interpolated_cache:
-        return interpolated_cache[key].copy()
-
-    if tp in ('qd', 'agg'):
-        # both data and times must be 1d compact arrays
-        assert len(ts.data.strides) == 1, "ts.data.strides must be 1D, not " + repr(ts.data.strides)
-        assert ts.data.dtype.itemsize == ts.data.strides[0], "ts.data array must be compact"
-
-    assert len(ts.times.strides) == 1, "ts.times.strides must be 1D, not " + repr(ts.times.strides)
-    assert ts.times.dtype.itemsize == ts.times.strides[0], "ts.times array must be compact"
-
-    assert len(ts.times) == len(ts.data), "len(times)={} != len(data)={} for {!s}"\
-            .format(len(ts.times), len(ts.data), ts.source)
-
-    rcoef = 1 / unit_conversion_coef(ts.time_units, 's')  # type: Union[int, Fraction]
-
-    if isinstance(rcoef, Fraction):
-        assert rcoef.denominator == 1, "Incorrect conversion coef {!r}".format(rcoef)
-        rcoef = rcoef.numerator
-
-    assert rcoef >= 1 and isinstance(rcoef, int), "Incorrect conversion coef {!r}".format(rcoef)
-    coef = int(rcoef)   # make typechecker happy
-
-    global c_interp_func_agg
-    global c_interp_func_qd
-    global c_interp_func_fio
-
-    uint64_p = ctypes.POINTER(ctypes.c_uint64)
-
-    if c_interp_func_agg is None:
-        dirname = os.path.dirname(os.path.dirname(wally.__file__))
-        path = os.path.join(dirname, 'clib', 'libwally.so')
-        cdll = ctypes.CDLL(path)
-
-        c_interp_func_agg = cdll.interpolate_ts_on_seconds_border
-        c_interp_func_qd = cdll.interpolate_ts_on_seconds_border_qd
-
-        for func in (c_interp_func_agg, c_interp_func_qd):
-            func.argtypes = [
-                ctypes.c_uint,  # input_size
-                ctypes.c_uint,  # output_size
-                uint64_p,  # times
-                uint64_p,  # values
-                ctypes.c_uint,  # time_scale_coef
-                uint64_p,  # output
-            ]
-            func.restype = ctypes.c_uint  # output array used size
-
-        c_interp_func_fio = cdll.interpolate_ts_on_seconds_border_fio
-        c_interp_func_fio.restype = ctypes.c_int
-        c_interp_func_fio.argtypes = [
-                ctypes.c_uint,  # input_size
-                ctypes.c_uint,  # output_size
-                uint64_p,  # times
-                ctypes.c_uint,  # time_scale_coef
-                uint64_p,  # output indexes
-                ctypes.c_uint64,  # empty placeholder
-                ctypes.c_bool  # allow broken steps
-            ]
-
-    assert ts.data.dtype.name == 'uint64', "Data dtype for {}=={} != uint64".format(ts.source, ts.data.dtype.name)
-    assert ts.times.dtype.name == 'uint64', "Time dtype for {}=={} != uint64".format(ts.source, ts.times.dtype.name)
-
-    output_sz = int(ts.times[-1]) // coef - int(ts.times[0]) // coef + 2
-    result = numpy.zeros(output_sz, dtype=ts.data.dtype.name)
-
-    if tp in ('qd', 'agg'):
-        assert not allow_broken_step, "Broken steps aren't supported for non-fio arrays"
-        func = c_interp_func_qd if tp == 'qd' else c_interp_func_agg
-        sz = func(ts.data.size,
-                  output_sz,
-                  ts.times.ctypes.data_as(uint64_p),
-                  ts.data.ctypes.data_as(uint64_p),
-                  coef,
-                  result.ctypes.data_as(uint64_p))
-
-        result = result[:sz]
-        output_sz = sz
-
-        rtimes = int(ts.times[0] // coef) + numpy.arange(output_sz, dtype=ts.times.dtype)
-    else:
-        assert tp == 'fio'
-        ridx = numpy.zeros(output_sz, dtype=ts.times.dtype)
-        no_data = (output_sz + 1)
-        sz_or_err = c_interp_func_fio(ts.times.size,
-                                      output_sz,
-                                      ts.times.ctypes.data_as(uint64_p),
-                                      coef,
-                                      ridx.ctypes.data_as(uint64_p),
-                                      no_data,
-                                      allow_broken_step)
-        if sz_or_err <= 0:
-            raise ValueError("Error in input array at index {}. {}".format(-sz_or_err, ts.source))
-
-        rtimes = int(ts.times[0] // coef) + numpy.arange(sz_or_err, dtype=ts.times.dtype)
-
-        empty = numpy.zeros(len(ts.histo_bins), dtype=ts.data.dtype) if ts.source.metric == 'lat' else 0
-        res = []
-        for idx in ridx[:sz_or_err]:
-            if idx == no_data:
-                res.append(empty)
-            else:
-                res.append(ts.data[idx])
-        result = numpy.array(res, dtype=ts.data.dtype)
-
-    res_ts = TimeSeries(ts.name, None, result,
-                        times=rtimes,
-                        units=ts.units,
-                        time_units='s',
-                        source=ts.source(),
-                        histo_bins=ts.histo_bins)
-
-    if not nc:
-        interpolated_cache[ts.source.tpl] = res_ts.copy()
-
-    return res_ts
-
-
-def get_ts_for_time_range(ts: TimeSeries, time_range: Tuple[int, int]) -> TimeSeries:
-    """Return sensor values for given node for given period. Return per second estimated values array
-    Raise an error if required range is not full covered by data in storage"""
-
-    assert ts.time_units == 's', "{} != s for {!s}".format(ts.time_units, ts.source)
-    assert len(ts.times) == len(ts.data), "Time(={}) and data(={}) sizes doesn't equal for {!s}"\
-            .format(len(ts.times), len(ts.data), ts.source)
-
-    if time_range[0] < ts.times[0] or time_range[1] > ts.times[-1]:
-        raise AssertionError(("Incorrect data for get_sensor - time_range={!r}, collected_at=[{}, ..., {}]," +
-                              "sensor = {}_{}.{}.{}").format(time_range, ts.times[0], ts.times[-1],
-                                                             ts.source.node_id, ts.source.sensor, ts.source.dev,
-                                                             ts.source.metric))
-    idx1, idx2 = numpy.searchsorted(ts.times, time_range)
-    return TimeSeries(ts.name, None,
-                      ts.data[idx1:idx2],
-                      times=ts.times[idx1:idx2],
-                      units=ts.units,
-                      time_units=ts.time_units,
-                      source=ts.source,
-                      histo_bins=ts.histo_bins)
-
-
-def make_2d_histo(tss: List[TimeSeries],
-                  outliers_range: Tuple[float, float] = (0.02, 0.98),
-                  bins_count: int = 20,
-                  log_bins: bool = False) -> TimeSeries:
-
-    # validate input data
-    for ts in tss:
-        assert len(ts.times) == len(ts.data), "Time(={}) and data(={}) sizes doesn't equal for {!s}"\
-                .format(len(ts.times), len(ts.data), ts.source)
-        assert ts.time_units == 's', "All arrays should have the same data units"
-        assert ts.units == tss[0].units, "All arrays should have the same data units"
-        assert ts.data.shape == tss[0].data.shape, "All arrays should have the same data size"
-        assert len(ts.data.shape) == 1, "All arrays should be 1d"
-
-    whole_arr = numpy.concatenate([ts.data for ts in tss])
-    whole_arr.shape = [len(tss), -1]
-
-    if outliers_range is not None:
-        max_vl, begin, end, min_vl = numpy.percentile(whole_arr,
-                                                      [0, outliers_range[0] * 100, outliers_range[1] * 100, 100])
-        bins_edges = auto_edges2(begin, end, bins=bins_count, log_space=log_bins)
-        fixed_bins_edges = bins_edges.copy()
-        fixed_bins_edges[0] = begin
-        fixed_bins_edges[-1] = end
-    else:
-        begin, end = numpy.percentile(whole_arr, [0, 100])
-        bins_edges = auto_edges2(begin, end, bins=bins_count, log_space=log_bins)
-        fixed_bins_edges = bins_edges
-
-    res_data = numpy.concatenate(numpy.histogram(column, fixed_bins_edges) for column in whole_arr.T)
-    res_data.shape = (len(tss), -1)
-    res = TimeSeries(name=tss[0].name,
-                     raw=None,
-                     data=res_data,
-                     times=tss[0].times,
-                     units=tss[0].units,
-                     source=tss[0].source,
-                     time_units=tss[0].time_units,
-                     histo_bins=bins_edges)
-    return res
-
-
-def aggregate_histograms(tss: List[TimeSeries],
-                         outliers_range: Tuple[float, float] = (0.02, 0.98),
-                         bins_count: int = 20,
-                         log_bins: bool = False) -> TimeSeries:
-
-    # validate input data
-    for ts in tss:
-        assert len(ts.times) == len(ts.data), "Need to use stripped time"
-        assert ts.time_units == 's', "All arrays should have the same data units"
-        assert ts.units == tss[0].units, "All arrays should have the same data units"
-        assert ts.data.shape == tss[0].data.shape, "All arrays should have the same data size"
-        assert len(ts.data.shape) == 2, "All arrays should be 2d"
-        assert ts.histo_bins is not None, "All arrays should be 2d"
-
-    whole_arr = numpy.concatenate([ts.data for ts in tss])
-    whole_arr.shape = [len(tss), -1]
-
-    max_val = whole_arr.min()
-    min_val = whole_arr.max()
-
-    if outliers_range is not None:
-        begin, end = numpy.percentile(whole_arr, [outliers_range[0] * 100, outliers_range[1] * 100])
-    else:
-        begin = min_val
-        end = max_val
-
-    bins_edges = auto_edges2(begin, end, bins=bins_count, log_space=log_bins)
-
-    if outliers_range is not None:
-        fixed_bins_edges = bins_edges.copy()
-        fixed_bins_edges[0] = begin
-        fixed_bins_edges[-1] = end
-    else:
-        fixed_bins_edges = bins_edges
-
-    res_data = numpy.concatenate(numpy.histogram(column, fixed_bins_edges) for column in whole_arr.T)
-    res_data.shape = (len(tss), -1)
-    return TimeSeries(name=tss[0].name,
-                      raw=None,
-                      data=res_data,
-                      times=tss[0].times,
-                      units=tss[0].units,
-                      source=tss[0].source,
-                      time_units=tss[0].time_units,
-                      histo_bins=fixed_bins_edges)
-
-
-qd_metrics = {'io_queue'}
-summ_sensors_cache = {}  # type: Dict[Tuple[Tuple[str, ...], str, str, Tuple[int, int], int], Optional[TimeSeries]]
-
-
-def summ_sensors(rstorage: ResultStorage,
-                 roles: List[str],
-                 sensor: str,
-                 metric: str,
-                 time_range: Tuple[int, int],
-                 nc: bool = False) -> Optional[TimeSeries]:
-
-    key = (tuple(roles), sensor, metric, time_range, id(ResultStorage))
-    if not nc and key in summ_sensors_cache:
-        return summ_sensors_cache[key].copy()
-
-    res = None  # type: Optional[TimeSeries]
-    for node in find_nodes_by_roles(rstorage, roles):
-        for _, ds in rstorage.iter_sensors(node_id=node.node_id, sensor=sensor, metric=metric):
-            data = rstorage.load_sensor(ds)
-            data = c_interpolate_ts_on_seconds_border(data, 'qd' if metric in qd_metrics else 'agg')
-            data = get_ts_for_time_range(data, time_range)
-            if res is None:
-                res = data
-                res.data = res.data.copy()
-            else:
-                res.data += data.data
-
-    if not nc:
-        summ_sensors_cache[key] = res
-        if len(summ_sensors_cache) > 1024:
-            logger.warning("summ_sensors_cache cache too large %s > 1024", len(summ_sensors_cache))
-
-    return res if res is None else res.copy()
-
-
-def find_sensors_to_2d(rstorage: ResultStorage,
-                       roles: List[str],
-                       sensor: str,
-                       devs: List[str],
-                       metric: str,
-                       time_range: Tuple[int, int]) -> numpy.ndarray:
-
-    res = []  # type: List[TimeSeries]
-    for node in find_nodes_by_roles(rstorage, roles):
-        for dev in devs:
-            for _, ds in rstorage.iter_sensors(node_id=node.node_id, sensor=sensor, dev=dev, metric=metric):
-                data = rstorage.load_sensor(ds)
-                data = c_interpolate_ts_on_seconds_border(data, 'qd' if metric in qd_metrics else 'agg')
-                data = get_ts_for_time_range(data, time_range)
-                res.append(data.data)
-    res2d = numpy.concatenate(res)
-    res2d.shape = ((len(res), -1))
-    return res2d
diff --git a/wally/hlstorage.py b/wally/hlstorage.py
deleted file mode 100644
index 9f66030..0000000
--- a/wally/hlstorage.py
+++ /dev/null
@@ -1,387 +0,0 @@
-import os
-import pprint
-import logging
-from typing import cast, Iterator, Tuple, Type, Dict, Optional, List, Any
-
-import numpy
-
-from .suits.job import JobConfig
-from .result_classes import SuiteConfig, TimeSeries, DataSource, StatProps, IResultStorage, ArrayData
-from .storage import Storage
-from .utils import StopTestError
-from .suits.all_suits import all_suits
-
-
-logger = logging.getLogger('wally')
-
-
-class DB_re:
-    node_id = r'\d+.\d+.\d+.\d+:\d+'
-    job_id = r'[-a-zA-Z0-9_]+_\d+'
-    suite_id = r'[a-z_]+_\d+'
-    sensor = r'[-a-z_]+'
-    dev = r'[-a-zA-Z0-9_]+'
-    tag = r'[a-z_.]+'
-    metric = r'[a-z_.]+'
-
-
-class DB_paths:
-    suite_cfg_r = r'results/{suite_id}\.info\.yml'
-
-    job_root = r'results/{suite_id}\.{job_id}/'
-    job_cfg_r = job_root + r'info\.yml'
-
-    # time series, data from load tool, sensor is a tool name
-    ts_r = job_root + r'{node_id}\.{sensor}\.{metric}\.{tag}'
-
-    # statistica data for ts
-    stat_r = job_root + r'{node_id}\.{sensor}\.{metric}\.stat\.yaml'
-
-    # sensor data
-    sensor_data_r = r'sensors/{node_id}_{sensor}\.{dev}\.{metric}\.{tag}'
-    sensor_time_r = r'sensors/{node_id}_collected_at\.csv'
-
-    report_root = 'report/'
-    plot_r = r'{suite_id}\.{job_id}/{node_id}\.{sensor}\.{dev}\.{metric}\.{tag}'
-    txt_report = report_root + '{suite_id}_report.txt'
-
-    job_extra = 'meta/{suite_id}.{job_id}/{tag}'
-
-    job_cfg = job_cfg_r.replace("\\.", '.')
-    suite_cfg = suite_cfg_r.replace("\\.", '.')
-    ts = ts_r.replace("\\.", '.')
-    stat = stat_r.replace("\\.", '.')
-    sensor_data = sensor_data_r.replace("\\.", '.')
-    sensor_time = sensor_time_r.replace("\\.", '.')
-    plot = plot_r.replace("\\.", '.')
-
-
-DB_rr = {name: r"(?P<{}>{})".format(name, rr)
-         for name, rr in DB_re.__dict__.items()
-         if not name.startswith("__")}
-
-
-def fill_path(path: str, **params) -> str:
-    for name, val in params.items():
-        if val is not None:
-            path = path.replace("{" + name + "}", val)
-    return path
-
-
-class ResultStorage(IResultStorage):
-    # TODO: check that all path components match required patterns
-
-    ts_header_size = 64
-    ts_header_format = "!IIIcc"
-    ts_arr_tag = 'csv'
-    csv_file_encoding = 'ascii'
-
-    def __init__(self, storage: Storage) -> None:
-        self.storage = storage
-        self.cache = {}  # type: Dict[str, Tuple[int, int, ArrayData]]
-
-    def sync(self) -> None:
-        self.storage.sync()
-
-    #  -----------------  SERIALIZATION / DESERIALIZATION  -------------------------------------------------------------
-    def read_headers(self, fd) -> Tuple[str, List[str], List[str], Optional[numpy.ndarray]]:
-        header = fd.readline().decode(self.csv_file_encoding).rstrip().split(",")
-        dtype, has_header2, header2_dtype, *ext_header = header
-
-        if has_header2 == 'true':
-            ln = fd.readline().decode(self.csv_file_encoding).strip()
-            header2 = numpy.fromstring(ln, sep=',', dtype=header2_dtype)
-        else:
-            assert has_header2 == 'false', \
-                "In file {} has_header2 is not true/false, but {!r}".format(fd.name, has_header2)
-            header2 = None
-        return dtype, ext_header, header, header2
-
-    def load_array(self, path: str) -> ArrayData:
-        """
-        Load array from file, shoult not be called directly
-        :param path: file path
-        :return: ArrayData
-        """
-        with self.storage.get_fd(path, "rb") as fd:
-            fd.seek(0, os.SEEK_SET)
-
-            stats = os.fstat(fd.fileno())
-            if path in self.cache:
-                size, atime, arr_info = self.cache[path]
-                if size == stats.st_size and atime == stats.st_atime_ns:
-                    return arr_info
-
-            data_dtype, header, _, header2 = self.read_headers(fd)
-            assert data_dtype == 'uint64', path
-            dt = fd.read().decode(self.csv_file_encoding).strip()
-
-        if len(dt) != 0:
-            arr = numpy.fromstring(dt.replace("\n", ','), sep=',', dtype=data_dtype)
-            lines = dt.count("\n") + 1
-            assert len(set(ln.count(',') for ln in dt.split("\n"))) == 1, \
-                "Data lines in {!r} have different element count".format(path)
-            arr.shape = [lines] if lines == arr.size else [lines, -1]
-        else:
-            arr = None
-
-        arr_data = ArrayData(header, header2, arr)
-        self.cache[path] = (stats.st_size, stats.st_atime_ns, arr_data)
-        return arr_data
-
-    def put_array(self, path: str, data: numpy.array, header: List[str], header2: numpy.ndarray = None,
-                  append_on_exists: bool = False) -> None:
-
-        header = [data.dtype.name] + \
-                 (['false', ''] if header2 is None else ['true', header2.dtype.name]) + \
-                 header
-
-        exists = append_on_exists and path in self.storage
-        vw = data.view().reshape((data.shape[0], 1)) if len(data.shape) == 1 else data
-        mode = "cb" if not exists else "rb+"
-
-        with self.storage.get_fd(path, mode) as fd:
-            if exists:
-                data_dtype, _, full_header, curr_header2 = self.read_headers(fd)
-
-                assert data_dtype == data.dtype.name, \
-                    "Path {!r}. Passed data type ({!r}) and current data type ({!r}) doesn't match"\
-                        .format(path, data.dtype.name, data_dtype)
-
-                assert header == full_header, \
-                    "Path {!r}. Passed header ({!r}) and current header ({!r}) doesn't match"\
-                        .format(path, header, full_header)
-
-                assert header2 == curr_header2, \
-                    "Path {!r}. Passed header2 != current header2: {!r}\n{!r}"\
-                        .format(path, header2, curr_header2)
-
-                fd.seek(0, os.SEEK_END)
-            else:
-                fd.write((",".join(header) + "\n").encode(self.csv_file_encoding))
-                if header2 is not None:
-                    fd.write((",".join(map(str, header2)) + "\n").encode(self.csv_file_encoding))
-
-            numpy.savetxt(fd, vw, delimiter=',', newline="\n", fmt="%lu")
-
-    def load_ts(self, ds: DataSource, path: str) -> TimeSeries:
-        """
-        Load time series, generated by fio or other tool, should not be called directly,
-        use iter_ts istead.
-        :param ds: data source path
-        :param path: path in data storage
-        :return: TimeSeries
-        """
-        (units, time_units), header2, data = self.load_array(path)
-        times = data[:,0].copy()
-        ts_data = data[:,1:]
-
-        if ts_data.shape[1] == 1:
-            ts_data.shape = (ts_data.shape[0],)
-
-        return TimeSeries("{}.{}".format(ds.dev, ds.sensor),
-                          raw=None,
-                          data=ts_data,
-                          times=times,
-                          source=ds,
-                          units=units,
-                          time_units=time_units,
-                          histo_bins=header2)
-
-    def load_sensor_raw(self, ds: DataSource) -> bytes:
-        path = DB_paths.sensor_data.format(**ds.__dict__)
-        with self.storage.get_fd(path, "rb") as fd:
-            return fd.read()
-
-    def load_sensor(self, ds: DataSource) -> TimeSeries:
-        # sensors has no shape
-        path = DB_paths.sensor_time.format(**ds.__dict__)
-        collect_header, must_be_none, collected_at = self.load_array(path)
-
-        # cut 'collection end' time
-        # .copy needed to really remove 'collection end' element to make c_interpolate_.. works correctly
-        collected_at = collected_at[::2].copy()
-
-        # there must be no histogram for collected_at
-        assert must_be_none is None, "Extra header2 {!r} in collect_at file at {!r}".format(must_be_none, path)
-        node, tp, units = collect_header
-        assert node == ds.node_id and tp == 'collected_at' and units in ('ms', 'us'),\
-            "Unexpected collect_at header {!r} at {!r}".format(collect_header, path)
-        assert len(collected_at.shape) == 1, "Collected_at must be 1D at {!r}".format(path)
-
-        data_path = DB_paths.sensor_data.format(**ds.__dict__)
-        data_header, must_be_none, data  = self.load_array(data_path)
-
-        # there must be no histogram for any sensors
-        assert must_be_none is None, "Extra header2 {!r} in sensor data file {!r}".format(must_be_none, data_path)
-
-        data_units = data_header[2]
-        assert data_header == [ds.node_id, ds.metric_fqdn, data_units], \
-            "Unexpected data header {!r} at {!r}".format(data_header, data_path)
-        assert len(data.shape) == 1, "Sensor data must be 1D at {!r}".format(data_path)
-
-        return TimeSeries(ds.metric_fqdn,
-                          raw=None,
-                          data=data,
-                          times=collected_at,
-                          source=ds,
-                          units=data_units,
-                          time_units=units)
-
-    # -------------  CHECK DATA IN STORAGE  ----------------------------------------------------------------------------
-
-    def check_plot_file(self, source: DataSource) -> Optional[str]:
-        path = DB_paths.plot.format(**source.__dict__)
-        fpath = self.storage.resolve_raw(DB_paths.report_root + path)
-        return path if os.path.exists(fpath) else None
-
-    # -------------   PUT DATA INTO STORAGE   --------------------------------------------------------------------------
-
-    def put_or_check_suite(self, suite: SuiteConfig) -> None:
-        path = DB_paths.suite_cfg.format(suite_id=suite.storage_id)
-        if path in self.storage:
-            db_cfg = self.storage.load(SuiteConfig, path)
-            if db_cfg != suite:
-                logger.error("Current suite %s config is not equal to found in storage at %s", suite.test_type, path)
-                logger.debug("Current: \n%s\nStorage:\n%s", pprint.pformat(db_cfg), pprint.pformat(suite))
-                raise StopTestError()
-        else:
-            self.storage.put(suite, path)
-
-    def put_job(self, suite: SuiteConfig, job: JobConfig) -> None:
-        path = DB_paths.job_cfg.format(suite_id=suite.storage_id, job_id=job.storage_id)
-        self.storage.put(job, path)
-
-    def put_ts(self, ts: TimeSeries) -> None:
-        assert ts.data.dtype == ts.times.dtype, "Data type {!r} != time type {!r}".format(ts.data.dtype, ts.times.dtype)
-        assert ts.data.dtype.kind == 'u', "Only unsigned ints are accepted"
-        assert ts.source.tag == self.ts_arr_tag, "Incorrect source tag == {!r}, must be {!r}".format(ts.source.tag,
-                                                                                                     self.ts_arr_tag)
-        csv_path = DB_paths.ts.format(**ts.source.__dict__)
-        header = [ts.units, ts.time_units]
-
-        tv = ts.times.view().reshape((-1, 1))
-        if len(ts.data.shape) == 1:
-            dv = ts.data.view().reshape((ts.times.shape[0], -1))
-        else:
-            dv = ts.data
-
-        result = numpy.concatenate((tv, dv), axis=1)
-        if ts.histo_bins is not None:
-            self.put_array(csv_path, result, header, header2=ts.histo_bins)
-        else:
-            self.put_array(csv_path, result, header)
-
-        if ts.raw:
-            raw_path = DB_paths.ts.format(**ts.source(tag=ts.raw_tag).__dict__)
-            self.storage.put_raw(ts.raw, raw_path)
-
-    def put_extra(self, data: bytes, source: DataSource) -> None:
-        self.storage.put_raw(data, DB_paths.ts.format(**source.__dict__))
-
-    def put_stat(self, data: StatProps, source: DataSource) -> None:
-        self.storage.put(data, DB_paths.stat.format(**source.__dict__))
-
-    # return path to file to be inserted into report
-    def put_plot_file(self, data: bytes, source: DataSource) -> str:
-        path = DB_paths.plot.format(**source.__dict__)
-        self.storage.put_raw(data, DB_paths.report_root + path)
-        return path
-
-    def put_report(self, report: str, name: str) -> str:
-        return self.storage.put_raw(report.encode(self.csv_file_encoding), DB_paths.report_root + name)
-
-    def put_sensor_raw(self, data: bytes, ds: DataSource) -> None:
-        path = DB_paths.sensor_data.format(**ds.__dict__)
-        with self.storage.get_fd(path, "cb") as fd:
-            fd.write(data)
-
-    def append_sensor(self, data: numpy.array, ds: DataSource, units: str, histo_bins: numpy.ndarray = None) -> None:
-        if ds.metric == 'collected_at':
-            path = DB_paths.sensor_time
-            metrics_fqn = 'collected_at'
-        else:
-            path = DB_paths.sensor_data
-            metrics_fqn = ds.metric_fqdn
-
-        if ds.metric == 'lat':
-            assert len(data.shape) == 2, "Latency should be histo array"
-            assert histo_bins is not None, "Latency should have histo bins"
-
-        path = path.format(**ds.__dict__)
-        self.put_array(path, data, [ds.node_id, metrics_fqn, units], header2=histo_bins, append_on_exists=True)
-
-    # -------------   GET DATA FROM STORAGE   --------------------------------------------------------------------------
-
-    def get_stat(self, stat_cls: Type[StatProps], source: DataSource) -> StatProps:
-        return self.storage.load(stat_cls, DB_paths.stat.format(**source.__dict__))
-
-    # -------------   ITER OVER STORAGE   ------------------------------------------------------------------------------
-
-    def iter_paths(self, path_glob) -> Iterator[Tuple[bool, str, Dict[str, str]]]:
-        path = path_glob.format(**DB_rr).split("/")
-        yield from self.storage._iter_paths("", path, {})
-
-    def iter_suite(self, suite_type: str = None) -> Iterator[SuiteConfig]:
-        for is_file, suite_info_path, groups in self.iter_paths(DB_paths.suite_cfg_r):
-            assert is_file
-            suite = self.storage.load(SuiteConfig, suite_info_path)
-            # suite = cast(SuiteConfig, self.storage.load(SuiteConfig, suite_info_path))
-            assert suite.storage_id == groups['suite_id']
-            if not suite_type or suite.test_type == suite_type:
-                yield suite
-
-    def iter_job(self, suite: SuiteConfig) -> Iterator[JobConfig]:
-        job_glob = fill_path(DB_paths.job_cfg_r, suite_id=suite.storage_id)
-        job_config_cls = all_suits[suite.test_type].job_config_cls
-        for is_file, path, groups in self.iter_paths(job_glob):
-            assert is_file
-            job = cast(JobConfig, self.storage.load(job_config_cls, path))
-            assert job.storage_id == groups['job_id']
-            yield job
-
-    # iterate over test tool data
-    def iter_ts(self, suite: SuiteConfig, job: JobConfig, **filters) -> Iterator[TimeSeries]:
-        filters.update(suite_id=suite.storage_id, job_id=job.storage_id)
-        ts_glob = fill_path(DB_paths.ts_r, **filters)
-        for is_file, path, groups in self.iter_paths(ts_glob):
-            tag = groups["tag"]
-            if tag != 'csv':
-                continue
-            assert is_file
-            groups = groups.copy()
-            groups.update(filters)
-            ds = DataSource(suite_id=suite.storage_id,
-                            job_id=job.storage_id,
-                            node_id=groups["node_id"],
-                            sensor=groups["sensor"],
-                            dev=None,
-                            metric=groups["metric"],
-                            tag=tag)
-            yield self.load_ts(ds, path)
-
-    def iter_sensors(self, node_id: str = None, sensor: str = None, dev: str = None, metric: str = None) -> \
-            Iterator[Tuple[str, DataSource]]:
-        vls = dict(node_id=node_id, sensor=sensor, dev=dev, metric=metric)
-        path = fill_path(DB_paths.sensor_data_r, **vls)
-        for is_file, path, groups in self.iter_paths(path):
-            cvls = vls.copy()
-            cvls.update(groups)
-            yield path, DataSource(**cvls)
-
-    def get_txt_report(self, suite: SuiteConfig) -> Optional[str]:
-        path = DB_paths.txt_report.format(suite_id=suite.storage_id)
-        if path in self.storage:
-            return self.storage.get_raw(path).decode('utf8')
-
-    def put_txt_report(self, suite: SuiteConfig, report: str) -> None:
-        path = DB_paths.txt_report.format(suite_id=suite.storage_id)
-        self.storage.put_raw(report.encode('utf8'), path)
-
-    def put_job_info(self, suite: SuiteConfig, job: JobConfig, key: str, data: Any) -> None:
-        path = DB_paths.job_extra.format(suite_id=suite.storage_id, job_id=job.storage_id, tag=key)
-        self.storage.put(data, path)
-
-    def get_job_info(self, suite: SuiteConfig, job: JobConfig, key: str) -> Any:
-        path = DB_paths.job_extra.format(suite_id=suite.storage_id, job_id=job.storage_id, tag=key)
-        return self.storage.get(path, None)
diff --git a/wally/html.py b/wally/html.py
deleted file mode 100644
index a1db5b3..0000000
--- a/wally/html.py
+++ /dev/null
@@ -1,54 +0,0 @@
-from typing import Optional, List, Callable
-
-
-import xmlbuilder3
-
-
-eol = "<br>"
-
-
-def tag(name: str) -> Callable[[str], str]:
-    def closure(data: str) -> str:
-        return "<{}>{}</{}>".format(name, data, name)
-    return closure
-
-
-H3 = tag("H3")
-H2 = tag("H2")
-center = tag("center")
-
-
-def img(link: str) -> str:
-    return '<img src="{}">'.format(link)
-
-
-def table(caption: str, headers: Optional[List[str]], data: List[List[str]], align: List[str] = None) -> str:
-    doc = xmlbuilder3.XMLBuilder("table",
-                                 **{"class": "table table-bordered table-striped table-condensed table-hover",
-                                    "style": "width: auto;"})
-
-    doc.caption.H3.center(caption)
-
-    if headers is not None:
-        with doc.thead:
-            with doc.tr:
-                for header in headers:
-                    doc.th(header)
-
-    max_cols = max(len(line) for line in data if not isinstance(line, str))
-
-    with doc.tbody:
-        for line in data:
-            with doc.tr:
-                if isinstance(line, str):
-                    with doc.td(colspan=str(max_cols)):
-                        doc.center.b(line)
-                else:
-                    if align:
-                        for vl, col_align in zip(line, align):
-                            doc.td(vl, align=col_align)
-                    else:
-                        for vl in line:
-                            doc.td(vl)
-
-    return xmlbuilder3.tostr(doc).split("\n", 1)[1]
\ No newline at end of file
diff --git a/wally/hw_info.py b/wally/hw_info.py
index 61938ab..bb857f2 100644
--- a/wally/hw_info.py
+++ b/wally/hw_info.py
@@ -1,11 +1,13 @@
 import re
 import logging
-from typing import Dict, Iterable
+from typing import Dict, Any
 import xml.etree.ElementTree as ET
 from typing import List, Tuple, cast, Optional
 
-from . import utils
-from .node_utils import get_os, OSRelease
+from cephlib.istorage import Storable
+from cephlib.common import b2ssize
+
+from .node_utils import get_os
 from .node_interfaces import IRPCNode
 
 
@@ -17,7 +19,9 @@
     return match_res.group(0)
 
 
-class HWInfo:
+class HWInfo(Storable):
+    __ignore_fields__ = ['raw_xml']
+
     def __init__(self) -> None:
         self.hostname = None  # type: str
         self.cores = []  # type: List[Tuple[str, int]]
@@ -34,14 +38,10 @@
         self.ram_size = 0  # type: int
         self.sys_name = None  # type: str
         self.mb = None  # type: str
-        self.raw = None  # type: str
+        self.raw_xml = None  # type: Optional[str]
 
         self.storage_controllers = []  # type: List[str]
 
-    def get_hdd_count(self) -> Iterable[int]:
-        # SATA HDD COUNT, SAS 10k HDD COUNT, SAS SSD count, PCI-E SSD count
-        return []
-
     def get_summary(self) -> Dict[str, int]:
         cores = sum(count for _, count in self.cores)
         disks = sum(size for _, size in self.disks_info.values())
@@ -57,8 +57,8 @@
         summ = self.get_summary()
         summary = "Simmary: {cores} cores, {ram}B RAM, {disk}B storage"
         res.append(summary.format(cores=summ['cores'],
-                                  ram=utils.b2ssize(summ['ram']),
-                                  disk=utils.b2ssize(summ['storage'])))
+                                  ram=b2ssize(summ['ram']),
+                                  disk=b2ssize(summ['storage'])))
         res.append(str(self.sys_name))
         if self.mb:
             res.append("Motherboard: " + self.mb)
@@ -66,7 +66,7 @@
         if not self.ram_size:
             res.append("RAM: Failed to get RAM size")
         else:
-            res.append("RAM " + utils.b2ssize(self.ram_size) + "B")
+            res.append("RAM " + b2ssize(self.ram_size) + "B")
 
         if not self.cores:
             res.append("CPU cores: Failed to get CPU info")
@@ -86,7 +86,7 @@
         if self.disks_info:
             res.append("Storage devices:")
             for dev, (model, size) in sorted(self.disks_info.items()):
-                ssize = utils.b2ssize(size) + "B"
+                ssize = b2ssize(size) + "B"
                 res.append("    {0} {1} {2}".format(dev, ssize, model))
         else:
             res.append("Storage devices's: Failed to get info")
@@ -108,30 +108,19 @@
         return str(self.hostname) + ":\n" + "\n".join("    " + i for i in res)
 
 
-class CephInfo:
-    def __init__(self) -> None:
-        pass
-
-
-class SWInfo:
+class SWInfo(Storable):
     def __init__(self) -> None:
         self.mtab = None  # type: str
         self.kernel_version = None  # type: str
         self.libvirt_version = None  # type: Optional[str]
         self.qemu_version = None  # type: Optional[str]
-        self.OS_version = None  # type: OSRelease
-        self.ceph_info = None  # type: Optional[CephInfo]
-
-
-def get_ceph_services_info(node: IRPCNode) -> CephInfo:
-    # TODO: use ceph-monitoring module
-    return CephInfo()
+        self.os_version = None  # type: Tuple[str, ...]
 
 
 def get_sw_info(node: IRPCNode) -> SWInfo:
     res = SWInfo()
 
-    res.OS_version = get_os(node)
+    res.os_version = tuple(get_os(node))
     res.kernel_version = node.get_file_content('/proc/version').decode('utf8').strip()
     res.mtab = node.get_file_content('/etc/mtab').decode('utf8').strip()
 
@@ -147,16 +136,10 @@
     except OSError:
         res.qemu_version = None
 
-    for role in ('ceph-osd', 'ceph-mon', 'ceph-mds'):
-        if role in node.info.roles:
-            res.ceph_info = get_ceph_services_info(node)
-            break
-
     return res
 
 
 def get_hw_info(node: IRPCNode) -> Optional[HWInfo]:
-
     try:
         lshw_out = node.run('sudo lshw -xml 2>/dev/null')
     except Exception as exc:
@@ -164,7 +147,7 @@
         return None
 
     res = HWInfo()
-    res.raw = lshw_out
+    res.raw_xml = lshw_out
     lshw_et = ET.fromstring(lshw_out)
 
     try:
diff --git a/wally/logger.py b/wally/logger.py
index d885ea9..23b0f3b 100644
--- a/wally/logger.py
+++ b/wally/logger.py
@@ -1,7 +1,6 @@
-import yaml
 import logging
 import logging.config
-from typing import Callable, IO, Optional
+from typing import Callable
 
 
 def color_me(color: int) -> Callable[[str], str]:
diff --git a/wally/main.py b/wally/main.py
index a20ade5..e43f1f9 100644
--- a/wally/main.py
+++ b/wally/main.py
@@ -21,8 +21,8 @@
 except ImportError:
     yaml_load = cast(YLoader,  _yaml_load)
 
-
-import texttable
+from cephlib.texttable import Texttable
+from cephlib.istorage import IStorage
 
 try:
     import faulthandler
@@ -30,18 +30,18 @@
     faulthandler = None
 
 from cephlib.common import setup_logging
+from cephlib.storage import make_storage
 
-from . import utils, node
+from . import utils, node, report_profiles, report
 from .node_utils import log_nodes_statistic
-from .storage import make_storage, Storage
 from .config import Config
 from .stage import Stage
 from .test_run_class import TestRun
 from .ssh import set_ssh_key_passwd
-
+from .result_storage import ResultStorage
 
 # stages
-from .ceph import DiscoverCephStage, FillCephInfoStage
+from .ceph import DiscoverCephStage, CollectCephInfoStage
 from .openstack import DiscoverOSStage
 from .fuel import DiscoverFuelStage
 from .run_test import (CollectInfoStage, ExplicitNodesStage, SaveNodesStage,
@@ -123,6 +123,8 @@
     report_parser = subparsers.add_parser('report', help=report_help)
     report_parser.add_argument('-R', '--reporters', help="Comma-separated list of reportes - html,txt",
                                default='html,txt')
+    report_parser.add_argument('-f', '--format', help="Image format, default is " + report_profiles.default_format,
+                               choices=('svg', 'png'), default=report_profiles.default_format)
     report_parser.add_argument("data_dir", help="folder with rest results")
 
     # ---------------------------------------------------------------------
@@ -210,7 +212,7 @@
 
 def get_run_stages() -> List[Stage]:
     return [DiscoverCephStage(),
-            FillCephInfoStage(),
+            CollectCephInfoStage(),
             DiscoverOSStage(),
             DiscoverFuelStage(),
             ExplicitNodesStage(),
@@ -231,7 +233,7 @@
 
     # stop mypy from telling that config & storage might be undeclared
     config = None  # type: Config
-    storage = None  # type: Storage
+    storage = None  # type: IStorage
 
     if opts.subparser_name == 'test':
         config = load_config(opts.config_file)
@@ -277,7 +279,7 @@
         opts.subparser_name = 'resume'
 
     elif opts.subparser_name == 'ls':
-        tab = texttable.Texttable(max_width=200)
+        tab = Texttable(max_width=200)
         tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
         tab.set_cols_align(["l", "l", "l", "l"])
         tab.header(["Name", "Tests", "Run at", "Comment"])
@@ -291,9 +293,10 @@
             return 1
         storage = make_storage(opts.data_dir, existing=True)
         config = storage.load(Config, 'config')
+        report_profiles.default_format = opts.format
+        report.default_format = opts.format
         stages.append(LoadStoredNodesStage())
         stages.append(SaveNodesStage())
-
     elif opts.subparser_name == 'compare':
         # x = run_test.load_data_from_path(opts.data_path1)
         # y = run_test.load_data_from_path(opts.data_path2)
@@ -314,7 +317,6 @@
         return 0
     elif opts.subparser_name == 'ipython':
         storage = make_storage(opts.storage_dir, existing=True)
-        from .hlstorage import ResultStorage
         rstorage = ResultStorage(storage=storage)
 
         import IPython
@@ -341,7 +343,7 @@
 
     logger.info("All info would be stored into %r", config.storage_url)
 
-    ctx = TestRun(config, storage)
+    ctx = TestRun(config, storage, ResultStorage(storage))
     ctx.rpc_code, ctx.default_rpc_plugins = node.get_rpc_server_code()
 
     if opts.ssh_key_passwd is not None:
diff --git a/wally/node.py b/wally/node.py
index 32ec58a..a662f76 100644
--- a/wally/node.py
+++ b/wally/node.py
@@ -8,11 +8,9 @@
 import subprocess
 from typing import Union, cast, Optional, Tuple, Dict
 
-
 from agent import agent
 import paramiko
 
-
 from .node_interfaces import IRPCNode, NodeInfo, ISSHHost
 from .ssh import connect as ssh_connect
 
diff --git a/wally/node_interfaces.py b/wally/node_interfaces.py
index b10f83d..703ef71 100644
--- a/wally/node_interfaces.py
+++ b/wally/node_interfaces.py
@@ -2,8 +2,10 @@
 import logging
 from typing import Any, Set, Dict, Optional, NamedTuple
 
+from cephlib.istorage import IStorable
+
 from .ssh_utils import ConnCreds
-from .common_types import IPAddr, IStorable
+from .common_types import IPAddr
 
 
 RPCCreds = NamedTuple("RPCCreds", [("addr", IPAddr), ("key_file", str), ("cert_file", str)])
diff --git a/wally/openstack.py b/wally/openstack.py
index 88189f5..4048207 100644
--- a/wally/openstack.py
+++ b/wally/openstack.py
@@ -1,7 +1,9 @@
 import os.path
 import socket
 import logging
-from typing import Dict, Any, List, Tuple, cast, Optional
+from typing import Dict, Any, List, Tuple, cast
+
+from cephlib.common import to_ip
 
 from .node_interfaces import NodeInfo
 from .config import ConfigBlock, Config
@@ -10,7 +12,7 @@
                             OSCreds, get_openstack_credentials, prepare_os, launch_vms, clear_nodes)
 from .test_run_class import TestRun
 from .stage import Stage, StepOrder
-from .utils import LogError, StopTestError, get_creds_openrc, to_ip
+from .utils import LogError, StopTestError, get_creds_openrc
 
 
 logger = logging.getLogger("wally")
@@ -136,7 +138,10 @@
             logger.debug("Found %s openstack service nodes" % len(host_services_mapping))
 
             for host, services in host_services_mapping.items():
-                creds = ConnCreds(host=to_ip(host), user=user, passwd=password, key_file=key_file)
+                host_ip = to_ip(host)
+                if host != host_ip:
+                    logger.info("Will use ip_addr %r instead of hostname %r", host_ip, host)
+                creds = ConnCreds(host=host_ip, user=user, passwd=password, key_file=key_file)
                 ctx.merge_node(creds, set(services))
             # TODO: log OS nodes discovery results
         else:
@@ -221,50 +226,3 @@
             ctx.storage.rm('spawned_os_nodes')
 
             logger.info("OS spawned nodes has been successfully removed")
-
-
-
-# @contextlib.contextmanager
-# def suspend_vm_nodes_ctx(ctx: TestRun, unused_nodes: List[IRPCNode]) -> Iterator[List[int]]:
-#
-#     pausable_nodes_ids = [cast(int, node.info.os_vm_id)
-#                           for node in unused_nodes
-#                           if node.info.os_vm_id is not None]
-#
-#     non_pausable = len(unused_nodes) - len(pausable_nodes_ids)
-#
-#     if non_pausable:
-#         logger.warning("Can't pause {} nodes".format(non_pausable))
-#
-#     if pausable_nodes_ids:
-#         logger.debug("Try to pause {} unused nodes".format(len(pausable_nodes_ids)))
-#         with ctx.get_pool() as pool:
-#             openstack_api.pause(ctx.os_connection, pausable_nodes_ids, pool)
-#
-#     try:
-#         yield pausable_nodes_ids
-#     finally:
-#         if pausable_nodes_ids:
-#             logger.debug("Unpausing {} nodes".format(len(pausable_nodes_ids)))
-#             with ctx.get_pool() as pool:
-#                 openstack_api.unpause(ctx.os_connection, pausable_nodes_ids, pool)
-# def clouds_connect_stage(ctx: TestRun) -> None:
-    # TODO(koder): need to use this to connect to openstack in upper code
-    # conn = ctx.config['clouds/openstack']
-    # user, passwd, tenant = parse_creds(conn['creds'])
-    # auth_data = dict(auth_url=conn['auth_url'],
-    #                  username=user,
-    #                  api_key=passwd,
-    #                  project_id=tenant)  # type: Dict[str, str]
-    # logger.debug("Discovering openstack nodes with connection details: %r", conn)
-    # connect to openstack, fuel
-
-    # # parse FUEL REST credentials
-    # username, tenant_name, password = parse_creds(fuel_data['creds'])
-    # creds = {"username": username,
-    #          "tenant_name": tenant_name,
-    #          "password": password}
-    #
-    # # connect to FUEL
-    # conn = fuel_rest_api.KeystoneAuth(fuel_data['url'], creds, headers=None)
-    # pass
\ No newline at end of file
diff --git a/wally/openstack_api.py b/wally/openstack_api.py
index 0f6c6fc..302d898 100644
--- a/wally/openstack_api.py
+++ b/wally/openstack_api.py
@@ -7,7 +7,7 @@
 import tempfile
 import subprocess
 import urllib.request
-from typing import Dict, Any, Iterable, Iterator, NamedTuple, Optional, List, Tuple, Set
+from typing import Dict, Any, Iterable, Iterator, NamedTuple, Optional, List, Tuple
 from concurrent.futures import ThreadPoolExecutor
 
 from keystoneauth1 import loading, session
@@ -16,7 +16,8 @@
 from cinderclient.client import Client as CinderClient
 from glanceclient import Client as GlanceClient
 
-from .utils import Timeout, to_ip
+from cephlib.common import Timeout, to_ip
+
 from .node_interfaces import NodeInfo
 from .ssh_utils import ConnCreds
 
@@ -453,7 +454,10 @@
     user = params['image']['user']
 
     for ip, os_node in create_vms_mt(conn, count, executor, **vm_params):
-        info = NodeInfo(ConnCreds(to_ip(ip), user, key_file=private_key_path), set())
+        node_ip = to_ip(ip)
+        if ip != node_ip:
+            logger.info("Will use ip_addr %r instead of hostname %r", node_ip, ip)
+        info = NodeInfo(ConnCreds(node_ip, user, key_file=private_key_path), set())
         info.os_vm_id = os_node.id
         yield info
 
diff --git a/wally/plot.py b/wally/plot.py
index 6729584..857a594 100644
--- a/wally/plot.py
+++ b/wally/plot.py
@@ -1,15 +1,7 @@
 import logging
-from io import BytesIO
-from functools import wraps
-from typing import Tuple, cast, List, Callable, Optional, Any
+from typing import List
 
 import numpy
-import scipy.stats
-import matplotlib.axis
-import matplotlib.style
-from matplotlib.ticker import FuncFormatter
-from matplotlib.figure import Figure
-import matplotlib.pyplot as plt
 
 # to make seaborn styles available
 import warnings
@@ -17,387 +9,16 @@
     warnings.simplefilter("ignore")
     import seaborn
 
-from cephlib.plot import process_heatmap_data, hmap_from_2d, do_plot_hmap_with_histo
+from cephlib.units import unit_conversion_coef_f
+from cephlib.plot import PlotParams, provide_plot
 
-from .hlstorage import ResultStorage
-from .utils import unit_conversion_coef
-from .statistic import moving_average, moving_dev, hist_outliers_perc, find_ouliers_ts, approximate_curve
-from .result_classes import StatProps, DataSource, TimeSeries, NormStatProps
-from .report_profiles import StyleProfile, ColorProfile
+from .report_profiles import StyleProfile
 from .resources import IOSummary
 
 
 logger = logging.getLogger("wally")
 
 
-# --------------  PLOT HELPERS FUNCTIONS  ------------------------------------------------------------------------------
-
-def get_emb_image(fig: Figure, file_format: str, **opts) -> bytes:
-    bio = BytesIO()
-    if file_format == 'svg':
-        fig.savefig(bio, format='svg', **opts)
-        img_start = "<!-- Created with matplotlib (http://matplotlib.org/) -->"
-        return bio.getvalue().decode("utf8").split(img_start, 1)[1].encode("utf8")
-    else:
-        fig.savefig(bio, format=file_format, **opts)
-        return bio.getvalue()
-
-
-class PlotParams:
-    def __init__(self, fig: Figure, ax: Any, title: str,
-                 style: StyleProfile, colors: ColorProfile) -> None:
-        self.fig = fig
-        self.ax = ax
-        self.style = style
-        self.colors = colors
-        self.title = title
-
-
-def provide_plot(noaxis: bool = False,
-                 eng: bool = False,
-                 no_legend: bool = False,
-                 long_plot: bool = True,
-                 grid: Any = None,
-                 style_name: str = 'default',
-                 noadjust: bool = False) -> Callable[..., Callable[..., str]]:
-    def closure1(func: Callable[..., None]) -> Callable[..., str]:
-        @wraps(func)
-        def closure2(storage: ResultStorage,
-                     style: StyleProfile,
-                     colors: ColorProfile,
-                     path: DataSource,
-                     title: Optional[str],
-                     *args, **kwargs) -> str:
-            fpath = storage.check_plot_file(path)
-            if not fpath:
-
-                assert style_name in ('default', 'ioqd')
-                mlstyle = style.default_style if style_name == 'default' else style.io_chart_style
-                with matplotlib.style.context(mlstyle):
-                    file_format = path.tag.split(".")[-1]
-                    fig = plt.figure(figsize=style.figsize_long if long_plot else style.figsize)
-
-                    if not noaxis:
-                        xlabel = kwargs.pop('xlabel', None)
-                        ylabel = kwargs.pop('ylabel', None)
-                        ax = fig.add_subplot(111)
-
-                        if xlabel is not None:
-                            ax.set_xlabel(xlabel)
-
-                        if ylabel is not None:
-                            ax.set_ylabel(ylabel)
-
-                        if grid:
-                            ax.grid(axis=grid)
-                    else:
-                        ax = None
-
-                    if title:
-                        fig.suptitle(title, fontsize=style.title_font_size)
-
-                    pp = PlotParams(fig, ax, title, style, colors)
-                    func(pp, *args, **kwargs)
-                    apply_style(pp, eng=eng, no_legend=no_legend, noadjust=noadjust)
-
-                    fpath = storage.put_plot_file(get_emb_image(fig, file_format=file_format, dpi=style.dpi), path)
-                    logger.debug("Plot %s saved to %r", path, fpath)
-                    plt.close(fig)
-            return fpath
-        return closure2
-    return closure1
-
-
-def apply_style(pp: PlotParams, eng: bool = True, no_legend: bool = False, noadjust: bool = False) -> None:
-
-    if (pp.style.legend_for_eng or not eng) and not no_legend:
-        if not noadjust:
-            pp.fig.subplots_adjust(right=StyleProfile.subplot_adjust_r)
-        legend_location = "center left"
-        legend_bbox_to_anchor = (1.03, 0.81)
-
-        for ax in pp.fig.axes:
-            ax.legend(loc=legend_location, bbox_to_anchor=legend_bbox_to_anchor)
-    elif not noadjust:
-        pp.fig.subplots_adjust(right=StyleProfile.subplot_adjust_r_no_legend)
-
-    if pp.style.tide_layout:
-        pp.fig.set_tight_layout(True)
-
-
-# --------------  PLOT FUNCTIONS  --------------------------------------------------------------------------------------
-
-
-@provide_plot(eng=True)
-def plot_hist(pp: PlotParams, units: str, prop: StatProps) -> None:
-
-    normed_bins = prop.bins_populations / prop.bins_populations.sum()
-    bar_width = prop.bins_edges[1] - prop.bins_edges[0]
-    pp.ax.bar(prop.bins_edges, normed_bins, color=pp.colors.box_color, width=bar_width, label="Real data")
-
-    pp.ax.set(xlabel=units, ylabel="Value probability")
-
-    if isinstance(prop, NormStatProps):
-        nprop = cast(NormStatProps, prop)
-        stats = scipy.stats.norm(nprop.average, nprop.deviation)
-
-        new_edges, step = numpy.linspace(prop.bins_edges[0], prop.bins_edges[-1],
-                                         len(prop.bins_edges) * 10, retstep=True)
-
-        ypoints = stats.cdf(new_edges) * 11
-        ypoints = [nextpt - prevpt for (nextpt, prevpt) in zip(ypoints[1:], ypoints[:-1])]
-        xpoints = (new_edges[1:] + new_edges[:-1]) / 2
-
-        pp.ax.plot(xpoints, ypoints, color=pp.colors.primary_color, label="Expected from\nnormal\ndistribution")
-
-    pp.ax.set_xlim(left=prop.bins_edges[0])
-    if prop.log_bins:
-        pp.ax.set_xscale('log')
-
-
-@provide_plot(grid='y')
-def plot_simple_over_time(pp: PlotParams, tss: List[Tuple[str, numpy.ndarray]], average: bool = False) -> None:
-    max_len = 0
-    for name, arr in tss:
-        if average:
-            avg_vals = moving_average(arr, pp.style.avg_range)
-            if pp.style.approx_average_no_points:
-                time_points = numpy.arange(len(avg_vals))
-                avg_vals = approximate_curve(cast(List[int], time_points),
-                                             avg_vals,
-                                             cast(List[int], time_points),
-                                             pp.style.curve_approx_level)
-            arr = avg_vals
-        pp.ax.plot(arr, label=name)
-        max_len = max(max_len, len(arr))
-    pp.ax.set_xlim(-5, max_len + 5)
-
-
-@provide_plot(no_legend=True, grid='x', noadjust=True)
-def plot_simple_bars(pp: PlotParams,
-                     names: List[str],
-                     values: List[float],
-                     errs: List[float] = None,
-                     x_formatter: Callable[[float, float], str] = None,
-                     one_point_zero_line: bool = True) -> None:
-
-    ind = numpy.arange(len(names))
-    width = 0.35
-    pp.ax.barh(ind, values, width, xerr=errs)
-
-    pp.ax.set_yticks(ind)
-    pp.ax.set_yticklabels(names)
-    pp.ax.set_xlim(0, max(val + err for val, err in zip(values, errs)) * 1.1)
-
-    if one_point_zero_line:
-        pp.ax.axvline(x=1.0, color='r', linestyle='--', linewidth=1, alpha=0.5)
-
-    if x_formatter:
-        pp.ax.xaxis.set_major_formatter(FuncFormatter(x_formatter))
-
-    pp.fig.subplots_adjust(left=0.2)
-
-
-@provide_plot(no_legend=True, long_plot=True, noaxis=True)
-def plot_hmap_from_2d(pp: PlotParams, data2d: numpy.ndarray, xlabel: str, ylabel: str,
-                      bins: numpy.ndarray = None) -> None:
-    ioq1d, ranges = hmap_from_2d(data2d)
-    heatmap, bins = process_heatmap_data(ioq1d, bin_ranges=ranges, bins=bins)
-    bins_populations, _ = numpy.histogram(ioq1d, bins)
-
-    ax, _ = do_plot_hmap_with_histo(pp.fig,
-                                    heatmap,
-                                    bins_populations,
-                                    bins,
-                                    cmap=pp.colors.hmap_cmap,
-                                    cbar=pp.style.heatmap_colorbar,
-                                    histo_grid=pp.style.histo_grid)
-    ax.set(ylabel=ylabel, xlabel=xlabel)
-
-
-@provide_plot(eng=True, grid='y')
-def plot_v_over_time(pp: PlotParams, units: str, ts: TimeSeries,
-                     plot_avg_dev: bool = True, plot_points: bool = True) -> None:
-
-    min_time = min(ts.times)
-
-    # convert time to ms
-    coef = float(unit_conversion_coef(ts.time_units, 's'))
-    time_points = numpy.array([(val_time - min_time) * coef for val_time in ts.times])
-
-    outliers_idxs = find_ouliers_ts(ts.data, cut_range=pp.style.outliers_q_nd)
-    outliers_4q_idxs = find_ouliers_ts(ts.data, cut_range=pp.style.outliers_hide_q_nd)
-    normal_idxs = numpy.logical_not(outliers_idxs)
-    outliers_idxs = outliers_idxs & numpy.logical_not(outliers_4q_idxs)
-    # hidden_outliers_count = numpy.count_nonzero(outliers_4q_idxs)
-
-    data = ts.data[normal_idxs]
-    data_times = time_points[normal_idxs]
-    outliers = ts.data[outliers_idxs]
-    outliers_times = time_points[outliers_idxs]
-
-    if plot_points:
-        alpha = pp.colors.noise_alpha if plot_avg_dev else 1.0
-        pp.ax.plot(data_times, data, pp.style.point_shape, color=pp.colors.primary_color, alpha=alpha, label="Data")
-        pp.ax.plot(outliers_times, outliers, pp.style.err_point_shape, color=pp.colors.err_color, label="Outliers")
-
-    has_negative_dev = False
-    plus_minus = "\xb1"
-
-    if plot_avg_dev and len(data) < pp.style.avg_range * 2:
-        logger.warning("Array %r to small to plot average over %s points", pp.title, pp.style.avg_range)
-    elif plot_avg_dev:
-        avg_vals = moving_average(data, pp.style.avg_range)
-        dev_vals = moving_dev(data, pp.style.avg_range)
-        avg_times = moving_average(data_times, pp.style.avg_range)
-
-        if (plot_points and pp.style.approx_average) or (not plot_points and pp.style.approx_average_no_points):
-            avg_vals = approximate_curve(avg_times, avg_vals, avg_times, pp.style.curve_approx_level)
-            dev_vals = approximate_curve(avg_times, dev_vals, avg_times, pp.style.curve_approx_level)
-
-        pp.ax.plot(avg_times, avg_vals, c=pp.colors.suppl_color1, label="Average")
-
-        low_vals_dev = avg_vals - dev_vals * pp.style.dev_range_x
-        hight_vals_dev = avg_vals + dev_vals * pp.style.dev_range_x
-        if (pp.style.dev_range_x - int(pp.style.dev_range_x)) < 0.01:
-            pp.ax.plot(avg_times, low_vals_dev, c=pp.colors.suppl_color2,
-                       label="{}{}*stdev".format(plus_minus, int(pp.style.dev_range_x)))
-        else:
-            pp.ax.plot(avg_times, low_vals_dev, c=pp.colors.suppl_color2,
-                       label="{}{}*stdev".format(plus_minus, pp.style.dev_range_x))
-        pp.ax.plot(avg_times, hight_vals_dev, c=pp.colors.suppl_color2)
-        has_negative_dev = low_vals_dev.min() < 0
-
-    pp.ax.set_xlim(-5, max(time_points) + 5)
-    pp.ax.set_xlabel("Time, seconds from test begin")
-
-    if plot_avg_dev:
-        pp.ax.set_ylabel("{}. Average and {}stddev over {} points".format(units, plus_minus, pp.style.avg_range))
-    else:
-        pp.ax.set_ylabel(units)
-
-    if has_negative_dev:
-        pp.ax.set_ylim(bottom=0)
-
-
-@provide_plot(eng=True, no_legend=True, grid='y', noadjust=True)
-def plot_lat_over_time(pp: PlotParams, ts: TimeSeries) -> None:
-    times = ts.times - min(ts.times)
-    step = len(times) / pp.style.lat_samples
-    points = [times[int(i * step + 0.5)] for i in range(pp.style.lat_samples)]
-    points.append(times[-1])
-    bounds = list(zip(points[:-1], points[1:]))
-    agg_data = []
-    positions = []
-    labels = []
-
-    for begin, end in bounds:
-        agg_hist = ts.data[begin:end].sum(axis=0)
-
-        if pp.style.violin_instead_of_box:
-            # cut outliers
-            idx1, idx2 = hist_outliers_perc(agg_hist, pp.style.outliers_lat)
-            agg_hist = agg_hist[idx1:idx2]
-            curr_bins_vals = ts.histo_bins[idx1:idx2]
-
-            correct_coef = pp.style.violin_point_count / sum(agg_hist)
-            if correct_coef > 1:
-                correct_coef = 1
-        else:
-            curr_bins_vals = ts.histo_bins
-            correct_coef = 1
-
-        vals = numpy.empty(shape=[numpy.sum(agg_hist)], dtype='float32')
-        cidx = 0
-
-        non_zero, = agg_hist.nonzero()
-        for pos in non_zero:
-            count = int(agg_hist[pos] * correct_coef + 0.5)
-
-            if count != 0:
-                vals[cidx: cidx + count] = curr_bins_vals[pos]
-                cidx += count
-
-        agg_data.append(vals[:cidx])
-        positions.append((end + begin) / 2)
-        labels.append(str((end + begin) // 2))
-
-    if pp.style.violin_instead_of_box:
-        patches = pp.ax.violinplot(agg_data, positions=positions, showmeans=True, showmedians=True, widths=step / 2)
-        patches['cmeans'].set_color("blue")
-        patches['cmedians'].set_color("green")
-        if pp.style.legend_for_eng:
-            legend_location = "center left"
-            legend_bbox_to_anchor = (1.03, 0.81)
-            pp.ax.legend([patches['cmeans'], patches['cmedians']], ["mean", "median"],
-                         loc=legend_location, bbox_to_anchor=legend_bbox_to_anchor)
-    else:
-        pp.ax.boxplot(agg_data, 0, '', positions=positions, labels=labels, widths=step / 4)
-
-    pp.ax.set_xlim(min(times), max(times))
-    pp.ax.set_xlabel("Time, seconds from test begin, sampled for ~{} seconds".format(int(step)))
-    pp.fig.subplots_adjust(right=pp.style.subplot_adjust_r)
-
-
-@provide_plot(eng=True, no_legend=True, noaxis=True, long_plot=True)
-def plot_histo_heatmap(pp: PlotParams, ts: TimeSeries, ylabel: str, xlabel: str = "time, s") -> None:
-
-    # only histogram-based ts can be plotted
-    assert len(ts.data.shape) == 2
-
-    # Find global outliers. As load is expected to be stable during one job
-    # outliers range can be detected globally
-    total_hist = ts.data.sum(axis=0)
-    idx1, idx2 = hist_outliers_perc(total_hist,
-                                    bounds_perc=pp.style.outliers_lat,
-                                    min_bins_left=pp.style.hm_hist_bins_count)
-
-    # merge outliers with most close non-outliers cell
-    orig_data = ts.data[:, idx1:idx2].copy()
-    if idx1 > 0:
-        orig_data[:, 0] += ts.data[:, :idx1].sum(axis=1)
-
-    if idx2 < ts.data.shape[1]:
-        orig_data[:, -1] += ts.data[:, idx2:].sum(axis=1)
-
-    bins_vals = ts.histo_bins[idx1:idx2]
-
-    # rebin over X axis
-    # aggregate some lines in ts.data to plot ~style.hm_x_slots x bins
-    agg_idx = float(len(orig_data)) / pp.style.hm_x_slots
-    if agg_idx >= 2:
-        idxs = list(map(int, numpy.round(numpy.arange(0, len(orig_data) + 1, agg_idx))))
-        assert len(idxs) > 1
-        data = numpy.empty([len(idxs) - 1, orig_data.shape[1]], dtype=numpy.float32)  # type: List[numpy.ndarray]
-        for idx, (sidx, eidx) in enumerate(zip(idxs[:-1], idxs[1:])):
-            data[idx] = orig_data[sidx:eidx,:].sum(axis=0) / (eidx - sidx)
-    else:
-        data = orig_data
-
-    # rebin over Y axis
-    # =================
-
-    # don't using rebin_histogram here, as we need apply same bins for many arrays
-    step = (bins_vals[-1] - bins_vals[0]) / pp.style.hm_hist_bins_count
-    new_bins_edges = numpy.arange(pp.style.hm_hist_bins_count) * step + bins_vals[0]
-    bin_mapping = numpy.clip(numpy.searchsorted(new_bins_edges, bins_vals) - 1, 0, len(new_bins_edges) - 1)
-
-    # map origin bins ranges to heatmap bins, iterate over rows
-    cmap = []
-    for line in data:
-        curr_bins = [0] * pp.style.hm_hist_bins_count
-        for idx, count in zip(bin_mapping, line):
-            curr_bins[idx] += count
-        cmap.append(curr_bins)
-    ncmap = numpy.array(cmap)
-
-    histo = ncmap.sum(axis=0).reshape((-1,))
-    ax, _ = do_plot_hmap_with_histo(pp.fig, ncmap, histo, new_bins_edges,
-                                    cmap=pp.colors.hmap_cmap,
-                                    cbar=pp.style.heatmap_colorbar, avg_labels=True)
-    ax.set(ylabel=ylabel, xlabel=xlabel)
-
-
 @provide_plot(eng=False, no_legend=True, grid='y', style_name='ioqd', noadjust=True)
 def io_chart(pp: PlotParams,
              legend: str,
@@ -430,8 +51,8 @@
     block_size = iosums[0].block_size
     xpos = numpy.arange(1, len(iosums) + 1, dtype='uint')
 
-    coef_mb = float(unit_conversion_coef(iosums[0].bw.units, "MiBps"))
-    coef_iops = float(unit_conversion_coef(iosums[0].bw.units, "KiBps")) / block_size
+    coef_mb = unit_conversion_coef_f(iosums[0].bw.units, "MiBps")
+    coef_iops = unit_conversion_coef_f(iosums[0].bw.units, "KiBps") / block_size
 
     iops_primary = block_size < pp.style.large_blocks
 
@@ -476,7 +97,7 @@
     ax2 = pp.ax.twinx()
 
     # plot median and 95 perc latency
-    lat_coef_ms = float(unit_conversion_coef(iosums[0].lat.units, "ms"))
+    lat_coef_ms = unit_conversion_coef_f(iosums[0].lat.units, "ms")
     ax2.plot(xpos, [iosum.lat.perc_50 * lat_coef_ms for iosum in iosums], label="lat med")
     ax2.plot(xpos, [iosum.lat.perc_95 * lat_coef_ms for iosum in iosums], label="lat 95%")
 
diff --git a/wally/process_results.py b/wally/process_results.py
deleted file mode 100644
index b2ed783..0000000
--- a/wally/process_results.py
+++ /dev/null
@@ -1,52 +0,0 @@
-# put all result preprocessing here
-# # selection, aggregation
-#
-# from io import BytesIO
-# import logging
-# from typing import Any
-#
-# from .stage import Stage, StepOrder
-# from .test_run_class import TestRun
-# from .statistic import calc_norm_stat_props, calc_histo_stat_props
-# from .result_classes import StatProps, DataSource, TimeSeries
-# from .hlstorage import ResultStorage
-# from .suits.io.fio_hist import get_lat_vals, expected_lat_bins
-# from .suits.io.fio import FioTest
-# from .utils import StopTestError
-#
-# import matplotlib
-# matplotlib.use('svg')
-# import matplotlib.pyplot as plt
-#
-#
-# logger = logging.getLogger("wally")
-#
-#
-# class CalcStatisticStage(Stage):
-#     priority = StepOrder.TEST + 1
-#
-#     def run(self, ctx: TestRun) -> None:
-#         rstorage = ResultStorage(ctx.storage)
-#
-#         for suite in rstorage.iter_suite(FioTest.name):
-#             for job in rstorage.iter_job(suite):
-#                 for ts in rstorage.iter_ts(suite, job):
-#                     if ts.source.sensor == 'lat':
-#                         if ts.data.shape[1] != expected_lat_bins:
-#                             logger.error("Sensor %s.%s on node %s has" +
-#                                          "shape=%s. Can only process sensors with shape=[X,%s].",
-#                                          ts.source.dev, ts.source.sensor, ts.source.node_id,
-#                                          ts.data.shape, expected_lat_bins)
-#                             continue
-#
-#                         ts.bins_edges = get_lat_vals(ts.data.shape[1])
-#                         stat_prop = calc_histo_stat_props(ts)  # type: StatProps
-#
-#                     elif len(ts.data.shape) != 1:
-#                         logger.warning("Sensor %s.%s on node %s provide 2+D data. Can't process it.",
-#                                        ts.source.dev, ts.source.sensor, ts.source.node_id)
-#                         continue
-#                     else:
-#                         stat_prop = calc_norm_stat_props(ts)
-#
-#         raise StopTestError()
diff --git a/wally/report.py b/wally/report.py
index 3ac1292..d37ac60 100644
--- a/wally/report.py
+++ b/wally/report.py
@@ -11,17 +11,21 @@
 
 import wally
 
-from . import html
+from cephlib import html
+from cephlib.units import b2ssize, b2ssize_10, unit_conversion_coef, unit_conversion_coef_f
+from cephlib.statistic import calc_norm_stat_props
+from cephlib.storage_selectors import summ_sensors, find_sensors_to_2d
+from cephlib.wally_storage import find_nodes_by_roles
+
+from .utils import STORAGE_ROLES
 from .stage import Stage, StepOrder
 from .test_run_class import TestRun
-from .hlstorage import ResultStorage
-from .utils import b2ssize, b2ssize_10, STORAGE_ROLES, unit_conversion_coef
-from .statistic import calc_norm_stat_props
+from .result_classes import IResultStorage
 from .result_classes import DataSource, TimeSeries, SuiteConfig
 from .suits.io.fio import FioTest, FioJobConfig
 from .suits.io.fio_job import FioJobParams
 from .suits.job import JobConfig
-from .data_selectors import get_aggregated, AGG_TAG, summ_sensors, find_sensors_to_2d, find_nodes_by_roles
+from .data_selectors import get_aggregated, AGG_TAG
 from .report_profiles import (DefStyleProfile, DefColorProfile, StyleProfile, ColorProfile,
                               default_format, io_chart_format)
 from .plot import (io_chart, plot_simple_bars, plot_hmap_from_2d, plot_lat_over_time, plot_simple_over_time,
@@ -139,7 +143,7 @@
 #  --------------------  REPORTS  --------------------------------------------------------------------------------------
 
 class ReporterBase:
-    def __init__(self, rstorage: ResultStorage, style: StyleProfile, colors: ColorProfile) -> None:
+    def __init__(self, rstorage: IResultStorage, style: StyleProfile, colors: ColorProfile) -> None:
         self.style = style
         self.colors = colors
         self.rstorage = rstorage
@@ -223,7 +227,7 @@
 
     def get_divs(self, suite: SuiteConfig) -> Iterator[Tuple[str, str, HTMLBlock]]:
         qd_grouped_jobs = {}  # type: Dict[FioJobParams, List[FioJobConfig]]
-        test_nc = len(list(find_nodes_by_roles(self.rstorage, ['testnode'])))
+        test_nc = len(list(find_nodes_by_roles(self.rstorage.storage, ['testnode'])))
         for job in self.rstorage.iter_job(suite):
             fjob = cast(FioJobConfig, job)
             if fjob.bsize != 4:
@@ -260,7 +264,7 @@
                             tag=io_chart_format)
 
             fpath = self.plt(plot_simple_bars, ds, jc_no_qd.long_summary, labels, vals, errs,
-                             xlabel="CPU time per IOP", ylabel="QD * Test nodes" if test_nc != 1 else "QD",
+                             xlabel="CPU core time per IOP", ylabel="QD * Test nodes" if test_nc != 1 else "QD",
                              x_formatter=(lambda x, pos: b2ssize_10(x) + 's'),
                              one_point_zero_line=False)
 
@@ -277,7 +281,7 @@
         io_sum = make_iosum(self.rstorage, suite, fjob, self.style.hist_boxes)
 
         caption = "Test summary - " + job.params.long_summary
-        test_nc = len(list(find_nodes_by_roles(self.rstorage, ['testnode'])))
+        test_nc = len(list(find_nodes_by_roles(self.rstorage.storage, ['testnode'])))
         if test_nc > 1:
             caption += " * {} nodes".format(test_nc)
 
@@ -297,7 +301,7 @@
 
         bw_units = "B"
         bw_target_units = bw_units + 'ps'
-        bw_coef = float(unit_conversion_coef(io_sum.bw.units, bw_target_units))
+        bw_coef = unit_conversion_coef_f(io_sum.bw.units, bw_target_units)
 
         adf_v, *_1, stats, _2 = adfuller(io_sum.bw.data)
 
@@ -323,7 +327,7 @@
         stat_data = [bw_data]
 
         if fjob.bsize < StyleProfile.large_blocks:
-            iops_coef = float(unit_conversion_coef(io_sum.bw.units, 'KiBps')) / fjob.bsize
+            iops_coef = unit_conversion_coef_f(io_sum.bw.units, 'KiBps') / fjob.bsize
             iops_data = ["IOPS",
                          b2ssize_10(io_sum.bw.data.sum() * iops_coef),
                          "{}IOPS ~ {}IOPS".format(b2ssize_10(io_sum.bw.average * iops_coef),
@@ -337,7 +341,7 @@
                          ad_test]
 
             lat_target_unit = 's'
-            lat_coef = unit_conversion_coef(io_sum.lat.units, lat_target_unit)
+            lat_coef = unit_conversion_coef_f(io_sum.lat.units, lat_target_unit)
             # latency
             lat_data = ["Latency",
                         "-",
@@ -416,8 +420,8 @@
             for name in records.keys()
         }
 
-        short_name[ResourceNames.storage_cpu_s] = "CPU (s/IOP)"
-        short_name[ResourceNames.storage_cpu_s_b] = "CPU (s/B)"
+        short_name[ResourceNames.storage_cpu_s] = "CPU core (s/IOP)"
+        short_name[ResourceNames.storage_cpu_s_b] = "CPU core (s/B)"
 
         with doc.tbody:
             with doc.tr:
@@ -531,24 +535,21 @@
 
     def get_divs(self, suite: SuiteConfig, job: JobConfig) -> Iterator[Tuple[str, str, HTMLBlock]]:
 
-        nodes = list(find_nodes_by_roles(self.rstorage, STORAGE_ROLES))
+        nodes = list(find_nodes_by_roles(self.rstorage.storage, STORAGE_ROLES))
 
         sensor = 'block-io'
         metric = 'io_queue'
         bn_val = 16
 
-        for node in nodes:
+        for node_id in nodes:
             bn = 0
             tot = 0
-            for _, ds in self.rstorage.iter_sensors(node_id=node.node_id, sensor=sensor, metric=metric):
+            for _, ds in self.rstorage.iter_sensors(node_id=node_id, sensor=sensor, metric=metric):
                 if ds.dev in ('sdb', 'sdc', 'sdd', 'sde'):
-                    data = self.rstorage.load_sensor(ds)
-                    p1 = job.reliable_info_range_s[0] * unit_conversion_coef('s', data.time_units)
-                    p2 = job.reliable_info_range_s[1] * unit_conversion_coef('s', data.time_units)
-                    idx1, idx2 = numpy.searchsorted(data.times, (p1, p2))
-                    bn += (data.data[idx1: idx2] > bn_val).sum()
-                    tot += idx2 - idx1
-            print(node, bn, tot)
+                    ts = self.rstorage.get_sensor(ds, job.reliable_info_range_s)
+                    bn += (ts.data > bn_val).sum()
+                    tot += len(ts.data)
+            print(node_id, bn, tot)
 
         yield Menu1st.per_job, job.summary, HTMLBlock("")
 
@@ -586,7 +587,7 @@
         storage_devs = None
         test_nodes_devs = ['rbd0']
 
-        for node in find_nodes_by_roles(self.rstorage, STORAGE_ROLES):
+        for node in self.rstorage.find_nodes(STORAGE_ROLES):
             cjd = set(node.params['ceph_journal_devs'])
             if journal_devs is None:
                 journal_devs = cjd
@@ -609,8 +610,9 @@
                 HTMLBlock(html.H2(html.center("{} IO heatmaps".format(name.capitalize()))))
 
             # QD heatmap
-            ioq2d = find_sensors_to_2d(self.rstorage, roles, sensor='block-io', devs=devs,
-                                       metric='io_queue', time_range=trange)
+            nodes = find_nodes_by_roles(self.rstorage.storage, roles)
+            ioq2d = find_sensors_to_2d(self.rstorage, trange, sensor='block-io', devs=devs,
+                                       node_id=nodes, metric='io_queue')
 
             ds = DataSource(suite.storage_id, job.storage_id, AGG_TAG, 'block-io', name, tag="hmap." + default_format)
 
@@ -619,11 +621,11 @@
             yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fname))
 
             # Block size heatmap
-            wc2d = find_sensors_to_2d(self.rstorage, roles, sensor='block-io', devs=devs,
-                                      metric='writes_completed', time_range=trange)
+            wc2d = find_sensors_to_2d(self.rstorage, trange, node_id=nodes, sensor='block-io', devs=devs,
+                                      metric='writes_completed')
             wc2d[wc2d < 1E-3] = 1
-            sw2d = find_sensors_to_2d(self.rstorage, roles, sensor='block-io', devs=devs,
-                                      metric='sectors_written', time_range=trange)
+            sw2d = find_sensors_to_2d(self.rstorage, trange, node_id=nodes, sensor='block-io', devs=devs,
+                                      metric='sectors_written')
             data2d = sw2d / wc2d / 1024
             fname = self.plt(plot_hmap_from_2d, ds(metric='wr_block_size'),
                              data2d=data2d, title=name.capitalize() + " write block size",
@@ -631,8 +633,8 @@
             yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fname))
 
             # iotime heatmap
-            wtime2d = find_sensors_to_2d(self.rstorage, roles, sensor='block-io', devs=devs,
-                                         metric='io_time', time_range=trange)
+            wtime2d = find_sensors_to_2d(self.rstorage, trange, node_id=nodes, sensor='block-io', devs=devs,
+                                         metric='io_time')
             fname = self.plt(plot_hmap_from_2d, ds(metric='io_time'), data2d=wtime2d,
                              xlabel='Time', ylabel="IO time (ms) per second",
                              title=name.capitalize() + " iotime", bins=StyleProfile.iotime_bins)
@@ -650,24 +652,25 @@
 
         yield Menu1st.per_job, job.summary, HTMLBlock(html.H2(html.center("Load tool results")))
 
-        agg_io = get_aggregated(self.rstorage, suite, fjob, "bw")
+        agg_io = get_aggregated(self.rstorage, suite.storage_id, fjob.storage_id, "bw", job.reliable_info_range_s)
         if fjob.bsize >= DefStyleProfile.large_blocks:
             title = "Fio measured Bandwidth over time"
             units = "MiBps"
-            agg_io.data //= int(unit_conversion_coef(units, agg_io.units))
+            agg_io.data //= int(unit_conversion_coef_f(units, agg_io.units))
         else:
             title = "Fio measured IOPS over time"
-            agg_io.data //= (int(unit_conversion_coef("KiBps", agg_io.units)) * fjob.bsize)
+            agg_io.data //= (int(unit_conversion_coef_f("KiBps", agg_io.units)) * fjob.bsize)
             units = "IOPS"
 
         fpath = self.plt(plot_v_over_time, agg_io.source(tag='ts.' + default_format), title, units, agg_io)
         yield Menu1st.per_job, fjob.summary, HTMLBlock(html.img(fpath))
 
         if fjob.bsize < DefStyleProfile.large_blocks:
-            agg_lat = get_aggregated(self.rstorage, suite, fjob, "lat").copy()
+            agg_lat = get_aggregated(self.rstorage, suite.storage_id, fjob.storage_id, "lat",
+                                     job.reliable_info_range_s)
             TARGET_UNITS = 'ms'
-            coef = unit_conversion_coef(agg_lat.units, TARGET_UNITS)
-            agg_lat.histo_bins = agg_lat.histo_bins.copy() * float(coef)
+            coef = unit_conversion_coef_f(agg_lat.units, TARGET_UNITS)
+            agg_lat.histo_bins = agg_lat.histo_bins.copy() * coef
             agg_lat.units = TARGET_UNITS
 
             fpath = self.plt(plot_lat_over_time, agg_lat.source(tag='ts.' + default_format), "Latency", agg_lat,
@@ -681,15 +684,15 @@
 
         fjob = cast(FioJobConfig, job)
 
-        agg_io = get_aggregated(self.rstorage, suite, fjob, "bw")
+        agg_io = get_aggregated(self.rstorage, suite.storage_id, fjob.storage_id, "bw", job.reliable_info_range_s)
 
         if fjob.bsize >= DefStyleProfile.large_blocks:
             title = "BW distribution"
             units = "MiBps"
-            agg_io.data //= int(unit_conversion_coef(units, agg_io.units))
+            agg_io.data //= int(unit_conversion_coef_f(units, agg_io.units))
         else:
             title = "IOPS distribution"
-            agg_io.data //= (int(unit_conversion_coef("KiBps", agg_io.units)) * fjob.bsize)
+            agg_io.data //= (int(unit_conversion_coef_f("KiBps", agg_io.units)) * fjob.bsize)
             units = "IOPS"
 
         io_stat_prop = calc_norm_stat_props(agg_io, bins_count=StyleProfile.hist_boxes)
@@ -716,34 +719,34 @@
         sensors = []
         max_iop = 0
         max_bytes = 0
-
+        stor_nodes = find_nodes_by_roles(self.rstorage.storage, STORAGE_ROLES)
         for sensor, metric, op, units in self.storage_sensors:
-            ts = summ_sensors(self.rstorage, STORAGE_ROLES, sensor, metric, job.reliable_info_range_s)
-            ds = DataSource(suite_id=suite.storage_id,
-                            job_id=job.storage_id,
-                            node_id="storage",
-                            sensor=sensor,
-                            dev=AGG_TAG,
-                            metric=metric,
-                            tag="ts." + default_format)
+            ts = summ_sensors(self.rstorage, job.reliable_info_range_s, node_id=stor_nodes, sensor=sensor,
+                              metric=metric)
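+            # a sensor may be missing for this run - skip it instead of failing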
+            if ts is not None:
+                ds = DataSource(suite_id=suite.storage_id,
+                                job_id=job.storage_id,
+                                node_id="storage",
+                                sensor=sensor,
+                                dev=AGG_TAG,
+                                metric=metric,
+                                tag="ts." + default_format)
 
-            data = ts.data if units != 'MiB' else ts.data * float(unit_conversion_coef(ts.units, 'MiB'))
-            ts = TimeSeries(name="",
-                            times=numpy.arange(*job.reliable_info_range_s),
-                            data=data,
-                            raw=None,
-                            units=units if ts.units is None else ts.units,
-                            time_units=ts.time_units,
-                            source=ds,
-                            histo_bins=ts.histo_bins)
+                data = ts.data if units != 'MiB' else ts.data * unit_conversion_coef_f(ts.units, 'MiB')
+                ts = TimeSeries(times=numpy.arange(*job.reliable_info_range_s),
+                                data=data,
+                                units=units if ts.units is None else ts.units,
+                                time_units=ts.time_units,
+                                source=ds,
+                                histo_bins=ts.histo_bins)
 
-            sensors.append(("{} {}".format(op, units), ds, ts, units))
+                sensors.append(("{} {}".format(op, units), ds, ts, units))
 
-            if units == 'iop':
-                max_iop = max(max_iop, data.sum())
-            else:
-                assert units == 'MiB'
-                max_bytes = max(max_bytes, data.sum())
+                if units == 'iop':
+                    max_iop = max(max_iop, data.sum())
+                else:
+                    assert units == 'MiB'
+                    max_bytes = max(max_bytes, data.sum())
 
         for title, ds, ts, units in sensors:
             if ts.data.sum() >= (max_iop if units == 'iop' else max_bytes) * DefStyleProfile.min_load_diff:
@@ -761,13 +764,11 @@
     priority = StepOrder.REPORT
 
     def run(self, ctx: TestRun) -> None:
-        rstorage = ResultStorage(ctx.storage)
-
         job_reporters_cls = [StatInfo, Resources, LoadToolResults, ClusterLoad, CPULoadPlot, QDIOTimeHeatmap]
-        job_reporters = [rcls(rstorage, DefStyleProfile, DefColorProfile) for rcls in job_reporters_cls]
+        job_reporters = [rcls(ctx.rstorage, DefStyleProfile, DefColorProfile) for rcls in job_reporters_cls]
 
         suite_reporters_cls = [IOQD, ResourceQD]
-        suite_reporters = [rcls(rstorage, DefStyleProfile, DefColorProfile) for rcls in suite_reporters_cls]
+        suite_reporters = [rcls(ctx.rstorage, DefStyleProfile, DefColorProfile) for rcls in suite_reporters_cls]
 
         root_dir = os.path.dirname(os.path.dirname(wally.__file__))
         doc_templ_path = os.path.join(root_dir, "report_templates/index.html")
@@ -788,8 +789,8 @@
         job_summ_sort_order = []
 
         # TODO: filter reporters
-        for suite in rstorage.iter_suite(FioTest.name):
-            all_jobs = list(rstorage.iter_job(suite))
+        for suite in ctx.rstorage.iter_suite(FioTest.name):
+            all_jobs = list(ctx.rstorage.iter_job(suite))
             all_jobs.sort(key=lambda job: job.params)
 
             new_jobs_in_order = [job.summary for job in all_jobs]
@@ -797,7 +798,7 @@
             assert not same, "Job with same names in different suits found: " + ",".join(same)
             job_summ_sort_order.extend(new_jobs_in_order)
 
-            for job in all_jobs[:1]:
+            for job in all_jobs:
                 try:
                     for reporter in job_reporters:
                         logger.debug("Start reporter %s on job %s suite %s",
@@ -845,6 +846,6 @@
 
         report = report_template.replace("{{{menu}}}", ("\n" + " " * 16).join(menu_block))
         report = report.replace("{{{content}}}", ("\n" + " " * 16).join(content_block))
-        report_path = rstorage.put_report(report, "index.html")
-        rstorage.put_report(css_file, "main.css")
+        report_path = ctx.rstorage.put_report(report, "index.html")
+        ctx.rstorage.put_report(css_file, "main.css")
         logger.info("Report is stored into %r", report_path)
diff --git a/wally/resources.py b/wally/resources.py
index f063eb5..d867a1a 100644
--- a/wally/resources.py
+++ b/wally/resources.py
@@ -1,15 +1,24 @@
+import logging
 from typing import Tuple, Dict, cast, List
 
 import numpy
 
-from .hlstorage import ResultStorage
-from .utils import b2ssize_10, b2ssize, unit_conversion_coef, STORAGE_ROLES
-from .result_classes import SuiteConfig
+
+from cephlib.units import b2ssize_10, b2ssize, unit_conversion_coef_f
+from cephlib.statistic import NormStatProps, HistoStatProps, calc_norm_stat_props, calc_histo_stat_props
+from cephlib.numeric_types import TimeSeries
+from cephlib.wally_storage import find_nodes_by_roles
+from cephlib.storage_selectors import summ_sensors
+
+from .result_classes import IResultStorage, SuiteConfig
+from .utils import STORAGE_ROLES
 from .suits.io.fio import FioJobConfig
 from .suits.job import JobConfig
-from .result_classes import NormStatProps, HistoStatProps, TimeSeries
-from .statistic import calc_norm_stat_props, calc_histo_stat_props
-from .data_selectors import get_aggregated, AGG_TAG, summ_sensors, find_sensors_to_2d, find_nodes_by_roles
+from .data_selectors import get_aggregated
+from .hw_info import HWInfo
+
+
+logger = logging.getLogger('wally')
 
 
 class IOSummary:
@@ -76,15 +85,15 @@
 iosum_cache = {}  # type: Dict[Tuple[str, str]]
 
 
-def make_iosum(rstorage: ResultStorage, suite: SuiteConfig, job: FioJobConfig, hist_boxes: int,
+def make_iosum(rstorage: IResultStorage, suite: SuiteConfig, job: FioJobConfig, hist_boxes: int,
                nc: bool = False) -> IOSummary:
 
     key = (suite.storage_id, job.storage_id)
     if not nc and key in iosum_cache:
         return iosum_cache[key]
 
-    lat = get_aggregated(rstorage, suite, job, "lat")
-    io = get_aggregated(rstorage, suite, job, "bw")
+    lat = get_aggregated(rstorage, suite.storage_id, job.storage_id, "lat", job.reliable_info_range_s)
+    io = get_aggregated(rstorage, suite.storage_id, job.storage_id, "bw", job.reliable_info_range_s)
 
     res = IOSummary(job.qd,
                     nodes_count=len(suite.nodes_ids),
@@ -101,7 +110,7 @@
 cpu_load_cache = {}  # type: Dict[Tuple[int, Tuple[str, ...], Tuple[int, int]], Dict[str, TimeSeries]]
 
 
-def get_cluster_cpu_load(rstorage: ResultStorage, roles: List[str],
+def get_cluster_cpu_load(rstorage: IResultStorage, roles: List[str],
                          time_range: Tuple[int, int], nc: bool = False) -> Dict[str, TimeSeries]:
 
     key = (id(rstorage), tuple(roles), time_range)
@@ -110,8 +119,9 @@
 
     cpu_ts = {}
     cpu_metrics = "idle guest iowait sirq nice irq steal sys user".split()
+    nodes = find_nodes_by_roles(rstorage.storage, roles)
     for name in cpu_metrics:
-        cpu_ts[name] = summ_sensors(rstorage, roles, sensor='system-cpu', metric=name, time_range=time_range)
+        cpu_ts[name] = summ_sensors(rstorage, time_range, nodes=nodes, sensor='system-cpu', metric=name)
 
     it = iter(cpu_ts.values())
     total_over_time = next(it).data.copy()  # type: numpy.ndarray
@@ -131,7 +141,7 @@
 
 def get_resources_usage(suite: SuiteConfig,
                         job: JobConfig,
-                        rstorage: ResultStorage,
+                        rstorage: IResultStorage,
                         large_block: int = 256,
                         hist_boxes: int = 10,
                         nc: bool = False) -> Tuple[Dict[str, Tuple[str, float, float]], bool]:
@@ -148,7 +158,7 @@
 
     io_sum = make_iosum(rstorage, suite, fjob, hist_boxes)
 
-    tot_io_coef = float(unit_conversion_coef(io_sum.bw.units, "Bps"))
+    tot_io_coef = unit_conversion_coef_f(io_sum.bw.units, "Bps")
     io_transfered = io_sum.bw.data * tot_io_coef
 
     records = {
@@ -156,7 +166,7 @@
     }  # type: Dict[str, Tuple[str, float, float]]
 
     if iops_ok:
-        ops_done = io_transfered / (fjob.bsize * float(unit_conversion_coef("KiBps", "Bps")))
+        ops_done = io_transfered / (fjob.bsize * unit_conversion_coef_f("KiBps", "Bps"))
         records[ResourceNames.io_made] = (b2ssize_10(ops_done.sum()) + "OP", None, None)
     else:
         ops_done = None
@@ -189,13 +199,14 @@
         if service_provided_count is None:
             continue
 
-        res_ts = summ_sensors(rstorage, roles, sensor=sensor, metric=metric, time_range=job.reliable_info_range_s)
+        nodes = find_nodes_by_roles(rstorage.storage, roles)
+        res_ts = summ_sensors(rstorage, job.reliable_info_range_s, nodes=nodes, sensor=sensor, metric=metric)
         if res_ts is None:
             continue
 
         data = res_ts.data
         if units == "B":
-            data = data * float(unit_conversion_coef(res_ts.units, "B"))
+            data = data * unit_conversion_coef_f(res_ts.units, "B")
 
         avg, dev = avg_dev_div(data, service_provided_count)
         if avg < 0.1:
@@ -204,10 +215,20 @@
         all_agg[vname] = data
 
     # cpu usage
-    nodes_count = len(list(find_nodes_by_roles(rstorage, STORAGE_ROLES)))
-    cpu_ts = get_cluster_cpu_load(rstorage, STORAGE_ROLES, job.reliable_info_range_s)
+    stor_cores_count = 0
+    all_stor_nodes = list(find_nodes_by_roles(rstorage.storage, STORAGE_ROLES))
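+    # prefer real per-node core counts from hw_info; fall back to one core per node when hw_info is absent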
+    for node in all_stor_nodes:
+        try:
+            node_hw_info = rstorage.storage.load(HWInfo, 'hw_info', node.node_id)
+        except KeyError:
+            logger.warning("No hw_info available for node %s. Using 'NODE time' instead of " +
+                           "CPU core time for CPU consumption metrics")
+            stor_cores_count = len(all_stor_nodes)
+            break
+        stor_cores_count += sum(cores for _, cores in node_hw_info.cores)
 
-    cpus_used_sec = (1.0 - cpu_ts['idle'].data / cpu_ts['total'].data) * nodes_count
+    cpu_ts = get_cluster_cpu_load(rstorage, STORAGE_ROLES, job.reliable_info_range_s)
+    cpus_used_sec = (1.0 - (cpu_ts['idle'].data + cpu_ts['iowait'].data) / cpu_ts['total'].data) * stor_cores_count
     used_s = b2ssize_10(cpus_used_sec.sum()) + 's'
 
     all_agg[ResourceNames.storage_cpu] = cpus_used_sec
diff --git a/wally/result_classes.py b/wally/result_classes.py
index 60a0a55..207ba71 100644
--- a/wally/result_classes.py
+++ b/wally/result_classes.py
@@ -1,15 +1,12 @@
 import abc
-import copy
-from typing import Dict, List, Any, Optional, Tuple, cast, Type, Iterator, NamedTuple
+from typing import Dict, List, Any, Tuple, cast, Type, Iterator, Union
 
-
-import numpy
-from scipy.stats.mstats_basic import NormaltestResult
+from cephlib.numeric_types import TimeSeries, DataSource
+from cephlib.statistic import StatProps
+from cephlib.istorage import IImagesStorage, Storable, ISensorStorage
 
 from .suits.job import JobConfig
-from .node_interfaces import IRPCNode
-from .common_types import Storable
-from .utils import round_digits, Number
+from .node_interfaces import IRPCNode, NodeInfo
 
 
 class SuiteConfig(Storable):
@@ -56,206 +53,8 @@
                 set(self.nodes_ids) == set(other.nodes_ids))
 
 
-class DataSource:
-    def __init__(self,
-                 suite_id: str = None,
-                 job_id: str = None,
-                 node_id: str = None,
-                 sensor: str = None,
-                 dev: str = None,
-                 metric: str = None,
-                 tag: str = None) -> None:
-        self.suite_id = suite_id
-        self.job_id = job_id
-        self.node_id = node_id
-        self.sensor = sensor
-        self.dev = dev
-        self.metric = metric
-        self.tag = tag
-
-    @property
-    def metric_fqdn(self) -> str:
-        return "{0.sensor}.{0.dev}.{0.metric}".format(self)
-
-    def __call__(self, **kwargs) -> 'DataSource':
-        dct = self.__dict__.copy()
-        dct.update(kwargs)
-        return self.__class__(**dct)
-
-    def __str__(self) -> str:
-        return ("suite={0.suite_id},job={0.job_id},node={0.node_id}," +
-                "path={0.sensor}.{0.dev}.{0.metric},tag={0.tag}").format(self)
-
-    def __repr__(self) -> str:
-        return str(self)
-
-    @property
-    def tpl(self) -> Tuple[Optional[str], Optional[str], Optional[str], Optional[str],
-                           Optional[str], Optional[str], Optional[str]]:
-        return self.suite_id, self.job_id, self.node_id, self.sensor, self.dev, self.metric, self.tag
-
-    def __eq__(self, o: object) -> bool:
-        return self.tpl == cast(DataSource, o).tpl
-
-    def __hash__(self) -> int:
-        return hash(self.tpl)
-
-
-class TimeSeries:
-    """Data series from sensor - either system sensor or from load generator tool (e.g. fio)"""
-
-    def __init__(self,
-                 name: str,
-                 raw: Optional[bytes],
-                 data: numpy.ndarray,
-                 times: numpy.ndarray,
-                 units: str,
-                 source: DataSource,
-                 time_units: str = 'us',
-                 raw_tag: str = 'txt',
-                 histo_bins: numpy.ndarray = None) -> None:
-
-        # Sensor name. Typically DEV_NAME.METRIC
-        self.name = name
-
-        # units for data
-        self.units = units
-
-        # units for time
-        self.time_units = time_units
-
-        # Time series times and values. Time in ms from Unix epoch.
-        self.times = times
-        self.data = data
-
-        # Raw sensor data (is provided). Like log file for fio iops/bw/lat.
-        self.raw = raw
-        self.raw_tag = raw_tag
-        self.source = source
-        self.histo_bins = histo_bins
-
-    def __str__(self) -> str:
-        res = "TS({}):\n".format(self.name)
-        res += "    source={}\n".format(self.source)
-        res += "    times_size={}\n".format(len(self.times))
-        res += "    data_shape={}\n".format(*self.data.shape)
-        return res
-
-    def __repr__(self) -> str:
-        return str(self)
-
-    def copy(self, no_data: bool = False) -> 'TimeSeries':
-        cp = copy.copy(self)
-
-        if not no_data:
-            cp.times = self.times.copy()
-            cp.data = self.data.copy()
-
-        cp.source = self.source()
-        return cp
-
-
 # (node_name, source_dev, metric_name) => metric_results
 JobMetrics = Dict[Tuple[str, str, str], TimeSeries]
-
-
-class StatProps(Storable):
-    "Statistic properties for timeseries with unknown data distribution"
-
-    __ignore_fields__ = ['data']
-
-    def __init__(self, data: numpy.array, units: str) -> None:
-        self.perc_99 = None  # type: float
-        self.perc_95 = None  # type: float
-        self.perc_90 = None  # type: float
-        self.perc_50 = None   # type: float
-        self.perc_10 = None  # type: float
-        self.perc_5 = None   # type: float
-        self.perc_1 = None   # type: float
-
-        self.min = None  # type: Number
-        self.max = None  # type: Number
-
-        # bin_center: bin_count
-        self.log_bins = False
-        self.bins_populations = None # type: numpy.array
-
-        # bin edges, one more element that in bins_populations
-        self.bins_edges = None  # type: numpy.array
-
-        self.data = data
-        self.units = units
-
-    def __str__(self) -> str:
-        res = ["{}(size = {}):".format(self.__class__.__name__, len(self.data))]
-        for name in ["perc_1", "perc_5", "perc_10", "perc_50", "perc_90", "perc_95", "perc_99"]:
-            res.append("    {} = {}".format(name, round_digits(getattr(self, name))))
-        res.append("    range {} {}".format(round_digits(self.min), round_digits(self.max)))
-        return "\n".join(res)
-
-    def __repr__(self) -> str:
-        return str(self)
-
-    def raw(self) -> Dict[str, Any]:
-        data = super().raw()
-        data['bins_mids'] = list(data['bins_mids'])
-        data['bins_populations'] = list(data['bins_populations'])
-        return data
-
-    @classmethod
-    def fromraw(cls, data: Dict[str, Any]) -> 'StatProps':
-        data['bins_mids'] = numpy.array(data['bins_mids'])
-        data['bins_populations'] = numpy.array(data['bins_populations'])
-        return cast(StatProps, super().fromraw(data))
-
-
-class HistoStatProps(StatProps):
-    """Statistic properties for 2D timeseries with unknown data distribution and histogram as input value.
-    Used for latency"""
-    def __init__(self, data: numpy.array, units: str) -> None:
-        StatProps.__init__(self, data, units)
-
-
-class NormStatProps(StatProps):
-    "Statistic properties for timeseries with normal data distribution. Used for iops/bw"
-    def __init__(self, data: numpy.array, units: str) -> None:
-        StatProps.__init__(self, data, units)
-
-        self.average = None  # type: float
-        self.deviation = None  # type: float
-        self.confidence = None  # type: float
-        self.confidence_level = None  # type: float
-        self.normtest = None  # type: NormaltestResult
-        self.skew = None  # type: float
-        self.kurt = None  # type: float
-
-    def __str__(self) -> str:
-        res = ["NormStatProps(size = {}):".format(len(self.data)),
-               "    distr = {} ~ {}".format(round_digits(self.average), round_digits(self.deviation)),
-               "    confidence({0.confidence_level}) = {1}".format(self, round_digits(self.confidence)),
-               "    perc_1 = {}".format(round_digits(self.perc_1)),
-               "    perc_5 = {}".format(round_digits(self.perc_5)),
-               "    perc_10 = {}".format(round_digits(self.perc_10)),
-               "    perc_50 = {}".format(round_digits(self.perc_50)),
-               "    perc_90 = {}".format(round_digits(self.perc_90)),
-               "    perc_95 = {}".format(round_digits(self.perc_95)),
-               "    perc_99 = {}".format(round_digits(self.perc_99)),
-               "    range {} {}".format(round_digits(self.min), round_digits(self.max)),
-               "    normtest = {0.normtest}".format(self),
-               "    skew ~ kurt = {0.skew} ~ {0.kurt}".format(self)]
-        return "\n".join(res)
-
-    def raw(self) -> Dict[str, Any]:
-        data = super().raw()
-        data['normtest'] = (data['nortest'].statistic, data['nortest'].pvalue)
-        return data
-
-    @classmethod
-    def fromraw(cls, data: Dict[str, Any]) -> 'NormStatProps':
-        data['normtest'] = NormaltestResult(*data['normtest'])
-        return cast(NormStatProps, super().fromraw(data))
-
-
 JobStatMetrics = Dict[Tuple[str, str, str], StatProps]
 
 
@@ -273,29 +72,7 @@
         self.processed = None  # type: JobStatMetrics
 
 
-ArrayData = NamedTuple("ArrayData",
-                       [('header', List[str]),
-                        ('histo_bins', Optional[numpy.ndarray]),
-                        ('data', Optional[numpy.ndarray])])
-
-
-class IResultStorage(metaclass=abc.ABCMeta):
-
-    @abc.abstractmethod
-    def sync(self) -> None:
-        pass
-
-    @abc.abstractmethod
-    def append_sensor(self, data: numpy.array, ds: DataSource, units: str, histo_bins: numpy.ndarray = None) -> None:
-        pass
-
-    @abc.abstractmethod
-    def load_sensor(self, ds: DataSource) -> TimeSeries:
-        pass
-
-    @abc.abstractmethod
-    def iter_sensors(self, ds: DataSource) -> Iterator[TimeSeries]:
-        pass
+class IResultStorage(ISensorStorage, IImagesStorage, metaclass=abc.ABCMeta):
 
     @abc.abstractmethod
     def put_or_check_suite(self, suite: SuiteConfig) -> None:
@@ -306,10 +83,6 @@
         pass
 
     @abc.abstractmethod
-    def put_ts(self, ts: TimeSeries) -> None:
-        pass
-
-    @abc.abstractmethod
     def put_extra(self, data: bytes, source: DataSource) -> None:
         pass
 
@@ -329,11 +102,31 @@
     def iter_job(self, suite: SuiteConfig) -> Iterator[JobConfig]:
         pass
 
-    @abc.abstractmethod
-    def iter_ts(self, suite: SuiteConfig, job: JobConfig) -> Iterator[TimeSeries]:
-        pass
-
     # return path to file to be inserted into report
     @abc.abstractmethod
     def put_plot_file(self, data: bytes, source: DataSource) -> str:
         pass
+
+    @abc.abstractmethod
+    def get_job_info(self, suite: SuiteConfig, job: JobConfig, key: str) -> Any:
+        pass
+
+    @abc.abstractmethod
+    def get_ts(self, ds: DataSource) -> TimeSeries:
+        pass
+
+    @abc.abstractmethod
+    def put_ts(self, ts: TimeSeries) -> None:
+        pass
+
+    @abc.abstractmethod
+    def iter_ts(self, **ds_parts) -> Iterator[DataSource]:
+        pass
+
+    @abc.abstractmethod
+    def put_job_info(self, suite: SuiteConfig, job: JobConfig, key: str, data: Any) -> None:
+        pass
+
+    @abc.abstractmethod
+    def find_nodes(self, roles: Union[str, List[str]]) -> List[NodeInfo]:
+        pass
\ No newline at end of file
diff --git a/wally/result_storage.py b/wally/result_storage.py
new file mode 100644
index 0000000..282bdb5
--- /dev/null
+++ b/wally/result_storage.py
@@ -0,0 +1,150 @@
+import os
+import pprint
+import logging
+from typing import cast, Iterator, Tuple, Type, Optional, Any
+
+import numpy
+
+from cephlib.wally_storage import WallyDB
+from cephlib.hlstorage import SensorStorageBase
+from cephlib.statistic import StatProps
+from cephlib.numeric_types import DataSource, TimeSeries
+
+from .suits.job import JobConfig
+from .result_classes import SuiteConfig, IResultStorage
+from .utils import StopTestError
+from .suits.all_suits import all_suits
+
+from cephlib.storage import Storage
+
+logger = logging.getLogger('wally')
+
+
+def fill_path(path: str, **params) -> str:
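+    # fill in the known {name} placeholders; parameters passed as None keep their placeholder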
+    for name, val in params.items():
+        if val is not None:
+            path = path.replace("{" + name + "}", val)
+    return path
+
+
+class ResultStorage(IResultStorage, SensorStorageBase):
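+    # wally-specific result storage: cephlib SensorStorageBase with the WallyDB path layout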
+    def __init__(self, storage: Storage) -> None:
+        SensorStorageBase.__init__(self, storage, WallyDB)
+
+    # -------------  CHECK DATA IN STORAGE  ----------------------------------------------------------------------------
+    def check_plot_file(self, source: DataSource) -> Optional[str]:
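+        # return the storage-relative path only if the rendered plot file already exists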
+        path = self.db_paths.plot.format(**source.__dict__)
+        fpath = self.storage.get_fname(self.db_paths.report_root + path)
+        return path if os.path.exists(fpath) else None
+
+    # -------------   PUT DATA INTO STORAGE   --------------------------------------------------------------------------
+    def put_or_check_suite(self, suite: SuiteConfig) -> None:
+        path = self.db_paths.suite_cfg.format(suite_id=suite.storage_id)
+        if path in self.storage:
+            db_cfg = self.storage.load(SuiteConfig, path)
+            if db_cfg != suite:
+                logger.error("Current suite %s config is not equal to found in storage at %s", suite.test_type, path)
+                logger.debug("Current: \n%s\nStorage:\n%s", pprint.pformat(db_cfg), pprint.pformat(suite))
+                raise StopTestError()
+        else:
+            self.storage.put(suite, path)
+
+    def put_job(self, suite: SuiteConfig, job: JobConfig) -> None:
+        path = self.db_paths.job_cfg.format(suite_id=suite.storage_id, job_id=job.storage_id)
+        self.storage.put(job, path)
+
+    def put_extra(self, data: bytes, source: DataSource) -> None:
+        self.storage.put_raw(data, self.db_paths.ts.format(**source.__dict__))
+
+    def put_stat(self, data: StatProps, source: DataSource) -> None:
+        self.storage.put(data, self.db_paths.stat.format(**source.__dict__))
+
+    # return path to file to be inserted into report
+    def put_plot_file(self, data: bytes, source: DataSource) -> str:
+        path = self.db_paths.plot.format(**source.__dict__)
+        self.storage.put_raw(data, self.db_paths.report_root + path)
+        return path
+
+    def put_report(self, report: str, name: str) -> str:
+        return self.storage.put_raw(report.encode('utf8'), self.db_paths.report_root + name)
+
+    def put_txt_report(self, suite: SuiteConfig, report: str) -> None:
+        path = self.db_paths.txt_report.format(suite_id=suite.storage_id)
+        self.storage.put_raw(report.encode('utf8'), path)
+
+    def put_job_info(self, suite: SuiteConfig, job: JobConfig, key: str, data: Any) -> None:
+        path = self.db_paths.job_extra.format(suite_id=suite.storage_id, job_id=job.storage_id, tag=key)
+        self.storage.put(data, path)
+
+    # -------------   GET DATA FROM STORAGE   --------------------------------------------------------------------------
+
+    def get_stat(self, stat_cls: Type[StatProps], source: DataSource) -> StatProps:
+        return self.storage.load(stat_cls, self.db_paths.stat.format(**source.__dict__))
+
+    def get_txt_report(self, suite: SuiteConfig) -> Optional[str]:
+        path = self.db_paths.txt_report.format(suite_id=suite.storage_id)
+        if path in self.storage:
+            return self.storage.get_raw(path).decode('utf8')
+
+    def get_job_info(self, suite: SuiteConfig, job: JobConfig, key: str) -> Any:
+        path = self.db_paths.job_extra.format(suite_id=suite.storage_id, job_id=job.storage_id, tag=key)
+        return self.storage.get(path, None)
+
+    # -------------   ITER OVER STORAGE   ------------------------------------------------------------------------------
+
+    def iter_suite(self, suite_type: str = None) -> Iterator[SuiteConfig]:
+        for is_file, suite_info_path, groups in self.iter_paths(self.db_paths.suite_cfg_r):
+            assert is_file
+            suite = self.storage.load(SuiteConfig, suite_info_path)
+            # suite = cast(SuiteConfig, self.storage.load(SuiteConfig, suite_info_path))
+            assert suite.storage_id == groups['suite_id']
+            if not suite_type or suite.test_type == suite_type:
+                yield suite
+
+    def iter_job(self, suite: SuiteConfig) -> Iterator[JobConfig]:
+        job_glob = fill_path(self.db_paths.job_cfg_r, suite_id=suite.storage_id)
+        job_config_cls = all_suits[suite.test_type].job_config_cls
+        for is_file, path, groups in self.iter_paths(job_glob):
+            assert is_file
+            job = cast(JobConfig, self.storage.load(job_config_cls, path))
+            assert job.storage_id == groups['job_id']
+            yield job
+
+    #  -----------------  TS  ------------------------------------------------------------------------------------------
+    def get_ts(self, ds: DataSource) -> TimeSeries:
+        path = self.db_paths.ts.format_map(ds.__dict__)
+        (units, time_units), header2, content = self.storage.get_array(path)
+
+        times = content[:, 0].copy()
+        data = content[:, 1:]
+
+        if data.shape[1] == 1:
+            data.shape = (data.shape[0],)
+
+        return TimeSeries(data=data, times=times, source=ds, units=units, time_units=time_units, histo_bins=header2)
+
+    def put_ts(self, ts: TimeSeries) -> None:
+        assert ts.data.dtype == ts.times.dtype, "Data type {!r} != time type {!r}".format(ts.data.dtype, ts.times.dtype)
+        assert ts.data.dtype.kind == 'u', "Only unsigned ints are accepted"
+        assert ts.source.tag == self.ts_arr_tag, \
+            "Incorrect source tag == {!r}, must be {!r}".format(ts.source.tag, self.ts_arr_tag)
+
+        if ts.source.metric == 'lat':
+            assert len(ts.data.shape) == 2, "Latency must be a 2D array"
+            assert ts.histo_bins is not None, "Latency must have a non-empty histo_bins field"
+
+        csv_path = self.db_paths.ts.format_map(ts.source.__dict__)
+        header = [ts.units, ts.time_units]
+
+        tv = ts.times.view().reshape((-1, 1))
+
+        if len(ts.data.shape) == 1:
+            dv = ts.data.view().reshape((ts.times.shape[0], -1))
+        else:
+            dv = ts.data
+
+        result = numpy.concatenate((tv, dv), axis=1)
+        self.storage.put_array(csv_path, result, header, header2=ts.histo_bins, append_on_exists=False)
+
+    def iter_ts(self, **ds_parts) -> Iterator[DataSource]:
+        return self.iter_objs(self.db_paths.ts_r, **ds_parts)
\ No newline at end of file
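
The put_ts/get_ts pair above round-trips every TimeSeries through a single 2-D array: timestamps go into column 0, samples into the remaining columns, and one-column series are flattened back to 1-D on load. A minimal numpy sketch of that layout (plain arrays only, no storage backend; all names below are illustrative):

    import numpy

    # Pack: timestamps in column 0, samples in the remaining columns,
    # mirroring what put_ts() hands to put_array().
    times = numpy.arange(5, dtype='uint64')
    data = numpy.arange(10, dtype='uint64').reshape(5, 2)
    packed = numpy.concatenate((times.reshape(-1, 1), data), axis=1)

    # Unpack, as get_ts() does: split the columns and flatten one-column series.
    times_back = packed[:, 0].copy()
    data_back = packed[:, 1:]
    if data_back.shape[1] == 1:
        data_back.shape = (data_back.shape[0],)

    assert (times_back == times).all() and (data_back == data).all()
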
diff --git a/wally/run_test.py b/wally/run_test.py
index 555cfa1..4e02a75 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -13,9 +13,7 @@
 from .sensors import collect_sensors_data
 from .suits.all_suits import all_suits
 from .test_run_class import TestRun
-from .utils import StopTestError
 from .result_classes import SuiteConfig
-from .hlstorage import ResultStorage
 
 
 logger = logging.getLogger("wally")
@@ -76,17 +74,22 @@
             t_end = time.time()
 
             for node, val in zip(ctx.nodes, tms):
-                max_delta = int(max(t_start - val, val - t_end) * 1000)
-                if max_delta > ctx.config.max_time_diff_ms:
+                delta = 0
+                if val > t_end:
+                    delta = int((val - t_end) * 1000)   # convert to ms to match max_time_diff_ms
+                elif t_start > val:
+                    delta = int((t_start - val) * 1000)
+
+                if delta > ctx.config.max_time_diff_ms:
                     msg = ("Too large time shift {}ms on node {}. Stopping test." +
                            " Fix time on cluster nodes and restart test, or change " +
-                           "max_time_diff_ms(={}ms) setting in config").format(max_delta,
+                           "max_time_diff_ms(={}ms) setting in config").format(delta,
                                                                                str(node),
                                                                                ctx.config.max_time_diff_ms)
                     logger.error(msg)
-                    raise StopTestError(msg)
-                if max_delta > 0:
-                    logger.warning("Node %s has time shift at least %s ms", node, max_delta)
+                    raise utils.StopTestError(msg)
+                if delta > 0:
+                    logger.warning("Node %s has time shift at least %s ms", node, delta)
 
 
     def cleanup(self, ctx: TestRun) -> None:
@@ -246,11 +249,11 @@
 
             if not test_nodes:
                 logger.error("No test nodes found")
-                raise StopTestError()
+                raise utils.StopTestError()
 
             if len(test_suite) != 1:
                 logger.error("Test suite %s contain more than one test. Put each test in separated group", suite_idx)
-                raise StopTestError()
+                raise utils.StopTestError()
 
             name, params = list(test_suite.items())[0]
             vm_count = params.get('node_limit', None)  # type: Optional[int]
@@ -267,7 +270,7 @@
 
             if name not in all_suits:
                 logger.error("Test suite %r not found. Only suits [%s] available", name, ", ".join(all_suits))
-                raise StopTestError()
+                raise utils.StopTestError()
 
             test_cls = all_suits[name]
             remote_dir = ctx.config.default_test_local_folder.format(name=name, uuid=ctx.config.run_uuid)
@@ -279,7 +282,7 @@
                                 idx=suite_idx,
                                 keep_raw_files=ctx.config.keep_raw_files)
 
-            test_cls(storage=ResultStorage(ctx.storage),
+            test_cls(storage=ctx.rstorage,
                      suite=suite,
                      on_idle=lambda: collect_sensors_data(ctx, False)).run()
 
@@ -312,7 +315,7 @@
             if ctx.nodes_info:
                 logger.error("Internal error: Some nodes already stored in " +
                              "nodes_info before LoadStoredNodesStage stage")
-                raise StopTestError()
+                raise utils.StopTestError()
 
             nodes = {node.node_id: node for node in ctx.storage.load_list(NodeInfo, SaveNodesStage.nodes_path)}
 
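The suite-validation code above expects each tests group to describe exactly one suite, keyed by suite name, with optional per-suite parameters such as node_limit. A hedged illustration of a shape that satisfies those checks (the suite names and the "load" parameter are invented for the example):

    # One group == one suite; more than one key per group aborts the run.
    test_suites = [
        {"fio": {"node_limit": 2, "load": "ceph"}},
        {"fio": {}},   # node_limit is optional; .get() returns None
    ]

    for suite_idx, test_suite in enumerate(test_suites):
        assert len(test_suite) == 1
        name, params = list(test_suite.items())[0]
        vm_count = params.get('node_limit', None)
        print(suite_idx, name, vm_count)
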
diff --git a/wally/sensors.py b/wally/sensors.py
index 89bc224..9fb2177 100644
--- a/wally/sensors.py
+++ b/wally/sensors.py
@@ -6,12 +6,12 @@
 import numpy
 
 from cephlib import sensors_rpc_plugin
+from cephlib.units import b2ssize
 
 from . import utils
 from .test_run_class import TestRun
 from .result_classes import DataSource
 from .stage import Stage, StepOrder
-from .hlstorage import ResultStorage
 
 
 plugin_fname = sensors_rpc_plugin.__file__.rsplit(".", 1)[0] + ".py"
@@ -86,7 +86,6 @@
 
 
 def collect_sensors_data(ctx: TestRun, stop: bool = False):
-    rstorage = ResultStorage(ctx.storage)
     total_sz = 0
 
     logger.info("Start loading sensors")
@@ -105,19 +104,19 @@
             for path, value, is_array, units in sensors_rpc_plugin.unpack_rpc_updates(data_tpl):
                 if path == 'collected_at':
                     ds = DataSource(node_id=node_id, metric='collected_at', tag='csv')
-                    rstorage.append_sensor(numpy.array(value), ds, units)
+                    ctx.rstorage.append_sensor(numpy.array(value), ds, units)
                 else:
                     sensor, dev, metric = path.split(".")
                     ds = DataSource(node_id=node_id, metric=metric, dev=dev, sensor=sensor, tag='csv')
                     if is_array:
-                        rstorage.append_sensor(numpy.array(value), ds, units)
+                        ctx.rstorage.append_sensor(numpy.array(value), ds, units)
                     else:
                         if metric == 'historic':
-                            rstorage.put_sensor_raw(bz2.compress(value), ds(tag='bin'))
+                            ctx.rstorage.put_sensor_raw(bz2.compress(value), ds(tag='bin'))
                         else:
                             assert metric in ('perf_dump', 'historic_js')
-                            rstorage.put_sensor_raw(value, ds(tag='js'))
-    logger.info("Download %sB of sensors data", utils.b2ssize(total_sz))
+                            ctx.rstorage.put_sensor_raw(value, ds(tag='js'))
+    logger.info("Downloaded %sB of sensors data", b2ssize(total_sz))
 
 
 
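collect_sensors_data() above routes each RPC update by its dotted path and payload type: arrays are appended to per-metric CSV storage, raw 'historic' blobs are bz2-compressed and stored under a 'bin' tag, and the remaining dumps ('perf_dump', 'historic_js') get a 'js' tag. A standalone sketch of just that routing decision (the paths and payloads below are made up; real ones come from unpack_rpc_updates):

    import bz2

    updates = [
        ("collected_at", [1, 2, 3], True, "s"),
        ("block-io.sda.io_time", [10, 20, 30], True, "ms"),
        ("ceph.osd0.historic", b"raw ops dump", False, None),
        ("ceph.osd0.perf_dump", b"{}", False, None),
    ]

    for path, value, is_array, units in updates:
        if path == 'collected_at':
            print(path, "-> csv (timestamps)")
        else:
            sensor, dev, metric = path.split(".")
            if is_array:
                print(path, "-> csv array in", units)
            elif metric == 'historic':
                print(path, "-> bin,", len(bz2.compress(value)), "bytes compressed")
            else:
                print(path, "-> js")
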
diff --git a/wally/ssh.py b/wally/ssh.py
index fcb7fb3..ddf3e58 100644
--- a/wally/ssh.py
+++ b/wally/ssh.py
@@ -9,7 +9,8 @@
 
 import paramiko
 
-from . import utils
+from cephlib.common import Timeout
+
 from .common_types import ConnCreds, IPAddr
 
 logger = logging.getLogger("wally")
@@ -108,7 +109,7 @@
 
     addrs_set = set(addrs)  # type: Set[IPAddr]
 
-    for _ in utils.Timeout(timeout):
+    for _ in Timeout(timeout):
         selector = selectors.DefaultSelector()  # type: selectors.BaseSelector
         with selector:
             for addr in addrs_set:
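
The loop above now iterates cephlib.common.Timeout instead of the local helper; the Timeout removed from wally/utils.py further down in this diff sleeps up to min_tick per iteration, yields the seconds remaining, and raises TimeoutError once the budget is exhausted. A hedged usage sketch, assuming those semantics carried over to cephlib unchanged:

    from cephlib.common import Timeout

    def wait_until(predicate, timeout: int = 30) -> None:
        # Poll roughly once per second; raises TimeoutError if predicate()
        # never becomes true within `timeout` seconds (assumed behaviour,
        # matching the removed wally.utils.Timeout).
        for _ in Timeout(timeout):
            if predicate():
                return
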
diff --git a/wally/ssh_utils.py b/wally/ssh_utils.py
index ed857b6..56d317f 100644
--- a/wally/ssh_utils.py
+++ b/wally/ssh_utils.py
@@ -4,7 +4,7 @@
 from typing import List, Dict
 
 
-from . import utils
+from cephlib.common import to_ip
 from .common_types import ConnCreds
 
 
@@ -60,7 +60,7 @@
         if rrm is not None:
             params = {"user": getpass.getuser()}  # type: Dict[str, str]
             params.update(rrm.groupdict())
-            params['host'] = utils.to_ip(params['host'])
+            params['host'] = to_ip(params['host'])
             return ConnCreds(**params)  # type: ignore
 
     raise ValueError("Can't parse {0!r} as ssh uri value".format(uri))
diff --git a/wally/statistic.py b/wally/statistic.py
deleted file mode 100644
index 2b1e83a..0000000
--- a/wally/statistic.py
+++ /dev/null
@@ -1,350 +0,0 @@
-import math
-import logging
-import itertools
-from typing import List, Callable, Iterable, cast, Tuple
-
-import numpy
-from scipy import stats, optimize
-from numpy import linalg
-from numpy.polynomial.chebyshev import chebfit, chebval
-
-
-from .result_classes import NormStatProps, HistoStatProps, TimeSeries
-from .utils import Number
-
-
-logger = logging.getLogger("wally")
-DOUBLE_DELTA = 1e-8
-MIN_VALUES_FOR_CONFIDENCE = 7
-
-
-average = numpy.mean
-dev = lambda x: math.sqrt(numpy.var(x, ddof=1))
-
-
-def calc_norm_stat_props(ts: TimeSeries, bins_count: int = None, confidence: float = 0.95) -> NormStatProps:
-    "Calculate statistical properties of array of numbers"
-
-    res = NormStatProps(ts.data, ts.units)  # type: ignore
-
-    if len(ts.data) == 0:
-        raise ValueError("Input array is empty")
-
-    res.average = average(ts.data)
-    res.deviation = dev(ts.data)
-
-    data = sorted(ts.data)
-    res.max = data[-1]
-    res.min = data[0]
-    pcs = numpy.percentile(data, q=[1.0, 5.0, 10., 50., 90., 95., 99.])
-    res.perc_1, res.perc_5, res.perc_10, res.perc_50, res.perc_90, res.perc_95, res.perc_99 = pcs
-
-    if len(data) >= MIN_VALUES_FOR_CONFIDENCE:
-        res.confidence = stats.sem(ts.data) * \
-                         stats.t.ppf((1 + confidence) / 2, len(ts.data) - 1)
-        res.confidence_level = confidence
-    else:
-        res.confidence = None
-        res.confidence_level = None
-
-    if bins_count is not None:
-        res.bins_populations, res.bins_edges = numpy.histogram(ts.data, bins=bins_count)
-        res.bins_edges = res.bins_edges[:-1]
-
-    try:
-        res.normtest = stats.mstats.normaltest(ts.data)
-    except Exception as exc:
-        logger.warning("stats.mstats.normaltest failed with error: %s", exc)
-
-    res.skew = stats.skew(ts.data)
-    res.kurt = stats.kurtosis(ts.data)
-
-    return res
-
-
-# update this code
-def rebin_histogram(bins_populations: numpy.array,
-                    bins_edges: numpy.array,
-                    new_bins_count: int,
-                    left_tail_idx: int = None,
-                    right_tail_idx: int = None,
-                    log_bins: bool = False) -> Tuple[numpy.array, numpy.array]:
-    # rebin large histogram into smaller with new_bins bins, linearly distributes across
-    # left_tail_idx:right_tail_idx range
-
-    assert len(bins_populations.shape) == 1
-    assert len(bins_edges.shape) == 1
-    assert bins_edges.shape[0] == bins_populations.shape[0]
-
-    if left_tail_idx is None:
-        min_val = bins_edges[0]
-    else:
-        min_val = bins_edges[left_tail_idx]
-
-    if right_tail_idx is None:
-        max_val = bins_edges[-1]
-    else:
-        max_val = bins_edges[right_tail_idx]
-
-    if log_bins:
-        assert min_val > 1E-3
-        step = (max_val / min_val) ** (1 / new_bins_count)
-        new_bins_edges = min_val * (step ** numpy.arange(new_bins_count))  # type: numpy.array
-    else:
-        new_bins_edges = numpy.linspace(min_val, max_val, new_bins_count + 1, dtype='float')[:-1]  # type: numpy.array
-
-    old_bins_pos = numpy.searchsorted(new_bins_edges, bins_edges, side='right')
-    new_bins = numpy.zeros(new_bins_count, dtype=int)  # type: numpy.array
-
-    # last source bin can't be split
-    # TODO: need to add assert for this
-    new_bins[-1] += bins_populations[-1]
-    bin_sizes = bins_edges[1:] - bins_edges[:-1]
-
-    # correct position to get bin idx from edge idx
-    old_bins_pos -= 1
-    old_bins_pos[old_bins_pos < 0] = 0
-    new_bins_sizes = new_bins_edges[1:] - new_bins_edges[:-1]
-
-    for population, begin, end, bsize in zip(bins_populations[:-1], old_bins_pos[:-1], old_bins_pos[1:], bin_sizes):
-        if begin == end:
-            new_bins[begin] += population
-        else:
-            density = population / bsize
-            for curr_box in range(begin, end):
-                cnt = min(int(new_bins_sizes[begin] * density + 0.5), population)
-                new_bins[begin] += cnt
-                population -= cnt
-
-    return new_bins, new_bins_edges
-
-
-def calc_histo_stat_props(ts: TimeSeries,
-                          bins_edges: numpy.array = None,
-                          rebins_count: int = None,
-                          tail: float = 0.005) -> HistoStatProps:
-    if bins_edges is None:
-        bins_edges = ts.histo_bins
-
-    res = HistoStatProps(ts.data, ts.units)
-
-    # summ across all series
-    aggregated = ts.data.sum(axis=0, dtype='int')
-    total = aggregated.sum()
-
-    # percentiles levels
-    expected = list(numpy.array([0.01, 0.05, 0.1, 0.5, 0.9, 0.95, 0.99]) * total)
-    cumsum = numpy.cumsum(aggregated)
-
-    percentiles_bins = numpy.searchsorted(cumsum, expected)
-    percentiles = bins_edges[percentiles_bins]
-    res.perc_1, res.perc_5, res.perc_10, res.perc_50, res.perc_90, res.perc_95, res.perc_99 = percentiles
-
-    # don't show tail ranges on histogram
-    left_tail_idx, right_tail_idx = numpy.searchsorted(cumsum, [tail * total, (1 - tail) * total])
-
-    # minimax and maximal non-zero elements
-    non_zero = numpy.nonzero(aggregated)[0]
-    res.min = bins_edges[aggregated[non_zero[0]]]
-    res.max = bins_edges[non_zero[-1] + (1 if non_zero[-1] != len(bins_edges) else 0)]
-
-    res.log_bins = False
-    if rebins_count is not None:
-        res.bins_populations, res.bins_edges = rebin_histogram(aggregated, bins_edges, rebins_count,
-                                                               left_tail_idx, right_tail_idx)
-    else:
-        res.bins_populations = aggregated
-        res.bins_edges = bins_edges.copy()
-
-    return res
-
-
-def groupby_globally(data: Iterable, key_func: Callable):
-    grouped = {}  # type: ignore
-    grouped_iter = itertools.groupby(data, key_func)
-
-    for (bs, cache_tp, act, conc), curr_data_it in grouped_iter:
-        key = (bs, cache_tp, act, conc)
-        grouped.setdefault(key, []).extend(curr_data_it)
-
-    return grouped
-
-
-def approximate_curve(x: List[Number], y: List[float], xnew: List[Number], curved_coef: int) -> List[float]:
-    """returns ynew - y values of some curve approximation"""
-    return cast(List[float], chebval(xnew, chebfit(x, y, curved_coef)))
-
-
-def approximate_line(x: List[Number], y: List[float], xnew: List[Number], relative_dist: bool = False) -> List[float]:
-    """
-    x, y - test data, xnew - dots, where we want find approximation
-    if not relative_dist distance = y - newy
-    returns ynew - y values of linear approximation
-    """
-    ox = numpy.array(x)
-    oy = numpy.array(y)
-
-    # set approximation function
-    def func_line(tpl, x):
-        return tpl[0] * x + tpl[1]
-
-    def error_func_rel(tpl, x, y):
-        return 1.0 - y / func_line(tpl, x)
-
-    def error_func_abs(tpl, x, y):
-        return y - func_line(tpl, x)
-
-    # choose distance mode
-    error_func = error_func_rel if relative_dist else error_func_abs
-
-    tpl_initial = tuple(linalg.solve([[ox[0], 1.0], [ox[1], 1.0]],
-                                     oy[:2]))
-
-    # find line
-    tpl_final, success = optimize.leastsq(error_func, tpl_initial[:], args=(ox, oy))
-
-    # if error
-    if success not in range(1, 5):
-        raise ValueError("No line for this dots")
-
-    # return new dots
-    return func_line(tpl_final, numpy.array(xnew))
-
-
-def moving_average(data: numpy.array, window: int) -> numpy.array:
-    cumsum = numpy.cumsum(data)
-    cumsum[window:] = cumsum[window:] - cumsum[:-window]
-    return cumsum[window - 1:] / window
-
-
-def moving_dev(data: numpy.array, window: int) -> numpy.array:
-    cumsum = numpy.cumsum(data)
-    cumsum2 = numpy.cumsum(data ** 2)
-    cumsum[window:] = cumsum[window:] - cumsum[:-window]
-    cumsum2[window:] = cumsum2[window:] - cumsum2[:-window]
-    return ((cumsum2[window - 1:] - cumsum[window - 1:] ** 2 / window) / (window - 1)) ** 0.5
-
-
-def find_ouliers(data: numpy.array,
-                 center_range: Tuple[int, int] = (25, 75),
-                 cut_range: float = 3.0) -> numpy.array:
-    v1, v2 = numpy.percentile(data, center_range)
-    return numpy.abs(data - (v1 + v2) / 2) > ((v2 - v1) / 2 * cut_range)
-
-
-def find_ouliers_ts(data: numpy.array,
-                    windows_size: int = 30,
-                    center_range: Tuple[int, int] = (25, 75),
-                    cut_range: float = 3.0) -> numpy.array:
-    outliers = numpy.empty(data.shape, dtype=bool)
-
-    if len(data) < windows_size:
-        outliers[:] = False
-        return outliers
-
-    begin_idx = 0
-    if len(data) < windows_size * 2:
-        end_idx = (len(data) % windows_size) // 2 + windows_size
-    else:
-        end_idx = len(data)
-
-    while True:
-        cdata = data[begin_idx: end_idx]
-        outliers[begin_idx: end_idx] = find_ouliers(cdata, center_range, cut_range)
-        begin_idx = end_idx
-
-        if end_idx == len(data):
-            break
-
-        end_idx += windows_size
-        if len(data) - end_idx < windows_size:
-            end_idx = len(data)
-
-    return outliers
-
-
-def hist_outliers_nd(bin_populations: numpy.array,
-                     bin_centers: numpy.array,
-                     center_range: Tuple[int, int] = (25, 75),
-                     cut_range: float = 3.0) -> Tuple[int, int]:
-    assert len(bin_populations) == len(bin_centers)
-    total_count = bin_populations.sum()
-
-    perc25 = total_count / 100.0 * center_range[0]
-    perc75 = total_count / 100.0 * center_range[1]
-
-    perc25_idx, perc75_idx = numpy.searchsorted(numpy.cumsum(bin_populations), [perc25, perc75])
-    middle = (bin_centers[perc75_idx] + bin_centers[perc25_idx]) / 2
-    r = (bin_centers[perc75_idx] - bin_centers[perc25_idx]) / 2
-
-    lower_bound = middle - r * cut_range
-    upper_bound = middle + r * cut_range
-
-    lower_cut_idx, upper_cut_idx = numpy.searchsorted(bin_centers, [lower_bound, upper_bound])
-    return lower_cut_idx, upper_cut_idx
-
-
-def hist_outliers_perc(bin_populations: numpy.array,
-                       bounds_perc: Tuple[float, float] = (0.01, 0.99),
-                       min_bins_left: int = None) -> Tuple[int, int]:
-    assert len(bin_populations.shape) == 1
-    total_count = bin_populations.sum()
-    lower_perc = total_count * bounds_perc[0]
-    upper_perc = total_count * bounds_perc[1]
-    idx1, idx2 = numpy.searchsorted(numpy.cumsum(bin_populations), [lower_perc, upper_perc])
-
-    # don't cut too many bins. At least min_bins_left must left
-    if min_bins_left is not None and idx2 - idx1 < min_bins_left:
-        missed = min_bins_left - (idx2 - idx1) // 2
-        idx2 = min(len(bin_populations), idx2 + missed)
-        idx1 = max(0, idx1 - missed)
-
-    return idx1, idx2
-
-
-def ts_hist_outliers_perc(bin_populations: numpy.array,
-                          window_size: int = 10,
-                          bounds_perc: Tuple[float, float] = (0.01, 0.99),
-                          min_bins_left: int = None) -> Tuple[int, int]:
-    assert len(bin_populations.shape) == 2
-
-    points = list(range(0, len(bin_populations), window_size))
-    if len(bin_populations) % window_size != 0:
-        points.append(points[-1] + window_size)
-
-    ranges = []  # type: List[List[int]]
-    for begin, end in zip(points[:-1], points[1:]):
-        window_hist = bin_populations[begin:end].sum(axis=0)
-        ranges.append(hist_outliers_perc(window_hist, bounds_perc=bounds_perc, min_bins_left=min_bins_left))
-
-    return min(i[0] for i in ranges), max(i[1] for i in ranges)
-
-
-# TODO: revise next
-# def difference(y, ynew):
-#     """returns average and maximum relative and
-#        absolute differences between y and ynew
-#        result may contain None values for y = 0
-#        return value - tuple:
-#        [(abs dif, rel dif) * len(y)],
-#        (abs average, abs max),
-#        (rel average, rel max)"""
-#
-#     abs_dlist = []
-#     rel_dlist = []
-#
-#     for y1, y2 in zip(y, ynew):
-#         # absolute
-#         abs_dlist.append(y1 - y2)
-#
-#         if y1 > 1E-6:
-#             rel_dlist.append(abs(abs_dlist[-1] / y1))
-#         else:
-#             raise ZeroDivisionError("{0!r} is too small".format(y1))
-#
-#     da_avg = sum(abs_dlist) / len(abs_dlist)
-#     dr_avg = sum(rel_dlist) / len(rel_dlist)
-#
-#     return (zip(abs_dlist, rel_dlist),
-#             (da_avg, max(abs_dlist)), (dr_avg, max(rel_dlist))
-#             )
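
Among the helpers dropped with wally/statistic.py, moving_average() computes rolling means with a cumulative-sum trick rather than an explicit window loop. A short self-contained check of that identity (the function body reproduces the removed implementation; the test data is arbitrary):

    import numpy

    def moving_average(data: numpy.ndarray, window: int) -> numpy.ndarray:
        # Same cumulative-sum formulation as the removed wally.statistic code.
        cumsum = numpy.cumsum(data)
        cumsum[window:] = cumsum[window:] - cumsum[:-window]
        return cumsum[window - 1:] / window

    data = numpy.array([1.0, 2.0, 4.0, 8.0, 16.0, 32.0])
    fast = moving_average(data, 3)
    naive = numpy.array([data[i:i + 3].mean() for i in range(len(data) - 2)])
    assert numpy.allclose(fast, naive)
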
diff --git a/wally/storage.py b/wally/storage.py
deleted file mode 100644
index e07d9b3..0000000
--- a/wally/storage.py
+++ /dev/null
@@ -1,349 +0,0 @@
-"""
-This module contains interfaces for storage classes
-"""
-
-import os
-import re
-import abc
-import shutil
-import logging
-from typing import Any, TypeVar, Type, IO, Tuple, cast, List, Dict, Iterable, Iterator
-
-import yaml
-try:
-    from yaml import CLoader as Loader, CDumper as Dumper  # type: ignore
-except ImportError:
-    from yaml import Loader, Dumper  # type: ignore
-
-from .common_types import IStorable
-
-
-logger = logging.getLogger("wally")
-
-
-class ISimpleStorage(metaclass=abc.ABCMeta):
-    """interface for low-level storage, which doesn't support serialization
-    and can operate only on bytes"""
-
-    @abc.abstractmethod
-    def put(self, value: bytes, path: str) -> None:
-        pass
-
-    @abc.abstractmethod
-    def get(self, path: str) -> bytes:
-        pass
-
-    @abc.abstractmethod
-    def rm(self, path: str) -> None:
-        pass
-
-    @abc.abstractmethod
-    def sync(self) -> None:
-        pass
-
-    @abc.abstractmethod
-    def __contains__(self, path: str) -> bool:
-        pass
-
-    @abc.abstractmethod
-    def get_fd(self, path: str, mode: str = "rb+") -> IO:
-        pass
-
-    @abc.abstractmethod
-    def get_fname(self, path: str) -> str:
-        pass
-
-    @abc.abstractmethod
-    def sub_storage(self, path: str) -> 'ISimpleStorage':
-        pass
-
-    @abc.abstractmethod
-    def list(self, path: str) -> Iterator[Tuple[bool, str]]:
-        pass
-
-
-class ITSStorage(metaclass=abc.ABCMeta):
-    """interface for low-level storage, which doesn't support serialization
-    and can operate only on bytes"""
-
-    @abc.abstractmethod
-    def put(self, value: bytes, path: str) -> None:
-        pass
-
-    @abc.abstractmethod
-    def get(self, path: str) -> bytes:
-        pass
-
-    @abc.abstractmethod
-    def rm(self, path: str) -> None:
-        pass
-
-    @abc.abstractmethod
-    def sync(self) -> None:
-        pass
-
-    @abc.abstractmethod
-    def __contains__(self, path: str) -> bool:
-        pass
-
-    @abc.abstractmethod
-    def get_fd(self, path: str, mode: str = "rb+") -> IO:
-        pass
-
-    @abc.abstractmethod
-    def sub_storage(self, path: str) -> 'ISimpleStorage':
-        pass
-
-    @abc.abstractmethod
-    def list(self, path: str) -> Iterator[Tuple[bool, str]]:
-        pass
-
-
-class ISerializer(metaclass=abc.ABCMeta):
-    """Interface for serialization class"""
-    @abc.abstractmethod
-    def pack(self, value: IStorable) -> bytes:
-        pass
-
-    @abc.abstractmethod
-    def unpack(self, data: bytes) -> Any:
-        pass
-
-
-class FSStorage(ISimpleStorage):
-    """Store all data in files on FS"""
-
-    def __init__(self, root_path: str, existing: bool) -> None:
-        self.root_path = root_path
-        self.existing = existing
-        self.ignored = {'.', '..'}
-
-    def j(self, path: str) -> str:
-        return os.path.join(self.root_path, path)
-
-    def put(self, value: bytes, path: str) -> None:
-        jpath = self.j(path)
-        os.makedirs(os.path.dirname(jpath), exist_ok=True)
-        with open(jpath, "wb") as fd:
-            fd.write(value)
-
-    def get(self, path: str) -> bytes:
-        try:
-            with open(self.j(path), "rb") as fd:
-                return fd.read()
-        except FileNotFoundError as exc:
-            raise KeyError(path) from exc
-
-    def rm(self, path: str) -> None:
-        if os.path.isdir(path):
-            shutil.rmtree(path, ignore_errors=True)
-        elif os.path.exists(path):
-            os.unlink(path)
-
-    def __contains__(self, path: str) -> bool:
-        return os.path.exists(self.j(path))
-
-    def get_fname(self, path: str) -> str:
-        return self.j(path)
-
-    def get_fd(self, path: str, mode: str = "rb+") -> IO[bytes]:
-        jpath = self.j(path)
-
-        if "cb" == mode:
-            create_on_fail = True
-            mode = "rb+"
-            os.makedirs(os.path.dirname(jpath), exist_ok=True)
-        elif "ct" == mode:
-            create_on_fail = True
-            mode = "rt+"
-            os.makedirs(os.path.dirname(jpath), exist_ok=True)
-        else:
-            create_on_fail = False
-
-        try:
-            fd = open(jpath, mode)
-        except IOError:
-            if not create_on_fail:
-                raise
-
-            if 't' in mode:
-                fd = open(jpath, "wt")
-            else:
-                fd = open(jpath, "wb")
-
-        return cast(IO[bytes], fd)
-
-    def sub_storage(self, path: str) -> 'FSStorage':
-        return self.__class__(self.j(path), self.existing)
-
-    def sync(self):
-        pass
-
-    def list(self, path: str) -> Iterator[Tuple[bool, str]]:
-        path = self.j(path)
-
-        if not os.path.exists(path):
-            return
-
-        if not os.path.isdir(path):
-            raise OSError("{!r} is not a directory".format(path))
-
-        for fobj in os.scandir(path):
-            if fobj.path not in self.ignored:
-                if fobj.is_dir():
-                    yield False, fobj.name
-                else:
-                    yield True, fobj.name
-
-
-class YAMLSerializer(ISerializer):
-    """Serialize data to yaml"""
-    def pack(self, value: IStorable) -> bytes:
-        try:
-            return yaml.dump(value, Dumper=Dumper, encoding="utf8")
-        except Exception as exc:
-            raise ValueError("Can't pickle object {!r} to yaml".format(type(value))) from exc
-
-    def unpack(self, data: bytes) -> Any:
-        return yaml.load(data, Loader=Loader)
-
-
-class SAFEYAMLSerializer(ISerializer):
-    """Serialize data to yaml"""
-    def pack(self, value: IStorable) -> bytes:
-        try:
-            return yaml.safe_dump(value, encoding="utf8")
-        except Exception as exc:
-            raise ValueError("Can't pickle object {!r} to yaml".format(type(value))) from exc
-
-    def unpack(self, data: bytes) -> Any:
-        return yaml.safe_load(data)
-
-
-ObjClass = TypeVar('ObjClass', bound=IStorable)
-
-
-class _Raise:
-    pass
-
-
-class Storage:
-    """interface for storage"""
-
-    def __init__(self, sstorage: ISimpleStorage, serializer: ISerializer) -> None:
-        self.sstorage = sstorage
-        self.serializer = serializer
-        self.cache = {}
-
-    def sub_storage(self, *path: str) -> 'Storage':
-        fpath = "/".join(path)
-        return self.__class__(self.sstorage.sub_storage(fpath), self.serializer)
-
-    def put(self, value: Any, *path: str) -> None:
-        dct_value = cast(IStorable, value).raw() if isinstance(value, IStorable) else value
-        serialized = self.serializer.pack(dct_value)  # type: ignore
-        fpath = "/".join(path)
-        self.sstorage.put(serialized, fpath)
-
-    def put_list(self, value: Iterable[IStorable], *path: str) -> None:
-        serialized = self.serializer.pack([obj.raw() for obj in value])  # type: ignore
-        fpath = "/".join(path)
-        self.sstorage.put(serialized, fpath)
-
-    def get(self, path: str, default: Any = _Raise) -> Any:
-        try:
-            vl = self.sstorage.get(path)
-        except:
-            if default is _Raise:
-                raise
-            return default
-
-        return self.serializer.unpack(vl)
-
-    def rm(self, *path: str) -> None:
-        fpath = "/".join(path)
-        self.sstorage.rm(fpath)
-
-    def __contains__(self, path: str) -> bool:
-        return path in self.sstorage
-
-    def put_raw(self, val: bytes, *path: str) -> str:
-        fpath = "/".join(path)
-        self.sstorage.put(val, fpath)
-        # TODO: dirty hack
-        return self.resolve_raw(fpath)
-
-    def resolve_raw(self, fpath) -> str:
-        return cast(FSStorage, self.sstorage).j(fpath)
-
-    def get_raw(self, *path: str) -> bytes:
-        return self.sstorage.get("/".join(path))
-
-    def append_raw(self, value: bytes, *path: str) -> None:
-        with self.sstorage.get_fd("/".join(path), "rb+") as fd:
-            fd.seek(0, os.SEEK_END)
-            fd.write(value)
-
-    def get_fd(self, path: str, mode: str = "r") -> IO:
-        return self.sstorage.get_fd(path, mode)
-
-    def get_fname(self, path: str) -> str:
-        return self.sstorage.get_fname(path)
-
-    def load_list(self, obj_class: Type[ObjClass], *path: str) -> List[ObjClass]:
-        path_s = "/".join(path)
-        if path_s not in self.cache:
-            raw_val = cast(List[Dict[str, Any]], self.get(path_s))
-            assert isinstance(raw_val, list)
-            self.cache[path_s] = [cast(ObjClass, obj_class.fromraw(val)) for val in raw_val]
-        return self.cache[path_s]
-
-    def load(self, obj_class: Type[ObjClass], *path: str) -> ObjClass:
-        path_s = "/".join(path)
-        if path_s not in self.cache:
-            self.cache[path_s] = cast(ObjClass, obj_class.fromraw(self.get(path_s)))
-        return self.cache[path_s]
-
-    def sync(self) -> None:
-        self.sstorage.sync()
-
-    def __enter__(self) -> 'Storage':
-        return self
-
-    def __exit__(self, x: Any, y: Any, z: Any) -> None:
-        self.sync()
-
-    def list(self, *path: str) -> Iterator[Tuple[bool, str]]:
-        return self.sstorage.list("/".join(path))
-
-    def _iter_paths(self,
-                    root: str,
-                    path_parts: List[str],
-                    groups: Dict[str, str]) -> Iterator[Tuple[bool, str, Dict[str, str]]]:
-
-        curr = path_parts[0]
-        rest = path_parts[1:]
-
-        for is_file, name in self.list(root):
-            if rest and is_file:
-                continue
-
-            rr = re.match(pattern=curr + "$", string=name)
-            if rr:
-                if root:
-                    path = root + "/" + name
-                else:
-                    path = name
-
-                new_groups = rr.groupdict().copy()
-                new_groups.update(groups)
-
-                if rest:
-                    yield from self._iter_paths(path, rest, new_groups)
-                else:
-                    yield is_file, path, new_groups
-
-
-def make_storage(url: str, existing: bool = False) -> Storage:
-    return Storage(FSStorage(url, existing), SAFEYAMLSerializer())
-
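wally/storage.py is removed as well; its Storage class packed .raw() dictionaries with PyYAML before handing the bytes to the file backend. A minimal round trip of that serialization step (plain yaml only, no filesystem; the record below is illustrative):

    import yaml

    # What the removed SAFEYAMLSerializer.pack()/unpack() boiled down to.
    record = {"host": "10.20.0.5", "port": 22, "user": "root"}

    packed = yaml.safe_dump(record, encoding="utf8")   # bytes, as pack() returned
    assert isinstance(packed, bytes)

    restored = yaml.safe_load(packed)
    assert restored == record
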
diff --git a/wally/suits/io/verify.cfg b/wally/suits/io/verify.cfg
index 033d771..9433361 100644
--- a/wally/suits/io/verify.cfg
+++ b/wally/suits/io/verify.cfg
@@ -6,7 +6,7 @@
 LQDW={% 1, 4, 16, 64 %}
 LQDR={% 1, 4, 16, 64 %}
 
-runtime=300
+runtime=600
 direct=1
 ramp_time=30
 
diff --git a/wally/suits/job.py b/wally/suits/job.py
index 5cb27b5..5f3c764 100644
--- a/wally/suits/job.py
+++ b/wally/suits/job.py
@@ -2,7 +2,7 @@
 from typing import Dict, Any, Tuple, cast, Union
 from collections import OrderedDict
 
-from ..common_types import Storable
+from cephlib.storage import Storable
 
 
 class JobParams(metaclass=abc.ABCMeta):
diff --git a/wally/test_run_class.py b/wally/test_run_class.py
index c4b5bc5..b67035e 100644
--- a/wally/test_run_class.py
+++ b/wally/test_run_class.py
@@ -1,19 +1,19 @@
 from typing import List, Callable, Any, Dict, Optional, Set
 from concurrent.futures import ThreadPoolExecutor
 
+from cephlib.istorage import IStorage
 
-from .timeseries import SensorDatastore
 from .node_interfaces import NodeInfo, IRPCNode
 from .openstack_api import OSCreds, OSConnection
-from .storage import Storage
 from .config import Config
 from .fuel_rest_api import Connection
 from .ssh_utils import ConnCreds
+from .result_classes import IResultStorage
 
 
 class TestRun:
     """Test run information"""
-    def __init__(self, config: Config, storage: Storage) -> None:
+    def __init__(self, config: Config, storage: IStorage, rstorage: IResultStorage) -> None:
         # NodesInfo list
         self.nodes_info = {}  # type: Dict[str, NodeInfo]
 
@@ -33,8 +33,8 @@
         self.default_rpc_plugins = None  # type: Dict[str, bytes]
 
         self.storage = storage
+        self.rstorage = rstorage
         self.config = config
-        self.sensors_data = SensorDatastore()
         self.sensors_run_on = set()  # type: Set[str]
         self.os_spawned_nodes_ids = None  # type: List[int]
 
diff --git a/wally/texttable.py b/wally/texttable.py
deleted file mode 100644
index 504b04d..0000000
--- a/wally/texttable.py
+++ /dev/null
@@ -1,590 +0,0 @@
-# texttable - module for creating simple ASCII tables
-# Copyright (C) 2003-2015 Gerome Fournier <jef(at)foutaise.org>
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
-
-"""module for creating simple ASCII tables
-
-
-Example:
-
-    table = Texttable()
-    table.set_cols_align(["l", "r", "c"])
-    table.set_cols_valign(["t", "m", "b"])
-    table.add_rows([["Name", "Age", "Nickname"],
-                    ["Mr\\nXavier\\nHuon", 32, "Xav'"],
-                    ["Mr\\nBaptiste\\nClement", 1, "Baby"],
-                    ["Mme\\nLouise\\nBourgeau", 28, "Lou\\n\\nLoue"]])
-    print table.draw() + "\\n"
-
-    table = Texttable()
-    table.set_deco(Texttable.HEADER)
-    table.set_cols_dtype(['t',  # text
-                          'f',  # float (decimal)
-                          'e',  # float (exponent)
-                          'i',  # integer
-                          'a']) # automatic
-    table.set_cols_align(["l", "r", "r", "r", "l"])
-    table.add_rows([["text",    "float", "exp", "int", "auto"],
-                    ["abcd",    "67",    654,   89,    128.001],
-                    ["efghijk", 67.5434, .654,  89.6,  12800000000000000000000.00023],
-                    ["lmn",     5e-78,   5e-78, 89.4,  .000000000000128],
-                    ["opqrstu", .023,    5e+78, 92.,   12800000000000000000000]])
-    print table.draw()
-
-Result:
-
-    +----------+-----+----------+
-    |   Name   | Age | Nickname |
-    +==========+=====+==========+
-    | Mr       |     |          |
-    | Xavier   |  32 |          |
-    | Huon     |     |   Xav'   |
-    +----------+-----+----------+
-    | Mr       |     |          |
-    | Baptiste |   1 |          |
-    | Clement  |     |   Baby   |
-    +----------+-----+----------+
-    | Mme      |     |   Lou    |
-    | Louise   |  28 |          |
-    | Bourgeau |     |   Loue   |
-    +----------+-----+----------+
-
-    text   float       exp      int     auto
-    ===========================================
-    abcd   67.000   6.540e+02   89    128.001
-    efgh   67.543   6.540e-01   90    1.280e+22
-    ijkl   0.000    5.000e-78   89    0.000
-    mnop   0.023    5.000e+78   92    1.280e+22
-"""
-
-from __future__ import division
-
-__all__ = ["Texttable", "ArraySizeError"]
-
-__author__ = 'Gerome Fournier <jef(at)foutaise.org>'
-__license__ = 'LGPL'
-__version__ = '0.8.8'
-__credits__ = """\
-Jeff Kowalczyk:
-    - textwrap improved import
-    - comment concerning header output
-
-Anonymous:
-    - add_rows method, for adding rows in one go
-
-Sergey Simonenko:
-    - redefined len() function to deal with non-ASCII characters
-
-Roger Lew:
-    - columns datatype specifications
-
-Brian Peterson:
-    - better handling of unicode errors
-
-Frank Sachsenheim:
-    - add Python 2/3-compatibility
-
-Maximilian Hils:
-    - fix minor bug for Python 3 compatibility
-
-frinkelpi:
-    - preserve empty lines
-"""
-
-import sys
-import string
-import unicodedata
-
-try:
-    if sys.version >= '2.3':
-        import textwrap
-    elif sys.version >= '2.2':
-        from optparse import textwrap
-    else:
-        from optik import textwrap
-except ImportError:
-    sys.stderr.write("Can't import textwrap module!\n")
-    raise
-
-if sys.version >= '2.7':
-    from functools import reduce
-
-if sys.version >= '3.0':
-    unicode_type = str
-    bytes_type = bytes
-else:
-    unicode_type = unicode
-    bytes_type = str
-
-
-def obj2unicode(obj):
-    """Return a unicode representation of a python object
-    """
-    if isinstance(obj, unicode_type):
-        return obj
-    elif isinstance(obj, bytes_type):
-        try:
-            return unicode_type(obj, 'utf-8')
-        except UnicodeDecodeError as strerror:
-            sys.stderr.write("UnicodeDecodeError exception for string '%s': %s\n" % (obj, strerror))
-            return unicode_type(obj, 'utf-8', 'replace')
-    else:
-        return unicode_type(obj)
-
-
-def len(iterable):
-    """Redefining len here so it will be able to work with non-ASCII characters
-    """
-    if isinstance(iterable, bytes_type) or isinstance(iterable, unicode_type):
-        unicode_data = obj2unicode(iterable)
-        if hasattr(unicodedata, 'east_asian_width'):
-            w = unicodedata.east_asian_width
-            return sum([w(c) in 'WF' and 2 or 1 for c in unicode_data])
-        else:
-            return unicode_data.__len__()
-    else:
-        return iterable.__len__()
-
-
-class ArraySizeError(Exception):
-    """Exception raised when specified rows don't fit the required size
-    """
-
-    def __init__(self, msg):
-        self.msg = msg
-        Exception.__init__(self, msg, '')
-
-    def __str__(self):
-        return self.msg
-
-
-class Texttable:
-
-    BORDER = 1
-    HEADER = 1 << 1
-    HLINES = 1 << 2
-    VLINES = 1 << 3
-
-    def __init__(self, max_width=80):
-        """Constructor
-
-        - max_width is an integer, specifying the maximum width of the table
-        - if set to 0, size is unlimited, therefore cells won't be wrapped
-        """
-
-        if max_width <= 0:
-            max_width = False
-        self._max_width = max_width
-        self._precision = 3
-
-        self._deco = Texttable.VLINES | Texttable.HLINES | Texttable.BORDER | Texttable.HEADER
-        self._reset()
-                         ## left, horiz, cross, right
-        self._chars_top = (chr(0x250c), chr(0x2500), chr(0x252c), chr(0x2510))
-        # self.chars_header = (chr(0x255e), chr(0x2550), chr(0x256a), chr(0x2561))
-        self._chars_header = (chr(0x251d), chr(0x2501), chr(0x253f), chr(0x2525))
-        self._chars_middle = (chr(0x251c), chr(0x2500), chr(0x253c), chr(0x2524))
-        self._chars_bottom = (chr(0x2514), chr(0x2500), chr(0x2534), chr(0x2518))
-        self._char_vert = chr(0x2502)
-        self._align = None
-
-    def _reset(self):
-        """Reset the instance
-
-        - reset rows and header
-        """
-
-        self._row_size = None
-        self._header = []
-        self._rows = []
-
-    def set_cols_align(self, array):
-        """Set the desired columns alignment
-
-        - the elements of the array should be either "l", "c" or "r":
-
-            * "l": column flushed left
-            * "c": column centered
-            * "r": column flushed right
-        """
-
-        self._check_row_size(array)
-        self._align = array
-
-    def set_cols_valign(self, array):
-        """Set the desired columns vertical alignment
-
-        - the elements of the array should be either "t", "m" or "b":
-
-            * "t": column aligned on the top of the cell
-            * "m": column aligned on the middle of the cell
-            * "b": column aligned on the bottom of the cell
-        """
-
-        self._check_row_size(array)
-        self._valign = array
-
-    def set_cols_dtype(self, array):
-        """Set the desired columns datatype for the cols.
-
-        - the elements of the array should be either "a", "t", "f", "e" or "i":
-
-            * "a": automatic (try to use the most appropriate datatype)
-            * "t": treat as text
-            * "f": treat as float in decimal format
-            * "e": treat as float in exponential format
-            * "i": treat as int
-
-        - by default, automatic datatyping is used for each column
-        """
-
-        self._check_row_size(array)
-        self._dtype = array
-
-    def set_cols_width(self, array):
-        """Set the desired columns width
-
-        - the elements of the array should be integers, specifying the
-          width of each column. For example:
-
-                [10, 20, 5]
-        """
-
-        self._check_row_size(array)
-        try:
-            array = list(map(int, array))
-            if reduce(min, array) <= 0:
-                raise ValueError
-        except ValueError:
-            sys.stderr.write("Wrong argument in column width specification\n")
-            raise
-        self._width = array
-
-    def set_precision(self, width):
-        """Set the desired precision for float/exponential formats
-
-        - width must be an integer >= 0
-
-        - default value is set to 3
-        """
-
-        if not type(width) is int or width < 0:
-            raise ValueError('width must be an integer greater then 0')
-        self._precision = width
-
-    def header(self, array):
-        """Specify the header of the table
-        """
-
-        self._check_row_size(array)
-        self._header = list(map(obj2unicode, array))
-
-    def add_row(self, array):
-        """Add a row in the rows stack
-
-        - cells can contain newlines and tabs
-        """
-
-        self._check_row_size(array)
-
-        if not hasattr(self, "_dtype"):
-            self._dtype = ["a"] * self._row_size
-
-        cells = []
-        for i, x in enumerate(array):
-            cells.append(self._str(i, x))
-
-        self._rows.append(cells)
-
-    def add_rows(self, rows, header=True):
-        """Add several rows in the rows stack
-
-        - The 'rows' argument can be either an iterator returning arrays,
-          or a by-dimensional array
-        - 'header' specifies if the first row should be used as the header
-          of the table
-        """
-
-        # nb: don't use 'iter' on by-dimensional arrays, to get a
-        #     usable code for python 2.1
-        if header:
-            if hasattr(rows, '__iter__') and hasattr(rows, 'next'):
-                self.header(rows.next())
-            else:
-                self.header(rows[0])
-                rows = rows[1:]
-        for row in rows:
-            self.add_row(row)
-
-    def draw(self):
-        """Draw the table
-
-        - the table is returned as a whole string
-        """
-
-        if not self._header and not self._rows:
-            return
-        self._compute_cols_width()
-        self._check_align()
-        out = ""
-
-        if self._has_border():
-            out += self._hline(*self._chars_top)
-
-        if self._header:
-            out += self._draw_line(self._header, isheader=True)
-            if self._has_header():
-                out += self._hline(*self._chars_header)
-
-        length = 0
-        for row in self._rows:
-            length += 1
-            out += self._draw_line(row)
-            if self._has_hlines() and length < len(self._rows):
-                out += self._hline(*self._chars_middle)
-
-        if self._has_border():
-            out += self._hline(*self._chars_bottom)
-
-        return out[:-1]
-
-    def _str(self, i, x):
-        """Handles string formatting of cell data
-
-            i - index of the cell datatype in self._dtype
-            x - cell data to format
-        """
-        if isinstance(x, str):
-            return x
-
-        try:
-            f = float(x)
-        except:
-            return obj2unicode(x)
-
-        n = self._precision
-        dtype = self._dtype[i]
-
-        if dtype == 'i':
-            return str(int(round(f)))
-        elif dtype == 'f':
-            return '%.*f' % (n, f)
-        elif dtype == 'e':
-            return '%.*e' % (n, f)
-        elif dtype == 't':
-            return obj2unicode(x)
-        else:
-            if f - round(f) == 0:
-                if abs(f) > 1e8:
-                    return '%.*e' % (n, f)
-                else:
-                    return str(int(round(f)))
-            else:
-                if abs(f) > 1e8:
-                    return '%.*e' % (n, f)
-                else:
-                    return '%.*f' % (n, f)
-
-    def _check_row_size(self, array):
-        """Check that the specified array fits the previous rows size
-        """
-
-        if not self._row_size:
-            self._row_size = len(array)
-        elif self._row_size != len(array):
-            raise ArraySizeError("array should contain %d elements" \
-                % self._row_size)
-
-    def _has_vlines(self):
-        """Return a boolean, if vlines are required or not
-        """
-
-        return self._deco & Texttable.VLINES > 0
-
-    def _has_hlines(self):
-        """Return a boolean, if hlines are required or not
-        """
-
-        return self._deco & Texttable.HLINES > 0
-
-    def _has_border(self):
-        """Return a boolean, if border is required or not
-        """
-
-        return self._deco & Texttable.BORDER > 0
-
-    def _has_header(self):
-        """Return a boolean, if header line is required or not
-        """
-
-        return self._deco & Texttable.HEADER > 0
-
-    def _hline(self, left, horiz, cross, right):
-        """Return a string used to separated rows or separate header from rows"""
-
-        # compute cell separator
-        sep = horiz + (cross if self._has_vlines() else horiz) + horiz
-
-        # build the line
-        line = sep.join([horiz * n for n in self._width])
-
-        # add border if needed
-        if self._has_border():
-            line = left + horiz + line  + horiz + right
-
-        return line + "\n"
-
-    def _len_cell(self, cell):
-        """Return the width of the cell
-
-        Special characters are taken into account to return the width of the
-        cell, such like newlines and tabs
-        """
-
-        cell_lines = cell.split('\n')
-        maxi = 0
-        for line in cell_lines:
-            length = 0
-            parts = line.split('\t')
-            for part, i in zip(parts, list(range(1, len(parts) + 1))):
-                length = length + len(part)
-                if i < len(parts):
-                    length = (length//8 + 1) * 8
-            maxi = max(maxi, length)
-        return maxi
-
-    def _compute_cols_width(self):
-        """Return an array with the width of each column
-
-        If a specific width has been specified, exit. If the total of the
-        columns width exceed the table desired width, another width will be
-        computed to fit, and cells will be wrapped.
-        """
-
-        if hasattr(self, "_width"):
-            return
-        maxi = []
-        if self._header:
-            maxi = [ self._len_cell(x) for x in self._header ]
-        for row in self._rows:
-            for cell,i in zip(row, list(range(len(row)))):
-                try:
-                    maxi[i] = max(maxi[i], self._len_cell(cell))
-                except (TypeError, IndexError):
-                    maxi.append(self._len_cell(cell))
-        items = len(maxi)
-        length = sum(maxi)
-        if self._max_width and length + items * 3 + 1 > self._max_width:
-            maxi = [
-                int(round(self._max_width / (length + items * 3 + 1) * n))
-                for n in maxi
-            ]
-        self._width = maxi
-
-    def _check_align(self):
-        """Check if alignment has been specified, set default one if not
-        """
-
-        if not hasattr(self, "_align"):
-            self._align = ["l"] * self._row_size
-        if not hasattr(self, "_valign"):
-            self._valign = ["t"] * self._row_size
-
-    def _draw_line(self, line, isheader=False):
-        """Draw a line
-
-        Loop over a single cell length, over all the cells
-        """
-
-        line = self._splitit(line, isheader)
-        space = " "
-        out = ""
-        for i in range(len(line[0])):
-            if self._has_border():
-                out += "%s " % self._char_vert
-            length = 0
-            for cell, width, align in zip(line, self._width, self._align):
-                length += 1
-                cell_line = cell[i]
-                fill = width - len(cell_line)
-                if isheader:
-                    align = "c"
-                if align == "r":
-                    out += "%s " % (fill * space + cell_line)
-                elif align == "c":
-                    out += "%s " % (int(fill/2) * space + cell_line + int(fill/2 + fill%2) * space)
-                else:
-                    out += "%s " % (cell_line + fill * space)
-                if length < len(line):
-                    out += "%s " % [space, self._char_vert][self._has_vlines()]
-            out += "%s\n" % ['', self._char_vert][self._has_border()]
-        return out
-
-    def _splitit(self, line, isheader):
-        """Split each element of line to fit the column width
-
-        Each element is turned into a list, result of the wrapping of the
-        string to the desired width
-        """
-
-        line_wrapped = []
-        for cell, width in zip(line, self._width):
-            array = []
-            for c in cell.split('\n'):
-                if c.strip() == "":
-                    array.append("")
-                else:
-                    array.extend(textwrap.wrap(c, width))
-            line_wrapped.append(array)
-        max_cell_lines = reduce(max, list(map(len, line_wrapped)))
-        for cell, valign in zip(line_wrapped, self._valign):
-            if isheader:
-                valign = "t"
-            if valign == "m":
-                missing = max_cell_lines - len(cell)
-                cell[:0] = [""] * int(missing / 2)
-                cell.extend([""] * int(missing / 2 + missing % 2))
-            elif valign == "b":
-                cell[:0] = [""] * (max_cell_lines - len(cell))
-            else:
-                cell.extend([""] * (max_cell_lines - len(cell)))
-        return line_wrapped
-
-
-if __name__ == '__main__':
-    table = Texttable()
-    table.set_cols_align(["l", "r", "c"])
-    table.set_cols_valign(["t", "m", "b"])
-    table.add_rows([["Name", "Age", "Nickname"],
-                    ["Mr\nXavier\nHuon", 32, "Xav'"],
-                    ["Mr\nBaptiste\nClement", 1, "Baby"],
-                    ["Mme\nLouise\nBourgeau", 28, "Lou\n \nLoue"]])
-    print(table.draw() + "\n")
-
-    table = Texttable()
-    table.set_deco(Texttable.HEADER)
-    table.set_cols_dtype(['t',  # text
-                          'f',  # float (decimal)
-                          'e',  # float (exponent)
-                          'i',  # integer
-                          'a']) # automatic
-    table.set_cols_align(["l", "r", "r", "r", "l"])
-    table.add_rows([["text",    "float", "exp", "int", "auto"],
-                    ["abcd",    "67",    654,   89,    128.001],
-                    ["efghijk", 67.5434, .654,  89.6,  12800000000000000000000.00023],
-                    ["lmn",     5e-78,   5e-78, 89.4,  .000000000000128],
-                    ["opqrstu", .023,    5e+78, 92.,   12800000000000000000000]])
-    print(table.draw())
diff --git a/wally/timeseries.py b/wally/timeseries.py
deleted file mode 100644
index d322fff..0000000
--- a/wally/timeseries.py
+++ /dev/null
@@ -1,61 +0,0 @@
-import array
-import threading
-
-
-class SensorDatastore(object):
-    def __init__(self, stime=None):
-        self.lock = threading.Lock()
-        self.stime = stime
-
-        self.min_size = 60 * 60
-        self.max_size = 60 * 61
-
-        self.data = {
-            'testnodes:io': array.array("B"),
-            'testnodes:cpu': array.array("B"),
-        }
-
-    def get_values(self, name, start, end):
-        assert end >= start
-
-        if end == start:
-            return []
-
-        with self.lock:
-            curr_arr = self.data[name]
-            if self.stime is None:
-                return []
-
-            sidx = start - self.stime
-            eidx = end - self.stime
-
-            if sidx < 0 and eidx < 0:
-                return [0] * (end - start)
-            elif sidx < 0:
-                return [0] * (-sidx) + curr_arr[:eidx]
-            return curr_arr[sidx:eidx]
-
-    def update_values(self, data_time, vals, add=False):
-        with self.lock:
-            if self.stime is None:
-                self.stime = data_time
-
-            for name, value in vals.items():
-                curr_arr = self.data.setdefault(name, array.array("H"))
-                curr_end_time = len(curr_arr) + self.stime
-
-                dtime = data_time - curr_end_time
-
-                if dtime > 0:
-                    curr_arr.extend([0] * dtime)
-                    curr_arr.append(value)
-                elif dtime == 0:
-                    curr_arr.append(value)
-                else:
-                    # dtime < 0
-                    sindex = len(curr_arr) + dtime
-                    if sindex > 0:
-                        if add:
-                            curr_arr[sindex] += value
-                        else:
-                            curr_arr[sindex].append(value)
diff --git a/wally/types.py b/wally/types.py
deleted file mode 100644
index 30db83f..0000000
--- a/wally/types.py
+++ /dev/null
@@ -1,8 +0,0 @@
-from typing import TypeVar, List, Union
-
-import numpy
-
-
-TNumber = TypeVar('TNumber', int, float)
-Number = Union[int, float]
-NumVector = Union[numpy.ndarray, List[int], List[float]]
diff --git a/wally/utils.py b/wally/utils.py
index a0f10e8..dfd0e8c 100644
--- a/wally/utils.py
+++ b/wally/utils.py
@@ -1,33 +1,19 @@
-import re
 import os
 import sys
-import math
-import time
 import uuid
-import socket
 import logging
 import datetime
-import ipaddress
-import threading
 import contextlib
-import subprocess
-from fractions import Fraction
 
-from typing import Any, Tuple, Union, List, Iterator, Iterable, Optional, IO, cast, TypeVar, Callable
-
-try:
-    import psutil
-except ImportError:
-    psutil = None
+from typing import Any, Tuple, Iterator, Iterable
 
 try:
     from petname import Generate as pet_generate
 except ImportError:
-    def pet_generate(x: str, y: str) -> str:
+    def pet_generate(_1: str, _2: str) -> str:
         return str(uuid.uuid4())
 
-
-from .types import TNumber, Number
+from cephlib.common import run_locally, sec_to_str
 
 
 logger = logging.getLogger("wally")
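The optional-dependency fallback above means a missing petname package silently degrades run names to plain UUIDs; the renamed stub parameters only signal that the arguments are ignored. Roughly (the call-site arguments below are illustrative, the real call is not shown in this hunk):

    run_name = pet_generate('2', '_')   # a 'proud_ant'-style name with petname installed,
                                        # a bare uuid4 string otherwise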
@@ -65,63 +51,6 @@
     pass
 
 
-class Timeout(Iterable[float]):
-    def __init__(self, timeout: int, message: str = None, min_tick: int = 1, no_exc: bool = False) -> None:
-        self.end_time = time.time() + timeout
-        self.message = message
-        self.min_tick = min_tick
-        self.prev_tick_at = time.time()
-        self.no_exc = no_exc
-
-    def tick(self) -> bool:
-        current_time = time.time()
-
-        if current_time > self.end_time:
-            if self.message:
-                msg = "Timeout: {}".format(self.message)
-            else:
-                msg = "Timeout"
-
-            if self.no_exc:
-                return False
-
-            raise TimeoutError(msg)
-
-        sleep_time = self.min_tick - (current_time - self.prev_tick_at)
-        if sleep_time > 0:
-            time.sleep(sleep_time)
-            self.prev_tick_at = time.time()
-        else:
-            self.prev_tick_at = current_time
-
-        return True
-
-    def __iter__(self) -> Iterator[float]:
-        return cast(Iterator[float], self)
-
-    def __next__(self) -> float:
-        if not self.tick():
-            raise StopIteration()
-        return self.end_time - time.time()
-
-
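The removed Timeout helper doubled as an iterator, which made polling loops read naturally. A usage sketch against the class exactly as defined above (node_is_ready is a hypothetical predicate used only for illustration):

    def node_is_ready() -> bool:      # hypothetical readiness check
        return False

    for seconds_left in Timeout(30, "waiting for node", min_tick=2):
        if node_is_ready():
            break
    # raises TimeoutError("Timeout: waiting for node") after ~30s unless the break is hit;
    # pass no_exc=True to get a normal loop exit instead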
-def greater_digit_pos(val: Number) -> int:
-    return int(math.floor(math.log10(val))) + 1
-
-
-def round_digits(val: TNumber, num_digits: int = 3) -> TNumber:
-    pow = 10 ** (greater_digit_pos(val) - num_digits)
-    return type(val)(int(val / pow) * pow)
-
-
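A worked example for the two helpers above: greater_digit_pos(12345) is 5, so keeping the default three significant digits zeroes out the rest:

    round_digits(12345)        # -> 12300   (pow = 10 ** (5 - 3))
    round_digits(0.012345, 2)  # -> 0.012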
-def is_ip(data: str) -> bool:
-    try:
-        ipaddress.ip_address(data)
-        return True
-    except ValueError:
-        return False
-
-
 def log_block(message: str, exc_logger:logging.Logger = None) -> LogError:
     logger.debug("Starts : " + message)
     return LogError(message, exc_logger)
@@ -133,212 +62,6 @@
         raise StopTestError(message)
 
 
-def parse_creds(creds: str) -> Tuple[str, str, str]:
-    """Parse simple credentials format user[:passwd]@host"""
-    user, passwd_host = creds.split(":", 1)
-
-    if '@' not in passwd_host:
-        passwd, host = passwd_host, None
-    else:
-        passwd, host = passwd_host.rsplit('@', 1)
-
-    return user, passwd, host
-
-
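A worked example of the credential string format handled above; only the last '@' separates the host, so passwords may themselves contain '@':

    parse_creds("root:secret@10.20.0.2")   # -> ('root', 'secret', '10.20.0.2')
    parse_creds("admin:p@ss@ceph-mon1")    # -> ('admin', 'p@ss', 'ceph-mon1')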
-SMAP = dict(k=1024, m=1024 ** 2, g=1024 ** 3, t=1024 ** 4)
-
-
-def ssize2b(ssize: Union[str, int]) -> int:
-    try:
-        if isinstance(ssize, int):
-            return ssize
-
-        ssize = ssize.lower()
-        if ssize[-1] in SMAP:
-            return int(ssize[:-1]) * SMAP[ssize[-1]]
-        return int(ssize)
-    except (ValueError, TypeError, AttributeError):
-        raise ValueError("Unknown size format {!r}".format(ssize))
-
-
-RSMAP = [('K', 1024),
-         ('M', 1024 ** 2),
-         ('G', 1024 ** 3),
-         ('T', 1024 ** 4)]
-
-
-def b2ssize(value: Union[int, float]) -> str:
-    if isinstance(value, float) and value < 100:
-        return b2ssize_10(value)
-
-    value = int(value)
-    if value < 1024:
-        return str(value) + " "
-
-    # make mypy happy
-    scale = 1
-    name = ""
-
-    for name, scale in RSMAP:
-        if value < 1024 * scale:
-            if value % scale == 0:
-                return "{} {}i".format(value // scale, name)
-            else:
-                return "{:.1f} {}i".format(float(value) / scale, name)
-
-    return "{}{}i".format(value // scale, name)
-
-
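The pair above converts human-readable binary sizes in both directions, for example:

    ssize2b("4k")     # -> 4096
    ssize2b("10m")    # -> 10485760
    b2ssize(10240)    # -> '10 Ki'
    b2ssize(1536)     # -> '1.5 Ki'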
-RSMAP_10 = [(' f', 0.001 ** 4),
-            (' n', 0.001 ** 3),
-            (' u', 0.001 ** 2),
-            (' m', 0.001),
-            (' ', 1),
-            (' K', 1000),
-            (' M', 1000 ** 2),
-            (' G', 1000 ** 3),
-            (' T', 1000 ** 4),
-            (' P', 1000 ** 5),
-            (' E', 1000 ** 6)]
-
-
-def has_next_digit_after_coma(x: float) -> bool:
-    return x * 10 - int(x) * 10 > 1
-
-
-def has_second_digit_after_coma(x: float) -> bool:
-    return (x * 10 - int(x * 10)) * 10 > 1
-
-
-def b2ssize_10(value: Union[int, float]) -> str:
-    # make mypy happy
-    scale = 1
-    name = " "
-
-    if value == 0.0:
-        return "0 "
-
-    if value / RSMAP_10[0][1] < 1.0:
-        return "{:.2e} ".format(value)
-
-    for name, scale in RSMAP_10:
-        cval = value / scale
-        if cval < 1000:
-            # detect how many digits after dot to show
-            if cval > 100:
-                return "{}{}".format(int(cval), name)
-            if cval > 10:
-                if has_next_digit_after_coma(cval):
-                    return "{:.1f}{}".format(cval, name)
-                else:
-                    return "{}{}".format(int(cval), name)
-            if cval >= 1:
-                if has_second_digit_after_coma(cval):
-                    return "{:.2f}{}".format(cval, name)
-                elif has_next_digit_after_coma(cval):
-                    return "{:.1f}{}".format(cval, name)
-                return "{}{}".format(int(cval), name)
-            raise AssertionError("Can't get here")
-
-    return "{}{}".format(int(value // scale), name)
-
-
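b2ssize_10 does the same with decimal (SI) prefixes, choosing the precision from how many significant digits remain after scaling:

    b2ssize_10(0.00123)    # -> '1.23 m'
    b2ssize_10(1230000)    # -> '1.23 M'
    b2ssize_10(250)        # -> '250 '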
-def run_locally(cmd: Union[str, List[str]], input_data: str = "", timeout: int = 20) -> str:
-    if isinstance(cmd, str):
-        shell = True
-        cmd_str = cmd
-    else:
-        shell = False
-        cmd_str = " ".join(cmd)
-
-    proc = subprocess.Popen(cmd,
-                            shell=shell,
-                            stdin=subprocess.PIPE,
-                            stdout=subprocess.PIPE,
-                            stderr=subprocess.PIPE)
-    res = []  # type: List[Tuple[bytes, bytes]]
-
-    def thread_func() -> None:
-        rr = proc.communicate(input_data.encode("utf8"))
-        res.append(rr)
-
-    thread = threading.Thread(target=thread_func,
-                              name="Local cmd execution")
-    thread.daemon = True
-    thread.start()
-    thread.join(timeout)
-
-    if thread.is_alive():
-        if psutil is not None:
-            parent = psutil.Process(proc.pid)
-            for child in parent.children(recursive=True):
-                child.kill()
-            parent.kill()
-        else:
-            proc.kill()
-
-        thread.join()
-        raise RuntimeError("Local process timeout: " + cmd_str)
-
-    stdout_data, stderr_data = zip(*res)  # type: List[bytes], List[bytes]
-
-    out = b"".join(stdout_data).decode("utf8")
-    err = b"".join(stderr_data).decode("utf8")
-
-    if 0 != proc.returncode:
-        raise subprocess.CalledProcessError(proc.returncode,
-                                            cmd_str, out + err)
-
-    return out
-
-
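For reference, the removed implementation above was string in, string out, with a hard kill of the child (and its children, when psutil is available) on timeout:

    out = run_locally("uname -a", timeout=5)    # shell-string form
    out = run_locally(["ls", "-l", "/tmp"])     # argv-list form, no shell

Judging from the get_creds_openrc hunk further down, the cephlib.common replacement operates on bytes instead, so callers now encode the input and decode the result themselves.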
-def get_ip_for_target(target_ip: str) -> str:
-    if not is_ip(target_ip):
-        target_ip = socket.gethostbyname(target_ip)
-
-    first_dig = int(target_ip.split(".")[0])
-    if first_dig == 127:
-        return '127.0.0.1'
-
-    data = run_locally('ip route get to'.split(" ") + [target_ip])
-
-    rr1 = r'{0} via [.0-9]+ dev (?P<dev>.*?) src (?P<ip>[.0-9]+)$'
-    rr1 = rr1.replace(" ", r'\s+')
-    rr1 = rr1.format(target_ip.replace('.', r'\.'))
-
-    rr2 = r'{0} dev (?P<dev>.*?) src (?P<ip>[.0-9]+)$'
-    rr2 = rr2.replace(" ", r'\s+')
-    rr2 = rr2.format(target_ip.replace('.', r'\.'))
-
-    data_line = data.split("\n")[0].strip()
-    res1 = re.match(rr1, data_line)
-    res2 = re.match(rr2, data_line)
-
-    if res1 is not None:
-        return res1.group('ip')
-
-    if res2 is not None:
-        return res2.group('ip')
-
-    raise OSError("Can't define interface for {0}".format(target_ip))
-
-
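The two regexes above target the first line of `ip route get <target>`. A runnable illustration on a sample line (not captured from a real host) with rr1 expanded by hand for target 10.20.0.2:

    import re
    sample = "10.20.0.2 via 192.168.1.1 dev eth0 src 192.168.1.15"   # illustrative output line
    rr1 = r'10\.20\.0\.2\s+via\s+[.0-9]+\s+dev\s+(?P<dev>.*?)\s+src\s+(?P<ip>[.0-9]+)$'
    print(re.match(rr1, sample).group('ip'))    # -> 192.168.1.15, the address the function returns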
-def open_for_append_or_create(fname: str) -> IO[str]:
-    if not os.path.exists(fname):
-        return open(fname, "w")
-
-    fd = open(fname, 'r+')
-    fd.seek(0, os.SEEK_END)
-    return fd
-
-
-def sec_to_str(seconds: int) -> str:
-    h = seconds // 3600
-    m = (seconds % 3600) // 60
-    s = seconds % 60
-    return "{}:{:02d}:{:02d}".format(h, m, s)
-
-
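A quick worked example of the format produced above (the cephlib.common sec_to_str imported at the top of this file is assumed to behave the same way):

    sec_to_str(3725)    # -> '1:02:05'
    sec_to_str(59)      # -> '0:00:59'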
 def yamable(data: Any) -> Any:
     if isinstance(data, (tuple, list)):
         return map(yamable, data)
@@ -352,16 +75,6 @@
     return data
 
 
-def flatten(data: Iterable[Any]) -> List[Any]:
-    res = []
-    for i in data:
-        if isinstance(i, (list, tuple, set)):
-            res.extend(flatten(i))
-        else:
-            res.append(i)
-    return res
-
-
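flatten above recurses only into list, tuple and set containers, so strings stay intact:

    flatten([1, [2, 3], (4, {5})])   # -> [1, 2, 3, 4, 5]
    flatten(["ab", [1]])             # -> ['ab', 1]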
 def get_creds_openrc(path: str) -> Tuple[str, str, str, str, bool]:
     fc = open(path).read()
 
@@ -369,7 +82,7 @@
 
     msg = "Failed to get creads from openrc file"
     with LogError(msg):
-        data = run_locally(['/bin/bash'], input_data=fc + "\n" + echo)
+        data = run_locally(['/bin/bash'], input_data=(fc + "\n" + echo).encode('utf8')).decode("utf8")
 
     msg = "Failed to get creads from openrc file: " + data
     with LogError(msg):
@@ -383,19 +96,6 @@
     return user, passwd, tenant, auth_url, insecure
 
 
-def which(program: str) -> Optional[str]:
-    def is_exe(fpath):
-        return os.path.isfile(fpath) and os.access(fpath, os.X_OK)
-
-    for path in os.environ["PATH"].split(os.pathsep):
-        path = path.strip('"')
-        exe_file = os.path.join(path, program)
-        if is_exe(exe_file):
-            return exe_file
-
-    return None
-
-
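which() above duplicated a PATH lookup the standard library already provides, which is presumably why it can go without a cephlib counterpart. An equivalent lookup with shutil (a sketch, not necessarily what the callers were switched to):

    import shutil
    fio_path = shutil.which("fio")   # absolute path such as '/usr/bin/fio', or None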
 @contextlib.contextmanager
 def empty_ctx(val: Any = None) -> Iterator[Any]:
     yield val
@@ -414,17 +114,6 @@
     return results_dir, run_uuid
 
 
-def to_ip(host_or_ip: str) -> str:
-    # translate hostname to address
-    try:
-        ipaddress.ip_address(host_or_ip)
-        return host_or_ip
-    except ValueError:
-        ip_addr = socket.gethostbyname(host_or_ip)
-        logger.info("Will use ip_addr %r instead of hostname %r", ip_addr, host_or_ip)
-        return ip_addr
-
-
 def get_time_interval_printable_info(seconds: int) -> Tuple[str, str]:
     exec_time_s = sec_to_str(seconds)
     now_dt = datetime.datetime.now()
@@ -432,68 +121,6 @@
     return exec_time_s, "{:%H:%M:%S}".format(end_dt)
 
 
-FM_FUNC_INPUT = TypeVar("FM_FUNC_INPUT")
-FM_FUNC_RES = TypeVar("FM_FUNC_RES")
-
-
-def flatmap(func: Callable[[FM_FUNC_INPUT], Iterable[FM_FUNC_RES]],
-            inp_iter: Iterable[FM_FUNC_INPUT]) -> Iterator[FM_FUNC_RES]:
-    for val in inp_iter:
-        for res in func(val):
-            yield res
-
-
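flatmap above is the usual map-then-flatten generator:

    list(flatmap(lambda n: range(n), [1, 2, 3]))   # -> [0, 0, 1, 0, 1, 2]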
-_coefs = {
-    'n': Fraction(1, 1000**3),
-    'u': Fraction(1, 1000**2),
-    'm': Fraction(1, 1000),
-    'K': 1000,
-    'M': 1000 ** 2,
-    'G': 1000 ** 3,
-    'Ki': 1024,
-    'Mi': 1024 ** 2,
-    'Gi': 1024 ** 3,
-}
-
-
-def split_unit(units: str) -> Tuple[Union[Fraction, int], str]:
-    if len(units) > 2 and units[:2] in _coefs:
-        return _coefs[units[:2]], units[2:]
-    if len(units) > 1 and units[0] in _coefs:
-        return _coefs[units[0]], units[1:]
-    else:
-        return 1, units
-
-
-conversion_cache = {}
-
-
-def unit_conversion_coef(from_unit: str, to_unit: str) -> Union[Fraction, int]:
-    key = (from_unit, to_unit)
-    if key in conversion_cache:
-        return conversion_cache[key]
-
-    f1, u1 = split_unit(from_unit)
-    f2, u2 = split_unit(to_unit)
-
-    assert u1 == u2, "Can't convert {!r} to {!r}".format(from_unit, to_unit)
-
-    if isinstance(f1, int) and isinstance(f2, int):
-        if f1 % f2 != 0:
-            res = Fraction(f1, f2)
-        else:
-            res = f1 // f2
-    else:
-        res = f1 / f2
-
-    if isinstance(res, Fraction) and cast(Fraction, res).denominator == 1:
-        res = cast(Fraction, res).numerator
-
-    conversion_cache[key] = res
-
-    return res
-
-
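Worked examples for the unit helpers above; Fraction keeps downscaling coefficients exact:

    split_unit("KiB")                    # -> (1024, 'B')
    unit_conversion_coef("GiB", "MiB")   # -> 1024
    unit_conversion_coef("KiB", "MiB")   # -> Fraction(1, 1024)
    unit_conversion_coef("ms", "us")     # -> 1000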
 def shape2str(shape: Iterable[int]) -> str:
     return "*".join(map(str, shape))