test code working
diff --git a/wally/suits/io/fio.py b/wally/suits/io/fio.py
index e055d98..ee34af4 100644
--- a/wally/suits/io/fio.py
+++ b/wally/suits/io/fio.py
@@ -1,14 +1,18 @@
+import array
 import os.path
 import logging
-from typing import cast
+from typing import cast, Dict
 
 import wally
 
 from ...utils import StopTestError, get_os, ssize2b
 from ...node_interfaces import IRPCNode
-from ..itest import ThreadedTest, IterationConfig, RunTestRes
-from .fio_task_parser import execution_time, fio_cfg_compile, FioJobSection, FioParams, get_log_files
+from ..itest import ThreadedTest, IterationConfig, NodeTestResults
+from ...result_classes import TimeSerie
+from .fio_task_parser import execution_time, fio_cfg_compile, FioJobSection, FioParams, get_log_files, get_test_summary
 from . import rpc_plugin
+from .fio_hist import expected_lat_bins
+from ...storage import Storage
 
 logger = logging.getLogger("wally")
 
@@ -79,8 +83,8 @@
         self.exec_folder = self.config.remote_dir
 
     def config_node(self, node: IRPCNode) -> None:
-        plugin_code = open(rpc_plugin.__file__.rsplit(".", 1)[0] + ".py", "rb").read()
-        node.upload_plugin(code=plugin_code, name="fio")
+        plugin_code = open(rpc_plugin.__file__.rsplit(".", 1)[0] + ".py", "rb").read()  # type: bytes
+        node.upload_plugin("fio", plugin_code)
 
         try:
             node.conn.fs.rmtree(self.config.remote_dir)
@@ -102,9 +106,6 @@
         if fill_bw is not None:
             logger.info("Initial fio fill bw is {} MiBps for {}".format(fill_bw, node))
 
-        fio_config = "\n".join(map(str, self.iterations_configs))
-        node.put_to_file(self.remote_task_file, fio_config.encode("utf8"))
-
     def install_utils(self, node: IRPCNode) -> None:
         os_info = get_os(node)
         if self.use_system_fio:
@@ -131,22 +132,117 @@
     def get_expected_runtime(self, iteration_info: IterationConfig) -> int:
         return execution_time(cast(FioJobSection, iteration_info))
 
-    def do_test(self, node: IRPCNode, iter_config: IterationConfig) -> RunTestRes:
+    def prepare_iteration(self, node: IRPCNode, iter_config: IterationConfig) -> None:
+        """Upload the fio job description for this iteration to the node."""
+        node.put_to_file(self.remote_task_file, str(iter_config).encode("utf8"))
+
+    # TODO: get a link to substorage as a parameter
+    def run_iteration(self, node: IRPCNode, iter_config: IterationConfig, substorage: Storage) -> NodeTestResults:
+        """Run fio for one iteration on ``node`` and collect its results.
+
+        The fio JSON output and every log file listed by ``get_log_files`` are
+        fetched from the node, stored raw in ``substorage`` and then removed
+        from the node. Each log is parsed into an ``array.array`` and attached
+        to the returned ``NodeTestResults`` as a ``TimeSerie``.
+
+        Raises ``StopTestError`` when a log pattern does not match exactly one
+        file, a log is not valid UTF-8, or a log line cannot be parsed.
+        """
         exec_time = execution_time(cast(FioJobSection, iter_config))
+
         fio_cmd_templ = "cd {exec_folder}; " + \
                         "{fio_path} --output-format=json --output={out_file} --alloc-size=262144 {job_file}"
 
-        bw_log, iops_log, lat_hist_log = get_log_files(iter_config)
-
         cmd = fio_cmd_templ.format(exec_folder=self.exec_folder,
                                    fio_path=self.fio_path,
                                    out_file=self.remote_output_file,
                                    job_file=self.remote_task_file)
-        raw_res = node.run(cmd, timeout=exec_time + max(300, exec_time))
-        
-        return
+        must_be_empty = node.run(cmd, timeout=exec_time + max(300, exec_time), check_timeout=1).strip()
 
-        # TODO(koder): fix next error
-        # raise NotImplementedError("Need to extract time from test result")
-        # return raw_res, (0, 0)
+        if must_be_empty:
+            logger.error("Unexpected fio output: %r", must_be_empty)
 
+        res = NodeTestResults(self.__class__.__name__,
+                              node.info.node_id(),
+                              get_test_summary(cast(FioJobSection, iter_config)))
+
+        res.extra_logs['fio'] = node.get_file_content(self.remote_output_file)
+        substorage.store_raw(res.extra_logs['fio'], "fio_raw")
+        node.conn.fs.unlink(self.remote_output_file)
+
+        files = list(node.conn.fs.listdir(self.exec_folder))
+
+        expected_time_delta = 1000  # 1000ms == 1s
+        max_time_diff = 50  # 50ms - 5%
+
+        for name, path in get_log_files(cast(FioJobSection, iter_config)):
+            log_files = [fname for fname in files if fname.startswith(path)]
+            if len(log_files) != 1:
+                # Report at most the first ten matches to keep the message short.
+                logger.error("Found %s files, match log pattern %s(%s) - %s",
+                             len(log_files), path, name, ",".join(log_files[:10]))
+                raise StopTestError()
+
+            fname = os.path.join(self.exec_folder, log_files[0])
+            raw_result = node.get_file_content(fname)  # type: bytes
+            substorage.store_raw(raw_result, "{}_raw".format(name))
+            node.conn.fs.unlink(fname)
+
+            try:
+                log_data = raw_result.decode("utf8").split("\n")
+            except UnicodeDecodeError:
+                logger.exception("Error during parse %s fio log file - can't decode using UTF8", name)
+                raise StopTestError()
+
+            parsed = array.array('L' if name == 'lat' else 'Q')
+            prev_ts = None
+            load_start_at = None
+
+            # TODO: need to adjust vals for timedelta
+            for idx, line in enumerate(log_data):
+                line = line.strip()
+                if not line:
+                    continue
+
+                try:
+                    time_ms_s, val_s, _, *rest = line.split(",")
+                    time_ms = int(time_ms_s.strip())
+
+                    if prev_ts is None:
+                        # The first sample marks where the series starts.
+                        load_start_at = time_ms
+                        prev_ts = time_ms
+                    elif abs(time_ms - prev_ts - expected_time_delta) > max_time_diff:
+                        logger.warning("Too large gap in %s log at %s - %sms",
+                                       name, time_ms, time_ms - prev_ts)
+                        # Off-grid sample: advance by the nominal step only.
+                        prev_ts += expected_time_delta
+                    else:
+                        prev_ts = time_ms
+
+                    if name == 'lat':
+                        vals = [int(i.strip()) for i in rest]
+
+                        if len(vals) != expected_lat_bins:
+                            logger.error("Expect {} bins in latency histogram, but found {} at time {}"
+                                         .format(expected_lat_bins, len(vals), time_ms_s))
+                            raise StopTestError()
+
+                        parsed.extend(vals)
+                    else:
+                        parsed.append(int(val_s.strip()))
+                except ValueError:
+                    logger.exception("Error during parse %s fio log file in line %s: %r", name, idx, line)
+                    raise StopTestError()
+
+            res.series[name] = TimeSerie(name=name,
+                                         raw=raw_result,
+                                         second_axis_size=expected_lat_bins if name == 'lat' else 1,
+                                         start_at=load_start_at,
+                                         step=expected_time_delta,
+                                         data=parsed)
+
+            substorage.set_array(parsed, "{}_data".format(name))  # type: ignore
+            substorage["{}_meta".format(name)] = res.series[name].meta()  # type: ignore
+
+        return res