v2 is coming
diff --git a/wally/suits/io/fio.py b/wally/suits/io/fio.py
index c650069..2a76fc2 100644
--- a/wally/suits/io/fio.py
+++ b/wally/suits/io/fio.py
@@ -15,13 +15,14 @@
 from concurrent.futures import ThreadPoolExecutor
 
 from wally.pretty_yaml import dumps
-from wally.statistic import round_3_digit, data_property
+from wally.statistic import round_3_digit, data_property, average
 from wally.utils import ssize2b, sec_to_str, StopTestError, Barrier, get_os
 from wally.ssh_utils import (save_to_remote, read_from_remote, BGSSHTask, reconnect)
 
 from .fio_task_parser import (execution_time, fio_cfg_compile,
-                              get_test_summary, get_test_sync_mode)
-from ..itest import TimeSeriesValue, PerfTest, TestResults, run_on_node
+                              get_test_summary, get_test_sync_mode, FioJobSection)
+from ..itest import (TimeSeriesValue, PerfTest, TestResults,
+                     run_on_node, TestConfig, MeasurementMatrix)
 
 logger = logging.getLogger("wally")
 
@@ -52,7 +53,12 @@
 def load_fio_log_file(fname):
     with open(fname) as fd:
         it = [ln.split(',')[:2] for ln in fd]
-    vals = [(float(off) / 1000, float(val.strip())) for off, val in it]
+
+    vals = [(float(off) / 1000,  # convert us to ms
+             float(val.strip()) + 0.5)  # add 0.5 to compensate average value
+                                        # as fio trims all values in log to integer
+            for off, val in it]
+
     return TimeSeriesValue(vals)
 
 
@@ -63,9 +69,9 @@
     fn = os.path.join(folder, str(run_num) + '_params.yaml')
     params = yaml.load(open(fn).read())
 
-    conn_ids = set()
+    conn_ids_set = set()
+    rr = r"{0}_(?P<conn_id>.*?)_(?P<type>[^_.]*)\.\d+\.log$".format(run_num)
     for fname in os.listdir(folder):
-        rr = r"{0}_(?P<conn_id>.*?)_(?P<type>[^_.]*)\.\d+\.log$".format(run_num)
         rm = re.match(rr, fname)
         if rm is None:
             continue
@@ -77,16 +83,29 @@
         if ftype not in ('iops', 'bw', 'lat'):
             continue
 
-        try:
-            ts = load_fio_log_file(os.path.join(folder, fname))
-            if ftype in res:
-                assert conn_id not in res[ftype]
+        ts = load_fio_log_file(os.path.join(folder, fname))
+        res.setdefault(ftype, {}).setdefault(conn_id, []).append(ts)
 
-            res.setdefault(ftype, {})[conn_id] = ts
-        except AssertionError:
-            pass
+        conn_ids_set.add(conn_id)
 
-        conn_ids.add(conn_id)
+    mm_res = {}
+
+    for key, data in res.items():
+        conn_ids = sorted(conn_ids_set)
+        matr = [data[conn_id] for conn_id in conn_ids]
+
+        mm_res[key] = MeasurementMatrix(matr, conn_ids)
+
+    iops_from_lat_matr = []
+    for node_ts in mm_res['lat'].data:
+        iops_from_lat_matr.append([])
+        for thread_ts in node_ts:
+            ndt = [(start + ln, 1000000. / val)
+                   for (start, ln, val) in thread_ts.data]
+            new_ts = TimeSeriesValue(ndt)
+            iops_from_lat_matr[-1].append(new_ts)
+
+    mm_res['iops_from_lat'] = MeasurementMatrix(iops_from_lat_matr, conn_ids)
 
     raw_res = {}
     for conn_id in conn_ids:
@@ -96,7 +115,11 @@
         fc = "{" + open(fn).read().split('{', 1)[1]
         raw_res[conn_id] = json.loads(fc)
 
-    return cls(params, res, raw_res)
+    fio_task = FioJobSection(params['name'])
+    fio_task.vals.update(params['vals'])
+
+    config = TestConfig('io', params, None, params['nodes'], folder, None)
+    return cls(config, fio_task, mm_res, raw_res, params['intervals'])
 
 
 class Attrmapper(object):
@@ -157,25 +180,12 @@
     return perc_50 / 1000., perc_95 / 1000.
 
 
-def prepare(ramp_time, data, avg_interval):
-    if data is None:
-        return data
-
-    res = {}
-    for key, ts_data in data.items():
-        if ramp_time > 0:
-            ts_data = ts_data.skip(ramp_time)
-
-        res[key] = ts_data.derived(avg_interval)
-    return res
-
-
 class IOTestResult(TestResults):
     """
     Fio run results
     config: TestConfig
     fio_task: FioJobSection
-    ts_results: {str: TimeSeriesValue}
+    ts_results: {str: MeasurementMatrix[TimeSeriesValue]}
     raw_result: ????
     run_interval:(float, float) - test tun time, used for sensors
     """
@@ -184,11 +194,11 @@
         self.name = fio_task.name.split("_")[0]
         self.fio_task = fio_task
 
-        ramp_time = fio_task.vals.get('ramp_time', 0)
+        self.bw = ts_results.get('bw')
+        self.lat = ts_results.get('lat')
+        self.iops = ts_results.get('iops')
+        self.iops_from_lat = ts_results.get('iops_from_lat')
 
-        self.bw = prepare(ramp_time, ts_results.get('bw'), 1.0)
-        self.lat = prepare(ramp_time, ts_results.get('lat'), 1.0)
-        self.iops = prepare(ramp_time, ts_results.get('iops'), 1.0)
         # self.slat = drop_warmup(res.get('clat', None), self.params)
         # self.clat = drop_warmup(res.get('slat', None), self.params)
 
@@ -198,6 +208,26 @@
         self._pinfo = None
         TestResults.__init__(self, config, res, raw_result, run_interval)
 
+    def get_params_from_fio_report(self):
+        nodes = self.bw.connections_ids
+
+        iops = [self.raw_result[node]['jobs'][0]['mixed']['iops'] for node in nodes]
+        total_ios = [self.raw_result[node]['jobs'][0]['mixed']['total_ios'] for node in nodes]
+        runtime = [self.raw_result[node]['jobs'][0]['mixed']['runtime'] / 1000 for node in nodes]
+        flt_iops = [float(ios) / rtime for ios, rtime in zip(total_ios, runtime)]
+
+        bw = [self.raw_result[node]['jobs'][0]['mixed']['bw'] for node in nodes]
+        total_bytes = [self.raw_result[node]['jobs'][0]['mixed']['io_bytes'] for node in nodes]
+        flt_bw = [float(tbytes) / rtime for tbytes, rtime in zip(total_bytes, runtime)]
+
+        lat = [self.raw_result[node]['jobs'][0]['mixed']['lat'] for node in nodes]
+
+        return {'iops': iops,
+                'flt_iops': flt_iops,
+                'bw': bw,
+                'flt_bw': flt_bw,
+                'lat': lat}
+
     def summary(self):
         return get_test_summary(self.fio_task) + "vm" \
                + str(len(self.config.nodes))
@@ -205,8 +235,8 @@
     def get_yamable(self):
         return self.summary()
 
-    @property
-    def disk_perf_info(self):
+    def disk_perf_info(self, avg_interval=5.0):
+
         if self._pinfo is not None:
             return self._pinfo
 
@@ -228,22 +258,82 @@
         for k, v in lat_mks.items():
             lat_mks[k] = float(v) / num_res
 
-        testnodes_count = len(self.fio_raw_res)
+        testnodes_count = len(self.config.nodes)
 
         pinfo = DiskPerfInfo(self.name,
                              self.summary(),
                              self.params,
                              testnodes_count)
 
-        pinfo.raw_bw = [res.vals() for res in self.bw.values()]
-        pinfo.raw_iops = [res.vals() for res in self.iops.values()]
-        pinfo.raw_lat = [res.vals() for res in self.lat.values()]
+        # ramp_time = self.fio_task.vals.get('ramp_time', 0)
 
-        pinfo.bw = data_property(map(sum, zip(*pinfo.raw_bw)))
-        pinfo.iops = data_property(map(sum, zip(*pinfo.raw_iops)))
-        pinfo.lat = data_property(sum(pinfo.raw_lat, []))
+        def prepare(data):
+            if data is None:
+                return data
+
+            res = []
+            for ts_data in data:
+                # if ramp_time > 0:
+                    # ts_data = ts_data.skip(ramp_time)
+
+                if ts_data.average_interval() < avg_interval:
+                    ts_data = ts_data.derived(avg_interval)
+
+                res.append(ts_data.values)
+            return res
+
+        def agg_data(matr):
+            arr = sum(matr, [])
+            min_len = min(map(len, arr))
+            res = []
+            for idx in range(min_len):
+                res.append(sum(dt[idx] for dt in arr))
+            return res
+
+        pinfo.raw_lat = map(prepare, self.lat.per_vm())
+        num_th = sum(map(len, pinfo.raw_lat))
+        avg_lat = [val / num_th for val in agg_data(pinfo.raw_lat)]
+        pinfo.lat = data_property(avg_lat)
         pinfo.lat_50, pinfo.lat_95 = get_lat_perc_50_95(lat_mks)
 
+        pinfo.raw_bw = map(prepare, self.bw.per_vm())
+        pinfo.raw_iops = map(prepare, self.iops.per_vm())
+
+        iops_per_th = sum(sum(pinfo.raw_iops, []), [])
+
+        fparams = self.get_params_from_fio_report()
+        fio_report_bw = sum(fparams['flt_bw'])
+        fio_report_iops = sum(fparams['flt_iops'])
+
+        agg_bw = agg_data(pinfo.raw_bw)
+        agg_iops = agg_data(pinfo.raw_iops)
+
+        log_bw_avg = average(agg_bw)
+        log_iops_avg = average(agg_iops)
+
+        # update values to match average from fio report
+        coef_iops = fio_report_iops / float(log_iops_avg)
+        coef_bw = fio_report_bw / float(log_bw_avg)
+
+        bw_log = data_property([val * coef_bw for val in agg_bw])
+        iops_log = data_property([val * coef_iops for val in agg_iops])
+
+        bw_report = data_property([fio_report_bw])
+        iops_report = data_property([fio_report_iops])
+
+        # When IOPS/BW per thread is too low
+        # data from logs is rounded to match
+        if average(iops_per_th) > 10:
+            pinfo.bw = bw_log
+            pinfo.iops = iops_log
+            pinfo.bw2 = bw_report
+            pinfo.iops2 = iops_report
+        else:
+            pinfo.bw = bw_report
+            pinfo.iops = iops_report
+            pinfo.bw2 = bw_log
+            pinfo.iops2 = iops_log
+
         self._pinfo = pinfo
 
         return pinfo
@@ -280,6 +370,9 @@
         self.err_out_file = self.join_remote("fio_err_out")
         self.exit_code_file = self.join_remote("exit_code")
 
+        self.max_latency = get("max_lat", None)
+        self.min_bw_per_thread = get("max_bw", None)
+
         self.use_sudo = get("use_sudo", True)
         self.test_logging = get("test_logging", False)
 
@@ -365,7 +458,6 @@
             raise OSError("Can't install - " + str(err))
 
     def pre_run(self):
-        prefill = False
         prefill = self.config.options.get('prefill_files', True)
 
         if prefill:
@@ -433,9 +525,21 @@
         barrier = Barrier(len(self.config.nodes))
         results = []
 
+        # set of OperationModeBlockSize str
+        # which should not be tested anymore, as
+        # they are already too slow with previous thread count
+        lat_bw_limit_reached = set()
+
         with ThreadPoolExecutor(len(self.config.nodes)) as pool:
+            self.fio_configs.sort(key=lambda x: int(x.vals['numjobs']))
+
             for pos, fio_cfg in enumerate(self.fio_configs):
-                logger.info("Will run {0} test".format(fio_cfg.name))
+                test_descr = get_test_summary(fio_cfg.vals).split("th")[0]
+                if test_descr in lat_bw_limit_reached:
+                    logger.info("Will skip {0} test due to lat/bw limits".format(fio_cfg.name))
+                    continue
+                else:
+                    logger.info("Will run {0} test".format(fio_cfg.name))
 
                 templ = "Test should takes about {0}." + \
                         " Should finish at {1}," + \
@@ -484,8 +588,18 @@
                 with open(os.path.join(self.config.log_directory, fname), "w") as fd:
                     fd.write(dumps(params))
 
-                res = load_test_results(self.config.log_directory, pos)
+                res = load_test_results(IOTestResult, self.config.log_directory, pos)
                 results.append(res)
+
+                test_res = res.get_params_from_fio_report()
+                if self.max_latency is not None:
+                    if self.max_latency < average(test_res['lat']):
+                        lat_bw_limit_reached.add(test_descr)
+
+                if self.min_bw_per_thread is not None:
+                    if self.min_bw_per_thread > average(test_res['bw']):
+                        lat_bw_limit_reached.add(test_descr)
+
         return results
 
     def do_run(self, node, barrier, fio_cfg, pos, nolog=False):
@@ -630,7 +744,7 @@
         return begin, end
 
     @classmethod
-    def format_for_console(cls, data, dinfo):
+    def format_for_console(cls, results):
         """
         create a table with io performance report
         for console
@@ -650,32 +764,29 @@
 
             return (data.name.rsplit("_", 1)[0],
                     p['rw'],
-                    get_test_sync_mode(data.params),
+                    get_test_sync_mode(data.params['vals']),
                     ssize2b(p['blocksize']),
-                    int(th_count) * data.testnodes_count)
+                    int(th_count) * len(data.config.nodes))
 
         tab = texttable.Texttable(max_width=120)
         tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
-        tab.set_cols_align(["l", "l", "r", "r", "r", "r", "r", "r", "r"])
 
-        items = sorted(dinfo.values(), key=key_func)
-
-        prev_k = None
         header = ["Name", "Description", "iops\ncum", "KiBps\ncum",
                   "Cnf\n95%", "Dev%", "iops\nper vm", "KiBps\nper vm", "lat\nms"]
+        tab.set_cols_align(["l", "l"] + ['r'] * (len(header) - 2))
+        sep = ["-------", "-----------"] + ["---"] * (len(header) - 2)
+        tab.header(header)
 
-        for data in items:
-            curr_k = key_func(data)[:4]
-
+        prev_k = None
+        for item in sorted(results, key=key_func):
+            curr_k = key_func(item)[:4]
             if prev_k is not None:
                 if prev_k != curr_k:
-                    tab.add_row(
-                        ["-------", "-----------", "-----", "------",
-                         "---", "----", "------", "---", "-----"])
+                    tab.add_row(sep)
 
             prev_k = curr_k
 
-            test_dinfo = dinfo[(data.name, data.summary)]
+            test_dinfo = item.disk_perf_info()
 
             iops, _ = test_dinfo.iops.rounded_average_conf()
 
@@ -687,18 +798,23 @@
             lat, _ = test_dinfo.lat.rounded_average_conf()
             lat = round_3_digit(int(lat) // 1000)
 
-            iops_per_vm = round_3_digit(iops / data.testnodes_count)
-            bw_per_vm = round_3_digit(bw / data.testnodes_count)
+            testnodes_count = len(item.config.nodes)
+            iops_per_vm = round_3_digit(iops / testnodes_count)
+            bw_per_vm = round_3_digit(bw / testnodes_count)
 
             iops = round_3_digit(iops)
+            # iops_from_lat = round_3_digit(iops_from_lat)
             bw = round_3_digit(bw)
 
-            params = (data.name.rsplit('_', 1)[0],
-                      data.summary, int(iops), int(bw), str(conf_perc),
+            params = (item.name.rsplit('_', 1)[0],
+                      item.summary(),
+                      int(iops),
+                      int(bw),
+                      str(conf_perc),
                       str(dev_perc),
-                      int(iops_per_vm), int(bw_per_vm), lat)
+                      int(iops_per_vm),
+                      int(bw_per_vm),
+                      lat)
             tab.add_row(params)
 
-        tab.header(header)
-
         return tab.draw()