2.0 is on the way
diff --git a/configs-examples/full.yaml b/configs-examples/full.yaml
index f5f479b..0874f4f 100644
--- a/configs-examples/full.yaml
+++ b/configs-examples/full.yaml
@@ -3,7 +3,6 @@
 results_storage: /var/wally_results
 var_dir_root: /tmp/perf_tests
 settings_dir: ~/.wally
-discover: fuel_openrc_only
 collect_info: false
 suspend_unused_vms: true
@@ -42,15 +41,21 @@
         OPENRC: /home/koder/workspace/scale_openrc
+        auth: USER:PASSWD:KEY_FILE
-            - "wally-phytographic-sharla,ssh://ubuntu@{ip}::wally_vm_key.pem"
+            - "wally-phytographic-sharla,ubuntu,wally_vm_key.pem"
+discover: fuel,openstack,fuel_openrc_only
    online: true
        testnode: system-cpu, block-io, net-io
        ceph-osd: system-cpu, block-io, net-io
-       compute: system-cpu, block-io, net-io
+       compute:
+            system-cpu: *
+            block-io: sd*
+            net-io: *
     - start_test_nodes:
@@ -64,7 +69,7 @@
             - io:
                 node_limit: 2
-                cfg: ceph
+                load: ceph
                     FILENAME: /dev/vdb
                     TEST_FILE_SIZE: 100G
diff --git a/tests/fake_run_test.py b/tests/fake_run_test.py
deleted file mode 100644
index eeffde3..0000000
--- a/tests/fake_run_test.py
+++ /dev/null
@@ -1,106 +0,0 @@
-import json
-import sys
-import run_test
-logger = run_test.logger
-tool = None
-class FakeVMContext(object):
-    def __init__(self, *args, **kwargs):
-        pass
-    def __enter__(self):
-        return ["fake@fake"]
-    def __exit__(self, *args, **kwargs):
-        pass
-def fake_start_vms(*args, **kwargs):
-    return FakeVMContext
-class FakeFD(object):
-    def __init__(self, content):
-        self.content = content
-        self.channel = FakeChannel()
-    def read(self):
-        return self.content
-class FakeChannel(object):
-    def recv_exit_status(self):
-        return 0
-def get_fake_out(cmd):
-    empty_fd = FakeFD("")
-    if "pgbench" == tool:
-        if "run" in cmd:
-            out = FakeFD("2 1:43\n2 1:42\n4 2:77")
-        else:
-            out = empty_fd
-    elif "iozone" == tool or "fio" == tool:
-        data = {'__meta__': {
-            'direct': 1,
-            'action': 'r',
-            'concurence': 1,
-            'blocksize': 1,
-            'sync': 's'},
-            'bw': 10}
-        out = FakeFD(json.dumps(data))
-    else:
-        raise Exception("tool not found")
-    err = empty_fd
-    return empty_fd, out, err
-def fake_ssh_connect(*args, **kwargs):
-    return FakeSSH()
-class FakeSFTP(object):
-    def put(self, what, where):
-        logger.debug("Called sftp put with %s %s" % (what, where))
-    def chmod(self, f, mode):
-        logger.debug("called sftp chmod %s %s" % (mode, f))
-    def close(self):
-        logger.debug("called sftp close")
-class FakeSSH(object):
-    def exec_command(self, cmd, **kwargs):
-        return get_fake_out(cmd)
-    def close(self):
-        pass
-    def open_sftp(self):
-        return FakeSFTP()
-class FakePopen(object):
-    def __init__(self, cmd,
-                 shell=True,
-                 stdout=None,
-                 stderr=None,
-                 stdin=None):
-        self.stdin, self.stdout, self.stderr = get_fake_out(cmd)
-    def wait(self):
-        return 0
-if __name__ == '__main__':
-    run_test.subprocess.Popen = FakePopen
-    run_test.start_test_vms = fake_start_vms()
-    run_test.ssh_runner.ssh_connect = fake_ssh_connect
-    opts = run_test.parse_args(sys.argv[1:])
-    tool = opts.tool_type
-    exit(run_test.main(sys.argv[1:]))
diff --git a/tests/test_storage.py b/tests/test_storage.py
new file mode 100644
index 0000000..46f38e6
--- /dev/null
+++ b/tests/test_storage.py
@@ -0,0 +1,53 @@
+import shutil
+import tempfile
+import contextlib
+from oktest import ok, main, test
+from wally.storage import make_storage
+def in_temp_dir():
+    dname = tempfile.mkdtemp()
+    try:
+        yield dname
+    finally:
+        shutil.rmtree(dname)
+def test_basic():
+    with in_temp_dir() as root:
+        values = {
+            "int": 1,
+            "str/1": "test",
+            "bytes/2": b"test",
+            "none/s/1": None,
+            "bool/xx/1/2/1": None,
+            "float/s/1": 1.234,
+            "list": [1, 2, "3"],
+            "dict": {1: 3, "2": "4", "1.2": 1.3}
+        }
+        with make_storage(root, existing=False) as storage:
+            for path, val in values.items():
+                storage[path] = val
+        with make_storage(root, existing=True) as storage:
+            for path, val in values.items():
+                ok(storage[path])  == val
+def test_large_arrays():
+    pass
+def test_array_append():
+    pass
+def test_performance():
+    pass
diff --git a/v2_plans.md b/v2_plans.md
index be719b6..95ffc92 100644
--- a/v2_plans.md
+++ b/v2_plans.md
@@ -1,74 +1,64 @@
-    * revise type checking
+* Code:
     * use overloading module
-    Discover/reuse - returns NodeInfo
-    Connect - returns Node from NodeInfo
-    Node contains ssh, rpc interface and basic API
-    Add aio rpc client
     * Make storage class with dict-like interface
         - map path to value, e.g.  'cluster_info': yaml
-        - 'results/r20w80b60kQD10VM2/iops': [iops]
-        - should support both binary and text(yaml) formats, maybe store in both
+        - should support both binary and text(yaml) formats, maybe
+          store in both
         - store all results in it
-        - before call a stage/part check that it results is not awailable yet,
-          or chek this inside stage. Skip stage if data already awailable
-        - use for discovery, tests, etc
-    * aio?
-    * Update to newest fio
-    * Add fio build script to download fio from git and build it
-    * Add integration tests with nbd
-    * Move from threads to QD to mitigate fio issues
-    * Use agent to communicate with remote node
-    * fix existing folder detection
-    * fio load reporters
-    * Results stored in archived binary format for fast parsing (sqlite)?
-    * Split all code on separated modules:
-        * logging
-        * Test run class
-        * Test stage class
-    * Results are set of timeseries with attached descriptions
+        - Results stored in archived binary format for fast parsing
+    * Collect and store cluster info
+    * Simplify settings
+    * Unit-tests
+    * 'perf' sensor
+    * ftrace, https://github.com/iovisor/bcc, etc
+    * Config validation
+    * Add sync 4k write with small set of thcount
-    * move agent and ssh code to separated library
-    * plugins for agent
-    * evaluate bokeh for visualization
-        https://github.com/cronburg/ceph-viz/tree/master/histogram
+* Infra:
+    * Add script to download fio from git and build it
+    * Docker/lxd public container as default distribution way
 * Statistical result check and report:
-    - check results distribution
-    - warn for non-normal results
-    - correct comparison
-    - detect internal pattern
-    https://habrahabr.ru/post/311092/
-    https://blog.cloudera.com/blog/2015/12/common-probability-distributions-the-data-scientists-crib-sheet/
-    http://docs.scipy.org/doc/scipy-0.14.0/reference/generated/scipy.stats.mstats.normaltest.html
-    http://profitraders.com/Math/Shapiro.html
-    http://www.machinelearning.ru/wiki/index.php?title=%D0%9A%D1%80%D0%B8%D1%82%D0%B5%D1%80%D0%B8%D0%B9_%D1%85%D0%B8-%D0%BA%D0%B2%D0%B0%D0%B4%D1%80%D0%B0%D1%82
-    http://docs.scipy.org/doc/numpy/reference/generated/numpy.fft.fft.html#numpy.fft.fft
-    https://en.wikipedia.org/wiki/Log-normal_distribution
-    http://stats.stackexchange.com/questions/25709/what-distribution-is-most-commonly-used-to-model-server-response-time
-    http://www.lognormal.com/features/
-    http://blog.simiacryptus.com/2015/10/modeling-network-latency.html
+    * Comprehensive report with results histograms and other
+    * Check results distribution
+    * Warn for non-normal results
+    * Check that distribution of different parts is close. Average
+      performance should be steady across test
+    * Graphs for raw data over time
+    * Save pictures from report in jpg in separated folder
+    * Node histogram distribution
+    * Interactive report, which shows different plots and data,
+      depending on selected visualization type
+    * Offload simple report table to cvs/yaml/json/test/ascii_table
+    * fio load reporters (visualizers), ceph report tool
+        https://github.com/cronburg/ceph-viz/tree/master/histogram
+    * evaluate bokeh for visualization
+    * flamegraph for 'perf' output - https://www.youtube.com/watch?v=nZfNehCzGdw
+    * detect internal pattern:
+        - FFT
+        - http://mabrek.github.io/
+        - https://github.com/rushter/MLAlgorithms
+        - https://github.com/rushter/data-science-blogs
+        - https://habrahabr.ru/post/311092/
+        - https://blog.cloudera.com/blog/2015/12/common-probability-distributions-the-data-scientists-crib-sheet/
+        - http://docs.scipy.org/doc/scipy-0.14.0/reference/generated/scipy.stats.mstats.normaltest.html
+        - http://profitraders.com/Math/Shapiro.html
+        - http://www.machinelearning.ru/wiki/index.php?title=%D0%9A%D1%80%D0%B8%D1%82%D0%B5%D1%80%D0%B8%D0%B9_%D1%85%D0%B8-%D0%BA%D0%B2%D0%B0%D0%B4%D1%80%D0%B0%D1%82
+        - http://docs.scipy.org/doc/numpy/reference/generated/numpy.fft.fft.html#numpy.fft.fft
+        - https://en.wikipedia.org/wiki/Log-normal_distribution
+        - http://stats.stackexchange.com/questions/25709/what-distribution-is-most-commonly-used-to-model-server-response-time
+        - http://www.lognormal.com/features/
+        - http://blog.simiacryptus.com/2015/10/modeling-network-latency.html
+* Intelectual postprocessing:
+    * Difference calculation
+    * Resource usage calculator/visualizer, bottleneck hunter
+    * correct comparison between different systems
-* Collect and store cluster info
-* Resume stopped/paused run
-* Difference calculation
-* Resource usage calculator/visualizer
-* Bottleneck hunter
-* Comprehensive report with results histograms and other
-* python3.5
-* Docker/lxd public container as default distribution way
-* Allow to reuse vm from previous run (store connection config, keys and vm id's in run info)
-* Simplify settings
-* Save pictures from report in jpg in separated folder
-* Node histogram distribution
-* Integration with ceph report tool
-* Automatically scale QD till saturation
-* Integrate vdbench/spc/TPC/TPB
-* Runtime visualization
+* Maybe move to 2.1:
+    * Automatically scale QD till saturation
+    * Runtime visualization
+    * Integrate vdbench/spc/TPC/TPB
+    * Add aio rpc client
+    * Add integration tests with nbd
+    * fix existing folder detection
diff --git a/wally/discover/discover.py b/wally/discover/discover.py
index e0d7dab..35ddc62 100644
--- a/wally/discover/discover.py
+++ b/wally/discover/discover.py
@@ -13,6 +13,7 @@
 from ..node_interfaces import NodeInfo
 from ..node import connect, setup_rpc
 from ..ssh_utils import parse_ssh_uri
+from ..test_run_class import TestRun
 logger = logging.getLogger("wally.discover")
@@ -39,7 +40,10 @@
 DiscoveryResult = NamedTuple("DiscoveryResult", [("os_creds", Optional[OSCreds]), ("nodes", List[NodeInfo])])
-def discover(discover_list: List[str], clusters_info: ConfigBlock, discover_nodes: bool = True) -> DiscoveryResult:
+def discover(ctx: TestRun,
+             discover_list: List[str],
+             clusters_info: ConfigBlock,
+             discover_nodes: bool = True) -> DiscoveryResult:
     """Discover nodes in clusters"""
     new_nodes = []  # type: List[NodeInfo]
@@ -52,21 +56,7 @@
             cluster_info = clusters_info["openstack"]  # type: ConfigBlock
-            conn = cluster_info['connection']  # type: ConfigBlock
-            if not conn:
-                logger.error("No connection provided for %s. Skipping", cluster)
-                continue
-            user, passwd, tenant = parse_creds(conn['creds'])
-            auth_data = dict(auth_url=conn['auth_url'],
-                             username=user,
-                             api_key=passwd,
-                             project_id=tenant)  # type: Dict[str, str]
-            logger.debug("Discovering openstack nodes with connection details: %r", conn)
-            new_nodes.extend(openstack.discover_openstack_nodes(auth_data, cluster_info))
+            new_nodes.extend(openstack.discover_openstack_nodes(ctx.os_connection, cluster_info))
         elif cluster == "fuel" or cluster == "fuel_openrc_only":
             if cluster == "fuel_openrc_only":
@@ -74,15 +64,17 @@
             fuel_node_info = NodeInfo(parse_ssh_uri(clusters_info['fuel']['ssh_creds']), {'fuel_master'})
-                fuel_rpc_conn = setup_rpc(connect(fuel_node_info))
+                fuel_rpc_conn = setup_rpc(connect(fuel_node_info), ctx.rpc_code)
             except AuthenticationException:
                 raise StopTestError("Wrong fuel credentials")
             except Exception:
                 logger.exception("While connection to FUEL")
                 raise StopTestError("Failed to connect to FUEL")
+            # TODO(koder): keep FUEL rpc in context? Maybe open this connection on upper stack level?
             with fuel_rpc_conn:
-                nodes, fuel_info = fuel.discover_fuel_nodes(fuel_rpc_conn, clusters_info['fuel'], discover_nodes)
+                nodes, fuel_info = fuel.discover_fuel_nodes(
+                    fuel_rpc_conn, ctx.fuel_conn, clusters_info['fuel'], discover_nodes)
                 if fuel_info.openrc:
@@ -105,7 +97,7 @@
                 conf = clusters_info["ceph"].get("conf")
                 key = clusters_info["ceph"].get("key")
                 info = NodeInfo(parse_ssh_uri(root_node_uri), set())
-                with setup_rpc(connect(info)) as ceph_root_conn:
+                with setup_rpc(connect(info), ctx.rpc_code) as ceph_root_conn:
                     new_nodes.extend(ceph.discover_ceph_nodes(ceph_root_conn, cluster=cluster, conf=conf, key=key))
                 logger.warning("Skip ceph cluster discovery")
diff --git a/wally/discover/fuel.py b/wally/discover/fuel.py
index 119cbd0..7b4f90f 100644
--- a/wally/discover/fuel.py
+++ b/wally/discover/fuel.py
@@ -1,12 +1,12 @@
 import logging
 import socket
-from typing import Dict, Any, Tuple, List, NamedTuple, Union
+from typing import Dict, Any, Tuple, List, NamedTuple, Union, cast
 from urllib.parse import urlparse
 from .. import fuel_rest_api
 from ..node_interfaces import NodeInfo, IRPCNode
 from ..ssh_utils import ConnCreds
-from ..utils import parse_creds, check_input_param
+from ..utils import check_input_param
 logger = logging.getLogger("wally.discover")
@@ -18,50 +18,41 @@
 def discover_fuel_nodes(fuel_master_node: IRPCNode,
+                        fuel_conn: fuel_rest_api.Connection,
                         fuel_data: Dict[str, Any],
                         discover_nodes: bool = True) -> Tuple[List[NodeInfo], FuelNodeInfo]:
     """Discover nodes in fuel cluster, get openrc for selected cluster"""
-    # parse FUEL REST credentials
-    username, tenant_name, password = parse_creds(fuel_data['creds'])
-    creds = {"username": username,
-             "tenant_name": tenant_name,
-             "password": password}
-    # connect to FUEL
-    conn = fuel_rest_api.KeystoneAuth(fuel_data['url'], creds, headers=None)
     msg = "openstack_env should be provided in fuel config"
     check_input_param('openstack_env' in fuel_data, msg)
     # get cluster information from REST API
-    cluster_id = fuel_rest_api.get_cluster_id(conn, fuel_data['openstack_env'])
-    cluster = fuel_rest_api.reflect_cluster(conn, cluster_id)
-    version = fuel_rest_api.FuelInfo(conn).get_version()
+    cluster_id = fuel_rest_api.get_cluster_id(fuel_conn, fuel_data['openstack_env'])
+    cluster = fuel_rest_api.reflect_cluster(fuel_conn, cluster_id)
+    version = fuel_rest_api.FuelInfo(fuel_conn).get_version()
     if not discover_nodes:
         logger.warning("Skip fuel cluster discovery")
-        return [], FuelNodeInfo(version, None, cluster.get_openrc())
+        return [], FuelNodeInfo(version, None, cluster.get_openrc())  # type: ignore
-    fuel_nodes = list(cluster.get_nodes())
-    logger.info("Found FUEL {0}".format(".".join(map(str, version))))
-    network = 'fuelweb_admin' if version >= [6, 0] else 'admin'
-    fuel_host = urlparse(fuel_data['url']).hostname
-    fuel_ip = socket.gethostbyname(fuel_host)
-    fuel_ext_iface = fuel_master_node.get_interface(fuel_ip)
+    logger.info("Found fuel {0}".format(".".join(map(str, version))))
     # get FUEL master key to connect to cluster nodes via ssh
     logger.debug("Downloading fuel master key")
     fuel_key = fuel_master_node.get_file_content('/root/.ssh/id_rsa')
+    network = 'fuelweb_admin' if version >= [6, 0] else 'admin'
+    fuel_ip = socket.gethostbyname(fuel_conn.host)
+    fuel_ext_iface = fuel_master_node.get_interface(fuel_ip)
     nodes = []
-    for fuel_node in fuel_nodes:
+    for fuel_node in list(cluster.get_nodes()):
         ip = str(fuel_node.get_ip(network))
-        nodes.append(NodeInfo(ConnCreds(ip, "root", key=fuel_key), roles=set(fuel_node.get_roles())))
+        creds = ConnCreds(ip, "root", key=fuel_key)
+        nodes.append(NodeInfo(creds, roles=set(fuel_node.get_roles())))
     logger.debug("Found {} fuel nodes for env {}".format(len(nodes), fuel_data['openstack_env']))
-    return nodes, FuelNodeInfo(version, fuel_ext_iface, cluster.get_openrc())
+    return nodes, FuelNodeInfo(version, fuel_ext_iface,
+                               cast(Dict[str, Union[str, bool]], cluster.get_openrc()))
diff --git a/wally/discover/openstack.py b/wally/discover/openstack.py
index 88e8656..f590359 100644
--- a/wally/discover/openstack.py
+++ b/wally/discover/openstack.py
@@ -1,13 +1,11 @@
 import socket
 import logging
-from typing import Dict, Any, List
-from novaclient.client import Client
+from typing import Dict, Any, List, Optional, cast
 from ..node_interfaces import NodeInfo
 from ..config import ConfigBlock
-from ..utils import parse_creds
+from ..ssh_utils import ConnCreds
+from ..start_vms import OSConnection, NovaClient
 logger = logging.getLogger("wally.discover")
@@ -24,73 +22,45 @@
     raise ValueError("VM {} has no floating ip".format(vm))
-def get_ssh_url(user: str, password: str, ip: str, key: str) -> str:
-    """Get ssh connection URL from parts"""
-    if password is not None:
-        assert key is None, "Both key and password provided"
-        return "ssh://{}:{}@{}".format(user, password, ip)
-    else:
-        assert key is not None, "None of key/password provided"
-        return "ssh://{}@{}::{}".format(user, ip, key)
-def discover_vms(client: Client, search_opts: Dict) -> List[NodeInfo]:
+def discover_vms(client: NovaClient, search_data: str) -> List[NodeInfo]:
     """Discover virtual machines"""
-    user, password, key = parse_creds(search_opts.pop('auth'))
-    servers = client.servers.list(search_opts=search_opts)
+    name, user, key_file = search_data.split(",")
+    servers = client.servers.list(search_opts={"name": name})
     logger.debug("Found %s openstack vms" % len(servers))
     nodes = []  # type: List[NodeInfo]
     for server in servers:
         ip = get_floating_ip(server)
-        nodes.append(NodeInfo(get_ssh_url(user, password, ip, key), roles={"test_vm"}))
+        creds = ConnCreds(host=ip, user=user, key_file=key_file)
+        nodes.append(NodeInfo(creds, roles={"test_vm"}))
     return nodes
-def discover_services(client: Client, opts: Dict[str, Any]) -> List[NodeInfo]:
+def discover_openstack_nodes(conn: OSConnection, conf: ConfigBlock) -> List[NodeInfo]:
     """Discover openstack services for given cluster"""
-    user, password, key = parse_creds(opts.pop('auth'))
+    os_nodes_auth = conf['auth']  # type: str
-    services = []
-    if opts['service'] == "all":
-        services = client.services.list()
+    if os_nodes_auth.count(":") == 2:
+        user, password, key_file = os_nodes_auth.split(":")  # type: str, Optional[str], Optional[str]
+        if not password:
+            password = None
-        if isinstance(opts['service'], str):
-            opts['service'] = [opts['service']]
+        user, password = os_nodes_auth.split(":")
+        key_file = None
-        for s in opts['service']:
-            services.extend(client.services.list(binary=s))
-    host_services_mapping = {}  # type: Dict[str, [str]]
+    services = conn.nova.services.list()  # type: List[Any]
+    host_services_mapping = {}  # type: Dict[str, List[str]]
     for service in services:
-        ip = socket.gethostbyname(service.host)
+        ip = cast(str, socket.gethostbyname(service.host))
         host_services_mapping.get(ip, []).append(service.binary)
-    logger.debug("Found %s openstack service nodes" %
-                 len(host_services_mapping))
+    logger.debug("Found %s openstack service nodes" % len(host_services_mapping))
     nodes = []  # type: List[NodeInfo]
     for host, services in host_services_mapping.items():
-        ssh_url = get_ssh_url(user, password, host, key)
-        nodes.append(NodeInfo(ssh_url, services))
+        creds = ConnCreds(host=host, user=user, passwd=password, key_file=key_file)
+        nodes.append(NodeInfo(creds, set(services)))
     return nodes
-def discover_openstack_nodes(conn_details: Dict[str, str], conf: ConfigBlock) -> List[NodeInfo]:
-    """Discover vms running in openstack
-    conn_details - dict with openstack connection details -
-        auth_url, api_key (password), username
-    conf - test configuration object
-    """
-    client = Client(version='1.1', **conn_details)
-    if conf.get('discover'):
-        services_to_discover = conf['discover'].get('nodes')
-        if services_to_discover:
-            return discover_services(client, services_to_discover)
-    return []
diff --git a/wally/fuel_rest_api.py b/wally/fuel_rest_api.py
index 2addaf4..6117606 100644
--- a/wally/fuel_rest_api.py
+++ b/wally/fuel_rest_api.py
@@ -2,9 +2,10 @@
 import abc
 import json
 import logging
-import urllib.request
 import urllib.parse
-from typing import Dict, Any, Iterator, Match, List, Callable
+import urllib.error
+import urllib.request
+from typing import Dict, Any, Iterator, List, Callable, cast
 from functools import partial
 import netaddr
@@ -16,13 +17,14 @@
 class Connection(metaclass=abc.ABCMeta):
+    host = None  # type: str
     def do(self, method: str, path: str, params: Dict = None) -> Dict:
-    @abc.abstractmethod
     def get(self, path: str, params: Dict = None) -> Dict:
-        pass
+        return self.do("GET", path, params)
 class Urllib2HTTP(Connection):
@@ -47,7 +49,7 @@
             self.headers  = headers
-    def do(self, method: str, path: str, params: Dict = None) -> Dict:
+    def do(self, method: str, path: str, params: Dict = None) -> Any:
         if path.startswith('/'):
             url = self.root_url + path
@@ -62,17 +64,17 @@
         logger.debug("HTTP: {0} {1}".format(method.upper(), url))
         request = urllib.request.Request(url,
-                                         data=data_json,
+                                         data=data_json.encode("utf8"),
         if data_json is not None:
             request.add_header('Content-Type', 'application/json')
-        request.get_method = lambda: method.upper()
+        request.get_method = lambda: method.upper()  # type: ignore
         response = urllib.request.urlopen(request)
+        code = response.code  # type: ignore
-        logger.debug("HTTP Responce: {0}".format(response.code))
-        if response.code < 200 or response.code > 209:
+        logger.debug("HTTP Responce: {0}".format(code))
+        if code < 200 or code > 209:
             raise IndexError(url)
         content = response.read()
@@ -80,9 +82,9 @@
         if '' == content:
             return None
-        return json.loads(content)
+        return json.loads(content.decode("utf8"))
-    def __getattr__(self, name: str):
+    def __getattr__(self, name: str) -> Any:
         if name in self.allowed_methods:
             return partial(self.do, name)
         raise AttributeError(name)
@@ -107,11 +109,11 @@
                 'Cant establish connection to keystone with url %s',
-    def do(self, method: str, path: str, params: Dict[str, str] = None) -> Dict[str, Any]:
+    def do(self, method: str, path: str, params: Dict[str, str] = None) -> Any:
         """Do request. If gets 401 refresh token"""
             return super(KeystoneAuth, self).do(method, path, params)
-        except urllib.request.HTTPError as e:
+        except urllib.error.HTTPError as e:
             if e.code == 401:
                     'Authorization failure: {0}'.format(e.read()))
@@ -121,17 +123,17 @@
-def get_inline_param_list(url: str) -> Iterator[Match]:
+def get_inline_param_list(url: str) -> Iterator[str]:
     format_param_rr = re.compile(r"\{([a-zA-Z_]+)\}")
     for match in format_param_rr.finditer(url):
         yield match.group(1)
 class RestObj:
-    name = None
-    id = None
+    name = None  # type: str
+    id = None   # type: int
-    def __init__(self, conn, **kwargs) -> None:
+    def __init__(self, conn: Connection, **kwargs: Any) -> None:
         self.__connection__ = conn
@@ -148,8 +150,8 @@
         return getattr(self, item)
-def make_call(method: str, url: str) -> Callable[[Any, Any], Dict]:
-    def closure(obj: Any, entire_obj: Any = None, **data) -> Dict:
+def make_call(method: str, url: str) -> Callable:
+    def closure(obj: Any, entire_obj: Any = None, **data: Any) -> Any:
         inline_params_vals = {}
         for name in get_inline_param_list(url):
             if name in data:
@@ -167,9 +169,12 @@
     return closure
-PUT = partial(make_call, 'put')
-GET = partial(make_call, 'get')
-DELETE = partial(make_call, 'delete')
+RequestMethod = Callable[[str], Callable]
+PUT = cast(RequestMethod, partial(make_call, 'put'))  # type: RequestMethod
+GET = cast(RequestMethod, partial(make_call, 'get'))  # type: RequestMethod
+DELETE = cast(RequestMethod, partial(make_call, 'delete')) # type: RequestMethod
 # -------------------------------  ORM ----------------------------------------
@@ -270,14 +275,13 @@
         super(Cluster, self).__init__(*dt, **mp)
         self.nodes = NodeList([Node(self.__connection__, **node) for node in
-        self.network_roles = {}
     def check_exists(self) -> bool:
         """Check if cluster exists"""
             return True
-        except urllib.request.HTTPError as err:
+        except urllib.error.HTTPError as err:
             if err.code == 404:
                 return False
diff --git a/wally/main.py b/wally/main.py
index 360506e..3e1fcb3 100644
--- a/wally/main.py
+++ b/wally/main.py
@@ -162,6 +162,7 @@
             storage['config'] = config  # type: ignore
+            run_test.clouds_connect_stage,
diff --git a/wally/meta_info.py b/wally/meta_info.py
index e0e2b30..7dc3901 100644
--- a/wally/meta_info.py
+++ b/wally/meta_info.py
@@ -1,4 +1,4 @@
-from typing import Any, Dict, Union, List
+from typing import Any, Dict, List
 from .fuel_rest_api import KeystoneAuth, FuelInfo
@@ -21,13 +21,13 @@
     return lab_data
-def collect_lab_data(url: str, cred: Dict[str, str]) -> Dict[str, Union[List[Dict[str, str]], str]]:
+def collect_lab_data(url: str, cred: Dict[str, str]) -> Dict[str, Any]:
     finfo = FuelInfo(KeystoneAuth(url, cred))
     nodes = []  # type: List[Dict[str, str]]
-    result = {}
+    result = {}  # type: Dict[str, Any]
-    for node in finfo.get_nodes():
+    for node in finfo.get_nodes():  # type: ignore
         node_info = {
             'name': node['name'],
             'processors': [],
@@ -51,4 +51,5 @@
     result['nodes'] = nodes
     result['fuel_version'] = finfo.get_version()
     result['total_info'] = total_lab_info(nodes)
     return result
diff --git a/wally/node_interfaces.py b/wally/node_interfaces.py
index e75c2a3..ac83267 100644
--- a/wally/node_interfaces.py
+++ b/wally/node_interfaces.py
@@ -54,6 +54,7 @@
 class IRPCNode(metaclass=abc.ABCMeta):
     """Remote filesystem interface"""
     info = None  # type: NodeInfo
+    conn = None  # type: Any
     def run(self, cmd: str, timeout: int = 60, nolog: bool = False) -> str:
diff --git a/wally/report.py b/wally/report.py
index ed3c362..88c97b7 100644
--- a/wally/report.py
+++ b/wally/report.py
@@ -5,7 +5,7 @@
 import itertools
 import collections
 from io import StringIO
-from typing import Dict
+from typing import Dict, Any, Iterator, Tuple, cast
     import numpy
@@ -19,6 +19,8 @@
 import wally
 from .utils import ssize2b
 from .statistic import round_3_digit
+from .storage import Storage
+from .result_classes import TestInfo, FullTestResult, SensorInfo
 from .suits.io.fio_task_parser import (get_test_sync_mode,
@@ -28,811 +30,1546 @@
 logger = logging.getLogger("wally.report")
-class DiskInfo:
-    def __init__(self):
-        self.direct_iops_r_max = 0
-        self.direct_iops_w_max = 0
+def load_test_results(storage: Storage) -> Iterator[FullTestResult]:
+    sensors_data = {}  # type: Dict[Tuple[str, str, str], SensorInfo]
-        # 64 used instead of 4k to faster feed caches
-        self.direct_iops_w64_max = 0
+    for _, node_id in storage.list("metric"):
+        for _, dev_name in storage.list("metric", node_id):
+            for _, sensor_name in storage.list("metric", node_id, dev_name):
+                key = (node_id, dev_name, sensor_name)
+                si = SensorInfo(*key)
+                si.begin_time, si.end_time, si.data = storage["metric/{}/{}/{}".format(*key)]  # type: ignore
+                sensors_data[key] = si
-        self.rws4k_10ms = 0
-        self.rws4k_30ms = 0
-        self.rws4k_100ms = 0
-        self.bw_write_max = 0
-        self.bw_read_max = 0
+    for _, run_id in storage.list("result"):
+        path = "result/" + run_id
+        ftr = FullTestResult()
+        ftr.info = storage.load(TestInfo, path, "info")
+        ftr.performance_data = {}
+        p1 = "result/{}/measurement".format(run_id)
+        for _, node_id in storage.list(p1):
+            for _, measurement_name in storage.list(p1, node_id):
+                perf_key = (node_id, measurement_name)
+                ftr.performance_data[perf_key] = storage["{}/{}/{}".format(p1, *perf_key)]  # type: ignore
-report_funcs = []
+        yield ftr
-class Attrmapper(object):
-    def __init__(self, dct: Dict):
-        self.__dct = dct
-    def __getattr__(self, name):
-        try:
-            return self.__dct[name]
-        except KeyError:
-            raise AttributeError(name)
-class PerfInfo(object):
-    def __init__(self, name, summary, intervals, params, testnodes_count):
-        self.name = name
-        self.bw = None
-        self.iops = None
-        self.lat = None
-        self.lat_50 = None
-        self.lat_95 = None
-        self.raw_bw = []
-        self.raw_iops = []
-        self.raw_lat = []
-        self.params = params
-        self.intervals = intervals
-        self.testnodes_count = testnodes_count
-        self.summary = summary
-        self.p = Attrmapper(self.params.vals)
-        self.sync_mode = get_test_sync_mode(self.params)
-        self.concurence = self.params.vals.get('numjobs', 1)
-# disk_info = None
-# base = None
-# linearity = None
-def group_by_name(test_data):
-    name_map = collections.defaultdict(lambda: [])
-    for data in test_data:
-        name_map[(data.name, data.summary())].append(data)
-    return name_map
-def report(name, required_fields):
-    def closure(func):
-        report_funcs.append((required_fields.split(","), name, func))
-        return func
-    return closure
-def get_test_lcheck_params(pinfo):
-    res = [{
-        's': 'sync',
-        'd': 'direct',
-        'a': 'async',
-        'x': 'sync direct'
-    }[pinfo.sync_mode]]
-    res.append(pinfo.p.rw)
-    return " ".join(res)
-def get_emb_data_svg(plt):
-    sio = StringIO()
-    plt.savefig(sio, format='svg')
-    img_start = "<!-- Created with matplotlib (http://matplotlib.org/) -->"
-    return sio.getvalue().split(img_start, 1)[1]
-def get_template(templ_name):
-    very_root_dir = os.path.dirname(os.path.dirname(wally.__file__))
-    templ_dir = os.path.join(very_root_dir, 'report_templates')
-    templ_file = os.path.join(templ_dir, templ_name)
-    return open(templ_file, 'r').read()
-def group_by(data, func):
-    if len(data) < 2:
-        yield data
-        return
-    ndata = [(func(dt), dt) for dt in data]
-    ndata.sort(key=func)
-    pkey, dt = ndata[0]
-    curr_list = [dt]
-    for key, val in ndata[1:]:
-        if pkey != key:
-            yield curr_list
-            curr_list = [val]
-        else:
-            curr_list.append(val)
-        pkey = key
-    yield curr_list
-@report('linearity', 'linearity_test')
-def linearity_report(processed_results, lab_info, comment):
-    labels_and_data_mp = collections.defaultdict(lambda: [])
-    vls = {}
-    # plot io_time = func(bsize)
-    for res in processed_results.values():
-        if res.name.startswith('linearity_test'):
-            iotimes = [1000. / val for val in res.iops.raw]
-            op_summ = get_test_summary(res.params)[:3]
-            labels_and_data_mp[op_summ].append(
-                [res.p.blocksize, res.iops.raw, iotimes])
-            cvls = res.params.vals.copy()
-            del cvls['blocksize']
-            del cvls['rw']
-            cvls.pop('sync', None)
-            cvls.pop('direct', None)
-            cvls.pop('buffered', None)
-            if op_summ not in vls:
-                vls[op_summ] = cvls
-            else:
-                assert cvls == vls[op_summ]
-    all_labels = None
-    _, ax1 = plt.subplots()
-    for name, labels_and_data in labels_and_data_mp.items():
-        labels_and_data.sort(key=lambda x: ssize2b(x[0]))
-        labels, _, iotimes = zip(*labels_and_data)
-        if all_labels is None:
-            all_labels = labels
-        else:
-            assert all_labels == labels
-        plt.boxplot(iotimes)
-        if len(labels_and_data) > 2 and \
-           ssize2b(labels_and_data[-2][0]) >= 4096:
-            xt = range(1, len(labels) + 1)
-            def io_time(sz, bw, initial_lat):
-                return sz / bw + initial_lat
-            x = numpy.array(map(ssize2b, labels))
-            y = numpy.array([sum(dt) / len(dt) for dt in iotimes])
-            popt, _ = scipy.optimize.curve_fit(io_time, x, y, p0=(100., 1.))
-            y1 = io_time(x, *popt)
-            plt.plot(xt, y1, linestyle='--',
-                     label=name + ' LS linear approx')
-            for idx, (sz, _, _) in enumerate(labels_and_data):
-                if ssize2b(sz) >= 4096:
-                    break
-            bw = (x[-1] - x[idx]) / (y[-1] - y[idx])
-            lat = y[-1] - x[-1] / bw
-            y2 = io_time(x, bw, lat)
-            plt.plot(xt, y2, linestyle='--',
-                     label=abbv_name_to_full(name) +
-                     ' (4k & max) linear approx')
-    plt.setp(ax1, xticklabels=labels)
-    plt.xlabel("Block size")
-    plt.ylabel("IO time, ms")
-    plt.subplots_adjust(top=0.85)
-    plt.legend(bbox_to_anchor=(0.5, 1.15),
-               loc='upper center',
-               prop={'size': 10}, ncol=2)
-    plt.grid()
-    iotime_plot = get_emb_data_svg(plt)
-    plt.clf()
-    # plot IOPS = func(bsize)
-    _, ax1 = plt.subplots()
-    for name, labels_and_data in labels_and_data_mp.items():
-        labels_and_data.sort(key=lambda x: ssize2b(x[0]))
-        _, data, _ = zip(*labels_and_data)
-        plt.boxplot(data)
-        avg = [float(sum(arr)) / len(arr) for arr in data]
-        xt = range(1, len(data) + 1)
-        plt.plot(xt, avg, linestyle='--',
-                 label=abbv_name_to_full(name) + " avg")
-    plt.setp(ax1, xticklabels=labels)
-    plt.xlabel("Block size")
-    plt.ylabel("IOPS")
-    plt.legend(bbox_to_anchor=(0.5, 1.15),
-               loc='upper center',
-               prop={'size': 10}, ncol=2)
-    plt.grid()
-    plt.subplots_adjust(top=0.85)
-    iops_plot = get_emb_data_svg(plt)
-    res = set(get_test_lcheck_params(res) for res in processed_results.values())
-    ncount = list(set(res.testnodes_count for res in processed_results.values()))
-    conc = list(set(res.concurence for res in processed_results.values()))
-    assert len(conc) == 1
-    assert len(ncount) == 1
-    descr = {
-        'vm_count': ncount[0],
-        'concurence': conc[0],
-        'oper_descr': ", ".join(res).capitalize()
-    }
-    params_map = {'iotime_vs_size': iotime_plot,
-                  'iops_vs_size': iops_plot,
-                  'descr': descr}
-    return get_template('report_linearity.html').format(**params_map)
-@report('lat_vs_iops', 'lat_vs_iops')
-def lat_vs_iops(processed_results, lab_info, comment):
-    lat_iops = collections.defaultdict(lambda: [])
-    requsted_vs_real = collections.defaultdict(lambda: {})
-    for res in processed_results.values():
-        if res.name.startswith('lat_vs_iops'):
-            lat_iops[res.concurence].append((res.lat,
-                                             0,
-                                             res.iops.average,
-                                             res.iops.deviation))
-            # lat_iops[res.concurence].append((res.lat.average / 1000.0,
-            #                                  res.lat.deviation / 1000.0,
-            #                                  res.iops.average,
-            #                                  res.iops.deviation))
-            requested_iops = res.p.rate_iops * res.concurence
-            requsted_vs_real[res.concurence][requested_iops] = \
-                (res.iops.average, res.iops.deviation)
-    colors = ['red', 'green', 'blue', 'orange', 'magenta', "teal"]
-    colors_it = iter(colors)
-    for conc, lat_iops in sorted(lat_iops.items()):
-        lat, dev, iops, iops_dev = zip(*lat_iops)
-        plt.errorbar(iops, lat, xerr=iops_dev, yerr=dev, fmt='ro',
-                     label=str(conc) + " threads",
-                     color=next(colors_it))
-    plt.xlabel("IOPS")
-    plt.ylabel("Latency, ms")
-    plt.grid()
-    plt.legend(loc=0)
-    plt_iops_vs_lat = get_emb_data_svg(plt)
-    plt.clf()
-    colors_it = iter(colors)
-    for conc, req_vs_real in sorted(requsted_vs_real.items()):
-        req, real = zip(*sorted(req_vs_real.items()))
-        iops, dev = zip(*real)
-        plt.errorbar(req, iops, yerr=dev, fmt='ro',
-                     label=str(conc) + " threads",
-                     color=next(colors_it))
-    plt.xlabel("Requested IOPS")
-    plt.ylabel("Get IOPS")
-    plt.grid()
-    plt.legend(loc=0)
-    plt_iops_vs_requested = get_emb_data_svg(plt)
-    res1 = processed_results.values()[0]
-    params_map = {'iops_vs_lat': plt_iops_vs_lat,
-                  'iops_vs_requested': plt_iops_vs_requested,
-                  'oper_descr': get_test_lcheck_params(res1).capitalize()}
-    return get_template('report_iops_vs_lat.html').format(**params_map)
-def render_all_html(comment, info, lab_description, images, templ_name):
-    data = info.__dict__.copy()
-    for name, val in data.items():
-        if not name.startswith('__'):
-            if val is None:
-                if name in ('direct_iops_w64_max', 'direct_iops_w_max'):
-                    data[name] = ('-', '-', '-')
-                else:
-                    data[name] = '-'
-            elif isinstance(val, (int, float, long)):
-                data[name] = round_3_digit(val)
-    data['bw_read_max'] = (data['bw_read_max'][0] // 1024,
-                           data['bw_read_max'][1],
-                           data['bw_read_max'][2])
-    data['bw_write_max'] = (data['bw_write_max'][0] // 1024,
-                            data['bw_write_max'][1],
-                            data['bw_write_max'][2])
-    images.update(data)
-    templ = get_template(templ_name)
-    return templ.format(lab_info=lab_description,
-                        comment=comment,
-                        **images)
-def io_chart(title, concurence,
-             latv, latv_min, latv_max,
-             iops_or_bw, iops_or_bw_err,
-             legend,
-             log_iops=False,
-             log_lat=False,
-             boxplots=False,
-             latv_50=None,
-             latv_95=None,
-             error2=None):
-    matplotlib.rcParams.update({'font.size': 10})
-    points = " MiBps" if legend == 'BW' else ""
-    lc = len(concurence)
-    width = 0.35
-    xt = range(1, lc + 1)
-    op_per_vm = [v / (vm * th) for v, (vm, th) in zip(iops_or_bw, concurence)]
-    fig, p1 = plt.subplots()
-    xpos = [i - width / 2 for i in xt]
-    p1.bar(xpos, iops_or_bw,
-           width=width,
-           color='y',
-           label=legend)
-    err1_leg = None
-    for pos, y, err in zip(xpos, iops_or_bw, iops_or_bw_err):
-        err1_leg = p1.errorbar(pos + width / 2,
-                               y,
-                               err,
-                               color='magenta')
-    err2_leg = None
-    if error2 is not None:
-        for pos, y, err in zip(xpos, iops_or_bw, error2):
-            err2_leg = p1.errorbar(pos + width / 2 + 0.08,
-                                   y,
-                                   err,
-                                   lw=2,
-                                   alpha=0.5,
-                                   color='teal')
-    p1.grid(True)
-    p1.plot(xt, op_per_vm, '--', label=legend + "/thread", color='black')
-    handles1, labels1 = p1.get_legend_handles_labels()
-    handles1 += [err1_leg]
-    labels1 += ["95% conf"]
-    if err2_leg is not None:
-        handles1 += [err2_leg]
-        labels1 += ["95% dev"]
-    p2 = p1.twinx()
-    if latv_50 is None:
-        p2.plot(xt, latv_max, label="lat max")
-        p2.plot(xt, latv, label="lat avg")
-        p2.plot(xt, latv_min, label="lat min")
-    else:
-        p2.plot(xt, latv_50, label="lat med")
-        p2.plot(xt, latv_95, label="lat 95%")
-    plt.xlim(0.5, lc + 0.5)
-    plt.xticks(xt, ["{0} * {1}".format(vm, th) for (vm, th) in concurence])
-    p1.set_xlabel("VM Count * Thread per VM")
-    p1.set_ylabel(legend + points)
-    p2.set_ylabel("Latency ms")
-    plt.title(title)
-    handles2, labels2 = p2.get_legend_handles_labels()
-    plt.legend(handles1 + handles2, labels1 + labels2,
-               loc='center left', bbox_to_anchor=(1.1, 0.81))
-    if log_iops:
-        p1.set_yscale('log')
-    if log_lat:
-        p2.set_yscale('log')
-    plt.subplots_adjust(right=0.68)
-    return get_emb_data_svg(plt)
-def make_plots(processed_results, plots):
-    """
-    processed_results: [PerfInfo]
-    plots = [(test_name_prefix:str, fname:str, description:str)]
-    """
-    files = {}
-    for name_pref, fname, desc in plots:
-        chart_data = []
-        for res in processed_results:
-            summ = res.name + "_" + res.summary
-            if summ.startswith(name_pref):
-                chart_data.append(res)
-        if len(chart_data) == 0:
-            raise ValueError("Can't found any date for " + name_pref)
-        use_bw = ssize2b(chart_data[0].p.blocksize) > 16 * 1024
-        chart_data.sort(key=lambda x: x.params['vals']['numjobs'])
-        lat = None
-        lat_min = None
-        lat_max = None
-        lat_50 = [x.lat_50 for x in chart_data]
-        lat_95 = [x.lat_95 for x in chart_data]
-        lat_diff_max = max(x.lat_95 / x.lat_50 for x in chart_data)
-        lat_log_scale = (lat_diff_max > 10)
-        testnodes_count = x.testnodes_count
-        concurence = [(testnodes_count, x.concurence)
-                      for x in chart_data]
-        if use_bw:
-            data = [x.bw.average / 1000 for x in chart_data]
-            data_conf = [x.bw.confidence / 1000 for x in chart_data]
-            data_dev = [x.bw.deviation * 2.5 / 1000 for x in chart_data]
-            name = "BW"
-        else:
-            data = [x.iops.average for x in chart_data]
-            data_conf = [x.iops.confidence for x in chart_data]
-            data_dev = [x.iops.deviation * 2 for x in chart_data]
-            name = "IOPS"
-        fc = io_chart(title=desc,
-                      concurence=concurence,
-                      latv=lat,
-                      latv_min=lat_min,
-                      latv_max=lat_max,
-                      iops_or_bw=data,
-                      iops_or_bw_err=data_conf,
-                      legend=name,
-                      log_lat=lat_log_scale,
-                      latv_50=lat_50,
-                      latv_95=lat_95,
-                      error2=data_dev)
-        files[fname] = fc
-    return files
-def find_max_where(processed_results, sync_mode, blocksize, rw, iops=True):
-    result = None
-    attr = 'iops' if iops else 'bw'
-    for measurement in processed_results:
-        ok = measurement.sync_mode == sync_mode
-        ok = ok and (measurement.p.blocksize == blocksize)
-        ok = ok and (measurement.p.rw == rw)
-        if ok:
-            field = getattr(measurement, attr)
-            if result is None:
-                result = field
-            elif field.average > result.average:
-                result = field
-    return result
-def get_disk_info(processed_results):
-    di = DiskInfo()
-    di.direct_iops_w_max = find_max_where(processed_results,
-                                          'd', '4k', 'randwrite')
-    di.direct_iops_r_max = find_max_where(processed_results,
-                                          'd', '4k', 'randread')
-    di.direct_iops_w64_max = find_max_where(processed_results,
-                                            'd', '64k', 'randwrite')
-    for sz in ('16m', '64m'):
-        di.bw_write_max = find_max_where(processed_results,
-                                         'd', sz, 'randwrite', False)
-        if di.bw_write_max is not None:
-            break
-    if di.bw_write_max is None:
-        for sz in ('1m', '2m', '4m', '8m'):
-            di.bw_write_max = find_max_where(processed_results,
-                                             'd', sz, 'write', False)
-            if di.bw_write_max is not None:
-                break
-    for sz in ('16m', '64m'):
-        di.bw_read_max = find_max_where(processed_results,
-                                        'd', sz, 'randread', False)
-        if di.bw_read_max is not None:
-            break
-    if di.bw_read_max is None:
-        di.bw_read_max = find_max_where(processed_results,
-                                        'd', '1m', 'read', False)
-    rws4k_iops_lat_th = []
-    for res in processed_results:
-        if res.sync_mode in 'xs' and res.p.blocksize == '4k':
-            if res.p.rw != 'randwrite':
-                continue
-            rws4k_iops_lat_th.append((res.iops.average,
-                                      res.lat,
-                                      # res.lat.average,
-                                      res.concurence))
-    rws4k_iops_lat_th.sort(key=lambda x: x[2])
-    latv = [lat for _, lat, _ in rws4k_iops_lat_th]
-    for tlat in [10, 30, 100]:
-        pos = bisect.bisect_left(latv, tlat)
-        if 0 == pos:
-            setattr(di, 'rws4k_{}ms'.format(tlat), 0)
-        elif pos == len(latv):
-            iops3, _, _ = rws4k_iops_lat_th[-1]
-            iops3 = int(round_3_digit(iops3))
-            setattr(di, 'rws4k_{}ms'.format(tlat), ">=" + str(iops3))
-        else:
-            lat1 = latv[pos - 1]
-            lat2 = latv[pos]
-            iops1, _, th1 = rws4k_iops_lat_th[pos - 1]
-            iops2, _, th2 = rws4k_iops_lat_th[pos]
-            th_lat_coef = (th2 - th1) / (lat2 - lat1)
-            th3 = th_lat_coef * (tlat - lat1) + th1
-            th_iops_coef = (iops2 - iops1) / (th2 - th1)
-            iops3 = th_iops_coef * (th3 - th1) + iops1
-            iops3 = int(round_3_digit(iops3))
-            setattr(di, 'rws4k_{}ms'.format(tlat), iops3)
-    hdi = DiskInfo()
-    def pp(x):
-        med, conf = x.rounded_average_conf()
-        conf_perc = int(float(conf) / med * 100)
-        dev_perc = int(float(x.deviation) / med * 100)
-        return (round_3_digit(med), conf_perc, dev_perc)
-    hdi.direct_iops_r_max = pp(di.direct_iops_r_max)
-    if di.direct_iops_w_max is not None:
-        hdi.direct_iops_w_max = pp(di.direct_iops_w_max)
-    else:
-        hdi.direct_iops_w_max = None
-    if di.direct_iops_w64_max is not None:
-        hdi.direct_iops_w64_max = pp(di.direct_iops_w64_max)
-    else:
-        hdi.direct_iops_w64_max = None
-    hdi.bw_write_max = pp(di.bw_write_max)
-    hdi.bw_read_max = pp(di.bw_read_max)
-    hdi.rws4k_10ms = di.rws4k_10ms if 0 != di.rws4k_10ms else None
-    hdi.rws4k_30ms = di.rws4k_30ms if 0 != di.rws4k_30ms else None
-    hdi.rws4k_100ms = di.rws4k_100ms if 0 != di.rws4k_100ms else None
-    return hdi
-@report('hdd', 'hdd')
-def make_hdd_report(processed_results, lab_info, comment):
-    plots = [
-        ('hdd_rrd4k', 'rand_read_4k', 'Random read 4k direct IOPS'),
-        ('hdd_rwx4k', 'rand_write_4k', 'Random write 4k sync IOPS')
-    ]
-    perf_infos = [res.disk_perf_info() for res in processed_results]
-    images = make_plots(perf_infos, plots)
-    di = get_disk_info(perf_infos)
-    return render_all_html(comment, di, lab_info, images, "report_hdd.html")
-@report('cinder_iscsi', 'cinder_iscsi')
-def make_cinder_iscsi_report(processed_results, lab_info, comment):
-    plots = [
-        ('cinder_iscsi_rrd4k', 'rand_read_4k', 'Random read 4k direct IOPS'),
-        ('cinder_iscsi_rwx4k', 'rand_write_4k', 'Random write 4k sync IOPS')
-    ]
-    perf_infos = [res.disk_perf_info() for res in processed_results]
-    try:
-        images = make_plots(perf_infos, plots)
-    except ValueError:
-        plots = [
-            ('cinder_iscsi_rrd4k', 'rand_read_4k', 'Random read 4k direct IOPS'),
-            ('cinder_iscsi_rws4k', 'rand_write_4k', 'Random write 4k sync IOPS')
-        ]
-        images = make_plots(perf_infos, plots)
-    di = get_disk_info(perf_infos)
-    return render_all_html(comment, di, lab_info, images, "report_cinder_iscsi.html")
-@report('ceph', 'ceph')
-def make_ceph_report(processed_results, lab_info, comment):
-    plots = [
-        ('ceph_rrd4k', 'rand_read_4k', 'Random read 4k direct IOPS'),
-        ('ceph_rws4k', 'rand_write_4k', 'Random write 4k sync IOPS'),
-        ('ceph_rrd16m', 'rand_read_16m', 'Random read 16m direct MiBps'),
-        ('ceph_rwd16m', 'rand_write_16m',
-         'Random write 16m direct MiBps'),
-    ]
-    perf_infos = [res.disk_perf_info() for res in processed_results]
-    images = make_plots(perf_infos, plots)
-    di = get_disk_info(perf_infos)
-    return render_all_html(comment, di, lab_info, images, "report_ceph.html")
-@report('mixed', 'mixed')
-def make_mixed_report(processed_results, lab_info, comment):
-    #
-    # IOPS(X% read) = 100 / ( X / IOPS_W + (100 - X) / IOPS_R )
-    #
-    perf_infos = [res.disk_perf_info() for res in processed_results]
-    mixed = collections.defaultdict(lambda: [])
-    is_ssd = False
-    for res in perf_infos:
-        if res.name.startswith('mixed'):
-            if res.name.startswith('mixed-ssd'):
-                is_ssd = True
-            mixed[res.concurence].append((res.p.rwmixread,
-                                          res.lat,
-                                          0,
-                                          # res.lat.average / 1000.0,
-                                          # res.lat.deviation / 1000.0,
-                                          res.iops.average,
-                                          res.iops.deviation))
-    if len(mixed) == 0:
-        raise ValueError("No mixed load found")
-    fig, p1 = plt.subplots()
-    p2 = p1.twinx()
-    colors = ['red', 'green', 'blue', 'orange', 'magenta', "teal"]
-    colors_it = iter(colors)
-    for conc, mix_lat_iops in sorted(mixed.items()):
-        mix_lat_iops = sorted(mix_lat_iops)
-        read_perc, lat, dev, iops, iops_dev = zip(*mix_lat_iops)
-        p1.errorbar(read_perc, iops, color=next(colors_it),
-                    yerr=iops_dev, label=str(conc) + " th")
-        p2.errorbar(read_perc, lat, color=next(colors_it),
-                    ls='--', yerr=dev, label=str(conc) + " th lat")
-    if is_ssd:
-        p1.set_yscale('log')
-        p2.set_yscale('log')
-    p1.set_xlim(-5, 105)
-    read_perc = set(read_perc)
-    read_perc.add(0)
-    read_perc.add(100)
-    read_perc = sorted(read_perc)
-    plt.xticks(read_perc, map(str, read_perc))
-    p1.grid(True)
-    p1.set_xlabel("% of reads")
-    p1.set_ylabel("Mixed IOPS")
-    p2.set_ylabel("Latency, ms")
-    handles1, labels1 = p1.get_legend_handles_labels()
-    handles2, labels2 = p2.get_legend_handles_labels()
-    plt.subplots_adjust(top=0.85)
-    plt.legend(handles1 + handles2, labels1 + labels2,
-               bbox_to_anchor=(0.5, 1.15),
-               loc='upper center',
-               prop={'size': 12}, ncol=3)
-    plt.show()
-def make_load_report(idx, results_dir, fname):
-    dpath = os.path.join(results_dir, "io_" + str(idx))
-    files = sorted(os.listdir(dpath))
-    gf = lambda x: "_".join(x.rsplit(".", 1)[0].split('_')[:3])
-    for key, group in itertools.groupby(files, gf):
-        fname = os.path.join(dpath, key + ".fio")
-        cfgs = list(parse_all_in_1(open(fname).read(), fname))
-        fname = os.path.join(dpath, key + "_lat.log")
-        curr = []
-        arrays = []
-        with open(fname) as fd:
-            for offset, lat, _, _ in csv.reader(fd):
-                offset = int(offset)
-                lat = int(lat)
-                if len(curr) > 0 and curr[-1][0] > offset:
-                    arrays.append(curr)
-                    curr = []
-                curr.append((offset, lat))
-            arrays.append(curr)
-        conc = int(cfgs[0].vals.get('numjobs', 1))
-        if conc != 5:
-            continue
-        assert len(arrays) == len(cfgs) * conc
-        garrays = [[(0, 0)] for _ in range(conc)]
-        for offset in range(len(cfgs)):
-            for acc, new_arr in zip(garrays, arrays[offset * conc:(offset + 1) * conc]):
-                last = acc[-1][0]
-                for off, lat in new_arr:
-                    acc.append((off / 1000. + last, lat / 1000.))
-        for cfg, arr in zip(cfgs, garrays):
-            plt.plot(*zip(*arr[1:]))
-        plt.show()
-        exit(1)
-def make_io_report(dinfo, comment, path, lab_info=None):
-    lab_info = {
-        "total_disk": "None",
-        "total_memory": "None",
-        "nodes_count": "None",
-        "processor_count": "None"
-    }
-    try:
-        res_fields = sorted(v.name for v in dinfo)
-        found = False
-        for fields, name, func in report_funcs:
-            for field in fields:
-                pos = bisect.bisect_left(res_fields, field)
-                if pos == len(res_fields):
-                    break
-                if not res_fields[pos].startswith(field):
-                    break
-            else:
-                found = True
-                hpath = path.format(name)
-                try:
-                    report = func(dinfo, lab_info, comment)
-                except:
-                    logger.exception("Diring {0} report generation".format(name))
-                    continue
-                if report is not None:
-                    try:
-                        with open(hpath, "w") as fd:
-                            fd.write(report)
-                    except:
-                        logger.exception("Diring saving {0} report".format(name))
-                        continue
-                    logger.info("Report {0} saved into {1}".format(name, hpath))
-                else:
-                    logger.warning("No report produced by {0!r}".format(name))
-        if not found:
-            logger.warning("No report generator found for this load")
-    except Exception as exc:
-        import traceback
-        traceback.print_exc()
-        logger.error("Failed to generate html report:" + str(exc))
+# class StoragePerfInfo:
+#     def __init__(self, name: str, summary: Any, params, testnodes_count) -> None:
+#         self.direct_iops_r_max = 0  # type: int
+#         self.direct_iops_w_max = 0  # type: int
+#         # 64 used instead of 4k to faster feed caches
+#         self.direct_iops_w64_max = 0  # type: int
+#         self.rws4k_10ms = 0  # type: int
+#         self.rws4k_30ms = 0  # type: int
+#         self.rws4k_100ms = 0  # type: int
+#         self.bw_write_max = 0  # type: int
+#         self.bw_read_max = 0  # type: int
+#         self.bw = None  #
+#         self.iops = None
+#         self.lat = None
+#         self.lat_50 = None
+#         self.lat_95 = None
+# # disk_info = None
+# # base = None
+# # linearity = None
+# def group_by_name(test_data):
+#     name_map = collections.defaultdict(lambda: [])
+#     for data in test_data:
+#         name_map[(data.name, data.summary())].append(data)
+#     return name_map
+# def report(name, required_fields):
+#     def closure(func):
+#         report_funcs.append((required_fields.split(","), name, func))
+#         return func
+#     return closure
+# def get_test_lcheck_params(pinfo):
+#     res = [{
+#         's': 'sync',
+#         'd': 'direct',
+#         'a': 'async',
+#         'x': 'sync direct'
+#     }[pinfo.sync_mode]]
+#     res.append(pinfo.p.rw)
+#     return " ".join(res)
+# def get_emb_data_svg(plt):
+#     sio = StringIO()
+#     plt.savefig(sio, format='svg')
+#     img_start = "<!-- Created with matplotlib (http://matplotlib.org/) -->"
+#     return sio.getvalue().split(img_start, 1)[1]
+# def get_template(templ_name):
+#     very_root_dir = os.path.dirname(os.path.dirname(wally.__file__))
+#     templ_dir = os.path.join(very_root_dir, 'report_templates')
+#     templ_file = os.path.join(templ_dir, templ_name)
+#     return open(templ_file, 'r').read()
+# def group_by(data, func):
+#     if len(data) < 2:
+#         yield data
+#         return
+#     ndata = [(func(dt), dt) for dt in data]
+#     ndata.sort(key=func)
+#     pkey, dt = ndata[0]
+#     curr_list = [dt]
+#     for key, val in ndata[1:]:
+#         if pkey != key:
+#             yield curr_list
+#             curr_list = [val]
+#         else:
+#             curr_list.append(val)
+#         pkey = key
+#     yield curr_list
+# @report('linearity', 'linearity_test')
+# def linearity_report(processed_results, lab_info, comment):
+#     labels_and_data_mp = collections.defaultdict(lambda: [])
+#     vls = {}
+#     # plot io_time = func(bsize)
+#     for res in processed_results.values():
+#         if res.name.startswith('linearity_test'):
+#             iotimes = [1000. / val for val in res.iops.raw]
+#             op_summ = get_test_summary(res.params)[:3]
+#             labels_and_data_mp[op_summ].append(
+#                 [res.p.blocksize, res.iops.raw, iotimes])
+#             cvls = res.params.vals.copy()
+#             del cvls['blocksize']
+#             del cvls['rw']
+#             cvls.pop('sync', None)
+#             cvls.pop('direct', None)
+#             cvls.pop('buffered', None)
+#             if op_summ not in vls:
+#                 vls[op_summ] = cvls
+#             else:
+#                 assert cvls == vls[op_summ]
+#     all_labels = None
+#     _, ax1 = plt.subplots()
+#     for name, labels_and_data in labels_and_data_mp.items():
+#         labels_and_data.sort(key=lambda x: ssize2b(x[0]))
+#         labels, _, iotimes = zip(*labels_and_data)
+#         if all_labels is None:
+#             all_labels = labels
+#         else:
+#             assert all_labels == labels
+#         plt.boxplot(iotimes)
+#         if len(labels_and_data) > 2 and \
+#            ssize2b(labels_and_data[-2][0]) >= 4096:
+#             xt = range(1, len(labels) + 1)
+#             def io_time(sz, bw, initial_lat):
+#                 return sz / bw + initial_lat
+#             x = numpy.array(map(ssize2b, labels))
+#             y = numpy.array([sum(dt) / len(dt) for dt in iotimes])
+#             popt, _ = scipy.optimize.curve_fit(io_time, x, y, p0=(100., 1.))
+#             y1 = io_time(x, *popt)
+#             plt.plot(xt, y1, linestyle='--',
+#                      label=name + ' LS linear approx')
+#             for idx, (sz, _, _) in enumerate(labels_and_data):
+#                 if ssize2b(sz) >= 4096:
+#                     break
+#             bw = (x[-1] - x[idx]) / (y[-1] - y[idx])
+#             lat = y[-1] - x[-1] / bw
+#             y2 = io_time(x, bw, lat)
+#             plt.plot(xt, y2, linestyle='--',
+#                      label=abbv_name_to_full(name) +
+#                      ' (4k & max) linear approx')
+#     plt.setp(ax1, xticklabels=labels)
+#     plt.xlabel("Block size")
+#     plt.ylabel("IO time, ms")
+#     plt.subplots_adjust(top=0.85)
+#     plt.legend(bbox_to_anchor=(0.5, 1.15),
+#                loc='upper center',
+#                prop={'size': 10}, ncol=2)
+#     plt.grid()
+#     iotime_plot = get_emb_data_svg(plt)
+#     plt.clf()
+#     # plot IOPS = func(bsize)
+#     _, ax1 = plt.subplots()
+#     for name, labels_and_data in labels_and_data_mp.items():
+#         labels_and_data.sort(key=lambda x: ssize2b(x[0]))
+#         _, data, _ = zip(*labels_and_data)
+#         plt.boxplot(data)
+#         avg = [float(sum(arr)) / len(arr) for arr in data]
+#         xt = range(1, len(data) + 1)
+#         plt.plot(xt, avg, linestyle='--',
+#                  label=abbv_name_to_full(name) + " avg")
+#     plt.setp(ax1, xticklabels=labels)
+#     plt.xlabel("Block size")
+#     plt.ylabel("IOPS")
+#     plt.legend(bbox_to_anchor=(0.5, 1.15),
+#                loc='upper center',
+#                prop={'size': 10}, ncol=2)
+#     plt.grid()
+#     plt.subplots_adjust(top=0.85)
+#     iops_plot = get_emb_data_svg(plt)
+#     res = set(get_test_lcheck_params(res) for res in processed_results.values())
+#     ncount = list(set(res.testnodes_count for res in processed_results.values()))
+#     conc = list(set(res.concurence for res in processed_results.values()))
+#     assert len(conc) == 1
+#     assert len(ncount) == 1
+#     descr = {
+#         'vm_count': ncount[0],
+#         'concurence': conc[0],
+#         'oper_descr': ", ".join(res).capitalize()
+#     }
+#     params_map = {'iotime_vs_size': iotime_plot,
+#                   'iops_vs_size': iops_plot,
+#                   'descr': descr}
+#     return get_template('report_linearity.html').format(**params_map)
+# @report('lat_vs_iops', 'lat_vs_iops')
+# def lat_vs_iops(processed_results, lab_info, comment):
+#     lat_iops = collections.defaultdict(lambda: [])
+#     requsted_vs_real = collections.defaultdict(lambda: {})
+#     for res in processed_results.values():
+#         if res.name.startswith('lat_vs_iops'):
+#             lat_iops[res.concurence].append((res.lat,
+#                                              0,
+#                                              res.iops.average,
+#                                              res.iops.deviation))
+#             # lat_iops[res.concurence].append((res.lat.average / 1000.0,
+#             #                                  res.lat.deviation / 1000.0,
+#             #                                  res.iops.average,
+#             #                                  res.iops.deviation))
+#             requested_iops = res.p.rate_iops * res.concurence
+#             requsted_vs_real[res.concurence][requested_iops] = \
+#                 (res.iops.average, res.iops.deviation)
+#     colors = ['red', 'green', 'blue', 'orange', 'magenta', "teal"]
+#     colors_it = iter(colors)
+#     for conc, lat_iops in sorted(lat_iops.items()):
+#         lat, dev, iops, iops_dev = zip(*lat_iops)
+#         plt.errorbar(iops, lat, xerr=iops_dev, yerr=dev, fmt='ro',
+#                      label=str(conc) + " threads",
+#                      color=next(colors_it))
+#     plt.xlabel("IOPS")
+#     plt.ylabel("Latency, ms")
+#     plt.grid()
+#     plt.legend(loc=0)
+#     plt_iops_vs_lat = get_emb_data_svg(plt)
+#     plt.clf()
+#     colors_it = iter(colors)
+#     for conc, req_vs_real in sorted(requsted_vs_real.items()):
+#         req, real = zip(*sorted(req_vs_real.items()))
+#         iops, dev = zip(*real)
+#         plt.errorbar(req, iops, yerr=dev, fmt='ro',
+#                      label=str(conc) + " threads",
+#                      color=next(colors_it))
+#     plt.xlabel("Requested IOPS")
+#     plt.ylabel("Get IOPS")
+#     plt.grid()
+#     plt.legend(loc=0)
+#     plt_iops_vs_requested = get_emb_data_svg(plt)
+#     res1 = processed_results.values()[0]
+#     params_map = {'iops_vs_lat': plt_iops_vs_lat,
+#                   'iops_vs_requested': plt_iops_vs_requested,
+#                   'oper_descr': get_test_lcheck_params(res1).capitalize()}
+#     return get_template('report_iops_vs_lat.html').format(**params_map)
+# def render_all_html(comment, info, lab_description, images, templ_name):
+#     data = info.__dict__.copy()
+#     for name, val in data.items():
+#         if not name.startswith('__'):
+#             if val is None:
+#                 if name in ('direct_iops_w64_max', 'direct_iops_w_max'):
+#                     data[name] = ('-', '-', '-')
+#                 else:
+#                     data[name] = '-'
+#             elif isinstance(val, (int, float, long)):
+#                 data[name] = round_3_digit(val)
+#     data['bw_read_max'] = (data['bw_read_max'][0] // 1024,
+#                            data['bw_read_max'][1],
+#                            data['bw_read_max'][2])
+#     data['bw_write_max'] = (data['bw_write_max'][0] // 1024,
+#                             data['bw_write_max'][1],
+#                             data['bw_write_max'][2])
+#     images.update(data)
+#     templ = get_template(templ_name)
+#     return templ.format(lab_info=lab_description,
+#                         comment=comment,
+#                         **images)
+# def io_chart(title, concurence,
+#              latv, latv_min, latv_max,
+#              iops_or_bw, iops_or_bw_err,
+#              legend,
+#              log_iops=False,
+#              log_lat=False,
+#              boxplots=False,
+#              latv_50=None,
+#              latv_95=None,
+#              error2=None):
+#     matplotlib.rcParams.update({'font.size': 10})
+#     points = " MiBps" if legend == 'BW' else ""
+#     lc = len(concurence)
+#     width = 0.35
+#     xt = range(1, lc + 1)
+#     op_per_vm = [v / (vm * th) for v, (vm, th) in zip(iops_or_bw, concurence)]
+#     fig, p1 = plt.subplots()
+#     xpos = [i - width / 2 for i in xt]
+#     p1.bar(xpos, iops_or_bw,
+#            width=width,
+#            color='y',
+#            label=legend)
+#     err1_leg = None
+#     for pos, y, err in zip(xpos, iops_or_bw, iops_or_bw_err):
+#         err1_leg = p1.errorbar(pos + width / 2,
+#                                y,
+#                                err,
+#                                color='magenta')
+#     err2_leg = None
+#     if error2 is not None:
+#         for pos, y, err in zip(xpos, iops_or_bw, error2):
+#             err2_leg = p1.errorbar(pos + width / 2 + 0.08,
+#                                    y,
+#                                    err,
+#                                    lw=2,
+#                                    alpha=0.5,
+#                                    color='teal')
+#     p1.grid(True)
+#     p1.plot(xt, op_per_vm, '--', label=legend + "/thread", color='black')
+#     handles1, labels1 = p1.get_legend_handles_labels()
+#     handles1 += [err1_leg]
+#     labels1 += ["95% conf"]
+#     if err2_leg is not None:
+#         handles1 += [err2_leg]
+#         labels1 += ["95% dev"]
+#     p2 = p1.twinx()
+#     if latv_50 is None:
+#         p2.plot(xt, latv_max, label="lat max")
+#         p2.plot(xt, latv, label="lat avg")
+#         p2.plot(xt, latv_min, label="lat min")
+#     else:
+#         p2.plot(xt, latv_50, label="lat med")
+#         p2.plot(xt, latv_95, label="lat 95%")
+#     plt.xlim(0.5, lc + 0.5)
+#     plt.xticks(xt, ["{0} * {1}".format(vm, th) for (vm, th) in concurence])
+#     p1.set_xlabel("VM Count * Thread per VM")
+#     p1.set_ylabel(legend + points)
+#     p2.set_ylabel("Latency ms")
+#     plt.title(title)
+#     handles2, labels2 = p2.get_legend_handles_labels()
+#     plt.legend(handles1 + handles2, labels1 + labels2,
+#                loc='center left', bbox_to_anchor=(1.1, 0.81))
+#     if log_iops:
+#         p1.set_yscale('log')
+#     if log_lat:
+#         p2.set_yscale('log')
+#     plt.subplots_adjust(right=0.68)
+#     return get_emb_data_svg(plt)
+# def make_plots(processed_results, plots):
+#     """
+#     processed_results: [PerfInfo]
+#     plots = [(test_name_prefix:str, fname:str, description:str)]
+#     """
+#     files = {}
+#     for name_pref, fname, desc in plots:
+#         chart_data = []
+#         for res in processed_results:
+#             summ = res.name + "_" + res.summary
+#             if summ.startswith(name_pref):
+#                 chart_data.append(res)
+#         if len(chart_data) == 0:
+#             raise ValueError("Can't found any date for " + name_pref)
+#         use_bw = ssize2b(chart_data[0].p.blocksize) > 16 * 1024
+#         chart_data.sort(key=lambda x: x.params['vals']['numjobs'])
+#         lat = None
+#         lat_min = None
+#         lat_max = None
+#         lat_50 = [x.lat_50 for x in chart_data]
+#         lat_95 = [x.lat_95 for x in chart_data]
+#         lat_diff_max = max(x.lat_95 / x.lat_50 for x in chart_data)
+#         lat_log_scale = (lat_diff_max > 10)
+#         testnodes_count = x.testnodes_count
+#         concurence = [(testnodes_count, x.concurence)
+#                       for x in chart_data]
+#         if use_bw:
+#             data = [x.bw.average / 1000 for x in chart_data]
+#             data_conf = [x.bw.confidence / 1000 for x in chart_data]
+#             data_dev = [x.bw.deviation * 2.5 / 1000 for x in chart_data]
+#             name = "BW"
+#         else:
+#             data = [x.iops.average for x in chart_data]
+#             data_conf = [x.iops.confidence for x in chart_data]
+#             data_dev = [x.iops.deviation * 2 for x in chart_data]
+#             name = "IOPS"
+#         fc = io_chart(title=desc,
+#                       concurence=concurence,
+#                       latv=lat,
+#                       latv_min=lat_min,
+#                       latv_max=lat_max,
+#                       iops_or_bw=data,
+#                       iops_or_bw_err=data_conf,
+#                       legend=name,
+#                       log_lat=lat_log_scale,
+#                       latv_50=lat_50,
+#                       latv_95=lat_95,
+#                       error2=data_dev)
+#         files[fname] = fc
+#     return files
+# def find_max_where(processed_results, sync_mode, blocksize, rw, iops=True):
+#     result = None
+#     attr = 'iops' if iops else 'bw'
+#     for measurement in processed_results:
+#         ok = measurement.sync_mode == sync_mode
+#         ok = ok and (measurement.p.blocksize == blocksize)
+#         ok = ok and (measurement.p.rw == rw)
+#         if ok:
+#             field = getattr(measurement, attr)
+#             if result is None:
+#                 result = field
+#             elif field.average > result.average:
+#                 result = field
+#     return result
+# def get_disk_info(processed_results):
+#     di = DiskInfo()
+#     di.direct_iops_w_max = find_max_where(processed_results,
+#                                           'd', '4k', 'randwrite')
+#     di.direct_iops_r_max = find_max_where(processed_results,
+#                                           'd', '4k', 'randread')
+#     di.direct_iops_w64_max = find_max_where(processed_results,
+#                                             'd', '64k', 'randwrite')
+#     for sz in ('16m', '64m'):
+#         di.bw_write_max = find_max_where(processed_results,
+#                                          'd', sz, 'randwrite', False)
+#         if di.bw_write_max is not None:
+#             break
+#     if di.bw_write_max is None:
+#         for sz in ('1m', '2m', '4m', '8m'):
+#             di.bw_write_max = find_max_where(processed_results,
+#                                              'd', sz, 'write', False)
+#             if di.bw_write_max is not None:
+#                 break
+#     for sz in ('16m', '64m'):
+#         di.bw_read_max = find_max_where(processed_results,
+#                                         'd', sz, 'randread', False)
+#         if di.bw_read_max is not None:
+#             break
+#     if di.bw_read_max is None:
+#         di.bw_read_max = find_max_where(processed_results,
+#                                         'd', '1m', 'read', False)
+#     rws4k_iops_lat_th = []
+#     for res in processed_results:
+#         if res.sync_mode in 'xs' and res.p.blocksize == '4k':
+#             if res.p.rw != 'randwrite':
+#                 continue
+#             rws4k_iops_lat_th.append((res.iops.average,
+#                                       res.lat,
+#                                       # res.lat.average,
+#                                       res.concurence))
+#     rws4k_iops_lat_th.sort(key=lambda x: x[2])
+#     latv = [lat for _, lat, _ in rws4k_iops_lat_th]
+#     for tlat in [10, 30, 100]:
+#         pos = bisect.bisect_left(latv, tlat)
+#         if 0 == pos:
+#             setattr(di, 'rws4k_{}ms'.format(tlat), 0)
+#         elif pos == len(latv):
+#             iops3, _, _ = rws4k_iops_lat_th[-1]
+#             iops3 = int(round_3_digit(iops3))
+#             setattr(di, 'rws4k_{}ms'.format(tlat), ">=" + str(iops3))
+#         else:
+#             lat1 = latv[pos - 1]
+#             lat2 = latv[pos]
+#             iops1, _, th1 = rws4k_iops_lat_th[pos - 1]
+#             iops2, _, th2 = rws4k_iops_lat_th[pos]
+#             th_lat_coef = (th2 - th1) / (lat2 - lat1)
+#             th3 = th_lat_coef * (tlat - lat1) + th1
+#             th_iops_coef = (iops2 - iops1) / (th2 - th1)
+#             iops3 = th_iops_coef * (th3 - th1) + iops1
+#             iops3 = int(round_3_digit(iops3))
+#             setattr(di, 'rws4k_{}ms'.format(tlat), iops3)
+#     hdi = DiskInfo()
+#     def pp(x):
+#         med, conf = x.rounded_average_conf()
+#         conf_perc = int(float(conf) / med * 100)
+#         dev_perc = int(float(x.deviation) / med * 100)
+#         return (round_3_digit(med), conf_perc, dev_perc)
+#     hdi.direct_iops_r_max = pp(di.direct_iops_r_max)
+#     if di.direct_iops_w_max is not None:
+#         hdi.direct_iops_w_max = pp(di.direct_iops_w_max)
+#     else:
+#         hdi.direct_iops_w_max = None
+#     if di.direct_iops_w64_max is not None:
+#         hdi.direct_iops_w64_max = pp(di.direct_iops_w64_max)
+#     else:
+#         hdi.direct_iops_w64_max = None
+#     hdi.bw_write_max = pp(di.bw_write_max)
+#     hdi.bw_read_max = pp(di.bw_read_max)
+#     hdi.rws4k_10ms = di.rws4k_10ms if 0 != di.rws4k_10ms else None
+#     hdi.rws4k_30ms = di.rws4k_30ms if 0 != di.rws4k_30ms else None
+#     hdi.rws4k_100ms = di.rws4k_100ms if 0 != di.rws4k_100ms else None
+#     return hdi
+# @report('hdd', 'hdd')
+# def make_hdd_report(processed_results, lab_info, comment):
+#     plots = [
+#         ('hdd_rrd4k', 'rand_read_4k', 'Random read 4k direct IOPS'),
+#         ('hdd_rwx4k', 'rand_write_4k', 'Random write 4k sync IOPS')
+#     ]
+#     perf_infos = [res.disk_perf_info() for res in processed_results]
+#     images = make_plots(perf_infos, plots)
+#     di = get_disk_info(perf_infos)
+#     return render_all_html(comment, di, lab_info, images, "report_hdd.html")
+# @report('cinder_iscsi', 'cinder_iscsi')
+# def make_cinder_iscsi_report(processed_results, lab_info, comment):
+#     plots = [
+#         ('cinder_iscsi_rrd4k', 'rand_read_4k', 'Random read 4k direct IOPS'),
+#         ('cinder_iscsi_rwx4k', 'rand_write_4k', 'Random write 4k sync IOPS')
+#     ]
+#     perf_infos = [res.disk_perf_info() for res in processed_results]
+#     try:
+#         images = make_plots(perf_infos, plots)
+#     except ValueError:
+#         plots = [
+#             ('cinder_iscsi_rrd4k', 'rand_read_4k', 'Random read 4k direct IOPS'),
+#             ('cinder_iscsi_rws4k', 'rand_write_4k', 'Random write 4k sync IOPS')
+#         ]
+#         images = make_plots(perf_infos, plots)
+#     di = get_disk_info(perf_infos)
+#     return render_all_html(comment, di, lab_info, images, "report_cinder_iscsi.html")
+# @report('ceph', 'ceph')
+# def make_ceph_report(processed_results, lab_info, comment):
+#     plots = [
+#         ('ceph_rrd4k', 'rand_read_4k', 'Random read 4k direct IOPS'),
+#         ('ceph_rws4k', 'rand_write_4k', 'Random write 4k sync IOPS'),
+#         ('ceph_rrd16m', 'rand_read_16m', 'Random read 16m direct MiBps'),
+#         ('ceph_rwd16m', 'rand_write_16m',
+#          'Random write 16m direct MiBps'),
+#     ]
+#     perf_infos = [res.disk_perf_info() for res in processed_results]
+#     images = make_plots(perf_infos, plots)
+#     di = get_disk_info(perf_infos)
+#     return render_all_html(comment, di, lab_info, images, "report_ceph.html")
+# @report('mixed', 'mixed')
+# def make_mixed_report(processed_results, lab_info, comment):
+#     #
+#     # IOPS(X% read) = 100 / ( X / IOPS_W + (100 - X) / IOPS_R )
+#     #
+#     perf_infos = [res.disk_perf_info() for res in processed_results]
+#     mixed = collections.defaultdict(lambda: [])
+#     is_ssd = False
+#     for res in perf_infos:
+#         if res.name.startswith('mixed'):
+#             if res.name.startswith('mixed-ssd'):
+#                 is_ssd = True
+#             mixed[res.concurence].append((res.p.rwmixread,
+#                                           res.lat,
+#                                           0,
+#                                           # res.lat.average / 1000.0,
+#                                           # res.lat.deviation / 1000.0,
+#                                           res.iops.average,
+#                                           res.iops.deviation))
+#     if len(mixed) == 0:
+#         raise ValueError("No mixed load found")
+#     fig, p1 = plt.subplots()
+#     p2 = p1.twinx()
+#     colors = ['red', 'green', 'blue', 'orange', 'magenta', "teal"]
+#     colors_it = iter(colors)
+#     for conc, mix_lat_iops in sorted(mixed.items()):
+#         mix_lat_iops = sorted(mix_lat_iops)
+#         read_perc, lat, dev, iops, iops_dev = zip(*mix_lat_iops)
+#         p1.errorbar(read_perc, iops, color=next(colors_it),
+#                     yerr=iops_dev, label=str(conc) + " th")
+#         p2.errorbar(read_perc, lat, color=next(colors_it),
+#                     ls='--', yerr=dev, label=str(conc) + " th lat")
+#     if is_ssd:
+#         p1.set_yscale('log')
+#         p2.set_yscale('log')
+#     p1.set_xlim(-5, 105)
+#     read_perc = set(read_perc)
+#     read_perc.add(0)
+#     read_perc.add(100)
+#     read_perc = sorted(read_perc)
+#     plt.xticks(read_perc, map(str, read_perc))
+#     p1.grid(True)
+#     p1.set_xlabel("% of reads")
+#     p1.set_ylabel("Mixed IOPS")
+#     p2.set_ylabel("Latency, ms")
+#     handles1, labels1 = p1.get_legend_handles_labels()
+#     handles2, labels2 = p2.get_legend_handles_labels()
+#     plt.subplots_adjust(top=0.85)
+#     plt.legend(handles1 + handles2, labels1 + labels2,
+#                bbox_to_anchor=(0.5, 1.15),
+#                loc='upper center',
+#                prop={'size': 12}, ncol=3)
+#     plt.show()
+# def make_load_report(idx, results_dir, fname):
+#     dpath = os.path.join(results_dir, "io_" + str(idx))
+#     files = sorted(os.listdir(dpath))
+#     gf = lambda x: "_".join(x.rsplit(".", 1)[0].split('_')[:3])
+#     for key, group in itertools.groupby(files, gf):
+#         fname = os.path.join(dpath, key + ".fio")
+#         cfgs = list(parse_all_in_1(open(fname).read(), fname))
+#         fname = os.path.join(dpath, key + "_lat.log")
+#         curr = []
+#         arrays = []
+#         with open(fname) as fd:
+#             for offset, lat, _, _ in csv.reader(fd):
+#                 offset = int(offset)
+#                 lat = int(lat)
+#                 if len(curr) > 0 and curr[-1][0] > offset:
+#                     arrays.append(curr)
+#                     curr = []
+#                 curr.append((offset, lat))
+#             arrays.append(curr)
+#         conc = int(cfgs[0].vals.get('numjobs', 1))
+#         if conc != 5:
+#             continue
+#         assert len(arrays) == len(cfgs) * conc
+#         garrays = [[(0, 0)] for _ in range(conc)]
+#         for offset in range(len(cfgs)):
+#             for acc, new_arr in zip(garrays, arrays[offset * conc:(offset + 1) * conc]):
+#                 last = acc[-1][0]
+#                 for off, lat in new_arr:
+#                     acc.append((off / 1000. + last, lat / 1000.))
+#         for cfg, arr in zip(cfgs, garrays):
+#             plt.plot(*zip(*arr[1:]))
+#         plt.show()
+#         exit(1)
+# def make_io_report(dinfo, comment, path, lab_info=None):
+#     lab_info = {
+#         "total_disk": "None",
+#         "total_memory": "None",
+#         "nodes_count": "None",
+#         "processor_count": "None"
+#     }
+#     try:
+#         res_fields = sorted(v.name for v in dinfo)
+#         found = False
+#         for fields, name, func in report_funcs:
+#             for field in fields:
+#                 pos = bisect.bisect_left(res_fields, field)
+#                 if pos == len(res_fields):
+#                     break
+#                 if not res_fields[pos].startswith(field):
+#                     break
+#             else:
+#                 found = True
+#                 hpath = path.format(name)
+#                 try:
+#                     report = func(dinfo, lab_info, comment)
+#                 except:
+#                     logger.exception("Diring {0} report generation".format(name))
+#                     continue
+#                 if report is not None:
+#                     try:
+#                         with open(hpath, "w") as fd:
+#                             fd.write(report)
+#                     except:
+#                         logger.exception("Diring saving {0} report".format(name))
+#                         continue
+#                     logger.info("Report {0} saved into {1}".format(name, hpath))
+#                 else:
+#                     logger.warning("No report produced by {0!r}".format(name))
+#         if not found:
+#             logger.warning("No report generator found for this load")
+#     except Exception as exc:
+#         import traceback
+#         traceback.print_exc()
+#         logger.error("Failed to generate html report:" + str(exc))
+#     # @classmethod
+#     # def prepare_data(cls, results) -> List[Dict[str, Any]]:
+#     #     """create a table with io performance report for console"""
+#     #
+#     #     def key_func(data: FioRunResult) -> Tuple[str, str, str, str, int]:
+#     #         tpl = data.summary_tpl()
+#     #         return (data.name,
+#     #                 tpl.oper,
+#     #                 tpl.mode,
+#     #                 ssize2b(tpl.bsize),
+#     #                 int(tpl.th_count) * int(tpl.vm_count))
+#     #     res = []
+#     #
+#     #     for item in sorted(results, key=key_func):
+#     #         test_dinfo = item.disk_perf_info()
+#     #         testnodes_count = len(item.config.nodes)
+#     #
+#     #         iops, _ = test_dinfo.iops.rounded_average_conf()
+#     #
+#     #         if test_dinfo.iops_sys is not None:
+#     #             iops_sys, iops_sys_conf = test_dinfo.iops_sys.rounded_average_conf()
+#     #             _, iops_sys_dev = test_dinfo.iops_sys.rounded_average_dev()
+#     #             iops_sys_per_vm = round_3_digit(iops_sys / testnodes_count)
+#     #             iops_sys = round_3_digit(iops_sys)
+#     #         else:
+#     #             iops_sys = None
+#     #             iops_sys_per_vm = None
+#     #             iops_sys_dev = None
+#     #             iops_sys_conf = None
+#     #
+#     #         bw, bw_conf = test_dinfo.bw.rounded_average_conf()
+#     #         _, bw_dev = test_dinfo.bw.rounded_average_dev()
+#     #         conf_perc = int(round(bw_conf * 100 / bw))
+#     #         dev_perc = int(round(bw_dev * 100 / bw))
+#     #
+#     #         lat_50 = round_3_digit(int(test_dinfo.lat_50))
+#     #         lat_95 = round_3_digit(int(test_dinfo.lat_95))
+#     #         lat_avg = round_3_digit(int(test_dinfo.lat_avg))
+#     #
+#     #         iops_per_vm = round_3_digit(iops / testnodes_count)
+#     #         bw_per_vm = round_3_digit(bw / testnodes_count)
+#     #
+#     #         iops = round_3_digit(iops)
+#     #         bw = round_3_digit(bw)
+#     #
+#     #         summ = "{0.oper}{0.mode} {0.bsize:>4} {0.th_count:>3}th {0.vm_count:>2}vm".format(item.summary_tpl())
+#     #
+#     #         res.append({"name": key_func(item)[0],
+#     #                     "key": key_func(item)[:4],
+#     #                     "summ": summ,
+#     #                     "iops": int(iops),
+#     #                     "bw": int(bw),
+#     #                     "conf": str(conf_perc),
+#     #                     "dev": str(dev_perc),
+#     #                     "iops_per_vm": int(iops_per_vm),
+#     #                     "bw_per_vm": int(bw_per_vm),
+#     #                     "lat_50": lat_50,
+#     #                     "lat_95": lat_95,
+#     #                     "lat_avg": lat_avg,
+#     #
+#     #                     "iops_sys": iops_sys,
+#     #                     "iops_sys_per_vm": iops_sys_per_vm,
+#     #                     "sys_conf": iops_sys_conf,
+#     #                     "sys_dev": iops_sys_dev})
+#     #
+#     #     return res
+#     #
+#     # Field = collections.namedtuple("Field", ("header", "attr", "allign", "size"))
+#     # fiels_and_header = [
+#     #     Field("Name",           "name",        "l",  7),
+#     #     Field("Description",    "summ",        "l", 19),
+#     #     Field("IOPS\ncum",      "iops",        "r",  3),
+#     #     # Field("IOPS_sys\ncum",  "iops_sys",    "r",  3),
+#     #     Field("KiBps\ncum",     "bw",          "r",  6),
+#     #     Field("Cnf %\n95%",     "conf",        "r",  3),
+#     #     Field("Dev%",           "dev",         "r",  3),
+#     #     Field("iops\n/vm",      "iops_per_vm", "r",  3),
+#     #     Field("KiBps\n/vm",     "bw_per_vm",   "r",  6),
+#     #     Field("lat ms\nmedian", "lat_50",      "r",  3),
+#     #     Field("lat ms\n95%",    "lat_95",      "r",  3),
+#     #     Field("lat\navg",       "lat_avg",     "r",  3),
+#     # ]
+#     #
+#     # fiels_and_header_dct = dict((item.attr, item) for item in fiels_and_header)
+#     #
+#     # @classmethod
+#     # def format_for_console(cls, results) -> str:
+#     #     """create a table with io performance report for console"""
+#     #
+#     #     tab = texttable.Texttable(max_width=120)
+#     #     tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
+#     #     tab.set_cols_align([f.allign for f in cls.fiels_and_header])
+#     #     sep = ["-" * f.size for f in cls.fiels_and_header]
+#     #     tab.header([f.header for f in cls.fiels_and_header])
+#     #     prev_k = None
+#     #     for item in cls.prepare_data(results):
+#     #         if prev_k is not None:
+#     #             if prev_k != item["key"]:
+#     #                 tab.add_row(sep)
+#     #
+#     #         prev_k = item["key"]
+#     #         tab.add_row([item[f.attr] for f in cls.fiels_and_header])
+#     #
+#     #     return tab.draw()
+#     #
+#     # @classmethod
+#     # def format_diff_for_console(cls, list_of_results: List[Any]) -> str:
+#     #     """create a table with io performance report for console"""
+#     #
+#     #     tab = texttable.Texttable(max_width=200)
+#     #     tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
+#     #
+#     #     header = [
+#     #         cls.fiels_and_header_dct["name"].header,
+#     #         cls.fiels_and_header_dct["summ"].header,
+#     #     ]
+#     #     allign = ["l", "l"]
+#     #
+#     #     header.append("IOPS ~ Cnf% ~ Dev%")
+#     #     allign.extend(["r"] * len(list_of_results))
+#     #     header.extend(
+#     #         "IOPS_{0} %".format(i + 2) for i in range(len(list_of_results[1:]))
+#     #     )
+#     #
+#     #     header.append("BW")
+#     #     allign.extend(["r"] * len(list_of_results))
+#     #     header.extend(
+#     #         "BW_{0} %".format(i + 2) for i in range(len(list_of_results[1:]))
+#     #     )
+#     #
+#     #     header.append("LAT")
+#     #     allign.extend(["r"] * len(list_of_results))
+#     #     header.extend(
+#     #         "LAT_{0}".format(i + 2) for i in range(len(list_of_results[1:]))
+#     #     )
+#     #
+#     #     tab.header(header)
+#     #     sep = ["-" * 3] * len(header)
+#     #     processed_results = map(cls.prepare_data, list_of_results)
+#     #
+#     #     key2results = []
+#     #     for res in processed_results:
+#     #         key2results.append(dict(
+#     #             ((item["name"], item["summ"]), item) for item in res
+#     #         ))
+#     #
+#     #     prev_k = None
+#     #     iops_frmt = "{0[iops]} ~ {0[conf]:>2} ~ {0[dev]:>2}"
+#     #     for item in processed_results[0]:
+#     #         if prev_k is not None:
+#     #             if prev_k != item["key"]:
+#     #                 tab.add_row(sep)
+#     #
+#     #         prev_k = item["key"]
+#     #
+#     #         key = (item['name'], item['summ'])
+#     #         line = list(key)
+#     #         base = key2results[0][key]
+#     #
+#     #         line.append(iops_frmt.format(base))
+#     #
+#     #         for test_results in key2results[1:]:
+#     #             val = test_results.get(key)
+#     #             if val is None:
+#     #                 line.append("-")
+#     #             elif base['iops'] == 0:
+#     #                 line.append("Nan")
+#     #             else:
+#     #                 prc_val = {'dev': val['dev'], 'conf': val['conf']}
+#     #                 prc_val['iops'] = int(100 * val['iops'] / base['iops'])
+#     #                 line.append(iops_frmt.format(prc_val))
+#     #
+#     #         line.append(base['bw'])
+#     #
+#     #         for test_results in key2results[1:]:
+#     #             val = test_results.get(key)
+#     #             if val is None:
+#     #                 line.append("-")
+#     #             elif base['bw'] == 0:
+#     #                 line.append("Nan")
+#     #             else:
+#     #                 line.append(int(100 * val['bw'] / base['bw']))
+#     #
+#     #         for test_results in key2results:
+#     #             val = test_results.get(key)
+#     #             if val is None:
+#     #                 line.append("-")
+#     #             else:
+#     #                 line.append("{0[lat_50]} - {0[lat_95]}".format(val))
+#     #
+#     #         tab.add_row(line)
+#     #
+#     #     tab.set_cols_align(allign)
+#     #     return tab.draw()
+# #
+# #
+# # def load_sys_log_file(ftype: str, fname: str) -> TimeSeriesValue:
+# #     assert ftype == 'iops'
+# #     pval = None
+# #     with open(fname) as fd:
+# #         iops = []
+# #         for ln in fd:
+# #             params = ln.split()
+# #             cval = int(params[WRITE_IOPS_DISCSTAT_POS]) + \
+# #                 int(params[READ_IOPS_DISCSTAT_POS])
+# #             if pval is not None:
+# #                 iops.append(cval - pval)
+# #             pval = cval
+# #
+# #     vals = [(idx * 1000, val) for idx, val in enumerate(iops)]
+# #     return TimeSeriesValue(vals)
+# #
+# #
+# # def load_test_results(folder: str, run_num: int) -> 'FioRunResult':
+# #     res = {}
+# #     params = None
+# #
+# #     fn = os.path.join(folder, str(run_num) + '_params.yaml')
+# #     params = yaml.load(open(fn).read())
+# #
+# #     conn_ids_set = set()
+# #     rr = r"{}_(?P<conn_id>.*?)_(?P<type>[^_.]*)\.\d+\.log$".format(run_num)
+# #     for fname in os.listdir(folder):
+# #         rm = re.match(rr, fname)
+# #         if rm is None:
+# #             continue
+# #
+# #         conn_id_s = rm.group('conn_id')
+# #         conn_id = conn_id_s.replace('_', ':')
+# #         ftype = rm.group('type')
+# #
+# #         if ftype not in ('iops', 'bw', 'lat'):
+# #             continue
+# #
+# #         ts = load_fio_log_file(os.path.join(folder, fname))
+# #         res.setdefault(ftype, {}).setdefault(conn_id, []).append(ts)
+# #
+# #         conn_ids_set.add(conn_id)
+# #
+# #     rr = r"{}_(?P<conn_id>.*?)_(?P<type>[^_.]*)\.sys\.log$".format(run_num)
+# #     for fname in os.listdir(folder):
+# #         rm = re.match(rr, fname)
+# #         if rm is None:
+# #             continue
+# #
+# #         conn_id_s = rm.group('conn_id')
+# #         conn_id = conn_id_s.replace('_', ':')
+# #         ftype = rm.group('type')
+# #
+# #         if ftype not in ('iops', 'bw', 'lat'):
+# #             continue
+# #
+# #         ts = load_sys_log_file(ftype, os.path.join(folder, fname))
+# #         res.setdefault(ftype + ":sys", {}).setdefault(conn_id, []).append(ts)
+# #
+# #         conn_ids_set.add(conn_id)
+# #
+# #     mm_res = {}
+# #
+# #     if len(res) == 0:
+# #         raise ValueError("No data was found")
+# #
+# #     for key, data in res.items():
+# #         conn_ids = sorted(conn_ids_set)
+# #         awail_ids = [conn_id for conn_id in conn_ids if conn_id in data]
+# #         matr = [data[conn_id] for conn_id in awail_ids]
+# #         mm_res[key] = MeasurementMatrix(matr, awail_ids)
+# #
+# #     raw_res = {}
+# #     for conn_id in conn_ids:
+# #         fn = os.path.join(folder, "{0}_{1}_rawres.json".format(run_num, conn_id_s))
+# #
+# #         # remove message hack
+# #         fc = "{" + open(fn).read().split('{', 1)[1]
+# #         raw_res[conn_id] = json.loads(fc)
+# #
+# #     fio_task = FioJobSection(params['name'])
+# #     fio_task.vals.update(params['vals'])
+# #
+# #     config = TestConfig('io', params, None, params['nodes'], folder, None)
+# #     return FioRunResult(config, fio_task, mm_res, raw_res, params['intervals'], run_num)
+# #
+# # class DiskPerfInfo:
+# #     def __init__(self, name: str, summary: str, params: Dict[str, Any], testnodes_count: int) -> None:
+# #         self.name = name
+# #         self.bw = None
+# #         self.iops = None
+# #         self.lat = None
+# #         self.lat_50 = None
+# #         self.lat_95 = None
+# #         self.lat_avg = None
+# #
+# #         self.raw_bw = []
+# #         self.raw_iops = []
+# #         self.raw_lat = []
+# #
+# #         self.params = params
+# #         self.testnodes_count = testnodes_count
+# #         self.summary = summary
+# #
+# #         self.sync_mode = get_test_sync_mode(self.params['vals'])
+# #         self.concurence = self.params['vals'].get('numjobs', 1)
+# #
+# #
+# # class IOTestResults:
+# #     def __init__(self, suite_name: str, fio_results: 'FioRunResult', log_directory: str):
+# #         self.suite_name = suite_name
+# #         self.fio_results = fio_results
+# #         self.log_directory = log_directory
+# #
+# #     def __iter__(self):
+# #         return iter(self.fio_results)
+# #
+# #     def __len__(self):
+# #         return len(self.fio_results)
+# #
+# #     def get_yamable(self) -> Dict[str, List[str]]:
+# #         items = [(fio_res.summary(), fio_res.idx) for fio_res in self]
+# #         return {self.suite_name: [self.log_directory] + items}
+# # class FioRunResult(TestResults):
+# #     """
+# #     Fio run results
+# #     config: TestConfig
+# #     fio_task: FioJobSection
+# #     ts_results: {str: MeasurementMatrix[TimeSeriesValue]}
+# #     raw_result: ????
+# #     run_interval:(float, float) - test tun time, used for sensors
+# #     """
+# #     def __init__(self, config, fio_task, ts_results, raw_result, run_interval, idx):
+# #
+# #         self.name = fio_task.name.rsplit("_", 1)[0]
+# #         self.fio_task = fio_task
+# #         self.idx = idx
+# #
+# #         self.bw = ts_results['bw']
+# #         self.lat = ts_results['lat']
+# #         self.iops = ts_results['iops']
+# #
+# #         if 'iops:sys' in ts_results:
+# #             self.iops_sys = ts_results['iops:sys']
+# #         else:
+# #             self.iops_sys = None
+# #
+# #         res = {"bw": self.bw,
+# #                "lat": self.lat,
+# #                "iops": self.iops,
+# #                "iops:sys": self.iops_sys}
+# #
+# #         self.sensors_data = None
+# #         self._pinfo = None
+# #         TestResults.__init__(self, config, res, raw_result, run_interval)
+# #
+# #     def get_params_from_fio_report(self):
+# #         nodes = self.bw.connections_ids
+# #
+# #         iops = [self.raw_result[node]['jobs'][0]['mixed']['iops'] for node in nodes]
+# #         total_ios = [self.raw_result[node]['jobs'][0]['mixed']['total_ios'] for node in nodes]
+# #         runtime = [self.raw_result[node]['jobs'][0]['mixed']['runtime'] / 1000 for node in nodes]
+# #         flt_iops = [float(ios) / rtime for ios, rtime in zip(total_ios, runtime)]
+# #
+# #         bw = [self.raw_result[node]['jobs'][0]['mixed']['bw'] for node in nodes]
+# #         total_bytes = [self.raw_result[node]['jobs'][0]['mixed']['io_bytes'] for node in nodes]
+# #         flt_bw = [float(tbytes) / rtime for tbytes, rtime in zip(total_bytes, runtime)]
+# #
+# #         return {'iops': iops,
+# #                 'flt_iops': flt_iops,
+# #                 'bw': bw,
+# #                 'flt_bw': flt_bw}
+# #
+# #     def summary(self):
+# #         return get_test_summary(self.fio_task, len(self.config.nodes))
+# #
+# #     def summary_tpl(self):
+# #         return get_test_summary_tuple(self.fio_task, len(self.config.nodes))
+# #
+# #     def get_lat_perc_50_95_multy(self):
+# #         lat_mks = collections.defaultdict(lambda: 0)
+# #         num_res = 0
+# #
+# #         for result in self.raw_result.values():
+# #             num_res += len(result['jobs'])
+# #             for job_info in result['jobs']:
+# #                 for k, v in job_info['latency_ms'].items():
+# #                     if isinstance(k, basestring) and k.startswith('>='):
+# #                         lat_mks[int(k[2:]) * 1000] += v
+# #                     else:
+# #                         lat_mks[int(k) * 1000] += v
+# #
+# #                 for k, v in job_info['latency_us'].items():
+# #                     lat_mks[int(k)] += v
+# #
+# #         for k, v in lat_mks.items():
+# #             lat_mks[k] = float(v) / num_res
+# #         return get_lat_perc_50_95(lat_mks)
+# #
+# #     def disk_perf_info(self, avg_interval=2.0):
+# #
+# #         if self._pinfo is not None:
+# #             return self._pinfo
+# #
+# #         testnodes_count = len(self.config.nodes)
+# #
+# #         pinfo = DiskPerfInfo(self.name,
+# #                              self.summary(),
+# #                              self.params,
+# #                              testnodes_count)
+# #
+# #         def prepare(data, drop=1):
+# #             if data is None:
+# #                 return data
+# #
+# #             res = []
+# #             for ts_data in data:
+# #                 if ts_data.average_interval() < avg_interval:
+# #                     ts_data = ts_data.derived(avg_interval)
+# #
+# #                 # drop last value on bounds
+# #                 # as they may contains ranges without activities
+# #                 assert len(ts_data.values) >= drop + 1, str(drop) + " " + str(ts_data.values)
+# #
+# #                 if drop > 0:
+# #                     res.append(ts_data.values[:-drop])
+# #                 else:
+# #                     res.append(ts_data.values)
+# #
+# #             return res
+# #
+# #         def agg_data(matr):
+# #             arr = sum(matr, [])
+# #             min_len = min(map(len, arr))
+# #             res = []
+# #             for idx in range(min_len):
+# #                 res.append(sum(dt[idx] for dt in arr))
+# #             return res
+# #
+# #         pinfo.raw_lat = map(prepare, self.lat.per_vm())
+# #         num_th = sum(map(len, pinfo.raw_lat))
+# #         lat_avg = [val / num_th for val in agg_data(pinfo.raw_lat)]
+# #         pinfo.lat_avg = data_property(lat_avg).average / 1000  # us to ms
+# #
+# #         pinfo.lat_50, pinfo.lat_95 = self.get_lat_perc_50_95_multy()
+# #         pinfo.lat = pinfo.lat_50
+# #
+# #         pinfo.raw_bw = map(prepare, self.bw.per_vm())
+# #         pinfo.raw_iops = map(prepare, self.iops.per_vm())
+# #
+# #         if self.iops_sys is not None:
+# #             pinfo.raw_iops_sys = map(prepare, self.iops_sys.per_vm())
+# #             pinfo.iops_sys = data_property(agg_data(pinfo.raw_iops_sys))
+# #         else:
+# #             pinfo.raw_iops_sys = None
+# #             pinfo.iops_sys = None
+# #
+# #         fparams = self.get_params_from_fio_report()
+# #         fio_report_bw = sum(fparams['flt_bw'])
+# #         fio_report_iops = sum(fparams['flt_iops'])
+# #
+# #         agg_bw = agg_data(pinfo.raw_bw)
+# #         agg_iops = agg_data(pinfo.raw_iops)
+# #
+# #         log_bw_avg = average(agg_bw)
+# #         log_iops_avg = average(agg_iops)
+# #
+# #         # update values to match average from fio report
+# #         coef_iops = fio_report_iops / float(log_iops_avg)
+# #         coef_bw = fio_report_bw / float(log_bw_avg)
+# #
+# #         bw_log = data_property([val * coef_bw for val in agg_bw])
+# #         iops_log = data_property([val * coef_iops for val in agg_iops])
+# #
+# #         bw_report = data_property([fio_report_bw])
+# #         iops_report = data_property([fio_report_iops])
+# #
+# #         # When IOPS/BW per thread is too low
+# #         # data from logs is rounded to match
+# #         iops_per_th = sum(sum(pinfo.raw_iops, []), [])
+# #         if average(iops_per_th) > 10:
+# #             pinfo.iops = iops_log
+# #             pinfo.iops2 = iops_report
+# #         else:
+# #             pinfo.iops = iops_report
+# #             pinfo.iops2 = iops_log
+# #
+# #         bw_per_th = sum(sum(pinfo.raw_bw, []), [])
+# #         if average(bw_per_th) > 10:
+# #             pinfo.bw = bw_log
+# #             pinfo.bw2 = bw_report
+# #         else:
+# #             pinfo.bw = bw_report
+# #             pinfo.bw2 = bw_log
+# #
+# #         self._pinfo = pinfo
+# #
+# #         return pinfo
+# # class TestResult:
+# #     """Hold all information for a given test - test info,
+# #     sensors data and performance results for test period from all nodes"""
+# #     run_id = None  # type: int
+# #     test_info = None  # type: Any
+# #     begin_time = None  # type: int
+# #     end_time = None  # type: int
+# #     sensors = None  # Dict[Tuple[str, str, str], TimeSeries]
+# #     performance = None  # Dict[Tuple[str, str], TimeSeries]
+# #
+# #     class TestResults:
+# #         """
+# #         this class describe test results
+# #
+# #         config:TestConfig - test config object
+# #         params:dict - parameters from yaml file for this test
+# #         results:{str:MeasurementMesh} - test results object
+# #         raw_result:Any - opaque object to store raw results
+# #         run_interval:(float, float) - test tun time, used for sensors
+# #         """
+# #
+# #         def __init__(self,
+# #                      config: TestConfig,
+# #                      results: Dict[str, Any],
+# #                      raw_result: Any,
+# #                      run_interval: Tuple[float, float]) -> None:
+# #             self.config = config
+# #             self.params = config.params
+# #             self.results = results
+# #             self.raw_result = raw_result
+# #             self.run_interval = run_interval
+# #
+# #         def __str__(self) -> str:
+# #             res = "{0}({1}):\n    results:\n".format(
+# #                 self.__class__.__name__,
+# #                 self.summary())
+# #
+# #             for name, val in self.results.items():
+# #                 res += "        {0}={1}\n".format(name, val)
+# #
+# #             res += "    params:\n"
+# #
+# #             for name, val in self.params.items():
+# #                 res += "        {0}={1}\n".format(name, val)
+# #
+# #             return res
+# #
+# #         def summary(self) -> str:
+# #             raise NotImplementedError()
+# #             return ""
+# #
+# #         def get_yamable(self) -> Any:
+# #             raise NotImplementedError()
+# #             return None
+#             # class MeasurementMatrix:
+# #     """
+# #     data:[[MeasurementResult]] - VM_COUNT x TH_COUNT matrix of MeasurementResult
+# #     """
+# #     def __init__(self, data, connections_ids):
+# #         self.data = data
+# #         self.connections_ids = connections_ids
+# #
+# #     def per_vm(self):
+# #         return self.data
+# #
+# #     def per_th(self):
+# #         return sum(self.data, [])
+# # class MeasurementResults:
+# #     data = None  # type: List[Any]
+# #
+# #     def stat(self) -> StatProps:
+# #         return data_property(self.data)
+# #
+# #     def __str__(self) -> str:
+# #         return 'TS([' + ", ".join(map(str, self.data)) + '])'
+# #
+# #
+# # class SimpleVals(MeasurementResults):
+# #     """
+# #     data:[float] - list of values
+# #     """
+# #     def __init__(self, data: List[float]) -> None:
+# #         self.data = data
+# #
+# #
+# # class TimeSeriesValue(MeasurementResults):
+# #     """
+# #     data:[(float, float, float)] - list of (start_time, lenght, average_value_for_interval)
+# #     odata: original values
+# #     """
+# #     def __init__(self, data: List[Tuple[float, float]]) -> None:
+# #         assert len(data) > 0
+# #         self.odata = data[:]
+# #         self.data = []  # type: List[Tuple[float, float, float]]
+# #
+# #         cstart = 0.0
+# #         for nstart, nval in data:
+# #             self.data.append((cstart, nstart - cstart, nval))
+# #             cstart = nstart
+# #
+# #     @property
+# #     def values(self) -> List[float]:
+# #         return [val[2] for val in self.data]
+# #
+# #     def average_interval(self) -> float:
+# #         return float(sum([val[1] for val in self.data])) / len(self.data)
+# #
+# #     def skip(self, seconds) -> 'TimeSeriesValue':
+# #         nres = []
+# #         for start, ln, val in self.data:
+# #             nstart = start + ln - seconds
+# #             if nstart > 0:
+# #                 nres.append([nstart, val])
+# #         return self.__class__(nres)
+# #
+# #     def derived(self, tdelta) -> 'TimeSeriesValue':
+# #         end = self.data[-1][0] + self.data[-1][1]
+# #         tdelta = float(tdelta)
+# #
+# #         ln = end / tdelta
+# #
+# #         if ln - int(ln) > 0:
+# #             ln += 1
+# #
+# #         res = [[tdelta * i, 0.0] for i in range(int(ln))]
+# #
+# #         for start, lenght, val in self.data:
+# #             start_idx = int(start / tdelta)
+# #             end_idx = int((start + lenght) / tdelta)
+# #
+# #             for idx in range(start_idx, end_idx + 1):
+# #                 rstart = tdelta * idx
+# #                 rend = tdelta * (idx + 1)
+# #
+# #                 intersection_ln = min(rend, start + lenght) - max(start, rstart)
+# #                 if intersection_ln > 0:
+# #                     try:
+# #                         res[idx][1] += val * intersection_ln / tdelta
+# #                     except IndexError:
+# #                         raise
+# #
+# #         return self.__class__(res)
+# def console_report_stage(ctx: TestRun) -> None:
+#     # TODO(koder): load data from storage
+#     raise NotImplementedError("...")
+#     # first_report = True
+#     # text_rep_fname = ctx.config.text_report_file
+#     #
+#     # with open(text_rep_fname, "w") as fd:
+#     #     for tp, data in ctx.results.items():
+#     #         if 'io' == tp and data is not None:
+#     #             rep_lst = []
+#     #             for result in data:
+#     #                 rep_lst.append(
+#     #                     IOPerfTest.format_for_console(list(result)))
+#     #             rep = "\n\n".join(rep_lst)
+#     #         elif tp in ['mysql', 'pgbench'] and data is not None:
+#     #             rep = MysqlTest.format_for_console(data)
+#     #         elif tp == 'omg':
+#     #             rep = OmgTest.format_for_console(data)
+#     #         else:
+#     #             logger.warning("Can't generate text report for " + tp)
+#     #             continue
+#     #
+#     #         fd.write(rep)
+#     #         fd.write("\n")
+#     #
+#     #         if first_report:
+#     #             logger.info("Text report were stored in " + text_rep_fname)
+#     #             first_report = False
+#     #
+#     #         print("\n" + rep + "\n")
+# # def test_load_report_stage(cfg: Config, ctx: TestRun) -> None:
+# #     load_rep_fname = cfg.load_report_file
+# #     found = False
+# #     for idx, (tp, data) in enumerate(ctx.results.items()):
+# #         if 'io' == tp and data is not None:
+# #             if found:
+# #                 logger.error("Making reports for more than one " +
+# #                              "io block isn't supported! All " +
+# #                              "report, except first are skipped")
+# #                 continue
+# #             found = True
+# #             report.make_load_report(idx, cfg['results'], load_rep_fname)
+# #
+# #
+# # def html_report_stage(ctx: TestRun) -> None:
+#     # TODO(koder): load data from storage
+#     # raise NotImplementedError("...")
+#     # html_rep_fname = cfg.html_report_file
+#     # found = False
+#     # for tp, data in ctx.results.items():
+#     #     if 'io' == tp and data is not None:
+#     #         if found or len(data) > 1:
+#     #             logger.error("Making reports for more than one " +
+#     #                          "io block isn't supported! All " +
+#     #                          "report, except first are skipped")
+#     #             continue
+#     #         found = True
+#     #         report.make_io_report(list(data[0]),
+#     #                               cfg.get('comment', ''),
+#     #                               html_rep_fname,
+#     #                               lab_info=ctx.nodes)
+# #
+# # def load_data_from_path(test_res_dir: str) -> Mapping[str, List[Any]]:
+# #     files = get_test_files(test_res_dir)
+# #     raw_res = yaml_load(open(files['raw_results']).read())
+# #     res = collections.defaultdict(list)
+# #
+# #     for tp, test_lists in raw_res:
+# #         for tests in test_lists:
+# #             for suite_name, suite_data in tests.items():
+# #                 result_folder = suite_data[0]
+# #                 res[tp].append(TOOL_TYPE_MAPPER[tp].load(suite_name, result_folder))
+# #
+# #     return res
+# #
+# #
+# # def load_data_from_path_stage(var_dir: str, _, ctx: TestRun) -> None:
+# #     for tp, vals in load_data_from_path(var_dir).items():
+# #         ctx.results.setdefault(tp, []).extend(vals)
+# #
+# #
+# # def load_data_from(var_dir: str) -> Callable[[TestRun], None]:
+# #     return functools.partial(load_data_from_path_stage, var_dir)
diff --git a/wally/result_classes.py b/wally/result_classes.py
new file mode 100644
index 0000000..9b488b7
--- /dev/null
+++ b/wally/result_classes.py
@@ -0,0 +1,54 @@
+from typing import Union, Dict, List, Any, Tuple
+# Stores test result for integral value, which
+# can be expressed as a single value for given time period,
+# like IO, BW, etc.
+TimeSeriesIntegral = List[float]
+# Stores test result for value, which
+# requires distribution to be stored for any time period,
+# like latency.
+TimeSeriesHistogram = List[List[float]]
+TimeSeries = Union[TimeSeriesIntegral, TimeSeriesHistogram]
+RawTestResults = Dict[str, TimeSeries]
+class SensorInfo:
+    """Holds information from a single sensor from a single node"""
+    node_id = None  # type: str
+    source_id = None  # type: str
+    sensor_name = None  # type: str
+    begin_time = None  # type: int
+    end_time = None  # type: int
+    data = None  # type: TimeSeries
+    def __init__(self, node_id: str, source_id: str, sensor_name: str) -> None:
+        self.node_id = node_id
+        self.source_id = source_id
+        self.sensor_name = sensor_name
+class TestInfo:
+    """Contains done test information"""
+    name = None  # type: str
+    iteration_name = None # type: str
+    nodes = None  # type: List[str]
+    start_time = None  # type: int
+    stop_time = None  # type: int
+    params = None  # type: Dict[str, Any]
+    config = None  # type: str
+    node_ids = None # type: List[str]
+class FullTestResult:
+    test_info = None  # type: TestInfo
+    # TODO(koder): array.array or numpy.array?
+    # {(node_id, perf_metrics_name): values}
+    performance_data = None  # type: Dict[Tuple[str, str], TimeSeries]
+    # {(node_id, perf_metrics_name): values}
+    sensors_data = None  # type: Dict[Tuple[str, str, str], SensorInfo]
diff --git a/wally/run_test.py b/wally/run_test.py
index d8ef685..9ae2c9e 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -12,7 +12,7 @@
 from .config import ConfigBlock, Config
 from .suits.mysql import MysqlTest
-from .suits.itest import TestConfig
+from .suits.itest import TestInputConfig
 from .suits.io.fio import IOPerfTest
 from .suits.postgres import PgBenchTest
 from .suits.omgbench import OmgTest
@@ -162,12 +162,12 @@
         with suspend_ctx:
             test_cls = TOOL_TYPE_MAPPER[name]
             remote_dir = ctx.config.default_test_local_folder.format(name=name, uuid=ctx.config.run_uuid)
-            test_cfg = TestConfig(test_cls.__name__,
-                                  params=params,
-                                  run_uuid=ctx.config.run_uuid,
-                                  nodes=test_nodes,
-                                  storage=ctx.storage,
-                                  remote_dir=remote_dir)
+            test_cfg = TestInputConfig(test_cls.__name__,
+                                       params=params,
+                                       run_uuid=ctx.config.run_uuid,
+                                       nodes=test_nodes,
+                                       storage=ctx.storage,
+                                       remote_dir=remote_dir)
@@ -192,7 +192,8 @@
             discover_objs = [i.strip() for i in discover_info.strip().split(",")]
-            ctx.fuel_openstack_creds, nodes = discover.discover(discover_objs,
+            ctx.fuel_openstack_creds, nodes = discover.discover(ctx,
+                                                                discover_objs,
                                                                 not ctx.config.dont_discover_nodes)
@@ -407,6 +408,28 @@
+def clouds_connect_stage(ctx: TestRun) -> None:
+    # TODO(koder): need to use this to connect to openstack in upper code
+    # conn = ctx.config['clouds/openstack']
+    # user, passwd, tenant = parse_creds(conn['creds'])
+    # auth_data = dict(auth_url=conn['auth_url'],
+    #                  username=user,
+    #                  api_key=passwd,
+    #                  project_id=tenant)  # type: Dict[str, str]
+    # logger.debug("Discovering openstack nodes with connection details: %r", conn)
+    # connect to openstack, fuel
+    # # parse FUEL REST credentials
+    # username, tenant_name, password = parse_creds(fuel_data['creds'])
+    # creds = {"username": username,
+    #          "tenant_name": tenant_name,
+    #          "password": password}
+    #
+    # # connect to FUEL
+    # conn = fuel_rest_api.KeystoneAuth(fuel_data['url'], creds, headers=None)
+    pass
 def shut_down_vms_stage(ctx: TestRun, nodes_ids: List[int]) -> None:
     if nodes_ids:
         logger.info("Removing nodes")
@@ -430,87 +453,7 @@
 def console_report_stage(ctx: TestRun) -> None:
     # TODO(koder): load data from storage
     raise NotImplementedError("...")
-    # first_report = True
-    # text_rep_fname = ctx.config.text_report_file
-    #
-    # with open(text_rep_fname, "w") as fd:
-    #     for tp, data in ctx.results.items():
-    #         if 'io' == tp and data is not None:
-    #             rep_lst = []
-    #             for result in data:
-    #                 rep_lst.append(
-    #                     IOPerfTest.format_for_console(list(result)))
-    #             rep = "\n\n".join(rep_lst)
-    #         elif tp in ['mysql', 'pgbench'] and data is not None:
-    #             rep = MysqlTest.format_for_console(data)
-    #         elif tp == 'omg':
-    #             rep = OmgTest.format_for_console(data)
-    #         else:
-    #             logger.warning("Can't generate text report for " + tp)
-    #             continue
-    #
-    #         fd.write(rep)
-    #         fd.write("\n")
-    #
-    #         if first_report:
-    #             logger.info("Text report were stored in " + text_rep_fname)
-    #             first_report = False
-    #
-    #         print("\n" + rep + "\n")
-# def test_load_report_stage(cfg: Config, ctx: TestRun) -> None:
-#     load_rep_fname = cfg.load_report_file
-#     found = False
-#     for idx, (tp, data) in enumerate(ctx.results.items()):
-#         if 'io' == tp and data is not None:
-#             if found:
-#                 logger.error("Making reports for more than one " +
-#                              "io block isn't supported! All " +
-#                              "report, except first are skipped")
-#                 continue
-#             found = True
-#             report.make_load_report(idx, cfg['results'], load_rep_fname)
 def html_report_stage(ctx: TestRun) -> None:
     # TODO(koder): load data from storage
     raise NotImplementedError("...")
-    # html_rep_fname = cfg.html_report_file
-    # found = False
-    # for tp, data in ctx.results.items():
-    #     if 'io' == tp and data is not None:
-    #         if found or len(data) > 1:
-    #             logger.error("Making reports for more than one " +
-    #                          "io block isn't supported! All " +
-    #                          "report, except first are skipped")
-    #             continue
-    #         found = True
-    #         report.make_io_report(list(data[0]),
-    #                               cfg.get('comment', ''),
-    #                               html_rep_fname,
-    #                               lab_info=ctx.nodes)
-# def load_data_from_path(test_res_dir: str) -> Mapping[str, List[Any]]:
-#     files = get_test_files(test_res_dir)
-#     raw_res = yaml_load(open(files['raw_results']).read())
-#     res = collections.defaultdict(list)
-#     for tp, test_lists in raw_res:
-#         for tests in test_lists:
-#             for suite_name, suite_data in tests.items():
-#                 result_folder = suite_data[0]
-#                 res[tp].append(TOOL_TYPE_MAPPER[tp].load(suite_name, result_folder))
-#     return res
-# def load_data_from_path_stage(var_dir: str, _, ctx: TestRun) -> None:
-#     for tp, vals in load_data_from_path(var_dir).items():
-#         ctx.results.setdefault(tp, []).extend(vals)
-# def load_data_from(var_dir: str) -> Callable[[TestRun], None]:
-#     return functools.partial(load_data_from_path_stage, var_dir)
diff --git a/wally/sensors.py b/wally/sensors.py
new file mode 100644
index 0000000..b579f3f
--- /dev/null
+++ b/wally/sensors.py
@@ -0,0 +1,90 @@
+from typing import List, Dict, Tuple
+from .test_run_class import TestRun
+from . import sensors_rpc_plugin
+plugin_fname = sensors_rpc_plugin.__file__.rsplit(".", 1)[0] + ".py"
+SENSORS_PLUGIN_CODE = open(plugin_fname).read()
+# TODO(koder): in case if node has more than one role sensor settigns might be incorrect
+def start_sensors_stage(ctx: TestRun) -> None:
+    if 'sensors' not in ctx.config:
+        return
+    per_role_config = {}
+    for name, val in ctx.config['sensors'].copy():
+        if isinstance(val, str):
+            val = {vl.strip(): ".*" for vl in val.split(",")}
+        elif isinstance(val, list):
+            val = {vl: ".*" for vl in val}
+        per_role_config[name] = val
+    if 'all' in per_role_config:
+        all_vl = per_role_config.pop('all')
+        all_roles = set(per_role_config)
+        for node in ctx.nodes:
+            all_roles.update(node.info.roles)
+        for name, vals in list(per_role_config.items()):
+            new_vals = all_roles.copy()
+            new_vals.update(vals)
+            per_role_config[name] = new_vals
+    for node in ctx.nodes:
+        node_cfg = {}
+        for role in node.info.roles:
+            node_cfg.update(per_role_config.get(role, {}))
+        if node_cfg:
+            node.conn.upload_plugin(SENSORS_PLUGIN_CODE)
+            ctx.sensors_run_on.add(node.info.node_id())
+        node.conn.sensors.start()
+def collect_sensors_stage(ctx: TestRun, stop: bool = True) -> None:
+    for node in ctx.nodes:
+        node_id = node.info.node_id()
+        if node_id in ctx.sensors_run_on:
+            if stop:
+                data, collected_at = node.conn.sensors.stop()  # type: Dict[Tuple[str, str], List[int]], List[float]
+            else:
+                data, collected_at = node.conn.sensors.get_updates()
+            for (source_name, sensor_name), values in data.items():
+                path = "metric/{}/{}/{}".format(node_id, source_name, sensor_name)
+                ctx.storage.append(path, values)
+                ctx.storage.append("metric/{}/collected_at".format(node_id), collected_at)
+# def delta(func, only_upd=True):
+#     prev = {}
+#     while True:
+#         for dev_name, vals in func():
+#             if dev_name not in prev:
+#                 prev[dev_name] = {}
+#                 for name, (val, _) in vals.items():
+#                     prev[dev_name][name] = val
+#             else:
+#                 dev_prev = prev[dev_name]
+#                 res = {}
+#                 for stat_name, (val, accum_val) in vals.items():
+#                     if accum_val:
+#                         if stat_name in dev_prev:
+#                             delta = int(val) - int(dev_prev[stat_name])
+#                             if not only_upd or 0 != delta:
+#                                 res[stat_name] = str(delta)
+#                         dev_prev[stat_name] = val
+#                     elif not only_upd or '0' != val:
+#                         res[stat_name] = val
+#                 if only_upd and len(res) == 0:
+#                     continue
+#                 yield dev_name, res
+#         yield None, None
diff --git a/wally/sensors_rpc_plugin.py b/wally/sensors_rpc_plugin.py
index a2758c3..c218bff 100644
--- a/wally/sensors_rpc_plugin.py
+++ b/wally/sensors_rpc_plugin.py
@@ -1,12 +1,19 @@
 import os
-from collections import namedtuple
-SensorInfo = namedtuple("SensorInfo", ['value', 'is_accumulated'])
-# SensorInfo = NamedTuple("SensorInfo", [('value', int), ('is_accumulated', bool)])
+import time
+import array
+import threading
-def provides(name: str):
+mod_name = "sensor"
+__version__ = (0, 1)
+SensorsMap = {}
+def provides(name):
     def closure(func):
+        SensorsMap[name] = func
         return func
     return closure
@@ -28,16 +35,16 @@
 def get_pid_list(disallowed_prefixes, allowed_prefixes):
     """Return pid list from list of pids and names"""
     # exceptions
-    but = disallowed_prefixes if disallowed_prefixes is not None else []
+    disallowed = disallowed_prefixes if disallowed_prefixes is not None else []
     if allowed_prefixes is None:
         # if nothing setted - all ps will be returned except setted
         result = [pid
                   for pid in os.listdir('/proc')
-                  if pid.isdigit() and pid not in but]
+                  if pid.isdigit() and pid not in disallowed]
         result = []
         for pid in os.listdir('/proc'):
-            if pid.isdigit() and pid not in but:
+            if pid.isdigit() and pid not in disallowed:
                 name = get_pid_name(pid)
                 if pid in allowed_prefixes or \
                    any(name.startswith(val) for val in allowed_prefixes):
@@ -61,33 +68,6 @@
         return "no_such_process"
-def delta(func, only_upd=True):
-    prev = {}
-    while True:
-        for dev_name, vals in func():
-            if dev_name not in prev:
-                prev[dev_name] = {}
-                for name, (val, _) in vals.items():
-                    prev[dev_name][name] = val
-            else:
-                dev_prev = prev[dev_name]
-                res = {}
-                for stat_name, (val, accum_val) in vals.items():
-                    if accum_val:
-                        if stat_name in dev_prev:
-                            delta = int(val) - int(dev_prev[stat_name])
-                            if not only_upd or 0 != delta:
-                                res[stat_name] = str(delta)
-                        dev_prev[stat_name] = val
-                    elif not only_upd or '0' != val:
-                        res[stat_name] = val
-                if only_upd and len(res) == 0:
-                    continue
-                yield dev_name, res
-        yield None, None
 #  1 - major number
 #  2 - minor mumber
 #  3 - device name
@@ -121,43 +101,14 @@
     for line in open('/proc/diskstats'):
         vals = line.split()
         dev_name = vals[2]
         dev_ok = is_dev_accepted(dev_name,
-        if dev_name[-1].isdigit():
-            dev_ok = False
+        if not dev_ok or dev_name[-1].isdigit():
+            continue
-        if dev_ok:
-            for pos, name, accum_val in io_values_pos:
-                sensor_name = "{0}.{1}".format(dev_name, name)
-                results[sensor_name] = SensorInfo(int(vals[pos]), accum_val)
-    return results
-def get_latency(stat1, stat2):
-    disks = set(i.split('.', 1)[0] for i in stat1)
-    results = {}
-    for disk in disks:
-        rdc = disk + '.reads_completed'
-        wrc = disk + '.writes_completed'
-        rdt = disk + '.rtime'
-        wrt = disk + '.wtime'
-        lat = 0.0
-        io_ops1 = stat1[rdc].value + stat1[wrc].value
-        io_ops2 = stat2[rdc].value + stat2[wrc].value
-        diops = io_ops2 - io_ops1
-        if diops != 0:
-            io1 = stat1[rdt].value + stat1[wrt].value
-            io2 = stat2[rdt].value + stat2[wrt].value
-            lat = abs(float(io1 - io2)) / diops
-        results[disk + '.latence'] = SensorInfo(lat, False)
+        for pos, name, _ in io_values_pos:
+            results["{0}.{1}".format(dev_name, name)] = int(vals[pos])
     return results
@@ -201,28 +152,8 @@
             dev_ok = False
         if dev_ok:
-            for pos, name, accum_val in net_values_pos:
-                sensor_name = "{0}.{1}".format(dev_name, name)
-                results[sensor_name] = SensorInfo(int(vals[pos]), accum_val)
-    return results
-def pscpu_stat(disallowed_prefixes=None, allowed_prefixes=None):
-    results = {}
-    pid_list = get_pid_list(disallowed_prefixes, allowed_prefixes)
-    for pid in pid_list:
-        try:
-            dev_name = get_pid_name(pid)
-            pid_stat1 = pid_stat(pid)
-            sensor_name = "{0}.{1}".format(dev_name, pid)
-            results[sensor_name] = SensorInfo(pid_stat1, True)
-        except IOError:
-            # may be, proc has already terminated, skip it
-            continue
+            for pos, name, _ in net_values_pos:
+                results["{0}.{1}".format(dev_name, name)] = int(vals[pos])
     return results
@@ -239,14 +170,19 @@
     return float(int(utime) + int(stime))
-# Based on ps_mem.py:
-# Licence: LGPLv2
-# Author:  P@draigBrady.com
-# Source:  http://www.pixelbeat.org/scripts/ps_mem.py
-#   http://github.com/pixelb/scripts/commits/master/scripts/ps_mem.py
+def pscpu_stat(disallowed_prefixes=None, allowed_prefixes=None):
+    results = {}
+    # TODO(koder): fixed list of PID's nust be given
+    for pid in get_pid_list(disallowed_prefixes, allowed_prefixes):
+        try:
+            results["{0}.{1}".format(get_pid_name(pid), pid)] = pid_stat(pid)
+        except IOError:
+            # may be, proc has already terminated, skip it
+            continue
+    return results
-# Note shared is always a subset of rss (trs is not always)
 def get_mem_stats(pid):
     """Return memory data of pid in format (private, shared)"""
@@ -257,7 +193,7 @@
     private = 0
     pss = 0
-    # add 0.5KiB as this avg error due to trunctation
+    # add 0.5KiB as this avg error due to truncation
     pss_adjust = 0.5
     for line in lines:
@@ -279,39 +215,38 @@
     return (private, shared)
+def get_ram_size():
+    """Return RAM size in Kb"""
+    with open("/proc/meminfo") as proc:
+        mem_total = proc.readline().split()
+    return int(mem_total[1])
 def psram_stat(disallowed_prefixes=None, allowed_prefixes=None):
     results = {}
-    pid_list = get_pid_list(disallowed_prefixes, allowed_prefixes)
-    for pid in pid_list:
+    # TODO(koder): fixed list of PID's nust be given
+    for pid in get_pid_list(disallowed_prefixes, allowed_prefixes):
             dev_name = get_pid_name(pid)
             private, shared = get_mem_stats(pid)
             total = private + shared
             sys_total = get_ram_size()
-            usage = float(total) / float(sys_total)
+            usage = float(total) / sys_total
             sensor_name = "{0}({1})".format(dev_name, pid)
-            results[sensor_name + ".private_mem"] = SensorInfo(private, False)
-            results[sensor_name + ".shared_mem"] = SensorInfo(shared, False)
-            results[sensor_name + ".used_mem"] = SensorInfo(total, False)
-            name = sensor_name + ".mem_usage_percent"
-            results[name] = SensorInfo(usage * 100, False)
+            results.update([
+                (sensor_name + ".private_mem", private),
+                (sensor_name + ".shared_mem", shared),
+                (sensor_name + ".used_mem", total),
+                (sensor_name + ".mem_usage_percent", int(usage * 100))])
         except IOError:
             # permission denied or proc die
     return results
-def get_ram_size():
-    """Return RAM size in Kb"""
-    with open("/proc/meminfo") as proc:
-        mem_total = proc.readline().split()
-    return mem_total[1]
 # 0 - cpu name
 # 1 - user: normal processes executing in user mode
 # 2 - nice: niced processes executing in user mode
@@ -341,13 +276,12 @@
         dev_name = vals[0]
         if dev_name == 'cpu':
-            for pos, name, accum_val in cpu_values_pos:
+            for pos, name, _ in cpu_values_pos:
                 sensor_name = "{0}.{1}".format(dev_name, name)
-                results[sensor_name] = SensorInfo(int(vals[pos]),
-                                                  accum_val)
+                results[sensor_name] = int(vals[pos])
         elif dev_name == 'procs_blocked':
             val = int(vals[1])
-            results["cpu.procs_blocked"] = SensorInfo(val, False)
+            results["cpu.procs_blocked"] = val
         elif dev_name.startswith('cpu'):
             core_count += 1
@@ -355,9 +289,10 @@
     TASKSPOS = 3
     vals = open('/proc/loadavg').read().split()
     ready_procs = vals[TASKSPOS].partition('/')[0]
     # dec on current proc
     procs_queue = (float(ready_procs) - 1) / core_count
-    results["cpu.procs_queue"] = SensorInfo(procs_queue, False)
+    results["cpu.procs_queue"] = procs_queue
     return results
@@ -380,7 +315,9 @@
 def sysram_stat(disallowed_prefixes=None, allowed_prefixes=None):
     if allowed_prefixes is None:
         allowed_prefixes = ram_fields
     results = {}
     for line in open('/proc/meminfo'):
         vals = line.split()
         dev_name = vals[0].rstrip(":")
@@ -392,10 +329,112 @@
         title = "ram.{0}".format(dev_name)
         if dev_ok:
-            results[title] = SensorInfo(int(vals[1]), False)
+            results[title] = int(vals[1])
     if 'ram.MemFree' in results and 'ram.MemTotal' in results:
         used = results['ram.MemTotal'].value - results['ram.MemFree'].value
-        usage = float(used) / results['ram.MemTotal'].value
-        results["ram.usage_percent"] = SensorInfo(usage, False)
+        results["ram.usage_percent"] = int(float(used) / results['ram.MemTotal'].value)
     return results
+class SensorsData(object):
+    def __init__(self):
+        self.cond = threading.Condition()
+        self.collected_at = array.array("f")
+        self.stop = False
+        self.data = {}  # map sensor_name to list of results
+        self.data_fd = None
+# TODO(koder): a lot code here can be optimized and cached, but nobody cares (c)
+def sensors_bg_thread(sensors_config, sdata):
+    next_collect_at = time.time()
+    while not sdata.stop:
+        dtime = next_collect_at - time.time()
+        if dtime > 0:
+            sdata.cond.wait(dtime)
+        if sdata.stop:
+            break
+        ctm = time.time()
+        curr = {}
+        for name, config in sensors_config.items():
+            params = {}
+            if "allow" in config:
+                params["allowed_prefixes"] = config["allow"]
+            if "disallow" in config:
+                params["disallowed_prefixes"] = config["disallow"]
+            curr[name] = SensorsMap[name](**params)
+        etm = time.time()
+        if etm - ctm > 0.1:
+            # TODO(koder): need to signal that something in not really ok with sensor collecting
+            pass
+        with sdata.cond:
+            sdata.collected_at.append(ctm)
+            for source_name, vals in curr.items():
+                for sensor_name, val in vals.items():
+                    key = (source_name, sensor_name)
+                    if key not in sdata.data:
+                        sdata.data[key] = array.array("I", [val])
+                    else:
+                        sdata.data[key].append(val)
+sensors_thread = None
+sdata = None  # type: SensorsData
+def rpc_start(sensors_config):
+    global sensors_thread
+    global sdata
+    if sensors_thread is not None:
+        raise ValueError("Thread already running")
+    sdata = SensorsData()
+    sensors_thread = threading.Thread(target=sensors_bg_thread, args=(sensors_config, sdata))
+    sensors_thread.daemon = True
+    sensors_thread.start()
+def rpc_get_updates():
+    if sdata is None:
+        raise ValueError("No sensor thread running")
+    with sdata.cond:
+        res = sdata.data
+        collected_at = sdata.collected_at
+        sdata.collected_at = array.array("f")
+        sdata.data = {name: array.array("I") for name in sdata.data}
+    return res, collected_at
+def rpc_stop():
+    global sensors_thread
+    global sdata
+    if sensors_thread is None:
+        raise ValueError("No sensor thread running")
+    sdata.stop = True
+    with sdata.cond:
+        sdata.cond.notify_all()
+    sensors_thread.join()
+    res = sdata.data
+    collected_at = sdata.collected_at
+    sensors_thread = None
+    sdata = None
+    return res, collected_at
diff --git a/wally/sensors_rpc_plugin.pyi b/wally/sensors_rpc_plugin.pyi
index c4b387b..21fe1d5 100644
--- a/wally/sensors_rpc_plugin.pyi
+++ b/wally/sensors_rpc_plugin.pyi
@@ -1,24 +1,22 @@
-import os
 from typing import NamedTuple, TypeVar, Callable, Any, Optional, List, Iterable, Dict, Tuple
-SensorInfo = NamedTuple("SensorInfo", [('value', int), ('is_accumulated', bool)])
 Pid = TypeVar('Pid', str)
 AnyFunc = TypeVar('AnyFunc', Callable[..., Any])
 PrefixList = Optional[List[str]]
-SensorDict = Dict[str, SensorInfo]
+SensorDict = Dict[str, int]
 def provides(name: str) -> Callable[[AnyFunc], AnyFunc]: ...
-def is_dev_accepted(name, disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> bool: ...
+def is_dev_accepted(name: str, disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> bool: ...
 def get_pid_name(pid: Pid) -> str: ...
-def delta(func, only_upd: bool = True) -> Iterable[Optional[str], Optional[Dict[str, str]]]: ...
-def get_latency(stat1: SensorDict, stat2: SensorDict) -> SensorDict: ...
 def pid_stat(pid: Pid) -> float: ...
 def get_mem_stats(pid : Pid) -> Tuple[int, int]: ...
-def get_ram_size() -> str: ...
+def get_ram_size() -> int: ...
-def io_stat(disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> SensorDict: ...
-def net_stat(disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> SensorDict: ...
-def pscpu_stat(disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> SensorDict: ...
-def psram_stat(disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> SensorDict: ...
-def syscpu_stat(disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> SensorDict: ...
-def sysram_stat(disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> SensorDict: ...
+def io_stat(disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> int: ...
+def net_stat(disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> int: ...
+def pscpu_stat(disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> int: ...
+def psram_stat(disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> int: ...
+def syscpu_stat(disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> int: ...
+def sysram_stat(disallowed_prefixes: PrefixList, allowed_prefixes: PrefixList) -> int: ...
diff --git a/wally/stage.py b/wally/stage.py
index e06e31d..0451d18 100644
--- a/wally/stage.py
+++ b/wally/stage.py
@@ -1,6 +1,6 @@
 import logging
 import contextlib
-from typing import Callable
+from typing import Callable, Iterator
 from .utils import StopTestError
 from .test_run_class import TestRun
@@ -10,7 +10,7 @@
-def log_stage(stage) -> None:
+def log_stage(stage) -> Iterator[None]:
     msg_templ = "Exception during {0}: {1!s}"
     msg_templ_no_exc = "During {0}"
diff --git a/wally/storage.py b/wally/storage.py
index 349995f..05e4259 100644
--- a/wally/storage.py
+++ b/wally/storage.py
@@ -141,8 +141,8 @@
     def __contains__(self, path: str) -> bool:
         return path in self.storage
-    def list(self, path: str) -> Iterable[Tuple[bool, str]]:
-        return self.storage.list(path)
+    def list(self, *path: str) -> Iterable[Tuple[bool, str]]:
+        return self.storage.list("/".join(path))
     def construct(self, path: str, raw_val: IStorable, obj_class: Type[ObjClass]) -> ObjClass:
         if obj_class in (int, str, dict, list, None):
@@ -162,16 +162,18 @@
         return obj
-    def load_list(self, path: str, obj_class: Type[ObjClass]) -> List[ObjClass]:
-        raw_val = self[path]
+    def load_list(self, obj_class: Type[ObjClass], *path: str) -> List[ObjClass]:
+        path_s = "/".join(path)
+        raw_val = self[path_s]
         assert isinstance(raw_val, list)
-        return [self.construct(path, val, obj_class) for val in cast(list, raw_val)]
+        return [self.construct(path_s, val, obj_class) for val in cast(list, raw_val)]
-    def load(self, path: str, obj_class: Type[ObjClass]) -> ObjClass:
-        return self.construct(path, self[path], obj_class)
+    def load(self, obj_class: Type[ObjClass], *path: str) -> ObjClass:
+        path_s = "/".join(path)
+        return self.construct(path_s, self[path_s], obj_class)
-    def get_stream(self, path: str) -> IO:
-        return self.storage.get_stream(path)
+    def get_stream(self, *path: str) -> IO:
+        return self.storage.get_stream("/".join(path))
     def get(self, path: str, default: Any = None) -> Any:
@@ -179,6 +181,9 @@
         except KeyError:
             return default
+    def append(self, path: str, data: List):
+        raise NotImplemented()
 def make_storage(url: str, existing: bool = False) -> Storage:
     return Storage(FSStorage(url, existing), YAMLSerializer())
diff --git a/wally/storage_structure.txt b/wally/storage_structure.txt
index 8ba89c1..583a4a0 100644
--- a/wally/storage_structure.txt
+++ b/wally/storage_structure.txt
@@ -5,7 +5,17 @@
 discovered_nodes: List[NodeInfo] - list of discovered nodes
 reused_nodes: List[NodeInfo] - list of reused nodes from cluster
 spawned_vm_ids: List[int] - list of openstack VM id's, spawned for test
-__types__ = type of data in keys
 info/comment : str - run comment
 info/run_uuid : str - run uuid
-info/run_time : float - run unix time
\ No newline at end of file
+info/run_time : float - run unix time
+# test results
+result/{id}/info : TestInfo - test information: name, cluster config, test parameters, etc.
+result/{id}/measurement/{node}/{name} : List[float] - measurements data. E.g.:
+    result/{id}/measurement/node-12/iops   - for BW uses iops * block_sz
+    result/{id}/measurement/node-12/lat_histo
+metric/{node_name}/{dev}/{metric_name} : List[float] - node metrics data. E.g.:
+    metric/node-22/cpu/load
+    metric/node-22/sda/read_io
+    metric/node-22/eth0/data_recv
\ No newline at end of file
diff --git a/wally/suits/io/fio.py b/wally/suits/io/fio.py
index 2360a55..0f4ebde 100644
--- a/wally/suits/io/fio.py
+++ b/wally/suits/io/fio.py
@@ -1,970 +1,128 @@
-import re
-import time
-import json
-import stat
-import random
-import hashlib
 import os.path
 import logging
-import datetime
-import functools
-import collections
-from typing import Dict, List, Callable, Any, Tuple, Optional
-import yaml
-import texttable
-from paramiko.ssh_exception import SSHException
-from concurrent.futures import ThreadPoolExecutor, wait
+from typing import Dict, List, Union, cast
 import wally
-from ...pretty_yaml import dumps
-from ...statistic import round_3_digit, data_property, average
-from ...utils import ssize2b, sec_to_str, StopTestError, Barrier, get_os
-from ...inode import INode
-from ..itest import (TimeSeriesValue, PerfTest, TestResults, TestConfig)
-from .fio_task_parser import (execution_time, fio_cfg_compile,
-                              get_test_summary, get_test_summary_tuple,
-                              get_test_sync_mode, FioJobSection)
-from .rpc_plugin import parse_fio_result
+from ...utils import ssize2b, StopTestError, get_os
+from ...node_interfaces import IRPCNode
+from ..itest import ThreadedTest, IterationConfig, RunTestRes
+from .fio_task_parser import execution_time, fio_cfg_compile, FioJobSection, FioParams
 logger = logging.getLogger("wally")
-class NoData:
-    pass
-def cached_prop(func: Callable[..., Any]) -> Callable[..., Any]:
-    @property
-    @functools.wraps(func)
-    def closure(self) -> Any:
-        val = getattr(self, "_" + func.__name__)
-        if val is NoData:
-            val = func(self)
-            setattr(self, "_" + func.__name__, val)
-        return val
-    return closure
-def load_fio_log_file(fname: str) -> TimeSeriesValue:
-    with open(fname) as fd:
-        it = [ln.split(',')[:2] for ln in fd]
-    vals = [(float(off) / 1000,  # convert us to ms
-             float(val.strip()) + 0.5)  # add 0.5 to compemsate average value
-                                        # as fio trimm all values in log to integer
-            for off, val in it]
-    return TimeSeriesValue(vals)
-def load_sys_log_file(ftype: str, fname: str) -> TimeSeriesValue:
-    assert ftype == 'iops'
-    pval = None
-    with open(fname) as fd:
-        iops = []
-        for ln in fd:
-            params = ln.split()
-            cval = int(params[WRITE_IOPS_DISCSTAT_POS]) + \
-                int(params[READ_IOPS_DISCSTAT_POS])
-            if pval is not None:
-                iops.append(cval - pval)
-            pval = cval
-    vals = [(idx * 1000, val) for idx, val in enumerate(iops)]
-    return TimeSeriesValue(vals)
-def load_test_results(folder: str, run_num: int) -> 'FioRunResult':
-    res = {}
-    params = None
-    fn = os.path.join(folder, str(run_num) + '_params.yaml')
-    params = yaml.load(open(fn).read())
-    conn_ids_set = set()
-    rr = r"{}_(?P<conn_id>.*?)_(?P<type>[^_.]*)\.\d+\.log$".format(run_num)
-    for fname in os.listdir(folder):
-        rm = re.match(rr, fname)
-        if rm is None:
-            continue
-        conn_id_s = rm.group('conn_id')
-        conn_id = conn_id_s.replace('_', ':')
-        ftype = rm.group('type')
-        if ftype not in ('iops', 'bw', 'lat'):
-            continue
-        ts = load_fio_log_file(os.path.join(folder, fname))
-        res.setdefault(ftype, {}).setdefault(conn_id, []).append(ts)
-        conn_ids_set.add(conn_id)
-    rr = r"{}_(?P<conn_id>.*?)_(?P<type>[^_.]*)\.sys\.log$".format(run_num)
-    for fname in os.listdir(folder):
-        rm = re.match(rr, fname)
-        if rm is None:
-            continue
-        conn_id_s = rm.group('conn_id')
-        conn_id = conn_id_s.replace('_', ':')
-        ftype = rm.group('type')
-        if ftype not in ('iops', 'bw', 'lat'):
-            continue
-        ts = load_sys_log_file(ftype, os.path.join(folder, fname))
-        res.setdefault(ftype + ":sys", {}).setdefault(conn_id, []).append(ts)
-        conn_ids_set.add(conn_id)
-    mm_res = {}
-    if len(res) == 0:
-        raise ValueError("No data was found")
-    for key, data in res.items():
-        conn_ids = sorted(conn_ids_set)
-        awail_ids = [conn_id for conn_id in conn_ids if conn_id in data]
-        matr = [data[conn_id] for conn_id in awail_ids]
-        mm_res[key] = MeasurementMatrix(matr, awail_ids)
-    raw_res = {}
-    for conn_id in conn_ids:
-        fn = os.path.join(folder, "{0}_{1}_rawres.json".format(run_num, conn_id_s))
-        # remove message hack
-        fc = "{" + open(fn).read().split('{', 1)[1]
-        raw_res[conn_id] = json.loads(fc)
-    fio_task = FioJobSection(params['name'])
-    fio_task.vals.update(params['vals'])
-    config = TestConfig('io', params, None, params['nodes'], folder, None)
-    return FioRunResult(config, fio_task, mm_res, raw_res, params['intervals'], run_num)
-class Attrmapper:
-    def __init__(self, dct: Dict[str, Any]):
-        self.__dct = dct
-    def __getattr__(self, name):
-        try:
-            return self.__dct[name]
-        except KeyError:
-            raise AttributeError(name)
-class DiskPerfInfo:
-    def __init__(self, name: str, summary: str, params: Dict[str, Any], testnodes_count: int):
-        self.name = name
-        self.bw = None
-        self.iops = None
-        self.lat = None
-        self.lat_50 = None
-        self.lat_95 = None
-        self.lat_avg = None
-        self.raw_bw = []
-        self.raw_iops = []
-        self.raw_lat = []
-        self.params = params
-        self.testnodes_count = testnodes_count
-        self.summary = summary
-        self.p = Attrmapper(self.params['vals'])
-        self.sync_mode = get_test_sync_mode(self.params['vals'])
-        self.concurence = self.params['vals'].get('numjobs', 1)
-def get_lat_perc_50_95(lat_mks: List[float]) -> Tuple[float, float]:
-    curr_perc = 0
-    perc_50 = None
-    perc_95 = None
-    pkey = None
-    for key, val in sorted(lat_mks.items()):
-        if curr_perc + val >= 50 and perc_50 is None:
-            if pkey is None or val < 1.:
-                perc_50 = key
-            else:
-                perc_50 = (50. - curr_perc) / val * (key - pkey) + pkey
-        if curr_perc + val >= 95:
-            if pkey is None or val < 1.:
-                perc_95 = key
-            else:
-                perc_95 = (95. - curr_perc) / val * (key - pkey) + pkey
-            break
-        pkey = key
-        curr_perc += val
-    # for k, v in sorted(lat_mks.items()):
-    #     if k / 1000 > 0:
-    #         print "{0:>4}".format(k / 1000), v
-    # print perc_50 / 1000., perc_95 / 1000.
-    # exit(1)
-    return perc_50 / 1000., perc_95 / 1000.
-class IOTestResults:
-    def __init__(self, suite_name: str, fio_results: 'FioRunResult', log_directory: str):
-        self.suite_name = suite_name
-        self.fio_results = fio_results
-        self.log_directory = log_directory
-    def __iter__(self):
-        return iter(self.fio_results)
-    def __len__(self):
-        return len(self.fio_results)
-    def get_yamable(self) -> Dict[str, List[str]]:
-        items = [(fio_res.summary(), fio_res.idx) for fio_res in self]
-        return {self.suite_name: [self.log_directory] + items}
-class FioRunResult(TestResults):
-    """
-    Fio run results
-    config: TestConfig
-    fio_task: FioJobSection
-    ts_results: {str: MeasurementMatrix[TimeSeriesValue]}
-    raw_result: ????
-    run_interval:(float, float) - test tun time, used for sensors
-    """
-    def __init__(self, config, fio_task, ts_results, raw_result, run_interval, idx):
-        self.name = fio_task.name.rsplit("_", 1)[0]
-        self.fio_task = fio_task
-        self.idx = idx
-        self.bw = ts_results['bw']
-        self.lat = ts_results['lat']
-        self.iops = ts_results['iops']
-        if 'iops:sys' in ts_results:
-            self.iops_sys = ts_results['iops:sys']
-        else:
-            self.iops_sys = None
-        res = {"bw": self.bw,
-               "lat": self.lat,
-               "iops": self.iops,
-               "iops:sys": self.iops_sys}
-        self.sensors_data = None
-        self._pinfo = None
-        TestResults.__init__(self, config, res, raw_result, run_interval)
-    def get_params_from_fio_report(self):
-        nodes = self.bw.connections_ids
-        iops = [self.raw_result[node]['jobs'][0]['mixed']['iops'] for node in nodes]
-        total_ios = [self.raw_result[node]['jobs'][0]['mixed']['total_ios'] for node in nodes]
-        runtime = [self.raw_result[node]['jobs'][0]['mixed']['runtime'] / 1000 for node in nodes]
-        flt_iops = [float(ios) / rtime for ios, rtime in zip(total_ios, runtime)]
-        bw = [self.raw_result[node]['jobs'][0]['mixed']['bw'] for node in nodes]
-        total_bytes = [self.raw_result[node]['jobs'][0]['mixed']['io_bytes'] for node in nodes]
-        flt_bw = [float(tbytes) / rtime for tbytes, rtime in zip(total_bytes, runtime)]
-        return {'iops': iops,
-                'flt_iops': flt_iops,
-                'bw': bw,
-                'flt_bw': flt_bw}
-    def summary(self):
-        return get_test_summary(self.fio_task, len(self.config.nodes))
-    def summary_tpl(self):
-        return get_test_summary_tuple(self.fio_task, len(self.config.nodes))
-    def get_lat_perc_50_95_multy(self):
-        lat_mks = collections.defaultdict(lambda: 0)
-        num_res = 0
-        for result in self.raw_result.values():
-            num_res += len(result['jobs'])
-            for job_info in result['jobs']:
-                for k, v in job_info['latency_ms'].items():
-                    if isinstance(k, basestring) and k.startswith('>='):
-                        lat_mks[int(k[2:]) * 1000] += v
-                    else:
-                        lat_mks[int(k) * 1000] += v
-                for k, v in job_info['latency_us'].items():
-                    lat_mks[int(k)] += v
-        for k, v in lat_mks.items():
-            lat_mks[k] = float(v) / num_res
-        return get_lat_perc_50_95(lat_mks)
-    def disk_perf_info(self, avg_interval=2.0):
-        if self._pinfo is not None:
-            return self._pinfo
-        testnodes_count = len(self.config.nodes)
-        pinfo = DiskPerfInfo(self.name,
-                             self.summary(),
-                             self.params,
-                             testnodes_count)
-        def prepare(data, drop=1):
-            if data is None:
-                return data
-            res = []
-            for ts_data in data:
-                if ts_data.average_interval() < avg_interval:
-                    ts_data = ts_data.derived(avg_interval)
-                # drop last value on bounds
-                # as they may contains ranges without activities
-                assert len(ts_data.values) >= drop + 1, str(drop) + " " + str(ts_data.values)
-                if drop > 0:
-                    res.append(ts_data.values[:-drop])
-                else:
-                    res.append(ts_data.values)
-            return res
-        def agg_data(matr):
-            arr = sum(matr, [])
-            min_len = min(map(len, arr))
-            res = []
-            for idx in range(min_len):
-                res.append(sum(dt[idx] for dt in arr))
-            return res
-        pinfo.raw_lat = map(prepare, self.lat.per_vm())
-        num_th = sum(map(len, pinfo.raw_lat))
-        lat_avg = [val / num_th for val in agg_data(pinfo.raw_lat)]
-        pinfo.lat_avg = data_property(lat_avg).average / 1000  # us to ms
-        pinfo.lat_50, pinfo.lat_95 = self.get_lat_perc_50_95_multy()
-        pinfo.lat = pinfo.lat_50
-        pinfo.raw_bw = map(prepare, self.bw.per_vm())
-        pinfo.raw_iops = map(prepare, self.iops.per_vm())
-        if self.iops_sys is not None:
-            pinfo.raw_iops_sys = map(prepare, self.iops_sys.per_vm())
-            pinfo.iops_sys = data_property(agg_data(pinfo.raw_iops_sys))
-        else:
-            pinfo.raw_iops_sys = None
-            pinfo.iops_sys = None
-        fparams = self.get_params_from_fio_report()
-        fio_report_bw = sum(fparams['flt_bw'])
-        fio_report_iops = sum(fparams['flt_iops'])
-        agg_bw = agg_data(pinfo.raw_bw)
-        agg_iops = agg_data(pinfo.raw_iops)
-        log_bw_avg = average(agg_bw)
-        log_iops_avg = average(agg_iops)
-        # update values to match average from fio report
-        coef_iops = fio_report_iops / float(log_iops_avg)
-        coef_bw = fio_report_bw / float(log_bw_avg)
-        bw_log = data_property([val * coef_bw for val in agg_bw])
-        iops_log = data_property([val * coef_iops for val in agg_iops])
-        bw_report = data_property([fio_report_bw])
-        iops_report = data_property([fio_report_iops])
-        # When IOPS/BW per thread is too low
-        # data from logs is rounded to match
-        iops_per_th = sum(sum(pinfo.raw_iops, []), [])
-        if average(iops_per_th) > 10:
-            pinfo.iops = iops_log
-            pinfo.iops2 = iops_report
-        else:
-            pinfo.iops = iops_report
-            pinfo.iops2 = iops_log
-        bw_per_th = sum(sum(pinfo.raw_bw, []), [])
-        if average(bw_per_th) > 10:
-            pinfo.bw = bw_log
-            pinfo.bw2 = bw_report
-        else:
-            pinfo.bw = bw_report
-            pinfo.bw2 = bw_log
-        self._pinfo = pinfo
-        return pinfo
-class IOPerfTest(PerfTest):
-    tcp_conn_timeout = 30
-    max_pig_timeout = 5
+class IOPerfTest(ThreadedTest):
     soft_runcycle = 5 * 60
     retry_time = 30
+    configs_dir = os.path.dirname(__file__)  # type: str
-    zero_md5_hash = hashlib.md5()
-    zero_md5_hash.update(b"\x00" * 1024)
-    zero_md5 = zero_md5_hash.hexdigest()
-    def __init__(self, config):
-        PerfTest.__init__(self, config)
+    def __init__(self, *args, **kwargs) -> None:
+        super().__init__(*args, **kwargs)
         get = self.config.params.get
-        do_get = self.config.params.__getitem__
-        self.config_fname = do_get('cfg')
+        self.load_profile_name = self.config.params['load']  # type: str
+        self.name = "io." + self.load_profile_name
-        if '/' not in self.config_fname and '.' not in self.config_fname:
-            cfgs_dir = os.path.dirname(__file__)
-            self.config_fname = os.path.join(cfgs_dir,
-                                             self.config_fname + '.cfg')
-        self.alive_check_interval = get('alive_check_interval')
-        self.use_system_fio = get('use_system_fio', False)
-        if get('prefill_files') is not None:
-            logger.warning("prefill_files option is depricated. Use force_prefill instead")
-        self.force_prefill = get('force_prefill', False)
-        self.config_params = get('params', {}).copy()
-        self.io_py_remote = self.join_remote("agent.py")
-        self.results_file = self.join_remote("results.json")
-        self.pid_file = self.join_remote("pid")
-        self.task_file = self.join_remote("task.cfg")
-        self.sh_file = self.join_remote("cmd.sh")
-        self.err_out_file = self.join_remote("fio_err_out")
-        self.io_log_file = self.join_remote("io_log.txt")
-        self.exit_code_file = self.join_remote("exit_code")
-        self.max_latency = get("max_lat", None)
-        self.min_bw_per_thread = get("min_bw", None)
-        self.use_sudo = get("use_sudo", True)
-        self.raw_cfg = open(self.config_fname).read()
-        self.fio_configs = None
-    @classmethod
-    def load(cls, suite_name: str, folder: str) -> IOTestResults:
-        res = []
-        for fname in os.listdir(folder):
-            if re.match("\d+_params.yaml$", fname):
-                num = int(fname.split('_')[0])
-                res.append(load_test_results(folder, num))
-        return IOTestResults(suite_name, res, folder)
-    def cleanup(self):
-        # delete_file(conn, self.io_py_remote)
-        # Need to remove tempo files, used for testing
-        pass
-    # size is megabytes
-    def check_prefill_required(self, node: INode, fname: str, size: int, num_blocks: Optional[int]=16) -> bool:
-        try:
-            fstats = node.stat_file(fname)
-            if stat.S_ISREG(fstats.st_mode) and fstats.st_size < size * 1024 ** 2:
-                return True
-        except EnvironmentError:
-            return True
-        cmd = 'python -c "' + \
-              "import sys;" + \
-              "fd = open('{0}', 'rb');" + \
-              "fd.seek({1});" + \
-              "data = fd.read(1024); " + \
-              "sys.stdout.write(data + ' ' * ( 1024 - len(data)))\" | md5sum"
-        if self.use_sudo:
-            cmd = "sudo " + cmd
-        bsize = size * (1024 ** 2)
-        offsets = [random.randrange(bsize - 1024) for _ in range(num_blocks)]
-        offsets.append(bsize - 1024)
-        offsets.append(0)
-        for offset in offsets:
-            data = node.run(cmd.format(fname, offset), nolog=True)
-            md = ""
-            for line in data.split("\n"):
-                if "unable to resolve" not in line:
-                    md = line.split()[0].strip()
-                    break
-            if len(md) != 32:
-                logger.error("File data check is failed - " + data)
-                return True
-            if self.zero_md5 == md:
-                return True
-        return False
-    def prefill_test_files(self, node: INode, files: List[str], force:bool=False) -> None:
-        if self.use_system_fio:
-            cmd_templ = "fio "
+        if os.path.isfile(self.load_profile_name):
+            self.load_profile_path = os.path.join(self.configs_dir, self.load_profile_name+ '.cfg')  # type: str
-            cmd_templ = "{0}/fio ".format(self.config.remote_dir)
+            self.load_profile_path = self.load_profile_name
-        if self.use_sudo:
-            cmd_templ = "sudo " + cmd_templ
+        self.load_profile = open(self.load_profile_path, 'rt').read()  # type: str
-        cmd_templ += "--name=xxx --filename={0} --direct=1" + \
-                     " --bs=4m --size={1}m --rw=write"
-        ssize = 0
-        if force:
-            logger.info("File prefilling is forced")
-        ddtime = 0
-        for fname, curr_sz in files.items():
-            if not force:
-                if not self.check_prefill_required(node, fname, curr_sz):
-                    logger.debug("prefill is skipped")
-                    continue
-            logger.info("Prefilling file {0}".format(fname))
-            cmd = cmd_templ.format(fname, curr_sz)
-            ssize += curr_sz
-            stime = time.time()
-            node.run(cmd, timeout=curr_sz)
-            ddtime += time.time() - stime
-        if ddtime > 1.0:
-            fill_bw = int(ssize / ddtime)
-            mess = "Initiall fio fill bw is {0} MiBps for this vm"
-            logger.info(mess.format(fill_bw))
-    def install_utils(self, node: INode) -> None:
-        need_install = []
-        packs = [('screen', 'screen')]
-        os_info = get_os(node)
+        self.use_system_fio = get('use_system_fio', False)  # type: bool
         if self.use_system_fio:
-            packs.append(('fio', 'fio'))
+            self.fio_path = "fio"    # type: str
-            packs.append(('bzip2', 'bzip2'))
+            self.fio_path = os.path.join(self.config.remote_dir, "fio")
-        for bin_name, package in packs:
-            if bin_name is None:
-                need_install.append(package)
-                continue
+        self.force_prefill = get('force_prefill', False)  # type: bool
-            try:
-                node.run('which ' + bin_name, nolog=True)
-            except OSError:
-                need_install.append(package)
-        if len(need_install) != 0:
-            if 'redhat' == os_info.distro:
-                cmd = "sudo yum -y install " + " ".join(need_install)
-            else:
-                cmd = "sudo apt-get -y install " + " ".join(need_install)
-            try:
-                node.run(cmd)
-            except OSError as err:
-                raise OSError("Can't install - {}".format(" ".join(need_install))) from err
-        if not self.use_system_fio:
-            fio_dir = os.path.dirname(os.path.dirname(wally.__file__))
-            fio_dir = os.path.join(os.getcwd(), fio_dir)
-            fio_dir = os.path.join(fio_dir, 'fio_binaries')
-            fname = 'fio_{0.release}_{0.arch}.bz2'.format(os_info)
-            fio_path = os.path.join(fio_dir, fname)
-            if not os.path.exists(fio_path):
-                raise RuntimeError("No prebuild fio binary available for {0}".format(os_info))
-            bz_dest = self.join_remote('fio.bz2')
-            node.copy_file(fio_path, bz_dest)
-            node.run("bzip2 --decompress {}" + bz_dest, nolog=True)
-            node.run("chmod a+x " + self.join_remote("fio"), nolog=True)
-    def pre_run(self) -> None:
-        if 'FILESIZE' not in self.config_params:
+        if 'FILESIZE' not in self.config.params:
             raise NotImplementedError("File size detection is not implemented")
-        self.fio_configs = fio_cfg_compile(self.raw_cfg,
-                                           self.config_fname,
-                                           self.config_params)
-        self.fio_configs = list(self.fio_configs)
+        # self.max_latency = get("max_lat")  # type: Optional[int]
+        # self.min_bw_per_thread = get("min_bw")   # type: Optional[int]
-        files = {}
+        self.use_sudo = get("use_sudo", True)  # type: bool
+        self.fio_configs = list(fio_cfg_compile(self.load_profile,
+                                                self.load_profile_path,
+                                                cast(FioParams, self.config.params)))
+        if len(self.fio_configs) == 0:
+            logger.exception("Empty fio config provided")
+            raise StopTestError("Empty fio config provided")
+        self.iterations_configs = self.fio_configs  # type: ignore
+        self.files_sizes = self.get_file_sizes()
+        self.exec_folder = self.config.remote_dir
+        self.fio_path = "" if self.use_system_fio else self.exec_folder
+    def get_file_sizes(self) -> Dict[str, int]:
+        files_sizes = {}  # type: Dict[str, int]
         for section in self.fio_configs:
             sz = ssize2b(section.vals['size'])
-            msz = sz / (1024 ** 2)
-            if sz % (1024 ** 2) != 0:
-                msz += 1
-            fname = section.vals['filename']
+            msz = sz // (1024 ** 2) + (1 if sz % (1024 ** 2) != 0 else 0)
+            fname = section.vals['filename']  # type: str
             # if already has other test with the same file name
             # take largest size
-            files[fname] = max(files.get(fname, 0), msz)
+            files_sizes[fname] = max(files_sizes.get(fname, 0), msz)
-        with ThreadPoolExecutor(len(self.config.nodes)) as pool:
-            fc = functools.partial(self.pre_run_th,
-                                   files=files,
-                                   force=self.force_prefill)
-            list(pool.map(fc, self.config.nodes))
+        return files_sizes
-    def pre_run_th(self, node: INode, files: List[str], force_prefil: Optional[bool]=False) -> None:
+    def config_node(self, node: IRPCNode) -> None:
-            cmd = 'mkdir -p "{0}"'.format(self.config.remote_dir)
-            if self.use_sudo:
-                cmd = "sudo " + cmd
-                cmd += " ; sudo chown {0} {1}".format(node.get_user(),
-                                                      self.config.remote_dir)
-            node.run(cmd, nolog=True)
-            assert self.config.remote_dir != "" and self.config.remote_dir != "/"
-            node.run("rm -rf {}/*".format(self.config.remote_dir), nolog=True)
+            node.conn.rmdir(self.config.remote_dir, recursive=True, ignore_missing=True)
+            node.conn.mkdir(self.config.remote_dir)
         except Exception as exc:
-            msg = "Failed to create folder {} on remote {}."
-            msg = msg.format(self.config.remote_dir, node, exc)
+            msg = "Failed to create folder {} on remote {}.".format(self.config.remote_dir, node, exc)
             raise StopTestError(msg) from exc
-        self.prefill_test_files(node, files, force_prefil)
+        logger.info("Prefilling test files with random data")
+        fill_bw = node.conn.prefill_test_files(self.files_sizes, force=self.force_prefill, fio_path=self.fio_path)
+        if fill_bw is not None:
+            logger.info("Initial fio fill bw is {} MiBps for {}".format(fill_bw, node.info.node_id()))
-    def show_expected_execution_time(self) -> None:
-        if len(self.fio_configs) > 1:
-            # +10% - is a rough estimation for additional operations
-            # like sftp, etc
-            exec_time = int(sum(map(execution_time, self.fio_configs)) * 1.1)
-            exec_time_s = sec_to_str(exec_time)
-            now_dt = datetime.datetime.now()
-            end_dt = now_dt + datetime.timedelta(0, exec_time)
-            msg = "Entire test should takes aroud: {0} and finished at {1}"
-            logger.info(msg.format(exec_time_s,
-                                   end_dt.strftime("%H:%M:%S")))
-    def run(self) -> IOTestResults:
-        logger.debug("Run preparation")
-        self.pre_run()
-        self.show_expected_execution_time()
-        num_nodes = len(self.config.nodes)
-        tname = os.path.basename(self.config_fname)
-        if tname.endswith('.cfg'):
-            tname = tname[:-4]
-        barrier = Barrier(num_nodes)
-        results = []
-        # set of Operation_Mode_BlockSize str's
-        # which should not be tested anymore, as
-        # they already too slow with previous thread count
-        lat_bw_limit_reached = set()
-        with ThreadPoolExecutor(num_nodes) as pool:
-            for pos, fio_cfg in enumerate(self.fio_configs):
-                test_descr = get_test_summary(fio_cfg.vals, noqd=True)
-                if test_descr in lat_bw_limit_reached:
-                    continue
-                logger.info("Will run {} test".format(fio_cfg.name))
-                templ = "Test should takes about {}. Should finish at {}, will wait at most till {}"
-                exec_time = execution_time(fio_cfg)
-                exec_time_str = sec_to_str(exec_time)
-                timeout = int(exec_time + max(300, exec_time))
-                now_dt = datetime.datetime.now()
-                end_dt = now_dt + datetime.timedelta(0, exec_time)
-                wait_till = now_dt + datetime.timedelta(0, timeout)
-                logger.info(templ.format(exec_time_str,
-                                         end_dt.strftime("%H:%M:%S"),
-                                         wait_till.strftime("%H:%M:%S")))
-                run_test_func = functools.partial(self.do_run,
-                                                  barrier=barrier,
-                                                  fio_cfg=fio_cfg,
-                                                  pos=pos)
-                max_retr = 3
-                for idx in range(max_retr):
-                    if 0 != idx:
-                        logger.info("Sleeping %ss and retrying", self.retry_time)
-                        time.sleep(self.retry_time)
-                    try:
-                        intervals = list(pool.map(run_test_func, self.config.nodes))
-                        if None not in intervals:
-                            break
-                    except (EnvironmentError, SSHException) as exc:
-                        if max_retr - 1 == idx:
-                            raise StopTestError("Fio failed") from exc
-                        logger.exception("During fio run")
-                fname = "{}_task.fio".format(pos)
-                with open(os.path.join(self.config.log_directory, fname), "w") as fd:
-                    fd.write(str(fio_cfg))
-                params = {'vm_count': num_nodes}
-                params['name'] = fio_cfg.name
-                params['vals'] = dict(fio_cfg.vals.items())
-                params['intervals'] = intervals
-                params['nodes'] = [node.get_conn_id() for node in self.config.nodes]
-                fname = "{}_params.yaml".format(pos)
-                with open(os.path.join(self.config.log_directory, fname), "w") as fd:
-                    fd.write(dumps(params))
-                res = load_test_results(self.config.log_directory, pos)
-                results.append(res)
-                if self.max_latency is not None:
-                    lat_50, _ = res.get_lat_perc_50_95_multy()
-                    # conver us to ms
-                    if self.max_latency < lat_50:
-                        logger.info(("Will skip all subsequent tests of {} " +
-                                     "due to lat/bw limits").format(fio_cfg.name))
-                        lat_bw_limit_reached.add(test_descr)
-                test_res = res.get_params_from_fio_report()
-                if self.min_bw_per_thread is not None:
-                    if self.min_bw_per_thread > average(test_res['bw']):
-                        lat_bw_limit_reached.add(test_descr)
-        return IOTestResults(self.config.params['cfg'],
-                             results, self.config.log_directory)
-    def do_run(self, node: INode, barrier: Barrier, fio_cfg, pos: int, nolog: bool=False):
-        exec_folder = self.config.remote_dir
+    def install_utils(self, node: IRPCNode) -> None:
         if self.use_system_fio:
-            fio_path = ""
-        else:
-            if not exec_folder.endswith("/"):
-                fio_path = exec_folder + "/"
-            else:
-                fio_path = exec_folder
+            node.conn.install('fio', binary='fio')
-        exec_time = execution_time(fio_cfg)
-        barrier.wait()
-        run_data = node.rpc.fio.run_fio(self.use_sudo,
-                                        fio_path,
-                                        exec_folder,
-                                        str(fio_cfg),
+        if not self.use_system_fio:
+            os_info = get_os(node)
+            fio_dir = os.path.dirname(os.path.dirname(wally.__file__))  # type: str
+            fio_dir = os.path.join(os.getcwd(), fio_dir)
+            fio_dir = os.path.join(fio_dir, 'fio_binaries')
+            fname = 'fio_{0.release}_{0.arch}.bz2'.format(os_info)
+            fio_path = os.path.join(fio_dir, fname)  # type: str
+            if not os.path.exists(fio_path):
+                raise RuntimeError("No prebuild fio binary available for {0}".format(os_info))
+            bz_dest = self.join_remote('fio.bz2')  # type: str
+            node.copy_file(fio_path, bz_dest)
+            node.run("bzip2 --decompress {}" + bz_dest)
+            node.run("chmod a+x " + self.join_remote("fio"))
+    def get_expected_runtime(self, iteration_info: IterationConfig) -> int:
+        return execution_time(cast(FioJobSection, iteration_info))
+    def do_test(self, node: IRPCNode, iter_config: IterationConfig) -> RunTestRes:
+        exec_time = execution_time(cast(FioJobSection, iter_config))
+        raw_res = node.conn.fio.run_fio(self.fio_path,
+                                        self.exec_folder,
+                                        str(cast(FioJobSection, iter_config)),
                                         exec_time + max(300, exec_time))
-        return parse_fio_result(run_data)
+        # TODO(koder): fix next error
+        raise NotImplementedError("Need to extract time from test result")
+        return raw_res, (0, 0)
-    @classmethod
-    def prepare_data(cls, results) -> List[Dict[str, Any]]:
-        """create a table with io performance report for console"""
-        def key_func(data: FioRunResult) -> Tuple[str, str, str, str, int]:
-            tpl = data.summary_tpl()
-            return (data.name,
-                    tpl.oper,
-                    tpl.mode,
-                    ssize2b(tpl.bsize),
-                    int(tpl.th_count) * int(tpl.vm_count))
-        res = []
-        for item in sorted(results, key=key_func):
-            test_dinfo = item.disk_perf_info()
-            testnodes_count = len(item.config.nodes)
-            iops, _ = test_dinfo.iops.rounded_average_conf()
-            if test_dinfo.iops_sys is not None:
-                iops_sys, iops_sys_conf = test_dinfo.iops_sys.rounded_average_conf()
-                _, iops_sys_dev = test_dinfo.iops_sys.rounded_average_dev()
-                iops_sys_per_vm = round_3_digit(iops_sys / testnodes_count)
-                iops_sys = round_3_digit(iops_sys)
-            else:
-                iops_sys = None
-                iops_sys_per_vm = None
-                iops_sys_dev = None
-                iops_sys_conf = None
-            bw, bw_conf = test_dinfo.bw.rounded_average_conf()
-            _, bw_dev = test_dinfo.bw.rounded_average_dev()
-            conf_perc = int(round(bw_conf * 100 / bw))
-            dev_perc = int(round(bw_dev * 100 / bw))
-            lat_50 = round_3_digit(int(test_dinfo.lat_50))
-            lat_95 = round_3_digit(int(test_dinfo.lat_95))
-            lat_avg = round_3_digit(int(test_dinfo.lat_avg))
-            iops_per_vm = round_3_digit(iops / testnodes_count)
-            bw_per_vm = round_3_digit(bw / testnodes_count)
-            iops = round_3_digit(iops)
-            bw = round_3_digit(bw)
-            summ = "{0.oper}{0.mode} {0.bsize:>4} {0.th_count:>3}th {0.vm_count:>2}vm".format(item.summary_tpl())
-            res.append({"name": key_func(item)[0],
-                        "key": key_func(item)[:4],
-                        "summ": summ,
-                        "iops": int(iops),
-                        "bw": int(bw),
-                        "conf": str(conf_perc),
-                        "dev": str(dev_perc),
-                        "iops_per_vm": int(iops_per_vm),
-                        "bw_per_vm": int(bw_per_vm),
-                        "lat_50": lat_50,
-                        "lat_95": lat_95,
-                        "lat_avg": lat_avg,
-                        "iops_sys": iops_sys,
-                        "iops_sys_per_vm": iops_sys_per_vm,
-                        "sys_conf": iops_sys_conf,
-                        "sys_dev": iops_sys_dev})
-        return res
-    Field = collections.namedtuple("Field", ("header", "attr", "allign", "size"))
-    fiels_and_header = [
-        Field("Name",           "name",        "l",  7),
-        Field("Description",    "summ",        "l", 19),
-        Field("IOPS\ncum",      "iops",        "r",  3),
-        # Field("IOPS_sys\ncum",  "iops_sys",    "r",  3),
-        Field("KiBps\ncum",     "bw",          "r",  6),
-        Field("Cnf %\n95%",     "conf",        "r",  3),
-        Field("Dev%",           "dev",         "r",  3),
-        Field("iops\n/vm",      "iops_per_vm", "r",  3),
-        Field("KiBps\n/vm",     "bw_per_vm",   "r",  6),
-        Field("lat ms\nmedian", "lat_50",      "r",  3),
-        Field("lat ms\n95%",    "lat_95",      "r",  3),
-        Field("lat\navg",       "lat_avg",     "r",  3),
-    ]
-    fiels_and_header_dct = dict((item.attr, item) for item in fiels_and_header)
-    @classmethod
-    def format_for_console(cls, results) -> str:
-        """create a table with io performance report for console"""
-        tab = texttable.Texttable(max_width=120)
-        tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
-        tab.set_cols_align([f.allign for f in cls.fiels_and_header])
-        sep = ["-" * f.size for f in cls.fiels_and_header]
-        tab.header([f.header for f in cls.fiels_and_header])
-        prev_k = None
-        for item in cls.prepare_data(results):
-            if prev_k is not None:
-                if prev_k != item["key"]:
-                    tab.add_row(sep)
-            prev_k = item["key"]
-            tab.add_row([item[f.attr] for f in cls.fiels_and_header])
-        return tab.draw()
-    @classmethod
-    def format_diff_for_console(cls, list_of_results: List[Any]) -> str:
-        """create a table with io performance report for console"""
-        tab = texttable.Texttable(max_width=200)
-        tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
-        header = [
-            cls.fiels_and_header_dct["name"].header,
-            cls.fiels_and_header_dct["summ"].header,
-        ]
-        allign = ["l", "l"]
-        header.append("IOPS ~ Cnf% ~ Dev%")
-        allign.extend(["r"] * len(list_of_results))
-        header.extend(
-            "IOPS_{0} %".format(i + 2) for i in range(len(list_of_results[1:]))
-        )
-        header.append("BW")
-        allign.extend(["r"] * len(list_of_results))
-        header.extend(
-            "BW_{0} %".format(i + 2) for i in range(len(list_of_results[1:]))
-        )
-        header.append("LAT")
-        allign.extend(["r"] * len(list_of_results))
-        header.extend(
-            "LAT_{0}".format(i + 2) for i in range(len(list_of_results[1:]))
-        )
-        tab.header(header)
-        sep = ["-" * 3] * len(header)
-        processed_results = map(cls.prepare_data, list_of_results)
-        key2results = []
-        for res in processed_results:
-            key2results.append(dict(
-                ((item["name"], item["summ"]), item) for item in res
-            ))
-        prev_k = None
-        iops_frmt = "{0[iops]} ~ {0[conf]:>2} ~ {0[dev]:>2}"
-        for item in processed_results[0]:
-            if prev_k is not None:
-                if prev_k != item["key"]:
-                    tab.add_row(sep)
-            prev_k = item["key"]
-            key = (item['name'], item['summ'])
-            line = list(key)
-            base = key2results[0][key]
-            line.append(iops_frmt.format(base))
-            for test_results in key2results[1:]:
-                val = test_results.get(key)
-                if val is None:
-                    line.append("-")
-                elif base['iops'] == 0:
-                    line.append("Nan")
-                else:
-                    prc_val = {'dev': val['dev'], 'conf': val['conf']}
-                    prc_val['iops'] = int(100 * val['iops'] / base['iops'])
-                    line.append(iops_frmt.format(prc_val))
-            line.append(base['bw'])
-            for test_results in key2results[1:]:
-                val = test_results.get(key)
-                if val is None:
-                    line.append("-")
-                elif base['bw'] == 0:
-                    line.append("Nan")
-                else:
-                    line.append(int(100 * val['bw'] / base['bw']))
-            for test_results in key2results:
-                val = test_results.get(key)
-                if val is None:
-                    line.append("-")
-                else:
-                    line.append("{0[lat_50]} - {0[lat_95]}".format(val))
-            tab.add_row(line)
-        tab.set_cols_align(allign)
-        return tab.draw()
diff --git a/wally/suits/io/fio_task_parser.py b/wally/suits/io/fio_task_parser.py
index 1b6ba21..1bdbb15 100644
--- a/wally/suits/io/fio_task_parser.py
+++ b/wally/suits/io/fio_task_parser.py
@@ -7,10 +7,11 @@
 import os.path
 import argparse
 import itertools
-from typing import Optional, Iterator, Union, Dict, Iterable, List, TypeVar, Callable, Tuple
-from collections import OrderedDict, namedtuple
+from typing import Optional, Iterator, Union, Dict, Iterable, List, TypeVar, Callable, Tuple, NamedTuple, Any
+from collections import OrderedDict
+from ..itest import IterationConfig
 from ...utils import sec_to_str, ssize2b
@@ -19,15 +20,27 @@
-Var = namedtuple('Var', ('name',))
-CfgLine = namedtuple('CfgLine', ('fname', 'lineno', 'oline',
-                                 'tp', 'name', 'val'))
+Var = NamedTuple('Var', [('name', str)])
+CfgLine = NamedTuple('CfgLine',
+                     [('fname', str),
+                      ('lineno', int),
+                      ('oline', str),
+                      ('tp', int),
+                      ('name', str),
+                      ('val', Any)])
+TestSumm = NamedTuple("TestSumm",
+                      [("oper", str),
+                       ("mode", str),
+                       ("bsize", int),
+                       ("iodepth", int),
+                       ("vm_count", int)])
-class FioJobSection:
-    def __init__(self, name: str):
+class FioJobSection(IterationConfig):
+    def __init__(self, name: str) -> None:
         self.name = name
-        self.vals = OrderedDict()
+        self.vals = OrderedDict()  # type: Dict[str, Any]
     def copy(self) -> 'FioJobSection':
         return copy.deepcopy(self)
@@ -40,7 +53,7 @@
     def is_free(self) -> bool:
         return len(list(self.required_vars())) == 0
-    def __str__(self):
+    def __str__(self) -> str:
         res = "[{0}]\n".format(self.name)
         for name, val in self.vals.items():
@@ -55,13 +68,13 @@
 class ParseError(ValueError):
-    def __init__(self, msg: str, fname: str, lineno: int, line_cont:Optional[str] =""):
+    def __init__(self, msg: str, fname: str, lineno: int, line_cont:Optional[str] = "") -> None:
         ValueError.__init__(self, msg)
         self.file_name = fname
         self.lineno = lineno
         self.line_cont = line_cont
-    def __str__(self):
+    def __str__(self) -> str:
         msg = "In {0}:{1} ({2}) : {3}"
         return msg.format(self.file_name,
@@ -70,10 +83,10 @@
 def is_name(name: str) -> bool:
-    return re.match("[a-zA-Z_][a-zA-Z_0-9]*", name)
+    return re.match("[a-zA-Z_][a-zA-Z_0-9]*", name) is not None
-def parse_value(val: str) -> Union[int, str, Dict, Var]:
+def parse_value(val: str) -> Union[int, str, float, List, Var]:
         return int(val)
     except ValueError:
@@ -88,12 +101,13 @@
         assert val.endswith("%}")
         content = val[2:-2]
         vals = list(i.strip() for i in content.split(','))
-        return map(parse_value, vals)
+        return list(map(parse_value, vals))
     if val.startswith('{'):
         assert val.endswith("}")
         assert is_name(val[1:-1])
         return Var(val[1:-1])
     return val
@@ -133,15 +147,15 @@
 def fio_config_parse(lexer_iter: Iterable[CfgLine]) -> Iterator[FioJobSection]:
     in_globals = False
     curr_section = None
-    glob_vals = OrderedDict()
+    glob_vals = OrderedDict()  # type: Dict[str, Any]
     sections_count = 0
-    lexed_lines = list(lexer_iter)
+    lexed_lines = list(lexer_iter)  # type: List[CfgLine]
     one_more = True
     includes = {}
     while one_more:
-        new_lines = []
+        new_lines = []  # type: List[CfgLine]
         one_more = False
         for line in lexed_lines:
             fname, lineno, oline, tp, name, val = line
@@ -205,7 +219,7 @@
 def process_cycles(sec: FioJobSection) -> Iterator[FioJobSection]:
-    cycles = OrderedDict()
+    cycles = OrderedDict()  # type: Dict[str, Any]
     for name, val in sec.vals.items():
         if isinstance(val, list) and name.upper() != name:
@@ -214,11 +228,11 @@
     if len(cycles) == 0:
         yield sec
-        # qd should changes faster
-        numjobs = cycles.pop('qd', None)
-        items = cycles.items()
+        # iodepth should changes faster
+        numjobs = cycles.pop('iodepth', None)
+        items = list(cycles.items())
-        if len(items) > 0:
+        if items:
             keys, vals = zip(*items)
             keys = list(keys)
             vals = list(vals)
@@ -228,7 +242,7 @@
         if numjobs is not None:
-            keys.append('qd')
+            keys.append('iodepth')
         for combination in itertools.product(*vals):
             new_sec = sec.copy()
@@ -236,12 +250,12 @@
             yield new_sec
-FIO_PARAM_VAL = Union[str, Var]
+FioParamsVal = Union[str, Var]
+FioParams = Dict[str, FioParamsVal]
-def apply_params(sec: FioJobSection, params: FIO_PARAMS) -> FioJobSection:
-    processed_vals = OrderedDict()
+def apply_params(sec: FioJobSection, params: FioParams) -> FioJobSection:
+    processed_vals = OrderedDict()  # type: Dict[str, Any]
     for name, val in sec.vals.items():
         if name in params:
@@ -329,9 +343,6 @@
         return 'a'
-TestSumm = namedtuple("TestSumm", ("oper", "mode", "bsize", "iodepth", "vm_count"))
 def get_test_summary_tuple(sec: FioJobSection, vm_count: int = None) -> TestSumm:
     if isinstance(sec, dict):
         vals = sec
@@ -355,12 +366,12 @@
-def get_test_summary(sec: FioJobSection, vm_count: int = None, noqd: bool = False) -> str:
+def get_test_summary(sec: FioJobSection, vm_count: int = None, noiodepth: bool = False) -> str:
     tpl = get_test_summary_tuple(sec, vm_count)
     res = "{0.oper}{0.mode}{0.bsize}".format(tpl)
-    if not noqd:
-        res += "qd{}".format(tpl.qd)
+    if not noiodepth:
+        res += "qd{}".format(tpl.iodepth)
     if tpl.vm_count is not None:
         res += "vm{}".format(tpl.vm_count)
@@ -387,7 +398,7 @@
             yield res
-def fio_cfg_compile(source: str, fname: str, test_params: FIO_PARAMS) -> Iterator[FioJobSection]:
+def fio_cfg_compile(source: str, fname: str, test_params: FioParams) -> Iterator[FioJobSection]:
     it = parse_all_in_1(source, fname)
     it = (apply_params(sec, test_params) for sec in it)
     it = flatmap(process_cycles, it)
diff --git a/wally/suits/io/rpc_plugin.py b/wally/suits/io/rpc_plugin.py
index ca3f0f3..8e2e09f 100644
--- a/wally/suits/io/rpc_plugin.py
+++ b/wally/suits/io/rpc_plugin.py
@@ -1,7 +1,20 @@
+import os
+import time
+import stat
+import random
+import subprocess
 def rpc_run_fio(cfg):
     fio_cmd_templ = "cd {exec_folder}; {fio_path}fio --output-format=json " + \
                     "--output={out_file} --alloc-size=262144 {job_file}"
+    result = {
+        "name": [float],
+        "lat_name": [[float]]
+    }
+    return result
     # fnames_before = node.run("ls -1 " + exec_folder, nolog=True)
     # timeout = int(exec_time + max(300, exec_time))
@@ -11,5 +24,66 @@
     # fnames_after = node.run("ls -1 " + exec_folder, nolog=True)
-def parse_fio_result(data):
-    pass
+def rpc_check_file_prefilled(path, used_size_mb):
+    used_size = used_size_mb * 1024 ** 2
+    blocks_to_check = 16
+    try:
+        fstats = os.stat(path)
+        if stat.S_ISREG(fstats.st_mode) and fstats.st_size < used_size:
+            return True
+    except EnvironmentError:
+        return True
+    offsets = [random.randrange(used_size - 1024) for _ in range(blocks_to_check)]
+    offsets.append(used_size - 1024)
+    offsets.append(0)
+    with open(path, 'rb') as fd:
+        for offset in offsets:
+            fd.seek(offset)
+            if b"\x00" * 1024 == fd.read(1024):
+                return True
+    return False
+def rpc_prefill_test_files(files, force=False, fio_path='fio'):
+    cmd_templ = "{0} --name=xxx --filename={1} --direct=1" + \
+                " --bs=4m --size={2}m --rw=write"
+    ssize = 0
+    ddtime = 0.0
+    for fname, curr_sz in files.items():
+        if not force:
+            if not rpc_check_file_prefilled(fname, curr_sz):
+                continue
+        cmd = cmd_templ.format(fio_path, fname, curr_sz)
+        ssize += curr_sz
+        stime = time.time()
+        subprocess.check_call(cmd)
+        ddtime += time.time() - stime
+    if ddtime > 1.0:
+        return int(ssize / ddtime)
+    return None
+def load_fio_log_file(fname):
+    with open(fname) as fd:
+        it = [ln.split(',')[:2] for ln in fd]
+    return [(float(off) / 1000,  # convert us to ms
+            float(val.strip()) + 0.5)  # add 0.5 to compemsate average value
+                                       # as fio trimm all values in log to integer
+            for off, val in it]
diff --git a/wally/suits/io/rpc_plugin.pyi b/wally/suits/io/rpc_plugin.pyi
new file mode 100644
index 0000000..1155007
--- /dev/null
+++ b/wally/suits/io/rpc_plugin.pyi
@@ -0,0 +1,8 @@
+from typing import Any, Optional, Dict, List
+def rpc_run_fio(cfg: Dict[str, str]) -> Any: ...
+def rpc_check_file_prefilled(path: str, used_size_mb: int) -> bool: ...
+def rpc_prefill_test_files(files: Dict[str, int], force: bool = False, fio_path: str = 'fio') -> Optional[int]: ...
+def load_fio_log_file(fname: str) -> List[float]: ...
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index 7004b8e..6d1eeee 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -2,38 +2,43 @@
 import time
 import logging
 import os.path
-import functools
-from typing import Dict, Any, List, Tuple
+import datetime
+from typing import Dict, Any, List, Optional, Tuple, cast
 from concurrent.futures import ThreadPoolExecutor
-from ..utils import Barrier, StopTestError
-from ..statistic import data_property
-from ..inode import INode
+from ..utils import Barrier, StopTestError, sec_to_str
+from ..node_interfaces import IRPCNode
 from ..storage import Storage
+from ..result_classes import RawTestResults
+import agent
 logger = logging.getLogger("wally")
-class TestConfig:
+__doc__ = "Contains base classes for performance tests"
+class TestInputConfig:
     this class describe test input configuration
-    test_type:str - test type name
-    params:{str:Any} - parameters from yaml file for this test
-    test_uuid:str - UUID to be used to create filenames and Co
-    log_directory:str - local directory to store results
-    nodes:[Node] - node to run tests on
-    remote_dir:str - directory on nodes to be used for local files
+    test_type - test type name
+    params - parameters from yaml file for this test
+    test_uuid - UUID to be used to create file names & Co
+    log_directory - local directory to store results
+    nodes - nodes to run tests on
+    remote_dir - directory on nodes to be used for local files
     def __init__(self,
                  test_type: str,
                  params: Dict[str, Any],
                  run_uuid: str,
-                 nodes: List[INode],
+                 nodes: List[IRPCNode],
                  storage: Storage,
-                 remote_dir: str):
+                 remote_dir: str) -> None:
         self.test_type = test_type
         self.params = params
         self.run_uuid = run_uuid
@@ -42,150 +47,21 @@
         self.remote_dir = remote_dir
-class TestResults:
-    """
-    this class describe test results
-    config:TestConfig - test config object
-    params:dict - parameters from yaml file for this test
-    results:{str:MeasurementMesh} - test results object
-    raw_result:Any - opaque object to store raw results
-    run_interval:(float, float) - test tun time, used for sensors
-    """
-    def __init__(self,
-                 config: TestConfig,
-                 results: Dict[str, Any],
-                 raw_result: Any,
-                 run_interval: Tuple[float, float]):
-        self.config = config
-        self.params = config.params
-        self.results = results
-        self.raw_result = raw_result
-        self.run_interval = run_interval
-    def __str__(self):
-        res = "{0}({1}):\n    results:\n".format(
-                    self.__class__.__name__,
-                    self.summary())
-        for name, val in self.results.items():
-            res += "        {0}={1}\n".format(name, val)
-        res += "    params:\n"
-        for name, val in self.params.items():
-            res += "        {0}={1}\n".format(name, val)
-        return res
-    @abc.abstractmethod
-    def summary(self):
-        pass
-    @abc.abstractmethod
-    def get_yamable(self):
-        pass
-# class MeasurementMatrix:
-#     """
-#     data:[[MeasurementResult]] - VM_COUNT x TH_COUNT matrix of MeasurementResult
-#     """
-#     def __init__(self, data, connections_ids):
-#         self.data = data
-#         self.connections_ids = connections_ids
-#     def per_vm(self):
-#         return self.data
-#     def per_th(self):
-#         return sum(self.data, [])
-class MeasurementResults:
-    def stat(self):
-        return data_property(self.data)
-    def __str__(self):
-        return 'TS([' + ", ".join(map(str, self.data)) + '])'
-class SimpleVals(MeasurementResults):
-    """
-    data:[float] - list of values
-    """
-    def __init__(self, data):
-        self.data = data
-class TimeSeriesValue(MeasurementResults):
-    """
-    data:[(float, float, float)] - list of (start_time, lenght, average_value_for_interval)
-    odata: original values
-    """
-    def __init__(self, data: List[Tuple[float, float, float]]):
-        assert len(data) > 0
-        self.odata = data[:]
-        self.data = []
-        cstart = 0
-        for nstart, nval in data:
-            self.data.append((cstart, nstart - cstart, nval))
-            cstart = nstart
-    @property
-    def values(self) -> List[float]:
-        return [val[2] for val in self.data]
-    def average_interval(self) -> float:
-        return float(sum([val[1] for val in self.data])) / len(self.data)
-    def skip(self, seconds) -> 'TimeSeriesValue':
-        nres = []
-        for start, ln, val in self.data:
-            nstart = start + ln - seconds
-            if nstart > 0:
-                nres.append([nstart, val])
-        return self.__class__(nres)
-    def derived(self, tdelta) -> 'TimeSeriesValue':
-        end = self.data[-1][0] + self.data[-1][1]
-        tdelta = float(tdelta)
-        ln = end / tdelta
-        if ln - int(ln) > 0:
-            ln += 1
-        res = [[tdelta * i, 0.0] for i in range(int(ln))]
-        for start, lenght, val in self.data:
-            start_idx = int(start / tdelta)
-            end_idx = int((start + lenght) / tdelta)
-            for idx in range(start_idx, end_idx + 1):
-                rstart = tdelta * idx
-                rend = tdelta * (idx + 1)
-                intersection_ln = min(rend, start + lenght) - max(start, rstart)
-                if intersection_ln > 0:
-                    try:
-                        res[idx][1] += val * intersection_ln / tdelta
-                    except IndexError:
-                        raise
-        return self.__class__(res)
+class IterationConfig:
+    name = None  # type: str
 class PerfTest:
-    """
-    Very base class for tests
-    config:TestConfig - test configuration
-    stop_requested:bool - stop for test requested
-    """
-    def __init__(self, config):
+    """Base class for all tests"""
+    name = None  # type: str
+    max_retry = 3
+    retry_time = 30
+    def __init__(self, config: TestInputConfig) -> None:
         self.config = config
         self.stop_requested = False
+        self.nodes = self.config.nodes  # type: List[IRPCNode]
+        self.sorted_nodes_ids = sorted(node.info.node_id() for node in self.nodes)
     def request_stop(self) -> None:
         self.stop_requested = True
@@ -193,13 +69,8 @@
     def join_remote(self, path: str) -> str:
         return os.path.join(self.config.remote_dir, path)
-    @classmethod
-    def load(cls, path: str):
-        pass
-    @abc.abstractmethod
-    def run(self):
+    def run(self, storage: Storage) -> None:
@@ -207,69 +78,182 @@
-class ThreadedTest(PerfTest):
-    """
-    Base class for tests, which spawn separated thread for each node
-    """
+RunTestRes = Tuple[RawTestResults, Tuple[int, int]]
-    def run(self) -> List[TestResults]:
-        barrier = Barrier(len(self.config.nodes))
-        th_test_func = functools.partial(self.th_test_func, barrier)
-        with ThreadPoolExecutor(len(self.config.nodes)) as pool:
-            return list(pool.map(th_test_func, self.config.nodes))
+class ThreadedTest(PerfTest, metaclass=abc.ABCMeta):
+    """Base class for tests, which spawn separated thread for each node"""
+    # max allowed time difference between starts and stops of run of the same test on different test nodes
+    # used_max_diff = max((min_run_time * max_rel_time_diff), max_time_diff)
+    max_time_diff = 5
+    max_rel_time_diff = 0.05
+    def __init__(self, config: TestInputConfig) -> None:
+        PerfTest.__init__(self, config)
+        self.iterations_configs = [None]  # type: List[Optional[IterationConfig]]
-    def do_test(self, node: INode) -> TestResults:
+    def get_expected_runtime(self, iter_cfg: IterationConfig) -> Optional[int]:
-    def th_test_func(self, barrier: Barrier, node: INode) -> TestResults:
-        test_name = self.__class__.__name__
-        logger.debug("Starting {} test on {}".format(test_name , node))
-        logger.debug("Run test preparation on {}".format(node))
-        self.pre_run(node)
+    def get_not_done_stages(self, storage: Storage) -> Dict[int, IterationConfig]:
+        start_run_id = max(int(name) for _, name in storage.list('result')) + 1
+        not_in_storage = {}  # type: Dict[int, IterationConfig]
+        for run_id, iteration_config in enumerate(self.iterations_configs, start_run_id):
+            info_path = "result/{}/info".format(run_id)
+            if info_path in storage:
+                info = cast(Dict[str, Any], storage[info_path]) # type: Dict[str, Any]
-        # wait till all thread became ready
-        barrier.wait()
+                assert isinstance(info, dict), \
+                    "Broken storage at path {}. Expect test info dict, obtain {!r}".format(info_path, info)
-        logger.debug("Run test on {}".format(node))
-        try:
-            return self.do_test(node)
-        except Exception as exc:
-            msg = "In test {} for {}".format(test_name, node)
-            logger.exception(msg)
-            raise StopTestError(msg) from exc
+                info = info.copy()
+                del info['begin_time']
+                del info['end_time']
-    def pre_run(self, node: INode) -> None:
+                iter_name = "Unnamed" if iteration_config is None else iteration_config.name
+                expected_config = {
+                    'name': self.name,
+                    'iteration_name': iter_name,
+                    'iteration_config': iteration_config,
+                    'params': self.config.params,
+                    'nodes': self.sorted_nodes_ids
+                }
+                assert info == expected_config, \
+                    "Test info at path {} is not equal to expected config." + \
+                    "Maybe configuration was changed before test was restarted. " + \
+                    "Current cfg is {!r}, expected cfg is {!r}".format(info_path, info, expected_config)
+                logger.info("Test iteration {} found in storage and will be skipped".format(iter_name))
+            else:
+                not_in_storage[run_id] = iteration_config
+        return not_in_storage
+    def run(self, storage: Storage) -> None:
+        not_in_storage = self.get_not_done_stages(storage)
+        if not not_in_storage:
+            logger.info("All test iteration in storage already. Skip test")
+            return
+        logger.debug("Run test {} on nodes {}.".format(self.name, ",".join(self.sorted_nodes_ids)))
+        barrier = Barrier(len(self.nodes))
+        logger.debug("Run preparation")
+        with ThreadPoolExecutor(len(self.nodes)) as pool:
+            list(pool.map(self.config_node, self.nodes))
+            # +5% - is a rough estimation for additional operations
+            run_times = [self.get_expected_runtime(iteration_config) for iteration_config in not_in_storage.values()]
+            if None not in run_times:
+                expected_run_time = int(sum(run_times) * 1.05)
+                exec_time_s = sec_to_str(expected_run_time)
+                now_dt = datetime.datetime.now()
+                end_dt = now_dt + datetime.timedelta(0, expected_run_time)
+                logger.info("Entire test should takes aroud: {} and finished at {:%H:%M:%S}"
+                            .format(exec_time_s, end_dt))
+            for run_id, iteration_config in sorted(not_in_storage.items()):
+                iter_name = "Unnamed" if iteration_config is None else iteration_config.name
+                logger.info("Run test iteration {} ".format(iter_name))
+                results = []  # type: List[RunTestRes]
+                for idx in range(self.max_retry):
+                    barrier.wait()
+                    try:
+                        futures = [pool.submit(self.do_test, node, iteration_config) for node in self.nodes]
+                        results = [fut.result() for fut in futures]
+                    except (EnvironmentError, agent.RPCError) as exc:
+                        if self.max_retry - 1 == idx:
+                            raise StopTestError("Fio failed") from exc
+                        logger.exception("During fio run")
+                    else:
+                        if all(results):
+                            break
+                    logger.info("Sleeping %ss and retrying", self.retry_time)
+                    time.sleep(self.retry_time)
+                start_times = []  # type: List[int]
+                stop_times = []  # type: List[int]
+                for (result, (t_start, t_stop)), node in zip(results, self.config.nodes):
+                    for metrics_name, data in result.items():
+                        path = "result/{}/measurement/{}/{}".format(run_id, node.info.node_id(), metrics_name)
+                        storage[path] = data  # type: ignore
+                    start_times.append(t_start)
+                    stop_times.append(t_stop)
+                min_start_time = min(start_times)
+                max_start_time = max(start_times)
+                min_stop_time = min(stop_times)
+                max_stop_time = max(stop_times)
+                max_allowed_time_diff = int((min_stop_time - max_start_time) * self.max_rel_time_diff)
+                max_allowed_time_diff = max(max_allowed_time_diff, self.max_time_diff)
+                if min_start_time + self.max_time_diff < max_allowed_time_diff:
+                    logger.warning("Too large difference in {}:{} start time - {}. Max recommended difference is {}"
+                                   .format(self.name, iter_name, max_start_time - min_start_time, self.max_time_diff))
+                if min_stop_time + self.max_time_diff < max_allowed_time_diff:
+                    logger.warning("Too large difference in {}:{} stop time - {}. Max recommended difference is {}"
+                                   .format(self.name, iter_name, max_start_time - min_start_time, self.max_time_diff))
+                test_config = {
+                    'name': self.name,
+                    'iteration_name': iter_name,
+                    'iteration_config': iteration_config,
+                    'params': self.config.params,
+                    'nodes': self.sorted_nodes_ids,
+                    'begin_time': min_start_time,
+                    'end_time': max_stop_time
+                }
+                storage["result/{}/info".format(run_id)] = test_config  # type: ignore
+    @abc.abstractmethod
+    def config_node(self, node: IRPCNode) -> None:
+        pass
+    @abc.abstractmethod
+    def do_test(self, node: IRPCNode, iter_config: IterationConfig) -> RunTestRes:
-class TwoScriptTest(ThreadedTest):
-    def __init__(self, *dt, **mp):
+class TwoScriptTest(ThreadedTest, metaclass=abc.ABCMeta):
+    def __init__(self, *dt, **mp) -> None:
         ThreadedTest.__init__(self, *dt, **mp)
-        self.remote_dir = '/tmp'
         self.prerun_script = self.config.params['prerun_script']
         self.run_script = self.config.params['run_script']
         self.prerun_tout = self.config.params.get('prerun_tout', 3600)
         self.run_tout = self.config.params.get('run_tout', 3600)
+        self.iterations_configs = [None]
-    def get_remote_for_script(self, script: str) -> str:
-        return os.path.join(self.remote_dir, os.path.basename(script))
+    def get_expected_runtime(self, iter_cfg: IterationConfig) -> Optional[int]:
+        return None
-    def pre_run(self, node: INode) -> None:
-        copy_paths(node.connection,
-                   {self.run_script: self.get_remote_for_script(self.run_script),
-                    self.prerun_script: self.get_remote_for_script(self.prerun_script)})
+    def config_node(self, node: IRPCNode) -> None:
+        node.copy_file(self.run_script, self.join_remote(self.run_script))
+        node.copy_file(self.prerun_script, self.join_remote(self.prerun_script))
-        cmd = self.get_remote_for_script(self.prerun_script)
+        cmd = self.join_remote(self.prerun_script)
         cmd += ' ' + self.config.params.get('prerun_opts', '')
         node.run(cmd, timeout=self.prerun_tout)
-    def do_test(self, node: INode) -> TestResults:
-        cmd = self.get_remote_for_script(self.run_script)
+    def do_test(self, node: IRPCNode, iter_config: IterationConfig) -> RunTestRes:
+        cmd = self.join_remote(self.run_script)
         cmd += ' ' + self.config.params.get('run_opts', '')
         t1 = time.time()
-        res = node.run(cmd, timeout=self.run_tout)
+        res = self.parse_results(node.run(cmd, timeout=self.run_tout))
         t2 = time.time()
-        return TestResults(self.config, None, res, (t1, t2))
+        return res, (int(t1), int(t2))
+    @abc.abstractmethod
+    def parse_results(self, data: str) -> RawTestResults:
+        pass
diff --git a/wally/test_run_class.py b/wally/test_run_class.py
index ecdfa4f..db41a46 100644
--- a/wally/test_run_class.py
+++ b/wally/test_run_class.py
@@ -1,4 +1,4 @@
-from typing import List, Callable, Any, Dict, Optional
+from typing import List, Callable, Any, Dict, Optional, Set
 from concurrent.futures import ThreadPoolExecutor
@@ -7,11 +7,12 @@
 from .start_vms import OSCreds, OSConnection
 from .storage import Storage
 from .config import Config
+from .fuel_rest_api import Connection
 class TestRun:
     """Test run information"""
-    def __init__(self, config: Config, storage: Storage):
+    def __init__(self, config: Config, storage: Storage) -> None:
         # NodesInfo list
         self.nodes_info = []  # type: List[NodeInfo]
@@ -20,16 +21,18 @@
         self.build_meta = {}  # type: Dict[str,Any]
         self.clear_calls_stack = []  # type: List[Callable[['TestRun'], None]]
-        self.sensors_mon_q = None
         # openstack credentials
         self.fuel_openstack_creds = None  # type: Optional[OSCreds]
         self.os_creds = None  # type: Optional[OSCreds]
         self.os_connection = None  # type: Optional[OSConnection]
+        self.fuel_conn = None  # type: Optional[Connection]
+        self.rpc_code = None  # type: bytes
         self.storage = storage
         self.config = config
         self.sensors_data = SensorDatastore()
+        self.sensors_run_on = set()  # type: Set[str]
     def get_pool(self):
         return ThreadPoolExecutor(self.config.get('worker_pool_sz', 32))