Add stage base class, refactor discovery, etc
diff --git a/Makefile b/Makefile
index b14cd96..ddc1a48 100644
--- a/Makefile
+++ b/Makefile
@@ -1,8 +1,18 @@
-.PHONY: mypy
+.PHONY: mypy pylint pylint_e
 ALL_FILES=$(shell find wally/ -type f -name '*.py')
+ACTIVATE=cd ~/workspace/wally; source .env/bin/activate
-		bash -c "cd ~/workspace/wally; source .env/bin/activate ; MYPYPATH=${STUBS} python3 -m mypy -s ${ALL_FILES}"
+		bash -c "${ACTIVATE}; MYPYPATH=${STUBS} python3 -m mypy -s ${ALL_FILES}"
+PYLINT_FMT=--msg-template={path}:{line}: [{msg_id}({symbol}), {obj}] {msg}
+		bash -c "${ACTIVATE} ; python3 -m pylint '${PYLINT_FMT}' --rcfile=pylint.rc ${ALL_FILES}"
+		bash -c "${ACTIVATE} ; python3 -m pylint -E '${PYLINT_FMT}' --rcfile=pylint.rc ${ALL_FILES}"
diff --git a/configs-examples/full.yaml b/configs-examples/full.yaml
index 0874f4f..5c336ff 100644
--- a/configs-examples/full.yaml
+++ b/configs-examples/full.yaml
@@ -32,7 +32,7 @@
         aa_group_name: wally-aa-{0}
         security_group: wally_ssh_to_everyone
         creds: admin:admin@admin
