Move even more code to cephlib

Drop the local common_types, hw_info, node, node_interfaces, node_utils
and ops_log modules in favour of their cephlib equivalents; rename
IResultStorage/ResultStorage to IWallyStorage/WallyStorage; read the
ceph IP remap table from config instead of hardcoding it; assorted
typing and logging fixes along the way.
diff --git a/wally/ceph.py b/wally/ceph.py
index 89324c5..5c2a4f0 100644
--- a/wally/ceph.py
+++ b/wally/ceph.py
@@ -1,21 +1,17 @@
 """ Collect data about ceph nodes"""
-import os
 import logging
 from typing import Dict, cast, List, Set
 
-
-from .node_interfaces import NodeInfo, IRPCNode
-from .ssh_utils import ConnCreds
-from .common_types import IP
-from .stage import Stage, StepOrder
-from .test_run_class import TestRun
-from .ssh_utils import parse_ssh_uri
-from .node import connect, setup_rpc
-from .utils import StopTestError, to_ip
-
-
 from cephlib import discover
 from cephlib.discover import OSDInfo
+from cephlib.common import to_ip
+from cephlib.node import NodeInfo, IRPCNode
+from cephlib.ssh import ConnCreds, IP, parse_ssh_uri
+from cephlib.node_impl import connect, setup_rpc
+
+from .stage import Stage, StepOrder
+from .test_run_class import TestRun
+from .utils import StopTestError
 
 
 logger = logging.getLogger("wally")
@@ -57,6 +53,7 @@
         ceph = ctx.config.ceph
         root_node_uri = cast(str, ceph.root_node)
         cluster = ceph.get("cluster", "ceph")
+        ip_remap = ceph.get('ip_remap', {})
 
         conf = ceph.get("conf")
         key = ceph.get("key")
@@ -82,17 +79,6 @@
 
         ceph_params = {"cluster": cluster, "conf": conf, "key": key}
 
-        ip_remap = {
-            '10.8.0.4': '172.16.164.71',
-            '10.8.0.3': '172.16.164.72',
-            '10.8.0.2': '172.16.164.73',
-            '10.8.0.5': '172.16.164.74',
-            '10.8.0.6': '172.16.164.75',
-            '10.8.0.7': '172.16.164.76',
-            '10.8.0.8': '172.16.164.77',
-            '10.8.0.9': '172.16.164.78',
-        }
-
         with setup_rpc(connect(info), ctx.rpc_code, ctx.default_rpc_plugins,
                        log_level=ctx.config.rpc_log_level) as node:
 
@@ -101,7 +87,7 @@
             try:
                 ips = set()
                 for ip, osds_info in get_osds_info(node, ceph_extra_args, thcount=16).items():
-                    ip = ip_remap[ip]
+                    ip = ip_remap.get(ip, ip)
                     ips.add(ip)
                     # creds = ConnCreds(to_ip(cast(str, ip)), user="root", key=ssh_key)
                     creds = ConnCreds(to_ip(cast(str, ip)), user="root")
@@ -121,7 +107,7 @@
             try:
                 counter = 0
                 for counter, ip in enumerate(get_mons_ips(node, ceph_extra_args)):
-                    ip = ip_remap[ip]
+                    ip = ip_remap.get(ip, ip)
                     # creds = ConnCreds(to_ip(cast(str, ip)), user="root", key=ssh_key)
                     creds = ConnCreds(to_ip(cast(str, ip)), user="root")
                     info = ctx.merge_node(creds, {'ceph-mon'})
@@ -136,7 +122,7 @@
                     logger.warning("MON discovery failed %s", exc)
 
 
-def raw_dev_name(path):
+def raw_dev_name(path: str) -> str:
     if path.startswith("/dev/"):
         path = path[5:]
     while path and path[-1].isdigit():
@@ -152,8 +138,8 @@
         for node in ctx.nodes:
             if 'ceph_storage_devs' not in node.info.params:
                 if 'ceph-osd' in node.info.roles:
-                    jdevs = set()
-                    sdevs = set()
+                    jdevs = set()  # type: Set[str]
+                    sdevs = set()  # type: Set[str]
                     for osd_info in node.info.params['ceph-osds']:
                         for key, sset in [('journal', jdevs), ('storage', sdevs)]:
                             path = osd_info.get(key)
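Note on the hunks above: the remap table is no longer hardcoded — it is read from the optional `ip_remap` key of the ceph config section, and `dict.get(ip, ip)` lets unknown addresses pass through unchanged. A minimal sketch of the lookup semantics (the sample pair is taken from the removed dict):

    ip_remap = {'10.8.0.4': '172.16.164.71'}   # loaded from config, may be empty
    assert ip_remap.get('10.8.0.4', '10.8.0.4') == '172.16.164.71'   # remapped
    assert ip_remap.get('10.8.0.9', '10.8.0.9') == '10.8.0.9'        # passthrough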
diff --git a/wally/common_types.py b/wally/common_types.py
deleted file mode 100644
index 4aedfaa..0000000
--- a/wally/common_types.py
+++ /dev/null
@@ -1,36 +0,0 @@
-from typing import Any, Dict, NamedTuple
-
-from cephlib.storage import IStorable
-
-
-IP = str
-IPAddr = NamedTuple("IPAddr", [("host", IP), ("port", int)])
-
-
-class ConnCreds(IStorable):
-    def __init__(self, host: str, user: str, passwd: str = None, port: str = '22',
-                 key_file: str = None, key: bytes = None) -> None:
-        self.user = user
-        self.passwd = passwd
-        self.addr = IPAddr(host, int(port))
-        self.key_file = key_file
-        self.key = key
-
-    def __str__(self) -> str:
-        return "{}@{}:{}".format(self.user, self.addr.host, self.addr.port)
-
-    def __repr__(self) -> str:
-        return str(self)
-
-    def raw(self) -> Dict[str, Any]:
-        return {
-            'user': self.user,
-            'host': self.addr.host,
-            'port': self.addr.port,
-            'passwd': self.passwd,
-            'key_file': self.key_file
-        }
-
-    @classmethod
-    def fromraw(cls, data: Dict[str, Any]) -> 'ConnCreds':
-        return cls(**data)
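ConnCreds and the IP alias now come from cephlib.ssh; judging by the unchanged call sites, the constructor keeps the interface of the class deleted above. A hedged usage sketch (assumes cephlib.ssh mirrors the removed code):

    from cephlib.ssh import ConnCreds

    creds = ConnCreds('172.16.164.71', user='root')   # as used in wally/ceph.py
    print(creds)                                      # root@172.16.164.71:22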
diff --git a/wally/console_report.py b/wally/console_report.py
index 1528089..6490329 100644
--- a/wally/console_report.py
+++ b/wally/console_report.py
@@ -7,7 +7,6 @@
 from cephlib import texttable
 from cephlib.statistic import calc_norm_stat_props, calc_histo_stat_props
 
-from .result_classes import IResultStorage
 from .stage import Stage, StepOrder
 from .test_run_class import TestRun
 from .suits.io.fio import FioTest
diff --git a/wally/data_selectors.py b/wally/data_selectors.py
index 3e6bc3e..795c66b 100644
--- a/wally/data_selectors.py
+++ b/wally/data_selectors.py
@@ -6,7 +6,7 @@
 from cephlib.numeric_types import DataSource, TimeSeries
 from cephlib.storage_selectors import c_interpolate_ts_on_seconds_border
 
-from .result_classes import IResultStorage
+from .result_classes import IWallyStorage
 from .suits.io.fio_hist import expected_lat_bins
 
 
@@ -33,12 +33,12 @@
 AGG_TAG = 'ALL'
 
 
-def find_all_series(rstorage: IResultStorage, suite_id: str, job_id: str, metric: str) -> Iterator[TimeSeries]:
+def find_all_series(rstorage: IWallyStorage, suite_id: str, job_id: str, metric: str) -> Iterator[TimeSeries]:
     "Iterated over selected metric for all nodes for given Suite/job"
     return (rstorage.get_ts(ds) for ds in rstorage.iter_ts(suite_id=suite_id, job_id=job_id, metric=metric))
 
 
-def get_aggregated(rstorage: IResultStorage, suite_id: str, job_id: str, metric: str,
+def get_aggregated(rstorage: IWallyStorage, suite_id: str, job_id: str, metric: str,
                    trange: Tuple[int, int]) -> TimeSeries:
     "Sum selected metric for all nodes for given Suite/job"
 
@@ -60,7 +60,7 @@
             raise ValueError(msg)
 
         if metric == 'lat' and (len(ts.data.shape) != 2 or ts.data.shape[1] != expected_lat_bins):
-            msg = "Sensor {}.{} on node %s has shape={}. Can only process sensors with shape=[X, {}].".format(
+            msg = "Sensor {}.{} on node {} has shape={}. Can only process sensors with shape=[X, {}].".format(
                          ts.source.dev, ts.source.sensor, ts.source.node_id, ts.data.shape, expected_lat_bins)
             logger.error(msg)
             raise ValueError(msg)
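The message fix above repairs a half-converted printf template: `%s` is not a `str.format` placeholder, so the node id was never interpolated and a literal `%s` ended up in the error text. A minimal repro:

    >>> "Sensor {} on node %s has shape={}".format('cpu', (10, 2))
    'Sensor cpu on node %s has shape=(10, 2)'   # .format() fills {} and ignores %s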
diff --git a/wally/fuel.py b/wally/fuel.py
index 3a11403..668be21 100644
--- a/wally/fuel.py
+++ b/wally/fuel.py
@@ -3,12 +3,14 @@
 
 from paramiko.ssh_exception import AuthenticationException
 
+from cephlib.common import parse_creds, to_ip
+from cephlib.ssh import ConnCreds
+from cephlib.node_impl import connect, setup_rpc
+
 from .fuel_rest_api import get_cluster_id, reflect_cluster, FuelInfo, KeystoneAuth
-from .ssh_utils import ConnCreds
-from .utils import StopTestError, parse_creds, to_ip
+from .utils import StopTestError
 from .stage import Stage, StepOrder
 from .test_run_class import TestRun
-from .node import connect, setup_rpc
 from .config import ConfigBlock
 from .openstack_api import OSCreds
 
diff --git a/wally/fuel_rest_api.py b/wally/fuel_rest_api.py
index 8907b81..b1cfc0c 100644
--- a/wally/fuel_rest_api.py
+++ b/wally/fuel_rest_api.py
@@ -261,6 +261,7 @@
     def __getattr__(self, name: str) -> List[Node]:
         if name in self.allowed_roles:
             return [node for node in self if name in node.roles]
+        return []
 
 
 class Cluster(RestObj):
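The added `return []` closes an implicit-None hole: `__getattr__` fell through for names outside `allowed_roles`, so iterating over a missing role raised `TypeError: 'NoneType' object is not iterable`. A stripped-down sketch of the behavior (class and data are hypothetical):

    class Nodes(list):
        allowed_roles = {'compute'}
        def __getattr__(self, name):
            if name in self.allowed_roles:
                return [n for n in self if name in n.roles]
            return []          # previously fell off the end, yielding None

    for node in Nodes().ceph_osd:   # iterates nothing now; raised TypeError before
        pass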
diff --git a/wally/hw_info.py b/wally/hw_info.py
deleted file mode 100644
index bb857f2..0000000
--- a/wally/hw_info.py
+++ /dev/null
@@ -1,275 +0,0 @@
-import re
-import logging
-from typing import Dict, Any
-import xml.etree.ElementTree as ET
-from typing import List, Tuple, cast, Optional
-
-from cephlib.istorage import Storable
-from cephlib.common import b2ssize
-
-from .node_utils import get_os
-from .node_interfaces import IRPCNode
-
-
-logger = logging.getLogger("wally")
-
-
-def get_data(rr: str, data: str) -> str:
-    match_res = re.search("(?ims)" + rr, data)
-    return match_res.group(0)
-
-
-class HWInfo(Storable):
-    __ignore_fields__ = ['raw_xml']
-
-    def __init__(self) -> None:
-        self.hostname = None  # type: str
-        self.cores = []  # type: List[Tuple[str, int]]
-
-        # /dev/... devices
-        self.disks_info = {}  # type: Dict[str, Tuple[str, int]]
-
-        # real disks on raid controller
-        self.disks_raw_info = {}  # type: Dict[str, str]
-
-        # name => (speed, is_full_duplex, ip_addresses)
-        self.net_info = {}  # type: Dict[str, Tuple[Optional[int], Optional[bool], List[str]]]
-
-        self.ram_size = 0  # type: int
-        self.sys_name = None  # type: str
-        self.mb = None  # type: str
-        self.raw_xml = None  # type: Optional[str]
-
-        self.storage_controllers = []  # type: List[str]
-
-    def get_summary(self) -> Dict[str, int]:
-        cores = sum(count for _, count in self.cores)
-        disks = sum(size for _, size in self.disks_info.values())
-
-        return {'cores': cores,
-                'ram': self.ram_size,
-                'storage': disks,
-                'disk_count': len(self.disks_info)}
-
-    def __str__(self):
-        res = []
-
-        summ = self.get_summary()
-        summary = "Simmary: {cores} cores, {ram}B RAM, {disk}B storage"
-        res.append(summary.format(cores=summ['cores'],
-                                  ram=b2ssize(summ['ram']),
-                                  disk=b2ssize(summ['storage'])))
-        res.append(str(self.sys_name))
-        if self.mb:
-            res.append("Motherboard: " + self.mb)
-
-        if not self.ram_size:
-            res.append("RAM: Failed to get RAM size")
-        else:
-            res.append("RAM " + b2ssize(self.ram_size) + "B")
-
-        if not self.cores:
-            res.append("CPU cores: Failed to get CPU info")
-        else:
-            res.append("CPU cores:")
-            for name, count in self.cores:
-                if count > 1:
-                    res.append("    {0} * {1}".format(count, name))
-                else:
-                    res.append("    " + name)
-
-        if self.storage_controllers:
-            res.append("Disk controllers:")
-            for descr in self.storage_controllers:
-                res.append("    " + descr)
-
-        if self.disks_info:
-            res.append("Storage devices:")
-            for dev, (model, size) in sorted(self.disks_info.items()):
-                ssize = b2ssize(size) + "B"
-                res.append("    {0} {1} {2}".format(dev, ssize, model))
-        else:
-            res.append("Storage devices's: Failed to get info")
-
-        if self.disks_raw_info:
-            res.append("Disks devices:")
-            for dev, descr in sorted(self.disks_raw_info.items()):
-                res.append("    {0} {1}".format(dev, descr))
-        else:
-            res.append("Disks devices's: Failed to get info")
-
-        if self.net_info:
-            res.append("Net adapters:")
-            for name, (speed, dtype, _) in self.net_info.items():
-                res.append("    {0} {2} duplex={1}".format(name, dtype, speed))
-        else:
-            res.append("Net adapters: Failed to get net info")
-
-        return str(self.hostname) + ":\n" + "\n".join("    " + i for i in res)
-
-
-class SWInfo(Storable):
-    def __init__(self) -> None:
-        self.mtab = None  # type: str
-        self.kernel_version = None  # type: str
-        self.libvirt_version = None  # type: Optional[str]
-        self.qemu_version = None  # type: Optional[str]
-        self.os_version = None  # type: Tuple[str, ...]
-
-
-def get_sw_info(node: IRPCNode) -> SWInfo:
-    res = SWInfo()
-
-    res.os_version = tuple(get_os(node))
-    res.kernel_version = node.get_file_content('/proc/version').decode('utf8').strip()
-    res.mtab = node.get_file_content('/etc/mtab').decode('utf8').strip()
-
-    try:
-        res.libvirt_version = node.run("virsh -v", nolog=True).strip()
-    except OSError:
-        res.libvirt_version = None
-
-    # dpkg -l ??
-
-    try:
-        res.qemu_version = node.run("qemu-system-x86_64 --version", nolog=True).strip()
-    except OSError:
-        res.qemu_version = None
-
-    return res
-
-
-def get_hw_info(node: IRPCNode) -> Optional[HWInfo]:
-    try:
-        lshw_out = node.run('sudo lshw -xml 2>/dev/null')
-    except Exception as exc:
-        logger.warning("lshw failed on node %s: %s", node.node_id, exc)
-        return None
-
-    res = HWInfo()
-    res.raw_xml = lshw_out
-    lshw_et = ET.fromstring(lshw_out)
-
-    try:
-        res.hostname = cast(str, lshw_et.find("node").attrib['id'])
-    except Exception:
-        pass
-
-    try:
-
-        res.sys_name = cast(str, lshw_et.find("node/vendor").text) + " " + \
-            cast(str, lshw_et.find("node/product").text)
-        res.sys_name = res.sys_name.replace("(To be filled by O.E.M.)", "")
-        res.sys_name = res.sys_name.replace("(To be Filled by O.E.M.)", "")
-    except Exception:
-        pass
-
-    core = lshw_et.find("node/node[@id='core']")
-    if core is None:
-        return res
-
-    try:
-        res.mb = " ".join(cast(str, core.find(node).text)
-                          for node in ['vendor', 'product', 'version'])
-    except Exception:
-        pass
-
-    for cpu in core.findall("node[@class='processor']"):
-        try:
-            model = cast(str, cpu.find('product').text)
-            threads_node = cpu.find("configuration/setting[@id='threads']")
-            if threads_node is None:
-                threads = 1
-            else:
-                threads = int(threads_node.attrib['value'])
-            res.cores.append((model, threads))
-        except Exception:
-            pass
-
-    res.ram_size = 0
-    for mem_node in core.findall(".//node[@class='memory']"):
-        descr = mem_node.find('description')
-        try:
-            if descr is not None and descr.text == 'System Memory':
-                mem_sz = mem_node.find('size')
-                if mem_sz is None:
-                    for slot_node in mem_node.find("node[@class='memory']"):
-                        slot_sz = slot_node.find('size')
-                        if slot_sz is not None:
-                            assert slot_sz.attrib['units'] == 'bytes'
-                            res.ram_size += int(slot_sz.text)
-                else:
-                    assert mem_sz.attrib['units'] == 'bytes'
-                    res.ram_size += int(mem_sz.text)
-        except Exception:
-            pass
-
-    for net in core.findall(".//node[@class='network']"):
-        try:
-            link = net.find("configuration/setting[@id='link']")
-            if link.attrib['value'] == 'yes':
-                name = cast(str, net.find("logicalname").text)
-                speed_node = net.find("configuration/setting[@id='speed']")
-
-                if speed_node is None:
-                    speed = None
-                else:
-                    speed = int(speed_node.attrib['value'])
-
-                dup_node = net.find("configuration/setting[@id='duplex']")
-                if dup_node is None:
-                    dup = None
-                else:
-                    dup = cast(str, dup_node.attrib['value']).lower() == 'yes'
-
-                ips = []  # type: List[str]
-                res.net_info[name] = (speed, dup, ips)
-        except Exception:
-            pass
-
-    for controller in core.findall(".//node[@class='storage']"):
-        try:
-            description = getattr(controller.find("description"), 'text', "")
-            product = getattr(controller.find("product"), 'text', "")
-            vendor = getattr(controller.find("vendor"), 'text', "")
-            dev = getattr(controller.find("logicalname"), 'text', "")
-            if dev != "":
-                res.storage_controllers.append(
-                    "{0}: {1} {2} {3}".format(dev, description,
-                                              vendor, product))
-            else:
-                res.storage_controllers.append(
-                    "{0} {1} {2}".format(description,
-                                         vendor, product))
-        except Exception:
-            pass
-
-    for disk in core.findall(".//node[@class='disk']"):
-        try:
-            lname_node = disk.find('logicalname')
-            if lname_node is not None:
-                dev = cast(str, lname_node.text).split('/')[-1]
-
-                if dev == "" or dev[-1].isdigit():
-                    continue
-
-                sz_node = disk.find('size')
-                assert sz_node.attrib['units'] == 'bytes'
-                sz = int(sz_node.text)
-                res.disks_info[dev] = ('', sz)
-            else:
-                description = disk.find('description').text
-                product = disk.find('product').text
-                vendor = disk.find('vendor').text
-                version = disk.find('version').text
-                serial = disk.find('serial').text
-
-                full_descr = "{0} {1} {2} {3} {4}".format(
-                    description, product, vendor, version, serial)
-
-                businfo = cast(str, disk.find('businfo').text)
-                res.disks_raw_info[businfo] = full_descr
-        except Exception:
-            pass
-
-    return res
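HWInfo moved to cephlib.node (see the new import in wally/resources.py below). A hedged usage sketch, assuming cephlib kept the interface of the class deleted above:

    from cephlib.node import HWInfo

    hw = rstorage.storage.load(HWInfo, 'hw_info', node_id)   # as in wally/resources.py
    print(hw.get_summary())   # {'cores': ..., 'ram': ..., 'storage': ..., 'disk_count': ...}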
diff --git a/wally/main.py b/wally/main.py
index e43f1f9..66e39db 100644
--- a/wally/main.py
+++ b/wally/main.py
@@ -31,14 +31,15 @@
 
 from cephlib.common import setup_logging
 from cephlib.storage import make_storage
+from cephlib.ssh import set_ssh_key_passwd
+from cephlib.node import log_nodes_statistic
+from cephlib.node_impl import get_rpc_server_code
 
-from . import utils, node, report_profiles, report
-from .node_utils import log_nodes_statistic
+from . import utils, report_profiles, report
 from .config import Config
 from .stage import Stage
 from .test_run_class import TestRun
-from .ssh import set_ssh_key_passwd
-from .result_storage import ResultStorage
+from .result_storage import WallyStorage
 
 # stages
 from .ceph import DiscoverCephStage, CollectCephInfoStage
@@ -280,7 +281,6 @@
 
     elif opts.subparser_name == 'ls':
         tab = Texttable(max_width=200)
-        tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
         tab.set_cols_align(["l", "l", "l", "l"])
         tab.header(["Name", "Tests", "Run at", "Comment"])
         tab.add_rows(list_results(opts.result_storage))
@@ -317,7 +317,7 @@
         return 0
     elif opts.subparser_name == 'ipython':
         storage = make_storage(opts.storage_dir, existing=True)
-        rstorage = ResultStorage(storage=storage)
+        rstorage = WallyStorage(storage=storage)
 
         import IPython
         IPython.embed()
@@ -343,8 +343,8 @@
 
     logger.info("All info would be stored into %r", config.storage_url)
 
-    ctx = TestRun(config, storage, ResultStorage(storage))
-    ctx.rpc_code, ctx.default_rpc_plugins = node.get_rpc_server_code()
+    ctx = TestRun(config, storage, WallyStorage(storage))
+    ctx.rpc_code, ctx.default_rpc_plugins = get_rpc_server_code()
 
     if opts.ssh_key_passwd is not None:
         set_ssh_key_passwd(opts.ssh_key_passwd)
@@ -398,8 +398,14 @@
     logger.info("All info is stored into %r", config.storage_url)
 
     if failed or cleanup_failed:
-        logger.error("Tests are failed. See error details in log above")
+        if opts.subparser_name == 'report':
+            logger.error("Report generation failed. See error details in log above")
+        else:
+            logger.error("Tests are failed. See error details in log above")
         return 1
     else:
-        logger.info("Tests finished successfully")
+        if opts.subparser_name == 'report':
+            logger.info("Report successfully generated")
+        else:
+            logger.info("Tests finished successfully")
         return 0
diff --git a/wally/node.py b/wally/node.py
deleted file mode 100644
index a662f76..0000000
--- a/wally/node.py
+++ /dev/null
@@ -1,359 +0,0 @@
-import os
-import zlib
-import time
-import json
-import socket
-import logging
-import tempfile
-import subprocess
-from typing import Union, cast, Optional, Tuple, Dict
-
-from agent import agent
-import paramiko
-
-from .node_interfaces import IRPCNode, NodeInfo, ISSHHost
-from .ssh import connect as ssh_connect
-
-
-logger = logging.getLogger("wally")
-
-
-class SSHHost(ISSHHost):
-    def __init__(self, conn: paramiko.SSHClient, info: NodeInfo) -> None:
-        self.conn = conn
-        self.info = info
-
-    def __str__(self) -> str:
-        return self.node_id
-
-    @property
-    def node_id(self) -> str:
-        return self.info.node_id
-
-    def put_to_file(self, path: Optional[str], content: bytes) -> str:
-        if path is None:
-            path = self.run("mktemp", nolog=True).strip()
-
-        logger.debug("PUT %s bytes to %s", len(content), path)
-
-        with self.conn.open_sftp() as sftp:
-            with sftp.open(path, "wb") as fd:
-                fd.write(content)
-
-        return path
-
-    def disconnect(self):
-        self.conn.close()
-
-    def run(self, cmd: str, timeout: int = 60, nolog: bool = False) -> str:
-        if not nolog:
-            logger.debug("SSH:{0} Exec {1!r}".format(self, cmd))
-
-        transport = self.conn.get_transport()
-        session = transport.open_session()
-
-        try:
-            session.set_combine_stderr(True)
-            stime = time.time()
-
-            session.exec_command(cmd)
-            session.settimeout(1)
-            session.shutdown_write()
-            output = ""
-
-            while True:
-                try:
-                    ndata = session.recv(1024).decode("utf-8")
-                    if not ndata:
-                        break
-                    output += ndata
-                except socket.timeout:
-                    pass
-
-                if time.time() - stime > timeout:
-                    raise OSError(output + "\nExecution timeout")
-
-            code = session.recv_exit_status()
-        finally:
-            found = False
-
-            if found:
-                session.close()
-
-        if code != 0:
-            templ = "SSH:{0} Cmd {1!r} failed with code {2}. Output: {3}"
-            raise OSError(templ.format(self, cmd, code, output))
-
-        return output
-
-
-class LocalHost(ISSHHost):
-    def __str__(self):
-        return "<Local>"
-
-    def get_ip(self) -> str:
-        return 'localhost'
-
-    def put_to_file(self, path: Optional[str], content: bytes) -> str:
-        if path is None:
-            fd, path = tempfile.mkstemp(text=False)
-            os.close(fd)
-        else:
-            dir_name = os.path.dirname(path)
-            os.makedirs(dir_name, exist_ok=True)
-
-        with open(path, "wb") as fd2:
-            fd2.write(content)
-
-        return path
-
-    def run(self, cmd: str, timeout: int = 60, nolog: bool = False) -> str:
-        proc = subprocess.Popen(cmd, shell=True,
-                                stdin=subprocess.PIPE,
-                                stdout=subprocess.PIPE,
-                                stderr=subprocess.STDOUT)
-
-        stdout_data_b, _ = proc.communicate()
-        stdout_data = stdout_data_b.decode("utf8")
-
-        if proc.returncode != 0:
-            templ = "SSH:{0} Cmd {1!r} failed with code {2}. Output: {3}"
-            raise OSError(templ.format(self, cmd, proc.returncode, stdout_data))
-
-        return stdout_data
-
-    def disconnect(self):
-        pass
-
-
-def get_rpc_server_code() -> Tuple[bytes, Dict[str, bytes]]:
-    # setup rpc data
-    if agent.__file__.endswith(".pyc"):
-        path = agent.__file__[:-1]
-    else:
-        path = agent.__file__
-
-    master_code = open(path, "rb").read()
-
-    plugins = {}  # type: Dict[str, bytes]
-    cli_path = os.path.join(os.path.dirname(path), "cli_plugin.py")
-    plugins["cli"] = open(cli_path, "rb").read()
-
-    fs_path = os.path.join(os.path.dirname(path), "fs_plugin.py")
-    plugins["fs"] = open(fs_path, "rb").read()
-
-    return master_code, plugins
-
-
-def connect(info: Union[str, NodeInfo], conn_timeout: int = 60) -> ISSHHost:
-    if info == 'local':
-        return LocalHost()
-    else:
-        info_c = cast(NodeInfo, info)
-        return SSHHost(ssh_connect(info_c.ssh_creds, conn_timeout), info_c)
-
-
-class RPCNode(IRPCNode):
-    """Node object"""
-
-    def __init__(self, conn: agent.SimpleRPCClient, info: NodeInfo) -> None:
-        self.info = info
-        self.conn = conn
-
-    def __str__(self) -> str:
-        return "Node({!r})".format(self.info)
-
-    def __repr__(self) -> str:
-        return str(self)
-
-    @property
-    def node_id(self) -> str:
-        return self.info.node_id
-
-    def get_file_content(self, path: str, expanduser: bool = False, compress: bool = True) -> bytes:
-        logger.debug("GET %s from %s", path, self.info)
-        if expanduser:
-            path = self.conn.fs.expanduser(path)
-        res = self.conn.fs.get_file(path, compress)
-        logger.debug("Download %s bytes from remote file %s from %s", len(res), path, self.info)
-        if compress:
-            res = zlib.decompress(res)
-        return res
-
-    def run(self, cmd: str, timeout: int = 60, nolog: bool = False, check_timeout: float = 0.01) -> str:
-        if not nolog:
-            logger.debug("Node %s - run %s", self.node_id, cmd)
-
-        cmd_b = cmd.encode("utf8")
-        proc_id = self.conn.cli.spawn(cmd_b, timeout=timeout, merge_out=True)
-        out = ""
-
-        while True:
-            code, outb, _ = self.conn.cli.get_updates(proc_id)
-            out += outb.decode("utf8")
-            if code is not None:
-                break
-            time.sleep(check_timeout)
-
-        if code != 0:
-            templ = "Node {} - cmd {!r} failed with code {}. Output: {!r}."
-            raise OSError(templ.format(self.node_id, cmd, code, out))
-
-        return out
-
-    def copy_file(self, local_path: str, remote_path: str = None,
-                  expanduser: bool = False,
-                  compress: bool = False) -> str:
-
-        if expanduser:
-            remote_path = self.conn.fs.expanduser(remote_path)
-
-        data = open(local_path, 'rb').read()  # type: bytes
-        return self.put_to_file(remote_path, data, compress=compress)
-
-    def put_to_file(self, path: Optional[str], content: bytes, expanduser: bool = False, compress: bool = False) -> str:
-        if expanduser:
-            path = self.conn.fs.expanduser(path)
-        if compress:
-            content = zlib.compress(content)
-        return self.conn.fs.store_file(path, content, compress)
-
-    def stat_file(self, path: str, expanduser: bool = False) -> Dict[str, int]:
-        if expanduser:
-            path = self.conn.fs.expanduser(path)
-        return self.conn.fs.file_stat(path)
-
-    def __exit__(self, x, y, z) -> bool:
-        self.disconnect(stop=True)
-        return False
-
-    def upload_plugin(self, name: str, code: bytes, version: str = None) -> None:
-        self.conn.server.load_module(name, version, code)
-
-    def disconnect(self, stop: bool = False) -> None:
-        if stop:
-            logger.debug("Stopping RPC server on %s", self.info)
-            self.conn.server.stop()
-
-        logger.debug("Disconnecting from %s", self.info)
-        self.conn.disconnect()
-        self.conn = None
-
-
-def get_node_python_27(node: ISSHHost) -> Optional[str]:
-    python_cmd = None  # type: Optional[str]
-    try:
-        python_cmd = node.run('which python2.7').strip()
-    except Exception as exc:
-        pass
-
-    if python_cmd is None:
-        try:
-            if '2.7' in node.run('python --version'):
-                python_cmd = node.run('which python').strip()
-        except Exception as exc:
-            pass
-
-    return python_cmd
-
-
-def setup_rpc(node: ISSHHost,
-              rpc_server_code: bytes,
-              plugins: Dict[str, bytes] = None,
-              port: int = 0,
-              log_level: str = None) -> IRPCNode:
-
-    logger.debug("Setting up RPC connection to {}".format(node.info))
-    python_cmd = get_node_python_27(node)
-    if python_cmd:
-        logger.debug("python2.7 on node {} path is {}".format(node.info, python_cmd))
-    else:
-        logger.error(("Can't find python2.7 on node {}. " +
-                      "Install python2.7 and rerun test").format(node.info))
-        raise ValueError("Python not found")
-
-    code_file = node.put_to_file(None, rpc_server_code)
-    ip = node.info.ssh_creds.addr.host
-
-    log_file = None  # type: Optional[str]
-    if log_level:
-        log_file = node.run("mktemp", nolog=True).strip()
-        cmd = "{} {} --log-level={} server --listen-addr={}:{} --daemon --show-settings"
-        cmd = cmd.format(python_cmd, code_file, log_level, ip, port) + " --stdout-file={}".format(log_file)
-        logger.info("Agent logs for node {} stored remotely in file {}, log level is {}".format(
-            node.node_id, log_file, log_level))
-    else:
-        cmd = "{} {} --log-level=CRITICAL server --listen-addr={}:{} --daemon --show-settings"
-        cmd = cmd.format(python_cmd, code_file, ip, port)
-
-    params_js = node.run(cmd).strip()
-    params = json.loads(params_js)
-
-    node.info.params.update(params)
-
-    port = int(params['addr'].split(":")[1])
-    rpc_conn = agent.connect((ip, port))
-
-    rpc_node = RPCNode(rpc_conn, node.info)
-    rpc_node.rpc_log_file = log_file
-
-    if plugins is not None:
-        try:
-            for name, code in plugins.items():
-                rpc_node.upload_plugin(name, code)
-        except Exception:
-            rpc_node.disconnect(True)
-            raise
-
-    return rpc_node
-
-
-        # class RemoteNode(node_interfaces.IRPCNode):
-#     def __init__(self, node_info: node_interfaces.NodeInfo, rpc_conn: agent.RPCClient):
-#         self.info = node_info
-#         self.rpc = rpc_conn
-#
-    # def get_interface(self, ip: str) -> str:
-    #     """Get node external interface for given IP"""
-    #     data = self.run("ip a", nolog=True)
-    #     curr_iface = None
-    #
-    #     for line in data.split("\n"):
-    #         match1 = re.match(r"\d+:\s+(?P<name>.*?):\s\<", line)
-    #         if match1 is not None:
-    #             curr_iface = match1.group('name')
-    #
-    #         match2 = re.match(r"\s+inet\s+(?P<ip>[0-9.]+)/", line)
-    #         if match2 is not None:
-    #             if match2.group('ip') == ip:
-    #                 assert curr_iface is not None
-    #                 return curr_iface
-    #
-    #     raise KeyError("Can't found interface for ip {0}".format(ip))
-    #
-    # def get_user(self) -> str:
-    #     """"get ssh connection username"""
-    #     if self.ssh_conn_url == 'local':
-    #         return getpass.getuser()
-    #     return self.ssh_cred.user
-    #
-    #
-    # def run(self, cmd: str, stdin_data: str = None, timeout: int = 60, nolog: bool = False) -> Tuple[int, str]:
-    #     """Run command on node. Will use rpc connection, if available"""
-    #
-    #     if self.rpc_conn is None:
-    #         return run_over_ssh(self.ssh_conn, cmd,
-    #                             stdin_data=stdin_data, timeout=timeout,
-    #                             nolog=nolog, node=self)
-    #     assert not stdin_data
-    #     proc_id = self.rpc_conn.cli.spawn(cmd)
-    #     exit_code = None
-    #     output = ""
-    #
-    #     while exit_code is None:
-    #         exit_code, stdout_data, stderr_data = self.rpc_conn.cli.get_updates(proc_id)
-    #         output += stdout_data + stderr_data
-    #
-    #     return exit_code, output
-
-
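connect, setup_rpc and get_rpc_server_code are likewise imported from cephlib.node_impl now. A hedged sketch of the call pattern, mirroring the updated wally/ceph.py and wally/main.py:

    from cephlib.node_impl import connect, setup_rpc, get_rpc_server_code

    rpc_code, plugins = get_rpc_server_code()
    with setup_rpc(connect(node_info), rpc_code, plugins, log_level='DEBUG') as node:
        print(node.run('uname -a'))   # RPC-backed command execution, as before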
diff --git a/wally/node_interfaces.py b/wally/node_interfaces.py
deleted file mode 100644
index 703ef71..0000000
--- a/wally/node_interfaces.py
+++ /dev/null
@@ -1,144 +0,0 @@
-import abc
-import logging
-from typing import Any, Set, Dict, Optional, NamedTuple
-
-from cephlib.istorage import IStorable
-
-from .ssh_utils import ConnCreds
-from .common_types import IPAddr
-
-
-RPCCreds = NamedTuple("RPCCreds", [("addr", IPAddr), ("key_file", str), ("cert_file", str)])
-logger = logging.getLogger("wally")
-
-
-class NodeInfo(IStorable):
-    """Node information object, result of discovery process or config parsing"""
-    def __init__(self, ssh_creds: ConnCreds, roles: Set[str], params: Dict[str, Any] = None) -> None:
-        # ssh credentials
-        self.ssh_creds = ssh_creds
-
-        # credentials for RPC connection
-        self.rpc_creds = None  # type: Optional[RPCCreds]
-
-        self.roles = roles
-        self.os_vm_id = None  # type: Optional[int]
-        self.params = {}  # type: Dict[str, Any]
-        if params is not None:
-            self.params = params
-
-    @property
-    def node_id(self) -> str:
-        return "{0.host}:{0.port}".format(self.ssh_creds.addr)
-
-    def __str__(self) -> str:
-        return self.node_id
-
-    def __repr__(self) -> str:
-        return str(self)
-
-    def raw(self) -> Dict[str, Any]:
-        dct = self.__dict__.copy()
-
-        if self.rpc_creds is not None:
-            dct['rpc_creds'] = list(self.rpc_creds)
-
-        dct['ssh_creds'] = self.ssh_creds.raw()
-        dct['roles'] = list(self.roles)
-        return dct
-
-    @classmethod
-    def fromraw(cls, data: Dict[str, Any]) -> 'NodeInfo':
-        data = data.copy()
-        if data['rpc_creds'] is not None:
-            data['rpc_creds'] = RPCCreds(*data['rpc_creds'])
-
-        data['ssh_creds'] = ConnCreds.fromraw(data['ssh_creds'])
-        data['roles'] = set(data['roles'])
-        obj = cls.__new__(cls)  # type: ignore
-        obj.__dict__.update(data)
-        return obj
-
-
-class ISSHHost(metaclass=abc.ABCMeta):
-    """Minimal interface, required to setup RPC connection"""
-    info = None  # type: NodeInfo
-
-    @abc.abstractmethod
-    def run(self, cmd: str, timeout: int = 60, nolog: bool = False) -> str:
-        pass
-
-    @abc.abstractmethod
-    def __str__(self) -> str:
-        pass
-
-    @abc.abstractmethod
-    def disconnect(self) -> None:
-        pass
-
-    @abc.abstractmethod
-    def put_to_file(self, path: Optional[str], content: bytes) -> str:
-        pass
-
-    def __enter__(self) -> 'ISSHHost':
-        return self
-
-    def __exit__(self, x, y, z) -> bool:
-        self.disconnect()
-        return False
-
-    @property
-    def node_id(self) -> str:
-        return self.info.node_id
-
-
-class IRPCNode(metaclass=abc.ABCMeta):
-    """Remote filesystem interface"""
-    info = None  # type: NodeInfo
-    conn = None  # type: Any
-    rpc_log_file = None  # type: str
-
-    @property
-    def node_id(self) -> str:
-        return self.info.node_id
-
-    @abc.abstractmethod
-    def __str__(self) -> str:
-        pass
-
-    @abc.abstractmethod
-    def run(self, cmd: str, timeout: int = 60, nolog: bool = False, check_timeout: float = 0.01) -> str:
-        pass
-
-    @abc.abstractmethod
-    def copy_file(self, local_path: str, remote_path: str = None,
-                  expanduser: bool = False, compress: bool = False) -> str:
-        pass
-
-    @abc.abstractmethod
-    def get_file_content(self, path: str, expanduser: bool = False, compress: bool = False) -> bytes:
-        pass
-
-    @abc.abstractmethod
-    def put_to_file(self, path: Optional[str], content: bytes, expanduser: bool = False, compress: bool = False) -> str:
-        pass
-
-    @abc.abstractmethod
-    def stat_file(self, path:str, expanduser: bool = False) -> Any:
-        pass
-
-    @abc.abstractmethod
-    def disconnect(self) -> None:
-        pass
-
-    @abc.abstractmethod
-    def upload_plugin(self, name: str, code: bytes, version: str = None) -> None:
-        pass
-
-    def __enter__(self) -> 'IRPCNode':
-        return self
-
-    def __exit__(self, x, y, z) -> bool:
-        self.disconnect()
-        return False
-
diff --git a/wally/node_utils.py b/wally/node_utils.py
deleted file mode 100644
index e66ba44..0000000
--- a/wally/node_utils.py
+++ /dev/null
@@ -1,56 +0,0 @@
-import logging
-import collections
-from typing import Dict, Sequence, NamedTuple
-
-from .node_interfaces import IRPCNode
-
-logger = logging.getLogger("wally")
-
-
-def log_nodes_statistic(nodes: Sequence[IRPCNode]) -> None:
-    logger.info("Found {0} nodes total".format(len(nodes)))
-
-    per_role = collections.defaultdict(int)  # type: Dict[str, int]
-    for node in nodes:
-        for role in node.info.roles:
-            per_role[role] += 1
-
-    for role, count in sorted(per_role.items()):
-        logger.debug("Found {0} nodes with role {1}".format(count, role))
-
-
-
-OSRelease = NamedTuple("OSRelease",
-                       [("distro", str),
-                        ("release", str),
-                        ("arch", str)])
-
-
-def get_os(node: IRPCNode) -> OSRelease:
-    """return os type, release and architecture for node.
-    """
-    arch = node.run("arch", nolog=True).strip()
-
-    try:
-        node.run("ls -l /etc/redhat-release", nolog=True)
-        return OSRelease('redhat', None, arch)
-    except:
-        pass
-
-    try:
-        node.run("ls -l /etc/debian_version", nolog=True)
-
-        release = None
-        for line in node.run("lsb_release -a", nolog=True).split("\n"):
-            if ':' not in line:
-                continue
-            opt, val = line.split(":", 1)
-
-            if opt == 'Codename':
-                release = val.strip()
-
-        return OSRelease('ubuntu', release, arch)
-    except:
-        pass
-
-    raise RuntimeError("Unknown os")
diff --git a/wally/openstack.py b/wally/openstack.py
index 4048207..16b9db6 100644
--- a/wally/openstack.py
+++ b/wally/openstack.py
@@ -1,13 +1,13 @@
 import os.path
 import socket
 import logging
-from typing import Dict, Any, List, Tuple, cast
+from typing import Dict, Any, List, Tuple, cast, Optional
 
 from cephlib.common import to_ip
+from cephlib.node import NodeInfo
+from cephlib.ssh import ConnCreds
 
-from .node_interfaces import NodeInfo
 from .config import ConfigBlock, Config
-from .ssh_utils import ConnCreds
 from .openstack_api import (os_connect, find_vms,
                             OSCreds, get_openstack_credentials, prepare_os, launch_vms, clear_nodes)
 from .test_run_class import TestRun
diff --git a/wally/openstack_api.py b/wally/openstack_api.py
index 302d898..257a7de 100644
--- a/wally/openstack_api.py
+++ b/wally/openstack_api.py
@@ -17,9 +17,8 @@
 from glanceclient import Client as GlanceClient
 
 from cephlib.common import Timeout, to_ip
-
-from .node_interfaces import NodeInfo
-from .ssh_utils import ConnCreds
+from cephlib.node import NodeInfo
+from cephlib.ssh import ConnCreds
 
 
 __doc__ = """
diff --git a/wally/ops_log.py b/wally/ops_log.py
deleted file mode 100644
index 48f2b61..0000000
--- a/wally/ops_log.py
+++ /dev/null
@@ -1,9 +0,0 @@
-from typing import Any
-
-
-log = []
-
-
-def log_op(name: str, *params: Any) -> None:
-    log.append([name] + list(params))
-
diff --git a/wally/plot.py b/wally/plot.py
index 857a594..f809d19 100644
--- a/wally/plot.py
+++ b/wally/plot.py
@@ -3,16 +3,9 @@
 
 import numpy
 
-# to make seaborn styles available
-import warnings
-with warnings.catch_warnings():
-    warnings.simplefilter("ignore")
-    import seaborn
-
 from cephlib.units import unit_conversion_coef_f
 from cephlib.plot import PlotParams, provide_plot
 
-from .report_profiles import StyleProfile
 from .resources import IOSummary
 
 
@@ -143,7 +136,7 @@
 
     # override some styles
     pp.fig.set_size_inches(*pp.style.qd_chart_inches)
-    pp.fig.subplots_adjust(right=StyleProfile.subplot_adjust_r)
+    pp.fig.subplots_adjust(right=pp.style.subplot_adjust_r)
 
     if pp.style.extra_io_spine:
         ax3.grid(False)
diff --git a/wally/report.py b/wally/report.py
index d37ac60..9615461 100644
--- a/wally/report.py
+++ b/wally/report.py
@@ -2,7 +2,7 @@
 import abc
 import logging
 from collections import defaultdict
-from typing import Dict, Any, Iterator, Tuple, cast, List, Set, Optional, Union
+from typing import Dict, Any, Iterator, Tuple, cast, List, Set, Optional, Union, Type
 
 import numpy
 from statsmodels.tsa.stattools import adfuller
@@ -16,11 +16,13 @@
 from cephlib.statistic import calc_norm_stat_props
 from cephlib.storage_selectors import summ_sensors, find_sensors_to_2d
 from cephlib.wally_storage import find_nodes_by_roles
+from cephlib.plot import (plot_simple_bars, plot_hmap_from_2d, plot_lat_over_time, plot_simple_over_time,
+                          plot_histo_heatmap, plot_v_over_time, plot_hist)
 
 from .utils import STORAGE_ROLES
 from .stage import Stage, StepOrder
 from .test_run_class import TestRun
-from .result_classes import IResultStorage
+from .result_classes import IWallyStorage
 from .result_classes import DataSource, TimeSeries, SuiteConfig
 from .suits.io.fio import FioTest, FioJobConfig
 from .suits.io.fio_job import FioJobParams
@@ -28,9 +30,10 @@
 from .data_selectors import get_aggregated, AGG_TAG
 from .report_profiles import (DefStyleProfile, DefColorProfile, StyleProfile, ColorProfile,
                               default_format, io_chart_format)
-from .plot import (io_chart, plot_simple_bars, plot_hmap_from_2d, plot_lat_over_time, plot_simple_over_time,
-                   plot_histo_heatmap, plot_v_over_time, plot_hist)
+from .plot import io_chart
 from .resources import ResourceNames, get_resources_usage, make_iosum, IOSummary, get_cluster_cpu_load
+
+
 logger = logging.getLogger("wally")
 
 
@@ -111,12 +114,12 @@
 class Table:
     def __init__(self, header: List[str]) -> None:
         self.header = header
-        self.data = []
+        self.data = []  # type: List[List[str]]
 
     def add_line(self, values: List[str]) -> None:
         self.data.append(values)
 
-    def html(self):
+    def html(self) -> str:
         return html.table("", self.header, self.data)
 
 
@@ -143,7 +146,7 @@
 #  --------------------  REPORTS  --------------------------------------------------------------------------------------
 
 class ReporterBase:
-    def __init__(self, rstorage: IResultStorage, style: StyleProfile, colors: ColorProfile) -> None:
+    def __init__(self, rstorage: IWallyStorage, style: StyleProfile, colors: ColorProfile) -> None:
         self.style = style
         self.colors = colors
         self.rstorage = rstorage
@@ -193,7 +196,7 @@
 
     def get_divs(self, suite: SuiteConfig) -> Iterator[Tuple[str, str, HTMLBlock]]:
         ts_map = defaultdict(list)  # type: Dict[FioJobParams, List[Tuple[SuiteConfig, FioJobConfig]]]
-        str_summary = {}  # type: Dict[FioJobParams, List[IOSummary]]
+        str_summary = {}  # type: Dict[FioJobParams, Tuple[str, str]]
 
         for job in self.rstorage.iter_job(suite):
             fjob = cast(FioJobConfig, job)
@@ -389,10 +392,10 @@
             (ResourceNames.storage_read_iop, ResourceNames.storage_read),
             (ResourceNames.storage_iop, ResourceNames.storage_rw),
             (ResourceNames.storage_cpu_s, ResourceNames.storage_cpu_s_b),
-        ]  # type: List[Union[str, Tuple[Optional[str], ...]]
+        ]  # type: List[Union[str, Tuple[Optional[str], ...]]]
 
         if not iops_ok:
-            table_structure2 = []
+            table_structure2 = []  # type: List[Union[Tuple[str, ...], str]]
             for line in table_structure:
                 if isinstance(line, str):
                     table_structure2.append(line)
@@ -483,7 +486,8 @@
         bytes_names = [ResourceNames.test_write, ResourceNames.test_read, ResourceNames.test_rw,
                        ResourceNames.test_send, ResourceNames.test_recv, ResourceNames.test_net,
                        ResourceNames.storage_write, ResourceNames.storage_read, ResourceNames.storage_rw,
-                       ResourceNames.storage_send, ResourceNames.storage_recv, ResourceNames.storage_net]
+                       ResourceNames.storage_send, ResourceNames.storage_recv,
+                       ResourceNames.storage_net]  # type: List[str]
 
         net_pkt_names = [ResourceNames.test_send_pkt, ResourceNames.test_recv_pkt, ResourceNames.test_net_pkt,
                          ResourceNames.storage_send_pkt, ResourceNames.storage_recv_pkt, ResourceNames.storage_net_pkt]
@@ -497,9 +501,9 @@
             HTMLBlock(html.H2(html.center("Resource consumption per service provided")))
 
         for tp, names in pairs:
-            vals = []
-            devs = []
-            avail_names = []
+            vals = []  # type: List[float]
+            devs = []  # type: List[float]
+            avail_names = []  # type: List[str]
             for name in names:
                 if name in records:
                     avail_names.append(name)
@@ -512,7 +516,7 @@
                     devs.append(dev)
 
             # synchronously sort values and names, values is a key
-            vals, names, devs = map(list, zip(*sorted(zip(vals, names, devs))))
+            vals, names, devs = map(list, zip(*sorted(zip(vals, names, devs))))  # type: ignore
 
             ds = DataSource(suite_id=suite.storage_id,
                             job_id=job.storage_id,
@@ -587,18 +591,19 @@
         storage_devs = None
         test_nodes_devs = ['rbd0']
 
-        for node in self.rstorage.find_nodes(STORAGE_ROLES):
-            cjd = set(node.params['ceph_journal_devs'])
-            if journal_devs is None:
-                journal_devs = cjd
-            else:
-                assert journal_devs == cjd, "{!r} != {!r}".format(journal_devs, cjd)
+        for node in self.rstorage.load_nodes():
+            if node.roles.intersection(STORAGE_ROLES):
+                cjd = set(node.params['ceph_journal_devs'])
+                if journal_devs is None:
+                    journal_devs = cjd
+                else:
+                    assert journal_devs == cjd, "{!r} != {!r}".format(journal_devs, cjd)
 
-            csd = set(node.params['ceph_storage_devs'])
-            if storage_devs is None:
-                storage_devs = csd
-            else:
-                assert storage_devs == csd, "{!r} != {!r}".format(storage_devs, csd)
+                csd = set(node.params['ceph_storage_devs'])
+                if storage_devs is None:
+                    storage_devs = csd
+                else:
+                    assert storage_devs == csd, "{!r} != {!r}".format(storage_devs, csd)
 
         trange = (job.reliable_info_range[0] // 1000, job.reliable_info_range[1] // 1000)
 
@@ -611,8 +616,8 @@
 
             # QD heatmap
             nodes = find_nodes_by_roles(self.rstorage.storage, roles)
-            ioq2d = find_sensors_to_2d(self.rstorage, trange, sensor='block-io', devs=devs,
-                                       node_id=nodes, metric='io_queue', )
+            ioq2d = find_sensors_to_2d(self.rstorage, trange, sensor='block-io', dev=devs,
+                                       node_id=nodes, metric='io_queue')
 
             ds = DataSource(suite.storage_id, job.storage_id, AGG_TAG, 'block-io', name, tag="hmap." + default_format)
 
@@ -621,10 +626,10 @@
             yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fname))
 
             # Block size heatmap
-            wc2d = find_sensors_to_2d(self.rstorage, trange, node_id=nodes, sensor='block-io', devs=devs,
+            wc2d = find_sensors_to_2d(self.rstorage, trange, node_id=nodes, sensor='block-io', dev=devs,
                                       metric='writes_completed')
             wc2d[wc2d < 1E-3] = 1
-            sw2d = find_sensors_to_2d(self.rstorage, trange, node_id=nodes, sensor='block-io', devs=devs,
+            sw2d = find_sensors_to_2d(self.rstorage, trange, node_id=nodes, sensor='block-io', dev=devs,
                                       metric='sectors_written')
             data2d = sw2d / wc2d / 1024
             fname = self.plt(plot_hmap_from_2d, ds(metric='wr_block_size'),
@@ -633,7 +638,7 @@
             yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fname))
 
             # iotime heatmap
-            wtime2d = find_sensors_to_2d(self.rstorage, trange, node_id=nodes, sensor='block-io', devs=devs,
+            wtime2d = find_sensors_to_2d(self.rstorage, trange, node_id=nodes, sensor='block-io', dev=devs,
                                          metric='io_time')
             fname = self.plt(plot_hmap_from_2d, ds(metric='io_time'), data2d=wtime2d,
                              xlabel='Time', ylabel="IO time (ms) per second",
@@ -765,10 +770,12 @@
 
     def run(self, ctx: TestRun) -> None:
         job_reporters_cls = [StatInfo, Resources, LoadToolResults, ClusterLoad, CPULoadPlot, QDIOTimeHeatmap]
-        job_reporters = [rcls(ctx.rstorage, DefStyleProfile, DefColorProfile) for rcls in job_reporters_cls]
+        job_reporters = [rcls(ctx.rstorage, DefStyleProfile, DefColorProfile)
+                         for rcls in job_reporters_cls]  # type: ignore
 
-        suite_reporters_cls = [IOQD, ResourceQD]
-        suite_reporters = [rcls(ctx.rstorage, DefStyleProfile, DefColorProfile) for rcls in suite_reporters_cls]
+        suite_reporters_cls = [IOQD, ResourceQD]  # type: List[Type[SuiteReporter]]
+        suite_reporters = [rcls(ctx.rstorage, DefStyleProfile, DefColorProfile)
+                           for rcls in suite_reporters_cls]  # type: ignore
 
         root_dir = os.path.dirname(os.path.dirname(wally.__file__))
         doc_templ_path = os.path.join(root_dir, "report_templates/index.html")
@@ -800,7 +807,7 @@
 
             for job in all_jobs:
                 try:
-                    for reporter in job_reporters:
+                    for reporter in job_reporters:  # type: JobReporter
                         logger.debug("Start reporter %s on job %s suite %s",
                                      reporter.__class__.__name__, job.summary, suite.test_type)
                         for block, item, html in reporter.get_divs(suite, job):
@@ -810,13 +817,13 @@
                 except Exception:
                     logger.exception("Failed to generate report for %s", job.summary)
 
-            for reporter in suite_reporters:
+            for sreporter in suite_reporters:  # type: SuiteReporter
                 try:
-                    logger.debug("Start reporter %s on suite %s", reporter.__class__.__name__, suite.test_type)
-                    for block, item, html in reporter.get_divs(suite):
+                    logger.debug("Start reporter %s on suite %s", sreporter.__class__.__name__, suite.test_type)
+                    for block, item, html in sreporter.get_divs(suite):
                         items[block][item].append(html)
-                except Exception as exc:
-                    logger.exception("Failed to generate report")
+                except Exception:
+                    logger.exception("Failed to generate report for suite %s", suite)
 
             if DEBUG:
                 break
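The storage interface no longer answers role queries: `find_nodes(roles)` is replaced by `load_nodes()` plus caller-side filtering, as in the hunk above. The pattern as a standalone sketch:

    storage_nodes = [node for node in rstorage.load_nodes()
                     if node.roles.intersection(STORAGE_ROLES)]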
diff --git a/wally/resources.py b/wally/resources.py
index d867a1a..dca9dcc 100644
--- a/wally/resources.py
+++ b/wally/resources.py
@@ -9,13 +9,13 @@
 from cephlib.numeric_types import TimeSeries
 from cephlib.wally_storage import find_nodes_by_roles
 from cephlib.storage_selectors import summ_sensors
+from cephlib.node import HWInfo
 
-from .result_classes import IResultStorage, SuiteConfig
+from .result_classes import IWallyStorage, SuiteConfig
 from .utils import STORAGE_ROLES
 from .suits.io.fio import FioJobConfig
 from .suits.job import JobConfig
 from .data_selectors import get_aggregated
-from .hw_info import HWInfo
 
 
 logger = logging.getLogger('wally')
@@ -82,10 +82,10 @@
     return vec.sum() / denom.sum(), numpy.std(vals, ddof=1)
 
 
-iosum_cache = {}  # type: Dict[Tuple[str, str]]
+iosum_cache = {}  # type: Dict[Tuple[str, str], IOSummary]
 
 
-def make_iosum(rstorage: IResultStorage, suite: SuiteConfig, job: FioJobConfig, hist_boxes: int,
+def make_iosum(rstorage: IWallyStorage, suite: SuiteConfig, job: FioJobConfig, hist_boxes: int,
                nc: bool = False) -> IOSummary:
 
     key = (suite.storage_id, job.storage_id)
@@ -110,7 +110,7 @@
 cpu_load_cache = {}  # type: Dict[Tuple[int, Tuple[str, ...], Tuple[int, int]], Dict[str, TimeSeries]]
 
 
-def get_cluster_cpu_load(rstorage: IResultStorage, roles: List[str],
+def get_cluster_cpu_load(rstorage: IWallyStorage, roles: List[str],
                          time_range: Tuple[int, int], nc: bool = False) -> Dict[str, TimeSeries]:
 
     key = (id(rstorage), tuple(roles), time_range)
@@ -121,7 +121,7 @@
     cpu_metrics = "idle guest iowait sirq nice irq steal sys user".split()
     nodes = find_nodes_by_roles(rstorage.storage, roles)
     for name in cpu_metrics:
-        cpu_ts[name] = summ_sensors(rstorage, time_range, nodes=nodes, sensor='system-cpu', metric=name)
+        cpu_ts[name] = summ_sensors(rstorage, time_range, node_id=nodes, sensor='system-cpu', metric=name)
 
     it = iter(cpu_ts.values())
     total_over_time = next(it).data.copy()  # type: numpy.ndarray
@@ -141,11 +141,12 @@
 
 def get_resources_usage(suite: SuiteConfig,
                         job: JobConfig,
-                        rstorage: IResultStorage,
+                        rstorage: IWallyStorage,
                         large_block: int = 256,
                         hist_boxes: int = 10,
                         nc: bool = False) -> Tuple[Dict[str, Tuple[str, float, float]], bool]:
 
+    records = {}  # type: Dict[str, Tuple[str, float, float]]
     if not nc:
         records = rstorage.get_job_info(suite, job, "resource_usage")
         if records is not None:
@@ -163,7 +164,7 @@
 
     records = {
         ResourceNames.data_tr: (b2ssize(io_transfered.sum()) + "B", None, None)
-    }  # type: Dict[str, Tuple[str, float, float]]
+    }
 
     if iops_ok:
         ops_done = io_transfered / (fjob.bsize * unit_conversion_coef_f("KiBps", "Bps"))
@@ -200,7 +201,7 @@
             continue
 
         nodes = find_nodes_by_roles(rstorage.storage, roles)
-        res_ts = summ_sensors(rstorage, job.reliable_info_range_s, nodes=nodes, sensor=sensor, metric=metric)
+        res_ts = summ_sensors(rstorage, job.reliable_info_range_s, node_id=nodes, sensor=sensor, metric=metric)
         if res_ts is None:
             continue
 
@@ -219,10 +220,10 @@
     all_stor_nodes = list(find_nodes_by_roles(rstorage.storage, STORAGE_ROLES))
     for node in all_stor_nodes:
         try:
-            node_hw_info = rstorage.storage.load(HWInfo, 'hw_info', node.node_id)
+            node_hw_info = rstorage.storage.load(HWInfo, 'hw_info', node)
         except KeyError:
             logger.warning("No hw_info available for node %s. Using 'NODE time' instead of " +
-                           "CPU core time for CPU consumption metrics")
+                           "CPU core time for CPU consumption metrics", node)
             stor_cores_count = len(all_stor_nodes)
             break
         stor_cores_count += sum(cores for _, cores in node_hw_info.cores)
@@ -277,4 +278,3 @@
         rstorage.put_job_info(suite, job, "resource_usage", srecords)
 
     return records, iops_ok
-
diff --git a/wally/result_classes.py b/wally/result_classes.py
index 207ba71..d1fd104 100644
--- a/wally/result_classes.py
+++ b/wally/result_classes.py
@@ -4,9 +4,10 @@
 from cephlib.numeric_types import TimeSeries, DataSource
 from cephlib.statistic import StatProps
 from cephlib.istorage import IImagesStorage, Storable, ISensorStorage
+from cephlib.node import NodeInfo
+from cephlib.node_impl import IRPCNode
 
 from .suits.job import JobConfig
-from .node_interfaces import IRPCNode, NodeInfo
 
 
 class SuiteConfig(Storable):
@@ -72,7 +73,7 @@
         self.processed = None  # type: JobStatMetrics
 
 
-class IResultStorage(ISensorStorage, IImagesStorage, metaclass=abc.ABCMeta):
+class IWallyStorage(ISensorStorage, IImagesStorage, metaclass=abc.ABCMeta):
 
     @abc.abstractmethod
     def put_or_check_suite(self, suite: SuiteConfig) -> None:
@@ -128,5 +129,5 @@
         pass
 
     @abc.abstractmethod
-    def find_nodes(self, roles: Union[str, List[str]]) -> List[NodeInfo]:
+    def load_nodes(self) -> List[NodeInfo]:
         pass
\ No newline at end of file
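
Editor's note: with find_nodes(roles) dropped from the interface, role filtering becomes the caller's job — load_nodes() returns every stored NodeInfo and consumers filter themselves. A hedged sketch of the replacement pattern (NodeInfo.roles is assumed to be a set of role strings, as it is used elsewhere in wally):

    from typing import List
    from cephlib.node import NodeInfo

    def nodes_with_role(rstorage: 'IWallyStorage', role: str) -> List[NodeInfo]:
        """Client-side replacement for the removed find_nodes(roles)."""
        return [node for node in rstorage.load_nodes() if role in node.roles]

    # e.g. osd_nodes = nodes_with_role(rstorage, 'ceph-osd')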
diff --git a/wally/result_storage.py b/wally/result_storage.py
index 282bdb5..70c46d7 100644
--- a/wally/result_storage.py
+++ b/wally/result_storage.py
@@ -1,17 +1,19 @@
 import os
+import json
 import pprint
 import logging
-from typing import cast, Iterator, Tuple, Type, Optional, Any
+from typing import cast, Iterator, Tuple, Type, Optional, Any, Union, List
 
 import numpy
 
 from cephlib.wally_storage import WallyDB
-from cephlib.hlstorage import SensorStorageBase
+from cephlib.sensor_storage import SensorStorageBase
 from cephlib.statistic import StatProps
 from cephlib.numeric_types import DataSource, TimeSeries
+from cephlib.node import NodeInfo
 
 from .suits.job import JobConfig
-from .result_classes import SuiteConfig, IResultStorage
+from .result_classes import SuiteConfig, IWallyStorage
 from .utils import StopTestError
 from .suits.all_suits import all_suits
 
@@ -27,9 +29,10 @@
     return path
 
 
-class ResultStorage(IResultStorage, SensorStorageBase):
+class WallyStorage(IWallyStorage, SensorStorageBase):
     def __init__(self, storage: Storage) -> None:
         SensorStorageBase.__init__(self, storage, WallyDB)
+        self.cached_nodes = None  # type: Optional[List[NodeInfo]]
 
     # -------------  CHECK DATA IN STORAGE  ----------------------------------------------------------------------------
     def check_plot_file(self, source: DataSource) -> Optional[str]:
@@ -74,18 +77,21 @@
 
     def put_job_info(self, suite: SuiteConfig, job: JobConfig, key: str, data: Any) -> None:
         path = self.db_paths.job_extra.format(suite_id=suite.storage_id, job_id=job.storage_id, tag=key)
-        self.storage.put(data, path)
+        if isinstance(data, bytes):
+            self.storage.put_raw(data, path)
+        else:
+            self.storage.put(data, path)
 
     # -------------   GET DATA FROM STORAGE   --------------------------------------------------------------------------
 
     def get_stat(self, stat_cls: Type[StatProps], source: DataSource) -> StatProps:
         return self.storage.load(stat_cls, self.db_paths.stat.format(**source.__dict__))
 
-
     def get_txt_report(self, suite: SuiteConfig) -> Optional[str]:
         path = self.db_paths.txt_report.format(suite_id=suite.storage_id)
         if path in self.storage:
             return self.storage.get_raw(path).decode('utf8')
+        return None
 
     def get_job_info(self, suite: SuiteConfig, job: JobConfig, key: str) -> Any:
         path = self.db_paths.job_extra.format(suite_id=suite.storage_id, job_id=job.storage_id, tag=key)
@@ -110,11 +116,19 @@
             assert job.storage_id == groups['job_id']
             yield job
 
+    def load_nodes(self) -> List[NodeInfo]:
+        if not self.cached_nodes:
+            self.cached_nodes = self.storage.load_list(NodeInfo, WallyDB.all_nodes)
+            if WallyDB.nodes_params in self.storage:
+                params = json.loads(self.storage.get_raw(WallyDB.nodes_params).decode('utf8'))
+                for node in self.cached_nodes:
+                    node.params = params.get(node.node_id, {})
+        return self.cached_nodes
+
     #  -----------------  TS  ------------------------------------------------------------------------------------------
     def get_ts(self, ds: DataSource) -> TimeSeries:
         path = self.db_paths.ts.format_map(ds.__dict__)
         (units, time_units), header2, content = self.storage.get_array(path)
-
         times = content[:,0].copy()
         data = content[:,1:]
 
diff --git a/wally/run_test.py b/wally/run_test.py
index 4e02a75..6fbefc4 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -5,10 +5,13 @@
 from concurrent.futures import Future
 from typing import List, Dict, Tuple, Optional, Union, cast
 
-from . import utils, ssh_utils, hw_info
+from cephlib.wally_storage import WallyDB
+from cephlib.node import NodeInfo, IRPCNode, get_hw_info, get_sw_info
+from cephlib.ssh import parse_ssh_uri
+from cephlib.node_impl import setup_rpc, connect
+
+from . import utils
 from .config import ConfigBlock
-from .node import setup_rpc, connect
-from .node_interfaces import NodeInfo, IRPCNode
 from .stage import Stage, StepOrder
 from .sensors import collect_sensors_data
 from .suits.all_suits import all_suits
@@ -103,7 +106,7 @@
                 node.conn.cli.killall()
                 if node.rpc_log_file is not None:
                     nid = node.node_id
-                    path = "rpc_logs/{}.txt".format(nid)
+                    path = WallyDB.rpc_logs.format(node_id=nid)
                     node.conn.server.flush_logs()
                     log = node.get_file_content(node.rpc_log_file)
                     if path in ctx.storage:
@@ -133,9 +136,9 @@
             # can't make next RPC request until finish with previous
             for node in ctx.nodes:
                 nid = node.node_id
-                hw_info_path = "hw_info/{}".format(nid)
+                hw_info_path = WallyDB.hw_info.format(node_id=nid)
                 if hw_info_path not in ctx.storage:
-                    futures[(hw_info_path, nid)] = pool.submit(hw_info.get_hw_info, node)
+                    futures[(hw_info_path, nid)] = pool.submit(get_hw_info, node)
 
             for (path, nid), future in futures.items():
                 try:
@@ -147,9 +150,9 @@
             futures.clear()
             for node in ctx.nodes:
                 nid = node.node_id
-                sw_info_path = "sw_info/{}".format(nid)
+                sw_info_path = WallyDB.sw_info.format(node_id=nid)
                 if sw_info_path not in ctx.storage:
-                    futures[(sw_info_path, nid)] = pool.submit(hw_info.get_sw_info, node)
+                    futures[(sw_info_path, nid)] = pool.submit(get_sw_info, node)
 
             for (path, nid), future in futures.items():
                 try:
@@ -166,12 +169,12 @@
     config_block = 'nodes'
 
     def run(self, ctx: TestRun) -> None:
-        if 'all_nodes' in ctx.storage:
+        if WallyDB.all_nodes in ctx.storage:
             logger.info("Skip explicid nodes filling, as all_nodes all ready in storage")
             return
 
         for url, roles in ctx.config.get('nodes', {}).raw().items():
-            ctx.merge_node(ssh_utils.parse_ssh_uri(url), set(role.strip() for role in roles.split(",")))
+            ctx.merge_node(parse_ssh_uri(url), set(role.strip() for role in roles.split(",")))
             logger.debug("Add node %s with roles %s", url, roles)
 
 
@@ -293,8 +296,6 @@
 
 class SaveNodesStage(Stage):
     """Save nodes list to file"""
-    nodes_path = 'all_nodes'
-    params_path = 'all_nodes_params.js'
     priority = StepOrder.UPDATE_NODES_INFO + 1
 
     def run(self, ctx: TestRun) -> None:
@@ -302,27 +303,20 @@
         params = {node.node_id: node.params for node in infos}
         ninfos = [copy.copy(node) for node in infos]
         for node in ninfos:
-            node.params = "in {!r} file".format(self.params_path)
-        ctx.storage.put_list(ninfos, self.nodes_path)
-        ctx.storage.put_raw(json.dumps(params).encode('utf8'), self.params_path)
+            node.params = {"in file": WallyDB.nodes_params}
+        ctx.storage.put_list(ninfos, WallyDB.all_nodes)
+        ctx.storage.put_raw(json.dumps(params).encode('utf8'), WallyDB.nodes_params)
 
 
 class LoadStoredNodesStage(Stage):
     priority = StepOrder.DISCOVER
 
     def run(self, ctx: TestRun) -> None:
-        if SaveNodesStage.nodes_path in ctx.storage:
+        if WallyDB.all_nodes in ctx.storage:
             if ctx.nodes_info:
                 logger.error("Internal error: Some nodes already stored in " +
                              "nodes_info before LoadStoredNodesStage stage")
                 raise utils.StopTestError()
 
-            nodes = {node.node_id: node for node in ctx.storage.load_list(NodeInfo, SaveNodesStage.nodes_path)}
-
-            if SaveNodesStage.params_path in ctx.storage:
-                params = json.loads(ctx.storage.get_raw(SaveNodesStage.params_path).decode('utf8'))
-                for node_id, node in nodes.items():
-                    node.params = params.get(node_id, {})
-
-            ctx.nodes_info = nodes
+            ctx.nodes_info = {node.node_id: node for node in ctx.rstorage.load_nodes()}
             logger.info("%s nodes loaded from database", len(ctx.nodes_info))
diff --git a/wally/ssh.py b/wally/ssh.py
deleted file mode 100644
index ddf3e58..0000000
--- a/wally/ssh.py
+++ /dev/null
@@ -1,141 +0,0 @@
-import time
-import errno
-import socket
-import logging
-import os.path
-import selectors
-from io import StringIO
-from typing import cast, Dict, List, Set, Optional
-
-import paramiko
-
-from cephlib.common import Timeout
-
-from .common_types import ConnCreds, IPAddr
-
-logger = logging.getLogger("wally")
-NODE_KEYS = {}  # type: Dict[IPAddr, paramiko.RSAKey]
-SSH_KEY_PASSWD = None  # type: Optional[str]
-
-
-def set_ssh_key_passwd(passwd: str) -> None:
-    global SSH_KEY_PASSWD
-    SSH_KEY_PASSWD = passwd
-
-
-def set_key_for_node(host_port: IPAddr, key: bytes) -> None:
-    with StringIO(key.decode("utf8")) as sio:
-        NODE_KEYS[host_port] = paramiko.RSAKey.from_private_key(sio)  # type: ignore
-
-
-def connect(creds: ConnCreds,
-            conn_timeout: int = 60,
-            tcp_timeout: int = 15,
-            default_banner_timeout: int = 30) -> paramiko.SSHClient:
-
-    ssh = paramiko.SSHClient()
-    ssh.load_host_keys('/dev/null')
-    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
-    ssh.known_hosts = None
-
-    end_time = time.time() + conn_timeout  # type: float
-
-    logger.debug("SSH connecting to %s", creds)
-
-    while True:
-        try:
-            time_left = end_time - time.time()
-            c_tcp_timeout = min(tcp_timeout, time_left)
-
-            banner_timeout_arg = {}  # type: Dict[str, int]
-            if paramiko.__version_info__ >= (1, 15, 2):
-                banner_timeout_arg['banner_timeout'] = int(min(default_banner_timeout, time_left))
-
-            if creds.passwd is not None:
-                ssh.connect(creds.addr.host,
-                            timeout=c_tcp_timeout,
-                            username=creds.user,
-                            password=cast(str, creds.passwd),
-                            port=creds.addr.port,
-                            allow_agent=False,
-                            look_for_keys=False,
-                            **banner_timeout_arg)
-            elif creds.key_file is not None:
-                ssh.connect(creds.addr.host,
-                            username=creds.user,
-                            timeout=c_tcp_timeout,
-                            pkey=paramiko.RSAKey.from_private_key_file(creds.key_file, password=SSH_KEY_PASSWD),
-                            look_for_keys=False,
-                            port=creds.addr.port,
-                            **banner_timeout_arg)
-            elif creds.key is not None:
-                with StringIO(creds.key.decode("utf8")) as sio:
-                    ssh.connect(creds.addr.host,
-                                username=creds.user,
-                                timeout=c_tcp_timeout,
-                                pkey=paramiko.RSAKey.from_private_key(sio, password=SSH_KEY_PASSWD),  # type: ignore
-                                look_for_keys=False,
-                                port=creds.addr.port,
-                                **banner_timeout_arg)
-            elif (creds.addr.host, creds.addr.port) in NODE_KEYS:
-                ssh.connect(creds.addr.host,
-                            username=creds.user,
-                            timeout=c_tcp_timeout,
-                            pkey=NODE_KEYS[creds.addr],
-                            look_for_keys=False,
-                            port=creds.addr.port,
-                            **banner_timeout_arg)
-            else:
-                key_file = os.path.expanduser('~/.ssh/id_rsa')
-                ssh.connect(creds.addr.host,
-                            username=creds.user,
-                            timeout=c_tcp_timeout,
-                            key_filename=key_file,
-                            look_for_keys=False,
-                            port=creds.addr.port,
-                            **banner_timeout_arg)
-            return ssh
-        except (socket.gaierror, paramiko.PasswordRequiredException):
-            raise
-        except socket.error:
-            if time.time() > end_time:
-                raise
-            time.sleep(1)
-
-
-def wait_ssh_available(addrs: List[IPAddr],
-                       timeout: int = 300,
-                       tcp_timeout: float = 1.0) -> None:
-
-    addrs_set = set(addrs)  # type: Set[IPAddr]
-
-    for _ in Timeout(timeout):
-        selector = selectors.DefaultSelector()  # type: selectors.BaseSelector
-        with selector:
-            for addr in addrs_set:
-                sock = socket.socket()
-                sock.setblocking(False)
-                try:
-                    sock.connect(addr)
-                except BlockingIOError:
-                    pass
-                selector.register(sock, selectors.EVENT_READ, data=addr)
-
-            etime = time.time() + tcp_timeout
-            ltime = etime - time.time()
-            while ltime > 0:
-                # convert to greater or equal integer
-                for key, _ in selector.select(timeout=int(ltime + 0.99999)):
-                    selector.unregister(key.fileobj)
-                    try:
-                        key.fileobj.getpeername()  # type: ignore
-                        addrs_set.remove(key.data)
-                    except OSError as exc:
-                        if exc.errno == errno.ENOTCONN:
-                            pass
-                ltime = etime - time.time()
-
-        if not addrs_set:
-            break
-
-
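
Editor's note: wally/ssh.py is deleted outright; the paramiko wrapper now lives in cephlib (see the cephlib.node_impl.connect import in run_test.py above). The deadline-with-retry idiom it used is worth keeping in mind when porting similar code; sketched here on its own, not as cephlib's implementation:

    import time
    import socket
    from typing import Callable, TypeVar

    T = TypeVar('T')

    def retry_until_deadline(attempt: Callable[[], T], timeout_s: float,
                             delay_s: float = 1.0) -> T:
        """Retry attempt() on transient socket errors until the deadline passes."""
        end_time = time.time() + timeout_s
        while True:
            try:
                return attempt()
            except socket.error:
                if time.time() > end_time:
                    raise  # budget exhausted -- re-raise the last error
                time.sleep(delay_s)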
diff --git a/wally/ssh_utils.py b/wally/ssh_utils.py
deleted file mode 100644
index 56d317f..0000000
--- a/wally/ssh_utils.py
+++ /dev/null
@@ -1,68 +0,0 @@
-import re
-import getpass
-import logging
-from typing import List, Dict
-
-
-from cephlib.common import to_ip
-from .common_types import ConnCreds
-
-
-logger = logging.getLogger("wally")
-
-
-class URIsNamespace:
-    class ReParts:
-        user_rr = "[^:]*?"
-        host_rr = "[^:@]*?"
-        port_rr = "\\d+"
-        key_file_rr = "[^:@]*"
-        passwd_rr = ".*?"
-
-    re_dct = ReParts.__dict__
-
-    for attr_name, val in re_dct.items():
-        if attr_name.endswith('_rr'):
-            new_rr = "(?P<{0}>{1})".format(attr_name[:-3], val)
-            setattr(ReParts, attr_name, new_rr)
-
-    re_dct = ReParts.__dict__
-
-    templs = [
-        "^{host_rr}$",
-        "^{host_rr}:{port_rr}$",
-        "^{host_rr}::{key_file_rr}$",
-        "^{host_rr}:{port_rr}:{key_file_rr}$",
-        "^{user_rr}@{host_rr}$",
-        "^{user_rr}@{host_rr}:{port_rr}$",
-        "^{user_rr}@{host_rr}::{key_file_rr}$",
-        "^{user_rr}@{host_rr}:{port_rr}:{key_file_rr}$",
-        "^{user_rr}:{passwd_rr}@{host_rr}$",
-        "^{user_rr}:{passwd_rr}@{host_rr}:{port_rr}$",
-    ]
-
-    uri_reg_exprs = []  # type: List[str]
-    for templ in templs:
-        uri_reg_exprs.append(templ.format(**re_dct))
-
-
-def parse_ssh_uri(uri: str) -> ConnCreds:
-    """Parse ssh connection URL from one of following form
-        [ssh://]user:passwd@host[:port]
-        [ssh://][user@]host[:port][:key_file]
-    """
-
-    if uri.startswith("ssh://"):
-        uri = uri[len("ssh://"):]
-
-    for rr in URIsNamespace.uri_reg_exprs:
-        rrm = re.match(rr, uri)
-        if rrm is not None:
-            params = {"user": getpass.getuser()}  # type: Dict[str, str]
-            params.update(rrm.groupdict())
-            params['host'] = to_ip(params['host'])
-            return ConnCreds(**params)  # type: ignore
-
-    raise ValueError("Can't parse {0!r} as ssh uri value".format(uri))
-
-
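
Editor's note: parse_ssh_uri() moves to cephlib.ssh with, presumably, the same accepted URI grammar as the deleted code above. A usage sketch, assuming the cephlib version preserves the ConnCreds fields the deleted wally code relied on:

    from cephlib.ssh import parse_ssh_uri

    # Accepted forms, per the grammar in the deleted docstring:
    #   [ssh://]user:passwd@host[:port]
    #   [ssh://][user@]host[:port][:key_file]
    creds = parse_ssh_uri("root@192.168.0.10:2222:/root/.ssh/id_rsa")
    # Field names assumed unchanged from the deleted wally ConnCreds:
    # creds.user == 'root'; creds.addr.host == '192.168.0.10';
    # creds.addr.port == 2222; creds.key_file == '/root/.ssh/id_rsa'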
diff --git a/wally/suits/io/fio.py b/wally/suits/io/fio.py
index 1895b6c..374e5a4 100644
--- a/wally/suits/io/fio.py
+++ b/wally/suits/io/fio.py
@@ -1,13 +1,14 @@
 import os.path
 import logging
-from typing import cast, Any, List, Union
+from typing import cast, Any, List, Union, Tuple, Optional
 
 import numpy
 
+from cephlib.units import ssize2b, b2ssize
+from cephlib.node import IRPCNode, get_os
+
 import wally
-from ...utils import StopTestError, ssize2b, b2ssize
-from ...node_interfaces import IRPCNode
-from ...node_utils import get_os
+from ...utils import StopTestError
 from ..itest import ThreadedTest
 from ...result_classes import TimeSeries, DataSource
 from ..job import JobConfig
@@ -189,7 +190,7 @@
         node.conn.fs.unlink(self.remote_output_file)
 
         files = [name for name in node.conn.fs.listdir(self.exec_folder)]
-        result = []
+        result = []  # type: List[TimeSeries]
         for name, file_path, units in get_log_files(cast(FioJobConfig, job)):
             log_files = [fname for fname in files if fname.startswith(file_path)]
             if len(log_files) != 1:
@@ -235,19 +236,16 @@
                         logger.exception("Error during parse %s fio log file in line %s: %r", name, idx, line)
                         raise StopTestError()
 
-            if not self.suite.keep_raw_files:
-                raw_result = None
+            assert not self.suite.keep_raw_files, "keep_raw_files is not supported"
 
             histo_bins = None if name != 'lat' else numpy.array(get_lat_vals())
-
-            result.append(TimeSeries(name=name,
-                                     raw=raw_result,
-                                     data=numpy.array(parsed, dtype='uint64'),
-                                     units=units,
-                                     times=numpy.array(times, dtype='uint64'),
-                                     time_units='ms',
-                                     source=path(metric=name, tag='csv'),
-                                     histo_bins=histo_bins))
+            ts = TimeSeries(data=numpy.array(parsed, dtype='uint64'),
+                            units=units,
+                            times=numpy.array(times, dtype='uint64'),
+                            time_units='ms',
+                            source=path(metric=name, tag='csv'),
+                            histo_bins=histo_bins)
+            result.append(ts)
         return result
 
     def format_for_console(self, data: Any) -> str:
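
Editor's note: with raw= gone from TimeSeries, the log parser now produces only numpy arrays plus metadata, and keep_raw_files fails fast instead of being silently ignored. A minimal construction sketch — the constructor fields are taken from the hunk above, but the source argument is illustrative, since the real call goes through the local path() helper:

    import numpy
    from cephlib.numeric_types import TimeSeries

    times = numpy.array([1000, 2000, 3000], dtype='uint64')   # ms stamps from the log
    values = numpy.array([120, 131, 118], dtype='uint64')     # parsed samples

    source = None  # in fio.py this is path(metric=name, tag='csv'), a DataSource
    ts = TimeSeries(data=values,
                    units='KiBps',
                    times=times,
                    time_units='ms',
                    source=source,
                    histo_bins=None)      # only 'lat' series carry histogram bins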
diff --git a/wally/suits/io/fio_job.py b/wally/suits/io/fio_job.py
index 39715ef..3d4e886 100644
--- a/wally/suits/io/fio_job.py
+++ b/wally/suits/io/fio_job.py
@@ -1,10 +1,9 @@
-import abc
 import copy
 from collections import OrderedDict
 from typing import Optional, Iterator, Union, Dict, Tuple, NamedTuple, Any, cast
 
+from cephlib.units import ssize2b, b2ssize
 
-from ...utils import ssize2b, b2ssize
 from ..job import JobConfig, JobParams
 
 
diff --git a/wally/suits/io/fio_task_parser.py b/wally/suits/io/fio_task_parser.py
index 222589b..9ffeed8 100644
--- a/wally/suits/io/fio_task_parser.py
+++ b/wally/suits/io/fio_task_parser.py
@@ -9,8 +9,9 @@
 from typing import Optional, Iterator, Union, Dict, Iterable, List, Tuple, NamedTuple, Any
 from collections import OrderedDict
 
+from cephlib.units import ssize2b
+from cephlib.common import flatmap, sec_to_str
 
-from ...utils import sec_to_str, ssize2b, flatmap
 from .fio_job import Var, FioJobConfig
 
 SECTION = 0
@@ -87,16 +88,12 @@
                                  fname, lineno, oline)
 
             if line.startswith('['):
-                yield CfgLine(fname, lineno, oline, SECTION,
-                              line[1:-1].strip(), None)
+                yield CfgLine(fname, lineno, oline, SECTION, line[1:-1].strip(), None)
             elif '=' in line:
                 opt_name, opt_val = line.split('=', 1)
-                yield CfgLine(fname, lineno, oline, SETTING,
-                              opt_name.strip(),
-                              parse_value(opt_val.strip()))
+                yield CfgLine(fname, lineno, oline, SETTING, opt_name.strip(), parse_value(opt_val.strip()))
             elif line.startswith("include "):
-                yield CfgLine(fname, lineno, oline, INCLUDE,
-                              line.split(" ", 1)[1], None)
+                yield CfgLine(fname, lineno, oline, INCLUDE, line.split(" ", 1)[1], None)
             else:
                 yield CfgLine(fname, lineno, oline, SETTING, line, '1')
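
Editor's note: the reflowed yields make the classification easier to read. For reference, the line-to-CfgLine mapping this parser implements (field order: fname, lineno, oline, tp, name, val), derived from the branches above:

    # "[write-4k]"           -> CfgLine(..., SECTION, "write-4k", None)
    # "blocksize=4k"         -> CfgLine(..., SETTING, "blocksize", parse_value("4k"))
    # "include common.cfg"   -> CfgLine(..., INCLUDE, "common.cfg", None)
    # "direct"               -> CfgLine(..., SETTING, "direct", '1')  # bare flag -> '1'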
 
diff --git a/wally/suits/io/rpc_plugin.py b/wally/suits/io/rpc_plugin.py
index 39ed5cc..1c7fee0 100644
--- a/wally/suits/io/rpc_plugin.py
+++ b/wally/suits/io/rpc_plugin.py
@@ -42,10 +42,13 @@
 
     assert size % 4 == 0, "File size must be a multiple of 4M"
 
-    cmd_templ = "{} --name=xxx --filename={} --direct=1 --bs=4m --size={}m --rw=write"
+    cmd_templ = "{0} --name=xxx --filename={1} --direct=1 --bs=4m --size={2}m --rw=write"
 
     run_time = time.time()
-    subprocess.check_output(cmd_templ.format(fio_path, fname, size), shell=True)
+    try:
+        subprocess.check_output(cmd_templ.format(fio_path, fname, size), shell=True)
+    except subprocess.CalledProcessError as exc:
+        raise RuntimeError("{0!s}.\nOutput: {1}".format(exc, exc.output))
     run_time = time.time() - run_time
 
     prefill_bw = None if run_time < 1.0 else int(size / run_time)
@@ -55,6 +58,6 @@
 
 def rpc_install(name, binary):
     try:
-        subprocess.check_output("which {}".format(binary), shell=True)
+        subprocess.check_output("which {0}".format(binary), shell=True)
     except:
-        subprocess.check_output("apt-get install -y {}".format(name), shell=True)
+        subprocess.check_output("apt-get install -y {0}".format(name), shell=True)
diff --git a/wally/suits/io/rrd.cfg b/wally/suits/io/rrd.cfg
index ae2d960..b013796 100644
--- a/wally/suits/io/rrd.cfg
+++ b/wally/suits/io/rrd.cfg
@@ -8,27 +8,27 @@
 rw=randread
 iodepth=1
 
-[test_{TEST_SUMM}]
-iodepth=16
-blocksize=60k
-rw=randread
+#[test_{TEST_SUMM}]
+#iodepth=16
+#blocksize=60k
+#rw=randread
 
-[test_{TEST_SUMM}]
-blocksize=60k
-rw=randwrite
-iodepth=1
+#[test_{TEST_SUMM}]
+#blocksize=60k
+#rw=randwrite
+#iodepth=1
 
-[test_{TEST_SUMM}]
-iodepth=16
-blocksize=60k
-rw=randwrite
+#[test_{TEST_SUMM}]
+#iodepth=16
+#blocksize=60k
+#rw=randwrite
 
-[test_{TEST_SUMM}]
-iodepth=1
-blocksize=1m
-rw=write
+#[test_{TEST_SUMM}]
+#iodepth=1
+#blocksize=1m
+#rw=write
 
-[test_{TEST_SUMM}]
-iodepth=1
-blocksize=1m
-rw=read
+#[test_{TEST_SUMM}]
+#iodepth=1
+#blocksize=1m
+#rw=read
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index e848442..b2b3a54 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -2,13 +2,14 @@
 import time
 import logging
 import os.path
-from typing import Any, List, Optional, Callable, Iterable, cast
+from typing import Any, List, Optional, Callable, Iterable, cast, Tuple
 
 from concurrent.futures import ThreadPoolExecutor, wait
 
+from cephlib.node import IRPCNode
+
 from ..utils import StopTestError, get_time_interval_printable_info
-from ..node_interfaces import IRPCNode
-from ..result_classes import SuiteConfig, JobConfig, TimeSeries, IResultStorage
+from ..result_classes import SuiteConfig, JobConfig, TimeSeries, IWallyStorage
 
 
 logger = logging.getLogger("wally")
@@ -24,7 +25,7 @@
     retry_time = 30
     job_config_cls = None  # type: type
 
-    def __init__(self, storage: IResultStorage, suite: SuiteConfig, on_idle: Callable[[], None] = None) -> None:
+    def __init__(self, storage: IWallyStorage, suite: SuiteConfig, on_idle: Callable[[], None] = None) -> None:
         self.suite = suite
         self.stop_requested = False
         self.sorted_nodes_ids = sorted(node.node_id for node in self.suite.nodes)
diff --git a/wally/suits/job.py b/wally/suits/job.py
index 5f3c764..8ef1093 100644
--- a/wally/suits/job.py
+++ b/wally/suits/job.py
@@ -2,7 +2,7 @@
 from typing import Dict, Any, Tuple, cast, Union
 from collections import OrderedDict
 
-from cephlib.storage import Storable
+from cephlib.istorage import Storable
 
 
 class JobParams(metaclass=abc.ABCMeta):
diff --git a/wally/test_run_class.py b/wally/test_run_class.py
index b67035e..a7721e6 100644
--- a/wally/test_run_class.py
+++ b/wally/test_run_class.py
@@ -2,18 +2,18 @@
 from concurrent.futures import ThreadPoolExecutor
 
 from cephlib.istorage import IStorage
+from cephlib.node import NodeInfo, IRPCNode
+from cephlib.ssh import ConnCreds
 
-from .node_interfaces import NodeInfo, IRPCNode
 from .openstack_api import OSCreds, OSConnection
 from .config import Config
 from .fuel_rest_api import Connection
-from .ssh_utils import ConnCreds
-from .result_classes import IResultStorage
+from .result_classes import IWallyStorage
 
 
 class TestRun:
     """Test run information"""
-    def __init__(self, config: Config, storage: IStorage, rstorage: IResultStorage) -> None:
+    def __init__(self, config: Config, storage: IStorage, rstorage: IWallyStorage) -> None:
         # NodesInfo list
         self.nodes_info = {}  # type: Dict[str, NodeInfo]
 
diff --git a/wally/utils.py b/wally/utils.py
index dfd0e8c..754734d 100644
--- a/wally/utils.py
+++ b/wally/utils.py
@@ -19,7 +19,7 @@
 logger = logging.getLogger("wally")
 
 
-STORAGE_ROLES = {'ceph-osd'}
+STORAGE_ROLES = ['ceph-osd']
 
 
 class StopTestError(RuntimeError):
@@ -121,9 +121,3 @@
     return exec_time_s, "{:%H:%M:%S}".format(end_dt)
 
 
-def shape2str(shape: Iterable[int]) -> str:
-    return "*".join(map(str, shape))
-
-
-def str2shape(shape: str) -> Tuple[int, ...]:
-    return tuple(map(int, shape.split('*')))