resume working
diff --git a/wally/suits/io/fio.py b/wally/suits/io/fio.py
index ee34af4..fb18165 100644
--- a/wally/suits/io/fio.py
+++ b/wally/suits/io/fio.py
@@ -1,7 +1,7 @@
 import array
 import os.path
 import logging
-from typing import cast, Dict
+from typing import cast, Any
 
 import wally
 
@@ -12,7 +12,7 @@
 from .fio_task_parser import execution_time, fio_cfg_compile, FioJobSection, FioParams, get_log_files, get_test_summary
 from . import rpc_plugin
 from .fio_hist import expected_lat_bins
-from ...storage import Storage
+
 
 logger = logging.getLogger("wally")
 
@@ -21,6 +21,7 @@
     soft_runcycle = 5 * 60
     retry_time = 30
     configs_dir = os.path.dirname(__file__)  # type: str
+    name = 'fio'
 
     def __init__(self, *args, **kwargs) -> None:
         super().__init__(*args, **kwargs)
@@ -136,8 +137,9 @@
         node.put_to_file(self.remote_task_file, str(iter_config).encode("utf8"))
 
     # TODO: get a link to substorage as a parameter
-    def run_iteration(self, node: IRPCNode, iter_config: IterationConfig, substorage: Storage) -> NodeTestResults:
-        exec_time = execution_time(cast(FioJobSection, iter_config))
+    def run_iteration(self, node: IRPCNode, iter_config: IterationConfig, stor_prefix: str) -> NodeTestResults:
+        f_iter_config = cast(FioJobSection, iter_config)
+        exec_time = execution_time(f_iter_config)
 
         fio_cmd_templ = "cd {exec_folder}; " + \
                         "{fio_path} --output-format=json --output={out_file} --alloc-size=262144 {job_file}"
@@ -151,12 +153,10 @@
         if must_be_empty:
             logger.error("Unexpected fio output: %r", must_be_empty)
 
-        res = NodeTestResults(self.__class__.__name__,
-                              node.info.node_id(),
-                              get_test_summary(cast(FioJobSection, iter_config)))
+        res = NodeTestResults(self.__class__.__name__, node.info.node_id(), get_test_summary(f_iter_config))
 
         res.extra_logs['fio'] = node.get_file_content(self.remote_output_file)
-        substorage.store_raw(res.extra_logs['fio'], "fio_raw")
+        self.store_data(res.extra_logs['fio'], "raw", stor_prefix, "fio_raw")
         node.conn.fs.unlink(self.remote_output_file)
 
         files = [name for name in node.conn.fs.listdir(self.exec_folder)]
@@ -164,7 +164,7 @@
         expected_time_delta = 1000  # 1000ms == 1s
         max_time_diff = 50  # 50ms - 5%
 
-        for name, path in get_log_files(cast(FioJobSection, iter_config)):
+        for name, path in get_log_files(f_iter_config):
             log_files = [fname for fname in files if fname.startswith(path)]
             if len(log_files) != 1:
                 logger.error("Found %s files, match log pattern %s(%s) - %s",
@@ -173,7 +173,7 @@
 
             fname = os.path.join(self.exec_folder, log_files[0])
             raw_result = node.get_file_content(fname)  # type: bytes
-            substorage.store_raw(raw_result, "{}_raw".format(name))
+            self.store_data(raw_result, "raw", stor_prefix, "{}_raw".format(name))
             node.conn.fs.unlink(fname)
 
             try:
@@ -222,7 +222,10 @@
                                          step=expected_time_delta,
                                          data=parsed)
 
-            substorage.set_array(parsed, "{}_data".format(name))  # type: ignore
-            substorage["{}_meta".format(name)] = res.series[name].meta()  # type: ignore
+            self.store_data(parsed, "array", stor_prefix, "{}_data".format(name))
+            self.store_data(res.series[name].meta(), "yaml", stor_prefix, "{}_meta".format(name))
 
         return res
+
+    def format_for_console(self, data: Any) -> str:
+        raise NotImplementedError()  # console formatting is not implemented here; `data` is never read
diff --git a/wally/suits/io/fio_task_parser.py b/wally/suits/io/fio_task_parser.py
index 8390e3a..6790c97 100644
--- a/wally/suits/io/fio_task_parser.py
+++ b/wally/suits/io/fio_task_parser.py
@@ -11,6 +11,7 @@
 from collections import OrderedDict
 
 
+from ...result_classes import IStorable
 from ..itest import IterationConfig
 from ...utils import sec_to_str, ssize2b
 
@@ -37,7 +38,9 @@
                        ("vm_count", int)])
 
 
-class FioJobSection(IterationConfig):
+class FioJobSection(IterationConfig, IStorable):
+    yaml_tag = 'fio_job'
+
     def __init__(self, name: str) -> None:
         self.name = name
         self.vals = OrderedDict()  # type: Dict[str, Any]
@@ -67,6 +70,20 @@
 
         return res
 
+    def raw(self) -> Dict[str, Any]:  # plain-dict form of this section; fromraw() reads the same three keys
+        return {
+            'name': self.name,
+            'vals': list(map(list, self.vals.items())),  # ordered [key, value] pairs, each pair as a list
+            'summary': self.summary
+        }
+
+    @classmethod
+    def fromraw(cls, data: Dict[str, Any]) -> 'FioJobSection':  # rebuild a section from the dict made by raw()
+        obj = cls(data['name'])
+        obj.summary = data['summary']
+        obj.vals.update(data['vals'])  # 'vals' is a sequence of [key, value] pairs; added in the given order
+        return obj
+
 
 class ParseError(ValueError):
     def __init__(self, msg: str, fname: str, lineno: int, line_cont:Optional[str] = "") -> None:
diff --git a/wally/suits/io/rrd.cfg b/wally/suits/io/rrd.cfg
index 1075aea..d32d6a8 100644
--- a/wally/suits/io/rrd.cfg
+++ b/wally/suits/io/rrd.cfg
@@ -1,8 +1,14 @@
 [global]
 include defaults_qd.cfg
-ramp_time=5
-runtime=5
+ramp_time=0
+runtime=4
 
 [test_{TEST_SUMM}]
 blocksize=60k
 rw=randread
+iodepth=1
+
+[test_{TEST_SUMM}]
+iodepth=16
+blocksize=60k
+rw=randread
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index aae475c..78986f6 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -3,14 +3,15 @@
 import logging
 import os.path
 import datetime
-from typing import Dict, Any, List, Optional, Tuple, cast
+from typing import Dict, Any, List, Optional, Callable, cast
 
 from concurrent.futures import ThreadPoolExecutor, wait
 
 from ..utils import Barrier, StopTestError, sec_to_str
 from ..node_interfaces import IRPCNode
 from ..storage import Storage
-from ..result_classes import NodeTestResults, TimeSerie
+from ..result_classes import NodeTestResults, IStorable
+from queue import Queue
 
 
 logger = logging.getLogger("wally")
@@ -45,22 +46,23 @@
         self.remote_dir = remote_dir
 
 
-class IterationConfig:
+class IterationConfig(IStorable):
     name = None  # type: str
     summary = None  # type: str
 
 
-class PerfTest:
+class PerfTest(metaclass=abc.ABCMeta):
     """Base class for all tests"""
     name = None  # type: str
     max_retry = 3
     retry_time = 30
 
-    def __init__(self, config: TestInputConfig) -> None:
+    def __init__(self, config: TestInputConfig, on_idle: Optional[Callable[[], None]] = None) -> None:
         self.config = config
         self.stop_requested = False
         self.nodes = self.config.nodes  # type: List[IRPCNode]
         self.sorted_nodes_ids = sorted(node.info.node_id() for node in self.nodes)
+        self.on_idle = on_idle  # optional no-argument callback; users check it for None before calling
 
     def request_stop(self) -> None:
         self.stop_requested = True
@@ -85,29 +87,22 @@
     max_time_diff = 5
     max_rel_time_diff = 0.05
 
-    def __init__(self, config: TestInputConfig) -> None:
-        PerfTest.__init__(self, config)
+    def __init__(self, *args, **kwargs) -> None:  # every argument is passed through to PerfTest.__init__
+        PerfTest.__init__(self, *args, **kwargs)
         self.iterations_configs = [None]  # type: List[Optional[IterationConfig]]
+        self.storage_q = Queue()  # type: Any
 
     @abc.abstractmethod
     def get_expected_runtime(self, iter_cfg: IterationConfig) -> Optional[int]:
         pass
 
     def get_not_done_stages(self, storage: Storage) -> Dict[int, IterationConfig]:
-        done_stages = [int(name_id.split("_")[1])
-                       for is_leaf, name_id in storage.list('result')
-                       if not is_leaf]
-        if len(done_stages) == 0:
-            start_run_id = 0
-        else:
-            start_run_id = max(done_stages) + 1
+        not_done = {}  # type: Dict[int, IterationConfig]
 
-        not_in_storage = {}  # type: Dict[int, IterationConfig]
-
-        for run_id, iteration_config in enumerate(self.iterations_configs[start_run_id:], start_run_id):
+        for run_id, iteration_config in enumerate(self.iterations_configs):
             info_path = "result/{}/info".format(run_id)
             if info_path in storage:
-                info = cast(Dict[str, Any], storage[info_path]) # type: Dict[str, Any]
+                info = cast(Dict[str, Any], storage.get(info_path)) # type: Dict[str, Any]
 
                 assert isinstance(info, dict), \
                     "Broken storage at path {}. Expect test info dict, obtain {!r}".format(info_path, info)
@@ -120,7 +115,7 @@
                 expected_config = {
                     'name': self.name,
                     'iteration_name': iter_name,
-                    'iteration_config': iteration_config,
+                    'iteration_config': iteration_config.raw(),
                     'params': self.config.params,
                     'nodes': self.sorted_nodes_ids
                 }
