Refactor fio agent: extract iterable/format helpers, add per-thread offsets and a --cluster mode, report fio failures via stderr/returncode; fixes several issues but likely introduces new bugs
diff --git a/wally/suits/io/agent.py b/wally/suits/io/agent.py
index 7589346..d15d18e 100644
--- a/wally/suits/io/agent.py
+++ b/wally/suits/io/agent.py
@@ -55,6 +55,40 @@
 counter = [0]
 
 
+def extract_iterables(vals):
+    iterable_names = []
+    iterable_values = []
+    rest = {}
+
+    for val_name, val in vals.items():
+        if val is None or not val.startswith('{%'):
+            rest[val_name] = val
+        else:
+            assert val.endswith("%}")
+            content = val[2:-2]
+            iterable_names.append(val_name)
+            iterable_values.append(list(i.strip() for i in content.split(',')))
+
+    return iterable_names, iterable_values, rest
+
+
+def format_params_into_section(sec, params, final=True):
+    processed_vals = {}
+
+    for val_name, val in sec.items():
+        if val is None:
+            processed_vals[val_name] = val
+        else:
+            try:
+                processed_vals[val_name] = val.format(**params)
+            except KeyError:
+                if final:
+                    raise
+                processed_vals[val_name] = val
+
+    return processed_vals
+
+
 def process_section(name, vals, defaults, format_params):
     vals = vals.copy()
     params = format_params.copy()
@@ -67,30 +101,25 @@
         repeat = 1
 
     # this code can be optimized
-    iterable_names = []
-    iterable_values = []
-    processed_vals = {}
-
-    for val_name, val in vals.items():
-        if val is None:
-            processed_vals[val_name] = val
-        # remove hardcode
-        elif val.startswith('{%'):
-            assert val.endswith("%}")
-            content = val[2:-2].format(**params)
-            iterable_names.append(val_name)
-            iterable_values.append(list(i.strip() for i in content.split(',')))
-        else:
-            processed_vals[val_name] = val.format(**params)
+    iterable_names, iterable_values, processed_vals = extract_iterables(vals)
 
     group_report_err_msg = "Group reporting should be set if numjobs != 1"
 
     if iterable_values == []:
+        processed_vals = format_params_into_section(processed_vals, params,
+                                                    final=False)
         params['UNIQ'] = 'UN{0}'.format(counter[0])
         counter[0] += 1
         params['TEST_SUMM'] = get_test_summary(processed_vals)
 
-        if processed_vals.get('numjobs', '1') != '1':
+        num_jobs = int(processed_vals.get('numjobs', '1'))
+        fsize = to_bytes(processed_vals['size'])
+        params['PER_TH_OFFSET'] = fsize // num_jobs
+
+        processed_vals = format_params_into_section(processed_vals, params,
+                                                    final=True)
+
+        if num_jobs != 1:
             assert 'group_reporting' in processed_vals, group_report_err_msg
 
         ramp_time = processed_vals.get('ramp_time')
@@ -104,11 +133,21 @@
             processed_vals['ramp_time'] = ramp_time
     else:
         for it_vals in itertools.product(*iterable_values):
+            processed_vals = format_params_into_section(processed_vals, params,
+                                                        final=False)
+
             processed_vals.update(dict(zip(iterable_names, it_vals)))
             params['UNIQ'] = 'UN{0}'.format(counter[0])
             counter[0] += 1
             params['TEST_SUMM'] = get_test_summary(processed_vals)
 
+            num_jobs = int(processed_vals.get('numjobs', '1'))
+            fsize = to_bytes(processed_vals['size'])
+            params['PER_TH_OFFSET'] = fsize // num_jobs
+
+            processed_vals = format_params_into_section(processed_vals, params,
+                                                        final=True)
+
             if processed_vals.get('numjobs', '1') != '1':
                 assert 'group_reporting' in processed_vals,\
                     group_report_err_msg
@@ -130,6 +169,7 @@
     time = 0
     for _, params in combinations:
         time += int(params.get('ramp_time', 0))
+        time += int(params.get('_ramp_time', 0))
         time += int(params.get('runtime', 0))
     return time
 
@@ -236,6 +276,8 @@
             return (1024 ** 2) * int(sz[:-1])
         if sz[-1] == 'k':
             return 1024 * int(sz[:-1])
+        if sz[-1] == 'g':
+            return (1024 ** 3) * int(sz[:-1])
         raise
 
 
@@ -348,12 +390,17 @@
 def do_run_fio(bconf):
     benchmark_config = format_fio_config(bconf)
     cmd = ["fio", "--output-format=json", "--alloc-size=262144", "-"]
-    p = subprocess.Popen(cmd, stdin=subprocess.PIPE,
+    p = subprocess.Popen(cmd,
+                         stdin=subprocess.PIPE,
                          stdout=subprocess.PIPE,
-                         stderr=subprocess.STDOUT)
+                         stderr=subprocess.PIPE)
 
     # set timeout
-    raw_out, _ = p.communicate(benchmark_config)
+    raw_out, raw_err = p.communicate(benchmark_config)
+
+    if 0 != p.returncode:
+        msg = "Fio failed with code: {0}\nOutput={1}"
+        raise OSError(msg.format(p.returncode, raw_err))
 
     try:
         parsed_out = json.loads(raw_out)["jobs"]
@@ -373,7 +420,14 @@
 MAX_JOBS = 1000
 
 
-def next_test_portion(whole_conf, runcycle):
+def next_test_portion(whole_conf, runcycle, cluster=False):
+    if cluster:
+        for name, sec in whole_conf:
+            if '_ramp_time' in sec:
+                sec['ramp_time'] = sec.pop('_ramp_time')
+            yield [(name, sec)]
+        return
+
     jcount = 0
     runtime = 0
     bconf = []
@@ -458,12 +512,14 @@
     res[jname] = j_res
 
 
-def compile(benchmark_config, params, runcycle=None):
+def compile(benchmark_config, params, skip=0, runcycle=None, cluster=False):
     whole_conf = list(parse_fio_config_full(benchmark_config, params))
+    whole_conf = whole_conf[skip:]
     res = ""
 
-    for bconf in next_test_portion(whole_conf, runcycle):
+    for bconf in next_test_portion(whole_conf, runcycle, cluster=cluster):
         res += format_fio_config(bconf)
+        res += "\n#" + "-" * 50 + "\n\n"
 
     return res
 
@@ -473,7 +529,8 @@
             runcycle=None,
             raw_results_func=None,
             skip_tests=0,
-            fake_fio=False):
+            fake_fio=False,
+            cluster=False):
 
     whole_conf = list(parse_fio_config_full(benchmark_config, params))
     whole_conf = whole_conf[skip_tests:]
@@ -482,7 +539,7 @@
     executed_tests = 0
     ok = True
     try:
-        for bconf in next_test_portion(whole_conf, runcycle):
+        for bconf in next_test_portion(whole_conf, runcycle, cluster=cluster):
 
             if fake_fio:
                 res_cfg_it = do_run_fio_fake(bconf)
@@ -504,7 +561,7 @@
                     continue
 
                 add_job_results(jname, job_output, jconfig, res)
-
+            curr_test_num += 1
             msg_template = "Done {0} tests from {1}. ETA: {2}"
             exec_time = estimate_cfg(benchmark_config, params, curr_test_num)
 
@@ -588,6 +645,8 @@
                         help="Skip NUM tests")
     parser.add_argument("--faked-fio", action='store_true',
                         default=False, help="Emulate fio with 0 test time")
+    parser.add_argument("--cluster", action='store_true',
+                        default=False, help="Apply cluster-test settings")
     parser.add_argument("--params", nargs="*", metavar="PARAM=VAL",
                         default=[],
                         help="Provide set of pairs PARAM=VAL to" +
@@ -620,14 +679,16 @@
         return 0
 
     if argv_obj.num_tests or argv_obj.compile:
-        bconf = list(parse_fio_config_full(job_cfg, params))
-        bconf = bconf[argv_obj.skip_tests:]
-
         if argv_obj.compile:
-            out_fd.write(format_fio_config(bconf))
+            data = compile(job_cfg, params, argv_obj.skip_tests,
+                           cluster=argv_obj.cluster)
+            out_fd.write(data)
             out_fd.write("\n")
 
         if argv_obj.num_tests:
+            bconf = list(parse_fio_config_full(job_cfg, params,
+                                               argv_obj.cluster))
+            bconf = bconf[argv_obj.skip_tests:]
             print len(bconf)
 
         return 0
@@ -653,7 +714,8 @@
                                            argv_obj.runcycle,
                                            rrfunc,
                                            argv_obj.skip_tests,
-                                           argv_obj.faked_fio)
+                                           argv_obj.faked_fio,
+                                           cluster=argv_obj.cluster)
     etime = time.time()
 
     res = {'__meta__': {'raw_cfg': job_cfg, 'params': params}, 'res': job_res}
