2.0 is on the way
diff --git a/wally/suits/io/fio.py b/wally/suits/io/fio.py
index 2360a55..0f4ebde 100644
--- a/wally/suits/io/fio.py
+++ b/wally/suits/io/fio.py
@@ -1,970 +1,128 @@
-import re
-import time
-import json
-import stat
-import random
-import hashlib
 import os.path
 import logging
-import datetime
-import functools
-import collections
-from typing import Dict, List, Callable, Any, Tuple, Optional
-
-import yaml
-import texttable
-from paramiko.ssh_exception import SSHException
-from concurrent.futures import ThreadPoolExecutor, wait
+from typing import Dict, List, Union, cast
 
 import wally
 
-from ...pretty_yaml import dumps
-from ...statistic import round_3_digit, data_property, average
-from ...utils import ssize2b, sec_to_str, StopTestError, Barrier, get_os
-from ...inode import INode
-
-from ..itest import (TimeSeriesValue, PerfTest, TestResults, TestConfig)
-
-from .fio_task_parser import (execution_time, fio_cfg_compile,
-                              get_test_summary, get_test_summary_tuple,
-                              get_test_sync_mode, FioJobSection)
-from .rpc_plugin import parse_fio_result
+from ...utils import ssize2b, StopTestError, get_os
+from ...node_interfaces import IRPCNode
+from ..itest import ThreadedTest, IterationConfig, RunTestRes
+from .fio_task_parser import execution_time, fio_cfg_compile, FioJobSection, FioParams
 
 
 logger = logging.getLogger("wally")
 
 
-class NoData:
-    pass
-
-
-def cached_prop(func: Callable[..., Any]) -> Callable[..., Any]:
-    @property
-    @functools.wraps(func)
-    def closure(self) -> Any:
-        val = getattr(self, "_" + func.__name__)
-        if val is NoData:
-            val = func(self)
-            setattr(self, "_" + func.__name__, val)
-        return val
-    return closure
-
-
-def load_fio_log_file(fname: str) -> TimeSeriesValue:
-    with open(fname) as fd:
-        it = [ln.split(',')[:2] for ln in fd]
-
-    vals = [(float(off) / 1000,  # convert us to ms
-             float(val.strip()) + 0.5)  # add 0.5 to compemsate average value
-                                        # as fio trimm all values in log to integer
-            for off, val in it]
-
-    return TimeSeriesValue(vals)
-
-
-READ_IOPS_DISCSTAT_POS = 3
-WRITE_IOPS_DISCSTAT_POS = 7
-
-
-def load_sys_log_file(ftype: str, fname: str) -> TimeSeriesValue:
-    assert ftype == 'iops'
-    pval = None
-    with open(fname) as fd:
-        iops = []
-        for ln in fd:
-            params = ln.split()
-            cval = int(params[WRITE_IOPS_DISCSTAT_POS]) + \
-                int(params[READ_IOPS_DISCSTAT_POS])
-            if pval is not None:
-                iops.append(cval - pval)
-            pval = cval
-
-    vals = [(idx * 1000, val) for idx, val in enumerate(iops)]
-    return TimeSeriesValue(vals)
-
-
-def load_test_results(folder: str, run_num: int) -> 'FioRunResult':
-    res = {}
-    params = None
-
-    fn = os.path.join(folder, str(run_num) + '_params.yaml')
-    params = yaml.load(open(fn).read())
-
-    conn_ids_set = set()
-    rr = r"{}_(?P<conn_id>.*?)_(?P<type>[^_.]*)\.\d+\.log$".format(run_num)
-    for fname in os.listdir(folder):
-        rm = re.match(rr, fname)
-        if rm is None:
-            continue
-
-        conn_id_s = rm.group('conn_id')
-        conn_id = conn_id_s.replace('_', ':')
-        ftype = rm.group('type')
-
-        if ftype not in ('iops', 'bw', 'lat'):
-            continue
-
-        ts = load_fio_log_file(os.path.join(folder, fname))
-        res.setdefault(ftype, {}).setdefault(conn_id, []).append(ts)
-
-        conn_ids_set.add(conn_id)
-
-    rr = r"{}_(?P<conn_id>.*?)_(?P<type>[^_.]*)\.sys\.log$".format(run_num)
-    for fname in os.listdir(folder):
-        rm = re.match(rr, fname)
-        if rm is None:
-            continue
-
-        conn_id_s = rm.group('conn_id')
-        conn_id = conn_id_s.replace('_', ':')
-        ftype = rm.group('type')
-
-        if ftype not in ('iops', 'bw', 'lat'):
-            continue
-
-        ts = load_sys_log_file(ftype, os.path.join(folder, fname))
-        res.setdefault(ftype + ":sys", {}).setdefault(conn_id, []).append(ts)
-
-        conn_ids_set.add(conn_id)
-
-    mm_res = {}
-
-    if len(res) == 0:
-        raise ValueError("No data was found")
-
-    for key, data in res.items():
-        conn_ids = sorted(conn_ids_set)
-        awail_ids = [conn_id for conn_id in conn_ids if conn_id in data]
-        matr = [data[conn_id] for conn_id in awail_ids]
-        mm_res[key] = MeasurementMatrix(matr, awail_ids)
-
-    raw_res = {}
-    for conn_id in conn_ids:
-        fn = os.path.join(folder, "{0}_{1}_rawres.json".format(run_num, conn_id_s))
-
-        # remove message hack
-        fc = "{" + open(fn).read().split('{', 1)[1]
-        raw_res[conn_id] = json.loads(fc)
-
-    fio_task = FioJobSection(params['name'])
-    fio_task.vals.update(params['vals'])
-
-    config = TestConfig('io', params, None, params['nodes'], folder, None)
-    return FioRunResult(config, fio_task, mm_res, raw_res, params['intervals'], run_num)
-
-
-class Attrmapper:
-    def __init__(self, dct: Dict[str, Any]):
-        self.__dct = dct
-
-    def __getattr__(self, name):
-        try:
-            return self.__dct[name]
-        except KeyError:
-            raise AttributeError(name)
-
-
-class DiskPerfInfo:
-    def __init__(self, name: str, summary: str, params: Dict[str, Any], testnodes_count: int):
-        self.name = name
-        self.bw = None
-        self.iops = None
-        self.lat = None
-        self.lat_50 = None
-        self.lat_95 = None
-        self.lat_avg = None
-
-        self.raw_bw = []
-        self.raw_iops = []
-        self.raw_lat = []
-
-        self.params = params
-        self.testnodes_count = testnodes_count
-        self.summary = summary
-        self.p = Attrmapper(self.params['vals'])
-
-        self.sync_mode = get_test_sync_mode(self.params['vals'])
-        self.concurence = self.params['vals'].get('numjobs', 1)
-
-
-def get_lat_perc_50_95(lat_mks: List[float]) -> Tuple[float, float]:
-    curr_perc = 0
-    perc_50 = None
-    perc_95 = None
-    pkey = None
-    for key, val in sorted(lat_mks.items()):
-        if curr_perc + val >= 50 and perc_50 is None:
-            if pkey is None or val < 1.:
-                perc_50 = key
-            else:
-                perc_50 = (50. - curr_perc) / val * (key - pkey) + pkey
-
-        if curr_perc + val >= 95:
-            if pkey is None or val < 1.:
-                perc_95 = key
-            else:
-                perc_95 = (95. - curr_perc) / val * (key - pkey) + pkey
-            break
-
-        pkey = key
-        curr_perc += val
-
-    # for k, v in sorted(lat_mks.items()):
-    #     if k / 1000 > 0:
-    #         print "{0:>4}".format(k / 1000), v
-
-    # print perc_50 / 1000., perc_95 / 1000.
-    # exit(1)
-    return perc_50 / 1000., perc_95 / 1000.
-
-
-class IOTestResults:
-    def __init__(self, suite_name: str, fio_results: 'FioRunResult', log_directory: str):
-        self.suite_name = suite_name
-        self.fio_results = fio_results
-        self.log_directory = log_directory
-
-    def __iter__(self):
-        return iter(self.fio_results)
-
-    def __len__(self):
-        return len(self.fio_results)
-
-    def get_yamable(self) -> Dict[str, List[str]]:
-        items = [(fio_res.summary(), fio_res.idx) for fio_res in self]
-        return {self.suite_name: [self.log_directory] + items}
-
-
-class FioRunResult(TestResults):
-    """
-    Fio run results
-    config: TestConfig
-    fio_task: FioJobSection
-    ts_results: {str: MeasurementMatrix[TimeSeriesValue]}
-    raw_result: ????
-    run_interval:(float, float) - test tun time, used for sensors
-    """
-    def __init__(self, config, fio_task, ts_results, raw_result, run_interval, idx):
-
-        self.name = fio_task.name.rsplit("_", 1)[0]
-        self.fio_task = fio_task
-        self.idx = idx
-
-        self.bw = ts_results['bw']
-        self.lat = ts_results['lat']
-        self.iops = ts_results['iops']
-
-        if 'iops:sys' in ts_results:
-            self.iops_sys = ts_results['iops:sys']
-        else:
-            self.iops_sys = None
-
-        res = {"bw": self.bw,
-               "lat": self.lat,
-               "iops": self.iops,
-               "iops:sys": self.iops_sys}
-
-        self.sensors_data = None
-        self._pinfo = None
-        TestResults.__init__(self, config, res, raw_result, run_interval)
-
-    def get_params_from_fio_report(self):
-        nodes = self.bw.connections_ids
-
-        iops = [self.raw_result[node]['jobs'][0]['mixed']['iops'] for node in nodes]
-        total_ios = [self.raw_result[node]['jobs'][0]['mixed']['total_ios'] for node in nodes]
-        runtime = [self.raw_result[node]['jobs'][0]['mixed']['runtime'] / 1000 for node in nodes]
-        flt_iops = [float(ios) / rtime for ios, rtime in zip(total_ios, runtime)]
-
-        bw = [self.raw_result[node]['jobs'][0]['mixed']['bw'] for node in nodes]
-        total_bytes = [self.raw_result[node]['jobs'][0]['mixed']['io_bytes'] for node in nodes]
-        flt_bw = [float(tbytes) / rtime for tbytes, rtime in zip(total_bytes, runtime)]
-
-        return {'iops': iops,
-                'flt_iops': flt_iops,
-                'bw': bw,
-                'flt_bw': flt_bw}
-
-    def summary(self):
-        return get_test_summary(self.fio_task, len(self.config.nodes))
-
-    def summary_tpl(self):
-        return get_test_summary_tuple(self.fio_task, len(self.config.nodes))
-
-    def get_lat_perc_50_95_multy(self):
-        lat_mks = collections.defaultdict(lambda: 0)
-        num_res = 0
-
-        for result in self.raw_result.values():
-            num_res += len(result['jobs'])
-            for job_info in result['jobs']:
-                for k, v in job_info['latency_ms'].items():
-                    if isinstance(k, basestring) and k.startswith('>='):
-                        lat_mks[int(k[2:]) * 1000] += v
-                    else:
-                        lat_mks[int(k) * 1000] += v
-
-                for k, v in job_info['latency_us'].items():
-                    lat_mks[int(k)] += v
-
-        for k, v in lat_mks.items():
-            lat_mks[k] = float(v) / num_res
-        return get_lat_perc_50_95(lat_mks)
-
-    def disk_perf_info(self, avg_interval=2.0):
-
-        if self._pinfo is not None:
-            return self._pinfo
-
-        testnodes_count = len(self.config.nodes)
-
-        pinfo = DiskPerfInfo(self.name,
-                             self.summary(),
-                             self.params,
-                             testnodes_count)
-
-        def prepare(data, drop=1):
-            if data is None:
-                return data
-
-            res = []
-            for ts_data in data:
-                if ts_data.average_interval() < avg_interval:
-                    ts_data = ts_data.derived(avg_interval)
-
-                # drop last value on bounds
-                # as they may contains ranges without activities
-                assert len(ts_data.values) >= drop + 1, str(drop) + " " + str(ts_data.values)
-
-                if drop > 0:
-                    res.append(ts_data.values[:-drop])
-                else:
-                    res.append(ts_data.values)
-
-            return res
-
-        def agg_data(matr):
-            arr = sum(matr, [])
-            min_len = min(map(len, arr))
-            res = []
-            for idx in range(min_len):
-                res.append(sum(dt[idx] for dt in arr))
-            return res
-
-        pinfo.raw_lat = map(prepare, self.lat.per_vm())
-        num_th = sum(map(len, pinfo.raw_lat))
-        lat_avg = [val / num_th for val in agg_data(pinfo.raw_lat)]
-        pinfo.lat_avg = data_property(lat_avg).average / 1000  # us to ms
-
-        pinfo.lat_50, pinfo.lat_95 = self.get_lat_perc_50_95_multy()
-        pinfo.lat = pinfo.lat_50
-
-        pinfo.raw_bw = map(prepare, self.bw.per_vm())
-        pinfo.raw_iops = map(prepare, self.iops.per_vm())
-
-        if self.iops_sys is not None:
-            pinfo.raw_iops_sys = map(prepare, self.iops_sys.per_vm())
-            pinfo.iops_sys = data_property(agg_data(pinfo.raw_iops_sys))
-        else:
-            pinfo.raw_iops_sys = None
-            pinfo.iops_sys = None
-
-        fparams = self.get_params_from_fio_report()
-        fio_report_bw = sum(fparams['flt_bw'])
-        fio_report_iops = sum(fparams['flt_iops'])
-
-        agg_bw = agg_data(pinfo.raw_bw)
-        agg_iops = agg_data(pinfo.raw_iops)
-
-        log_bw_avg = average(agg_bw)
-        log_iops_avg = average(agg_iops)
-
-        # update values to match average from fio report
-        coef_iops = fio_report_iops / float(log_iops_avg)
-        coef_bw = fio_report_bw / float(log_bw_avg)
-
-        bw_log = data_property([val * coef_bw for val in agg_bw])
-        iops_log = data_property([val * coef_iops for val in agg_iops])
-
-        bw_report = data_property([fio_report_bw])
-        iops_report = data_property([fio_report_iops])
-
-        # When IOPS/BW per thread is too low
-        # data from logs is rounded to match
-        iops_per_th = sum(sum(pinfo.raw_iops, []), [])
-        if average(iops_per_th) > 10:
-            pinfo.iops = iops_log
-            pinfo.iops2 = iops_report
-        else:
-            pinfo.iops = iops_report
-            pinfo.iops2 = iops_log
-
-        bw_per_th = sum(sum(pinfo.raw_bw, []), [])
-        if average(bw_per_th) > 10:
-            pinfo.bw = bw_log
-            pinfo.bw2 = bw_report
-        else:
-            pinfo.bw = bw_report
-            pinfo.bw2 = bw_log
-
-        self._pinfo = pinfo
-
-        return pinfo
-
-
-class IOPerfTest(PerfTest):
-    tcp_conn_timeout = 30
-    max_pig_timeout = 5
+class IOPerfTest(ThreadedTest):
     soft_runcycle = 5 * 60
     retry_time = 30
+    configs_dir = os.path.dirname(__file__)  # type: str
 
-    zero_md5_hash = hashlib.md5()
-    zero_md5_hash.update(b"\x00" * 1024)
-    zero_md5 = zero_md5_hash.hexdigest()
-
-    def __init__(self, config):
-        PerfTest.__init__(self, config)
+    def __init__(self, *args, **kwargs) -> None:
+        super().__init__(*args, **kwargs)
 
         get = self.config.params.get
-        do_get = self.config.params.__getitem__
 
-        self.config_fname = do_get('cfg')
+        self.load_profile_name = self.config.params['load']  # type: str
+        self.name = "io." + self.load_profile_name
 
-        if '/' not in self.config_fname and '.' not in self.config_fname:
-            cfgs_dir = os.path.dirname(__file__)
-            self.config_fname = os.path.join(cfgs_dir,
-                                             self.config_fname + '.cfg')
-
-        self.alive_check_interval = get('alive_check_interval')
-        self.use_system_fio = get('use_system_fio', False)
-
-        if get('prefill_files') is not None:
-            logger.warning("prefill_files option is depricated. Use force_prefill instead")
-
-        self.force_prefill = get('force_prefill', False)
-        self.config_params = get('params', {}).copy()
-
-        self.io_py_remote = self.join_remote("agent.py")
-        self.results_file = self.join_remote("results.json")
-        self.pid_file = self.join_remote("pid")
-        self.task_file = self.join_remote("task.cfg")
-        self.sh_file = self.join_remote("cmd.sh")
-        self.err_out_file = self.join_remote("fio_err_out")
-        self.io_log_file = self.join_remote("io_log.txt")
-        self.exit_code_file = self.join_remote("exit_code")
-
-        self.max_latency = get("max_lat", None)
-        self.min_bw_per_thread = get("min_bw", None)
-
-        self.use_sudo = get("use_sudo", True)
-
-        self.raw_cfg = open(self.config_fname).read()
-        self.fio_configs = None
-
-    @classmethod
-    def load(cls, suite_name: str, folder: str) -> IOTestResults:
-        res = []
-        for fname in os.listdir(folder):
-            if re.match("\d+_params.yaml$", fname):
-                num = int(fname.split('_')[0])
-                res.append(load_test_results(folder, num))
-        return IOTestResults(suite_name, res, folder)
-
-    def cleanup(self):
-        # delete_file(conn, self.io_py_remote)
-        # Need to remove tempo files, used for testing
-        pass
-
-    # size is megabytes
-    def check_prefill_required(self, node: INode, fname: str, size: int, num_blocks: Optional[int]=16) -> bool:
-        try:
-            fstats = node.stat_file(fname)
-            if stat.S_ISREG(fstats.st_mode) and fstats.st_size < size * 1024 ** 2:
-                return True
-        except EnvironmentError:
-            return True
-
-        cmd = 'python -c "' + \
-              "import sys;" + \
-              "fd = open('{0}', 'rb');" + \
-              "fd.seek({1});" + \
-              "data = fd.read(1024); " + \
-              "sys.stdout.write(data + ' ' * ( 1024 - len(data)))\" | md5sum"
-
-        if self.use_sudo:
-            cmd = "sudo " + cmd
-
-        bsize = size * (1024 ** 2)
-        offsets = [random.randrange(bsize - 1024) for _ in range(num_blocks)]
-        offsets.append(bsize - 1024)
-        offsets.append(0)
-
-        for offset in offsets:
-            data = node.run(cmd.format(fname, offset), nolog=True)
-
-            md = ""
-            for line in data.split("\n"):
-                if "unable to resolve" not in line:
-                    md = line.split()[0].strip()
-                    break
-
-            if len(md) != 32:
-                logger.error("File data check is failed - " + data)
-                return True
-
-            if self.zero_md5 == md:
-                return True
-
-        return False
-
-    def prefill_test_files(self, node: INode, files: List[str], force:bool=False) -> None:
-        if self.use_system_fio:
-            cmd_templ = "fio "
+        if not os.path.isfile(self.load_profile_name):  # not an existing file: treat as a profile name in configs_dir
+            self.load_profile_path = os.path.join(self.configs_dir, self.load_profile_name + '.cfg')  # type: str
         else:
