diff --git a/wally/ceph.py b/wally/ceph.py
index 35e78b4..4afff06 100644
--- a/wally/ceph.py
+++ b/wally/ceph.py
@@ -1,7 +1,7 @@
 """ Collect data about ceph nodes"""
+import enum
 import logging
 from typing import Dict, cast, List, Set
-
 from cephlib import discover
 from cephlib.discover import OSDInfo
 from cephlib.common import to_ip
@@ -19,7 +19,6 @@
 
 def get_osds_info(node: IRPCNode, ceph_extra_args: str = "", thcount: int = 8) -> Dict[IP, List[OSDInfo]]:
     """Get set of osd's ip"""
-    res: Dict[IP, List[OSDInfo]] = {}
     return {IP(ip): osd_info_list
             for ip, osd_info_list in discover.get_osds_nodes(node.run, ceph_extra_args, thcount=thcount).items()}
 
@@ -45,7 +44,12 @@
 
         ignore_errors = 'ignore_errors' in ctx.config.discover
         ceph = ctx.config.ceph
-        root_node_uri = cast(str, ceph.root_node)
+        try:
+            root_node_uri = cast(str, ceph.root_node)
+        except AttributeError:
+            logger.error("'root_node' option must be provided in 'ceph' config section. " +
+                         "It must be the name of the node, which has access to ceph")
+            raise StopTestError()
         cluster = ceph.get("cluster", "ceph")
         ip_remap = ctx.config.ceph.get('ip_remap', {})
 
@@ -58,13 +62,7 @@
         if key is None:
             key = f"/etc/ceph/{cluster}.client.admin.keyring"
 
-        ceph_extra_args = ""
-
-        if conf:
-            ceph_extra_args += f" -c '{conf}'"
-
-        if key:
-            ceph_extra_args += f" -k '{key}'"
+        ctx.ceph_extra_args = f" -c '{conf}' -k '{key}'"
 
         logger.debug(f"Start discovering ceph nodes from root {root_node_uri}")
         logger.debug(f"cluster={cluster} key={conf} conf={key}")
@@ -73,42 +71,46 @@
 
         ceph_params = {"cluster": cluster, "conf": conf, "key": key}
 
-        with setup_rpc(connect(info), ctx.rpc_code, ctx.default_rpc_plugins,
-                       log_level=ctx.config.rpc_log_level) as node:
+        ssh_user = ctx.config.ssh_opts.get("user")
+        ssh_key = ctx.config.ssh_opts.get("key")
 
-            try:
-                ips = set()
-                for ip, osds_info in get_osds_info(node, ceph_extra_args, thcount=16).items():
-                    ip = ip_remap.get(ip, ip)
-                    ips.add(ip)
-                    creds = ConnCreds(to_ip(cast(str, ip)), user="root")
-                    info = ctx.merge_node(creds, {'ceph-osd'})
-                    info.params.setdefault('ceph-osds', []).extend(info.__dict__.copy() for info in osds_info)
-                    assert 'ceph' not in info.params or info.params['ceph'] == ceph_params
-                    info.params['ceph'] = ceph_params
-                logger.debug(f"Found {len(ips)} nodes with ceph-osd role")
-            except Exception as exc:
-                if not ignore_errors:
-                    logger.exception("OSD discovery failed")
-                    raise StopTestError()
-                else:
-                    logger.warning(f"OSD discovery failed {exc}")
+        node = ctx.ceph_master_node = setup_rpc(connect(info), ctx.rpc_code, ctx.default_rpc_plugins,
+                                                log_level=ctx.config.rpc_log_level,
+                                                sudo=ctx.config.ssh_opts.get("sudo", False))
 
-            try:
-                counter = 0
-                for counter, ip in enumerate(get_mons_ips(node, ceph_extra_args)):
-                    ip = ip_remap.get(ip, ip)
-                    creds = ConnCreds(to_ip(cast(str, ip)), user="root")
-                    info = ctx.merge_node(creds, {'ceph-mon'})
-                    assert 'ceph' not in info.params or info.params['ceph'] == ceph_params
-                    info.params['ceph'] = ceph_params
-                logger.debug(f"Found {counter + 1} nodes with ceph-mon role")
-            except Exception as exc:
-                if not ignore_errors:
-                    logger.exception("MON discovery failed")
-                    raise StopTestError()
-                else:
-                    logger.warning(f"MON discovery failed {exc}")
+        try:
+            ips = set()
+            for ip, osds_info in get_osds_info(node, ctx.ceph_extra_args, thcount=16).items():
+                ip = ip_remap.get(ip, ip)
+                ips.add(ip)
+                creds = ConnCreds(to_ip(cast(str, ip)), user=ssh_user, key_file=ssh_key)
+                info = ctx.merge_node(creds, {'ceph-osd'})
+                info.params.setdefault('ceph-osds', []).extend(info.__dict__.copy() for info in osds_info)
+                assert 'ceph' not in info.params or info.params['ceph'] == ceph_params
+                info.params['ceph'] = ceph_params
+            logger.debug(f"Found {len(ips)} nodes with ceph-osd role")
+        except Exception as exc:
+            if not ignore_errors:
+                logger.exception("OSD discovery failed")
+                raise StopTestError()
+            else:
+                logger.warning(f"OSD discovery failed {exc}")
+
+        try:
+            counter = 0
+            for counter, ip in enumerate(get_mons_ips(node, ctx.ceph_extra_args)):
+                ip = ip_remap.get(ip, ip)
+                creds = ConnCreds(to_ip(cast(str, ip)),  user=ssh_user, key_file=ssh_key)
+                info = ctx.merge_node(creds, {'ceph-mon'})
+                assert 'ceph' not in info.params or info.params['ceph'] == ceph_params
+                info.params['ceph'] = ceph_params
+            logger.debug(f"Found {counter + 1} nodes with ceph-mon role")
+        except Exception as exc:
+            if not ignore_errors:
+                logger.exception("MON discovery failed")
+                raise StopTestError()
+            else:
+                logger.warning(f"MON discovery failed {exc}")
 
 
 def raw_dev_name(path: str) -> str:
@@ -130,12 +132,27 @@
                     jdevs: Set[str] = set()
                     sdevs: Set[str] = set()
                     for osd_info in node.info.params['ceph-osds']:
-                        for key, sset in [('journal', jdevs), ('storage', sdevs)]:
-                            path = osd_info.get(key)
-                            if path:
+
+                        if osd_info['bluestore'] is None:
+                            osd_stor_type_b = node.conn.fs.get_file(osd_info['storage'] + "/type", compress=False)
+                            osd_stor_type = osd_stor_type_b.decode('utf8').strip()
+                            osd_info['bluestore'] = osd_stor_type == 'bluestore'
+
+                        if osd_info['bluestore']:
+                            for name, sset in [('block.db', jdevs), ('block.wal', jdevs), ('block', sdevs)]:
+                                path = f"{osd_info['storage']}/{name}"
                                 dpath = node.conn.fs.get_dev_for_file(path)
                                 if isinstance(dpath, bytes):
                                     dpath = dpath.decode('utf8')
                                 sset.add(raw_dev_name(dpath))
+                        else:
+                            for key, sset in [('journal', jdevs), ('storage', sdevs)]:
+                                path = osd_info.get(key)
+                                if path:
+                                    dpath = node.conn.fs.get_dev_for_file(path)
+                                    if isinstance(dpath, bytes):
+                                        dpath = dpath.decode('utf8')
+                                    sset.add(raw_dev_name(dpath))
+
                     node.info.params['ceph_storage_devs'] = list(sdevs)
                     node.info.params['ceph_journal_devs'] = list(jdevs)
diff --git a/wally/config.py b/wally/config.py
index 1090d6c..cbcacfa 100644
--- a/wally/config.py
+++ b/wally/config.py
@@ -10,30 +10,28 @@
     def __init__(self, dct: ConfigBlock) -> None:
         # make mypy happy, set fake dict
         self.__dict__['_dct'] = {}
-        self.run_uuid: str = None
-        self.storage_url: str = None
-        self.comment: str = None
-        self.keep_vm: bool = None
-        self.dont_discover_nodes: bool = None
-        self.build_id: str = None
-        self.build_description: str = None
-        self.build_type: str = None
-        self.default_test_local_folder: str = None
-        self.settings_dir: str = None
-        self.connect_timeout: int = None
+        self.run_uuid: str = None  # type: ignore
+        self.storage_url: str = None  # type: ignore
+        self.comment: str = None  # type: ignore
+        self.keep_vm: bool = None  # type: ignore
+        self.dont_discover_nodes: bool = None  # type: ignore
+        self.build_id: str = None  # type: ignore
+        self.build_description: str = None  # type: ignore
+        self.build_type: str = None  # type: ignore
+        self.default_test_local_folder: str = None  # type: ignore
+        self.settings_dir: str = None  # type: ignore
+        self.connect_timeout: int = None  # type: ignore
         self.no_tests: bool = False
         self.debug_agents: bool = False
 
-        # None, disabled, enabled, metadata, ignore_errors
-        self.discover: Optional[str] = None
+        self.logging: 'Config' = None  # type: ignore
+        self.ceph: 'Config' = None  # type: ignore
+        self.openstack: 'Config' = None  # type: ignore
+        self.test: 'Config' = None  # type: ignore
+        self.sensors: 'Config' = None  # type: ignore
 
-        self.logging: 'Config' = None
-        self.ceph: 'Config' = None
-        self.openstack: 'Config' = None
-        self.fuel: 'Config' = None
-        self.test: 'Config' = None
-        self.sensors: 'Config' = None
-        self.discover: Set[str] = None
+        # None, disabled, enabled, metadata, ignore_errors
+        self.discover: Set[str] = None  # type: ignore
 
         self._dct.clear()
         self._dct.update(dct)
diff --git a/wally/data_selectors.py b/wally/data_selectors.py
index 2b9037e..8d4f630 100644
--- a/wally/data_selectors.py
+++ b/wally/data_selectors.py
@@ -8,7 +8,6 @@
 from cephlib.node import NodeInfo
 
 from .result_classes import IWallyStorage
-from .suits.io.fio_hist import expected_lat_bins
 
 
 logger = logging.getLogger("wally")
@@ -65,11 +64,11 @@
             logger.error(msg)
             raise ValueError(msg)
 
-        if metric == 'lat' and (len(ts.data.shape) != 2 or ts.data.shape[1] != expected_lat_bins):
-            msg = f"Sensor {ts.source.dev}.{ts.source.sensor} on node {ts.source.node_id} " + \
-                f"has shape={ts.data.shape}. Can only process sensors with shape=[X, {expected_lat_bins}]."
-            logger.error(msg)
-            raise ValueError(msg)
+        # if metric == 'lat' and (len(ts.data.shape) != 2 or ts.data.shape[1] != expected_lat_bins):
+        #     msg = f"Sensor {ts.source.dev}.{ts.source.sensor} on node {ts.source.node_id} " + \
+        #         f"has shape={ts.data.shape}. Can only process sensors with shape=[X, {expected_lat_bins}]."
+        #     logger.error(msg)
+        #     raise ValueError(msg)
 
         if metric != 'lat' and len(ts.data.shape) != 1:
             msg = f"Sensor {ts.source.dev}.{ts.source.sensor} on node {ts.source.node_id} " + \
@@ -80,7 +79,6 @@
         assert trange[0] >= ts.times[0] and trange[1] <= ts.times[-1], \
             f"[{ts.times[0]}, {ts.times[-1]}] not in [{trange[0]}, {trange[-1]}]"
 
-
         idx1, idx2 = numpy.searchsorted(ts.times, trange)
         idx2 += 1
 
diff --git a/wally/fuel.py b/wally/fuel.py
deleted file mode 100644
index 668be21..0000000
--- a/wally/fuel.py
+++ /dev/null
@@ -1,125 +0,0 @@
-import logging
-from typing import Dict, List, NamedTuple, Union, cast
-
-from paramiko.ssh_exception import AuthenticationException
-
-from cephlib.common import parse_creds, to_ip
-from cephlib.ssh import ConnCreds
-from cephlib.node_impl import connect, setup_rpc
-
-from .fuel_rest_api import get_cluster_id, reflect_cluster, FuelInfo, KeystoneAuth
-from .utils import StopTestError
-from .stage import Stage, StepOrder
-from .test_run_class import TestRun
-from .config import ConfigBlock
-from .openstack_api import OSCreds
-
-
-logger = logging.getLogger("wally")
-
-
-FuelNodeInfo = NamedTuple("FuelNodeInfo",
-                          [("version", List[int]),
-                           ("fuel_ext_iface", str),
-                           ("openrc", Dict[str, Union[str, bool]])])
-
-
-
-class DiscoverFuelStage(Stage):
-    """"Fuel nodes discovery, also can get openstack openrc"""
-
-    priority = StepOrder.DISCOVER
-    config_block = 'fuel'
-
-    @classmethod
-    def validate(cls, cfg: ConfigBlock) -> None:
-        # msg = "openstack_env should be provided in fuel config"
-        # check_input_param('openstack_env' in fuel_data, msg)
-        # fuel.openstack_env
-        pass
-
-    def run(self, ctx: TestRun) -> None:
-        full_discovery = 'fuel' in ctx.config.discover
-        metadata_only = (not full_discovery) and ('metadata' in ctx.config.discover)
-        ignore_errors = 'ignore_errors' in ctx.config.discover
-
-        if not (metadata_only or full_discovery):
-            logger.debug("Skip ceph discovery due to config setting")
-            return
-
-        if "fuel_os_creds" in ctx.storage and 'fuel_version' in ctx.storage:
-            logger.debug("Skip FUEL credentials discovery, use previously discovered info")
-            ctx.fuel_openstack_creds = OSCreds(*cast(List, ctx.storage.get('fuel_os_creds')))
-            ctx.fuel_version = ctx.storage.get('fuel_version')
-            if 'all_nodes' in ctx.storage:
-                logger.debug("Skip FUEL nodes discovery, use data from DB")
-                return
-            elif metadata_only:
-                logger.debug("Skip FUEL nodes  discovery due to discovery settings")
-                return
-
-        fuel = ctx.config.fuel
-        fuel_node_info = ctx.merge_node(fuel.ssh_creds, {'fuel_master'})
-        creds = dict(zip(("user", "passwd", "tenant"), parse_creds(fuel.creds)))
-        fuel_conn = KeystoneAuth(fuel.url, creds)
-
-        cluster_id = get_cluster_id(fuel_conn, fuel.openstack_env)
-        cluster = reflect_cluster(fuel_conn, cluster_id)
-
-        if ctx.fuel_version is None:
-            ctx.fuel_version = FuelInfo(fuel_conn).get_version()
-            ctx.storage.put(ctx.fuel_version, "fuel_version")
-
-            logger.info("Found FUEL {0}".format(".".join(map(str, ctx.fuel_version))))
-            openrc = cluster.get_openrc()
-
-            if openrc:
-                auth_url = cast(str, openrc['os_auth_url'])
-                if ctx.fuel_version >= [8, 0] and auth_url.startswith("https://"):
-                    logger.warning("Fixing FUEL 8.0 AUTH url - replace https://->http://")
-                    auth_url = auth_url.replace("https", "http", 1)
-
-                os_creds = OSCreds(name=cast(str, openrc['username']),
-                                   passwd=cast(str, openrc['password']),
-                                   tenant=cast(str, openrc['tenant_name']),
-                                   auth_url=cast(str, auth_url),
-                                   insecure=cast(bool, openrc['insecure']))
-
-                ctx.fuel_openstack_creds = os_creds
-            else:
-                ctx.fuel_openstack_creds = None
-
-            ctx.storage.put(list(ctx.fuel_openstack_creds), "fuel_os_creds")
-
-        if metadata_only:
-            logger.debug("Skip FUEL nodes  discovery due to discovery settings")
-            return
-
-        try:
-            fuel_rpc = setup_rpc(connect(fuel_node_info),
-                                 ctx.rpc_code,
-                                 ctx.default_rpc_plugins,
-                                 log_level=ctx.config.rpc_log_level)
-        except AuthenticationException:
-            msg = "FUEL nodes discovery failed - wrong FUEL master SSH credentials"
-            if ignore_errors:
-                raise StopTestError(msg)
-            logger.warning(msg)
-            return
-        except Exception as exc:
-            if ignore_errors:
-                logger.exception("While connection to FUEL")
-                raise StopTestError("Failed to connect to FUEL")
-            logger.warning("Failed to connect to FUEL - %s", exc)
-            return
-
-        logger.debug("Downloading FUEL node ssh master key")
-        fuel_key = fuel_rpc.get_file_content('/root/.ssh/id_rsa')
-        network = 'fuelweb_admin' if ctx.fuel_version >= [6, 0] else 'admin'
-
-        count = 0
-        for count, fuel_node in enumerate(list(cluster.get_nodes())):
-            ip = str(fuel_node.get_ip(network))
-            ctx.merge_node(ConnCreds(to_ip(ip), "root", key=fuel_key), set(fuel_node.get_roles()))
-
-        logger.debug("Found {} FUEL nodes for env {}".format(count, fuel.openstack_env))
diff --git a/wally/fuel_rest_api.py b/wally/fuel_rest_api.py
deleted file mode 100644
index b1cfc0c..0000000
--- a/wally/fuel_rest_api.py
+++ /dev/null
@@ -1,338 +0,0 @@
-import re
-import abc
-import json
-import logging
-import urllib.parse
-import urllib.error
-import urllib.request
-from typing import Dict, Any, Iterator, List, Callable, cast
-from functools import partial
-
-import netaddr
-from keystoneclient import exceptions
-from keystoneclient.v2_0 import Client as keystoneclient
-
-
-logger = logging.getLogger("wally")
-
-
-class Connection(metaclass=abc.ABCMeta):
-    host = None  # type: str
-
-    @abc.abstractmethod
-    def do(self, method: str, path: str, params: Dict = None) -> Dict:
-        pass
-
-    def get(self, path: str, params: Dict = None) -> Dict:
-        return self.do("GET", path, params)
-
-
-class Urllib2HTTP(Connection):
-    """
-    class for making HTTP requests
-    """
-
-    allowed_methods = ('get', 'put', 'post', 'delete', 'patch', 'head')
-
-    def __init__(self, root_url: str, headers: Dict[str, str] = None) -> None:
-        """
-        """
-        if root_url.endswith('/'):
-            self.root_url = root_url[:-1]
-        else:
-            self.root_url = root_url
-
-        self.host = urllib.parse.urlparse(self.root_url).hostname
-
-        if headers is None:
-            self.headers = {}  # type: Dict[str, str]
-        else:
-            self.headers  = headers
-
-    def do(self, method: str, path: str, params: Dict = None) -> Any:
-        if path.startswith('/'):
-            url = self.root_url + path
-        else:
-            url = self.root_url + '/' + path
-
-        if method == 'get':
-            assert params == {} or params is None
-            data_json = None
-        else:
-            data_json = json.dumps(params)
-
-        logger.debug("HTTP: {0} {1}".format(method.upper(), url))
-
-        request = urllib.request.Request(url,
-                                         data=data_json.encode("utf8"),
-                                         headers=self.headers)
-        if data_json is not None:
-            request.add_header('Content-Type', 'application/json')
-
-        request.get_method = lambda: method.upper()  # type: ignore
-        response = urllib.request.urlopen(request)
-        code = response.code  # type: ignore
-
-        logger.debug("HTTP Responce: {0}".format(code))
-        if code < 200 or code > 209:
-            raise IndexError(url)
-
-        content = response.read()
-
-        if '' == content:
-            return None
-
-        return json.loads(content.decode("utf8"))
-
-    def __getattr__(self, name: str) -> Any:
-        if name in self.allowed_methods:
-            return partial(self.do, name)
-        raise AttributeError(name)
-
-
-class KeystoneAuth(Urllib2HTTP):
-    def __init__(self, root_url: str, creds: Dict[str, str], headers: Dict[str, str] = None) -> None:
-        super(KeystoneAuth, self).__init__(root_url, headers)
-        admin_node_ip = urllib.parse.urlparse(root_url).hostname
-        self.keystone_url = "http://{0}:5000/v2.0".format(admin_node_ip)
-        self.keystone = keystoneclient(
-            auth_url=self.keystone_url, **creds)
-        self.refresh_token()
-
-    def refresh_token(self) -> None:
-        """Get new token from keystone and update headers"""
-        try:
-            self.keystone.authenticate()
-            self.headers['X-Auth-Token'] = self.keystone.auth_token
-        except exceptions.AuthorizationFailure:
-            logger.warning(
-                'Cant establish connection to keystone with url %s',
-                self.keystone_url)
-
-    def do(self, method: str, path: str, params: Dict[str, str] = None) -> Any:
-        """Do request. If gets 401 refresh token"""
-        try:
-            return super(KeystoneAuth, self).do(method, path, params)
-        except urllib.error.HTTPError as e:
-            if e.code == 401:
-                logger.warning(
-                    'Authorization failure: {0}'.format(e.read()))
-                self.refresh_token()
-                return super(KeystoneAuth, self).do(method, path, params)
-            else:
-                raise
-
-
-def get_inline_param_list(url: str) -> Iterator[str]:
-    format_param_rr = re.compile(r"\{([a-zA-Z_]+)\}")
-    for match in format_param_rr.finditer(url):
-        yield match.group(1)
-
-
-class RestObj:
-    name = None  # type: str
-    id = None   # type: int
-
-    def __init__(self, conn: Connection, **kwargs: Any) -> None:
-        self.__dict__.update(kwargs)
-        self.__connection__ = conn
-
-    def __str__(self) -> str:
-        res = ["{0}({1}):".format(self.__class__.__name__, self.name)]
-        for k, v in sorted(self.__dict__.items()):
-            if k.startswith('__') or k.endswith('__'):
-                continue
-            if k != 'name':
-                res.append("    {0}={1!r}".format(k, v))
-        return "\n".join(res)
-
-    def __getitem__(self, item: str) -> Any:
-        return getattr(self, item)
-
-
-def make_call(method: str, url: str) -> Callable:
-    def closure(obj: Any, entire_obj: Any = None, **data: Any) -> Any:
-        inline_params_vals = {}
-        for name in get_inline_param_list(url):
-            if name in data:
-                inline_params_vals[name] = data[name]
-                del data[name]
-            else:
-                inline_params_vals[name] = getattr(obj, name)
-        result_url = url.format(**inline_params_vals)
-
-        if entire_obj is not None:
-            if data != {}:
-                raise ValueError("Both entire_obj and data provided")
-            data = entire_obj
-        return obj.__connection__.do(method, result_url, params=data)
-    return closure
-
-
-RequestMethod = Callable[[str], Callable]
-
-
-PUT = cast(RequestMethod, partial(make_call, 'put'))  # type: RequestMethod
-GET = cast(RequestMethod, partial(make_call, 'get'))  # type: RequestMethod
-DELETE = cast(RequestMethod, partial(make_call, 'delete')) # type: RequestMethod
-
-# -------------------------------  ORM ----------------------------------------
-
-
-def get_fuel_info(url: str) -> 'FuelInfo':
-    conn = Urllib2HTTP(url)
-    return FuelInfo(conn)
-
-
-class FuelInfo(RestObj):
-
-    """Class represents Fuel installation info"""
-
-    get_nodes = GET('api/nodes')
-    get_clusters = GET('api/clusters')
-    get_cluster = GET('api/clusters/{id}')
-    get_info = GET('api/releases')
-
-    @property
-    def nodes(self) -> 'NodeList':
-        """Get all fuel nodes"""
-        return NodeList([Node(self.__connection__, **node) for node
-                         in self.get_nodes()])
-
-    @property
-    def free_nodes(self) -> 'NodeList':
-        """Get unallocated nodes"""
-        return NodeList([Node(self.__connection__, **node) for node in
-                         self.get_nodes() if not node['cluster']])
-
-    @property
-    def clusters(self) -> List['Cluster']:
-        """List clusters in fuel"""
-        return [Cluster(self.__connection__, **cluster) for cluster
-                in self.get_clusters()]
-
-    def get_version(self) -> List[int]:
-        for info in self.get_info():
-            vers = info['version'].split("-")[1].split('.')
-            return list(map(int, vers))
-        raise ValueError("No version found")
-
-
-class Node(RestObj):
-    """Represents node in Fuel"""
-
-    get_info = GET('/api/nodes/{id}')
-    get_interfaces = GET('/api/nodes/{id}/interfaces')
-
-    def get_network_data(self) -> Dict:
-        """Returns node network data"""
-        return self.get_info().get('network_data')
-
-    def get_roles(self) -> List[str]:
-        """Get node roles
-
-        Returns: (roles, pending_roles)
-        """
-        return self.get_info().get('roles')
-
-    def get_ip(self, network='public') -> netaddr.IPAddress:
-        """Get node ip
-
-        :param network: network to pick
-        """
-        nets = self.get_network_data()
-        for net in nets:
-            if net['name'] == network:
-                iface_name = net['dev']
-                for iface in self.get_info()['meta']['interfaces']:
-                    if iface['name'] == iface_name:
-                        try:
-                            return iface['ip']
-                        except KeyError:
-                            return netaddr.IPNetwork(net['ip']).ip
-        raise Exception('Network %s not found' % network)
-
-
-class NodeList(list):
-    """Class for filtering nodes through attributes"""
-    allowed_roles = ['controller', 'compute', 'cinder', 'ceph-osd', 'mongo',
-                     'zabbix-server']
-
-    def __getattr__(self, name: str) -> List[Node]:
-        if name in self.allowed_roles:
-            return [node for node in self if name in node.roles]
-        return []
-
-
-class Cluster(RestObj):
-    """Class represents Cluster in Fuel"""
-
-    get_status = GET('api/clusters/{id}')
-    get_networks = GET('api/clusters/{id}/network_configuration/neutron')
-    get_attributes = GET('api/clusters/{id}/attributes')
-    _get_nodes = GET('api/nodes?cluster_id={id}')
-
-    def __init__(self, *dt, **mp) -> None:
-        super(Cluster, self).__init__(*dt, **mp)
-        self.nodes = NodeList([Node(self.__connection__, **node) for node in
-                               self._get_nodes()])
-
-    def check_exists(self) -> bool:
-        """Check if cluster exists"""
-        try:
-            self.get_status()
-            return True
-        except urllib.error.HTTPError as err:
-            if err.code == 404:
-                return False
-            raise
-
-    def get_openrc(self) -> Dict[str, str]:
-        access = self.get_attributes()['editable']['access']
-        creds = {'username': access['user']['value'],
-                 'password': access['password']['value'],
-                 'tenant_name': access['tenant']['value']}
-
-        version = FuelInfo(self.__connection__).get_version()
-        # only HTTPS since 7.0
-        if version >= [7, 0]:
-            creds['insecure'] = "True"
-            creds['os_auth_url'] = "https://{0}:5000/v2.0".format(
-                self.get_networks()['public_vip'])
-        else:
-            creds['os_auth_url'] = "http://{0}:5000/v2.0".format(
-                self.get_networks()['public_vip'])
-        return creds
-
-    def get_nodes(self) -> Iterator[Node]:
-        for node_descr in self._get_nodes():
-            yield Node(self.__connection__, **node_descr)
-
-
-def reflect_cluster(conn: Connection, cluster_id: int) -> Cluster:
-    """Returns cluster object by id"""
-    c = Cluster(conn, id=cluster_id)
-    c.nodes = NodeList(list(c.get_nodes()))
-    return c
-
-
-def get_all_nodes(conn: Connection) -> Iterator[Node]:
-    """Get all nodes from Fuel"""
-    for node_desc in conn.get('api/nodes'):
-        yield Node(conn, **node_desc)
-
-
-def get_all_clusters(conn: Connection) -> Iterator[Cluster]:
-    """Get all clusters"""
-    for cluster_desc in conn.get('api/clusters'):
-        yield Cluster(conn, **cluster_desc)
-
-
-def get_cluster_id(conn: Connection, name: str) -> int:
-    """Get cluster id by name"""
-    for cluster in get_all_clusters(conn):
-        if cluster.name == name:
-            return cluster.id
-
-    raise ValueError("Cluster {0} not found".format(name))
-
diff --git a/wally/main.py b/wally/main.py
index 77074c6..36d40f0 100644
--- a/wally/main.py
+++ b/wally/main.py
@@ -12,7 +12,7 @@
 from yaml import load as _yaml_load
 
 YLoader = Callable[[IO], Any]
