diff --git a/wally/config.py b/wally/config.py
index c5d1db0..59db0e1 100644
--- a/wally/config.py
+++ b/wally/config.py
@@ -1,141 +1,60 @@
-import os
-import uuid
-import functools
-
-import yaml
-
-try:
-    from petname import Generate as pet_generate
-except ImportError:
-    def pet_generate(x, y):
-        return str(uuid.uuid4())
-
-from . import pretty_yaml
-
+from typing import TYPE_CHECKING, Any, Dict
+from .storage import IStorable, IStorage
 
 class NoData:
     @classmethod
-    def get(cls, name, x):
+    def get(cls: type, name: str, x: Any) -> type:  # ignores name and x, always returns the class itself
         return cls
 
 
-class Config:
-    def __init__(self, val=None):
-        if val is not None:
-            self.update(val)
-        self.results_dir = None
-        self.run_uuid = None
-        self.settings = {}
-        self.run_params_file = None
-        self.default_test_local_folder = None
-        self.hwinfo_directory = None
-        self.hwreport_fname = None
+class Config(IStorable):
+    """Attribute-style view over a nested settings dict; nested dicts are wrapped on access."""
+    if TYPE_CHECKING:  # mypy-only declarations: real class attributes would shadow __getattr__
+        run_uuid = None  # type: str
+        storage_url = None  # type: str
+        comment = None  # type: str
+        keep_vm = None  # type: bool
+        no_tests = None  # type: bool
+        dont_discover_nodes = None  # type: bool
+        build_id = None  # type: str
+        build_description = None  # type: str
+        build_type = None  # type: str
 
-    def get(self, name, defval=None):
-        obj = self.__dict__
-        for cname in name.split("."):
-            obj = obj.get(cname, NoData)
+    def __init__(self, dct: Dict[str, Any]) -> None:
+        self.__dict__['_dct'] = dct
 
-        if obj is NoData:
-            return defval
-        return obj
+    def get(self, path: str, default: Any = NoData) -> Any:
+        """Return the value at the '/'-separated path, or default if any component is missing."""
+        curr = self
 
-    def update(self, val):
-        self.__dict__.update(val)
+        while path:
+            name, _, path = path.partition('/')
+            try:
+                curr = getattr(curr, name)
+            except AttributeError:
+                return default
+
+        return curr
+
+    def __getattr__(self, name: str) -> Any:
+        """Look name up in the wrapped dict; runs only when normal attribute lookup fails."""
+        try:
+            # not self.__dct: that is name-mangled to _Config__dct, never set, and recursed forever
+            val = self.__dict__['_dct'][name]
+        except KeyError:
+            raise AttributeError(name)
+
+        if isinstance(val, dict):
+            val = self.__class__(val)
+
+        return val
+
+    def __setattr__(self, name: str, val: Any) -> None:
+        """Store val under name in the wrapped dict rather than in the instance __dict__."""
+        self.__dict__['_dct'][name] = val
 
 
-def get_test_files(results_dir):
-    in_var_dir = functools.partial(os.path.join, results_dir)
-
-    res = dict(
-        run_params_file='run_params.yaml',
-        saved_config_file='config.yaml',
-        vm_ids_fname='os_vm_ids',
-        html_report_file='{0}_report.html',
-        load_report_file='load_report.html',
-        text_report_file='report.txt',
-        log_file='log.txt',
-        sensor_storage='sensor_storage',
-        nodes_report_file='nodes.yaml',
-        results_storage='results',
-        hwinfo_directory='hwinfo',
-        hwreport_fname='hwinfo.txt',
-        raw_results='raw_results.yaml')
-
-    res = dict((k, in_var_dir(v)) for k, v in res.items())
-    res['results_dir'] = results_dir
-    return res
-
-
-def load_config(file_name):
-    file_name = os.path.abspath(file_name)
-
-    defaults = dict(
-        testnode_log_root='/tmp/wally',
-        settings={}
-    )
-
-    raw_cfg = yaml.load(open(file_name).read())
-    raw_cfg['config_folder'] = os.path.dirname(file_name)
-    if 'include' in raw_cfg:
-        default_path = os.path.join(raw_cfg['config_folder'],
-                                    raw_cfg.pop('include'))
-        default_cfg = yaml.load(open(default_path).read())
-
-        # TODO: Need more intelectual configs merge?
-        default_cfg.update(raw_cfg)
-        raw_cfg = default_cfg
-
-    cfg = Config(defaults)
-    cfg.update(raw_cfg)
-
-    results_storage = cfg.settings.get('results_storage', '/tmp')
-    results_storage = os.path.abspath(results_storage)
-
-    existing = file_name.startswith(results_storage)
-
-    if existing:
-        cfg.results_dir = os.path.dirname(file_name)
-        cfg.run_uuid = os.path.basename(cfg.results_dir)
-    else:
-        # genarate result folder name
-        for i in range(10):
-            cfg.run_uuid = pet_generate(2, "_")
-            cfg.results_dir = os.path.join(results_storage,
-                                           cfg.run_uuid)
-            if not os.path.exists(cfg.results_dir):
-                break
-        else:
-            cfg.run_uuid = str(uuid.uuid4())
-            cfg.results_dir = os.path.join(results_storage,
-                                           cfg.run_uuid)
-
-    # setup all files paths
-    cfg.update(get_test_files(cfg.results_dir))
-
-    if existing:
-        cfg.update(load_run_params(cfg.run_params_file))
-
-    testnode_log_root = cfg.get('testnode_log_root')
-    testnode_log_dir = os.path.join(testnode_log_root, "{0}/{{name}}")
-    cfg.default_test_local_folder = testnode_log_dir.format(cfg.run_uuid)
-
-    return cfg
-
-
-def save_run_params(cfg):
-    params = {
-        'comment': cfg.comment,
-        'run_uuid': cfg.run_uuid
-    }
-
-    with open(cfg.run_params_file, 'w') as fd:
-        fd.write(pretty_yaml.dumps(params))
-
-
-def load_run_params(run_params_file):
-    with open(run_params_file) as fd:
-        dt = yaml.load(fd)
-
-    return dict(run_uuid=dt['run_uuid'],
-                comment=dt.get('comment'))
+class Context:  # plain holder pairing a Config with an IStorage
+    def __init__(self, config: Config, storage: IStorage):
+        self.config = config  # kept as passed in, no copy
+        self.storage = storage  # kept as passed in, no copy
\ No newline at end of file
diff --git a/wally/discover/__init__.py b/wally/discover/__init__.py
index 3ac983e..e69de29 100644
--- a/wally/discover/__init__.py
+++ b/wally/discover/__init__.py
@@ -1,5 +0,0 @@
-"this package contains node discovery code"
-from .node import Node
-from .discover import discover
-
-__all__ = ["discover", "Node"]
diff --git a/wally/fuel_rest_api.py b/wally/fuel_rest_api.py
index 8799ed2..03beb37 100644
--- a/wally/fuel_rest_api.py
+++ b/wally/fuel_rest_api.py
@@ -4,7 +4,7 @@
 import logging
 import urllib.request
 import urllib.parse
-from typing import Dict, Any
+from typing import Dict, Any, Iterator, Match
 from functools import partial, wraps
 
 import netaddr
@@ -36,7 +36,7 @@
     def host(self) -> str:
         return self.root_url.split('/')[2]
 
-    def do(self, method: str, path: str, params: Dict[Any, Any]=None):
+    def do(self, method: str, path: str, params: Dict[Any, Any]=None) -> Dict[str, Any]:
         if path.startswith('/'):
             url = self.root_url + path
         else:
@@ -78,7 +78,7 @@
 
 
 class KeystoneAuth(Urllib2HTTP):
-    def __init__(self, root_url: str, creds: Dict[str, str], headers: Dict[str, str]=None):
+    def __init__(self, root_url: str, creds: Dict[str, str], headers: Dict[str, str] = None) -> None:
         super(KeystoneAuth, self).__init__(root_url, headers)
         admin_node_ip = urllib.parse.urlparse(root_url).hostname
         self.keystone_url = "http://{0}:5000/v2.0".format(admin_node_ip)
@@ -86,7 +86,7 @@
             auth_url=self.keystone_url, **creds)
         self.refresh_token()
 
-    def refresh_token(self):
+    def refresh_token(self) -> None:
         """Get new token from keystone and update headers"""
         try:
             self.keystone.authenticate()
@@ -96,7 +96,7 @@
                 'Cant establish connection to keystone with url %s',
                 self.keystone_url)
 
-    def do(self, method: str, path: str, params: Dict[str, str]=None):
+    def do(self, method: str, path: str, params: Dict[str, str] = None) -> Dict[str, Any]:
         """Do request. If gets 401 refresh token"""
         try:
             return super(KeystoneAuth, self).do(method, path, params)
@@ -110,7 +110,7 @@
                 raise
 
 
-def get_inline_param_list(url: str):
+def get_inline_param_list(url: str) -> Iterator[str]:  # yields the name captured inside each "{name}" placeholder
     format_param_rr = re.compile(r"\{([a-zA-Z_]+)\}")
     for match in format_param_rr.finditer(url):
         yield match.group(1)
diff --git a/wally/hw_info.py b/wally/hw_info.py
index 214883d..4f21314 100644
--- a/wally/hw_info.py
+++ b/wally/hw_info.py
@@ -1,9 +1,10 @@
 import re
-from typing import Dict, Any, Iterable
+from typing import Dict, Iterable
 import xml.etree.ElementTree as ET
+from typing import List, Tuple
 
 from . import utils
-from .inode import INode
+from .interfaces import IRemoteNode
 
 
 def get_data(rr: str, data: str) -> str:
@@ -12,31 +13,31 @@
 
 
 class HWInfo:
-    def __init__(self):
-        self.hostname = None
-        self.cores = []
+    def __init__(self) -> None:
+        self.hostname = None  # type: str
+        self.cores = []  # type: List[Tuple[str, int]]
 
         # /dev/... devices
-        self.disks_info = {}
+        self.disks_info = {}  # type: Dict[str, Tuple[str, int]]
 
         # real disks on raid controller
-        self.disks_raw_info = {}
+        self.disks_raw_info = {}  # type: Dict[str, str]
 
         # name => (speed, is_full_diplex, ip_addresses)
-        self.net_info = {}
+        self.net_info = {}  # type: Dict[str, Tuple[int, bool, str]]
 
-        self.ram_size = 0
-        self.sys_name = None
-        self.mb = None
-        self.raw = None
+        self.ram_size = 0  # type: int
+        self.sys_name = None  # type: str
+        self.mb = None  # type: str
+        self.raw = None  # type: str
 
-        self.storage_controllers = []
+        self.storage_controllers = []  # type: List[str]
 
     def get_hdd_count(self) -> Iterable[int]:
         # SATA HDD COUNT, SAS 10k HDD COUNT, SAS SSD count, PCI-E SSD count
         return []
 
-    def get_summary(self) -> Dict[str, Any]:
+    def get_summary(self) -> Dict[str, int]:
         cores = sum(count for _, count in self.cores)
         disks = sum(size for _, size in self.disks_info.values())
 
@@ -103,21 +104,21 @@
 
 
 class SWInfo:
-    def __init__(self):
-        self.partitions = None
-        self.kernel_version = None
-        self.fio_version = None
-        self.libvirt_version = None
-        self.kvm_version = None
-        self.qemu_version = None
-        self.OS_version = None
-        self.ceph_version = None
+    def __init__(self) -> None:
+        self.partitions = None  # type: str
+        self.kernel_version = None  # type: str
+        self.fio_version = None  # type: str
+        self.libvirt_version = None  # type: str
+        self.kvm_version = None  # type: str
+        self.qemu_version = None  # type: str
+        self.OS_version = None  # type: str
+        self.ceph_version = None  # type: str
 
 
-def get_sw_info(node: INode) -> SWInfo:
+def get_sw_info(node: IRemoteNode) -> SWInfo:
     res = SWInfo()
 
-    res.OS_version = utils.get_os()
+    res.OS_version = utils.get_os(node)
     res.kernel_version = node.get_file_content('/proc/version')
     res.partitions = node.get_file_content('/etc/mtab')
     res.libvirt_version = node.run("virsh -v", nolog=True)
@@ -127,7 +128,7 @@
     return res
 
 
-def get_hw_info(node: INode) -> HWInfo:
+def get_hw_info(node: IRemoteNode) -> HWInfo:
     res = HWInfo()
     lshw_out = node.run('sudo lshw -xml 2>/dev/null', nolog=True)
 
diff --git a/wally/inode.py b/wally/inode.py
index 225a807..7851a4a 100644
--- a/wally/inode.py
+++ b/wally/inode.py
@@ -1,7 +1,9 @@
 import abc
-from typing import Set, Dict, Tuple, Any, Optional
+from typing import Set, Dict, Optional
 
 from .ssh_utils import parse_ssh_uri
+from . import hw_info
+from .interfaces import IRemoteNode, IHost
 
 
 class FuelNodeInfo:
@@ -9,11 +11,11 @@
     def __init__(self,
                  version: str,
                  fuel_ext_iface: str,
-                 openrc: Dict[str, str]):
+                 openrc: Dict[str, str]) -> None:
 
-        self.version = version
-        self.fuel_ext_iface = fuel_ext_iface
-        self.openrc = openrc
+        self.version = version  # type: str
+        self.fuel_ext_iface = fuel_ext_iface  # type: str
+        self.openrc = openrc  # type: Dict[str, str]
 
 
 class NodeInfo:
@@ -21,26 +23,31 @@
     def __init__(self,
                  ssh_conn_url: str,
                  roles: Set[str],
-                 bind_ip: str=None,
-                 ssh_key: str=None):
-        self.ssh_conn_url = ssh_conn_url
-        self.roles = roles
+                 bind_ip: str = None,
+                 ssh_key: str = None) -> None:
+        self.ssh_conn_url = ssh_conn_url  # type: str
+        self.roles = roles  # type: Set[str]
 
         if bind_ip is None:
             bind_ip = parse_ssh_uri(self.ssh_conn_url).host
 
-        self.bind_ip = bind_ip
-        self.ssh_key = ssh_key
+        self.bind_ip = bind_ip  # type: str
+        self.ssh_key = ssh_key  # type: Optional[str]
 
 
-class INode(metaclass=abc.ABCMeta):
+class INode(IRemoteNode, metaclass=abc.ABCMeta):
     """Node object"""
 
     def __init__(self, node_info: NodeInfo):
-        self.rpc = None
-        self.node_info = node_info
-        self.hwinfo = None
-        self.roles = []
+        IRemoteNode.__init__(self)
+        self.node_info = node_info  # type: NodeInfo
+        self.hwinfo = None  # type: hw_info.HWInfo
+        self.swinfo = None  # type: hw_info.SWInfo
+        self.os_vm_id = None  # type: str
+        self.ssh_conn = None  # type: IHost
+        self.ssh_conn_url = None  # type: str
+        self.rpc_conn = None
+        self.rpc_conn_url = None  # type: str
 
     @abc.abstractmethod
     def __str__(self):
@@ -50,57 +57,5 @@
         return str(self)
 
     @abc.abstractmethod
-    def is_connected(self) -> bool:
-        pass
-
-    @abc.abstractmethod
-    def connect_ssh(self, timeout: int=None) -> None:
-        pass
-
-    @abc.abstractmethod
-    def connect_rpc(self) -> None:
-        pass
-
-    @abc.abstractmethod
-    def prepare_rpc(self) -> None:
-        pass
-
-    @abc.abstractmethod
-    def get_ip(self) -> str:
-        pass
-
-    @abc.abstractmethod
-    def get_user(self) -> str:
-        pass
-
-    @abc.abstractmethod
-    def run(self, cmd: str, stdin_data: str=None, timeout: int=60, nolog: bool=False) -> str:
-        pass
-
-    @abc.abstractmethod
-    def discover_hardware_info(self) -> None:
-        pass
-
-    @abc.abstractmethod
-    def copy_file(self, local_path: str, remote_path: Optional[str]=None) -> str:
-        pass
-
-    @abc.abstractmethod
-    def get_file_content(self, path: str) -> bytes:
-        pass
-
-    @abc.abstractmethod
-    def put_to_file(self, path:str, content: bytes) -> None:
-        pass
-
-    @abc.abstractmethod
-    def forward_port(self, ip: str, remote_port: int, local_port: int=None) -> int:
-        pass
-
-    @abc.abstractmethod
-    def get_interface(self, ip: str) -> str:
-        pass
-
-    @abc.abstractmethod
-    def stat_file(self, path:str) -> Any:
+    def node_id(self) -> str:
         pass
diff --git a/wally/interfaces.py b/wally/interfaces.py
new file mode 100644
index 0000000..d87c753
--- /dev/null
+++ b/wally/interfaces.py
@@ -0,0 +1,81 @@
+import abc
+from typing import Any, Set, Dict
+
+
+class IRemoteShell(metaclass=abc.ABCMeta):  # abstract interface with a single run() operation
+    @abc.abstractmethod
+    def run(self, cmd: str, timeout: int = 60, nolog: bool = False) -> str:  # presumably returns cmd output - confirm
+        pass
+
+
+class IHost(IRemoteShell, metaclass=abc.ABCMeta):  # IRemoteShell plus get_ip, __str__ and put_to_file
+    @abc.abstractmethod
+    def get_ip(self) -> str:  # NOTE(review): IRemoteNode declares get_ip with the same signature
+        pass
+
+    @abc.abstractmethod
+    def __str__(self) -> str:  # abstract, so every implementation must define its own string form
+        pass
+
+    @abc.abstractmethod
+    def put_to_file(self, path: str, content: bytes) -> None:  # NOTE(review): IRemoteFS declares the same method
+        pass
+
+
+class IRemoteFS(metaclass=abc.ABCMeta):  # abstract file operations; NOTE(review): also carries forward_port/get_interface
+    @abc.abstractmethod
+    def copy_file(self, local_path: str, remote_path: str = None) -> str:  # presumably returns the remote path - confirm
+        pass
+
+    @abc.abstractmethod
+    def get_file_content(self, path: str) -> bytes:  # NOTE(review): get_sw_info stores this result in fields typed str
+        pass
+
+    @abc.abstractmethod
+    def put_to_file(self, path:str, content: bytes) -> None:  # NOTE(review): IHost declares the same method
+        pass
+
+    @abc.abstractmethod
+    def forward_port(self, ip: str, remote_port: int, local_port: int = None) -> int:  # presumably returns the local port - confirm
+        pass
+
+    @abc.abstractmethod
+    def get_interface(self, ip: str) -> str:  # presumably the interface name that owns ip - confirm
+        pass
+
+    @abc.abstractmethod
+    def stat_file(self, path:str) -> Any:  # result type is left open by the interface
+        pass
+
+
+class IRPC(metaclass=abc.ABCMeta):  # declares no methods; used as the type of IRemoteNode.rpc
+    pass
+
+
+class IRemoteNode(IRemoteFS, IRemoteShell, metaclass=abc.ABCMeta):  # IRemoteFS + IRemoteShell plus connection methods
+
+    def __init__(self) -> None:  # starts with an empty role set and rpc/rpc_params unset (None)
+        self.roles = set()  # type: Set[str]
+        self.rpc = None  # type: IRPC
+        self.rpc_params = None  # type: Dict[str, Any]
+
+    @abc.abstractmethod
+    def is_connected(self) -> bool:
+        pass
+
+    @abc.abstractmethod
+    def disconnect(self) -> None:
+        pass
+
+    @abc.abstractmethod
+    def connect_ssh(self, timeout: int = None) -> None:  # timeout units are not stated here - TODO confirm
+        pass
+
+    @abc.abstractmethod
+    def get_ip(self) -> str:  # NOTE(review): IHost declares get_ip with the same signature
+        pass
+
+    @abc.abstractmethod
+    def get_user(self) -> str:
+        pass
+
diff --git a/wally/keystone.py b/wally/keystone.py
index 77c1d5b..358d128 100644
--- a/wally/keystone.py
+++ b/wally/keystone.py
@@ -14,9 +14,8 @@
 
     allowed_methods = ('get', 'put', 'post', 'delete', 'patch', 'head')
 
-    def __init__(self, root_url:str, headers:Dict[str, str]=None, echo: bool=False):
-        """
-        """
+    def __init__(self, root_url:str, headers:Dict[str, str]=None, echo: bool=False) -> None:
+        """"""
         if root_url.endswith('/'):
             self.root_url = root_url[:-1]
         else:
diff --git a/wally/logger.py b/wally/logger.py
index 43eff6b..a8cbf2a 100644
--- a/wally/logger.py
+++ b/wally/logger.py
@@ -1,7 +1,8 @@
 import logging
+from typing import Callable, IO
 
 
-def color_me(color):
+def color_me(color: int) -> Callable[[str], str]:
     RESET_SEQ = "\033[0m"
     COLOR_SEQ = "\033[1;%dm"
 
@@ -22,11 +23,11 @@
         'ERROR': color_me(RED)
     }
 
-    def __init__(self, msg, use_color=True, datefmt=None):
+    def __init__(self, msg: str, use_color: bool=True, datefmt: str=None) -> None:
         logging.Formatter.__init__(self, msg, datefmt=datefmt)
         self.use_color = use_color
 
-    def format(self, record):
+    def format(self, record: logging.LogRecord) -> str:
         orig = record.__dict__
         record.__dict__ = record.__dict__.copy()
         levelname = record.levelname
@@ -47,7 +48,7 @@
         return res
 
 
-def setup_loggers(def_level=logging.DEBUG, log_fname=None):
+def setup_loggers(def_level: int = logging.DEBUG, log_fname: str = None, log_fd: IO = None) -> None:
     logger = logging.getLogger('wally')
     logger.setLevel(logging.DEBUG)
     sh = logging.StreamHandler()
@@ -61,14 +62,18 @@
 
     logger_api = logging.getLogger("wally.fuel_api")
 
-    if log_fname is not None:
-        fh = logging.FileHandler(log_fname)
+    if log_fname or log_fd:
+        if log_fname:
+            handler = logging.FileHandler(log_fname)
+        else:
+            handler = logging.StreamHandler(log_fd)
+
         log_format = '%(asctime)s - %(levelname)8s - %(name)-15s - %(message)s'
         formatter = logging.Formatter(log_format, datefmt="%H:%M:%S")
-        fh.setFormatter(formatter)
-        fh.setLevel(logging.DEBUG)
-        logger.addHandler(fh)
-        logger_api.addHandler(fh)
+        handler.setFormatter(formatter)
+        handler.setLevel(logging.DEBUG)
+        logger.addHandler(handler)
+        logger_api.addHandler(handler)
     else:
         fh = None
 
diff --git a/wally/main.py b/wally/main.py
index 9358b53..11d6fc1 100644
--- a/wally/main.py
+++ b/wally/main.py
@@ -5,14 +5,19 @@
 import logging
 import argparse
 import functools
-
+from typing import List, Tuple, Any, Callable, IO, cast, TYPE_CHECKING
 from yaml import load as _yaml_load
 
+
+YLoader = Callable[[IO], Any]
+yaml_load = None  # type: YLoader
+
+
 try:
     from yaml import CLoader
-    yaml_load = functools.partial(_yaml_load, Loader=CLoader)
+    yaml_load = cast(YLoader,  functools.partial(_yaml_load, Loader=CLoader))
 except ImportError:
-    yaml_load = _yaml_load
+    yaml_load = cast(YLoader,  _yaml_load)
 
 
 import texttable
@@ -23,93 +28,55 @@
     faulthandler = None
 
 
-from .timeseries import SensorDatastore
 from . import utils, run_test, pretty_yaml
-from .config import (load_config,
-                     get_test_files, save_run_params, load_run_params)
+from .storage import make_storage, IStorage
+from .config import Config
 from .logger import setup_loggers
-from .stage import log_stage
+from .stage import log_stage, StageType
+from .test_run_class import TestRun
 
 
 logger = logging.getLogger("wally")
 
 
-def get_test_names(raw_res):
-    res = []
-    for tp, data in raw_res:
-        if not isinstance(data, list):
-            raise ValueError()
-
-        keys = []
-        for dt in data:
-            if not isinstance(dt, dict):
-                raise ValueError()
-
-            keys.append(",".join(dt.keys()))
-
-        res.append(tp + "(" + ",".join(keys) + ")")
-    return res
-
-
-def list_results(path):
+def list_results(path: str) -> List[Tuple[str, str, str, str]]:
+    """Return (run_uuid, test types, run time text, comment) per loadable storage in path, oldest first."""
     results = []
 
-    for dname in os.listdir(path):
+    for dir_name in os.listdir(path):
+        full_path = os.path.join(path, dir_name)
         try:
-            files_cfg = get_test_files(os.path.join(path, dname))
+            stor = make_storage(full_path, existing=True)
+        except Exception as exc:
+            logger.warning("Can't load folder {}. Error {}".format(full_path, exc))
+            # skip this folder: stor is unbound (or stale from the previous iteration) here
+            continue
 
-            if not os.path.isfile(files_cfg['raw_results']):
-                continue
+        comment = stor['info/comment']
+        run_uuid = stor['info/run_uuid']
+        run_time = stor['info/run_time']
+        test_types = ""
+        # numeric run_time goes first so results.sort() below orders rows chronologically
+        results.append((run_time, run_uuid, test_types, time.ctime(run_time),
+                        '-' if comment is None else comment))
 
-            mt = os.path.getmtime(files_cfg['raw_results'])
-            res_mtime = time.ctime(mt)
-
-            raw_res = yaml_load(open(files_cfg['raw_results']).read())
-            test_names = ",".join(sorted(get_test_names(raw_res)))
-
-            params = load_run_params(files_cfg['run_params_file'])
-
-            comm = params.get('comment')
-            results.append((mt, dname, test_names, res_mtime,
-                           '-' if comm is None else comm))
-        except ValueError:
-            pass
-
-    tab = texttable.Texttable(max_width=200)
-    tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
-    tab.set_cols_align(["l", "l", "l", "l"])
     results.sort()
-
-    for data in results[::-1]:
-        tab.add_row(data[1:])
-
-    tab.header(["Name", "Tests", "etime", "Comment"])
-
-    print(tab.draw())
+    return [row[1:] for row in results]
 
 
-def make_storage_dir_struct(cfg):
-    utils.mkdirs_if_unxists(cfg.results_dir)
-    utils.mkdirs_if_unxists(cfg.sensor_storage)
-    utils.mkdirs_if_unxists(cfg.hwinfo_directory)
-    utils.mkdirs_if_unxists(cfg.results_storage)
-
-
-def log_nodes_statistic_stage(_, ctx):
+def log_nodes_statistic_stage(ctx: TestRun) -> None:  # stage wrapper: hands ctx.nodes to utils.log_nodes_statistic
     utils.log_nodes_statistic(ctx.nodes)
 
 
 def parse_args(argv):
     descr = "Disk io performance test suite"
     parser = argparse.ArgumentParser(prog='wally', description=descr)
-    parser.add_argument("-l", '--log-level',
-                        help="print some extra log info")
+    parser.add_argument("-l", '--log-level', help="print some extra log info")
 
     subparsers = parser.add_subparsers(dest='subparser_name')
 
     # ---------------------------------------------------------------------
-    compare_help = 'list all results'
-    report_parser = subparsers.add_parser('ls', help=compare_help)
+    report_parser = subparsers.add_parser('ls', help='list all results')
     report_parser.add_argument("result_storage", help="Folder with test results")
 
     # ---------------------------------------------------------------------
@@ -121,91 +88,93 @@
     # ---------------------------------------------------------------------
     report_help = 'run report on previously obtained results'
     report_parser = subparsers.add_parser('report', help=report_help)
-    report_parser.add_argument('--load_report', action='store_true')
     report_parser.add_argument("data_dir", help="folder with rest results")
 
     # ---------------------------------------------------------------------
     test_parser = subparsers.add_parser('test', help='run tests')
-    test_parser.add_argument('--build-description',
-                             type=str, default="Build info")
+    test_parser.add_argument('--build-description', type=str, default="Build info")
     test_parser.add_argument('--build-id', type=str, default="id")
     test_parser.add_argument('--build-type', type=str, default="GA")
-    test_parser.add_argument('-n', '--no-tests', action='store_true',
-                             help="Don't run tests", default=False)
-    test_parser.add_argument('--load_report', action='store_true')
-    test_parser.add_argument("-k", '--keep-vm', action='store_true',
-                             help="Don't remove test vm's", default=False)
+    test_parser.add_argument('-n', '--no-tests', action='store_true', help="Don't run tests")
+    test_parser.add_argument('--load-report', action='store_true')
+    test_parser.add_argument("-k", '--keep-vm', action='store_true', help="Don't remove test vm's")
     test_parser.add_argument("-d", '--dont-discover-nodes', action='store_true',
-                             help="Don't connect/discover fuel nodes",
-                             default=False)
-    test_parser.add_argument('--no-report', action='store_true',
-                             help="Skip report stages", default=False)
+                             help="Don't connect/discover fuel nodes")
+    test_parser.add_argument('--no-report', action='store_true', help="Skip report stages")
+    test_parser.add_argument('-r', '--resume', default=None, help="Resume previously stopped test, stored in DIR",
+                             metavar="DIR")
+    test_parser.add_argument('--result-dir', default=None, help="Save results to DIR", metavar="DIR")
     test_parser.add_argument("comment", help="Test information")
-    test_parser.add_argument("config_file", help="Yaml config file")
+    test_parser.add_argument("config_file", help="Yaml config file", nargs='?', default=None)
 
     # ---------------------------------------------------------------------
 
     return parser.parse_args(argv[1:])
 
 
-def main(argv):
+def main(argv: List[str]) -> int:
     if faulthandler is not None:
         faulthandler.register(signal.SIGUSR1, all_threads=True)
 
     opts = parse_args(argv)
-    stages = []
-    report_stages = []
 
-    ctx = Context()
-    ctx.results = {}
-    ctx.sensors_data = SensorDatastore()
+    stages = []  # type: List[StageType]
+    report_stages = []  # type: List[StageType]
+
+    # stop mypy from telling that config & storage might be undeclared
+    config = None  # type: Config
+    storage = None  # type: IStorage
 
     if opts.subparser_name == 'test':
-        cfg = load_config(opts.config_file)
-        make_storage_dir_struct(cfg)
-        cfg.comment = opts.comment
-        save_run_params(cfg)
+        if opts.resume:
+            storage = make_storage(opts.resume, existing=True)
+            config = storage.load('config', Config)
+        else:
+            file_name = os.path.abspath(opts.config_file)
+            with open(file_name) as fd:
+                config = Config(yaml_load(fd.read()))  # type: ignore
 
-        with open(cfg.saved_config_file, 'w') as fd:
-            fd.write(pretty_yaml.dumps(cfg.__dict__))
+            config.run_uuid = utils.get_uniq_path_uuid(config.results_dir)
+            config.storage_url = os.path.join(config.results_dir, config.run_uuid)
+            config.comment = opts.comment
+            config.keep_vm = opts.keep_vm
+            config.no_tests = opts.no_tests
+            config.dont_discover_nodes = opts.dont_discover_nodes
+            config.build_id = opts.build_id
+            config.build_description = opts.build_description
+            config.build_type = opts.build_type
 
-        stages = [
-            run_test.discover_stage
-        ]
+            storage = make_storage(config.storage_url)
+
+            storage['config'] = config
 
         stages.extend([
+            run_test.discover_stage,
             run_test.reuse_vms_stage,
             log_nodes_statistic_stage,
             run_test.save_nodes_stage,
             run_test.connect_stage])
 
-        if cfg.settings.get('collect_info', True):
+        if config.get("collect_info", True):
             stages.append(run_test.collect_hw_info_stage)
 
         stages.extend([
-            # deploy_sensors_stage,
             run_test.run_tests_stage,
             run_test.store_raw_results_stage,
-            # gather_sensors_stage
         ])
 
-        cfg.keep_vm = opts.keep_vm
-        cfg.no_tests = opts.no_tests
-        cfg.dont_discover_nodes = opts.dont_discover_nodes
-
-        ctx.build_meta['build_id'] = opts.build_id
-        ctx.build_meta['build_descrption'] = opts.build_description
-        ctx.build_meta['build_type'] = opts.build_type
-
     elif opts.subparser_name == 'ls':
-        list_results(opts.result_storage)
+        tab = texttable.Texttable(max_width=200)
+        tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
+        tab.set_cols_align(["l", "l", "l", "l"])
+        tab.header(["Name", "Tests", "Run at", "Comment"])
+        tab.add_rows(list_results(opts.result_storage))
+        print(tab.draw())
         return 0
 
     elif opts.subparser_name == 'report':
-        cfg = load_config(get_test_files(opts.data_dir)['saved_config_file'])
-        stages.append(run_test.load_data_from(opts.data_dir))
-        opts.no_report = False
-        # load build meta
+        storage = make_storage(opts.data_dir, existing=True)
+        config = storage.load('config', Config)
 
     elif opts.subparser_name == 'compare':
         x = run_test.load_data_from_path(opts.data_path1)
@@ -214,24 +183,25 @@
             [x['io'][0], y['io'][0]]))
         return 0
 
-    if not opts.no_report:
+    if not getattr(opts, "no_report", False):
         report_stages.append(run_test.console_report_stage)
-        if opts.load_report:
-            report_stages.append(run_test.test_load_report_stage)
         report_stages.append(run_test.html_report_stage)
 
+    # log level is not a part of config
     if opts.log_level is not None:
         str_level = opts.log_level
     else:
-        str_level = cfg.settings.get('log_level', 'INFO')
+        str_level = config.get('logging/log_level', 'INFO')
 
-    setup_loggers(getattr(logging, str_level), cfg.log_file)
-    logger.info("All info would be stored into " + cfg.results_dir)
+    setup_loggers(getattr(logging, str_level), log_fd=storage.get_stream('log'))
+    logger.info("All info would be stored into %r", config.storage_url)
+
+    ctx = TestRun(config, storage)
 
     for stage in stages:
         ok = False
         with log_stage(stage):
-            stage(cfg, ctx)
+            stage(ctx)
             ok = True
         if not ok:
             break
@@ -239,7 +209,7 @@
     exc, cls, tb = sys.exc_info()
     for stage in ctx.clear_calls_stack[::-1]:
         with log_stage(stage):
-            stage(cfg, ctx)
+            stage(ctx)
 
     logger.debug("Start utils.cleanup")
     for clean_func, args, kwargs in utils.iter_clean_func():
@@ -249,13 +219,13 @@
     if exc is None:
         for report_stage in report_stages:
             with log_stage(report_stage):
-                report_stage(cfg, ctx)
+                report_stage(ctx)
 
-    logger.info("All info stored into " + cfg.results_dir)
+    logger.info("All info is stored into %r", config.storage_url)
 
     if exc is None:
         logger.info("Tests finished successfully")
         return 0
     else:
-        logger.error("Tests are failed. See detailed error above")
+        logger.error("Tests are failed. See error details in log above")
         return 1
diff --git a/wally/meta_info.py b/wally/meta_info.py
index 19fdad9..3de33cb 100644
--- a/wally/meta_info.py
+++ b/wally/meta_info.py
@@ -1,5 +1,7 @@
 from typing import Any, Dict
 from urllib.parse import urlparse
+
+
 from .keystone import KeystoneAuth
 
 
@@ -35,7 +37,7 @@
     result = {}
 
     for node in lab_info:
-        # <koder>: give p,i,d,... vars meaningful names
+        # TODO(koder): give p,i,d,... vars meaningful names
         d = {}
         d['name'] = node['name']
         p = []
diff --git a/wally/node.py b/wally/node.py
index b146876..8435cf3 100644
--- a/wally/node.py
+++ b/wally/node.py
@@ -9,7 +9,9 @@
 class Node(INode):
     """Node object"""
 
-    def __init__(self, node_info: NodeInfo):
+    def __init__(self, node_info: NodeInfo) -> None:
+        INode.__init__(self)
+
         self.info = node_info
         self.roles = node_info.roles
         self.bind_ip = node_info.bind_ip
@@ -30,17 +32,17 @@
             self.ssh_cred = None
             self.node_id = None
 
-    def __str__(self):
+    def __str__(self) -> str:
         template = "<Node: url={conn_url!r} roles={roles}" + \
                    " connected={is_connected}>"
         return template.format(conn_url=self.ssh_conn_url,
                                roles=", ".join(self.roles),
                                is_connected=self.ssh_conn is not None)
 
-    def __repr__(self):
+    def __repr__(self) -> str:
         return str(self)
 
-    def connect_ssh(self) -> None:
+    def connect_ssh(self, timeout: int=None) -> None:
         self.ssh_conn = connect(self.ssh_conn_url)
 
     def connect_rpc(self) -> None:
@@ -106,3 +108,9 @@
                     return curr_iface
 
         raise KeyError("Can't found interface for ip {0}".format(ip))
+
+    def sync_hw_info(self) -> None:
+        pass
+
+    def sync_sw_info(self) -> None:
+        pass
\ No newline at end of file
diff --git a/wally/node_rpc.py b/wally/node_rpc.py
deleted file mode 100644
index 9a458ae..0000000
--- a/wally/node_rpc.py
+++ /dev/null
@@ -1,428 +0,0 @@
-import re
-import json
-import time
-import errno
-import socket
-import shutil
-import logging
-import os.path
-import getpass
-import StringIO
-import threading
-import subprocess
-
-import paramiko
-
-from agent import connect
-from .ssh_utils import Local, ssh_connect, ssh_copy_file
-
-
-logger = logging.getLogger("wally")
-
-
-def setup_node(conn, agent_path, ip):
-    agent_fname, log_fname = run_over_ssh(conn, "mktemp;echo;mktemp").strip().split()
-    with conn.open_sftp() as sftp:
-        ssh_copy_file(sftp, agent_path, agent_fname)
-
-    cmd = "python {} server -d --listen-addr={}:0 --stdout-file={}"
-    jdata = run_over_ssh(conn, cmd.format(agent_fname, ip, log_fname)).strip()
-    run_over_ssh(conn, "rm {}".format(agent_fname))
-    sett = json.loads(jdata)
-    return connect(sett['addr'])
-
-
-def exists(rpc, path):
-    """os.path.exists for paramiko's SCP object"""
-    return rpc.exists(path)
-
-
-def save_to_remote(sftp, path, content):
-    with sftp.open(path, "wb") as fd:
-        fd.write(content)
-
-
-def read_from_remote(sftp, path):
-    with sftp.open(path, "rb") as fd:
-        return fd.read()
-
-
-def normalize_dirpath(dirpath):
-    while dirpath.endswith("/"):
-        dirpath = dirpath[:-1]
-    return dirpath
-
-
-ALL_RWX_MODE = ((1 << 9) - 1)
-
-
-def ssh_mkdir(sftp, remotepath, mode=ALL_RWX_MODE, intermediate=False):
-    remotepath = normalize_dirpath(remotepath)
-    if intermediate:
-        try:
-            sftp.mkdir(remotepath, mode=mode)
-        except (IOError, OSError):
-            upper_dir = remotepath.rsplit("/", 1)[0]
-
-            if upper_dir == '' or upper_dir == '/':
-                raise
-
-            ssh_mkdir(sftp, upper_dir, mode=mode, intermediate=True)
-            return sftp.mkdir(remotepath, mode=mode)
-    else:
-        sftp.mkdir(remotepath, mode=mode)
-
-
-def ssh_copy_file(sftp, localfile, remfile, preserve_perm=True):
-    sftp.put(localfile, remfile)
-    if preserve_perm:
-        sftp.chmod(remfile, os.stat(localfile).st_mode & ALL_RWX_MODE)
-
-
-def put_dir_recursively(sftp, localpath, remotepath, preserve_perm=True):
-    "upload local directory to remote recursively"
-
-    # hack for localhost connection
-    if hasattr(sftp, "copytree"):
-        sftp.copytree(localpath, remotepath)
-        return
-
-    assert remotepath.startswith("/"), "%s must be absolute path" % remotepath
-
-    # normalize
-    localpath = normalize_dirpath(localpath)
-    remotepath = normalize_dirpath(remotepath)
-
-    try:
-        sftp.chdir(remotepath)
-        localsuffix = localpath.rsplit("/", 1)[1]
-        remotesuffix = remotepath.rsplit("/", 1)[1]
-        if localsuffix != remotesuffix:
-            remotepath = os.path.join(remotepath, localsuffix)
-    except IOError:
-        pass
-
-    for root, dirs, fls in os.walk(localpath):
-        prefix = os.path.commonprefix([localpath, root])
-        suffix = root.split(prefix, 1)[1]
-        if suffix.startswith("/"):
-            suffix = suffix[1:]
-
-        remroot = os.path.join(remotepath, suffix)
-
-        try:
-            sftp.chdir(remroot)
-        except IOError:
-            if preserve_perm:
-                mode = os.stat(root).st_mode & ALL_RWX_MODE
-            else:
-                mode = ALL_RWX_MODE
-            ssh_mkdir(sftp, remroot, mode=mode, intermediate=True)
-            sftp.chdir(remroot)
-
-        for f in fls:
-            remfile = os.path.join(remroot, f)
-            localfile = os.path.join(root, f)
-            ssh_copy_file(sftp, localfile, remfile, preserve_perm)
-
-
-def delete_file(conn, path):
-    sftp = conn.open_sftp()
-    sftp.remove(path)
-    sftp.close()
-
-
-def copy_paths(conn, paths):
-    sftp = conn.open_sftp()
-    try:
-        for src, dst in paths.items():
-            try:
-                if os.path.isfile(src):
-                    ssh_copy_file(sftp, src, dst)
-                elif os.path.isdir(src):
-                    put_dir_recursively(sftp, src, dst)
-                else:
-                    templ = "Can't copy {0!r} - " + \
-                            "it neither a file not a directory"
-                    raise OSError(templ.format(src))
-            except Exception as exc:
-                tmpl = "Scp {0!r} => {1!r} failed - {2!r}"
-                raise OSError(tmpl.format(src, dst, exc))
-    finally:
-        sftp.close()
-
-
-class ConnCreds(object):
-    conn_uri_attrs = ("user", "passwd", "host", "port", "path")
-
-    def __init__(self):
-        for name in self.conn_uri_attrs:
-            setattr(self, name, None)
-
-    def __str__(self):
-        return str(self.__dict__)
-
-
-uri_reg_exprs = []
-
-
-class URIsNamespace(object):
-    class ReParts(object):
-        user_rr = "[^:]*?"
-        host_rr = "[^:@]*?"
-        port_rr = "\\d+"
-        key_file_rr = "[^:@]*"
-        passwd_rr = ".*?"
-
-    re_dct = ReParts.__dict__
-
-    for attr_name, val in re_dct.items():
-        if attr_name.endswith('_rr'):
-            new_rr = "(?P<{0}>{1})".format(attr_name[:-3], val)
-            setattr(ReParts, attr_name, new_rr)
-
-    re_dct = ReParts.__dict__
-
-    templs = [
-        "^{host_rr}$",
-        "^{host_rr}:{port_rr}$",
-        "^{host_rr}::{key_file_rr}$",
-        "^{host_rr}:{port_rr}:{key_file_rr}$",
-        "^{user_rr}@{host_rr}$",
-        "^{user_rr}@{host_rr}:{port_rr}$",
-        "^{user_rr}@{host_rr}::{key_file_rr}$",
-        "^{user_rr}@{host_rr}:{port_rr}:{key_file_rr}$",
-        "^{user_rr}:{passwd_rr}@{host_rr}$",
-        "^{user_rr}:{passwd_rr}@{host_rr}:{port_rr}$",
-    ]
-
-    for templ in templs:
-        uri_reg_exprs.append(templ.format(**re_dct))
-
-
-def parse_ssh_uri(uri):
-    # user:passwd@ip_host:port
-    # user:passwd@ip_host
-    # user@ip_host:port
-    # user@ip_host
-    # ip_host:port
-    # ip_host
-    # user@ip_host:port:path_to_key_file
-    # user@ip_host::path_to_key_file
-    # ip_host:port:path_to_key_file
-    # ip_host::path_to_key_file
-
-    if uri.startswith("ssh://"):
-        uri = uri[len("ssh://"):]
-
-    res = ConnCreds()
-    res.port = "22"
-    res.key_file = None
-    res.passwd = None
-    res.user = getpass.getuser()
-
-    for rr in uri_reg_exprs:
-        rrm = re.match(rr, uri)
-        if rrm is not None:
-            res.__dict__.update(rrm.groupdict())
-            return res
-
-    raise ValueError("Can't parse {0!r} as ssh uri value".format(uri))
-
-
-def reconnect(conn, uri, **params):
-    if uri == 'local':
-        return conn
-
-    creds = parse_ssh_uri(uri)
-    creds.port = int(creds.port)
-    return ssh_connect(creds, reuse_conn=conn, **params)
-
-
-def connect(uri, **params):
-    if uri == 'local':
-        res = Local()
-    else:
-        creds = parse_ssh_uri(uri)
-        creds.port = int(creds.port)
-        res = ssh_connect(creds, **params)
-    return res
-
-
-all_sessions_lock = threading.Lock()
-all_sessions = {}
-
-
-class BGSSHTask(object):
-    CHECK_RETRY = 5
-
-    def __init__(self, node, use_sudo):
-        self.node = node
-        self.pid = None
-        self.use_sudo = use_sudo
-
-    def start(self, orig_cmd, **params):
-        uniq_name = 'test'
-        cmd = "screen -S {0} -d -m {1}".format(uniq_name, orig_cmd)
-        run_over_ssh(self.node.connection, cmd,
-                     timeout=10, node=self.node.get_conn_id(),
-                     **params)
-        processes = run_over_ssh(self.node.connection, "ps aux", nolog=True)
-
-        for iter in range(self.CHECK_RETRY):
-            for proc in processes.split("\n"):
-                if orig_cmd in proc and "SCREEN" not in proc:
-                    self.pid = proc.split()[1]
-                    break
-            if self.pid is not None:
-                break
-            time.sleep(1)
-
-        if self.pid is None:
-            self.pid = -1
-
-    def check_running(self):
-        assert self.pid is not None
-        if -1 == self.pid:
-            return False
-        try:
-            run_over_ssh(self.node.connection,
-                         "ls /proc/{0}".format(self.pid),
-                         timeout=10, nolog=True)
-            return True
-        except OSError:
-            return False
-
-    def kill(self, soft=True, use_sudo=True):
-        assert self.pid is not None
-        if self.pid == -1:
-            return True
-        try:
-            if soft:
-                cmd = "kill {0}"
-            else:
-                cmd = "kill -9 {0}"
-
-            if self.use_sudo:
-                cmd = "sudo " + cmd
-
-            run_over_ssh(self.node.connection,
-                         cmd.format(self.pid), nolog=True)
-            return True
-        except OSError:
-            return False
-
-    def wait(self, soft_timeout, timeout):
-        end_of_wait_time = timeout + time.time()
-        soft_end_of_wait_time = soft_timeout + time.time()
-
-        # time_till_check = random.randint(5, 10)
-        time_till_check = 2
-
-        # time_till_first_check = random.randint(2, 6)
-        time_till_first_check = 2
-        time.sleep(time_till_first_check)
-        if not self.check_running():
-            return True
-
-        while self.check_running() and time.time() < soft_end_of_wait_time:
-            # time.sleep(soft_end_of_wait_time - time.time())
-            time.sleep(time_till_check)
-
-        while end_of_wait_time > time.time():
-            time.sleep(time_till_check)
-            if not self.check_running():
-                break
-        else:
-            self.kill()
-            time.sleep(1)
-            if self.check_running():
-                self.kill(soft=False)
-            return False
-        return True
-
-
-def run_over_ssh(conn, cmd, stdin_data=None, timeout=60,
-                 nolog=False, node=None):
-    "should be replaces by normal implementation, with select"
-
-    if isinstance(conn, Local):
-        if not nolog:
-            logger.debug("SSH:local Exec {0!r}".format(cmd))
-        proc = subprocess.Popen(cmd, shell=True,
-                                stdin=subprocess.PIPE,
-                                stdout=subprocess.PIPE,
-                                stderr=subprocess.STDOUT)
-
-        stdoutdata, _ = proc.communicate(input=stdin_data)
-        if proc.returncode != 0:
-            templ = "SSH:{0} Cmd {1!r} failed with code {2}. Output: {3}"
-            raise OSError(templ.format(node, cmd, proc.returncode, stdoutdata))
-
-        return stdoutdata
-
-    transport = conn.get_transport()
-    session = transport.open_session()
-
-    if node is None:
-        node = ""
-
-    with all_sessions_lock:
-        all_sessions[id(session)] = session
-
-    try:
-        session.set_combine_stderr(True)
-
-        stime = time.time()
-
-        if not nolog:
-            logger.debug("SSH:{0} Exec {1!r}".format(node, cmd))
-
-        session.exec_command(cmd)
-
-        if stdin_data is not None:
-            session.sendall(stdin_data)
-
-        session.settimeout(1)
-        session.shutdown_write()
-        output = ""
-
-        while True:
-            try:
-                ndata = session.recv(1024)
-                output += ndata
-                if "" == ndata:
-                    break
-            except socket.timeout:
-                pass
-
-            if time.time() - stime > timeout:
-                raise OSError(output + "\nExecution timeout")
-
-        code = session.recv_exit_status()
-    finally:
-        found = False
-        with all_sessions_lock:
-            if id(session) in all_sessions:
-                found = True
-                del all_sessions[id(session)]
-
-        if found:
-            session.close()
-
-    if code != 0:
-        templ = "SSH:{0} Cmd {1!r} failed with code {2}. Output: {3}"
-        raise OSError(templ.format(node, cmd, code, output))
-
-    return output
-
-
-def close_all_sessions():
-    with all_sessions_lock:
-        for session in all_sessions.values():
-            try:
-                session.sendall('\x03')
-                session.close()
-            except:
-                pass
-        all_sessions.clear()
diff --git a/wally/pretty_yaml.py b/wally/pretty_yaml.py
index 4038ea8..7cd0f3a 100644
--- a/wally/pretty_yaml.py
+++ b/wally/pretty_yaml.py
@@ -1,7 +1,7 @@
 __doc__ = "functions for make pretty yaml files"
 __all__ = ['dumps']
 
-from typing import Any, Iterable
+from typing import Any, Iterable, List
 
 
 def dumps_simple(val: Any) -> str:
@@ -37,13 +37,13 @@
     return all(isinstance(val, (int, float)) for val in vals)
 
 
-def dumpv(data: Any, tab_sz: int=4, width: int=160, min_width: int=40) -> str:
+def dumpv(data: Any, tab_sz: int = 4, width: int = 160, min_width: int = 40) -> List[str]:
     tab = ' ' * tab_sz
 
     if width < min_width:
         width = min_width
 
-    res = []
+    res = []  # type: List[str]
     if is_simple(data):
         return [dumps_simple(data)]
 
@@ -108,5 +108,5 @@
     return res
 
 
-def dumps(data: Any, tab_sz: int=4, width: int=120, min_width: int=40) -> str:
+def dumps(data: Any, tab_sz: int = 4, width: int = 120, min_width: int = 40) -> str:
     return "\n".join(dumpv(data, tab_sz, width, min_width))
diff --git a/wally/run_test.py b/wally/run_test.py
index 4390f5e..b96ecf8 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -1,28 +1,16 @@
 import os
-import re
 import time
 import logging
 import functools
 import contextlib
 import collections
-from typing import List, Dict, Optional, Iterable, Any, Generator, Mapping, Callable
-from yaml import load as _yaml_load
+from typing import List, Dict, Iterable, Any, Iterator, Mapping, Callable, Tuple, Optional
+from concurrent.futures import ThreadPoolExecutor, Future
 
-try:
-    from yaml import CLoader
-    yaml_load = functools.partial(_yaml_load, Loader=CLoader)
-except ImportError:
-    yaml_load = _yaml_load
-
-from concurrent.futures import ThreadPoolExecutor, wait
-
-from .config import Config
-from .config import get_test_files
-from .discover import discover, Node
 from .inode import INode
+from .discover import discover
 from .test_run_class import TestRun
-
-from . import pretty_yaml, utils, report, ssh_utils, start_vms
+from . import pretty_yaml, utils, report, ssh_utils, start_vms, hw_info
 
 from .suits.mysql import MysqlTest
 from .suits.itest import TestConfig
@@ -42,27 +30,27 @@
 logger = logging.getLogger("wally")
 
 
-def connect_all(nodes: Iterable[INode], spawned_node: Optional[bool]=False) -> None:
+
+def connect_all(nodes: Iterable[INode],
+                pool: ThreadPoolExecutor,
+                conn_timeout: int = 30,
+                rpc_conn_callback: ssh_utils.RPCBeforeConnCallback = None) -> None:
     """Connect to all nodes, log errors
-    nodes:[Node] - list of nodes
-    spawned_node:bool - whenever nodes is newly spawned VM
+    nodes - list of nodes
     """
 
-    logger.info("Connecting to nodes")
-
-    conn_timeout = 240 if spawned_node else 30
+    logger.info("Connecting to %s nodes", len(nodes))
 
     def connect_ext(node: INode) -> bool:
         try:
             node.connect_ssh(conn_timeout)
-            node.connect_rpc(conn_timeout)
+            node.rpc, node.rpc_params = ssh_utils.setup_rpc(node, rpc_conn_callback=rpc_conn_callback)
             return True
         except Exception as exc:
             logger.error("During connect to {}: {!s}".format(node, exc))
             return False
 
-    with ThreadPoolExecutor(32) as pool:
-        list(pool.map(connect_ext, nodes))
+    list(pool.map(connect_ext, nodes))
 
     failed_testnodes = []
     failed_nodes = []
@@ -88,39 +76,25 @@
         logger.info("All nodes connected successfully")
 
 
-def collect_hw_info_stage(cfg: Config, nodes: Iterable[INode]) -> None:
-    # TODO(koder): rewrite this function, to use other storage type
-    if os.path.exists(cfg.hwreport_fname):
-        msg = "{0} already exists. Skip hw info"
-        logger.info(msg.format(cfg['hwreport_fname']))
-        return
+def collect_info_stage(ctx: TestRun, nodes: Iterable[INode]) -> None:
+    futures = {}  # type: Dict[str, Future]
 
-    with ThreadPoolExecutor(32) as pool:
-        fitures = pool.submit(node.discover_hardware_info
-                              for node in nodes)
-        wait(fitures)
-
-    with open(cfg.hwreport_fname, 'w') as hwfd:
+    with ctx.get_pool() as pool:
         for node in nodes:
-            hwfd.write("-" * 60 + "\n")
-            hwfd.write("Roles : " + ", ".join(node.roles) + "\n")
-            hwfd.write(str(node.hwinfo) + "\n")
-            hwfd.write("-" * 60 + "\n\n")
+            hw_info_path = "hw_info/{}".format(node.node_id())
+            if hw_info_path not in ctx.storage:
+                futures[hw_info_path] = pool.submit(hw_info.get_hw_info, node)
 
-            if node.hwinfo.hostname is not None:
-                fname = os.path.join(
-                    cfg.hwinfo_directory,
-                    node.hwinfo.hostname + "_lshw.xml")
+            sw_info_path = "sw_info/{}".format(node.node_id())
+            if sw_info_path not in ctx.storage:
+                futures[sw_info_path] = pool.submit(hw_info.get_sw_info, node)
 
-                with open(fname, "w") as fd:
-                    fd.write(node.hwinfo.raw)
-
-    logger.info("Hardware report stored in " + cfg.hwreport_fname)
-    logger.debug("Raw hardware info in " + cfg.hwinfo_directory + " folder")
+        for path, future in futures.items():
+            ctx.storage[path] = future.result()
 
 
 @contextlib.contextmanager
-def suspend_vm_nodes_ctx(unused_nodes: Iterable[INode]) -> Generator[List[int]]:
+def suspend_vm_nodes_ctx(unused_nodes: List[INode]) -> Iterator[List[int]]:
 
     pausable_nodes_ids = [node.os_vm_id for node in unused_nodes
                           if node.os_vm_id is not None]
@@ -163,17 +137,17 @@
 
 
 @contextlib.contextmanager
-def sensor_monitoring(sensor_cfg: Any, nodes: Iterable[INode]) -> Generator[None]:
+def sensor_monitoring(sensor_cfg: Any, nodes: Iterable[INode]) -> Iterator[None]:
     # TODO(koder): write this function
-    pass
+    yield  # no-op placeholder: @contextmanager requires a generator, a bare `pass` body fails on `with`
 
 
-def run_tests(cfg: Config, test_block, nodes: Iterable[INode]) -> None:
-    """
-    Run test from test block
-    """
+def run_tests(cfg: Config,
+              test_block: Dict[str, Dict[str, Any]],
+              nodes: Iterable[INode]) -> Iterator[Tuple[str, List[Any]]]:
+    """Run test from test block"""
+
     test_nodes = [node for node in nodes if 'testnode' in node.roles]
-    not_test_nodes = [node for node in nodes if 'testnode' not in node.roles]
 
     if len(test_nodes) == 0:
         logger.error("No test nodes found")
@@ -185,7 +159,7 @@
         # iterate over all node counts
         limit = params.get('node_limit', len(test_nodes))
         if isinstance(limit, int):
-            vm_limits = [limit]
+            vm_limits = [limit]  # type: List[int]
         else:
             list_or_tpl = isinstance(limit, (tuple, list))
             all_ints = list_or_tpl and all(isinstance(climit, int)
@@ -194,7 +168,7 @@
                 msg = "'node_limit' parameter ion config should" + \
                       "be either int or list if integers, not {0!r}".format(limit)
                 raise ValueError(msg)
-            vm_limits = limit
+            vm_limits = limit  # type: List[int]
 
         for vm_count in vm_limits:
             # select test nodes
@@ -249,7 +223,7 @@
 def connect_stage(cfg: Config, ctx: TestRun) -> None:
     ctx.clear_calls_stack.append(disconnect_stage)
     connect_all(ctx.nodes)
-    ctx.nodes = [node for node in ctx.nodes if node.connection is not None]
+    ctx.nodes = [node for node in ctx.nodes if node.is_connected()]
 
 
 def discover_stage(cfg: Config, ctx: TestRun) -> None:
@@ -279,7 +253,7 @@
             roles.remove('testnode')
 
         if len(roles) != 0:
-            cluster[node.conn_url] = roles
+            cluster[node.ssh_conn_url] = roles
 
     with open(cfg.nodes_report_file, "w") as fd:
         fd.write(pretty_yaml.dumps(cluster))
@@ -363,7 +337,7 @@
 
 
 def get_vm_keypair(cfg: Config) -> Dict[str, str]:
-    res = {}
+    res = {}  # type: Dict[str, str]
     for field, ext in (('keypair_file_private', 'pem'),
                        ('keypair_file_public', 'pub')):
         fpath = cfg.vm_configs.get(field)
@@ -379,7 +353,7 @@
 
 
 @contextlib.contextmanager
-def create_vms_ctx(ctx: TestRun, cfg: Config, config, already_has_count: int=0) -> Generator[List[INode]]:
+def create_vms_ctx(ctx: TestRun, cfg: Config, config, already_has_count: int=0) -> Iterator[List[INode]]:
     if config['count'].startswith('='):
         count = int(config['count'][1:])
         if count <= already_has_count:
@@ -405,7 +379,7 @@
 
     if not config.get('skip_preparation', False):
         logger.info("Preparing openstack")
-        start_vms.prepare_os_subpr(nova, params, os_creds)
+        start_vms.prepare_os(nova, params, os_creds)
 
     new_nodes = []
     old_nodes = ctx.nodes[:]
@@ -431,13 +405,13 @@
     ctx.results = collections.defaultdict(lambda: [])
 
     for group in cfg.get('tests', []):
-
-        if len(group.items()) != 1:
+        gitems = list(group.items())
+        if len(gitems) != 1:
             msg = "Items in tests section should have len == 1"
             logger.error(msg)
             raise utils.StopTestError(msg)
 
-        key, config = group.items()[0]
+        key, config = gitems[0]
 
         if 'start_test_nodes' == key:
             if 'openstack' not in config:
@@ -469,7 +443,8 @@
             if not cfg.no_tests:
                 for test_group in tests:
                     with sensor_ctx:
-                        for tp, res in run_tests(cfg, test_group, ctx.nodes):
+                        it = run_tests(cfg, test_group, ctx.nodes)
+                        for tp, res in it:
                             ctx.results[tp].extend(res)
 
 
@@ -489,7 +464,7 @@
         os.remove(vm_ids_fname)
 
 
-def store_nodes_in_log(cfg: Config, nodes_ids: Iterable[str]):
+def store_nodes_in_log(cfg: Config, nodes_ids: Iterable[str]) -> None:
     with open(cfg.vm_ids_fname, 'w') as fd:
         fd.write("\n".join(nodes_ids))
 
@@ -503,8 +478,7 @@
     ssh_utils.close_all_sessions()
 
     for node in ctx.nodes:
-        if node.connection is not None:
-            node.connection.close()
+        node.disconnect()
 
 
 def store_raw_results_stage(cfg: Config, ctx: TestRun) -> None:
@@ -577,7 +551,7 @@
             report.make_io_report(list(data[0]),
                                   cfg.get('comment', ''),
                                   html_rep_fname,
-                                  lab_info=ctx.hw_info)
+                                  lab_info=ctx.nodes)
 
 
 def load_data_from_path(test_res_dir: str) -> Mapping[str, List[Any]]:
@@ -599,5 +573,5 @@
         ctx.results.setdefault(tp, []).extend(vals)
 
 
-def load_data_from(var_dir: str) -> Callable:
+def load_data_from(var_dir: str) -> Callable[[TestRun], None]:
     return functools.partial(load_data_from_path_stage, var_dir)
diff --git a/wally/sensors.py b/wally/sensors_rpc_plugin.py
similarity index 98%
rename from wally/sensors.py
rename to wally/sensors_rpc_plugin.py
index 7560a11..a2758c3 100644
--- a/wally/sensors.py
+++ b/wally/sensors_rpc_plugin.py
@@ -1,8 +1,8 @@
 import os
 from collections import namedtuple
 
-
 SensorInfo = namedtuple("SensorInfo", ['value', 'is_accumulated'])
+# SensorInfo = NamedTuple("SensorInfo", [('value', int), ('is_accumulated', bool)])
 
 
 def provides(name: str):
@@ -321,7 +321,7 @@
 # 6 - irq: servicing interrupts
 # 7 - softirq: servicing softirqs
 
-io_values_pos = [
+cpu_values_pos = [
     (1, 'user_processes', True),
     (2, 'nice_processes', True),
     (3, 'system_processes', True),
@@ -341,7 +341,7 @@
         dev_name = vals[0]
 
         if dev_name == 'cpu':
-            for pos, name, accum_val in io_values_pos:
+            for pos, name, accum_val in cpu_values_pos:
                 sensor_name = "{0}.{1}".format(dev_name, name)
                 results[sensor_name] = SensorInfo(int(vals[pos]),
                                                   accum_val)
diff --git a/wally/sensors_rpc_plugin.pyi b/wally/sensors_rpc_plugin.pyi
new file mode 100644
index 0000000..c4b387b
--- /dev/null
+++ b/wally/sensors_rpc_plugin.pyi
@@ -0,0 +1,24 @@
+import os
+from typing import NamedTuple, TypeVar, Callable, Any, Optional, List, Iterable, Dict, Tuple
+
+SensorInfo = NamedTuple("SensorInfo", [('value', int), ('is_accumulated', bool)])
+Pid = str  # plain alias: TypeVar does not accept a single constraint
+AnyFunc = TypeVar('AnyFunc', bound=Callable[..., Any])  # bound, so decorators keep the wrapped signature
+PrefixList = Optional[List[str]]
+SensorDict = Dict[str, SensorInfo]
+
+def provides(name: str) -> Callable[[AnyFunc], AnyFunc]: ...
+def is_dev_accepted(name, disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> bool: ...
+def get_pid_name(pid: Pid) -> str: ...
+def delta(func, only_upd: bool = True) -> Iterable[Tuple[Optional[str], Optional[Dict[str, str]]]]: ...  # NOTE(review): assumes pairs are yielded - confirm
+def get_latency(stat1: SensorDict, stat2: SensorDict) -> SensorDict: ...
+def pid_stat(pid: Pid) -> float: ...
+def get_mem_stats(pid: Pid) -> Tuple[int, int]: ...
+def get_ram_size() -> str: ...
+
+def io_stat(disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> SensorDict: ...
+def net_stat(disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> SensorDict: ...
+def pscpu_stat(disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> SensorDict: ...
+def psram_stat(disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> SensorDict: ...
+def syscpu_stat(disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> SensorDict: ...
+def sysram_stat(disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> SensorDict: ...
diff --git a/wally/ssh_utils.py b/wally/ssh_utils.py
index ada4af6..2941e7c 100644
--- a/wally/ssh_utils.py
+++ b/wally/ssh_utils.py
@@ -1,298 +1,25 @@
 import re
+import json
 import time
-import errno
-import random
 import socket
-import shutil
 import logging
 import os.path
 import getpass
-import StringIO
-import threading
+from io import BytesIO
 import subprocess
+from typing import Union, Optional, cast, Dict, List, Tuple, Any, Callable
+from concurrent.futures import ThreadPoolExecutor
 
 import paramiko
 
+import agent
+
+from . import interfaces, utils
+
 
 logger = logging.getLogger("wally")
 
 
-class Local(object):
-    "simulate ssh connection to local"
-    @classmethod
-    def open_sftp(cls):
-        return cls()
-
-    @classmethod
-    def mkdir(cls, remotepath, mode=None):
-        os.mkdir(remotepath)
-        if mode is not None:
-            os.chmod(remotepath, mode)
-
-    @classmethod
-    def put(cls, localfile, remfile):
-        dirname = os.path.dirname(remfile)
-        if not os.path.exists(dirname):
-            os.makedirs(dirname)
-        shutil.copyfile(localfile, remfile)
-
-    @classmethod
-    def get(cls, remfile, localfile):
-        dirname = os.path.dirname(localfile)
-        if not os.path.exists(dirname):
-            os.makedirs(dirname)
-        shutil.copyfile(remfile, localfile)
-
-    @classmethod
-    def chmod(cls, path, mode):
-        os.chmod(path, mode)
-
-    @classmethod
-    def copytree(cls, src, dst):
-        shutil.copytree(src, dst)
-
-    @classmethod
-    def remove(cls, path):
-        os.unlink(path)
-
-    @classmethod
-    def close(cls):
-        pass
-
-    @classmethod
-    def open(cls, *args, **kwarhgs):
-        return open(*args, **kwarhgs)
-
-    @classmethod
-    def stat(cls, path):
-        return os.stat(path)
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, x, y, z):
-        return False
-
-
-NODE_KEYS = {}
-
-
-def exists(sftp, path):
-    "os.path.exists for paramiko's SCP object"
-    try:
-        sftp.stat(path)
-        return True
-    except IOError as e:
-        if e.errno == errno.ENOENT:
-            return False
-        raise
-
-
-def set_key_for_node(host_port, key):
-    sio = StringIO.StringIO(key)
-    NODE_KEYS[host_port] = paramiko.RSAKey.from_private_key(sio)
-    sio.close()
-
-
-def ssh_connect(creds, conn_timeout=60, reuse_conn=None):
-    if creds == 'local':
-        return Local()
-
-    tcp_timeout = 15
-    banner_timeout = 30
-
-    if reuse_conn is None:
-        ssh = paramiko.SSHClient()
-        ssh.load_host_keys('/dev/null')
-        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
-        ssh.known_hosts = None
-    else:
-        ssh = reuse_conn
-
-    etime = time.time() + conn_timeout
-
-    while True:
-        try:
-            tleft = etime - time.time()
-            c_tcp_timeout = min(tcp_timeout, tleft)
-
-            if paramiko.__version_info__ >= (1, 15, 2):
-                banner_timeout = {'banner_timeout': min(banner_timeout, tleft)}
-            else:
-                banner_timeout = {}
-
-            if creds.passwd is not None:
-                ssh.connect(creds.host,
-                            timeout=c_tcp_timeout,
-                            username=creds.user,
-                            password=creds.passwd,
-                            port=creds.port,
-                            allow_agent=False,
-                            look_for_keys=False,
-                            **banner_timeout)
-            elif creds.key_file is not None:
-                ssh.connect(creds.host,
-                            username=creds.user,
-                            timeout=c_tcp_timeout,
-                            key_filename=creds.key_file,
-                            look_for_keys=False,
-                            port=creds.port,
-                            **banner_timeout)
-            elif (creds.host, creds.port) in NODE_KEYS:
-                ssh.connect(creds.host,
-                            username=creds.user,
-                            timeout=c_tcp_timeout,
-                            pkey=NODE_KEYS[(creds.host, creds.port)],
-                            look_for_keys=False,
-                            port=creds.port,
-                            **banner_timeout)
-            else:
-                key_file = os.path.expanduser('~/.ssh/id_rsa')
-                ssh.connect(creds.host,
-                            username=creds.user,
-                            timeout=c_tcp_timeout,
-                            key_filename=key_file,
-                            look_for_keys=False,
-                            port=creds.port,
-                            **banner_timeout)
-            return ssh
-        except paramiko.PasswordRequiredException:
-            raise
-        except (socket.error, paramiko.SSHException):
-            if time.time() > etime:
-                raise
-            time.sleep(1)
-
-
-def save_to_remote(sftp, path, content):
-    with sftp.open(path, "wb") as fd:
-        fd.write(content)
-
-
-def read_from_remote(sftp, path):
-    with sftp.open(path, "rb") as fd:
-        return fd.read()
-
-
-def normalize_dirpath(dirpath):
-    while dirpath.endswith("/"):
-        dirpath = dirpath[:-1]
-    return dirpath
-
-
-ALL_RWX_MODE = ((1 << 9) - 1)
-
-
-def ssh_mkdir(sftp, remotepath, mode=ALL_RWX_MODE, intermediate=False):
-    remotepath = normalize_dirpath(remotepath)
-    if intermediate:
-        try:
-            sftp.mkdir(remotepath, mode=mode)
-        except (IOError, OSError):
-            upper_dir = remotepath.rsplit("/", 1)[0]
-
-            if upper_dir == '' or upper_dir == '/':
-                raise
-
-            ssh_mkdir(sftp, upper_dir, mode=mode, intermediate=True)
-            return sftp.mkdir(remotepath, mode=mode)
-    else:
-        sftp.mkdir(remotepath, mode=mode)
-
-
-def ssh_copy_file(sftp, localfile, remfile, preserve_perm=True):
-    sftp.put(localfile, remfile)
-    if preserve_perm:
-        sftp.chmod(remfile, os.stat(localfile).st_mode & ALL_RWX_MODE)
-
-
-def put_dir_recursively(sftp, localpath, remotepath, preserve_perm=True):
-    "upload local directory to remote recursively"
-
-    # hack for localhost connection
-    if hasattr(sftp, "copytree"):
-        sftp.copytree(localpath, remotepath)
-        return
-
-    assert remotepath.startswith("/"), "%s must be absolute path" % remotepath
-
-    # normalize
-    localpath = normalize_dirpath(localpath)
-    remotepath = normalize_dirpath(remotepath)
-
-    try:
-        sftp.chdir(remotepath)
-        localsuffix = localpath.rsplit("/", 1)[1]
-        remotesuffix = remotepath.rsplit("/", 1)[1]
-        if localsuffix != remotesuffix:
-            remotepath = os.path.join(remotepath, localsuffix)
-    except IOError:
-        pass
-
-    for root, dirs, fls in os.walk(localpath):
-        prefix = os.path.commonprefix([localpath, root])
-        suffix = root.split(prefix, 1)[1]
-        if suffix.startswith("/"):
-            suffix = suffix[1:]
-
-        remroot = os.path.join(remotepath, suffix)
-
-        try:
-            sftp.chdir(remroot)
-        except IOError:
-            if preserve_perm:
-                mode = os.stat(root).st_mode & ALL_RWX_MODE
-            else:
-                mode = ALL_RWX_MODE
-            ssh_mkdir(sftp, remroot, mode=mode, intermediate=True)
-            sftp.chdir(remroot)
-
-        for f in fls:
-            remfile = os.path.join(remroot, f)
-            localfile = os.path.join(root, f)
-            ssh_copy_file(sftp, localfile, remfile, preserve_perm)
-
-
-def delete_file(conn, path):
-    sftp = conn.open_sftp()
-    sftp.remove(path)
-    sftp.close()
-
-
-def copy_paths(conn, paths):
-    sftp = conn.open_sftp()
-    try:
-        for src, dst in paths.items():
-            try:
-                if os.path.isfile(src):
-                    ssh_copy_file(sftp, src, dst)
-                elif os.path.isdir(src):
-                    put_dir_recursively(sftp, src, dst)
-                else:
-                    templ = "Can't copy {0!r} - " + \
-                            "it neither a file not a directory"
-                    raise OSError(templ.format(src))
-            except Exception as exc:
-                tmpl = "Scp {0!r} => {1!r} failed - {2!r}"
-                raise OSError(tmpl.format(src, dst, exc))
-    finally:
-        sftp.close()
-
-
-class ConnCreds(object):
-    conn_uri_attrs = ("user", "passwd", "host", "port", "path")
-
-    def __init__(self):
-        for name in self.conn_uri_attrs:
-            setattr(self, name, None)
-
-    def __str__(self):
-        return str(self.__dict__)
-
-
-uri_reg_exprs = []
-
-
 class URIsNamespace(object):
     class ReParts(object):
         user_rr = "[^:]*?"
@@ -323,11 +50,29 @@
         "^{user_rr}:{passwd_rr}@{host_rr}:{port_rr}$",
     ]
 
+    uri_reg_exprs = []  # type: List[str]
     for templ in templs:
         uri_reg_exprs.append(templ.format(**re_dct))
 
 
-def parse_ssh_uri(uri):
+class ConnCreds:
+    conn_uri_attrs = ("user", "passwd", "host", "port", "key_file")
+
+    def __init__(self) -> None:
+        self.user = None  # type: Optional[str]
+        self.passwd = None  # type: Optional[str]
+        self.host = None  # type: str
+        self.port = 22  # type: int
+        self.key_file = None  # type: Optional[str]
+
+    def __str__(self) -> str:
+        return str(self.__dict__)
+
+
+SSHCredsType = Union[str, ConnCreds]
+
+
+def parse_ssh_uri(uri: str) -> ConnCreds:
     # [ssh://]+
     # user:passwd@ip_host:port
     # user:passwd@ip_host
@@ -344,12 +89,12 @@
         uri = uri[len("ssh://"):]
 
     res = ConnCreds()
-    res.port = "22"
+    res.port = 22
     res.key_file = None
     res.passwd = None
     res.user = getpass.getuser()
 
-    for rr in uri_reg_exprs:
+    for rr in URIsNamespace.uri_reg_exprs:
         rrm = re.match(rr, uri)
         if rrm is not None:
             res.__dict__.update(rrm.groupdict())
@@ -358,18 +103,174 @@
     raise ValueError("Can't parse {0!r} as ssh uri value".format(uri))
 
 
-def reconnect(conn, uri, **params):
+class LocalHost(interfaces.IHost):
+    def __str__(self):
+        return "<Local>"
+
+    def get_ip(self) -> str:
+        return 'localhost'
+
+    def put_to_file(self, path: str, content: bytes) -> None:
+        dirname = os.path.dirname(path)
+        if dirname:  # a bare file name has no directory part to create
+            os.makedirs(dirname, exist_ok=True)
+        with open(path, "wb") as fd:
+            fd.write(content)
+
+    def run(self, cmd: str, timeout: int = 60, nolog: bool = False) -> str:
+        proc = subprocess.Popen(cmd, shell=True,
+                                stdin=subprocess.PIPE,
+                                stdout=subprocess.PIPE,
+                                stderr=subprocess.STDOUT)
+        # communicate() returns bytes; decode so callers get the declared str
+        output = proc.communicate()[0].decode("utf8")
+        if proc.returncode != 0:
+            templ = "SSH:{0} Cmd {1!r} failed with code {2}. Output: {3}"
+            raise OSError(templ.format(self, cmd, proc.returncode, output))
+
+        return output
+
+
+class SSHHost(interfaces.IHost):
+    def __init__(self, ssh_conn, node_name: str, ip: str) -> None:
+        self.conn = ssh_conn
+        self.node_name = node_name
+        self.ip = ip
+
+    def get_ip(self) -> str:
+        return self.ip
+
+    def __str__(self) -> str:
+        return self.node_name
+
+    def put_to_file(self, path: str, content: bytes) -> None:
+        with self.conn.open_sftp() as sftp:
+            with sftp.open(path, "wb") as fd:
+                fd.write(content)
+
+    def run(self, cmd: str, timeout: int = 60, nolog: bool = False) -> str:
+        transport = self.conn.get_transport()
+        session = transport.open_session()
+
+        try:
+            session.set_combine_stderr(True)
+
+            stime = time.time()
+
+            if not nolog:
+                logger.debug("SSH:{0} Exec {1!r}".format(self, cmd))
+
+            session.exec_command(cmd)
+            session.settimeout(1)
+            session.shutdown_write()
+            raw = b""  # the channel delivers bytes; decode once at the end
+
+            while True:
+                try:
+                    ndata = session.recv(1024)
+                    if not ndata:  # empty read means the remote side closed the stream
+                        break
+                    raw += ndata
+                except socket.timeout:
+                    pass
+
+                if time.time() - stime > timeout:
+                    raise OSError(raw.decode("utf8", "replace") + "\nExecution timeout")
+
+            code = session.recv_exit_status()
+        finally:
+            # This method is the only owner of the channel, so it must always
+            # release it; the former constant-False guard meant close() was
+            # never reached and every command leaked a channel.
+            session.close()
+        output = raw.decode("utf8")
+        if code != 0:
+            templ = "SSH:{0} Cmd {1!r} failed with code {2}. Output: {3}"
+            raise OSError(templ.format(self, cmd, code, output))
+
+        return output
+
+
+NODE_KEYS = {}  # type: Dict[Tuple[str, int], paramiko.RSAKey]
+
+
+def set_key_for_node(host_port: Tuple[str, int], key: bytes) -> None:
+    from io import StringIO  # paramiko parses private keys as text lines
+    with StringIO(key.decode("utf8")) as sio:
+        NODE_KEYS[host_port] = paramiko.RSAKey.from_private_key(sio)
+
+
+def ssh_connect(creds: SSHCredsType, conn_timeout: int = 60) -> interfaces.IHost:
+    if creds == 'local':
+        return LocalHost()
+
+    tcp_timeout = 15
+    default_banner_timeout = 30
+
+    ssh = paramiko.SSHClient()
+    ssh.load_host_keys('/dev/null')
+    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+    ssh.known_hosts = None
+
+    end_time = time.time() + conn_timeout  # type: float
+
+    while True:
+        try:
+            time_left = end_time - time.time()
+            c_tcp_timeout = min(tcp_timeout, time_left)
+
+            banner_timeout_arg = {}  # type: Dict[str, int]
+            if paramiko.__version_info__ >= (1, 15, 2):
+                banner_timeout_arg['banner_timeout'] = int(min(default_banner_timeout, time_left))
+
+            creds = cast(ConnCreds, creds)
+
+            if creds.passwd is not None:
+                ssh.connect(creds.host,
+                            timeout=c_tcp_timeout,
+                            username=creds.user,
+                            password=cast(str, creds.passwd),
+                            port=creds.port,
+                            allow_agent=False,
+                            look_for_keys=False,
+                            **banner_timeout_arg)
+            elif creds.key_file is not None:
+                ssh.connect(creds.host,
+                            username=creds.user,
+                            timeout=c_tcp_timeout,
+                            key_filename=cast(str, creds.key_file),
+                            look_for_keys=False,
+                            port=creds.port,
+                            **banner_timeout_arg)
+            elif (creds.host, creds.port) in NODE_KEYS:
+                ssh.connect(creds.host,
+                            username=creds.user,
+                            timeout=c_tcp_timeout,
+                            pkey=NODE_KEYS[(creds.host, creds.port)],
+                            look_for_keys=False,
+                            port=creds.port,
+                            **banner_timeout_arg)
+            else:
+                key_file = os.path.expanduser('~/.ssh/id_rsa')
+                ssh.connect(creds.host,
+                            username=creds.user,
+                            timeout=c_tcp_timeout,
+                            key_filename=key_file,
+                            look_for_keys=False,
+                            port=creds.port,
+                            **banner_timeout_arg)
+            return SSHHost(ssh, "{0.host}:{0.port}".format(creds), creds.host)
+        except paramiko.PasswordRequiredException:
+            raise
+        except (socket.error, paramiko.SSHException):
+            if time.time() > end_time:
+                raise
+            time.sleep(1)
+
+
+def connect(uri: str, **params) -> interfaces.IHost:
     if uri == 'local':
-        return conn
-
-    creds = parse_ssh_uri(uri)
-    creds.port = int(creds.port)
-    return ssh_connect(creds, reuse_conn=conn, **params)
-
-
-def connect(uri, **params):
-    if uri == 'local':
-        res = Local()
+        res = LocalHost()
     else:
         creds = parse_ssh_uri(uri)
         creds.port = int(creds.port)
@@ -377,180 +278,58 @@
     return res
 
 
-all_sessions_lock = threading.Lock()
-all_sessions = {}
+SetupResult = Tuple[interfaces.IRPC, Dict[str, Any]]
 
 
-class BGSSHTask(object):
-    CHECK_RETRY = 5
+RPCBeforeConnCallback = Callable[[interfaces.IHost, int], Tuple[str, int]]
 
-    def __init__(self, node, use_sudo):
-        self.node = node
-        self.pid = None
-        self.use_sudo = use_sudo
 
-    def start(self, orig_cmd, **params):
-        uniq_name = 'test'
-        cmd = "screen -S {0} -d -m {1}".format(uniq_name, orig_cmd)
-        run_over_ssh(self.node.connection, cmd,
-                     timeout=10, node=self.node.get_conn_id(),
-                     **params)
-        processes = run_over_ssh(self.node.connection, "ps aux", nolog=True)
+def setup_rpc(node: interfaces.IHost,
+              rpc_server_code: bytes,
+              port: int = 0,
+              rpc_conn_callback: Optional[RPCBeforeConnCallback] = None) -> SetupResult:
+    code_file = node.run("mktemp").strip()
+    log_file = node.run("mktemp").strip()
+    node.put_to_file(code_file, rpc_server_code)
+    cmd = "python {code_file} server --listen-addr={listen_ip}:{port} --daemon " + \
+          "--show-settings --stdout-file={out_file}"
+    params_js = node.run(cmd.format(code_file=code_file,
+                                    listen_ip=node.get_ip(),
+                                    out_file=log_file,
+                                    port=port)).strip()
+    params = json.loads(params_js)
+    params['log_file'] = log_file
 
-        for iter in range(self.CHECK_RETRY):
-            for proc in processes.split("\n"):
-                if orig_cmd in proc and "SCREEN" not in proc:
-                    self.pid = proc.split()[1]
-                    break
-            if self.pid is not None:
-                break
-            time.sleep(1)
+    if rpc_conn_callback:
+        ip, port = rpc_conn_callback(node, port)
+    else:
+        ip = node.get_ip()
+        port = int(params['addr'].split(":")[1])
 
-        if self.pid is None:
-            self.pid = -1
+    return agent.connect((ip, port)), params
 
-    def check_running(self):
-        assert self.pid is not None
-        if -1 == self.pid:
-            return False
+
+def wait_ssh_awailable(addrs: List[Tuple[str, int]],
+                       timeout: int = 300,
+                       tcp_timeout: float = 1.0,
+                       max_threads: int = 32) -> None:
+    addrs = addrs[:]
+    tout = utils.Timeout(timeout)
+
+    def check_sock(addr):
+        s = socket.socket()
+        s.settimeout(tcp_timeout)
         try:
-            run_over_ssh(self.node.connection,
-                         "ls /proc/{0}".format(self.pid),
-                         timeout=10, nolog=True)
+            s.connect(addr)
             return True
-        except OSError:
+        except (socket.timeout, ConnectionRefusedError):
             return False
 
-    def kill(self, soft=True, use_sudo=True):
-        assert self.pid is not None
-        if self.pid == -1:
-            return True
-        try:
-            if soft:
-                cmd = "kill {0}"
-            else:
-                cmd = "kill -9 {0}"
-
-            if self.use_sudo:
-                cmd = "sudo " + cmd
-
-            run_over_ssh(self.node.connection,
-                         cmd.format(self.pid), nolog=True)
-            return True
-        except OSError:
-            return False
-
-    def wait(self, soft_timeout, timeout):
-        end_of_wait_time = timeout + time.time()
-        soft_end_of_wait_time = soft_timeout + time.time()
-
-        # time_till_check = random.randint(5, 10)
-        time_till_check = 2
-
-        # time_till_first_check = random.randint(2, 6)
-        time_till_first_check = 2
-        time.sleep(time_till_first_check)
-        if not self.check_running():
-            return True
-
-        while self.check_running() and time.time() < soft_end_of_wait_time:
-            # time.sleep(soft_end_of_wait_time - time.time())
-            time.sleep(time_till_check)
-
-        while end_of_wait_time > time.time():
-            time.sleep(time_till_check)
-            if not self.check_running():
-                break
-        else:
-            self.kill()
-            time.sleep(1)
-            if self.check_running():
-                self.kill(soft=False)
-            return False
-        return True
+    with ThreadPoolExecutor(max_workers=max_threads) as pool:
+        while addrs:
+            check_result = pool.map(check_sock, addrs)
+            addrs = [addr for ok, addr in zip(check_result, addrs) if not ok]  # type: List[Tuple[str, int]]
+            tout.tick()
 
 
-def run_over_ssh(conn, cmd, stdin_data=None, timeout=60,
-                 nolog=False, node=None):
-    "should be replaces by normal implementation, with select"
 
-    if isinstance(conn, Local):
-        if not nolog:
-            logger.debug("SSH:local Exec {0!r}".format(cmd))
-        proc = subprocess.Popen(cmd, shell=True,
-                                stdin=subprocess.PIPE,
-                                stdout=subprocess.PIPE,
-                                stderr=subprocess.STDOUT)
-
-        stdoutdata, _ = proc.communicate(input=stdin_data)
-        if proc.returncode != 0:
-            templ = "SSH:{0} Cmd {1!r} failed with code {2}. Output: {3}"
-            raise OSError(templ.format(node, cmd, proc.returncode, stdoutdata))
-
-        return stdoutdata
-
-    transport = conn.get_transport()
-    session = transport.open_session()
-
-    if node is None:
-        node = ""
-
-    with all_sessions_lock:
-        all_sessions[id(session)] = session
-
-    try:
-        session.set_combine_stderr(True)
-
-        stime = time.time()
-
-        if not nolog:
-            logger.debug("SSH:{0} Exec {1!r}".format(node, cmd))
-
-        session.exec_command(cmd)
-
-        if stdin_data is not None:
-            session.sendall(stdin_data)
-
-        session.settimeout(1)
-        session.shutdown_write()
-        output = ""
-
-        while True:
-            try:
-                ndata = session.recv(1024)
-                output += ndata
-                if "" == ndata:
-                    break
-            except socket.timeout:
-                pass
-
-            if time.time() - stime > timeout:
-                raise OSError(output + "\nExecution timeout")
-
-        code = session.recv_exit_status()
-    finally:
-        found = False
-        with all_sessions_lock:
-            if id(session) in all_sessions:
-                found = True
-                del all_sessions[id(session)]
-
-        if found:
-            session.close()
-
-    if code != 0:
-        templ = "SSH:{0} Cmd {1!r} failed with code {2}. Output: {3}"
-        raise OSError(templ.format(node, cmd, code, output))
-
-    return output
-
-
-def close_all_sessions():
-    with all_sessions_lock:
-        for session in all_sessions.values():
-            try:
-                session.sendall('\x03')
-                session.close()
-            except:
-                pass
-        all_sessions.clear()
diff --git a/wally/stage.py b/wally/stage.py
index 5f47a1b..e06e31d 100644
--- a/wally/stage.py
+++ b/wally/stage.py
@@ -1,28 +1,16 @@
 import logging
 import contextlib
-
+from typing import Callable, Iterator
 
 from .utils import StopTestError
+from .test_run_class import TestRun
+
 
 logger = logging.getLogger("wally")
 
 
-class TestStage:
-    name = ""
-
-    def __init__(self, testrun, config):
-        self.testrun = testrun
-        self.config = config
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self):
-        return self
-
-
 @contextlib.contextmanager
-def log_stage(stage):
+def log_stage(stage) -> Iterator[None]:
     msg_templ = "Exception during {0}: {1!s}"
     msg_templ_no_exc = "During {0}"
 
@@ -31,6 +19,9 @@
     try:
         yield
     except StopTestError as exc:
-        logger.error(msg_templ.format(stage.name, exc))
+        logger.error(msg_templ.format(stage.__name__, exc))
     except Exception:
-        logger.exception(msg_templ_no_exc.format(stage.name))
+        logger.exception(msg_templ_no_exc.format(stage.__name__))
+
+
+StageType = Callable[[TestRun], None]
diff --git a/wally/start_vms.py b/wally/start_vms.py
index 759d63b..075f348 100644
--- a/wally/start_vms.py
+++ b/wally/start_vms.py
@@ -5,18 +5,15 @@
 import urllib
 import os.path
 import logging
-import collections
-from typing import Dict, Any, Iterable
 
+from typing import Dict, Any, Iterable, Iterator, Generator, NamedTuple, Tuple
 from concurrent.futures import ThreadPoolExecutor
 
 from novaclient.exceptions import NotFound
 from novaclient.client import Client as n_client
 from cinderclient.v1.client import Client as c_client
 
-import wally
-from wally.discover import Node
-
+from .inode import NodeInfo
 
 __doc__ = """
 Module used to reliably spawn set of VM's, evenly distributed across
@@ -44,18 +41,23 @@
     return NOVA_CONNECTION is not None
 
 
-OSCreds = collections.namedtuple("OSCreds",
-                                 ["name", "passwd",
-                                  "tenant", "auth_url", "insecure"])
+OSCreds = NamedTuple("OSCreds",
+                     [("name", str),
+                      ("passwd", str),
+                      ("tenant", str),
+                      ("auth_url", str),
+                      ("insecure", bool)])
 
 
 def ostack_get_creds() -> OSCreds:
     if STORED_OPENSTACK_CREDS is None:
+        is_insecure = \
+            os.environ.get('OS_INSECURE', 'False').lower() in ('true', 'yes')
         return OSCreds(os.environ.get('OS_USERNAME'),
                        os.environ.get('OS_PASSWORD'),
                        os.environ.get('OS_TENANT_NAME'),
                        os.environ.get('OS_AUTH_URL'),
-                       os.environ.get('OS_INSECURE', False))
+                       is_insecure)
     else:
         return STORED_OPENSTACK_CREDS
 
@@ -106,8 +108,8 @@
                         break
 
 
-def pause(ids: Iterable[int]) -> None:
-    def pause_vm(conn, vm_id):
+def pause(ids: Iterable[str]) -> None:
+    def pause_vm(conn: n_client, vm_id: str) -> None:
         vm = conn.servers.get(vm_id)
         if vm.status == 'ACTIVE':
             vm.pause()
@@ -120,8 +122,8 @@
             future.result()
 
 
-def unpause(ids: Iterable[int], max_resume_time=10) -> None:
-    def unpause(conn, vm_id):
+def unpause(ids: Iterable[str], max_resume_time=10) -> None:
+    def unpause(conn: n_client, vm_id: str) -> None:
         vm = conn.servers.get(vm_id)
         if vm.status == 'PAUSED':
             vm.unpause()
@@ -424,7 +426,7 @@
     return [ip for ip in ip_list if ip.instance_id is None][:amount]
 
 
-def launch_vms(nova, params, already_has_count=0):
+def launch_vms(nova, params, already_has_count=0) -> Iterator[Tuple[NodeInfo, str]]:
     """launch virtual servers
 
     Parameters:
@@ -461,7 +463,7 @@
     lst = nova.services.list(binary='nova-compute')
     srv_count = len([srv for srv in lst if srv.status == 'enabled'])
 
-    if isinstance(count, basestring):
+    if isinstance(count, str):
         if count.startswith("x"):
             count = srv_count * int(count[1:])
         else:
@@ -474,7 +476,7 @@
 
     logger.debug("Starting new nodes on openstack")
 
-    assert isinstance(count, (int, long))
+    assert isinstance(count, int)
 
     srv_params = "img: {image[name]}, flavor: {flavor[name]}".format(**params)
     msg_templ = "Will start {0} servers with next params: {1}"
@@ -500,7 +502,7 @@
 
     for ip, os_node in create_vms_mt(NOVA_CONNECTION, count, **vm_params):
         conn_uri = creds.format(ip=ip, private_key_path=private_key_path)
-        yield Node(conn_uri, []), os_node.id
+        yield NodeInfo(conn_uri, []), os_node.id
 
 
 def get_free_server_grpoups(nova, template):
diff --git a/wally/storage.py b/wally/storage.py
new file mode 100644
index 0000000..5212f4a
--- /dev/null
+++ b/wally/storage.py
@@ -0,0 +1,122 @@
+"""
+This module contains interfaces for storage classes
+"""
+
+import abc
+from typing import Any, Iterable, TypeVar, Type, IO
+
+
+class IStorable(metaclass=abc.ABCMeta):
+    """Interface for types that can be stored"""
+    @abc.abstractmethod
+    def __getstate__(self) -> Any:
+        pass
+
+    @abc.abstractmethod
+    def __setstate__(self, state: Any) -> None:
+        pass
+
+
+# all builtin types can be stored
+IStorable.register(list)  # type: ignore
+IStorable.register(dict)  # type: ignore
+IStorable.register(tuple)  # type: ignore
+IStorable.register(set)  # type: ignore
+IStorable.register(type(None))  # type: ignore  # register() accepts classes only, not the None instance
+IStorable.register(int)  # type: ignore
+IStorable.register(str)  # type: ignore
+IStorable.register(bytes)  # type: ignore
+IStorable.register(bool)  # type: ignore
+
+
+ObjClass = TypeVar('ObjClass')
+
+
+class IStorage(metaclass=abc.ABCMeta):
+    """interface for storage"""
+    @abc.abstractmethod
+    def __init__(self, path: str, existing_storage: bool = False) -> None:
+        pass
+
+    @abc.abstractmethod
+    def __setitem__(self, path: str, value: IStorable) -> None:
+        pass
+
+    @abc.abstractmethod
+    def __getitem__(self, path: str) -> IStorable:
+        pass
+
+    @abc.abstractmethod
+    def __contains__(self, path: str) -> bool:
+        pass
+
+    @abc.abstractmethod
+    def list(self, path: str) -> Iterable[str]:
+        pass
+
+    @abc.abstractmethod
+    def load(self, path: str, obj_class: Type[ObjClass]) -> ObjClass:
+        pass
+
+    @abc.abstractmethod
+    def get_stream(self, path: str) -> IO:
+        pass
+
+
+class ISimpleStorage(metaclass=abc.ABCMeta):
+    """interface for low-level storage, which doesn't support serialization
+    and can operate only on bytes"""
+
+    @abc.abstractmethod
+    def __init__(self, path: str) -> None:
+        pass
+
+    @abc.abstractmethod
+    def __setitem__(self, path: str, value: bytes) -> None:
+        pass
+
+    @abc.abstractmethod
+    def __getitem__(self, path: str) -> bytes:
+        pass
+
+    @abc.abstractmethod
+    def __contains__(self, path: str) -> bool:
+        pass
+
+    @abc.abstractmethod
+    def list(self, path: str) -> Iterable[str]:
+        pass
+
+    @abc.abstractmethod
+    def get_stream(self, path: str) -> IO:
+        pass
+
+
+class ISerializer(metaclass=abc.ABCMeta):
+    """Interface for serialization class"""
+    @abc.abstractmethod
+    def pack(self, value: IStorable) -> bytes:
+        pass
+
+    @abc.abstractmethod
+    def unpack(self, data: bytes) -> IStorable:
+        pass
+
+
+# TODO(koder): these are the concrete storage and serializer classes; they still have to be implemented
+class FSStorage(IStorage):
+    """Store all data in files on FS"""
+
+    @abc.abstractmethod
+    def __init__(self, root_path: str, serializer: ISerializer, existing: bool = False) -> None:
+        pass
+
+
+class YAMLSerializer(ISerializer):
+    """Serialize data to yaml"""
+    pass
+
+
+def make_storage(url: str, existing: bool = False) -> IStorage:
+    return FSStorage(url, YAMLSerializer(), existing)
+
diff --git a/wally/suits/io/fio.py b/wally/suits/io/fio.py
index 50bb1fd..2360a55 100644
--- a/wally/suits/io/fio.py
+++ b/wally/suits/io/fio.py
@@ -778,7 +778,7 @@
     def prepare_data(cls, results) -> List[Dict[str, Any]]:
         """create a table with io performance report for console"""
 
-        def key_func(data) -> Tuple(str, str, str, str, int):
+        def key_func(data: FioRunResult) -> Tuple[str, str, str, str, int]:
             tpl = data.summary_tpl()
             return (data.name,
                     tpl.oper,
diff --git a/wally/suits/io/fio_task_parser.py b/wally/suits/io/fio_task_parser.py
index 233f6e2..1b6ba21 100644
--- a/wally/suits/io/fio_task_parser.py
+++ b/wally/suits/io/fio_task_parser.py
@@ -7,7 +7,7 @@
 import os.path
 import argparse
 import itertools
-from typing import Optional, Generator, Union, Dict, Iterable, Any, List, TypeVar, Callable
+from typing import Optional, Iterator, Union, Dict, Iterable, List, TypeVar, Callable, Tuple
 from collections import OrderedDict, namedtuple
 
 
@@ -32,7 +32,7 @@
     def copy(self) -> 'FioJobSection':
         return copy.deepcopy(self)
 
-    def required_vars(self) -> Generator[str, Var]:
+    def required_vars(self) -> Iterator[Tuple[str, Var]]:
         for name, val in self.vals.items():
             if isinstance(val, Var):
                 yield name, val
@@ -97,7 +97,7 @@
     return val
 
 
-def fio_config_lexer(fio_cfg: str, fname: str) -> Generator[CfgLine]:
+def fio_config_lexer(fio_cfg: str, fname: str) -> Iterator[CfgLine]:
     for lineno, oline in enumerate(fio_cfg.split("\n")):
         try:
             line = oline.strip()
@@ -130,7 +130,7 @@
             raise ParseError(str(exc), fname, lineno, oline)
 
 
-def fio_config_parse(lexer_iter: Iterable[CfgLine]) -> Generator[FioJobSection]:
+def fio_config_parse(lexer_iter: Iterable[CfgLine]) -> Iterator[FioJobSection]:
     in_globals = False
     curr_section = None
     glob_vals = OrderedDict()
@@ -204,7 +204,7 @@
         yield curr_section
 
 
-def process_cycles(sec: FioJobSection) -> Generator[FioJobSection]:
+def process_cycles(sec: FioJobSection) -> Iterator[FioJobSection]:
     cycles = OrderedDict()
 
     for name, val in sec.vals.items():
@@ -277,7 +277,7 @@
 MAGIC_OFFSET = 0.1885
 
 
-def finall_process(sec: FioJobSection, counter: Optional[List[int]] = [0]) -> FioJobSection:
+def finall_process(sec: FioJobSection, counter: List[int] = [0]) -> FioJobSection:
     sec = sec.copy()
 
     sec.vals['unified_rw_reporting'] = '1'
@@ -332,7 +332,7 @@
 TestSumm = namedtuple("TestSumm", ("oper", "mode", "bsize", "iodepth", "vm_count"))
 
 
-def get_test_summary_tuple(sec: FioJobSection, vm_count=None) -> TestSumm:
+def get_test_summary_tuple(sec: FioJobSection, vm_count: int = None) -> TestSumm:
     if isinstance(sec, dict):
         vals = sec
     else:
@@ -355,7 +355,7 @@
                     vm_count)
 
 
-def get_test_summary(sec: FioJobSection, vm_count: int=None, noqd: Optional[bool]=False) -> str:
+def get_test_summary(sec: FioJobSection, vm_count: int = None, noqd: bool = False) -> str:
     tpl = get_test_summary_tuple(sec, vm_count)
 
     res = "{0.oper}{0.mode}{0.bsize}".format(tpl)
@@ -372,7 +372,7 @@
     return sec.vals.get('ramp_time', 0) + sec.vals.get('runtime', 0)
 
 
-def parse_all_in_1(source:str, fname: str=None) -> Generator[FioJobSection]:
+def parse_all_in_1(source:str, fname: str = None) -> Iterator[FioJobSection]:
     return fio_config_parse(fio_config_lexer(source, fname))
 
 
@@ -381,13 +381,13 @@
 
 
 def flatmap(func: Callable[[FM_FUNC_INPUT], Iterable[FM_FUNC_RES]],
-            inp_iter: Iterable[FM_FUNC_INPUT]) -> Generator[FM_FUNC_RES]:
+            inp_iter: Iterable[FM_FUNC_INPUT]) -> Iterator[FM_FUNC_RES]:
     for val in inp_iter:
         for res in func(val):
             yield res
 
 
-def fio_cfg_compile(source: str, fname: str, test_params: FIO_PARAMS) -> Generator[FioJobSection]:
+def fio_cfg_compile(source: str, fname: str, test_params: FIO_PARAMS) -> Iterator[FioJobSection]:
     it = parse_all_in_1(source, fname)
     it = (apply_params(sec, test_params) for sec in it)
     it = flatmap(process_cycles, it)
diff --git a/wally/test_run_class.py b/wally/test_run_class.py
index 9ce3370..e937300 100644
--- a/wally/test_run_class.py
+++ b/wally/test_run_class.py
@@ -1,20 +1,37 @@
+from typing import List, Callable, Any, Dict, Optional
+from concurrent.futures import ThreadPoolExecutor
+
+
+from .timeseries import SensorDatastore
+from . import inode
+from .start_vms import OSCreds
+from .storage import IStorage
+from .config import Config
+
+
 class TestRun:
     """Test run information"""
-    def __init__(self):
+    def __init__(self, config: Config, storage: IStorage) -> None:
         # NodesInfo list
-        self.nodes_info = []
+        self.nodes_info = []  # type: List[inode.NodeInfo]
 
         # Nodes list
-        self.nodes = []
+        self.nodes = []  # type: List[inode.INode]
 
-        self.build_meta = {}
-        self.clear_calls_stack = []
+        self.build_meta = {}  # type: Dict[str,Any]
+        self.clear_calls_stack = []  # type: List[Callable[['TestRun'], None]]
 
         # created openstack nodes
-        self.openstack_nodes_ids = []
+        self.openstack_nodes_ids = []  # type: List[str]
         self.sensors_mon_q = None
 
         # openstack credentials
-        self.fuel_openstack_creds = None
+        self.fuel_openstack_creds = None  # type: Optional[OSCreds]
 
+        self.storage = storage
+        self.config = config
+        self.sensors_data = SensorDatastore()
+
+    def get_pool(self) -> ThreadPoolExecutor:  # new pool per call; 'worker_pool_sz' workers, 32 by default
+        return ThreadPoolExecutor(self.config.get('worker_pool_sz', 32))
 
diff --git a/wally/utils.py b/wally/utils.py
index 32c9056..d2b867e 100644
--- a/wally/utils.py
+++ b/wally/utils.py
@@ -1,7 +1,8 @@
 import re
 import os
-import io
 import sys
+import time
+import uuid
 import socket
 import logging
 import ipaddress
@@ -10,14 +11,20 @@
 import subprocess
 import collections
 
-from .inode import INode
-from typing import Any, Tuple, Union, List, Generator, Dict, Callable, Iterable, Optional
+from .interfaces import IRemoteNode
+from typing import Any, Tuple, Union, List, Iterator, Dict, Callable, Iterable, Optional, IO, Sequence
 
 try:
     import psutil
 except ImportError:
     psutil = None
 
+try:
+    from petname import Generate as pet_generate
+except ImportError:  # petname is optional: fall back to a random uuid4 string
+    def pet_generate(x: int, y: str) -> str:  # args are ignored, kept for call compatibility
+        return str(uuid.uuid4())
+
 
 logger = logging.getLogger("wally")
 
@@ -35,16 +42,16 @@
 
 
 class LogError:
-    def __init__(self, message: str, exc_logger=None):
+    def __init__(self, message: str, exc_logger: logging.Logger = None) -> None:
         self.message = message
         self.exc_logger = exc_logger
 
-    def __enter__(self):
+    def __enter__(self) -> 'LogError':
         return self
 
-    def __exit__(self, tp: type, value: Exception, traceback: Any):
+    def __exit__(self, tp: type, value: Exception, traceback: Any) -> bool:
         if value is None or isinstance(value, StopTestError):
-            return
+            return False
 
         if self.exc_logger is None:
             exc_logger = sys._getframe(1).f_globals.get('logger', logger)
@@ -52,10 +59,10 @@
             exc_logger = self.exc_logger
 
         exc_logger.exception(self.message, exc_info=(tp, value, traceback))
-        raise StopTestError(self.message, value)
+        raise StopTestError(self.message) from value
 
 
-def log_block(message: str, exc_logger=None) -> LogError:
+def log_block(message: str, exc_logger:logging.Logger = None) -> LogError:
     logger.debug("Starts : " + message)
     return LogError(message, exc_logger)
 
@@ -67,7 +74,7 @@
 
 
 def parse_creds(creds: str) -> Tuple[str, str, str]:
-    # parse user:passwd@host
+    """Parse simple credentials format user:passwd[@host]"""
     user, passwd_host = creds.split(":", 1)
 
     if '@' not in passwd_host:
@@ -78,12 +85,12 @@
     return user, passwd, host
 
 
-class TaksFinished(Exception):
+class TaskFinished(Exception):
     pass
 
 
 class Barrier:
-    def __init__(self, count: int):
+    def __init__(self, count: int) -> None:
         self.count = count
         self.curr_count = 0
         self.cond = threading.Condition()
@@ -92,7 +99,7 @@
     def wait(self, timeout: int=None) -> bool:
         with self.cond:
             if self.exited:
-                raise TaksFinished()
+                raise TaskFinished()
 
             self.curr_count += 1
             if self.curr_count == self.count:
@@ -134,6 +141,10 @@
     if size < 1024:
         return str(size)
 
+    # make mypy happy
+    scale = 1
+    name = ""
+
     for name, scale in RSMAP:
         if size < 1024 * scale:
             if size % scale == 0:
@@ -154,6 +165,10 @@
     if size < 1000:
         return str(size)
 
+    # make mypy happy
+    scale = 1
+    name = ""
+
     for name, scale in RSMAP_10:
         if size < 1000 * scale:
             if size % scale == 0:
@@ -165,16 +180,32 @@
 
 
 def run_locally(cmd: Union[str, List[str]], input_data: str="", timeout:int =20) -> str:
+    """Run cmd on the local host and return its stdout decoded as utf8.
+
+    A str cmd is executed through the shell, a list of arguments is executed
+    directly. input_data is utf8-encoded and passed to the process stdin.
+
+    Raises RuntimeError on timeout and subprocess.CalledProcessError
+    if the process exits with a non-zero code.
+    """
-    shell = isinstance(cmd, str)
+    if isinstance(cmd, str):
+        shell = True
+        cmd_str = cmd
+    else:
+        shell = False
+        cmd_str = " ".join(cmd)
+
     proc = subprocess.Popen(cmd,
                             shell=shell,
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
-    res = []
+    res = []  # type: List[Tuple[bytes, bytes]]
 
     def thread_func() -> None:
-        rr = proc.communicate(input_data)
-        res.extend(rr)
+        rr = proc.communicate(input_data.encode("utf8"))
+        # append, not extend: res must hold the (stdout, stderr) pair as one
+        # item, as its type comment says and as zip(*res) below expects
+        res.append(rr)
 
     thread = threading.Thread(target=thread_func,
@@ -193,12 +213,16 @@
             proc.kill()
 
         thread.join()
-        raise RuntimeError("Local process timeout: " + str(cmd))
+        raise RuntimeError("Local process timeout: " + cmd_str)
 
-    out, err = res
+    stdout_data, stderr_data = zip(*res)  # type: List[bytes], List[bytes]
+
+    out = b"".join(stdout_data).decode("utf8")
+    err = b"".join(stderr_data).decode("utf8")
+
     if 0 != proc.returncode:
         raise subprocess.CalledProcessError(proc.returncode,
-                                            cmd, out + err)
+                                            cmd_str, out + err)
 
     return out
 
@@ -234,7 +258,7 @@
     raise OSError("Can't define interface for {0}".format(target_ip))
 
 
-def open_for_append_or_create(fname: str) -> io.IO:
+def open_for_append_or_create(fname: str) -> IO:
     if not os.path.exists(fname):
         return open(fname, "w")
 
@@ -263,15 +287,15 @@
     return data
 
 
-CLEANING = []
+CLEANING = []  # type: List[Tuple[Callable[..., Any], List[Any], Dict[str, Any]]]
 
 
-def clean_resource(func: Callable[..., Any], *args, **kwargs) -> None:
-    CLEANING.append((func, args, kwargs))
+def clean_resource(func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
+    CLEANING.append((func, list(args), kwargs))
 
 
-def iter_clean_func() -> Generator[Callable[..., Any], List[Any], Dict[str, Any]]:
-    while CLEANING != []:
+def iter_clean_func() -> Iterator[Tuple[Callable[..., Any], List[Any], Dict[str, Any]]]:
+    while CLEANING:
         yield CLEANING.pop()
 
 
@@ -285,7 +309,7 @@
     return res
 
 
-def get_creds_openrc(path: str) -> Tuple[str, str, str, str, str]:
+def get_creds_openrc(path: str) -> Tuple[str, str, str, str, bool]:
     fc = open(path).read()
 
     echo = 'echo "$OS_INSECURE:$OS_TENANT_NAME:$OS_USERNAME:$OS_PASSWORD@$OS_AUTH_URL"'
@@ -309,7 +333,9 @@
 os_release = collections.namedtuple("Distro", ["distro", "release", "arch"])
 
 
-def get_os(node: INode) -> os_release:
+def get_os(node: IRemoteNode) -> os_release:
+    """return os type, release and architecture for node.
+    """
     arch = node.run("arch", nolog=True).strip()
 
     try:
@@ -338,7 +364,7 @@
 
 
 @contextlib.contextmanager
-def empty_ctx(val: Any=None) -> Generator[Any]:
+def empty_ctx(val: Any=None) -> Iterator[Any]:
     yield val
 
 
@@ -347,9 +373,10 @@
         os.makedirs(path)
 
 
-def log_nodes_statistic(nodes: Iterable[INode]) -> None:
+def log_nodes_statistic(nodes: Sequence[IRemoteNode]) -> None:
     logger.info("Found {0} nodes total".format(len(nodes)))
-    per_role = collections.defaultdict(lambda: 0)
+
+    per_role = collections.defaultdict(int)  # type: Dict[str, int]
     for node in nodes:
         for role in node.roles:
             per_role[role] += 1
@@ -369,3 +396,41 @@
             return exe_file
 
     return None
+
+
+def get_uniq_path_uuid(path: str, max_iter: int = 10) -> Tuple[str, str]:
+    """Pick a run id whose directory does not exist yet under path.
+
+    Tries up to max_iter generated names, then falls back to a uuid4
+    string. The directory itself is not created here.
+
+    Returns (results_dir, run_uuid), results_dir being path joined with run_uuid.
+    """
+    for i in range(max_iter):
+        run_uuid = pet_generate(2, "_")
+        results_dir = os.path.join(path, run_uuid)
+        if not os.path.exists(results_dir):
+            break
+    else:
+        run_uuid = str(uuid.uuid4())
+        results_dir = os.path.join(path, run_uuid)
+
+    return results_dir, run_uuid
+
+
+class Timeout:
+    """Deadline helper: tick() raises TimeoutError once timeout seconds have passed."""
+
+    def __init__(self, timeout: int, message: Optional[str] = None) -> None:
+        self.etime = time.time() + timeout
+        self.message = message
+
+    def tick(self) -> None:
+        """Raise TimeoutError if the deadline has passed, otherwise do nothing."""
+        if time.time() > self.etime:
+            if self.message:
+                msg = "Timeout: {}".format(self.message)
+            else:
+                msg = "Timeout"
+
+            raise TimeoutError(msg)
\ No newline at end of file