@@ -132,8 +127,9 @@
 
                 logger.info("Test iteration {} found in storage and will be skipped".format(iter_name))
             else:
-                not_in_storage[run_id] = iteration_config
-        return not_in_storage
+                not_done[run_id] = iteration_config
+
+        return not_done
 
     def run(self) -> None:
         not_in_storage = self.get_not_done_stages(self.config.storage)
@@ -162,6 +158,7 @@
                 iter_name = "Unnamed" if iteration_config is None else iteration_config.name
                 logger.info("Run test iteration %s", iter_name)
 
+                current_result_path = "result/{}_{}".format(iteration_config.summary, run_id)
                 results = []  # type: List[NodeTestResults]
                 for idx in range(self.max_retry):
                     logger.debug("Prepare iteration %s", iter_name)
@@ -175,12 +172,8 @@
                     try:
                         futures = []
                         for node in self.nodes:
-                            path = "result/{}_{}/measurement/{}".format(iteration_config.summary,
-                                                                        run_id,
-                                                                        node.info.node_id())
-                            self.config.storage.clear(path)
-                            mstorage = self.config.storage.sub_storage(path)
-                            futures.append(pool.submit(self.run_iteration, node, iteration_config, mstorage))
+                            path = "{}/measurement/{}".format(current_result_path, node.info.node_id())
+                            futures.append(pool.submit(self.run_iteration, node, iteration_config, path))
 
                         results = [fut.result() for fut in futures]
                         break
@@ -196,11 +189,7 @@
                 stop_times = []  # type: List[int]
 
                 # TODO: FIX result processing - NodeTestResults
-                result = None  # type: NodeTestResults
                 for result in results:
-                    mstorage = self.config.storage.sub_storage("result/{}_{}/measurement/{}"
-                                                               .format(result.summary, run_id, result.node_id))
-                    serie = None   # type: TimeSerie
                     for name, serie in result.series.items():
                         start_times.append(serie.start_at)
                         stop_times.append(serie.step * len(serie.data))
@@ -224,14 +213,43 @@
                 test_config = {
                     'name': self.name,
                     'iteration_name': iter_name,
-                    'iteration_config': iteration_config,
+                    'iteration_config': iteration_config.raw(),
                     'params': self.config.params,
                     'nodes': self.sorted_nodes_ids,
                     'begin_time': min_start_time,
                     'end_time': max_stop_time
                 }
 
-                self.config.storage["result", str(run_id), "info"] = test_config  # type: ignore
+                self.process_storage_queue()
+                self.config.storage.put(test_config, "result", str(run_id), "info")
+
+                if "all_results" in self.config.storage:
+                    all_results = self.config.storage.get("all_results")
+                else:
+                    all_results = []
+
+                all_results.append([self.name, iteration_config.summary, current_result_path])
+                self.config.storage.put(all_results, "all_results")
+                self.config.storage.sync()
+
+                if self.on_idle is not None:
+                    self.on_idle()
+
+    def store_data(self, val: Any, type: str, prefix: str, *path: str) -> None:  # only enqueues; nothing is written here
+        self.storage_q.put((val, type, prefix, path))  # process_storage_queue() handles 'raw', 'yaml' and 'array'
+
+    def process_storage_queue(self) -> None:  # drain items queued by store_data() into self.config.storage
+        while not self.storage_q.empty():
+            value, val_type, subpath, val_path = self.storage_q.get()
+            if val_type == 'raw':
+                self.config.storage.put_raw(value, subpath, *val_path)
+            elif val_type == 'yaml':
+                self.config.storage.put(value, subpath, *val_path)
+            elif val_type == 'array':
+                self.config.storage.put_array(value, subpath, *val_path)
+            else:  # unknown tag: report the tag itself (not the target path) before aborting
+                logger.error("Internal logic error - unknown data stop type {!r}".format(val_type))
+                raise StopTestError()
 
     @abc.abstractmethod
     def config_node(self, node: IRPCNode) -> None:
@@ -242,7 +260,7 @@
         pass
 
     @abc.abstractmethod
-    def run_iteration(self, node: IRPCNode, iter_config: IterationConfig, substorage: Storage) -> NodeTestResults:
+    def run_iteration(self, node: IRPCNode, iter_config: IterationConfig, stor_prefix: str) -> NodeTestResults:
         pass
 
 
@@ -269,7 +287,7 @@
     def prepare_iteration(self, node: IRPCNode, iter_config: IterationConfig) -> None:
         pass
 
-    def run_iteration(self, node: IRPCNode, iter_config: IterationConfig, substorage: Storage) -> NodeTestResults:
+    def run_iteration(self, node: IRPCNode, iter_config: IterationConfig, stor_prefix: str) -> NodeTestResults:
         # TODO: have to store logs
         cmd = self.join_remote(self.run_script)
         cmd += ' ' + self.config.params.get('run_opts', '')