diff --git a/wally/discover/ b/configs-examples/local_lxc_ceph.yaml
similarity index 100%
rename from wally/discover/
rename to configs-examples/local_lxc_ceph.yaml
diff --git a/configs-examples/v2_default.yaml b/configs-examples/v2_default.yaml
new file mode 100644
index 0000000..931d101
--- /dev/null
+++ b/configs-examples/v2_default.yaml
@@ -0,0 +1,106 @@
+#  ------------------------------------    CONFIGS   -------------------------------------------------------------------
+    url:
+    creds: admin:admin@admin
+    ssh_creds: root:r00tme
+    openstack_env: test
+    skip_preparation: false
+    openrc: /home/koder/workspace/scale_openrc
+    openrc:
+        user: USER
+        passwd: PASSWD
+        tenant: KEY_FILE
+        auth_url: URL
+    vms:
+collect_info: true
+var_dir_root: /tmp/perf_tests
+settings_dir: ~/.wally
+    extra_logs: 1
+    level: DEBUG
+    keypair_file_private: wally_vm_key_perf3.pem
+    keypair_file_public:
+    keypair_name: wally_vm_key
+    wally_1024:
+        image:
+            name: wally_ubuntu
+            user: ubuntu
+            url:
+        flavor:
+            name: wally_1024
+            hdd_size: 100
+            ram_size: 1024
+            cpu_count: 2
+        vol_sz: 100
+        name_templ: wally-{group}-{id}
+        aa_group_name: wally-aa-{0}
+        security_group: wally_ssh_to_everyone
+ceph: nodeep-scrub, noscrub
+#-----------------------------------------    STEPS   ------------------------------------------------------------------
+# discover: ...
+# spawn: ...
+# connect: ...
+# sensors: ...
+# test: ...
+   online: true
+   roles_mapping:
+       testnode: system-cpu, block-io, net-io
+       ceph-osd: system-cpu, block-io, net-io, ceph
+       compute:
+            system-cpu: *
+            block-io: sd*
+            net-io: *
+#----------------------------------   TEST PROFILES --------------------------------------------------------------------
+    spawn:
+        OS_1_to_1:
+            openstack:
+                count: "=1"
+                cfg_name: wally_1024
+                network_zone_name: net04
+                flt_ip_pool: net04_ext
+                skip_preparation: true
+    test:
+        ceph_vdb:
+            - io:
+                load: ceph
+                params:
+                    FILENAME: /dev/vdb
+                    TEST_FILE_SIZE: AUTO
+        cinder_iscsi_vdb:
+            - io:
+                load: cinder_iscsi
+                params:
+                    FILENAME: /dev/vdb
+                    TEST_FILE_SIZE: AUTO
+        nova_io:
+            - io:
+                load: hdd
+                params:
+                    FILENAME: /dev/vdb
+                    TEST_FILE_SIZE: AUTO
+    openstack_ceph: OS_1_to_1 + ceph_vdb
+    openstack_cinder: OS_1_to_1 + ceph_iscsi_vdb
+    openstack_nova: OS_1_to_1 + nova_io
diff --git a/configs-examples/v2_user.yaml b/configs-examples/v2_user.yaml
new file mode 100644
index 0000000..32e5a6d
--- /dev/null
+++ b/configs-examples/v2_user.yaml
@@ -0,0 +1,24 @@
+include: v2_default.yaml
+discover: openstack,fuel_openrc_only
+run_sensors: true
+results_storage: /var/wally_results
+    ssh_creds: USER:PASSWD
+    openstack_env: ENV_NAME
+kubernetes: null
+lxd: null
+docker_swarm: null
+    OPENRC: /home/koder/workspace/scale_openrc
+    VM: ["ubuntu@wally-phytographic-sharla"]
+    test: ["some_testname"]
+test_profile: openstack_ceph
diff --git a/pylint.rc b/pylint.rc
new file mode 100644
index 0000000..16028d5
--- /dev/null
+++ b/pylint.rc
@@ -0,0 +1,407 @@
+# Specify a configuration file.
+# Python code to execute, usually for sys.path manipulation such as
+# pygtk.require().
+# Add files or directories to the blacklist. They should be base names, not
+# paths.
+# Add files or directories matching the regex patterns to the blacklist. The
+# regex matches against base names, not paths.
+# Pickle collected data for later comparisons.
+# List of plugins (as comma separated values of python modules names) to load,
+# usually to register additional checkers.
+# Use multiple processes to speed up Pylint.
+# Allow loading of arbitrary C extensions. Extensions are imported into the
+# active Python interpreter and may run arbitrary code.
+# A comma-separated list of package or module names from where C extensions may
+# be loaded. Extensions are loading into the active Python interpreter and may
+# run arbitrary code
+# Allow optimization of some AST trees. This will activate a peephole AST
+# optimizer, which will apply various small optimizations. For instance, it can
+# be used to obtain the result of joining multiple strings with the addition
+# operator. Joining a lot of strings can lead to a maximum recursion error in
+# Pylint and this flag can prevent that. It has one side effect, the resulting
+# AST will be different than the one from reality. This option is deprecated
+# and it will be removed in Pylint 2.0.
+# Only show warnings with the listed confidence levels. Leave empty to show
+# Enable the message, report, category or checker with the given id(s). You can
+# either give multiple identifier separated by comma (,) or put this option
+# multiple time (only on the command line, not in the configuration file where
+# it should appear only once). See also the "--disable" option for examples.
+# Disable the message, report, category or checker with the given id(s). You
+# can either give multiple identifiers separated by comma (,) or put this
+# option multiple times (only on the command line, not in the configuration
+# file where it should appear only once).You can also use "--disable=all" to
+# disable everything first and then reenable specific checks. For example, if
+# you want to run only the similarities checker, you can use "--disable=all
+# --enable=similarities". If you want to run only the classes checker, but have
+# no Warning level messages displayed, use"--disable=all --enable=classes
+# --disable=W"
+# Set the output format. Available formats are text, parseable, colorized, msvs
+# (visual studio) and html. You can also give a reporter class, eg
+# mypackage.mymodule.MyReporterClass.
+# Put messages in a separate file for each module / package specified on the
+# command line instead of printing them on stdout. Reports (if any) will be
+# written in a file name "pylint_global.[txt|html]". This option is deprecated
+# and it will be removed in Pylint 2.0.
+# Tells whether to display a full report or only the messages
+# Python expression which should return a note less than 10 (10 is the highest
+# note). You have access to the variables errors warning, statement which
+# respectively contain the number of errors / warnings messages and the total
+# number of statements analyzed. This is used by the global evaluation report
+# (RP0004).
+evaluation=10.0 - ((float(5 * error + warning + refactor + convention) / statement) * 10)
+# Template used to display messages. This is a python new-style format string
+# used to format the message information. See doc for all details
+# Good variable names which should always be accepted, separated by a comma
+# Bad variable names which should always be refused, separated by a comma
+# Colon-delimited sets of names that determine each other's naming style when
+# the name regexes allow several styles.
+# Include a hint for the correct naming format with invalid-name
+# List of decorators that produce properties, such as abc.abstractproperty. Add
+# to this list to register other decorators that produce valid properties.
+# Regular expression matching correct class attribute names
+# Naming hint for class attribute names
+# Regular expression matching correct inline iteration names
+# Naming hint for inline iteration names
+# Regular expression matching correct attribute names
+# Naming hint for attribute names
+# Regular expression matching correct class names
+# Naming hint for class names
+# Regular expression matching correct constant names
+# Naming hint for constant names
+# Regular expression matching correct variable names
+# Naming hint for variable names
+# Regular expression matching correct argument names
+# Naming hint for argument names
+# Regular expression matching correct module names
+# Naming hint for module names
+# Regular expression matching correct method names
+# Naming hint for method names
+# Regular expression matching correct function names
+# Naming hint for function names
+# Regular expression which should only match function or class names that do
+# not require a docstring.
+# Minimum line length for functions/classes that require docstrings, shorter
+# ones are exempt.
+# Maximum number of nested blocks for function / method body
+# Maximum number of characters on a single line.
+# Regexp for a line that is allowed to be longer than the limit.
+ignore-long-lines=^\s*(# )?<?https?://\S+>?$
+# Allow the body of an if to be on the same line as the test if there is no
+# else.
+# List of optional constructs for which whitespace checking is disabled. `dict-
+# separator` is used to allow tabulation in dicts, etc.: {1  : 1,\n222: 2}.
+# `trailing-comma` allows a space between comma and closing bracket: (a, ).
+# `empty-line` allows space-only lines.
+# Maximum number of lines in a module
+# String used as indentation unit. This is usually "    " (4 spaces) or "\t" (1
+# tab).
+indent-string='    '
+# Number of spaces of indent required inside a hanging  or continued line.
+# Expected format of line ending, e.g. empty (any line ending), LF or CRLF.
+# Logging modules to check that the string format arguments are in logging
+# function parameter format
+# List of note tags to take in consideration, separated by a comma.
+# Minimum lines number of a similarity.
+# Ignore comments when computing similarities.
+# Ignore docstrings when computing similarities.
+# Ignore imports when computing similarities.
+# Spelling dictionary name. Available dictionaries: none. To make it working
+# install python-enchant package.
+# List of comma separated words that should not be checked.
+# A path to a file that contains private dictionary; one word per line.
+# Tells whether to store unknown words to indicated private dictionary in
+# --spelling-private-dict-file option instead of raising a message.
+# Tells whether missing members accessed in mixin class should be ignored. A
+# mixin class is detected if its name ends with "mixin" (case insensitive).
+# List of module names for which member attributes should not be checked
+# (useful for modules/projects where namespaces are manipulated during runtime
+# and thus existing member attributes cannot be deduced by static analysis. It
+# supports qualified module names, as well as Unix pattern matching.
+# List of class names for which member attributes should not be checked (useful
+# for classes with dynamically set attributes). This supports the use of
+# qualified names.
+# List of members which are set dynamically and missed by pylint inference
+# system, and so shouldn't trigger E1101 when accessed. Python regular
+# expressions are accepted.
+# List of decorators that produce context managers, such as
+# contextlib.contextmanager. Add to this list to register other decorators that
+# produce valid context managers.
+# Tells whether we should check for unused import in __init__ files.
+# A regular expression matching the name of dummy variables (i.e. expectedly
+# not used).
+# List of additional names supposed to be defined in builtins. Remember that
+# you should avoid to define new builtins when possible.
+# List of strings which can identify a callback function by name. A callback
+# name must start or end with one of those strings.
+# List of qualified module names which can have objects that can redefine
+# builtins.
+# List of method names used to declare (i.e. assign) instance attributes.
+# List of valid names for the first argument in a class method.
+# List of valid names for the first argument in a metaclass class method.
+# List of member names, which should be excluded from the protected access
+# warning.
+# Maximum number of arguments for function / method
+# Argument names that match this expression will be ignored. Default to name
+# with leading underscore
+# Maximum number of locals for function / method body
+# Maximum number of return / yield for function / method body
+# Maximum number of branch for function / method body
+# Maximum number of statements in function / method body
+# Maximum number of parents for a class (see R0901).
+# Maximum number of attributes for a class (see R0902).
+# Minimum number of public methods for a class (see R0903).
+# Maximum number of public methods for a class (see R0904).
+# Maximum number of boolean expressions in a if statement
+# Deprecated modules which should not be used, separated by a comma
+# Create a graph of every (i.e. internal and external) dependencies in the
+# given file (report RP0402 must not be disabled)
+# Create a graph of external dependencies in the given file (report RP0402 must
+# not be disabled)
+# Create a graph of internal dependencies in the given file (report RP0402 must
+# not be disabled)
+# Force import order to recognize a module as part of the standard
+# compatibility libraries.
+# Force import order to recognize a module as part of a third party library.
+# Analyse import fallback blocks. This can be used to support both Python 2 and
+# 3 compatible code, which means that the block might have code that exists
+# only in one or another interpreter, leading to false positives when analysed.
+# Exceptions that will emit a warning when being caught. Defaults to
+# "Exception"
diff --git a/stubs/yaml.pyi b/stubs/yaml.pyi
index d8d6516..4f231f5 100644
--- a/stubs/yaml.pyi
+++ b/stubs/yaml.pyi
@@ -4,6 +4,11 @@
 Basic = Union[List, Dict[str, Any]]
-def load(stream: IO, loader: Any) -> Any: ...
+class Loader: ...
+class Dumper: ...
 class CLoader: ...
+class CDumper: ...
+# def load(stream: IO, loader: Any) -> Any: ...
+def load(data: bytes, Loader: Any = None, encoding: str = 'utf8') -> Any: ...
+def dump(data: str, Dumper: Any = None, encoding: str = 'utf8') -> bytes: ...
diff --git a/tests/ b/tests/
similarity index 83%
rename from tests/
rename to tests/
index f9fa389..b496190 100644
--- a/tests/
+++ b/tests/
@@ -5,7 +5,7 @@
 from oktest import ok, main, test
-from import agent
+from import fio_task_parser
 code_test_defaults = """
@@ -94,21 +94,18 @@
-P = agent.parse_all_in_1
 class AgentTest(unittest.TestCase):
     def test_parse_value(self):
         x = "asdfasd adsd d"
-        ok(agent.parse_value(x)) == x
-        ok(agent.parse_value("10 2")) == "10 2"
-        ok(agent.parse_value(None)).is_(None)
-        ok(agent.parse_value("10")) == 10
-        ok(agent.parse_value("20")) == 20
-        ok(agent.parse_value("10.1") - 10.1) < 1E-7
-        ok(agent.parse_value("{% 10, 20 %}")) == [10, 20]
-        ok(agent.parse_value("{% 10,20 %}")) == [10, 20]
+        ok(fio_task_parser.parse_value(x)) == x
+        ok(fio_task_parser.parse_value("10 2")) == "10 2"
+        ok(fio_task_parser.parse_value("None")).is_(None)
+        ok(fio_task_parser.parse_value("10")) == 10
+        ok(fio_task_parser.parse_value("20")) == 20
+        ok(fio_task_parser.parse_value("10.1") - 10.1) < 1E-7
+        ok(fio_task_parser.parse_value("{% 10, 20 %}")) == [10, 20]
+        ok(fio_task_parser.parse_value("{% 10,20 %}")) == [10, 20]
     code_test_compile_simplest = defaults + """
@@ -117,7 +114,7 @@
     def test_compile_simplest(self):
-        sections = P(self.code_test_compile_simplest, {})
+        sections = fio_task_parser.parse_all_in_1(self.code_test_compile_simplest, {})
         sections = list(sections)
         ok(len(sections)) == 1
@@ -138,7 +135,7 @@
     def test_compile_defaults(self):
-        sections = P(self.code_test_params_in_defaults, {})
+        sections = fio_task_parser.parse_all_in_1(self.code_test_params_in_defaults, {})
         sections = list(sections)
         ok(len(sections)) == 1
@@ -151,7 +148,7 @@
     def test_defaults(self):
-        sections = P(code_test_defaults, {})
+        sections = fio_task_parser.parse_all_in_1(code_test_defaults, {})
         sections = list(sections)
         ok(len(sections)) == 2
@@ -177,11 +174,10 @@
     def test_external_params(self):
         with self.assertRaises(KeyError):
-            sections = P(self.code_test_ext_params, {})
+            sections = fio_task_parser.parse_all_in_1(self.code_test_ext_params, {})
-        sections = P(self.code_test_ext_params,
-                     {'RUNTIME': 20})
+        sections = fio_task_parser.parse_all_in_1(self.code_test_ext_params, {'RUNTIME': 20})
         sections = list(sections)
     code_test_cycle = defaults + """
@@ -192,8 +188,7 @@
     def test_cycle(self):
-        sections = P(self.code_test_cycle,
-                     {'RUNTIME': 20})
+        sections = fio_task_parser.parse_all_in_1(self.code_test_cycle, {'RUNTIME': 20})
         sections = list(sections)
         ok(len(sections)) == 2
         ok(sections[0].vals['ramp_time']) == 20
@@ -208,8 +203,7 @@
     def test_cycles(self):
-        sections = P(self.code_test_cycles,
-                     {'RUNTIME': 20})
+        sections = fio_task_parser.parse_all_in_1(self.code_test_cycles, {'RUNTIME': 20})
         sections = list(sections)
         ok(len(sections)) == 4
@@ -224,13 +218,12 @@
     def test_time_estimate(self):
-        sections = P(self.code_test_cycles,
-                     {'RUNTIME': 20})
+        sections = fio_task_parser.parse_all_in_1(self.code_test_cycles, {'RUNTIME': 20})
         sections = list(sections)
-        etime = agent.calculate_execution_time(sections)
+        etime = fio_task_parser.calculate_execution_time(sections)
         ok(etime) == 20 * 4 + 20 * 2 + 40 * 2
-        ok(agent.sec_to_str(etime)) == "0:03:20"
+        ok(fio_task_parser.sec_to_str(etime)) == "0:03:20"
     code_test_cycles2 = defaults + """
 [sec1 * 7]
@@ -241,14 +234,13 @@
     def test_time_estimate_large(self):
-        sections = P(self.code_test_cycles2,
-                     {'RUNTIME': 30})
+        sections = fio_task_parser.parse_all_in_1(self.code_test_cycles2, {'RUNTIME': 30})
         sections = list(sections)
         ok(sections[0].name) == 'sec1'
         ok(len(sections)) == 7 * 4
-        etime = agent.calculate_execution_time(sections)
+        etime = fio_task_parser.calculate_execution_time(sections)
         # ramptime optimization
         expected_time = (20 + 30 + 30 * 6) * 2
         expected_time += (40 + 30 + 30 * 6) * 2
@@ -268,14 +260,14 @@
     def test_time_estimate_large2(self):
-        sections = P(self.code_test_cycles3, {'RUNTIME': 30})
+        sections = fio_task_parser.parse_all_in_1(self.code_test_cycles3, {'RUNTIME': 30})
         sections = list(sections)
         ok(sections[0].name) == 'sec1'
         ok(sections[1].name) == 'sec1'
         ok(len(sections)) == 7 * 4 * 2
-        etime = agent.calculate_execution_time(sections)
+        etime = fio_task_parser.calculate_execution_time(sections)
         # ramptime optimization
         expected_time = (20 + 30 + 30 * 6) * 2
         expected_time += (40 + 30 + 30 * 6) * 2
@@ -290,7 +282,7 @@
     def test_repeat(self):
-        sections = P(self.code_test_repeats, {})
+        sections = fio_task_parser.parse_all_in_1(self.code_test_repeats, {})
         sections = list(sections)
         ok(len(sections)) == 2 + 3
         ok(sections[0].name) == 'sec1'
@@ -301,7 +293,7 @@
     def test_real_tasks(self):
-        tasks_dir = os.path.dirname(agent.__file__)
+        tasks_dir = os.path.dirname(fio_task_parser.__file__)
         fname = os.path.join(tasks_dir, 'io_scenario_ceph.cfg')
         fc = open(fname).read()
@@ -310,7 +302,7 @@
         ok(len(sections)) == 7 * 9 * 4 + 7
-        etime = agent.calculate_execution_time(sections)
+        etime = fio_task_parser.calculate_execution_time(sections)
         # ramptime optimization
         expected_time = (60 * 7 + 30) * 9 * 4 + (60 * 7 + 30)
         ok(etime) == expected_time
diff --git a/tests/ b/tests/
new file mode 100644
index 0000000..efc5f09
--- /dev/null
+++ b/tests/
@@ -0,0 +1,33 @@
+import getpass
+from oktest import ok
+from wally import ssh_utils, ssh
+creds = "root@osd-0"
+def test_ssh_url_parser():
+    curr_user = getpass.getuser()
+    creds = {
+        "test": ssh_utils.ConnCreds("test", curr_user, port=23),
+        "test:13": ssh_utils.ConnCreds("test", curr_user, port=13),
+        "test::xxx.key": ssh_utils.ConnCreds("test", curr_user, port=23, key_file="xxx.key"),
+        "test:123:xxx.key": ssh_utils.ConnCreds("test", curr_user, port=123, key_file="xxx.key"),
+        "user@test": ssh_utils.ConnCreds("test", "user", port=23),
+        "user@test:13": ssh_utils.ConnCreds("test", "user", port=13),
+        "user@test::xxx.key": ssh_utils.ConnCreds("test", "user", port=23, key_file="xxx.key"),
+        "user@test:123:xxx.key": ssh_utils.ConnCreds("test", "user", port=123, key_file="xxx.key"),
+        "user:passwd:@test": ssh_utils.ConnCreds("test", curr_user, port=23, passwd="passwd:"),
+        "user:passwd:@test:123": ssh_utils.ConnCreds("test", curr_user, port=123, passwd="passwd:"),
+    }
+    for uri, expected in creds.items():
+        parsed = ssh_utils.parse_ssh_uri(uri)
+        ok(parsed.user) == expected.user
+        ok(parsed.addr.port) == expected.addr.port
+        ok( ==
+        ok(parsed.key_file) == expected.key_file
+        ok(parsed.passwd) == expected.passwd
diff --git a/tests/ b/tests/
index 46f38e6..8fd8f4d 100644
--- a/tests/
+++ b/tests/
@@ -1,9 +1,11 @@
+import array
 import shutil
 import tempfile
 import contextlib
-from oktest import ok, main, test
+import pytest
+from oktest import ok
 from import make_storage
@@ -35,19 +37,162 @@
             for path, val in values.items():
                 storage[path] = val
+        with make_storage(root, existing=True) as storage:
+            for path, val in values.items():
+                ok(storage[path]) == val
+                ok(storage.get(path)) == val
+def test_path_list():
+    values = {
+        "int": 1,
+        "str/1": "test",
+        "bytes/2": b"test",
+        "none/s/1": None,
+        "bool/xx/1/2/1": None,
+        "float/s/1": 1.234,
+        "list": [1, 2, "3"],
+        "dict": {1: 3, "2": "4", "1.2": 1.3}
+    }
+    with in_temp_dir() as root:
+        with make_storage(root, existing=False) as storage:
+            for path, val in values.items():
+                storage[path.split('/')] = val
         with make_storage(root, existing=True) as storage:
             for path, val in values.items():
-                ok(storage[path])  == val
+                ok(storage[path.split('/')]) == val
+                ok(storage.get(path.split('/'))) == val
+    with in_temp_dir() as root:
+        with make_storage(root, existing=False) as storage:
+            for path, val in values.items():
+                storage[path] = val
+        with make_storage(root, existing=True) as storage:
+            for path, val in values.items():
+                ok(storage[path.split('/')]) == val
+                ok(storage.get(path.split('/'))) == val
-def test_large_arrays():
-    pass
+def test_list():
+    with in_temp_dir() as root:
+        with make_storage(root, existing=False) as storage:
+            storage["x/some_path1"] = "1"
+            storage["x/some_path2"] = [1, 2, 3]
+            storage["x/some_path3"] = [1, 2, 3, 4]
+            storage["x/y/some_path11"] = "1"
+            storage["x/y/some_path22"] = [1, 2, 3]
+        with make_storage(root, existing=True) as storage:
+            assert 'x' in storage
+            assert 'x/y' in storage
+            assert {(False, 'x')} == set(storage.list())
+            assert {(True, 'some_path1'),
+                    (True, 'some_path2'),
+                    (True, 'some_path3'),
+                    (False, "y")} == set(storage.list("x"))
+            assert {(True, 'some_path11'), (True, 'some_path22')} == set(storage.list("x/y"))
-def test_array_append():
-    pass
+def test_overwrite():
+    with in_temp_dir() as root:
+        with make_storage(root, existing=False) as storage:
+            storage["some_path"] = "1"
+            storage["some_path"] = [1, 2, 3]
+        with make_storage(root, existing=True) as storage:
+            assert storage["some_path"] == [1, 2, 3]
-def test_performance():
-    pass
+def test_multy_level():
+    with in_temp_dir() as root:
+        values = {
+            "dict1": {1: {3: 4, 6: [12, {123, 3}, {4: 3}]}, "2": "4", "1.2": 1.3}
+        }
+        with make_storage(root, existing=False) as storage:
+            for path, val in values.items():
+                storage[path] = val
+        with make_storage(root, existing=True) as storage:
+            for path, val in values.items():
+                ok(storage[path]) == val
+def test_arrays():
+    with in_temp_dir() as root:
+        val_l = list(range(10000)) * 10
+        val_i = array.array("i", val_l)
+        val_f = array.array("f", map(float, val_l))
+        val_2f = val_f + val_f
+        with make_storage(root, existing=False) as storage:
+            storage.set_array(val_i, "array_i")
+            storage.set_array(val_f, "array_f")
+            storage.set_array(val_f, "array_x2")
+            storage.append(val_f, "array_x2")
+        with make_storage(root, existing=True) as storage:
+            ok(val_i) == storage.get_array("i", "array_i")
+            ok(val_f) == storage.get_array("f", "array_f")
+            ok(val_2f) == storage.get_array("f", "array_x2")
+class LoadMe:
+    def __init__(self, **vals):
+        self.__dict__.update(vals)
+def test_load_user_obj():
+    obj = LoadMe(x=1, y=12, z=[1,2,3], t="asdad", gg={"a": 1, "g": [["x"]]})
+    with in_temp_dir() as root:
+        with make_storage(root, existing=False) as storage:
+            storage["obj"] = obj
+        with make_storage(root, existing=True) as storage:
+            obj2 = storage.load(LoadMe, "obj")
+            assert isinstance(obj2, LoadMe)
+            ok(obj2.__dict__) == obj.__dict__
+def test_path_not_exists():
+    with in_temp_dir() as root:
+        pass
+    with pytest.raises(IOError):
+        with make_storage(root, existing=True) as storage:
+            pass
+    with in_temp_dir() as root:
+        pass
+    with make_storage(root, existing=False) as storage:
+        with pytest.raises(IOError):
+            storage["x"]
+def test_incorrect_user_object():
+    obj = LoadMe(x=1, y=LoadMe(t=12))
+    with in_temp_dir() as root:
+        with make_storage(root, existing=False) as storage:
+            with pytest.raises(ValueError):
+                storage["obj"] = obj
+def test_substorage():
+    with in_temp_dir() as root:
+        with make_storage(root, existing=False) as storage:
+            storage["x/y"] = "data"
+            storage.sub_storage("t")["r"] = "sub_data"
+        with make_storage(root, existing=True) as storage:
+            ok(storage["t/r"]) == "sub_data"
+            ok(storage.sub_storage("x")["y"]) == "data"
diff --git a/ b/
index 95ffc92..edce540 100644
--- a/
+++ b/
@@ -10,16 +10,23 @@
     * Simplify settings
     * Unit-tests
     * 'perf' sensor
-    * ftrace,, etc
-    * Config validation
+    * ftrace, [bcc](, etc
+    * Config revised:
+        * Full config is a set of independent sections, each related to one plugin or 'step'
+        * Simple user config get compiled into "full" config with variable substitution
+        * Result config then validated
+        * Each plugin defines config sections tructure and validation
     * Add sync 4k write with small set of thcount
+    * White-box event logs for UT
+    * Result-to-yaml for UT
 * Infra:
     * Add script to download fio from git and build it
     * Docker/lxd public container as default distribution way
+    * Update to provide CLI entry points
 * Statistical result check and report:
-    * Comprehensive report with results histograms and other
+    * Comprehensive report with results histograms and other, [Q-Q plot](
     * Check results distribution
     * Warn for non-normal results
     * Check that distribution of different parts is close. Average
@@ -31,9 +38,9 @@
       depending on selected visualization type
     * Offload simple report table to cvs/yaml/json/test/ascii_table
     * fio load reporters (visualizers), ceph report tool
+        [ceph-viz-histo](
     * evaluate bokeh for visualization
-    * flamegraph for 'perf' output -
+    * [flamegraph]( for 'perf' output
     * detect internal pattern:
         - FFT
@@ -50,7 +57,23 @@
-* Intelectual postprocessing:
+* Report structure
+    * Overall report
+    * Extended engineering report
+    * Cluster information
+    * Loads. For each load:
+        - IOPS distribution, stat analisys
+        - LAT heatmap/histo, stat analisys
+        - Bottleneck analisys
+    * Changes for load groups - show how IOPS/LAT histo is chages with thread count
+    * Report help page, link for explanations
+* Report pictures:
+    * checkboxes for show/hide part of image
+    * pop-up help for part of picture
+    * pop-up text values for bars/lines
+* Intellectual postprocessing:
     * Difference calculation
     * Resource usage calculator/visualizer, bottleneck hunter
     * correct comparison between different systems
@@ -62,3 +85,4 @@
     * Add aio rpc client
     * Add integration tests with nbd
     * fix existing folder detection
+    * Simple REST API for external in-browser UI
\ No newline at end of file
diff --git a/wally/ b/wally/
new file mode 100644
index 0000000..e23343e
--- /dev/null
+++ b/wally/
@@ -0,0 +1,112 @@
+""" Collect data about ceph nodes"""
+import json
+import logging
+from typing import Set, Dict, cast
+from .node_interfaces import NodeInfo, IRPCNode
+from .ssh_utils import ConnCreds
+from .common_types import IP
+from .stage import Stage, StepOrder
+from .test_run_class import TestRun
+from .ssh_utils import parse_ssh_uri
+from .node import connect, setup_rpc
+logger = logging.getLogger("")
+def get_osds_ips(node: IRPCNode, conf: str, key: str) -> Set[IP]:
+    """Get set of osd's ip"""
+    data ="ceph -c {} -k {} --format json osd dump".format(conf, key))
+    jdata = json.loads(data)
+    ips = set()  # type: Set[IP]
+    first_error = True
+    for osd_data in jdata["osds"]:
+        if "public_addr" not in osd_data:
+            if first_error:
+                osd_id = osd_data.get("osd", "<OSD_ID_MISSED>")
+                logger.warning("No 'public_addr' field in 'ceph osd dump' output for osd %s" +
+                               "(all subsequent errors omitted)", osd_id)
+                first_error = False
+        else:
+            ip_port = osd_data["public_addr"]
+            if '/' in ip_port:
+                ip_port = ip_port.split("/", 1)[0]
+            ips.add(IP(ip_port.split(":")[0]))
+    return ips
+def get_mons_ips(node: IRPCNode, conf: str, key: str) -> Set[IP]:
+    """Return mon ip set"""
+    data ="ceph -c {} -k {} --format json mon_status".format(conf, key))
+    jdata = json.loads(data)
+    ips = set()  # type: Set[IP]
+    first_error = True
+    for mon_data in jdata["monmap"]["mons"]:
+        if "addr" not in mon_data:
+            if first_error:
+                mon_name = mon_data.get("name", "<MON_NAME_MISSED>")
+                logger.warning("No 'addr' field in 'ceph mon_status' output for mon %s" +
+                               "(all subsequent errors omitted)", mon_name)
+                first_error = False
+        else:
+            ip_port = mon_data["addr"]
+            if '/' in ip_port:
+                ip_port = ip_port.split("/", 1)[0]
+            ips.add(IP(ip_port.split(":")[0]))
+    return ips
+class DiscoverCephStage(Stage):
+    config_block = 'ceph'
+    priority = StepOrder.DISCOVER
+    def run(self, ctx: TestRun) -> None:
+        """Return list of ceph's nodes NodeInfo"""
+        if 'ceph_nodes' in
+            ctx.nodes_info.extend(, 'ceph_nodes'))
+        else:
+            ceph = ctx.config.ceph
+            root_node_uri = cast(str, ceph.root_node)
+            cluster = ceph.get("cluster", "ceph")
+            conf = ceph.get("conf")
+            key = ceph.get("key")
+            info = NodeInfo(parse_ssh_uri(root_node_uri), set())
+            ceph_nodes = {}  # type: Dict[IP, NodeInfo]
+            if conf is None:
+                conf = "/etc/ceph/{}.conf".format(cluster)
+            if key is None:
+                key = "/etc/ceph/{}.client.admin.keyring".format(cluster)
+            with setup_rpc(connect(info), ctx.rpc_code) as node:
+                # new_nodes.extend(ceph.discover_ceph_nodes(ceph_root_conn, cluster=cluster, conf=conf, key=key))
+                ssh_key = node.get_file_content("~/.ssh/id_rsa")
+                try:
+                    for ip in get_osds_ips(node, conf, key):
+                        if ip in ceph_nodes:
+                            ceph_nodes[ip].roles.add("ceph-osd")
+                        else:
+                            ceph_nodes[ip] = NodeInfo(ConnCreds(cast(str, ip), user="root", key=ssh_key), {"ceph-osd"})
+                except Exception as exc:
+                    logger.error("OSD discovery failed: %s", exc)
+                try:
+                    for ip in get_mons_ips(node, conf, key):
+                        if ip in ceph_nodes:
+                            ceph_nodes[ip].roles.add("ceph-mon")
+                        else:
+                            ceph_nodes[ip] = NodeInfo(ConnCreds(cast(str, ip), user="root", key=ssh_key), {"ceph-mon"})
+                except Exception as exc:
+                    logger.error("MON discovery failed: %s", exc)
+            ctx.nodes_info.extend(ceph_nodes.values())
+  ['ceph-nodes'] = list(ceph_nodes.values())
diff --git a/wally/ b/wally/
index cac4acf..e96a03e 100644
--- a/wally/
+++ b/wally/
@@ -10,13 +10,14 @@
     storage_url = None  # type: str
     comment = None  # type: str
     keep_vm = None  # type: bool
-    no_tests = None  # type: bool
     dont_discover_nodes = None  # type: bool
     build_id = None  # type: str
     build_description = None  # type: str
     build_type = None  # type: str
     default_test_local_folder = None  # type: str
     settings_dir = None  # type: str
+    connect_timeout = 30  # type: int
+    no_tests = False  # type: bool
     def __init__(self, dct: ConfigBlock) -> None:
         self.__dict__['_dct'] = dct
diff --git a/wally/discover/ b/wally/discover/
deleted file mode 100644
index 4a72bfb..0000000
--- a/wally/discover/
+++ /dev/null
@@ -1,91 +0,0 @@
-""" Collect data about ceph nodes"""
-import json
-import logging
-from typing import List, Set, Dict
-from ..node_interfaces import NodeInfo, IRPCNode
-from ..ssh_utils import ConnCreds
-from ..common_types import IP
-logger = logging.getLogger("")
-def discover_ceph_nodes(node: IRPCNode,
-                        cluster: str = "ceph",
-                        conf: str = None,
-                        key: str = None) -> List[NodeInfo]:
-    """Return list of ceph's nodes NodeInfo"""
-    if conf is None:
-        conf = "/etc/ceph/{}.conf".format(cluster)
-    if key is None:
-        key = "/etc/ceph/{}.client.admin.keyring".format(cluster)
-    try:
-        osd_ips = get_osds_ips(node, conf, key)
-    except Exception as exc:
-        logger.error("OSD discovery failed: %s", exc)
-        osd_ips = set()
-    try:
-        mon_ips = get_mons_ips(node, conf, key)
-    except Exception as exc:
-        logger.error("MON discovery failed: %s", exc)
-        mon_ips = set()
-    ips = {}  # type: Dict[str, List[str]]
-    for ip in osd_ips:
-        ips.setdefault(ip, []).append("ceph-osd")
-    for ip in mon_ips:
-        ips.setdefault(ip, []).append("ceph-mon")
-    ssh_key = node.get_file_content("~/.ssh/id_rsa")
-    return [NodeInfo(ConnCreds(host=ip, user="root", key=ssh_key), set(roles)) for ip, roles in ips.items()]
-def get_osds_ips(node: IRPCNode, conf: str, key: str) -> Set[IP]:
-    """Get set of osd's ip"""
-    data ="ceph -c {} -k {} --format json osd dump".format(conf, key))
-    jdata = json.loads(data)
-    ips = set()  # type: Set[IP]
-    first_error = True
-    for osd_data in jdata["osds"]:
-        if "public_addr" not in osd_data:
-            if first_error:
-                osd_id = osd_data.get("osd", "<OSD_ID_MISSED>")
-                logger.warning("No 'public_addr' field in 'ceph osd dump' output for osd %s" +
-                               "(all subsequent errors omitted)", osd_id)
-                first_error = False
-        else:
-            ip_port = osd_data["public_addr"]
-            if '/' in ip_port:
-                ip_port = ip_port.split("/", 1)[0]
-            ips.add(IP(ip_port.split(":")[0]))
-    return ips
-def get_mons_ips(node: IRPCNode, conf: str, key: str) -> Set[IP]:
-    """Return mon ip set"""
-    data ="ceph -c {} -k {} --format json mon_status".format(conf, key))
-    jdata = json.loads(data)
-    ips = set()  # type: Set[IP]
-    first_error = True
-    for mon_data in jdata["monmap"]["mons"]:
-        if "addr" not in mon_data:
-            if first_error:
-                mon_name = mon_data.get("name", "<MON_NAME_MISSED>")
-                logger.warning("No 'addr' field in 'ceph mon_status' output for mon %s" +
-                               "(all subsequent errors omitted)", mon_name)
-                first_error = False
-        else:
-            ip_port = mon_data["addr"]
-            if '/' in ip_port:
-                ip_port = ip_port.split("/", 1)[0]
-            ips.add(IP(ip_port.split(":")[0]))
-    return ips
diff --git a/wally/discover/ b/wally/discover/
deleted file mode 100644
index 35ddc62..0000000
--- a/wally/discover/
+++ /dev/null
@@ -1,108 +0,0 @@
-import os.path
-import logging
-from typing import Dict, NamedTuple, List, Optional, cast
-from paramiko.ssh_exception import AuthenticationException
-from . import ceph
-from . import fuel
-from . import openstack
-from ..utils import parse_creds, StopTestError
-from ..config import ConfigBlock
-from ..start_vms import OSCreds
-from ..node_interfaces import NodeInfo
-from ..node import connect, setup_rpc
-from ..ssh_utils import parse_ssh_uri
-from ..test_run_class import TestRun
-logger = logging.getLogger("")
-openrc_templ = """#!/bin/sh
-export LC_ALL=C
-export OS_NO_CACHE='true'
-export OS_TENANT_NAME='{tenant}'
-export OS_USERNAME='{name}'
-export OS_PASSWORD='{passwd}'
-export OS_AUTH_URL='{auth_url}'
-export OS_INSECURE={insecure}
-export OS_AUTH_STRATEGY='keystone'
-export OS_REGION_NAME='RegionOne'
-export NOVA_ENDPOINT_TYPE='publicURL'
-DiscoveryResult = NamedTuple("DiscoveryResult", [("os_creds", Optional[OSCreds]), ("nodes", List[NodeInfo])])
-def discover(ctx: TestRun,
-             discover_list: List[str],
-             clusters_info: ConfigBlock,
-             discover_nodes: bool = True) -> DiscoveryResult:
-    """Discover nodes in clusters"""
-    new_nodes = []  # type: List[NodeInfo]
-    os_creds = None  # type: Optional[OSCreds]
-    for cluster in discover_list:
-        if cluster == "openstack":
-            if not discover_nodes:
-                logger.warning("Skip openstack cluster discovery")
-                continue
-            cluster_info = clusters_info["openstack"]  # type: ConfigBlock
-            new_nodes.extend(openstack.discover_openstack_nodes(ctx.os_connection, cluster_info))
-        elif cluster == "fuel" or cluster == "fuel_openrc_only":
-            if cluster == "fuel_openrc_only":
-                discover_nodes = False
-            fuel_node_info = NodeInfo(parse_ssh_uri(clusters_info['fuel']['ssh_creds']), {'fuel_master'})
-            try:
-                fuel_rpc_conn = setup_rpc(connect(fuel_node_info), ctx.rpc_code)
-            except AuthenticationException:
-                raise StopTestError("Wrong fuel credentials")
-            except Exception:
-                logger.exception("While connection to FUEL")
-                raise StopTestError("Failed to connect to FUEL")
-            # TODO(koder): keep FUEL rpc in context? Maybe open this connection on upper stack level?
-            with fuel_rpc_conn:
-                nodes, fuel_info = fuel.discover_fuel_nodes(
-                    fuel_rpc_conn, ctx.fuel_conn, clusters_info['fuel'], discover_nodes)
-                new_nodes.extend(nodes)
-                if fuel_info.openrc:
-                    auth_url = cast(str, fuel_info.openrc['os_auth_url'])
-                    if fuel_info.version >= [8, 0] and auth_url.startswith("https://"):
-                            logger.warning("Fixing FUEL 8.0 AUTH url - replace https://->http://")
-                            auth_url = auth_url.replace("https", "http", 1)
-                    os_creds = OSCreds(name=cast(str, fuel_info.openrc['username']),
-                                       passwd=cast(str, fuel_info.openrc['password']),
-                                       tenant=cast(str, fuel_info.openrc['tenant_name']),
-                                       auth_url=cast(str, auth_url),
-                                       insecure=cast(bool, fuel_info.openrc['insecure']))
-        elif cluster == "ceph":
-            if discover_nodes:
-                cluster_info = clusters_info["ceph"]
-                root_node_uri = cast(str, cluster_info["root_node"])
-                cluster = clusters_info["ceph"].get("cluster", "ceph")
-                conf = clusters_info["ceph"].get("conf")
-                key = clusters_info["ceph"].get("key")
-                info = NodeInfo(parse_ssh_uri(root_node_uri), set())
-                with setup_rpc(connect(info), ctx.rpc_code) as ceph_root_conn:
-                    new_nodes.extend(ceph.discover_ceph_nodes(ceph_root_conn, cluster=cluster, conf=conf, key=key))
-            else:
-                logger.warning("Skip ceph cluster discovery")
-        else:
-            msg_templ = "Unknown cluster type in 'discover' parameter: {!r}"
-            raise ValueError(msg_templ.format(cluster))
-    return DiscoveryResult(os_creds, new_nodes)
diff --git a/wally/discover/ b/wally/discover/
deleted file mode 100644
index 7b4f90f..0000000
--- a/wally/discover/
+++ /dev/null
@@ -1,58 +0,0 @@
-import logging
-import socket
-from typing import Dict, Any, Tuple, List, NamedTuple, Union, cast
-from urllib.parse import urlparse
-from .. import fuel_rest_api
-from ..node_interfaces import NodeInfo, IRPCNode
-from ..ssh_utils import ConnCreds
-from ..utils import check_input_param
-logger = logging.getLogger("")
-FuelNodeInfo = NamedTuple("FuelNodeInfo",
-                          [("version", List[int]),
-                           ("fuel_ext_iface", str),
-                           ("openrc", Dict[str, Union[str, bool]])])
-def discover_fuel_nodes(fuel_master_node: IRPCNode,
-                        fuel_conn: fuel_rest_api.Connection,
-                        fuel_data: Dict[str, Any],
-                        discover_nodes: bool = True) -> Tuple[List[NodeInfo], FuelNodeInfo]:
-    """Discover nodes in fuel cluster, get openrc for selected cluster"""
-    msg = "openstack_env should be provided in fuel config"
-    check_input_param('openstack_env' in fuel_data, msg)
-    # get cluster information from REST API
-    cluster_id = fuel_rest_api.get_cluster_id(fuel_conn, fuel_data['openstack_env'])
-    cluster = fuel_rest_api.reflect_cluster(fuel_conn, cluster_id)
-    version = fuel_rest_api.FuelInfo(fuel_conn).get_version()
-    if not discover_nodes:
-        logger.warning("Skip fuel cluster discovery")
-        return [], FuelNodeInfo(version, None, cluster.get_openrc())  # type: ignore
-"Found fuel {0}".format(".".join(map(str, version))))
-    # get FUEL master key to connect to cluster nodes via ssh
-    logger.debug("Downloading fuel master key")
-    fuel_key = fuel_master_node.get_file_content('/root/.ssh/id_rsa')
-    network = 'fuelweb_admin' if version >= [6, 0] else 'admin'
-    fuel_ip = socket.gethostbyname(
-    fuel_ext_iface = fuel_master_node.get_interface(fuel_ip)
-    nodes = []
-    for fuel_node in list(cluster.get_nodes()):
-        ip = str(fuel_node.get_ip(network))
-        creds = ConnCreds(ip, "root", key=fuel_key)
-        nodes.append(NodeInfo(creds, roles=set(fuel_node.get_roles())))
-    logger.debug("Found {} fuel nodes for env {}".format(len(nodes), fuel_data['openstack_env']))
-    return nodes, FuelNodeInfo(version, fuel_ext_iface,
-                               cast(Dict[str, Union[str, bool]], cluster.get_openrc()))
diff --git a/wally/discover/ b/wally/discover/
deleted file mode 100644
index f590359..0000000
--- a/wally/discover/
+++ /dev/null
@@ -1,66 +0,0 @@
-import socket
-import logging
-from typing import Dict, Any, List, Optional, cast
-from ..node_interfaces import NodeInfo
-from ..config import ConfigBlock
-from ..ssh_utils import ConnCreds
-from ..start_vms import OSConnection, NovaClient
-logger = logging.getLogger("")
-def get_floating_ip(vm: Any) -> str:
-    """Get VM floating IP address"""
-    for net_name, ifaces in vm.addresses.items():
-        for iface in ifaces:
-            if iface.get('OS-EXT-IPS:type') == "floating":
-                return iface['addr']
-    raise ValueError("VM {} has no floating ip".format(vm))
-def discover_vms(client: NovaClient, search_data: str) -> List[NodeInfo]:
-    """Discover virtual machines"""
-    name, user, key_file = search_data.split(",")
-    servers = client.servers.list(search_opts={"name": name})
-    logger.debug("Found %s openstack vms" % len(servers))
-    nodes = []  # type: List[NodeInfo]
-    for server in servers:
-        ip = get_floating_ip(server)
-        creds = ConnCreds(host=ip, user=user, key_file=key_file)
-        nodes.append(NodeInfo(creds, roles={"test_vm"}))
-    return nodes
-def discover_openstack_nodes(conn: OSConnection, conf: ConfigBlock) -> List[NodeInfo]:
-    """Discover openstack services for given cluster"""
-    os_nodes_auth = conf['auth']  # type: str
-    if os_nodes_auth.count(":") == 2:
-        user, password, key_file = os_nodes_auth.split(":")  # type: str, Optional[str], Optional[str]
-        if not password:
-            password = None
-    else:
-        user, password = os_nodes_auth.split(":")
-        key_file = None
-    services =  # type: List[Any]
-    host_services_mapping = {}  # type: Dict[str, List[str]]
-    for service in services:
-        ip = cast(str, socket.gethostbyname(
-        host_services_mapping.get(ip, []).append(service.binary)
-    logger.debug("Found %s openstack service nodes" % len(host_services_mapping))
-    nodes = []  # type: List[NodeInfo]
-    for host, services in host_services_mapping.items():
-        creds = ConnCreds(host=host, user=user, passwd=password, key_file=key_file)
-        nodes.append(NodeInfo(creds, set(services)))
-    return nodes
diff --git a/wally/ b/wally/
new file mode 100644
index 0000000..040dcf4
--- /dev/null
+++ b/wally/
@@ -0,0 +1,105 @@
+import logging
+from typing import Dict, List, NamedTuple, Union, cast
+from paramiko.ssh_exception import AuthenticationException
+from .fuel_rest_api import get_cluster_id, reflect_cluster, FuelInfo, KeystoneAuth
+from .node_interfaces import NodeInfo
+from .ssh_utils import ConnCreds, parse_ssh_uri
+from .utils import check_input_param, StopTestError, parse_creds
+from .stage import Stage, StepOrder
+from .test_run_class import TestRun
+from .node import connect, setup_rpc
+from .config import ConfigBlock
+from .openstack_api import OSCreds
+logger = logging.getLogger("")
+FuelNodeInfo = NamedTuple("FuelNodeInfo",
+                          [("version", List[int]),
+                           ("fuel_ext_iface", str),
+                           ("openrc", Dict[str, Union[str, bool]])])
+class DiscoverFuelStage(Stage):
+    """"Fuel nodes discovery, also can get openstack openrc"""
+    priority = StepOrder.DISCOVER
+    config_block = 'fuel'
+    @classmethod
+    def validate(cls, cfg: ConfigBlock) -> None:
+        # msg = "openstack_env should be provided in fuel config"
+        # check_input_param('openstack_env' in fuel_data, msg)
+        # fuel.openstack_env
+        pass
+    def run(self, ctx: TestRun) -> None:
+        if 'fuel' in
+            ctx.nodes_info.extend(, 'fuel/nodes'))
+            ctx.fuel_openstack_creds =['fuel/os_creds']  # type: ignore
+            ctx.fuel_version =['fuel/version']  # type: ignore
+        else:
+            fuel = ctx.config.fuel
+            discover_nodes = ( != "fuel_openrc_only")
+            fuel_node_info = NodeInfo(parse_ssh_uri(fuel.ssh_creds), {'fuel_master'})
+            fuel_nodes = [fuel_node_info]
+            creds = dict(zip(("user", "passwd", "tenant"), parse_creds(fuel.creds)))
+            fuel_conn = KeystoneAuth(fuel.url, creds)
+            # get cluster information from REST API
+            cluster_id = get_cluster_id(fuel_conn, fuel.openstack_env)
+            cluster = reflect_cluster(fuel_conn, cluster_id)
+            ctx.fuel_version = FuelInfo(fuel_conn).get_version()
+  "Found fuel {0}".format(".".join(map(str, ctx.fuel_version))))
+            openrc = cluster.get_openrc()
+            if openrc:
+                auth_url = cast(str, openrc['os_auth_url'])
+                if ctx.fuel_version >= [8, 0] and auth_url.startswith("https://"):
+                    logger.warning("Fixing FUEL 8.0 AUTH url - replace https://->http://")
+                    auth_url = auth_url.replace("https", "http", 1)
+                os_creds = OSCreds(name=cast(str, openrc['username']),
+                                   passwd=cast(str, openrc['password']),
+                                   tenant=cast(str, openrc['tenant_name']),
+                                   auth_url=cast(str, auth_url),
+                                   insecure=cast(bool, openrc['insecure']))
+                ctx.fuel_openstack_creds = os_creds
+            else:
+                ctx.fuel_openstack_creds = None
+            if discover_nodes:
+                try:
+                    fuel_rpc = setup_rpc(connect(fuel_node_info), ctx.rpc_code)
+                except AuthenticationException:
+                    raise StopTestError("Wrong fuel credentials")
+                except Exception:
+                    logger.exception("While connection to FUEL")
+                    raise StopTestError("Failed to connect to FUEL")
+                logger.debug("Downloading FUEL node ssh master key")
+                fuel_key = fuel_rpc.get_file_content('/root/.ssh/id_rsa')
+                network = 'fuelweb_admin' if ctx.fuel_version >= [6, 0] else 'admin'
+                for fuel_node in list(cluster.get_nodes()):
+                    ip = str(fuel_node.get_ip(network))
+                    fuel_nodes.append(NodeInfo(ConnCreds(ip, "root", key=fuel_key),
+                                               roles=set(fuel_node.get_roles())))
+      ['fuel_nodes'] = fuel_nodes
+                ctx.nodes_info.extend(fuel_nodes)
+                ctx.nodes_info.append(fuel_node_info)
+                logger.debug("Found {} FUEL nodes for env {}".format(len(fuel_nodes) - 1, fuel.openstack_env))
+            else:
+                logger.debug("Skip FUEL nodes  discovery, as 'fuel_openrc_only' is set to option")
+  ["fuel/nodes"] = fuel_nodes
+  ["fuel/os_creds"] = ctx.fuel_openstack_creds
+  ["fuel/version"] = ctx.fuel_version
diff --git a/wally/ b/wally/
index 3e1fcb3..14da140 100644
--- a/wally/
+++ b/wally/
@@ -5,7 +5,8 @@
 import logging
 import argparse
 import functools