-            cmd_templ = "{0}/fio ".format(self.config.remote_dir)
+            self.load_profile_path = self.load_profile_name
 
-        if self.use_sudo:
-            cmd_templ = "sudo " + cmd_templ
+        self.load_profile = open(self.load_profile_path, 'rt').read()  # type: str
 
-        cmd_templ += "--name=xxx --filename={0} --direct=1" + \
-                     " --bs=4m --size={1}m --rw=write"
-
-        ssize = 0
-
-        if force:
-            logger.info("File prefilling is forced")
-
-        ddtime = 0
-        for fname, curr_sz in files.items():
-            if not force:
-                if not self.check_prefill_required(node, fname, curr_sz):
-                    logger.debug("prefill is skipped")
-                    continue
-
-            logger.info("Prefilling file {0}".format(fname))
-            cmd = cmd_templ.format(fname, curr_sz)
-            ssize += curr_sz
-
-            stime = time.time()
-            node.run(cmd, timeout=curr_sz)
-            ddtime += time.time() - stime
-
-        if ddtime > 1.0:
-            fill_bw = int(ssize / ddtime)
-            mess = "Initiall fio fill bw is {0} MiBps for this vm"
-            logger.info(mess.format(fill_bw))
-
-    def install_utils(self, node: INode) -> None:
-        need_install = []
-        packs = [('screen', 'screen')]
-        os_info = get_os(node)
+        self.use_system_fio = get('use_system_fio', False)  # type: bool
 
         if self.use_system_fio:
-            packs.append(('fio', 'fio'))
+            self.fio_path = "fio"    # type: str
         else:
-            packs.append(('bzip2', 'bzip2'))
+            self.fio_path = os.path.join(self.config.remote_dir, "fio")
 
-        for bin_name, package in packs:
-            if bin_name is None:
-                need_install.append(package)
-                continue
+        self.force_prefill = get('force_prefill', False)  # type: bool
 
-            try:
-                node.run('which ' + bin_name, nolog=True)
-            except OSError:
-                need_install.append(package)
-
-        if len(need_install) != 0:
-            if 'redhat' == os_info.distro:
-                cmd = "sudo yum -y install " + " ".join(need_install)
-            else:
-                cmd = "sudo apt-get -y install " + " ".join(need_install)
-
-            try:
-                node.run(cmd)
-            except OSError as err:
-                raise OSError("Can't install - {}".format(" ".join(need_install))) from err
-
-        if not self.use_system_fio:
-            fio_dir = os.path.dirname(os.path.dirname(wally.__file__))
-            fio_dir = os.path.join(os.getcwd(), fio_dir)
-            fio_dir = os.path.join(fio_dir, 'fio_binaries')
-            fname = 'fio_{0.release}_{0.arch}.bz2'.format(os_info)
-            fio_path = os.path.join(fio_dir, fname)
-
-            if not os.path.exists(fio_path):
-                raise RuntimeError("No prebuild fio binary available for {0}".format(os_info))
-
-            bz_dest = self.join_remote('fio.bz2')
-            node.copy_file(fio_path, bz_dest)
-            node.run("bzip2 --decompress {}" + bz_dest, nolog=True)
-            node.run("chmod a+x " + self.join_remote("fio"), nolog=True)
-
-    def pre_run(self) -> None:
-        if 'FILESIZE' not in self.config_params:
+        if 'FILESIZE' not in self.config.params:
             raise NotImplementedError("File size detection is not implemented")
 
-        self.fio_configs = fio_cfg_compile(self.raw_cfg,
-                                           self.config_fname,
-                                           self.config_params)
-        self.fio_configs = list(self.fio_configs)
+        # self.max_latency = get("max_lat")  # type: Optional[int]
+        # self.min_bw_per_thread = get("min_bw")   # type: Optional[int]
 
-        files = {}
+        self.use_sudo = get("use_sudo", True)  # type: bool
+
+        self.fio_configs = list(fio_cfg_compile(self.load_profile,
+                                                self.load_profile_path,
+                                                cast(FioParams, self.config.params)))
+
+        if not self.fio_configs:  # the compiled profile produced no job sections
+            logger.error("Empty fio config provided")  # no exception is being handled here, so no traceback to log
+            raise StopTestError("Empty fio config provided")
+
+        self.iterations_configs = self.fio_configs  # type: ignore
+        self.files_sizes = self.get_file_sizes()
+
+        self.exec_folder = self.config.remote_dir
+        self.fio_path = "" if self.use_system_fio else self.exec_folder
+
+    def get_file_sizes(self) -> Dict[str, int]:
+        files_sizes = {}  # type: Dict[str, int]
+
         for section in self.fio_configs:
             sz = ssize2b(section.vals['size'])
-            msz = sz / (1024 ** 2)
-
-            if sz % (1024 ** 2) != 0:
-                msz += 1
-
-            fname = section.vals['filename']
+            msz = sz // (1024 ** 2) + (1 if sz % (1024 ** 2) != 0 else 0)
+            fname = section.vals['filename']  # type: str
 
             # if already has other test with the same file name
             # take largest size
-            files[fname] = max(files.get(fname, 0), msz)
+            files_sizes[fname] = max(files_sizes.get(fname, 0), msz)
 
-        with ThreadPoolExecutor(len(self.config.nodes)) as pool:
-            fc = functools.partial(self.pre_run_th,
-                                   files=files,
-                                   force=self.force_prefill)
-            list(pool.map(fc, self.config.nodes))
+        return files_sizes
 
-    def pre_run_th(self, node: INode, files: List[str], force_prefil: Optional[bool]=False) -> None:
+    def config_node(self, node: IRPCNode) -> None:
         try:
-            cmd = 'mkdir -p "{0}"'.format(self.config.remote_dir)
-            if self.use_sudo:
-                cmd = "sudo " + cmd
-                cmd += " ; sudo chown {0} {1}".format(node.get_user(),
-                                                      self.config.remote_dir)
-            node.run(cmd, nolog=True)
-
-            assert self.config.remote_dir != "" and self.config.remote_dir != "/"
-            node.run("rm -rf {}/*".format(self.config.remote_dir), nolog=True)
-
+            node.conn.rmdir(self.config.remote_dir, recursive=True, ignore_missing=True)
+            node.conn.mkdir(self.config.remote_dir)
         except Exception as exc:
-            msg = "Failed to create folder {} on remote {}."
-            msg = msg.format(self.config.remote_dir, node, exc)
+            msg = "Failed to create folder {} on remote {}.".format(self.config.remote_dir, node)  # cause is chained via 'from exc'
             logger.exception(msg)
             raise StopTestError(msg) from exc
 
         self.install_utils(node)
-        self.prefill_test_files(node, files, force_prefil)
+        logger.info("Prefilling test files with random data")
+        fill_bw = node.conn.prefill_test_files(self.files_sizes, force=self.force_prefill, fio_path=self.fio_path)
+        if fill_bw is not None:
+            logger.info("Initial fio fill bw is {} MiBps for {}".format(fill_bw, node.info.node_id()))
 
-    def show_expected_execution_time(self) -> None:
-        if len(self.fio_configs) > 1:
-            # +10% - is a rough estimation for additional operations
-            # like sftp, etc
-            exec_time = int(sum(map(execution_time, self.fio_configs)) * 1.1)
-            exec_time_s = sec_to_str(exec_time)
-            now_dt = datetime.datetime.now()
-            end_dt = now_dt + datetime.timedelta(0, exec_time)
-            msg = "Entire test should takes aroud: {0} and finished at {1}"
-            logger.info(msg.format(exec_time_s,
-                                   end_dt.strftime("%H:%M:%S")))
-
-    def run(self) -> IOTestResults:
-        logger.debug("Run preparation")
-        self.pre_run()
-        self.show_expected_execution_time()
-        num_nodes = len(self.config.nodes)
-
-        tname = os.path.basename(self.config_fname)
-        if tname.endswith('.cfg'):
-            tname = tname[:-4]
-
-        barrier = Barrier(num_nodes)
-        results = []
-
-        # set of Operation_Mode_BlockSize str's
-        # which should not be tested anymore, as
-        # they already too slow with previous thread count
-        lat_bw_limit_reached = set()
-
-        with ThreadPoolExecutor(num_nodes) as pool:
-            for pos, fio_cfg in enumerate(self.fio_configs):
-                test_descr = get_test_summary(fio_cfg.vals, noqd=True)
-                if test_descr in lat_bw_limit_reached:
-                    continue
-
-                logger.info("Will run {} test".format(fio_cfg.name))
-                templ = "Test should takes about {}. Should finish at {}, will wait at most till {}"
-                exec_time = execution_time(fio_cfg)
-                exec_time_str = sec_to_str(exec_time)
-                timeout = int(exec_time + max(300, exec_time))
-
-                now_dt = datetime.datetime.now()
-                end_dt = now_dt + datetime.timedelta(0, exec_time)
-                wait_till = now_dt + datetime.timedelta(0, timeout)
-
-                logger.info(templ.format(exec_time_str,
-                                         end_dt.strftime("%H:%M:%S"),
-                                         wait_till.strftime("%H:%M:%S")))
-
-                run_test_func = functools.partial(self.do_run,
-                                                  barrier=barrier,
-                                                  fio_cfg=fio_cfg,
-                                                  pos=pos)
-
-                max_retr = 3
-                for idx in range(max_retr):
-                    if 0 != idx:
-                        logger.info("Sleeping %ss and retrying", self.retry_time)
-                        time.sleep(self.retry_time)
-
-                    try:
-                        intervals = list(pool.map(run_test_func, self.config.nodes))
-                        if None not in intervals:
-                            break
-                    except (EnvironmentError, SSHException) as exc:
-                        if max_retr - 1 == idx:
-                            raise StopTestError("Fio failed") from exc
-                        logger.exception("During fio run")
-
-                fname = "{}_task.fio".format(pos)
-                with open(os.path.join(self.config.log_directory, fname), "w") as fd:
-                    fd.write(str(fio_cfg))
-
-                params = {'vm_count': num_nodes}
-                params['name'] = fio_cfg.name
-                params['vals'] = dict(fio_cfg.vals.items())
-                params['intervals'] = intervals
-                params['nodes'] = [node.get_conn_id() for node in self.config.nodes]
-
-                fname = "{}_params.yaml".format(pos)
-                with open(os.path.join(self.config.log_directory, fname), "w") as fd:
-                    fd.write(dumps(params))
-
-                res = load_test_results(self.config.log_directory, pos)
-                results.append(res)
-
-                if self.max_latency is not None:
-                    lat_50, _ = res.get_lat_perc_50_95_multy()
-
-                    # conver us to ms
-                    if self.max_latency < lat_50:
-                        logger.info(("Will skip all subsequent tests of {} " +
-                                     "due to lat/bw limits").format(fio_cfg.name))
-                        lat_bw_limit_reached.add(test_descr)
-
-                test_res = res.get_params_from_fio_report()
-                if self.min_bw_per_thread is not None:
-                    if self.min_bw_per_thread > average(test_res['bw']):
-                        lat_bw_limit_reached.add(test_descr)
-
-        return IOTestResults(self.config.params['cfg'],
-                             results, self.config.log_directory)
-
-    def do_run(self, node: INode, barrier: Barrier, fio_cfg, pos: int, nolog: bool=False):
-        exec_folder = self.config.remote_dir
-
+    def install_utils(self, node: IRPCNode) -> None:
         if self.use_system_fio:
-            fio_path = ""
-        else:
-            if not exec_folder.endswith("/"):
-                fio_path = exec_folder + "/"
-            else:
-                fio_path = exec_folder
+            node.conn.install('fio', binary='fio')
 
-        exec_time = execution_time(fio_cfg)
-        barrier.wait()
-        run_data = node.rpc.fio.run_fio(self.use_sudo,
-                                        fio_path,
-                                        exec_folder,
-                                        str(fio_cfg),
+        if not self.use_system_fio:  # upload the fio binary shipped next to the wally package
+            os_info = get_os(node)
+            fio_dir = os.path.dirname(os.path.dirname(wally.__file__))  # type: str
+            fio_dir = os.path.join(os.getcwd(), fio_dir)
+            fio_dir = os.path.join(fio_dir, 'fio_binaries')
+            fname = 'fio_{0.release}_{0.arch}.bz2'.format(os_info)  # archive is selected by OS release and arch
+            fio_path = os.path.join(fio_dir, fname)  # type: str
+
+            if not os.path.exists(fio_path):
+                raise RuntimeError("No prebuild fio binary available for {0}".format(os_info))
+
+            bz_dest = self.join_remote('fio.bz2')  # type: str
+            node.copy_file(fio_path, bz_dest)
+            node.run("bzip2 --decompress " + bz_dest)  # stray "{}" removed: it was glued onto the archive path
+            node.run("chmod a+x " + self.join_remote("fio"))
+
+    def get_expected_runtime(self, iteration_info: IterationConfig) -> int:
+        return execution_time(cast(FioJobSection, iteration_info))
+
+    def do_test(self, node: IRPCNode, iter_config: IterationConfig) -> RunTestRes:
+        exec_time = execution_time(cast(FioJobSection, iter_config))
+        raw_res = node.conn.fio.run_fio(self.fio_path,
+                                        self.exec_folder,
+                                        str(cast(FioJobSection, iter_config)),
                                         exec_time + max(300, exec_time))
-        return parse_fio_result(run_data)
+        # TODO(koder): fix next error
+        raise NotImplementedError("Need to extract time from test result")
+        return raw_res, (0, 0)
 
-    @classmethod
-    def prepare_data(cls, results) -> List[Dict[str, Any]]:
-        """create a table with io performance report for console"""
-
-        def key_func(data: FioRunResult) -> Tuple[str, str, str, str, int]:
-            tpl = data.summary_tpl()
-            return (data.name,
-                    tpl.oper,
-                    tpl.mode,
-                    ssize2b(tpl.bsize),
-                    int(tpl.th_count) * int(tpl.vm_count))
-        res = []
-
-        for item in sorted(results, key=key_func):
-            test_dinfo = item.disk_perf_info()
-            testnodes_count = len(item.config.nodes)
-
-            iops, _ = test_dinfo.iops.rounded_average_conf()
-
-            if test_dinfo.iops_sys is not None:
-                iops_sys, iops_sys_conf = test_dinfo.iops_sys.rounded_average_conf()
-                _, iops_sys_dev = test_dinfo.iops_sys.rounded_average_dev()
-                iops_sys_per_vm = round_3_digit(iops_sys / testnodes_count)
-                iops_sys = round_3_digit(iops_sys)
-            else:
-                iops_sys = None
-                iops_sys_per_vm = None
-                iops_sys_dev = None
-                iops_sys_conf = None
-
-            bw, bw_conf = test_dinfo.bw.rounded_average_conf()
-            _, bw_dev = test_dinfo.bw.rounded_average_dev()
-            conf_perc = int(round(bw_conf * 100 / bw))
-            dev_perc = int(round(bw_dev * 100 / bw))
-
-            lat_50 = round_3_digit(int(test_dinfo.lat_50))
-            lat_95 = round_3_digit(int(test_dinfo.lat_95))
-            lat_avg = round_3_digit(int(test_dinfo.lat_avg))
-
-            iops_per_vm = round_3_digit(iops / testnodes_count)
-            bw_per_vm = round_3_digit(bw / testnodes_count)
-
-            iops = round_3_digit(iops)
-            bw = round_3_digit(bw)
-
-            summ = "{0.oper}{0.mode} {0.bsize:>4} {0.th_count:>3}th {0.vm_count:>2}vm".format(item.summary_tpl())
-
-            res.append({"name": key_func(item)[0],
-                        "key": key_func(item)[:4],
-                        "summ": summ,
-                        "iops": int(iops),
-                        "bw": int(bw),
-                        "conf": str(conf_perc),
-                        "dev": str(dev_perc),
-                        "iops_per_vm": int(iops_per_vm),
-                        "bw_per_vm": int(bw_per_vm),
-                        "lat_50": lat_50,
-                        "lat_95": lat_95,
-                        "lat_avg": lat_avg,
-
-                        "iops_sys": iops_sys,
-                        "iops_sys_per_vm": iops_sys_per_vm,
-                        "sys_conf": iops_sys_conf,
-                        "sys_dev": iops_sys_dev})
-
-        return res
-
-    Field = collections.namedtuple("Field", ("header", "attr", "allign", "size"))
-    fiels_and_header = [
-        Field("Name",           "name",        "l",  7),
-        Field("Description",    "summ",        "l", 19),
-        Field("IOPS\ncum",      "iops",        "r",  3),
-        # Field("IOPS_sys\ncum",  "iops_sys",    "r",  3),
-        Field("KiBps\ncum",     "bw",          "r",  6),
-        Field("Cnf %\n95%",     "conf",        "r",  3),
-        Field("Dev%",           "dev",         "r",  3),
-        Field("iops\n/vm",      "iops_per_vm", "r",  3),
-        Field("KiBps\n/vm",     "bw_per_vm",   "r",  6),
-        Field("lat ms\nmedian", "lat_50",      "r",  3),
-        Field("lat ms\n95%",    "lat_95",      "r",  3),
-        Field("lat\navg",       "lat_avg",     "r",  3),
-    ]
-
-    fiels_and_header_dct = dict((item.attr, item) for item in fiels_and_header)
-
-    @classmethod
-    def format_for_console(cls, results) -> str:
-        """create a table with io performance report for console"""
-
-        tab = texttable.Texttable(max_width=120)
-        tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
-        tab.set_cols_align([f.allign for f in cls.fiels_and_header])
-        sep = ["-" * f.size for f in cls.fiels_and_header]
-        tab.header([f.header for f in cls.fiels_and_header])
-        prev_k = None
-        for item in cls.prepare_data(results):
-            if prev_k is not None:
-                if prev_k != item["key"]:
-                    tab.add_row(sep)
-
-            prev_k = item["key"]
-            tab.add_row([item[f.attr] for f in cls.fiels_and_header])
-
-        return tab.draw()
-
-    @classmethod
-    def format_diff_for_console(cls, list_of_results: List[Any]) -> str:
-        """create a table with io performance report for console"""
-
-        tab = texttable.Texttable(max_width=200)
-        tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
-
-        header = [
-            cls.fiels_and_header_dct["name"].header,
-            cls.fiels_and_header_dct["summ"].header,
-        ]
-        allign = ["l", "l"]
-
-        header.append("IOPS ~ Cnf% ~ Dev%")
-        allign.extend(["r"] * len(list_of_results))
-        header.extend(
-            "IOPS_{0} %".format(i + 2) for i in range(len(list_of_results[1:]))
-        )
-
-        header.append("BW")
-        allign.extend(["r"] * len(list_of_results))
-        header.extend(
-            "BW_{0} %".format(i + 2) for i in range(len(list_of_results[1:]))
-        )
-
-        header.append("LAT")
-        allign.extend(["r"] * len(list_of_results))
-        header.extend(
-            "LAT_{0}".format(i + 2) for i in range(len(list_of_results[1:]))
-        )
-
-        tab.header(header)
-        sep = ["-" * 3] * len(header)
-        processed_results = map(cls.prepare_data, list_of_results)
-
-        key2results = []
-        for res in processed_results:
-            key2results.append(dict(
-                ((item["name"], item["summ"]), item) for item in res
-            ))
-
-        prev_k = None
-        iops_frmt = "{0[iops]} ~ {0[conf]:>2} ~ {0[dev]:>2}"
-        for item in processed_results[0]:
-            if prev_k is not None:
-                if prev_k != item["key"]:
-                    tab.add_row(sep)
-
-            prev_k = item["key"]
-
-            key = (item['name'], item['summ'])
-            line = list(key)
-            base = key2results[0][key]
-
-            line.append(iops_frmt.format(base))
-
-            for test_results in key2results[1:]:
-                val = test_results.get(key)
-                if val is None:
-                    line.append("-")
-                elif base['iops'] == 0:
-                    line.append("Nan")
-                else:
-                    prc_val = {'dev': val['dev'], 'conf': val['conf']}
-                    prc_val['iops'] = int(100 * val['iops'] / base['iops'])
-                    line.append(iops_frmt.format(prc_val))
-
-            line.append(base['bw'])
-
-            for test_results in key2results[1:]:
-                val = test_results.get(key)
-                if val is None:
-                    line.append("-")
-                elif base['bw'] == 0:
-                    line.append("Nan")
-                else:
-                    line.append(int(100 * val['bw'] / base['bw']))
-
-            for test_results in key2results:
-                val = test_results.get(key)
-                if val is None:
-                    line.append("-")
-                else:
-                    line.append("{0[lat_50]} - {0[lat_95]}".format(val))
-
-            tab.add_row(line)
-
-        tab.set_cols_align(allign)
-        return tab.draw()
diff --git a/wally/suits/io/fio_task_parser.py b/wally/suits/io/fio_task_parser.py
index 1b6ba21..1bdbb15 100644
--- a/wally/suits/io/fio_task_parser.py
+++ b/wally/suits/io/fio_task_parser.py
@@ -7,10 +7,11 @@
 import os.path
 import argparse
 import itertools
-from typing import Optional, Iterator, Union, Dict, Iterable, List, TypeVar, Callable, Tuple
-from collections import OrderedDict, namedtuple
+from typing import Optional, Iterator, Union, Dict, Iterable, List, TypeVar, Callable, Tuple, NamedTuple, Any
+from collections import OrderedDict
 
 
+from ..itest import IterationConfig
 from ...utils import sec_to_str, ssize2b
 
 
@@ -19,15 +20,27 @@
 INCLUDE = 2
 
 
-Var = namedtuple('Var', ('name',))
-CfgLine = namedtuple('CfgLine', ('fname', 'lineno', 'oline',
-                                 'tp', 'name', 'val'))
+Var = NamedTuple('Var', [('name', str)])
+CfgLine = NamedTuple('CfgLine',
+                     [('fname', str),
+                      ('lineno', int),
+                      ('oline', str),
+                      ('tp', int),
+                      ('name', str),
+                      ('val', Any)])
+
+TestSumm = NamedTuple("TestSumm",
+                      [("oper", str),
+                       ("mode", str),
+                       ("bsize", int),
+                       ("iodepth", int),
+                       ("vm_count", Optional[int])])  # None when no vm_count is supplied
 
 
-class FioJobSection:
-    def __init__(self, name: str):
+class FioJobSection(IterationConfig):
+    def __init__(self, name: str) -> None:
         self.name = name
-        self.vals = OrderedDict()
+        self.vals = OrderedDict()  # type: Dict[str, Any]
 
     def copy(self) -> 'FioJobSection':
         return copy.deepcopy(self)
@@ -40,7 +53,7 @@
     def is_free(self) -> bool:
         return len(list(self.required_vars())) == 0
 
-    def __str__(self):
+    def __str__(self) -> str:
         res = "[{0}]\n".format(self.name)
 
         for name, val in self.vals.items():
@@ -55,13 +68,13 @@
 
 
 class ParseError(ValueError):
-    def __init__(self, msg: str, fname: str, lineno: int, line_cont:Optional[str] =""):
+    def __init__(self, msg: str, fname: str, lineno: int, line_cont:Optional[str] = "") -> None:
         ValueError.__init__(self, msg)
         self.file_name = fname
         self.lineno = lineno
         self.line_cont = line_cont
 
-    def __str__(self):
+    def __str__(self) -> str:
         msg = "In {0}:{1} ({2}) : {3}"
         return msg.format(self.file_name,
                           self.lineno,
@@ -70,10 +83,10 @@
 
 
 def is_name(name: str) -> bool:
-    return re.match("[a-zA-Z_][a-zA-Z_0-9]*", name)
+    return re.match("[a-zA-Z_][a-zA-Z_0-9]*", name) is not None
 
 
-def parse_value(val: str) -> Union[int, str, Dict, Var]:
+def parse_value(val: str) -> Union[int, str, float, List, Var]:
     try:
         return int(val)
     except ValueError:
@@ -88,12 +101,13 @@
         assert val.endswith("%}")
         content = val[2:-2]
         vals = list(i.strip() for i in content.split(','))
-        return map(parse_value, vals)
+        return list(map(parse_value, vals))
 
     if val.startswith('{'):
         assert val.endswith("}")
         assert is_name(val[1:-1])
         return Var(val[1:-1])
+
     return val
 
 
@@ -133,15 +147,15 @@
 def fio_config_parse(lexer_iter: Iterable[CfgLine]) -> Iterator[FioJobSection]:
     in_globals = False
     curr_section = None
-    glob_vals = OrderedDict()
+    glob_vals = OrderedDict()  # type: Dict[str, Any]
     sections_count = 0
 
-    lexed_lines = list(lexer_iter)
+    lexed_lines = list(lexer_iter)  # type: List[CfgLine]
     one_more = True
     includes = {}
 
     while one_more:
-        new_lines = []
+        new_lines = []  # type: List[CfgLine]
         one_more = False
         for line in lexed_lines:
             fname, lineno, oline, tp, name, val = line
@@ -205,7 +219,7 @@
 
 
 def process_cycles(sec: FioJobSection) -> Iterator[FioJobSection]:
-    cycles = OrderedDict()
+    cycles = OrderedDict()  # type: Dict[str, Any]
 
     for name, val in sec.vals.items():
         if isinstance(val, list) and name.upper() != name:
@@ -214,11 +228,11 @@
     if len(cycles) == 0:
         yield sec
     else:
-        # qd should changes faster
-        numjobs = cycles.pop('qd', None)
-        items = cycles.items()
+        # iodepth should change fastest, so it is appended as the last product axis
+        numjobs = cycles.pop('iodepth', None)
+        items = list(cycles.items())
 
-        if len(items) > 0:
+        if items:
             keys, vals = zip(*items)
             keys = list(keys)
             vals = list(vals)
@@ -228,7 +242,7 @@
 
         if numjobs is not None:
             vals.append(numjobs)
-            keys.append('qd')
+            keys.append('iodepth')
 
         for combination in itertools.product(*vals):
             new_sec = sec.copy()
@@ -236,12 +250,12 @@
             yield new_sec
 
 
-FIO_PARAM_VAL = Union[str, Var]
-FIO_PARAMS = Dict[str, FIO_PARAM_VAL]
+FioParamsVal = Union[str, Var]
+FioParams = Dict[str, FioParamsVal]
 
 
-def apply_params(sec: FioJobSection, params: FIO_PARAMS) -> FioJobSection:
-    processed_vals = OrderedDict()
+def apply_params(sec: FioJobSection, params: FioParams) -> FioJobSection:
+    processed_vals = OrderedDict()  # type: Dict[str, Any]
     processed_vals.update(params)
     for name, val in sec.vals.items():
         if name in params:
@@ -329,9 +343,6 @@
         return 'a'
 
 
-TestSumm = namedtuple("TestSumm", ("oper", "mode", "bsize", "iodepth", "vm_count"))
-
-
 def get_test_summary_tuple(sec: FioJobSection, vm_count: int = None) -> TestSumm:
     if isinstance(sec, dict):
         vals = sec
@@ -355,12 +366,12 @@
                     vm_count)
 
 
-def get_test_summary(sec: FioJobSection, vm_count: int = None, noqd: bool = False) -> str:
+def get_test_summary(sec: FioJobSection, vm_count: int = None, noiodepth: bool = False) -> str:
     tpl = get_test_summary_tuple(sec, vm_count)
 
     res = "{0.oper}{0.mode}{0.bsize}".format(tpl)
-    if not noqd:
-        res += "qd{}".format(tpl.qd)
+    if not noiodepth:
+        res += "qd{}".format(tpl.iodepth)
 
     if tpl.vm_count is not None:
         res += "vm{}".format(tpl.vm_count)
@@ -387,7 +398,7 @@
             yield res
 
 
-def fio_cfg_compile(source: str, fname: str, test_params: FIO_PARAMS) -> Iterator[FioJobSection]:
+def fio_cfg_compile(source: str, fname: str, test_params: FioParams) -> Iterator[FioJobSection]:
     it = parse_all_in_1(source, fname)
     it = (apply_params(sec, test_params) for sec in it)
     it = flatmap(process_cycles, it)
diff --git a/wally/suits/io/rpc_plugin.py b/wally/suits/io/rpc_plugin.py
index ca3f0f3..8e2e09f 100644
--- a/wally/suits/io/rpc_plugin.py
+++ b/wally/suits/io/rpc_plugin.py
@@ -1,7 +1,20 @@
+import os
+import time
+import stat
+import random
+import subprocess
+
+
 def rpc_run_fio(cfg):
     fio_cmd_templ = "cd {exec_folder}; {fio_path}fio --output-format=json " + \
                     "--output={out_file} --alloc-size=262144 {job_file}"
 
+    result = {
+        "name": [float],
+        "lat_name": [[float]]
+    }
+
+    return result
     # fnames_before = node.run("ls -1 " + exec_folder, nolog=True)
     #
     # timeout = int(exec_time + max(300, exec_time))
@@ -11,5 +24,66 @@
     # fnames_after = node.run("ls -1 " + exec_folder, nolog=True)
     #
 
-def parse_fio_result(data):
-    pass
+def rpc_check_file_prefilled(path, used_size_mb):  # NB: True means the file still NEEDS prefilling
+    used_size = used_size_mb * 1024 ** 2  # MiB -> bytes
+    blocks_to_check = 16  # number of random 1 KiB samples to inspect
+
+    try:
+        fstats = os.stat(path)
+        if stat.S_ISREG(fstats.st_mode) and fstats.st_size < used_size:
+            return True  # regular file shorter than the requested size
+    except EnvironmentError:
+        return True  # stat failed, so the file is treated as not filled
+
+    offsets = [random.randrange(used_size - 1024) for _ in range(blocks_to_check)]
+    offsets.append(used_size - 1024)  # always sample the last 1 KiB of the used area ...
+    offsets.append(0)  # ... and the first 1 KiB
+
+    with open(path, 'rb') as fd:
+        for offset in offsets:
+            fd.seek(offset)
+            if b"\x00" * 1024 == fd.read(1024):
+                return True  # an all-zero sample is taken as "never written"
+
+    return False
+
+
+def rpc_prefill_test_files(files, force=False, fio_path='fio'):
+    cmd_templ = "{0} --name=xxx --filename={1} --direct=1" + \
+                " --bs=4m --size={2}m --rw=write"
+
+    ssize = 0  # sum of the sizes handed to fio (same unit as the values of `files`)
+    ddtime = 0.0  # wall-clock seconds spent inside fio
+
+    for fname, curr_sz in files.items():
+        if not force:
+            if not rpc_check_file_prefilled(fname, curr_sz):
+                continue
+
+        cmd = [arg.format(fio_path, fname, curr_sz) for arg in cmd_templ.split()]  # argv list: no shell is involved
+        ssize += curr_sz
+
+        stime = time.time()
+        subprocess.check_call(cmd)
+        ddtime += time.time() - stime
+
+    if ddtime > 1.0:
+        return int(ssize / ddtime)  # average fill rate; None below when the run was too short to measure
+
+    return None
+
+
+def load_fio_log_file(fname):
+    with open(fname) as fd:
+        it = [ln.split(',')[:2] for ln in fd]
+
+    return [(float(off) / 1000,  # convert us to ms
+            float(val.strip()) + 0.5)  # add 0.5 to compensate the average value,
+                                       # as fio truncates all values in the log to integers
+            for off, val in it]
+
+
+
+
+
+
diff --git a/wally/suits/io/rpc_plugin.pyi b/wally/suits/io/rpc_plugin.pyi
new file mode 100644
index 0000000..1155007
--- /dev/null
+++ b/wally/suits/io/rpc_plugin.pyi
@@ -0,0 +1,8 @@
+from typing import Any, Optional, Dict, List, Tuple
+
+def rpc_run_fio(cfg: Dict[str, str]) -> Any: ...
+def rpc_check_file_prefilled(path: str, used_size_mb: int) -> bool: ...
+def rpc_prefill_test_files(files: Dict[str, int], force: bool = False, fio_path: str = 'fio') -> Optional[int]: ...
+
+
+def load_fio_log_file(fname: str) -> List[Tuple[float, float]]: ...
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index 7004b8e..6d1eeee 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -2,38 +2,43 @@
 import time
 import logging
 import os.path
-import functools
-from typing import Dict, Any, List, Tuple
+import datetime
+from typing import Dict, Any, List, Optional, Tuple, cast
 
 from concurrent.futures import ThreadPoolExecutor
 
-from ..utils import Barrier, StopTestError
-from ..statistic import data_property
-from ..inode import INode
+from ..utils import Barrier, StopTestError, sec_to_str
+from ..node_interfaces import IRPCNode
 from ..storage import Storage
+from ..result_classes import RawTestResults
+
+import agent
 
 
 logger = logging.getLogger("wally")
 
 
-class TestConfig:
+__doc__ = "Contains base classes for performance tests"
+
+
+class TestInputConfig:
     """
     this class describe test input configuration
 
-    test_type:str - test type name
-    params:{str:Any} - parameters from yaml file for this test
-    test_uuid:str - UUID to be used to create filenames and Co
-    log_directory:str - local directory to store results
-    nodes:[Node] - node to run tests on
-    remote_dir:str - directory on nodes to be used for local files
+    test_type - test type name
+    params - parameters from yaml file for this test
+    test_uuid - UUID to be used to create file names & Co
+    log_directory - local directory to store results
+    nodes - nodes to run tests on
+    remote_dir - directory on nodes to be used for local files
     """
     def __init__(self,
                  test_type: str,
                  params: Dict[str, Any],
                  run_uuid: str,
-                 nodes: List[INode],
+                 nodes: List[IRPCNode],
                  storage: Storage,
-                 remote_dir: str):
+                 remote_dir: str) -> None:
         self.test_type = test_type
         self.params = params
         self.run_uuid = run_uuid
@@ -42,150 +47,21 @@
         self.remote_dir = remote_dir
 
 
-class TestResults:
-    """
-    this class describe test results
-
-    config:TestConfig - test config object
-    params:dict - parameters from yaml file for this test
-    results:{str:MeasurementMesh} - test results object
-    raw_result:Any - opaque object to store raw results
-    run_interval:(float, float) - test tun time, used for sensors
-    """
-    def __init__(self,
-                 config: TestConfig,
-                 results: Dict[str, Any],
-                 raw_result: Any,
-                 run_interval: Tuple[float, float]):
-        self.config = config
-        self.params = config.params
-        self.results = results
-        self.raw_result = raw_result
-        self.run_interval = run_interval
-
-    def __str__(self):
-        res = "{0}({1}):\n    results:\n".format(
-                    self.__class__.__name__,
-                    self.summary())
-
-        for name, val in self.results.items():
-            res += "        {0}={1}\n".format(name, val)
-
-        res += "    params:\n"
-
-        for name, val in self.params.items():
-            res += "        {0}={1}\n".format(name, val)
-
-        return res
-
-    @abc.abstractmethod
-    def summary(self):
-        pass
-
-    @abc.abstractmethod
-    def get_yamable(self):
-        pass
-
-
-# class MeasurementMatrix:
-#     """
-#     data:[[MeasurementResult]] - VM_COUNT x TH_COUNT matrix of MeasurementResult
-#     """
-#     def __init__(self, data, connections_ids):
-#         self.data = data
-#         self.connections_ids = connections_ids
-#
-#     def per_vm(self):
-#         return self.data
-#
-#     def per_th(self):
-#         return sum(self.data, [])
-
-
-class MeasurementResults:
-    def stat(self):
-        return data_property(self.data)
-
-    def __str__(self):
-        return 'TS([' + ", ".join(map(str, self.data)) + '])'
-
-
-class SimpleVals(MeasurementResults):
-    """
-    data:[float] - list of values
-    """
-    def __init__(self, data):
-        self.data = data
-
-
-class TimeSeriesValue(MeasurementResults):
-    """
-    data:[(float, float, float)] - list of (start_time, lenght, average_value_for_interval)
-    odata: original values
-    """
-    def __init__(self, data: List[Tuple[float, float, float]]):
-        assert len(data) > 0
-        self.odata = data[:]
-        self.data = []
-
-        cstart = 0
-        for nstart, nval in data:
-            self.data.append((cstart, nstart - cstart, nval))
-            cstart = nstart
-
-    @property
-    def values(self) -> List[float]:
-        return [val[2] for val in self.data]
-
-    def average_interval(self) -> float:
-        return float(sum([val[1] for val in self.data])) / len(self.data)
-
-    def skip(self, seconds) -> 'TimeSeriesValue':
-        nres = []
-        for start, ln, val in self.data:
-            nstart = start + ln - seconds
-            if nstart > 0:
-                nres.append([nstart, val])
-        return self.__class__(nres)
-
-    def derived(self, tdelta) -> 'TimeSeriesValue':
-        end = self.data[-1][0] + self.data[-1][1]
-        tdelta = float(tdelta)
-
-        ln = end / tdelta
-
-        if ln - int(ln) > 0:
-            ln += 1
-
-        res = [[tdelta * i, 0.0] for i in range(int(ln))]
-
-        for start, lenght, val in self.data:
-            start_idx = int(start / tdelta)
-            end_idx = int((start + lenght) / tdelta)
-
-            for idx in range(start_idx, end_idx + 1):
-                rstart = tdelta * idx
-                rend = tdelta * (idx + 1)
-
-                intersection_ln = min(rend, start + lenght) - max(start, rstart)
-                if intersection_ln > 0:
-                    try:
-                        res[idx][1] += val * intersection_ln / tdelta
-                    except IndexError:
-                        raise
-
-        return self.__class__(res)
+class IterationConfig:
+    name = None  # type: str
 
 
 class PerfTest:
-    """
-    Very base class for tests
-    config:TestConfig - test configuration
-    stop_requested:bool - stop for test requested
-    """
-    def __init__(self, config):
+    """Base class for all tests"""
+    name = None  # type: str
+    max_retry = 3
+    retry_time = 30
+
+    def __init__(self, config: TestInputConfig) -> None:
         self.config = config
         self.stop_requested = False
+        self.nodes = self.config.nodes  # type: List[IRPCNode]
+        self.sorted_nodes_ids = sorted(node.info.node_id() for node in self.nodes)
 
     def request_stop(self) -> None:
         self.stop_requested = True
@@ -193,13 +69,8 @@
     def join_remote(self, path: str) -> str:
         return os.path.join(self.config.remote_dir, path)
 
-    @classmethod
     @abc.abstractmethod
-    def load(cls, path: str):
-        pass
-
-    @abc.abstractmethod
-    def run(self):
+    def run(self, storage: Storage) -> None:
         pass
 
     @abc.abstractmethod
@@ -207,69 +78,182 @@
         pass
 
 
-class ThreadedTest(PerfTest):
-    """
-    Base class for tests, which spawn separated thread for each node
-    """
+RunTestRes = Tuple[RawTestResults, Tuple[int, int]]
 
-    def run(self) -> List[TestResults]:
-        barrier = Barrier(len(self.config.nodes))
-        th_test_func = functools.partial(self.th_test_func, barrier)
 
-        with ThreadPoolExecutor(len(self.config.nodes)) as pool:
-            return list(pool.map(th_test_func, self.config.nodes))
+class ThreadedTest(PerfTest, metaclass=abc.ABCMeta):
+    """Base class for tests which run every iteration on all nodes in parallel, one worker thread per node"""
+
+    # max allowed spread between start (and between stop) times of the same iteration on different test nodes
+    # used_max_diff = max((min_run_time * max_rel_time_diff), max_time_diff)
+    max_time_diff = 5
+    max_rel_time_diff = 0.05
+
+    def __init__(self, config: TestInputConfig) -> None:
+        PerfTest.__init__(self, config)
+        self.iterations_configs = [None]  # type: List[Optional[IterationConfig]]
 
     @abc.abstractmethod
-    def do_test(self, node: INode) -> TestResults:
+    def get_expected_runtime(self, iter_cfg: IterationConfig) -> Optional[int]:
         pass
 
-    def th_test_func(self, barrier: Barrier, node: INode) -> TestResults:
-        test_name = self.__class__.__name__
-        logger.debug("Starting {} test on {}".format(test_name , node))
-        logger.debug("Run test preparation on {}".format(node))
-        self.pre_run(node)
+    def get_not_done_stages(self, storage: Storage) -> Dict[int, IterationConfig]:
+        start_run_id = max((int(name) for _, name in storage.list('result')), default=-1) + 1  # 0 if storage is empty
+        not_in_storage = {}  # type: Dict[int, IterationConfig]
+        for run_id, iteration_config in enumerate(self.iterations_configs, start_run_id):
+            info_path = "result/{}/info".format(run_id)
+            if info_path in storage:
+                info = cast(Dict[str, Any], storage[info_path]) # type: Dict[str, Any]
 
-        # wait till all thread became ready
-        barrier.wait()
+                assert isinstance(info, dict), \
+                    "Broken storage at path {}. Expect test info dict, obtain {!r}".format(info_path, info)
 
-        logger.debug("Run test on {}".format(node))
-        try:
-            return self.do_test(node)
-        except Exception as exc:
-            msg = "In test {} for {}".format(test_name, node)
-            logger.exception(msg)
-            raise StopTestError(msg) from exc
+                info = info.copy()
+                del info['begin_time']
+                del info['end_time']
 
-    def pre_run(self, node: INode) -> None:
+                iter_name = "Unnamed" if iteration_config is None else iteration_config.name
+                expected_config = {
+                    'name': self.name,
+                    'iteration_name': iter_name,
+                    'iteration_config': iteration_config,
+                    'params': self.config.params,
+                    'nodes': self.sorted_nodes_ids
+                }
+
+                assert info == expected_config, \
+                    ("Test info at path {} is not equal to expected config. "
+                     "Maybe configuration was changed before test was restarted. "
+                     "Current cfg is {!r}, expected cfg is {!r}").format(info_path, info, expected_config)
+
+                logger.info("Test iteration {} found in storage and will be skipped".format(iter_name))
+            else:
+                not_in_storage[run_id] = iteration_config
+        return not_in_storage
+
+    def run(self, storage: Storage) -> None:
+        not_in_storage = self.get_not_done_stages(storage)
+
+        if not not_in_storage:
+            logger.info("All test iteration in storage already. Skip test")
+            return
+
+        logger.debug("Run test {} on nodes {}.".format(self.name, ",".join(self.sorted_nodes_ids)))
+
+        barrier = Barrier(len(self.nodes))
+
+        logger.debug("Run preparation")
+
+        with ThreadPoolExecutor(len(self.nodes)) as pool:
+            list(pool.map(self.config_node, self.nodes))
+
+            # +5% - is a rough estimation for additional operations
+            run_times = [self.get_expected_runtime(iteration_config) for iteration_config in not_in_storage.values()]
+            if None not in run_times:
+                expected_run_time = int(sum(run_times) * 1.05)
+                exec_time_s = sec_to_str(expected_run_time)
+                now_dt = datetime.datetime.now()
+                end_dt = now_dt + datetime.timedelta(0, expected_run_time)
+                logger.info("Entire test should takes aroud: {} and finished at {:%H:%M:%S}"
+                            .format(exec_time_s, end_dt))
+
+            for run_id, iteration_config in sorted(not_in_storage.items()):
+                iter_name = "Unnamed" if iteration_config is None else iteration_config.name
+                logger.info("Run test iteration {} ".format(iter_name))
+
+                results = []  # type: List[RunTestRes]
+                for idx in range(self.max_retry):
+                    barrier.wait()
+                    try:
+                        futures = [pool.submit(self.do_test, node, iteration_config) for node in self.nodes]
+                        results = [fut.result() for fut in futures]
+                    except (EnvironmentError, agent.RPCError) as exc:
+                        if self.max_retry - 1 == idx:
+                            raise StopTestError("Fio failed") from exc
+                        logger.exception("During fio run")
+                    else:
+                        if all(results):
+                            break
+
+                    logger.info("Sleeping %ss and retrying", self.retry_time)
+                    time.sleep(self.retry_time)
+
+                start_times = []  # type: List[int]
+                stop_times = []  # type: List[int]
+
+                for (result, (t_start, t_stop)), node in zip(results, self.config.nodes):
+                    for metrics_name, data in result.items():
+                        path = "result/{}/measurement/{}/{}".format(run_id, node.info.node_id(), metrics_name)
+                        storage[path] = data  # type: ignore
+                    start_times.append(t_start)
+                    stop_times.append(t_stop)
+
+                min_start_time = min(start_times)
+                max_start_time = max(start_times)
+                min_stop_time = min(stop_times)
+                max_stop_time = max(stop_times)
+
+                max_allowed_time_diff = int((min_stop_time - max_start_time) * self.max_rel_time_diff)
+                max_allowed_time_diff = max(max_allowed_time_diff, self.max_time_diff)
+
+                if max_start_time - min_start_time > max_allowed_time_diff:
+                    logger.warning("Too large difference in {}:{} start time - {}. Max recommended difference is {}"
+                                   .format(self.name, iter_name, max_start_time - min_start_time, max_allowed_time_diff))
+
+                if max_stop_time - min_stop_time > max_allowed_time_diff:
+                    logger.warning("Too large difference in {}:{} stop time - {}. Max recommended difference is {}"
+                                   .format(self.name, iter_name, max_stop_time - min_stop_time, max_allowed_time_diff))
+
+                test_config = {
+                    'name': self.name,
+                    'iteration_name': iter_name,
+                    'iteration_config': iteration_config,
+                    'params': self.config.params,
+                    'nodes': self.sorted_nodes_ids,
+                    'begin_time': min_start_time,
+                    'end_time': max_stop_time
+                }
+
+                storage["result/{}/info".format(run_id)] = test_config  # type: ignore
+
+    @abc.abstractmethod
+    def config_node(self, node: IRPCNode) -> None:
+        pass
+
+    @abc.abstractmethod
+    def do_test(self, node: IRPCNode, iter_config: IterationConfig) -> RunTestRes:
         pass
 
 
-class TwoScriptTest(ThreadedTest):
-    def __init__(self, *dt, **mp):
+class TwoScriptTest(ThreadedTest, metaclass=abc.ABCMeta):
+    def __init__(self, *dt, **mp) -> None:
         ThreadedTest.__init__(self, *dt, **mp)
-        self.remote_dir = '/tmp'
         self.prerun_script = self.config.params['prerun_script']
         self.run_script = self.config.params['run_script']
-
         self.prerun_tout = self.config.params.get('prerun_tout', 3600)
         self.run_tout = self.config.params.get('run_tout', 3600)
+        self.iterations_configs = [None]
 
-    def get_remote_for_script(self, script: str) -> str:
-        return os.path.join(self.remote_dir, os.path.basename(script))
+    def get_expected_runtime(self, iter_cfg: IterationConfig) -> Optional[int]:
+        return None
 
-    def pre_run(self, node: INode) -> None:
-        copy_paths(node.connection,
-                   {self.run_script: self.get_remote_for_script(self.run_script),
-                    self.prerun_script: self.get_remote_for_script(self.prerun_script)})
+    def config_node(self, node: IRPCNode) -> None:
+        node.copy_file(self.run_script, self.join_remote(os.path.basename(self.run_script)))
+        node.copy_file(self.prerun_script, self.join_remote(os.path.basename(self.prerun_script)))
 
-        cmd = self.get_remote_for_script(self.prerun_script)
+        cmd = self.join_remote(os.path.basename(self.prerun_script))  # same remote path the script was copied to
         cmd += ' ' + self.config.params.get('prerun_opts', '')
         node.run(cmd, timeout=self.prerun_tout)
 
-    def do_test(self, node: INode) -> TestResults:
-        cmd = self.get_remote_for_script(self.run_script)
+    def do_test(self, node: IRPCNode, iter_config: IterationConfig) -> RunTestRes:
+        cmd = self.join_remote(os.path.basename(self.run_script))  # must match the copy target in config_node
         cmd += ' ' + self.config.params.get('run_opts', '')
         t1 = time.time()
-        res = node.run(cmd, timeout=self.run_tout)
+        res = self.parse_results(node.run(cmd, timeout=self.run_tout))
         t2 = time.time()
-        return TestResults(self.config, None, res, (t1, t2))
+        return res, (int(t1), int(t2))
+
+    @abc.abstractmethod
+    def parse_results(self, data: str) -> RawTestResults:
+        pass
+