Remove Fuel support, fix many bugs, add sudo support for some commands, add a default SSH user
diff --git a/wally/suits/io/defaults_qd.cfg b/wally/suits/io/defaults_qd.cfg
index c3dee19..ee10055 100644
--- a/wally/suits/io/defaults_qd.cfg
+++ b/wally/suits/io/defaults_qd.cfg
@@ -21,5 +21,6 @@
 write_bw_log=fio_bw_log
 log_avg_msec=1000
 write_hist_log=fio_lat_hist_log
+log_hist_coarseness=0
 log_hist_msec=1000
 log_unix_epoch=1
diff --git a/wally/suits/io/fio.py b/wally/suits/io/fio.py
index 374e5a4..693a547 100644
--- a/wally/suits/io/fio.py
+++ b/wally/suits/io/fio.py
@@ -1,6 +1,6 @@
 import os.path
 import logging
-from typing import cast, Any, List, Union, Tuple, Optional
+from typing import cast, Any, List, Union
 
 import numpy
 
@@ -14,7 +14,7 @@
 from ..job import JobConfig
 from .fio_task_parser import execution_time, fio_cfg_compile, FioJobConfig, FioParams, get_log_files
 from . import rpc_plugin
-from .fio_hist import get_lat_vals, expected_lat_bins
+from .fio_hist import get_lat_vals
 
 
 logger = logging.getLogger("wally")
@@ -209,7 +209,7 @@
                 raise StopTestError()
 
             # TODO: fix units, need to get array type from stream
-
+            # NOTE: parsed/times collect the values for the TimeSeries built below
             parsed = []  # type: List[Union[List[int], int]]
             times = []
 
@@ -223,11 +223,11 @@
                         if name == 'lat':
                             vals = [int(i.strip()) for i in rest]
 
-                            if len(vals) != expected_lat_bins:
-                                msg = "Expect {} bins in latency histogram, but found {} at time {}" \
-                                             .format(expected_lat_bins, len(vals), time_ms_s)
-                                logger.error(msg)
-                                raise StopTestError(msg)
+                            if parsed and len(vals) != len(parsed[0]):  # type: ignore
+                                msg = f"Expect {len(parsed[0])} bins in latency histogram, " + \
+                                      f"but found {len(vals)} at time {time_ms_s}"
+                                logger.error(msg)
+                                raise StopTestError(msg)
 
                             parsed.append(vals)
                         else:
@@ -238,7 +238,7 @@
 
             assert not self.suite.keep_raw_files, "keep_raw_files is not supported"
 