-from typing import List, Tuple, Any, Callable, IO, cast, Optional
+import contextlib
+from typing import List, Tuple, Any, Callable, IO, cast, Optional, Iterator
 from yaml import load as _yaml_load
@@ -32,13 +33,33 @@
 from .storage import make_storage, Storage
 from .config import Config
 from .logger import setup_loggers
-from .stage import log_stage, StageType
+from .stage import Stage
 from .test_run_class import TestRun
+# stages
+from .ceph import DiscoverCephStage
+from .openstack import DiscoverOSStage
+from .fuel import DiscoverFuelStage
+from .run_test import CollectInfoStage, ExplicitNodesStage, SaveNodesStage, RunTestsStage
+from .report import ConsoleReportStage, HtmlReportStage
+from .sensors import StartSensorsStage, CollectSensorsStage
 logger = logging.getLogger("wally")
+def log_stage(stage: Stage) -> Iterator[None]:
+"Start " +
+    try:
+        yield
+    except utils.StopTestError as exc:
+        logger.error("Exception during %s: %r",, exc)
+    except Exception:
+        logger.exception("During %s",
 def list_results(path: str) -> List[Tuple[str, str, str, str]]:
     results = []  # type: List[Tuple[float, str, str, str, str]]
@@ -97,6 +118,7 @@
     test_parser.add_argument('--build-description', type=str, default="Build info")
     test_parser.add_argument('--build-id', type=str, default="id")
     test_parser.add_argument('--build-type', type=str, default="GA")
+    test_parser.add_argument('--dont-collect', action='store_true', help="Don't collect cluster info")
     test_parser.add_argument('-n', '--no-tests', action='store_true', help="Don't run tests")
     test_parser.add_argument('--load-report', action='store_true')
     test_parser.add_argument("-k", '--keep-vm', action='store_true', help="Don't remove test vm's")
@@ -131,8 +153,7 @@
     opts = parse_args(argv)
-    stages = []  # type: List[StageType]
-    report_stages = []  # type: List[StageType]
+    stages = []  # type: List[Stage]
     # stop mypy from telling that config & storage might be undeclared
     config = None  # type: Config
@@ -141,7 +162,7 @@
     if opts.subparser_name == 'test':
         if opts.resume:
             storage = make_storage(opts.resume, existing=True)
-            config = storage.load('config', Config)
+            config = storage.load(Config, 'config')
             file_name = os.path.abspath(opts.config_file)
             with open(file_name) as fd:
@@ -161,20 +182,18 @@
             storage['config'] = config  # type: ignore
-        stages.extend([
-            run_test.clouds_connect_stage,
-            run_test.discover_stage,
-            run_test.reuse_vms_stage,
-            log_nodes_statistic_stage,
-            run_test.save_nodes_stage,
-            run_test.connect_stage])
-        if config.get("collect_info", True):
-            stages.append(run_test.collect_info_stage)
+        stages.append(DiscoverCephStage)  # type: ignore
+        stages.append(DiscoverOSStage)  # type: ignore
+        stages.append(DiscoverFuelStage)  # type: ignore
+        stages.append(ExplicitNodesStage)  # type: ignore
+        stages.append(SaveNodesStage)  # type: ignore
+        stages.append(StartSensorsStage)  # type: ignore
+        stages.append(RunTestsStage)  # type: ignore
+        stages.append(CollectSensorsStage)  # type: ignore
-        stages.extend([
-            run_test.run_tests_stage,
-        ])
+        if not opts.dont_collect:
+            stages.append(CollectInfoStage)   # type: ignore
     elif opts.subparser_name == 'ls':
         tab = texttable.Texttable(max_width=200)
@@ -196,9 +215,10 @@
         #     [x['io'][0], y['io'][0]]))
         return 0
+    report_stages = []  # type: List[Stage]
     if not getattr(opts, "no_report", False):
-        report_stages.append(run_test.console_report_stage)
-        report_stages.append(run_test.html_report_stage)
+        report_stages.append(ConsoleReportStage)   # type: ignore
+        report_stages.append(HtmlReportStage)   # type: ignore
     # log level is not a part of config
     if opts.log_level is not None:
@@ -206,39 +226,44 @@
         str_level = config.get('logging/log_level', 'INFO')
-    setup_loggers(getattr(logging, str_level), log_fd=storage.get_stream('log'))
+    setup_loggers(getattr(logging, str_level), log_fd=storage.get_stream('log', "w"))"All info would be stored into %r", config.storage_url)
     ctx = TestRun(config, storage)
+    stages.sort(key=lambda x: x.priority)
+    # TODO: run only stages, which have configs
+    failed = False
+    cleanup_stages = []
     for stage in stages:
-        ok = False
-        with log_stage(stage):
-            stage(ctx)
-            ok = True
-        if not ok:
+        try:
+            cleanup_stages.append(stage)
+            with log_stage(stage):
+        except:
+            failed = True
-    exc, cls, tb = sys.exc_info()
-    for stage in ctx.clear_calls_stack[::-1]:
-        with log_stage(stage):
-            stage(ctx)
+    logger.debug("Start cleanup")
+    cleanup_failed = False
+    for stage in cleanup_stages[::-1]:
+        try:
+            with log_stage(stage):
+                stage.cleanup(ctx)
+        except:
+            cleanup_failed = True
-    logger.debug("Start utils.cleanup")
-    for clean_func, args, kwargs in utils.iter_clean_func():
-        with log_stage(clean_func):
-            clean_func(*args, **kwargs)
-    if exc is None:
+    if not failed:
         for report_stage in report_stages:
             with log_stage(report_stage):
-                report_stage(ctx)
+      "All info is stored into %r", config.storage_url)
-    if exc is None:
-"Tests finished successfully")
-        return 0
-    else:
+    if failed or cleanup_failed:
         logger.error("Tests are failed. See error details in log above")
         return 1
+    else:
+"Tests finished successfully")
+        return 0
diff --git a/wally/ b/wally/
index 2b58571..fae7879 100644
--- a/wally/
+++ b/wally/
@@ -133,22 +133,22 @@
         raise NotImplementedError()
     def run(self, cmd: str, timeout: int = 60, nolog: bool = False) -> str:
-        raise NotImplemented()
+        raise NotImplementedError()
     def copy_file(self, local_path: str, remote_path: str = None) -> str:
-        raise NotImplemented()
+        raise NotImplementedError()
     def put_to_file(self, path: str, content: bytes) -> None:
-        raise NotImplemented()
+        raise NotImplementedError()
     def get_interface(self, ip: str) -> str:
-        raise NotImplemented()
+        raise NotImplementedError()
     def stat_file(self, path: str) -> Any:
-        raise NotImplemented()
+        raise NotImplementedError()
     def disconnect(self) -> str:
-        raise NotImplemented()
+        raise NotImplementedError()
 def setup_rpc(node: ISSHHost, rpc_server_code: bytes, port: int = 0) -> IRPCNode:
diff --git a/wally/ b/wally/
new file mode 100644
index 0000000..cff6150
--- /dev/null
+++ b/wally/
@@ -0,0 +1,256 @@
+import os.path
+import socket
+import logging
+from typing import Dict, Any, List, Tuple, cast, Optional
+from .node_interfaces import NodeInfo
+from .config import ConfigBlock, Config
+from .ssh_utils import ConnCreds
+from .openstack_api import (os_connect, find_vms,
+                            OSCreds, get_openstack_credentials, prepare_os, launch_vms, clear_nodes)
+from .test_run_class import TestRun
+from .stage import Stage, StepOrder
+from .utils import LogError, StopTestError, get_creds_openrc
+logger = logging.getLogger("")
+def get_floating_ip(vm: Any) -> str:
+    """Get VM floating IP address"""
+    for net_name, ifaces in vm.addresses.items():
+        for iface in ifaces:
+            if iface.get('OS-EXT-IPS:type') == "floating":
+                return iface['addr']
+    raise ValueError("VM {} has no floating ip".format(vm))
+def ensure_connected_to_openstack(ctx: TestRun) -> None:
+    if not ctx.os_connection is None:
+        if ctx.os_creds is None:
+            ctx.os_creds = get_OS_credentials(ctx)
+        ctx.os_connection = os_connect(ctx.os_creds)
+def get_OS_credentials(ctx: TestRun) -> OSCreds:
+    if "openstack_openrc" in
+        return, "openstack_openrc")
+    creds = None
+    os_creds = None
+    force_insecure = False
+    cfg = ctx.config
+    if 'openstack' in cfg.clouds:
+        os_cfg = cfg.clouds['openstack']
+        if 'OPENRC' in os_cfg:
+  "Using OS credentials from " + os_cfg['OPENRC'])
+            creds_tuple = get_creds_openrc(os_cfg['OPENRC'])
+            os_creds = OSCreds(*creds_tuple)
+        elif 'ENV' in os_cfg:
+  "Using OS credentials from shell environment")
+            os_creds = get_openstack_credentials()
+        elif 'OS_TENANT_NAME' in os_cfg:
+  "Using predefined credentials")
+            os_creds = OSCreds(os_cfg['OS_USERNAME'].strip(),
+                               os_cfg['OS_PASSWORD'].strip(),
+                               os_cfg['OS_TENANT_NAME'].strip(),
+                               os_cfg['OS_AUTH_URL'].strip(),
+                               os_cfg.get('OS_INSECURE', False))
+        elif 'OS_INSECURE' in os_cfg:
+            force_insecure = os_cfg.get('OS_INSECURE', False)
+    if os_creds is None and 'fuel' in cfg.clouds and 'openstack_env' in cfg.clouds['fuel'] and \
+            ctx.fuel_openstack_creds is not None:
+"Using fuel creds")
+        creds = ctx.fuel_openstack_creds
+    elif os_creds is None:
+        logger.error("Can't found OS credentials")
+        raise StopTestError("Can't found OS credentials", None)
+    if creds is None:
+        creds = os_creds
+    if force_insecure and not creds.insecure:
+        creds = OSCreds(, creds.passwd, creds.tenant, creds.auth_url, True)
+    logger.debug(("OS_CREDS: user={} tenant={0.tenant} " +
+                  "auth_url={0.auth_url} insecure={0.insecure}").format(creds))
+["openstack_openrc"] = creds  # type: ignore
+    return creds
+def get_vm_keypair_path(cfg: Config) -> Tuple[str, str]:
+    key_name = cfg.vm_configs['keypair_name']
+    private_path = os.path.join(cfg.settings_dir, key_name + "_private.pem")
+    public_path = os.path.join(cfg.settings_dir, key_name + "")
+    return (private_path, public_path)
+class DiscoverOSStage(Stage):
+    """Discover openstack nodes and VMS"""
+    config_block = 'openstack'
+    # discover FUEL cluster first
+    priority = StepOrder.DISCOVER + 1
+    @classmethod
+    def validate(cls, conf: ConfigBlock) -> None:
+        pass
+    def run(self, ctx: TestRun) -> None:
+        cfg = ctx.config.openstack
+        os_nodes_auth = cfg.auth  # type: str
+        if os_nodes_auth.count(":") == 2:
+            user, password, key_file = os_nodes_auth.split(":")  # type: str, Optional[str], Optional[str]
+            if not password:
+                password = None
+        else:
+            user, password = os_nodes_auth.split(":")
+            key_file = None
+        ensure_connected_to_openstack(ctx)
+        if 'openstack_nodes' in
+            ctx.nodes_info.extend(, "openstack_nodes"))
+        else:
+            openstack_nodes = []  # type: List[NodeInfo]
+            services =  # type: List[Any]
+            host_services_mapping = {}  # type: Dict[str, List[str]]
+            for service in services:
+                ip = cast(str, socket.gethostbyname(
+                host_services_mapping.get(ip, []).append(service.binary)
+            logger.debug("Found %s openstack service nodes" % len(host_services_mapping))
+            for host, services in host_services_mapping.items():
+                creds = ConnCreds(host=host, user=user, passwd=password, key_file=key_file)
+                openstack_nodes.append(NodeInfo(creds, set(services)))
+            ctx.nodes_info.extend(openstack_nodes)
+  ['openstack_nodes'] = openstack_nodes  # type: ignore
+        if "reused_os_nodes" in
+            ctx.nodes_info.extend(, "reused_nodes"))
+        else:
+            reused_nodes = []  # type: List[NodeInfo]
+            private_key_path = get_vm_keypair_path(ctx.config)[0]
+            vm_creds = None  # type: str
+            for vm_creds in cfg.get("vms", []):
+                user_name, vm_name_pattern = vm_creds.split("@", 1)
+                msg = "Vm like {} lookup failed".format(vm_name_pattern)
+                with LogError(msg):
+                    msg = "Looking for vm with name like {0}".format(vm_name_pattern)
+                    logger.debug(msg)
+                    ensure_connected_to_openstack(ctx)
+                    for ip, vm_id in find_vms(ctx.os_connection, vm_name_pattern):
+                        creds = ConnCreds(host=ip, user=user_name, key_file=private_key_path)
+                        node_info = NodeInfo(creds, {'testnode'})
+                        node_info.os_vm_id = vm_id
+                        reused_nodes.append(node_info)
+            ctx.nodes_info.extend(reused_nodes)
+  ["reused_os_nodes"] = reused_nodes  # type: ignore
+class CreateOSVMSStage(Stage):
+    "Spawn new VM's in Openstack cluster"
+    priority = StepOrder.SPAWN  # type: int
+    config_block = 'spawn_os_vms'  # type: str
+    def run(self, ctx: TestRun) -> None:
+        vm_spawn_config = ctx.config.spawn_os_vms
+        vm_image_config = ctx.config.vm_configs[vm_spawn_config.cfg_name]
+        if 'spawned_os_nodes' in
+            ctx.nodes_info.extend(, "spawned_os_nodes"))
+        else:
+            ensure_connected_to_openstack(ctx)
+            params = vm_image_config.copy()
+            params.update(vm_spawn_config)
+            params.update(get_vm_keypair_path(ctx.config))
+            params['group_name'] = ctx.config.run_uuid
+            params['keypair_name'] = ctx.config.vm_configs['keypair_name']
+            if not ctx.config.openstack.get("skip_preparation", False):
+      "Preparing openstack")
+                prepare_os(ctx.os_connection, params)
+            new_nodes = []
+            ctx.os_spawned_nodes_ids = []
+            with ctx.get_pool() as pool:
+                for node_info in launch_vms(ctx.os_connection, params, pool):
+                    node_info.roles.add('testnode')
+                    ctx.os_spawned_nodes_ids.append(node_info.os_vm_id)
+                    new_nodes.append(node_info)
+  ['spawned_os_nodes'] = new_nodes  # type: ignore
+    def cleanup(self, ctx: TestRun) -> None:
+        # keep nodes in case of error for future test restart
+        if not ctx.config.keep_vm and ctx.os_spawned_nodes_ids:
+  "Removing nodes")
+            clear_nodes(ctx.os_connection, ctx.os_spawned_nodes_ids)
+            del['spawned_os_nodes']
+  "Nodes has been removed")
+# @contextlib.contextmanager
+# def suspend_vm_nodes_ctx(ctx: TestRun, unused_nodes: List[IRPCNode]) -> Iterator[List[int]]:
+#     pausable_nodes_ids = [cast(int,
+#                           for node in unused_nodes
+#                           if is not None]
+#     non_pausable = len(unused_nodes) - len(pausable_nodes_ids)
+#     if non_pausable:
+#         logger.warning("Can't pause {} nodes".format(non_pausable))
+#     if pausable_nodes_ids:
+#         logger.debug("Try to pause {} unused nodes".format(len(pausable_nodes_ids)))
+#         with ctx.get_pool() as pool:
+#             openstack_api.pause(ctx.os_connection, pausable_nodes_ids, pool)
+#     try:
+#         yield pausable_nodes_ids
+#     finally:
+#         if pausable_nodes_ids:
+#             logger.debug("Unpausing {} nodes".format(len(pausable_nodes_ids)))
+#             with ctx.get_pool() as pool:
+#                 openstack_api.unpause(ctx.os_connection, pausable_nodes_ids, pool)
+# def clouds_connect_stage(ctx: TestRun) -> None:
+    # TODO(koder): need to use this to connect to openstack in upper code
+    # conn = ctx.config['clouds/openstack']
+    # user, passwd, tenant = parse_creds(conn['creds'])
+    # auth_data = dict(auth_url=conn['auth_url'],
+    #                  username=user,
+    #                  api_key=passwd,
+    #                  project_id=tenant)  # type: Dict[str, str]
+    # logger.debug("Discovering openstack nodes with connection details: %r", conn)
+    # connect to openstack, fuel
+    # # parse FUEL REST credentials
+    # username, tenant_name, password = parse_creds(fuel_data['creds'])
+    # creds = {"username": username,
+    #          "tenant_name": tenant_name,
+    #          "password": password}
+    #
+    # # connect to FUEL
+    # conn = fuel_rest_api.KeystoneAuth(fuel_data['url'], creds, headers=None)
+    # pass
\ No newline at end of file
diff --git a/wally/ b/wally/
similarity index 97%
rename from wally/
rename to wally/
index a55fdbf..2e9ab63 100644
--- a/wally/
+++ b/wally/
@@ -7,7 +7,7 @@
 import tempfile
 import subprocess
 import urllib.request
-from typing import Dict, Any, Iterable, Iterator, NamedTuple, Optional, List, Tuple
+from typing import Dict, Any, Iterable, Iterator, NamedTuple, Optional, List, Tuple, Set
 from concurrent.futures import ThreadPoolExecutor
 from keystoneauth1 import loading, session
@@ -19,6 +19,7 @@
 from .utils import Timeout
 from .node_interfaces import NodeInfo
 from .storage import IStorable
+from .ssh_utils import ConnCreds
 __doc__ = """
@@ -81,7 +82,6 @@
 def find_vms(conn: OSConnection, name_prefix: str) -> Iterable[Tuple[str, int]]:
     for srv in conn.nova.servers.list():
             # need to exit after found server first external IP
             # so have to rollout two cycles to avoid using exceptions
             all_ip = []  # type: List[Any]
@@ -451,11 +451,10 @@
     # precache all errors before start creating vms
     private_key_path = params['keypair_file_private']
-    creds = params['image']['creds']
+    user = params['image']['user']
     for ip, os_node in create_vms_mt(conn, count, executor, **vm_params):
-        conn_uri = creds.format(ip=ip, private_key_path=private_key_path)
-        info = NodeInfo(conn_uri, set())
+        info = NodeInfo(ConnCreds(ip, user, key_file=private_key_path), set())
         info.os_vm_id =
         yield info
@@ -574,19 +573,18 @@
                                        scheduler_hints=scheduler_hints, security_groups=security_groups)
         if not wait_for_server_active(conn, srv):
-            msg = "Server {0} fails to start. Kill it and try again"
-            logger.debug(msg.format(srv))
+            logger.debug("Server {} fails to start. Kill it and try again".format(srv))
-                for _ in Timeout(delete_timeout, "Server {0} delete timeout".format(
+                for _ in Timeout(delete_timeout, "Server {} delete timeout".format(
                     srv = conn.nova.servers.get(
             except NotFound:
-        raise RuntimeError("Failed to start server".format(
+        raise RuntimeError("Failed to start server {}".format(
     if vol_sz is not None:
         vol = create_volume(conn, vol_sz, name)
@@ -598,6 +596,7 @@
     if flt_ip is not None:
+    # pylint: disable=E1101
     return flt_ip.ip, conn.nova.servers.get(
diff --git a/wally/ b/wally/
new file mode 100644
index 0000000..48f2b61
--- /dev/null
+++ b/wally/
@@ -0,0 +1,9 @@
+from typing import Any
+log = []
+def log_op(name: str, *params: Any) -> None:
+    log.append([name] + list(params))
diff --git a/wally/ b/wally/
index 88c97b7..ecf3ba7 100644
--- a/wally/
+++ b/wally/
@@ -20,6 +20,8 @@
 from .utils import ssize2b
 from .statistic import round_3_digit
 from .storage import Storage
+from .stage import Stage, StepOrder
+from .test_run_class import TestRun
 from .result_classes import TestInfo, FullTestResult, SensorInfo
 from import (get_test_sync_mode,
@@ -33,29 +35,46 @@
 def load_test_results(storage: Storage) -> Iterator[FullTestResult]:
     sensors_data = {}  # type: Dict[Tuple[str, str, str], SensorInfo]
-    for _, node_id in storage.list("metric"):
-        for _, dev_name in storage.list("metric", node_id):
-            for _, sensor_name in storage.list("metric", node_id, dev_name):
+    mstorage = storage.sub_storage("metric")
+    for _, node_id in mstorage.list():
+        for _, dev_name in mstorage.list(node_id):
+            for _, sensor_name in mstorage.list(node_id, dev_name):
                 key = (node_id, dev_name, sensor_name)
                 si = SensorInfo(*key)
-                si.begin_time, si.end_time, = storage["metric/{}/{}/{}".format(*key)]  # type: ignore
+                si.begin_time, si.end_time, = storage[node_id, dev_name, sensor_name]  # type: ignore
                 sensors_data[key] = si
-    for _, run_id in storage.list("result"):
-        path = "result/" + run_id
+    rstorage = storage.sub_storage("result")
+    for _, run_id in rstorage.list():
         ftr = FullTestResult()
- = storage.load(TestInfo, path, "info")
+        ftr.test_info = rstorage.load(TestInfo, run_id, "info")
         ftr.performance_data = {}
-        p1 = "result/{}/measurement".format(run_id)
-        for _, node_id in storage.list(p1):
-            for _, measurement_name in storage.list(p1, node_id):
+        p1 = "{}/measurement".format(run_id)
+        for _, node_id in rstorage.list(p1):
+            for _, measurement_name in rstorage.list(p1, node_id):
                 perf_key = (node_id, measurement_name)
-                ftr.performance_data[perf_key] = storage["{}/{}/{}".format(p1, *perf_key)]  # type: ignore
+                ftr.performance_data[perf_key] = rstorage["{}/{}/{}".format(p1, *perf_key)]  # type: ignore
         yield ftr
+class ConsoleReportStage(Stage):
+    priority = StepOrder.REPORT
+    def run(self, ctx: TestRun) -> None:
+        # TODO(koder): load data from storage
+        raise NotImplementedError("...")
+class HtmlReportStage(Stage):
+    priority = StepOrder.REPORT
+    def run(self, ctx: TestRun) -> None:
+        # TODO(koder): load data from storage
+        raise NotImplementedError("...")
 # class StoragePerfInfo:
 #     def __init__(self, name: str, summary: Any, params, testnodes_count) -> None:
 #         self.direct_iops_r_max = 0  # type: int
diff --git a/wally/ b/wally/
index 9ae2c9e..1a645b6 100755
--- a/wally/
+++ b/wally/
@@ -1,21 +1,18 @@
-import os
 import logging
-import contextlib
-from typing import List, Dict, Iterable, Iterator, Tuple, Optional, Union, cast
-from concurrent.futures import ThreadPoolExecutor, Future
+from concurrent.futures import Future
+from typing import List, Dict, Tuple, Optional, Union, cast
-from .node_interfaces import NodeInfo, IRPCNode
-from .test_run_class import TestRun
-from .discover import discover
-from . import pretty_yaml, utils, report, ssh_utils, start_vms, hw_info
+from . import utils, ssh_utils, hw_info
+from .config import ConfigBlock
 from .node import setup_rpc, connect
-from .config import ConfigBlock, Config
-from .suits.mysql import MysqlTest
-from .suits.itest import TestInputConfig
+from .node_interfaces import NodeInfo, IRPCNode
+from .stage import Stage, StepOrder
 from import IOPerfTest
-from .suits.postgres import PgBenchTest
+from .suits.itest import TestInputConfig
+from .suits.mysql import MysqlTest
 from .suits.omgbench import OmgTest
+from .suits.postgres import PgBenchTest
+from .test_run_class import TestRun
@@ -29,431 +26,149 @@
 logger = logging.getLogger("wally")
-def connect_all(nodes_info: List[NodeInfo], pool: ThreadPoolExecutor, conn_timeout: int = 30) -> List[IRPCNode]:
-    """Connect to all nodes, log errors"""
+class ConnectStage(Stage):
+    """Connect to nodes stage"""
-"Connecting to %s nodes", len(nodes_info))
+    priority = StepOrder.CONNECT
-    def connect_ext(node_info: NodeInfo) -> Tuple[bool, Union[IRPCNode, NodeInfo]]:
-        try:
-            ssh_node = connect(node_info, conn_timeout=conn_timeout)
-            # TODO(koder): need to pass all required rpc bytes to this call
-            return True, setup_rpc(ssh_node, b"")
-        except Exception as exc:
-            logger.error("During connect to {}: {!s}".format(node, exc))
-            return False, node_info
-    failed_testnodes = []  # type: List[NodeInfo]
-    failed_nodes = []  # type: List[NodeInfo]
-    ready = []  # type: List[IRPCNode]
-    for ok, node in, nodes_info):
-        if not ok:
-            node = cast(NodeInfo, node)
-            if 'testnode' in node.roles:
-                failed_testnodes.append(node)
-            else:
-                failed_nodes.append(node)
-        else:
-            ready.append(cast(IRPCNode, node))
-    if failed_nodes:
-        msg = "Node(s) {} would be excluded - can't connect"
-        logger.warning(msg.format(",".join(map(str, failed_nodes))))
-    if failed_testnodes:
-        msg = "Can't connect to testnode(s) " + \
-              ",".join(map(str, failed_testnodes))
-        logger.error(msg)
-        raise utils.StopTestError(msg)
-    if not failed_nodes:
-"All nodes connected successfully")
-    return ready
-def collect_info_stage(ctx: TestRun) -> None:
-    futures = {}  # type: Dict[str, Future]
-    with ctx.get_pool() as pool:
-        for node in ctx.nodes:
-            hw_info_path = "hw_info/{}".format(
-            if hw_info_path not in
-                futures[hw_info_path] = pool.submit(hw_info.get_hw_info, node), node
-            sw_info_path = "sw_info/{}".format(
-            if sw_info_path not in
-                futures[sw_info_path] = pool.submit(hw_info.get_sw_info, node)
-        for path, future in futures.items():
-  [path] = future.result()
-def suspend_vm_nodes_ctx(ctx: TestRun, unused_nodes: List[IRPCNode]) -> Iterator[List[int]]:
-    pausable_nodes_ids = [cast(int,
-                          for node in unused_nodes
-                          if is not None]
-    non_pausable = len(unused_nodes) - len(pausable_nodes_ids)
-    if non_pausable:
-        logger.warning("Can't pause {} nodes".format(non_pausable))
-    if pausable_nodes_ids:
-        logger.debug("Try to pause {} unused nodes".format(len(pausable_nodes_ids)))
+    def run(self, ctx: TestRun) -> None:
         with ctx.get_pool() as pool:
-            start_vms.pause(ctx.os_connection, pausable_nodes_ids, pool)
+  "Connecting to %s nodes", len(ctx.nodes_info))
-    try:
-        yield pausable_nodes_ids
-    finally:
-        if pausable_nodes_ids:
-            logger.debug("Unpausing {} nodes".format(len(pausable_nodes_ids)))
-            with ctx.get_pool() as pool:
-                start_vms.unpause(ctx.os_connection, pausable_nodes_ids, pool)
+            def connect_ext(node_info: NodeInfo) -> Tuple[bool, Union[IRPCNode, NodeInfo]]:
+                try:
+                    ssh_node = connect(node_info, conn_timeout=ctx.config.connect_timeout)
+                    # TODO(koder): need to pass all required rpc bytes to this call
+                    return True, setup_rpc(ssh_node, b"")
+                except Exception as exc:
+                    logger.error("During connect to {}: {!s}".format(node, exc))
+                    return False, node_info
+            failed_testnodes = []  # type: List[NodeInfo]
+            failed_nodes = []  # type: List[NodeInfo]
+            ctx.nodes = []
-def run_tests(ctx: TestRun, test_block: ConfigBlock, nodes: List[IRPCNode]) -> None:
-    """Run test from test block"""
+            for ok, node in, ctx.nodes_info):
+                if not ok:
+                    node = cast(NodeInfo, node)
+                    if 'testnode' in node.roles:
+                        failed_testnodes.append(node)
+                    else:
+                        failed_nodes.append(node)
+                else:
+                    ctx.nodes.append(cast(IRPCNode, node))
-    test_nodes = [node for node in nodes if 'testnode' in]
+            if failed_nodes:
+                msg = "Node(s) {} would be excluded - can't connect"
+                logger.warning(msg.format(",".join(map(str, failed_nodes))))
-    if not test_nodes:
-        logger.error("No test nodes found")
-        return
-    for name, params in test_block.items():
-        vm_count = params.get('node_limit', None)  # type: Optional[int]
-        # select test nodes
-        if vm_count is None:
-            curr_test_nodes = test_nodes
-            unused_nodes = []  # type: List[IRPCNode]
-        else:
-            curr_test_nodes = test_nodes[:vm_count]
-            unused_nodes = test_nodes[vm_count:]
-        if not curr_test_nodes:
-            logger.error("No nodes found for test, skipping it.")
-            continue
-        # results_path = generate_result_dir_name(cfg.results_storage, name, params)
-        # utils.mkdirs_if_unxists(results_path)
-        # suspend all unused virtual nodes
-        if ctx.config.get('suspend_unused_vms', True):
-            suspend_ctx = suspend_vm_nodes_ctx(ctx, unused_nodes)
-        else:
-            suspend_ctx = utils.empty_ctx()
-        resumable_nodes_ids = [cast(int,
-                               for node in curr_test_nodes
-                               if is not None]
-        if resumable_nodes_ids:
-            logger.debug("Check and unpause {} nodes".format(len(resumable_nodes_ids)))
-            with ctx.get_pool() as pool:
-                start_vms.unpause(ctx.os_connection, resumable_nodes_ids, pool)
-        with suspend_ctx:
-            test_cls = TOOL_TYPE_MAPPER[name]
-            remote_dir = ctx.config.default_test_local_folder.format(name=name, uuid=ctx.config.run_uuid)
-            test_cfg = TestInputConfig(test_cls.__name__,
-                                       params=params,
-                                       run_uuid=ctx.config.run_uuid,
-                                       nodes=test_nodes,
-                             ,
-                                       remote_dir=remote_dir)
-            test_cls(test_cfg).run()
-def connect_stage(ctx: TestRun) -> None:
-    ctx.clear_calls_stack.append(disconnect_stage)
-    with ctx.get_pool() as pool:
-        ctx.nodes = connect_all(ctx.nodes_info, pool)
-def discover_stage(ctx: TestRun) -> None:
-    """discover clusters and nodes stage"""
-    # TODO(koder): Properly store discovery info and check if it available to skip phase
-    discover_info = ctx.config.get('discover')
-    if discover_info:
-        if "discovered_nodes" in
-            nodes ="discovered_nodes", NodeInfo)
-            ctx.fuel_openstack_creds ="fuel_openstack_creds", start_vms.OSCreds)
-        else:
-            discover_objs = [i.strip() for i in discover_info.strip().split(",")]
-            ctx.fuel_openstack_creds, nodes =,
-                                                                discover_objs,
-                                                                ctx.config.clouds,
-                                                                not ctx.config.dont_discover_nodes)
-  ["fuel_openstack_creds"] = ctx.fuel_openstack_creds  # type: ignore
-  ["discovered_nodes"] = nodes  # type: ignore
-        ctx.nodes_info.extend(nodes)
-    for url, roles in ctx.config.get('explicit_nodes', {}).items():
-        creds = ssh_utils.parse_ssh_uri(url)
-        roles = set(roles.split(","))
-        ctx.nodes_info.append(NodeInfo(creds, roles))
-def save_nodes_stage(ctx: TestRun) -> None:
-    """Save nodes list to file"""
-['nodes'] = ctx.nodes_info   # type: ignore
-def ensure_connected_to_openstack(ctx: TestRun) -> None:
-    if not ctx.os_connection is None:
-        if ctx.os_creds is None:
-            ctx.os_creds = get_OS_credentials(ctx)
-        ctx.os_connection = start_vms.os_connect(ctx.os_creds)
-def reuse_vms_stage(ctx: TestRun) -> None:
-    if "reused_nodes" in
-        ctx.nodes_info.extend("reused_nodes", NodeInfo))
-    else:
-        reused_nodes = []
-        vms_patterns = ctx.config.get('clouds/openstack/vms', [])
-        private_key_path = get_vm_keypair_path(ctx.config)[0]
-        for creds in vms_patterns:
-            user_name, vm_name_pattern = creds.split("@", 1)
-            msg = "Vm like {} lookup failed".format(vm_name_pattern)
-            with utils.LogError(msg):
-                msg = "Looking for vm with name like {0}".format(vm_name_pattern)
-                logger.debug(msg)
-                ensure_connected_to_openstack(ctx)
-                for ip, vm_id in start_vms.find_vms(ctx.os_connection, vm_name_pattern):
-                    creds = ssh_utils.ConnCreds(host=ip, user=user_name, key_file=private_key_path)
-                    node_info = NodeInfo(creds, {'testnode'})
-                    node_info.os_vm_id = vm_id
-                    reused_nodes.append(node_info)
-                    ctx.nodes_info.append(node_info)
-["reused_nodes"] = reused_nodes  # type: ignore
-def get_OS_credentials(ctx: TestRun) -> start_vms.OSCreds:
-    if "openstack_openrc" in
-        return"openstack_openrc", start_vms.OSCreds)
-    creds = None
-    os_creds = None
-    force_insecure = False
-    cfg = ctx.config
-    if 'openstack' in cfg.clouds:
-        os_cfg = cfg.clouds['openstack']
-        if 'OPENRC' in os_cfg:
-  "Using OS credentials from " + os_cfg['OPENRC'])
-            creds_tuple = utils.get_creds_openrc(os_cfg['OPENRC'])
-            os_creds = start_vms.OSCreds(*creds_tuple)
-        elif 'ENV' in os_cfg:
-  "Using OS credentials from shell environment")
-            os_creds = start_vms.get_openstack_credentials()
-        elif 'OS_TENANT_NAME' in os_cfg:
-  "Using predefined credentials")
-            os_creds = start_vms.OSCreds(os_cfg['OS_USERNAME'].strip(),
-                                         os_cfg['OS_PASSWORD'].strip(),
-                                         os_cfg['OS_TENANT_NAME'].strip(),
-                                         os_cfg['OS_AUTH_URL'].strip(),
-                                         os_cfg.get('OS_INSECURE', False))
-        elif 'OS_INSECURE' in os_cfg:
-            force_insecure = os_cfg.get('OS_INSECURE', False)
-    if os_creds is None and 'fuel' in cfg.clouds and 'openstack_env' in cfg.clouds['fuel'] and \
-            ctx.fuel_openstack_creds is not None:
-"Using fuel creds")
-        creds = ctx.fuel_openstack_creds
-    elif os_creds is None:
-        logger.error("Can't found OS credentials")
-        raise utils.StopTestError("Can't found OS credentials", None)
-    if creds is None:
-        creds = os_creds
-    if force_insecure and not creds.insecure:
-        creds = start_vms.OSCreds(,
-                                  creds.passwd,
-                                  creds.tenant,
-                                  creds.auth_url,
-                                  True)
-    logger.debug(("OS_CREDS: user={} tenant={0.tenant} " +
-                  "auth_url={0.auth_url} insecure={0.insecure}").format(creds))
-["openstack_openrc"] = creds  # type: ignore
-    return creds
-def get_vm_keypair_path(cfg: Config) -> Tuple[str, str]:
-    key_name = cfg.vm_configs['keypair_name']
-    private_path = os.path.join(cfg.settings_dir, key_name + "_private.pem")
-    public_path = os.path.join(cfg.settings_dir, key_name + "")
-    return (private_path, public_path)
-def create_vms_ctx(ctx: TestRun, vm_config: ConfigBlock, already_has_count: int = 0) -> Iterator[List[NodeInfo]]:
-    if 'spawned_vm_ids' in
-        os_nodes_ids ='spawned_vm_ids', [])  # type: List[int]
-        new_nodes = []  # type: List[NodeInfo]
-        # TODO(koder): reconnect to old VM's
-        raise NotImplementedError("Reconnect to old vms is not implemented")
-    else:
-        os_nodes_ids = []
-        new_nodes = []
-        no_spawn = False
-        if vm_config['count'].startswith('='):
-            count = int(vm_config['count'][1:])
-            if count <= already_has_count:
-                logger.debug("Not need new vms")
-                no_spawn = True
-        if not no_spawn:
-            ensure_connected_to_openstack(ctx)
-            params = ctx.config.vm_configs[vm_config['cfg_name']].copy()
-            params.update(vm_config)
-            params.update(get_vm_keypair_path(ctx.config))
-            params['group_name'] = ctx.config.run_uuid
-            params['keypair_name'] = ctx.config.vm_configs['keypair_name']
-            if not vm_config.get('skip_preparation', False):
-      "Preparing openstack")
-                start_vms.prepare_os(ctx.os_connection, params)
-            with ctx.get_pool() as pool:
-                for node_info in start_vms.launch_vms(ctx.os_connection, params, pool, already_has_count):
-                    node_info.roles.add('testnode')
-                    os_nodes_ids.append(node_info.os_vm_id)
-                    new_nodes.append(node_info)
-['spawned_vm_ids'] = os_nodes_ids  # type: ignore
-        yield new_nodes
-        # keep nodes in case of error for future test restart
-        if not ctx.config.keep_vm:
-            shut_down_vms_stage(ctx, os_nodes_ids)
-        del['spawned_vm_ids']
-def sensor_monitoring(ctx: TestRun, cfg: ConfigBlock, nodes: List[IRPCNode]) -> Iterator[None]:
-    yield
-def run_tests_stage(ctx: TestRun) -> None:
-    for group in ctx.config.get('tests', []):
-        gitems = list(group.items())
-        if len(gitems) != 1:
-            msg = "Items in tests section should have len == 1"
-            logger.error(msg)
-            raise utils.StopTestError(msg)
-        key, config = gitems[0]
-        if 'start_test_nodes' == key:
-            if 'openstack' not in config:
-                msg = "No openstack block in config - can't spawn vm's"
+            if failed_testnodes:
+                msg = "Can't connect to testnode(s) " + \
+                      ",".join(map(str, failed_testnodes))
                 raise utils.StopTestError(msg)
-            num_test_nodes = len([node for node in ctx.nodes if 'testnode' in])
-            vm_ctx = create_vms_ctx(ctx, config['openstack'], num_test_nodes)
-            tests = config.get('tests', [])
-        else:
-            vm_ctx = utils.empty_ctx([])
-            tests = [group]
+            if not failed_nodes:
+      "All nodes connected successfully")
-        # make mypy happy
-        new_nodes = []  # type: List[NodeInfo]
+    def cleanup(self, ctx: TestRun) -> None:
+        # TODO(koder): what next line was for?
+        # ssh_utils.close_all_sessions()
-        with vm_ctx as new_nodes:
-            if new_nodes:
-                with ctx.get_pool() as pool:
-                    new_rpc_nodes = connect_all(new_nodes, pool)
+        for node in ctx.nodes:
+            node.disconnect()
-            test_nodes = ctx.nodes + new_rpc_nodes
-            if ctx.config.get('sensors'):
-                sensor_ctx = sensor_monitoring(ctx, ctx.config.get('sensors'), test_nodes)
-            else:
-                sensor_ctx = utils.empty_ctx([])
+class CollectInfoStage(Stage):
+    """Collect node info"""
+    priority = StepOrder.START_SENSORS - 1
+    config_block = 'collect_info'
+    def run(self, ctx: TestRun) -> None:
+        if not ctx.config.collect_info:
+            return
+        futures = {}  # type: Dict[str, Future]
+        with ctx.get_pool() as pool:
+            for node in ctx.nodes:
+                hw_info_path = "hw_info/{}".format(
+                if hw_info_path not in
+                    futures[hw_info_path] = pool.submit(hw_info.get_hw_info, node), node
+                sw_info_path = "sw_info/{}".format(
+                if sw_info_path not in
+                    futures[sw_info_path] = pool.submit(hw_info.get_sw_info, node)
+            for path, future in futures.items():
+      [path] = future.result()
+class ExplicitNodesStage(Stage):
+    """add explicit nodes"""
+    priority = StepOrder.DISCOVER
+    config_block = 'nodes'
+    def run(self, ctx: TestRun) -> None:
+        explicit_nodes = []
+        for url, roles in ctx.config.get('explicit_nodes', {}).items():
+            creds = ssh_utils.parse_ssh_uri(url)
+            roles = set(roles.split(","))
+            explicit_nodes.append(NodeInfo(creds, roles))
+        ctx.nodes_info.extend(explicit_nodes)
+['explicit_nodes'] = explicit_nodes  # type: ignore
+class SaveNodesStage(Stage):
+    """Save nodes list to file"""
+    priority = StepOrder.CONNECT
+    def run(self, ctx: TestRun) -> None:
+['all_nodes'] = ctx.nodes_info   # type: ignore
+class RunTestsStage(Stage):
+    priority = StepOrder.TEST
+    config_block = 'tests'
+    def run(self, ctx: TestRun) -> None:
+        for test_group in ctx.config.get('tests', []):
             if not ctx.config.no_tests:
-                for test_group in tests:
-                    with sensor_ctx:
-                        run_tests(ctx, test_group, test_nodes)
+                test_nodes = [node for node in ctx.nodes if 'testnode' in]
-            for node in new_rpc_nodes:
-                node.disconnect()
+                if not test_nodes:
+                    logger.error("No test nodes found")
+                    return
+                for name, params in test_group.items():
+                    vm_count = params.get('node_limit', None)  # type: Optional[int]
-def clouds_connect_stage(ctx: TestRun) -> None:
-    # TODO(koder): need to use this to connect to openstack in upper code
-    # conn = ctx.config['clouds/openstack']
-    # user, passwd, tenant = parse_creds(conn['creds'])
-    # auth_data = dict(auth_url=conn['auth_url'],
-    #                  username=user,
-    #                  api_key=passwd,
-    #                  project_id=tenant)  # type: Dict[str, str]
-    # logger.debug("Discovering openstack nodes with connection details: %r", conn)
-    # connect to openstack, fuel
+                    # select test nodes
+                    if vm_count is None:
+                        curr_test_nodes = test_nodes
+                    else:
+                        curr_test_nodes = test_nodes[:vm_count]
-    # # parse FUEL REST credentials
-    # username, tenant_name, password = parse_creds(fuel_data['creds'])
-    # creds = {"username": username,
-    #          "tenant_name": tenant_name,
-    #          "password": password}
-    #
-    # # connect to FUEL
-    # conn = fuel_rest_api.KeystoneAuth(fuel_data['url'], creds, headers=None)
-    pass
+                    if not curr_test_nodes:
+                        logger.error("No nodes found for test, skipping it.")
+                        continue
+                    test_cls = TOOL_TYPE_MAPPER[name]
+                    remote_dir = ctx.config.default_test_local_folder.format(name=name, uuid=ctx.config.run_uuid)
+                    test_cfg = TestInputConfig(test_cls.__name__,
+                                               params=params,
+                                               run_uuid=ctx.config.run_uuid,
+                                               nodes=test_nodes,
+                                     ,
+                                               remote_dir=remote_dir)
-def shut_down_vms_stage(ctx: TestRun, nodes_ids: List[int]) -> None:
-    if nodes_ids:
-"Removing nodes")
-        start_vms.clear_nodes(ctx.os_connection, nodes_ids)
-"Nodes has been removed")
+                    test_cls(test_cfg).run()
-def clear_enviroment(ctx: TestRun) -> None:
-    shut_down_vms_stage(ctx,'spawned_vm_ids', []))
-['spawned_vm_ids'] = []  # type: ignore
-def disconnect_stage(ctx: TestRun) -> None:
-    # TODO(koder): what next line was for?
-    # ssh_utils.close_all_sessions()
-    for node in ctx.nodes:
-        node.disconnect()
-def console_report_stage(ctx: TestRun) -> None:
-    # TODO(koder): load data from storage
-    raise NotImplementedError("...")
-def html_report_stage(ctx: TestRun) -> None:
-    # TODO(koder): load data from storage
-    raise NotImplementedError("...")
+    @classmethod
+    def validate_config(cls, cfg: ConfigBlock) -> None:
+        pass
diff --git a/wally/ b/wally/
index b579f3f..c86aeb4 100644
--- a/wally/
+++ b/wally/
@@ -1,62 +1,69 @@
-from typing import List, Dict, Tuple
+from typing import List, Dict, Tuple, Any
 from .test_run_class import TestRun
 from . import sensors_rpc_plugin
+from .stage import Stage, StepOrder
 plugin_fname = sensors_rpc_plugin.__file__.rsplit(".", 1)[0] + ".py"
 SENSORS_PLUGIN_CODE = open(plugin_fname).read()
 # TODO(koder): in case if node has more than one role sensor settigns might be incorrect
-def start_sensors_stage(ctx: TestRun) -> None:
-    if 'sensors' not in ctx.config:
-        return
-    per_role_config = {}
-    for name, val in ctx.config['sensors'].copy():
-        if isinstance(val, str):
-            val = {vl.strip(): ".*" for vl in val.split(",")}
-        elif isinstance(val, list):
-            val = {vl: ".*" for vl in val}
-        per_role_config[name] = val
+class StartSensorsStage(Stage):
+    priority = StepOrder.START_SENSORS
+    config_block = 'sensors'
-    if 'all' in per_role_config:
-        all_vl = per_role_config.pop('all')
-        all_roles = set(per_role_config)
+    def run(self, ctx: TestRun) -> None:
+        if 'sensors' not in ctx.config:
+            return
+        per_role_config = {}  # type: Dict[str, Dict[str, str]]
+        for name, val in ctx.config['sensors'].copy():
+            if isinstance(val, str):
+                val = {vl.strip(): ".*" for vl in val.split(",")}
+            elif isinstance(val, list):
+                val = {vl: ".*" for vl in val}
+            per_role_config[name] = val
+        if 'all' in per_role_config:
+            all_vl = per_role_config.pop('all')
+            all_roles = set(per_role_config)
+            for node in ctx.nodes:
+                all_roles.update(
+            for name, vals in list(per_role_config.items()):
+                new_vals = all_vl.copy()
+                new_vals.update(vals)
+                per_role_config[name] = new_vals
         for node in ctx.nodes:
-            all_roles.update(
+            node_cfg = {}  # type: Dict[str, str]
+            for role in
+                node_cfg.update(per_role_config.get(role, {}))
-        for name, vals in list(per_role_config.items()):
-            new_vals = all_roles.copy()
-            new_vals.update(vals)
-            per_role_config[name] = new_vals
-    for node in ctx.nodes:
-        node_cfg = {}
-        for role in
-            node_cfg.update(per_role_config.get(role, {}))
-        if node_cfg:
-            node.conn.upload_plugin(SENSORS_PLUGIN_CODE)
-            ctx.sensors_run_on.add(
-        node.conn.sensors.start()
+            if node_cfg:
+                node.conn.upload_plugin(SENSORS_PLUGIN_CODE)
+                ctx.sensors_run_on.add(
+            node.conn.sensors.start()
-def collect_sensors_stage(ctx: TestRun, stop: bool = True) -> None:
-    for node in ctx.nodes:
-        node_id =
-        if node_id in ctx.sensors_run_on:
+class CollectSensorsStage(Stage):
+    priority = StepOrder.COLLECT_SENSORS
+    config_block = 'sensors'
-            if stop:
+    def run(self, ctx: TestRun) -> None:
+        for node in ctx.nodes:
+            node_id =
+            if node_id in ctx.sensors_run_on:
                 data, collected_at = node.conn.sensors.stop()  # type: Dict[Tuple[str, str], List[int]], List[float]
-            else:
-                data, collected_at = node.conn.sensors.get_updates()
-            for (source_name, sensor_name), values in data.items():
-                path = "metric/{}/{}/{}".format(node_id, source_name, sensor_name)
-      , values)
-      "metric/{}/collected_at".format(node_id), collected_at)
+                mstore ="metric", node_id)
+                for (source_name, sensor_name), values in data.items():
+                    mstore[source_name, sensor_name] = values
+                    mstore["collected_at"] = collected_at
 # def delta(func, only_upd=True):
diff --git a/wally/ b/wally/
index 0451d18..95542f3 100644
--- a/wally/
+++ b/wally/
@@ -1,27 +1,36 @@
-import logging
-import contextlib
-from typing import Callable, Iterator
+import abc
+from typing import Optional
-from .utils import StopTestError
 from .test_run_class import TestRun
+from .config import ConfigBlock
-logger = logging.getLogger("wally")
+class StepOrder:
+    DISCOVER = 0
+    SPAWN = 10
+    CONNECT = 20
+    TEST = 40
+    REPORT = 60
-def log_stage(stage) -> Iterator[None]:
-    msg_templ = "Exception during {0}: {1!s}"
-    msg_templ_no_exc = "During {0}"
+class Stage(metaclass=abc.ABCMeta):
+    priority = None  # type: int
+    config_block = None  # type: Optional[str]
-"Start " +
+    @classmethod
+    def name(cls) -> str:
+        return cls.__name__
-    try:
-        yield
-    except StopTestError as exc:
-        logger.error(msg_templ.format(stage.__name__, exc))
-    except Exception:
-        logger.exception(msg_templ_no_exc.format(stage.__name__))
+    @classmethod
+    def validate_config(cls, cfg: ConfigBlock) -> None:
+        pass
+    @abc.abstractmethod
+    def run(self, ctx: TestRun) -> None:
+        pass
-StageType = Callable[[TestRun], None]
+    def cleanup(self, ctx: TestRun) -> None:
+        pass
diff --git a/wally/ b/wally/
index 05e4259..540da88 100644
--- a/wally/
+++ b/wally/
@@ -4,30 +4,24 @@
 import os
 import abc
-from typing import Any, Iterable, TypeVar, Type, IO, Tuple, cast, List
+import array
+from typing import Any, Iterator, TypeVar, Type, IO, Tuple, cast, List, Dict, Union, Iterable
+import yaml
+    from yaml import CLoader as Loader, CDumper as Dumper  # type: ignore
+except ImportError:
+    from yaml import Loader, Dumper  # type: ignore
 class IStorable(metaclass=abc.ABCMeta):
     """Interface for type, which can be stored"""
-    @abc.abstractmethod
-    def __getstate__(self) -> Any:
-        pass
-    @abc.abstractmethod
-    def __setstate__(self, Any):
-        pass
-# all builtin types can be stored
-IStorable.register(list)  # type: ignore
-IStorable.register(dict)  # type: ignore
-IStorable.register(tuple)  # type: ignore
-IStorable.register(set)  # type: ignore
-IStorable.register(None)  # type: ignore
-IStorable.register(int)  # type: ignore
-IStorable.register(str)  # type: ignore
-IStorable.register(bytes)  # type: ignore
-IStorable.register(bool)  # type: ignore
+basic_types = {list, dict, tuple, set, type(None), int, str, bytes, bool, float}
+for btype in basic_types:
+    # pylint: disable=E1101
+    IStorable.register(btype)  # type: ignore
 ObjClass = TypeVar('ObjClass')
@@ -54,11 +48,15 @@
-    def list(self, path: str) -> Iterable[Tuple[bool, str]]:
+    def list(self, path: str) -> Iterator[Tuple[bool, str]]:
-    def get_stream(self, path: str) -> IO:
+    def get_stream(self, path: str, mode: str = "rb+") -> IO:
+        pass
+    @abc.abstractmethod
+    def sub_storage(self, path: str) -> 'ISimpleStorage':
@@ -78,14 +76,18 @@
     def __init__(self, root_path: str, existing: bool) -> None:
         self.root_path = root_path
+        self.existing = existing
         if existing:
             if not os.path.isdir(self.root_path):
-                raise ValueError("No storage found at {!r}".format(root_path))
+                raise IOError("No storage found at {!r}".format(root_path))
+    def j(self, path: str) -> str:
+        return os.path.join(self.root_path, path)
     def __setitem__(self, path: str, value: bytes) -> None:
-        path = os.path.join(self.root_path, path)
-        os.makedirs(os.path.dirname(path), exist_ok=True)
-        with open(path, "wb") as fd:
+        jpath = self.j(path)
+        os.makedirs(os.path.dirname(jpath), exist_ok=True)
+        with open(jpath, "wb") as fd:
     def __delitem__(self, path: str) -> None:
@@ -95,32 +97,53 @@
     def __getitem__(self, path: str) -> bytes:
-        path = os.path.join(self.root_path, path)
-        with open(path, "rb") as fd:
+        with open(self.j(path), "rb") as fd:
     def __contains__(self, path: str) -> bool:
-        path = os.path.join(self.root_path, path)
-        return os.path.exists(path)
+        return os.path.exists(self.j(path))
-    def list(self, path: str) -> Iterable[Tuple[bool, str]]:
-        path = os.path.join(self.root_path, path)
-        for entry in os.scandir(path):
+    def list(self, path: str = "") -> Iterator[Tuple[bool, str]]:
+        jpath = self.j(path)
+        for entry in os.scandir(jpath):
             if not in ('..', '.'):
                 yield entry.is_file(),
-    def get_stream(self, path: str, mode: str = "rb") -> IO:
-        path = os.path.join(self.root_path, path)
-        return open(path, mode)
+    def get_stream(self, path: str, mode: str = "rb+") -> IO[bytes]:
+        jpath = self.j(path)
+        if "cb" == mode:
+            create_on_fail = True
+            mode = "rb+"
+        else:
+            create_on_fail = False
+        try:
+            fd = open(jpath, mode)
+        except IOError:
+            if not create_on_fail:
+                raise
+            fd = open(jpath, "wb")
+        return cast(IO[bytes], fd)
+    def sub_storage(self, path: str) -> 'FSStorage':
+        return self.__class__(self.j(path), self.existing)
 class YAMLSerializer(ISerializer):
     """Serialize data to yaml"""
-    def pack(self, value: IStorable) -> bytes:
-        raise NotImplementedError()
+    def pack(self, value: Any) -> bytes:
+        if type(value) not in basic_types:
+            for name, val in value.__dict__.items():
+                if type(val) not in basic_types:
+                    raise ValueError(("Can't pack {!r}. Attribute {} has value {!r} (type: {}), but only" +
+                                      " basic types accepted as attributes").format(value, name, val, type(val)))
+            value = value.__dict__
+        return yaml.dump(value, Dumper=Dumper, encoding="utf8")
     def unpack(self, data: bytes) -> IStorable:
-        raise NotImplementedError()
+        return yaml.load(data, Loader=Loader)
 class Storage:
@@ -129,32 +152,65 @@ = storage
         self.serializer = serializer
-    def __setitem__(self, path: str, value: IStorable) -> None:
-[path] = self.serializer.pack(value)
+    def sub_storage(self, *path: str) -> 'Storage':
+        return self.__class__("/".join(path)), self.serializer)
-    def __getitem__(self, path: str) -> IStorable:
+    def __setitem__(self, path: Union[str, Iterable[str]], value: Any) -> None:
+        if not isinstance(path, str):
+            path = "/".join(path)
+[path] = self.serializer.pack(cast(IStorable, value))
+    def __getitem__(self, path: Union[str, Iterable[str]]) -> IStorable:
+        if not isinstance(path, str):
+            path = "/".join(path)
         return self.serializer.unpack([path])
-    def __delitem__(self, path: str) -> None:
+    def __delitem__(self, path: Union[str, Iterable[str]]) -> None:
+        if not isinstance(path, str):
+            path = "/".join(path)
-    def __contains__(self, path: str) -> bool:
+    def __contains__(self, path: Union[str, Iterable[str]]) -> bool:
+        if not isinstance(path, str):
+            path = "/".join(path)
         return path in
-    def list(self, *path: str) -> Iterable[Tuple[bool, str]]:
+    def list(self, *path: str) -> Iterator[Tuple[bool, str]]:
-    def construct(self, path: str, raw_val: IStorable, obj_class: Type[ObjClass]) -> ObjClass:
+    def set_array(self, value: array.array, *path: str) -> None:
+        with self.get_stream("/".join(path), "wb") as fd:
+            value.tofile(fd)  # type: ignore
+    def get_array(self, typecode: str, *path: str) -> array.array:
+        res = array.array(typecode)
+        path_s = "/".join(path)
+        with self.get_stream(path_s, "rb") as fd:
+  , os.SEEK_END)
+            size = fd.tell()
+  , os.SEEK_SET)
+            assert size % res.itemsize == 0, "Storage object at path {} contains no array of {} or corrupted."\
+                .format(path_s, typecode)
+            res.fromfile(fd, size // res.itemsize)  # type: ignore
+        return res
+    def append(self, value: array.array, *path: str) -> None:
+        with self.get_stream("/".join(path), "cb") as fd:
+  , os.SEEK_END)
+            value.tofile(fd)  # type: ignore
+    def construct(self, path: str, raw_val: Dict, obj_class: Type[ObjClass]) -> ObjClass:
+        "Internal function, used to construct user type from raw unpacked value"
         if obj_class in (int, str, dict, list, None):
-            if not isinstance(raw_val, obj_class):
-                raise ValueError("Can't load path {!r} into type {}. Real type is {}"
-                                 .format(path, obj_class, type(raw_val)))
-            return cast(ObjClass, raw_val)
+            raise ValueError("Can't load into build-in value - {!r} into type {}")
         if not isinstance(raw_val, dict):
             raise ValueError("Can't load path {!r} into python type. Raw value not dict".format(path))
-        if not all(isinstance(str, key) for key in raw_val.keys):
+        if not all(isinstance(key, str) for key in raw_val.keys()):
             raise ValueError("Can't load path {!r} into python type.".format(path) +
                              "Raw not all keys in raw value is strings")
@@ -170,19 +226,25 @@
     def load(self, obj_class: Type[ObjClass], *path: str) -> ObjClass:
         path_s = "/".join(path)
-        return self.construct(path_s, self[path_s], obj_class)
+        return self.construct(path_s, cast(Dict, self[path_s]), obj_class)
-    def get_stream(self, *path: str) -> IO:
-        return"/".join(path))
+    def get_stream(self, path: str, mode: str = "r") -> IO:
+        return, mode)
-    def get(self, path: str, default: Any = None) -> Any:
+    def get(self, path: Union[str, Iterable[str]], default: Any = None) -> Any:
+        if not isinstance(path, str):
+            path = "/".join(path)
             return self[path]
-        except KeyError:
+        except Exception:
             return default
-    def append(self, path: str, data: List):
-        raise NotImplemented()
+    def __enter__(self) -> 'Storage':
+        return self
+    def __exit__(self, x: Any, y: Any, z: Any) -> None:
+        return
 def make_storage(url: str, existing: bool = False) -> Storage:
diff --git a/wally/storage_structure.txt b/wally/storage_structure.txt
index 583a4a0..5715046 100644
--- a/wally/storage_structure.txt
+++ b/wally/storage_structure.txt
@@ -1,10 +1,19 @@
 config: Config - full configuration
-nodes: List[NodeInfo] - all nodes
-fuel_openstack_creds: OSCreds - openstack creds, discovered from fuel (or None)
+all_nodes: List[NodeInfo] - all nodes
+    version: List[int] - FUEL master node version
+    os_creds: OSCreds - openstack creds, discovered from fuel (or None)
+    nodes: List[NodeInfo] - FUEL cluster nodes
 openstack_openrc: OSCreds - openrc used for openstack cluster
-discovered_nodes: List[NodeInfo] - list of discovered nodes
-reused_nodes: List[NodeInfo] - list of reused nodes from cluster
-spawned_vm_ids: List[int] - list of openstack VM id's, spawned for test
+openstack_nodes: List[NodeInfo] - list of openstack nodes
+reused_os_nodes: List[NodeInfo] - list of openstack VM, reused in test
+spawned_os_nodes: List[NodeInfo] - list of openstack VM, spawned for test
+ceph_nodes: List[NodeInfo] - list of ceph nodes
+explicit_nodes: List[NodeInfo] - list of explicit nodes
 info/comment : str - run comment
 info/run_uuid : str - run uuid
 info/run_time : float - run unix time
diff --git a/wally/suits/io/ b/wally/suits/io/
index 0f4ebde..1b5f38e 100644
--- a/wally/suits/io/
+++ b/wally/suits/io/
@@ -83,7 +83,7 @@
             node.conn.rmdir(self.config.remote_dir, recursive=True, ignore_missing=True)
         except Exception as exc:
-            msg = "Failed to create folder {} on remote {}.".format(self.config.remote_dir, node, exc)
+            msg = "Failed to create folder {} on remote {}.".format(self.config.remote_dir, node)
             raise StopTestError(msg) from exc
diff --git a/wally/suits/ b/wally/suits/
index 6d1eeee..ef69b05 100644
--- a/wally/suits/
+++ b/wally/suits/
@@ -74,7 +74,7 @@
-    def format_for_console(cls, data: Any) -> str:
+    def format_for_console(self, data: Any) -> str:
@@ -122,9 +122,9 @@
                 assert info == expected_config, \
-                    "Test info at path {} is not equal to expected config." + \
-                    "Maybe configuration was changed before test was restarted. " + \
-                    "Current cfg is {!r}, expected cfg is {!r}".format(info_path, info, expected_config)
+                    ("Test info at path {} is not equal to expected config." +
+                     "Maybe configuration was changed before test was restarted. " +
+                     "Current cfg is {!r}, expected cfg is {!r}").format(info_path, info, expected_config)
       "Test iteration {} found in storage and will be skipped".format(iter_name))
@@ -181,10 +181,10 @@
                 start_times = []  # type: List[int]
                 stop_times = []  # type: List[int]
+                mstorage = storage.sub_storage("result", str(run_id), "measurement")
                 for (result, (t_start, t_stop)), node in zip(results, self.config.nodes):
                     for metrics_name, data in result.items():
-                        path = "result/{}/measurement/{}/{}".format(run_id,, metrics_name)
-                        storage[path] = data  # type: ignore
+                        mstorage[, metrics_name] = data  # type: ignore
@@ -214,7 +214,7 @@
                     'end_time': max_stop_time
-                storage["result/{}/info".format(run_id)] = test_config  # type: ignore
+                storage["result", str(run_id), "info"] = test_config  # type: ignore
     def config_node(self, node: IRPCNode) -> None:
diff --git a/wally/ b/wally/
index db41a46..30c46e7 100644
--- a/wally/
+++ b/wally/
@@ -4,7 +4,7 @@
 from .timeseries import SensorDatastore
 from .node_interfaces import NodeInfo, IRPCNode
-from .start_vms import OSCreds, OSConnection
+from .openstack_api import OSCreds, OSConnection
 from .storage import Storage
 from .config import Config
 from .fuel_rest_api import Connection
@@ -24,6 +24,7 @@
         # openstack credentials
         self.fuel_openstack_creds = None  # type: Optional[OSCreds]
+        self.fuel_version = None  # type: Optional[List[int]]
         self.os_creds = None  # type: Optional[OSCreds]
         self.os_connection = None  # type: Optional[OSConnection]
         self.fuel_conn = None  # type: Optional[Connection]
@@ -33,6 +34,7 @@
         self.config = config
         self.sensors_data = SensorDatastore()
         self.sensors_run_on = set()  # type: Set[str]
+        self.os_spawned_nodes_ids = None  # type: List[int]
     def get_pool(self):
         return ThreadPoolExecutor(self.config.get('worker_pool_sz', 32))
diff --git a/wally/ b/wally/
index b27b1ba..45b67b4 100644
--- a/wally/
+++ b/wally/
@@ -260,7 +260,7 @@
     raise OSError("Can't define interface for {0}".format(target_ip))
-def open_for_append_or_create(fname: str) -> IO:
+def open_for_append_or_create(fname: str) -> IO[str]:
     if not os.path.exists(fname):
         return open(fname, "w")
@@ -289,18 +289,6 @@
     return data
-CLEANING = []  # type: List[Tuple[Callable[..., Any], List[Any], Dict[str, Any]]]
-def clean_resource(func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
-    CLEANING.append((func, list(args), kwargs))
-def iter_clean_func() -> Iterator[Tuple[Callable[..., Any], List[Any], Dict[str, Any]]]:
-    while CLEANING:
-        yield CLEANING.pop()
 def flatten(data: Iterable[Any]) -> List[Any]:
     res = []
     for i in data: