start adding unit tests, rework config compiler
diff --git a/wally/suits/io/agent.py b/wally/suits/io/agent.py
index d15d18e..1982e35 100644
--- a/wally/suits/io/agent.py
+++ b/wally/suits/io/agent.py
@@ -1,7 +1,7 @@
import sys
import time
import json
-import random
+import copy
import select
import pprint
import argparse
@@ -15,6 +15,234 @@
SETTING = 1
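+
+
+# a single parsed fio job section: its name, the ordered option values
+# and the format (template) params visible to it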
+class FioJobSection(object):
+ def __init__(self, name):
+ self.name = name
+ self.vals = OrderedDict()
+ self.format_params = {}
+
+ def copy(self):
+ return copy.deepcopy(self)
+
+
+def to_bytes(sz):
+ sz = sz.lower()
+ try:
+ return int(sz)
+ except ValueError:
+ if sz[-1] == 'm':
+ return (1024 ** 2) * int(sz[:-1])
+ if sz[-1] == 'k':
+ return 1024 * int(sz[:-1])
+ if sz[-1] == 'g':
+ return (1024 ** 3) * int(sz[:-1])
+ raise
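+
+# hand-checked examples: to_bytes('512') == 512, to_bytes('4k') == 4096,
+# to_bytes('16m') == 16 * 1024 ** 2, to_bytes('5G') == 5 * 1024 ** 3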
+
+
+def fio_config_lexer(fio_cfg):
+ for lineno, line in enumerate(fio_cfg.split("\n")):
+ try:
+ line = line.strip()
+
+ if line.startswith("#") or line.startswith(";"):
+ continue
+
+ if line == "":
+ continue
+
+ if line.startswith('['):
+            assert line.endswith(']'), "section name should end with ']'"
+ yield lineno, SECTION, line[1:-1], None
+ elif '=' in line:
+ opt_name, opt_val = line.split('=', 1)
+ yield lineno, SETTING, opt_name.strip(), opt_val.strip()
+ else:
+ yield lineno, SETTING, line, None
+ except Exception as exc:
+ pref = "During parsing line number {0}\n".format(lineno)
+ raise ValueError(pref + exc.message)
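+
+# hand-traced token stream for "[sec1]\nnumjobs=4\ndirect":
+#   (0, SECTION, 'sec1', None)
+#   (1, SETTING, 'numjobs', '4')
+#   (2, SETTING, 'direct', None)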
+
+
+def fio_config_parse(lexer_iter, format_params):
+ orig_format_params_keys = set(format_params)
+ format_params = format_params.copy()
+ in_defaults = False
+ curr_section = None
+ defaults = OrderedDict()
+
+ for lineno, tp, name, val in lexer_iter:
+ if tp == SECTION:
+ if curr_section is not None:
+ yield curr_section
+
+ if name == 'defaults':
+ in_defaults = True
+ curr_section = None
+ else:
+ in_defaults = False
+ curr_section = FioJobSection(name)
+ curr_section.format_params = format_params.copy()
+ curr_section.vals = defaults.copy()
+ else:
+ assert tp == SETTING
+ if name == name.upper():
+ msg = "Param not in default section in line " + str(lineno)
+ assert in_defaults, msg
+ if name not in orig_format_params_keys:
+                # don't run parse_value on PARAMS here - they are
+                # parsed later; parsing now would break array values
+ format_params[name] = val
+ elif in_defaults:
+ defaults[name] = parse_value(val)
+ else:
+ msg = "data outside section, line " + str(lineno)
+ assert curr_section is not None, msg
+ curr_section.vals[name] = parse_value(val)
+
+ if curr_section is not None:
+ yield curr_section
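+
+# grammar notes (as implemented above): UPPERCASE settings are template
+# params and are only allowed inside [defaults]; lowercase settings in
+# [defaults] pre-seed the vals of every following section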
+
+
+def parse_value(val):
+ if val is None:
+ return None
+
+ try:
+ return int(val)
+ except ValueError:
+ pass
+
+ try:
+ return float(val)
+ except ValueError:
+ pass
+
+ if val.startswith('{%'):
+ assert val.endswith("%}")
+ content = val[2:-2]
+ vals = list(i.strip() for i in content.split(','))
+ return map(parse_value, vals)
+ return val
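+
+# hand-checked examples:
+#   parse_value('10')         -> 10
+#   parse_value('0.5')        -> 0.5
+#   parse_value('{% 1, 2 %}') -> [1, 2]  (expanded later by process_cycles)
+#   parse_value('randread')   -> 'randread'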
+
+
+def process_repeats(sec_iter):
+
+ for sec in sec_iter:
+ if '*' in sec.name:
+ msg = "Only one '*' allowed in section name"
+ assert sec.name.count('*') == 1, msg
+
+ name, count = sec.name.split("*")
+ sec.name = name.strip()
+ count = count.strip()
+
+ try:
+ count = int(count.strip().format(**sec.format_params))
+ except KeyError:
+ raise ValueError("No parameter {0} given".format(count[1:-1]))
+ except ValueError:
+ msg = "Parameter {0} nas non-int value {1!r}"
+ raise ValueError(msg.format(count[1:-1],
+ count.format(**sec.format_params)))
+
+ yield sec
+
+ if 'ramp_time' in sec.vals:
+ sec = sec.copy()
+ sec.vals['_ramp_time'] = sec.vals.pop('ramp_time')
+
+ for _ in range(count - 1):
+ yield sec.copy()
+ else:
+ yield sec
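+
+# e.g. a section "hdd_test * 3" is emitted three times; runs 2 and 3
+# carry '_ramp_time' instead of 'ramp_time', so only the first run pays
+# the ramp (slice_config restores it on slice boundaries)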
+
+
+def process_cycles(sec_iter):
+ # insert parametrized cycles
+ sec_iter = try_format_params_into_section(sec_iter)
+
+ for sec in sec_iter:
+
+ cycles_var_names = []
+ cycles_var_values = []
+
+ for name, val in sec.vals.items():
+ if isinstance(val, list):
+ cycles_var_names.append(name)
+ cycles_var_values.append(val)
+
+ if len(cycles_var_names) == 0:
+ yield sec
+ else:
+ for combination in itertools.product(*cycles_var_values):
+ new_sec = sec.copy()
+ new_sec.vals.update(zip(cycles_var_names, combination))
+ yield new_sec
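+
+# e.g. vals {'rw': ['read', 'write'], 'numjobs': [1, 5]} expands into
+# 2 x 2 = 4 sections via itertools.product, one per combination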
+
+
+def try_format_params_into_section(sec_iter):
+ for sec in sec_iter:
+ params = sec.format_params
+ for name, val in sec.vals.items():
+ if isinstance(val, basestring):
+ try:
+ sec.vals[name] = parse_value(val.format(**params))
+                except Exception:
+                    # not all format params are known yet - leave the
+                    # value as-is for the final formatting pass
+                    pass
+
+ yield sec
+
+
+def format_params_into_section_final(sec_iter, counter=[0]):
+    # NB: the mutable default is deliberate - a cross-call counter used
+    # to give every generated section a unique UNIQ param
+ group_report_err_msg = "Group reporting should be set if numjobs != 1"
+
+ for sec in sec_iter:
+
+ num_jobs = int(sec.vals.get('numjobs', '1'))
+ if num_jobs != 1:
+ assert 'group_reporting' in sec.vals, group_report_err_msg
+
+ params = sec.format_params
+
+ fsize = to_bytes(sec.vals['size'])
+ params['PER_TH_OFFSET'] = fsize // num_jobs
+
+ for name, val in sec.vals.items():
+ if isinstance(val, basestring):
+ sec.vals[name] = parse_value(val.format(**params))
+ else:
+ assert isinstance(val, (int, float)) or val is None
+
+ params['UNIQ'] = 'UN{0}'.format(counter[0])
+ counter[0] += 1
+ params['TEST_SUMM'] = get_test_summary(sec.vals)
+ sec.name = sec.name.format(**params)
+
+ yield sec
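+
+# the final pass injects three computed params: PER_TH_OFFSET (file
+# size split evenly between jobs), UNIQ (a monotonically growing
+# 'UN<n>' tag) and TEST_SUMM (short test summary), then formats every
+# value and the section name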
+
+
+def fio_config_to_str(sec_iter):
+ res = ""
+
+ for pos, sec in enumerate(sec_iter):
+ if pos != 0:
+ res += "\n"
+
+ res += "[{0}]\n".format(sec.name)
+
+ for name, val in sec.vals.items():
+ if name.startswith('_'):
+ continue
+
+ if val is None:
+ res += name + "\n"
+ else:
+ res += "{0}={1}\n".format(name, val)
+
+ return res
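+
+# e.g. a section 'sec' with vals {'rw': 'read', 'direct': None,
+# '_ramp_time': 30} serializes to:
+#   [sec]
+#   rw=read
+#   direct
+# ('_'-prefixed service options are dropped)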
+
+
def get_test_sync_mode(config):
try:
return config['sync_mode']
@@ -52,343 +280,78 @@
th_count)
-counter = [0]
-
-
-def extract_iterables(vals):
- iterable_names = []
- iterable_values = []
- rest = {}
-
- for val_name, val in vals.items():
- if val is None or not val.startswith('{%'):
- rest[val_name] = val
- else:
- assert val.endswith("%}")
- content = val[2:-2]
- iterable_names.append(val_name)
- iterable_values.append(list(i.strip() for i in content.split(',')))
-
- return iterable_names, iterable_values, rest
-
-
-def format_params_into_section(sec, params, final=True):
- processed_vals = {}
-
- for val_name, val in sec.items():
- if val is None:
- processed_vals[val_name] = val
- else:
- try:
- processed_vals[val_name] = val.format(**params)
- except KeyError:
- if final:
- raise
- processed_vals[val_name] = val
-
- return processed_vals
-
-
-def process_section(name, vals, defaults, format_params):
- vals = vals.copy()
- params = format_params.copy()
-
- if '*' in name:
- name, repeat = name.split('*')
- name = name.strip()
- repeat = int(repeat.format(**params))
- else:
- repeat = 1
-
- # this code can be optimized
- iterable_names, iterable_values, processed_vals = extract_iterables(vals)
-
- group_report_err_msg = "Group reporting should be set if numjobs != 1"
-
- if iterable_values == []:
- processed_vals = format_params_into_section(processed_vals, params,
- final=False)
- params['UNIQ'] = 'UN{0}'.format(counter[0])
- counter[0] += 1
- params['TEST_SUMM'] = get_test_summary(processed_vals)
-
- num_jobs = int(processed_vals.get('numjobs', '1'))
- fsize = to_bytes(processed_vals['size'])
- params['PER_TH_OFFSET'] = fsize // num_jobs
-
- processed_vals = format_params_into_section(processed_vals, params,
- final=True)
-
- if num_jobs != 1:
- assert 'group_reporting' in processed_vals, group_report_err_msg
-
- ramp_time = processed_vals.get('ramp_time')
- for i in range(repeat):
- yield name.format(**params), processed_vals.copy()
-
- if 'ramp_time' in processed_vals:
- del processed_vals['ramp_time']
-
- if ramp_time is not None:
- processed_vals['ramp_time'] = ramp_time
- else:
- for it_vals in itertools.product(*iterable_values):
- processed_vals = format_params_into_section(processed_vals, params,
- final=False)
-
- processed_vals.update(dict(zip(iterable_names, it_vals)))
- params['UNIQ'] = 'UN{0}'.format(counter[0])
- counter[0] += 1
- params['TEST_SUMM'] = get_test_summary(processed_vals)
-
- num_jobs = int(processed_vals.get('numjobs', '1'))
- fsize = to_bytes(processed_vals['size'])
- params['PER_TH_OFFSET'] = fsize // num_jobs
-
- processed_vals = format_params_into_section(processed_vals, params,
- final=True)
-
- if processed_vals.get('numjobs', '1') != '1':
- assert 'group_reporting' in processed_vals,\
- group_report_err_msg
-
- ramp_time = processed_vals.get('ramp_time')
-
- for i in range(repeat):
- yield name.format(**params), processed_vals.copy()
- if 'ramp_time' in processed_vals:
- processed_vals['_ramp_time'] = ramp_time
- processed_vals.pop('ramp_time')
-
- if ramp_time is not None:
- processed_vals['ramp_time'] = ramp_time
- processed_vals.pop('_ramp_time')
-
-
-def calculate_execution_time(combinations):
+def calculate_execution_time(sec_iter):
time = 0
- for _, params in combinations:
- time += int(params.get('ramp_time', 0))
- time += int(params.get('_ramp_time', 0))
- time += int(params.get('runtime', 0))
+ for sec in sec_iter:
+ time += sec.vals.get('ramp_time', 0)
+ time += sec.vals.get('runtime', 0)
return time
-def parse_fio_config_full(fio_cfg, params=None):
- defaults = {}
- format_params = {}
+def slice_config(sec_iter, runcycle=None, max_jobs=1000):
+ jcount = 0
+ runtime = 0
+ curr_slice = []
- if params is None:
- ext_params = {}
- else:
- ext_params = params.copy()
+    for sec in sec_iter:
- curr_section = None
- curr_section_name = None
+ jc = sec.vals.get('numjobs', 1)
+ msg = "numjobs should be integer, not {0!r}".format(jc)
+ assert isinstance(jc, int), msg
- for tp, name, val in parse_fio_config_iter(fio_cfg):
- if tp == SECTION:
- non_def = curr_section_name != 'defaults'
- if curr_section_name is not None and non_def:
- format_params.update(ext_params)
- for sec in process_section(curr_section_name,
- curr_section,
- defaults,
- format_params):
- yield sec
+ curr_task_time = calculate_execution_time([sec])
- if name == 'defaults':
- curr_section = defaults
- else:
- curr_section = OrderedDict()
- curr_section.update(defaults)
- curr_section_name = name
+ if jc > max_jobs:
+ err_templ = "Can't process job {0!r} - too large numjobs"
+ raise ValueError(err_templ.format(sec.name))
+ if runcycle is not None and len(curr_slice) != 0:
+ rc_ok = curr_task_time + runtime <= runcycle
else:
- assert tp == SETTING
- assert curr_section_name is not None, "no section name"
- if name == name.upper():
- assert curr_section_name == 'defaults'
- format_params[name] = val
- else:
- curr_section[name] = val
+ rc_ok = True
- if curr_section_name is not None and curr_section_name != 'defaults':
- format_params.update(ext_params)
- for sec in process_section(curr_section_name,
- curr_section,
- defaults,
- format_params):
- yield sec
-
-
-def parse_fio_config_iter(fio_cfg):
- for lineno, line in enumerate(fio_cfg.split("\n")):
- try:
- line = line.strip()
-
- if line.startswith("#") or line.startswith(";"):
- continue
-
- if line == "":
- continue
-
- if line.startswith('['):
- assert line.endswith(']'), "name should ends with ]"
- yield SECTION, line[1:-1], None
- elif '=' in line:
- opt_name, opt_val = line.split('=', 1)
- yield SETTING, opt_name.strip(), opt_val.strip()
- else:
- yield SETTING, line, None
- except Exception as exc:
- pref = "During parsing line number {0}\n".format(lineno)
- raise ValueError(pref + exc.message)
-
-
-def format_fio_config(fio_cfg):
- res = ""
- for pos, (name, section) in enumerate(fio_cfg):
- if name.startswith('_'):
+ if jc + jcount <= max_jobs and rc_ok:
+ runtime += curr_task_time
+ jcount += jc
+ curr_slice.append(sec)
continue
- if pos != 0:
- res += "\n"
+ assert len(curr_slice) != 0
+ yield curr_slice
- res += "[{0}]\n".format(name)
- for opt_name, opt_val in section.items():
- if opt_val is None:
- res += opt_name + "\n"
- else:
- res += "{0}={1}\n".format(opt_name, opt_val)
- return res
+ if '_ramp_time' in sec.vals:
+ sec.vals['ramp_time'] = sec.vals.pop('_ramp_time')
+ curr_task_time = calculate_execution_time([sec])
+
+ runtime = curr_task_time
+ jcount = jc
+ curr_slice = [sec]
+
+ if curr_slice != []:
+ yield curr_slice
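+
+# slicing sketch: with runcycle=100 and three sections that each cost
+# ramp_time=20 + runtime=30 = 50s, the first two sections share a slice
+# (100s total) and the third one starts a new slice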
-count = 0
+def parse_all_in_1(source, test_params):
+ lexer_it = fio_config_lexer(source)
+ sec_it = fio_config_parse(lexer_it, test_params)
+ sec_it = process_cycles(sec_it)
+ sec_it = process_repeats(sec_it)
+    return format_params_into_section_final(sec_it)
-def to_bytes(sz):
- sz = sz.lower()
- try:
- return int(sz)
- except ValueError:
- if sz[-1] == 'm':
- return (1024 ** 2) * int(sz[:-1])
- if sz[-1] == 'k':
- return 1024 * int(sz[:-1])
- if sz[-1] == 'g':
- return (1024 ** 3) * int(sz[:-1])
- raise
+def parse_and_slice_all_in_1(source, test_params, **slice_params):
+ sec_it = parse_all_in_1(source, test_params)
+ return slice_config(sec_it, **slice_params)
-def do_run_fio_fake(bconf):
- def estimate_iops(sz, bw, lat):
- return 1 / (lat + float(sz) / bw)
- global count
- count += 1
- parsed_out = []
-
- BW = 120.0 * (1024 ** 2)
- LAT = 0.003
-
- for name, cfg in bconf:
- sz = to_bytes(cfg['blocksize'])
- curr_lat = LAT * ((random.random() - 0.5) * 0.1 + 1)
- curr_ulat = curr_lat * 1000000
- curr_bw = BW * ((random.random() - 0.5) * 0.1 + 1)
- iops = estimate_iops(sz, curr_bw, curr_lat)
- bw = iops * sz
-
- res = {'ctx': 10683,
- 'error': 0,
- 'groupid': 0,
- 'jobname': name,
- 'majf': 0,
- 'minf': 30,
- 'read': {'bw': 0,
- 'bw_agg': 0.0,
- 'bw_dev': 0.0,
- 'bw_max': 0,
- 'bw_mean': 0.0,
- 'bw_min': 0,
- 'clat': {'max': 0,
- 'mean': 0.0,
- 'min': 0,
- 'stddev': 0.0},
- 'io_bytes': 0,
- 'iops': 0,
- 'lat': {'max': 0, 'mean': 0.0,
- 'min': 0, 'stddev': 0.0},
- 'runtime': 0,
- 'slat': {'max': 0, 'mean': 0.0,
- 'min': 0, 'stddev': 0.0}
- },
- 'sys_cpu': 0.64,
- 'trim': {'bw': 0,
- 'bw_agg': 0.0,
- 'bw_dev': 0.0,
- 'bw_max': 0,
- 'bw_mean': 0.0,
- 'bw_min': 0,
- 'clat': {'max': 0,
- 'mean': 0.0,
- 'min': 0,
- 'stddev': 0.0},
- 'io_bytes': 0,
- 'iops': 0,
- 'lat': {'max': 0, 'mean': 0.0,
- 'min': 0, 'stddev': 0.0},
- 'runtime': 0,
- 'slat': {'max': 0, 'mean': 0.0,
- 'min': 0, 'stddev': 0.0}
- },
- 'usr_cpu': 0.23,
- 'write': {'bw': 0,
- 'bw_agg': 0,
- 'bw_dev': 0,
- 'bw_max': 0,
- 'bw_mean': 0,
- 'bw_min': 0,
- 'clat': {'max': 0, 'mean': 0,
- 'min': 0, 'stddev': 0},
- 'io_bytes': 0,
- 'iops': 0,
- 'lat': {'max': 0, 'mean': 0,
- 'min': 0, 'stddev': 0},
- 'runtime': 0,
- 'slat': {'max': 0, 'mean': 0.0,
- 'min': 0, 'stddev': 0.0}
- }
- }
-
- if cfg['rw'] in ('read', 'randread'):
- key = 'read'
- elif cfg['rw'] in ('write', 'randwrite'):
- key = 'write'
- else:
- raise ValueError("Uknown op type {0}".format(key))
-
- res[key]['bw'] = bw
- res[key]['iops'] = iops
- res[key]['runtime'] = 30
- res[key]['io_bytes'] = res[key]['runtime'] * bw
- res[key]['bw_agg'] = bw
- res[key]['bw_dev'] = bw / 30
- res[key]['bw_max'] = bw * 1.5
- res[key]['bw_min'] = bw / 1.5
- res[key]['bw_mean'] = bw
- res[key]['clat'] = {'max': curr_ulat * 10, 'mean': curr_ulat,
- 'min': curr_ulat / 2, 'stddev': curr_ulat}
- res[key]['lat'] = res[key]['clat'].copy()
- res[key]['slat'] = res[key]['clat'].copy()
-
- parsed_out.append(res)
-
- return zip(parsed_out, bconf)
+def compile_all_in_1(source, test_params, **slice_params):
+ slices_it = parse_and_slice_all_in_1(source, test_params, **slice_params)
+ for slices in slices_it:
+ yield fio_config_to_str(slices)
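+
+# usage sketch (file name and params are illustrative only):
+#   src = open('job.cfg').read()
+#   for cfg_str in compile_all_in_1(src, {'FILENAME': '/tmp/tst'},
+#                                   runcycle=600, max_jobs=1000):
+#       sys.stdout.write(cfg_str)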
-def do_run_fio(bconf):
- benchmark_config = format_fio_config(bconf)
+def do_run_fio(config_slice):
+ benchmark_config = fio_config_to_str(config_slice)
cmd = ["fio", "--output-format=json", "--alloc-size=262144", "-"]
p = subprocess.Popen(cmd,
stdin=subprocess.PIPE,
@@ -414,91 +377,37 @@
raw_out = raw_out[:100]
raise ValueError(msg.format(raw_out, exc.message))
- return zip(parsed_out, bconf)
-
-# limited by fio
-MAX_JOBS = 1000
+ return zip(parsed_out, config_slice)
-def next_test_portion(whole_conf, runcycle, cluster=False):
- if cluster:
- for name, sec in whole_conf:
- if '_ramp_time' in sec:
- sec['ramp_time'] = sec.pop('_ramp_time')
- yield [(name, sec)]
- return
-
- jcount = 0
- runtime = 0
- bconf = []
-
- for pos, (name, sec) in enumerate(whole_conf):
- jc = int(sec.get('numjobs', '1'))
-
- if runcycle is not None:
- curr_task_time = calculate_execution_time([(name, sec)])
- else:
- curr_task_time = 0
-
- if jc > MAX_JOBS:
- err_templ = "Can't process job {0!r} - too large numjobs"
- raise ValueError(err_templ.format(name))
-
- if runcycle is not None and len(bconf) != 0:
- rc_ok = curr_task_time + runtime <= runcycle
- else:
- rc_ok = True
-
- if jc + jcount <= MAX_JOBS and rc_ok:
- runtime += curr_task_time
- jcount += jc
- bconf.append((name, sec))
- if '_ramp_time' in sec:
- del sec['_ramp_time']
- continue
-
- assert len(bconf) != 0
- yield bconf
-
- if '_ramp_time' in sec:
- sec['ramp_time'] = sec.pop('_ramp_time')
- curr_task_time = calculate_execution_time([(name, sec)])
-
- runtime = curr_task_time
- jcount = jc
- bconf = [(name, sec)]
-
- if bconf != []:
- yield bconf
-
-
-def add_job_results(jname, job_output, jconfig, res):
+def add_job_results(section, job_output, res):
if job_output['write']['iops'] != 0:
raw_result = job_output['write']
else:
raw_result = job_output['read']
- if jname not in res:
+ vals = section.vals
+ if section.name not in res:
j_res = {}
- j_res["rw"] = jconfig["rw"]
- j_res["sync_mode"] = get_test_sync_mode(jconfig)
- j_res["concurence"] = int(jconfig.get("numjobs", 1))
- j_res["blocksize"] = jconfig["blocksize"]
+ j_res["rw"] = vals["rw"]
+ j_res["sync_mode"] = get_test_sync_mode(vals)
+ j_res["concurence"] = int(vals.get("numjobs", 1))
+ j_res["blocksize"] = vals["blocksize"]
j_res["jobname"] = job_output["jobname"]
- j_res["timings"] = [int(jconfig.get("runtime", 0)),
- int(jconfig.get("ramp_time", 0))]
+ j_res["timings"] = [int(vals.get("runtime", 0)),
+ int(vals.get("ramp_time", 0))]
else:
- j_res = res[jname]
- assert j_res["rw"] == jconfig["rw"]
- assert j_res["rw"] == jconfig["rw"]
- assert j_res["sync_mode"] == get_test_sync_mode(jconfig)
- assert j_res["concurence"] == int(jconfig.get("numjobs", 1))
- assert j_res["blocksize"] == jconfig["blocksize"]
+ j_res = res[section.name]
+ assert j_res["rw"] == vals["rw"]
+ assert j_res["rw"] == vals["rw"]
+ assert j_res["sync_mode"] == get_test_sync_mode(vals)
+ assert j_res["concurence"] == int(vals.get("numjobs", 1))
+ assert j_res["blocksize"] == vals["blocksize"]
assert j_res["jobname"] == job_output["jobname"]
# ramp part is skipped for all tests, except first
- # assert j_res["timings"] == (jconfig.get("runtime"),
- # jconfig.get("ramp_time"))
+ # assert j_res["timings"] == (vals.get("runtime"),
+ # vals.get("ramp_time"))
def j_app(name, x):
j_res.setdefault(name, []).append(x)
@@ -509,65 +418,46 @@
j_app("clat", raw_result["clat"]["mean"])
j_app("slat", raw_result["slat"]["mean"])
- res[jname] = j_res
+ res[section.name] = j_res
-def compile(benchmark_config, params, skip=0, runcycle=None, cluster=False):
- whole_conf = list(parse_fio_config_full(benchmark_config, params))
- whole_conf = whole_conf[skip:]
- res = ""
-
- for bconf in next_test_portion(whole_conf, runcycle, cluster=cluster):
- res += format_fio_config(bconf)
- res += "\n#" + "-" * 50 + "\n\n"
-
- return res
-
-
-def run_fio(benchmark_config,
- params,
- runcycle=None,
- raw_results_func=None,
- skip_tests=0,
- fake_fio=False,
- cluster=False):
-
- whole_conf = list(parse_fio_config_full(benchmark_config, params))
- whole_conf = whole_conf[skip_tests:]
- res = {}
- curr_test_num = skip_tests
- executed_tests = 0
+def run_fio(sliced_it, raw_results_func=None):
+ sliced_list = list(sliced_it)
ok = True
+
try:
- for bconf in next_test_portion(whole_conf, runcycle, cluster=cluster):
+ curr_test_num = 0
+ executed_tests = 0
+ result = {}
- if fake_fio:
- res_cfg_it = do_run_fio_fake(bconf)
- else:
- res_cfg_it = do_run_fio(bconf)
-
+ for i, test_slice in enumerate(sliced_list):
+ res_cfg_it = do_run_fio(test_slice)
res_cfg_it = enumerate(res_cfg_it, curr_test_num)
- for curr_test_num, (job_output, (jname, jconfig)) in res_cfg_it:
+ for curr_test_num, (job_output, section) in res_cfg_it:
executed_tests += 1
+
if raw_results_func is not None:
raw_results_func(executed_tests,
- [job_output, jname, jconfig])
+ [job_output, section])
- assert jname == job_output["jobname"], \
- "{0} != {1}".format(jname, job_output["jobname"])
+ msg = "{0} != {1}".format(section.name, job_output["jobname"])
+ assert section.name == job_output["jobname"], msg
- if jname.startswith('_'):
+ if section.name.startswith('_'):
continue
- add_job_results(jname, job_output, jconfig, res)
+ add_job_results(section, job_output, result)
+
curr_test_num += 1
msg_template = "Done {0} tests from {1}. ETA: {2}"
- exec_time = estimate_cfg(benchmark_config, params, curr_test_num)
- print msg_template.format(curr_test_num - skip_tests,
- len(whole_conf),
- sec_to_str(exec_time))
+            rest = sliced_list[i + 1:]
+            time_eta = sum(map(calculate_execution_time, rest))
+            test_left = sum(map(len, rest))
+            print msg_template.format(curr_test_num,
+                                      curr_test_num + test_left,
+                                      sec_to_str(time_eta))
except (SystemExit, KeyboardInterrupt):
raise
@@ -578,7 +468,7 @@
print "======== END OF ERROR ========="
ok = False
- return res, executed_tests, ok
+ return result, executed_tests, ok
def run_benchmark(binary_tp, *argv, **kwargs):
@@ -606,11 +496,6 @@
job_cfg += char
-def estimate_cfg(job_cfg, params, skip_tests=0):
- bconf = list(parse_fio_config_full(job_cfg, params))[skip_tests:]
- return calculate_execution_time(bconf)
-
-
def sec_to_str(seconds):
h = seconds // 3600
m = (seconds % 3600) // 60
@@ -641,12 +526,6 @@
help="Max cycle length in seconds")
parser.add_argument("--show-raw-results", action='store_true',
default=False, help="Output raw input and results")
- parser.add_argument("--skip-tests", type=int, default=0, metavar="NUM",
- help="Skip NUM tests")
- parser.add_argument("--faked-fio", action='store_true',
- default=False, help="Emulate fio with 0 test time")
- parser.add_argument("--cluster", action='store_true',
- default=False, help="Apply cluster-test settings")
parser.add_argument("--params", nargs="*", metavar="PARAM=VAL",
default=[],
help="Provide set of pairs PARAM=VAL to" +
@@ -674,22 +553,25 @@
name, val = param_val.split("=", 1)
params[name] = val
+ slice_params = {
+ 'runcycle': argv_obj.runcycle,
+ }
+
+    sliced_it = parse_and_slice_all_in_1(job_cfg, params, **slice_params)
+    # materialize the slices: --estimate, --compile and --num-tests may
+    # each need to walk the same sequence
+    sliced_it = list(sliced_it)
+
if argv_obj.estimate:
- print sec_to_str(estimate_cfg(job_cfg, params))
+ it = map(calculate_execution_time, sliced_it)
+ print sec_to_str(sum(it))
return 0
if argv_obj.num_tests or argv_obj.compile:
if argv_obj.compile:
- data = compile(job_cfg, params, argv_obj.skip_tests,
- cluster=argv_obj.cluster)
- out_fd.write(data)
- out_fd.write("\n")
+ for test_slice in sliced_it:
+ out_fd.write(fio_config_to_str(test_slice))
+ out_fd.write("\n#" + "-" * 70 + "\n\n")
if argv_obj.num_tests:
- bconf = list(parse_fio_config_full(job_cfg, params,
- argv_obj.cluster))
- bconf = bconf[argv_obj.skip_tests:]
- print len(bconf)
+            print len(sliced_it)
return 0
@@ -708,14 +590,7 @@
rrfunc = raw_res_func if argv_obj.show_raw_results else None
stime = time.time()
- job_res, num_tests, ok = run_benchmark(argv_obj.type,
- job_cfg,
- params,
- argv_obj.runcycle,
- rrfunc,
- argv_obj.skip_tests,
- argv_obj.faked_fio,
- cluster=argv_obj.cluster)
+ job_res, num_tests, ok = run_benchmark(argv_obj.type, sliced_it, rrfunc)
etime = time.time()
res = {'__meta__': {'raw_cfg': job_cfg, 'params': params}, 'res': job_res}
diff --git a/wally/suits/io/io_scenario_ceph.cfg b/wally/suits/io/io_scenario_ceph.cfg
index 5e793f2..cc8e411 100644
--- a/wally/suits/io/io_scenario_ceph.cfg
+++ b/wally/suits/io/io_scenario_ceph.cfg
@@ -7,56 +7,55 @@
softrandommap=1
filename={FILENAME}
NUM_ROUNDS=7
+NUMJOBS={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
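+# UPPERCASE names are compiler template params: {NAME} is substituted
+# into values below, {% a, b, ... %} is a cycle expanded into one job
+# per listed value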
size=5G
-ramp_time=20
-runtime=20
+ramp_time=30
+runtime=60
# ---------------------------------------------------------------------
-# check different thread count, direct read/write mode. (bw, iops) = func(th_count)
-# also check BW for seq read/write.
+# check different thread count, sync mode. (latency, iops) = func(th_count)
+# ---------------------------------------------------------------------
+[ceph_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=4k
+rw=randwrite
+sync=1
+numjobs={NUMJOBS}
+
+# ---------------------------------------------------------------------
+# direct randwrite with a single thread - (latency, iops) baseline
+# ---------------------------------------------------------------------
+[ceph_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=4k
+rw=randwrite
+direct=1
+numjobs=1
+
+# ---------------------------------------------------------------------
+# check different thread count, direct read mode. (latency, iops) = func(th_count)
+# also check iops for randread
+# ---------------------------------------------------------------------
+[ceph_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=4k
+rw=randread
+direct=1
+numjobs={NUMJOBS}
+
+# ---------------------------------------------------------------------
+# this is essentially a sequential read operation
+# we can't use seq read with numjobs > 1 on clouds due to caching
+# ---------------------------------------------------------------------
+[ceph_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=16m
+rw=randread
+direct=1
+numjobs={NUMJOBS}
+
+# ---------------------------------------------------------------------
+# sequential write
# ---------------------------------------------------------------------
[ceph_test_{TEST_SUMM} * {NUM_ROUNDS}]
blocksize=1m
-rw=read
+rw=write
direct=1
-offset_increment={PER_TH_OFFSET}
-numjobs={% 20, 120 %}
-
-# # ---------------------------------------------------------------------
-# # check different thread count, sync mode. (latency, iops) = func(th_count)
-# # ---------------------------------------------------------------------
-# [hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
-# blocksize=4k
-# rw=randwrite
-# sync=1
-# numjobs={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
-#
-# # ---------------------------------------------------------------------
-# # check different thread count, direct read mode. (latency, iops) = func(th_count)
-# # also check iops for randread
-# # ---------------------------------------------------------------------
-# [hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
-# blocksize=4k
-# rw=randread
-# direct=1
-# numjobs={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
-#
-# # ---------------------------------------------------------------------
-# # check different thread count, direct read/write mode. (bw, iops) = func(th_count)
-# # also check BW for seq read/write.
-# # ---------------------------------------------------------------------
-# [hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
-# blocksize=1m
-# rw={% read, write %}
-# direct=1
-# offset_increment=1073741824 # 1G
-# numjobs={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
-#
-# # ---------------------------------------------------------------------
-# # check IOPS randwrite.
-# # ---------------------------------------------------------------------
-# [hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
-# blocksize=4k
-# rw=randwrite
-# direct=1
+numjobs={NUMJOBS}
diff --git a/wally/suits/io/io_scenario_hdd.cfg b/wally/suits/io/io_scenario_hdd.cfg
index 46191f2..519fb0f 100644
--- a/wally/suits/io/io_scenario_hdd.cfg
+++ b/wally/suits/io/io_scenario_hdd.cfg
@@ -7,8 +7,9 @@
softrandommap=1
filename={FILENAME}
NUM_ROUNDS=7
+NUMJOBS={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
-size=10Gb
+size=10G
ramp_time=5
runtime=30
@@ -19,7 +20,7 @@
blocksize=4k
rw=randwrite
sync=1
-numjobs={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
+numjobs={NUMJOBS}
# ---------------------------------------------------------------------
# check different thread count, direct read mode. (latency, iops) = func(th_count)
@@ -29,18 +30,16 @@
blocksize=4k
rw=randread
direct=1
-numjobs={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
+numjobs={NUMJOBS}
# ---------------------------------------------------------------------
-# check different thread count, direct read/write mode. (bw, iops) = func(th_count)
-# also check BW for seq read/write.
+# No reason for thread count > 1 in case of sequential operations -
+# they become random
# ---------------------------------------------------------------------
[hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
blocksize=1m
rw={% read, write %}
direct=1
-offset_increment=1073741824 # 1G
-numjobs={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
# ---------------------------------------------------------------------
# check IOPS randwrite.
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index c5615bb..0062569 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -101,16 +101,17 @@
self.config_params = test_options.get('params', {})
self.tool = test_options.get('tool', 'fio')
self.raw_cfg = open(self.config_fname).read()
- self.configs = list(io_agent.parse_fio_config_full(self.raw_cfg,
- self.config_params))
+ self.configs = list(io_agent.parse_all_in_1(self.raw_cfg,
+ self.config_params))
cmd_log = os.path.join(self.log_directory, "task_compiled.cfg")
raw_res = os.path.join(self.log_directory, "raw_results.txt")
fio_command_file = open_for_append_or_create(cmd_log)
- fio_command_file.write(io_agent.compile(self.raw_cfg,
- self.config_params,
- None))
+
+ cfg_s_it = io_agent.compile_all_in_1(self.raw_cfg, self.config_params)
+ splitter = "\n\n" + "-" * 60 + "\n\n"
+ fio_command_file.write(splitter.join(cfg_s_it))
self.fio_raw_results_file = open_for_append_or_create(raw_res)
def cleanup(self, conn):
@@ -140,22 +141,23 @@
if self.options.get('prefill_files', True):
files = {}
- for secname, params in self.configs:
- sz = ssize_to_b(params['size'])
+ for section in self.configs:
+ sz = ssize_to_b(section.vals['size'])
msz = sz / (1024 ** 2)
+
if sz % (1024 ** 2) != 0:
msz += 1
- fname = params['filename']
+ fname = section.vals['filename']
# if already has other test with the same file name
# take largest size
files[fname] = max(files.get(fname, 0), msz)
# logger.warning("dd run DISABLED")
- # cmd_templ = "dd if=/dev/zero of={0} bs={1} count={2}"
+ cmd_templ = "dd if=/dev/zero of={0} bs={1} count={2}"
- cmd_templ = "sudo dd if=/dev/zero of={0} bs={1} count={2}"
+ # cmd_templ = "sudo dd if=/dev/zero of={0} bs={1} count={2}"
ssize = 0
stime = time.time()
@@ -175,8 +177,8 @@
def run(self, conn, barrier):
# logger.warning("No tests runned")
# return
- cmd_templ = "sudo env python2 {0} {3} --type {1} {2} --json -"
- # cmd_templ = "env python2 {0} --type {1} {2} --json -"
+ # cmd_templ = "sudo env python2 {0} --type {1} {2} --json -"
+ cmd_templ = "env python2 {0} --type {1} {2} --json -"
params = " ".join("{0}={1}".format(k, v)
for k, v in self.config_params.items())
@@ -184,18 +186,10 @@
if "" != params:
params = "--params " + params
- if self.options.get('cluster', False):
- logger.info("Cluster mode is used")
- cluster_opt = "--cluster"
- else:
- logger.info("Non-cluster mode is used")
- cluster_opt = ""
-
- cmd = cmd_templ.format(self.io_py_remote, self.tool, params,
- cluster_opt)
+ cmd = cmd_templ.format(self.io_py_remote, self.tool, params)
logger.debug("Waiting on barrier")
- exec_time = io_agent.estimate_cfg(self.raw_cfg, self.config_params)
+ exec_time = io_agent.calculate_execution_time(self.configs)
exec_time_str = sec_to_str(exec_time)
try: