2.0 refactoring:

    * Add type annotations to most functions
    * Remove old fio run code; move it to RPC/pluggable
    * Remove most of the sensors code; it will be moved to RPC later
    * Other refactoring
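
    A minimal usage sketch of the new node API (illustrative only: the
    address and role below are made-up, and the snippet assumes the
    wally.node, wally.inode and wally.discover.ceph modules added or
    changed in this diff):

        from wally.node import Node, NodeInfo
        from wally.discover import ceph

        # NodeInfo is a static description; Node wraps it with live connections.
        info = NodeInfo("ssh://root@192.168.1.10:22", {"ceph-mon"})
        node = Node(info)
        node.connect_ssh()

        # All remote access goes through Node.run()/get_file_content(),
        # so call sites keep working once the RPC backend is plugged in.
        print(node.run("uname -a"))

        # Discovery functions now take a connected Node and return NodeInfo.
        for found in ceph.discover_ceph_nodes(node):
            print(found.ssh_conn_url, found.roles)
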
diff --git a/wally/config.py b/wally/config.py
index 332dc5e..c5d1db0 100644
--- a/wally/config.py
+++ b/wally/config.py
@@ -1,6 +1,5 @@
 import os
 import uuid
-import logging
 import functools
 
 import yaml
@@ -11,19 +10,26 @@
     def pet_generate(x, y):
         return str(uuid.uuid4())
 
-import pretty_yaml
+from . import pretty_yaml
 
 
-class NoData(object):
+class NoData:
     @classmethod
     def get(cls, name, x):
         return cls
 
 
-class Config(object):
+class Config:
     def __init__(self, val=None):
         if val is not None:
             self.update(val)
+        self.results_dir = None
+        self.run_uuid = None
+        self.settings = {}
+        self.run_params_file = None
+        self.default_test_local_folder = None
+        self.hwinfo_directory = None
+        self.hwreport_fname = None
 
     def get(self, name, defval=None):
         obj = self.__dict__
@@ -65,7 +71,6 @@
     file_name = os.path.abspath(file_name)
 
     defaults = dict(
-        sensors_remote_path='/tmp/sensors',
         testnode_log_root='/tmp/wally',
         settings={}
     )
@@ -85,7 +90,6 @@
     cfg.update(raw_cfg)
 
     results_storage = cfg.settings.get('results_storage', '/tmp')
-    print results_storage
     results_storage = os.path.abspath(results_storage)
 
     existing = file_name.startswith(results_storage)
@@ -135,84 +139,3 @@
 
     return dict(run_uuid=dt['run_uuid'],
                 comment=dt.get('comment'))
-
-
-def color_me(color):
-    RESET_SEQ = "\033[0m"
-    COLOR_SEQ = "\033[1;%dm"
-
-    color_seq = COLOR_SEQ % (30 + color)
-
-    def closure(msg):
-        return color_seq + msg + RESET_SEQ
-    return closure
-
-
-class ColoredFormatter(logging.Formatter):
-    BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)
-
-    colors = {
-        'WARNING': color_me(YELLOW),
-        'DEBUG': color_me(BLUE),
-        'CRITICAL': color_me(YELLOW),
-        'ERROR': color_me(RED)
-    }
-
-    def __init__(self, msg, use_color=True, datefmt=None):
-        logging.Formatter.__init__(self, msg, datefmt=datefmt)
-        self.use_color = use_color
-
-    def format(self, record):
-        orig = record.__dict__
-        record.__dict__ = record.__dict__.copy()
-        levelname = record.levelname
-
-        prn_name = levelname + ' ' * (8 - len(levelname))
-        if levelname in self.colors:
-            record.levelname = self.colors[levelname](prn_name)
-        else:
-            record.levelname = prn_name
-
-        # super doesn't work here in 2.6 O_o
-        res = logging.Formatter.format(self, record)
-
-        # res = super(ColoredFormatter, self).format(record)
-
-        # restore record, as it will be used by other formatters
-        record.__dict__ = orig
-        return res
-
-
-def setup_loggers(def_level=logging.DEBUG, log_fname=None):
-    logger = logging.getLogger('wally')
-    logger.setLevel(logging.DEBUG)
-    sh = logging.StreamHandler()
-    sh.setLevel(def_level)
-
-    log_format = '%(asctime)s - %(levelname)s - %(name)-15s - %(message)s'
-    colored_formatter = ColoredFormatter(log_format, datefmt="%H:%M:%S")
-
-    sh.setFormatter(colored_formatter)
-    logger.addHandler(sh)
-
-    logger_api = logging.getLogger("wally.fuel_api")
-
-    if log_fname is not None:
-        fh = logging.FileHandler(log_fname)
-        log_format = '%(asctime)s - %(levelname)8s - %(name)-15s - %(message)s'
-        formatter = logging.Formatter(log_format, datefmt="%H:%M:%S")
-        fh.setFormatter(formatter)
-        fh.setLevel(logging.DEBUG)
-        logger.addHandler(fh)
-        logger_api.addHandler(fh)
-    else:
-        fh = None
-
-    logger_api.addHandler(sh)
-    logger_api.setLevel(logging.WARNING)
-
-    logger = logging.getLogger('paramiko')
-    logger.setLevel(logging.WARNING)
-    # logger.addHandler(sh)
-    if fh is not None:
-        logger.addHandler(fh)
diff --git a/wally/discover/ceph.py b/wally/discover/ceph.py
index 70a6edf..ae06cbc 100644
--- a/wally/discover/ceph.py
+++ b/wally/discover/ceph.py
@@ -1,39 +1,22 @@
 """ Collect data about ceph nodes"""
 import json
 import logging
-import subprocess
+from typing import Iterable
 
 
-from .node import Node
-from wally.ssh_utils import connect
+from ..node import NodeInfo, Node
 
 
-logger = logging.getLogger("io-perf-tool")
+logger = logging.getLogger("wally.discover")
 
 
-def local_execute(cmd):
-    return subprocess.check_output(cmd, stderr=subprocess.STDOUT, shell=True)
-
-
-def ssh_execute(ssh):
-    def closure(cmd):
-        _, chan, _ = ssh.exec_command(cmd)
-        return chan.read()
-    return closure
-
-
-def discover_ceph_nodes(ip):
-    """ Return list of ceph's nodes ips """
+def discover_ceph_nodes(node: Node) -> Iterable[NodeInfo]:
+    """Return list of ceph's nodes NodeInfo"""
     ips = {}
 
-    if ip != 'local':
-        executor = ssh_execute(connect(ip))
-    else:
-        executor = local_execute
-
-    osd_ips = get_osds_ips(executor, get_osds_list(executor))
-    mon_ips = get_mons_or_mds_ips(executor, "mon")
-    mds_ips = get_mons_or_mds_ips(executor, "mds")
+    osd_ips = get_osds_ips(node, get_osds_list(node))
+    mon_ips = get_mons_or_mds_ips(node, "mon")
+    mds_ips = get_mons_or_mds_ips(node, "mds")
 
     for ip in osd_ips:
         url = "ssh://%s" % ip
@@ -47,43 +30,35 @@
         url = "ssh://%s" % ip
         ips.setdefault(url, []).append("ceph-mds")
 
-    return [Node(url, list(roles)) for url, roles in ips.items()]
+    return [NodeInfo(url, set(roles)) for url, roles in ips.items()]
 
 
-def get_osds_list(executor):
-    """ Get list of osds id"""
-    return filter(None, executor("ceph osd ls").split("\n"))
+def get_osds_list(node: Node) -> Iterable[str]:
+    """Get list of osd's id"""
+    return filter(None, node.run("ceph osd ls").split("\n"))
 
 
-def get_mons_or_mds_ips(executor, who):
-    """ Return mon ip list
-        :param who - "mon" or "mds" """
-    if who not in ("mon", "mds"):
-        raise ValueError(("'%s' in get_mons_or_mds_ips instead" +
-                          "of mon/mds") % who)
+def get_mons_or_mds_ips(node: Node, who: str) -> Iterable[str]:
+    """Return mon ip list. who - mon/mds"""
+    assert who in ("mon", "mds"), \
+        "{!r} in get_mons_or_mds_ips instead of mon/mds".format(who)
 
-    line_res = executor("ceph {0} dump".format(who)).split("\n")
+    line_res = node.run("ceph {0} dump".format(who)).split("\n")
 
     ips = set()
     for line in line_res:
         fields = line.split()
-
-        # what does fields[1], fields[2] means?
-        # make this code looks like:
-        # SOME_MENINGFULL_VAR1, SOME_MENINGFULL_VAR2 = line.split()[1:3]
-
         if len(fields) > 2 and who in fields[2]:
             ips.add(fields[1].split(":")[0])
 
     return ips
 
 
-def get_osds_ips(executor, osd_list):
-    """ Get osd's ips
-        :param osd_list - list of osd names from osd ls command"""
+def get_osds_ips(node: Node, osd_list: Iterable[str]) -> Iterable[str]:
+    """Get osd's ips. osd_list - list of osd names from osd ls command"""
     ips = set()
     for osd_id in osd_list:
-        out = executor("ceph osd find {0}".format(osd_id))
+        out = node.run("ceph osd find {0}".format(osd_id))
         ip = json.loads(out)["ip"]
         ips.add(str(ip.split(":")[0]))
     return ips
diff --git a/wally/discover/discover.py b/wally/discover/discover.py
index 5b9d3fe..233650e 100644
--- a/wally/discover/discover.py
+++ b/wally/discover/discover.py
@@ -1,10 +1,14 @@
 import os.path
 import logging
 
+from paramiko import AuthenticationException
+
 from . import ceph
 from . import fuel
 from . import openstack
-from wally.utils import parse_creds
+from ..utils import parse_creds, StopTestError
+from ..test_run_class import TestRun
+from ..node import Node, NodeInfo
 
 
 logger = logging.getLogger("wally.discover")
@@ -28,12 +32,12 @@
 """
 
 
-def discover(ctx, discover, clusters_info, var_dir, discover_nodes=True):
+def discover(testrun: TestRun, discover_cfg, clusters_info, var_dir, discover_nodes=True):
+    """Discover nodes in clusters"""
     nodes_to_run = []
     clean_data = None
-    ctx.fuel_openstack_creds = None
 
-    for cluster in discover:
+    for cluster in discover_cfg:
         if cluster == "openstack" and not discover_nodes:
             logger.warning("Skip openstack cluster discovery")
         elif cluster == "openstack" and discover_nodes:
@@ -64,19 +68,30 @@
             if cluster == "fuel_openrc_only":
                 discover_nodes = False
 
-            res = fuel.discover_fuel_nodes(clusters_info['fuel'],
-                                           var_dir,
+            ssh_creds = clusters_info['fuel']['ssh_creds']
+            fuel_node = Node(NodeInfo(ssh_creds, {'fuel_master'}))
+
+            try:
+                fuel_node.connect_ssh()
+            except AuthenticationException:
+                raise StopTestError("Wrong fuel credentials")
+            except Exception:
+                logger.exception("While connection to FUEL")
+                raise StopTestError("Failed to connect to FUEL")
+
+            fuel_node.connect_rpc()
+
+            res = fuel.discover_fuel_nodes(fuel_node,
+                                           clusters_info['fuel'],
                                            discover_nodes)
             nodes, clean_data, openrc_dict, version = res
 
-            if openrc_dict is None:
-                ctx.fuel_openstack_creds = None
-            else:
+            if openrc_dict:
                 if version >= [8, 0] and openrc_dict['os_auth_url'].startswith("https://"):
                     logger.warning("Fixing FUEL 8.0 AUTH url - replace https://->http://")
                     openrc_dict['os_auth_url'] = "http" + openrc_dict['os_auth_url'][5:]
 
-                ctx.fuel_openstack_creds = {
+                testrun.fuel_openstack_creds = {
                     'name': openrc_dict['username'],
                     'passwd': openrc_dict['password'],
                     'tenant': openrc_dict['tenant_name'],
@@ -91,11 +106,11 @@
             fuel_openrc_fname = os.path.join(var_dir,
                                              env_f_name + "_openrc")
 
-            if ctx.fuel_openstack_creds is not None:
+            if testrun.fuel_openstack_creds is not None:
                 with open(fuel_openrc_fname, "w") as fd:
-                    fd.write(openrc_templ.format(**ctx.fuel_openstack_creds))
-                    msg = "Openrc for cluster {0} saves into {1}"
-                    logger.info(msg.format(env_name, fuel_openrc_fname))
+                    fd.write(openrc_templ.format(**testrun.fuel_openstack_creds))
+                msg = "Openrc for cluster {0} saves into {1}"
+                logger.info(msg.format(env_name, fuel_openrc_fname))
             nodes_to_run.extend(nodes)
 
         elif cluster == "ceph":
diff --git a/wally/discover/fuel.py b/wally/discover/fuel.py
index 4f34298..1443a9f 100644
--- a/wally/discover/fuel.py
+++ b/wally/discover/fuel.py
@@ -1,44 +1,41 @@
-import re
 import socket
 import logging
-from urlparse import urlparse
-
-import sshtunnel
-from paramiko import AuthenticationException
+from typing import Dict, Any, Tuple, List
+from urllib.parse import urlparse
 
 
-from wally.fuel_rest_api import (KeystoneAuth, get_cluster_id,
-                                 reflect_cluster, FuelInfo)
-from wally.utils import (parse_creds, check_input_param, StopTestError,
-                         clean_resource, get_ip_for_target)
-from wally.ssh_utils import (run_over_ssh, connect, set_key_for_node,
-                             read_from_remote)
-
-from .node import Node
+from .. import fuel_rest_api
+from ..utils import parse_creds, check_input_param
+from ..node import NodeInfo, Node, FuelNodeInfo
 
 
 logger = logging.getLogger("wally.discover")
-BASE_PF_PORT = 44006
 
 
-def discover_fuel_nodes(fuel_data, var_dir, discover_nodes=True):
+def discover_fuel_nodes(fuel_master_node: Node,
+                        fuel_data: Dict[str, Any],
+                        discover_nodes: bool=True) -> Tuple[List[NodeInfo], FuelNodeInfo]:
+    """Discover nodes in fuel cluster, get openrc for selected cluster"""
+
+    # parse FUEL REST credentials
     username, tenant_name, password = parse_creds(fuel_data['creds'])
     creds = {"username": username,
              "tenant_name": tenant_name,
              "password": password}
 
-    conn = KeystoneAuth(fuel_data['url'], creds, headers=None)
-
+    # connect to FUEL
+    conn = fuel_rest_api.KeystoneAuth(fuel_data['url'], creds, headers=None)
     msg = "openstack_env should be provided in fuel config"
     check_input_param('openstack_env' in fuel_data, msg)
 
-    cluster_id = get_cluster_id(conn, fuel_data['openstack_env'])
-    cluster = reflect_cluster(conn, cluster_id)
-    version = FuelInfo(conn).get_version()
+    # get cluster information from REST API
+    cluster_id = fuel_rest_api.get_cluster_id(conn, fuel_data['openstack_env'])
+    cluster = fuel_rest_api.reflect_cluster(conn, cluster_id)
+    version = fuel_rest_api.FuelInfo(conn).get_version()
 
     if not discover_nodes:
         logger.warning("Skip fuel cluster discovery")
-        return ([], None, cluster.get_openrc(), version)
+        return [], FuelNodeInfo(version, None, cluster.get_openrc())
 
     fuel_nodes = list(cluster.get_nodes())
 
@@ -46,85 +43,27 @@
 
     network = 'fuelweb_admin' if version >= [6, 0] else 'admin'
 
-    ssh_creds = fuel_data['ssh_creds']
-
     fuel_host = urlparse(fuel_data['url']).hostname
     fuel_ip = socket.gethostbyname(fuel_host)
+    fuel_ext_iface = fuel_master_node.get_interface(fuel_ip)
 
-    try:
-        ssh_conn = connect("{0}@{1}".format(ssh_creds, fuel_host))
-    except AuthenticationException:
-        raise StopTestError("Wrong fuel credentials")
-    except Exception:
-        logger.exception("While connection to FUEL")
-        raise StopTestError("Failed to connect to FUEL")
-
-    fuel_ext_iface = get_external_interface(ssh_conn, fuel_ip)
-
+    # get FUEL master key to connect to cluster nodes via ssh
     logger.debug("Downloading fuel master key")
-    fuel_key = download_master_key(ssh_conn)
+    fuel_key = fuel_master_node.get_file_content('/root/.ssh/id_rsa')
+
+    # forward ports of cluster nodes to FUEL master
+    logger.info("Forwarding ssh ports from FUEL nodes to localhost")
+    ips = [str(fuel_node.get_ip(network)) for fuel_node in fuel_nodes]
+    port_fw = [fuel_master_node.forward_port(ip, 22) for ip in ips]
+    listen_ip = fuel_master_node.get_ip()
 
     nodes = []
-    ips_ports = []
-
-    logger.info("Forwarding ssh ports from FUEL nodes to localhost")
-    fuel_usr, fuel_passwd = ssh_creds.split(":", 1)
-    ips = [str(fuel_node.get_ip(network)) for fuel_node in fuel_nodes]
-    port_fw = forward_ssh_ports(fuel_host, fuel_usr, fuel_passwd, ips)
-    listen_ip = get_ip_for_target(fuel_host)
-
     for port, fuel_node, ip in zip(port_fw, fuel_nodes, ips):
-        logger.debug(
-            "SSH port forwarding {0} => localhost:{1}".format(ip, port))
+        logger.debug("SSH port forwarding {} => {}:{}".format(ip, listen_ip, port))
+        conn_url = "ssh://root@{}:{}".format(listen_ip, port)
+        nodes.append(NodeInfo(conn_url, fuel_node['roles'], listen_ip, fuel_key))
 
-        conn_url = "ssh://root@127.0.0.1:{0}".format(port)
-        set_key_for_node(('127.0.0.1', port), fuel_key)
+    logger.debug("Found {} fuel nodes for env {}".format(len(nodes), fuel_data['openstack_env']))
 
-        node = Node(conn_url, fuel_node['roles'])
-        node.monitor_ip = listen_ip
-        nodes.append(node)
-        ips_ports.append((ip, port))
+    return nodes, FuelNodeInfo(version, fuel_ext_iface, cluster.get_openrc())
 
-    logger.debug("Found %s fuel nodes for env %r" %
-                 (len(nodes), fuel_data['openstack_env']))
-
-    return (nodes,
-            (ssh_conn, fuel_ext_iface, ips_ports),
-            cluster.get_openrc(),
-            version)
-
-
-def download_master_key(conn):
-    # download master key
-    with conn.open_sftp() as sftp:
-        return read_from_remote(sftp, '/root/.ssh/id_rsa')
-
-
-def get_external_interface(conn, ip):
-    data = run_over_ssh(conn, "ip a", node='fuel-master', nolog=True)
-    curr_iface = None
-    for line in data.split("\n"):
-
-        match1 = re.match(r"\d+:\s+(?P<name>.*?):\s\<", line)
-        if match1 is not None:
-            curr_iface = match1.group('name')
-
-        match2 = re.match(r"\s+inet\s+(?P<ip>[0-9.]+)/", line)
-        if match2 is not None:
-            if match2.group('ip') == ip:
-                assert curr_iface is not None
-                return curr_iface
-    raise KeyError("Can't found interface for ip {0}".format(ip))
-
-
-def forward_ssh_ports(proxy_ip, proxy_user, proxy_passwd, ips):
-    for ip in ips:
-        tunnel = sshtunnel.open(
-                    (proxy_ip, 22),
-                    ssh_username=proxy_user,
-                    ssh_password=proxy_passwd,
-                    threaded=True,
-                    remote_bind_address=(ip, 22))
-        tunnel.__enter__()
-        clean_resource(tunnel.__exit__, None, None, None)
-        yield tunnel.local_bind_port
diff --git a/wally/discover/node.py b/wally/discover/node.py
deleted file mode 100644
index a3d58f9..0000000
--- a/wally/discover/node.py
+++ /dev/null
@@ -1,46 +0,0 @@
-import getpass
-
-from wally.ssh_utils import parse_ssh_uri
-
-
-class Node(object):
-
-    def __init__(self, conn_url, roles):
-        self.roles = roles
-        self.conn_url = conn_url
-        self.connection = None
-        self.monitor_ip = None
-        self.os_vm_id = None
-
-    def get_ip(self):
-        if self.conn_url == 'local':
-            return '127.0.0.1'
-
-        assert self.conn_url.startswith("ssh://")
-        return parse_ssh_uri(self.conn_url[6:]).host
-
-    def get_conn_id(self):
-        if self.conn_url == 'local':
-            return '127.0.0.1'
-
-        assert self.conn_url.startswith("ssh://")
-        creds = parse_ssh_uri(self.conn_url[6:])
-        return "{0.host}:{0.port}".format(creds)
-
-    def get_user(self):
-        if self.conn_url == 'local':
-            return getpass.getuser()
-
-        assert self.conn_url.startswith("ssh://")
-        creds = parse_ssh_uri(self.conn_url[6:])
-        return creds.user
-
-    def __str__(self):
-        templ = "<Node: url={conn_url!r} roles={roles}" + \
-                " connected={is_connected}>"
-        return templ.format(conn_url=self.conn_url,
-                            roles=", ".join(self.roles),
-                            is_connected=self.connection is not None)
-
-    def __repr__(self):
-        return str(self)
diff --git a/wally/discover/openstack.py b/wally/discover/openstack.py
index 32b4629..bb128c3 100644
--- a/wally/discover/openstack.py
+++ b/wally/discover/openstack.py
@@ -1,42 +1,63 @@
 import socket
 import logging
+from typing import Iterable, Dict, Any
 
 
 from novaclient.client import Client
 
-from .node import Node
+from ..node import NodeInfo
 from wally.utils import parse_creds
 
 
-logger = logging.getLogger("io-perf-tool.discover")
+logger = logging.getLogger("wally.discover")
 
 
-def get_floating_ip(vm):
-    addrs = vm.addresses
-    for net_name, ifaces in addrs.items():
+def get_floating_ip(vm) -> str:
+    """Get VM floating IP address"""
+
+    for net_name, ifaces in vm.addresses.items():
         for iface in ifaces:
             if iface.get('OS-EXT-IPS:type') == "floating":
                 return iface['addr']
 
+    raise ValueError("VM {} has no floating ip".format(vm))
 
-def discover_vms(client, search_opts):
+
+def get_ssh_url(user: str, password: str, ip: str, key: str) -> str:
+    """Get ssh connection URL from parts"""
+
+    if password is not None:
+        assert key is None, "Both key and password provided"
+        return "ssh://{}:{}@{}".format(user, password, ip)
+    else:
+        assert key is not None, "None of key/password provided"
+        return "ssh://{}@{}::{}".format(user, ip, key)
+
+
+def discover_vms(client: Client, search_opts) -> Iterable[NodeInfo]:
+    """Discover virtual machines"""
     user, password, key = parse_creds(search_opts.pop('auth'))
 
     servers = client.servers.list(search_opts=search_opts)
     logger.debug("Found %s openstack vms" % len(servers))
-    return [Node(get_floating_ip(server), ["test_vm"], username=user,
-                 password=password, key_path=key)
-            for server in servers if get_floating_ip(server)]
+
+    nodes = []
+    for server in servers:
+        ip = get_floating_ip(server)
+        nodes.append(NodeInfo(get_ssh_url(user, password, ip, key), ["test_vm"]))
+
+    return nodes
 
 
-def discover_services(client, opts):
+def discover_services(client: Client, opts: Dict[str, Any]) -> Iterable[NodeInfo]:
+    """Discover openstack services for given cluster"""
     user, password, key = parse_creds(opts.pop('auth'))
 
     services = []
     if opts['service'] == "all":
         services = client.services.list()
     else:
-        if isinstance(opts['service'], basestring):
+        if isinstance(opts['service'], str):
             opts['service'] = [opts['service']]
 
         for s in opts['service']:
@@ -50,21 +71,25 @@
 
     logger.debug("Found %s openstack service nodes" %
                  len(host_services_mapping))
-    return [Node(host, services, username=user,
-                 password=password, key_path=key) for host, services in
-            host_services_mapping.items()]
+    nodes = []
+    for host, services in host_services_mapping.items():
+        ssh_url = get_ssh_url(user, password, host, key)
+        nodes.append(NodeInfo(ssh_url, services))
+
+    return nodes
 
 
-def discover_openstack_nodes(conn_details, conf):
+def discover_openstack_nodes(conn_details: Dict[str, str], conf: Dict[str, Any]) -> Iterable[NodeInfo]:
     """Discover vms running in openstack
-    :param conn_details - dict with openstack connection details -
+    conn_details - dict with openstack connection details -
         auth_url, api_key (password), username
+    conf - test configuration object
     """
     client = Client(version='1.1', **conn_details)
-    nodes = []
+
     if conf.get('discover'):
         services_to_discover = conf['discover'].get('nodes')
         if services_to_discover:
-            nodes.extend(discover_services(client, services_to_discover))
+            return discover_services(client, services_to_discover)
 
-    return nodes
+    return []
diff --git a/wally/fuel_rest_api.py b/wally/fuel_rest_api.py
index 728ac94..8799ed2 100644
--- a/wally/fuel_rest_api.py
+++ b/wally/fuel_rest_api.py
@@ -2,8 +2,9 @@
 import json
 import time
 import logging
-import urllib2
-import urlparse
+import urllib.request
+import urllib.parse
+from typing import Dict, Any
 from functools import partial, wraps
 
 import netaddr
@@ -15,14 +16,14 @@
 logger = logging.getLogger("wally.fuel_api")
 
 
-class Urllib2HTTP(object):
+class Urllib2HTTP:
     """
     class for making HTTP requests
     """
 
     allowed_methods = ('get', 'put', 'post', 'delete', 'patch', 'head')
 
-    def __init__(self, root_url, headers=None):
+    def __init__(self, root_url: str, headers: Dict[str, str]=None):
         """
         """
         if root_url.endswith('/'):
@@ -32,10 +33,10 @@
 
         self.headers = headers if headers is not None else {}
 
-    def host(self):
+    def host(self) -> str:
         return self.root_url.split('/')[2]
 
-    def do(self, method, path, params=None):
+    def do(self, method: str, path: str, params: Dict[Any, Any]=None):
         if path.startswith('/'):
             url = self.root_url + path
         else:
@@ -49,14 +50,14 @@
 
         logger.debug("HTTP: {0} {1}".format(method.upper(), url))
 
-        request = urllib2.Request(url,
-                                  data=data_json,
-                                  headers=self.headers)
+        request = urllib.request.Request(url,
+                                         data=data_json,
+                                         headers=self.headers)
         if data_json is not None:
             request.add_header('Content-Type', 'application/json')
 
         request.get_method = lambda: method.upper()
-        response = urllib2.urlopen(request)
+        response = urllib.request.urlopen(request)
 
         logger.debug("HTTP Responce: {0}".format(response.code))
 
@@ -70,16 +71,16 @@
 
         return json.loads(content)
 
-    def __getattr__(self, name):
+    def __getattr__(self, name: str):
         if name in self.allowed_methods:
             return partial(self.do, name)
         raise AttributeError(name)
 
 
 class KeystoneAuth(Urllib2HTTP):
-    def __init__(self, root_url, creds, headers=None):
+    def __init__(self, root_url: str, creds: Dict[str, str], headers: Dict[str, str]=None):
         super(KeystoneAuth, self).__init__(root_url, headers)
-        admin_node_ip = urlparse.urlparse(root_url).hostname
+        admin_node_ip = urllib.parse.urlparse(root_url).hostname
         self.keystone_url = "http://{0}:5000/v2.0".format(admin_node_ip)
         self.keystone = keystoneclient(
             auth_url=self.keystone_url, **creds)
@@ -95,11 +96,11 @@
                 'Cant establish connection to keystone with url %s',
                 self.keystone_url)
 
-    def do(self, method, path, params=None):
+    def do(self, method: str, path: str, params: Dict[str, str]=None):
         """Do request. If gets 401 refresh token"""
         try:
             return super(KeystoneAuth, self).do(method, path, params)
-        except urllib2.HTTPError as e:
+        except urllib.request.HTTPError as e:
             if e.code == 401:
                 logger.warning(
                     'Authorization failure: {0}'.format(e.read()))
@@ -109,13 +110,13 @@
                 raise
 
 
-def get_inline_param_list(url):
+def get_inline_param_list(url: str):
     format_param_rr = re.compile(r"\{([a-zA-Z_]+)\}")
     for match in format_param_rr.finditer(url):
         yield match.group(1)
 
 
-class RestObj(object):
+class RestObj:
     name = None
     id = None
 
@@ -136,7 +137,7 @@
         return getattr(self, item)
 
 
-def make_call(method, url):
+def make_call(method: str, url: str):
     def closure(obj, entire_obj=None, **data):
         inline_params_vals = {}
         for name in get_inline_param_list(url):
@@ -226,40 +227,6 @@
 
     get_info = GET('/api/nodes/{id}')
     get_interfaces = GET('/api/nodes/{id}/interfaces')
-    update_interfaces = PUT('/api/nodes/{id}/interfaces')
-
-    def set_network_assigment(self, mapping):
-        """Assings networks to interfaces
-        :param mapping: list (dict) interfaces info
-        """
-
-        curr_interfaces = self.get_interfaces()
-
-        network_ids = {}
-        for interface in curr_interfaces:
-            for net in interface['assigned_networks']:
-                network_ids[net['name']] = net['id']
-
-        # transform mappings
-        new_assigned_networks = {}
-
-        for dev_name, networks in mapping.items():
-            new_assigned_networks[dev_name] = []
-            for net_name in networks:
-                nnet = {'name': net_name, 'id': network_ids[net_name]}
-                new_assigned_networks[dev_name].append(nnet)
-
-        # update by ref
-        for dev_descr in curr_interfaces:
-            if dev_descr['name'] in new_assigned_networks:
-                nass = new_assigned_networks[dev_descr['name']]
-                dev_descr['assigned_networks'] = nass
-
-        self.update_interfaces(curr_interfaces, id=self.id)
-
-    def set_node_name(self, name):
-        """Update node name"""
-        self.__connection__.put('api/nodes', [{'id': self.id, 'name': name}])
 
     def get_network_data(self):
         """Returns node network data"""
@@ -308,23 +275,9 @@
 class Cluster(RestObj):
     """Class represents Cluster in Fuel"""
 
-    add_node_call = PUT('api/nodes')
-    start_deploy = PUT('api/clusters/{id}/changes')
     get_status = GET('api/clusters/{id}')
-    delete = DELETE('api/clusters/{id}')
-    get_tasks_status = GET("api/tasks?cluster_id={id}")
-    get_networks = GET(
-        'api/clusters/{id}/network_configuration/neutron')
-
-    get_attributes = GET(
-        'api/clusters/{id}/attributes')
-
-    set_attributes = PUT(
-        'api/clusters/{id}/attributes')
-
-    configure_networks = PUT(
-        'api/clusters/{id}/network_configuration/{net_provider}')
-
+    get_networks = GET('api/clusters/{id}/network_configuration/neutron')
+    get_attributes = GET('api/clusters/{id}/attributes')
     _get_nodes = GET('api/nodes?cluster_id={id}')
 
     def __init__(self, *dt, **mp):
@@ -338,17 +291,16 @@
         try:
             self.get_status()
             return True
-        except urllib2.HTTPError as err:
+        except urllib.request.HTTPError as err:
             if err.code == 404:
                 return False
             raise
 
     def get_openrc(self):
         access = self.get_attributes()['editable']['access']
-        creds = {}
-        creds['username'] = access['user']['value']
-        creds['password'] = access['password']['value']
-        creds['tenant_name'] = access['tenant']['value']
+        creds = {'username': access['user']['value'],
+                 'password': access['password']['value'],
+                 'tenant_name': access['tenant']['value']}
 
         version = FuelInfo(self.__connection__).get_version()
         # only HTTPS since 7.0
@@ -365,78 +317,6 @@
         for node_descr in self._get_nodes():
             yield Node(self.__connection__, **node_descr)
 
-    def add_node(self, node, roles, interfaces=None):
-        """Add node to cluster
-
-        :param node: Node object
-        :param roles: roles to assign
-        :param interfaces: mapping iface name to networks
-        """
-        data = {}
-        data['pending_roles'] = roles
-        data['cluster_id'] = self.id
-        data['id'] = node.id
-        data['pending_addition'] = True
-
-        logger.debug("Adding node %s to cluster..." % node.id)
-
-        self.add_node_call([data])
-        self.nodes.append(node)
-
-        if interfaces is not None:
-            networks = {}
-            for iface_name, params in interfaces.items():
-                networks[iface_name] = params['networks']
-
-            node.set_network_assigment(networks)
-
-    def wait_operational(self, timeout):
-        """Wait until cluster status operational"""
-        def wo():
-            status = self.get_status()['status']
-            if status == "error":
-                raise Exception("Cluster deploy failed")
-            return self.get_status()['status'] == 'operational'
-        with_timeout(timeout, "deploy cluster")(wo)()
-
-    def deploy(self, timeout):
-        """Start deploy and wait until all tasks finished"""
-        logger.debug("Starting deploy...")
-        self.start_deploy()
-
-        self.wait_operational(timeout)
-
-        def all_tasks_finished_ok(obj):
-            ok = True
-            for task in obj.get_tasks_status():
-                if task['status'] == 'error':
-                    raise Exception('Task execution error')
-                elif task['status'] != 'ready':
-                    ok = False
-            return ok
-
-        wto = with_timeout(timeout, "wait deployment finished")
-        wto(all_tasks_finished_ok)(self)
-
-    def set_networks(self, net_descriptions):
-        """Update cluster networking parameters"""
-        configuration = self.get_networks()
-        current_networks = configuration['networks']
-        parameters = configuration['networking_parameters']
-
-        if net_descriptions.get('networks'):
-            net_mapping = net_descriptions['networks']
-
-            for net in current_networks:
-                net_desc = net_mapping.get(net['name'])
-                if net_desc:
-                    net.update(net_desc)
-
-        if net_descriptions.get('networking_parameters'):
-            parameters.update(net_descriptions['networking_parameters'])
-
-        self.configure_networks(**configuration)
-
 
 def reflect_cluster(conn, cluster_id):
     """Returns cluster object by id"""
@@ -465,80 +345,3 @@
 
     raise ValueError("Cluster {0} not found".format(name))
 
-
-sections = {
-    'sahara': 'additional_components',
-    'murano': 'additional_components',
-    'ceilometer': 'additional_components',
-    'volumes_ceph': 'storage',
-    'images_ceph': 'storage',
-    'ephemeral_ceph': 'storage',
-    'objects_ceph': 'storage',
-    'osd_pool_size': 'storage',
-    'volumes_lvm': 'storage',
-    'volumes_vmdk': 'storage',
-    'tenant': 'access',
-    'password': 'access',
-    'user': 'access',
-    'vc_password': 'vcenter',
-    'cluster': 'vcenter',
-    'host_ip': 'vcenter',
-    'vc_user': 'vcenter',
-    'use_vcenter': 'vcenter',
-}
-
-
-def create_empty_cluster(conn, cluster_desc, debug_mode=False):
-    """Create new cluster with configuration provided"""
-
-    data = {}
-    data['nodes'] = []
-    data['tasks'] = []
-    data['name'] = cluster_desc['name']
-    data['release'] = cluster_desc['release']
-    data['mode'] = cluster_desc.get('deployment_mode')
-    data['net_provider'] = cluster_desc.get('net_provider')
-
-    params = conn.post(path='/api/clusters', params=data)
-    cluster = Cluster(conn, **params)
-
-    attributes = cluster.get_attributes()
-
-    ed_attrs = attributes['editable']
-
-    ed_attrs['common']['libvirt_type']['value'] = \
-        cluster_desc.get('libvirt_type', 'kvm')
-
-    if 'nodes' in cluster_desc:
-        use_ceph = cluster_desc['nodes'].get('ceph_osd', None) is not None
-    else:
-        use_ceph = False
-
-    if 'storage_type' in cluster_desc:
-        st = cluster_desc['storage_type']
-        if st == 'ceph':
-            use_ceph = True
-        else:
-            use_ceph = False
-
-    if use_ceph:
-        opts = ['ephemeral_ceph', 'images_ceph', 'images_vcenter']
-        opts += ['iser', 'objects_ceph', 'volumes_ceph']
-        opts += ['volumes_lvm', 'volumes_vmdk']
-
-        for name in opts:
-            val = ed_attrs['storage'][name]
-            if val['type'] == 'checkbox':
-                is_ceph = ('images_ceph' == name)
-                is_ceph = is_ceph or ('volumes_ceph' == name)
-
-                if is_ceph:
-                    val['value'] = True
-                else:
-                    val['value'] = False
-    # else:
-    #     raise NotImplementedError("Non-ceph storages are not implemented")
-
-    cluster.set_attributes(attributes)
-
-    return cluster
diff --git a/wally/hw_info.py b/wally/hw_info.py
index 73226e3..214883d 100644
--- a/wally/hw_info.py
+++ b/wally/hw_info.py
@@ -1,15 +1,17 @@
 import re
+from typing import Dict, Any, Iterable
 import xml.etree.ElementTree as ET
 
-from wally import ssh_utils, utils
+from . import utils
+from .inode import INode
 
 
-def get_data(rr, data):
+def get_data(rr: str, data: str) -> str:
     match_res = re.search("(?ims)" + rr, data)
     return match_res.group(0)
 
 
-class HWInfo(object):
+class HWInfo:
     def __init__(self):
         self.hostname = None
         self.cores = []
@@ -30,11 +32,11 @@
 
         self.storage_controllers = []
 
-    def get_HDD_count(self):
+    def get_hdd_count(self) -> Iterable[int]:
         # SATA HDD COUNT, SAS 10k HDD COUNT, SAS SSD count, PCI-E SSD count
         return []
 
-    def get_summary(self):
+    def get_summary(self) -> Dict[str, Any]:
         cores = sum(count for _, count in self.cores)
         disks = sum(size for _, size in self.disks_info.values())
 
@@ -52,15 +54,15 @@
                                   ram=utils.b2ssize(summ['ram']),
                                   disk=utils.b2ssize(summ['storage'])))
         res.append(str(self.sys_name))
-        if self.mb is not None:
+        if self.mb:
             res.append("Motherboard: " + self.mb)
 
-        if self.ram_size == 0:
+        if not self.ram_size:
             res.append("RAM: Failed to get RAM size")
         else:
             res.append("RAM " + utils.b2ssize(self.ram_size) + "B")
 
-        if self.cores == []:
+        if not self.cores:
             res.append("CPU cores: Failed to get CPU info")
         else:
             res.append("CPU cores:")
@@ -70,12 +72,12 @@
                 else:
                     res.append("    " + name)
 
-        if self.storage_controllers != []:
+        if self.storage_controllers:
             res.append("Disk controllers:")
             for descr in self.storage_controllers:
                 res.append("    " + descr)
 
-        if self.disks_info != {}:
+        if self.disks_info:
             res.append("Storage devices:")
             for dev, (model, size) in sorted(self.disks_info.items()):
                 ssize = utils.b2ssize(size) + "B"
@@ -83,14 +85,14 @@
         else:
             res.append("Storage devices's: Failed to get info")
 
-        if self.disks_raw_info != {}:
+        if self.disks_raw_info:
             res.append("Disks devices:")
             for dev, descr in sorted(self.disks_raw_info.items()):
                 res.append("    {0} {1}".format(dev, descr))
         else:
             res.append("Disks devices's: Failed to get info")
 
-        if self.net_info != {}:
+        if self.net_info:
             res.append("Net adapters:")
             for name, (speed, dtype, _) in self.net_info.items():
                 res.append("    {0} {2} duplex={1}".format(name, dtype, speed))
@@ -100,7 +102,7 @@
         return str(self.hostname) + ":\n" + "\n".join("    " + i for i in res)
 
 
-class SWInfo(object):
+class SWInfo:
     def __init__(self):
         self.partitions = None
         self.kernel_version = None
@@ -112,48 +114,29 @@
         self.ceph_version = None
 
 
-def get_sw_info(conn):
+def get_sw_info(node: INode) -> SWInfo:
     res = SWInfo()
+
     res.OS_version = utils.get_os()
-
-    with conn.open_sftp() as sftp:
-        def get(fname):
-            try:
-                return ssh_utils.read_from_remote(sftp, fname)
-            except:
-                return None
-
-        res.kernel_version = get('/proc/version')
-        res.partitions = get('/etc/mtab')
-
-    def rr(cmd):
-        try:
-            return ssh_utils.run_over_ssh(conn, cmd, nolog=True)
-        except:
-            return None
-
-    res.libvirt_version = rr("virsh -v")
-    res.qemu_version = rr("qemu-system-x86_64 --version")
-    res.ceph_version = rr("ceph --version")
+    res.kernel_version = node.get_file_content('/proc/version')
+    res.partitions = node.get_file_content('/etc/mtab')
+    res.libvirt_version = node.run("virsh -v", nolog=True)
+    res.qemu_version = node.run("qemu-system-x86_64 --version", nolog=True)
+    res.ceph_version = node.run("ceph --version", nolog=True)
 
     return res
 
 
-def get_network_info():
-    pass
-
-
-def get_hw_info(conn):
+def get_hw_info(node: INode) -> HWInfo:
     res = HWInfo()
-    lshw_out = ssh_utils.run_over_ssh(conn, 'sudo lshw -xml 2>/dev/null',
-                                      nolog=True)
+    lshw_out = node.run('sudo lshw -xml 2>/dev/null', nolog=True)
 
     res.raw = lshw_out
     lshw_et = ET.fromstring(lshw_out)
 
     try:
         res.hostname = lshw_et.find("node").attrib['id']
-    except:
+    except Exception:
         pass
 
     try:
@@ -161,7 +144,7 @@
                         lshw_et.find("node/product").text)
         res.sys_name = res.sys_name.replace("(To be filled by O.E.M.)", "")
         res.sys_name = res.sys_name.replace("(To be Filled by O.E.M.)", "")
-    except:
+    except Exception:
         pass
 
     core = lshw_et.find("node/node[@id='core']")
@@ -171,7 +154,7 @@
     try:
         res.mb = " ".join(core.find(node).text
                           for node in ['vendor', 'product', 'version'])
-    except:
+    except Exception:
         pass
 
     for cpu in core.findall("node[@class='processor']"):
@@ -183,7 +166,7 @@
             else:
                 threads = int(threads_node.attrib['value'])
             res.cores.append((model, threads))
-        except:
+        except Exception:
             pass
 
     res.ram_size = 0
@@ -201,7 +184,7 @@
                 else:
                     assert mem_sz.attrib['units'] == 'bytes'
                     res.ram_size += int(mem_sz.text)
-        except:
+        except Exception:
             pass
 
     for net in core.findall(".//node[@class='network']"):
@@ -223,7 +206,7 @@
                     dup = dup_node.attrib['value']
 
                 res.net_info[name] = (speed, dup, [])
-        except:
+        except Exception:
             pass
 
     for controller in core.findall(".//node[@class='storage']"):
@@ -240,7 +223,7 @@
                 res.storage_controllers.append(
                     "{0} {1} {2}".format(description,
                                          vendor, product))
-        except:
+        except Exception:
             pass
 
     for disk in core.findall(".//node[@class='disk']"):
@@ -268,98 +251,7 @@
 
                 businfo = disk.find('businfo').text
                 res.disks_raw_info[businfo] = full_descr
-        except:
+        except Exception:
             pass
 
     return res
-
-# import traceback
-# print ET.tostring(disk)
-# traceback.print_exc()
-
-# print get_hw_info(None)
-
-# def get_hw_info(conn):
-#     res = HWInfo(None)
-#     remote_run = functools.partial(ssh_utils.run_over_ssh, conn, nolog=True)
-
-#     # some data
-#     with conn.open_sftp() as sftp:
-#         proc_data = ssh_utils.read_from_remote(sftp, '/proc/cpuinfo')
-#         mem_data = ssh_utils.read_from_remote(sftp, '/proc/meminfo')
-
-#     # cpu info
-#     curr_core = {}
-#     for line in proc_data.split("\n"):
-#         if line.strip() == "":
-#             if curr_core != {}:
-#                 res.cores.append(curr_core)
-#                 curr_core = {}
-#         else:
-#             param, val = line.split(":", 1)
-#             curr_core[param.strip()] = val.strip()
-
-#     if curr_core != {}:
-#         res.cores.append(curr_core)
-
-#     # RAM info
-#     for line in mem_data.split("\n"):
-#         if line.startswith("MemTotal"):
-#             res.ram_size = int(line.split(":", 1)[1].split()[0]) * 1024
-#             break
-
-#     # HDD info
-#     for dev in remote_run('ls /dev').split():
-#         if dev[-1].isdigit():
-#             continue
-
-#         if dev.startswith('sd') or dev.startswith('hd'):
-#             model = None
-#             size = None
-
-#             for line in remote_run('sudo hdparm -I /dev/' + dev).split("\n"):
-#                 if "Model Number:" in line:
-#                     model = line.split(':', 1)[1]
-#                 elif "device size with M = 1024*1024" in line:
-#                     size = int(line.split(':', 1)[1].split()[0])
-#                     size *= 1024 ** 2
-
-#             res.disks_info[dev] = (model, size)
-
-#     # Network info
-#     separator = '*-network'
-#     net_info = remote_run('sudo lshw -class network')
-#     for net_dev_info in net_info.split(separator):
-#         if net_dev_info.strip().startswith("DISABLED"):
-#             continue
-
-#         if ":" not in net_dev_info:
-#             continue
-
-#         dev_params = {}
-#         for line in net_dev_info.split("\n"):
-#             line = line.strip()
-#             if ':' in line:
-#                 key, data = line.split(":", 1)
-#                 dev_params[key.strip()] = data.strip()
-
-#         if 'configuration' not in dev_params:
-#             print "!!!!!", net_dev_info
-#             continue
-
-#         conf = dev_params['configuration']
-#         if 'link=yes' in conf:
-#             if 'speed=' in conf:
-#                 speed = conf.split('speed=', 1)[1]
-#                 speed = speed.strip().split()[0]
-#             else:
-#                 speed = None
-
-#             if "duplex=" in conf:
-#                 dtype = conf.split("duplex=", 1)[1]
-#                 dtype = dtype.strip().split()[0]
-#             else:
-#                 dtype = None
-
-#             res.net_info[dev_params['logical name']] = (speed, dtype)
-#     return res
diff --git a/wally/inode.py b/wally/inode.py
new file mode 100644
index 0000000..225a807
--- /dev/null
+++ b/wally/inode.py
@@ -0,0 +1,106 @@
+import abc
+from typing import Set, Dict, Tuple, Any, Optional
+
+from .ssh_utils import parse_ssh_uri
+
+
+class FuelNodeInfo:
+    """FUEL master node additional info"""
+    def __init__(self,
+                 version: str,
+                 fuel_ext_iface: str,
+                 openrc: Dict[str, str]):
+
+        self.version = version
+        self.fuel_ext_iface = fuel_ext_iface
+        self.openrc = openrc
+
+
+class NodeInfo:
+    """Node information object"""
+    def __init__(self,
+                 ssh_conn_url: str,
+                 roles: Set[str],
+                 bind_ip: str=None,
+                 ssh_key: str=None):
+        self.ssh_conn_url = ssh_conn_url
+        self.roles = roles
+
+        if bind_ip is None:
+            bind_ip = parse_ssh_uri(self.ssh_conn_url).host
+
+        self.bind_ip = bind_ip
+        self.ssh_key = ssh_key
+
+
+class INode(metaclass=abc.ABCMeta):
+    """Node object"""
+
+    def __init__(self, node_info: NodeInfo):
+        self.rpc = None
+        self.node_info = node_info
+        self.hwinfo = None
+        self.roles = []
+
+    @abc.abstractmethod
+    def __str__(self):
+        pass
+
+    def __repr__(self):
+        return str(self)
+
+    @abc.abstractmethod
+    def is_connected(self) -> bool:
+        pass
+
+    @abc.abstractmethod
+    def connect_ssh(self, timeout: int=None) -> None:
+        pass
+
+    @abc.abstractmethod
+    def connect_rpc(self) -> None:
+        pass
+
+    @abc.abstractmethod
+    def prepare_rpc(self) -> None:
+        pass
+
+    @abc.abstractmethod
+    def get_ip(self) -> str:
+        pass
+
+    @abc.abstractmethod
+    def get_user(self) -> str:
+        pass
+
+    @abc.abstractmethod
+    def run(self, cmd: str, stdin_data: str=None, timeout: int=60, nolog: bool=False) -> str:
+        pass
+
+    @abc.abstractmethod
+    def discover_hardware_info(self) -> None:
+        pass
+
+    @abc.abstractmethod
+    def copy_file(self, local_path: str, remote_path: Optional[str]=None) -> str:
+        pass
+
+    @abc.abstractmethod
+    def get_file_content(self, path: str) -> bytes:
+        pass
+
+    @abc.abstractmethod
+    def put_to_file(self, path: str, content: bytes) -> None:
+        pass
+
+    @abc.abstractmethod
+    def forward_port(self, ip: str, remote_port: int, local_port: int=None) -> int:
+        pass
+
+    @abc.abstractmethod
+    def get_interface(self, ip: str) -> str:
+        pass
+
+    @abc.abstractmethod
+    def stat_file(self, path: str) -> Any:
+        pass
diff --git a/wally/keystone.py b/wally/keystone.py
index 24d322c..77c1d5b 100644
--- a/wally/keystone.py
+++ b/wally/keystone.py
@@ -1,19 +1,20 @@
 import json
-import urllib2
+import urllib.request
 from functools import partial
+from typing import Dict, Any
 
 from keystoneclient import exceptions
 from keystoneclient.v2_0 import Client as keystoneclient
 
 
-class Urllib2HTTP(object):
+class Urllib2HTTP:
     """
     class for making HTTP requests
     """
 
     allowed_methods = ('get', 'put', 'post', 'delete', 'patch', 'head')
 
-    def __init__(self, root_url, headers=None, echo=False):
+    def __init__(self, root_url: str, headers: Dict[str, str]=None, echo: bool=False):
         """
         """
         if root_url.endswith('/'):
@@ -24,7 +25,7 @@
         self.headers = headers if headers is not None else {}
         self.echo = echo
 
-    def do(self, method, path, params=None):
+    def do(self, method: str, path: str, params: Dict[str, str]=None) -> Any:
         if path.startswith('/'):
             url = self.root_url + path
         else:
@@ -36,14 +37,14 @@
         else:
             data_json = json.dumps(params)
 
-        request = urllib2.Request(url,
-                                  data=data_json,
-                                  headers=self.headers)
+        request = urllib.request.Request(url,
+                                         data=data_json,
+                                         headers=self.headers)
         if data_json is not None:
             request.add_header('Content-Type', 'application/json')
 
         request.get_method = lambda: method.upper()
-        response = urllib2.urlopen(request)
+        response = urllib.request.urlopen(request)
 
         if response.code < 200 or response.code > 209:
             raise IndexError(url)
@@ -62,15 +63,15 @@
 
 
 class KeystoneAuth(Urllib2HTTP):
-    def __init__(self, root_url, creds, headers=None, echo=False,
-                 admin_node_ip=None):
+    def __init__(self, root_url: str, creds: Dict[str, str], headers: Dict[str, str]=None, echo: bool=False,
+                 admin_node_ip: str=None):
         super(KeystoneAuth, self).__init__(root_url, headers, echo)
         self.keystone_url = "http://{0}:5000/v2.0".format(admin_node_ip)
         self.keystone = keystoneclient(
             auth_url=self.keystone_url, **creds)
         self.refresh_token()
 
-    def refresh_token(self):
+    def refresh_token(self) -> None:
         """Get new token from keystone and update headers"""
         try:
             self.keystone.authenticate()
@@ -78,11 +79,11 @@
         except exceptions.AuthorizationFailure:
             raise
 
-    def do(self, method, path, params=None):
+    def do(self, method: str, path: str, params: Dict[str, str]=None) -> Any:
         """Do request. If gets 401 refresh token"""
         try:
             return super(KeystoneAuth, self).do(method, path, params)
-        except urllib2.HTTPError as e:
+        except urllib.request.HTTPError as e:
             if e.code == 401:
                 self.refresh_token()
                 return super(KeystoneAuth, self).do(method, path, params)
diff --git a/wally/logger.py b/wally/logger.py
new file mode 100644
index 0000000..43eff6b
--- /dev/null
+++ b/wally/logger.py
@@ -0,0 +1,82 @@
+import logging
+
+
+def color_me(color):
+    RESET_SEQ = "\033[0m"
+    COLOR_SEQ = "\033[1;%dm"
+
+    color_seq = COLOR_SEQ % (30 + color)
+
+    def closure(msg):
+        return color_seq + msg + RESET_SEQ
+    return closure
+
+
+class ColoredFormatter(logging.Formatter):
+    BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)
+
+    colors = {
+        'WARNING': color_me(YELLOW),
+        'DEBUG': color_me(BLUE),
+        'CRITICAL': color_me(YELLOW),
+        'ERROR': color_me(RED)
+    }
+
+    def __init__(self, msg, use_color=True, datefmt=None):
+        logging.Formatter.__init__(self, msg, datefmt=datefmt)
+        self.use_color = use_color
+
+    def format(self, record):
+        orig = record.__dict__
+        record.__dict__ = record.__dict__.copy()
+        levelname = record.levelname
+
+        prn_name = levelname + ' ' * (8 - len(levelname))
+        if levelname in self.colors:
+            record.levelname = self.colors[levelname](prn_name)
+        else:
+            record.levelname = prn_name
+
+        # super doesn't work here in 2.6 O_o
+        res = logging.Formatter.format(self, record)
+
+        # res = super(ColoredFormatter, self).format(record)
+
+        # restore record, as it will be used by other formatters
+        record.__dict__ = orig
+        return res
+
+
+def setup_loggers(def_level=logging.DEBUG, log_fname=None):
+    logger = logging.getLogger('wally')
+    logger.setLevel(logging.DEBUG)
+    sh = logging.StreamHandler()
+    sh.setLevel(def_level)
+
+    log_format = '%(asctime)s - %(levelname)s - %(name)-15s - %(message)s'
+    colored_formatter = ColoredFormatter(log_format, datefmt="%H:%M:%S")
+
+    sh.setFormatter(colored_formatter)
+    logger.addHandler(sh)
+
+    logger_api = logging.getLogger("wally.fuel_api")
+
+    if log_fname is not None:
+        fh = logging.FileHandler(log_fname)
+        log_format = '%(asctime)s - %(levelname)8s - %(name)-15s - %(message)s'
+        formatter = logging.Formatter(log_format, datefmt="%H:%M:%S")
+        fh.setFormatter(formatter)
+        fh.setLevel(logging.DEBUG)
+        logger.addHandler(fh)
+        logger_api.addHandler(fh)
+    else:
+        fh = None
+
+    logger_api.addHandler(sh)
+    logger_api.setLevel(logging.WARNING)
+
+    logger = logging.getLogger('paramiko')
+    logger.setLevel(logging.WARNING)
+    # logger.addHandler(sh)
+    if fh is not None:
+        logger.addHandler(fh)
diff --git a/wally/main.py b/wally/main.py
index da144dd..9358b53 100644
--- a/wally/main.py
+++ b/wally/main.py
@@ -5,7 +5,6 @@
 import logging
 import argparse
 import functools
-import contextlib
 
 from yaml import load as _yaml_load
 
@@ -24,34 +23,17 @@
     faulthandler = None
 
 
-from wally.timeseries import SensorDatastore
-from wally import utils, run_test, pretty_yaml
-from wally.config import (load_config, setup_loggers,
-                          get_test_files, save_run_params, load_run_params)
+from .timeseries import SensorDatastore
+from . import utils, run_test, pretty_yaml
+from .config import (load_config,
+                     get_test_files, save_run_params, load_run_params)
+from .logger import setup_loggers
+from .stage import log_stage
 
 
 logger = logging.getLogger("wally")
 
 
-class Context(object):
-    def __init__(self):
-        self.build_meta = {}
-        self.nodes = []
-        self.clear_calls_stack = []
-        self.openstack_nodes_ids = []
-        self.sensors_mon_q = None
-        self.hw_info = []
-        self.fuel_openstack_creds = None
-
-
-def get_stage_name(func):
-    nm = get_func_name(func)
-    if nm.endswith("stage"):
-        return nm
-    else:
-        return nm + " stage"
-
-
 def get_test_names(raw_res):
     res = []
     for tp, data in raw_res:
@@ -106,31 +88,6 @@
     print(tab.draw())
 
 
-def get_func_name(obj):
-    if hasattr(obj, '__name__'):
-        return obj.__name__
-    if hasattr(obj, 'func_name'):
-        return obj.func_name
-    return obj.func.func_name
-
-
-@contextlib.contextmanager
-def log_stage(func):
-    msg_templ = "Exception during {0}: {1!s}"
-    msg_templ_no_exc = "During {0}"
-
-    logger.info("Start " + get_stage_name(func))
-
-    try:
-        yield
-    except utils.StopTestError as exc:
-        logger.error(msg_templ.format(
-            get_func_name(func), exc))
-    except Exception:
-        logger.exception(msg_templ_no_exc.format(
-            get_func_name(func)))
-
-
 def make_storage_dir_struct(cfg):
     utils.mkdirs_if_unxists(cfg.results_dir)
     utils.mkdirs_if_unxists(cfg.sensor_storage)
@@ -175,7 +132,7 @@
     test_parser.add_argument('--build-type', type=str, default="GA")
     test_parser.add_argument('-n', '--no-tests', action='store_true',
                              help="Don't run tests", default=False)
-    test_parser.add_argument('--load_report',  action='store_true')
+    test_parser.add_argument('--load_report', action='store_true')
     test_parser.add_argument("-k", '--keep-vm', action='store_true',
                              help="Don't remove test vm's", default=False)
     test_parser.add_argument("-d", '--dont-discover-nodes', action='store_true',
diff --git a/wally/meta_info.py b/wally/meta_info.py
index 12cb061..19fdad9 100644
--- a/wally/meta_info.py
+++ b/wally/meta_info.py
@@ -1,8 +1,9 @@
-from urlparse import urlparse
-from keystone import KeystoneAuth
+from typing import Any, Dict
+from urllib.parse import urlparse
+from .keystone import KeystoneAuth
 
 
-def total_lab_info(data):
+def total_lab_info(data: Dict[str, Any]) -> Dict[str, Any]:
     lab_data = {}
     lab_data['nodes_count'] = len(data['nodes'])
     lab_data['total_memory'] = 0
@@ -24,7 +25,7 @@
     return lab_data
 
 
-def collect_lab_data(url, cred):
+def collect_lab_data(url: str, cred: Dict[str, str]) -> Dict[str, Any]:
     u = urlparse(url)
     keystone = KeystoneAuth(root_url=url, creds=cred, admin_node_ip=u.hostname)
     lab_info = keystone.do(method='get', path="/api/nodes")
diff --git a/wally/node.py b/wally/node.py
new file mode 100644
index 0000000..b146876
--- /dev/null
+++ b/wally/node.py
@@ -0,0 +1,108 @@
+import re
+import getpass
+from typing import Tuple
+from .inode import INode, NodeInfo
+
+from .ssh_utils import parse_ssh_uri, run_over_ssh, connect
+
+
+class Node(INode):
+    """Node object"""
+
+    def __init__(self, node_info: NodeInfo):
+        self.info = node_info
+        self.roles = node_info.roles
+        self.bind_ip = node_info.bind_ip
+
+        self.ssh_conn_url = node_info.ssh_conn_url
+        # may also be None or 'local' (see get_ip/get_user below)
+        assert self.ssh_conn_url in (None, 'local') or \
+            self.ssh_conn_url.startswith("ssh://")
+
+        self.ssh_conn = None
+        self.rpc_conn_url = None
+        self.rpc_conn = None
+        self.os_vm_id = None
+        self.hw_info = None
+
+        if self.ssh_conn_url is not None:
+            self.ssh_cred = parse_ssh_uri(self.ssh_conn_url)
+            self.node_id = "{0.host}:{0.port}".format(self.ssh_cred)
+        else:
+            self.ssh_cred = None
+            self.node_id = None
+
+    def __str__(self):
+        template = "<Node: url={conn_url!r} roles={roles}" + \
+                   " connected={is_connected}>"
+        return template.format(conn_url=self.ssh_conn_url,
+                               roles=", ".join(self.roles),
+                               is_connected=self.ssh_conn is not None)
+
+    def __repr__(self):
+        return str(self)
+
+    def connect_ssh(self) -> None:
+        self.ssh_conn = connect(self.ssh_conn_url)
+
+    def connect_rpc(self) -> None:
+        raise NotImplementedError()
+
+    def prepare_rpc(self) -> None:
+        raise NotImplementedError()
+
+    def get_ip(self) -> str:
+        """get node connection ip address"""
+
+        if self.ssh_conn_url == 'local':
+            return '127.0.0.1'
+        return self.ssh_cred.host
+
+    def get_user(self) -> str:
+        """"get ssh connection username"""
+        if self.ssh_conn_url == 'local':
+            return getpass.getuser()
+        return self.ssh_cred.user
+
+    def run(self, cmd: str, stdin_data: str=None, timeout: int=60, nolog: bool=False) -> Tuple[int, str]:
+        """Run command on node. Will use rpc connection, if available"""
+
+        if self.rpc_conn is None:
+            # run_over_ssh raises on non-zero exit code, so success maps to code 0
+            out = run_over_ssh(self.ssh_conn, cmd,
+                               stdin_data=stdin_data, timeout=timeout,
+                               nolog=nolog, node=self)
+            return 0, out
+        assert not stdin_data
+        proc_id = self.rpc_conn.cli.spawn(cmd)
+        exit_code = None
+        output = ""
+
+        while exit_code is None:
+            exit_code, stdout_data, stderr_data = self.rpc_conn.cli.get_updates(proc_id)
+            output += stdout_data + stderr_data
+
+        return exit_code, output
+
+    def discover_hardware_info(self) -> None:
+        raise NotImplementedError()
+
+    def get_file_content(self, path: str) -> str:
+        raise NotImplementedError()
+
+    def forward_port(self, ip: str, remote_port: int, local_port: int = None) -> int:
+        raise NotImplementedError()
+
+    def get_interface(self, ip: str) -> str:
+        """Get node external interface for given IP"""
+        _, data = self.run("ip a", nolog=True)
+        curr_iface = None
+
+        for line in data.split("\n"):
+            match1 = re.match(r"\d+:\s+(?P<name>.*?):\s\<", line)
+            if match1 is not None:
+                curr_iface = match1.group('name')
+
+            match2 = re.match(r"\s+inet\s+(?P<ip>[0-9.]+)/", line)
+            if match2 is not None:
+                if match2.group('ip') == ip:
+                    assert curr_iface is not None
+                    return curr_iface
+
+        raise KeyError("Can't find interface for ip {0}".format(ip))
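
A minimal usage sketch for the new Node class; the NodeInfo constructor arguments are assumptions based on the attributes read in __init__ above, real objects come from the discovery code:

    from wally.inode import NodeInfo
    from wally.node import Node

    # hypothetical NodeInfo; field names follow the attributes used in Node.__init__
    info = NodeInfo(ssh_conn_url="ssh://root@10.20.0.4:22",
                    roles=["testnode"], bind_ip="10.20.0.4")

    node = Node(info)
    node.connect_ssh()

    # run() prefers the RPC connection when available and falls back to SSH
    code, out = node.run("uname -a")
    iface = node.get_interface(node.get_ip())
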
diff --git a/wally/node_rpc.py b/wally/node_rpc.py
new file mode 100644
index 0000000..9a458ae
--- /dev/null
+++ b/wally/node_rpc.py
@@ -0,0 +1,428 @@
+import re
+import json
+import time
+import errno
+import socket
+import shutil
+import logging
+import os.path
+import getpass
+from io import StringIO
+import threading
+import subprocess
+
+import paramiko
+
+from agent import connect as agent_connect
+from .ssh_utils import Local, ssh_connect
+
+
+logger = logging.getLogger("wally")
+
+
+def setup_node(conn, agent_path, ip):
+    agent_fname, log_fname = run_over_ssh(conn, "mktemp;echo;mktemp").strip().split()
+    with conn.open_sftp() as sftp:
+        ssh_copy_file(sftp, agent_path, agent_fname)
+
+    cmd = "python {} server -d --listen-addr={}:0 --stdout-file={}"
+    jdata = run_over_ssh(conn, cmd.format(agent_fname, ip, log_fname)).strip()
+    run_over_ssh(conn, "rm {}".format(agent_fname))
+    sett = json.loads(jdata)
+    return agent_connect(sett['addr'])
+
+
+def exists(rpc, path):
+    """os.path.exists for paramiko's SCP object"""
+    return rpc.exists(path)
+
+
+def save_to_remote(sftp, path, content):
+    with sftp.open(path, "wb") as fd:
+        fd.write(content)
+
+
+def read_from_remote(sftp, path):
+    with sftp.open(path, "rb") as fd:
+        return fd.read()
+
+
+def normalize_dirpath(dirpath):
+    while dirpath.endswith("/"):
+        dirpath = dirpath[:-1]
+    return dirpath
+
+
+ALL_RWX_MODE = ((1 << 9) - 1)
+
+
+def ssh_mkdir(sftp, remotepath, mode=ALL_RWX_MODE, intermediate=False):
+    remotepath = normalize_dirpath(remotepath)
+    if intermediate:
+        try:
+            sftp.mkdir(remotepath, mode=mode)
+        except (IOError, OSError):
+            upper_dir = remotepath.rsplit("/", 1)[0]
+
+            if upper_dir == '' or upper_dir == '/':
+                raise
+
+            ssh_mkdir(sftp, upper_dir, mode=mode, intermediate=True)
+            return sftp.mkdir(remotepath, mode=mode)
+    else:
+        sftp.mkdir(remotepath, mode=mode)
+
+
+def ssh_copy_file(sftp, localfile, remfile, preserve_perm=True):
+    sftp.put(localfile, remfile)
+    if preserve_perm:
+        sftp.chmod(remfile, os.stat(localfile).st_mode & ALL_RWX_MODE)
+
+
+def put_dir_recursively(sftp, localpath, remotepath, preserve_perm=True):
+    "upload local directory to remote recursively"
+
+    # hack for localhost connection
+    if hasattr(sftp, "copytree"):
+        sftp.copytree(localpath, remotepath)
+        return
+
+    assert remotepath.startswith("/"), "%s must be absolute path" % remotepath
+
+    # normalize
+    localpath = normalize_dirpath(localpath)
+    remotepath = normalize_dirpath(remotepath)
+
+    try:
+        sftp.chdir(remotepath)
+        localsuffix = localpath.rsplit("/", 1)[1]
+        remotesuffix = remotepath.rsplit("/", 1)[1]
+        if localsuffix != remotesuffix:
+            remotepath = os.path.join(remotepath, localsuffix)
+    except IOError:
+        pass
+
+    for root, dirs, fls in os.walk(localpath):
+        prefix = os.path.commonprefix([localpath, root])
+        suffix = root.split(prefix, 1)[1]
+        if suffix.startswith("/"):
+            suffix = suffix[1:]
+
+        remroot = os.path.join(remotepath, suffix)
+
+        try:
+            sftp.chdir(remroot)
+        except IOError:
+            if preserve_perm:
+                mode = os.stat(root).st_mode & ALL_RWX_MODE
+            else:
+                mode = ALL_RWX_MODE
+            ssh_mkdir(sftp, remroot, mode=mode, intermediate=True)
+            sftp.chdir(remroot)
+
+        for f in fls:
+            remfile = os.path.join(remroot, f)
+            localfile = os.path.join(root, f)
+            ssh_copy_file(sftp, localfile, remfile, preserve_perm)
+
+
+def delete_file(conn, path):
+    sftp = conn.open_sftp()
+    sftp.remove(path)
+    sftp.close()
+
+
+def copy_paths(conn, paths):
+    sftp = conn.open_sftp()
+    try:
+        for src, dst in paths.items():
+            try:
+                if os.path.isfile(src):
+                    ssh_copy_file(sftp, src, dst)
+                elif os.path.isdir(src):
+                    put_dir_recursively(sftp, src, dst)
+                else:
+                    templ = "Can't copy {0!r} - " + \
+                            "it is neither a file nor a directory"
+                    raise OSError(templ.format(src))
+            except Exception as exc:
+                tmpl = "Scp {0!r} => {1!r} failed - {2!r}"
+                raise OSError(tmpl.format(src, dst, exc))
+    finally:
+        sftp.close()
+
+
+class ConnCreds(object):
+    conn_uri_attrs = ("user", "passwd", "host", "port", "path")
+
+    def __init__(self):
+        for name in self.conn_uri_attrs:
+            setattr(self, name, None)
+
+    def __str__(self):
+        return str(self.__dict__)
+
+
+uri_reg_exprs = []
+
+
+class URIsNamespace(object):
+    class ReParts(object):
+        user_rr = "[^:]*?"
+        host_rr = "[^:@]*?"
+        port_rr = "\\d+"
+        key_file_rr = "[^:@]*"
+        passwd_rr = ".*?"
+
+    re_dct = ReParts.__dict__
+
+    for attr_name, val in re_dct.items():
+        if attr_name.endswith('_rr'):
+            new_rr = "(?P<{0}>{1})".format(attr_name[:-3], val)
+            setattr(ReParts, attr_name, new_rr)
+
+    re_dct = ReParts.__dict__
+
+    templs = [
+        "^{host_rr}$",
+        "^{host_rr}:{port_rr}$",
+        "^{host_rr}::{key_file_rr}$",
+        "^{host_rr}:{port_rr}:{key_file_rr}$",
+        "^{user_rr}@{host_rr}$",
+        "^{user_rr}@{host_rr}:{port_rr}$",
+        "^{user_rr}@{host_rr}::{key_file_rr}$",
+        "^{user_rr}@{host_rr}:{port_rr}:{key_file_rr}$",
+        "^{user_rr}:{passwd_rr}@{host_rr}$",
+        "^{user_rr}:{passwd_rr}@{host_rr}:{port_rr}$",
+    ]
+
+    for templ in templs:
+        uri_reg_exprs.append(templ.format(**re_dct))
+
+
+def parse_ssh_uri(uri):
+    # user:passwd@ip_host:port
+    # user:passwd@ip_host
+    # user@ip_host:port
+    # user@ip_host
+    # ip_host:port
+    # ip_host
+    # user@ip_host:port:path_to_key_file
+    # user@ip_host::path_to_key_file
+    # ip_host:port:path_to_key_file
+    # ip_host::path_to_key_file
+
+    if uri.startswith("ssh://"):
+        uri = uri[len("ssh://"):]
+
+    res = ConnCreds()
+    res.port = "22"
+    res.key_file = None
+    res.passwd = None
+    res.user = getpass.getuser()
+
+    for rr in uri_reg_exprs:
+        rrm = re.match(rr, uri)
+        if rrm is not None:
+            res.__dict__.update(rrm.groupdict())
+            return res
+
+    raise ValueError("Can't parse {0!r} as ssh uri value".format(uri))
+
+
+def reconnect(conn, uri, **params):
+    if uri == 'local':
+        return conn
+
+    creds = parse_ssh_uri(uri)
+    creds.port = int(creds.port)
+    return ssh_connect(creds, reuse_conn=conn, **params)
+
+
+def connect(uri, **params):
+    if uri == 'local':
+        res = Local()
+    else:
+        creds = parse_ssh_uri(uri)
+        creds.port = int(creds.port)
+        res = ssh_connect(creds, **params)
+    return res
+
+
+all_sessions_lock = threading.Lock()
+all_sessions = {}
+
+
+class BGSSHTask(object):
+    CHECK_RETRY = 5
+
+    def __init__(self, node, use_sudo):
+        self.node = node
+        self.pid = None
+        self.use_sudo = use_sudo
+
+    def start(self, orig_cmd, **params):
+        uniq_name = 'test'
+        cmd = "screen -S {0} -d -m {1}".format(uniq_name, orig_cmd)
+        run_over_ssh(self.node.connection, cmd,
+                     timeout=10, node=self.node.get_conn_id(),
+                     **params)
+
+        for _ in range(self.CHECK_RETRY):
+            # refresh the process list on each retry
+            processes = run_over_ssh(self.node.connection, "ps aux", nolog=True)
+            for proc in processes.split("\n"):
+                if orig_cmd in proc and "SCREEN" not in proc:
+                    self.pid = proc.split()[1]
+                    break
+            if self.pid is not None:
+                break
+            time.sleep(1)
+
+        if self.pid is None:
+            self.pid = -1
+
+    def check_running(self):
+        assert self.pid is not None
+        if -1 == self.pid:
+            return False
+        try:
+            run_over_ssh(self.node.connection,
+                         "ls /proc/{0}".format(self.pid),
+                         timeout=10, nolog=True)
+            return True
+        except OSError:
+            return False
+
+    def kill(self, soft=True, use_sudo=True):
+        assert self.pid is not None
+        if self.pid == -1:
+            return True
+        try:
+            if soft:
+                cmd = "kill {0}"
+            else:
+                cmd = "kill -9 {0}"
+
+            if self.use_sudo:
+                cmd = "sudo " + cmd
+
+            run_over_ssh(self.node.connection,
+                         cmd.format(self.pid), nolog=True)
+            return True
+        except OSError:
+            return False
+
+    def wait(self, soft_timeout, timeout):
+        end_of_wait_time = timeout + time.time()
+        soft_end_of_wait_time = soft_timeout + time.time()
+
+        # time_till_check = random.randint(5, 10)
+        time_till_check = 2
+
+        # time_till_first_check = random.randint(2, 6)
+        time_till_first_check = 2
+        time.sleep(time_till_first_check)
+        if not self.check_running():
+            return True
+
+        while self.check_running() and time.time() < soft_end_of_wait_time:
+            # time.sleep(soft_end_of_wait_time - time.time())
+            time.sleep(time_till_check)
+
+        while end_of_wait_time > time.time():
+            time.sleep(time_till_check)
+            if not self.check_running():
+                break
+        else:
+            self.kill()
+            time.sleep(1)
+            if self.check_running():
+                self.kill(soft=False)
+            return False
+        return True
+
+
+def run_over_ssh(conn, cmd, stdin_data=None, timeout=60,
+                 nolog=False, node=None):
+    "should be replaces by normal implementation, with select"
+
+    if isinstance(conn, Local):
+        if not nolog:
+            logger.debug("SSH:local Exec {0!r}".format(cmd))
+        proc = subprocess.Popen(cmd, shell=True,
+                                stdin=subprocess.PIPE,
+                                stdout=subprocess.PIPE,
+                                stderr=subprocess.STDOUT)
+
+        stdoutdata, _ = proc.communicate(input=stdin_data)
+        if proc.returncode != 0:
+            templ = "SSH:{0} Cmd {1!r} failed with code {2}. Output: {3}"
+            raise OSError(templ.format(node, cmd, proc.returncode, stdoutdata))
+
+        return stdoutdata
+
+    transport = conn.get_transport()
+    session = transport.open_session()
+
+    if node is None:
+        node = ""
+
+    with all_sessions_lock:
+        all_sessions[id(session)] = session
+
+    try:
+        session.set_combine_stderr(True)
+
+        stime = time.time()
+
+        if not nolog:
+            logger.debug("SSH:{0} Exec {1!r}".format(node, cmd))
+
+        session.exec_command(cmd)
+
+        if stdin_data is not None:
+            session.sendall(stdin_data)
+
+        session.settimeout(1)
+        session.shutdown_write()
+        output = ""
+
+        while True:
+            try:
+                # paramiko returns bytes; decode so it can be accumulated into a str
+                ndata = session.recv(1024).decode("utf8", errors="replace")
+                output += ndata
+                if "" == ndata:
+                    break
+            except socket.timeout:
+                pass
+
+            if time.time() - stime > timeout:
+                raise OSError(output + "\nExecution timeout")
+
+        code = session.recv_exit_status()
+    finally:
+        found = False
+        with all_sessions_lock:
+            if id(session) in all_sessions:
+                found = True
+                del all_sessions[id(session)]
+
+        if found:
+            session.close()
+
+    if code != 0:
+        templ = "SSH:{0} Cmd {1!r} failed with code {2}. Output: {3}"
+        raise OSError(templ.format(node, cmd, code, output))
+
+    return output
+
+
+def close_all_sessions():
+    with all_sessions_lock:
+        for session in all_sessions.values():
+            try:
+                session.sendall('\x03')
+                session.close()
+            except:
+                pass
+        all_sessions.clear()
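
A short sketch of how the URI parsing and command helpers above are meant to be used (addresses and key path are illustrative; connect() opens a real SSH session, so the host must be reachable):

    from wally.node_rpc import parse_ssh_uri, connect, run_over_ssh

    # any of the uri forms listed above are accepted, with or without "ssh://"
    creds = parse_ssh_uri("ssh://root:secret@192.168.0.10:2222")
    # port is kept as a string by parse_ssh_uri; connect() converts it to int
    assert (creds.user, creds.host, creds.port) == ("root", "192.168.0.10", "2222")

    # key-file based login and command execution
    conn = connect("root@192.168.0.10::/home/user/.ssh/id_rsa")
    print(run_over_ssh(conn, "uptime", timeout=30))
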
diff --git a/wally/pretty_yaml.py b/wally/pretty_yaml.py
index afb6375..4038ea8 100644
--- a/wally/pretty_yaml.py
+++ b/wally/pretty_yaml.py
@@ -1,13 +1,14 @@
 __doc__ = "functions for make pretty yaml files"
 __all__ = ['dumps']
 
+from typing import Any, Iterable
 
-def dumps_simple(val):
+
+def dumps_simple(val: Any) -> str:
     bad_symbols = set(" \r\t\n,':{}[]><;")
 
-    if isinstance(val, basestring):
-        if isinstance(val, unicode):
-            val = val.encode('utf8')
+    if isinstance(val, str):
 
         try:
             float(val)
@@ -27,16 +28,16 @@
     return str(val)
 
 
-def is_simple(val):
-    simple_type = isinstance(val, (str, unicode, int, long, bool, float))
+def is_simple(val: Any) -> bool:
+    simple_type = isinstance(val, (str, int, bool, float))
     return simple_type or val is None
 
 
-def all_nums(vals):
-    return all(isinstance(val, (int, float, long)) for val in vals)
+def all_nums(vals: Iterable[Any]) -> bool:
+    return all(isinstance(val, (int, float)) for val in vals)
 
 
-def dumpv(data, tab_sz=4, width=160, min_width=40):
+def dumpv(data: Any, tab_sz: int=4, width: int=160, min_width: int=40) -> str:
     tab = ' ' * tab_sz
 
     if width < min_width:
@@ -107,5 +108,5 @@
     return res
 
 
-def dumps(data, tab_sz=4, width=120, min_width=40):
+def dumps(data: Any, tab_sz: int=4, width: int=120, min_width: int=40) -> str:
     return "\n".join(dumpv(data, tab_sz, width, min_width))
diff --git a/wally/report.py b/wally/report.py
index f27491b..ed3c362 100644
--- a/wally/report.py
+++ b/wally/report.py
@@ -4,7 +4,8 @@
 import logging
 import itertools
 import collections
-from cStringIO import StringIO
+from io import StringIO
+from typing import Dict
 
 try:
     import numpy
@@ -16,18 +17,18 @@
     plt = None
 
 import wally
-from wally.utils import ssize2b
-from wally.statistic import round_3_digit
-from wally.suits.io.fio_task_parser import (get_test_sync_mode,
-                                            get_test_summary,
-                                            parse_all_in_1,
-                                            abbv_name_to_full)
+from .utils import ssize2b
+from .statistic import round_3_digit
+from .suits.io.fio_task_parser import (get_test_sync_mode,
+                                       get_test_summary,
+                                       parse_all_in_1,
+                                       abbv_name_to_full)
 
 
 logger = logging.getLogger("wally.report")
 
 
-class DiskInfo(object):
+class DiskInfo:
     def __init__(self):
         self.direct_iops_r_max = 0
         self.direct_iops_w_max = 0
@@ -46,7 +47,7 @@
 
 
 class Attrmapper(object):
-    def __init__(self, dct):
+    def __init__(self, dct: Dict):
         self.__dct = dct
 
     def __getattr__(self, name):
@@ -347,12 +348,6 @@
                             data['bw_write_max'][1],
                             data['bw_write_max'][2])
 
-    # templ_name = 'report_ceph_1.html'
-
-    # import pprint
-    # pprint.pprint(data)
-    # pprint.pprint(info.__dict__)
-
     images.update(data)
     templ = get_template(templ_name)
     return templ.format(lab_info=lab_description,
@@ -576,7 +571,7 @@
                                       # res.lat.average,
                                       res.concurence))
 
-    rws4k_iops_lat_th.sort(key=lambda (_1, _2, conc): conc)
+    rws4k_iops_lat_th.sort(key=lambda x: x[2])
 
     latv = [lat for _, lat, _ in rws4k_iops_lat_th]
 
diff --git a/wally/run_test.py b/wally/run_test.py
index f99f445..4390f5e 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -5,7 +5,7 @@
 import functools
 import contextlib
 import collections
-
+from typing import List, Dict, Optional, Iterable, Any, Generator, Mapping, Callable, Tuple
 from yaml import load as _yaml_load
 
 try:
@@ -14,19 +14,21 @@
 except ImportError:
     yaml_load = _yaml_load
 
-from concurrent.futures import ThreadPoolExecutor
+from concurrent.futures import ThreadPoolExecutor, wait
 
-from wally.hw_info import get_hw_info
-from wally.config import get_test_files
-from wally.discover import discover, Node
-from wally import pretty_yaml, utils, report, ssh_utils, start_vms
-from wally.sensors_utils import with_sensors_util, sensors_info_util
+from .config import Config
+from .config import get_test_files
+from .discover import discover, Node
+from .inode import INode
+from .test_run_class import TestRun
 
-from wally.suits.mysql import MysqlTest
-from wally.suits.itest import TestConfig
-from wally.suits.io.fio import IOPerfTest
-from wally.suits.postgres import PgBenchTest
-from wally.suits.omgbench import OmgTest
+from . import pretty_yaml, utils, report, ssh_utils, start_vms
+
+from .suits.mysql import MysqlTest
+from .suits.itest import TestConfig
+from .suits.io.fio import IOPerfTest
+from .suits.postgres import PgBenchTest
+from .suits.omgbench import OmgTest
 
 
 TOOL_TYPE_MAPPER = {
@@ -40,9 +42,8 @@
 logger = logging.getLogger("wally")
 
 
-def connect_all(nodes, spawned_node=False):
-    """
-    Connect to all nodes, log errors
+def connect_all(nodes: Iterable[INode], spawned_node: Optional[bool]=False) -> None:
+    """Connect to all nodes, log errors
     nodes:[Node] - list of nodes
     spawned_node:bool - whenever nodes is newly spawned VM
     """
@@ -51,93 +52,87 @@
 
     conn_timeout = 240 if spawned_node else 30
 
-    def connect_ext(conn_url):
+    def connect_ext(node: INode) -> bool:
         try:
-            return ssh_utils.connect(conn_url, conn_timeout=conn_timeout)
+            node.connect_ssh(conn_timeout)
+            node.connect_rpc(conn_timeout)
+            return True
         except Exception as exc:
-            logger.error("During connect to {0}: {1!s}".format(conn_url, exc))
-            return None
-
-    urls = []
-    ssh_pref = "ssh://"
-
-    for node in nodes:
-        if node.conn_url == 'local':
-            urls.append(node.conn_url)
-        elif node.conn_url.startswith(ssh_pref):
-            urls.append(node.conn_url[len(ssh_pref):])
-        else:
-            msg = "Unknown url type {0}".format(node.conn_url)
-            logger.error(msg)
-            raise utils.StopTestError(msg)
+            logger.error("During connect to {}: {!s}".format(node, exc))
+            return False
 
     with ThreadPoolExecutor(32) as pool:
-        for node, conn in zip(nodes, pool.map(connect_ext, urls)):
-            node.connection = conn
+        list(pool.map(connect_ext, nodes))
 
     failed_testnodes = []
     failed_nodes = []
 
     for node in nodes:
-        if node.connection is None:
+        if not node.is_connected():
             if 'testnode' in node.roles:
-                failed_testnodes.append(node.get_conn_id())
+                failed_testnodes.append(node)
             else:
-                failed_nodes.append(node.get_conn_id())
+                failed_nodes.append(node)
 
-    if failed_nodes != []:
-        msg = "Node(s) {0} would be excluded - can't connect"
-        logger.warning(msg.format(",".join(failed_nodes)))
+    if failed_nodes:
+        msg = "Node(s) {} would be excluded - can't connect"
+        logger.warning(msg.format(",".join(map(str, failed_nodes))))
 
-    if failed_testnodes != []:
-        msg = "Can't connect to testnode(s) " + ",".join(failed_testnodes)
+    if failed_testnodes:
+        msg = "Can't connect to testnode(s) " + \
+              ",".join(map(str, failed_testnodes))
         logger.error(msg)
         raise utils.StopTestError(msg)
 
-    if len(failed_nodes) == 0:
+    if not failed_nodes:
         logger.info("All nodes connected successfully")
 
 
-def collect_hw_info_stage(cfg, ctx):
-    if os.path.exists(cfg['hwreport_fname']):
+def collect_hw_info_stage(cfg: Config, nodes: Iterable[INode]) -> None:
+    # TODO(koder): rewrite this function, to use other storage type
+    if os.path.exists(cfg.hwreport_fname):
         msg = "{0} already exists. Skip hw info"
         logger.info(msg.format(cfg['hwreport_fname']))
         return
 
     with ThreadPoolExecutor(32) as pool:
-        connections = (node.connection for node in ctx.nodes)
-        ctx.hw_info.extend(pool.map(get_hw_info, connections))
+        futures = [pool.submit(node.discover_hardware_info)
+                   for node in nodes]
+        wait(futures)
 
-    with open(cfg['hwreport_fname'], 'w') as hwfd:
-        for node, info in zip(ctx.nodes, ctx.hw_info):
+    with open(cfg.hwreport_fname, 'w') as hwfd:
+        for node in nodes:
             hwfd.write("-" * 60 + "\n")
             hwfd.write("Roles : " + ", ".join(node.roles) + "\n")
-            hwfd.write(str(info) + "\n")
+            hwfd.write(str(node.hwinfo) + "\n")
             hwfd.write("-" * 60 + "\n\n")
 
-            if info.hostname is not None:
+            if node.hwinfo.hostname is not None:
                 fname = os.path.join(
                     cfg.hwinfo_directory,
-                    info.hostname + "_lshw.xml")
+                    node.hwinfo.hostname + "_lshw.xml")
 
                 with open(fname, "w") as fd:
-                    fd.write(info.raw)
-    logger.info("Hardware report stored in " + cfg['hwreport_fname'])
-    logger.debug("Raw hardware info in " + cfg['hwinfo_directory'] + " folder")
+                    fd.write(node.hwinfo.raw)
+
+    logger.info("Hardware report stored in " + cfg.hwreport_fname)
+    logger.debug("Raw hardware info in " + cfg.hwinfo_directory + " folder")
 
 
 @contextlib.contextmanager
-def suspend_vm_nodes_ctx(unused_nodes):
+def suspend_vm_nodes_ctx(unused_nodes: Iterable[INode]) -> Generator[List[int], None, None]:
+
     pausable_nodes_ids = [node.os_vm_id for node in unused_nodes
                           if node.os_vm_id is not None]
+
     non_pausable = len(unused_nodes) - len(pausable_nodes_ids)
 
     if 0 != non_pausable:
-        logger.warning("Can't pause {0} nodes".format(
+        logger.warning("Can't pause {} nodes".format(
                        non_pausable))
 
     if len(pausable_nodes_ids) != 0:
-        logger.debug("Try to pause {0} unused nodes".format(
+        logger.debug("Try to pause {} unused nodes".format(
                      len(pausable_nodes_ids)))
         start_vms.pause(pausable_nodes_ids)
 
@@ -145,20 +140,20 @@
         yield pausable_nodes_ids
     finally:
         if len(pausable_nodes_ids) != 0:
-            logger.debug("Unpausing {0} nodes".format(
+            logger.debug("Unpausing {} nodes".format(
                          len(pausable_nodes_ids)))
             start_vms.unpause(pausable_nodes_ids)
 
 
-def generate_result_dir_name(results, name, params):
+def generate_result_dir_name(results: str, name: str, params: Dict[str, Any]) -> str:
     # make a directory for results
     all_tests_dirs = os.listdir(results)
 
     if 'name' in params:
-        dir_name = "{0}_{1}".format(name, params['name'])
+        dir_name = "{}_{}".format(name, params['name'])
     else:
         for idx in range(len(all_tests_dirs) + 1):
-            dir_name = "{0}_{1}".format(name, idx)
+            dir_name = "{}_{}".format(name, idx)
             if dir_name not in all_tests_dirs:
                 break
         else:
@@ -167,7 +162,13 @@
     return os.path.join(results, dir_name)
 
 
-def run_tests(cfg, test_block, nodes):
+@contextlib.contextmanager
+def sensor_monitoring(sensor_cfg: Any, nodes: Iterable[INode]) -> Generator[None, None, None]:
+    # TODO(koder): write this function
+    # the placeholder must still yield once so @contextlib.contextmanager works
+    yield
+
+
+def run_tests(cfg: Config, test_block, nodes: Iterable[INode]) -> Generator[Tuple[str, List[Any]], None, None]:
     """
     Run test from test block
     """
@@ -183,11 +184,11 @@
 
         # iterate over all node counts
         limit = params.get('node_limit', len(test_nodes))
-        if isinstance(limit, (int, long)):
+        if isinstance(limit, int):
             vm_limits = [limit]
         else:
             list_or_tpl = isinstance(limit, (tuple, list))
-            all_ints = list_or_tpl and all(isinstance(climit, (int, long))
+            all_ints = list_or_tpl and all(isinstance(climit, int)
                                            for climit in limit)
             if not all_ints:
                 msg = "'node_limit' parameter ion config should" + \
@@ -221,50 +222,39 @@
                                        if node.os_vm_id is not None]
 
                 if len(resumable_nodes_ids) != 0:
-                    logger.debug("Check and unpause {0} nodes".format(
+                    logger.debug("Check and unpause {} nodes".format(
                                  len(resumable_nodes_ids)))
                     start_vms.unpause(resumable_nodes_ids)
 
-                sens_nodes = curr_test_nodes + not_test_nodes
-                with sensors_info_util(cfg, sens_nodes) as sensor_data:
-                    test_cls = TOOL_TYPE_MAPPER[name]
+                test_cls = TOOL_TYPE_MAPPER[name]
 
-                    remote_dir = cfg.default_test_local_folder.format(name=name)
+                remote_dir = cfg.default_test_local_folder.format(name=name)
 
-                    test_cfg = TestConfig(test_cls.__name__,
-                                          params=params,
-                                          test_uuid=cfg.run_uuid,
-                                          nodes=test_nodes,
-                                          log_directory=results_path,
-                                          remote_dir=remote_dir)
+                test_cfg = TestConfig(test_cls.__name__,
+                                      params=params,
+                                      test_uuid=cfg.run_uuid,
+                                      nodes=test_nodes,
+                                      log_directory=results_path,
+                                      remote_dir=remote_dir)
 
-                    t_start = time.time()
-                    res = test_cls(test_cfg).run()
-                    t_end = time.time()
-
-            # save sensor data
-            if sensor_data is not None:
-                fname = "{0}_{1}.csv".format(int(t_start), int(t_end))
-                fpath = os.path.join(cfg.sensor_storage, fname)
-
-                with open(fpath, "w") as fd:
-                    fd.write("\n\n".join(sensor_data))
+                t_start = time.time()
+                res = test_cls(test_cfg).run()
+                t_end = time.time()
 
             results.append(res)
 
         yield name, results
 
 
-def connect_stage(cfg, ctx):
+def connect_stage(cfg: Config, ctx: TestRun) -> None:
     ctx.clear_calls_stack.append(disconnect_stage)
     connect_all(ctx.nodes)
     ctx.nodes = [node for node in ctx.nodes if node.connection is not None]
 
 
-def discover_stage(cfg, ctx):
-    """
-    discover clusters and nodes stage
-    """
+def discover_stage(cfg: Config, ctx: TestRun) -> None:
+    """discover clusters and nodes stage"""
+
     if cfg.get('discover') is not None:
         discover_objs = [i.strip() for i in cfg.discover.strip().split(",")]
 
@@ -280,7 +270,8 @@
         ctx.nodes.append(Node(url, roles.split(",")))
 
 
-def save_nodes_stage(cfg, ctx):
+def save_nodes_stage(cfg: Config, ctx: TestRun) -> None:
+    """Save nodes list to file"""
     cluster = {}
     for node in ctx.nodes:
         roles = node.roles[:]
@@ -294,15 +285,15 @@
         fd.write(pretty_yaml.dumps(cluster))
 
 
-def reuse_vms_stage(cfg, ctx):
+def reuse_vms_stage(cfg: Config, ctx: TestRun) -> None:
     vms_patterns = cfg.get('clouds', {}).get('openstack', {}).get('vms', [])
     private_key_path = get_vm_keypair(cfg)['keypair_file_private']
 
     for creds in vms_patterns:
         user_name, vm_name_pattern = creds.split("@", 1)
-        msg = "Vm like {0} lookup failed".format(vm_name_pattern)
+        msg = "Vm like {} lookup failed".format(vm_name_pattern)
 
-        with utils.log_error(msg):
+        with utils.LogError(msg):
             msg = "Looking for vm with name like {0}".format(vm_name_pattern)
             logger.debug(msg)
 
@@ -321,7 +312,7 @@
                 ctx.nodes.append(node)
 
 
-def get_OS_credentials(cfg, ctx):
+def get_OS_credentials(cfg: Config, ctx: TestRun) -> Any:
     creds = None
     os_creds = None
     force_insecure = False
@@ -371,7 +362,7 @@
     return creds
 
 
-def get_vm_keypair(cfg):
+def get_vm_keypair(cfg: Config) -> Dict[str, str]:
     res = {}
     for field, ext in (('keypair_file_private', 'pem'),
                        ('keypair_file_public', 'pub')):
@@ -388,7 +379,7 @@
 
 
 @contextlib.contextmanager
-def create_vms_ctx(ctx, cfg, config, already_has_count=0):
+def create_vms_ctx(ctx: TestRun, cfg: Config, config, already_has_count: int=0) -> Generator[List[INode], None, None]:
     if config['count'].startswith('='):
         count = int(config['count'][1:])
         if count <= already_has_count:
@@ -436,7 +427,7 @@
         ctx.nodes = old_nodes
 
 
-def run_tests_stage(cfg, ctx):
+def run_tests_stage(cfg: Config, ctx: TestRun) -> None:
     ctx.results = collections.defaultdict(lambda: [])
 
     for group in cfg.get('tests', []):
@@ -469,7 +460,7 @@
         if cfg.get('sensors') is None:
             sensor_ctx = utils.empty_ctx()
         else:
-            sensor_ctx = with_sensors_util(cfg.get('sensors'), ctx.nodes)
+            sensor_ctx = sensor_monitoring(cfg.get('sensors'), ctx.nodes)
 
         with vm_ctx as new_nodes:
             if len(new_nodes) != 0:
@@ -482,7 +473,7 @@
                             ctx.results[tp].extend(res)
 
 
-def shut_down_vms_stage(cfg, ctx):
+def shut_down_vms_stage(cfg: Config, ctx: TestRun) -> None:
     vm_ids_fname = cfg.vm_ids_fname
     if ctx.openstack_nodes_ids is None:
         nodes_ids = open(vm_ids_fname).read().split()
@@ -498,17 +489,17 @@
         os.remove(vm_ids_fname)
 
 
-def store_nodes_in_log(cfg, nodes_ids):
+def store_nodes_in_log(cfg: Config, nodes_ids: Iterable[str]) -> None:
     with open(cfg.vm_ids_fname, 'w') as fd:
         fd.write("\n".join(nodes_ids))
 
 
-def clear_enviroment(cfg, ctx):
+def clear_enviroment(cfg: Config, ctx: TestRun) -> None:
     if os.path.exists(cfg.vm_ids_fname):
         shut_down_vms_stage(cfg, ctx)
 
 
-def disconnect_stage(cfg, ctx):
+def disconnect_stage(cfg: Config, ctx: TestRun) -> None:
     ssh_utils.close_all_sessions()
 
     for node in ctx.nodes:
@@ -516,7 +507,7 @@
             node.connection.close()
 
 
-def store_raw_results_stage(cfg, ctx):
+def store_raw_results_stage(cfg: Config, ctx: TestRun) -> None:
     if os.path.exists(cfg.raw_results):
         cont = yaml_load(open(cfg.raw_results).read())
     else:
@@ -529,7 +520,7 @@
         fd.write(raw_data)
 
 
-def console_report_stage(cfg, ctx):
+def console_report_stage(cfg: Config, ctx: TestRun) -> None:
     first_report = True
     text_rep_fname = cfg.text_report_file
     with open(text_rep_fname, "w") as fd:
@@ -558,7 +549,7 @@
             print("\n" + rep + "\n")
 
 
-def test_load_report_stage(cfg, ctx):
+def test_load_report_stage(cfg: Config, ctx: TestRun) -> None:
     load_rep_fname = cfg.load_report_file
     found = False
     for idx, (tp, data) in enumerate(ctx.results.items()):
@@ -572,7 +563,7 @@
             report.make_load_report(idx, cfg['results'], load_rep_fname)
 
 
-def html_report_stage(cfg, ctx):
+def html_report_stage(cfg: Config, ctx: TestRun) -> None:
     html_rep_fname = cfg.html_report_file
     found = False
     for tp, data in ctx.results.items():
@@ -589,10 +580,10 @@
                                   lab_info=ctx.hw_info)
 
 
-def load_data_from_path(test_res_dir):
+def load_data_from_path(test_res_dir: str) -> Mapping[str, List[Any]]:
     files = get_test_files(test_res_dir)
     raw_res = yaml_load(open(files['raw_results']).read())
-    res = collections.defaultdict(lambda: [])
+    res = collections.defaultdict(list)
 
     for tp, test_lists in raw_res:
         for tests in test_lists:
@@ -603,10 +594,10 @@
     return res
 
 
-def load_data_from_path_stage(var_dir, _, ctx):
+def load_data_from_path_stage(var_dir: str, _, ctx: TestRun) -> None:
     for tp, vals in load_data_from_path(var_dir).items():
         ctx.results.setdefault(tp, []).extend(vals)
 
 
-def load_data_from(var_dir):
+def load_data_from(var_dir: str) -> Callable:
     return functools.partial(load_data_from_path_stage, var_dir)
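
All stage functions now share the (cfg: Config, ctx: TestRun) signature, so the runner can drive them uniformly. A sketch of the intended loop, assuming cfg comes from load_config and TestRun keeps the clear_calls_stack attribute of the removed Context class:

    from .stage import log_stage
    from .test_run_class import TestRun

    stages = [discover_stage, connect_stage, run_tests_stage]

    ctx = TestRun()
    for stage in stages:
        with log_stage(stage):
            stage(cfg, ctx)

    # cleanup stages registered during the run execute in reverse order
    for stage in reversed(ctx.clear_calls_stack):
        with log_stage(stage):
            stage(cfg, ctx)
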
diff --git a/wally/sensors.py b/wally/sensors.py
new file mode 100644
index 0000000..7560a11
--- /dev/null
+++ b/wally/sensors.py
@@ -0,0 +1,401 @@
+import os
+from collections import namedtuple
+
+
+SensorInfo = namedtuple("SensorInfo", ['value', 'is_accumulated'])
+
+
+def provides(name: str):
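+    # NOTE: currently a no-op; the sensor name is not yet used for registration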
+    def closure(func):
+        return func
+    return closure
+
+
+def is_dev_accepted(name, disallowed_prefixes, allowed_prefixes):
+    dev_ok = True
+
+    if disallowed_prefixes is not None:
+        dev_ok = all(not name.startswith(prefix)
+                     for prefix in disallowed_prefixes)
+
+    if dev_ok and allowed_prefixes is not None:
+        dev_ok = any(name.startswith(prefix)
+                     for prefix in allowed_prefixes)
+
+    return dev_ok
+
+
+def get_pid_list(disallowed_prefixes, allowed_prefixes):
+    """Return pid list from list of pids and names"""
+    # exceptions
+    but = disallowed_prefixes if disallowed_prefixes is not None else []
+    if allowed_prefixes is None:
+        # if nothing is set, return all pids except the explicitly disallowed ones
+        result = [pid
+                  for pid in os.listdir('/proc')
+                  if pid.isdigit() and pid not in but]
+    else:
+        result = []
+        for pid in os.listdir('/proc'):
+            if pid.isdigit() and pid not in but:
+                name = get_pid_name(pid)
+                if pid in allowed_prefixes or \
+                   any(name.startswith(val) for val in allowed_prefixes):
+                    # this pid is explicitly allowed or its name matches a prefix
+                    result.append(pid)
+    return result
+
+
+def get_pid_name(pid):
+    """Return name by pid"""
+    try:
+        with open(os.path.join('/proc/', pid, 'cmdline'), 'r') as pidfile:
+            try:
+                cmd = pidfile.readline().split()[0]
+                return os.path.basename(cmd).rstrip('\x00')
+            except IndexError:
+                # no cmd returned
+                return "<NO NAME>"
+    except IOError:
+        # callers expect some string even if we can't read the proc entry
+        return "no_such_process"
+
+
+def delta(func, only_upd=True):
+    prev = {}
+    while True:
+        for dev_name, vals in func():
+            if dev_name not in prev:
+                prev[dev_name] = {}
+                for name, (val, _) in vals.items():
+                    prev[dev_name][name] = val
+            else:
+                dev_prev = prev[dev_name]
+                res = {}
+                for stat_name, (val, accum_val) in vals.items():
+                    if accum_val:
+                        if stat_name in dev_prev:
+                            delta = int(val) - int(dev_prev[stat_name])
+                            if not only_upd or 0 != delta:
+                                res[stat_name] = str(delta)
+                        dev_prev[stat_name] = val
+                    elif not only_upd or '0' != val:
+                        res[stat_name] = val
+
+                if only_upd and len(res) == 0:
+                    continue
+                yield dev_name, res
+        yield None, None
+
+
+#  1 - major number
+#  2 - minor number
+#  3 - device name
+#  4 - reads completed successfully
+#  5 - reads merged
+#  6 - sectors read
+#  7 - time spent reading (ms)
+#  8 - writes completed
+#  9 - writes merged
+# 10 - sectors written
+# 11 - time spent writing (ms)
+# 12 - I/Os currently in progress
+# 13 - time spent doing I/Os (ms)
+# 14 - weighted time spent doing I/Os (ms)
+
+io_values_pos = [
+    (3, 'reads_completed', True),
+    (5, 'sectors_read', True),
+    (6, 'rtime', True),
+    (7, 'writes_completed', True),
+    (9, 'sectors_written', True),
+    (10, 'wtime', True),
+    (11, 'io_queue', False),
+    (13, 'io_time', True)
+]
+
+
+@provides("block-io")
+def io_stat(disallowed_prefixes=('ram', 'loop'), allowed_prefixes=None):
+    results = {}
+    for line in open('/proc/diskstats'):
+        vals = line.split()
+        dev_name = vals[2]
+
+        dev_ok = is_dev_accepted(dev_name,
+                                 disallowed_prefixes,
+                                 allowed_prefixes)
+        if dev_name[-1].isdigit():
+            dev_ok = False
+
+        if dev_ok:
+            for pos, name, accum_val in io_values_pos:
+                sensor_name = "{0}.{1}".format(dev_name, name)
+                results[sensor_name] = SensorInfo(int(vals[pos]), accum_val)
+    return results
+
+
+def get_latency(stat1, stat2):
+    disks = set(i.split('.', 1)[0] for i in stat1)
+    results = {}
+
+    for disk in disks:
+        rdc = disk + '.reads_completed'
+        wrc = disk + '.writes_completed'
+        rdt = disk + '.rtime'
+        wrt = disk + '.wtime'
+        lat = 0.0
+
+        io_ops1 = stat1[rdc].value + stat1[wrc].value
+        io_ops2 = stat2[rdc].value + stat2[wrc].value
+
+        diops = io_ops2 - io_ops1
+
+        if diops != 0:
+            io1 = stat1[rdt].value + stat1[wrt].value
+            io2 = stat2[rdt].value + stat2[wrt].value
+            lat = abs(float(io1 - io2)) / diops
+
+        results[disk + '.latence'] = SensorInfo(lat, False)
+
+    return results
+
+
+# /proc/net/dev fields (per interface, after the "iface:" prefix):
+#  receive:  0 - bytes, 1 - packets, 2 - errs, 3 - drop, 4 - fifo,
+#            5 - frame, 6 - compressed, 7 - multicast
+#  transmit: 8 - bytes, 9 - packets, 10 - errs, 11 - drop, 12 - fifo,
+#            13 - colls, 14 - carrier, 15 - compressed
+
+net_values_pos = [
+    (0, 'recv_bytes', True),
+    (1, 'recv_packets', True),
+    (8, 'send_bytes', True),
+    (9, 'send_packets', True),
+]
+
+
+@provides("net-io")
+def net_stat(disallowed_prefixes=('docker', 'lo'), allowed_prefixes=('eth',)):
+    results = {}
+
+    for line in open('/proc/net/dev').readlines()[2:]:
+        dev_name, stats = line.split(":", 1)
+        dev_name = dev_name.strip()
+        vals = stats.split()
+
+        dev_ok = is_dev_accepted(dev_name,
+                                 disallowed_prefixes,
+                                 allowed_prefixes)
+
+        if '.' in dev_name and dev_name.split('.')[-1].isdigit():
+            dev_ok = False
+
+        if dev_ok:
+            for pos, name, accum_val in net_values_pos:
+                sensor_name = "{0}.{1}".format(dev_name, name)
+                results[sensor_name] = SensorInfo(int(vals[pos]), accum_val)
+    return results
+
+
+@provides("perprocess-cpu")
+def pscpu_stat(disallowed_prefixes=None, allowed_prefixes=None):
+    results = {}
+    pid_list = get_pid_list(disallowed_prefixes, allowed_prefixes)
+
+    for pid in pid_list:
+        try:
+            dev_name = get_pid_name(pid)
+
+            pid_stat1 = pid_stat(pid)
+
+            sensor_name = "{0}.{1}".format(dev_name, pid)
+            results[sensor_name] = SensorInfo(pid_stat1, True)
+        except IOError:
+            # may be, proc has already terminated, skip it
+            continue
+    return results
+
+
+def pid_stat(pid):
+    """Return total cpu usage time from process"""
+    # read /proc/pid/stat
+    with open(os.path.join('/proc/', pid, 'stat'), 'r') as pidfile:
+        proctimes = pidfile.readline().split()
+    # get utime from /proc/<pid>/stat, 14 item
+    utime = proctimes[13]
+    # get stime from proc/<pid>/stat, 15 item
+    stime = proctimes[14]
+    # count total process used time
+    return float(int(utime) + int(stime))
+
+
+# Based on ps_mem.py:
+# Licence: LGPLv2
+# Author:  P@draigBrady.com
+# Source:  http://www.pixelbeat.org/scripts/ps_mem.py
+#   http://github.com/pixelb/scripts/commits/master/scripts/ps_mem.py
+
+
+# Note shared is always a subset of rss (trs is not always)
+def get_mem_stats(pid):
+    """Return memory data of pid in format (private, shared)"""
+
+    fname = '/proc/{0}/{1}'.format(pid, "smaps")
+    lines = open(fname).readlines()
+
+    shared = 0
+    private = 0
+    pss = 0
+
+    # add 0.5 KiB as the average error due to truncation
+    pss_adjust = 0.5
+
+    for line in lines:
+        if line.startswith("Shared"):
+            shared += int(line.split()[1])
+
+        if line.startswith("Private"):
+            private += int(line.split()[1])
+
+        if line.startswith("Pss"):
+            pss += float(line.split()[1]) + pss_adjust
+
+    # Note Shared + Private = Rss above
+    # The Rss in smaps includes video card mem etc.
+
+    if pss != 0:
+        shared = int(pss - private)
+
+    return (private, shared)
+
+
+@provides("perprocess-ram")
+def psram_stat(disallowed_prefixes=None, allowed_prefixes=None):
+    results = {}
+    pid_list = get_pid_list(disallowed_prefixes, allowed_prefixes)
+    for pid in pid_list:
+        try:
+            dev_name = get_pid_name(pid)
+
+            private, shared = get_mem_stats(pid)
+            total = private + shared
+            sys_total = get_ram_size()
+            usage = float(total) / float(sys_total)
+
+            sensor_name = "{0}({1})".format(dev_name, pid)
+
+            results[sensor_name + ".private_mem"] = SensorInfo(private, False)
+            results[sensor_name + ".shared_mem"] = SensorInfo(shared, False)
+            results[sensor_name + ".used_mem"] = SensorInfo(total, False)
+            name = sensor_name + ".mem_usage_percent"
+            results[name] = SensorInfo(usage * 100, False)
+        except IOError:
+            # permission denied or proc die
+            continue
+    return results
+
+
+def get_ram_size():
+    """Return RAM size in Kb"""
+    with open("/proc/meminfo") as proc:
+        mem_total = proc.readline().split()
+    return mem_total[1]
+
+
+# 0 - cpu name
+# 1 - user: normal processes executing in user mode
+# 2 - nice: niced processes executing in user mode
+# 3 - system: processes executing in kernel mode
+# 4 - idle: twiddling thumbs
+# 5 - iowait: waiting for I/O to complete
+# 6 - irq: servicing interrupts
+# 7 - softirq: servicing softirqs
+
+cpu_values_pos = [
+    (1, 'user_processes', True),
+    (2, 'nice_processes', True),
+    (3, 'system_processes', True),
+    (4, 'idle_time', True),
+]
+
+
+@provides("system-cpu")
+def syscpu_stat(disallowed_prefixes=None, allowed_prefixes=None):
+    results = {}
+
+    # calculate core count
+    core_count = 0
+
+    for line in open('/proc/stat'):
+        vals = line.split()
+        dev_name = vals[0]
+
+        if dev_name == 'cpu':
+            for pos, name, accum_val in cpu_values_pos:
+                sensor_name = "{0}.{1}".format(dev_name, name)
+                results[sensor_name] = SensorInfo(int(vals[pos]),
+                                                  accum_val)
+        elif dev_name == 'procs_blocked':
+            val = int(vals[1])
+            results["cpu.procs_blocked"] = SensorInfo(val, False)
+        elif dev_name.startswith('cpu'):
+            core_count += 1
+
+    # procs in queue
+    TASKSPOS = 3
+    vals = open('/proc/loadavg').read().split()
+    ready_procs = vals[TASKSPOS].partition('/')[0]
+    # exclude the current process from the count
+    procs_queue = (float(ready_procs) - 1) / core_count
+    results["cpu.procs_queue"] = SensorInfo(procs_queue, False)
+
+    return results
+
+
+# return these fields by default, or those passed via allowed_prefixes
+ram_fields = [
+    'MemTotal',
+    'MemFree',
+    'Buffers',
+    'Cached',
+    'SwapCached',
+    'Dirty',
+    'Writeback',
+    'SwapTotal',
+    'SwapFree'
+]
+
+
+@provides("system-ram")
+def sysram_stat(disallowed_prefixes=None, allowed_prefixes=None):
+    if allowed_prefixes is None:
+        allowed_prefixes = ram_fields
+    results = {}
+    for line in open('/proc/meminfo'):
+        vals = line.split()
+        dev_name = vals[0].rstrip(":")
+
+        dev_ok = is_dev_accepted(dev_name,
+                                 disallowed_prefixes,
+                                 allowed_prefixes)
+
+        title = "ram.{0}".format(dev_name)
+
+        if dev_ok:
+            results[title] = SensorInfo(int(vals[1]), False)
+
+    if 'ram.MemFree' in results and 'ram.MemTotal' in results:
+        used = results['ram.MemTotal'].value - results['ram.MemFree'].value
+        usage = float(used) / results['ram.MemTotal'].value
+        results["ram.usage_percent"] = SensorInfo(usage, False)
+    return results
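
Each sensor returns a {name: SensorInfo} mapping; accumulated counters become per-interval values by sampling twice. A minimal local sketch (reads /proc, Linux only):

    import time
    from wally.sensors import io_stat, sysram_stat, get_latency

    before = io_stat()
    time.sleep(1)
    after = io_stat()

    # per-disk latency over the interval, derived from the accumulated counters
    print(get_latency(before, after))

    # instantaneous RAM usage fraction
    print(sysram_stat()["ram.usage_percent"].value)
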
diff --git a/wally/sensors/__init__.py b/wally/sensors/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/wally/sensors/__init__.py
+++ /dev/null
diff --git a/wally/sensors/api.py b/wally/sensors/api.py
deleted file mode 100644
index 68b2e7d..0000000
--- a/wally/sensors/api.py
+++ /dev/null
@@ -1,94 +0,0 @@
-import os
-import time
-import json
-import logging
-import contextlib
-
-from concurrent.futures import ThreadPoolExecutor
-
-from wally.ssh_utils import (copy_paths, run_over_ssh,
-                             save_to_remote, read_from_remote)
-
-
-logger = logging.getLogger("wally.sensors")
-
-
-class SensorConfig(object):
-    def __init__(self, conn, url, sensors, source_id,
-                 monitor_url=None):
-        self.conn = conn
-        self.url = url
-        self.sensors = sensors
-        self.source_id = source_id
-        self.monitor_url = monitor_url
-
-
-@contextlib.contextmanager
-def with_sensors(sensor_configs, remote_path):
-    paths = {os.path.dirname(__file__):
-             os.path.join(remote_path, "sensors")}
-    config_remote_path = os.path.join(remote_path, "conf.json")
-
-    def deploy_sensors(node_sensor_config):
-        # check that path already exists
-        copy_paths(node_sensor_config.conn, paths)
-        with node_sensor_config.conn.open_sftp() as sftp:
-            sensors_config = node_sensor_config.sensors.copy()
-            sensors_config['source_id'] = node_sensor_config.source_id
-            save_to_remote(sftp, config_remote_path,
-                           json.dumps(sensors_config))
-
-    def remove_sensors(node_sensor_config):
-        run_over_ssh(node_sensor_config.conn,
-                     "rm -rf {0}".format(remote_path),
-                     node=node_sensor_config.url, timeout=10)
-
-    logger.debug("Installing sensors on {0} nodes".format(len(sensor_configs)))
-    with ThreadPoolExecutor(max_workers=32) as executor:
-        list(executor.map(deploy_sensors, sensor_configs))
-        try:
-            yield
-        finally:
-            list(executor.map(remove_sensors, sensor_configs))
-
-
-@contextlib.contextmanager
-def sensors_info(sensor_configs, remote_path):
-    config_remote_path = os.path.join(remote_path, "conf.json")
-
-    def start_sensors(node_sensor_config):
-        cmd_templ = 'env PYTHONPATH="{0}" python -m ' + \
-                    "sensors.main -d start -u {1} {2}"
-
-        cmd = cmd_templ.format(remote_path,
-                               node_sensor_config.monitor_url,
-                               config_remote_path)
-
-        run_over_ssh(node_sensor_config.conn, cmd,
-                     node=node_sensor_config.url)
-
-    def stop_and_gather_data(node_sensor_config):
-        cmd = 'env PYTHONPATH="{0}" python -m sensors.main -d stop'
-        cmd = cmd.format(remote_path)
-        run_over_ssh(node_sensor_config.conn, cmd,
-                     node=node_sensor_config.url)
-        # some magic
-        time.sleep(1)
-
-        assert node_sensor_config.monitor_url.startswith("csvfile://")
-
-        res_path = node_sensor_config.monitor_url.split("//", 1)[1]
-        with node_sensor_config.conn.open_sftp() as sftp:
-            res = read_from_remote(sftp, res_path)
-
-        return res
-
-    results = []
-
-    logger.debug("Starting sensors on {0} nodes".format(len(sensor_configs)))
-    with ThreadPoolExecutor(max_workers=32) as executor:
-        list(executor.map(start_sensors, sensor_configs))
-        try:
-            yield results
-        finally:
-            results.extend(executor.map(stop_and_gather_data, sensor_configs))
diff --git a/wally/sensors/cp_protocol.py b/wally/sensors/cp_protocol.py
deleted file mode 100644
index 1b6993c..0000000
--- a/wally/sensors/cp_protocol.py
+++ /dev/null
@@ -1,227 +0,0 @@
-#!/usr/bin/env python
-""" Protocol class """
-
-import re
-import zlib
-import json
-import logging
-import binascii
-
-
-logger = logging.getLogger("wally.sensors")
-
-
-# protocol contains 2 type of packet:
-# 1 - header, which contains template schema of counters
-# 2 - body, which contains only values in order as in template
-#       it uses msgpack (or provided packer) for optimization
-#
-# packet has format:
-# begin_data_prefixSIZE\n\nDATAend_data_postfix
-# packet part has format:
-# SIZE\n\rDATA
-#
-# DATA use archivation
-
-
-class PacketException(Exception):
-    """ Exceptions from Packet"""
-    pass
-
-
-class Packet(object):
-    """ Class proceed packet by protocol"""
-
-    prefix = "begin_data_prefix"
-    postfix = "end_data_postfix"
-    header_prefix = "template"
-    # other fields
-    # is_begin
-    # is_end
-    # crc
-    # data
-    # data_len
-
-    def __init__(self, packer):
-        # preinit
-        self.is_begin = False
-        self.is_end = False
-        self.crc = None
-        self.data = ""
-        self.data_len = None
-        self.srv_template = None
-        self.clt_template = None
-        self.tmpl_size = 0
-        self.packer = packer
-
-    def new_packet(self, part):
-        """ New packet adding """
-        # proceed packet
-        try:
-            # get size
-            local_size_s, _, part = part.partition("\n\r")
-            local_size = int(local_size_s)
-
-            # find prefix
-            begin = part.find(self.prefix)
-            if begin != -1:
-                # divide data if something before begin and prefix
-                from_i = begin + len(self.prefix)
-                part = part[from_i:]
-                # reset flags
-                self.is_begin = True
-                self.is_end = False
-                self.data = ""
-                # get size
-                data_len_s, _, part = part.partition("\n\r")
-                self.data_len = int(data_len_s)
-                # get crc
-                crc_s, _, part = part.partition("\n\r")
-                self.crc = int(crc_s)
-
-            # bad size?
-            if local_size != self.data_len:
-                raise PacketException("Part size error")
-
-            # find postfix
-            end = part.find(self.postfix)
-            if end != -1:
-                # divide postfix
-                part = part[:end]
-                self.is_end = True
-
-            self.data += part
-            # check if it is end
-            if self.is_end:
-                self.data = zlib.decompress(self.data)
-                if self.data_len != len(self.data):
-                    raise PacketException("Total size error")
-                if binascii.crc32(self.data) != self.crc:
-                    raise PacketException("CRC error")
-
-                # check, if it is template
-                if self.data.startswith(self.header_prefix):
-                    self.srv_template = self.data
-                    # template is for internal use
-                    return None
-
-                # decode values list
-                vals = self.packer.unpack(self.data)
-                dump = self.srv_template % tuple(vals)
-                return dump
-            else:
-                return None
-
-        except PacketException as e:
-            # if something wrong - skip packet
-            logger.warning("Packet skipped: %s", e)
-            self.is_begin = False
-            self.is_end = False
-            return None
-
-        except TypeError:
-            # if something wrong - skip packet
-            logger.warning("Packet skipped: doesn't match schema")
-            self.is_begin = False
-            self.is_end = False
-            return None
-
-        except:
-            # if something at all wrong - skip packet
-            logger.warning("Packet skipped: something is wrong")
-            self.is_begin = False
-            self.is_end = False
-            return None
-
-    @staticmethod
-    def create_packet(data, part_size):
-        """ Create packet divided by parts with part_size from data
-            No compression here """
-        # prepare data
-        data_len = "%i\n\r" % len(data)
-        header = "%s%s%s\n\r" % (Packet.prefix, data_len, binascii.crc32(data))
-        compact_data = zlib.compress(data)
-        packet = "%s%s%s" % (header, compact_data, Packet.postfix)
-
-        partheader_len = len(data_len)
-
-        beg = 0
-        end = part_size - partheader_len
-
-        result = []
-        while beg < len(packet):
-            block = packet[beg:beg+end]
-            result.append(data_len + block)
-            beg += end
-
-        return result
-
-    def create_packet_v2(self, data, part_size):
-        """ Create packet divided by parts with part_size from data
-            Compressed """
-        result = []
-        # create and add to result template header
-        if self.srv_template is None:
-            perf_string = json.dumps(data)
-            self.create_answer_template(perf_string)
-            template = self.header_prefix + self.srv_template
-            header = Packet.create_packet(template, part_size)
-            result.extend(header)
-
-        vals = self.get_matching_value_list(data)
-        body = self.packer.pack(vals)
-        parts = Packet.create_packet(body, part_size)
-        result.extend(parts)
-        return result
-
-    def get_matching_value_list(self, data):
-        """ Get values in order server expect"""
-        vals = range(0, self.tmpl_size)
-
-        try:
-            for node, groups in self.clt_template.items():
-                for group, counters in groups.items():
-                    for counter, index in counters.items():
-                        if not isinstance(index, dict):
-                            vals[index] = data[node][group][counter]
-                        else:
-                            for k, i in index.items():
-                                vals[i] = data[node][group][counter][k]
-
-            return vals
-
-        except (IndexError, KeyError):
-            logger = logging.getLogger(__name__)
-            logger.error("Data don't match last schema")
-            raise PacketException("Data don't match last schema")
-
-    def create_answer_template(self, perf_string):
-        """ Create template for server to insert counter values
-            Return tuple of server and clien templates + number of replaces"""
-        replacer = re.compile(": [0-9]+\.?[0-9]*")
-        # replace all values by %s
-        finditer = replacer.finditer(perf_string)
-        # the server does not need to know positions
-        self.srv_template = ""
-        # the client needs positions
-        clt_template = ""
-        beg = 0
-        k = 0
-        # this could be done better?
-        for match in finditer:
-            # define input place in server template
-            self.srv_template += perf_string[beg:match.start()]
-            self.srv_template += ": %s"
-            # define match number in client template
-            clt_template += perf_string[beg:match.start()]
-            clt_template += ": %i" % k
-
-            beg = match.end()
-            k += 1
-
-        # add tail
-        self.srv_template += perf_string[beg:]
-        clt_template += perf_string[beg:]
-
-        self.tmpl_size = k
-        self.clt_template = json.loads(clt_template)
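
The removed cp_protocol code boils down to a template-then-values scheme: the first payload a sender emits is a JSON template of the counter tree, and every later payload carries only the packed numeric values, which the receiver substitutes back into the stored template. A minimal self-contained sketch of that idea (the helper names below are illustrative, not the real wally API):

    # template-once, values-afterwards: roughly what Packet + packer did
    import json
    import struct

    def make_template(sample):
        """Build a %s-template plus the field order used for packing."""
        order = sorted(sample)
        template = json.dumps(dict((name, "%s") for name in order), sort_keys=True)
        return template.replace('"%s"', '%s'), order

    def pack_values(sample, order):
        return struct.pack("!%dI" % len(order), *(int(sample[n]) for n in order))

    def unpack_into_template(template, order, payload):
        return template % struct.unpack("!%dI" % len(order), payload)

    sample = {"cpu.user": 12, "cpu.idle": 88}
    template, order = make_template(sample)
    print(unpack_into_template(template, order, pack_values(sample, order)))
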
diff --git a/wally/sensors/cp_transport.py b/wally/sensors/cp_transport.py
deleted file mode 100644
index 2e00e80..0000000
--- a/wally/sensors/cp_transport.py
+++ /dev/null
@@ -1,157 +0,0 @@
-#!/usr/bin/env python
-""" UDP sender class """
-
-import socket
-import urlparse
-
-from cp_protocol import Packet
-
-try:
-    from disk_perf_test_tool.logger import define_logger
-    logger = define_logger(__name__)
-except ImportError:
-    class Logger(object):
-        def debug(self, *dt):
-            pass
-    logger = Logger()
-
-
-class SenderException(Exception):
-    """ Exceptions in Sender class """
-    pass
-
-
-class Timeout(Exception):
-    """ Exceptions in Sender class """
-    pass
-
-
-class Sender(object):
-    """ UDP sender class """
-
-    def __init__(self, packer, url=None, port=None, host="0.0.0.0", size=256):
-        """ Create connection object from input udp string or params"""
-
-        # test input
-        if url is None and port is None:
-            raise SenderException("Bad initialization")
-        if url is not None:
-            data = urlparse.urlparse(url)
-            # check schema
-            if data.scheme != "udp":
-                mes = "Bad protocol type: %s instead of UDP" % data.scheme
-                logger.error(mes)
-                raise SenderException("Bad protocol type")
-            # try to get port
-            try:
-                int_port = int(data.port)
-            except ValueError:
-                logger.error("Bad UDP port")
-                raise SenderException("Bad UDP port")
-            # save paths
-            self.sendto = (data.hostname, int_port)
-            self.bindto = (data.hostname, int_port)
-            # try to get size
-            try:
-                self.size = int(data.path.strip("/"))
-            except ValueError:
-                logger.error("Bad packet part size")
-                raise SenderException("Bad packet part size")
-        else:
-            # url is None - use size and port
-            self.sendto = (host, port)
-            self.bindto = ("0.0.0.0", port)
-            self.size = size
-
-        self.packer = packer
-
-        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
-        self.binded = False
-        self.all_data = {}
-        self.send_packer = None
-
-    def bind(self):
-        """ Prepare for listening """
-        self.sock.bind(self.bindto)
-        self.sock.settimeout(0.5)
-        self.binded = True
-
-    def send(self, data):
-        """ Send data to udp socket"""
-        if self.sock.sendto(data, self.sendto) != len(data):
-            mes = "Cannot send data to %s:%s" % self.sendto
-            logger.error(mes)
-            raise SenderException("Cannot send data")
-
-    def send_by_protocol(self, data):
-        """ Send data by Packet protocol
-            data = dict"""
-        if self.send_packer is None:
-            self.send_packer = Packet(self.packer())
-        parts = self.send_packer.create_packet_v2(data, self.size)
-        for part in parts:
-            self.send(part)
-
-    def recv(self):
-        """ Receive data from udp socket"""
-        # check for binding
-        if not self.binded:
-            self.bind()
-        # try to recv
-        try:
-            data, (remote_ip, _) = self.sock.recvfrom(self.size)
-            return data, remote_ip
-        except socket.timeout:
-            raise Timeout()
-
-    def recv_by_protocol(self):
-        """ Receive data from udp socket by Packet protocol"""
-        data, remote_ip = self.recv()
-
-        if remote_ip not in self.all_data:
-            self.all_data[remote_ip] = Packet(self.packer())
-
-        return self.all_data[remote_ip].new_packet(data)
-
-    def recv_with_answer(self, stop_event=None):
-        """ Receive data from udp socket and send 'ok' back
-            Command port = local port + 1
-            Answer port = local port
-            Waiting for command is blocking """
-        # create command socket
-        command_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
-        command_port = self.bindto[1]+1
-        command_sock.bind(("0.0.0.0", command_port))
-        command_sock.settimeout(1)
-        # try to recv
-        while True:
-            try:
-                data, (remote_ip, _) = command_sock.recvfrom(self.size)
-                self.send("ok")
-                return data, remote_ip
-            except socket.timeout:
-                if stop_event is not None and stop_event.is_set():
-                    # return None if we are interrupted
-                    return None
-
-    def verified_send(self, send_host, message, max_repeat=20):
-        """ Send and verify it by answer not more then max_repeat
-            Send port = local port + 1
-            Answer port = local port
-            Return True if send is verified """
-        # create send socket
-        send_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
-        send_port = self.sendto[1]+1
-        for repeat in range(0, max_repeat):
-            send_sock.sendto(message, (send_host, send_port))
-            try:
-                data, remote_ip = self.recv()
-                if remote_ip == send_host and data == "ok":
-                    return True
-                else:
-                    logger.warning("No answer from %s, try %i",
-                                   send_host, repeat)
-            except Timeout:
-                logger.warning("No answer from %s, try %i", send_host, repeat)
-
-        return False
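
The removed Sender encodes everything it needs in a single UDP URI: host, port, and the per-packet part size as the path component. A usage sketch against the pre-2.0 layout this commit deletes (the packer class is a placeholder, not a real wally name):

    from wally.sensors.cp_transport import Sender

    # "udp://<host>:<port>/<part_size>" -- the path holds the packet part size
    sender = Sender(packer=SomePackerClass,  # placeholder packer factory
                    url="udp://collector.example.com:5699/256")
    # equivalent explicit form
    sender = Sender(packer=SomePackerClass,
                    host="collector.example.com", port=5699, size=256)
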
diff --git a/wally/sensors/daemonize.py b/wally/sensors/daemonize.py
deleted file mode 100644
index bc4ab81..0000000
--- a/wally/sensors/daemonize.py
+++ /dev/null
@@ -1,225 +0,0 @@
-# #!/usr/bin/python
-
-import os
-import pwd
-import grp
-import sys
-import fcntl
-import signal
-import atexit
-import logging
-import logging.handlers
-import resource
-
-
-class Daemonize(object):
-    """ Daemonize object
-    Object constructor expects the following arguments:
-    - app: contains the application name which will be sent to syslog.
-    - pid: path to the pidfile.
-    - action: your custom function which will be executed after daemonization.
-    - keep_fds: optional list of fds which should not be closed.
-    - auto_close_fds: optional parameter to not close opened fds.
-    - privileged_action: action that will be executed before
-                         drop privileges if user or
-                         group parameter is provided.
-                         If you want to transfer anything from privileged
-                         action to action, such as opened privileged file
-                         descriptor, you should return it from
-                         privileged_action function and catch it inside action
-                         function.
-    - user: drop privileges to this user if provided.
-    - group: drop privileges to this group if provided.
-    - verbose: send debug messages to logger if provided.
-    - logger: use this logger object instead of creating new one, if provided.
-    """
-    def __init__(self, app, pid, action, keep_fds=None, auto_close_fds=True,
-                 privileged_action=None, user=None, group=None, verbose=False,
-                 logger=None):
-        self.app = app
-        self.pid = pid
-        self.action = action
-        self.keep_fds = keep_fds or []
-        self.privileged_action = privileged_action or (lambda: ())
-        self.user = user
-        self.group = group
-        self.logger = logger
-        self.verbose = verbose
-        self.auto_close_fds = auto_close_fds
-
-    def sigterm(self, signum, frame):
-        """ sigterm method
-        These actions will be done after SIGTERM.
-        """
-        self.logger.warn("Caught signal %s. Stopping daemon." % signum)
-        os.remove(self.pid)
-        sys.exit(0)
-
-    def exit(self):
-        """ exit method
-        Cleanup pid file at exit.
-        """
-        self.logger.warn("Stopping daemon.")
-        os.remove(self.pid)
-        sys.exit(0)
-
-    def start(self):
-        """ start method
-        Main daemonization process.
-        """
-        # If the pidfile already exists, read the old pid from it so that it
-        # can be restored if locking fails, because the locking attempt
-        # purges the file contents.
-        if os.path.isfile(self.pid):
-            with open(self.pid, "r") as old_pidfile:
-                old_pid = old_pidfile.read()
-        # Create a lockfile so that only one instance of this daemon is
-        # running at any time.
-        try:
-            lockfile = open(self.pid, "w")
-        except IOError:
-            print("Unable to create the pidfile.")
-            sys.exit(1)
-        try:
-            # Try to get an exclusive lock on the file. This will fail if
-            # another process has the file
-            # locked.
-            fcntl.flock(lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
-        except IOError:
-            print("Unable to lock on the pidfile.")
-            # We need to overwrite the pidfile if we got here.
-            with open(self.pid, "w") as pidfile:
-                pidfile.write(old_pid)
-            sys.exit(1)
-
-        # Fork, creating a new process for the child.
-        process_id = os.fork()
-        if process_id < 0:
-            # Fork error. Exit badly.
-            sys.exit(1)
-        elif process_id != 0:
-            # This is the parent process. Exit.
-            sys.exit(0)
-        # This is the child process. Continue.
-
-        # Stop listening for signals that the parent process receives.
-        # This is done by getting a new process id.
-        # setpgrp() is an alternative to setsid().
-        # setsid puts the process in a new parent group and detaches
-        # its controlling terminal.
-        process_id = os.setsid()
-        if process_id == -1:
-            # Uh oh, there was a problem.
-            sys.exit(1)
-
-        # Add lockfile to self.keep_fds.
-        self.keep_fds.append(lockfile.fileno())
-
-        # Close all file descriptors, except the ones mentioned in
-        # self.keep_fds.
-        devnull = "/dev/null"
-        if hasattr(os, "devnull"):
-            # Python has set os.devnull on this system, use it instead as it
-            # might be different
-            # than /dev/null.
-            devnull = os.devnull
-
-        if self.auto_close_fds:
-            for fd in range(3, resource.getrlimit(resource.RLIMIT_NOFILE)[0]):
-                if fd not in self.keep_fds:
-                    try:
-                        os.close(fd)
-                    except OSError:
-                        pass
-
-        devnull_fd = os.open(devnull, os.O_RDWR)
-        os.dup2(devnull_fd, 0)
-        os.dup2(devnull_fd, 1)
-        os.dup2(devnull_fd, 2)
-
-        if self.logger is None:
-            # Initialize logging.
-            self.logger = logging.getLogger(self.app)
-            self.logger.setLevel(logging.DEBUG)
-            # Display log messages only on defined handlers.
-            self.logger.propagate = False
-
-            # Initialize syslog.
-            # It will correctly work on OS X, Linux and FreeBSD.
-            if sys.platform == "darwin":
-                syslog_address = "/var/run/syslog"
-            else:
-                syslog_address = "/dev/log"
-
-            # We will continue with syslog initialization only if
-            # actually have such capabilities
-            # on the machine we are running this.
-            if os.path.isfile(syslog_address):
-                syslog = logging.handlers.SysLogHandler(syslog_address)
-                if self.verbose:
-                    syslog.setLevel(logging.DEBUG)
-                else:
-                    syslog.setLevel(logging.INFO)
-                # Try to mimic to normal syslog messages.
-                format_t = "%(asctime)s %(name)s: %(message)s"
-                formatter = logging.Formatter(format_t,
-                                              "%b %e %H:%M:%S")
-                syslog.setFormatter(formatter)
-
-                self.logger.addHandler(syslog)
-
-        # Set umask to default to safe file permissions when running
-        # as a root daemon. 027 is an
-        # octal number which we are typing as 0o27 for Python3 compatibility.
-        os.umask(0o27)
-
-        # Change to a known directory. If this isn't done, starting a daemon
-        # in a subdirectory that
-        # needs to be deleted results in "directory busy" errors.
-        os.chdir("/")
-
-        # Execute privileged action
-        privileged_action_result = self.privileged_action()
-        if not privileged_action_result:
-            privileged_action_result = []
-
-        # Change gid
-        if self.group:
-            try:
-                gid = grp.getgrnam(self.group).gr_gid
-            except KeyError:
-                self.logger.error("Group {0} not found".format(self.group))
-                sys.exit(1)
-            try:
-                os.setgid(gid)
-            except OSError:
-                self.logger.error("Unable to change gid.")
-                sys.exit(1)
-
-        # Change uid
-        if self.user:
-            try:
-                uid = pwd.getpwnam(self.user).pw_uid
-            except KeyError:
-                self.logger.error("User {0} not found.".format(self.user))
-                sys.exit(1)
-            try:
-                os.setuid(uid)
-            except OSError:
-                self.logger.error("Unable to change uid.")
-                sys.exit(1)
-
-        try:
-            lockfile.write("%s" % (os.getpid()))
-            lockfile.flush()
-        except IOError:
-            self.logger.error("Unable to write pid to the pidfile.")
-            print("Unable to write pid to the pidfile.")
-            sys.exit(1)
-
-        # Set custom action on SIGTERM.
-        signal.signal(signal.SIGTERM, self.sigterm)
-        atexit.register(self.exit)
-
-        self.logger.warn("Starting daemon.")
-
-        self.action(*privileged_action_result)
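
Daemonize is used below in sensors/main.py as a fire-and-forget wrapper: construct it with an app name, a pidfile path and the function to run, then call start(). A minimal usage sketch against the pre-2.0 module path:

    from wally.sensors.daemonize import Daemonize

    def collect_forever():
        pass  # the long-running collection loop would go here

    Daemonize(app="perfcollect_app",
              pid="/tmp/sensors.pid",
              action=collect_forever).start()
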
diff --git a/wally/sensors/discover.py b/wally/sensors/discover.py
deleted file mode 100644
index f227043..0000000
--- a/wally/sensors/discover.py
+++ /dev/null
@@ -1,9 +0,0 @@
-all_sensors = {}
-
-
-def provides(sensor_class_name):
-    def closure(func):
-        assert sensor_class_name not in all_sensors
-        all_sensors[sensor_class_name] = func
-        return func
-    return closure
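
This tiny registry is the whole sensor plugin mechanism: a sensor module decorates its stat function with @provides("<sensor-name>") at import time, and the collector later looks the function up in all_sensors. An illustrative registration following the same pattern as the sensor modules deleted below (the "uptime" sensor is hypothetical):

    from wally.sensors.discover import provides, all_sensors
    from wally.sensors.sensors.utils import SensorInfo

    @provides("uptime")  # hypothetical sensor, shown for illustration only
    def uptime_stat(disallowed_prefixes=None, allowed_prefixes=None):
        with open("/proc/uptime") as fd:
            return {"system.uptime": SensorInfo(float(fd.read().split()[0]), False)}

    assert "uptime" in all_sensors
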
diff --git a/wally/sensors/main.py b/wally/sensors/main.py
deleted file mode 100644
index 20eedc5..0000000
--- a/wally/sensors/main.py
+++ /dev/null
@@ -1,154 +0,0 @@
-import os
-import sys
-import time
-import json
-import glob
-import signal
-import os.path
-import argparse
-
-from .sensors.utils import SensorInfo
-from .daemonize import Daemonize
-from .discover import all_sensors
-from .protocol import create_protocol
-
-
-# load all sensors
-from . import sensors
-sensors_dir = os.path.dirname(sensors.__file__)
-for fname in glob.glob(os.path.join(sensors_dir, "*.py")):
-    mod_name = os.path.basename(fname[:-3])
-    __import__("sensors.sensors." + mod_name)
-
-
-def get_values(required_sensors):
-    result = {}
-    for sensor_name, params in required_sensors:
-        if sensor_name in all_sensors:
-            result.update(all_sensors[sensor_name](**params))
-        else:
-            msg = "Sensor {0!r} isn't available".format(sensor_name)
-            raise ValueError(msg)
-    return result
-
-
-def parse_args(args):
-    parser = argparse.ArgumentParser()
-    parser.add_argument('-d', '--daemon',
-                        choices=('start', 'stop', 'status',
-                                 'start_monitoring', 'stop_monitoring',
-                                 'dump_ram_data'),
-                        default=None)
-
-    parser.add_argument('-u', '--url', default='stdout://')
-    parser.add_argument('-t', '--timeout', type=float, default=1)
-    parser.add_argument('-l', '--list-sensors', action='store_true')
-    parser.add_argument('sensors_config', type=argparse.FileType('r'),
-                        default=None, nargs='?')
-    return parser.parse_args(args[1:])
-
-
-def daemon_main(required_sensors, opts):
-    try:
-        source_id = str(required_sensors.pop('source_id'))
-    except KeyError:
-        source_id = None
-
-    sender = create_protocol(opts.url)
-    prev = {}
-    next_data_record_time = time.time()
-
-    first_round = True
-    while True:
-        real_time = int(time.time())
-
-        if real_time < int(next_data_record_time):
-            if int(next_data_record_time) - real_time > 2:
-                print "Error: sleep too small portion!!"
-            report_time = int(next_data_record_time)
-        elif real_time > int(next_data_record_time):
-            if real_time - int(next_data_record_time) > 2:
-                report_time = real_time
-            else:
-                report_time = int(next_data_record_time)
-        else:
-            report_time = real_time
-
-        data = get_values(required_sensors.items())
-        curr = {'time': SensorInfo(report_time, True)}
-        for name, val in data.items():
-            if val.is_accumulated:
-                if name in prev:
-                    curr[name] = SensorInfo(val.value - prev[name], True)
-                prev[name] = val.value
-            else:
-                curr[name] = SensorInfo(val.value, False)
-
-        if source_id is not None:
-            curr['source_id'] = source_id
-
-        # on the first round not all fields were ready,
-        # which would lead to a wrong headers list being set
-        if not first_round:
-            sender.send(curr)
-        else:
-            first_round = False
-
-        next_data_record_time = report_time + opts.timeout + 0.5
-        time.sleep(next_data_record_time - time.time())
-
-
-def pid_running(pid):
-    return os.path.exists("/proc/" + str(pid))
-
-
-def main(argv):
-    opts = parse_args(argv)
-
-    if opts.list_sensors:
-        print "\n".join(sorted(all_sensors))
-        return 0
-
-    if opts.daemon is not None:
-        pid_file = "/tmp/sensors.pid"
-        if opts.daemon == 'start':
-            required_sensors = json.loads(opts.sensors_config.read())
-
-            def root_func():
-                daemon_main(required_sensors, opts)
-
-            daemon = Daemonize(app="perfcollect_app",
-                               pid=pid_file,
-                               action=root_func)
-            daemon.start()
-        elif opts.daemon == 'stop':
-            if os.path.isfile(pid_file):
-                pid = int(open(pid_file).read())
-                if pid_running(pid):
-                    os.kill(pid, signal.SIGTERM)
-
-                time.sleep(0.5)
-
-                if pid_running(pid):
-                    os.kill(pid, signal.SIGKILL)
-
-                time.sleep(0.5)
-
-                if os.path.isfile(pid_file):
-                    os.unlink(pid_file)
-        elif opts.daemon == 'status':
-            if os.path.isfile(pid_file):
-                pid = int(open(pid_file).read())
-                if pid_running(pid):
-                    print "running"
-                    return
-            print "stopped"
-        else:
-            raise ValueError("Unknown daemon operation {}".format(opts.daemon))
-    else:
-        required_sensors = json.loads(opts.sensors_config.read())
-        daemon_main(required_sensors, opts)
-    return 0
-
-if __name__ == "__main__":
-    exit(main(sys.argv))
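
main() expects its positional sensors_config argument to be a JSON document mapping sensor names (the @provides keys) to keyword parameters for the corresponding stat functions, plus an optional source_id. A plausible payload based on the sensors shipped in this tree (the exact values are illustrative):

    import json

    config = {
        "source_id": "node-1",
        "block-io": {"allowed_prefixes": ["sd"]},
        "net-io": {},
        "system-cpu": {},
    }
    print(json.dumps(config))  # save this and pass it to: main.py -d start <config.json>
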
diff --git a/wally/sensors/protocol.py b/wally/sensors/protocol.py
deleted file mode 100644
index 7c8aa0e..0000000
--- a/wally/sensors/protocol.py
+++ /dev/null
@@ -1,312 +0,0 @@
-import sys
-import csv
-import time
-import struct
-import socket
-import select
-import cPickle as pickle
-from urlparse import urlparse
-
-
-class Timeout(Exception):
-    pass
-
-
-class CantUnpack(Exception):
-    pass
-
-
-# ------------------------------------- Serializers --------------------------
-
-
-class ISensortResultsSerializer(object):
-    def pack(self, data):
-        pass
-
-    def unpack(self, data):
-        pass
-
-
-class StructSerializerSend(ISensortResultsSerializer):
-    initial_times = 5
-    resend_timeout = 60
-    HEADERS = 'h'
-    DATA = 'd'
-    END_OF_HEADERS = '\x00'
-    END_OF_SOURCE_ID = '\x00'
-    HEADERS_SEPARATOR = '\n'
-
-    def __init__(self):
-        self.field_order = None
-        self.headers_send_cycles_left = self.initial_times
-        self.pack_fmt = None
-        self.next_header_send_time = None
-
-    def pack(self, data):
-        data = data.copy()
-
-        source_id = data.pop("source_id")
-        vals = [int(data.pop("time").value)]
-
-        if self.field_order is None:
-            self.field_order = sorted(data.keys())
-            self.pack_fmt = "!I" + "I" * len(self.field_order)
-
-        need_resend = False
-        if self.next_header_send_time is not None:
-            if time.time() > self.next_header_send_time:
-                need_resend = True
-
-        if self.headers_send_cycles_left > 0 or need_resend:
-            forder = self.HEADERS_SEPARATOR.join(self.field_order)
-            flen = struct.pack("!H", len(self.field_order))
-
-            result = (self.HEADERS + source_id +
-                      self.END_OF_SOURCE_ID +
-                      socket.gethostname() +
-                      self.END_OF_SOURCE_ID +
-                      flen + forder + self.END_OF_HEADERS)
-
-            if self.headers_send_cycles_left > 0:
-                self.headers_send_cycles_left -= 1
-
-            self.next_header_send_time = time.time() + self.resend_timeout
-        else:
-            result = ""
-
-        for name in self.field_order:
-            vals.append(int(data[name].value))
-
-        packed_data = self.DATA + source_id
-        packed_data += self.END_OF_SOURCE_ID
-        packed_data += struct.pack(self.pack_fmt, *vals)
-
-        return result + packed_data
-
-
-class StructSerializerRecv(ISensortResultsSerializer):
-    def __init__(self):
-        self.fields = {}
-        self.formats = {}
-        self.hostnames = {}
-
-    def unpack(self, data):
-        code = data[0]
-
-        if code == StructSerializerSend.HEADERS:
-            source_id, hostname, packed_data = data[1:].split(
-                StructSerializerSend.END_OF_SOURCE_ID, 2)
-            # fields order provided
-            flen_sz = struct.calcsize("!H")
-            flen = struct.unpack("!H", packed_data[:flen_sz])[0]
-
-            headers_data, rest = packed_data[flen_sz:].split(
-                StructSerializerSend.END_OF_HEADERS, 1)
-
-            forder = headers_data.split(
-                StructSerializerSend.HEADERS_SEPARATOR)
-
-            assert len(forder) == flen, \
-                "Wrong len {0} != {1}".format(len(forder), flen)
-
-            if 'source_id' in self.fields:
-                assert self.fields[source_id] == ['time'] + forder,\
-                    "New field order"
-            else:
-                self.fields[source_id] = ['time'] + forder
-                self.formats[source_id] = "!I" + "I" * flen
-                self.hostnames[source_id] = hostname
-
-            if len(rest) != 0:
-                return self.unpack(rest)
-            return None
-        else:
-            source_id, packed_data = data[1:].split(
-                StructSerializerSend.END_OF_SOURCE_ID, 1)
-            assert code == StructSerializerSend.DATA,\
-                "Unknown code {0!r}".format(code)
-
-            try:
-                fields = self.fields[source_id]
-            except KeyError:
-                raise CantUnpack("No fields order provided"
-                                 " for {0} yet".format(source_id))
-            s_format = self.formats[source_id]
-
-            exp_size = struct.calcsize(s_format)
-            assert len(packed_data) == exp_size, \
-                "Wrong data len {0} != {1}".format(len(packed_data), exp_size)
-
-            vals = struct.unpack(s_format, packed_data)
-            res = dict(zip(fields, vals))
-            res['source_id'] = source_id
-            res['hostname'] = self.hostnames[source_id]
-            return res
-
-
-class PickleSerializer(ISensortResultsSerializer):
-    def pack(self, data):
-        ndata = {}
-        for key, val in data.items():
-            if isinstance(val, basestring):
-                ndata[key] = val
-            else:
-                ndata[key] = val.value
-        return pickle.dumps(ndata)
-
-    def unpack(self, data):
-        return pickle.loads(data)
-
-# ------------------------------------- Transports ---------------------------
-
-
-class ITransport(object):
-    def __init__(self, receiver):
-        pass
-
-    def send(self, data):
-        pass
-
-    def recv(self, timeout=None):
-        pass
-
-
-class StdoutTransport(ITransport):
-    MIN_COL_WIDTH = 10
-
-    def __init__(self, receiver, delta=True):
-        if receiver:
-            cname = self.__class__.__name__
-            raise ValueError("{0} don't allows receiving".format(cname))
-
-        self.headers = None
-        self.line_format = ""
-        self.prev = {}
-        self.delta = delta
-        self.fd = sys.stdout
-
-    def send(self, data):
-        if self.headers is None:
-            self.headers = sorted(data)
-            self.headers.remove('source_id')
-
-            for pos, header in enumerate(self.headers):
-                self.line_format += "{%s:>%s}" % (pos,
-                                                  max(len(header) + 1,
-                                                      self.MIN_COL_WIDTH))
-
-            print self.line_format.format(*self.headers)
-
-        if self.delta:
-
-            vals = [data[header].value - self.prev.get(header, 0)
-                    for header in self.headers]
-
-            self.prev.update(dict((header, data[header].value)
-                             for header in self.headers))
-        else:
-            vals = [data[header].value for header in self.headers]
-
-        self.fd.write(self.line_format.format(*vals) + "\n")
-
-    def recv(self, timeout=None):
-        cname = self.__class__.__name__
-        raise ValueError("{0} don't allows receiving".format(cname))
-
-
-class FileTransport(StdoutTransport):
-    def __init__(self, receiver, fname, delta=True):
-        StdoutTransport.__init__(self, receiver, delta)
-        self.fd = open(fname, "w")
-
-
-class CSVFileTransport(ITransport):
-    required_keys = set(['time', 'source_id'])
-
-    def __init__(self, receiver, fname):
-        ITransport.__init__(self, receiver)
-        self.fd = open(fname, "w")
-        self.csv_fd = csv.writer(self.fd)
-        self.field_list = []
-        self.csv_fd.writerow(['NEW_DATA'])
-
-    def send(self, data):
-        if self.field_list == []:
-            keys = set(data)
-            assert self.required_keys.issubset(keys)
-            keys -= self.required_keys
-            self.field_list = sorted(keys)
-            self.csv_fd.writerow([data['source_id'], socket.getfqdn()] +
-                                 self.field_list)
-            self.field_list = ['time'] + self.field_list
-
-        self.csv_fd.writerow([data[sens].value for sens in self.field_list])
-
-
-class RAMTransport(ITransport):
-    def __init__(self, next_tr):
-        self.next = next_tr
-        self.data = []
-
-    def send(self, data):
-        self.data.append(data)
-
-    def flush(self):
-        for data in self.data:
-            self.next.send(data)
-        self.data = []
-
-
-class UDPTransport(ITransport):
-    def __init__(self, receiver, ip, port, packer_cls):
-        self.port = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
-        if receiver:
-            self.port.bind((ip, port))
-            self.packer_cls = packer_cls
-            self.packers = {}
-        else:
-            self.packer = packer_cls()
-            self.dst = (ip, port)
-
-    def send(self, data):
-        raw_data = self.packer.pack(data)
-        self.port.sendto(raw_data, self.dst)
-
-    def recv(self, timeout=None):
-        r, _, _ = select.select([self.port], [], [], timeout)
-        if len(r) != 0:
-            raw_data, addr = self.port.recvfrom(10000)
-            packer = self.packers.setdefault(addr, self.packer_cls())
-            return addr, packer.unpack(raw_data)
-        else:
-            raise Timeout()
-
-
-# -------------------------- Factory function --------------------------------
-
-
-def create_protocol(uri, receiver=False):
-    if uri == 'stdout':
-        return StdoutTransport(receiver)
-
-    parsed_uri = urlparse(uri)
-    if parsed_uri.scheme == 'udp':
-        ip, port = parsed_uri.netloc.split(":")
-
-        if receiver:
-            packer_cls = StructSerializerRecv
-        else:
-            packer_cls = StructSerializerSend
-
-        return UDPTransport(receiver, ip=ip, port=int(port),
-                            packer_cls=packer_cls)
-    elif parsed_uri.scheme == 'file':
-        return FileTransport(receiver, parsed_uri.path)
-    elif parsed_uri.scheme == 'csvfile':
-        return CSVFileTransport(receiver, parsed_uri.path)
-    elif parsed_uri.scheme == 'ram':
-        internal_recv = CSVFileTransport(receiver, parsed_uri.path)
-        return RAMTransport(internal_recv)
-    else:
-        templ = "Can't instantiate transport from {0!r}"
-        raise ValueError(templ.format(uri))
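
create_protocol was the single entry point the old sensors code used to pick a transport: the URI scheme selects the class and the rest of the URI parameterizes it. Illustrative calls against the removed module:

    from wally.sensors.protocol import create_protocol

    stdout_tr = create_protocol("stdout")                          # print a table
    csv_tr = create_protocol("csvfile:///tmp/results.csv")         # append to a CSV file
    udp_rx = create_protocol("udp://0.0.0.0:5699", receiver=True)  # listening side
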
diff --git a/wally/sensors/sensors/__init__.py b/wally/sensors/sensors/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/wally/sensors/sensors/__init__.py
+++ /dev/null
diff --git a/wally/sensors/sensors/io_sensors.py b/wally/sensors/sensors/io_sensors.py
deleted file mode 100644
index e4049a9..0000000
--- a/wally/sensors/sensors/io_sensors.py
+++ /dev/null
@@ -1,74 +0,0 @@
-from ..discover import provides
-from .utils import SensorInfo, is_dev_accepted
-
-#  1 - major number
-#  2 - minor number
-#  3 - device name
-#  4 - reads completed successfully
-#  5 - reads merged
-#  6 - sectors read
-#  7 - time spent reading (ms)
-#  8 - writes completed
-#  9 - writes merged
-# 10 - sectors written
-# 11 - time spent writing (ms)
-# 12 - I/Os currently in progress
-# 13 - time spent doing I/Os (ms)
-# 14 - weighted time spent doing I/Os (ms)
-
-io_values_pos = [
-    (3, 'reads_completed', True),
-    (5, 'sectors_read', True),
-    (6, 'rtime', True),
-    (7, 'writes_completed', True),
-    (9, 'sectors_written', True),
-    (10, 'wtime', True),
-    (11, 'io_queue', False),
-    (13, 'io_time', True)
-]
-
-
-@provides("block-io")
-def io_stat(disallowed_prefixes=('ram', 'loop'), allowed_prefixes=None):
-    results = {}
-    for line in open('/proc/diskstats'):
-        vals = line.split()
-        dev_name = vals[2]
-
-        dev_ok = is_dev_accepted(dev_name,
-                                 disallowed_prefixes,
-                                 allowed_prefixes)
-        if dev_name[-1].isdigit():
-            dev_ok = False
-
-        if dev_ok:
-            for pos, name, accum_val in io_values_pos:
-                sensor_name = "{0}.{1}".format(dev_name, name)
-                results[sensor_name] = SensorInfo(int(vals[pos]), accum_val)
-    return results
-
-
-def get_latency(stat1, stat2):
-    disks = set(i.split('.', 1)[0] for i in stat1)
-    results = {}
-
-    for disk in disks:
-        rdc = disk + '.reads_completed'
-        wrc = disk + '.writes_completed'
-        rdt = disk + '.rtime'
-        wrt = disk + '.wtime'
-        lat = 0.0
-
-        io_ops1 = stat1[rdc].value + stat1[wrc].value
-        io_ops2 = stat2[rdc].value + stat2[wrc].value
-
-        diops = io_ops2 - io_ops1
-
-        if diops != 0:
-            io1 = stat1[rdt].value + stat1[wrt].value
-            io2 = stat2[rdt].value + stat2[wrt].value
-            lat = abs(float(io1 - io2)) / diops
-
-        results[disk + '.latence'] = SensorInfo(lat, False)
-
-    return results
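
get_latency is not a sensor itself; it post-processes two io_stat() snapshots taken some interval apart, dividing the growth of read/write time by the growth of completed operations. A sketch of that intended use (the sleep length is arbitrary):

    import time
    from wally.sensors.sensors.io_sensors import io_stat, get_latency

    before = io_stat()
    time.sleep(1.0)
    after = io_stat()
    print(get_latency(before, after))  # e.g. {'sda.latence': SensorInfo(0.4, False), ...}
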
diff --git a/wally/sensors/sensors/net_sensors.py b/wally/sensors/sensors/net_sensors.py
deleted file mode 100644
index 2723499..0000000
--- a/wally/sensors/sensors/net_sensors.py
+++ /dev/null
@@ -1,47 +0,0 @@
-from ..discover import provides
-from .utils import SensorInfo, is_dev_accepted
-
-# /proc/net/dev counters per interface (0-based, after the interface name):
-#  0 - bytes received
-#  1 - packets received
-#  2 - receive errors
-#  3 - receive drops
-#  4 - receive fifo errors
-#  5 - receive frame errors
-#  6 - compressed packets received
-#  7 - multicast frames received
-#  8 - bytes transmitted
-#  9 - packets transmitted
-
-net_values_pos = [
-    (0, 'recv_bytes', True),
-    (1, 'recv_packets', True),
-    (8, 'send_bytes', True),
-    (9, 'send_packets', True),
-]
-
-
-@provides("net-io")
-def net_stat(disallowed_prefixes=('docker', 'lo'), allowed_prefixes=('eth',)):
-    results = {}
-
-    for line in open('/proc/net/dev').readlines()[2:]:
-        dev_name, stats = line.split(":", 1)
-        dev_name = dev_name.strip()
-        vals = stats.split()
-
-        dev_ok = is_dev_accepted(dev_name,
-                                 disallowed_prefixes,
-                                 allowed_prefixes)
-
-        if '.' in dev_name and dev_name.split('.')[-1].isdigit():
-            dev_ok = False
-
-        if dev_ok:
-            for pos, name, accum_val in net_values_pos:
-                sensor_name = "{0}.{1}".format(dev_name, name)
-                results[sensor_name] = SensorInfo(int(vals[pos]), accum_val)
-    return results
diff --git a/wally/sensors/sensors/pscpu_sensors.py b/wally/sensors/sensors/pscpu_sensors.py
deleted file mode 100644
index ccbcefc..0000000
--- a/wally/sensors/sensors/pscpu_sensors.py
+++ /dev/null
@@ -1,37 +0,0 @@
-import os
-
-from ..discover import provides
-from .utils import SensorInfo, get_pid_name, get_pid_list
-
-
-@provides("perprocess-cpu")
-def pscpu_stat(disallowed_prefixes=None, allowed_prefixes=None):
-    results = {}
-    pid_list = get_pid_list(disallowed_prefixes, allowed_prefixes)
-
-    for pid in pid_list:
-        try:
-            dev_name = get_pid_name(pid)
-
-            pid_stat1 = pid_stat(pid)
-
-            sensor_name = "{0}.{1}".format(dev_name, pid)
-            results[sensor_name] = SensorInfo(pid_stat1, True)
-        except IOError:
-            # may be, proc has already terminated, skip it
-            continue
-    return results
-
-
-def pid_stat(pid):
-    """Return total cpu usage time from process"""
-    # read /proc/pid/stat
-    with open(os.path.join('/proc/', pid, 'stat'), 'r') as pidfile:
-        proctimes = pidfile.readline().split()
-    # utime is the 14th field of /proc/<pid>/stat
-    utime = proctimes[13]
-    # stime is the 15th field of /proc/<pid>/stat
-    stime = proctimes[14]
-    # count total process used time
-    proctotal = int(utime) + int(stime)
-    return float(proctotal)
diff --git a/wally/sensors/sensors/psram_sensors.py b/wally/sensors/sensors/psram_sensors.py
deleted file mode 100644
index 34f729a..0000000
--- a/wally/sensors/sensors/psram_sensors.py
+++ /dev/null
@@ -1,75 +0,0 @@
-from ..discover import provides
-from .utils import SensorInfo, get_pid_name, get_pid_list
-
-
-# Based on ps_mem.py:
-# Licence: LGPLv2
-# Author:  P@draigBrady.com
-# Source:  http://www.pixelbeat.org/scripts/ps_mem.py
-#   http://github.com/pixelb/scripts/commits/master/scripts/ps_mem.py
-
-
-# Note shared is always a subset of rss (trs is not always)
-def get_mem_stats(pid):
-    """Return memory data of pid in format (private, shared)"""
-
-    fname = '/proc/{0}/{1}'.format(pid, "smaps")
-    lines = open(fname).readlines()
-
-    shared = 0
-    private = 0
-    pss = 0
-
-    # add 0.5 KiB per entry as the average error due to truncation
-    pss_adjust = 0.5
-
-    for line in lines:
-        if line.startswith("Shared"):
-            shared += int(line.split()[1])
-
-        if line.startswith("Private"):
-            private += int(line.split()[1])
-
-        if line.startswith("Pss"):
-            pss += float(line.split()[1]) + pss_adjust
-
-    # Note Shared + Private = Rss above
-    # The Rss in smaps includes video card mem etc.
-
-    if pss != 0:
-        shared = int(pss - private)
-
-    return (private, shared)
-
-
-@provides("perprocess-ram")
-def psram_stat(disallowed_prefixes=None, allowed_prefixes=None):
-    results = {}
-    pid_list = get_pid_list(disallowed_prefixes, allowed_prefixes)
-    for pid in pid_list:
-        try:
-            dev_name = get_pid_name(pid)
-
-            private, shared = get_mem_stats(pid)
-            total = private + shared
-            sys_total = get_ram_size()
-            usage = float(total) / float(sys_total)
-
-            sensor_name = "{0}({1})".format(dev_name, pid)
-
-            results[sensor_name + ".private_mem"] = SensorInfo(private, False)
-            results[sensor_name + ".shared_mem"] = SensorInfo(shared, False)
-            results[sensor_name + ".used_mem"] = SensorInfo(total, False)
-            name = sensor_name + ".mem_usage_percent"
-            results[name] = SensorInfo(usage * 100, False)
-        except IOError:
-            # permission denied or proc die
-            continue
-    return results
-
-
-def get_ram_size():
-    """Return RAM size in Kb"""
-    with open("/proc/meminfo") as proc:
-        mem_total = proc.readline().split()
-    return mem_total[1]
diff --git a/wally/sensors/sensors/syscpu_sensors.py b/wally/sensors/sensors/syscpu_sensors.py
deleted file mode 100644
index e2fab8c..0000000
--- a/wally/sensors/sensors/syscpu_sensors.py
+++ /dev/null
@@ -1,51 +0,0 @@
-from .utils import SensorInfo
-from ..discover import provides
-
-# 0 - cpu name
-# 1 - user: normal processes executing in user mode
-# 2 - nice: niced processes executing in user mode
-# 3 - system: processes executing in kernel mode
-# 4 - idle: twiddling thumbs
-# 5 - iowait: waiting for I/O to complete
-# 6 - irq: servicing interrupts
-# 7 - softirq: servicing softirqs
-
-io_values_pos = [
-    (1, 'user_processes', True),
-    (2, 'nice_processes', True),
-    (3, 'system_processes', True),
-    (4, 'idle_time', True),
-]
-
-
-@provides("system-cpu")
-def syscpu_stat(disallowed_prefixes=None, allowed_prefixes=None):
-    results = {}
-
-    # calculate core count
-    core_count = 0
-
-    for line in open('/proc/stat'):
-        vals = line.split()
-        dev_name = vals[0]
-
-        if dev_name == 'cpu':
-            for pos, name, accum_val in io_values_pos:
-                sensor_name = "{0}.{1}".format(dev_name, name)
-                results[sensor_name] = SensorInfo(int(vals[pos]),
-                                                  accum_val)
-        elif dev_name == 'procs_blocked':
-            val = int(vals[1])
-            results["cpu.procs_blocked"] = SensorInfo(val, False)
-        elif dev_name.startswith('cpu'):
-            core_count += 1
-
-    # procs in queue
-    TASKSPOS = 3
-    vals = open('/proc/loadavg').read().split()
-    ready_procs = vals[TASKSPOS].partition('/')[0]
-    # dec on current proc
-    procs_queue = (float(ready_procs) - 1) / core_count
-    results["cpu.procs_queue"] = SensorInfo(procs_queue, False)
-
-    return results
diff --git a/wally/sensors/sensors/sysram_sensors.py b/wally/sensors/sensors/sysram_sensors.py
deleted file mode 100644
index 9de5d82..0000000
--- a/wally/sensors/sensors/sysram_sensors.py
+++ /dev/null
@@ -1,41 +0,0 @@
-from ..discover import provides
-from .utils import SensorInfo, is_dev_accepted
-
-
-# report these fields by default, or the ones passed in allowed_prefixes
-ram_fields = [
-    'MemTotal',
-    'MemFree',
-    'Buffers',
-    'Cached',
-    'SwapCached',
-    'Dirty',
-    'Writeback',
-    'SwapTotal',
-    'SwapFree'
-]
-
-
-@provides("system-ram")
-def sysram_stat(disallowed_prefixes=None, allowed_prefixes=None):
-    if allowed_prefixes is None:
-        allowed_prefixes = ram_fields
-    results = {}
-    for line in open('/proc/meminfo'):
-        vals = line.split()
-        dev_name = vals[0].rstrip(":")
-
-        dev_ok = is_dev_accepted(dev_name,
-                                 disallowed_prefixes,
-                                 allowed_prefixes)
-
-        title = "ram.{0}".format(dev_name)
-
-        if dev_ok:
-            results[title] = SensorInfo(int(vals[1]), False)
-
-    if 'ram.MemFree' in results and 'ram.MemTotal' in results:
-        used = results['ram.MemTotal'].value - results['ram.MemFree'].value
-        usage = float(used) / results['ram.MemTotal'].value
-        results["ram.usage_percent"] = SensorInfo(usage, False)
-    return results
diff --git a/wally/sensors/sensors/utils.py b/wally/sensors/sensors/utils.py
deleted file mode 100644
index cff3201..0000000
--- a/wally/sensors/sensors/utils.py
+++ /dev/null
@@ -1,83 +0,0 @@
-import os
-
-from collections import namedtuple
-
-SensorInfo = namedtuple("SensorInfo", ['value', 'is_accumulated'])
-
-
-def is_dev_accepted(name, disallowed_prefixes, allowed_prefixes):
-    dev_ok = True
-
-    if disallowed_prefixes is not None:
-        dev_ok = all(not name.startswith(prefix)
-                     for prefix in disallowed_prefixes)
-
-    if dev_ok and allowed_prefixes is not None:
-        dev_ok = any(name.startswith(prefix)
-                     for prefix in allowed_prefixes)
-
-    return dev_ok
-
-
-def get_pid_list(disallowed_prefixes, allowed_prefixes):
-    """Return pid list from list of pids and names"""
-    # exceptions
-    but = disallowed_prefixes if disallowed_prefixes is not None else []
-    if allowed_prefixes is None:
-        # if nothing setted - all ps will be returned except setted
-        result = [pid
-                  for pid in os.listdir('/proc')
-                  if pid.isdigit() and pid not in but]
-    else:
-        result = []
-        for pid in os.listdir('/proc'):
-            if pid.isdigit() and pid not in but:
-                name = get_pid_name(pid)
-                if pid in allowed_prefixes or \
-                   any(name.startswith(val) for val in allowed_prefixes):
-                    print name
-                    # this pid is allowed
-                    result.append(pid)
-    return result
-
-
-def get_pid_name(pid):
-    """Return name by pid"""
-    try:
-        with open(os.path.join('/proc/', pid, 'cmdline'), 'r') as pidfile:
-            try:
-                cmd = pidfile.readline().split()[0]
-                return os.path.basename(cmd).rstrip('\x00')
-            except IndexError:
-                # no cmd returned
-                return "<NO NAME>"
-    except IOError:
-        # upstream expects some string even if we couldn't read /proc
-        return "no_such_process"
-
-
-def delta(func, only_upd=True):
-    prev = {}
-    while True:
-        for dev_name, vals in func():
-            if dev_name not in prev:
-                prev[dev_name] = {}
-                for name, (val, _) in vals.items():
-                    prev[dev_name][name] = val
-            else:
-                dev_prev = prev[dev_name]
-                res = {}
-                for stat_name, (val, accum_val) in vals.items():
-                    if accum_val:
-                        if stat_name in dev_prev:
-                            delta = int(val) - int(dev_prev[stat_name])
-                            if not only_upd or 0 != delta:
-                                res[stat_name] = str(delta)
-                        dev_prev[stat_name] = val
-                    elif not only_upd or '0' != val:
-                        res[stat_name] = val
-
-                if only_upd and len(res) == 0:
-                    continue
-                yield dev_name, res
-        yield None, None
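
is_dev_accepted applies the two prefix filters in a fixed order: a device is rejected if it matches any disallowed prefix, and, when an allow-list is given, it must additionally match one of the allowed prefixes. A few concrete cases:

    from wally.sensors.sensors.utils import is_dev_accepted

    assert is_dev_accepted("eth0", disallowed_prefixes=("lo",), allowed_prefixes=("eth",))
    assert not is_dev_accepted("lo", disallowed_prefixes=("lo",), allowed_prefixes=None)
    assert not is_dev_accepted("wlan0", disallowed_prefixes=None, allowed_prefixes=("eth",))
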
diff --git a/wally/sensors/webui/sensors.html b/wally/sensors/webui/sensors.html
deleted file mode 100644
index 77e3cf6..0000000
--- a/wally/sensors/webui/sensors.html
+++ /dev/null
@@ -1,57 +0,0 @@
-<!DOCTYPE html>
-<meta charset="utf-8">
-<style>
-    @import url(http://fonts.googleapis.com/css?family=Yanone+Kaffeesatz:400,700);
-    @import url(http://square.github.io/cubism/style.css);
-</style>
-<script src="http://d3js.org/d3.v3.min.js" charset="utf-8"></script>
-<script src="http://square.github.io/cubism/cubism.v1.js"></script>
-<div id="body"> <div id="graph" /></div>
- 
-<script>
-
-// create context and horizon
-var context = cubism.context().serverDelay(3 * 1000).step(1000).size(1000)
-var horizon = context.horizon().extent([0, 100])
- 
-// define metric accessor
-function wally_source(name) {
-    function selector(start, stop, step, callback){
-        function on_result(data) { 
-            callback(null, data);
-        };
-
-        url = "/sensors?start=" + start + "&stop=" + stop + "&step=" + step + "&name=" + name;
-        d3.json(url, on_result);
-    }
-
-    return context.metric(selector, name);
-}
- 
-// draw graph
-var metrics = ["testnodes:io_q", "testnodes:cpu"];
-horizon.metric(wally_source);
- 
-d3.select("#graph").selectAll(".horizon")
-    .data(metrics)
-    .enter()
-    .append("div")
-    .attr("class", "horizon")
-    .call(horizon.height(120));
- 
-// set rule
-d3.select("#body").append("div")
-    .attr("class", "rule")
-    .call(context.rule());
- 
-// set focus
-context.on("focus", function(i) {
-    d3.selectAll(".value")
-        .style( "right", i == null ? null : context.size() - i + "px");
-});
-
-// set axis 
-var axis = context.axis()
-d3.select("#graph").append("div").attr("class", "axis").append("g").call(axis);
-
-</script>
diff --git a/wally/sensors_utils.py b/wally/sensors_utils.py
deleted file mode 100644
index 600f3bb..0000000
--- a/wally/sensors_utils.py
+++ /dev/null
@@ -1,92 +0,0 @@
-import os.path
-import logging
-import contextlib
-
-from concurrent.futures import ThreadPoolExecutor
-
-from wally import ssh_utils
-from wally.sensors.api import (with_sensors, sensors_info, SensorConfig)
-
-
-logger = logging.getLogger("wally.sensors")
-
-
-def get_sensors_config_for_nodes(cfg, nodes, remote_path):
-    monitored_nodes = []
-    sensors_configs = []
-    source2roles_map = {}
-
-    receiver_url = "csvfile://" + os.path.join(remote_path, "results.csv")
-
-    for role, sensors_str in cfg["roles_mapping"].items():
-        sensors = [sens.strip() for sens in sensors_str.split(",")]
-
-        collect_cfg = dict((sensor, {}) for sensor in sensors)
-
-        for node in nodes:
-            if role in node.roles:
-                source2roles_map[node.get_conn_id()] = node.roles
-                monitored_nodes.append(node)
-                sens_cfg = SensorConfig(node.connection,
-                                        node.get_conn_id(),
-                                        collect_cfg,
-                                        source_id=node.get_conn_id(),
-                                        monitor_url=receiver_url)
-                sensors_configs.append(sens_cfg)
-
-    return monitored_nodes, sensors_configs, source2roles_map
-
-
-PID_FILE = "/tmp/sensors.pid"
-
-
-def clear_old_sensors(sensors_configs):
-    def stop_sensors(sens_cfg):
-        with sens_cfg.conn.open_sftp() as sftp:
-            try:
-                pid = ssh_utils.read_from_remote(sftp, PID_FILE)
-                pid = int(pid.strip())
-                ssh_utils.run_over_ssh(sens_cfg.conn,
-                                       "kill -9 " + str(pid))
-                sftp.remove(PID_FILE)
-            except:
-                pass
-
-    with ThreadPoolExecutor(32) as pool:
-        list(pool.map(stop_sensors, sensors_configs))
-
-
-@contextlib.contextmanager
-def with_sensors_util(sensors_cfg, nodes):
-    srp = sensors_cfg['sensors_remote_path']
-    monitored_nodes, sensors_configs, source2roles_map = \
-        get_sensors_config_for_nodes(sensors_cfg, nodes, srp)
-
-    with with_sensors(sensors_configs, srp):
-        yield source2roles_map
-
-
-@contextlib.contextmanager
-def sensors_info_util(cfg, nodes):
-    if cfg.get('sensors', None) is None:
-        yield
-        return
-
-    _, sensors_configs, _ = \
-        get_sensors_config_for_nodes(cfg.sensors, nodes,
-                                     cfg.sensors_remote_path)
-
-    clear_old_sensors(sensors_configs)
-    ctx = sensors_info(sensors_configs, cfg.sensors_remote_path)
-    try:
-        res = ctx.__enter__()
-        yield res
-    except:
-        ctx.__exit__(None, None, None)
-        raise
-    finally:
-        try:
-            ctx.__exit__(None, None, None)
-        except:
-            logger.exception("During stop/collect sensors")
-            del res[:]
diff --git a/wally/ssh_utils.py b/wally/ssh_utils.py
index e566cbe..ada4af6 100644
--- a/wally/ssh_utils.py
+++ b/wally/ssh_utils.py
@@ -78,8 +78,7 @@
 
 
 def exists(sftp, path):
-    """os.path.exists for paramiko's SCP object
-    """
+    "os.path.exists for paramiko's SCP object"
     try:
         sftp.stat(path)
         return True
@@ -329,6 +328,7 @@
 
 
 def parse_ssh_uri(uri):
+    # an optional "ssh://" prefix is accepted before any of the forms below
     # user:passwd@ip_host:port
     # user:passwd@ip_host
     # user@ip_host:port
diff --git a/wally/stage.py b/wally/stage.py
new file mode 100644
index 0000000..5f47a1b
--- /dev/null
+++ b/wally/stage.py
@@ -0,0 +1,36 @@
+import logging
+import contextlib
+
+
+from .utils import StopTestError
+
+logger = logging.getLogger("wally")
+
+
+class TestStage:
+    name = ""
+
+    def __init__(self, testrun, config):
+        self.testrun = testrun
+        self.config = config
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        return False  # do not suppress exceptions
+
+
+@contextlib.contextmanager
+def log_stage(stage):
+    msg_templ = "Exception during {0}: {1!s}"
+    msg_templ_no_exc = "During {0}"
+
+    logger.info("Start " + stage.name)
+
+    try:
+        yield
+    except StopTestError as exc:
+        logger.error(msg_templ.format(stage.name, exc))
+    except Exception:
+        logger.exception(msg_templ_no_exc.format(stage.name))
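
The new stage module only defines the skeleton; presumably concrete stages subclass TestStage, set name, and do their work inside the context manager, while log_stage provides uniform start/error logging around them. A minimal sketch of how the two are meant to compose (DiscoverStage is hypothetical):

    from wally.stage import TestStage, log_stage

    class DiscoverStage(TestStage):
        name = "discover"

        def __enter__(self):
            # discover nodes and attach them to self.testrun here
            return self

    stage = DiscoverStage(testrun=None, config={})
    with log_stage(stage):
        with stage:
            pass  # stage body
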
diff --git a/wally/start_vms.py b/wally/start_vms.py
index 0c4ccc7..759d63b 100644
--- a/wally/start_vms.py
+++ b/wally/start_vms.py
@@ -5,9 +5,8 @@
 import urllib
 import os.path
 import logging
-import warnings
-import subprocess
 import collections
+from typing import Dict, Any, Iterable, Tuple
 
 from concurrent.futures import ThreadPoolExecutor
 
@@ -41,7 +40,7 @@
 CINDER_CONNECTION = None
 
 
-def is_connected():
+def is_connected() -> bool:
     return NOVA_CONNECTION is not None
 
 
@@ -50,7 +49,7 @@
                                   "tenant", "auth_url", "insecure"])
 
 
-def ostack_get_creds():
+def ostack_get_creds() -> OSCreds:
     if STORED_OPENSTACK_CREDS is None:
         return OSCreds(os.environ.get('OS_USERNAME'),
                        os.environ.get('OS_PASSWORD'),
@@ -61,7 +60,7 @@
         return STORED_OPENSTACK_CREDS
 
 
-def nova_connect(os_creds=None):
+def nova_connect(os_creds: OSCreds=None) -> n_client:
     global NOVA_CONNECTION
     global STORED_OPENSTACK_CREDS
 
@@ -80,7 +79,7 @@
     return NOVA_CONNECTION
 
 
-def cinder_connect(os_creds=None):
+def cinder_connect(os_creds: OSCreds=None) -> c_client:
     global CINDER_CONNECTION
     global STORED_OPENSTACK_CREDS
 
@@ -97,68 +96,7 @@
     return CINDER_CONNECTION
 
 
-def prepare_os_subpr(nova, params, os_creds, max_vm_per_compute=8):
-    if os_creds is None:
-        os_creds = ostack_get_creds()
-
-    serv_groups = " ".join(map(params['aa_group_name'].format,
-                               range(max_vm_per_compute)))
-
-    image_name = params['image']['name']
-    env = os.environ.copy()
-
-    env.update(dict(
-        OS_USERNAME=os_creds.name,
-        OS_PASSWORD=os_creds.passwd,
-        OS_TENANT_NAME=os_creds.tenant,
-        OS_AUTH_URL=os_creds.auth_url,
-        OS_INSECURE="1" if os_creds.insecure else "0",
-
-        FLAVOR_NAME=params['flavor']['name'],
-        FLAVOR_RAM=str(params['flavor']['ram_size']),
-        FLAVOR_HDD=str(params['flavor']['hdd_size']),
-        FLAVOR_CPU_COUNT=str(params['flavor']['cpu_count']),
-
-        SERV_GROUPS=serv_groups,
-
-        SECGROUP=params['security_group'],
-
-        IMAGE_NAME=image_name,
-        IMAGE_URL=params['image']['url'],
-    ))
-
-    spath = os.path.dirname(os.path.dirname(wally.__file__))
-    spath = os.path.join(spath, 'scripts/prepare.sh')
-
-    with warnings.catch_warnings():
-        warnings.simplefilter("ignore")
-        fname = os.tempnam()
-
-    cmd = "bash {spath} >{fname} 2>&1".format(spath=spath, fname=fname)
-    try:
-        subprocess.check_call(cmd, shell=True, env=env)
-    except:
-        logger.error("Prepare failed. Logs in " + fname)
-        with open(fname) as fd:
-            logger.error("Message:\n    " + fd.read().replace("\n", "\n    "))
-        raise
-    os.unlink(fname)
-
-    while True:
-        status = nova.images.find(name=image_name).status
-        if status == 'ACTIVE':
-            break
-        msg = "Image {0} is still in {1} state. Waiting 10 more seconds"
-        logger.info(msg.format(image_name, status))
-        time.sleep(10)
-
-    create_keypair(nova,
-                   params['keypair_name'],
-                   params['keypair_file_public'],
-                   params['keypair_file_private'])
-
-
-def find_vms(nova, name_prefix):
+def find_vms(nova: n_client, name_prefix: str) -> Iterable[Tuple[str, int]]:
     for srv in nova.servers.list():
         if srv.name.startswith(name_prefix):
             for ips in srv.addresses.values():
@@ -168,7 +106,7 @@
                         break
 
 
-def pause(ids):
+def pause(ids: Iterable[int]) -> None:
     def pause_vm(conn, vm_id):
         vm = conn.servers.get(vm_id)
         if vm.status == 'ACTIVE':
@@ -182,7 +120,7 @@
             future.result()
 
 
-def unpause(ids, max_resume_time=10):
+def unpause(ids: Iterable[int], max_resume_time=10) -> None:
     def unpause(conn, vm_id):
         vm = conn.servers.get(vm_id)
         if vm.status == 'PAUSED':
@@ -204,7 +142,7 @@
             future.result()
 
 
-def prepare_os(nova, params, os_creds, max_vm_per_compute=8):
+def prepare_os(nova: n_client, params: Dict[str, Any], os_creds: OSCreds) -> None:
     """prepare openstack for futher usage
 
     Creates server groups, security rules, keypair, flavor
@@ -257,7 +195,7 @@
     create_flavor(nova, **params['flavor'])
 
 
-def create_keypair(nova, name, pub_key_path, priv_key_path):
+def create_keypair(nova: n_client, name: str, pub_key_path: str, priv_key_path: str):
     """create and upload keypair into nova, if doesn't exists yet
 
     Create and upload keypair into nova, if keypair with given bane
@@ -308,7 +246,7 @@
                                " or remove key from openstack")
 
 
-def get_or_create_aa_group(nova, name):
+def get_or_create_aa_group(nova: n_client, name: str) -> int:
     """create anti-affinity server group, if doesn't exists yet
 
     parameters:
@@ -326,7 +264,7 @@
     return group.id
 
 
-def allow_ssh(nova, group_name):
+def allow_ssh(nova: n_client, group_name: str) -> int:
     """create sequrity group for ping and ssh
 
     parameters:
@@ -355,7 +293,7 @@
     return secgroup.id
 
 
-def create_image(nova, os_creds, name, url):
+def create_image(nova: n_client, os_creds: OSCreds, name: str, url: str):
     """upload image into glance from given URL, if given image doesn't exisis yet
 
     parameters:
@@ -394,7 +332,7 @@
             os.unlink(tempnam)
 
 
-def create_flavor(nova, name, ram_size, hdd_size, cpu_count):
+def create_flavor(nova: n_client, name: str, ram_size: int, hdd_size: int, cpu_count: int):
     """create flavor, if doesn't exisis yet
 
     parameters:
@@ -415,7 +353,7 @@
     nova.flavors.create(name, cpu_count, ram_size, hdd_size)
 
 
-def create_volume(size, name):
+def create_volume(size: int, name: str):
     cinder = cinder_connect()
     vol = cinder.volumes.create(size=size, display_name=name)
     err_count = 0
@@ -436,7 +374,7 @@
     return vol
 
 
-def wait_for_server_active(nova, server, timeout=300):
+def wait_for_server_active(nova: n_client, server, timeout: int = 300) -> None:
     """waiting till server became active
 
     parameters:
diff --git a/wally/suits/io/defaults.cfg b/wally/suits/io/defaults.cfg
new file mode 100644
index 0000000..8c8644b
--- /dev/null
+++ b/wally/suits/io/defaults.cfg
@@ -0,0 +1,24 @@
+buffered=0
+group_reporting=1
+iodepth=1
+unified_rw_reporting=1
+
+norandommap=1
+
+thread=1
+time_based=1
+wait_for_previous=1
+
+# this is critical for correct results in multi-node runs
+randrepeat=0
+
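+# {FILENAME} and {TEST_FILE_SIZE} are template parameters; fio_task_parser
+# substitutes them from the per-test settings when the job file is compiled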
+filename={FILENAME}
+
+size={TEST_FILE_SIZE}
+
+write_lat_log=fio_log
+write_iops_log=fio_log
+write_bw_log=fio_log
+log_avg_msec=500
+
+
diff --git a/wally/suits/io/fio.py b/wally/suits/io/fio.py
index 7b77461..50bb1fd 100644
--- a/wally/suits/io/fio.py
+++ b/wally/suits/io/fio.py
@@ -3,51 +3,45 @@
 import json
 import stat
 import random
-import shutil
+import hashlib
 import os.path
 import logging
 import datetime
 import functools
-import subprocess
 import collections
+from typing import Dict, List, Callable, Any, Tuple, Optional
 
 import yaml
-import paramiko
 import texttable
 from paramiko.ssh_exception import SSHException
 from concurrent.futures import ThreadPoolExecutor, wait
 
 import wally
-from wally.pretty_yaml import dumps
-from wally.statistic import round_3_digit, data_property, average
-from wally.utils import ssize2b, sec_to_str, StopTestError, Barrier, get_os
-from wally.ssh_utils import (save_to_remote, read_from_remote, BGSSHTask, reconnect)
+
+from ...pretty_yaml import dumps
+from ...statistic import round_3_digit, data_property, average
+from ...utils import ssize2b, sec_to_str, StopTestError, Barrier, get_os
+from ...inode import INode
+
+from ..itest import (TimeSeriesValue, PerfTest, TestResults, TestConfig)
 
 from .fio_task_parser import (execution_time, fio_cfg_compile,
                               get_test_summary, get_test_summary_tuple,
                               get_test_sync_mode, FioJobSection)
+from .rpc_plugin import parse_fio_result
 
-from ..itest import (TimeSeriesValue, PerfTest, TestResults,
-                     run_on_node, TestConfig, MeasurementMatrix)
 
 logger = logging.getLogger("wally")
 
 
-# Results folder structure
-# results/
-#     {loadtype}_{num}/
-#         config.yaml
-#         ......
-
-
-class NoData(object):
+class NoData:
     pass
 
 
-def cached_prop(func):
+def cached_prop(func: Callable[..., Any]) -> Callable[..., Any]:
     @property
     @functools.wraps(func)
-    def closure(self):
+    def closure(self) -> Any:
         val = getattr(self, "_" + func.__name__)
         if val is NoData:
             val = func(self)
@@ -56,7 +50,7 @@
     return closure
 
 
-def load_fio_log_file(fname):
+def load_fio_log_file(fname: str) -> TimeSeriesValue:
     with open(fname) as fd:
         it = [ln.split(',')[:2] for ln in fd]
 
@@ -72,7 +66,7 @@
 WRITE_IOPS_DISCSTAT_POS = 7
 
 
-def load_sys_log_file(ftype, fname):
+def load_sys_log_file(ftype: str, fname: str) -> TimeSeriesValue:
     assert ftype == 'iops'
     pval = None
     with open(fname) as fd:
@@ -89,7 +83,7 @@
     return TimeSeriesValue(vals)
 
 
-def load_test_results(folder, run_num):
+def load_test_results(folder: str, run_num: int) -> 'FioRunResult':
     res = {}
     params = None
 
@@ -97,7 +91,7 @@
     params = yaml.load(open(fn).read())
 
     conn_ids_set = set()
-    rr = r"{0}_(?P<conn_id>.*?)_(?P<type>[^_.]*)\.\d+\.log$".format(run_num)
+    rr = r"{}_(?P<conn_id>.*?)_(?P<type>[^_.]*)\.\d+\.log$".format(run_num)
     for fname in os.listdir(folder):
         rm = re.match(rr, fname)
         if rm is None:
@@ -115,7 +109,7 @@
 
         conn_ids_set.add(conn_id)
 
-    rr = r"{0}_(?P<conn_id>.*?)_(?P<type>[^_.]*)\.sys\.log$".format(run_num)
+    rr = r"{}_(?P<conn_id>.*?)_(?P<type>[^_.]*)\.sys\.log$".format(run_num)
     for fname in os.listdir(folder):
         rm = re.match(rr, fname)
         if rm is None:
@@ -159,8 +153,8 @@
     return FioRunResult(config, fio_task, mm_res, raw_res, params['intervals'], run_num)
 
 
-class Attrmapper(object):
-    def __init__(self, dct):
+class Attrmapper:
+    def __init__(self, dct: Dict[str, Any]):
         self.__dct = dct
 
     def __getattr__(self, name):
@@ -170,8 +164,8 @@
             raise AttributeError(name)
 
 
-class DiskPerfInfo(object):
-    def __init__(self, name, summary, params, testnodes_count):
+class DiskPerfInfo:
+    def __init__(self, name: str, summary: str, params: Dict[str, Any], testnodes_count: int):
         self.name = name
         self.bw = None
         self.iops = None
@@ -193,7 +187,7 @@
         self.concurence = self.params['vals'].get('numjobs', 1)
 
 
-def get_lat_perc_50_95(lat_mks):
+def get_lat_perc_50_95(lat_mks: List[float]) -> Tuple[float, float]:
     curr_perc = 0
     perc_50 = None
     perc_95 = None
@@ -224,8 +218,8 @@
     return perc_50 / 1000., perc_95 / 1000.
 
 
-class IOTestResults(object):
-    def __init__(self, suite_name, fio_results, log_directory):
+class IOTestResults:
+    def __init__(self, suite_name: str, fio_results: 'FioRunResult', log_directory: str):
         self.suite_name = suite_name
         self.fio_results = fio_results
         self.log_directory = log_directory
@@ -236,7 +230,7 @@
     def __len__(self):
         return len(self.fio_results)
 
-    def get_yamable(self):
+    def get_yamable(self) -> Dict[str, List[str]]:
         items = [(fio_res.summary(), fio_res.idx) for fio_res in self]
         return {self.suite_name: [self.log_directory] + items}
 
@@ -424,6 +418,10 @@
     soft_runcycle = 5 * 60
     retry_time = 30
 
+    zero_md5_hash = hashlib.md5()
+    zero_md5_hash.update(b"\x00" * 1024)
+    zero_md5 = zero_md5_hash.hexdigest()
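+    # (md5 of 1 KiB of zero bytes - check_prefill_required() uses it to detect
+    # blocks that were never written, i.e. files which still need prefilling)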
+
     def __init__(self, config):
         PerfTest.__init__(self, config)
 
@@ -464,7 +462,7 @@
         self.fio_configs = None
 
     @classmethod
-    def load(cls, suite_name, folder):
+    def load(cls, suite_name: str, folder: str) -> IOTestResults:
         res = []
         for fname in os.listdir(folder):
             if re.match("\d+_params.yaml$", fname):
@@ -478,11 +476,9 @@
         pass
 
     # size is megabytes
-    def check_prefill_required(self, rossh, fname, size, num_blocks=16):
+    def check_prefill_required(self, node: INode, fname: str, size: int, num_blocks: int = 16) -> bool:
         try:
-            with rossh.connection.open_sftp() as sftp:
-                fstats = sftp.stat(fname)
-
+            fstats = node.stat_file(fname)
             if stat.S_ISREG(fstats.st_mode) and fstats.st_size < size * 1024 ** 2:
                 return True
         except EnvironmentError:
@@ -498,14 +494,13 @@
         if self.use_sudo:
             cmd = "sudo " + cmd
 
-        zero_md5 = '0f343b0931126a20f133d67c2b018a3b'
         bsize = size * (1024 ** 2)
         offsets = [random.randrange(bsize - 1024) for _ in range(num_blocks)]
         offsets.append(bsize - 1024)
         offsets.append(0)
 
         for offset in offsets:
-            data = rossh(cmd.format(fname, offset), nolog=True)
+            data = node.run(cmd.format(fname, offset), nolog=True)
 
             md = ""
             for line in data.split("\n"):
@@ -517,12 +512,12 @@
                 logger.error("File data check is failed - " + data)
                 return True
 
-            if zero_md5 == md:
+            if self.zero_md5 == md:
                 return True
 
         return False
 
-    def prefill_test_files(self, rossh, files, force=False):
+    def prefill_test_files(self, node: INode, files: Dict[str, int], force: bool = False) -> None:
         if self.use_system_fio:
             cmd_templ = "fio "
         else:
@@ -542,7 +537,7 @@
         ddtime = 0
         for fname, curr_sz in files.items():
             if not force:
-                if not self.check_prefill_required(rossh, fname, curr_sz):
+                if not self.check_prefill_required(node, fname, curr_sz):
                     logger.debug("prefill is skipped")
                     continue
 
@@ -551,7 +546,7 @@
             ssize += curr_sz
 
             stime = time.time()
-            rossh(cmd, timeout=curr_sz)
+            node.run(cmd, timeout=curr_sz)
             ddtime += time.time() - stime
 
         if ddtime > 1.0:
@@ -559,10 +554,10 @@
             mess = "Initiall fio fill bw is {0} MiBps for this vm"
             logger.info(mess.format(fill_bw))
 
-    def install_utils(self, node, rossh, max_retry=3, timeout=5):
+    def install_utils(self, node: INode) -> None:
         need_install = []
         packs = [('screen', 'screen')]
-        os_info = get_os(rossh)
+        os_info = get_os(node)
 
         if self.use_system_fio:
             packs.append(('fio', 'fio'))
@@ -575,7 +570,7 @@
                 continue
 
             try:
-                rossh('which ' + bin_name, nolog=True)
+                node.run('which ' + bin_name, nolog=True)
             except OSError:
                 need_install.append(package)
 
@@ -585,14 +580,10 @@
             else:
                 cmd = "sudo apt-get -y install " + " ".join(need_install)
 
-            for _ in range(max_retry):
-                try:
-                    rossh(cmd)
-                    break
-                except OSError as err:
-                    time.sleep(timeout)
-            else:
-                raise OSError("Can't install - " + str(err))
+            try:
+                node.run(cmd)
+            except OSError as err:
+                raise OSError("Can't install - {}".format(" ".join(need_install))) from err
 
         if not self.use_system_fio:
             fio_dir = os.path.dirname(os.path.dirname(wally.__file__))
@@ -602,19 +593,16 @@
             fio_path = os.path.join(fio_dir, fname)
 
             if not os.path.exists(fio_path):
-                raise RuntimeError("No prebuild fio available for {0}".format(os_info))
+                raise RuntimeError("No prebuild fio binary available for {0}".format(os_info))
 
             bz_dest = self.join_remote('fio.bz2')
-            with node.connection.open_sftp() as sftp:
-                sftp.put(fio_path, bz_dest)
+            node.copy_file(fio_path, bz_dest)
+            node.run("bzip2 --decompress {}" + bz_dest, nolog=True)
+            node.run("chmod a+x " + self.join_remote("fio"), nolog=True)
 
-            rossh("bzip2 --decompress " + bz_dest, nolog=True)
-            rossh("chmod a+x " + self.join_remote("fio"), nolog=True)
-
-    def pre_run(self):
+    def pre_run(self) -> None:
         if 'FILESIZE' not in self.config_params:
-            # need to detect file size
-            pass
+            raise NotImplementedError("File size detection is not implemented")
 
         self.fio_configs = fio_cfg_compile(self.raw_cfg,
                                            self.config_fname,
@@ -641,36 +629,28 @@
                                    force=self.force_prefill)
             list(pool.map(fc, self.config.nodes))
 
-    def pre_run_th(self, node, files, force):
+    def pre_run_th(self, node: INode, files: Dict[str, int], force: bool = False) -> None:
         try:
-            # fill files with pseudo-random data
-            rossh = run_on_node(node)
-            rossh.connection = node.connection
+            cmd = 'mkdir -p "{0}"'.format(self.config.remote_dir)
+            if self.use_sudo:
+                cmd = "sudo " + cmd
+                cmd += " ; sudo chown {0} {1}".format(node.get_user(),
+                                                      self.config.remote_dir)
+            node.run(cmd, nolog=True)
 
-            try:
-                cmd = 'mkdir -p "{0}"'.format(self.config.remote_dir)
-                if self.use_sudo:
-                    cmd = "sudo " + cmd
-                    cmd += " ; sudo chown {0} {1}".format(node.get_user(),
-                                                          self.config.remote_dir)
-                rossh(cmd, nolog=True)
+            assert self.config.remote_dir != "" and self.config.remote_dir != "/"
+            node.run("rm -rf {}/*".format(self.config.remote_dir), nolog=True)
 
-                assert self.config.remote_dir != "" and self.config.remote_dir != "/"
-                rossh("rm -rf {0}/*".format(self.config.remote_dir), nolog=True)
+        except Exception as exc:
+            msg = "Failed to create folder {} on remote {}."
+            msg = msg.format(self.config.remote_dir, node, exc)
+            logger.exception(msg)
+            raise StopTestError(msg) from exc
 
-            except Exception as exc:
-                msg = "Failed to create folder {0} on remote {1}. Error: {2!s}"
-                msg = msg.format(self.config.remote_dir, node.get_conn_id(), exc)
-                logger.exception(msg)
-                raise StopTestError(msg, exc)
+        self.install_utils(node)
+        self.prefill_test_files(node, files, force)
 
-            self.install_utils(node, rossh)
-            self.prefill_test_files(rossh, files, force)
-        except:
-            logger.exception("XXXX")
-            raise
-
-    def show_test_execution_time(self):
+    def show_expected_execution_time(self) -> None:
         if len(self.fio_configs) > 1:
             # +10% - is a rough estimation for additional operations
             # like sftp, etc
@@ -682,16 +662,17 @@
             logger.info(msg.format(exec_time_s,
                                    end_dt.strftime("%H:%M:%S")))
 
-    def run(self):
+    def run(self) -> IOTestResults:
         logger.debug("Run preparation")
         self.pre_run()
-        self.show_test_execution_time()
+        self.show_expected_execution_time()
+        num_nodes = len(self.config.nodes)
 
         tname = os.path.basename(self.config_fname)
         if tname.endswith('.cfg'):
             tname = tname[:-4]
 
-        barrier = Barrier(len(self.config.nodes))
+        barrier = Barrier(num_nodes)
         results = []
 
         # set of Operation_Mode_BlockSize str's
@@ -699,17 +680,14 @@
         # they already too slow with previous thread count
         lat_bw_limit_reached = set()
 
-        with ThreadPoolExecutor(len(self.config.nodes)) as pool:
+        with ThreadPoolExecutor(num_nodes) as pool:
             for pos, fio_cfg in enumerate(self.fio_configs):
-                test_descr = get_test_summary(fio_cfg.vals).split("th")[0]
+                test_descr = get_test_summary(fio_cfg.vals, noqd=True)
                 if test_descr in lat_bw_limit_reached:
                     continue
-                else:
-                    logger.info("Will run {0} test".format(fio_cfg.name))
 
-                templ = "Test should takes about {0}." + \
-                        " Should finish at {1}," + \
-                        " will wait at most till {2}"
+                logger.info("Will run {} test".format(fio_cfg.name))
+                templ = "Test should takes about {}. Should finish at {}, will wait at most till {}"
                 exec_time = execution_time(fio_cfg)
                 exec_time_str = sec_to_str(exec_time)
                 timeout = int(exec_time + max(300, exec_time))
@@ -722,43 +700,37 @@
                                          end_dt.strftime("%H:%M:%S"),
                                          wait_till.strftime("%H:%M:%S")))
 
-                func = functools.partial(self.do_run,
-                                         barrier=barrier,
-                                         fio_cfg=fio_cfg,
-                                         pos=pos)
+                run_test_func = functools.partial(self.do_run,
+                                                  barrier=barrier,
+                                                  fio_cfg=fio_cfg,
+                                                  pos=pos)
 
                 max_retr = 3
                 for idx in range(max_retr):
+                    if 0 != idx:
+                        logger.info("Sleeping %ss and retrying", self.retry_time)
+                        time.sleep(self.retry_time)
+
                     try:
-                        intervals = list(pool.map(func, self.config.nodes))
+                        intervals = list(pool.map(run_test_func, self.config.nodes))
                         if None not in intervals:
                             break
                     except (EnvironmentError, SSHException) as exc:
+                        if max_retr - 1 == idx:
+                            raise StopTestError("Fio failed") from exc
                         logger.exception("During fio run")
-                        if idx == max_retr - 1:
-                            raise StopTestError("Fio failed", exc)
 
-                    logger.info("Reconnectiong, sleeping %ss and retrying", self.retry_time)
-
-                    wait([pool.submit(node.connection.close)
-                          for node in self.config.nodes])
-
-                    time.sleep(self.retry_time)
-
-                    wait([pool.submit(reconnect, node.connection, node.conn_url)
-                             for node in self.config.nodes])
-
-                fname = "{0}_task.fio".format(pos)
+                fname = "{}_task.fio".format(pos)
                 with open(os.path.join(self.config.log_directory, fname), "w") as fd:
                     fd.write(str(fio_cfg))
 
-                params = {'vm_count': len(self.config.nodes)}
+                params = {'vm_count': num_nodes}
                 params['name'] = fio_cfg.name
                 params['vals'] = dict(fio_cfg.vals.items())
                 params['intervals'] = intervals
                 params['nodes'] = [node.get_conn_id() for node in self.config.nodes]
 
-                fname = "{0}_params.yaml".format(pos)
+                fname = "{}_params.yaml".format(pos)
                 with open(os.path.join(self.config.log_directory, fname), "w") as fd:
                     fd.write(dumps(params))
 
@@ -770,7 +742,7 @@
 
                     # conver us to ms
                     if self.max_latency < lat_50:
-                        logger.info(("Will skip all subsequent tests of {0} " +
+                        logger.info(("Will skip all subsequent tests of {} " +
                                      "due to lat/bw limits").format(fio_cfg.name))
                         lat_bw_limit_reached.add(test_descr)
 
@@ -782,49 +754,7 @@
         return IOTestResults(self.config.params['cfg'],
                              results, self.config.log_directory)
 
-    def do_run(self, node, barrier, fio_cfg, pos, nolog=False):
-        if self.use_sudo:
-            sudo = "sudo "
-        else:
-            sudo = ""
-
-        bash_file = """
-#!/bin/bash
-
-function get_dev() {{
-    if [ -b "$1" ] ; then
-        echo $1
-    else
-        echo $(df "$1" | tail -1 | awk '{{print $1}}')
-    fi
-}}
-
-function log_io_activiti(){{
-    local dest="$1"
-    local dev=$(get_dev "$2")
-    local sleep_time="$3"
-    dev=$(basename "$dev")
-
-    echo $dev
-
-    for (( ; ; )) ; do
-        grep -E "\\b$dev\\b" /proc/diskstats >> "$dest"
-        sleep $sleep_time
-    done
-}}
-
-sync
-cd {exec_folder}
-
-log_io_activiti {io_log_file} {test_file} 1 &
-local pid="$!"
-
-{fio_path}fio --output-format=json --output={out_file} --alloc-size=262144 {job_file} >{err_out_file} 2>&1
-echo $? >{res_code_file}
-kill -9 $pid
-
-"""
-
+    def do_run(self, node: INode, barrier: Barrier, fio_cfg, pos: int, nolog: bool=False):
         exec_folder = self.config.remote_dir
 
         if self.use_system_fio:
@@ -835,157 +765,20 @@
             else:
                 fio_path = exec_folder
 
-        bash_file = bash_file.format(out_file=self.results_file,
-                                     job_file=self.task_file,
-                                     err_out_file=self.err_out_file,
-                                     res_code_file=self.exit_code_file,
-                                     exec_folder=exec_folder,
-                                     fio_path=fio_path,
-                                     test_file=self.config_params['FILENAME'],
-                                     io_log_file=self.io_log_file).strip()
-
-        with node.connection.open_sftp() as sftp:
-            save_to_remote(sftp, self.task_file, str(fio_cfg))
-            save_to_remote(sftp, self.sh_file, bash_file)
-
         exec_time = execution_time(fio_cfg)
-
-        timeout = int(exec_time + max(300, exec_time))
-        soft_tout = exec_time
-
-        begin = time.time()
-
-        fnames_before = run_on_node(node)("ls -1 " + exec_folder, nolog=True)
-
         barrier.wait()
-
-        task = BGSSHTask(node, self.use_sudo)
-        task.start(sudo + "bash " + self.sh_file)
-
-        while True:
-            try:
-                task.wait(soft_tout, timeout)
-                break
-            except paramiko.SSHException:
-                pass
-
-            try:
-                node.connection.close()
-            except:
-                pass
-
-            reconnect(node.connection, node.conn_url)
-
-        end = time.time()
-        rossh = run_on_node(node)
-        fnames_after = rossh("ls -1 " + exec_folder, nolog=True)
-
-        conn_id = node.get_conn_id().replace(":", "_")
-        if not nolog:
-            logger.debug("Test on node {0} is finished".format(conn_id))
-
-        log_files_pref = []
-        if 'write_lat_log' in fio_cfg.vals:
-            fname = fio_cfg.vals['write_lat_log']
-            log_files_pref.append(fname + '_clat')
-            log_files_pref.append(fname + '_lat')
-            log_files_pref.append(fname + '_slat')
-
-        if 'write_iops_log' in fio_cfg.vals:
-            fname = fio_cfg.vals['write_iops_log']
-            log_files_pref.append(fname + '_iops')
-
-        if 'write_bw_log' in fio_cfg.vals:
-            fname = fio_cfg.vals['write_bw_log']
-            log_files_pref.append(fname + '_bw')
-
-        files = collections.defaultdict(lambda: [])
-        all_files = [os.path.basename(self.results_file)]
-        new_files = set(fnames_after.split()) - set(fnames_before.split())
-
-        for fname in new_files:
-            if fname.endswith('.log') and fname.split('.')[0] in log_files_pref:
-                name, _ = os.path.splitext(fname)
-                if fname.count('.') == 1:
-                    tp = name.split("_")[-1]
-                    cnt = 0
-                else:
-                    tp_cnt = name.split("_")[-1]
-                    tp, cnt = tp_cnt.split('.')
-                files[tp].append((int(cnt), fname))
-                all_files.append(fname)
-            elif fname == os.path.basename(self.io_log_file):
-                files['iops'].append(('sys', fname))
-                all_files.append(fname)
-
-        arch_name = self.join_remote('wally_result.tar.gz')
-        tmp_dir = os.path.join(self.config.log_directory, 'tmp_' + conn_id)
-
-        if os.path.exists(tmp_dir):
-            shutil.rmtree(tmp_dir)
-
-        os.mkdir(tmp_dir)
-        loc_arch_name = os.path.join(tmp_dir, 'wally_result.{0}.tar.gz'.format(conn_id))
-        file_full_names = " ".join(all_files)
-
-        try:
-            os.unlink(loc_arch_name)
-        except:
-            pass
-
-        with node.connection.open_sftp() as sftp:
-            try:
-                exit_code = read_from_remote(sftp, self.exit_code_file)
-            except IOError:
-                logger.error("No exit code file found on %s. Looks like process failed to start",
-                             conn_id)
-                return None
-
-            err_out = read_from_remote(sftp, self.err_out_file)
-            exit_code = exit_code.strip()
-
-            if exit_code != '0':
-                msg = "fio exit with code {0}: {1}".format(exit_code, err_out)
-                logger.critical(msg.strip())
-                raise StopTestError("fio failed")
-
-            rossh("rm -f {0}".format(arch_name), nolog=True)
-            pack_files_cmd = "cd {0} ; tar zcvf {1} {2}".format(exec_folder, arch_name, file_full_names)
-            rossh(pack_files_cmd, nolog=True)
-            sftp.get(arch_name, loc_arch_name)
-
-        unpack_files_cmd = "cd {0} ; tar xvzf {1} >/dev/null".format(tmp_dir, loc_arch_name)
-        subprocess.check_call(unpack_files_cmd, shell=True)
-        os.unlink(loc_arch_name)
-
-        for ftype, fls in files.items():
-            for idx, fname in fls:
-                cname = os.path.join(tmp_dir, fname)
-                loc_fname = "{0}_{1}_{2}.{3}.log".format(pos, conn_id, ftype, idx)
-                loc_path = os.path.join(self.config.log_directory, loc_fname)
-                os.rename(cname, loc_path)
-
-        cname = os.path.join(tmp_dir,
-                             os.path.basename(self.results_file))
-        loc_fname = "{0}_{1}_rawres.json".format(pos, conn_id)
-        loc_path = os.path.join(self.config.log_directory, loc_fname)
-        os.rename(cname, loc_path)
-        os.rmdir(tmp_dir)
-
-        remove_remote_res_files_cmd = "cd {0} ; rm -f {1} {2}".format(exec_folder,
-                                                                      arch_name,
-                                                                      file_full_names)
-        rossh(remove_remote_res_files_cmd, nolog=True)
-        return begin, end
+        run_data = node.rpc.fio.run_fio(self.use_sudo,
+                                        fio_path,
+                                        exec_folder,
+                                        str(fio_cfg),
+                                        exec_time + max(300, exec_time))
+        return parse_fio_result(run_data)
 
     @classmethod
-    def prepare_data(cls, results):
-        """
-        create a table with io performance report
-        for console
-        """
+    def prepare_data(cls, results) -> List[Dict[str, Any]]:
+        """create a table with io performance report for console"""
 
-        def key_func(data):
+        def key_func(data) -> Tuple[str, str, str, str, int]:
             tpl = data.summary_tpl()
             return (data.name,
                     tpl.oper,
@@ -1067,11 +860,8 @@
     fiels_and_header_dct = dict((item.attr, item) for item in fiels_and_header)
 
     @classmethod
-    def format_for_console(cls, results):
-        """
-        create a table with io performance report
-        for console
-        """
+    def format_for_console(cls, results) -> str:
+        """create a table with io performance report for console"""
 
         tab = texttable.Texttable(max_width=120)
         tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
@@ -1090,11 +880,8 @@
         return tab.draw()
 
     @classmethod
-    def format_diff_for_console(cls, list_of_results):
-        """
-        create a table with io performance report
-        for console
-        """
+    def format_diff_for_console(cls, list_of_results: List[Any]) -> str:
+        """create a table with io performance report for console"""
 
         tab = texttable.Texttable(max_width=200)
         tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
diff --git a/wally/suits/io/fio_task_parser.py b/wally/suits/io/fio_task_parser.py
index 0f788ed..233f6e2 100644
--- a/wally/suits/io/fio_task_parser.py
+++ b/wally/suits/io/fio_task_parser.py
@@ -1,13 +1,17 @@
+#!/usr/bin/env python3
+
+import re
 import os
 import sys
 import copy
 import os.path
 import argparse
 import itertools
+from typing import Optional, Iterator, Union, Dict, Iterable, Any, List, TypeVar, Callable, Tuple
 from collections import OrderedDict, namedtuple
 
 
-from wally.utils import sec_to_str, ssize2b
+from ...utils import sec_to_str, ssize2b
 
 
 SECTION = 0
@@ -20,20 +24,20 @@
                                  'tp', 'name', 'val'))
 
 
-class FioJobSection(object):
-    def __init__(self, name):
+class FioJobSection:
+    def __init__(self, name: str):
         self.name = name
         self.vals = OrderedDict()
 
-    def copy(self):
+    def copy(self) -> 'FioJobSection':
         return copy.deepcopy(self)
 
-    def required_vars(self):
+    def required_vars(self) -> Iterator[Tuple[str, Var]]:
         for name, val in self.vals.items():
             if isinstance(val, Var):
                 yield name, val
 
-    def is_free(self):
+    def is_free(self) -> bool:
         return len(list(self.required_vars())) == 0
 
     def __str__(self):
@@ -51,7 +55,7 @@
 
 
 class ParseError(ValueError):
-    def __init__(self, msg, fname, lineno, line_cont=""):
+    def __init__(self, msg: str, fname: str, lineno: int, line_cont: Optional[str] = ""):
         ValueError.__init__(self, msg)
         self.file_name = fname
         self.lineno = lineno
@@ -65,21 +69,11 @@
                           super(ParseError, self).__str__())
 
 
-def is_name(name):
-    if len(name) == 0:
-        return False
-
-    if name[0] != '_' and not name[0].isalpha():
-        return False
-
-    for ch in name[1:]:
-        if name[0] != '_' and not name[0].isalnum():
-            return False
-
-    return True
+def is_name(name: str) -> bool:
+    return re.match(r"[a-zA-Z_][a-zA-Z_0-9]*$", name) is not None
 
 
-def parse_value(val):
+def parse_value(val: str) -> Union[int, str, Dict, Var]:
     try:
         return int(val)
     except ValueError:
@@ -103,7 +97,7 @@
     return val
 
 
-def fio_config_lexer(fio_cfg, fname):
+def fio_config_lexer(fio_cfg: str, fname: str) -> Iterator[CfgLine]:
     for lineno, oline in enumerate(fio_cfg.split("\n")):
         try:
             line = oline.strip()
@@ -136,7 +130,7 @@
             raise ParseError(str(exc), fname, lineno, oline)
 
 
-def fio_config_parse(lexer_iter):
+def fio_config_parse(lexer_iter: Iterable[CfgLine]) -> Iterator[FioJobSection]:
     in_globals = False
     curr_section = None
     glob_vals = OrderedDict()
@@ -210,19 +204,7 @@
         yield curr_section
 
 
-def process_repeats(sec):
-    sec = sec.copy()
-    count = sec.vals.pop('NUM_ROUNDS', 1)
-    assert isinstance(count, (int, long))
-
-    for _ in range(count):
-        yield sec.copy()
-
-        if 'ramp_time' in sec.vals:
-            sec.vals['_ramp_time'] = sec.vals.pop('ramp_time')
-
-
-def process_cycles(sec):
+def process_cycles(sec: FioJobSection) -> Iterator[FioJobSection]:
     cycles = OrderedDict()
 
     for name, val in sec.vals.items():
@@ -232,8 +214,8 @@
     if len(cycles) == 0:
         yield sec
     else:
-        # thread should changes faster
-        numjobs = cycles.pop('numjobs', None)
+        # qd should change faster
+        numjobs = cycles.pop('qd', None)
         items = cycles.items()
 
         if len(items) > 0:
@@ -246,7 +228,7 @@
 
         if numjobs is not None:
             vals.append(numjobs)
-            keys.append('numjobs')
+            keys.append('qd')
 
         for combination in itertools.product(*vals):
             new_sec = sec.copy()
@@ -254,7 +236,11 @@
             yield new_sec
 
 
-def apply_params(sec, params):
+FIO_PARAM_VAL = Union[str, Var]
+FIO_PARAMS = Dict[str, FIO_PARAM_VAL]
+
+
+def apply_params(sec: FioJobSection, params: FIO_PARAMS) -> FioJobSection:
     processed_vals = OrderedDict()
     processed_vals.update(params)
     for name, val in sec.vals.items():
@@ -273,10 +259,7 @@
     return sec
 
 
-MAGIC_OFFSET = 0.1885
-
-
-def abbv_name_to_full(name):
+def abbv_name_to_full(name: str) -> str:
     assert len(name) == 3
 
     smode = {
@@ -291,12 +274,11 @@
         off_mode[name[0]] + " " + oper[name[1]]
 
 
-def finall_process(sec, counter=[0]):
-    sec = sec.copy()
+MAGIC_OFFSET = 0.1885
 
-    if sec.vals.get('numjobs', '1') != 1:
-        msg = "Group reporting should be set if numjobs != 1"
-        assert 'group_reporting' in sec.vals, msg
+
+def finall_process(sec: FioJobSection, counter: Optional[List[int]] = [0]) -> FioJobSection:
+    sec = sec.copy()
 
     sec.vals['unified_rw_reporting'] = '1'
 
@@ -328,7 +310,7 @@
     return sec
 
 
-def get_test_sync_mode(sec):
+def get_test_sync_mode(sec: FioJobSection) -> str:
     if isinstance(sec, dict):
         vals = sec
     else:
@@ -347,10 +329,10 @@
         return 'a'
 
 
-TestSumm = namedtuple("TestSumm", ("oper", "mode", "bsize", "th_count", "vm_count"))
+TestSumm = namedtuple("TestSumm", ("oper", "mode", "bsize", "iodepth", "vm_count"))
 
 
-def get_test_summary_tuple(sec, vm_count=None):
+def get_test_summary_tuple(sec: FioJobSection, vm_count=None) -> TestSumm:
     if isinstance(sec, dict):
         vals = sec
     else:
@@ -365,48 +347,51 @@
           "readwrite": "sm"}[vals["rw"]]
 
     sync_mode = get_test_sync_mode(sec)
-    th_count = vals.get('numjobs')
-
-    if th_count is None:
-        th_count = vals.get('concurence', 1)
 
     return TestSumm(rw,
                     sync_mode,
                     vals['blocksize'],
-                    th_count,
+                    vals['iodepth'],
                     vm_count)
 
 
-def get_test_summary(sec, vm_count=None):
+def get_test_summary(sec: FioJobSection, vm_count: Optional[int] = None, noqd: bool = False) -> str:
     tpl = get_test_summary_tuple(sec, vm_count)
-    res = "{0.oper}{0.mode}{0.bsize}th{0.th_count}".format(tpl)
+
+    res = "{0.oper}{0.mode}{0.bsize}".format(tpl)
+    if not noqd:
+        res += "qd{}".format(tpl.qd)
 
     if tpl.vm_count is not None:
-        res += "vm" + str(tpl.vm_count)
+        res += "vm{}".format(tpl.vm_count)
 
     return res
 
 
-def execution_time(sec):
+def execution_time(sec: FioJobSection) -> int:
     return sec.vals.get('ramp_time', 0) + sec.vals.get('runtime', 0)
 
 
-def parse_all_in_1(source, fname=None):
+def parse_all_in_1(source: str, fname: Optional[str] = None) -> Iterator[FioJobSection]:
     return fio_config_parse(fio_config_lexer(source, fname))
 
 
-def flatmap(func, inp_iter):
+FM_FUNC_INPUT = TypeVar("FM_FUNC_INPUT")
+FM_FUNC_RES = TypeVar("FM_FUNC_RES")
+
+
+def flatmap(func: Callable[[FM_FUNC_INPUT], Iterable[FM_FUNC_RES]],
+            inp_iter: Iterable[FM_FUNC_INPUT]) -> Iterator[FM_FUNC_RES]:
     for val in inp_iter:
         for res in func(val):
             yield res
 
 
-def fio_cfg_compile(source, fname, test_params):
+def fio_cfg_compile(source: str, fname: str, test_params: FIO_PARAMS) -> Iterator[FioJobSection]:
     it = parse_all_in_1(source, fname)
     it = (apply_params(sec, test_params) for sec in it)
     it = flatmap(process_cycles, it)
-    it = flatmap(process_repeats, it)
-    return itertools.imap(finall_process, it)
+    return map(finall_process, it)
 
 
 def parse_args(argv):
@@ -438,12 +423,12 @@
     sec_it = fio_cfg_compile(job_cfg, argv_obj.jobfile, params)
 
     if argv_obj.action == 'estimate':
-        print sec_to_str(sum(map(execution_time, sec_it)))
+        print(sec_to_str(sum(map(execution_time, sec_it))))
     elif argv_obj.action == 'num_tests':
-        print sum(map(len, map(list, sec_it)))
+        print(sum(map(len, map(list, sec_it))))
     elif argv_obj.action == 'compile':
         splitter = "\n#" + "-" * 70 + "\n\n"
-        print splitter.join(map(str, sec_it))
+        print(splitter.join(map(str, sec_it)))
 
     return 0
 
diff --git a/wally/suits/io/rpc_plugin.py b/wally/suits/io/rpc_plugin.py
new file mode 100644
index 0000000..ca3f0f3
--- /dev/null
+++ b/wally/suits/io/rpc_plugin.py
@@ -0,0 +1,15 @@
+def rpc_run_fio(cfg):
+    fio_cmd_templ = "cd {exec_folder}; {fio_path}fio --output-format=json " + \
+                    "--output={out_file} --alloc-size=262144 {job_file}"
+
+    # fnames_before = node.run("ls -1 " + exec_folder, nolog=True)
+    #
+    # timeout = int(exec_time + max(300, exec_time))
+    # soft_end_time = time.time() + exec_time
+    # logger.error("Fio timeouted on node {}. Killing it".format(node))
+    # end = time.time()
+    # fnames_after = node.run("ls -1 " + exec_folder, nolog=True)
+    #
+
+def parse_fio_result(data):
+    pass
diff --git a/wally/suits/io/rrd.cfg b/wally/suits/io/rrd.cfg
index f42dff6..86de738 100644
--- a/wally/suits/io/rrd.cfg
+++ b/wally/suits/io/rrd.cfg
@@ -1,6 +1,6 @@
 [global]
 include defaults.cfg
-QD={% 1, 5 %}
+NUMJOBS=8
 ramp_time=5
 runtime=5
 
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index 0529c1f..00492c9 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -3,18 +3,21 @@
 import logging
 import os.path
 import functools
+from typing import Dict, Any, List, Tuple
 
 from concurrent.futures import ThreadPoolExecutor
 
-from wally.utils import Barrier, StopTestError
-from wally.statistic import data_property
-from wally.ssh_utils import run_over_ssh, copy_paths
+from ..utils import Barrier, StopTestError
+from ..statistic import data_property
+from ..ssh_utils import copy_paths
+from ..inode import INode
+
 
 
 logger = logging.getLogger("wally")
 
 
-class TestConfig(object):
+class TestConfig:
     """
     this class describe test input configuration
 
@@ -25,8 +28,13 @@
     nodes:[Node] - node to run tests on
     remote_dir:str - directory on nodes to be used for local files
     """
-    def __init__(self, test_type, params, test_uuid, nodes,
-                 log_directory, remote_dir):
+    def __init__(self,
+                 test_type: str,
+                 params: Dict[str, Any],
+                 test_uuid: str,
+                 nodes: List[INode],
+                 log_directory: str,
+                 remote_dir: str):
         self.test_type = test_type
         self.params = params
         self.test_uuid = test_uuid
@@ -35,7 +43,7 @@
         self.remote_dir = remote_dir
 
 
-class TestResults(object):
+class TestResults:
     """
     this class describe test results
 
@@ -45,7 +53,11 @@
     raw_result:Any - opaque object to store raw results
     run_interval:(float, float) - test tun time, used for sensors
     """
-    def __init__(self, config, results, raw_result, run_interval):
+    def __init__(self,
+                 config: TestConfig,
+                 results: Dict[str, Any],
+                 raw_result: Any,
+                 run_interval: Tuple[float, float]):
         self.config = config
         self.params = config.params
         self.results = results
@@ -76,22 +88,22 @@
         pass
 
 
-class MeasurementMatrix(object):
-    """
-    data:[[MeasurementResult]] - VM_COUNT x TH_COUNT matrix of MeasurementResult
-    """
-    def __init__(self, data, connections_ids):
-        self.data = data
-        self.connections_ids = connections_ids
-
-    def per_vm(self):
-        return self.data
-
-    def per_th(self):
-        return sum(self.data, [])
+# class MeasurementMatrix:
+#     """
+#     data:[[MeasurementResult]] - VM_COUNT x TH_COUNT matrix of MeasurementResult
+#     """
+#     def __init__(self, data, connections_ids):
+#         self.data = data
+#         self.connections_ids = connections_ids
+#
+#     def per_vm(self):
+#         return self.data
+#
+#     def per_th(self):
+#         return sum(self.data, [])
 
 
-class MeasurementResults(object):
+class MeasurementResults:
     def stat(self):
         return data_property(self.data)
 
@@ -112,7 +124,7 @@
     data:[(float, float, float)] - list of (start_time, lenght, average_value_for_interval)
     odata: original values
     """
-    def __init__(self, data):
+    def __init__(self, data: List[Tuple[float, float, float]]):
         assert len(data) > 0
         self.odata = data[:]
         self.data = []
@@ -123,13 +135,13 @@
             cstart = nstart
 
     @property
-    def values(self):
+    def values(self) -> List[float]:
         return [val[2] for val in self.data]
 
-    def average_interval(self):
+    def average_interval(self) -> float:
         return float(sum([val[1] for val in self.data])) / len(self.data)
 
-    def skip(self, seconds):
+    def skip(self, seconds) -> 'TimeSeriesValue':
         nres = []
         for start, ln, val in self.data:
             nstart = start + ln - seconds
@@ -137,7 +149,7 @@
                 nres.append([nstart, val])
         return self.__class__(nres)
 
-    def derived(self, tdelta):
+    def derived(self, tdelta) -> 'TimeSeriesValue':
         end = self.data[-1][0] + self.data[-1][1]
         tdelta = float(tdelta)
 
@@ -166,7 +178,7 @@
         return self.__class__(res)
 
 
-class PerfTest(object):
+class PerfTest:
     """
     Very base class for tests
     config:TestConfig - test configuration
@@ -176,41 +188,32 @@
         self.config = config
         self.stop_requested = False
 
-    def request_stop(self):
+    def request_stop(self) -> None:
         self.stop_requested = True
 
-    def join_remote(self, path):
+    def join_remote(self, path: str) -> str:
         return os.path.join(self.config.remote_dir, path)
 
     @classmethod
     @abc.abstractmethod
-    def load(cls, path):
+    def load(cls, path: str):
         pass
 
     @abc.abstractmethod
-    def run(self):
+    def run(self) -> List[TestResults]:
         pass
 
     @abc.abstractmethod
-    def format_for_console(cls, data):
+    def format_for_console(cls, data: Any) -> str:
         pass
 
 
-def run_on_node(node):
-    def closure(*args, **kwargs):
-        return run_over_ssh(node.connection,
-                            *args,
-                            node=node.get_conn_id(),
-                            **kwargs)
-    return closure
-
-
 class ThreadedTest(PerfTest):
     """
     Base class for tests, which spawn separated thread for each node
     """
 
-    def run(self):
+    def run(self) -> List[TestResults]:
         barrier = Barrier(len(self.config.nodes))
         th_test_func = functools.partial(self.th_test_func, barrier)
 
@@ -218,44 +221,27 @@
             return list(pool.map(th_test_func, self.config.nodes))
 
     @abc.abstractmethod
-    def do_test(self, node):
+    def do_test(self, node: INode) -> TestResults:
         pass
 
-    def th_test_func(self, barrier, node):
-        logger.debug("Starting {0} test on {1} node".format(self.__class__.__name__,
-                                                            node.conn_url))
-
-        logger.debug("Run preparation for {0}".format(node.get_conn_id()))
+    def th_test_func(self, barrier: Barrier, node: INode) -> TestResults:
+        test_name = self.__class__.__name__
+        logger.debug("Starting {} test on {}".format(test_name , node))
+        logger.debug("Run test preparation on {}".format(node))
         self.pre_run(node)
+
+        # wait till all threads are ready
         barrier.wait()
+
+        logger.debug("Run test on {}".format(node))
         try:
-            logger.debug("Run test for {0}".format(node.get_conn_id()))
             return self.do_test(node)
-        except StopTestError as exc:
-            pass
         except Exception as exc:
-            msg = "In test {0} for node {1}".format(self, node.get_conn_id())
+            msg = "In test {} for {}".format(test_name, node)
             logger.exception(msg)
-            exc = StopTestError(msg, exc)
+            raise StopTestError(msg) from exc
 
-        try:
-            self.cleanup()
-        except StopTestError as exc1:
-            if exc is None:
-                exc = exc1
-        except Exception as exc1:
-            if exc is None:
-                msg = "Duringf cleanup - in test {0} for node {1}".format(self, node)
-                logger.exception(msg)
-                exc = StopTestError(msg, exc)
-
-        if exc is not None:
-            raise exc
-
-    def pre_run(self, node):
-        pass
-
-    def cleanup(self, node):
+    def pre_run(self, node: INode) -> None:
         pass
 
 
@@ -269,25 +255,22 @@
         self.prerun_tout = self.config.params.get('prerun_tout', 3600)
         self.run_tout = self.config.params.get('run_tout', 3600)
 
-    def get_remote_for_script(self, script):
-        return os.path.join(self.remote_dir,
-                            os.path.basename(script))
+    def get_remote_for_script(self, script: str) -> str:
+        return os.path.join(self.remote_dir, os.path.basename(script))
 
-    def pre_run(self, node):
+    def pre_run(self, node: INode) -> None:
         copy_paths(node.connection,
-                   {
-                       self.run_script: self.get_remote_for_script(self.run_script),
-                       self.prerun_script: self.get_remote_for_script(self.prerun_script),
-                   })
+                   {self.run_script: self.get_remote_for_script(self.run_script),
+                    self.prerun_script: self.get_remote_for_script(self.prerun_script)})
 
         cmd = self.get_remote_for_script(self.prerun_script)
         cmd += ' ' + self.config.params.get('prerun_opts', '')
-        run_on_node(node)(cmd, timeout=self.prerun_tout)
+        node.run(cmd, timeout=self.prerun_tout)
 
-    def do_test(self, node):
+    def do_test(self, node: INode) -> TestResults:
         cmd = self.get_remote_for_script(self.run_script)
         cmd += ' ' + self.config.params.get('run_opts', '')
         t1 = time.time()
-        res = run_on_node(node)(cmd, timeout=self.run_tout)
+        res = node.run(cmd, timeout=self.run_tout)
         t2 = time.time()
         return TestResults(self.config, None, res, (t1, t2))
diff --git a/wally/test_run_class.py b/wally/test_run_class.py
new file mode 100644
index 0000000..9ce3370
--- /dev/null
+++ b/wally/test_run_class.py
@@ -0,0 +1,20 @@
+class TestRun:
+    """Test run information"""
+    def __init__(self):
+        # NodesInfo list
+        self.nodes_info = []
+
+        # Nodes list
+        self.nodes = []
+
+        self.build_meta = {}
+        self.clear_calls_stack = []
+
+        # created openstack nodes
+        self.openstack_nodes_ids = []
+        self.sensors_mon_q = None
+
+        # openstack credentials
+        self.fuel_openstack_creds = None
+
+
diff --git a/wally/utils.py b/wally/utils.py
index 3fba2b0..32c9056 100644
--- a/wally/utils.py
+++ b/wally/utils.py
@@ -1,13 +1,18 @@
 import re
 import os
+import io
 import sys
 import socket
 import logging
+import ipaddress
 import threading
 import contextlib
 import subprocess
 import collections
 
+from .inode import INode
+from typing import Any, Tuple, Union, List, Generator, Dict, Callable, Iterable, Optional, IO, Iterator
+
 try:
     import psutil
 except ImportError:
@@ -17,48 +22,27 @@
 logger = logging.getLogger("wally")
 
 
-def is_ip(data):
-    if data.count('.') != 3:
-        return False
-
+def is_ip(data: str) -> bool:
     try:
-        for part in map(int, data.split('.')):
-            if part > 255 or part < 0:
-                raise ValueError()
+        ipaddress.ip_address(data)
+        return True
     except ValueError:
         return False
-    return True
 
 
 class StopTestError(RuntimeError):
-    def __init__(self, reason, orig_exc=None):
-        RuntimeError.__init__(self, reason)
-        self.orig_exc = orig_exc
+    pass
 
 
-@contextlib.contextmanager
-def log_block(message, exc_logger=None):
-    logger.debug("Starts : " + message)
-    with log_error(message, exc_logger):
-        yield
-    # try:
-    #     yield
-    # except Exception as exc:
-    #     if isinstance(exc, types) and not isinstance(exc, StopIteration):
-    #         templ = "Error during {0} stage: {1!s}"
-    #         logger.debug(templ.format(action, exc))
-    #     raise
-
-
-class log_error(object):
-    def __init__(self, message, exc_logger=None):
+class LogError:
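+    """Context manager: unexpected exceptions from the block are logged and re-raised as StopTestError"""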
+    def __init__(self, message: str, exc_logger=None):
         self.message = message
         self.exc_logger = exc_logger
 
     def __enter__(self):
         return self
 
-    def __exit__(self, tp, value, traceback):
+    def __exit__(self, tp: type, value: Exception, traceback: Any):
         if value is None or isinstance(value, StopTestError):
             return
 
@@ -71,13 +55,18 @@
         raise StopTestError(self.message, value)
 
 
-def check_input_param(is_ok, message):
+def log_block(message: str, exc_logger=None) -> LogError:
+    logger.debug("Starts : " + message)
+    return LogError(message, exc_logger)
+
+
+def check_input_param(is_ok: bool, message: str) -> None:
     if not is_ok:
         logger.error(message)
         raise StopTestError(message)
 
 
-def parse_creds(creds):
+def parse_creds(creds: str) -> Tuple[str, str, str]:
     # parse user:passwd@host
     user, passwd_host = creds.split(":", 1)
 
@@ -93,14 +82,14 @@
     pass
 
 
-class Barrier(object):
-    def __init__(self, count):
+class Barrier:
+    def __init__(self, count: int):
         self.count = count
         self.curr_count = 0
         self.cond = threading.Condition()
         self.exited = False
 
-    def wait(self, timeout=None):
+    def wait(self, timeout: Optional[int] = None) -> bool:
         with self.cond:
             if self.exited:
                 raise TaksFinished()
@@ -114,7 +103,7 @@
                 self.cond.wait(timeout=timeout)
                 return False
 
-    def exit(self):
+    def exit(self) -> None:
         with self.cond:
             self.exited = True
 
@@ -122,9 +111,9 @@
 SMAP = dict(k=1024, m=1024 ** 2, g=1024 ** 3, t=1024 ** 4)
 
 
-def ssize2b(ssize):
+def ssize2b(ssize: Union[str, int]) -> int:
     try:
-        if isinstance(ssize, (int, long)):
+        if isinstance(ssize, int):
             return ssize
 
         ssize = ssize.lower()
@@ -132,7 +121,7 @@
             return int(ssize[:-1]) * SMAP[ssize[-1]]
         return int(ssize)
     except (ValueError, TypeError, AttributeError):
-        raise ValueError("Unknow size format {0!r}".format(ssize))
+        raise ValueError("Unknow size format {!r}".format(ssize))
 
 
 RSMAP = [('K', 1024),
@@ -141,18 +130,18 @@
          ('T', 1024 ** 4)]
 
 
-def b2ssize(size):
+def b2ssize(size: int) -> str:
     if size < 1024:
         return str(size)
 
     for name, scale in RSMAP:
         if size < 1024 * scale:
             if size % scale == 0:
-                return "{0} {1}i".format(size // scale, name)
+                return "{} {}i".format(size // scale, name)
             else:
-                return "{0:.1f} {1}i".format(float(size) / scale, name)
+                return "{:.1f} {}i".format(float(size) / scale, name)
 
-    return "{0}{1}i".format(size // scale, name)
+    return "{}{}i".format(size // scale, name)
 
 
 RSMAP_10 = [('k', 1000),
@@ -161,22 +150,22 @@
             ('t', 1000 ** 4)]
 
 
-def b2ssize_10(size):
+def b2ssize_10(size: int) -> str:
     if size < 1000:
         return str(size)
 
     for name, scale in RSMAP_10:
         if size < 1000 * scale:
             if size % scale == 0:
-                return "{0} {1}".format(size // scale, name)
+                return "{} {}".format(size // scale, name)
             else:
-                return "{0:.1f} {1}".format(float(size) / scale, name)
+                return "{:.1f} {}".format(float(size) / scale, name)
 
-    return "{0}{1}".format(size // scale, name)
+    return "{}{}".format(size // scale, name)
 
 
-def run_locally(cmd, input_data="", timeout=20):
-    shell = isinstance(cmd, basestring)
+def run_locally(cmd: Union[str, List[str]], input_data: str = "", timeout: int = 20) -> str:
+    shell = isinstance(cmd, str)
     proc = subprocess.Popen(cmd,
                             shell=shell,
                             stdin=subprocess.PIPE,
@@ -184,7 +173,7 @@
                             stderr=subprocess.PIPE)
     res = []
 
-    def thread_func():
+    def thread_func() -> None:
         rr = proc.communicate(input_data)
         res.extend(rr)
 
@@ -214,7 +203,7 @@
     return out
 
 
-def get_ip_for_target(target_ip):
+def get_ip_for_target(target_ip: str) -> str:
     if not is_ip(target_ip):
         target_ip = socket.gethostbyname(target_ip)
 
@@ -245,7 +234,7 @@
     raise OSError("Can't define interface for {0}".format(target_ip))
 
 
-def open_for_append_or_create(fname):
+def open_for_append_or_create(fname: str) -> IO[str]:
     if not os.path.exists(fname):
         return open(fname, "w")
 
@@ -254,20 +243,17 @@
     return fd
 
 
-def sec_to_str(seconds):
+def sec_to_str(seconds: int) -> str:
     h = seconds // 3600
     m = (seconds % 3600) // 60
     s = seconds % 60
-    return "{0}:{1:02d}:{2:02d}".format(h, m, s)
+    return "{}:{:02d}:{:02d}".format(h, m, s)
 
 
-def yamable(data):
+def yamable(data: Any) -> Any:
     if isinstance(data, (tuple, list)):
         return map(yamable, data)
 
-    if isinstance(data, unicode):
-        return str(data)
-
     if isinstance(data, dict):
         res = {}
         for k, v in data.items():
@@ -280,16 +266,16 @@
 CLEANING = []
 
 
-def clean_resource(func, *args, **kwargs):
+def clean_resource(func: Callable[..., Any], *args, **kwargs) -> None:
     CLEANING.append((func, args, kwargs))
 
 
-def iter_clean_func():
+def iter_clean_func() -> Iterator[Tuple[Callable[..., Any], Tuple[Any, ...], Dict[str, Any]]]:
     while CLEANING != []:
         yield CLEANING.pop()
 
 
-def flatten(data):
+def flatten(data: Iterable[Any]) -> List[Any]:
     res = []
     for i in data:
         if isinstance(i, (list, tuple, set)):
@@ -299,17 +285,17 @@
     return res
 
 
-def get_creds_openrc(path):
+def get_creds_openrc(path: str) -> Tuple[str, str, str, str, str]:
     fc = open(path).read()
 
     echo = 'echo "$OS_INSECURE:$OS_TENANT_NAME:$OS_USERNAME:$OS_PASSWORD@$OS_AUTH_URL"'
 
     msg = "Failed to get creads from openrc file"
-    with log_error(msg):
+    with LogError(msg):
         data = run_locally(['/bin/bash'], input_data=fc + "\n" + echo)
 
     msg = "Failed to get creads from openrc file: " + data
-    with log_error(msg):
+    with LogError(msg):
         data = data.strip()
         insecure_str, user, tenant, passwd_auth_url = data.split(':', 3)
         insecure = (insecure_str in ('1', 'True', 'true'))
@@ -323,20 +309,20 @@
 os_release = collections.namedtuple("Distro", ["distro", "release", "arch"])
 
 
-def get_os(run_func):
-    arch = run_func("arch", nolog=True).strip()
+def get_os(node: INode) -> os_release:
+    arch = node.run("arch", nolog=True).strip()
 
     try:
-        run_func("ls -l /etc/redhat-release", nolog=True)
+        node.run("ls -l /etc/redhat-release", nolog=True)
         return os_release('redhat', None, arch)
     except:
         pass
 
     try:
-        run_func("ls -l /etc/debian_version", nolog=True)
+        node.run("ls -l /etc/debian_version", nolog=True)
 
         release = None
-        for line in run_func("lsb_release -a", nolog=True).split("\n"):
+        for line in node.run("lsb_release -a", nolog=True).split("\n"):
             if ':' not in line:
                 continue
             opt, val = line.split(":", 1)
@@ -352,16 +338,16 @@
 
 
 @contextlib.contextmanager
-def empty_ctx(val=None):
+def empty_ctx(val: Any = None) -> Iterator[Any]:
     yield val
 
 
-def mkdirs_if_unxists(path):
+def mkdirs_if_unxists(path: str) -> None:
     if not os.path.exists(path):
         os.makedirs(path)
 
 
-def log_nodes_statistic(nodes):
+def log_nodes_statistic(nodes: Iterable[INode]) -> None:
     logger.info("Found {0} nodes total".format(len(nodes)))
     per_role = collections.defaultdict(lambda: 0)
     for node in nodes:
@@ -372,7 +358,7 @@
         logger.debug("Found {0} nodes with role {1}".format(count, role))
 
 
-def which(program):
+def which(program: str) -> Optional[str]:
     def is_exe(fpath):
         return os.path.isfile(fpath) and os.access(fpath, os.X_OK)