more improvements and fixes and new bugs
diff --git a/wally/suits/io/agent.py b/wally/suits/io/agent.py
index 7589346..d15d18e 100644
--- a/wally/suits/io/agent.py
+++ b/wally/suits/io/agent.py
@@ -55,6 +55,40 @@
 counter = [0]
 
 
+def extract_iterables(vals):
+    iterable_names = []
+    iterable_values = []
+    rest = {}
+
+    for val_name, val in vals.items():
+        if val is None or not val.startswith('{%'):
+            rest[val_name] = val
+        else:
+            assert val.endswith("%}")
+            content = val[2:-2]
+            iterable_names.append(val_name)
+            iterable_values.append(list(i.strip() for i in content.split(',')))
+
+    return iterable_names, iterable_values, rest
+
+
+def format_params_into_section(sec, params, final=True):
+    processed_vals = {}
+
+    for val_name, val in sec.items():
+        if val is None:
+            processed_vals[val_name] = val
+        else:
+            try:
+                processed_vals[val_name] = val.format(**params)
+            except KeyError:
+                if final:
+                    raise
+                processed_vals[val_name] = val
+
+    return processed_vals
+
+
 def process_section(name, vals, defaults, format_params):
     vals = vals.copy()
     params = format_params.copy()
@@ -67,30 +101,25 @@
         repeat = 1
 
     # this code can be optimized
-    iterable_names = []
-    iterable_values = []
-    processed_vals = {}
-
-    for val_name, val in vals.items():
-        if val is None:
-            processed_vals[val_name] = val
-        # remove hardcode
-        elif val.startswith('{%'):
-            assert val.endswith("%}")
-            content = val[2:-2].format(**params)
-            iterable_names.append(val_name)
-            iterable_values.append(list(i.strip() for i in content.split(',')))
-        else:
-            processed_vals[val_name] = val.format(**params)
+    iterable_names, iterable_values, processed_vals = extract_iterables(vals)
 
     group_report_err_msg = "Group reporting should be set if numjobs != 1"
 
     if iterable_values == []:
+        processed_vals = format_params_into_section(processed_vals, params,
+                                                    final=False)
         params['UNIQ'] = 'UN{0}'.format(counter[0])
         counter[0] += 1
         params['TEST_SUMM'] = get_test_summary(processed_vals)
 
-        if processed_vals.get('numjobs', '1') != '1':
+        num_jobs = int(processed_vals.get('numjobs', '1'))
+        fsize = to_bytes(processed_vals['size'])
+        params['PER_TH_OFFSET'] = fsize // num_jobs
+
+        processed_vals = format_params_into_section(processed_vals, params,
+                                                    final=True)
+
+        if num_jobs != 1:
             assert 'group_reporting' in processed_vals, group_report_err_msg
 
         ramp_time = processed_vals.get('ramp_time')
@@ -104,11 +133,21 @@
             processed_vals['ramp_time'] = ramp_time
     else:
         for it_vals in itertools.product(*iterable_values):
+            processed_vals = format_params_into_section(processed_vals, params,
+                                                        final=False)
+
             processed_vals.update(dict(zip(iterable_names, it_vals)))
             params['UNIQ'] = 'UN{0}'.format(counter[0])
             counter[0] += 1
             params['TEST_SUMM'] = get_test_summary(processed_vals)
 
+            num_jobs = int(processed_vals.get('numjobs', '1'))
+            fsize = to_bytes(processed_vals['size'])
+            params['PER_TH_OFFSET'] = fsize // num_jobs
+
+            processed_vals = format_params_into_section(processed_vals, params,
+                                                        final=True)
+
             if processed_vals.get('numjobs', '1') != '1':
                 assert 'group_reporting' in processed_vals,\
                     group_report_err_msg
@@ -130,6 +169,7 @@
     time = 0
     for _, params in combinations:
         time += int(params.get('ramp_time', 0))
+        time += int(params.get('_ramp_time', 0))
         time += int(params.get('runtime', 0))
     return time
 
@@ -236,6 +276,8 @@
             return (1024 ** 2) * int(sz[:-1])
         if sz[-1] == 'k':
             return 1024 * int(sz[:-1])
+        if sz[-1] == 'g':
+            return (1024 ** 3) * int(sz[:-1])
         raise
 
 
@@ -348,12 +390,17 @@
 def do_run_fio(bconf):
     benchmark_config = format_fio_config(bconf)
     cmd = ["fio", "--output-format=json", "--alloc-size=262144", "-"]
-    p = subprocess.Popen(cmd, stdin=subprocess.PIPE,
+    p = subprocess.Popen(cmd,
+                         stdin=subprocess.PIPE,
                          stdout=subprocess.PIPE,
-                         stderr=subprocess.STDOUT)
+                         stderr=subprocess.PIPE)
 
     # set timeout
-    raw_out, _ = p.communicate(benchmark_config)
+    raw_out, raw_err = p.communicate(benchmark_config)
+
+    if 0 != p.returncode:
+        msg = "Fio failed with code: {0}\nOutput={1}"
+        raise OSError(msg.format(p.returncode, raw_err))
 
     try:
         parsed_out = json.loads(raw_out)["jobs"]
@@ -373,7 +420,14 @@
 MAX_JOBS = 1000
 
 
-def next_test_portion(whole_conf, runcycle):
+def next_test_portion(whole_conf, runcycle, cluster=False):
+    if cluster:
+        for name, sec in whole_conf:
+            if '_ramp_time' in sec:
+                sec['ramp_time'] = sec.pop('_ramp_time')
+            yield [(name, sec)]
+        return
+
     jcount = 0
     runtime = 0
     bconf = []
@@ -458,12 +512,14 @@
     res[jname] = j_res
 
 
-def compile(benchmark_config, params, runcycle=None):
+def compile(benchmark_config, params, skip=0, runcycle=None, cluster=False):
     whole_conf = list(parse_fio_config_full(benchmark_config, params))
+    whole_conf = whole_conf[skip:]
     res = ""
 
-    for bconf in next_test_portion(whole_conf, runcycle):
+    for bconf in next_test_portion(whole_conf, runcycle, cluster=cluster):
         res += format_fio_config(bconf)
+        res += "\n#" + "-" * 50 + "\n\n"
 
     return res
 
@@ -473,7 +529,8 @@
             runcycle=None,
             raw_results_func=None,
             skip_tests=0,
-            fake_fio=False):
+            fake_fio=False,
+            cluster=False):
 
     whole_conf = list(parse_fio_config_full(benchmark_config, params))
     whole_conf = whole_conf[skip_tests:]
@@ -482,7 +539,7 @@
     executed_tests = 0
     ok = True
     try:
-        for bconf in next_test_portion(whole_conf, runcycle):
+        for bconf in next_test_portion(whole_conf, runcycle, cluster=cluster):
 
             if fake_fio:
                 res_cfg_it = do_run_fio_fake(bconf)
@@ -504,7 +561,7 @@
                     continue
 
                 add_job_results(jname, job_output, jconfig, res)
-
+            curr_test_num += 1
             msg_template = "Done {0} tests from {1}. ETA: {2}"
             exec_time = estimate_cfg(benchmark_config, params, curr_test_num)
 
@@ -588,6 +645,8 @@
                         help="Skip NUM tests")
     parser.add_argument("--faked-fio", action='store_true',
                         default=False, help="Emulate fio with 0 test time")
+    parser.add_argument("--cluster", action='store_true',
+                        default=False, help="Apply cluster-test settings")
     parser.add_argument("--params", nargs="*", metavar="PARAM=VAL",
                         default=[],
                         help="Provide set of pairs PARAM=VAL to" +
@@ -620,14 +679,16 @@
         return 0
 
     if argv_obj.num_tests or argv_obj.compile:
-        bconf = list(parse_fio_config_full(job_cfg, params))
-        bconf = bconf[argv_obj.skip_tests:]
-
         if argv_obj.compile:
-            out_fd.write(format_fio_config(bconf))
+            data = compile(job_cfg, params, argv_obj.skip_tests,
+                           cluster=argv_obj.cluster)
+            out_fd.write(data)
             out_fd.write("\n")
 
         if argv_obj.num_tests:
+            bconf = list(parse_fio_config_full(job_cfg, params,
+                                               argv_obj.cluster))
+            bconf = bconf[argv_obj.skip_tests:]
             print len(bconf)
 
         return 0
@@ -653,7 +714,8 @@
                                            argv_obj.runcycle,
                                            rrfunc,
                                            argv_obj.skip_tests,
-                                           argv_obj.faked_fio)
+                                           argv_obj.faked_fio,
+                                           cluster=argv_obj.cluster)
     etime = time.time()
 
     res = {'__meta__': {'raw_cfg': job_cfg, 'params': params}, 'res': job_res}