blob: 3c3e4365bb14fe5be9329486786d0a9b253d7ec7 [file] [log] [blame]
import os
import sys
import time
import json
import copy
import select
import pprint
import os.path
import argparse
import traceback
import subprocess
import itertools
from collections import OrderedDict
# Token kinds yielded by fio_config_lexer: a "[name]" section header line
# or an "option[=value]" setting line.
SECTION, SETTING = 0, 1
class FioJobSection(object):
    """One ``[name]`` job section of a fio config.

    Attributes (documented here rather than as attribute docstrings):
        name: section name; later passes may still resolve a ``* COUNT``
            repeat suffix or ``{PARAM}`` placeholders in it.
        vals: ordered mapping of fio option name -> value.
        format_params: parameters used for ``str.format`` substitution
            into this section's values and name.
    """

    def __init__(self, name):
        self.name, self.vals, self.format_params = name, OrderedDict(), {}

    def copy(self):
        """Return a fully independent (deep) copy of this section."""
        duplicate = copy.deepcopy(self)
        return duplicate
def to_bytes(sz):
    """Convert a fio-style size such as ``"4k"``, ``"10M"`` or ``"1g"`` to bytes.

    Suffixes are case-insensitive and binary (``k`` = 1024, ``m`` = 1024**2,
    ``g`` = 1024**3, ``t`` = 1024**4). Integer input is returned as-is,
    because ``parse_value`` already turns plain numeric config values
    (e.g. ``size=1048576``) into ``int``.

    Raises:
        ValueError: if ``sz`` is empty or is not a recognised size string.
    """
    import numbers

    if isinstance(sz, numbers.Integral):
        return int(sz)

    sz = sz.strip().lower()
    if not sz:
        raise ValueError("Empty size value")

    try:
        return int(sz)
    except ValueError:
        pass

    multipliers = {'k': 1024, 'm': 1024 ** 2, 'g': 1024 ** 3, 't': 1024 ** 4}
    mult = multipliers.get(sz[-1])
    if mult is None:
        raise ValueError("Can't parse size {0!r}".format(sz))
    # int() raises ValueError for a non-numeric prefix such as "xk" or "k"
    return mult * int(sz[:-1])
def fio_config_lexer(fio_cfg):
    """Split fio config text into ``(lineno, kind, name, value)`` tokens.

    ``lineno`` is the 0-based line index. ``kind`` is ``SECTION`` for a
    ``[name]`` header (``value`` is None) or ``SETTING`` for an
    ``opt=value`` line. A bare ``opt`` line is treated as a flag with the
    value ``'1'``. Blank lines and lines starting with ``#`` or ``;`` are
    skipped.

    Raises:
        ValueError: for a section header that is not closed with ``]``.
    """
    for lineno, raw_line in enumerate(fio_cfg.split("\n")):
        line = raw_line.strip()
        if not line or line.startswith(("#", ";")):
            continue

        if line.startswith('['):
            # Explicit check (not assert) so malformed input is still
            # rejected when Python runs with -O.
            if not line.endswith(']'):
                msg = "During parsing line number {0}\n{1}"
                raise ValueError(msg.format(lineno,
                                            "section name should end with ']'"))
            yield lineno, SECTION, line[1:-1], None
        elif '=' in line:
            opt_name, opt_val = line.split('=', 1)
            yield lineno, SETTING, opt_name.strip(), opt_val.strip()
        else:
            yield lineno, SETTING, line, '1'
def fio_config_parse(lexer_iter, format_params):
    """Assemble lexer tokens into ``FioJobSection`` objects (generator).

    Settings in a ``[defaults]`` section are copied into every section that
    starts after it. UPPER-CASE names are format parameters:
    - they are only allowed inside ``[defaults]``;
    - they are stored as raw strings, not run through ``parse_value``, so
      array syntax survives until it is substituted later;
    - they never override a key already present in the ``format_params``
      argument.
    Each section receives a snapshot of the format parameters known at the
    point where the section starts.

    Raises:
        ValueError: for a format parameter outside ``[defaults]``, a
            setting before any section header, or an unknown token type.
    """
    orig_format_params_keys = set(format_params)
    format_params = format_params.copy()
    in_defaults = False
    curr_section = None
    defaults = OrderedDict()

    for lineno, tp, name, val in lexer_iter:
        if tp == SECTION:
            if curr_section is not None:
                yield curr_section
            if name == 'defaults':
                in_defaults = True
                curr_section = None
            else:
                in_defaults = False
                curr_section = FioJobSection(name)
                curr_section.format_params = format_params.copy()
                curr_section.vals = defaults.copy()
            continue

        if tp != SETTING:
            raise ValueError("Unknown token type {0!r} in line {1}".format(tp, lineno))

        if name == name.upper():
            if not in_defaults:
                raise ValueError("Param not in default section in line " + str(lineno))
            if name not in orig_format_params_keys:
                # don't run parse_value on PARAMS: they are substituted
                # into values later, and parsing now would break arrays
                format_params[name] = val
        elif in_defaults:
            defaults[name] = parse_value(val)
        else:
            if curr_section is None:
                raise ValueError("data outside section, line " + str(lineno))
            curr_section.vals[name] = parse_value(val)

    if curr_section is not None:
        yield curr_section
def parse_value(val):
    """Convert a raw config value string to int, float, list or str.

    ``"{% a, b, c %}"`` denotes an array and becomes a ``list`` whose items
    are parsed recursively. A real list is returned rather than ``map(...)``
    because ``process_cycles`` detects arrays with
    ``isinstance(val, (list, tuple))``, and on Python 3 ``map`` is a lazy
    iterator that would not match.

    Raises:
        ValueError: for an array that starts with ``{%`` but does not end
            with ``%}``.
    """
    try:
        return int(val)
    except ValueError:
        pass

    try:
        return float(val)
    except ValueError:
        pass

    if val.startswith('{%'):
        if not val.endswith('%}'):
            raise ValueError("Array value {0!r} should end with '%}}'".format(val))
        content = val[2:-2]
        return [parse_value(item.strip()) for item in content.split(',')]

    return val
def process_repeats(sec_iter):
    """Expand ``[name * COUNT]`` sections into COUNT consecutive copies.

    COUNT may be a literal integer or a ``{PARAM}`` template resolved from
    the section's ``format_params``. The first copy keeps ``ramp_time``.
    In the remaining copies it is renamed to ``_ramp_time``, so it is not
    written to the fio config (see ``fio_config_to_str``) and is not counted
    by ``calculate_execution_time``. The input section's ``name`` is
    rewritten in place to drop the ``* COUNT`` suffix. Sections without
    ``*`` are passed through unchanged.

    Raises:
        ValueError: for more than one ``*``, an unknown parameter, or a
            non-integer count.
    """
    for sec in sec_iter:
        if '*' not in sec.name:
            yield sec
            continue

        if sec.name.count('*') != 1:
            raise ValueError("Only one '*' allowed in section name")

        name, count_tmpl = sec.name.split("*")
        sec.name = name.strip()
        count_tmpl = count_tmpl.strip()

        try:
            count_str = count_tmpl.format(**sec.format_params)
        except KeyError as exc:
            raise ValueError("No parameter {0} given".format(exc.args[0]))

        try:
            count = int(count_str)
        except ValueError:
            msg = "Parameter {0} has non-int value {1!r}"
            raise ValueError(msg.format(count_tmpl, count_str))

        yield sec.copy()

        if 'ramp_time' in sec.vals:
            sec = sec.copy()
            sec.vals['_ramp_time'] = sec.vals.pop('ramp_time')

        for _ in range(count - 1):
            yield sec.copy()
def process_cycles(sec_iter):
    """Expand sections whose values are arrays into one section per combination.

    Format params are first substituted best-effort via
    ``try_format_params_into_section``. Every list/tuple value (from the
    ``{% a, b %}`` array syntax) then becomes a cycle dimension. A section
    with N such values yields one copy per element of their Cartesian
    product, with each array replaced by a single element. Sections with no
    arrays are yielded unchanged.
    """
    for sec in try_format_params_into_section(sec_iter):
        cycled = [(opt, value) for opt, value in sec.vals.items()
                  if isinstance(value, (list, tuple))]
        if not cycled:
            yield sec
            continue

        opt_names = [opt for opt, _ in cycled]
        opt_choices = [value for _, value in cycled]
        for combination in itertools.product(*opt_choices):
            variant = sec.copy()
            variant.vals.update(zip(opt_names, combination))
            yield variant
def try_format_params_into_section(sec_iter):
    """Best-effort substitution of format params into string option values.

    Each string value is ``str.format``-ed with the section's
    ``format_params`` and re-parsed with ``parse_value``. Values that cannot
    be formatted yet are left untouched and get another chance in
    ``format_params_into_section_finall``; for example, a value that
    references ``PER_TH_OFFSET``, which is only added there. Sections are
    modified in place and re-yielded.
    """
    for sec in sec_iter:
        params = sec.format_params
        # snapshot items so reassigning values can't disturb iteration
        for name, val in list(sec.vals.items()):
            if isinstance(val, basestring):
                try:
                    sec.vals[name] = parse_value(val.format(**params))
                except Exception:
                    # Deliberate best effort: leave the raw value for the
                    # final pass. Exception (not a bare except) lets
                    # KeyboardInterrupt/SystemExit propagate.
                    pass
        yield sec
def format_params_into_section_finall(sec_iter, counter=[0]):
    """Final substitution pass: resolve every string value and the section name.

    Per section this:
    - validates reporting options and forces ``unified_rw_reporting=1``;
    - adds ``PER_TH_OFFSET`` (= size // numjobs, only when ``size`` is set);
    - formats all string values, which must now be fully resolvable;
    - adds ``UNIQ``/``COUNTER``/``TEST_SUMM`` plus the section's own values
      to the params and formats the section name with them.

    NOTE: ``counter`` is a mutable default, so the count is shared by all
    calls in this process and UNIQ/COUNTER keep increasing across calls.
    Pass an explicit ``[0]`` to start from zero.

    Raises:
        ValueError: if ``numjobs != 1`` without ``group_reporting``, or if
            ``unified_rw_reporting`` is set to something other than 1.
        KeyError: if a value references an unknown format parameter.
    """
    group_report_err_msg = "Group reporting should be set if numjobs != 1"
    for sec in sec_iter:
        num_jobs = int(sec.vals.get('numjobs', '1'))
        if num_jobs != 1 and 'group_reporting' not in sec.vals:
            raise ValueError(group_report_err_msg)

        if sec.vals.get('unified_rw_reporting', '1') not in (1, '1'):
            raise ValueError("unified_rw_reporting should be 1")
        sec.vals['unified_rw_reporting'] = '1'

        params = sec.format_params.copy()
        # fio may run without an explicit size (e.g. a whole block device),
        # so PER_TH_OFFSET is only provided when size is known
        if 'size' in sec.vals:
            size = sec.vals['size']
            # parse_value may already have turned a plain byte count into an int
            fsize = size if isinstance(size, int) else to_bytes(size)
            params['PER_TH_OFFSET'] = fsize // num_jobs

        for name, val in list(sec.vals.items()):
            if isinstance(val, basestring):
                sec.vals[name] = parse_value(val.format(**params))
            else:
                # arrays were already expanded by process_cycles
                assert isinstance(val, (int, float))

        params['UNIQ'] = 'UN{0}'.format(counter[0])
        params['COUNTER'] = str(counter[0])
        counter[0] += 1
        params['TEST_SUMM'] = get_test_summary(sec.vals,
                                               params.get('VM_COUNT', 1))
        params.update(sec.vals)
        sec.name = sec.name.format(**params)
        yield sec
def fio_config_to_str(sec_iter):
    """Render sections as fio job-file text.

    Each section becomes a ``[name]`` header followed by ``opt=value``
    lines, and sections are separated by a blank line. Options whose name
    starts with ``_`` are internal bookkeeping and are omitted.
    """
    rendered = []
    for sec in sec_iter:
        body = ["[{0}]\n".format(sec.name)]
        body.extend("{0}={1}\n".format(opt, value)
                    for opt, value in sec.vals.items()
                    if not opt.startswith('_'))
        rendered.append("".join(body))
    return "\n".join(rendered)
def get_test_sync_mode(config):
    """Return a one-letter I/O mode code for a job's options.

    An explicit ``sync_mode`` key wins. Otherwise the code is derived from
    the ``sync`` and ``direct`` flags (compared as strings against "1"):
    'x' = sync+direct, 's' = sync, 'd' = direct, 'a' = neither.
    """
    if 'sync_mode' in config:
        return config['sync_mode']

    flags = (str(config.get("sync", "0")) == "1",
             str(config.get("direct", "0")) == "1")
    mode_by_flags = {
        (True, True): 'x',
        (True, False): 's',
        (False, True): 'd',
        (False, False): 'a',
    }
    return mode_by_flags[flags]
def get_test_summary(params, testnodes_count):
    """Build a compact test id such as ``rrd4kth1vm1``.

    The id is: rw-mode abbreviation, sync-mode letter, blocksize, thread
    count (``numjobs``, falling back to ``concurence``, then 1) and the
    number of test nodes. Raises KeyError for an ``rw`` other than
    randread/randwrite/read/write, or if ``blocksize`` is missing.
    """
    rw_abbrev = {"randread": "rr",
                 "randwrite": "rw",
                 "read": "sr",
                 "write": "sw"}
    rw = rw_abbrev[params["rw"]]
    sync_mode = get_test_sync_mode(params)

    th_count = params.get('numjobs')
    th_count = params.get('concurence', 1) if th_count is None else th_count

    parts = (rw, sync_mode, params['blocksize'], th_count, testnodes_count)
    return "{0}{1}{2}th{3}vm{4}".format(*parts)
def calculate_execution_time(sec_iter):
    """Return the estimated run time, in seconds, of the given sections.

    This is the sum of ``ramp_time`` and ``runtime`` over all sections,
    with missing options counting as 0. A hidden ``_ramp_time`` is not
    counted.
    """
    return sum(sec.vals.get(opt, 0)
               for sec in sec_iter
               for opt in ('ramp_time', 'runtime'))
def slice_config(sec_iter, runcycle=None, max_jobs=1000,
                 soft_runcycle=None, split_on_names=False):
    """Group sections into slices; each slice is meant to be one fio run.

    The next section starts a new slice when adding it would push the
    slice's total ``numjobs`` above ``max_jobs``, or (when ``runcycle`` is
    given) its estimated run time above ``runcycle`` seconds. In addition,
    at a boundary between sections with different names, the current slice
    is closed if ``split_on_names`` is set or if its estimated run time
    already exceeds ``soft_runcycle``.

    When a section with a hidden ``_ramp_time`` (a repeat copy) opens a new
    slice because of a limit, its ``ramp_time`` is restored in place,
    presumably because a fresh fio run needs the warm-up again.

    Raises:
        ValueError: if a section's ``numjobs`` is not an int or alone
            exceeds ``max_jobs``.
    """
    jcount = 0
    runtime = 0
    curr_slice = []
    prev_name = None

    for sec in sec_iter:
        if prev_name is not None and prev_name != sec.name:
            soft_split = soft_runcycle is not None and runtime > soft_runcycle
            if split_on_names or soft_split:
                yield curr_slice
                curr_slice = []
                runtime = 0
                jcount = 0

        prev_name = sec.name

        jc = sec.vals.get('numjobs', 1)
        if not isinstance(jc, int):
            raise ValueError("numjobs should be integer, not {0!r}".format(jc))

        curr_task_time = calculate_execution_time([sec])

        if jc > max_jobs:
            err_templ = "Can't process job {0!r} - too large numjobs"
            raise ValueError(err_templ.format(sec.name))

        if runcycle is not None and curr_slice:
            rc_ok = curr_task_time + runtime <= runcycle
        else:
            # an empty slice always accepts its first section
            rc_ok = True

        if jc + jcount <= max_jobs and rc_ok:
            runtime += curr_task_time
            jcount += jc
            curr_slice.append(sec)
            continue

        # Current slice is full: close it and start a new one with sec.
        assert curr_slice
        yield curr_slice

        if '_ramp_time' in sec.vals:
            sec.vals['ramp_time'] = sec.vals.pop('_ramp_time')
            curr_task_time = calculate_execution_time([sec])

        runtime = curr_task_time
        jcount = jc
        curr_slice = [sec]
        # prev_name keeps sec.name (set above). The old code reset it to
        # None, which skipped the name-boundary check for the next section
        # and let split_on_names merge differently-named sections.

    if curr_slice:
        yield curr_slice
def parse_all_in_1(source, test_params):
    """Run the full parsing pipeline over fio config text ``source``.

    The stages are: lex, parse into sections, expand array cycles, expand
    ``* COUNT`` repeats, then final parameter formatting. All stages are
    lazy generators, so the result is an iterator of sections.
    """
    sections = fio_config_parse(fio_config_lexer(source), test_params)
    expanded = process_repeats(process_cycles(sections))
    return format_params_into_section_finall(expanded)
def parse_and_slice_all_in_1(source, test_params, **slice_params):
    """Parse ``source`` and group the sections into slices.

    ``slice_params`` is forwarded unchanged to ``slice_config``.
    """
    return slice_config(parse_all_in_1(source, test_params), **slice_params)
def compile_all_in_1(source, test_params, **slice_params):
    """Yield the fio job-file text for each slice of ``source`` (generator)."""
    slices = parse_and_slice_all_in_1(source, test_params, **slice_params)
    for config_slice in slices:
        yield fio_config_to_str(config_slice)
def do_run_fio(config_slice):
    """Run one slice as a single fio process, passing the config on stdin.

    Returns:
        ``(pairs, (start_time, end_time))``, where ``pairs`` zips fio's
        per-job JSON output with the corresponding sections and the times
        come from ``time.time()`` around the run.

    Raises:
        OSError: if fio exits with a non-zero code.
        ValueError: if fio's stdout holds no parseable JSON ``jobs`` list.
    """
    benchmark_config = fio_config_to_str(config_slice)
    cmd = ["fio", "--output-format=json", "--alloc-size=262144", "-"]
    p = subprocess.Popen(cmd,
                         stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)
    start_time = time.time()
    # NOTE: no timeout is enforced; communicate() waits for fio to exit.
    raw_out, raw_err = p.communicate(benchmark_config)
    end_time = time.time()

    if 0 != p.returncode:
        msg = "Fio failed with code: {0}\nOutput={1}"
        raise OSError(msg.format(p.returncode, raw_err))

    # HACK: skip any non-JSON text fio may print before the JSON document
    # (presumably warnings). Without a '{' there is no JSON at all; report
    # that instead of letting an IndexError escape.
    json_start = raw_out.find('{')
    if json_start == -1:
        msg = "Can't parse fio output {0!r}: no json found"
        raise ValueError(msg.format(raw_out[:100]))
    raw_out = raw_out[json_start:]

    try:
        parsed_out = json.loads(raw_out)["jobs"]
    except KeyError:
        msg = "Can't parse fio output {0!r}: no 'jobs' found"
        raw_out = raw_out[:100]
        raise ValueError(msg.format(raw_out))
    except Exception as exc:
        msg = "Can't parse fio output: {0!r}\nError: {1!s}"
        raw_out = raw_out[:100]
        raise ValueError(msg.format(raw_out, exc))

    return zip(parsed_out, config_slice), (start_time, end_time)
class FioResult(object):
    """Result of one fio job section.

    Attributes:
        params: shallow copy of the job's option values.
        name: section name.
        run_interval: (start, end) wall-clock interval attributed to the job.
        results: metric name -> value mapping.
    """

    def __init__(self, name, params, run_interval, results):
        # Assignment order matters: it fixes the key order of json_obj().
        self.params = params.copy()
        self.name = name
        self.run_interval = run_interval
        self.results = results

    def json_obj(self):
        """Return the instance's attribute dict (live, not a copy) for serialization."""
        return vars(self)
def make_job_results(section, job_output, slice_timings):
    """Build a ``FioResult`` from one job's fio JSON output.

    Reads ``bw`` and ``iops`` plus the mean ``lat``/``clat``/``slat`` from
    the ``mixed`` block of ``job_output``. The stored params are the
    section's values extended with ``sync_mode`` and ``concurence``
    (= numjobs, default 1).
    """
    mixed = job_output['mixed']

    res = {}
    for metric in ("bw", "iops"):
        res[metric] = mixed[metric]
    for metric in ("lat", "clat", "slat"):
        res[metric] = mixed[metric]["mean"]

    job_params = section.vals.copy()
    job_params['sync_mode'] = get_test_sync_mode(job_params)
    job_params['concurence'] = job_params.get('numjobs', 1)
    return FioResult(section.name, job_params, slice_timings, res)
def get_slice_parts_offset(test_slice, real_inteval):
    """Split a slice's measured run interval among its sections (generator).

    ``real_inteval`` is the ``(start, end)`` wall-clock time of the whole fio
    run. Each section gets a consecutive sub-interval proportional to its
    estimated ``ramp_time + runtime``. If the slice has no estimated time at
    all (no section sets either option), the interval is split evenly
    instead of raising ZeroDivisionError.

    ``test_slice`` is iterated more than once, so it must be a sequence.
    """
    start = real_inteval[0]
    total_real = real_inteval[1] - real_inteval[0]
    calc_exec_time = calculate_execution_time(test_slice)

    # float() keeps the split exact if integer timestamps are ever passed on Py2
    coef = float(total_real) / calc_exec_time if calc_exec_time else None

    curr_offset = start
    for section in test_slice:
        if coef is None:
            slen = float(total_real) / len(test_slice)
        else:
            slen = calculate_execution_time([section]) * coef
        yield (curr_offset, curr_offset + slen)
        curr_offset += slen
def run_fio(sliced_it, raw_results_func=None):
    """Run every slice through fio and collect one ``FioResult`` per section.

    Args:
        sliced_it: iterable of slices (each an iterable of sections), as
            produced by ``slice_config``.
        raw_results_func: optional callback ``f(test_num, [job_output, section])``
            called for every finished section, with 1-based ``test_num``.

    After each slice, a progress line is printed with the number of finished
    tests, the total number of tests and the estimated time for the slices
    that are still left.
    """
    sliced_list = [list(test_slice) for test_slice in sliced_it]
    total_tests = sum(len(test_slice) for test_slice in sliced_list)
    executed_tests = 0
    result = []

    for i, test_slice in enumerate(sliced_list):
        res_cfg_it, slice_timings = do_run_fio(test_slice)
        sec_intervals = get_slice_parts_offset(test_slice, slice_timings)

        for (job_output, section), interval in zip(res_cfg_it, sec_intervals):
            executed_tests += 1
            if raw_results_func is not None:
                raw_results_func(executed_tests, [job_output, section])
            msg = "{0} != {1}".format(section.name, job_output["jobname"])
            assert section.name == job_output["jobname"], msg
            result.append(make_job_results(section, job_output, interval))

        # ETA covers only the slices after the one that just finished.
        rest = sliced_list[i + 1:]
        time_eta = sum(map(calculate_execution_time, rest))
        msg_template = "Done {0} tests from {1}. ETA: {2}"
        print(msg_template.format(executed_tests,
                                  total_tests,
                                  sec_to_str(time_eta)))
    return result
def run_benchmark(binary_tp, *argv, **kwargs):
    """Dispatch to the runner for ``binary_tp``; only ``'fio'`` is supported.

    Raises:
        ValueError: for any other benchmark type.
    """
    if 'fio' == binary_tp:
        return run_fio(*argv, **kwargs)
    raise ValueError("Unknown benchmark {0}".format(binary_tp))
def read_config(fd, timeout=10):
    """Read ``fd`` until EOF, one character at a time, within ``timeout`` seconds.

    ``timeout`` is an overall deadline for the whole read, not a per-read
    limit.

    Raises:
        IOError: if the deadline passes or ``select`` reports no data in time.
    """
    received = []
    deadline = time.time() + timeout
    while True:
        remaining = deadline - time.time()
        if remaining <= 0:
            raise IOError("No config provided")

        readable, _, _ = select.select([fd], [], [], remaining)
        if not readable:
            raise IOError("No config provided")

        char = fd.read(1)
        if char == '':
            return "".join(received)
        received.append(char)
def sec_to_str(seconds):
    """Format a duration in seconds as ``H:MM:SS``.

    Fractional input is rounded to whole seconds first. Durations summed
    from config values may be floats, and the ``{:02d}`` format rejects
    floats.
    """
    seconds = int(round(seconds))
    hours, rest = divmod(seconds, 3600)
    minutes, secs = divmod(rest, 60)
    return "{0}:{1:02d}:{2:02d}".format(hours, minutes, secs)
def parse_args(argv):
    """Parse command-line arguments (``argv`` excludes the program name)."""
    parser = argparse.ArgumentParser(
        description="Run fio and return result")
    parser.add_argument("--type", metavar="BINARY_TYPE",
                        choices=['fio'], default='fio',
                        help=argparse.SUPPRESS)
    parser.add_argument("--start-at", metavar="START_AT_UTC", type=int,
                        help="Start execution at START_AT_UTC")
    parser.add_argument("--json", action="store_true", default=False,
                        help="Json output format")
    parser.add_argument("-o", "--output", default='-', metavar="FILE_PATH",
                        help="Store results to FILE_PATH")
    parser.add_argument("--estimate", action="store_true", default=False,
                        help="Only estimate task execution time")
    parser.add_argument("--compile", action="store_true", default=False,
                        help="Compile config file to fio config")
    parser.add_argument("--num-tests", action="store_true", default=False,
                        help="Show total number of tests")
    parser.add_argument("--runcycle", type=int, default=None,
                        metavar="MAX_CYCLE_SECONDS",
                        help="Max cycle length in seconds")
    parser.add_argument("--show-raw-results", action='store_true',
                        default=False, help="Output raw input and results")
    parser.add_argument("--params", nargs="*", metavar="PARAM=VAL",
                        default=[],
                        help="Provide set of pairs PARAM=VAL to "
                             "format into job description")
    parser.add_argument("-p", "--pid-file", metavar="FILE_TO_STORE_PID",
                        default=None, help="Store pid to FILE_TO_STORE_PID "
                                           "and remove this file on exit")
    parser.add_argument("jobfile")
    return parser.parse_args(argv)
def main(argv):
    """Command-line entry point: estimate, compile or run a fio job file.

    Results, or a traceback inside an ``ERROR`` block, are written to
    ``--output`` (stdout by default).

    Returns:
        Process exit code: 0 on success, 1 when an exception was reported.
    """
    argv_obj = parse_args(argv)

    if argv_obj.jobfile == '-':
        job_cfg = read_config(sys.stdin)
    else:
        with open(argv_obj.jobfile) as job_fd:
            job_cfg = job_fd.read()

    if argv_obj.output == '-':
        out_fd = sys.stdout
    else:
        out_fd = open(argv_obj.output, "w")

    if argv_obj.pid_file is not None:
        with open(argv_obj.pid_file, "w") as fd:
            fd.write(str(os.getpid()))

    try:
        params = {}
        for param_val in argv_obj.params:
            if '=' not in param_val:
                msg = "Param should be in PARAM=VAL form, got {0!r}"
                raise ValueError(msg.format(param_val))
            name, val = param_val.split("=", 1)
            params[name] = val

        slice_params = {
            'runcycle': argv_obj.runcycle,
        }

        sliced_it = parse_and_slice_all_in_1(job_cfg, params, **slice_params)

        if argv_obj.estimate:
            print(sec_to_str(sum(map(calculate_execution_time, sliced_it))))
            return 0

        if argv_obj.num_tests or argv_obj.compile:
            # sliced_it is a one-shot generator; materialize it so that
            # --compile and --num-tests can both be used in one call
            slices = list(sliced_it)
            if argv_obj.compile:
                for test_slice in slices:
                    out_fd.write(fio_config_to_str(test_slice))
                    out_fd.write("\n#" + "-" * 70 + "\n\n")
            if argv_obj.num_tests:
                # one test == one job section (slices group several tests)
                print(sum(len(test_slice) for test_slice in slices))
            return 0

        if argv_obj.start_at is not None:
            # wait until the requested absolute start time, if it is ahead
            delay = argv_obj.start_at - time.time()
            if delay > 0:
                time.sleep(delay)

        def raw_res_func(test_num, data):
            pref = "========= RAW_RESULTS({0}) =========\n".format(test_num)
            out_fd.write(pref)
            # data may hold section objects, which json can't serialize
            # natively; fall back to their attribute dict
            out_fd.write(json.dumps(data, default=vars))
            out_fd.write("\n========= END OF RAW_RESULTS =========\n")
            out_fd.flush()

        rrfunc = raw_res_func if argv_obj.show_raw_results else None

        job_res = run_benchmark(argv_obj.type,
                                sliced_it, rrfunc)

        res = {'__meta__': {'params': params,
                            'testnodes_count': int(params.get('VM_COUNT', 1))},
               'res': [j.json_obj() for j in job_res]}

        oformat = 'json' if argv_obj.json else 'eval'
        msg = "========= RESULTS(format={0}) =========\n"
        out_fd.write(msg.format(oformat))
        if argv_obj.json:
            out_fd.write(json.dumps(res))
        else:
            out_fd.write(pprint.pformat(res) + "\n")
        out_fd.write("\n========= END OF RESULTS =========\n")

        return 0
    except:
        # Bare except on purpose: any failure, including KeyboardInterrupt,
        # is reported to the output stream inside an ERROR block.
        out_fd.write("============ ERROR =============\n")
        out_fd.write(traceback.format_exc() + "\n")
        out_fd.write("============ END OF ERROR =============\n")
        return 1
    finally:
        try:
            if out_fd is not sys.stdout:
                out_fd.flush()
                os.fsync(out_fd.fileno())
                out_fd.close()
        except Exception:
            traceback.print_exc()

        if argv_obj.pid_file is not None:
            if os.path.exists(argv_obj.pid_file):
                os.unlink(argv_obj.pid_file)
def fake_main(x):
    """Debug stand-in for ``main``: after 60 s, replay stored results.

    Reads a hard-coded local ``raw_results.yaml`` and prints the first
    entry's payload in the same RESULTS framing ``main`` uses. ``x`` (the
    command-line arguments) is ignored.
    """
    import yaml

    time.sleep(60)
    out_fd = sys.stdout
    fname = "/tmp/perf_tests/metempirical_alisha/raw_results.yaml"
    with open(fname) as fd:
        # NOTE(review): yaml.Loader can construct arbitrary Python objects.
        # It is kept explicit (PyYAML >= 6 requires a Loader argument) for a
        # trusted local debug file; never point this at untrusted input.
        res = yaml.load(fd.read(), Loader=yaml.Loader)[0][1]
    out_fd.write("========= RESULTS(format=json) =========\n")
    out_fd.write(json.dumps(res))
    out_fd.write("\n========= END OF RESULTS =========\n")
    return 0
if __name__ == '__main__':
    # To replay stored results instead of running fio, call
    # fake_main(sys.argv[1:]) here instead of main().
    # sys.exit is used because the builtin exit() comes from the optional
    # site module.
    sys.exit(main(sys.argv[1:]))