more improvements and fixes (and, most likely, a few new bugs)
diff --git a/wally/suits/io/agent.py b/wally/suits/io/agent.py
index 7589346..d15d18e 100644
--- a/wally/suits/io/agent.py
+++ b/wally/suits/io/agent.py
@@ -55,6 +55,40 @@
counter = [0]
+def extract_iterables(vals):
+ iterable_names = []
+ iterable_values = []
+ rest = {}
+
+ for val_name, val in vals.items():
+ if val is None or not val.startswith('{%'):
+ rest[val_name] = val
+ else:
+ assert val.endswith("%}")
+ content = val[2:-2]
+ iterable_names.append(val_name)
+ iterable_values.append(list(i.strip() for i in content.split(',')))
+
+ return iterable_names, iterable_values, rest
+
+
+def format_params_into_section(sec, params, final=True):
+ processed_vals = {}
+
+ for val_name, val in sec.items():
+ if val is None:
+ processed_vals[val_name] = val
+ else:
+ try:
+ processed_vals[val_name] = val.format(**params)
+ except KeyError:
+ if final:
+ raise
+ processed_vals[val_name] = val
+
+ return processed_vals
+
+
def process_section(name, vals, defaults, format_params):
vals = vals.copy()
params = format_params.copy()
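
For reference, a rough sketch of how the two new helpers are expected to behave; the section dicts below are made up, not taken from a real job file:

    # Hypothetical fio section: 'blocksize' uses the '{% ... %}' iterable
    # syntax handled by extract_iterables(), the rest are plain values.
    sec = {'blocksize': '{% 4k, 64k %}', 'rw': 'randwrite', 'direct': None}
    names, values, rest = extract_iterables(sec)
    # names  == ['blocksize']
    # values == [['4k', '64k']]
    # rest   == {'rw': 'randwrite', 'direct': None}

    # format_params_into_section() substitutes str.format() placeholders;
    # with final=False an unknown placeholder is left as-is instead of raising.
    tmpl = {'directory': '/tmp/{UNIQ}', 'size': '1g'}
    format_params_into_section(tmpl, {}, final=False)   # '{UNIQ}' kept
    format_params_into_section(tmpl, {'UNIQ': 'UN0'})   # -> '/tmp/UN0'
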
@@ -67,30 +101,25 @@
repeat = 1
# this code can be optimized
- iterable_names = []
- iterable_values = []
- processed_vals = {}
-
- for val_name, val in vals.items():
- if val is None:
- processed_vals[val_name] = val
- # remove hardcode
- elif val.startswith('{%'):
- assert val.endswith("%}")
- content = val[2:-2].format(**params)
- iterable_names.append(val_name)
- iterable_values.append(list(i.strip() for i in content.split(',')))
- else:
- processed_vals[val_name] = val.format(**params)
+ iterable_names, iterable_values, processed_vals = extract_iterables(vals)
group_report_err_msg = "Group reporting should be set if numjobs != 1"
if iterable_values == []:
+ processed_vals = format_params_into_section(processed_vals, params,
+ final=False)
params['UNIQ'] = 'UN{0}'.format(counter[0])
counter[0] += 1
params['TEST_SUMM'] = get_test_summary(processed_vals)
- if processed_vals.get('numjobs', '1') != '1':
+ num_jobs = int(processed_vals.get('numjobs', '1'))
+ fsize = to_bytes(processed_vals['size'])
+ params['PER_TH_OFFSET'] = fsize // num_jobs
+
+ processed_vals = format_params_into_section(processed_vals, params,
+ final=True)
+
+ if num_jobs != 1:
assert 'group_reporting' in processed_vals, group_report_err_msg
ramp_time = processed_vals.get('ramp_time')
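
The per-thread offset is simply the file size split evenly between the fio jobs; with hypothetical numbers:

    # Mirrors the PER_TH_OFFSET computation above (values are made up).
    num_jobs = 4
    fsize = to_bytes('4g')              # 4 * 1024 ** 3 == 4294967296
    per_th_offset = fsize // num_jobs   # 1073741824, a 1 GiB slice per job
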
@@ -104,11 +133,21 @@
processed_vals['ramp_time'] = ramp_time
else:
for it_vals in itertools.product(*iterable_values):
+ processed_vals = format_params_into_section(processed_vals, params,
+ final=False)
+
processed_vals.update(dict(zip(iterable_names, it_vals)))
params['UNIQ'] = 'UN{0}'.format(counter[0])
counter[0] += 1
params['TEST_SUMM'] = get_test_summary(processed_vals)
+ num_jobs = int(processed_vals.get('numjobs', '1'))
+ fsize = to_bytes(processed_vals['size'])
+ params['PER_TH_OFFSET'] = fsize // num_jobs
+
+ processed_vals = format_params_into_section(processed_vals, params,
+ final=True)
+
if processed_vals.get('numjobs', '1') != '1':
assert 'group_reporting' in processed_vals,\
group_report_err_msg
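
When a section does contain '{% ... %}' values, one job is generated per combination of the iterables; a small illustration of the itertools.product() expansion driving that loop (key order in the printed dicts may vary):

    import itertools

    # Hypothetical iterables as returned by extract_iterables().
    iterable_names = ['blocksize', 'rw']
    iterable_values = [['4k', '64k'], ['randread', 'randwrite']]

    for it_vals in itertools.product(*iterable_values):
        print(dict(zip(iterable_names, it_vals)))
    # {'blocksize': '4k',  'rw': 'randread'}
    # {'blocksize': '4k',  'rw': 'randwrite'}
    # {'blocksize': '64k', 'rw': 'randread'}
    # {'blocksize': '64k', 'rw': 'randwrite'}
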
@@ -130,6 +169,7 @@
time = 0
for _, params in combinations:
time += int(params.get('ramp_time', 0))
+ time += int(params.get('_ramp_time', 0))
time += int(params.get('runtime', 0))
return time
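
The estimator now also counts ramp time that was stashed under '_ramp_time' for cluster runs; e.g. with made-up combinations:

    combinations = [('sec-a', {'ramp_time': '30', 'runtime': '60'}),
                    ('sec-b', {'_ramp_time': '30', 'runtime': '60'})]
    total = 0
    for _, params in combinations:
        total += int(params.get('ramp_time', 0))
        total += int(params.get('_ramp_time', 0))
        total += int(params.get('runtime', 0))
    assert total == 180
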
@@ -236,6 +276,8 @@
return (1024 ** 2) * int(sz[:-1])
if sz[-1] == 'k':
return 1024 * int(sz[:-1])
+ if sz[-1] == 'g':
+ return (1024 ** 3) * int(sz[:-1])
raise
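
With the new 'g' branch the size parser covers the usual k/m/g suffixes; a quick sanity check (assuming lower-case suffixes, as in the code above):

    assert to_bytes('8k') == 8 * 1024
    assert to_bytes('16m') == 16 * 1024 ** 2
    assert to_bytes('4g') == 4 * 1024 ** 3
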
@@ -348,12 +390,17 @@
def do_run_fio(bconf):
benchmark_config = format_fio_config(bconf)
cmd = ["fio", "--output-format=json", "--alloc-size=262144", "-"]
- p = subprocess.Popen(cmd, stdin=subprocess.PIPE,
+ p = subprocess.Popen(cmd,
+ stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT)
+ stderr=subprocess.PIPE)
# set timeout
- raw_out, _ = p.communicate(benchmark_config)
+ raw_out, raw_err = p.communicate(benchmark_config)
+
+    if p.returncode != 0:
+        msg = "Fio failed with code: {0}\nStderr: {1}"
+        raise OSError(msg.format(p.returncode, raw_err))
try:
parsed_out = json.loads(raw_out)["jobs"]
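
The same subprocess pattern, reduced to a standalone sketch (not the exact agent code): stderr is captured separately so a non-zero exit can be reported with fio's own error text instead of errors mixed into stdout.

    import subprocess

    def run_checked(cmd, stdin_data):
        # Hypothetical helper illustrating the error handling above.
        p = subprocess.Popen(cmd,
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        out, err = p.communicate(stdin_data)
        if p.returncode != 0:
            raise OSError("{0} failed with code {1}\nStderr: {2}".format(
                cmd[0], p.returncode, err))
        return out
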
@@ -373,7 +420,14 @@
MAX_JOBS = 1000
-def next_test_portion(whole_conf, runcycle):
+def next_test_portion(whole_conf, runcycle, cluster=False):
+ if cluster:
+ for name, sec in whole_conf:
+ if '_ramp_time' in sec:
+ sec['ramp_time'] = sec.pop('_ramp_time')
+ yield [(name, sec)]
+ return
+
jcount = 0
runtime = 0
bconf = []
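
In cluster mode the generator degenerates to one single-section portion per yield, promoting any stashed '_ramp_time' back to 'ramp_time'; roughly (config pairs below are hypothetical, dict key order may differ):

    # Hypothetical parsed config: a list of (section_name, section_dict) pairs.
    whole_conf = [('rand-4k', {'_ramp_time': '30', 'runtime': '60'}),
                  ('seq-1m', {'runtime': '60'})]

    for bconf in next_test_portion(whole_conf, runcycle=None, cluster=True):
        print(bconf)
    # [('rand-4k', {'ramp_time': '30', 'runtime': '60'})]
    # [('seq-1m', {'runtime': '60'})]
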
@@ -458,12 +512,14 @@
res[jname] = j_res
-def compile(benchmark_config, params, runcycle=None):
+def compile(benchmark_config, params, skip=0, runcycle=None, cluster=False):
whole_conf = list(parse_fio_config_full(benchmark_config, params))
+ whole_conf = whole_conf[skip:]
res = ""
- for bconf in next_test_portion(whole_conf, runcycle):
+ for bconf in next_test_portion(whole_conf, runcycle, cluster=cluster):
res += format_fio_config(bconf)
+ res += "\n#" + "-" * 50 + "\n\n"
return res
@@ -473,7 +529,8 @@
runcycle=None,
raw_results_func=None,
skip_tests=0,
- fake_fio=False):
+ fake_fio=False,
+ cluster=False):
whole_conf = list(parse_fio_config_full(benchmark_config, params))
whole_conf = whole_conf[skip_tests:]
@@ -482,7 +539,7 @@
executed_tests = 0
ok = True
try:
- for bconf in next_test_portion(whole_conf, runcycle):
+ for bconf in next_test_portion(whole_conf, runcycle, cluster=cluster):
if fake_fio:
res_cfg_it = do_run_fio_fake(bconf)
@@ -504,7 +561,7 @@
continue
add_job_results(jname, job_output, jconfig, res)
-
+ curr_test_num += 1
msg_template = "Done {0} tests from {1}. ETA: {2}"
exec_time = estimate_cfg(benchmark_config, params, curr_test_num)
@@ -588,6 +645,8 @@
help="Skip NUM tests")
parser.add_argument("--faked-fio", action='store_true',
default=False, help="Emulate fio with 0 test time")
+ parser.add_argument("--cluster", action='store_true',
+ default=False, help="Apply cluster-test settings")
parser.add_argument("--params", nargs="*", metavar="PARAM=VAL",
default=[],
help="Provide set of pairs PARAM=VAL to" +
@@ -620,14 +679,16 @@
return 0
if argv_obj.num_tests or argv_obj.compile:
- bconf = list(parse_fio_config_full(job_cfg, params))
- bconf = bconf[argv_obj.skip_tests:]
-
if argv_obj.compile:
- out_fd.write(format_fio_config(bconf))
+ data = compile(job_cfg, params, argv_obj.skip_tests,
+ cluster=argv_obj.cluster)
+ out_fd.write(data)
out_fd.write("\n")
if argv_obj.num_tests:
+ bconf = list(parse_fio_config_full(job_cfg, params,
+ argv_obj.cluster))
+ bconf = bconf[argv_obj.skip_tests:]
print len(bconf)
return 0
@@ -653,7 +714,8 @@
argv_obj.runcycle,
rrfunc,
argv_obj.skip_tests,
- argv_obj.faked_fio)
+ argv_obj.faked_fio,
+ cluster=argv_obj.cluster)
etime = time.time()
res = {'__meta__': {'raw_cfg': job_cfg, 'params': params}, 'res': job_res}