moving code to cephlib
diff --git a/scripts/perf.py b/scripts/perf.py
new file mode 100644
index 0000000..96f3436
--- /dev/null
+++ b/scripts/perf.py
@@ -0,0 +1,7 @@
+from wally import main
+opts = "X -l DEBUG report /tmp/perf_tests/warm_doe".split()
+
+def x():
+    main.main(opts)
+
+x()
diff --git a/v2_plans.md b/v2_plans.md
index 56e2669..70a0caf 100644
--- a/v2_plans.md
+++ b/v2_plans.md
@@ -1,118 +1,176 @@
-* Remarks:
-    * With current code impossible to do vm count scan test
+Wally состоит из частей, которые стоит
+разделить и унифицировать с другими тулами
+------------------------------------------
 
-* TODO next
-    * unit tests for math functions
-    * CEPH PERFORMANCE COUNTERS
-    * Sync storage_structure
-    * fix fio job summary
-    * Use disk max QD as qd limit?
-    * Cumulative statistic table for all jobs
-    * Add column for job params, which show how many cluster resource consumed
-    * show extra outliers with arrows
-    * More X = func(QD) plots. Eg. - kurt/skew, etc.
-    * Hide cluster load if no nodes available
-    * Show latency skew and curtosis
-    * Sort engineering report by result tuple
-    * Name engineering reports by long summary
-    * Latency heatmap and violin aren't consistent
-    * profile violint plot
-    * Fix plot layout, there to much unused space around typical plot
-    * iops boxplot as function from QD
-    * collect device types mapping from nodes - device should be block/net/...
-    * Optimize sensor communication with ceph, can run fist OSD request for
-      data validation only on start.
-    * Update Storage test, add tests for stat and plot module
-    * Aggregated sensors boxplot
-    * Hitmap for aggregated sensors
-    * automatically find what to plot from storage data (but also allow to select via config)
+* Сделать ceph-lib, вынести ее в отдельный проект,
+  должна поддерживать и 2.7 и 3.5 и не иметь строгих внешних
+  бинарных зависимостейю В нее вынести:
+    * Cluster detector
+    * Cluster info collector
+    * Monitoring
+    * FSStorage
+
+* Openstack VM spawn
+* Load generators
+* Load results visualizator
+* Cluster load visualization
+* Поиск узких мест
+* Расчет потребляемых ресурсов
+* Сопрягающий код
+* Хранилища должны легко подключаться
+
+* Расчет потребления ресурсов сделать конфигурируемым -
+  указывать соотношения чего с чем считать
+* В конфиге задавать storage plugin
+
+
+Ресурсы:
+--------
+На выходе из сенсоров есть 
+
+NODE_OR_ROLE.DEVICE.SENSOR
+
+create namespace with all nodes/roles as objects with specially overloaded
+__getattr__ method to handle device and then handle sensor.
+Make eval on result
+
+
+(CLUSTER.DISK.riops + CLUSTER.DISK.wiops) / (VM.DISK.riops + VM.DISK.wiops)
+
+
+Remarks:
+--------
+
+* With current code impossible to do vm count scan test
+
+TODO next
+---------
+
+* Remove DBStorage, merge FSStorage and serializer into
+  ObjStorage, separate TSStorage.
+* Build WallyStorage on top of it, use only WallyStorage in code
+* check that OS key match what is stored on disk 
+* unit tests for math functions
+* CEPH PERFORMANCE COUNTERS
+* Sync storage_structure
+* fix fio job summary
+* Use disk max QD as qd limit?
+* Cumulative statistic table for all jobs
+* Add column for job params, which show how many cluster resource consumed
+* show extra outliers with arrows
+* More X = func(QD) plots. Eg. - kurt/skew, etc.
+* Hide cluster load if no nodes available
+* Show latency skew and curtosis
+* Sort engineering report by result tuple
+* Name engineering reports by long summary
+* Latency heatmap and violin aren't consistent
+* profile violint plot
+* Fix plot layout, there to much unused space around typical plot
+* iops boxplot as function from QD
+* collect device types mapping from nodes - device should be block/net/...
+* Optimize sensor communication with ceph, can run fist OSD request for
+  data validation only on start.
+* Update Storage test, add tests for stat and plot module
+* Aggregated sensors boxplot
+* Hitmap for aggregated sensors
+* automatically find what to plot from storage data (but also allow to select via config)
 
 Have to think:
-    * Each sensor should collect only one portion of data. During
-      start it should scan all available sources and tell upper code to create separated funcs for them.
-    * store statistic results in storage
-    * During prefill check io on file
-    * Store percentiles levels in TS, separate 1D TS and 2D TS to different classes, store levels in 2D TS
-    * weight average and deviation
-    * C++/Go disk stat sensors to measure IOPS/Lat on milliseconds
+--------------
+* Send data to external storage
+* Each sensor should collect only one portion of data. During
+  start it should scan all available sources and tell upper code to create separated funcs for them.
+* store statistic results in storage
+* During prefill check io on file
+* Store percentiles levels in TS, separate 1D TS and 2D TS to different classes, store levels in 2D TS
+* weight average and deviation
+* C++/Go disk stat sensors to measure IOPS/Lat on milliseconds
 
 * TODO large
-    * Force to kill running fio on ctrl+C and correct cleanup or cleanup all previous run with 'wally cleanup PATH'
+------------
+* Force to kill running fio on ctrl+C and correct cleanup or cleanup all previous run with 'wally cleanup PATH'
 
 * Code:
-    * RW mixed report
-    * RPC reconnect in case of errors
-    * store more information for node - OSD settings, FS on test nodes, target block device settings on test nodes
-    * Sensors
-        - Revise sensors code. Prepack on node side, different sensors data types
-        - perf
-        - [bcc](https://github.com/iovisor/bcc)
-        - ceph sensors
-    * Config validation
-    * Add sync 4k write with small set of thcount
-    * Flexible SSH connection creds - use agent, default ssh settings or part of config
-    * Remove created temporary files - create all tempfiles via func from .utils, which track them
-    * Use ceph-monitoring from wally
-    * Use warm-up detection to select real test time.
-    * Report code:
-        - Compatible report types set up by config and load??
-    * Calculate statistic for previous iteration in background
+-------
+* RW mixed report
+* RPC reconnect in case of errors
+* store more information for node - OSD settings, FS on test nodes, target block device settings on test nodes
+* Sensors
+    - Revise sensors code. Prepack on node side, different sensors data types
+    - perf
+    - [bcc](https://github.com/iovisor/bcc)
+    - ceph sensors
+* Config validation
+* Add sync 4k write with small set of thcount
+* Flexible SSH connection creds - use agent, default ssh settings or part of config
+* Remove created temporary files - create all tempfiles via func from .utils, which track them
+* Use ceph-monitoring from wally
+* Use warm-up detection to select real test time.
+* Report code:
+    - Compatible report types set up by config and load??
+* Calculate statistic for previous iteration in background
         
 * UT
-    * UT, which run test with predefined in yaml cluster (cluster and config created separatelly, not with tests)
-      and check that result storage work as expected. Declare db sheme in seaprated yaml file, UT should check.
-    * White-box event logs for UT
-    * Result-to-yaml for UT
+----
+* UT, which run test with predefined in yaml cluster (cluster and config created separatelly, not with tests)
+  and check that result storage work as expected. Declare db sheme in seaprated yaml file, UT should check.
+* White-box event logs for UT
+* Result-to-yaml for UT
 
 * Infra:
-    * Add script to download fio from git and build it
-    * Docker/lxd public container as default distribution way
-    * Update setup.py to provide CLI entry points
+--------
+* Add script to download fio from git and build it
+* Docker/lxd public container as default distribution way
+* Update setup.py to provide CLI entry points
 
 * Statistical result check and report:
-    * [Q-Q plot](https://en.wikipedia.org/wiki/Q%E2%80%93Q_plot)
-    * Check results distribution
-    * Warn for non-normal results
-    * Check that distribution of different parts is close. Average performance should be steady across test
-    * Node histogram distribution
-    * Interactive report, which shows different plots and data,
-      depending on selected visualization type
-    * Offload simple report table to cvs/yaml/json/test/ascii_table
-    * fio load reporters (visualizers), ceph report tool
-        [ceph-viz-histo](https://github.com/cronburg/ceph-viz/tree/master/histogram)
-    * evaluate bokeh for visualization
-    * [flamegraph](https://www.youtube.com/watch?v=nZfNehCzGdw) for 'perf' output
-    * detect internal pattern:
-        - FFT
-        - http://mabrek.github.io/
-        - https://github.com/rushter/MLAlgorithms
-        - https://github.com/rushter/data-science-blogs
-        - https://habrahabr.ru/post/311092/
-        - https://blog.cloudera.com/blog/2015/12/common-probability-distributions-the-data-scientists-crib-sheet/
-        - http://docs.scipy.org/doc/scipy-0.14.0/reference/generated/scipy.stats.mstats.normaltest.html
-        - http://profitraders.com/Math/Shapiro.html
-        - http://www.machinelearning.ru/wiki/index.php?title=%D0%9A%D1%80%D0%B8%D1%82%D0%B5%D1%80%D0%B8%D0%B9_%D1%85%D0%B8-%D0%BA%D0%B2%D0%B0%D0%B4%D1%80%D0%B0%D1%82
-        - http://docs.scipy.org/doc/numpy/reference/generated/numpy.fft.fft.html#numpy.fft.fft
-        - https://en.wikipedia.org/wiki/Log-normal_distribution
-        - http://stats.stackexchange.com/questions/25709/what-distribution-is-most-commonly-used-to-model-server-response-time
-        - http://www.lognormal.com/features/
-        - http://blog.simiacryptus.com/2015/10/modeling-network-latency.html
-    * For HDD read/write - report caches hit ratio, maps of real read/writes, FS counters
-    * Report help page, link for explanations
-    * checkboxes for show/hide part of image
-    * pop-up help for part of picture
-    * pop-up text values for bars/lines
-    * waterfall charts for ceph request processing
-    * correct comparison between different systems
+--------------------------------------
+* KDE on latency, than found local extremums and estimate
+  effective cache sizes from them
+* [Q-Q plot](https://en.wikipedia.org/wiki/Q%E2%80%93Q_plot)
+* Check results distribution
+* Warn for non-normal results
+* Check that distribution of different parts is close. Average performance should be steady across test
+* Node histogram distribution
+* Interactive report, which shows different plots and data,
+  depending on selected visualization type
+* Offload simple report table to cvs/yaml/json/test/ascii_table
+* fio load reporters (visualizers), ceph report tool
+    [ceph-viz-histo](https://github.com/cronburg/ceph-viz/tree/master/histogram)
+* evaluate bokeh for visualization
+* [flamegraph](https://www.youtube.com/watch?v=nZfNehCzGdw) for 'perf' output
+* detect internal pattern:
+    - FFT
+    - http://mabrek.github.io/
+    - https://github.com/rushter/MLAlgorithms
+    - https://github.com/rushter/data-science-blogs
+    - https://habrahabr.ru/post/311092/
+    - https://blog.cloudera.com/blog/2015/12/common-probability-distributions-the-data-scientists-crib-sheet/
+    - http://docs.scipy.org/doc/scipy-0.14.0/reference/generated/scipy.stats.mstats.normaltest.html
+    - http://profitraders.com/Math/Shapiro.html
+    - http://www.machinelearning.ru/wiki/index.php?title=%D0%9A%D1%80%D0%B8%D1%82%D0%B5%D1%80%D0%B8%D0%B9_%D1%85%D0%B8-%D0%BA%D0%B2%D0%B0%D0%B4%D1%80%D0%B0%D1%82
+    - http://docs.scipy.org/doc/numpy/reference/generated/numpy.fft.fft.html#numpy.fft.fft
+    - https://en.wikipedia.org/wiki/Log-normal_distribution
+    - http://stats.stackexchange.com/questions/25709/what-distribution-is-most-commonly-used-to-model-server-response-time
+    - http://www.lognormal.com/features/
+    - http://blog.simiacryptus.com/2015/10/modeling-network-latency.html
+* For HDD read/write - report caches hit ratio, maps of real read/writes, FS counters
+* Report help page, link for explanations
+* checkboxes for show/hide part of image
+* pop-up help for part of picture
+* pop-up text values for bars/lines
+* waterfall charts for ceph request processing
+* correct comparison between different systems
 
 * Maybe move to 2.1:
-    * Add sensor collection time to them
-    * Make collection interval configurable per sensor type, make collection time separated for each sensor
-    * DB <-> files conversion, or just store all the time in files as well
-    * Automatically scale QD till saturation
-    * Runtime visualization
-    * Integrate vdbench/spc/TPC/TPB
-    * Add aio rpc client
-    * Add integration tests with nbd
-    * fix existing folder detection
-    * Simple REST API for external in-browser UI
\ No newline at end of file
+--------------------
+* Add sensor collection time to them
+* Make collection interval configurable per sensor type, make collection time separated for each sensor
+* DB <-> files conversion, or just store all the time in files as well
+* Automatically scale QD till saturation
+* Runtime visualization
+* Integrate vdbench/spc/TPC/TPB
+* Add aio rpc client
+* Add integration tests with nbd
+* fix existing folder detection
+* Simple REST API for external in-browser UI
diff --git a/wally/ceph.py b/wally/ceph.py
index a374ed6..9734baa 100644
--- a/wally/ceph.py
+++ b/wally/ceph.py
@@ -14,83 +14,23 @@
 from .utils import StopTestError, to_ip
 
 
+from cephlib import discover
+from cephlib.discover import OSDInfo
+
+
 logger = logging.getLogger("wally")
 
 
-class OSDInfo:
-    def __init__(self, id: int, journal: str = None, storage: str = None) -> None:
-        self.id = id
-        self.journal = journal
-        self.storage = storage
-
-
-def get_osds_info(node: IRPCNode, conf: str, key: str) -> Dict[IP, List[OSDInfo]]:
+def get_osds_info(node: IRPCNode, ceph_extra_args: str = "") -> Dict[IP, List[OSDInfo]]:
     """Get set of osd's ip"""
-
-    data = node.run("ceph -c {} -k {} --format json osd dump".format(conf, key))
-    try:
-        jdata = json.loads(data)
-    except:
-        open("/tmp/ceph-out.json", "w").write(data)
-        raise
-    ips = {}  # type: Dict[IP, List[OSDInfo]]
-    first_error = True
-    for osd_data in jdata["osds"]:
-        osd_id = int(osd_data["osd"])
-        if "public_addr" not in osd_data:
-            if first_error:
-                logger.warning("No 'public_addr' field in 'ceph osd dump' output for osd %s" +
-                               "(all subsequent errors omitted)", osd_id)
-                first_error = False
-        else:
-            ip_port = osd_data["public_addr"]
-            if '/' in ip_port:
-                ip_port = ip_port.split("/", 1)[0]
-            ip = IP(ip_port.split(":")[0])
-
-            osd_journal_path = None  # type: Optional[str]
-            osd_data_path = None  # type: Optional[str]
-
-            # TODO: parallelize this!
-            osd_cfg = node.run("ceph -n osd.{} --show-config".format(osd_id))
-            for line in osd_cfg.split("\n"):
-                if line.startswith("osd_journal ="):
-                    osd_journal_path = line.split("=")[1].strip()
-                elif line.startswith("osd_data ="):
-                    osd_data_path = line.split("=")[1].strip()
-
-            if osd_data_path is None or osd_journal_path is None:
-                open("/tmp/ceph-out.json", "w").write(osd_cfg)
-                logger.error("Can't detect osd %s journal or storage path", osd_id)
-                raise StopTestError()
-
-            ips.setdefault(ip, []).append(OSDInfo(osd_id,
-                                                  journal=osd_journal_path,
-                                                  storage=osd_data_path))
-    return ips
+    res = {}  # type: Dict[IP, List[OSDInfo]]
+    return {IP(ip): osd_info_list
+            for ip, osd_info_list in discover.get_osds_nodes(node.run, ceph_extra_args)}
 
 
-def get_mons_ips(node: IRPCNode, conf: str, key: str) -> Set[IP]:
+def get_mons_ips(node: IRPCNode, ceph_extra_args: str = "") -> Set[IP]:
     """Return mon ip set"""
-
-    data = node.run("ceph -c {} -k {} --format json mon_status".format(conf, key))
-    jdata = json.loads(data)
-    ips = set()  # type: Set[IP]
-    first_error = True
-    for mon_data in jdata["monmap"]["mons"]:
-        if "addr" not in mon_data:
-            if first_error:
-                mon_name = mon_data.get("name", "<MON_NAME_MISSED>")
-                logger.warning("No 'addr' field in 'ceph mon_status' output for mon %s" +
-                               "(all subsequent errors omitted)", mon_name)
-                first_error = False
-        else:
-            ip_port = mon_data["addr"]
-            if '/' in ip_port:
-                ip_port = ip_port.split("/", 1)[0]
-            ips.add(IP(ip_port.split(":")[0]))
-
-    return ips
+    return {IP(ip) for ip in discover.get_mons_nodes(node.run, ceph_extra_args).values()}
 
 
 class DiscoverCephStage(Stage):
@@ -112,33 +52,39 @@
         ceph = ctx.config.ceph
         root_node_uri = cast(str, ceph.root_node)
         cluster = ceph.get("cluster", "ceph")
+
         conf = ceph.get("conf")
         key = ceph.get("key")
 
-        logger.debug("Start discovering ceph nodes from root %s", root_node_uri)
-        logger.debug("cluster=%s key=%s conf=%s", cluster, conf, key)
-
-        info = NodeInfo(parse_ssh_uri(root_node_uri), set())
-
         if conf is None:
             conf = "/etc/ceph/{}.conf".format(cluster)
 
         if key is None:
             key = "/etc/ceph/{}.client.admin.keyring".format(cluster)
 
+        ceph_extra_args = ""
+
+        if conf:
+            ceph_extra_args += " -c '{}'".format(conf)
+
+        if key:
+            ceph_extra_args += " -k '{}'".format(key)
+
+        logger.debug("Start discovering ceph nodes from root %s", root_node_uri)
+        logger.debug("cluster=%s key=%s conf=%s", cluster, conf, key)
+
+        info = NodeInfo(parse_ssh_uri(root_node_uri), set())
+
         ceph_params = {"cluster": cluster, "conf": conf, "key": key}
 
-        with setup_rpc(connect(info),
-                       ctx.rpc_code,
-                       ctx.default_rpc_plugins,
+        with setup_rpc(connect(info), ctx.rpc_code, ctx.default_rpc_plugins,
                        log_level=ctx.config.rpc_log_level) as node:
 
             ssh_key = node.get_file_content("~/.ssh/id_rsa")
 
-
             try:
                 ips = set()
-                for ip, osds_info in get_osds_info(node, conf, key).items():
+                for ip, osds_info in get_osds_info(node, ceph_extra_args).items():
                     ips.add(ip)
                     creds = ConnCreds(to_ip(cast(str, ip)), user="root", key=ssh_key)
                     info = ctx.merge_node(creds, {'ceph-osd'})
@@ -156,7 +102,7 @@
 
             try:
                 counter = 0
-                for counter, ip in enumerate(get_mons_ips(node, conf, key)):
+                for counter, ip in enumerate(get_mons_ips(node, ceph_extra_args)):
                     creds = ConnCreds(to_ip(cast(str, ip)), user="root", key=ssh_key)
                     info = ctx.merge_node(creds, {'ceph-mon'})
                     assert 'ceph' not in info.params or info.params['ceph'] == ceph_params
diff --git a/wally/hlstorage.py b/wally/hlstorage.py
index f2c9488..3672458 100644
--- a/wally/hlstorage.py
+++ b/wally/hlstorage.py
@@ -1,13 +1,13 @@
 import os
 import logging
-from typing import cast, Iterator, Tuple, Type, Dict, Optional
+from typing import cast, Iterator, Tuple, Type, Dict, Optional, Any, List
 
 import numpy
 
 from .suits.job import JobConfig
 from .result_classes import SuiteConfig, TimeSeries, DataSource, StatProps, IResultStorage
-from .storage import Storage, csv_file_encoding
-from .utils import StopTestError, str2shape, shape2str
+from .storage import Storage
+from .utils import StopTestError
 from .suits.all_suits import all_suits
 
 
@@ -41,7 +41,7 @@
     sensor_time_r = r'sensors/{node_id}_collected_at\.csv'
 
     report_root = 'report/'
-    plot_r = r'report/{suite_id}\.{job_id}/{node_id}\.{sensor}\.{dev}\.{metric}\.{tag}'
+    plot_r = r'{suite_id}\.{job_id}/{node_id}\.{sensor}\.{dev}\.{metric}\.{tag}'
 
     job_cfg = job_cfg_r.replace("\\.", '.')
     suite_cfg = suite_cfg_r.replace("\\.", '.')
@@ -70,39 +70,92 @@
     ts_header_size = 64
     ts_header_format = "!IIIcc"
     ts_arr_tag = 'csv'
+    csv_file_encoding = 'ascii'
 
     def __init__(self, storage: Storage) -> None:
         self.storage = storage
+        self.cache = {}  # type: Dict[str, Tuple[int, int, Any, List[str]]]
 
     def sync(self) -> None:
         self.storage.sync()
 
     #  -----------------  SERIALIZATION / DESERIALIZATION  -------------------------------------------------------------
+    def load_array(self, path: str, skip_shape: bool = False) -> Tuple[numpy.array, Tuple[str, ...]]:
+        with self.storage.get_fd(path, "rb") as fd:
+            stats = os.fstat(fd.fileno())
+            if path in self.cache:
+                size, atime, obj, header = self.cache[path]
+                if size == stats.st_size and atime == stats.st_atime_ns:
+                    return obj, header
+
+            header = fd.readline().decode(self.csv_file_encoding).strip().split(",")
+            print("header =", header)
+            if skip_shape:
+                header = header[1:]
+            dt = fd.read().decode("utf-8").strip()
+            print(dt.split("\n")[0])
+            arr = numpy.fromstring(dt.replace("\n", ','), sep=',', dtype=header[0])
+            if len(dt) != 0:
+                lines = dt.count("\n") + 1
+                columns = dt.split("\n", 1)[0].count(",") + 1
+                assert lines * columns == len(arr)
+                if columns == 1:
+                    arr.shape = (lines,)
+                else:
+                    arr.shape = (lines, columns)
+
+        self.cache[path] = (stats.st_size, stats.st_atime_ns, arr, header[1:])
+        return arr, header[1:]
+
+    def put_array(self, path:str, data: numpy.array, header: List[str], append_on_exists: bool = False) -> None:
+        header = [data.dtype.name] + header
+
+        exists = append_on_exists and path in self.storage
+        if len(data.shape) == 1:
+            # make array vertical to simplify reading
+            vw = data.view().reshape((data.shape[0], 1))
+        else:
+            vw = data
+
+        with self.storage.get_fd(path, "cb" if exists else "wb") as fd:
+            if exists:
+                curr_header = fd.readline().decode(self.csv_file_encoding).rstrip().split(",")
+                assert header == curr_header, \
+                    "Path {!r}. Expected header ({!r}) and current header ({!r}) don't match"\
+                        .format(path, header, curr_header)
+                fd.seek(0, os.SEEK_END)
+            else:
+                fd.write((",".join(header) + "\n").encode(self.csv_file_encoding))
+
+            numpy.savetxt(fd, vw, delimiter=',', newline="\n", fmt="%lu")
 
     def load_ts(self, ds: DataSource, path: str) -> TimeSeries:
+        arr, header = self.load_array(path, skip_shape=True)
+        units, time_units = header
 
-        with self.storage.get_fd(path, "rb") as fd:
-            header = fd.readline().decode(csv_file_encoding).strip().split(",")
-            shape, dtype, units, time_units = header
-            arr = numpy.loadtxt(fd, delimiter=',', dtype=dtype)
+        data = arr[:,1:]
+        if data.shape[1] == 1:
+            data = data.reshape((-1,))
 
         return TimeSeries("{}.{}".format(ds.dev, ds.sensor),
                           raw=None,
-                          data=arr[:,1:].reshape(str2shape(shape)),
+                          data=data,
                           times=arr[:,0],
                           source=ds,
                           units=units,
                           time_units=time_units)
 
     def load_sensor(self, ds: DataSource) -> TimeSeries:
-        collect_header, collected_at = self.storage.get_array(DB_paths.sensor_time.format(**ds.__dict__))
+        collected_at, collect_header = self.load_array(DB_paths.sensor_time.format(**ds.__dict__))
         assert collect_header == [ds.node_id, 'collected_at', 'us'], repr(collect_header)
-
-        data_header, data = self.storage.get_array(DB_paths.sensor_data.format(**ds.__dict__))
+        data, data_header = self.load_array(DB_paths.sensor_data.format(**ds.__dict__))
 
         data_units = data_header[2]
         assert data_header == [ds.node_id, ds.metric_fqdn, data_units]
 
+        assert len(data.shape) == 1
+        assert len(collected_at.shape) == 1
+
         return TimeSeries(ds.metric_fqdn,
                           raw=None,
                           data=data,
@@ -115,7 +168,7 @@
 
     def check_plot_file(self, source: DataSource) -> Optional[str]:
         path = DB_paths.plot.format(**source.__dict__)
-        fpath = self.storage.resolve_raw(path)
+        fpath = self.storage.resolve_raw(DB_paths.report_root + path)
         return path if os.path.exists(fpath) else None
 
     # -------------   PUT DATA INTO STORAGE   --------------------------------------------------------------------------
@@ -140,22 +193,16 @@
         assert ts.source.tag == self.ts_arr_tag
 
         csv_path = DB_paths.ts.format(**ts.source.__dict__)
-        header = [shape2str(ts.data.shape),
-                  ts.data.dtype.name,
-                  ts.units,
-                  ts.time_units]
+        header = [ts.data.dtype.name, ts.units, ts.time_units]
 
-        with self.storage.get_fd(csv_path, "cb") as fd:
-            tv = ts.times.view().reshape((-1, 1))
+        tv = ts.times.view().reshape((-1, 1))
+        if len(ts.data.shape) == 1:
+            dv = ts.data.view().reshape((ts.times.shape[0], -1))
+        else:
+            dv = ts.data
 
-            if len(ts.data.shape) == 1:
-                dv = ts.data.view().reshape((ts.times.shape[0], -1))
-            else:
-                dv = ts.data
-
-            result = numpy.concatenate((tv, dv), axis=1)
-            fd.write((",".join(map(str, header)) + "\n").encode(csv_file_encoding))
-            numpy.savetxt(fd, result, delimiter=',', newline="\n", fmt="%lu")
+        result = numpy.concatenate((tv, dv), axis=1)
+        self.put_array(csv_path, result, header)
 
         if ts.raw:
             raw_path = DB_paths.ts.format(**ts.source(tag=ts.raw_tag).__dict__)
@@ -170,10 +217,11 @@
     # return path to file to be inserted into report
     def put_plot_file(self, data: bytes, source: DataSource) -> str:
         path = DB_paths.plot.format(**source.__dict__)
-        return cast(str, self.storage.put_raw(data, path))
+        self.storage.put_raw(data, DB_paths.report_root + path)
+        return path
 
     def put_report(self, report: str, name: str) -> str:
-        return self.storage.put_raw(report.encode("utf8"), DB_paths.report_root + name)
+        return self.storage.put_raw(report.encode(self.csv_file_encoding), DB_paths.report_root + name)
 
     def append_sensor(self, data: numpy.array, ds: DataSource, units: str) -> None:
         if ds.metric == 'collected_at':
@@ -182,7 +230,7 @@
         else:
             path = DB_paths.sensor_data
             metrics_fqn = ds.metric_fqdn
-        self.storage.append([ds.node_id, metrics_fqn, units], data, path.format(**ds.__dict__))
+        self.put_array(path.format(**ds.__dict__), data, [ds.node_id, metrics_fqn, units], append_on_exists=True)
 
     # -------------   GET DATA FROM STORAGE   --------------------------------------------------------------------------
 
@@ -217,7 +265,6 @@
     def iter_ts(self, suite: SuiteConfig, job: JobConfig, **filters) -> Iterator[TimeSeries]:
         filters.update(suite_id=suite.storage_id, job_id=job.storage_id)
         ts_glob = fill_path(DB_paths.ts_r, **filters)
-
         for is_file, path, groups in self.iter_paths(ts_glob):
             assert is_file
             groups = groups.copy()
diff --git a/wally/report.py b/wally/report.py
index 0b0540e..68170ec 100644
--- a/wally/report.py
+++ b/wally/report.py
@@ -19,8 +19,7 @@
 from .node_interfaces import NodeInfo
 from .utils import b2ssize, b2ssize_10, STORAGE_ROLES
 from .statistic import (calc_norm_stat_props, calc_histo_stat_props, moving_average, moving_dev,
-                        hist_outliers_perc, ts_hist_outliers_perc, find_ouliers_ts, approximate_curve,
-                        rebin_histogram)
+                        hist_outliers_perc, ts_hist_outliers_perc, find_ouliers_ts, approximate_curve)
 from .result_classes import (StatProps, DataSource, TimeSeries, NormStatProps, HistoStatProps, SuiteConfig,
                              IResultStorage)
 from .suits.io.fio_hist import get_lat_vals, expected_lat_bins
@@ -158,7 +157,7 @@
     return IOSummary(job.qd,
                      nodes_count=len(suite.nodes_ids),
                      block_size=job.bsize,
-                     lat=calc_histo_stat_props(lat, bins_edges, StyleProfile.hist_boxes),
+                     lat=calc_histo_stat_props(lat, bins_edges, rebins_count=StyleProfile.hist_boxes),
                      bw=calc_norm_stat_props(io, StyleProfile.hist_boxes))
 
 #
@@ -192,7 +191,7 @@
 
 
 def get_aggregated(rstorage: ResultStorage, suite: SuiteConfig, job: FioJobConfig, metric: str) -> TimeSeries:
-    tss = list(rstorage.iter_ts(suite, job, sensor=metric))
+    tss = list(rstorage.iter_ts(suite, job, metric=metric))
     ds = DataSource(suite_id=suite.storage_id,
                     job_id=job.storage_id,
                     node_id=AGG_TAG,
@@ -214,13 +213,13 @@
                          "shape=%s. Can only process sensors with shape=[X, %s].",
                          ts.source.dev, ts.source.sensor, ts.source.node_id,
                          ts.data.shape, expected_lat_bins)
-            continue
+            raise ValueError()
 
         if metric != 'lat' and len(ts.data.shape) != 1:
-            logger.error("Sensor %s.%s on node %s has" +
+            logger.error("Sensor %s.%s on node %s has " +
                          "shape=%s. Can only process 1D sensors.",
                          ts.source.dev, ts.source.sensor, ts.source.node_id, ts.data.shape)
-            continue
+            raise ValueError()
 
         # TODO: match times on different ts
         agg_ts.data += ts.data
@@ -290,7 +289,7 @@
     val_it = iter(sensor_data.data[pos1 - 1: pos2 + 1])
 
     # result array, cumulative value per second
-    result = numpy.zeros((end - begin) // MICRO)
+    result = numpy.zeros(int(end - begin) // MICRO)
     idx = 0
     curr_summ = 0
 
@@ -965,7 +964,7 @@
         storage_nodes = [node.node_id for node in nodes if node.roles.intersection(STORAGE_ROLES)]
         test_nodes = [node.node_id for node in nodes if "testnode" in node.roles]
 
-        trange = [job.reliable_info_range[0] / 1000, job.reliable_info_range[1] / 1000]
+        trange = (job.reliable_info_range[0] / 1000, job.reliable_info_range[1] / 1000)
         ops_done = io_transfered / fjob.bsize / KB
 
         all_metrics = [
@@ -1035,7 +1034,7 @@
 
         agg_lat = get_aggregated(rstorage, suite, fjob, "lat")
         bins_edges = numpy.array(get_lat_vals(agg_lat.data.shape[1]), dtype='float32') / 1000  # convert us to ms
-        lat_stat_prop = calc_histo_stat_props(agg_lat, bins_edges, bins_count=StyleProfile.hist_lat_boxes)
+        lat_stat_prop = calc_histo_stat_props(agg_lat, bins_edges, rebins_count=StyleProfile.hist_lat_boxes)
 
         # import IPython
         # IPython.embed()
@@ -1179,6 +1178,7 @@
                             units=units,
                             time_units="us",
                             source=ds)
+
             fpath = plot_v_over_time(rstorage, ds, sensor_title, sensor_title, ts=ts)  # type: str
             yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fpath))
 
diff --git a/wally/sensors.py b/wally/sensors.py
index bcdb4e3..c773b34 100644
--- a/wally/sensors.py
+++ b/wally/sensors.py
@@ -95,7 +95,7 @@
                     node_cfg['ceph'].update(node.info.params['ceph'])
                     node_cfg['ceph']['osds'] = [osd.id for osd in node.info.params['ceph-osds']]  # type: ignore
 
-                logger.debug("Setting up sensort RPC plugin for node %s", nid)
+                logger.debug("Setting up sensors RPC plugin for node %s", nid)
                 node.upload_plugin("sensors", SENSORS_PLUGIN_CODE)
                 ctx.sensors_run_on.add(nid)
                 logger.debug("Start monitoring node %s", nid)
diff --git a/wally/sensors_rpc_plugin.py b/wally/sensors_rpc_plugin.py
deleted file mode 100644
index ffb0abd..0000000
--- a/wally/sensors_rpc_plugin.py
+++ /dev/null
@@ -1,687 +0,0 @@
-import os
-import sys
-import json
-import time
-import zlib
-import array
-import pprint
-import logging
-import threading
-import traceback
-import subprocess
-import collections
-
-
-import Pool  # type: ignore
-
-
-mod_name = "sensors"
-__version__ = (0, 1)
-
-
-logger = logging.getLogger("agent.sensors")
-SensorsMap = {}
-
-
-class Sensor(object):
-    def __init__(self, params, allowed=None, disallowed=None):
-        self.params = params
-        self.allowed = allowed
-        self.disallowed = disallowed
-        self.allowed_names = set()
-
-    def add_data(self, device, name, value):
-        pass
-
-    def collect(self):
-        pass
-
-    def get_updates(self):
-        pass
-
-    @classmethod
-    def unpack_results(cls, device, metric, data, typecode):
-        pass
-
-    def init(self):
-        pass
-
-    def stop(self):
-        pass
-
-
-class ArraysSensor(Sensor):
-    typecode = 'L'
-
-    def __init__(self, params, allowed=None, disallowed=None):
-        Sensor.__init__(self, params, allowed, disallowed)
-        self.data = collections.defaultdict(lambda: array.array(self.typecode))
-        self.prev_vals = {}
-
-    def add_data(self, device, name, value):
-        self.data[(device, name)].append(value)
-
-    def add_relative(self, device, name, value):
-        key = (device, name)
-        pval = self.prev_vals.get(key)
-        if pval is not None:
-            self.data[key].append(value - pval)
-        self.prev_vals[key] = value
-
-    def get_updates(self):
-        res = self.data
-        self.data = collections.defaultdict(lambda: array.array(self.typecode))
-        return {key: (arr.typecode, arr.tostring()) for key, arr in res.items()}
-
-    @classmethod
-    def unpack_results(cls, device, metric, packed, typecode):
-        arr = array.array(typecode)
-        if sys.version_info >= (3, 0, 0):
-            arr.frombytes(packed)
-        else:
-            arr.fromstring(packed)
-        return arr
-
-    def is_dev_accepted(self, name):
-        dev_ok = True
-
-        if self.disallowed is not None:
-            dev_ok = all(not name.startswith(prefix) for prefix in self.disallowed)
-
-        if dev_ok and self.allowed is not None:
-            dev_ok = any(name.startswith(prefix) for prefix in self.allowed)
-
-        return dev_ok
-
-
-time_array_typechar = ArraysSensor.typecode
-
-
-def provides(name):
-    def closure(cls):
-        SensorsMap[name] = cls
-        return cls
-    return closure
-
-
-def get_pid_list(disallowed_prefixes, allowed_prefixes):
-    """Return pid list from list of pids and names"""
-    # exceptions
-    disallowed = disallowed_prefixes if disallowed_prefixes is not None else []
-    if allowed_prefixes is None:
-        # if nothing setted - all ps will be returned except setted
-        result = [pid for pid in os.listdir('/proc')
-                  if pid.isdigit() and pid not in disallowed]
-    else:
-        result = []
-        for pid in os.listdir('/proc'):
-            if pid.isdigit() and pid not in disallowed:
-                name = get_pid_name(pid)
-                if pid in allowed_prefixes or any(name.startswith(val) for val in allowed_prefixes):
-                    # this is allowed pid?
-                    result.append(pid)
-    return result
-
-
-def get_pid_name(pid):
-    """Return name by pid"""
-    try:
-        with open(os.path.join('/proc/', pid, 'cmdline'), 'r') as pidfile:
-            try:
-                cmd = pidfile.readline().split()[0]
-                return os.path.basename(cmd).rstrip('\x00')
-            except IndexError:
-                # no cmd returned
-                return "<NO NAME>"
-    except IOError:
-        # upstream wait any string, no matter if we couldn't read proc
-        return "no_such_process"
-
-
-@provides("block-io")
-class BlockIOSensor(ArraysSensor):
-    #  1 - major number
-    #  2 - minor mumber
-    #  3 - device name
-    #  4 - reads completed successfully
-    #  5 - reads merged
-    #  6 - sectors read
-    #  7 - time spent reading (ms)
-    #  8 - writes completed
-    #  9 - writes merged
-    # 10 - sectors written
-    # 11 - time spent writing (ms)
-    # 12 - I/Os currently in progress
-    # 13 - time spent doing I/Os (ms)
-    # 14 - weighted time spent doing I/Os (ms)
-
-    SECTOR_SIZE = 512
-
-    io_values_pos = [
-        (3, 'reads_completed', True, 1),
-        (5, 'sectors_read', True, SECTOR_SIZE),
-        (6, 'rtime', True, 1),
-        (7, 'writes_completed', True, 1),
-        (9, 'sectors_written', True, SECTOR_SIZE),
-        (10, 'wtime', True, 1),
-        (11, 'io_queue', False, 1),
-        (13, 'io_time', True, 1)
-    ]
-
-    def __init__(self, *args, **kwargs):
-        ArraysSensor.__init__(self, *args, **kwargs)
-
-        if self.disallowed is None:
-            self.disallowed = ('ram', 'loop')
-
-        for line in open('/proc/diskstats'):
-            vals = line.split()
-            dev_name = vals[2]
-            if self.is_dev_accepted(dev_name) and not dev_name[-1].isdigit():
-                self.allowed_names.add(dev_name)
-
-        self.collect(init_rel=True)
-
-    def collect(self, init_rel=False):
-        for line in open('/proc/diskstats'):
-            vals = line.split()
-            dev_name = vals[2]
-
-            if dev_name not in self.allowed_names:
-                continue
-
-            for pos, name, aggregated, coef in self.io_values_pos:
-                vl = int(vals[pos]) * coef
-                if aggregated:
-                    self.add_relative(dev_name, name, vl)
-                elif not init_rel:
-                    self.add_data(dev_name, name, int(vals[pos]))
-
-
-@provides("net-io")
-class NetIOSensor(ArraysSensor):
-    #  1 - major number
-    #  2 - minor mumber
-    #  3 - device name
-    #  4 - reads completed successfully
-    #  5 - reads merged
-    #  6 - sectors read
-    #  7 - time spent reading (ms)
-    #  8 - writes completed
-    #  9 - writes merged
-    # 10 - sectors written
-    # 11 - time spent writing (ms)
-    # 12 - I/Os currently in progress
-    # 13 - time spent doing I/Os (ms)
-    # 14 - weighted time spent doing I/Os (ms)
-
-    net_values_pos = [
-        (0, 'recv_bytes', True),
-        (1, 'recv_packets', True),
-        (8, 'send_bytes', True),
-        (9, 'send_packets', True),
-    ]
-
-    def __init__(self, *args, **kwargs):
-        ArraysSensor.__init__(self, *args, **kwargs)
-
-        if self.disallowed is None:
-            self.disallowed = ('docker', 'lo')
-
-        if self.allowed is None:
-            self.allowed = ('eth',)
-
-        for _, _, aggregated in self.net_values_pos:
-            assert aggregated, "Non-aggregated values is not supported in net sensor"
-
-        for line in open('/proc/net/dev').readlines()[2:]:
-            dev_name, stats = line.split(":", 1)
-            dev_name = dev_name.strip()
-            if self.is_dev_accepted(dev_name):
-                self.allowed_names.add(dev_name)
-
-        self.collect(init_rel=True)
-
-    def collect(self, init_rel=False):
-        for line in open('/proc/net/dev').readlines()[2:]:
-            dev_name, stats = line.split(":", 1)
-            dev_name = dev_name.strip()
-            if dev_name in self.allowed_names:
-                vals = stats.split()
-                for pos, name, _ in self.net_values_pos:
-                    vl = int(vals[pos])
-                    self.add_relative(dev_name, name, vl )
-
-
-def pid_stat(pid):
-    """Return total cpu usage time from process"""
-    # read /proc/pid/stat
-    with open(os.path.join('/proc/', pid, 'stat'), 'r') as pidfile:
-        proctimes = pidfile.readline().split()
-    # get utime from /proc/<pid>/stat, 14 item
-    utime = proctimes[13]
-    # get stime from proc/<pid>/stat, 15 item
-    stime = proctimes[14]
-    # count total process used time
-    return float(int(utime) + int(stime))
-
-
-@provides("perprocess-cpu")
-class ProcCpuSensor(ArraysSensor):
-    def collect(self):
-        # TODO(koder): fixed list of PID's must be given
-        for pid in get_pid_list(self.disallowed, self.allowed):
-            try:
-                self.add_data(get_pid_name(pid), pid, pid_stat(pid))
-            except IOError:
-                # probably proc has already terminated, skip it
-                continue
-
-
-def get_mem_stats(pid):
-    """Return memory data of pid in format (private, shared)"""
-
-    fname = '/proc/{0}/{1}'.format(pid, "smaps")
-    lines = open(fname).readlines()
-
-    shared = 0
-    private = 0
-    pss = 0
-
-    # add 0.5KiB as this avg error due to truncation
-    pss_adjust = 0.5
-
-    for line in lines:
-        if line.startswith("Shared"):
-            shared += int(line.split()[1])
-
-        if line.startswith("Private"):
-            private += int(line.split()[1])
-
-        if line.startswith("Pss"):
-            pss += float(line.split()[1]) + pss_adjust
-
-    # Note Shared + Private = Rss above
-    # The Rss in smaps includes video card mem etc.
-
-    if pss != 0:
-        shared = int(pss - private)
-
-    return (private, shared)
-
-
-def get_ram_size():
-    """Return RAM size in Kb"""
-    with open("/proc/meminfo") as proc:
-        mem_total = proc.readline().split()
-    return int(mem_total[1])
-
-
-@provides("perprocess-ram")
-class ProcRamSensor(ArraysSensor):
-    def collect(self):
-        # TODO(koder): fixed list of PID's nust be given
-        for pid in get_pid_list(self.disallowed, self.allowed):
-            try:
-                dev_name = get_pid_name(pid)
-
-                private, shared = get_mem_stats(pid)
-                total = private + shared
-                sys_total = get_ram_size()
-                usage = float(total) / sys_total
-
-                sensor_name = "{0}({1})".format(dev_name, pid)
-
-                self.add_data(sensor_name, "private_mem", private)
-                self.add_data(sensor_name, "shared_mem", shared),
-                self.add_data(sensor_name, "used_mem", total),
-                self.add_data(sensor_name, "mem_usage_percent", int(usage * 100))
-            except IOError:
-                # permission denied or proc die
-                continue
-
-
-@provides("system-cpu")
-class SystemCPUSensor(ArraysSensor):
-    # 0 - cpu name
-    # 1 - user: normal processes executing in user mode
-    # 2 - nice: niced processes executing in user mode
-    # 3 - system: processes executing in kernel mode
-    # 4 - idle: twiddling thumbs
-    # 5 - iowait: waiting for I/O to complete
-    # 6 - irq: servicing interrupts
-    # 7 - softirq: servicing softirqs
-
-    cpu_values_pos = [
-        (1, 'user_processes', True),
-        (2, 'nice_processes', True),
-        (3, 'system_processes', True),
-        (4, 'idle_time', True),
-    ]
-
-    def collect(self):
-        # calculate core count
-        core_count = 0
-
-        for line in open('/proc/stat'):
-            vals = line.split()
-            dev_name = vals[0]
-
-            if dev_name == 'cpu':
-                for pos, name, _ in self.cpu_values_pos:
-                    self.add_data(dev_name, name, int(vals[pos]))
-            elif dev_name == 'procs_blocked':
-                self.add_data("cpu", "procs_blocked", int(vals[1]))
-            elif dev_name.startswith('cpu'):
-                core_count += 1
-
-        # procs in queue
-        TASKSPOS = 3
-        vals = open('/proc/loadavg').read().split()
-        ready_procs = vals[TASKSPOS].partition('/')[0]
-
-        # dec on current proc
-        procs_queue = (float(ready_procs) - 1) / core_count
-        self.add_data("cpu", "procs_queue_x10", int(procs_queue * 10))
-
-
-@provides("system-ram")
-class SystemRAMSensor(ArraysSensor):
-    # return this values or setted in allowed
-    ram_fields = ['MemTotal', 'MemFree', 'Buffers', 'Cached', 'SwapCached',
-                  'Dirty', 'Writeback', 'SwapTotal', 'SwapFree']
-
-    def __init__(self, *args, **kwargs):
-        ArraysSensor.__init__(self, *args, **kwargs)
-
-        if self.allowed is None:
-            self.allowed = self.ram_fields
-
-        self.allowed_fields = set()
-        for line in open('/proc/meminfo'):
-            field_name = line.split()[0].rstrip(":")
-            if self.is_dev_accepted(field_name):
-                self.allowed_fields.add(field_name)
-
-    def collect(self):
-        for line in open('/proc/meminfo'):
-            vals = line.split()
-            field = vals[0].rstrip(":")
-            if field in self.allowed_fields:
-                self.add_data("ram", field, int(vals[1]))
-
-
-try:
-    from ceph_daemon import admin_socket
-except ImportError:
-    admin_socket = None
-
-
-@provides("ceph")
-class CephSensor(ArraysSensor):
-
-    historic_duration = 2
-    historic_size = 200
-
-    def run_ceph_daemon_cmd(self, osd_id, *args):
-        asok = "/var/run/ceph/{}-osd.{}.asok".format(self.cluster, osd_id)
-        if admin_socket:
-            res = admin_socket(asok, args)
-        else:
-            res = subprocess.check_output("ceph daemon {} {}".format(asok, " ".join(args)), shell=True)
-
-        return res
-
-    def collect(self):
-        def get_val(dct, path):
-            if '/' in path:
-                root, next = path.split('/', 1)
-                return get_val(dct[root], next)
-            return dct[path]
-
-        for osd_id in self.params['osds']:
-            data = json.loads(self.run_ceph_daemon_cmd(osd_id, 'perf', 'dump'))
-            for key_name in self.params['counters']:
-                self.add_data("osd{}".format(osd_id), key_name.replace("/", "."), get_val(data, key_name))
-
-            if 'historic' in self.params.get('sources', {}):
-                self.historic.setdefault(osd_id, []).append(self.run_ceph_daemon_cmd(osd_id, "dump_historic_ops"))
-
-            if 'in_flight' in self.params.get('sources', {}):
-                self.in_flight.setdefault(osd_id, []).append(self.run_ceph_daemon_cmd(osd_id, "dump_ops_in_flight"))
-
-    def set_osd_historic(self, duration, keep, osd_id):
-        data = json.loads(self.run_ceph_daemon_cmd(osd_id, "dump_historic_ops"))
-        self.run_ceph_daemon_cmd(osd_id, "config set osd_op_history_duration {}".format(duration))
-        self.run_ceph_daemon_cmd(osd_id, "config set osd_op_history_size {}".format(keep))
-        return (data["duration to keep"], data["num to keep"])
-
-    def init(self):
-        self.cluster = self.params['cluster']
-        self.prev_vals = {}
-        self.historic = {}
-        self.in_flight = {}
-
-        if 'historic' in self.params.get('sources', {}):
-            for osd_id in self.params['osds']:
-                self.prev_vals[osd_id] = self.set_osd_historic(self.historic_duration, self.historic_size, osd_id)
-
-    def stop(self):
-        for osd_id, (duration, keep) in self.prev_vals.items():
-            self.prev_vals[osd_id] = self.set_osd_historic(duration, keep, osd_id)
-
-    def get_updates(self):
-        res = super().get_updates()
-
-        for osd_id, data in self.historic.items():
-            res[("osd{}".format(osd_id), "historic")] = (None, data)
-
-        self.historic = {}
-
-        for osd_id, data in self.in_flight.items():
-            res[("osd{}".format(osd_id), "in_flight")] = (None, data)
-
-        self.in_flight = {}
-
-        return res
-
-    @classmethod
-    def unpack_results(cls, device, metric, packed, typecode):
-        if metric in ('historic', 'in_flight'):
-            assert typecode is None
-            return packed
-
-        arr = array.array(typecode)
-        if sys.version_info >= (3, 0, 0):
-            arr.frombytes(packed)
-        else:
-            arr.fromstring(packed)
-
-        return arr
-
-
-class SensorsData(object):
-    def __init__(self):
-        self.cond = threading.Condition()
-        self.collected_at = array.array(time_array_typechar)
-        self.stop = False
-        self.sensors = {}
-        self.data_fd = None  # temporary file to store results
-        self.exception = None
-
-
-def collect(sensors_config):
-    curr = {}
-    for name, config in sensors_config.items():
-        params = {'config': config}
-
-        if "allow" in config:
-            params["allowed_prefixes"] = config["allow"]
-
-        if "disallow" in config:
-            params["disallowed_prefixes"] = config["disallow"]
-
-        curr[name] = SensorsMap[name](**params)
-    return curr
-
-
-def sensors_bg_thread(sensors_config, sdata):
-    try:
-        next_collect_at = time.time()
-        if "pool_sz" in sensors_config:
-            sensors_config = sensors_config.copy()
-            pool_sz = sensors_config.pop("pool_sz")
-        else:
-            pool_sz = 32
-
-        if pool_sz != 0:
-            pool = Pool(sensors_config.get("pool_sz"))
-        else:
-            pool = None
-
-        # prepare sensor classes
-        with sdata.cond:
-            sdata.sensors = {}
-            for name, config in sensors_config.items():
-                params = {'params': config}
-                logger.debug("Start sensor %r with config %r", name, config)
-
-                if "allow" in config:
-                    params["allowed_prefixes"] = config["allow"]
-
-                if "disallow" in config:
-                    params["disallowed_prefixes"] = config["disallow"]
-
-                sdata.sensors[name] = SensorsMap[name](**params)
-                sdata.sensors[name].init()
-
-            logger.debug("sensors.config = %s", pprint.pformat(sensors_config))
-            logger.debug("Sensors map keys %s", ", ".join(sdata.sensors.keys()))
-
-        # TODO: handle exceptions here
-        # main loop
-        while not sdata.stop:
-            dtime = next_collect_at - time.time()
-            if dtime > 0:
-                with sdata.cond:
-                    sdata.cond.wait(dtime)
-
-            next_collect_at += 1.0
-
-            if sdata.stop:
-                break
-
-            ctm = time.time()
-            with sdata.cond:
-                sdata.collected_at.append(int(ctm * 1000000))
-                if pool is not None:
-                    caller = lambda x: x()
-                    for ok, val in pool.map(caller, [sensor.collect for sensor in sdata.sensors.values()]):
-                        if not ok:
-                            raise val
-                else:
-                    for sensor in sdata.sensors.values():
-                        sensor.collect()
-                etm = time.time()
-                sdata.collected_at.append(int(etm * 1000000))
-                logger.debug("Add data to collected_at - %s, %s", ctm, etm)
-
-            if etm - ctm > 0.1:
-                # TODO(koder): need to signal that something in not really ok with sensor collecting
-                pass
-
-    except Exception:
-        logger.exception("In sensor BG thread")
-        sdata.exception = traceback.format_exc()
-    finally:
-        for sensor in sdata.sensors.values():
-            sensor.stop()
-
-
-sensors_thread = None
-sdata = None  # type: SensorsData
-
-
-def rpc_start(sensors_config):
-    global sensors_thread
-    global sdata
-
-    if array.array('L').itemsize != 8:
-        message = "Python array.array('L') items should be 8 bytes in size, not {}." + \
-                  " Can't provide sensors on this platform. Disable sensors in config and retry"
-        raise ValueError(message.format(array.array('L').itemsize))
-
-    if sensors_thread is not None:
-        raise ValueError("Thread already running")
-
-    sdata = SensorsData()
-    sensors_thread = threading.Thread(target=sensors_bg_thread, args=(sensors_config, sdata))
-    sensors_thread.daemon = True
-    sensors_thread.start()
-
-
-def unpack_rpc_updates(res_tuple):
-    offset_map, compressed_blob, compressed_collected_at_b = res_tuple
-    blob = zlib.decompress(compressed_blob)
-    collected_at_b = zlib.decompress(compressed_collected_at_b)
-    collected_at = array.array(time_array_typechar)
-    collected_at.frombytes(collected_at_b)
-    yield 'collected_at', collected_at
-
-    # TODO: data is unpacked/repacked here with no reason
-    for sensor_path, (offset, size, typecode) in offset_map.items():
-        sensor_path = sensor_path.decode("utf8")
-        sensor_name, device, metric = sensor_path.split('.', 2)
-        sensor_data = SensorsMap[sensor_name].unpack_results(device,
-                                                             metric,
-                                                             blob[offset:offset + size],
-                                                             typecode.decode("ascii"))
-        yield sensor_path, sensor_data
-
-
-def rpc_get_updates():
-    if sdata is None:
-        raise ValueError("No sensor thread running")
-
-    offset_map = collected_at = None
-    blob = ""
-
-    with sdata.cond:
-        if sdata.exception:
-            raise Exception(sdata.exception)
-
-        offset_map = {}
-        for sensor_name, sensor in sdata.sensors.items():
-            for (device, metric), (typecode, val) in sensor.get_updates().items():
-                offset_map["{}.{}.{}".format(sensor_name, device, metric)] = (len(blob), len(val), typecode)
-                blob += val
-
-        collected_at = sdata.collected_at
-        sdata.collected_at = array.array(sdata.collected_at.typecode)
-
-    logger.debug(str(collected_at))
-    return offset_map, zlib.compress(blob), zlib.compress(collected_at.tostring())
-
-
-def rpc_stop():
-    global sensors_thread
-    global sdata
-
-    if sensors_thread is None:
-        raise ValueError("No sensor thread running")
-
-    sdata.stop = True
-    with sdata.cond:
-        sdata.cond.notify_all()
-
-    sensors_thread.join()
-
-    if sdata.exception:
-        raise Exception(sdata.exception)
-
-    res = rpc_get_updates()
-
-    sensors_thread = None
-    sdata = None
-
-    return res
diff --git a/wally/sensors_rpc_plugin.pyi b/wally/sensors_rpc_plugin.pyi
deleted file mode 100644
index 21fe1d5..0000000
--- a/wally/sensors_rpc_plugin.pyi
+++ /dev/null
@@ -1,22 +0,0 @@
-from typing import NamedTuple, TypeVar, Callable, Any, Optional, List, Iterable, Dict, Tuple
-
-Pid = TypeVar('Pid', str)
-AnyFunc = TypeVar('AnyFunc', Callable[..., Any])
-PrefixList = Optional[List[str]]
-SensorDict = Dict[str, int]
-
-def provides(name: str) -> Callable[[AnyFunc], AnyFunc]: ...
-def is_dev_accepted(name: str, disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> bool: ...
-def get_pid_name(pid: Pid) -> str: ...
-def pid_stat(pid: Pid) -> float: ...
-def get_mem_stats(pid : Pid) -> Tuple[int, int]: ...
-def get_ram_size() -> int: ...
-
-def io_stat(disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> int: ...
-def net_stat(disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> int: ...
-def pscpu_stat(disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> int: ...
-def psram_stat(disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> int: ...
-def syscpu_stat(disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> int: ...
-def sysram_stat(disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> int: ...
-
-
diff --git a/wally/statistic.py b/wally/statistic.py
index 4ebfccc..7edff67 100644
--- a/wally/statistic.py
+++ b/wally/statistic.py
@@ -223,7 +223,7 @@
 
 def find_ouliers(data: numpy.array,
                  center_range: Tuple[int, int] = (25, 75),
-                 cut_range: float = 3) -> numpy.array:
+                 cut_range: float = 3.0) -> numpy.array:
     v1, v2 = numpy.percentile(data, center_range)
     return numpy.abs(data - (v1 + v2) / 2) > ((v2 - v1) / 2 * cut_range)
 
@@ -231,7 +231,7 @@
 def find_ouliers_ts(data: numpy.array,
                     windows_size: int = 30,
                     center_range: Tuple[int, int] = (25, 75),
-                    cut_range: float = 3) -> numpy.array:
+                    cut_range: float = 3.0) -> numpy.array:
     outliers = numpy.empty(data.shape, dtype=bool)
 
     if len(data) < windows_size:
diff --git a/wally/storage.py b/wally/storage.py
index c8edf5d..ab52e12 100644
--- a/wally/storage.py
+++ b/wally/storage.py
@@ -18,7 +18,6 @@
 import numpy
 
 from .common_types import IStorable
-from .utils import shape2str, str2shape
 
 
 logger = logging.getLogger("wally")
@@ -61,6 +60,43 @@
         pass
 
 
+class ITSStorage(metaclass=abc.ABCMeta):
+    """interface for low-level storage, which doesn't support serialization
+    and can operate only on bytes"""
+
+    @abc.abstractmethod
+    def put(self, value: bytes, path: str) -> None:
+        pass
+
+    @abc.abstractmethod
+    def get(self, path: str) -> bytes:
+        pass
+
+    @abc.abstractmethod
+    def rm(self, path: str) -> None:
+        pass
+
+    @abc.abstractmethod
+    def sync(self) -> None:
+        pass
+
+    @abc.abstractmethod
+    def __contains__(self, path: str) -> bool:
+        pass
+
+    @abc.abstractmethod
+    def get_fd(self, path: str, mode: str = "rb+") -> IO:
+        pass
+
+    @abc.abstractmethod
+    def sub_storage(self, path: str) -> 'ISimpleStorage':
+        pass
+
+    @abc.abstractmethod
+    def list(self, path: str) -> Iterator[Tuple[bool, str]]:
+        pass
+
+
 class ISerializer(metaclass=abc.ABCMeta):
     """Interface for serialization class"""
     @abc.abstractmethod
@@ -72,115 +108,13 @@
         pass
 
 
-class DBStorage(ISimpleStorage):
-
-    create_tb_sql = "CREATE TABLE IF NOT EXISTS wally_storage (key text, data blob, type text)"
-    insert_sql = "INSERT INTO wally_storage VALUES (?, ?, ?)"
-    update_sql = "UPDATE wally_storage SET data=?, type=? WHERE key=?"
-    select_sql = "SELECT data, type FROM wally_storage WHERE key=?"
-    contains_sql = "SELECT 1 FROM wally_storage WHERE key=?"
-    rm_sql = "DELETE FROM wally_storage WHERE key LIKE '{}%'"
-    list2_sql = "SELECT key, length(data), type FROM wally_storage"
-    SQLITE3_THREADSAFE = 1
-
-    def __init__(self, db_path: str = None, existing: bool = False,
-                 prefix: str = None, db: sqlite3.Connection = None) -> None:
-
-        assert not prefix or "'" not in prefix, "Broken sql prefix {!r}".format(prefix)
-
-        if db_path:
-            self.existing = existing
-            if existing:
-                if not os.path.isfile(db_path):
-                    raise IOError("No storage found at {!r}".format(db_path))
-
-            os.makedirs(os.path.dirname(db_path), exist_ok=True)
-            if sqlite3.threadsafety != self.SQLITE3_THREADSAFE:
-                raise RuntimeError("Sqlite3 compiled without threadsafe support, can't use DB storage on it")
-
-            try:
-                self.db = sqlite3.connect(db_path, check_same_thread=False)
-            except sqlite3.OperationalError as exc:
-                raise IOError("Can't open database at {!r}".format(db_path)) from exc
-
-            self.db.execute(self.create_tb_sql)
-        else:
-            if db is None:
-                raise ValueError("Either db or db_path parameter must be passed")
-            self.db = db
-
-        if prefix is None:
-            self.prefix = ""
-        elif not prefix.endswith('/'):
-            self.prefix = prefix + '/'
-        else:
-            self.prefix = prefix
-
-    def put(self, value: bytes, path: str) -> None:
-        c = self.db.cursor()
-        fpath = self.prefix + path
-        c.execute(self.contains_sql, (fpath,))
-        if len(c.fetchall()) == 0:
-            c.execute(self.insert_sql, (fpath, value, 'yaml'))
-        else:
-            c.execute(self.update_sql, (value, 'yaml', fpath))
-
-    def get(self, path: str) -> bytes:
-        c = self.db.cursor()
-        c.execute(self.select_sql, (self.prefix + path,))
-        res = cast(List[Tuple[bytes, str]], c.fetchall())  # type: List[Tuple[bytes, str]]
-        if not res:
-            raise KeyError(path)
-        assert len(res) == 1
-        val, tp = res[0]
-        assert tp == 'yaml'
-        return val
-
-    def rm(self, path: str) -> None:
-        c = self.db.cursor()
-        path = self.prefix + path
-        assert "'" not in path, "Broken sql path {!r}".format(path)
-        c.execute(self.rm_sql.format(path))
-
-    def __contains__(self, path: str) -> bool:
-        c = self.db.cursor()
-        path = self.prefix + path
-        c.execute(self.contains_sql, (self.prefix + path,))
-        return len(c.fetchall()) != 0
-
-    def print_tree(self):
-        c = self.db.cursor()
-        c.execute(self.list2_sql)
-        data = list(c.fetchall())
-        data.sort()
-        print("------------------ DB ---------------------")
-        for key, data_ln, type in data:
-            print(key, data_ln, type)
-        print("------------------ END --------------------")
-
-    def sub_storage(self, path: str) -> 'DBStorage':
-        return self.__class__(prefix=self.prefix + path, db=self.db)
-
-    def sync(self):
-        self.db.commit()
-
-    def get_fd(self, path: str, mode: str = "rb+") -> IO[bytes]:
-        raise NotImplementedError("SQLITE3 doesn't provide fd-like interface")
-
-    def list(self, path: str) -> Iterator[Tuple[bool, str]]:
-        raise NotImplementedError("SQLITE3 doesn't provide list method")
-
-
-DB_REL_PATH = "__db__.db"
-
-
 class FSStorage(ISimpleStorage):
     """Store all data in files on FS"""
 
     def __init__(self, root_path: str, existing: bool) -> None:
         self.root_path = root_path
         self.existing = existing
-        self.ignored = {self.j(DB_REL_PATH), '.', '..'}
+        self.ignored = {'.', '..'}
 
     def j(self, path: str) -> str:
         return os.path.join(self.root_path, path)
@@ -288,37 +222,31 @@
     pass
 
 
-csv_file_encoding = 'ascii'
-
-
 class Storage:
     """interface for storage"""
 
-    def __init__(self, fs_storage: ISimpleStorage, db_storage: ISimpleStorage, serializer: ISerializer) -> None:
-        self.fs = fs_storage
-        self.db = db_storage
+    def __init__(self, sstorage: ISimpleStorage, serializer: ISerializer) -> None:
+        self.sstorage = sstorage
         self.serializer = serializer
 
     def sub_storage(self, *path: str) -> 'Storage':
         fpath = "/".join(path)
-        return self.__class__(self.fs.sub_storage(fpath), self.db.sub_storage(fpath), self.serializer)
+        return self.__class__(self.sstorage.sub_storage(fpath), self.serializer)
 
     def put(self, value: Any, *path: str) -> None:
         dct_value = cast(IStorable, value).raw() if isinstance(value, IStorable) else value
         serialized = self.serializer.pack(dct_value)  # type: ignore
         fpath = "/".join(path)
-        self.db.put(serialized, fpath)
-        self.fs.put(serialized, fpath)
+        self.sstorage.put(serialized, fpath)
 
     def put_list(self, value: Iterable[IStorable], *path: str) -> None:
         serialized = self.serializer.pack([obj.raw() for obj in value])  # type: ignore
         fpath = "/".join(path)
-        self.db.put(serialized, fpath)
-        self.fs.put(serialized, fpath)
+        self.sstorage.put(serialized, fpath)
 
     def get(self, path: str, default: Any = _Raise) -> Any:
         try:
-            vl = self.db.get(path)
+            vl = self.sstorage.get(path)
         except:
             if default is _Raise:
                 raise
@@ -328,97 +256,30 @@
 
     def rm(self, *path: str) -> None:
         fpath = "/".join(path)
-        self.fs.rm(fpath)
-        self.db.rm(fpath)
+        self.sstorage.rm(fpath)
 
     def __contains__(self, path: str) -> bool:
-        return path in self.fs or path in self.db
+        return path in self.sstorage
 
     def put_raw(self, val: bytes, *path: str) -> str:
         fpath = "/".join(path)
-        self.fs.put(val, fpath)
+        self.sstorage.put(val, fpath)
         # TODO: dirty hack
         return self.resolve_raw(fpath)
 
     def resolve_raw(self, fpath) -> str:
-        return cast(FSStorage, self.fs).j(fpath)
+        return cast(FSStorage, self.sstorage).j(fpath)
 
     def get_raw(self, *path: str) -> bytes:
-        return self.fs.get("/".join(path))
+        return self.sstorage.get("/".join(path))
 
     def append_raw(self, value: bytes, *path: str) -> None:
-        with self.fs.get_fd("/".join(path), "rb+") as fd:
+        with self.sstorage.get_fd("/".join(path), "rb+") as fd:
             fd.seek(0, os.SEEK_END)
             fd.write(value)
 
     def get_fd(self, path: str, mode: str = "r") -> IO:
-        return self.fs.get_fd(path, mode)
-
-    def put_array(self, header: List[str], value: numpy.array, *path: str) -> None:
-        for val in header:
-            assert isinstance(val, str) and ',' not in val, \
-                "Can't convert {!r} to array header, as it's values contains comma".format(header)
-
-        fpath = "/".join(path)
-        with self.get_fd(fpath, "wb") as fd:
-            self.do_append(fd, header, value, fpath)
-
-    def get_array(self, *path: str) -> Tuple[List[str], numpy.array]:
-        path_s = "/".join(path)
-        with self.get_fd(path_s, "rb") as fd:
-            header = fd.readline().decode(csv_file_encoding).rstrip().split(",")
-            type_code, second_axis = header[-2:]
-            res = numpy.genfromtxt(fd, dtype=type_code, delimiter=',')
-
-        if '0' == second_axis:
-            res.shape = (len(res),)
-
-        return header[:-2], res
-
-    def append(self, header: List[str], value: numpy.array, *path: str) -> None:
-        for val in header:
-            assert isinstance(val, str) and ',' not in val, \
-                "Can't convert {!r} to array header, as it's values contains comma".format(header)
-
-        fpath = "/".join(path)
-        with self.get_fd(fpath, "cb") as fd:
-            self.do_append(fd, header, value, fpath, maybe_append=True)
-
-    def do_append(self, fd, header: List[str], value: numpy.array, path: str, fmt="%lu",
-                  maybe_append: bool = False) -> None:
-
-        if len(value.shape) == 1:
-            second_axis = 0
-        else:
-            second_axis = value.shape[1]
-        header += [value.dtype.name, str(second_axis)]
-
-        write_header = False
-
-        if maybe_append:
-            fd.seek(0, os.SEEK_END)
-            if fd.tell() != 0:
-                fd.seek(0, os.SEEK_SET)
-                # check header match
-                curr_header = fd.readline().decode(csv_file_encoding).rstrip().split(",")
-                assert header == curr_header, \
-                    "Path {!r}. Expected header ({!r}) and current header ({!r}) don't match"\
-                        .format(path, header, curr_header)
-                fd.seek(0, os.SEEK_END)
-            else:
-                write_header = True
-        else:
-            write_header = True
-
-        if write_header:
-            fd.write((",".join(header) + "\n").encode(csv_file_encoding))
-
-        if len(value.shape) == 1:
-            # make array vertical to simplify reading
-            vw = value.view().reshape((value.shape[0], 1))
-        else:
-            vw = value
-        numpy.savetxt(fd, vw, delimiter=',', newline="\n", fmt=fmt)
+        return self.sstorage.get_fd(path, mode)
 
     def load_list(self, obj_class: Type[ObjClass], *path: str) -> List[ObjClass]:
         path_s = "/".join(path)
@@ -431,8 +292,7 @@
         return cast(ObjClass, obj_class.fromraw(self.get(path_s)))
 
     def sync(self) -> None:
-        self.db.sync()
-        self.fs.sync()
+        self.sstorage.sync()
 
     def __enter__(self) -> 'Storage':
         return self
@@ -441,7 +301,7 @@
         self.sync()
 
     def list(self, *path: str) -> Iterator[Tuple[bool, str]]:
-        return self.fs.list("/".join(path))
+        return self.sstorage.list("/".join(path))
 
     def _iter_paths(self,
                     root: str,
@@ -472,7 +332,5 @@
 
 
 def make_storage(url: str, existing: bool = False) -> Storage:
-    return Storage(FSStorage(url, existing),
-                   DBStorage(os.path.join(url, DB_REL_PATH)),
-                   SAFEYAMLSerializer())
+    return Storage(FSStorage(url, existing), SAFEYAMLSerializer())