Add stage base class, refactor discovery, etc
diff --git a/wally/ceph.py b/wally/ceph.py
new file mode 100644
index 0000000..e23343e
--- /dev/null
+++ b/wally/ceph.py
@@ -0,0 +1,112 @@
+""" Collect data about ceph nodes"""
+import json
+import logging
+from typing import Set, Dict, cast
+
+
+from .node_interfaces import NodeInfo, IRPCNode
+from .ssh_utils import ConnCreds
+from .common_types import IP
+from .stage import Stage, StepOrder
+from .test_run_class import TestRun
+from .ssh_utils import parse_ssh_uri
+from .node import connect, setup_rpc
+
+
+logger = logging.getLogger("wally.discover")
+
+
+def get_osds_ips(node: IRPCNode, conf: str, key: str) -> Set[IP]:
+    """Get set of osd's ip"""
+
+    data = node.run("ceph -c {} -k {} --format json osd dump".format(conf, key))
+    jdata = json.loads(data)
+    ips = set()  # type: Set[IP]
+    first_error = True
+    for osd_data in jdata["osds"]:
+        if "public_addr" not in osd_data:
+            if first_error:
+                osd_id = osd_data.get("osd", "<OSD_ID_MISSED>")
+                logger.warning("No 'public_addr' field in 'ceph osd dump' output for osd %s" +
+                               "(all subsequent errors omitted)", osd_id)
+                first_error = False
+        else:
+            ip_port = osd_data["public_addr"]
+            if '/' in ip_port:
+                ip_port = ip_port.split("/", 1)[0]
+            ips.add(IP(ip_port.split(":")[0]))
+    return ips
+
+
+def get_mons_ips(node: IRPCNode, conf: str, key: str) -> Set[IP]:
+    """Return mon ip set"""
+
+    data = node.run("ceph -c {} -k {} --format json mon_status".format(conf, key))
+    jdata = json.loads(data)
+    ips = set()  # type: Set[IP]
+    first_error = True
+    for mon_data in jdata["monmap"]["mons"]:
+        if "addr" not in mon_data:
+            if first_error:
+                mon_name = mon_data.get("name", "<MON_NAME_MISSED>")
+                logger.warning("No 'addr' field in 'ceph mon_status' output for mon %s" +
+                               "(all subsequent errors omitted)", mon_name)
+                first_error = False
+        else:
+            ip_port = mon_data["addr"]
+            if '/' in ip_port:
+                ip_port = ip_port.split("/", 1)[0]
+            ips.add(IP(ip_port.split(":")[0]))
+
+    return ips
+
+
+class DiscoverCephStage(Stage):
+    config_block = 'ceph'
+    priority = StepOrder.DISCOVER
+
+    def run(self, ctx: TestRun) -> None:
+        """Return list of ceph's nodes NodeInfo"""
+
+        if 'ceph_nodes' in ctx.storage:
+            ctx.nodes_info.extend(ctx.storage.load_list(NodeInfo, 'ceph_nodes'))
+        else:
+            ceph = ctx.config.ceph
+            root_node_uri = cast(str, ceph.root_node)
+            cluster = ceph.get("cluster", "ceph")
+            conf = ceph.get("conf")
+            key = ceph.get("key")
+            info = NodeInfo(parse_ssh_uri(root_node_uri), set())
+            ceph_nodes = {}  # type: Dict[IP, NodeInfo]
+
+            if conf is None:
+                conf = "/etc/ceph/{}.conf".format(cluster)
+
+            if key is None:
+                key = "/etc/ceph/{}.client.admin.keyring".format(cluster)
+
+            with setup_rpc(connect(info), ctx.rpc_code) as node:
+
+                # new_nodes.extend(ceph.discover_ceph_nodes(ceph_root_conn, cluster=cluster, conf=conf, key=key))
+                ssh_key = node.get_file_content("~/.ssh/id_rsa")
+
+                try:
+                    for ip in get_osds_ips(node, conf, key):
+                        if ip in ceph_nodes:
+                            ceph_nodes[ip].roles.add("ceph-osd")
+                        else:
+                            ceph_nodes[ip] = NodeInfo(ConnCreds(cast(str, ip), user="root", key=ssh_key), {"ceph-osd"})
+                except Exception as exc:
+                    logger.error("OSD discovery failed: %s", exc)
+
+                try:
+                    for ip in get_mons_ips(node, conf, key):
+                        if ip in ceph_nodes:
+                            ceph_nodes[ip].roles.add("ceph-mon")
+                        else:
+                            ceph_nodes[ip] = NodeInfo(ConnCreds(cast(str, ip), user="root", key=ssh_key), {"ceph-mon"})
+                except Exception as exc:
+                    logger.error("MON discovery failed: %s", exc)
+
+            ctx.nodes_info.extend(ceph_nodes.values())
+            ctx.storage['ceph-nodes'] = list(ceph_nodes.values())
diff --git a/wally/config.py b/wally/config.py
index cac4acf..e96a03e 100644
--- a/wally/config.py
+++ b/wally/config.py
@@ -10,13 +10,14 @@
     storage_url = None  # type: str
     comment = None  # type: str
     keep_vm = None  # type: bool
-    no_tests = None  # type: bool
     dont_discover_nodes = None  # type: bool
     build_id = None  # type: str
     build_description = None  # type: str
     build_type = None  # type: str
     default_test_local_folder = None  # type: str
     settings_dir = None  # type: str
+    connect_timeout = 30  # type: int
+    no_tests = False  # type: bool
 
     def __init__(self, dct: ConfigBlock) -> None:
         self.__dict__['_dct'] = dct
diff --git a/wally/discover/__init__.py b/wally/discover/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/wally/discover/__init__.py
+++ /dev/null
diff --git a/wally/discover/ceph.py b/wally/discover/ceph.py
deleted file mode 100644
index 4a72bfb..0000000
--- a/wally/discover/ceph.py
+++ /dev/null
@@ -1,91 +0,0 @@
-""" Collect data about ceph nodes"""
-import json
-import logging
-from typing import List, Set, Dict
-
-
-from ..node_interfaces import NodeInfo, IRPCNode
-from ..ssh_utils import ConnCreds
-from ..common_types import IP
-
-logger = logging.getLogger("wally.discover")
-
-
-def discover_ceph_nodes(node: IRPCNode,
-                        cluster: str = "ceph",
-                        conf: str = None,
-                        key: str = None) -> List[NodeInfo]:
-    """Return list of ceph's nodes NodeInfo"""
-
-    if conf is None:
-        conf = "/etc/ceph/{}.conf".format(cluster)
-
-    if key is None:
-        key = "/etc/ceph/{}.client.admin.keyring".format(cluster)
-
-    try:
-        osd_ips = get_osds_ips(node, conf, key)
-    except Exception as exc:
-        logger.error("OSD discovery failed: %s", exc)
-        osd_ips = set()
-
-    try:
-        mon_ips = get_mons_ips(node, conf, key)
-    except Exception as exc:
-        logger.error("MON discovery failed: %s", exc)
-        mon_ips = set()
-
-    ips = {}  # type: Dict[str, List[str]]
-    for ip in osd_ips:
-        ips.setdefault(ip, []).append("ceph-osd")
-
-    for ip in mon_ips:
-        ips.setdefault(ip, []).append("ceph-mon")
-
-    ssh_key = node.get_file_content("~/.ssh/id_rsa")
-    return [NodeInfo(ConnCreds(host=ip, user="root", key=ssh_key), set(roles)) for ip, roles in ips.items()]
-
-
-def get_osds_ips(node: IRPCNode, conf: str, key: str) -> Set[IP]:
-    """Get set of osd's ip"""
-
-    data = node.run("ceph -c {} -k {} --format json osd dump".format(conf, key))
-    jdata = json.loads(data)
-    ips = set()  # type: Set[IP]
-    first_error = True
-    for osd_data in jdata["osds"]:
-        if "public_addr" not in osd_data:
-            if first_error:
-                osd_id = osd_data.get("osd", "<OSD_ID_MISSED>")
-                logger.warning("No 'public_addr' field in 'ceph osd dump' output for osd %s" +
-                               "(all subsequent errors omitted)", osd_id)
-                first_error = False
-        else:
-            ip_port = osd_data["public_addr"]
-            if '/' in ip_port:
-                ip_port = ip_port.split("/", 1)[0]
-            ips.add(IP(ip_port.split(":")[0]))
-    return ips
-
-
-def get_mons_ips(node: IRPCNode, conf: str, key: str) -> Set[IP]:
-    """Return mon ip set"""
-
-    data = node.run("ceph -c {} -k {} --format json mon_status".format(conf, key))
-    jdata = json.loads(data)
-    ips = set()  # type: Set[IP]
-    first_error = True
-    for mon_data in jdata["monmap"]["mons"]:
-        if "addr" not in mon_data:
-            if first_error:
-                mon_name = mon_data.get("name", "<MON_NAME_MISSED>")
-                logger.warning("No 'addr' field in 'ceph mon_status' output for mon %s" +
-                               "(all subsequent errors omitted)", mon_name)
-                first_error = False
-        else:
-            ip_port = mon_data["addr"]
-            if '/' in ip_port:
-                ip_port = ip_port.split("/", 1)[0]
-            ips.add(IP(ip_port.split(":")[0]))
-
-    return ips
diff --git a/wally/discover/discover.py b/wally/discover/discover.py
deleted file mode 100644
index 35ddc62..0000000
--- a/wally/discover/discover.py
+++ /dev/null
@@ -1,108 +0,0 @@
-import os.path
-import logging
-from typing import Dict, NamedTuple, List, Optional, cast
-
-from paramiko.ssh_exception import AuthenticationException
-
-from . import ceph
-from . import fuel
-from . import openstack
-from ..utils import parse_creds, StopTestError
-from ..config import ConfigBlock
-from ..start_vms import OSCreds
-from ..node_interfaces import NodeInfo
-from ..node import connect, setup_rpc
-from ..ssh_utils import parse_ssh_uri
-from ..test_run_class import TestRun
-
-
-logger = logging.getLogger("wally.discover")
-
-
-openrc_templ = """#!/bin/sh
-export LC_ALL=C
-export OS_NO_CACHE='true'
-export OS_TENANT_NAME='{tenant}'
-export OS_USERNAME='{name}'
-export OS_PASSWORD='{passwd}'
-export OS_AUTH_URL='{auth_url}'
-export OS_INSECURE={insecure}
-export OS_AUTH_STRATEGY='keystone'
-export OS_REGION_NAME='RegionOne'
-export CINDER_ENDPOINT_TYPE='publicURL'
-export GLANCE_ENDPOINT_TYPE='publicURL'
-export KEYSTONE_ENDPOINT_TYPE='publicURL'
-export NOVA_ENDPOINT_TYPE='publicURL'
-export NEUTRON_ENDPOINT_TYPE='publicURL'
-"""
-
-
-DiscoveryResult = NamedTuple("DiscoveryResult", [("os_creds", Optional[OSCreds]), ("nodes", List[NodeInfo])])
-
-
-def discover(ctx: TestRun,
-             discover_list: List[str],
-             clusters_info: ConfigBlock,
-             discover_nodes: bool = True) -> DiscoveryResult:
-    """Discover nodes in clusters"""
-
-    new_nodes = []  # type: List[NodeInfo]
-    os_creds = None  # type: Optional[OSCreds]
-
-    for cluster in discover_list:
-        if cluster == "openstack":
-            if not discover_nodes:
-                logger.warning("Skip openstack cluster discovery")
-                continue
-
-            cluster_info = clusters_info["openstack"]  # type: ConfigBlock
-            new_nodes.extend(openstack.discover_openstack_nodes(ctx.os_connection, cluster_info))
-
-        elif cluster == "fuel" or cluster == "fuel_openrc_only":
-            if cluster == "fuel_openrc_only":
-                discover_nodes = False
-
-            fuel_node_info = NodeInfo(parse_ssh_uri(clusters_info['fuel']['ssh_creds']), {'fuel_master'})
-            try:
-                fuel_rpc_conn = setup_rpc(connect(fuel_node_info), ctx.rpc_code)
-            except AuthenticationException:
-                raise StopTestError("Wrong fuel credentials")
-            except Exception:
-                logger.exception("While connection to FUEL")
-                raise StopTestError("Failed to connect to FUEL")
-
-            # TODO(koder): keep FUEL rpc in context? Maybe open this connection on upper stack level?
-            with fuel_rpc_conn:
-                nodes, fuel_info = fuel.discover_fuel_nodes(
-                    fuel_rpc_conn, ctx.fuel_conn, clusters_info['fuel'], discover_nodes)
-                new_nodes.extend(nodes)
-
-                if fuel_info.openrc:
-                    auth_url = cast(str, fuel_info.openrc['os_auth_url'])
-                    if fuel_info.version >= [8, 0] and auth_url.startswith("https://"):
-                            logger.warning("Fixing FUEL 8.0 AUTH url - replace https://->http://")
-                            auth_url = auth_url.replace("https", "http", 1)
-
-                    os_creds = OSCreds(name=cast(str, fuel_info.openrc['username']),
-                                       passwd=cast(str, fuel_info.openrc['password']),
-                                       tenant=cast(str, fuel_info.openrc['tenant_name']),
-                                       auth_url=cast(str, auth_url),
-                                       insecure=cast(bool, fuel_info.openrc['insecure']))
-
-        elif cluster == "ceph":
-            if discover_nodes:
-                cluster_info = clusters_info["ceph"]
-                root_node_uri = cast(str, cluster_info["root_node"])
-                cluster = clusters_info["ceph"].get("cluster", "ceph")
-                conf = clusters_info["ceph"].get("conf")
-                key = clusters_info["ceph"].get("key")
-                info = NodeInfo(parse_ssh_uri(root_node_uri), set())
-                with setup_rpc(connect(info), ctx.rpc_code) as ceph_root_conn:
-                    new_nodes.extend(ceph.discover_ceph_nodes(ceph_root_conn, cluster=cluster, conf=conf, key=key))
-            else:
-                logger.warning("Skip ceph cluster discovery")
-        else:
-            msg_templ = "Unknown cluster type in 'discover' parameter: {!r}"
-            raise ValueError(msg_templ.format(cluster))
-
-    return DiscoveryResult(os_creds, new_nodes)
diff --git a/wally/discover/fuel.py b/wally/discover/fuel.py
deleted file mode 100644
index 7b4f90f..0000000
--- a/wally/discover/fuel.py
+++ /dev/null
@@ -1,58 +0,0 @@
-import logging
-import socket
-from typing import Dict, Any, Tuple, List, NamedTuple, Union, cast
-from urllib.parse import urlparse
-
-from .. import fuel_rest_api
-from ..node_interfaces import NodeInfo, IRPCNode
-from ..ssh_utils import ConnCreds
-from ..utils import check_input_param
-
-logger = logging.getLogger("wally.discover")
-
-
-FuelNodeInfo = NamedTuple("FuelNodeInfo",
-                          [("version", List[int]),
-                           ("fuel_ext_iface", str),
-                           ("openrc", Dict[str, Union[str, bool]])])
-
-
-def discover_fuel_nodes(fuel_master_node: IRPCNode,
-                        fuel_conn: fuel_rest_api.Connection,
-                        fuel_data: Dict[str, Any],
-                        discover_nodes: bool = True) -> Tuple[List[NodeInfo], FuelNodeInfo]:
-    """Discover nodes in fuel cluster, get openrc for selected cluster"""
-
-    msg = "openstack_env should be provided in fuel config"
-    check_input_param('openstack_env' in fuel_data, msg)
-
-    # get cluster information from REST API
-    cluster_id = fuel_rest_api.get_cluster_id(fuel_conn, fuel_data['openstack_env'])
-    cluster = fuel_rest_api.reflect_cluster(fuel_conn, cluster_id)
-    version = fuel_rest_api.FuelInfo(fuel_conn).get_version()
-
-    if not discover_nodes:
-        logger.warning("Skip fuel cluster discovery")
-        return [], FuelNodeInfo(version, None, cluster.get_openrc())  # type: ignore
-
-    logger.info("Found fuel {0}".format(".".join(map(str, version))))
-
-    # get FUEL master key to connect to cluster nodes via ssh
-    logger.debug("Downloading fuel master key")
-    fuel_key = fuel_master_node.get_file_content('/root/.ssh/id_rsa')
-
-    network = 'fuelweb_admin' if version >= [6, 0] else 'admin'
-    fuel_ip = socket.gethostbyname(fuel_conn.host)
-    fuel_ext_iface = fuel_master_node.get_interface(fuel_ip)
-
-    nodes = []
-    for fuel_node in list(cluster.get_nodes()):
-        ip = str(fuel_node.get_ip(network))
-        creds = ConnCreds(ip, "root", key=fuel_key)
-        nodes.append(NodeInfo(creds, roles=set(fuel_node.get_roles())))
-
-    logger.debug("Found {} fuel nodes for env {}".format(len(nodes), fuel_data['openstack_env']))
-
-    return nodes, FuelNodeInfo(version, fuel_ext_iface,
-                               cast(Dict[str, Union[str, bool]], cluster.get_openrc()))
-
diff --git a/wally/discover/openstack.py b/wally/discover/openstack.py
deleted file mode 100644
index f590359..0000000
--- a/wally/discover/openstack.py
+++ /dev/null
@@ -1,66 +0,0 @@
-import socket
-import logging
-from typing import Dict, Any, List, Optional, cast
-
-from ..node_interfaces import NodeInfo
-from ..config import ConfigBlock
-from ..ssh_utils import ConnCreds
-from ..start_vms import OSConnection, NovaClient
-
-
-logger = logging.getLogger("wally.discover")
-
-
-def get_floating_ip(vm: Any) -> str:
-    """Get VM floating IP address"""
-
-    for net_name, ifaces in vm.addresses.items():
-        for iface in ifaces:
-            if iface.get('OS-EXT-IPS:type') == "floating":
-                return iface['addr']
-
-    raise ValueError("VM {} has no floating ip".format(vm))
-
-
-def discover_vms(client: NovaClient, search_data: str) -> List[NodeInfo]:
-    """Discover virtual machines"""
-    name, user, key_file = search_data.split(",")
-    servers = client.servers.list(search_opts={"name": name})
-    logger.debug("Found %s openstack vms" % len(servers))
-
-    nodes = []  # type: List[NodeInfo]
-    for server in servers:
-        ip = get_floating_ip(server)
-        creds = ConnCreds(host=ip, user=user, key_file=key_file)
-        nodes.append(NodeInfo(creds, roles={"test_vm"}))
-
-    return nodes
-
-
-def discover_openstack_nodes(conn: OSConnection, conf: ConfigBlock) -> List[NodeInfo]:
-    """Discover openstack services for given cluster"""
-    os_nodes_auth = conf['auth']  # type: str
-
-    if os_nodes_auth.count(":") == 2:
-        user, password, key_file = os_nodes_auth.split(":")  # type: str, Optional[str], Optional[str]
-        if not password:
-            password = None
-    else:
-        user, password = os_nodes_auth.split(":")
-        key_file = None
-
-    services = conn.nova.services.list()  # type: List[Any]
-    host_services_mapping = {}  # type: Dict[str, List[str]]
-
-    for service in services:
-        ip = cast(str, socket.gethostbyname(service.host))
-        host_services_mapping.get(ip, []).append(service.binary)
-
-    logger.debug("Found %s openstack service nodes" % len(host_services_mapping))
-
-    nodes = []  # type: List[NodeInfo]
-    for host, services in host_services_mapping.items():
-        creds = ConnCreds(host=host, user=user, passwd=password, key_file=key_file)
-        nodes.append(NodeInfo(creds, set(services)))
-
-    return nodes
diff --git a/wally/fuel.py b/wally/fuel.py
new file mode 100644
index 0000000..040dcf4
--- /dev/null
+++ b/wally/fuel.py
@@ -0,0 +1,105 @@
+import logging
+from typing import Dict, List, NamedTuple, Union, cast
+
+from paramiko.ssh_exception import AuthenticationException
+
+from .fuel_rest_api import get_cluster_id, reflect_cluster, FuelInfo, KeystoneAuth
+from .node_interfaces import NodeInfo
+from .ssh_utils import ConnCreds, parse_ssh_uri
+from .utils import check_input_param, StopTestError, parse_creds
+from .stage import Stage, StepOrder
+from .test_run_class import TestRun
+from .node import connect, setup_rpc
+from .config import ConfigBlock
+from .openstack_api import OSCreds
+
+
+logger = logging.getLogger("wally.discover")
+
+
+FuelNodeInfo = NamedTuple("FuelNodeInfo",
+                          [("version", List[int]),
+                           ("fuel_ext_iface", str),
+                           ("openrc", Dict[str, Union[str, bool]])])
+
+
+
+class DiscoverFuelStage(Stage):
+    """"Fuel nodes discovery, also can get openstack openrc"""
+
+    priority = StepOrder.DISCOVER
+    config_block = 'fuel'
+
+    @classmethod
+    def validate(cls, cfg: ConfigBlock) -> None:
+        # msg = "openstack_env should be provided in fuel config"
+        # check_input_param('openstack_env' in fuel_data, msg)
+        # fuel.openstack_env
+        pass
+
+    def run(self, ctx: TestRun) -> None:
+        if 'fuel' in ctx.storage:
+            ctx.nodes_info.extend(ctx.storage.load_list(NodeInfo, 'fuel/nodes'))
+            ctx.fuel_openstack_creds = ctx.storage['fuel/os_creds']  # type: ignore
+            ctx.fuel_version = ctx.storage['fuel/version']  # type: ignore
+        else:
+            fuel = ctx.config.fuel
+            discover_nodes = (fuel.discover != "fuel_openrc_only")
+            fuel_node_info = NodeInfo(parse_ssh_uri(fuel.ssh_creds), {'fuel_master'})
+            fuel_nodes = [fuel_node_info]
+
+            creds = dict(zip(("user", "passwd", "tenant"), parse_creds(fuel.creds)))
+            fuel_conn = KeystoneAuth(fuel.url, creds)
+
+            # get cluster information from REST API
+            cluster_id = get_cluster_id(fuel_conn, fuel.openstack_env)
+            cluster = reflect_cluster(fuel_conn, cluster_id)
+            ctx.fuel_version = FuelInfo(fuel_conn).get_version()
+            logger.info("Found fuel {0}".format(".".join(map(str, ctx.fuel_version))))
+            openrc = cluster.get_openrc()
+
+            if openrc:
+                auth_url = cast(str, openrc['os_auth_url'])
+                if ctx.fuel_version >= [8, 0] and auth_url.startswith("https://"):
+                    logger.warning("Fixing FUEL 8.0 AUTH url - replace https://->http://")
+                    auth_url = auth_url.replace("https", "http", 1)
+
+                os_creds = OSCreds(name=cast(str, openrc['username']),
+                                   passwd=cast(str, openrc['password']),
+                                   tenant=cast(str, openrc['tenant_name']),
+                                   auth_url=cast(str, auth_url),
+                                   insecure=cast(bool, openrc['insecure']))
+
+                ctx.fuel_openstack_creds = os_creds
+            else:
+                ctx.fuel_openstack_creds = None
+
+            if discover_nodes:
+
+                try:
+                    fuel_rpc = setup_rpc(connect(fuel_node_info), ctx.rpc_code)
+                except AuthenticationException:
+                    raise StopTestError("Wrong fuel credentials")
+                except Exception:
+                    logger.exception("While connection to FUEL")
+                    raise StopTestError("Failed to connect to FUEL")
+
+                logger.debug("Downloading FUEL node ssh master key")
+                fuel_key = fuel_rpc.get_file_content('/root/.ssh/id_rsa')
+                network = 'fuelweb_admin' if ctx.fuel_version >= [6, 0] else 'admin'
+
+                for fuel_node in list(cluster.get_nodes()):
+                    ip = str(fuel_node.get_ip(network))
+                    fuel_nodes.append(NodeInfo(ConnCreds(ip, "root", key=fuel_key),
+                                               roles=set(fuel_node.get_roles())))
+
+                ctx.storage['fuel_nodes'] = fuel_nodes
+                ctx.nodes_info.extend(fuel_nodes)
+                ctx.nodes_info.append(fuel_node_info)
+                logger.debug("Found {} FUEL nodes for env {}".format(len(fuel_nodes) - 1, fuel.openstack_env))
+            else:
+                logger.debug("Skip FUEL nodes  discovery, as 'fuel_openrc_only' is set to fuel.discover option")
+
+            ctx.storage["fuel/nodes"] = fuel_nodes
+            ctx.storage["fuel/os_creds"] = ctx.fuel_openstack_creds
+            ctx.storage["fuel/version"] = ctx.fuel_version
diff --git a/wally/main.py b/wally/main.py
index 3e1fcb3..14da140 100644
--- a/wally/main.py
+++ b/wally/main.py
@@ -5,7 +5,8 @@
 import logging
 import argparse
 import functools
-from typing import List, Tuple, Any, Callable, IO, cast, Optional
+import contextlib
+from typing import List, Tuple, Any, Callable, IO, cast, Optional, Iterator
 from yaml import load as _yaml_load
 
 
@@ -32,13 +33,33 @@
 from .storage import make_storage, Storage
 from .config import Config
 from .logger import setup_loggers
-from .stage import log_stage, StageType
+from .stage import Stage
 from .test_run_class import TestRun
 
 
+# stages
+from .ceph import DiscoverCephStage
+from .openstack import DiscoverOSStage
+from .fuel import DiscoverFuelStage
+from .run_test import CollectInfoStage, ExplicitNodesStage, SaveNodesStage, RunTestsStage
+from .report import ConsoleReportStage, HtmlReportStage
+from .sensors import StartSensorsStage, CollectSensorsStage
+
+
 logger = logging.getLogger("wally")
 
 
+@contextlib.contextmanager
+def log_stage(stage: Stage) -> Iterator[None]:
+    logger.info("Start " + stage.name())
+    try:
+        yield
+    except utils.StopTestError as exc:
+        logger.error("Exception during %s: %r", stage.name(), exc)
+    except Exception:
+        logger.exception("During %s", stage.name())
+
+
 def list_results(path: str) -> List[Tuple[str, str, str, str]]:
     results = []  # type: List[Tuple[float, str, str, str, str]]
 
@@ -97,6 +118,7 @@
     test_parser.add_argument('--build-description', type=str, default="Build info")
     test_parser.add_argument('--build-id', type=str, default="id")
     test_parser.add_argument('--build-type', type=str, default="GA")
+    test_parser.add_argument('--dont-collect', action='store_true', help="Don't collect cluster info")
     test_parser.add_argument('-n', '--no-tests', action='store_true', help="Don't run tests")
     test_parser.add_argument('--load-report', action='store_true')
     test_parser.add_argument("-k", '--keep-vm', action='store_true', help="Don't remove test vm's")
@@ -131,8 +153,7 @@
 
     opts = parse_args(argv)
 
-    stages = []  # type: List[StageType]
-    report_stages = []  # type: List[StageType]
+    stages = []  # type: List[Stage]
 
     # stop mypy from telling that config & storage might be undeclared
     config = None  # type: Config
@@ -141,7 +162,7 @@
     if opts.subparser_name == 'test':
         if opts.resume:
             storage = make_storage(opts.resume, existing=True)
-            config = storage.load('config', Config)
+            config = storage.load(Config, 'config')
         else:
             file_name = os.path.abspath(opts.config_file)
             with open(file_name) as fd:
@@ -161,20 +182,18 @@
 
             storage['config'] = config  # type: ignore
 
-        stages.extend([
-            run_test.clouds_connect_stage,
-            run_test.discover_stage,
-            run_test.reuse_vms_stage,
-            log_nodes_statistic_stage,
-            run_test.save_nodes_stage,
-            run_test.connect_stage])
 
-        if config.get("collect_info", True):
-            stages.append(run_test.collect_info_stage)
+        stages.append(DiscoverCephStage)  # type: ignore
+        stages.append(DiscoverOSStage)  # type: ignore
+        stages.append(DiscoverFuelStage)  # type: ignore
+        stages.append(ExplicitNodesStage)  # type: ignore
+        stages.append(SaveNodesStage)  # type: ignore
+        stages.append(StartSensorsStage)  # type: ignore
+        stages.append(RunTestsStage)  # type: ignore
+        stages.append(CollectSensorsStage)  # type: ignore
 
-        stages.extend([
-            run_test.run_tests_stage,
-        ])
+        if not opts.dont_collect:
+            stages.append(CollectInfoStage)   # type: ignore
 
     elif opts.subparser_name == 'ls':
         tab = texttable.Texttable(max_width=200)
@@ -196,9 +215,10 @@
         #     [x['io'][0], y['io'][0]]))
         return 0
 
+    report_stages = []  # type: List[Stage]
     if not getattr(opts, "no_report", False):
-        report_stages.append(run_test.console_report_stage)
-        report_stages.append(run_test.html_report_stage)
+        report_stages.append(ConsoleReportStage)   # type: ignore
+        report_stages.append(HtmlReportStage)   # type: ignore
 
     # log level is not a part of config
     if opts.log_level is not None:
@@ -206,39 +226,44 @@
     else:
         str_level = config.get('logging/log_level', 'INFO')
 
-    setup_loggers(getattr(logging, str_level), log_fd=storage.get_stream('log'))
+    setup_loggers(getattr(logging, str_level), log_fd=storage.get_stream('log', "w"))
     logger.info("All info would be stored into %r", config.storage_url)
 
     ctx = TestRun(config, storage)
 
+    stages.sort(key=lambda x: x.priority)
+
+    # TODO: run only stages, which have configs
+    failed = False
+    cleanup_stages = []
     for stage in stages:
-        ok = False
-        with log_stage(stage):
-            stage(ctx)
-            ok = True
-        if not ok:
+        try:
+            cleanup_stages.append(stage)
+            with log_stage(stage):
+                stage.run(ctx)
+        except:
+            failed = True
             break
 
-    exc, cls, tb = sys.exc_info()
-    for stage in ctx.clear_calls_stack[::-1]:
-        with log_stage(stage):
-            stage(ctx)
+    logger.debug("Start cleanup")
+    cleanup_failed = False
+    for stage in cleanup_stages[::-1]:
+        try:
+            with log_stage(stage):
+                stage.cleanup(ctx)
+        except:
+            cleanup_failed = True
 
-    logger.debug("Start utils.cleanup")
-    for clean_func, args, kwargs in utils.iter_clean_func():
-        with log_stage(clean_func):
-            clean_func(*args, **kwargs)
-
-    if exc is None:
+    if not failed:
         for report_stage in report_stages:
             with log_stage(report_stage):
-                report_stage(ctx)
+                report_stage.run(ctx)
 
     logger.info("All info is stored into %r", config.storage_url)
 
-    if exc is None:
-        logger.info("Tests finished successfully")
-        return 0
-    else:
+    if failed or cleanup_failed:
         logger.error("Tests are failed. See error details in log above")
         return 1
+    else:
+        logger.info("Tests finished successfully")
+        return 0
diff --git a/wally/node.py b/wally/node.py
index 2b58571..fae7879 100644
--- a/wally/node.py
+++ b/wally/node.py
@@ -133,22 +133,22 @@
         raise NotImplementedError()
 
     def run(self, cmd: str, timeout: int = 60, nolog: bool = False) -> str:
-        raise NotImplemented()
+        raise NotImplementedError()
 
     def copy_file(self, local_path: str, remote_path: str = None) -> str:
-        raise NotImplemented()
+        raise NotImplementedError()
 
     def put_to_file(self, path: str, content: bytes) -> None:
-        raise NotImplemented()
+        raise NotImplementedError()
 
     def get_interface(self, ip: str) -> str:
-        raise NotImplemented()
+        raise NotImplementedError()
 
     def stat_file(self, path: str) -> Any:
-        raise NotImplemented()
+        raise NotImplementedError()
 
     def disconnect(self) -> str:
-        raise NotImplemented()
+        raise NotImplementedError()
 
 
 def setup_rpc(node: ISSHHost, rpc_server_code: bytes, port: int = 0) -> IRPCNode:
diff --git a/wally/openstack.py b/wally/openstack.py
new file mode 100644
index 0000000..cff6150
--- /dev/null
+++ b/wally/openstack.py
@@ -0,0 +1,256 @@
+import os.path
+import socket
+import logging
+from typing import Dict, Any, List, Tuple, cast, Optional
+
+from .node_interfaces import NodeInfo
+from .config import ConfigBlock, Config
+from .ssh_utils import ConnCreds
+from .openstack_api import (os_connect, find_vms,
+                            OSCreds, get_openstack_credentials, prepare_os, launch_vms, clear_nodes)
+from .test_run_class import TestRun
+from .stage import Stage, StepOrder
+from .utils import LogError, StopTestError, get_creds_openrc
+
+
+logger = logging.getLogger("wally.discover")
+
+
+def get_floating_ip(vm: Any) -> str:
+    """Get VM floating IP address"""
+
+    for net_name, ifaces in vm.addresses.items():
+        for iface in ifaces:
+            if iface.get('OS-EXT-IPS:type') == "floating":
+                return iface['addr']
+
+    raise ValueError("VM {} has no floating ip".format(vm))
+
+
+def ensure_connected_to_openstack(ctx: TestRun) -> None:
+    if not ctx.os_connection is None:
+        if ctx.os_creds is None:
+            ctx.os_creds = get_OS_credentials(ctx)
+        ctx.os_connection = os_connect(ctx.os_creds)
+
+
+def get_OS_credentials(ctx: TestRun) -> OSCreds:
+    if "openstack_openrc" in ctx.storage:
+        return ctx.storage.load(OSCreds, "openstack_openrc")
+
+    creds = None
+    os_creds = None
+    force_insecure = False
+    cfg = ctx.config
+
+    if 'openstack' in cfg.clouds:
+        os_cfg = cfg.clouds['openstack']
+        if 'OPENRC' in os_cfg:
+            logger.info("Using OS credentials from " + os_cfg['OPENRC'])
+            creds_tuple = get_creds_openrc(os_cfg['OPENRC'])
+            os_creds = OSCreds(*creds_tuple)
+        elif 'ENV' in os_cfg:
+            logger.info("Using OS credentials from shell environment")
+            os_creds = get_openstack_credentials()
+        elif 'OS_TENANT_NAME' in os_cfg:
+            logger.info("Using predefined credentials")
+            os_creds = OSCreds(os_cfg['OS_USERNAME'].strip(),
+                               os_cfg['OS_PASSWORD'].strip(),
+                               os_cfg['OS_TENANT_NAME'].strip(),
+                               os_cfg['OS_AUTH_URL'].strip(),
+                               os_cfg.get('OS_INSECURE', False))
+
+        elif 'OS_INSECURE' in os_cfg:
+            force_insecure = os_cfg.get('OS_INSECURE', False)
+
+    if os_creds is None and 'fuel' in cfg.clouds and 'openstack_env' in cfg.clouds['fuel'] and \
+            ctx.fuel_openstack_creds is not None:
+        logger.info("Using fuel creds")
+        creds = ctx.fuel_openstack_creds
+    elif os_creds is None:
+        logger.error("Can't found OS credentials")
+        raise StopTestError("Can't found OS credentials", None)
+
+    if creds is None:
+        creds = os_creds
+
+    if force_insecure and not creds.insecure:
+        creds = OSCreds(creds.name, creds.passwd, creds.tenant, creds.auth_url, True)
+
+    logger.debug(("OS_CREDS: user={0.name} tenant={0.tenant} " +
+                  "auth_url={0.auth_url} insecure={0.insecure}").format(creds))
+
+    ctx.storage["openstack_openrc"] = creds  # type: ignore
+    return creds
+
+
+def get_vm_keypair_path(cfg: Config) -> Tuple[str, str]:
+    key_name = cfg.vm_configs['keypair_name']
+    private_path = os.path.join(cfg.settings_dir, key_name + "_private.pem")
+    public_path = os.path.join(cfg.settings_dir, key_name + "_public.pub")
+    return (private_path, public_path)
+
+
+class DiscoverOSStage(Stage):
+    """Discover openstack nodes and VMS"""
+
+    config_block = 'openstack'
+
+    # discover FUEL cluster first
+    priority = StepOrder.DISCOVER + 1
+
+    @classmethod
+    def validate(cls, conf: ConfigBlock) -> None:
+        pass
+
+    def run(self, ctx: TestRun) -> None:
+        cfg = ctx.config.openstack
+        os_nodes_auth = cfg.auth  # type: str
+
+        if os_nodes_auth.count(":") == 2:
+            user, password, key_file = os_nodes_auth.split(":")  # type: str, Optional[str], Optional[str]
+            if not password:
+                password = None
+        else:
+            user, password = os_nodes_auth.split(":")
+            key_file = None
+
+        ensure_connected_to_openstack(ctx)
+
+        if 'openstack_nodes' in ctx.storage:
+            ctx.nodes_info.extend(ctx.storage.load_list(NodeInfo, "openstack_nodes"))
+        else:
+            openstack_nodes = []  # type: List[NodeInfo]
+            services = ctx.os_connection.nova.services.list()  # type: List[Any]
+            host_services_mapping = {}  # type: Dict[str, List[str]]
+
+            for service in services:
+                ip = cast(str, socket.gethostbyname(service.host))
+                host_services_mapping.get(ip, []).append(service.binary)
+
+            logger.debug("Found %s openstack service nodes" % len(host_services_mapping))
+
+            for host, services in host_services_mapping.items():
+                creds = ConnCreds(host=host, user=user, passwd=password, key_file=key_file)
+                openstack_nodes.append(NodeInfo(creds, set(services)))
+
+            ctx.nodes_info.extend(openstack_nodes)
+            ctx.storage['openstack_nodes'] = openstack_nodes  # type: ignore
+
+        if "reused_os_nodes" in ctx.storage:
+            ctx.nodes_info.extend(ctx.storage.load_list(NodeInfo, "reused_nodes"))
+        else:
+            reused_nodes = []  # type: List[NodeInfo]
+            private_key_path = get_vm_keypair_path(ctx.config)[0]
+
+            vm_creds = None  # type: str
+            for vm_creds in cfg.get("vms", []):
+                user_name, vm_name_pattern = vm_creds.split("@", 1)
+                msg = "Vm like {} lookup failed".format(vm_name_pattern)
+
+                with LogError(msg):
+                    msg = "Looking for vm with name like {0}".format(vm_name_pattern)
+                    logger.debug(msg)
+
+                    ensure_connected_to_openstack(ctx)
+
+                    for ip, vm_id in find_vms(ctx.os_connection, vm_name_pattern):
+                        creds = ConnCreds(host=ip, user=user_name, key_file=private_key_path)
+                        node_info = NodeInfo(creds, {'testnode'})
+                        node_info.os_vm_id = vm_id
+                        reused_nodes.append(node_info)
+
+            ctx.nodes_info.extend(reused_nodes)
+            ctx.storage["reused_os_nodes"] = reused_nodes  # type: ignore
+
+
+class CreateOSVMSStage(Stage):
+    "Spawn new VM's in Openstack cluster"
+
+    priority = StepOrder.SPAWN  # type: int
+    config_block = 'spawn_os_vms'  # type: str
+
+    def run(self, ctx: TestRun) -> None:
+        vm_spawn_config = ctx.config.spawn_os_vms
+        vm_image_config = ctx.config.vm_configs[vm_spawn_config.cfg_name]
+
+        if 'spawned_os_nodes' in ctx.storage:
+            ctx.nodes_info.extend(ctx.storage.load_list(NodeInfo, "spawned_os_nodes"))
+        else:
+            ensure_connected_to_openstack(ctx)
+            params = vm_image_config.copy()
+            params.update(vm_spawn_config)
+            params.update(get_vm_keypair_path(ctx.config))
+            params['group_name'] = ctx.config.run_uuid
+            params['keypair_name'] = ctx.config.vm_configs['keypair_name']
+
+            if not ctx.config.openstack.get("skip_preparation", False):
+                logger.info("Preparing openstack")
+                prepare_os(ctx.os_connection, params)
+
+            new_nodes = []
+            ctx.os_spawned_nodes_ids = []
+            with ctx.get_pool() as pool:
+                for node_info in launch_vms(ctx.os_connection, params, pool):
+                    node_info.roles.add('testnode')
+                    ctx.os_spawned_nodes_ids.append(node_info.os_vm_id)
+                    new_nodes.append(node_info)
+
+            ctx.storage['spawned_os_nodes'] = new_nodes  # type: ignore
+
+    def cleanup(self, ctx: TestRun) -> None:
+        # keep nodes in case of error for future test restart
+        if not ctx.config.keep_vm and ctx.os_spawned_nodes_ids:
+            logger.info("Removing nodes")
+
+            clear_nodes(ctx.os_connection, ctx.os_spawned_nodes_ids)
+            del ctx.storage['spawned_os_nodes']
+
+            logger.info("Nodes has been removed")
+
+
+
+# @contextlib.contextmanager
+# def suspend_vm_nodes_ctx(ctx: TestRun, unused_nodes: List[IRPCNode]) -> Iterator[List[int]]:
+#
+#     pausable_nodes_ids = [cast(int, node.info.os_vm_id)
+#                           for node in unused_nodes
+#                           if node.info.os_vm_id is not None]
+#
+#     non_pausable = len(unused_nodes) - len(pausable_nodes_ids)
+#
+#     if non_pausable:
+#         logger.warning("Can't pause {} nodes".format(non_pausable))
+#
+#     if pausable_nodes_ids:
+#         logger.debug("Try to pause {} unused nodes".format(len(pausable_nodes_ids)))
+#         with ctx.get_pool() as pool:
+#             openstack_api.pause(ctx.os_connection, pausable_nodes_ids, pool)
+#
+#     try:
+#         yield pausable_nodes_ids
+#     finally:
+#         if pausable_nodes_ids:
+#             logger.debug("Unpausing {} nodes".format(len(pausable_nodes_ids)))
+#             with ctx.get_pool() as pool:
+#                 openstack_api.unpause(ctx.os_connection, pausable_nodes_ids, pool)
+# def clouds_connect_stage(ctx: TestRun) -> None:
+    # TODO(koder): need to use this to connect to openstack in upper code
+    # conn = ctx.config['clouds/openstack']
+    # user, passwd, tenant = parse_creds(conn['creds'])
+    # auth_data = dict(auth_url=conn['auth_url'],
+    #                  username=user,
+    #                  api_key=passwd,
+    #                  project_id=tenant)  # type: Dict[str, str]
+    # logger.debug("Discovering openstack nodes with connection details: %r", conn)
+    # connect to openstack, fuel
+
+    # # parse FUEL REST credentials
+    # username, tenant_name, password = parse_creds(fuel_data['creds'])
+    # creds = {"username": username,
+    #          "tenant_name": tenant_name,
+    #          "password": password}
+    #
+    # # connect to FUEL
+    # conn = fuel_rest_api.KeystoneAuth(fuel_data['url'], creds, headers=None)
+    # pass
\ No newline at end of file
diff --git a/wally/start_vms.py b/wally/openstack_api.py
similarity index 97%
rename from wally/start_vms.py
rename to wally/openstack_api.py
index a55fdbf..2e9ab63 100644
--- a/wally/start_vms.py
+++ b/wally/openstack_api.py
@@ -7,7 +7,7 @@
 import tempfile
 import subprocess
 import urllib.request
-from typing import Dict, Any, Iterable, Iterator, NamedTuple, Optional, List, Tuple
+from typing import Dict, Any, Iterable, Iterator, NamedTuple, Optional, List, Tuple, Set
 from concurrent.futures import ThreadPoolExecutor
 
 from keystoneauth1 import loading, session
@@ -19,6 +19,7 @@
 from .utils import Timeout
 from .node_interfaces import NodeInfo
 from .storage import IStorable
+from .ssh_utils import ConnCreds
 
 
 __doc__ = """
@@ -81,7 +82,6 @@
 def find_vms(conn: OSConnection, name_prefix: str) -> Iterable[Tuple[str, int]]:
     for srv in conn.nova.servers.list():
         if srv.name.startswith(name_prefix):
-
             # need to exit after found server first external IP
             # so have to rollout two cycles to avoid using exceptions
             all_ip = []  # type: List[Any]
@@ -451,11 +451,10 @@
 
     # precache all errors before start creating vms
     private_key_path = params['keypair_file_private']
-    creds = params['image']['creds']
+    user = params['image']['user']
 
     for ip, os_node in create_vms_mt(conn, count, executor, **vm_params):
-        conn_uri = creds.format(ip=ip, private_key_path=private_key_path)
-        info = NodeInfo(conn_uri, set())
+        info = NodeInfo(ConnCreds(ip, user, key_file=private_key_path), set())
         info.os_vm_id = os_node.id
         yield info
 
@@ -574,19 +573,18 @@
                                        scheduler_hints=scheduler_hints, security_groups=security_groups)
 
         if not wait_for_server_active(conn, srv):
-            msg = "Server {0} fails to start. Kill it and try again"
-            logger.debug(msg.format(srv))
+            logger.debug("Server {} fails to start. Kill it and try again".format(srv))
             conn.nova.servers.delete(srv)
 
             try:
-                for _ in Timeout(delete_timeout, "Server {0} delete timeout".format(srv.id)):
+                for _ in Timeout(delete_timeout, "Server {} delete timeout".format(srv.id)):
                     srv = conn.nova.servers.get(srv.id)
             except NotFound:
                 pass
         else:
             break
     else:
-        raise RuntimeError("Failed to start server".format(srv.id))
+        raise RuntimeError("Failed to start server {}".format(srv.id))
 
     if vol_sz is not None:
         vol = create_volume(conn, vol_sz, name)
@@ -598,6 +596,7 @@
     if flt_ip is not None:
         srv.add_floating_ip(flt_ip)
 
+    # pylint: disable=E1101
     return flt_ip.ip, conn.nova.servers.get(srv.id)
 
 
diff --git a/wally/ops_log.py b/wally/ops_log.py
new file mode 100644
index 0000000..48f2b61
--- /dev/null
+++ b/wally/ops_log.py
@@ -0,0 +1,9 @@
+from typing import Any
+
+
+log = []
+
+
+def log_op(name: str, *params: Any) -> None:
+    log.append([name] + list(params))
+
diff --git a/wally/report.py b/wally/report.py
index 88c97b7..ecf3ba7 100644
--- a/wally/report.py
+++ b/wally/report.py
@@ -20,6 +20,8 @@
 from .utils import ssize2b
 from .statistic import round_3_digit
 from .storage import Storage
+from .stage import Stage, StepOrder
+from .test_run_class import TestRun
 from .result_classes import TestInfo, FullTestResult, SensorInfo
 from .suits.io.fio_task_parser import (get_test_sync_mode,
                                        get_test_summary,
@@ -33,29 +35,46 @@
 def load_test_results(storage: Storage) -> Iterator[FullTestResult]:
     sensors_data = {}  # type: Dict[Tuple[str, str, str], SensorInfo]
 
-    for _, node_id in storage.list("metric"):
-        for _, dev_name in storage.list("metric", node_id):
-            for _, sensor_name in storage.list("metric", node_id, dev_name):
+    mstorage = storage.sub_storage("metric")
+    for _, node_id in mstorage.list():
+        for _, dev_name in mstorage.list(node_id):
+            for _, sensor_name in mstorage.list(node_id, dev_name):
                 key = (node_id, dev_name, sensor_name)
                 si = SensorInfo(*key)
-                si.begin_time, si.end_time, si.data = storage["metric/{}/{}/{}".format(*key)]  # type: ignore
+                si.begin_time, si.end_time, si.data = storage[node_id, dev_name, sensor_name]  # type: ignore
                 sensors_data[key] = si
 
-    for _, run_id in storage.list("result"):
-        path = "result/" + run_id
+    rstorage = storage.sub_storage("result")
+    for _, run_id in rstorage.list():
         ftr = FullTestResult()
-        ftr.info = storage.load(TestInfo, path, "info")
+        ftr.test_info = rstorage.load(TestInfo, run_id, "info")
         ftr.performance_data = {}
 
-        p1 = "result/{}/measurement".format(run_id)
-        for _, node_id in storage.list(p1):
-            for _, measurement_name in storage.list(p1, node_id):
+        p1 = "{}/measurement".format(run_id)
+        for _, node_id in rstorage.list(p1):
+            for _, measurement_name in rstorage.list(p1, node_id):
                 perf_key = (node_id, measurement_name)
-                ftr.performance_data[perf_key] = storage["{}/{}/{}".format(p1, *perf_key)]  # type: ignore
+                ftr.performance_data[perf_key] = rstorage["{}/{}/{}".format(p1, *perf_key)]  # type: ignore
 
         yield ftr
 
 
+class ConsoleReportStage(Stage):
+
+    priority = StepOrder.REPORT
+
+    def run(self, ctx: TestRun) -> None:
+        # TODO(koder): load data from storage
+        raise NotImplementedError("...")
+
+class HtmlReportStage(Stage):
+
+    priority = StepOrder.REPORT
+
+    def run(self, ctx: TestRun) -> None:
+        # TODO(koder): load data from storage
+        raise NotImplementedError("...")
+
 # class StoragePerfInfo:
 #     def __init__(self, name: str, summary: Any, params, testnodes_count) -> None:
 #         self.direct_iops_r_max = 0  # type: int
diff --git a/wally/run_test.py b/wally/run_test.py
index 9ae2c9e..1a645b6 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -1,21 +1,18 @@
-import os
 import logging
-import contextlib
-from typing import List, Dict, Iterable, Iterator, Tuple, Optional, Union, cast
-from concurrent.futures import ThreadPoolExecutor, Future
+from concurrent.futures import Future
+from typing import List, Dict, Tuple, Optional, Union, cast
 
-from .node_interfaces import NodeInfo, IRPCNode
-from .test_run_class import TestRun
-from .discover import discover
-from . import pretty_yaml, utils, report, ssh_utils, start_vms, hw_info
+from . import utils, ssh_utils, hw_info
+from .config import ConfigBlock
 from .node import setup_rpc, connect
-from .config import ConfigBlock, Config
-
-from .suits.mysql import MysqlTest
-from .suits.itest import TestInputConfig
+from .node_interfaces import NodeInfo, IRPCNode
+from .stage import Stage, StepOrder
 from .suits.io.fio import IOPerfTest
-from .suits.postgres import PgBenchTest
+from .suits.itest import TestInputConfig
+from .suits.mysql import MysqlTest
 from .suits.omgbench import OmgTest
+from .suits.postgres import PgBenchTest
+from .test_run_class import TestRun
 
 
 TOOL_TYPE_MAPPER = {
@@ -29,431 +26,149 @@
 logger = logging.getLogger("wally")
 
 
-def connect_all(nodes_info: List[NodeInfo], pool: ThreadPoolExecutor, conn_timeout: int = 30) -> List[IRPCNode]:
-    """Connect to all nodes, log errors"""
+class ConnectStage(Stage):
+    """Connect to nodes stage"""
 
-    logger.info("Connecting to %s nodes", len(nodes_info))
+    priority = StepOrder.CONNECT
 
-    def connect_ext(node_info: NodeInfo) -> Tuple[bool, Union[IRPCNode, NodeInfo]]:
-        try:
-            ssh_node = connect(node_info, conn_timeout=conn_timeout)
-            # TODO(koder): need to pass all required rpc bytes to this call
-            return True, setup_rpc(ssh_node, b"")
-        except Exception as exc:
-            logger.error("During connect to {}: {!s}".format(node, exc))
-            return False, node_info
-
-    failed_testnodes = []  # type: List[NodeInfo]
-    failed_nodes = []  # type: List[NodeInfo]
-    ready = []  # type: List[IRPCNode]
-
-    for ok, node in pool.map(connect_ext, nodes_info):
-        if not ok:
-            node = cast(NodeInfo, node)
-            if 'testnode' in node.roles:
-                failed_testnodes.append(node)
-            else:
-                failed_nodes.append(node)
-        else:
-            ready.append(cast(IRPCNode, node))
-
-    if failed_nodes:
-        msg = "Node(s) {} would be excluded - can't connect"
-        logger.warning(msg.format(",".join(map(str, failed_nodes))))
-
-    if failed_testnodes:
-        msg = "Can't connect to testnode(s) " + \
-              ",".join(map(str, failed_testnodes))
-        logger.error(msg)
-        raise utils.StopTestError(msg)
-
-    if not failed_nodes:
-        logger.info("All nodes connected successfully")
-
-    return ready
-
-
-def collect_info_stage(ctx: TestRun) -> None:
-    futures = {}  # type: Dict[str, Future]
-
-    with ctx.get_pool() as pool:
-        for node in ctx.nodes:
-            hw_info_path = "hw_info/{}".format(node.info.node_id())
-            if hw_info_path not in ctx.storage:
-                futures[hw_info_path] = pool.submit(hw_info.get_hw_info, node), node
-
-            sw_info_path = "sw_info/{}".format(node.info.node_id())
-            if sw_info_path not in ctx.storage:
-                futures[sw_info_path] = pool.submit(hw_info.get_sw_info, node)
-
-        for path, future in futures.items():
-            ctx.storage[path] = future.result()
-
-
-@contextlib.contextmanager
-def suspend_vm_nodes_ctx(ctx: TestRun, unused_nodes: List[IRPCNode]) -> Iterator[List[int]]:
-
-    pausable_nodes_ids = [cast(int, node.info.os_vm_id)
-                          for node in unused_nodes
-                          if node.info.os_vm_id is not None]
-
-    non_pausable = len(unused_nodes) - len(pausable_nodes_ids)
-
-    if non_pausable:
-        logger.warning("Can't pause {} nodes".format(non_pausable))
-
-    if pausable_nodes_ids:
-        logger.debug("Try to pause {} unused nodes".format(len(pausable_nodes_ids)))
+    def run(self, ctx: TestRun) -> None:
         with ctx.get_pool() as pool:
-            start_vms.pause(ctx.os_connection, pausable_nodes_ids, pool)
+            logger.info("Connecting to %s nodes", len(ctx.nodes_info))
 
-    try:
-        yield pausable_nodes_ids
-    finally:
-        if pausable_nodes_ids:
-            logger.debug("Unpausing {} nodes".format(len(pausable_nodes_ids)))
-            with ctx.get_pool() as pool:
-                start_vms.unpause(ctx.os_connection, pausable_nodes_ids, pool)
+            def connect_ext(node_info: NodeInfo) -> Tuple[bool, Union[IRPCNode, NodeInfo]]:
+                try:
+                    ssh_node = connect(node_info, conn_timeout=ctx.config.connect_timeout)
+                    # TODO(koder): need to pass all required rpc bytes to this call
+                    return True, setup_rpc(ssh_node, b"")
+                except Exception as exc:
+                    logger.error("During connect to {}: {!s}".format(node, exc))
+                    return False, node_info
 
+            failed_testnodes = []  # type: List[NodeInfo]
+            failed_nodes = []  # type: List[NodeInfo]
+            ctx.nodes = []
 
-def run_tests(ctx: TestRun, test_block: ConfigBlock, nodes: List[IRPCNode]) -> None:
-    """Run test from test block"""
+            for ok, node in pool.map(connect_ext, ctx.nodes_info):
+                if not ok:
+                    node = cast(NodeInfo, node)
+                    if 'testnode' in node.roles:
+                        failed_testnodes.append(node)
+                    else:
+                        failed_nodes.append(node)
+                else:
+                    ctx.nodes.append(cast(IRPCNode, node))
 
-    test_nodes = [node for node in nodes if 'testnode' in node.info.roles]
+            if failed_nodes:
+                msg = "Node(s) {} would be excluded - can't connect"
+                logger.warning(msg.format(",".join(map(str, failed_nodes))))
 
-    if not test_nodes:
-        logger.error("No test nodes found")
-        return
-
-    for name, params in test_block.items():
-        vm_count = params.get('node_limit', None)  # type: Optional[int]
-
-        # select test nodes
-        if vm_count is None:
-            curr_test_nodes = test_nodes
-            unused_nodes = []  # type: List[IRPCNode]
-        else:
-            curr_test_nodes = test_nodes[:vm_count]
-            unused_nodes = test_nodes[vm_count:]
-
-        if not curr_test_nodes:
-            logger.error("No nodes found for test, skipping it.")
-            continue
-
-        # results_path = generate_result_dir_name(cfg.results_storage, name, params)
-        # utils.mkdirs_if_unxists(results_path)
-
-        # suspend all unused virtual nodes
-        if ctx.config.get('suspend_unused_vms', True):
-            suspend_ctx = suspend_vm_nodes_ctx(ctx, unused_nodes)
-        else:
-            suspend_ctx = utils.empty_ctx()
-
-        resumable_nodes_ids = [cast(int, node.info.os_vm_id)
-                               for node in curr_test_nodes
-                               if node.info.os_vm_id is not None]
-
-        if resumable_nodes_ids:
-            logger.debug("Check and unpause {} nodes".format(len(resumable_nodes_ids)))
-
-            with ctx.get_pool() as pool:
-                start_vms.unpause(ctx.os_connection, resumable_nodes_ids, pool)
-
-        with suspend_ctx:
-            test_cls = TOOL_TYPE_MAPPER[name]
-            remote_dir = ctx.config.default_test_local_folder.format(name=name, uuid=ctx.config.run_uuid)
-            test_cfg = TestInputConfig(test_cls.__name__,
-                                       params=params,
-                                       run_uuid=ctx.config.run_uuid,
-                                       nodes=test_nodes,
-                                       storage=ctx.storage,
-                                       remote_dir=remote_dir)
-
-            test_cls(test_cfg).run()
-
-
-def connect_stage(ctx: TestRun) -> None:
-    ctx.clear_calls_stack.append(disconnect_stage)
-
-    with ctx.get_pool() as pool:
-        ctx.nodes = connect_all(ctx.nodes_info, pool)
-
-
-def discover_stage(ctx: TestRun) -> None:
-    """discover clusters and nodes stage"""
-
-    # TODO(koder): Properly store discovery info and check if it available to skip phase
-
-    discover_info = ctx.config.get('discover')
-    if discover_info:
-        if "discovered_nodes" in ctx.storage:
-            nodes = ctx.storage.load_list("discovered_nodes", NodeInfo)
-            ctx.fuel_openstack_creds = ctx.storage.load("fuel_openstack_creds", start_vms.OSCreds)
-        else:
-            discover_objs = [i.strip() for i in discover_info.strip().split(",")]
-
-            ctx.fuel_openstack_creds, nodes = discover.discover(ctx,
-                                                                discover_objs,
-                                                                ctx.config.clouds,
-                                                                not ctx.config.dont_discover_nodes)
-
-            ctx.storage["fuel_openstack_creds"] = ctx.fuel_openstack_creds  # type: ignore
-            ctx.storage["discovered_nodes"] = nodes  # type: ignore
-        ctx.nodes_info.extend(nodes)
-
-    for url, roles in ctx.config.get('explicit_nodes', {}).items():
-        creds = ssh_utils.parse_ssh_uri(url)
-        roles = set(roles.split(","))
-        ctx.nodes_info.append(NodeInfo(creds, roles))
-
-
-def save_nodes_stage(ctx: TestRun) -> None:
-    """Save nodes list to file"""
-    ctx.storage['nodes'] = ctx.nodes_info   # type: ignore
-
-
-def ensure_connected_to_openstack(ctx: TestRun) -> None:
-    if not ctx.os_connection is None:
-        if ctx.os_creds is None:
-            ctx.os_creds = get_OS_credentials(ctx)
-        ctx.os_connection = start_vms.os_connect(ctx.os_creds)
-
-
-def reuse_vms_stage(ctx: TestRun) -> None:
-    if "reused_nodes" in ctx.storage:
-        ctx.nodes_info.extend(ctx.storage.load_list("reused_nodes", NodeInfo))
-    else:
-        reused_nodes = []
-        vms_patterns = ctx.config.get('clouds/openstack/vms', [])
-        private_key_path = get_vm_keypair_path(ctx.config)[0]
-
-        for creds in vms_patterns:
-            user_name, vm_name_pattern = creds.split("@", 1)
-            msg = "Vm like {} lookup failed".format(vm_name_pattern)
-
-            with utils.LogError(msg):
-                msg = "Looking for vm with name like {0}".format(vm_name_pattern)
-                logger.debug(msg)
-
-                ensure_connected_to_openstack(ctx)
-
-                for ip, vm_id in start_vms.find_vms(ctx.os_connection, vm_name_pattern):
-                    creds = ssh_utils.ConnCreds(host=ip, user=user_name, key_file=private_key_path)
-                    node_info = NodeInfo(creds, {'testnode'})
-                    node_info.os_vm_id = vm_id
-                    reused_nodes.append(node_info)
-                    ctx.nodes_info.append(node_info)
-
-        ctx.storage["reused_nodes"] = reused_nodes  # type: ignore
-
-
-def get_OS_credentials(ctx: TestRun) -> start_vms.OSCreds:
-
-    if "openstack_openrc" in ctx.storage:
-        return ctx.storage.load("openstack_openrc", start_vms.OSCreds)
-
-    creds = None
-    os_creds = None
-    force_insecure = False
-    cfg = ctx.config
-
-    if 'openstack' in cfg.clouds:
-        os_cfg = cfg.clouds['openstack']
-        if 'OPENRC' in os_cfg:
-            logger.info("Using OS credentials from " + os_cfg['OPENRC'])
-            creds_tuple = utils.get_creds_openrc(os_cfg['OPENRC'])
-            os_creds = start_vms.OSCreds(*creds_tuple)
-        elif 'ENV' in os_cfg:
-            logger.info("Using OS credentials from shell environment")
-            os_creds = start_vms.get_openstack_credentials()
-        elif 'OS_TENANT_NAME' in os_cfg:
-            logger.info("Using predefined credentials")
-            os_creds = start_vms.OSCreds(os_cfg['OS_USERNAME'].strip(),
-                                         os_cfg['OS_PASSWORD'].strip(),
-                                         os_cfg['OS_TENANT_NAME'].strip(),
-                                         os_cfg['OS_AUTH_URL'].strip(),
-                                         os_cfg.get('OS_INSECURE', False))
-
-        elif 'OS_INSECURE' in os_cfg:
-            force_insecure = os_cfg.get('OS_INSECURE', False)
-
-    if os_creds is None and 'fuel' in cfg.clouds and 'openstack_env' in cfg.clouds['fuel'] and \
-            ctx.fuel_openstack_creds is not None:
-        logger.info("Using fuel creds")
-        creds = ctx.fuel_openstack_creds
-    elif os_creds is None:
-        logger.error("Can't found OS credentials")
-        raise utils.StopTestError("Can't found OS credentials", None)
-
-    if creds is None:
-        creds = os_creds
-
-    if force_insecure and not creds.insecure:
-        creds = start_vms.OSCreds(creds.name,
-                                  creds.passwd,
-                                  creds.tenant,
-                                  creds.auth_url,
-                                  True)
-
-    logger.debug(("OS_CREDS: user={0.name} tenant={0.tenant} " +
-                  "auth_url={0.auth_url} insecure={0.insecure}").format(creds))
-
-    ctx.storage["openstack_openrc"] = creds  # type: ignore
-    return creds
-
-
-def get_vm_keypair_path(cfg: Config) -> Tuple[str, str]:
-    key_name = cfg.vm_configs['keypair_name']
-    private_path = os.path.join(cfg.settings_dir, key_name + "_private.pem")
-    public_path = os.path.join(cfg.settings_dir, key_name + "_public.pub")
-    return (private_path, public_path)
-
-
-@contextlib.contextmanager
-def create_vms_ctx(ctx: TestRun, vm_config: ConfigBlock, already_has_count: int = 0) -> Iterator[List[NodeInfo]]:
-
-    if 'spawned_vm_ids' in ctx.storage:
-        os_nodes_ids = ctx.storage.get('spawned_vm_ids', [])  # type: List[int]
-        new_nodes = []  # type: List[NodeInfo]
-
-        # TODO(koder): reconnect to old VM's
-        raise NotImplementedError("Reconnect to old vms is not implemented")
-    else:
-        os_nodes_ids = []
-        new_nodes = []
-        no_spawn = False
-        if vm_config['count'].startswith('='):
-            count = int(vm_config['count'][1:])
-            if count <= already_has_count:
-                logger.debug("Not need new vms")
-                no_spawn = True
-
-        if not no_spawn:
-            ensure_connected_to_openstack(ctx)
-            params = ctx.config.vm_configs[vm_config['cfg_name']].copy()
-            params.update(vm_config)
-            params.update(get_vm_keypair_path(ctx.config))
-            params['group_name'] = ctx.config.run_uuid
-            params['keypair_name'] = ctx.config.vm_configs['keypair_name']
-
-            if not vm_config.get('skip_preparation', False):
-                logger.info("Preparing openstack")
-                start_vms.prepare_os(ctx.os_connection, params)
-
-            with ctx.get_pool() as pool:
-                for node_info in start_vms.launch_vms(ctx.os_connection, params, pool, already_has_count):
-                    node_info.roles.add('testnode')
-                    os_nodes_ids.append(node_info.os_vm_id)
-                    new_nodes.append(node_info)
-
-        ctx.storage['spawned_vm_ids'] = os_nodes_ids  # type: ignore
-        yield new_nodes
-
-        # keep nodes in case of error for future test restart
-        if not ctx.config.keep_vm:
-            shut_down_vms_stage(ctx, os_nodes_ids)
-
-        del ctx.storage['spawned_vm_ids']
-
-
-@contextlib.contextmanager
-def sensor_monitoring(ctx: TestRun, cfg: ConfigBlock, nodes: List[IRPCNode]) -> Iterator[None]:
-    yield
-
-
-def run_tests_stage(ctx: TestRun) -> None:
-    for group in ctx.config.get('tests', []):
-        gitems = list(group.items())
-        if len(gitems) != 1:
-            msg = "Items in tests section should have len == 1"
-            logger.error(msg)
-            raise utils.StopTestError(msg)
-
-        key, config = gitems[0]
-
-        if 'start_test_nodes' == key:
-            if 'openstack' not in config:
-                msg = "No openstack block in config - can't spawn vm's"
+            if failed_testnodes:
+                msg = "Can't connect to testnode(s) " + \
+                      ",".join(map(str, failed_testnodes))
                 logger.error(msg)
                 raise utils.StopTestError(msg)
 
-            num_test_nodes = len([node for node in ctx.nodes if 'testnode' in node.info.roles])
-            vm_ctx = create_vms_ctx(ctx, config['openstack'], num_test_nodes)
-            tests = config.get('tests', [])
-        else:
-            vm_ctx = utils.empty_ctx([])
-            tests = [group]
+            if not failed_nodes:
+                logger.info("All nodes connected successfully")
 
-        # make mypy happy
-        new_nodes = []  # type: List[NodeInfo]
+    def cleanup(self, ctx: TestRun) -> None:
+        # TODO(koder): what next line was for?
+        # ssh_utils.close_all_sessions()
 
-        with vm_ctx as new_nodes:
-            if new_nodes:
-                with ctx.get_pool() as pool:
-                    new_rpc_nodes = connect_all(new_nodes, pool)
+        for node in ctx.nodes:
+            node.disconnect()
 
-            test_nodes = ctx.nodes + new_rpc_nodes
 
-            if ctx.config.get('sensors'):
-                sensor_ctx = sensor_monitoring(ctx, ctx.config.get('sensors'), test_nodes)
-            else:
-                sensor_ctx = utils.empty_ctx([])
+class CollectInfoStage(Stage):
+    """Collect node info"""
 
+    priority = StepOrder.START_SENSORS - 1
+    config_block = 'collect_info'
+
+    def run(self, ctx: TestRun) -> None:
+        if not ctx.config.collect_info:
+            return
+
+        futures = {}  # type: Dict[str, Future]
+
+        with ctx.get_pool() as pool:
+            for node in ctx.nodes:
+                hw_info_path = "hw_info/{}".format(node.info.node_id())
+                if hw_info_path not in ctx.storage:
+                    futures[hw_info_path] = pool.submit(hw_info.get_hw_info, node), node
+
+                sw_info_path = "sw_info/{}".format(node.info.node_id())
+                if sw_info_path not in ctx.storage:
+                    futures[sw_info_path] = pool.submit(hw_info.get_sw_info, node)
+
+            for path, future in futures.items():
+                ctx.storage[path] = future.result()
+
+
+class ExplicitNodesStage(Stage):
+    """add explicit nodes"""
+
+    priority = StepOrder.DISCOVER
+    config_block = 'nodes'
+
+    def run(self, ctx: TestRun) -> None:
+        explicit_nodes = []
+        for url, roles in ctx.config.get('explicit_nodes', {}).items():
+            creds = ssh_utils.parse_ssh_uri(url)
+            roles = set(roles.split(","))
+            explicit_nodes.append(NodeInfo(creds, roles))
+
+        ctx.nodes_info.extend(explicit_nodes)
+        ctx.storage['explicit_nodes'] = explicit_nodes  # type: ignore
+
+
+class SaveNodesStage(Stage):
+    """Save nodes list to file"""
+
+    priority = StepOrder.CONNECT
+
+    def run(self, ctx: TestRun) -> None:
+        ctx.storage['all_nodes'] = ctx.nodes_info   # type: ignore
+
+
+class RunTestsStage(Stage):
+
+    priority = StepOrder.TEST
+    config_block = 'tests'
+
+    def run(self, ctx: TestRun) -> None:
+        for test_group in ctx.config.get('tests', []):
             if not ctx.config.no_tests:
-                for test_group in tests:
-                    with sensor_ctx:
-                        run_tests(ctx, test_group, test_nodes)
+                test_nodes = [node for node in ctx.nodes if 'testnode' in node.info.roles]
 
-            for node in new_rpc_nodes:
-                node.disconnect()
+                if not test_nodes:
+                    logger.error("No test nodes found")
+                    return
 
+                for name, params in test_group.items():
+                    vm_count = params.get('node_limit', None)  # type: Optional[int]
 
-def clouds_connect_stage(ctx: TestRun) -> None:
-    # TODO(koder): need to use this to connect to openstack in upper code
-    # conn = ctx.config['clouds/openstack']
-    # user, passwd, tenant = parse_creds(conn['creds'])
-    # auth_data = dict(auth_url=conn['auth_url'],
-    #                  username=user,
-    #                  api_key=passwd,
-    #                  project_id=tenant)  # type: Dict[str, str]
-    # logger.debug("Discovering openstack nodes with connection details: %r", conn)
-    # connect to openstack, fuel
+                    # select test nodes
+                    if vm_count is None:
+                        curr_test_nodes = test_nodes
+                    else:
+                        curr_test_nodes = test_nodes[:vm_count]
 
-    # # parse FUEL REST credentials
-    # username, tenant_name, password = parse_creds(fuel_data['creds'])
-    # creds = {"username": username,
-    #          "tenant_name": tenant_name,
-    #          "password": password}
-    #
-    # # connect to FUEL
-    # conn = fuel_rest_api.KeystoneAuth(fuel_data['url'], creds, headers=None)
-    pass
+                    if not curr_test_nodes:
+                        logger.error("No nodes found for test, skipping it.")
+                        continue
 
+                    test_cls = TOOL_TYPE_MAPPER[name]
+                    remote_dir = ctx.config.default_test_local_folder.format(name=name, uuid=ctx.config.run_uuid)
+                    test_cfg = TestInputConfig(test_cls.__name__,
+                                               params=params,
+                                               run_uuid=ctx.config.run_uuid,
+                                               nodes=test_nodes,
+                                               storage=ctx.storage,
+                                               remote_dir=remote_dir)
 
-def shut_down_vms_stage(ctx: TestRun, nodes_ids: List[int]) -> None:
-    if nodes_ids:
-        logger.info("Removing nodes")
-        start_vms.clear_nodes(ctx.os_connection, nodes_ids)
-        logger.info("Nodes has been removed")
+                    test_cls(test_cfg).run()
 
-
-def clear_enviroment(ctx: TestRun) -> None:
-    shut_down_vms_stage(ctx, ctx.storage.get('spawned_vm_ids', []))
-    ctx.storage['spawned_vm_ids'] = []  # type: ignore
-
-
-def disconnect_stage(ctx: TestRun) -> None:
-    # TODO(koder): what next line was for?
-    # ssh_utils.close_all_sessions()
-
-    for node in ctx.nodes:
-        node.disconnect()
-
-
-def console_report_stage(ctx: TestRun) -> None:
-    # TODO(koder): load data from storage
-    raise NotImplementedError("...")
-
-def html_report_stage(ctx: TestRun) -> None:
-    # TODO(koder): load data from storage
-    raise NotImplementedError("...")
+    @classmethod
+    def validate_config(cls, cfg: ConfigBlock) -> None:
+        pass
diff --git a/wally/sensors.py b/wally/sensors.py
index b579f3f..c86aeb4 100644
--- a/wally/sensors.py
+++ b/wally/sensors.py
@@ -1,62 +1,69 @@
-from typing import List, Dict, Tuple
+from typing import List, Dict, Tuple, Any
+
 from .test_run_class import TestRun
 from . import sensors_rpc_plugin
-
+from .stage import Stage, StepOrder
 
 plugin_fname = sensors_rpc_plugin.__file__.rsplit(".", 1)[0] + ".py"
 SENSORS_PLUGIN_CODE = open(plugin_fname).read()
 
 
 # TODO(koder): in case if node has more than one role sensor settigns might be incorrect
-def start_sensors_stage(ctx: TestRun) -> None:
-    if 'sensors' not in ctx.config:
-        return
 
-    per_role_config = {}
-    for name, val in ctx.config['sensors'].copy():
-        if isinstance(val, str):
-            val = {vl.strip(): ".*" for vl in val.split(",")}
-        elif isinstance(val, list):
-            val = {vl: ".*" for vl in val}
-        per_role_config[name] = val
+class StartSensorsStage(Stage):
+    priority = StepOrder.START_SENSORS
+    config_block = 'sensors'
 
-    if 'all' in per_role_config:
-        all_vl = per_role_config.pop('all')
-        all_roles = set(per_role_config)
+    def run(self, ctx: TestRun) -> None:
+        if 'sensors' not in ctx.config:
+            return
+
+        per_role_config = {}  # type: Dict[str, Dict[str, str]]
+        for name, val in ctx.config['sensors'].copy():
+            if isinstance(val, str):
+                val = {vl.strip(): ".*" for vl in val.split(",")}
+            elif isinstance(val, list):
+                val = {vl: ".*" for vl in val}
+            per_role_config[name] = val
+
+        if 'all' in per_role_config:
+            all_vl = per_role_config.pop('all')
+            all_roles = set(per_role_config)
+
+            for node in ctx.nodes:
+                all_roles.update(node.info.roles)
+
+            for name, vals in list(per_role_config.items()):
+                new_vals = all_vl.copy()
+                new_vals.update(vals)
+                per_role_config[name] = new_vals
 
         for node in ctx.nodes:
-            all_roles.update(node.info.roles)
+            node_cfg = {}  # type: Dict[str, str]
+            for role in node.info.roles:
+                node_cfg.update(per_role_config.get(role, {}))
 
-        for name, vals in list(per_role_config.items()):
-            new_vals = all_roles.copy()
-            new_vals.update(vals)
-            per_role_config[name] = new_vals
-
-    for node in ctx.nodes:
-        node_cfg = {}
-        for role in node.info.roles:
-            node_cfg.update(per_role_config.get(role, {}))
-
-        if node_cfg:
-            node.conn.upload_plugin(SENSORS_PLUGIN_CODE)
-            ctx.sensors_run_on.add(node.info.node_id())
-        node.conn.sensors.start()
+            if node_cfg:
+                node.conn.upload_plugin(SENSORS_PLUGIN_CODE)
+                ctx.sensors_run_on.add(node.info.node_id())
+            node.conn.sensors.start()
 
 
-def collect_sensors_stage(ctx: TestRun, stop: bool = True) -> None:
-    for node in ctx.nodes:
-        node_id = node.info.node_id()
-        if node_id in ctx.sensors_run_on:
+class CollectSensorsStage(Stage):
+    priority = StepOrder.COLLECT_SENSORS
+    config_block = 'sensors'
 
-            if stop:
+    def run(self, ctx: TestRun) -> None:
+        for node in ctx.nodes:
+            node_id = node.info.node_id()
+            if node_id in ctx.sensors_run_on:
+
                 data, collected_at = node.conn.sensors.stop()  # type: Dict[Tuple[str, str], List[int]], List[float]
-            else:
-                data, collected_at = node.conn.sensors.get_updates()
 
-            for (source_name, sensor_name), values in data.items():
-                path = "metric/{}/{}/{}".format(node_id, source_name, sensor_name)
-                ctx.storage.append(path, values)
-                ctx.storage.append("metric/{}/collected_at".format(node_id), collected_at)
+                mstore = ctx.storage.sub_storage("metric", node_id)
+                for (source_name, sensor_name), values in data.items():
+                    mstore[source_name, sensor_name] = values
+                    mstore["collected_at"] = collected_at
 
 
 # def delta(func, only_upd=True):
diff --git a/wally/stage.py b/wally/stage.py
index 0451d18..95542f3 100644
--- a/wally/stage.py
+++ b/wally/stage.py
@@ -1,27 +1,36 @@
-import logging
-import contextlib
-from typing import Callable, Iterator
+import abc
+from typing import Optional
 
-from .utils import StopTestError
 from .test_run_class import TestRun
+from .config import ConfigBlock
 
 
-logger = logging.getLogger("wally")
+class StepOrder:
+    DISCOVER = 0
+    SPAWN = 10
+    CONNECT = 20
+    START_SENSORS = 30
+    TEST = 40
+    COLLECT_SENSORS = 50
+    REPORT = 60
 
 
-@contextlib.contextmanager
-def log_stage(stage) -> Iterator[None]:
-    msg_templ = "Exception during {0}: {1!s}"
-    msg_templ_no_exc = "During {0}"
+class Stage(metaclass=abc.ABCMeta):
+    priority = None  # type: int
+    config_block = None  # type: Optional[str]
 
-    logger.info("Start " + stage.name)
+    @classmethod
+    def name(cls) -> str:
+        return cls.__name__
 
-    try:
-        yield
-    except StopTestError as exc:
-        logger.error(msg_templ.format(stage.__name__, exc))
-    except Exception:
-        logger.exception(msg_templ_no_exc.format(stage.__name__))
+    @classmethod
+    def validate_config(cls, cfg: ConfigBlock) -> None:
+        pass
 
+    @abc.abstractmethod
+    def run(self, ctx: TestRun) -> None:
+        pass
 
-StageType = Callable[[TestRun], None]
+    def cleanup(self, ctx: TestRun) -> None:
+        pass
+
diff --git a/wally/storage.py b/wally/storage.py
index 05e4259..540da88 100644
--- a/wally/storage.py
+++ b/wally/storage.py
@@ -4,30 +4,24 @@
 
 import os
 import abc
-from typing import Any, Iterable, TypeVar, Type, IO, Tuple, cast, List
+import array
+from typing import Any, Iterator, TypeVar, Type, IO, Tuple, cast, List, Dict, Union, Iterable
+
+
+import yaml
+try:
+    from yaml import CLoader as Loader, CDumper as Dumper  # type: ignore
+except ImportError:
+    from yaml import Loader, Dumper  # type: ignore
 
 
 class IStorable(metaclass=abc.ABCMeta):
     """Interface for type, which can be stored"""
-    @abc.abstractmethod
-    def __getstate__(self) -> Any:
-        pass
 
-    @abc.abstractmethod
-    def __setstate__(self, Any):
-        pass
-
-
-# all builtin types can be stored
-IStorable.register(list)  # type: ignore
-IStorable.register(dict)  # type: ignore
-IStorable.register(tuple)  # type: ignore
-IStorable.register(set)  # type: ignore
-IStorable.register(None)  # type: ignore
-IStorable.register(int)  # type: ignore
-IStorable.register(str)  # type: ignore
-IStorable.register(bytes)  # type: ignore
-IStorable.register(bool)  # type: ignore
+basic_types = {list, dict, tuple, set, type(None), int, str, bytes, bool, float}
+for btype in basic_types:
+    # pylint: disable=E1101
+    IStorable.register(btype)  # type: ignore
 
 
 ObjClass = TypeVar('ObjClass')
@@ -54,11 +48,15 @@
         pass
 
     @abc.abstractmethod
-    def list(self, path: str) -> Iterable[Tuple[bool, str]]:
+    def list(self, path: str) -> Iterator[Tuple[bool, str]]:
         pass
 
     @abc.abstractmethod
-    def get_stream(self, path: str) -> IO:
+    def get_stream(self, path: str, mode: str = "rb+") -> IO:
+        pass
+
+    @abc.abstractmethod
+    def sub_storage(self, path: str) -> 'ISimpleStorage':
         pass
 
 
@@ -78,14 +76,18 @@
 
     def __init__(self, root_path: str, existing: bool) -> None:
         self.root_path = root_path
+        self.existing = existing
         if existing:
             if not os.path.isdir(self.root_path):
-                raise ValueError("No storage found at {!r}".format(root_path))
+                raise IOError("No storage found at {!r}".format(root_path))
+
+    def j(self, path: str) -> str:
+        return os.path.join(self.root_path, path)
 
     def __setitem__(self, path: str, value: bytes) -> None:
-        path = os.path.join(self.root_path, path)
-        os.makedirs(os.path.dirname(path), exist_ok=True)
-        with open(path, "wb") as fd:
+        jpath = self.j(path)
+        os.makedirs(os.path.dirname(jpath), exist_ok=True)
+        with open(jpath, "wb") as fd:
             fd.write(value)
 
     def __delitem__(self, path: str) -> None:
@@ -95,32 +97,53 @@
             pass
 
     def __getitem__(self, path: str) -> bytes:
-        path = os.path.join(self.root_path, path)
-        with open(path, "rb") as fd:
+        with open(self.j(path), "rb") as fd:
             return fd.read()
 
     def __contains__(self, path: str) -> bool:
-        path = os.path.join(self.root_path, path)
-        return os.path.exists(path)
+        return os.path.exists(self.j(path))
 
-    def list(self, path: str) -> Iterable[Tuple[bool, str]]:
-        path = os.path.join(self.root_path, path)
-        for entry in os.scandir(path):
+    def list(self, path: str = "") -> Iterator[Tuple[bool, str]]:
+        jpath = self.j(path)
+        for entry in os.scandir(jpath):
             if not entry.name in ('..', '.'):
                 yield entry.is_file(), entry.name
 
-    def get_stream(self, path: str, mode: str = "rb") -> IO:
-        path = os.path.join(self.root_path, path)
-        return open(path, mode)
+    def get_stream(self, path: str, mode: str = "rb+") -> IO[bytes]:
+        jpath = self.j(path)
+
+        if "cb" == mode:
+            create_on_fail = True
+            mode = "rb+"
+        else:
+            create_on_fail = False
+
+        try:
+            fd = open(jpath, mode)
+        except IOError:
+            if not create_on_fail:
+                raise
+            fd = open(jpath, "wb")
+
+        return cast(IO[bytes], fd)
+
+    def sub_storage(self, path: str) -> 'FSStorage':
+        return self.__class__(self.j(path), self.existing)
 
 
 class YAMLSerializer(ISerializer):
     """Serialize data to yaml"""
-    def pack(self, value: IStorable) -> bytes:
-        raise NotImplementedError()
+    def pack(self, value: Any) -> bytes:
+        if type(value) not in basic_types:
+            for name, val in value.__dict__.items():
+                if type(val) not in basic_types:
+                    raise ValueError(("Can't pack {!r}. Attribute {} has value {!r} (type: {}), but only" +
+                                      " basic types accepted as attributes").format(value, name, val, type(val)))
+            value = value.__dict__
+        return yaml.dump(value, Dumper=Dumper, encoding="utf8")
 
     def unpack(self, data: bytes) -> IStorable:
-        raise NotImplementedError()
+        return yaml.load(data, Loader=Loader)
 
 
 class Storage:
@@ -129,32 +152,65 @@
         self.storage = storage
         self.serializer = serializer
 
-    def __setitem__(self, path: str, value: IStorable) -> None:
-        self.storage[path] = self.serializer.pack(value)
+    def sub_storage(self, *path: str) -> 'Storage':
+        return self.__class__(self.storage.sub_storage("/".join(path)), self.serializer)
 
-    def __getitem__(self, path: str) -> IStorable:
+    def __setitem__(self, path: Union[str, Iterable[str]], value: Any) -> None:
+        if not isinstance(path, str):
+            path = "/".join(path)
+
+        self.storage[path] = self.serializer.pack(cast(IStorable, value))
+
+    def __getitem__(self, path: Union[str, Iterable[str]]) -> IStorable:
+        if not isinstance(path, str):
+            path = "/".join(path)
+
         return self.serializer.unpack(self.storage[path])
 
-    def __delitem__(self, path: str) -> None:
+    def __delitem__(self, path: Union[str, Iterable[str]]) -> None:
+        if not isinstance(path, str):
+            path = "/".join(path)
+
         del self.storage[path]
 
-    def __contains__(self, path: str) -> bool:
+    def __contains__(self, path: Union[str, Iterable[str]]) -> bool:
+        if not isinstance(path, str):
+            path = "/".join(path)
         return path in self.storage
 
-    def list(self, *path: str) -> Iterable[Tuple[bool, str]]:
+    def list(self, *path: str) -> Iterator[Tuple[bool, str]]:
         return self.storage.list("/".join(path))
 
-    def construct(self, path: str, raw_val: IStorable, obj_class: Type[ObjClass]) -> ObjClass:
+    def set_array(self, value: array.array, *path: str) -> None:
+        with self.get_stream("/".join(path), "wb") as fd:
+            value.tofile(fd)  # type: ignore
+
+    def get_array(self, typecode: str, *path: str) -> array.array:
+        res = array.array(typecode)
+        path_s = "/".join(path)
+        with self.get_stream(path_s, "rb") as fd:
+            fd.seek(0, os.SEEK_END)
+            size = fd.tell()
+            fd.seek(0, os.SEEK_SET)
+            assert size % res.itemsize == 0, "Storage object at path {} contains no array of {} or corrupted."\
+                .format(path_s, typecode)
+            res.fromfile(fd, size // res.itemsize)  # type: ignore
+        return res
+
+    def append(self, value: array.array, *path: str) -> None:
+        with self.get_stream("/".join(path), "cb") as fd:
+            fd.seek(0, os.SEEK_END)
+            value.tofile(fd)  # type: ignore
+
+    def construct(self, path: str, raw_val: Dict, obj_class: Type[ObjClass]) -> ObjClass:
+        "Internal function, used to construct user type from raw unpacked value"
         if obj_class in (int, str, dict, list, None):
-            if not isinstance(raw_val, obj_class):
-                raise ValueError("Can't load path {!r} into type {}. Real type is {}"
-                                 .format(path, obj_class, type(raw_val)))
-            return cast(ObjClass, raw_val)
+            raise ValueError("Can't load into build-in value - {!r} into type {}")
 
         if not isinstance(raw_val, dict):
             raise ValueError("Can't load path {!r} into python type. Raw value not dict".format(path))
 
-        if not all(isinstance(str, key) for key in raw_val.keys):
+        if not all(isinstance(key, str) for key in raw_val.keys()):
             raise ValueError("Can't load path {!r} into python type.".format(path) +
                              "Raw not all keys in raw value is strings")
 
@@ -170,19 +226,25 @@
 
     def load(self, obj_class: Type[ObjClass], *path: str) -> ObjClass:
         path_s = "/".join(path)
-        return self.construct(path_s, self[path_s], obj_class)
+        return self.construct(path_s, cast(Dict, self[path_s]), obj_class)
 
-    def get_stream(self, *path: str) -> IO:
-        return self.storage.get_stream("/".join(path))
+    def get_stream(self, path: str, mode: str = "r") -> IO:
+        return self.storage.get_stream(path, mode)
 
-    def get(self, path: str, default: Any = None) -> Any:
+    def get(self, path: Union[str, Iterable[str]], default: Any = None) -> Any:
+        if not isinstance(path, str):
+            path = "/".join(path)
+
         try:
             return self[path]
-        except KeyError:
+        except Exception:
             return default
 
-    def append(self, path: str, data: List):
-        raise NotImplemented()
+    def __enter__(self) -> 'Storage':
+        return self
+
+    def __exit__(self, x: Any, y: Any, z: Any) -> None:
+        return
 
 
 def make_storage(url: str, existing: bool = False) -> Storage:
diff --git a/wally/storage_structure.txt b/wally/storage_structure.txt
index 583a4a0..5715046 100644
--- a/wally/storage_structure.txt
+++ b/wally/storage_structure.txt
@@ -1,10 +1,19 @@
 config: Config - full configuration
-nodes: List[NodeInfo] - all nodes
-fuel_openstack_creds: OSCreds - openstack creds, discovered from fuel (or None)
+all_nodes: List[NodeInfo] - all nodes
+
+fuel:
+    version: List[int] - FUEL master node version
+    os_creds: OSCreds - openstack creds, discovered from fuel (or None)
+    nodes: List[NodeInfo] - FUEL cluster nodes
+
 openstack_openrc: OSCreds - openrc used for openstack cluster
-discovered_nodes: List[NodeInfo] - list of discovered nodes
-reused_nodes: List[NodeInfo] - list of reused nodes from cluster
-spawned_vm_ids: List[int] - list of openstack VM id's, spawned for test
+
+openstack_nodes: List[NodeInfo] - list of openstack nodes
+reused_os_nodes: List[NodeInfo] - list of openstack VM, reused in test
+spawned_os_nodes: List[NodeInfo] - list of openstack VM, spawned for test
+ceph_nodes: List[NodeInfo] - list of ceph nodes
+explicit_nodes: List[NodeInfo] - list of explicit nodes
+
 info/comment : str - run comment
 info/run_uuid : str - run uuid
 info/run_time : float - run unix time
diff --git a/wally/suits/io/fio.py b/wally/suits/io/fio.py
index 0f4ebde..1b5f38e 100644
--- a/wally/suits/io/fio.py
+++ b/wally/suits/io/fio.py
@@ -83,7 +83,7 @@
             node.conn.rmdir(self.config.remote_dir, recursive=True, ignore_missing=True)
             node.conn.mkdir(self.config.remote_dir)
         except Exception as exc:
-            msg = "Failed to create folder {} on remote {}.".format(self.config.remote_dir, node, exc)
+            msg = "Failed to create folder {} on remote {}.".format(self.config.remote_dir, node)
             logger.exception(msg)
             raise StopTestError(msg) from exc
 
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index 6d1eeee..ef69b05 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -74,7 +74,7 @@
         pass
 
     @abc.abstractmethod
-    def format_for_console(cls, data: Any) -> str:
+    def format_for_console(self, data: Any) -> str:
         pass
 
 
@@ -122,9 +122,9 @@
                 }
 
                 assert info == expected_config, \
-                    "Test info at path {} is not equal to expected config." + \
-                    "Maybe configuration was changed before test was restarted. " + \
-                    "Current cfg is {!r}, expected cfg is {!r}".format(info_path, info, expected_config)
+                    ("Test info at path {} is not equal to expected config." +
+                     "Maybe configuration was changed before test was restarted. " +
+                     "Current cfg is {!r}, expected cfg is {!r}").format(info_path, info, expected_config)
 
                 logger.info("Test iteration {} found in storage and will be skipped".format(iter_name))
             else:
@@ -181,10 +181,10 @@
                 start_times = []  # type: List[int]
                 stop_times = []  # type: List[int]
 
+                mstorage = storage.sub_storage("result", str(run_id), "measurement")
                 for (result, (t_start, t_stop)), node in zip(results, self.config.nodes):
                     for metrics_name, data in result.items():
-                        path = "result/{}/measurement/{}/{}".format(run_id, node.info.node_id(), metrics_name)
-                        storage[path] = data  # type: ignore
+                        mstorage[node.info.node_id(), metrics_name] = data  # type: ignore
                     start_times.append(t_start)
                     stop_times.append(t_stop)
 
@@ -214,7 +214,7 @@
                     'end_time': max_stop_time
                 }
 
-                storage["result/{}/info".format(run_id)] = test_config  # type: ignore
+                storage["result", str(run_id), "info"] = test_config  # type: ignore
 
     @abc.abstractmethod
     def config_node(self, node: IRPCNode) -> None:
diff --git a/wally/test_run_class.py b/wally/test_run_class.py
index db41a46..30c46e7 100644
--- a/wally/test_run_class.py
+++ b/wally/test_run_class.py
@@ -4,7 +4,7 @@
 
 from .timeseries import SensorDatastore
 from .node_interfaces import NodeInfo, IRPCNode
-from .start_vms import OSCreds, OSConnection
+from .openstack_api import OSCreds, OSConnection
 from .storage import Storage
 from .config import Config
 from .fuel_rest_api import Connection
@@ -24,6 +24,7 @@
 
         # openstack credentials
         self.fuel_openstack_creds = None  # type: Optional[OSCreds]
+        self.fuel_version = None  # type: Optional[List[int]]
         self.os_creds = None  # type: Optional[OSCreds]
         self.os_connection = None  # type: Optional[OSConnection]
         self.fuel_conn = None  # type: Optional[Connection]
@@ -33,6 +34,7 @@
         self.config = config
         self.sensors_data = SensorDatastore()
         self.sensors_run_on = set()  # type: Set[str]
+        self.os_spawned_nodes_ids = None  # type: List[int]
 
     def get_pool(self):
         return ThreadPoolExecutor(self.config.get('worker_pool_sz', 32))
diff --git a/wally/utils.py b/wally/utils.py
index b27b1ba..45b67b4 100644
--- a/wally/utils.py
+++ b/wally/utils.py
@@ -260,7 +260,7 @@
     raise OSError("Can't define interface for {0}".format(target_ip))
 
 
-def open_for_append_or_create(fname: str) -> IO:
+def open_for_append_or_create(fname: str) -> IO[str]:
     if not os.path.exists(fname):
         return open(fname, "w")
 
@@ -289,18 +289,6 @@
     return data
 
 
-CLEANING = []  # type: List[Tuple[Callable[..., Any], List[Any], Dict[str, Any]]]
-
-
-def clean_resource(func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
-    CLEANING.append((func, list(args), kwargs))
-
-
-def iter_clean_func() -> Iterator[Tuple[Callable[..., Any], List[Any], Dict[str, Any]]]:
-    while CLEANING:
-        yield CLEANING.pop()
-
-
 def flatten(data: Iterable[Any]) -> List[Any]:
     res = []
     for i in data: