Add ceph report, move SensorDatastore to wally/timeseries.py, sum per-VM fio results, misc fixes
diff --git a/wally/report.py b/wally/report.py
index aa7a4b0..260c031 100644
--- a/wally/report.py
+++ b/wally/report.py
@@ -4,7 +4,7 @@
 
 import wally
 from wally import charts
-from wally.utils import parse_creds
+from wally.utils import parse_creds, ssize_to_b
 from wally.suits.io.results_loader import process_disk_info
 from wally.meta_info import total_lab_info, collect_lab_data
 
@@ -12,10 +12,19 @@
 logger = logging.getLogger("wally.report")
 
 
-def render_html(dest, info, lab_description):
+def render_hdd_html(dest, info, lab_description):
     very_root_dir = os.path.dirname(os.path.dirname(wally.__file__))
     templ_dir = os.path.join(very_root_dir, 'report_templates')
-    templ_file = os.path.join(templ_dir, "report.html")
+    templ_file = os.path.join(templ_dir, "report_hdd.html")
+    templ = open(templ_file, 'r').read()
+    report = templ.format(lab_info=lab_description, **info.__dict__)
+    open(dest, 'w').write(report)
+
+
+def render_ceph_html(dest, info, lab_description):
+    very_root_dir = os.path.dirname(os.path.dirname(wally.__file__))
+    templ_dir = os.path.join(very_root_dir, 'report_templates')
+    templ_file = os.path.join(templ_dir, "report_ceph.html")
     templ = open(templ_file, 'r').read()
     report = templ.format(lab_info=lab_description, **info.__dict__)
     open(dest, 'w').write(report)
@@ -43,31 +52,59 @@
                                     lines=[
                                         (latv, "msec", "rr", "lat"),
                                         (iops_or_bw_per_vm, None, None,
-                                            "IOPS per thread")
+                                         legend[0] + " per thread")
                                     ])
     return str(ch)
 
 
-def make_plots(processed_results, path):
-    name_filters = [
-        ('hdd_test_rrd4k', 'rand_read_4k', 'Random read 4k sync IOPS'),
+def make_hdd_plots(processed_results, path):
+    plots = [
+        ('hdd_test_rrd4k', 'rand_read_4k', 'Random read 4k direct IOPS'),
         ('hdd_test_rws4k', 'rand_write_4k', 'Random write 4k sync IOPS')
     ]
+    make_plots(processed_results, path, plots)
 
-    for name_pref, fname, desc in name_filters:
+
+def make_ceph_plots(processed_results, path):
+    plots = [
+        ('ceph_test_rrd4k', 'rand_read_4k', 'Random read 4k direct IOPS'),
+        ('ceph_test_rws4k', 'rand_write_4k', 'Random write 4k sync IOPS'),
+        ('ceph_test_rrd16m', 'rand_read_16m', 'Random read 16m direct MiBps'),
+        ('ceph_test_swd1m', 'seq_write_1m',
+            'Sequential write 1m direct MiBps'),
+    ]
+    make_plots(processed_results, path, plots)
+
+
+def make_plots(processed_results, path, plots):
+    for name_pref, fname, desc in plots:
         chart_data = []
+
         for res in processed_results.values():
             if res.name.startswith(name_pref):
                 chart_data.append(res)
 
+        if len(chart_data) == 0:
+            raise ValueError("Can't find any data for " + name_pref)
+
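+        # block sizes above 16 KiB are charted as bandwidth, smaller as IOPS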
+        use_bw = ssize_to_b(chart_data[0].raw['blocksize']) > 16 * 1024
+
         chart_data.sort(key=lambda x: x.raw['concurence'])
 
         lat = [x.lat for x in chart_data]
         concurence = [x.raw['concurence'] for x in chart_data]
-        iops = [x.iops for x in chart_data]
-        iops_dev = [x.iops * x.dev for x in chart_data]
 
-        io_chart(desc, concurence, lat, iops, iops_dev, 'bw', fname)
+        if use_bw:
+            data = [x.bw for x in chart_data]
+            data_dev = [x.bw * x.dev for x in chart_data]
+            name = "BW"
+        else:
+            data = [x.iops for x in chart_data]
+            data_dev = [x.iops * x.dev for x in chart_data]
+            name = "IOPS"
+
+        io_chart(desc, concurence, lat, data, data_dev, name, fname)
 
 
 class DiskInfo(object):
@@ -104,6 +140,11 @@
                 di.bw_write_max = max(di.bw_write_max, res.bw)
             elif res.raw['rw'] == 'read':
                 di.bw_read_max = max(di.bw_read_max, res.bw)
+        elif res.raw['sync_mode'] == 'd' and res.raw['blocksize'] == '16m':
+            if res.raw['rw'] == 'write' or res.raw['rw'] == 'randwrite':
+                di.bw_write_max = max(di.bw_write_max, res.bw)
+            elif res.raw['rw'] == 'read' or res.raw['rw'] == 'randread':
+                di.bw_read_max = max(di.bw_read_max, res.bw)
 
     di.bw_write_max /= 1000
     di.bw_read_max /= 1000
@@ -161,7 +202,14 @@
 def make_hdd_report(processed_results, path, lab_info):
-    make_plots(processed_results, path)
+    make_hdd_plots(processed_results, path)
     di = get_disk_info(processed_results)
-    render_html(path, di, lab_info)
+    render_hdd_html(path, di, lab_info)
+
+
+@report('Ceph', 'ceph_test')
+def make_ceph_report(processed_results, path, lab_info):
+    make_ceph_plots(processed_results, path)
+    di = get_disk_info(processed_results)
+    render_ceph_html(path, di, lab_info)
 
 
 def make_io_report(results, path, lab_url=None, creds=None):
@@ -183,20 +231,19 @@
     try:
         processed_results = process_disk_info(results)
         res_fields = sorted(processed_results.keys())
-
         for fields, name, func in report_funcs:
             for field in fields:
                 pos = bisect.bisect_left(res_fields, field)
 
                 if pos == len(res_fields):
-                    continue
+                    break
 
                 if not res_fields[pos + 1].startswith(field):
                     break
             else:
                 hpath = path.format(name)
+                logger.debug("Generating report " + name + " into " + hpath)
                 func(processed_results, hpath, lab_info)
-                logger.debug(name + " report generated into " + hpath)
                 break
         else:
             logger.warning("No report generator found for this load")
diff --git a/wally/run_test.py b/wally/run_test.py
index 9cea103..fbe676f 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -17,11 +17,13 @@
 from concurrent.futures import ThreadPoolExecutor
 
 from wally import pretty_yaml
+from wally.timeseries import SensorDatastore
 from wally.discover import discover, Node, undiscover
 from wally import utils, report, ssh_utils, start_vms
 from wally.suits.itest import IOPerfTest, PgBenchTest
+from wally.sensors_utils import deploy_sensors_stage
 from wally.config import cfg_dict, load_config, setup_loggers
-from wally.sensors_utils import deploy_sensors_stage, SensorDatastore
+
 
 try:
     from wally import webui
@@ -197,7 +199,9 @@
         #     logger.warning("Some test threads still running")
 
         gather_results(res_q, results)
-        yield name, test.merge_results(results)
+        result = test.merge_results(results)
+        result['__test_meta__'] = {'testnodes_count': len(test_nodes)}
+        yield name, result
 
 
 def log_nodes_statistic(_, ctx):
@@ -523,6 +527,8 @@
                         default=False)
     parser.add_argument("-r", '--no-html-report', action='store_true',
                         help="Skip html report", default=False)
+    parser.add_argument("--params", nargs="*", metavar="testname.paramname",
+                        help="Test params", default=[])
     parser.add_argument("config_file")
 
     return parser.parse_args(argv[1:])
diff --git a/wally/sensors_utils.py b/wally/sensors_utils.py
index e5b4347..1e9e898 100644
--- a/wally/sensors_utils.py
+++ b/wally/sensors_utils.py
@@ -1,5 +1,4 @@
 import time
-import array
 import Queue
 import logging
 import threading
@@ -11,92 +10,14 @@
                                SensorConfig,
                                stop_and_remove_sensors)
 
-
 logger = logging.getLogger("wally.sensors")
 DEFAULT_RECEIVER_URL = "udp://{ip}:5699"
 
 
-class SensorDatastore(object):
-    def __init__(self, stime=None):
-        self.lock = threading.Lock()
-        self.stime = stime
-
-        self.min_size = 60 * 60
-        self.max_size = 60 * 61
-
-        self.data = {
-            'testnodes:io': array.array("B"),
-            'testnodes:cpu': array.array("B"),
-        }
-
-    def get_values(self, name, start, end):
-        assert end >= start
-        if end == start:
-            return []
-
-        with self.lock:
-            curr_arr = self.data[name]
-            if self.stime is None:
-                return []
-
-            sidx = start - self.stime
-            eidx = end - self.stime
-
-            if sidx < 0 and eidx < 0:
-                return [0] * (end - start)
-            elif sidx < 0:
-                return [0] * (-sidx) + curr_arr[:eidx]
-            return curr_arr[sidx:eidx]
-
-    def set_values(self, start_time, vals):
-        with self.lock:
-            return self.add_values_l(start_time, vals)
-
-    def set_values_l(self, start_time, vals):
-        max_cut = 0
-        for name, values in vals.items():
-            curr_arr = self.data.setdefault(name, array.array("H"))
-
-            if self.stime is None:
-                self.stime = start_time
-
-            curr_end_time = len(curr_arr) + self.stime
-
-            if curr_end_time < start_time:
-                curr_arr.extend([0] * (start_time - curr_end_time))
-                curr_arr.extend(values)
-            elif curr_end_time > start_time:
-                logger.warning("Duplicated sensors data")
-                sindex = len(curr_arr) + (start_time - curr_end_time)
-
-                if sindex < 0:
-                    values = values[-sindex:]
-                    sindex = 0
-                    logger.warning("Some data with timestamp before"
-                                   " beginning of measurememts. Skip them")
-
-                curr_arr[sindex:sindex + len(values)] = values
-            else:
-                curr_arr.extend(values)
-
-            if len(curr_arr) > self.max_size:
-                max_cut = max(len(curr_arr) - self.min_size, max_cut)
-
-        if max_cut > 0:
-            self.start_time += max_cut
-            for val in vals.values():
-                del val[:max_cut]
-
-
 def save_sensors_data(data_q, mon_q, fd, data_store, source2roles_map):
     fd.write("\n")
 
-    BUFFER = 3
     observed_nodes = set()
-    testnodes_data = {
-        'io': {},
-        'cpu': {},
-    }
 
     try:
         while True:
@@ -114,23 +35,12 @@
 
             source_id = data.pop('source_id')
             rep_time = data.pop('time')
+
             if 'testnode' in source2roles_map.get(source_id, []):
-                vl = testnodes_data['io'].get(rep_time, 0)
-                sum_io_q = vl
-                testnodes_data['io'][rep_time] = sum_io_q
-
-            etime = time.time() - BUFFER
-            for name, vals in testnodes_data.items():
-                new_vals = {}
-                for rtime, value in vals.items():
-                    if rtime < etime:
-                        data_store.set_values("testnodes:io", rtime, [value])
-                    else:
-                        new_vals[rtime] = value
-
-                vals.clear()
-                vals.update(new_vals)
-
+                sum_io_q = 0
+                data_store.update_values(rep_time,
+                                         {"testnodes:io": sum_io_q},
+                                         add=True)
     except Exception:
         logger.exception("Error in sensors thread")
     logger.info("Sensors thread exits")
diff --git a/wally/suits/io/ceph.cfg b/wally/suits/io/ceph.cfg
index 6abaae5..085153f 100644
--- a/wally/suits/io/ceph.cfg
+++ b/wally/suits/io/ceph.cfg
@@ -25,7 +25,7 @@
 numjobs={NUMJOBS}
 
 # ---------------------------------------------------------------------
-# check different thread count, sync mode. (latency, iops) = func(th_count)
+# direct write
 # ---------------------------------------------------------------------
 [ceph_test_{TEST_SUMM} * {NUM_ROUNDS}]
 blocksize=4k
diff --git a/wally/suits/io/formatter.py b/wally/suits/io/formatter.py
index a1da1c3..63c9408 100644
--- a/wally/suits/io/formatter.py
+++ b/wally/suits/io/formatter.py
@@ -24,9 +24,11 @@
     tab.set_cols_align(["l", "r", "r", "r", "r"])
 
     prev_k = None
+
     items = sorted(test_set['res'].items(), key=key_func)
 
     for test_name, data in items:
+
         curr_k = key_func((test_name, data))[:3]
 
         if prev_k is not None:
diff --git a/wally/suits/io/results_loader.py b/wally/suits/io/results_loader.py
index c67dbe8..3b50bf7 100644
--- a/wally/suits/io/results_loader.py
+++ b/wally/suits/io/results_loader.py
@@ -3,7 +3,7 @@
 import collections
 
 
-from wally.utils import ssize_to_b
+# from wally.utils import ssize_to_b
 from wally.statistic import med_dev
 
 PerfInfo = collections.namedtuple('PerfInfo',
@@ -12,16 +12,30 @@
                                    'lat', 'lat_dev', 'raw'))
 
 
+def split_and_add(data, block_count):
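+    # sum each consecutive group of block_count values into one element
+    # (merges the per-VM samples for a given moment into a single total)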
+    assert len(data) % block_count == 0
+    res = [0] * (len(data) // block_count)
+
+    for i in range(block_count):
+        for idx, val in enumerate(data[i::block_count]):
+            res[idx] += val
+
+    return res
+
+
 def process_disk_info(test_output):
     data = {}
-
     for tp, pre_result in test_output:
         if tp != 'io' or pre_result is None:
-            pass
+            continue
 
+        vm_count = pre_result['__test_meta__']['testnodes_count']
+
         for name, results in pre_result['res'].items():
-            bw, bw_dev = med_dev(results['bw'])
-            iops, iops_dev = med_dev(results['iops'])
+            bw, bw_dev = med_dev(split_and_add(results['bw'], vm_count))
+            iops, iops_dev = med_dev(split_and_add(results['iops'], vm_count))
             lat, lat_dev = med_dev(results['lat'])
             dev = bw_dev / float(bw)
             data[name] = PerfInfo(name, bw, iops, dev, lat, lat_dev, results)
@@ -82,19 +94,19 @@
     return closure
 
 
-def load_data(raw_data):
-    data = list(parse_output(raw_data))[0]
+# def load_data(raw_data):
+#     data = list(parse_output(raw_data))[0]
 
-    for key, val in data['res'].items():
-        val['blocksize_b'] = ssize_to_b(val['blocksize'])
+#     for key, val in data['res'].items():
+#         val['blocksize_b'] = ssize_to_b(val['blocksize'])
 
-        val['iops_mediana'], val['iops_stddev'] = med_dev(val['iops'])
-        val['bw_mediana'], val['bw_stddev'] = med_dev(val['bw'])
-        val['lat_mediana'], val['lat_stddev'] = med_dev(val['lat'])
-        yield val
+#         val['iops_mediana'], val['iops_stddev'] = med_dev(val['iops'])
+#         val['bw_mediana'], val['bw_stddev'] = med_dev(val['bw'])
+#         val['lat_mediana'], val['lat_stddev'] = med_dev(val['lat'])
+#         yield val
 
 
-def load_files(*fnames):
-    for fname in fnames:
-        for i in load_data(open(fname).read()):
-            yield i
+# def load_files(*fnames):
+#     for fname in fnames:
+#         for i in load_data(open(fname).read()):
+#             yield i
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index 4b9db19..605df2c 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -301,13 +301,16 @@
                     try:
                         pid = read_from_remote(sftp, self.pid_file)
                         is_running = True
-                    except (NameError, IOError) as exc:
+                    except (NameError, IOError, OSError) as exc:
                         pid = None
                         is_running = False
 
                     if is_running:
                         if not self.check_process_is_running(sftp, pid):
-                            sftp.remove(self.pid_file)
+                            try:
+                                sftp.remove(self.pid_file)
+                            except (IOError, NameError, OSError):
+                                pass
                             is_running = False
 
             is_connected = True
@@ -370,6 +373,10 @@
                 for test in self.fio_configs:
                     exec_time += io_agent.calculate_execution_time(test)
 
+                # add 5% as a rough estimate of overhead from auxiliary
+                # operations (sftp, etc)
+                exec_time = int(exec_time * 1.05)
+
                 exec_time_s = sec_to_str(exec_time)
                 now_dt = datetime.datetime.now()
                 end_dt = now_dt + datetime.timedelta(0, exec_time)
@@ -444,6 +451,7 @@
         exec_time_str = sec_to_str(exec_time)
 
         timeout = int(exec_time + max(300, exec_time))
+        soft_tout = exec_time
         barrier.wait()
         self.run_over_ssh(cmd, nolog=nolog)
 
@@ -467,7 +475,7 @@
         if self.node.connection is not Local:
             self.node.connection.close()
 
-        self.wait_till_finished(timeout)
+        self.wait_till_finished(soft_tout, timeout)
         if not nolog:
             logger.debug("Test on node {0} is finished".format(conn_id))
 
diff --git a/wally/timeseries.py b/wally/timeseries.py
new file mode 100644
index 0000000..d322fff
--- /dev/null
+++ b/wally/timeseries.py
@@ -0,0 +1,63 @@
+import array
+import threading
+
+
+class SensorDatastore(object):
+    def __init__(self, stime=None):
+        self.lock = threading.Lock()
+        self.stime = stime
+
+        self.min_size = 60 * 60
+        self.max_size = 60 * 61
+
+        self.data = {
+            'testnodes:io': array.array("B"),
+            'testnodes:cpu': array.array("B"),
+        }
+
+    def get_values(self, name, start, end):
+        assert end >= start
+
+        if end == start:
+            return []
+
+        with self.lock:
+            curr_arr = self.data[name]
+            if self.stime is None:
+                return []
+
+            sidx = start - self.stime
+            eidx = end - self.stime
+
+            if sidx < 0 and eidx < 0:
+                return [0] * (end - start)
+            elif sidx < 0:
+                return [0] * (-sidx) + list(curr_arr[:eidx])
+            return curr_arr[sidx:eidx]
+
+    def update_values(self, data_time, vals, add=False):
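+        # append the sample if data_time is past the end of the stored
+        # series; otherwise overwrite (add=False) or accumulate (add=True)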
+        with self.lock:
+            if self.stime is None:
+                self.stime = data_time
+
+            for name, value in vals.items():
+                curr_arr = self.data.setdefault(name, array.array("H"))
+                curr_end_time = len(curr_arr) + self.stime
+
+                dtime = data_time - curr_end_time
+
+                if dtime > 0:
+                    curr_arr.extend([0] * dtime)
+                    curr_arr.append(value)
+                elif dtime == 0:
+                    curr_arr.append(value)
+                else:
+                    # dtime < 0
+                    sindex = len(curr_arr) + dtime
+                    if sindex > 0:
+                        if add:
+                            curr_arr[sindex] += value
+                        else:
+                            curr_arr[sindex] = value