-yaml_load = None  # type: YLoader
+yaml_load: YLoader = None  # type: ignore
 
 try:
     from yaml import CLoader
@@ -44,7 +44,6 @@
 # stages
 from .ceph import DiscoverCephStage, CollectCephInfoStage
 from .openstack import DiscoverOSStage
-from .fuel import DiscoverFuelStage
 from .run_test import (CollectInfoStage, ExplicitNodesStage, SaveNodesStage,
                        RunTestsStage, ConnectStage, SleepStage, PrepareNodes,
                        LoadStoredNodesStage)
@@ -141,7 +140,6 @@
     parser.add_argument("--profile", action="store_true", help="Profile execution")
     parser.add_argument("-s", '--settings-dir', default=None,
                         help="Folder to store key/settings/history files")
-
     subparsers = parser.add_subparsers(dest='subparser_name')
 
     # ---------------------------------------------------------------------
@@ -252,7 +250,6 @@
     return [DiscoverCephStage(),
             CollectCephInfoStage(),
             DiscoverOSStage(),
-            DiscoverFuelStage(),
             ExplicitNodesStage(),
             StartSensorsStage(),
             RunTestsStage(),
@@ -267,16 +264,16 @@
         faulthandler.register(signal.SIGUSR1, all_threads=True)
 
     opts = parse_args(argv)
-    stages = []  # type: List[Stage]
+    stages: List[Stage] = []
 
     # stop mypy from telling that config & storage might be undeclared
-    config = None  # type: Config
-    storage = None  # type: IStorage
+    config: Config = None  # type: ignore
+    storage: IStorage = None  # type: ignore
 
     if opts.profile:
         import cProfile
-        pr = cProfile.Profile()
-        pr.enable()
+        pr: Optional[cProfile.Profile] = cProfile.Profile()
+        pr.enable()  # type: ignore
     else:
         pr = None
 
@@ -464,7 +461,7 @@
 
     if opts.profile:
         assert pr is not None
-        pr.disable()
+        pr.disable()  # type: ignore
         import pstats
         pstats.Stats(pr).sort_stats('tottime').print_stats(30)
 
diff --git a/wally/meta_info.py b/wally/meta_info.py
deleted file mode 100644
index 7dc3901..0000000
--- a/wally/meta_info.py
+++ /dev/null
@@ -1,55 +0,0 @@
-from typing import Any, Dict, List
-from .fuel_rest_api import KeystoneAuth, FuelInfo
-
-
-def total_lab_info(nodes: List[Dict[str, Any]]) -> Dict[str, int]:
-    lab_data = {'nodes_count': len(nodes),
-                'total_memory': 0,
-                'total_disk': 0,
-                'processor_count': 0}  # type: Dict[str, int]
-
-    for node in nodes:
-        lab_data['total_memory'] += node['memory']['total']
-        lab_data['processor_count'] += len(node['processors'])
-
-        for disk in node['disks']:
-            lab_data['total_disk'] += disk['size']
-
-    lab_data['total_memory'] /= (1024 ** 3)
-    lab_data['total_disk'] /= (1024 ** 3)
-
-    return lab_data
-
-
-def collect_lab_data(url: str, cred: Dict[str, str]) -> Dict[str, Any]:
-    finfo = FuelInfo(KeystoneAuth(url, cred))
-
-    nodes = []  # type: List[Dict[str, str]]
-    result = {}  # type: Dict[str, Any]
-
-    for node in finfo.get_nodes():  # type: ignore
-        node_info = {
-            'name': node['name'],
-            'processors': [],
-            'interfaces': [],
-            'disks': [],
-            'devices': [],
-            'memory': node['meta']['memory'].copy()
-        }
-
-        for processor in node['meta']['cpu']['spec']:
-            node_info['processors'].append(processor)
-
-        for iface in node['meta']['interfaces']:
-            node_info['interfaces'].append(iface)
-
-        for disk in node['meta']['disks']:
-            node_info['disks'].append(disk)
-
-        nodes.append(node_info)
-
-    result['nodes'] = nodes
-    result['fuel_version'] = finfo.get_version()
-    result['total_info'] = total_lab_info(nodes)
-
-    return result
diff --git a/wally/openstack.py b/wally/openstack.py
index 575edc7..1e02653 100644
--- a/wally/openstack.py
+++ b/wally/openstack.py
@@ -8,8 +8,8 @@
 from cephlib.ssh import ConnCreds
 
 from .config import ConfigBlock, Config
-from .openstack_api import (os_connect, find_vms,
-                            OSCreds, get_openstack_credentials, prepare_os, launch_vms, clear_nodes)
+from .openstack_api import (os_connect, find_vms, OSConnection,
+                            OSCreds, get_openstack_credentials_from_env, prepare_os, launch_vms, clear_nodes)
 from .test_run_class import TestRun
 from .stage import Stage, StepOrder
 from .utils import LogError, StopTestError, get_creds_openrc
@@ -41,8 +41,8 @@
     if stored is not None:
         return OSCreds(*cast(List, stored))
 
-    creds = None  # type: OSCreds
-    os_creds = None  # type: OSCreds
+    creds: OSCreds = None  # type: ignore
+    os_creds: OSCreds = None  # type: ignore
     force_insecure = False
     cfg = ctx.config
 
@@ -53,7 +53,7 @@
             if isinstance(sett, str):
                 if 'ENV' == sett:
                     logger.info("Using OS credentials from shell environment")
-                    os_creds = get_openstack_credentials()
+                    os_creds = get_openstack_credentials_from_env()
                 else:
                     logger.info("Using OS credentials from " + os_cfg['OPENRC'])
                     creds_tuple = get_creds_openrc(sett)
@@ -69,11 +69,7 @@
         if 'insecure' in os_cfg:
             force_insecure = os_cfg.get('insecure', False)
 
-    if os_creds is None and 'fuel' in cfg.clouds and 'openstack_env' in cfg.clouds['fuel'] and \
-            ctx.fuel_openstack_creds is not None:
-        logger.info("Using fuel creds")
-        creds = ctx.fuel_openstack_creds
-    elif os_creds is None:
+    if os_creds is None:
         logger.error("Can't found OS credentials")
         raise StopTestError("Can't found OS credentials", None)
 
@@ -102,8 +98,7 @@
 
     config_block = 'openstack'
 
-    # discover FUEL cluster first
-    priority = StepOrder.DISCOVER + 1
+    priority = StepOrder.DISCOVER
 
     @classmethod
     def validate(cls, conf: ConfigBlock) -> None:
@@ -120,6 +115,8 @@
 
         ensure_connected_to_openstack(ctx)
 
+        os_conn: OSConnection = ctx.os_connection  # type: ignore  # remove Optional[]
+
         cfg = ctx.config.openstack
         os_nodes_auth = cfg.auth  # type: str
         if os_nodes_auth.count(":") == 2:
@@ -131,8 +128,8 @@
             key_file = None
 
         if 'metadata' not in ctx.config.discover:
-            services = ctx.os_connection.nova.services.list()  # type: List[Any]
-            host_services_mapping = {}  # type: Dict[str, List[str]]
+            services: List[Any] = os_conn.nova.services.list()
+            host_services_mapping: Dict[str, List[str]] = {}
 
             for service in services:
                 ip = cast(str, socket.gethostbyname(service.host))
@@ -152,9 +149,8 @@
 
         private_key_path = get_vm_keypair_path(ctx.config)[0]
 
-        vm_creds = None  # type: str
         for vm_creds in cfg.get("vms", []):
-            user_name, vm_name_pattern = vm_creds.split("@", 1)
+            user_name, vm_name_pattern = vm_creds.split("@", 1)  # type: ignore
             msg = "Vm like {} lookup failed".format(vm_name_pattern)
 
             with LogError(msg):
@@ -163,7 +159,7 @@
 
                 ensure_connected_to_openstack(ctx)
 
-                for ip, vm_id in find_vms(ctx.os_connection, vm_name_pattern):
+                for ip, vm_id in find_vms(os_conn, vm_name_pattern):  # type: ignore
                     creds = ConnCreds(host=to_ip(ip), user=user_name, key_file=private_key_path)
                     info = NodeInfo(creds, {'testnode'})
                     info.os_vm_id = vm_id
@@ -195,6 +191,8 @@
         vm_image_config = ctx.config.vm_configs[vm_spawn_config.cfg_name]
 
         ensure_connected_to_openstack(ctx)
+        os_conn: OSConnection = ctx.os_connection  # type: ignore  # remove Optional[]
+
         params = vm_image_config.copy()
         params.update(vm_spawn_config)
         params.update(get_vm_keypair_path(ctx.config))
@@ -203,13 +201,13 @@
 
         if not ctx.config.openstack.get("skip_preparation", False):
             logger.info("Preparing openstack")
-            prepare_os(ctx.os_connection, params)
+            prepare_os(os_conn, params)
         else:
             logger.info("Scip openstack preparation as 'skip_preparation' is set")
 
         ctx.os_spawned_nodes_ids = []
         with ctx.get_pool() as pool:
-            for info in launch_vms(ctx.os_connection, params, pool):
+            for info in launch_vms(os_conn, params, pool):
                 info.roles.add('testnode')
                 nid = info.node_id
                 if nid in ctx.nodes_info:
@@ -225,7 +223,7 @@
         if not ctx.config.keep_vm and ctx.os_spawned_nodes_ids:
             logger.info("Removing nodes")
 
-            clear_nodes(ctx.os_connection, ctx.os_spawned_nodes_ids)
+            clear_nodes(ctx.os_connection, ctx.os_spawned_nodes_ids)  # type: ignore
             ctx.storage.rm('spawned_os_nodes')
 
             logger.info("OS spawned nodes has been successfully removed")
diff --git a/wally/openstack_api.py b/wally/openstack_api.py
index 257a7de..b1d9172 100644
--- a/wally/openstack_api.py
+++ b/wally/openstack_api.py
@@ -7,8 +7,8 @@
 import tempfile
 import subprocess
 import urllib.request
-from typing import Dict, Any, Iterable, Iterator, NamedTuple, Optional, List, Tuple
-from concurrent.futures import ThreadPoolExecutor
+from typing import Dict, Iterable, Iterator, NamedTuple, Optional, List, Tuple, Any
+from concurrent.futures import ThreadPoolExecutor, Future
 
 from keystoneauth1 import loading, session
 from novaclient.exceptions import NotFound
@@ -45,15 +45,18 @@
                       ("insecure", bool)])
 
 
