blob: d15d18ecd7b13a2568076f5dd44779750061fb44 [file] [log] [blame]
import sys
import time
import json
import random
import select
import pprint
import argparse
import traceback
import subprocess
import itertools
from collections import OrderedDict
# Token types yielded by parse_fio_config_iter():
SECTION = 0  # a '[name]' section-header line
SETTING = 1  # a 'name=value' pair or a bare flag line
def get_test_sync_mode(config):
    """Return a one-letter synchronization-mode code for a test config.

    'x' - sync + direct, 's' - sync only, 'd' - direct only, 'a' - async.
    An explicit 'sync_mode' value in *config* takes precedence over the
    code derived from the 'sync'/'direct' flags.
    """
    # An explicitly provided mode short-circuits the flag-based mapping.
    if 'sync_mode' in config:
        return config['sync_mode']

    is_sync = config.get("sync", "0") == "1"
    is_direct = config.get("direct", "0") == "1"

    # Collapse the two boolean flags into a single character.
    if is_sync:
        return 'x' if is_direct else 's'
    return 'd' if is_direct else 'a'
def get_test_summary(params):
    """Build a short test label such as 'rrd4kth1'.

    Combines the abbreviated rw mode, the sync-mode code, the block size
    and the thread count ('numjobs', falling back to 'concurence').
    """
    rw_abbrev = {"randread": "rr",
                 "randwrite": "rw",
                 "read": "sr",
                 "write": "sw"}

    threads = params.get('numjobs')
    if threads is None:
        # Older configs spell the thread count as 'concurence'.
        threads = params.get('concurence', '1')

    return "{0}{1}{2}th{3}".format(rw_abbrev[params["rw"]],
                                   get_test_sync_mode(params),
                                   params['blocksize'],
                                   int(threads))
# Single-element list used as a mutable module-level counter so that
# process_section() can produce a unique 'UNIQ' value for every job
# across multiple generator invocations.
counter = [0]
def extract_iterables(vals):
    """Split an option dict into iterable-valued options and plain ones.

    Values of the form '{% a, b, c %}' are treated as iterables and
    returned as (names, value_lists); everything else (including None
    flags) goes into the plain dict.

    Returns a tuple (iterable_names, iterable_value_lists, plain_options).
    """
    names = []
    value_lists = []
    plain = {}

    for key, raw in vals.items():
        if raw is not None and raw.startswith('{%'):
            assert raw.endswith("%}")
            # Strip the '{%' / '%}' markers, split on commas, trim spaces.
            names.append(key)
            value_lists.append([part.strip()
                                for part in raw[2:-2].split(',')])
        else:
            plain[key] = raw

    return names, value_lists, plain
def format_params_into_section(sec, params, final=True):
    """Apply str.format substitution with *params* to every option value.

    None values (bare flags) pass through untouched.  When a placeholder
    is missing from *params*: raise KeyError if *final* is True, otherwise
    keep the unformatted value for a later pass.
    """
    formatted = {}

    for key, raw in sec.items():
        if raw is None:
            formatted[key] = None
            continue

        try:
            formatted[key] = raw.format(**params)
        except KeyError:
            if final:
                raise
            # Non-final pass: leave the placeholder for a later round.
            formatted[key] = raw

    return formatted
def process_section(name, vals, defaults, format_params):
    """Expand one parsed config section into concrete (name, options) jobs.

    Handles three expansion mechanisms:
      * a 'name * N' suffix repeats the job N times;
      * '{% a, b %}' option values produce the cartesian product of all
        iterable options (one job per combination);
      * '{PARAM}' placeholders are substituted from *format_params* plus
        the generated UNIQ / TEST_SUMM / PER_TH_OFFSET values.

    Yields (formatted_section_name, options_dict) pairs.

    NOTE(review): 'defaults' is accepted but never used here — the caller
    merges defaults into 'vals' beforehand; confirm before removing.
    """
    vals = vals.copy()
    params = format_params.copy()

    if '*' in name:
        # 'section * N' syntax: repeat this job N times.
        name, repeat = name.split('*')
        name = name.strip()
        repeat = int(repeat.format(**params))
    else:
        repeat = 1

    # this code can be optimized
    iterable_names, iterable_values, processed_vals = extract_iterables(vals)

    group_report_err_msg = "Group reporting should be set if numjobs != 1"

    if iterable_values == []:
        # No iterable options: a single parameter combination.
        # First a non-final pass so TEST_SUMM/PER_TH_OFFSET can be
        # computed from partially formatted values.
        processed_vals = format_params_into_section(processed_vals, params,
                                                    final=False)

        params['UNIQ'] = 'UN{0}'.format(counter[0])
        counter[0] += 1
        params['TEST_SUMM'] = get_test_summary(processed_vals)

        num_jobs = int(processed_vals.get('numjobs', '1'))
        fsize = to_bytes(processed_vals['size'])
        # Per-thread file offset so jobs do not overlap.
        params['PER_TH_OFFSET'] = fsize // num_jobs

        # Final pass: every placeholder must now resolve.
        processed_vals = format_params_into_section(processed_vals, params,
                                                    final=True)

        if num_jobs != 1:
            assert 'group_reporting' in processed_vals, group_report_err_msg

        ramp_time = processed_vals.get('ramp_time')

        for i in range(repeat):
            yield name.format(**params), processed_vals.copy()

            # Only the first repetition pays the ramp-up cost.
            if 'ramp_time' in processed_vals:
                del processed_vals['ramp_time']

        # Restore ramp_time for any later use of this dict.
        if ramp_time is not None:
            processed_vals['ramp_time'] = ramp_time
    else:
        # One job per combination of the iterable option values.
        for it_vals in itertools.product(*iterable_values):
            processed_vals = format_params_into_section(processed_vals, params,
                                                        final=False)

            processed_vals.update(dict(zip(iterable_names, it_vals)))
            params['UNIQ'] = 'UN{0}'.format(counter[0])
            counter[0] += 1
            params['TEST_SUMM'] = get_test_summary(processed_vals)

            num_jobs = int(processed_vals.get('numjobs', '1'))
            fsize = to_bytes(processed_vals['size'])
            params['PER_TH_OFFSET'] = fsize // num_jobs

            processed_vals = format_params_into_section(processed_vals, params,
                                                        final=True)

            if processed_vals.get('numjobs', '1') != '1':
                assert 'group_reporting' in processed_vals,\
                    group_report_err_msg

            ramp_time = processed_vals.get('ramp_time')

            for i in range(repeat):
                yield name.format(**params), processed_vals.copy()

                # Stash ramp_time under '_ramp_time' after the first
                # repetition; next_test_portion() restores it when a job
                # starts a fresh portion.
                if 'ramp_time' in processed_vals:
                    processed_vals['_ramp_time'] = ramp_time
                    processed_vals.pop('ramp_time')

            if ramp_time is not None:
                processed_vals['ramp_time'] = ramp_time
                processed_vals.pop('_ramp_time')
def calculate_execution_time(combinations):
    """Return the expected total runtime (seconds) of all jobs.

    Sums 'runtime' plus 'ramp_time' — including the stashed '_ramp_time'
    form — over every (name, params) pair; missing keys count as 0.
    """
    total = 0
    for _, params in combinations:
        for key in ('ramp_time', '_ramp_time', 'runtime'):
            total += int(params.get(key, 0))
    return total
def parse_fio_config_full(fio_cfg, params=None, cluster=False):
    """Parse a fio-style config text and yield expanded job sections.

    *fio_cfg* is the raw config text; *params* is an optional dict of
    external format parameters that override ALL-CAPS settings from the
    [defaults] section.  Yields (section_name, options) pairs produced
    by process_section().

    *cluster* is accepted for backward/forward interface compatibility:
    main() passes it as a third positional argument (see the --num-tests
    path), which previously raised TypeError.  It does not alter parsing.
    """
    defaults = {}
    format_params = {}

    if params is None:
        ext_params = {}
    else:
        ext_params = params.copy()

    curr_section = None
    curr_section_name = None

    for tp, name, val in parse_fio_config_iter(fio_cfg):
        if tp == SECTION:
            non_def = curr_section_name != 'defaults'
            if curr_section_name is not None and non_def:
                # A new section starts: flush the previous one.
                # External params win over [defaults]-provided ones.
                format_params.update(ext_params)
                for sec in process_section(curr_section_name,
                                           curr_section,
                                           defaults,
                                           format_params):
                    yield sec

            if name == 'defaults':
                curr_section = defaults
            else:
                # Each real section starts from a copy of the defaults.
                curr_section = OrderedDict()
                curr_section.update(defaults)
            curr_section_name = name
        else:
            assert tp == SETTING
            assert curr_section_name is not None, "no section name"
            if name == name.upper():
                # ALL-CAPS settings are format parameters and are only
                # allowed inside [defaults].
                assert curr_section_name == 'defaults'
                format_params[name] = val
            else:
                curr_section[name] = val

    # Flush the trailing section, if any.
    if curr_section_name is not None and curr_section_name != 'defaults':
        format_params.update(ext_params)
        for sec in process_section(curr_section_name,
                                   curr_section,
                                   defaults,
                                   format_params):
            yield sec
def parse_fio_config_iter(fio_cfg):
    """Tokenize fio config text line by line.

    Yields (SECTION, name, None) for '[name]' lines and
    (SETTING, name, value) for 'name=value' pairs or bare flags.
    Blank lines and comments ('#' or ';') are skipped.

    Raises ValueError with a 1-based line number on any parse error.
    """
    # BUG FIX: enumerate previously started at 0, so error messages
    # reported line numbers off by one.
    for lineno, line in enumerate(fio_cfg.split("\n"), 1):
        try:
            line = line.strip()

            if line.startswith("#") or line.startswith(";"):
                continue

            if line == "":
                continue

            if line.startswith('['):
                assert line.endswith(']'), "name should ends with ]"
                yield SECTION, line[1:-1], None
            elif '=' in line:
                # Split on the first '=' only: values may contain '='.
                opt_name, opt_val = line.split('=', 1)
                yield SETTING, opt_name.strip(), opt_val.strip()
            else:
                # A bare option acts as a boolean flag.
                yield SETTING, line, None
        except Exception as exc:
            pref = "During parsing line number {0}\n".format(lineno)
            # BUG FIX: exc.message is Python2-only and deprecated;
            # str(exc) is portable.
            raise ValueError(pref + str(exc))
def format_fio_config(fio_cfg):
    """Render (name, options) pairs back into fio config text.

    Sections whose name starts with '_' are internal and omitted.
    A blank line separates sections (keyed off the original position,
    so a skipped leading section still produces a leading separator,
    matching historical behavior).
    """
    parts = []

    for pos, (name, section) in enumerate(fio_cfg):
        if name.startswith('_'):
            continue

        if pos != 0:
            parts.append("\n")

        parts.append("[{0}]\n".format(name))

        for opt_name, opt_val in section.items():
            if opt_val is None:
                # Bare flag option.
                parts.append(opt_name + "\n")
            else:
                parts.append("{0}={1}\n".format(opt_name, opt_val))

    return "".join(parts)
# Number of do_run_fio_fake() invocations (debug/emulation counter).
count = 0
def to_bytes(sz):
    """Convert a size string like '4k', '10M' or '1g' to bytes.

    Plain digit strings are returned as-is; suffixes are
    case-insensitive and use 1024-based multipliers.  Raises
    ValueError for unknown suffixes or malformed numbers.
    """
    sz = sz.lower()
    try:
        return int(sz)
    except ValueError:
        multipliers = {'k': 1024,
                       'm': 1024 ** 2,
                       'g': 1024 ** 3}
        suffix = sz[-1]
        if suffix in multipliers:
            return multipliers[suffix] * int(sz[:-1])
        # Unknown suffix: re-raise the original ValueError.
        raise
def do_run_fio_fake(bconf):
    """Emulate a fio run over *bconf* without launching fio.

    Produces records shaped like entries of fio's JSON 'jobs' list, with
    the active direction's stats derived from a simple bandwidth/latency
    model plus random jitter.  Returns the same zip of
    (parsed_result, (job_name, job_config)) pairs as do_run_fio().

    Raises ValueError for an unsupported 'rw' mode.
    """
    def estimate_iops(sz, bw, lat):
        # IOPS of an operation limited by both latency and bandwidth.
        return 1 / (lat + float(sz) / bw)

    global count
    count += 1
    parsed_out = []

    # Model constants: ~120 MiB/s bandwidth, 3 ms base latency.
    BW = 120.0 * (1024 ** 2)
    LAT = 0.003

    for name, cfg in bconf:
        sz = to_bytes(cfg['blocksize'])
        # +-5% random jitter around the base latency/bandwidth.
        curr_lat = LAT * ((random.random() - 0.5) * 0.1 + 1)
        curr_ulat = curr_lat * 1000000  # fio reports latencies in usec
        curr_bw = BW * ((random.random() - 0.5) * 0.1 + 1)
        iops = estimate_iops(sz, curr_bw, curr_lat)
        bw = iops * sz

        # Zero-filled template mirroring fio's per-job JSON output.
        res = {'ctx': 10683,
               'error': 0,
               'groupid': 0,
               'jobname': name,
               'majf': 0,
               'minf': 30,
               'read': {'bw': 0,
                        'bw_agg': 0.0,
                        'bw_dev': 0.0,
                        'bw_max': 0,
                        'bw_mean': 0.0,
                        'bw_min': 0,
                        'clat': {'max': 0,
                                 'mean': 0.0,
                                 'min': 0,
                                 'stddev': 0.0},
                        'io_bytes': 0,
                        'iops': 0,
                        'lat': {'max': 0, 'mean': 0.0,
                                'min': 0, 'stddev': 0.0},
                        'runtime': 0,
                        'slat': {'max': 0, 'mean': 0.0,
                                 'min': 0, 'stddev': 0.0}
                        },
               'sys_cpu': 0.64,
               'trim': {'bw': 0,
                        'bw_agg': 0.0,
                        'bw_dev': 0.0,
                        'bw_max': 0,
                        'bw_mean': 0.0,
                        'bw_min': 0,
                        'clat': {'max': 0,
                                 'mean': 0.0,
                                 'min': 0,
                                 'stddev': 0.0},
                        'io_bytes': 0,
                        'iops': 0,
                        'lat': {'max': 0, 'mean': 0.0,
                                'min': 0, 'stddev': 0.0},
                        'runtime': 0,
                        'slat': {'max': 0, 'mean': 0.0,
                                 'min': 0, 'stddev': 0.0}
                        },
               'usr_cpu': 0.23,
               'write': {'bw': 0,
                         'bw_agg': 0,
                         'bw_dev': 0,
                         'bw_max': 0,
                         'bw_mean': 0,
                         'bw_min': 0,
                         'clat': {'max': 0, 'mean': 0,
                                  'min': 0, 'stddev': 0},
                         'io_bytes': 0,
                         'iops': 0,
                         'lat': {'max': 0, 'mean': 0,
                                 'min': 0, 'stddev': 0},
                         'runtime': 0,
                         'slat': {'max': 0, 'mean': 0.0,
                                  'min': 0, 'stddev': 0.0}
                         }
               }

        if cfg['rw'] in ('read', 'randread'):
            key = 'read'
        elif cfg['rw'] in ('write', 'randwrite'):
            key = 'write'
        else:
            # BUG FIX: previously formatted the unbound name 'key' here,
            # raising NameError instead of the intended ValueError, and
            # misspelled "Unknown".
            raise ValueError("Unknown op type {0}".format(cfg['rw']))

        # Fill the active direction with the modeled numbers.
        res[key]['bw'] = bw
        res[key]['iops'] = iops
        res[key]['runtime'] = 30
        res[key]['io_bytes'] = res[key]['runtime'] * bw
        res[key]['bw_agg'] = bw
        res[key]['bw_dev'] = bw / 30
        res[key]['bw_max'] = bw * 1.5
        res[key]['bw_min'] = bw / 1.5
        res[key]['bw_mean'] = bw
        res[key]['clat'] = {'max': curr_ulat * 10, 'mean': curr_ulat,
                            'min': curr_ulat / 2, 'stddev': curr_ulat}
        res[key]['lat'] = res[key]['clat'].copy()
        res[key]['slat'] = res[key]['clat'].copy()

        parsed_out.append(res)

    return zip(parsed_out, bconf)
def do_run_fio(bconf):
    """Run fio once over all jobs in *bconf* and parse its JSON output.

    The rendered config is fed to fio on stdin.  Raises OSError when fio
    exits non-zero and ValueError when the output cannot be parsed.
    Returns zip of (parsed_job_result, (job_name, job_config)).
    """
    benchmark_config = format_fio_config(bconf)
    cmd = ["fio", "--output-format=json", "--alloc-size=262144", "-"]
    p = subprocess.Popen(cmd,
                         stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)

    # TODO: set timeout on the fio run
    raw_out, raw_err = p.communicate(benchmark_config)

    if 0 != p.returncode:
        msg = "Fio failed with code: {0}\nOutput={1}"
        raise OSError(msg.format(p.returncode, raw_err))

    try:
        parsed_out = json.loads(raw_out)["jobs"]
    except KeyError:
        msg = "Can't parse fio output {0!r}: no 'jobs' found"
        raw_out = raw_out[:100]  # truncate to keep the error readable
        raise ValueError(msg.format(raw_out))
    except Exception as exc:
        msg = "Can't parse fio output: {0!r}\nError: {1}"
        raw_out = raw_out[:100]
        # BUG FIX: exc.message is Python2-only and deprecated;
        # str(exc) is portable.
        raise ValueError(msg.format(raw_out, str(exc)))

    return zip(parsed_out, bconf)
# Maximum number of jobs in a single fio invocation - limited by fio
MAX_JOBS = 1000
def next_test_portion(whole_conf, runcycle, cluster=False):
    """Group expanded jobs into portions runnable by one fio invocation.

    A portion is capped by MAX_JOBS total 'numjobs' and, when *runcycle*
    is given, by a total estimated runtime of *runcycle* seconds.  In
    *cluster* mode every job becomes its own single-element portion.

    Jobs carrying '_ramp_time' (stashed by process_section for repeated
    jobs) get it restored as 'ramp_time' only when they start a new
    portion; within a portion the stash is simply dropped.

    Yields lists of (name, section) pairs; raises ValueError if a single
    job exceeds MAX_JOBS.
    """
    if cluster:
        for name, sec in whole_conf:
            if '_ramp_time' in sec:
                # Every cluster portion is fresh, so ramp again.
                sec['ramp_time'] = sec.pop('_ramp_time')
            yield [(name, sec)]
        return

    jcount = 0     # jobs accumulated in the current portion
    runtime = 0    # estimated seconds accumulated in the current portion
    bconf = []     # jobs of the current portion

    for pos, (name, sec) in enumerate(whole_conf):
        jc = int(sec.get('numjobs', '1'))

        if runcycle is not None:
            curr_task_time = calculate_execution_time([(name, sec)])
        else:
            curr_task_time = 0

        if jc > MAX_JOBS:
            err_templ = "Can't process job {0!r} - too large numjobs"
            raise ValueError(err_templ.format(name))

        # Runtime cap only applies once the portion is non-empty, so a
        # single over-long job still gets its own portion.
        if runcycle is not None and len(bconf) != 0:
            rc_ok = curr_task_time + runtime <= runcycle
        else:
            rc_ok = True

        if jc + jcount <= MAX_JOBS and rc_ok:
            # Job fits into the current portion.
            runtime += curr_task_time
            jcount += jc
            bconf.append((name, sec))
            if '_ramp_time' in sec:
                # Same portion as the first repetition: no extra ramp.
                del sec['_ramp_time']
            continue

        # Job does not fit: flush the portion and start a new one.
        assert len(bconf) != 0
        yield bconf

        if '_ramp_time' in sec:
            # New portion: the job must ramp up again, which also
            # changes its estimated runtime.
            sec['ramp_time'] = sec.pop('_ramp_time')
            curr_task_time = calculate_execution_time([(name, sec)])

        runtime = curr_task_time
        jcount = jc
        bconf = [(name, sec)]

    if bconf != []:
        yield bconf
def add_job_results(jname, job_output, jconfig, res):
    """Merge one fio job result into *res*, keyed by job name.

    The first result for a name records the job's static parameters;
    subsequent results for the same name are validated against them and
    appended to the per-metric lists (bw, iops, lat, clat, slat).
    Mutates *res* in place.
    """
    # A job does either reads or writes; pick the side that saw traffic.
    if job_output['write']['iops'] != 0:
        raw_result = job_output['write']
    else:
        raw_result = job_output['read']

    if jname not in res:
        j_res = {}
        j_res["rw"] = jconfig["rw"]
        j_res["sync_mode"] = get_test_sync_mode(jconfig)
        j_res["concurence"] = int(jconfig.get("numjobs", 1))
        j_res["blocksize"] = jconfig["blocksize"]
        j_res["jobname"] = job_output["jobname"]
        j_res["timings"] = [int(jconfig.get("runtime", 0)),
                            int(jconfig.get("ramp_time", 0))]
    else:
        j_res = res[jname]
        # BUG FIX: the 'rw' assertion was duplicated; kept once.
        assert j_res["rw"] == jconfig["rw"]
        assert j_res["sync_mode"] == get_test_sync_mode(jconfig)
        assert j_res["concurence"] == int(jconfig.get("numjobs", 1))
        assert j_res["blocksize"] == jconfig["blocksize"]
        assert j_res["jobname"] == job_output["jobname"]

        # ramp part is skipped for all tests, except first
        # assert j_res["timings"] == (jconfig.get("runtime"),
        #                             jconfig.get("ramp_time"))

    def j_app(name, x):
        # Append x to the metric list 'name', creating it on first use.
        j_res.setdefault(name, []).append(x)

    j_app("bw", raw_result["bw"])
    j_app("iops", raw_result["iops"])
    j_app("lat", raw_result["lat"]["mean"])
    j_app("clat", raw_result["clat"]["mean"])
    j_app("slat", raw_result["slat"]["mean"])

    res[jname] = j_res
def compile(benchmark_config, params, skip=0, runcycle=None, cluster=False):
    """Render the expanded job set as plain fio config text.

    Expands *benchmark_config* with *params*, drops the first *skip*
    jobs, groups the rest into portions and joins them with a comment
    separator.  NOTE: intentionally shadows the builtin compile().
    """
    whole_conf = list(parse_fio_config_full(benchmark_config, params))[skip:]

    separator = "\n#" + "-" * 50 + "\n\n"
    chunks = []

    for bconf in next_test_portion(whole_conf, runcycle, cluster=cluster):
        chunks.append(format_fio_config(bconf))
        chunks.append(separator)

    return "".join(chunks)
def run_fio(benchmark_config,
            params,
            runcycle=None,
            raw_results_func=None,
            skip_tests=0,
            fake_fio=False,
            cluster=False):
    """Execute all jobs from *benchmark_config* portion by portion.

    Expands the config with *params*, skips the first *skip_tests*
    jobs, then runs each portion via do_run_fio() (or
    do_run_fio_fake() when *fake_fio*).  *raw_results_func*, when
    given, receives (executed_count, [job_output, name, config]) for
    every job.  Jobs whose name starts with '_' are executed but not
    aggregated.

    Returns (results_dict, executed_test_count, ok_flag); any
    unexpected exception is printed and reported via ok_flag=False
    rather than raised.
    """
    whole_conf = list(parse_fio_config_full(benchmark_config, params))
    whole_conf = whole_conf[skip_tests:]

    res = {}
    curr_test_num = skip_tests
    executed_tests = 0
    ok = True
    try:
        for bconf in next_test_portion(whole_conf, runcycle, cluster=cluster):

            if fake_fio:
                res_cfg_it = do_run_fio_fake(bconf)
            else:
                res_cfg_it = do_run_fio(bconf)

            # Number results globally, continuing from the skip offset.
            res_cfg_it = enumerate(res_cfg_it, curr_test_num)

            for curr_test_num, (job_output, (jname, jconfig)) in res_cfg_it:
                executed_tests += 1

                if raw_results_func is not None:
                    raw_results_func(executed_tests,
                                     [job_output, jname, jconfig])

                assert jname == job_output["jobname"], \
                    "{0} != {1}".format(jname, job_output["jobname"])

                if jname.startswith('_'):
                    # Internal warm-up/helper job: don't aggregate it.
                    continue

                add_job_results(jname, job_output, jconfig, res)

            curr_test_num += 1
            msg_template = "Done {0} tests from {1}. ETA: {2}"
            # Remaining-time estimate over the not-yet-run jobs.
            exec_time = estimate_cfg(benchmark_config, params, curr_test_num)

            print msg_template.format(curr_test_num - skip_tests,
                                      len(whole_conf),
                                      sec_to_str(exec_time))

    except (SystemExit, KeyboardInterrupt):
        raise
    except Exception:
        print "=========== ERROR ============="
        traceback.print_exc()
        print "======== END OF ERROR ========="
        ok = False

    return res, executed_tests, ok
def run_benchmark(binary_tp, *argv, **kwargs):
    """Dispatch to the runner for benchmark type *binary_tp*.

    Currently only 'fio' is supported; everything else raises
    ValueError.  Positional and keyword arguments are forwarded to the
    concrete runner unchanged.
    """
    if 'fio' == binary_tp:
        return run_fio(*argv, **kwargs)
    # BUG FIX: error message previously misspelled "behcnmark".
    raise ValueError("Unknown benchmark {0}".format(binary_tp))
def read_config(fd, timeout=10):
    """Read a config from *fd* until EOF, waiting at most *timeout*
    seconds in total.

    Uses select() so a stalled writer cannot block forever; raises
    IOError if no data (or no EOF) arrives within the deadline.
    """
    chunks = []
    deadline = time.time() + timeout

    while True:
        remaining = deadline - time.time()
        if remaining <= 0:
            raise IOError("No config provided")

        ready, _, _ = select.select([fd], [], [], remaining)
        if not ready:
            # select timed out with nothing readable.
            raise IOError("No config provided")

        char = fd.read(1)
        if char == '':
            # EOF: the whole config has been received.
            return ''.join(chunks)
        chunks.append(char)
def estimate_cfg(job_cfg, params, skip_tests=0):
    """Estimated total runtime (seconds) of the jobs after *skip_tests*."""
    sections = list(parse_fio_config_full(job_cfg, params))
    return calculate_execution_time(sections[skip_tests:])
def sec_to_str(seconds):
    """Format a duration in whole seconds as 'H:MM:SS' (hours unpadded)."""
    minutes, s = divmod(seconds, 60)
    h, m = divmod(minutes, 60)
    return "{0}:{1:02d}:{2:02d}".format(h, m, s)
def parse_args(argv):
    """Build the CLI parser for the fio runner and parse *argv*.

    Returns the argparse namespace; 'jobfile' is the only positional
    argument ('-' means read the config from stdin).
    """
    parser = argparse.ArgumentParser(
        description="Run fio' and return result")
    # Only 'fio' is implemented; hidden from --help on purpose.
    parser.add_argument("--type", metavar="BINARY_TYPE",
                        choices=['fio'], default='fio',
                        help=argparse.SUPPRESS)
    parser.add_argument("--start-at", metavar="START_AT_UTC", type=int,
                        help="Start execution at START_AT_UTC")
    parser.add_argument("--json", action="store_true", default=False,
                        help="Json output format")
    parser.add_argument("--output", default='-', metavar="FILE_PATH",
                        help="Store results to FILE_PATH")
    parser.add_argument("--estimate", action="store_true", default=False,
                        help="Only estimate task execution time")
    parser.add_argument("--compile", action="store_true", default=False,
                        help="Compile config file to fio config")
    parser.add_argument("--num-tests", action="store_true", default=False,
                        help="Show total number of tests")
    parser.add_argument("--runcycle", type=int, default=None,
                        metavar="MAX_CYCLE_SECONDS",
                        help="Max cycle length in seconds")
    parser.add_argument("--show-raw-results", action='store_true',
                        default=False, help="Output raw input and results")
    parser.add_argument("--skip-tests", type=int, default=0, metavar="NUM",
                        help="Skip NUM tests")
    parser.add_argument("--faked-fio", action='store_true',
                        default=False, help="Emulate fio with 0 test time")
    parser.add_argument("--cluster", action='store_true',
                        default=False, help="Apply cluster-test settings")
    # NOTE(review): the concatenated help string lacks a space between
    # "to" and "format"; left as-is (runtime string).
    parser.add_argument("--params", nargs="*", metavar="PARAM=VAL",
                        default=[],
                        help="Provide set of pairs PARAM=VAL to" +
                             "format into job description")
    parser.add_argument("jobfile")
    return parser.parse_args(argv)
def main(argv):
argv_obj = parse_args(argv)
if argv_obj.jobfile == '-':
job_cfg = read_config(sys.stdin)
else:
job_cfg = open(argv_obj.jobfile).read()
if argv_obj.output == '-':
out_fd = sys.stdout
else:
out_fd = open(argv_obj.output, "w")
params = {}
for param_val in argv_obj.params:
assert '=' in param_val
name, val = param_val.split("=", 1)
params[name] = val
if argv_obj.estimate:
print sec_to_str(estimate_cfg(job_cfg, params))
return 0
if argv_obj.num_tests or argv_obj.compile:
if argv_obj.compile:
data = compile(job_cfg, params, argv_obj.skip_tests,
cluster=argv_obj.cluster)
out_fd.write(data)
out_fd.write("\n")
if argv_obj.num_tests:
bconf = list(parse_fio_config_full(job_cfg, params,
argv_obj.cluster))
bconf = bconf[argv_obj.skip_tests:]
print len(bconf)
return 0
if argv_obj.start_at is not None:
ctime = time.time()
if argv_obj.start_at >= ctime:
time.sleep(ctime - argv_obj.start_at)
def raw_res_func(test_num, data):
pref = "========= RAW_RESULTS({0}) =========\n".format(test_num)
out_fd.write(pref)
out_fd.write(json.dumps(data))
out_fd.write("\n========= END OF RAW_RESULTS =========\n")
out_fd.flush()
rrfunc = raw_res_func if argv_obj.show_raw_results else None
stime = time.time()
job_res, num_tests, ok = run_benchmark(argv_obj.type,
job_cfg,
params,
argv_obj.runcycle,
rrfunc,
argv_obj.skip_tests,
argv_obj.faked_fio,
cluster=argv_obj.cluster)
etime = time.time()
res = {'__meta__': {'raw_cfg': job_cfg, 'params': params}, 'res': job_res}
oformat = 'json' if argv_obj.json else 'eval'
out_fd.write("\nRun {0} tests in {1} seconds\n".format(num_tests,
int(etime - stime)))
out_fd.write("========= RESULTS(format={0}) =========\n".format(oformat))
if argv_obj.json:
out_fd.write(json.dumps(res))
else:
out_fd.write(pprint.pformat(res) + "\n")
out_fd.write("\n========= END OF RESULTS =========\n")
return 0 if ok else 1
def fake_main(x):
    """Debug stand-in for main(): replays previously recorded results
    from a hard-coded YAML file instead of running fio."""
    import yaml
    time.sleep(60)  # emulate a long-running test
    out_fd = sys.stdout
    fname = "/tmp/perf_tests/metempirical_alisha/raw_results.yaml"
    # NOTE(review): yaml.load without an explicit Loader is unsafe on
    # untrusted input; acceptable here only because the file is locally
    # produced debug output.
    res = yaml.load(open(fname).read())[0][1]
    out_fd.write("========= RESULTS(format=json) =========\n")
    out_fd.write(json.dumps(res))
    out_fd.write("\n========= END OF RESULTS =========\n")
    return 0
if __name__ == '__main__':
    # Swap in fake_main() to replay recorded results while debugging:
    # exit(fake_main(sys.argv[1:]))
    exit(main(sys.argv[1:]))