-            histo_bins = None if name != 'lat' else numpy.array(get_lat_vals())
+            histo_bins = None if name != 'lat' else numpy.array(get_lat_vals(len(parsed[0])))
             ts = TimeSeries(data=numpy.array(parsed, dtype='uint64'),
                             units=units,
                             times=numpy.array(times, dtype='uint64'),
diff --git a/wally/suits/io/fio_hist.py b/wally/suits/io/fio_hist.py
index a2ded70..fc32d0d 100644
--- a/wally/suits/io/fio_hist.py
+++ b/wally/suits/io/fio_hist.py
@@ -1,9 +1,6 @@
 from typing import List
 
 
-expected_lat_bins = 1216
-
-
 #----------------------------  FIO HIST LOG PARSE CODE -----------------------------------------------------------------
 
 # Copy-paste from fio/tools/hist/fiologparser_hist.py.
@@ -52,6 +49,20 @@
     return lower + (upper - lower) * edge
 
 
-def get_lat_vals(columns: int = expected_lat_bins, coarseness: int = 0) -> List[float]:
-    return [plat_idx_to_val_coarse(val, coarseness) for val in range(columns)]
+def get_lat_vals(columns: int, coarseness: int = 0) -> List[float]:
+    """Return the latency value for each of `columns` histogram bins.
+
+    Only the 1216- and 1856-bin layouts are supported. Values of the 1856-bin
+    layout are divided by 1000 (presumably ns -> us, TODO confirm).
+
+    Raises ValueError for any other bin count.
+    """
+    if columns == 1216:
+        coef = 1
+    elif columns == 1856:
+        coef = 1000
+    else:
+        raise ValueError(f"Unsupported latency histogram bins count: {columns}")
+
+    return [plat_idx_to_val_coarse(val, coarseness) / coef for val in range(columns)]
 
diff --git a/wally/suits/io/fio_job.py b/wally/suits/io/fio_job.py
index 6676895..9dffb49 100644
--- a/wally/suits/io/fio_job.py
+++ b/wally/suits/io/fio_job.py
@@ -1,13 +1,10 @@
 import copy
 from collections import OrderedDict
-from typing import Optional, Iterator, Union, Dict, Tuple, NamedTuple, Any, cast
+from typing import Optional, Iterator, Union, Dict, Tuple, Any, cast
 
 from cephlib.units import ssize2b, b2ssize
 
-from ..job import JobConfig, JobParams
-
-
-Var = NamedTuple('Var', [('name', str)])
+from ..job import JobConfig, JobParams, Var
 
 
 def is_fio_opt_true(vl: Union[str, int]) -> bool:
@@ -40,7 +37,7 @@
     @property
     def summary(self) -> str:
         """Test short summary, used mostly for file names and short image description"""
-        res = "{0[oper_short]}{0[sync_mode]}{0[bsize]}".format(self)
+        res = f"{self['oper_short']}{self['sync_mode']}{self['bsize']}"
         if self['qd'] is not None:
             res += "_qd" + str(self['qd'])
         if self['thcount'] not in (1, None):
@@ -52,13 +49,13 @@
     @property
     def long_summary(self) -> str:
         """Readable long summary for management and deployment engineers"""
-        res = "{0[oper]}, {0.sync_mode_long}, block size {1}B".format(self, b2ssize(self['bsize'] * 1024))
+        res = f"{self['oper']}, {self.sync_mode_long}, block size {b2ssize(self['bsize'] * 1024)}B"
         if self['qd'] is not None:
             res += ", QD = " + str(self['qd'])
         if self['thcount'] not in (1, None):
-            res += ", threads={0[thcount]}".format(self)
+            res += f", threads={self['thcount']}"
         if self['write_perc'] is not None:
-            res += ", write_perc={0[write_perc]}%".format(self)
+            res += f", write_perc={self['write_perc']}%"
         return res
 
     def copy(self, **kwargs: Dict[str, Any]) -> 'FioJobParams':
@@ -89,24 +86,24 @@
     def __init__(self, name: str, idx: int) -> None:
         JobConfig.__init__(self, idx)
         self.name = name
-        self._sync_mode = None  # type: Optional[str]
-        self._params = None  # type: Optional[Dict[str, Any]]
+        self._sync_mode: Optional[str] = None
+        self._params: Optional[Dict[str, Any]] = None
 
     # ------------- BASIC PROPERTIES -----------------------------------------------------------------------------------
 
     @property
     def write_perc(self) -> Optional[int]:
         try:
-            return int(self.vals["rwmixwrite"])
+            return int(self.vals["rwmixwrite"])  # type: ignore
         except (KeyError, TypeError):
             try:
-                return 100 - int(self.vals["rwmixread"])
+                return 100 - int(self.vals["rwmixread"])  # type: ignore
             except (KeyError, TypeError):
                 return None
 
     @property
     def qd(self) -> int:
-        return int(self.vals.get('iodepth', '1'))
+        return int(self.vals.get('iodepth', '1'))  # type: ignore
 
     @property
     def bsize(self) -> int:
@@ -117,7 +114,7 @@
     @property
     def oper(self) -> str:
         vl = self.vals['rw']
-        return vl if ':' not in vl else vl.split(":")[0]
+        return vl if ':' not in vl else vl.split(":")[0]  # type: ignore
 
     @property
     def op_type_short(self) -> str:
@@ -125,14 +122,14 @@
 
     @property
     def thcount(self) -> int:
-        return int(self.vals.get('numjobs', 1))
+        return int(self.vals.get('numjobs', 1))  # type: ignore
 
     @property
     def sync_mode(self) -> str:
         if self._sync_mode is None:
-            direct = is_fio_opt_true(self.vals.get('direct', '0')) or \
-                     not is_fio_opt_true(self.vals.get('buffered', '0'))
-            sync = is_fio_opt_true(self.vals.get('sync', '0'))
+            direct = is_fio_opt_true(self.vals.get('direct', '0'))  # type: ignore
+            direct = direct or not is_fio_opt_true(self.vals.get('buffered', '0'))  # type: ignore
+            sync = is_fio_opt_true(self.vals.get('sync', '0'))  # type: ignore
             self._sync_mode = self.ds2mode[(sync, direct)]
         return cast(str, self._sync_mode)
 
diff --git a/wally/suits/io/fio_task_parser.py b/wally/suits/io/fio_task_parser.py
index 5b91885..a9e13dc 100644
--- a/wally/suits/io/fio_task_parser.py
+++ b/wally/suits/io/fio_task_parser.py
@@ -1,9 +1,8 @@
 #!/usr/bin/env python3
 
 import re
-import os
 import sys
-import os.path
+import pathlib
 import argparse
 import itertools
 from typing import Optional, Iterator, Union, Dict, Iterable, List, Tuple, NamedTuple, Any
@@ -104,34 +103,30 @@
 def fio_config_parse(lexer_iter: Iterable[CfgLine]) -> Iterator[FioJobConfig]:
     in_globals = False
     curr_section = None
-    glob_vals = OrderedDict()  # type: Dict[str, Any]
+    glob_vals: Dict[str, Any] = OrderedDict()
     sections_count = 0
 
-    lexed_lines = list(lexer_iter)  # type: List[CfgLine]
+    lexed_lines: List[CfgLine] = list(lexer_iter)
     one_more = True
-    includes = {}
+    includes: Dict[str, Tuple[str, int]] = {}
 
     while one_more:
-        new_lines = []  # type: List[CfgLine]
+        new_lines: List[CfgLine] = []
         one_more = False
         for line in lexed_lines:
             fname, lineno, oline, tp, name, val = line
 
             if INCLUDE == tp:
-                if not os.path.exists(fname):
-                    dirname = '.'
-                else:
-                    dirname = os.path.dirname(fname)
-
-                new_fname = os.path.join(dirname, name)
-                includes[new_fname] = (fname, lineno)
+                fobj = pathlib.Path(fname)
+                new_fname = (fobj.parent / name) if fobj.exists() else pathlib.Path(name)
+                includes[str(new_fname)] = (fname, lineno)
 
                 try:
-                    cont = open(new_fname).read()
+                    cont = new_fname.read_text()
                 except IOError as err:
-                    raise ParseError("Error while including file {}: {}".format(new_fname, err), fname, lineno, oline)
+                    raise ParseError(f"Error while including file {new_fname}: {err}", fname, lineno, oline)
 
-                new_lines.extend(fio_config_lexer(cont, new_fname))
+                new_lines.extend(fio_config_lexer(cont, str(new_fname)))
                 one_more = True
             else:
                 new_lines.append(line)
@@ -161,7 +156,7 @@
             if in_globals:
                 glob_vals[name] = val
             elif name == name.upper():
-                raise ParseError("Param {!r} not in [global] section".format(name), fname, lineno, oline)
+                raise ParseError(f"Param {name!r} not in [global] section", fname, lineno, oline)
             elif curr_section is None:
                     raise ParseError("Data outside section", fname, lineno, oline)
             else:
@@ -172,7 +167,7 @@
 
 
 def process_cycles(sec: FioJobConfig) -> Iterator[FioJobConfig]:
-    cycles = OrderedDict()  # type: Dict[str, Any]
+    cycles: Dict[str, Any] = OrderedDict()
 
     for name, val in sec.vals.items():
         if isinstance(val, list) and name.upper() != name:
@@ -203,12 +198,12 @@
             yield new_sec
 
 
-FioParamsVal = Union[str, Var]
+FioParamsVal = Union[str, Var, int]
 FioParams = Dict[str, FioParamsVal]
 
 
 def apply_params(sec: FioJobConfig, params: FioParams) -> FioJobConfig:
-    processed_vals = OrderedDict()  # type: Dict[str, Any]
+    processed_vals: Dict[str, Any] = OrderedDict()
     processed_vals.update(params)
 
     for name, val in sec.vals.items():
@@ -251,8 +246,7 @@
     sec.vals['unified_rw_reporting'] = '1'
 
     if isinstance(sec.vals['size'], Var):
-        raise ValueError("Variable {0} isn't provided".format(
-            sec.vals['size'].name))
+        raise ValueError(f"Variable {sec.vals['size'].name} isn't provided")
 
     sz = ssize2b(sec.vals['size'])
     offset = sz * ((MAGIC_OFFSET * counter[0]) % 1.0)
@@ -266,7 +260,7 @@
 
     for vl in sec.vals.values():
         if isinstance(vl, Var):
-            raise ValueError("Variable {0} isn't provided".format(vl.name))
+            raise ValueError(f"Variable {vl.name} isn't provided")
 
     params = sec.vals.copy()
     params['UNIQ'] = 'UN{0}'.format(counter[0])
@@ -282,13 +276,11 @@
     return sec.vals.get('ramp_time', 0) + sec.vals.get('runtime', 0)
 
 
-def parse_all_in_1(source:str, fname: str = None) -> Iterator[FioJobConfig]:
+def parse_all_in_1(source:str, fname: str) -> Iterator[FioJobConfig]:
     return fio_config_parse(fio_config_lexer(source, fname))
 
 
 def get_log_files(sec: FioJobConfig, iops: bool = False) -> Iterator[Tuple[str, str, str]]:
-    res = []  # type: List[Tuple[str, str, str]]
-
     keys = [('write_bw_log', 'bw', 'KiBps'),
             ('write_hist_log', 'lat', 'us')]
     if iops:
@@ -304,8 +296,8 @@
     test_params = test_params.copy()
 
     if 'RAMPTIME' not in test_params and 'RUNTIME' in test_params:
-        ramp = int(int(test_params['RUNTIME']) * 0.05)
-        test_params['RAMPTIME'] = min(30, max(5, ramp))
+        ramp = int(int(test_params['RUNTIME']) * 0.05)  # type: ignore
+        test_params['RAMPTIME'] = min(30, max(5, ramp))  # type: ignore
 
     it = parse_all_in_1(source, fname)
     it = (apply_params(sec, test_params) for sec in it)
diff --git a/wally/suits/io/rrd_raw.cfg b/wally/suits/io/rrd_raw.cfg
index 2b0fc74..e7ca82d 100644
--- a/wally/suits/io/rrd_raw.cfg
+++ b/wally/suits/io/rrd_raw.cfg
@@ -1,6 +1,6 @@
 [test]
-blocksize=4k
-rw=randread
+blocksize=4m
+rw=write
 iodepth=1
 ramp_time=0
 runtime=120
@@ -17,5 +17,8 @@
 wait_for_previous=1
 per_job_logs=0
 randrepeat=0
-filename=/dev/sdb
-size=100G
\ No newline at end of file
+filename=/media/data/test.db
+size=50G
+;verify_pattern=0x00
+buffer_compress_percentage=99
+write_bw_log=/tmp/bw.non-compress.log
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index 26fd252..111e6bb 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -26,11 +26,12 @@
     retry_time = 30
     job_config_cls = None  # type: type
 
-    def __init__(self, storage: IWallyStorage, suite: SuiteConfig, on_idle: Callable[[], None] = None) -> None:
+    def __init__(self, storage: IWallyStorage, suite: SuiteConfig,
+                 on_tests_boundry: Optional[Callable[[bool], None]] = None) -> None:
         self.suite = suite
         self.stop_requested = False
         self.sorted_nodes_ids = sorted(node.node_id for node in self.suite.nodes)
-        self.on_idle = on_idle
+        self.on_tests_boundry = on_tests_boundry
         self.storage = storage
 
     def request_stop(self) -> None:
@@ -55,11 +56,11 @@
     # used_max_diff = max((min_run_time * max_rel_time_diff), max_time_diff)
     max_time_diff = 5
     max_rel_time_diff = 0.05
-    load_profile_name = None  # type: str
+    load_profile_name: str = None  # type: ignore
 
     def __init__(self, *args, **kwargs) -> None:
         PerfTest.__init__(self, *args, **kwargs)
-        self.job_configs = None  # type: List[JobConfig]
+        self.job_configs: List[JobConfig] = None  # type: ignore
 
     @abc.abstractmethod
     def get_expected_runtime(self, iter_cfg: JobConfig) -> Optional[int]:
@@ -107,13 +108,12 @@
 
             if None not in run_times:
                 # +10s - is a rough estimation for additional operations per iteration
-                expected_run_time = int(sum(run_times) + 10 * len(not_in_storage))
-
+                expected_run_time: int = int(sum(run_times) + 10 * len(not_in_storage))  # type: ignore
                 exec_time_s, end_dt_s = get_time_interval_printable_info(expected_run_time)
                 logger.info("Entire test should takes around %s and finish at %s", exec_time_s, end_dt_s)
 
             for job in not_in_storage:
-                results = []  # type: List[TimeSeries]
+                results: List[TimeSeries] = []
                 for retry_idx in range(self.max_retry):
                     logger.info("Preparing job %s", job.params.summary)
 
@@ -121,8 +121,14 @@
                     wait([pool.submit(self.prepare_iteration, node, job) for node in self.suite.nodes])
 
                     expected_job_time = self.get_expected_runtime(job)
-                    exec_time_s, end_dt_s = get_time_interval_printable_info(expected_job_time)
-                    logger.info("Job should takes around %s and finish at %s", exec_time_s, end_dt_s)
+                    if expected_job_time is None:
+                        logger.info("Job execution time is unknown")
+                    else:
+                        exec_time_s, end_dt_s = get_time_interval_printable_info(expected_job_time)
+                        logger.info("Job should takes around %s and finish at %s", exec_time_s, end_dt_s)
+
+                    if self.on_tests_boundry is not None:
+                        self.on_tests_boundry(True)
 
                     jfutures = [pool.submit(self.run_iteration, node, job) for node in self.suite.nodes]
                     failed = False
@@ -132,6 +138,9 @@
                         except EnvironmentError:
                             failed = True
 
+                    if self.on_tests_boundry is not None:
+                        self.on_tests_boundry(False)
+
                     if not failed:
                         break
 
@@ -145,8 +154,8 @@
                     results = []
 
                 # per node jobs start and stop times
-                start_times = []  # type: List[int]
-                stop_times = []  # type: List[int]
+                start_times: List[int] = []
+                stop_times: List[int] = []
 
                 for ts in results:
                     self.storage.put_ts(ts)
@@ -180,8 +189,6 @@
                 self.storage.put_job(self.suite, job)
                 self.storage.sync()
 
-                if self.on_idle is not None:
-                    self.on_idle()
 
     @abc.abstractmethod
     def config_node(self, node: IRPCNode) -> None:
diff --git a/wally/suits/job.py b/wally/suits/job.py
index 8ef1093..d336807 100644
--- a/wally/suits/job.py
+++ b/wally/suits/job.py
@@ -1,10 +1,13 @@
 import abc
-from typing import Dict, Any, Tuple, cast, Union
+from typing import Dict, Any, Tuple, cast, Union, NamedTuple
 from collections import OrderedDict
 
 from cephlib.istorage import Storable
 
 
+Var = NamedTuple('Var', [('name', str)])
+
+
 class JobParams(metaclass=abc.ABCMeta):
     """Class contains all job parameters, which significantly affects job results.
     Like block size or operation type, but not file name or file size.
@@ -41,12 +44,12 @@
 
     def __eq__(self, o: object) -> bool:
         if not isinstance(o, self.__class__):
-            raise TypeError("Can't compare {!r} to {!r}".format(self.__class__.__qualname__, type(o).__qualname__))
+            raise TypeError(f"Can't compare {self.__class__.__qualname__!r} to {type(o).__qualname__!r}")
         return sorted(self.params.items()) == sorted(cast(JobParams, o).params.items())
 
     def __lt__(self, o: object) -> bool:
         if not isinstance(o, self.__class__):
-            raise TypeError("Can't compare {!r} to {!r}".format(self.__class__.__qualname__, type(o).__qualname__))
+            raise TypeError(f"Can't compare {self.__class__.__qualname__!r} to {type(o).__qualname__!r}")
         return self.char_tpl < cast(JobParams, o).char_tpl
 
     @property
@@ -63,11 +66,10 @@
         self.idx = idx
 
         # time interval, in seconds, when test was running on all nodes
-        self.reliable_info_range = None  # type: Tuple[int, int]
-
+        self.reliable_info_range: Tuple[int, int] = None  # type: ignore
 
         # all job parameters, both from suite file and config file
-        self.vals = OrderedDict()  # type: Dict[str, Any]
+        self.vals: Dict[str, Any] = OrderedDict()
 
     @property
     def reliable_info_range_s(self) -> Tuple[int, int]:
@@ -76,7 +78,7 @@
     @property
     def storage_id(self) -> str:
         """unique string, used as key in storage"""
-        return "{}_{}".format(self.summary, self.idx)
+        return f"{self.summary}_{self.idx}"
 
     @property
     @abc.abstractmethod