More improvements and fixes (and some new bugs)
diff --git a/wally/suits/io/agent.py b/wally/suits/io/agent.py
index 7589346..d15d18e 100644
--- a/wally/suits/io/agent.py
+++ b/wally/suits/io/agent.py
@@ -55,6 +55,40 @@
 counter = [0]
 
 
+def extract_iterables(vals):
+    iterable_names = []
+    iterable_values = []
+    rest = {}
+
+    for val_name, val in vals.items():
+        if val is None or not val.startswith('{%'):
+            rest[val_name] = val
+        else:
+            assert val.endswith("%}")
+            content = val[2:-2]
+            iterable_names.append(val_name)
+            iterable_values.append(list(i.strip() for i in content.split(',')))
+
+    return iterable_names, iterable_values, rest
+
+
+def format_params_into_section(sec, params, final=True):
+    processed_vals = {}
+
+    for val_name, val in sec.items():
+        if val is None:
+            processed_vals[val_name] = val
+        else:
+            try:
+                processed_vals[val_name] = val.format(**params)
+            except KeyError:
+                if final:
+                    raise
+                processed_vals[val_name] = val
+
+    return processed_vals
+
+
 def process_section(name, vals, defaults, format_params):
     vals = vals.copy()
     params = format_params.copy()
@@ -67,30 +101,25 @@
         repeat = 1
 
     # this code can be optimized
-    iterable_names = []
-    iterable_values = []
-    processed_vals = {}
-
-    for val_name, val in vals.items():
-        if val is None:
-            processed_vals[val_name] = val
-        # remove hardcode
-        elif val.startswith('{%'):
-            assert val.endswith("%}")
-            content = val[2:-2].format(**params)
-            iterable_names.append(val_name)
-            iterable_values.append(list(i.strip() for i in content.split(',')))
-        else:
-            processed_vals[val_name] = val.format(**params)
+    iterable_names, iterable_values, processed_vals = extract_iterables(vals)
 
     group_report_err_msg = "Group reporting should be set if numjobs != 1"
 
     if iterable_values == []:
+        processed_vals = format_params_into_section(processed_vals, params,
+                                                    final=False)
         params['UNIQ'] = 'UN{0}'.format(counter[0])
         counter[0] += 1
         params['TEST_SUMM'] = get_test_summary(processed_vals)
 
-        if processed_vals.get('numjobs', '1') != '1':
+        num_jobs = int(processed_vals.get('numjobs', '1'))
+        fsize = to_bytes(processed_vals['size'])
+        params['PER_TH_OFFSET'] = fsize // num_jobs
+
+        processed_vals = format_params_into_section(processed_vals, params,
+                                                    final=True)
+
+        if num_jobs != 1:
             assert 'group_reporting' in processed_vals, group_report_err_msg
 
         ramp_time = processed_vals.get('ramp_time')
@@ -104,11 +133,21 @@
             processed_vals['ramp_time'] = ramp_time
     else:
         for it_vals in itertools.product(*iterable_values):
+            processed_vals = format_params_into_section(processed_vals, params,
+                                                        final=False)
+
             processed_vals.update(dict(zip(iterable_names, it_vals)))
             params['UNIQ'] = 'UN{0}'.format(counter[0])
             counter[0] += 1
             params['TEST_SUMM'] = get_test_summary(processed_vals)
 
+            num_jobs = int(processed_vals.get('numjobs', '1'))
+            fsize = to_bytes(processed_vals['size'])
+            params['PER_TH_OFFSET'] = fsize // num_jobs
+
+            processed_vals = format_params_into_section(processed_vals, params,
+                                                        final=True)
+
             if processed_vals.get('numjobs', '1') != '1':
                 assert 'group_reporting' in processed_vals,\
                     group_report_err_msg
@@ -130,6 +169,7 @@
     time = 0
     for _, params in combinations:
         time += int(params.get('ramp_time', 0))
+        time += int(params.get('_ramp_time', 0))
         time += int(params.get('runtime', 0))
     return time
 
@@ -236,6 +276,8 @@
             return (1024 ** 2) * int(sz[:-1])
         if sz[-1] == 'k':
             return 1024 * int(sz[:-1])
+        if sz[-1] == 'g':
+            return (1024 ** 3) * int(sz[:-1])
         raise
 
 
@@ -348,12 +390,17 @@
 def do_run_fio(bconf):
     benchmark_config = format_fio_config(bconf)
     cmd = ["fio", "--output-format=json", "--alloc-size=262144", "-"]
-    p = subprocess.Popen(cmd, stdin=subprocess.PIPE,
+    p = subprocess.Popen(cmd,
+                         stdin=subprocess.PIPE,
                          stdout=subprocess.PIPE,
-                         stderr=subprocess.STDOUT)
+                         stderr=subprocess.PIPE)
 
     # set timeout
-    raw_out, _ = p.communicate(benchmark_config)
+    raw_out, raw_err = p.communicate(benchmark_config)
+
+    if 0 != p.returncode:
+        msg = "Fio failed with code: {0}\nOutput={1}"
+        raise OSError(msg.format(p.returncode, raw_err))
 
     try:
         parsed_out = json.loads(raw_out)["jobs"]
@@ -373,7 +420,14 @@
 MAX_JOBS = 1000
 
 
-def next_test_portion(whole_conf, runcycle):
+def next_test_portion(whole_conf, runcycle, cluster=False):
+    if cluster:
+        for name, sec in whole_conf:
+            if '_ramp_time' in sec:
+                sec['ramp_time'] = sec.pop('_ramp_time')
+            yield [(name, sec)]
+        return
+
     jcount = 0
     runtime = 0
     bconf = []
@@ -458,12 +512,14 @@
     res[jname] = j_res
 
 
-def compile(benchmark_config, params, runcycle=None):
+def compile(benchmark_config, params, skip=0, runcycle=None, cluster=False):
     whole_conf = list(parse_fio_config_full(benchmark_config, params))
+    whole_conf = whole_conf[skip:]
     res = ""
 
-    for bconf in next_test_portion(whole_conf, runcycle):
+    for bconf in next_test_portion(whole_conf, runcycle, cluster=cluster):
         res += format_fio_config(bconf)
+        res += "\n#" + "-" * 50 + "\n\n"
 
     return res
 
@@ -473,7 +529,8 @@
             runcycle=None,
             raw_results_func=None,
             skip_tests=0,
-            fake_fio=False):
+            fake_fio=False,
+            cluster=False):
 
     whole_conf = list(parse_fio_config_full(benchmark_config, params))
     whole_conf = whole_conf[skip_tests:]
@@ -482,7 +539,7 @@
     executed_tests = 0
     ok = True
     try:
-        for bconf in next_test_portion(whole_conf, runcycle):
+        for bconf in next_test_portion(whole_conf, runcycle, cluster=cluster):
 
             if fake_fio:
                 res_cfg_it = do_run_fio_fake(bconf)
@@ -504,7 +561,7 @@
                     continue
 
                 add_job_results(jname, job_output, jconfig, res)
-
+            curr_test_num += 1
             msg_template = "Done {0} tests from {1}. ETA: {2}"
             exec_time = estimate_cfg(benchmark_config, params, curr_test_num)
 
@@ -588,6 +645,8 @@
                         help="Skip NUM tests")
     parser.add_argument("--faked-fio", action='store_true',
                         default=False, help="Emulate fio with 0 test time")
+    parser.add_argument("--cluster", action='store_true',
+                        default=False, help="Apply cluster-test settings")
     parser.add_argument("--params", nargs="*", metavar="PARAM=VAL",
                         default=[],
                         help="Provide set of pairs PARAM=VAL to" +
@@ -620,14 +679,16 @@
         return 0
 
     if argv_obj.num_tests or argv_obj.compile:
-        bconf = list(parse_fio_config_full(job_cfg, params))
-        bconf = bconf[argv_obj.skip_tests:]
-
         if argv_obj.compile:
-            out_fd.write(format_fio_config(bconf))
+            data = compile(job_cfg, params, argv_obj.skip_tests,
+                           cluster=argv_obj.cluster)
+            out_fd.write(data)
             out_fd.write("\n")
 
         if argv_obj.num_tests:
+            bconf = list(parse_fio_config_full(job_cfg, params,
+                                               argv_obj.cluster))
+            bconf = bconf[argv_obj.skip_tests:]
             print len(bconf)
 
         return 0
@@ -653,7 +714,8 @@
                                            argv_obj.runcycle,
                                            rrfunc,
                                            argv_obj.skip_tests,
-                                           argv_obj.faked_fio)
+                                           argv_obj.faked_fio,
+                                           cluster=argv_obj.cluster)
     etime = time.time()
 
     res = {'__meta__': {'raw_cfg': job_cfg, 'params': params}, 'res': job_res}
diff --git a/wally/suits/io/formatter.py b/wally/suits/io/formatter.py
index 529b78a..a1da1c3 100644
--- a/wally/suits/io/formatter.py
+++ b/wally/suits/io/formatter.py
@@ -1,8 +1,8 @@
 import texttable
 
 from wally.utils import ssize_to_b
-from wally.statistic import med_dev
 from wally.suits.io.agent import get_test_summary
+from wally.statistic import med_dev, round_deviation, round_3_digit
 
 
 def key_func(k_data):
@@ -37,14 +37,24 @@
 
         descr = get_test_summary(data)
 
-        iops, _ = med_dev(data['iops'])
-        bw, bwdev = med_dev(data['bw'])
+        iops, _ = round_deviation(med_dev(data['iops']))
+        bw, bwdev = round_deviation(med_dev(data['bw']))
 
         # 3 * sigma
-        dev_perc = int((bwdev * 300) / bw)
+        if 0 == bw:
+            assert 0 == bwdev
+            dev_perc = 0
+        else:
+            dev_perc = int((bwdev * 300) / bw)
 
-        params = (descr, int(iops), int(bw), dev_perc,
-                  int(med_dev(data['lat'])[0]) // 1000)
+        med_lat, _ = round_deviation(med_dev(data['lat']))
+        med_lat = int(med_lat) // 1000
+
+        iops = round_3_digit(iops)
+        bw = round_3_digit(bw)
+        med_lat = round_3_digit(med_lat)
+
+        params = (descr, int(iops), int(bw), dev_perc, med_lat)
         tab.add_row(params)
 
     header = ["Description", "IOPS", "BW KiBps", "Dev * 3 %", "clat ms"]
diff --git a/wally/suits/io/io_scenario_ceph.cfg b/wally/suits/io/io_scenario_ceph.cfg
new file mode 100644
index 0000000..5e793f2
--- /dev/null
+++ b/wally/suits/io/io_scenario_ceph.cfg
@@ -0,0 +1,62 @@
+[defaults]
+wait_for_previous
+group_reporting
+time_based
+buffered=0
+iodepth=1
+softrandommap=1
+filename={FILENAME}
+NUM_ROUNDS=7
+
+size=5G
+ramp_time=20
+runtime=20
+
+# ---------------------------------------------------------------------
+# check different thread count, direct read/write mode. (bw, iops) = func(th_count)
+# also check BW for seq read/write.
+# ---------------------------------------------------------------------
+[ceph_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=1m
+rw=read
+direct=1
+offset_increment={PER_TH_OFFSET}
+numjobs={% 20, 120 %}
+
+# # ---------------------------------------------------------------------
+# # check different thread count, sync mode. (latency, iops) = func(th_count)
+# # ---------------------------------------------------------------------
+# [hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+# blocksize=4k
+# rw=randwrite
+# sync=1
+# numjobs={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
+# 
+# # ---------------------------------------------------------------------
+# # check different thread count, direct read mode. (latency, iops) = func(th_count)
+# # also check iops for randread
+# # ---------------------------------------------------------------------
+# [hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+# blocksize=4k
+# rw=randread
+# direct=1
+# numjobs={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
+# 
+# # ---------------------------------------------------------------------
+# # check different thread count, direct read/write mode. (bw, iops) = func(th_count)
+# # also check BW for seq read/write.
+# # ---------------------------------------------------------------------
+# [hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+# blocksize=1m
+# rw={% read, write %}
+# direct=1
+# offset_increment=1073741824 # 1G
+# numjobs={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
+# 
+# # ---------------------------------------------------------------------
+# # check IOPS randwrite.
+# # ---------------------------------------------------------------------
+# [hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+# blocksize=4k
+# rw=randwrite
+# direct=1
diff --git a/wally/suits/io/io_scenario_hdd.cfg b/wally/suits/io/io_scenario_hdd.cfg
index 5e24009..46191f2 100644
--- a/wally/suits/io/io_scenario_hdd.cfg
+++ b/wally/suits/io/io_scenario_hdd.cfg
@@ -8,8 +8,8 @@
 filename={FILENAME}
 NUM_ROUNDS=7
 
-ramp_time=5
 size=10Gb
+ramp_time=5
 runtime=30
 
 # ---------------------------------------------------------------------
@@ -39,6 +39,7 @@
 blocksize=1m
 rw={% read, write %}
 direct=1
+offset_increment=1073741824 # 1G
 numjobs={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
 
 # ---------------------------------------------------------------------
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index 1e93247..c5615bb 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -137,29 +137,45 @@
         self.files_to_copy = {local_fname: self.io_py_remote}
         copy_paths(conn, self.files_to_copy)
 
-        files = {}
+        if self.options.get('prefill_files', True):
+            files = {}
 
-        for secname, params in self.configs:
-            sz = ssize_to_b(params['size'])
-            msz = msz = sz / (1024 ** 2)
-            if sz % (1024 ** 2) != 0:
-                msz += 1
+            for secname, params in self.configs:
+                sz = ssize_to_b(params['size'])
+                msz = sz / (1024 ** 2)
+                if sz % (1024 ** 2) != 0:
+                    msz += 1
 
-            fname = params['filename']
-            files[fname] = max(files.get(fname, 0), msz)
+                fname = params['filename']
 
-        # logger.warning("dd run DISABLED")
-        # cmd_templ = "dd if=/dev/zero of={0} bs={1} count={2}"
+                # if another test already uses the same file name,
+                # keep the largest requested size
+                files[fname] = max(files.get(fname, 0), msz)
 
-        cmd_templ = "sudo dd if=/dev/zero of={0} bs={1} count={2}"
-        for fname, sz in files.items():
-            cmd = cmd_templ.format(fname, 1024 ** 2, msz)
-            run_over_ssh(conn, cmd, timeout=msz, node=self.node)
+            # logger.warning("dd run DISABLED")
+            # cmd_templ = "dd if=/dev/zero of={0} bs={1} count={2}"
+
+            cmd_templ = "sudo dd if=/dev/zero of={0} bs={1} count={2}"
+            ssize = 0
+            stime = time.time()
+
+            for fname, curr_sz in files.items():
+                cmd = cmd_templ.format(fname, 1024 ** 2, curr_sz)
+                ssize += curr_sz
+                run_over_ssh(conn, cmd, timeout=curr_sz, node=self.node)
+
+            ddtime = time.time() - stime
+            if ddtime > 1E-3:
+                fill_bw = int(ssize / ddtime)
+                mess = "Initiall dd fill bw is {0} MiBps for this vm"
+                logger.info(mess.format(fill_bw))
+        else:
+            logger.warning("Test files prefill disabled")
 
     def run(self, conn, barrier):
         # logger.warning("No tests runned")
         # return
-        cmd_templ = "sudo env python2 {0} --type {1} {2} --json -"
+        cmd_templ = "sudo env python2 {0} {3} --type {1} {2} --json -"
         # cmd_templ = "env python2 {0} --type {1} {2} --json -"
 
         params = " ".join("{0}={1}".format(k, v)
@@ -168,16 +184,24 @@
         if "" != params:
             params = "--params " + params
 
-        cmd = cmd_templ.format(self.io_py_remote, self.tool, params)
+        if self.options.get('cluster', False):
+            logger.info("Cluster mode is used")
+            cluster_opt = "--cluster"
+        else:
+            logger.info("Non-cluster mode is used")
+            cluster_opt = ""
+
+        cmd = cmd_templ.format(self.io_py_remote, self.tool, params,
+                               cluster_opt)
         logger.debug("Waiting on barrier")
 
         exec_time = io_agent.estimate_cfg(self.raw_cfg, self.config_params)
         exec_time_str = sec_to_str(exec_time)
 
         try:
+            timeout = int(exec_time * 1.2 + 300)
             if barrier.wait():
                 templ = "Test should takes about {0}. Will wait at most {1}"
-                timeout = int(exec_time * 1.1 + 300)
                 logger.info(templ.format(exec_time_str, sec_to_str(timeout)))
 
             out_err = run_over_ssh(conn, cmd,