diff --git a/wally/suits/io/formatter.py b/wally/suits/io/formatter.py
index 529b78a..a1da1c3 100644
--- a/wally/suits/io/formatter.py
+++ b/wally/suits/io/formatter.py
@@ -1,8 +1,8 @@
 import texttable
 
 from wally.utils import ssize_to_b
-from wally.statistic import med_dev
 from wally.suits.io.agent import get_test_summary
+from wally.statistic import med_dev, round_deviation, round_3_digit
 
 
 def key_func(k_data):
@@ -37,14 +37,24 @@
 
         descr = get_test_summary(data)
 
-        iops, _ = med_dev(data['iops'])
-        bw, bwdev = med_dev(data['bw'])
+        iops, _ = round_deviation(med_dev(data['iops']))
+        bw, bwdev = round_deviation(med_dev(data['bw']))
 
         # 3 * sigma
-        dev_perc = int((bwdev * 300) / bw)
+        if 0 == bw:
+            assert 0 == bwdev
+            dev_perc = 0
+        else:
+            dev_perc = int((bwdev * 300) / bw)
 
-        params = (descr, int(iops), int(bw), dev_perc,
-                  int(med_dev(data['lat'])[0]) // 1000)
+        med_lat, _ = round_deviation(med_dev(data['lat']))
+        med_lat = int(med_lat) // 1000
+
+        iops = round_3_digit(iops)
+        bw = round_3_digit(bw)
+        med_lat = round_3_digit(med_lat)
+
+        params = (descr, int(iops), int(bw), dev_perc, med_lat)
         tab.add_row(params)
 
     header = ["Description", "IOPS", "BW KiBps", "Dev * 3 %", "clat ms"]
diff --git a/wally/suits/io/io_scenario_ceph.cfg b/wally/suits/io/io_scenario_ceph.cfg
new file mode 100644
index 0000000..5e793f2
--- /dev/null
+++ b/wally/suits/io/io_scenario_ceph.cfg
@@ -0,0 +1,62 @@
+[defaults]
+wait_for_previous
+group_reporting
+time_based
+buffered=0
+iodepth=1
+softrandommap=1
+filename={FILENAME}
+NUM_ROUNDS=7
+
+size=5G
+ramp_time=20
+runtime=20
+
+# ---------------------------------------------------------------------
+# check different thread count, direct read/write mode. (bw, iops) = func(th_count)
+# also check BW for seq read/write.
+# ---------------------------------------------------------------------
+[ceph_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=1m
+rw=read
+direct=1
+offset_increment={PER_TH_OFFSET}
+numjobs={% 20, 120 %}
+
+# # ---------------------------------------------------------------------
+# # check different thread count, sync mode. (latency, iops) = func(th_count)
+# # ---------------------------------------------------------------------
+# [hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+# blocksize=4k
+# rw=randwrite
+# sync=1
+# numjobs={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
+# 
+# # ---------------------------------------------------------------------
+# # check different thread count, direct read mode. (latency, iops) = func(th_count)
+# # also check iops for randread
+# # ---------------------------------------------------------------------
+# [hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+# blocksize=4k
+# rw=randread
+# direct=1
+# numjobs={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
+# 
+# # ---------------------------------------------------------------------
+# # check different thread count, direct read/write mode. (bw, iops) = func(th_count)
+# # also check BW for seq read/write.
+# # ---------------------------------------------------------------------
+# [hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+# blocksize=1m
+# rw={% read, write %}
+# direct=1
+# offset_increment=1073741824 # 1G
+# numjobs={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
+# 
+# # ---------------------------------------------------------------------
+# # check IOPS randwrite.
+# # ---------------------------------------------------------------------
+# [hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+# blocksize=4k
+# rw=randwrite
+# direct=1
diff --git a/wally/suits/io/io_scenario_hdd.cfg b/wally/suits/io/io_scenario_hdd.cfg
index 5e24009..46191f2 100644
--- a/wally/suits/io/io_scenario_hdd.cfg
+++ b/wally/suits/io/io_scenario_hdd.cfg
@@ -8,8 +8,8 @@
 filename={FILENAME}
 NUM_ROUNDS=7
 
-ramp_time=5
 size=10Gb
+ramp_time=5
 runtime=30
 
 # ---------------------------------------------------------------------
@@ -39,6 +39,7 @@
 blocksize=1m
 rw={% read, write %}
 direct=1
+offset_increment=1073741824 # 1G
 numjobs={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
 
 # ---------------------------------------------------------------------