-# TODO(koder): should correctly process different sources, not only env????
-def get_openstack_credentials() -> OSCreds:
+def get_openstack_credentials_from_env() -> OSCreds:
     is_insecure = os.environ.get('OS_INSECURE', 'false').lower() in ('true', 'yes')
-
-    return OSCreds(os.environ.get('OS_USERNAME'),
-                   os.environ.get('OS_PASSWORD'),
-                   os.environ.get('OS_TENANT_NAME'),
-                   os.environ.get('OS_AUTH_URL'),
-                   is_insecure)
+    try:
+        return OSCreds(os.environ['OS_USERNAME'],
+                       os.environ['OS_PASSWORD'],
+                       os.environ['OS_TENANT_NAME'],
+                       os.environ['OS_AUTH_URL'],
+                       is_insecure)
+    except KeyError:
+        logger.error("One of openstack enviroment variable is not defined - check for " +
+                     "OS_USERNAME, OS_PASSWORD, OS_TENANT_NAME, OS_AUTH_URL")
+        raise
 
 
 class OSConnection:
@@ -100,7 +103,7 @@
             vm.pause()
 
     for future in executor.map(pause_vm, ids):
-        future.result()
+        future.result()  # type: ignore
 
 
 def unpause(conn: OSConnection, ids: Iterable[int], executor: ThreadPoolExecutor, max_resume_time=10) -> None:
@@ -116,7 +119,7 @@
         raise RuntimeError("Can't unpause vm {0}".format(vm_id))
 
     for future in executor.map(unpause, ids):
-        future.result()
+        future.result()  # type: ignore
 
 
 def prepare_os(conn: OSConnection, params: Dict[str, Any], max_vm_per_node: int = 8) -> None:
@@ -493,8 +496,8 @@
                   sec_group_size: int = None) -> List[Tuple[str, Any]]:
 
     if network_zone_name is not None:
-        network_future = executor.submit(conn.nova.networks.find,
-                                         label=network_zone_name)
+        network_future: Optional[Future] = executor.submit(conn.nova.networks.find,
+                                                           label=network_zone_name)
     else:
         network_future = None
 
@@ -505,7 +508,7 @@
         ips_future = executor.submit(get_floating_ips,
                                      conn, flt_ip_pool, amount)
         logger.debug("Wait for floating ip")
-        ips = ips_future.result()
+        ips: List[Any] = ips_future.result()
         ips += [Allocate] * (amount - len(ips))
     else:
         ips = [None] * amount
@@ -517,7 +520,7 @@
 
     if network_future is not None:
         logger.debug("Waiting for network results")
-        nics = [{'net-id': network_future.result().id}]
+        nics: Any = [{'net-id': network_future.result().id}]
     else:
         nics = None
 
@@ -528,27 +531,28 @@
     futures = []
     logger.debug("Requesting new vm's")
 
-    orig_scheduler_hints = scheduler_hints.copy()
-    group_name_template = scheduler_hints['group'].format("\\d+")
-    groups = list(get_free_server_groups(conn, group_name_template + "$"))
-    groups.sort()
+    if scheduler_hints:
+        orig_scheduler_hints = scheduler_hints.copy()  # type: ignore
+        group_name_template = scheduler_hints['group'].format("\\d+")
+        groups = list(get_free_server_groups(conn, group_name_template + "$"))
+        groups.sort()
 
-    for idx, (name, flt_ip) in enumerate(zip(names, ips), 2):
+        for idx, (name, flt_ip) in enumerate(zip(names, ips), 2):
 
-        scheduler_hints = None
-        if orig_scheduler_hints is not None and sec_group_size is not None:
-            if "group" in orig_scheduler_hints:
+            scheduler_hints = None
+            if orig_scheduler_hints is not None and sec_group_size is not None:
+                if "group" in orig_scheduler_hints:
+                    scheduler_hints = orig_scheduler_hints.copy()
+                    scheduler_hints['group'] = groups[idx // sec_group_size]
+
+            if scheduler_hints is None:
                 scheduler_hints = orig_scheduler_hints.copy()
-                scheduler_hints['group'] = groups[idx // sec_group_size]
 
-        if scheduler_hints is None:
-            scheduler_hints = orig_scheduler_hints.copy()
+            params = (conn, name, keypair_name, img, fl,
+                      nics, vol_sz, flt_ip, scheduler_hints,
+                      flt_ip_pool, [security_group])
 
-        params = (conn, name, keypair_name, img, fl,
-                  nics, vol_sz, flt_ip, scheduler_hints,
-                  flt_ip_pool, [security_group])
-
-        futures.append(executor.submit(create_vm, *params))
+            futures.append(executor.submit(create_vm, *params))
     res = [future.result() for future in futures]
     logger.debug("Done spawning")
     return res
@@ -569,7 +573,7 @@
               delete_timeout: int = 120) -> Tuple[str, Any]:
 
     # make mypy/pylint happy
-    srv = None  # type: Any
+    srv: Any = None
     for i in range(max_retry):
         srv = conn.nova.servers.create(name, flavor=flavor, image=img, nics=nics, key_name=keypair_name,
                                        scheduler_hints=scheduler_hints, security_groups=security_groups)
@@ -614,11 +618,12 @@
                 return srv.id in ids
 
         volumes_to_delete = []
-        for vol in conn.cinder.volumes.list():
-            for attachment in vol.attachments:
-                if attachment['server_id'] in ids:
-                    volumes_to_delete.append(vol)
-                    break
+        if ids:
+            for vol in conn.cinder.volumes.list():
+                for attachment in vol.attachments:
+                    if attachment['server_id'] in ids:
+                        volumes_to_delete.append(vol)
+                        break
 
         still_alive = set()
         for srv in conn.nova.servers.list():
diff --git a/wally/pretty_yaml.py b/wally/pretty_yaml.py
index 7cd0f3a..05dfe59 100644
--- a/wally/pretty_yaml.py
+++ b/wally/pretty_yaml.py
@@ -1,7 +1,7 @@
 __doc__ = "functions for make pretty yaml files"
 __all__ = ['dumps']
 
-from typing import Any, Iterable, List
+from typing import Any, Iterable, List, Optional
 
 
 def dumps_simple(val: Any) -> str:
@@ -49,10 +49,8 @@
 
     if isinstance(data, (list, tuple)):
         if all(map(is_simple, data)):
-            if all_nums(data):
-                one_line = "[{0}]".format(", ".join(map(dumps_simple, data)))
-            else:
-                one_line = "[{0}]".format(",".join(map(dumps_simple, data)))
+            join_str = ", " if all_nums(data) else ","
+            one_line: Optional[str] = "[" + join_str.join(map(dumps_simple, data)) + "]"
         elif len(data) == 0:
             one_line = "[]"
         else:
@@ -76,9 +74,7 @@
 
             one_line = None
             if all(map(is_simple, data.values())):
-                one_line = ", ".join(
-                    "{0}: {1}".format(dumps_simple(k), dumps_simple(v))
-                    for k, v in sorted(data.items()))
+                one_line = ", ".join(f"{dumps_simple(k)}: {dumps_simple(v)}" for k, v in sorted(data.items()))
                 one_line = "{" + one_line + "}"
                 if len(one_line) > width:
                     one_line = None
diff --git a/wally/report.py b/wally/report.py
index 092f8d8..d6a42ee 100644
--- a/wally/report.py
+++ b/wally/report.py
@@ -171,20 +171,20 @@
     NO_VAL = -1
 
     def __init__(self) -> None:
-        self.rw_iops_10ms = self.NO_VAL  # type: int
-        self.rw_iops_30ms = self.NO_VAL  # type: int
-        self.rw_iops_100ms = self.NO_VAL  # type: int
+        self.rw_iops_10ms = self.NO_VAL
+        self.rw_iops_30ms = self.NO_VAL
+        self.rw_iops_100ms = self.NO_VAL
 
-        self.rr_iops_10ms = self.NO_VAL  # type: int
-        self.rr_iops_30ms = self.NO_VAL  # type: int
-        self.rr_iops_100ms = self.NO_VAL  # type: int
+        self.rr_iops_10ms = self.NO_VAL
+        self.rr_iops_30ms = self.NO_VAL
+        self.rr_iops_100ms = self.NO_VAL
 
-        self.bw_write_max = self.NO_VAL  # type: int
-        self.bw_read_max = self.NO_VAL  # type: int
+        self.bw_write_max = self.NO_VAL
+        self.bw_read_max = self.NO_VAL
 
-        self.bw = None  # type: Optional[float]
-        self.read_iops = None  # type: Optional[float]
-        self.write_iops = None  # type: Optional[float]
+        self.bw: Optional[float] = None
+        self.read_iops: Optional[float] = None
+        self.write_iops: Optional[float] = None
 
 
 def get_performance_summary(storage: IWallyStorage, suite: SuiteConfig,
@@ -240,7 +240,7 @@
 
         headers = ["Mode", "Stats", "Explanation"]
         align = ['left', 'right', "left"]
-        data = []
+        data: List[Union[str, Tuple[str, str, str]]] = []
 
         if psum95.rr_iops_10ms != psum95.NO_VAL or psum95.rr_iops_30ms != psum95.NO_VAL or \
                 psum95.rr_iops_100ms != psum95.NO_VAL:
@@ -274,8 +274,9 @@
             data.append(("Read", b2ssize(psum50.bw_read_max) + psum50.bw_units,
                          "Large blocks (>={}KiB)".format(self.style.large_blocks)))
 
-        res += html.center(html.table("Performance", headers, data, align=align))
-        yield Menu1st.summary, Menu2ndSumm.summary, HTMLBlock(res)
+        if data:
+            res += html.center(html.table("Performance", headers, data, align=align))
+            yield Menu1st.summary, Menu2ndSumm.summary, HTMLBlock(res)
 
 
 # # Node load over test time
@@ -292,8 +293,8 @@
     suite_types = {'fio'}
 
     def get_divs(self, suite: SuiteConfig) -> Iterator[Tuple[str, str, HTMLBlock]]:
-        ts_map = defaultdict(list)  # type: Dict[FioJobParams, List[Tuple[SuiteConfig, FioJobConfig]]]
-        str_summary = {}  # type: Dict[FioJobParams, Tuple[str, str]]
+        ts_map: Dict[FioJobParams, List[Tuple[SuiteConfig, FioJobConfig]]] = defaultdict(list)
+        str_summary: Dict[FioJobParams, Tuple[str, str]] = {}
 
         for job in self.rstorage.iter_job(suite):
             fjob = cast(FioJobConfig, job)
@@ -326,7 +327,7 @@
     suite_types = {'fio'}
 
     def get_divs(self, suite: SuiteConfig) -> Iterator[Tuple[str, str, HTMLBlock]]:
-        qd_grouped_jobs = {}  # type: Dict[FioJobParams, List[FioJobConfig]]
+        qd_grouped_jobs: Dict[FioJobParams, List[FioJobConfig]] = {}
         test_nc = len(list(find_nodes_by_roles(self.rstorage.storage, ['testnode'])))
         for job in self.rstorage.iter_job(suite):
             fjob = cast(FioJobConfig, job)
@@ -350,7 +351,8 @@
             if len(cpu_usage2qd) < StyleProfile.min_iops_vs_qd_jobs:
                 continue
 
-            labels, vals, errs = zip(*((l, avg, dev) for l, (_, avg, dev) in sorted(cpu_usage2qd.items())))
+            labels, vals, errs = zip(*((l, avg, dev)
+                                     for l, (_, avg, dev) in sorted(cpu_usage2qd.items()))) # type: ignore
 
             if test_nc == 1:
                 labels = list(map(str, labels))
@@ -734,7 +736,6 @@
                     ts = self.rstorage.get_sensor(ds, job.reliable_info_range_s)
                     bn += (ts.data > bn_val).sum()
                     tot += len(ts.data)
-            print(node_id, bn, tot)
 
         yield Menu1st.per_job, job.summary, HTMLBlock("")
 
@@ -761,23 +762,6 @@
             yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fname))
 
 
-class DevRoles:
-    client_disk = 'client_disk'
-    client_net = 'client_net'
-    client_cpu = 'client_cpu'
-
-    storage_disk = 'storage_disk'
-    storage_client_net = 'storage_client_net'
-    storage_replication_net = 'storage_replication_net'
-    storage_cpu = 'storage_disk'
-    ceph_storage = 'ceph_storage'
-    ceph_journal = 'ceph_journal'
-
-    compute_disk = 'compute_disk'
-    compute_net = 'compute_net'
-    compute_cpu = 'compute_cpu'
-
-
 def roles_for_sensors(storage: IWallyStorage) -> Dict[str, List[DataSource]]:
     role2ds = defaultdict(list)
 
@@ -785,19 +769,19 @@
         ds = DataSource(node_id=node.node_id)
         if 'ceph-osd' in node.roles:
             for jdev in node.params.get('ceph_journal_devs', []):
-                role2ds[DevRoles.ceph_journal].append(ds(dev=jdev))
-                role2ds[DevRoles.storage_disk].append(ds(dev=jdev))
+                role2ds[DevRoles.osd_journal].append(ds(dev=jdev))
+                role2ds[DevRoles.storage_block].append(ds(dev=jdev))
 
             for sdev in node.params.get('ceph_storage_devs', []):
-                role2ds[DevRoles.ceph_storage].append(ds(dev=sdev))
-                role2ds[DevRoles.storage_disk].append(ds(dev=sdev))
+                role2ds[DevRoles.osd_storage].append(ds(dev=sdev))
+                role2ds[DevRoles.storage_block].append(ds(dev=sdev))
 
             if node.hw_info:
                 for dev in node.hw_info.disks_info:
-                    role2ds[DevRoles.storage_disk].append(ds(dev=dev))
+                    role2ds[DevRoles.storage_block].append(ds(dev=dev))
 
         if 'testnode' in node.roles:
-            role2ds[DevRoles.client_disk].append(ds(dev='rbd0'))
+            role2ds[DevRoles.client_block].append(ds(dev='rbd0'))
 
     return role2ds
 
@@ -812,7 +796,7 @@
         trange = (job.reliable_info_range[0] // 1000, job.reliable_info_range[1] // 1000)
         test_nc = len(list(find_nodes_by_roles(self.rstorage.storage, ['testnode'])))
 
-        for dev_role in (DevRoles.ceph_storage, DevRoles.ceph_journal, DevRoles.client_disk):
+        for dev_role in (DevRoles.osd_storage, DevRoles.osd_journal, DevRoles.client_block):
 
             caption = "{} IO heatmaps - {}".format(dev_role.capitalize(), cast(FioJobParams, job).params.long_summary)
             if test_nc != 1:
diff --git a/wally/resources.py b/wally/resources.py
index 4074efb..2d563b9 100644
--- a/wally/resources.py
+++ b/wally/resources.py
@@ -1,5 +1,5 @@
 import logging
-from typing import Tuple, Dict, cast, List
+from typing import Tuple, Dict, cast, List, Optional, Union
 
 import numpy
 
@@ -148,15 +148,13 @@
                         rstorage: IWallyStorage,
                         large_block: int = 256,
                         hist_boxes: int = 10,
-                        nc: bool = False) -> Tuple[Dict[str, Tuple[str, float, float]], bool]:
+                        nc: bool = False) -> Tuple[Dict[str, Tuple[str, Optional[float], Optional[float]]], bool]:
 
-    records = {}  # type: Dict[str, Tuple[str, float, float]]
     if not nc:
-        records = rstorage.get_job_info(suite, job, WallyDB.resource_usage_rel)
-        if records is not None:
-            records = records.copy()
-            iops_ok = records.pop('iops_ok')
-            return records, iops_ok
+        jinfo = rstorage.get_job_info(suite, job, WallyDB.resource_usage_rel)
+        if jinfo is not None:
+            jinfo = jinfo.copy()
+            return jinfo, jinfo.pop('iops_ok')  # type: ignore
 
     fjob = cast(FioJobConfig, job)
     iops_ok = fjob.bsize < large_block
@@ -166,7 +164,7 @@
     tot_io_coef = unit_conversion_coef_f(io_sum.bw.units, "Bps")
     io_transfered = io_sum.bw.data * tot_io_coef
 
-    records = {
+    records: Dict[str, Tuple[str, Optional[float], Optional[float]]] = {
         ResourceNames.data_tr: (b2ssize(io_transfered.sum()) + "B", None, None)
     }
 
@@ -215,7 +213,7 @@
 
         avg, dev = avg_dev_div(data, service_provided_count)
         if avg < 0.1:
-            dev = None
+            dev = None # type: ignore
         records[vname] = (ffunc(data.sum()) + units, avg, dev)
         all_agg[vname] = data
 
@@ -266,7 +264,7 @@
             agg = all_agg[name1] + all_agg[name2]
             avg, dev = avg_dev_div(agg, service_provided_masked)
             if avg < 0.1:
-                dev = None
+                dev = None  # type: ignore
             records[vname] = (ffunc(agg.sum()) + units, avg, dev)
 
     if not nc:
@@ -276,7 +274,7 @@
             records[name] = v1, toflt(v2), toflt(v3)
 
         srecords = records.copy()
-        srecords['iops_ok'] = iops_ok
+        srecords['iops_ok'] = iops_ok  # type: ignore
         rstorage.put_job_info(suite, job, WallyDB.resource_usage_rel, srecords)
 
     return records, iops_ok
diff --git a/wally/result_classes.py b/wally/result_classes.py
index 9d59d42..f2b84e6 100644
--- a/wally/result_classes.py
+++ b/wally/result_classes.py
@@ -59,20 +59,6 @@
 JobStatMetrics = Dict[Tuple[str, str, str], StatProps]
 
 
-class JobResult:
-    """Contains done test job information"""
-
-    def __init__(self,
-                 info: JobConfig,
-                 begin_time: int,
-                 end_time: int,
-                 raw: JobMetrics) -> None:
-        self.info = info
-        self.run_interval = (begin_time, end_time)
-        self.raw = raw  # type: JobMetrics
-        self.processed = None  # type: JobStatMetrics
-
-
 class IWallyStorage(ISensorStorage, IImagesStorage, metaclass=abc.ABCMeta):
 
     @abc.abstractmethod
diff --git a/wally/result_storage.py b/wally/result_storage.py
index de2f86d..88b8323 100644
--- a/wally/result_storage.py
+++ b/wally/result_storage.py
@@ -2,7 +2,7 @@
 import json
 import pprint
 import logging
-from typing import cast, Iterator, Tuple, Type, Optional, Any, Union, List
+from typing import cast, Iterator, Type, Optional, Any, List
 
 import numpy
 
@@ -17,7 +17,7 @@
 from .utils import StopTestError
 from .suits.all_suits import all_suits
 
-from cephlib.storage import Storage
+from cephlib.storage import IStorage
 
 logger = logging.getLogger('wally')
 
@@ -30,7 +30,7 @@
 
 
 class WallyStorage(IWallyStorage, SensorStorage):
-    def __init__(self, storage: Storage) -> None:
+    def __init__(self, storage: IStorage) -> None:
         SensorStorage.__init__(self, storage, WallyDB)
 
     def flush(self) -> None:
diff --git a/wally/run_test.py b/wally/run_test.py
index 578a65b..9757322 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -37,7 +37,8 @@
                     return True, setup_rpc(ssh_node,
                                            ctx.rpc_code,
                                            ctx.default_rpc_plugins,
-                                           log_level=ctx.config.rpc_log_level)
+                                           log_level=ctx.config.rpc_log_level,
+                                           sudo=True)
                 except Exception as exc:
                     logger.exception("During connect to %s: %s", node_info, exc)
                     return False, node_info
@@ -100,9 +101,11 @@
             for node in ctx.nodes:
                 node.conn.cli.killall()
 
+            if ctx.ceph_master_node:
+                ctx.ceph_master_node.conn.cli.killall()
+
             logger.info("Downloading RPC servers logs")
             for node in ctx.nodes:
-                node.conn.cli.killall()
                 if node.rpc_log_file is not None:
                     nid = node.node_id
                     path = WallyDB.rpc_logs.format(node_id=nid)
@@ -116,7 +119,8 @@
 
         logger.info("Disconnecting")
         with ctx.get_pool() as pool:
-            list(pool.map(lambda node: node.disconnect(stop=True), ctx.nodes))
+            list(pool.map(lambda node: node.disconnect(stop=True),
+                          ctx.nodes + ([ctx.ceph_master_node] if ctx.ceph_master_node else [])))
 
 
 class CollectInfoStage(Stage):
@@ -268,7 +272,7 @@
 
             test_cls(storage=ctx.rstorage,
                      suite=suite,
-                     on_idle=lambda: collect_sensors_data(ctx, False)).run()
+                     on_tests_boundry=lambda before_test: collect_sensors_data(ctx, False, before_test)).run()
 
     @classmethod
     def validate_config(cls, cfg: ConfigBlock) -> None:
diff --git a/wally/sensors.py b/wally/sensors.py
index 9fb2177..0aca82e 100644
--- a/wally/sensors.py
+++ b/wally/sensors.py
@@ -1,12 +1,14 @@
 import bz2
+import time
 import array
 import logging
-from typing import Dict
+from typing import Dict, Tuple, Optional, Any
 
 import numpy
 
 from cephlib import sensors_rpc_plugin
 from cephlib.units import b2ssize
+from cephlib.wally_storage import WallyDB
 
 from . import utils
 from .test_run_class import TestRun
@@ -85,37 +87,75 @@
                 logger.debug("Skip monitoring node %s, as no sensors selected", nid)
 
 
-def collect_sensors_data(ctx: TestRun, stop: bool = False):
+def collect_sensors_data(ctx: TestRun,
+                         stop: bool = False,
+                         before_test: bool = False):
     total_sz = 0
 
-    logger.info("Start loading sensors")
-    for node in ctx.nodes:
-        node_id = node.node_id
-        if node_id in ctx.sensors_run_on:
-            func = node.conn.sensors.stop if stop else node.conn.sensors.get_updates
+    # ceph pg and pool data collected separatelly
+    cluster_metrics = getattr(ctx.config.sensors, 'cluster', [])
 
-            # hack to calculate total transferred size
-            offset_map, compressed_blob, compressed_collected_at_b = func()
-            data_tpl = (offset_map, compressed_blob, compressed_collected_at_b)
+    pgs_io = 'ceph-pgs-io' in cluster_metrics
+    pools_io = 'ceph-pools-io' in cluster_metrics
 
-            total_sz += len(compressed_blob) + len(compressed_collected_at_b) + sum(map(len, offset_map)) + \
-                16 * len(offset_map)
+    if pgs_io or pools_io:
+        assert ctx.ceph_master_node is not None
 
-            for path, value, is_array, units in sensors_rpc_plugin.unpack_rpc_updates(data_tpl):
-                if path == 'collected_at':
-                    ds = DataSource(node_id=node_id, metric='collected_at', tag='csv')
-                    ctx.rstorage.append_sensor(numpy.array(value), ds, units)
-                else:
-                    sensor, dev, metric = path.split(".")
-                    ds = DataSource(node_id=node_id, metric=metric, dev=dev, sensor=sensor, tag='csv')
-                    if is_array:
+        def collect() -> Tuple[Optional[Any], Optional[Any]]:
+            pg_dump = ctx.ceph_master_node.run(f"ceph {ctx.ceph_extra_args} pg dump --format json") if pgs_io else None
+            pools_dump = ctx.ceph_master_node.run(f"rados {ctx.ceph_extra_args} df --format json") if pools_io else None
+            return pg_dump, pools_dump
+        future = ctx.get_pool().submit(collect)
+    else:
+        future = None
+
+    ctime = int(time.time())
+
+    if not before_test:
+        logger.info("Start loading sensors")
+        for node in ctx.nodes:
+            node_id = node.node_id
+            if node_id in ctx.sensors_run_on:
+                func = node.conn.sensors.stop if stop else node.conn.sensors.get_updates
+
+                # hack to calculate total transferred size
+                offset_map, compressed_blob, compressed_collected_at_b = func()
+                data_tpl = (offset_map, compressed_blob, compressed_collected_at_b)
+
+                total_sz += len(compressed_blob) + len(compressed_collected_at_b) + sum(map(len, offset_map)) + \
+                    16 * len(offset_map)
+
+                for path, value, is_array, units in sensors_rpc_plugin.unpack_rpc_updates(data_tpl):
+                    if path == 'collected_at':
+                        ds = DataSource(node_id=node_id, metric='collected_at', tag='csv')
                         ctx.rstorage.append_sensor(numpy.array(value), ds, units)
                     else:
-                        if metric == 'historic':
-                            ctx.rstorage.put_sensor_raw(bz2.compress(value), ds(tag='bin'))
+                        sensor, dev, metric = path.split(".")
+                        ds = DataSource(node_id=node_id, metric=metric, dev=dev, sensor=sensor, tag='csv')
+                        if is_array:
+                            ctx.rstorage.append_sensor(numpy.array(value), ds, units)
                         else:
-                            assert metric in ('perf_dump', 'historic_js')
-                            ctx.rstorage.put_sensor_raw(value, ds(tag='js'))
+                            if metric == 'historic':
+                                value = bz2.compress(value)
+                                tag = 'bz2'
+                            else:
+                                assert metric == 'perf_dump'
+                                tag = 'txt'
+                            ctx.storage.put_raw(value, WallyDB.ceph_metric(node_id=node_id,
+                                                                           metric=metric,
+                                                                           time=ctime,
+                                                                           tag=tag))
+
+    if future:
+        pgs_info, pools_info = future.result()
+        if pgs_info:
+            total_sz += len(pgs_info)
+            ctx.storage.put_raw(bz2.compress(pgs_info.encode('utf8')), WallyDB.pgs_io.format(time=ctime))
+
+        if pools_info:
+            total_sz += len(pools_info)
+            ctx.storage.put_raw(bz2.compress(pools_info.encode('utf8')), WallyDB.pools_io.format(time=ctime))
+
     logger.info("Download %sB of sensors data", b2ssize(total_sz))
 
 
@@ -125,5 +165,5 @@
     config_block = 'sensors'
 
     def run(self, ctx: TestRun) -> None:
-        collect_sensors_data(ctx, True)
+        collect_sensors_data(ctx, True, False)
 
diff --git a/wally/stage.py b/wally/stage.py
index 14e80ae..92cd557 100644
--- a/wally/stage.py
+++ b/wally/stage.py
@@ -17,8 +17,8 @@
 
 
 class Stage(metaclass=abc.ABCMeta):
-    priority = None  # type: int
-    config_block = None  # type: Optional[str]
+    priority: int = None  # type: ignore
+    config_block: Optional[str] = None
 
     @classmethod
     def name(cls) -> str:
diff --git a/wally/suits/io/defaults_qd.cfg b/wally/suits/io/defaults_qd.cfg
index c3dee19..ee10055 100644
--- a/wally/suits/io/defaults_qd.cfg
+++ b/wally/suits/io/defaults_qd.cfg
@@ -21,5 +21,6 @@
 write_bw_log=fio_bw_log
 log_avg_msec=1000
 write_hist_log=fio_lat_hist_log
+log_hist_coarseness=0
 log_hist_msec=1000
 log_unix_epoch=1
diff --git a/wally/suits/io/fio.py b/wally/suits/io/fio.py
index 374e5a4..693a547 100644
--- a/wally/suits/io/fio.py
+++ b/wally/suits/io/fio.py
@@ -1,6 +1,6 @@
 import os.path
 import logging
-from typing import cast, Any, List, Union, Tuple, Optional
+from typing import cast, Any, List, Union
 
 import numpy
 
@@ -14,7 +14,7 @@
 from ..job import JobConfig
 from .fio_task_parser import execution_time, fio_cfg_compile, FioJobConfig, FioParams, get_log_files
 from . import rpc_plugin
-from .fio_hist import get_lat_vals, expected_lat_bins
+from .fio_hist import get_lat_vals
 
 
 logger = logging.getLogger("wally")
@@ -209,7 +209,7 @@
                 raise StopTestError()
 
             # TODO: fix units, need to get array type from stream
-
+            open("/tmp/tt", 'wb').write(raw_result)
             parsed = []  # type: List[Union[List[int], int]]
             times = []
 
@@ -223,11 +223,11 @@
                         if name == 'lat':
                             vals = [int(i.strip()) for i in rest]
 
-                            if len(vals) != expected_lat_bins:
-                                msg = "Expect {} bins in latency histogram, but found {} at time {}" \
-                                             .format(expected_lat_bins, len(vals), time_ms_s)
-                                logger.error(msg)
-                                raise StopTestError(msg)
+                            # if len(vals) != expected_lat_bins:
+                            #     msg = f"Expect {expected_lat_bins} bins in latency histogram, " + \
+                            #           f"but found {len(vals)} at time {time_ms_s}"
+                            #     logger.error(msg)
+                            #     raise StopTestError(msg)
 
                             parsed.append(vals)
                         else:
@@ -238,7 +238,7 @@
 
             assert not self.suite.keep_raw_files, "keep_raw_files is not supported"
 
-            histo_bins = None if name != 'lat' else numpy.array(get_lat_vals())
+            histo_bins = None if name != 'lat' else numpy.array(get_lat_vals(len(parsed[0])))
             ts = TimeSeries(data=numpy.array(parsed, dtype='uint64'),
                             units=units,
                             times=numpy.array(times, dtype='uint64'),
diff --git a/wally/suits/io/fio_hist.py b/wally/suits/io/fio_hist.py
index a2ded70..fc32d0d 100644
--- a/wally/suits/io/fio_hist.py
+++ b/wally/suits/io/fio_hist.py
@@ -1,9 +1,6 @@
 from typing import List
 
 
-expected_lat_bins = 1216
-
-
 #----------------------------  FIO HIST LOG PARSE CODE -----------------------------------------------------------------
 
 # Copy-paste from fio/tools/hist/fiologparser_hist.py.
@@ -52,6 +49,12 @@
     return lower + (upper - lower) * edge
 
 
-def get_lat_vals(columns: int = expected_lat_bins, coarseness: int = 0) -> List[float]:
-    return [plat_idx_to_val_coarse(val, coarseness) for val in range(columns)]
+def get_lat_vals(columns: int, coarseness: int = 0) -> List[float]:
+    # convert ns to ms
+    if columns == 1216:
+        coef = 1
+    elif columns == 1856:
+        coef = 1000
+
+    return [plat_idx_to_val_coarse(val, coarseness) / coef for val in range(columns)]
 
diff --git a/wally/suits/io/fio_job.py b/wally/suits/io/fio_job.py
index 6676895..9dffb49 100644
--- a/wally/suits/io/fio_job.py
+++ b/wally/suits/io/fio_job.py
@@ -1,13 +1,10 @@
 import copy
 from collections import OrderedDict
-from typing import Optional, Iterator, Union, Dict, Tuple, NamedTuple, Any, cast
+from typing import Optional, Iterator, Union, Dict, Tuple, Any, cast
 
 from cephlib.units import ssize2b, b2ssize
 
-from ..job import JobConfig, JobParams
-
-
-Var = NamedTuple('Var', [('name', str)])
+from ..job import JobConfig, JobParams, Var
 
 
 def is_fio_opt_true(vl: Union[str, int]) -> bool:
@@ -40,7 +37,7 @@
     @property
     def summary(self) -> str:
         """Test short summary, used mostly for file names and short image description"""
-        res = "{0[oper_short]}{0[sync_mode]}{0[bsize]}".format(self)
+        res = f"{self['oper_short']}{self['sync_mode']}{self['bsize']}"
         if self['qd'] is not None:
             res += "_qd" + str(self['qd'])
         if self['thcount'] not in (1, None):
@@ -52,13 +49,13 @@
     @property
     def long_summary(self) -> str:
         """Readable long summary for management and deployment engineers"""
-        res = "{0[oper]}, {0.sync_mode_long}, block size {1}B".format(self, b2ssize(self['bsize'] * 1024))
+        res = f"{self['oper']}, {self.sync_mode_long}, block size {b2ssize(self['bsize'] * 1024)}B"
         if self['qd'] is not None:
             res += ", QD = " + str(self['qd'])
         if self['thcount'] not in (1, None):
-            res += ", threads={0[thcount]}".format(self)
+            res += f", threads={self['thcount']}"
         if self['write_perc'] is not None:
-            res += ", write_perc={0[write_perc]}%".format(self)
+            res += f", fwrite_perc={self['write_perc']}%"
         return res
 
     def copy(self, **kwargs: Dict[str, Any]) -> 'FioJobParams':
@@ -89,24 +86,24 @@
     def __init__(self, name: str, idx: int) -> None:
         JobConfig.__init__(self, idx)
         self.name = name
-        self._sync_mode = None  # type: Optional[str]
-        self._params = None  # type: Optional[Dict[str, Any]]
+        self._sync_mode: Optional[str] = None
+        self._params: Optional[Dict[str, Any]] = None
 
     # ------------- BASIC PROPERTIES -----------------------------------------------------------------------------------
 
     @property
     def write_perc(self) -> Optional[int]:
         try:
-            return int(self.vals["rwmixwrite"])
+            return int(self.vals["rwmixwrite"])  # type: ignore
         except (KeyError, TypeError):
             try:
-                return 100 - int(self.vals["rwmixread"])
+                return 100 - int(self.vals["rwmixread"])  # type: ignore
             except (KeyError, TypeError):
                 return None
 
     @property
     def qd(self) -> int:
-        return int(self.vals.get('iodepth', '1'))
+        return int(self.vals.get('iodepth', '1'))  # type: ignore
 
     @property
     def bsize(self) -> int:
@@ -117,7 +114,7 @@
     @property
     def oper(self) -> str:
         vl = self.vals['rw']
-        return vl if ':' not in vl else vl.split(":")[0]
+        return vl if ':' not in vl else vl.split(":")[0]  # type: ignore
 
     @property
     def op_type_short(self) -> str:
@@ -125,14 +122,14 @@
 
     @property
     def thcount(self) -> int:
-        return int(self.vals.get('numjobs', 1))
+        return int(self.vals.get('numjobs', 1))  # type: ignore
 
     @property
     def sync_mode(self) -> str:
         if self._sync_mode is None:
-            direct = is_fio_opt_true(self.vals.get('direct', '0')) or \
-                     not is_fio_opt_true(self.vals.get('buffered', '0'))
-            sync = is_fio_opt_true(self.vals.get('sync', '0'))
+            direct = is_fio_opt_true(self.vals.get('direct', '0'))  # type: ignore
+            direct = direct or not is_fio_opt_true(self.vals.get('buffered', '0'))  # type: ignore
+            sync = is_fio_opt_true(self.vals.get('sync', '0'))  # type: ignore
             self._sync_mode = self.ds2mode[(sync, direct)]
         return cast(str, self._sync_mode)
 
diff --git a/wally/suits/io/fio_task_parser.py b/wally/suits/io/fio_task_parser.py
index 5b91885..a9e13dc 100644
--- a/wally/suits/io/fio_task_parser.py
+++ b/wally/suits/io/fio_task_parser.py
@@ -1,9 +1,8 @@
 #!/usr/bin/env python3
 
 import re
-import os
 import sys
-import os.path
+import pathlib
 import argparse
 import itertools
 from typing import Optional, Iterator, Union, Dict, Iterable, List, Tuple, NamedTuple, Any
@@ -104,34 +103,30 @@
 def fio_config_parse(lexer_iter: Iterable[CfgLine]) -> Iterator[FioJobConfig]:
     in_globals = False
     curr_section = None
-    glob_vals = OrderedDict()  # type: Dict[str, Any]
+    glob_vals: Dict[str, Any] = OrderedDict()
     sections_count = 0
 
-    lexed_lines = list(lexer_iter)  # type: List[CfgLine]
+    lexed_lines: List[CfgLine] = list(lexer_iter)
     one_more = True
-    includes = {}
+    includes: Dict[str, Tuple[str, int]] = {}
 
     while one_more:
-        new_lines = []  # type: List[CfgLine]
+        new_lines: List[CfgLine] = []
         one_more = False
         for line in lexed_lines:
             fname, lineno, oline, tp, name, val = line
 
             if INCLUDE == tp:
-                if not os.path.exists(fname):
-                    dirname = '.'
-                else:
-                    dirname = os.path.dirname(fname)
-
-                new_fname = os.path.join(dirname, name)
-                includes[new_fname] = (fname, lineno)
+                fobj = pathlib.Path(fname)
+                new_fname = (fobj.parent / name) if fobj.exists() else pathlib.Path(name)
+                includes[str(new_fname)] = (fname, lineno)
 
                 try:
-                    cont = open(new_fname).read()
+                    cont = new_fname.open().read()
                 except IOError as err:
-                    raise ParseError("Error while including file {}: {}".format(new_fname, err), fname, lineno, oline)
+                    raise ParseError(f"Error while including file {new_fname}: {err}", fname, lineno, oline)
 
-                new_lines.extend(fio_config_lexer(cont, new_fname))
+                new_lines.extend(fio_config_lexer(cont, str(new_fname)))
                 one_more = True
             else:
                 new_lines.append(line)
@@ -161,7 +156,7 @@
             if in_globals:
                 glob_vals[name] = val
             elif name == name.upper():
-                raise ParseError("Param {!r} not in [global] section".format(name), fname, lineno, oline)
+                raise ParseError(f"Param {name!r} not in [global] section", fname, lineno, oline)
             elif curr_section is None:
                     raise ParseError("Data outside section", fname, lineno, oline)
             else:
@@ -172,7 +167,7 @@
 
 
 def process_cycles(sec: FioJobConfig) -> Iterator[FioJobConfig]:
-    cycles = OrderedDict()  # type: Dict[str, Any]
+    cycles: Dict[str, Any] = OrderedDict()
 
     for name, val in sec.vals.items():
         if isinstance(val, list) and name.upper() != name:
@@ -203,12 +198,12 @@
             yield new_sec
 
 
-FioParamsVal = Union[str, Var]
+FioParamsVal = Union[str, Var, int]
 FioParams = Dict[str, FioParamsVal]
 
 
 def apply_params(sec: FioJobConfig, params: FioParams) -> FioJobConfig:
-    processed_vals = OrderedDict()  # type: Dict[str, Any]
+    processed_vals: Dict[str, Any] = OrderedDict()
     processed_vals.update(params)
 
     for name, val in sec.vals.items():
@@ -251,8 +246,7 @@
     sec.vals['unified_rw_reporting'] = '1'
 
     if isinstance(sec.vals['size'], Var):
-        raise ValueError("Variable {0} isn't provided".format(
-            sec.vals['size'].name))
+        raise ValueError(f"Variable {sec.vals['size'].name} isn't provided")
 
     sz = ssize2b(sec.vals['size'])
     offset = sz * ((MAGIC_OFFSET * counter[0]) % 1.0)
@@ -266,7 +260,7 @@
 
     for vl in sec.vals.values():
         if isinstance(vl, Var):
-            raise ValueError("Variable {0} isn't provided".format(vl.name))
+            raise ValueError(f"Variable {vl.name} isn't provided")
 
     params = sec.vals.copy()
     params['UNIQ'] = 'UN{0}'.format(counter[0])
@@ -282,13 +276,11 @@
     return sec.vals.get('ramp_time', 0) + sec.vals.get('runtime', 0)
 
 
-def parse_all_in_1(source:str, fname: str = None) -> Iterator[FioJobConfig]:
+def parse_all_in_1(source:str, fname: str) -> Iterator[FioJobConfig]:
     return fio_config_parse(fio_config_lexer(source, fname))
 
 
 def get_log_files(sec: FioJobConfig, iops: bool = False) -> Iterator[Tuple[str, str, str]]:
-    res = []  # type: List[Tuple[str, str, str]]
-
     keys = [('write_bw_log', 'bw', 'KiBps'),
             ('write_hist_log', 'lat', 'us')]
     if iops:
@@ -304,8 +296,8 @@
     test_params = test_params.copy()
 
     if 'RAMPTIME' not in test_params and 'RUNTIME' in test_params:
-        ramp = int(int(test_params['RUNTIME']) * 0.05)
-        test_params['RAMPTIME'] = min(30, max(5, ramp))
+        ramp = int(int(test_params['RUNTIME']) * 0.05)  # type: ignore
+        test_params['RAMPTIME'] = min(30, max(5, ramp))  # type: ignore
 
     it = parse_all_in_1(source, fname)
     it = (apply_params(sec, test_params) for sec in it)
diff --git a/wally/suits/io/rrd_raw.cfg b/wally/suits/io/rrd_raw.cfg
index 2b0fc74..e7ca82d 100644
--- a/wally/suits/io/rrd_raw.cfg
+++ b/wally/suits/io/rrd_raw.cfg
@@ -1,6 +1,6 @@
 [test]
-blocksize=4k
-rw=randread
+blocksize=4m
+rw=write
 iodepth=1
 ramp_time=0
 runtime=120
@@ -17,5 +17,8 @@
 wait_for_previous=1
 per_job_logs=0
 randrepeat=0
-filename=/dev/sdb
-size=100G
\ No newline at end of file
+filename=/media/data/test.db
+size=50G
+;verify_pattern=0x00
+buffer_compress_percentage=99
+write_bw_log=/tmp/bw.non-compress.log
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index 26fd252..111e6bb 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -26,11 +26,12 @@
     retry_time = 30
     job_config_cls = None  # type: type
 
-    def __init__(self, storage: IWallyStorage, suite: SuiteConfig, on_idle: Callable[[], None] = None) -> None:
+    def __init__(self, storage: IWallyStorage, suite: SuiteConfig,
+                 on_tests_boundry: Callable[[bool], None] = None) -> None:
         self.suite = suite
         self.stop_requested = False
         self.sorted_nodes_ids = sorted(node.node_id for node in self.suite.nodes)
-        self.on_idle = on_idle
+        self.on_tests_boundry = on_tests_boundry
         self.storage = storage
 
     def request_stop(self) -> None:
@@ -55,11 +56,11 @@
     # used_max_diff = max((min_run_time * max_rel_time_diff), max_time_diff)
     max_time_diff = 5
     max_rel_time_diff = 0.05
-    load_profile_name = None  # type: str
+    load_profile_name: str = None  # type: ignore
 
     def __init__(self, *args, **kwargs) -> None:
         PerfTest.__init__(self, *args, **kwargs)
-        self.job_configs = None  # type: List[JobConfig]
+        self.job_configs: List[JobConfig] = None  # type: ignore
 
     @abc.abstractmethod
     def get_expected_runtime(self, iter_cfg: JobConfig) -> Optional[int]:
@@ -107,13 +108,12 @@
 
             if None not in run_times:
                 # +10s - is a rough estimation for additional operations per iteration
-                expected_run_time = int(sum(run_times) + 10 * len(not_in_storage))
-
+                expected_run_time: int = int(sum(run_times) + 10 * len(not_in_storage))  # type: ignore
                 exec_time_s, end_dt_s = get_time_interval_printable_info(expected_run_time)
                 logger.info("Entire test should takes around %s and finish at %s", exec_time_s, end_dt_s)
 
             for job in not_in_storage:
-                results = []  # type: List[TimeSeries]
+                results: List[TimeSeries] = []
                 for retry_idx in range(self.max_retry):
                     logger.info("Preparing job %s", job.params.summary)
 
@@ -121,8 +121,14 @@
                     wait([pool.submit(self.prepare_iteration, node, job) for node in self.suite.nodes])
 
                     expected_job_time = self.get_expected_runtime(job)
-                    exec_time_s, end_dt_s = get_time_interval_printable_info(expected_job_time)
-                    logger.info("Job should takes around %s and finish at %s", exec_time_s, end_dt_s)
+                    if expected_job_time is None:
+                        logger.info("Job execution time is unknown")
+                    else:
+                        exec_time_s, end_dt_s = get_time_interval_printable_info(expected_job_time)
+                        logger.info("Job should takes around %s and finish at %s", exec_time_s, end_dt_s)
+
+                    if self.on_tests_boundry is not None:
+                        self.on_tests_boundry(True)
 
                     jfutures = [pool.submit(self.run_iteration, node, job) for node in self.suite.nodes]
                     failed = False
@@ -132,6 +138,9 @@
                         except EnvironmentError:
                             failed = True
 
+                    if self.on_tests_boundry is not None:
+                        self.on_tests_boundry(False)
+
                     if not failed:
                         break
 
@@ -145,8 +154,8 @@
                     results = []
 
                 # per node jobs start and stop times
-                start_times = []  # type: List[int]
-                stop_times = []  # type: List[int]
+                start_times: List[int] = []
+                stop_times: List[int] = []
 
                 for ts in results:
                     self.storage.put_ts(ts)
@@ -180,8 +189,6 @@
                 self.storage.put_job(self.suite, job)
                 self.storage.sync()
 
-                if self.on_idle is not None:
-                    self.on_idle()
 
     @abc.abstractmethod
     def config_node(self, node: IRPCNode) -> None:
diff --git a/wally/suits/job.py b/wally/suits/job.py
index 8ef1093..d336807 100644
--- a/wally/suits/job.py
+++ b/wally/suits/job.py
@@ -1,10 +1,13 @@
 import abc
-from typing import Dict, Any, Tuple, cast, Union
+from typing import Dict, Any, Tuple, cast, Union, NamedTuple
 from collections import OrderedDict
 
 from cephlib.istorage import Storable
 
 
+Var = NamedTuple('Var', [('name', str)])
+
+
 class JobParams(metaclass=abc.ABCMeta):
     """Class contains all job parameters, which significantly affects job results.
     Like block size or operation type, but not file name or file size.
@@ -41,12 +44,12 @@
 
     def __eq__(self, o: object) -> bool:
         if not isinstance(o, self.__class__):
-            raise TypeError("Can't compare {!r} to {!r}".format(self.__class__.__qualname__, type(o).__qualname__))
+            raise TypeError(f"Can't compare {self.__class__.__qualname__!r} to {type(o).__qualname__!r}")
         return sorted(self.params.items()) == sorted(cast(JobParams, o).params.items())
 
     def __lt__(self, o: object) -> bool:
         if not isinstance(o, self.__class__):
-            raise TypeError("Can't compare {!r} to {!r}".format(self.__class__.__qualname__, type(o).__qualname__))
+            raise TypeError(f"Can't compare {self.__class__.__qualname__!r} to {type(o).__qualname__!r}")
         return self.char_tpl < cast(JobParams, o).char_tpl
 
     @property
@@ -63,11 +66,10 @@
         self.idx = idx
 
         # time interval, in seconds, when test was running on all nodes
-        self.reliable_info_range = None  # type: Tuple[int, int]
-
+        self.reliable_info_range: Tuple[int, int] = None  # type: ignore
 
         # all job parameters, both from suite file and config file
-        self.vals = OrderedDict()  # type: Dict[str, Any]
+        self.vals: Dict[str, Any] = OrderedDict()
 
     @property
     def reliable_info_range_s(self) -> Tuple[int, int]:
@@ -76,7 +78,7 @@
     @property
     def storage_id(self) -> str:
         """unique string, used as key in storage"""
-        return "{}_{}".format(self.summary, self.idx)
+        return f"{self.summary}_{self.idx}"
 
     @property
     @abc.abstractmethod
diff --git a/wally/test_run_class.py b/wally/test_run_class.py
index 7aed795..05585e1 100644
--- a/wally/test_run_class.py
+++ b/wally/test_run_class.py
@@ -1,4 +1,3 @@
-import collections
 from typing import List, Callable, Any, Dict, Optional, Set
 from concurrent.futures import ThreadPoolExecutor
 
@@ -9,7 +8,6 @@
 
 from .openstack_api import OSCreds, OSConnection
 from .config import Config
-from .fuel_rest_api import Connection
 from .result_classes import IWallyStorage
 
 
@@ -17,29 +15,29 @@
     """Test run information"""
     def __init__(self, config: Config, storage: IStorage, rstorage: IWallyStorage) -> None:
         # NodesInfo list
-        self.nodes_info = {}  # type: Dict[str, NodeInfo]
+        self.nodes_info: Dict[str, NodeInfo] = {}
+
+        self.ceph_master_node: Optional[IRPCNode] = None
+        self.ceph_extra_args: Optional[str] = None
 
         # Nodes list
-        self.nodes = []  # type: List[IRPCNode]
+        self.nodes: List[IRPCNode] = []
 
-        self.build_meta = {}  # type: Dict[str,Any]
-        self.clear_calls_stack = []  # type: List[Callable[['TestRun'], None]]
+        self.build_meta: Dict[str,Any] = {}
+        self.clear_calls_stack: List[Callable[['TestRun'], None]] = []
 
         # openstack credentials
-        self.fuel_openstack_creds = None  # type: Optional[OSCreds]
-        self.fuel_version = None  # type: Optional[List[int]]
-        self.os_creds = None  # type: Optional[OSCreds]
-        self.os_connection = None  # type: Optional[OSConnection]
-        self.fuel_conn = None  # type: Optional[Connection]
-        self.rpc_code = None  # type: bytes
-        self.default_rpc_plugins = None  # type: Dict[str, bytes]
+        self.os_creds: Optional[OSCreds] = None  # type: ignore
+        self.os_connection: Optional[OSConnection] = None  # type: ignore
+        self.rpc_code: bytes = None  # type: ignore
+        self.default_rpc_plugins: Dict[str, bytes] = None  # type: ignore
 
         self.storage = storage
         self.rstorage = rstorage
         self.config = config
-        self.sensors_run_on = set()  # type: Set[str]
-        self.os_spawned_nodes_ids = None  # type: List[int]
-        self.devs_locator = []  # type: DevRolesConfig
+        self.sensors_run_on: Set[str] = set()
+        self.os_spawned_nodes_ids: List[int] = None  # type: ignore
+        self.devs_locator: DevRolesConfig = []
 
     def get_pool(self):
         return ThreadPoolExecutor(self.config.get('worker_pool_sz', 32))
