blob: d6268899d3506a02e50bfed19477286dc896a3ea [file] [log] [blame]
koder aka kdanilov2e928022015-04-08 13:47:15 +03001import re
koder aka kdanilovda45e882015-04-06 02:24:42 +03002import sys
3import time
4import json
koder aka kdanilovb896f692015-04-07 14:57:55 +03005import random
koder aka kdanilovda45e882015-04-06 02:24:42 +03006import select
7import pprint
8import argparse
9import traceback
10import subprocess
11import itertools
12from collections import OrderedDict
13
14
# Event kinds yielded by parse_fio_config_iter as the first tuple element.
SECTION = 0  # a "[name]" section header line
SETTING = 1  # an option line ("name=value" or a bare "name")
17
18
def get_test_summary(params):
    """Return a short mnemonic describing a fio job.

    The result has the form ``<op><mode><blocksize>th<numjobs>`` where
    ``op`` is a two-letter code for ``params["rw"]``, ``mode`` is ``d``
    (direct), ``s`` (sync) or ``a`` (neither), and ``numjobs`` defaults
    to 1.

    Raises KeyError when ``rw`` is missing/unknown or ``blocksize`` is
    missing.
    """
    op_codes = {"randread": "rr",
                "randwrite": "rw",
                "read": "sr",
                "write": "sw"}
    op_code = op_codes[params["rw"]]

    # direct takes precedence over sync; everything else counts as async
    sync_mode = 'a'
    if params.get("direct") == '1':
        sync_mode = 'd'
    elif params.get("sync") == '1':
        sync_mode = 's'

    threads = int(params.get('numjobs', '1'))

    template = "{0}{1}{2}th{3}"
    return template.format(op_code, sync_mode, params['blocksize'], threads)
36
37
# Running number used by process_section to build the 'UNIQ' format
# parameter. Kept in a one-element list so it can be bumped in place
# without a ``global`` statement.
counter = [0]
39
40
def process_section(name, vals, defaults, format_params):
    """Expand one config section into concrete ``(name, options)`` jobs.

    * A section name of the form ``"title * N"`` is emitted N times
      (N itself may contain format placeholders).
    * Option values wrapped in ``{% ... %}`` hold a comma-separated list
      of alternatives; one job is generated per element of the cartesian
      product of all such lists.
    * All other non-None values, and the section name, are passed
      through ``str.format`` with ``format_params`` plus the generated
      ``UNIQ`` and ``TEST_SUMM`` parameters.

    ``defaults`` is accepted for interface compatibility and not used
    here. Yields ``(formatted_name, options_dict)`` tuples.
    """
    vals = vals.copy()
    params = format_params.copy()

    repeat = 1
    if '*' in name:
        name, repeat_expr = name.split('*')
        name = name.strip()
        repeat = int(repeat_expr.format(**params))

    # this code can be optimized
    cycle_names = []
    cycle_values = []
    job_opts = {}

    for opt_name, opt_val in vals.items():
        if opt_val is None:
            job_opts[opt_name] = opt_val
        # remove hardcode
        elif opt_val.startswith('{%'):
            assert opt_val.endswith("%}")
            alternatives = opt_val[2:-2].format(**params)
            cycle_names.append(opt_name)
            cycle_values.append(
                [item.strip() for item in alternatives.split(',')])
        else:
            job_opts[opt_name] = opt_val.format(**params)

    group_report_err_msg = "Group reporting should be set if numjobs != 1"

    # product() of zero iterables yields a single empty tuple, so a
    # section without any {% %} option still produces exactly one pass.
    for combo in itertools.product(*cycle_values):
        job_opts.update(zip(cycle_names, combo))

        params['UNIQ'] = 'UN{0}'.format(counter[0])
        counter[0] += 1
        params['TEST_SUMM'] = get_test_summary(job_opts)

        if job_opts.get('numjobs', '1') != '1':
            assert 'group_reporting' in job_opts, group_report_err_msg

        for _ in range(repeat):
            yield name.format(**params), job_opts.copy()
koder aka kdanilovda45e882015-04-06 02:24:42 +030094
95
def calculate_execution_time(combinations):
    """Return the summed ``ramp_time`` + ``runtime`` of all jobs, in the
    same units as the config values.

    ``combinations`` is an iterable of ``(name, options)`` pairs; a
    missing option counts as 0.
    """
    return sum(int(opts.get('ramp_time', 0)) + int(opts.get('runtime', 0))
               for _, opts in combinations)
102
103
def parse_fio_config_full(fio_cfg, params=None):
    """Parse an extended fio config and yield fully expanded jobs.

    Options of the ``[defaults]`` section are copied into every section
    that follows it. Settings whose name is all upper-case are allowed
    only inside ``[defaults]`` and become format parameters; entries of
    ``params`` are merged over them right before a section is expanded.

    Yields the ``(name, options)`` tuples produced by process_section.
    """
    defaults = {}
    format_params = {}
    ext_params = {} if params is None else params.copy()

    section = None
    section_name = None

    def expand(sec_name, sec_opts):
        # 'defaults' only feeds later sections and is never a job itself
        if sec_name is None or sec_name == 'defaults':
            return ()
        format_params.update(ext_params)
        return process_section(sec_name, sec_opts, defaults, format_params)

    for tp, name, val in parse_fio_config_iter(fio_cfg):
        if tp == SECTION:
            # a new header closes the section collected so far
            for job in expand(section_name, section):
                yield job

            if name == 'defaults':
                section = defaults
            else:
                section = OrderedDict()
                section.update(defaults)
            section_name = name
            continue

        assert tp == SETTING
        assert section_name is not None, "no section name"
        if name == name.upper():
            assert section_name == 'defaults'
            format_params[name] = val
        else:
            section[name] = val

    # the last section has no following header to close it
    for job in expand(section_name, section):
        yield job
150
151
def parse_fio_config_iter(fio_cfg):
    """Tokenize fio config text line by line.

    Blank lines and lines starting with ``#`` or ``;`` are skipped.
    Yields ``(SECTION, name, None)`` for ``[name]`` headers,
    ``(SETTING, name, value)`` for ``name=value`` lines and
    ``(SETTING, name, None)`` for bare option names.

    Raises ValueError (mentioning the 1-based line number) when a
    section header is not closed with ``]``.
    """
    # Lines are numbered from 1 so the message matches what an editor
    # shows. The error is raised explicitly: no dependency on the
    # Python-2-only ``exc.message`` and no assert that -O would strip.
    for lineno, raw_line in enumerate(fio_cfg.split("\n"), 1):
        line = raw_line.strip()

        if not line or line.startswith(("#", ";")):
            continue

        if line.startswith('['):
            if not line.endswith(']'):
                msg = "During parsing line number {0}\nname should ends with ]"
                raise ValueError(msg.format(lineno))
            yield SECTION, line[1:-1], None
        elif '=' in line:
            opt_name, opt_val = line.split('=', 1)
            yield SETTING, opt_name.strip(), opt_val.strip()
        else:
            yield SETTING, line, None
174
175
def format_fio_config(fio_cfg):
    """Render ``(name, options)`` pairs as fio job-file text.

    Each section becomes a ``[name]`` header followed by one line per
    option: ``key=value``, or just ``key`` when the value is None.
    Sections are separated by one empty line.
    """
    rendered_sections = []

    for name, section in fio_cfg:
        lines = ["[{0}]\n".format(name)]
        for opt_name, opt_val in section.items():
            if opt_val is None:
                lines.append(opt_name + "\n")
            else:
                lines.append("{0}={1}\n".format(opt_name, opt_val))
        rendered_sections.append("".join(lines))

    return "\n".join(rendered_sections)
189
190
# Incremented by do_run_fio_fake on every call.
count = 0
192
193
def to_bytes(sz):
    """Convert a size string such as ``"4k"``, ``"1M"`` or ``"512"`` to
    a number of bytes.

    A plain integer string is returned as-is; otherwise the last
    character is treated as a case-insensitive binary suffix
    (k, m, g, t).

    Raises ValueError for anything else, including the empty string.
    """
    multipliers = {'k': 1024,
                   'm': 1024 ** 2,
                   'g': 1024 ** 3,
                   't': 1024 ** 4}
    sz = sz.lower()
    try:
        return int(sz)
    except ValueError:
        # sz[-1:] (a slice) is '' for an empty string, so empty input
        # falls through to the re-raise instead of hitting IndexError.
        multiplier = multipliers.get(sz[-1:])
        if multiplier is None:
            raise
        return multiplier * int(sz[:-1])
204
205
def estimate_iops(sz, bw, lat):
    """Estimate operations per time unit for blocks of ``sz`` bytes.

    One operation is modelled as the latency ``lat`` plus the transfer
    time ``sz / bw``; the result is the reciprocal of that duration.
    """
    transfer_time = float(sz) / bw
    return 1 / (lat + transfer_time)
208
209
def do_run_fio_fake(bconf):
    """Emulate a fio run without executing anything.

    For every ``(name, cfg)`` job in ``bconf`` a fio-like result dict is
    built from a nominal bandwidth and latency, each multiplied by a
    random factor within +/-5% of 1. Only the section matching
    ``cfg['rw']`` ('read' or 'write') is filled in; the other sections
    stay zeroed.

    Returns ``zip(results, bconf)``, the same shape as do_run_fio.
    Raises ValueError for an ``rw`` value that is neither a read nor a
    write.
    """
    global count
    count += 1
    parsed_out = []

    BW = 120.0 * (1024 ** 2)
    LAT = 0.003

    for name, cfg in bconf:
        sz = to_bytes(cfg['blocksize'])
        curr_lat = LAT * ((random.random() - 0.5) * 0.1 + 1)
        curr_ulat = curr_lat * 1000000
        curr_bw = BW * ((random.random() - 0.5) * 0.1 + 1)
        iops = estimate_iops(sz, curr_bw, curr_lat)
        bw = iops * sz

        res = {'ctx': 10683,
               'error': 0,
               'groupid': 0,
               'jobname': name,
               'majf': 0,
               'minf': 30,
               'read': {'bw': 0,
                        'bw_agg': 0.0,
                        'bw_dev': 0.0,
                        'bw_max': 0,
                        'bw_mean': 0.0,
                        'bw_min': 0,
                        'clat': {'max': 0,
                                 'mean': 0.0,
                                 'min': 0,
                                 'stddev': 0.0},
                        'io_bytes': 0,
                        'iops': 0,
                        'lat': {'max': 0, 'mean': 0.0,
                                'min': 0, 'stddev': 0.0},
                        'runtime': 0,
                        'slat': {'max': 0, 'mean': 0.0,
                                 'min': 0, 'stddev': 0.0}
                        },
               'sys_cpu': 0.64,
               'trim': {'bw': 0,
                        'bw_agg': 0.0,
                        'bw_dev': 0.0,
                        'bw_max': 0,
                        'bw_mean': 0.0,
                        'bw_min': 0,
                        'clat': {'max': 0,
                                 'mean': 0.0,
                                 'min': 0,
                                 'stddev': 0.0},
                        'io_bytes': 0,
                        'iops': 0,
                        'lat': {'max': 0, 'mean': 0.0,
                                'min': 0, 'stddev': 0.0},
                        'runtime': 0,
                        'slat': {'max': 0, 'mean': 0.0,
                                 'min': 0, 'stddev': 0.0}
                        },
               'usr_cpu': 0.23,
               'write': {'bw': 0,
                         'bw_agg': 0,
                         'bw_dev': 0,
                         'bw_max': 0,
                         'bw_mean': 0,
                         'bw_min': 0,
                         'clat': {'max': 0, 'mean': 0,
                                  'min': 0, 'stddev': 0},
                         'io_bytes': 0,
                         'iops': 0,
                         'lat': {'max': 0, 'mean': 0,
                                 'min': 0, 'stddev': 0},
                         'runtime': 0,
                         'slat': {'max': 0, 'mean': 0.0,
                                  'min': 0, 'stddev': 0.0}
                         }
               }

        if cfg['rw'] in ('read', 'randread'):
            key = 'read'
        elif cfg['rw'] in ('write', 'randwrite'):
            key = 'write'
        else:
            # report the offending config value; ``key`` is not bound on
            # this path, so it must not be used in the message
            raise ValueError("Unknown op type {0}".format(cfg['rw']))

        res[key]['bw'] = bw
        res[key]['iops'] = iops
        res[key]['runtime'] = 30
        res[key]['io_bytes'] = res[key]['runtime'] * bw
        res[key]['bw_agg'] = bw
        res[key]['bw_dev'] = bw / 30
        res[key]['bw_max'] = bw * 1.5
        res[key]['bw_min'] = bw / 1.5
        res[key]['bw_mean'] = bw
        res[key]['clat'] = {'max': curr_ulat * 10, 'mean': curr_ulat,
                            'min': curr_ulat / 2, 'stddev': curr_ulat}
        res[key]['lat'] = res[key]['clat'].copy()
        res[key]['slat'] = res[key]['clat'].copy()

        parsed_out.append(res)

    return zip(parsed_out, bconf)
312
313
def do_run_fio(bconf):
    """Run the ``fio`` binary on the given jobs and pair results with
    their configs.

    ``bconf`` is rendered with format_fio_config and fed to fio through
    stdin; stderr is merged into stdout and the combined output is
    parsed as JSON.

    Returns ``zip(output["jobs"], bconf)``. Raises ValueError when the
    output cannot be parsed or has no "jobs" entry.
    """
    job_file = format_fio_config(bconf)
    fio_proc = subprocess.Popen(["fio", "--output-format=json", "-"],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT)

    # set timeout
    fio_output = fio_proc.communicate(job_file)[0]

    try:
        jobs = json.loads(fio_output)["jobs"]
    except Exception:
        msg = "Can't parse fio output: {0!r}\nError: {1}"
        raise ValueError(msg.format(fio_output, traceback.format_exc()))

    return zip(jobs, bconf)
331
# Upper bound on the summed 'numjobs' of the jobs grouped into one batch
# by next_test_portion (limited by fio).
MAX_JOBS = 1000
334
335
def next_test_portion(whole_conf, runcycle):
    """Split the job list into consecutive batches for single fio runs.

    A batch is closed when adding the next job would push the summed
    ``numjobs`` over MAX_JOBS or, when ``runcycle`` is not None, push
    the summed execution time over ``runcycle``. A batch always takes
    at least one job, even if that job alone exceeds ``runcycle``.

    Yields lists of ``(name, options)`` pairs. Raises ValueError for a
    job whose own ``numjobs`` is larger than MAX_JOBS.
    """
    batch = []
    batch_jobs = 0
    batch_time = 0

    for name, sec in whole_conf:
        jc = int(sec.get('numjobs', '1'))

        task_time = 0
        if runcycle is not None:
            task_time = calculate_execution_time([(name, sec)])

        if jc > MAX_JOBS:
            err_templ = "Can't process job {0!r} - too large numjobs"
            raise ValueError(err_templ.format(name))

        # the time limit is only enforced once the batch is non-empty
        fits_cycle = (runcycle is None or
                      len(batch) == 0 or
                      task_time + batch_time <= runcycle)

        if jc + batch_jobs <= MAX_JOBS and fits_cycle:
            batch_time += task_time
            batch_jobs += jc
            batch.append((name, sec))
            continue

        assert len(batch) != 0
        yield batch

        # the job that did not fit opens the next batch
        batch = [(name, sec)]
        batch_jobs = jc
        batch_time = task_time

    if batch:
        yield batch
373
374
def add_job_results(jname, job_output, jconfig, res):
    """Merge one fio job result into the accumulated results ``res``.

    On the first result for ``jname`` the job's static description
    (action, flags, concurrency, block size, job name, timings) is
    stored. On later results it is asserted to match what was stored.
    In both cases the bandwidth, iops and mean latencies are appended
    to per-metric lists. The 'write' section of ``job_output`` is used
    when its iops is non-zero, otherwise 'read'.
    """
    if job_output['write']['iops'] != 0:
        stats = job_output['write']
    else:
        stats = job_output['read']

    # Evaluated lazily and in this order so lookups happen exactly when
    # the corresponding field is stored or checked.
    descr_fields = (
        ("action", lambda: jconfig["rw"]),
        ("direct_io", lambda: jconfig.get("direct", "0") == "1"),
        ("sync", lambda: jconfig.get("sync", "0") == "1"),
        ("concurence", lambda: int(jconfig.get("numjobs", 1))),
        ("blocksize", lambda: jconfig["blocksize"]),
        ("jobname", lambda: job_output["jobname"]),
        ("timings", lambda: (jconfig.get("runtime"),
                             jconfig.get("ramp_time"))),
    )

    if jname in res:
        j_res = res[jname]
        for field, current in descr_fields:
            assert j_res[field] == current()
    else:
        j_res = {}
        for field, current in descr_fields:
            j_res[field] = current()

    # 'bw_dev bw_mean bw_max bw_min'.split()
    j_res.setdefault("bw_mean", []).append(stats["bw_mean"])
    j_res.setdefault("iops", []).append(stats["iops"])
    j_res.setdefault("lat", []).append(stats["lat"]["mean"])
    j_res.setdefault("clat", []).append(stats["clat"]["mean"])
    j_res.setdefault("slat", []).append(stats["slat"]["mean"])

    res[jname] = j_res
416
417
def run_fio(benchmark_config,
            params,
            runcycle=None,
            raw_results_func=None,
            skip_tests=0,
            fake_fio=False):
    """Parse the config, run all jobs in batches and collect results.

    The first ``skip_tests`` expanded jobs are dropped. Each batch from
    next_test_portion is executed with do_run_fio, or do_run_fio_fake
    when ``fake_fio`` is true. ``raw_results_func``, if given, is called
    with the running test number and ``[job_output, jname, jconfig]``
    for every job. Jobs whose name starts with '_' are executed but not
    recorded.

    Execution is best-effort: SystemExit/KeyboardInterrupt stop the run
    silently, any other exception is printed, and whatever was gathered
    so far is returned as ``(results, executed_tests)``.
    """
    all_jobs = list(parse_fio_config_full(benchmark_config, params))
    all_jobs = all_jobs[skip_tests:]

    res = {}
    executed_tests = 0
    execute = do_run_fio_fake if fake_fio else do_run_fio

    try:
        for bconf in next_test_portion(all_jobs, runcycle):
            for job_output, (jname, jconfig) in execute(bconf):
                executed_tests += 1

                if raw_results_func is not None:
                    raw_results_func(executed_tests,
                                     [job_output, jname, jconfig])

                assert jname == job_output["jobname"], \
                    "{0} != {1}".format(jname, job_output["jobname"])

                if not jname.startswith('_'):
                    add_job_results(jname, job_output, jconfig, res)

    except (SystemExit, KeyboardInterrupt):
        pass

    except Exception:
        traceback.print_exc()

    return res, executed_tests
koder aka kdanilovda45e882015-04-06 02:24:42 +0300461
462
def run_benchmark(binary_tp, *argv, **kwargs):
    """Dispatch to the runner for ``binary_tp``.

    Only 'fio' is supported; its runner receives all remaining
    arguments unchanged. Raises ValueError for any other type.
    """
    if binary_tp != 'fio':
        raise ValueError("Unknown behcnmark {0}".format(binary_tp))
    return run_fio(*argv, **kwargs)
467
468
def parse_output(out_err):
    """Extract result payloads from captured tool output.

    Yields the decoded content of every ``RESULTS(format=json)`` block
    first, then of every ``RESULTS(format=eval)`` block. A block runs
    from its start marker to the first following ``END OF RESULTS``
    marker; any text after that marker is ignored.

    Raises ValueError when a block has no end marker.
    """
    end_patt = r"(?ims)=+\s+END OF RESULTS\s+=+"

    def payloads(start_patt):
        for block in re.split(start_patt, out_err)[1:]:
            # Split at the first end marker only: the text after a
            # block may hold further result blocks with their own end
            # markers, which made a two-way unpack of the full split
            # fail.
            parts = re.split(end_patt, block, maxsplit=1)
            if len(parts) != 2:
                raise ValueError("Results block has no END OF RESULTS marker")
            yield parts[0].strip()

    for data in payloads(r"(?ims)=+\s+RESULTS\(format=json\)\s+=+"):
        yield json.loads(data)

    # NOTE(security): eval() executes whatever the text contains. Only
    # feed this function output from a trusted run; consider
    # ast.literal_eval if the payload is known to be pure literals.
    for data in payloads(r"(?ims)=+\s+RESULTS\(format=eval\)\s+=+"):
        yield eval(data)
483
484
def parse_args(argv):
    """Parse the command line (without the program name).

    Returns the argparse namespace; ``jobfile`` is the only positional
    argument. On invalid input argparse prints usage and raises
    SystemExit.
    """
    parser = argparse.ArgumentParser(
        description="Run fio and return result")
    parser.add_argument("--type", metavar="BINARY_TYPE",
                        choices=['fio'], default='fio',
                        help=argparse.SUPPRESS)
    parser.add_argument("--start-at", metavar="START_AT_UTC", type=int,
                        help="Start execution at START_AT_UTC")
    parser.add_argument("--json", action="store_true", default=False,
                        help="Json output format")
    parser.add_argument("--output", default='-', metavar="FILE_PATH",
                        help="Store results to FILE_PATH")
    parser.add_argument("--estimate", action="store_true", default=False,
                        help="Only estimate task execution time")
    parser.add_argument("--compile", action="store_true", default=False,
                        help="Compile config file to fio config")
    parser.add_argument("--num-tests", action="store_true", default=False,
                        help="Show total number of tests")
    parser.add_argument("--runcycle", type=int, default=None,
                        metavar="MAX_CYCLE_SECONDS",
                        help="Max cycle length in seconds")
    parser.add_argument("--show-raw-results", action='store_true',
                        default=False, help="Output raw input and results")
    parser.add_argument("--skip-tests", type=int, default=0, metavar="NUM",
                        help="Skip NUM tests")
    parser.add_argument("--faked-fio", action='store_true',
                        default=False, help="Emulate fio with 0 test time")
    # The two help fragments are joined with an explicit space; they
    # used to render as "toformat".
    parser.add_argument("--params", nargs="*", metavar="PARAM=VAL",
                        default=[],
                        help="Provide set of pairs PARAM=VAL to " +
                             "format into job description")
    parser.add_argument("jobfile")
    return parser.parse_args(argv)
518
519
def read_config(fd, timeout=10):
    """Read ``fd`` up to EOF and return everything read as one string.

    The whole read has to finish within ``timeout`` seconds. ``fd`` is
    polled with select() before each one-character read.

    Raises IOError("No config provided") when the deadline passes
    before EOF is seen.
    """
    job_cfg = ""
    deadline = time.time() + timeout

    while True:
        remaining = deadline - time.time()
        if remaining <= 0:
            raise IOError("No config provided")

        readable = select.select([fd], [], [], remaining)[0]
        if len(readable) == 0:
            raise IOError("No config provided")

        char = fd.read(1)
        if '' == char:
            # an empty read marks EOF
            return job_cfg

        job_cfg += char
537
538
def main(argv):
    """Command-line entry point; returns the process exit code.

    Reads the job description from a file (or stdin when the job file
    is '-'). With --compile/--num-tests/--estimate it only reports on
    the expanded config and returns. Otherwise it optionally waits for
    --start-at, runs the benchmark and writes the results, framed by
    marker lines, to the output file (or stdout when the output is
    '-').

    Raises ValueError for a --params entry without '='.
    """
    argv_obj = parse_args(argv)

    if argv_obj.jobfile == '-':
        job_cfg = read_config(sys.stdin)
    else:
        with open(argv_obj.jobfile) as job_fd:
            job_cfg = job_fd.read()

    if argv_obj.output == '-':
        out_fd = sys.stdout
    else:
        out_fd = open(argv_obj.output, "w")

    try:
        params = {}
        for param_val in argv_obj.params:
            if '=' not in param_val:
                raise ValueError(
                    "Parameter {0!r} is not in PARAM=VAL form".format(
                        param_val))
            name, val = param_val.split("=", 1)
            params[name] = val

        if argv_obj.num_tests or argv_obj.compile or argv_obj.estimate:
            bconf = list(parse_fio_config_full(job_cfg, params))
            bconf = bconf[argv_obj.skip_tests:]

            if argv_obj.compile:
                out_fd.write(format_fio_config(bconf))
                out_fd.write("\n")

            # single-argument print(...) behaves the same on Python 2
            # and Python 3
            if argv_obj.num_tests:
                print(len(bconf))

            if argv_obj.estimate:
                seconds = calculate_execution_time(bconf)

                h = seconds // 3600
                m = (seconds % 3600) // 60
                s = seconds % 60

                print("{0}:{1}:{2}".format(h, m, s))
            return 0

        if argv_obj.start_at is not None:
            # wait until the requested moment; skip if already past
            # (the delay was previously computed with the sign flipped)
            delay = argv_obj.start_at - time.time()
            if delay > 0:
                time.sleep(delay)

        def raw_res_func(test_num, data):
            pref = "========= RAW_RESULTS({0}) =========\n".format(test_num)
            out_fd.write(pref)
            out_fd.write(json.dumps(data))
            out_fd.write("\n========= END OF RAW_RESULTS =========\n")
            out_fd.flush()

        rrfunc = raw_res_func if argv_obj.show_raw_results else None

        stime = time.time()
        job_res, num_tests = run_benchmark(argv_obj.type,
                                           job_cfg,
                                           params,
                                           argv_obj.runcycle,
                                           rrfunc,
                                           argv_obj.skip_tests,
                                           argv_obj.faked_fio)
        etime = time.time()

        res = {'__meta__': {'raw_cfg': job_cfg}, 'res': job_res}

        oformat = 'json' if argv_obj.json else 'eval'
        out_fd.write("\nRun {} tests in {} seconds\n".format(
            num_tests, int(etime - stime)))
        out_fd.write(
            "========= RESULTS(format={0}) =========\n".format(oformat))
        if argv_obj.json:
            out_fd.write(json.dumps(res))
        else:
            out_fd.write(pprint.pformat(res) + "\n")
        out_fd.write("\n========= END OF RESULTS =========\n")

        return 0
    finally:
        # close only what this function opened; never close stdout
        if out_fd is not sys.stdout:
            out_fd.close()
616
617
if __name__ == '__main__':
    # sys.exit is the programmatic API; the bare ``exit`` name is
    # injected by the site module and is absent when Python runs with -S.
    sys.exit(main(sys.argv[1:]))