blob: 4f23f907677ea81468f434aff641d7d0279e1ba0 [file] [log] [blame]
koder aka kdanilovda45e882015-04-06 02:24:42 +03001import sys
2import time
3import json
koder aka kdanilovb896f692015-04-07 14:57:55 +03004import random
koder aka kdanilovda45e882015-04-06 02:24:42 +03005import select
6import pprint
7import argparse
8import traceback
9import subprocess
10import itertools
11from collections import OrderedDict
12
13
# Record types yielded by parse_fio_config_iter: a "[name]" section header
# line, or a setting line ("name=value" or a bare "name").
SECTION = 0
SETTING = 1
16
17
def get_test_summary(params):
    """Build a short tag describing a job from its option dict.

    The tag has the form ``<op><mode><blocksize>th<numjobs>``:

    * ``op`` is the two-letter code for the ``rw`` option
      (``rr``, ``rw``, ``sr`` or ``sw``);
    * ``mode`` is ``d`` when ``direct`` is ``'1'``, else ``s`` when ``sync``
      is ``'1'``, else ``a``;
    * ``numjobs`` defaults to 1.

    Raises ``KeyError`` when ``rw`` is missing or not one of the four known
    values, or when ``blocksize`` is missing.
    """
    op_codes = {"randread": "rr",
                "randwrite": "rw",
                "read": "sr",
                "write": "sw"}
    op_code = op_codes[params["rw"]]

    sync_mode = 'a'
    if params.get("direct") == '1':
        sync_mode = 'd'
    elif params.get("sync") == '1':
        sync_mode = 's'

    threads = int(params.get('numjobs', '1'))

    return "{0}{1}{2}th{3}".format(op_code, sync_mode,
                                   params['blocksize'], threads)
35
36
# Module-wide counter used by process_section to build the UNIQ format
# parameter; stored in a one-element list so it can be incremented in place.
counter = [0]
38
39
def process_section(name, vals, defaults, format_params):
    """Expand one config section into concrete ``(name, options)`` jobs.

    * ``name`` may end with ``* N`` to repeat every generated job N times
      (N is formatted with ``format_params`` first).
    * Option values wrapped in ``{% a, b, c %}`` are iterated: one job is
      generated for every combination of all such options.
    * All other non-None values are passed through ``str.format`` with
      ``format_params``; options with a ``None`` value are kept as is.

    For every combination the ``UNIQ`` and ``TEST_SUMM`` format parameters
    are set before the section name is formatted.  ``defaults`` is accepted
    for interface compatibility and is not used here.

    Yields ``(formatted_name, options_dict)``; every yielded dict is an
    independent copy, so consumers may modify it freely.
    """
    vals = vals.copy()
    params = format_params.copy()

    if '*' in name:
        name, repeat = name.split('*')
        name = name.strip()
        repeat = int(repeat.format(**params))
    else:
        repeat = 1

    iterable_names = []
    iterable_values = []
    processed_vals = {}

    for val_name, val in vals.items():
        if val is None:
            processed_vals[val_name] = val
        elif val.startswith('{%'):
            assert val.endswith("%}"), \
                "unterminated {{% ... %}} value for {0!r}".format(val_name)
            content = val[2:-2].format(**params)
            iterable_names.append(val_name)
            iterable_values.append([i.strip() for i in content.split(',')])
        else:
            processed_vals[val_name] = val.format(**params)

    # With no iterable options product() yields a single empty tuple, so the
    # plain case and the combinatorial case share one code path.
    for it_vals in itertools.product(*iterable_values):
        processed_vals.update(zip(iterable_names, it_vals))
        params['UNIQ'] = 'UN{0}'.format(counter[0])
        counter[0] += 1
        params['TEST_SUMM'] = get_test_summary(processed_vals)
        job_name = name.format(**params)
        for _ in range(repeat):
            # Always hand out a copy: repeated jobs must not share one dict.
            yield job_name, processed_vals.copy()
koder aka kdanilovda45e882015-04-06 02:24:42 +030082
83
def calculate_execution_time(combinations):
    """Return the total of ``ramp_time`` + ``runtime`` over all sections.

    ``combinations`` is an iterable of ``(name, options)`` pairs; missing
    options count as 0 and values are converted with ``int``.
    """
    return sum(int(section.get('ramp_time', 0)) +
               int(section.get('runtime', 0))
               for _, section in combinations)
90
91
def parse_fio_config_full(fio_cfg, params=None):
    """Parse an extended fio config and yield expanded ``(name, options)``.

    Settings from the ``[defaults]`` section are copied into every following
    section.  Settings whose name is all upper-case are format parameters;
    they are only allowed inside ``[defaults]``.  ``params`` (if given)
    overrides format parameters found in the config.  Each non-default
    section is expanded with ``process_section`` once it is complete.
    """
    ext_params = {} if params is None else params.copy()
    defaults = {}
    format_params = {}

    section = None
    section_name = None

    def expand_pending():
        # Jobs for the section collected so far (nothing for [defaults]
        # or before the first section header).
        if section_name is None or section_name == 'defaults':
            return ()
        format_params.update(ext_params)
        return process_section(section_name, section,
                               defaults, format_params)

    for tp, name, val in parse_fio_config_iter(fio_cfg):
        if tp != SECTION:
            assert tp == SETTING
            assert section_name is not None, "no section name"
            if name == name.upper():
                assert section_name == 'defaults'
                format_params[name] = val
            else:
                section[name] = val
            continue

        # A new header closes the previous section.
        for sec in expand_pending():
            yield sec

        if name == 'defaults':
            section = defaults
        else:
            section = OrderedDict()
            section.update(defaults)
        section_name = name

    # The last section is closed by the end of input.
    for sec in expand_pending():
        yield sec
138
139
def parse_fio_config_iter(fio_cfg):
    """Tokenize fio config text into ``(type, name, value)`` records.

    Blank lines and lines starting with ``#`` or ``;`` are skipped.
    Yields ``(SECTION, name, None)`` for ``[name]`` headers,
    ``(SETTING, name, value)`` for ``name=value`` lines and
    ``(SETTING, name, None)`` for bare option names.

    Raises ``ValueError`` (mentioning the 1-based line number) when a
    section header is not closed with ``]``.
    """
    for lineno, raw_line in enumerate(fio_cfg.split("\n"), 1):
        line = raw_line.strip()

        if line == "" or line.startswith(("#", ";")):
            continue

        if line.startswith('['):
            if not line.endswith(']'):
                # Explicit raise instead of assert: must survive ``python -O``.
                templ = "During parsing line number {0}\n" \
                        "name should ends with ]"
                raise ValueError(templ.format(lineno))
            yield SECTION, line[1:-1], None
        elif '=' in line:
            opt_name, opt_val = line.split('=', 1)
            yield SETTING, opt_name.strip(), opt_val.strip()
        else:
            yield SETTING, line, None
162
163
def format_fio_config(fio_cfg):
    """Render ``(name, options)`` pairs as fio config text.

    Each section becomes a ``[name]`` header followed by one line per
    option: ``opt=value``, or just ``opt`` when the value is ``None``.
    Sections are separated by one blank line.
    """
    rendered_sections = []
    for name, section in fio_cfg:
        lines = ["[{0}]\n".format(name)]
        for opt_name, opt_val in section.items():
            if opt_val is None:
                lines.append(opt_name + "\n")
            else:
                lines.append("{0}={1}\n".format(opt_name, opt_val))
        rendered_sections.append("".join(lines))
    return "\n".join(rendered_sections)
177
178
# Number of times do_run_fio_fake has been called.
count = 0
180
181
def to_bytes(sz):
    """Convert a size string such as ``"4096"``, ``"4k"`` or ``"1M"`` to bytes.

    A plain integer string is returned as is; a trailing ``k``, ``m``, ``g``
    or ``t`` (case-insensitive) multiplies the number by the matching power
    of 1024.

    Raises ``ValueError`` for anything else, including an empty string.
    """
    sz = sz.lower()
    try:
        return int(sz)
    except ValueError:
        multipliers = {'k': 1024,
                       'm': 1024 ** 2,
                       'g': 1024 ** 3,
                       't': 1024 ** 4}
        # ``sz and ...`` guards the empty string, which has no last char.
        if sz and sz[-1] in multipliers:
            return multipliers[sz[-1]] * int(sz[:-1])
        raise
192
193
def estimate_iops(sz, bw, lat):
    """Return operations per second for blocks of ``sz`` bytes.

    One operation is modelled as the latency ``lat`` plus the time needed
    to move ``sz`` bytes at bandwidth ``bw``.
    """
    transfer_time = float(sz) / bw
    time_per_op = lat + transfer_time
    return 1 / time_per_op
196
197
def _blank_latency(zero):
    """Return an all-zero latency record; ``zero`` is used for mean/stddev."""
    return {'max': 0, 'mean': zero, 'min': 0, 'stddev': zero}


def _blank_io_stats(zero):
    """Return an all-zero per-direction stats record.

    ``zero`` (``0`` or ``0.0``) is the value used for the averaged fields, so
    the generated records keep the same int/float mix in every direction.
    """
    return {'bw': 0,
            'bw_agg': zero,
            'bw_dev': zero,
            'bw_max': 0,
            'bw_mean': zero,
            'bw_min': 0,
            'clat': _blank_latency(zero),
            'io_bytes': 0,
            'iops': 0,
            'lat': _blank_latency(zero),
            'runtime': 0,
            'slat': _blank_latency(0.0)}


def do_run_fio_fake(bconf):
    """Produce synthetic fio results for ``bconf`` without running fio.

    For every ``(name, cfg)`` job a result record is generated from a base
    bandwidth of 120 MiB/s and a base latency of 3 ms, each randomly varied
    by up to +/-5%.  ``cfg`` must contain ``blocksize`` and ``rw``.

    Returns ``zip(results, bconf)``, the same shape as ``do_run_fio``.
    Raises ``ValueError`` when ``rw`` is not a read or write mode.
    """
    global count
    count += 1
    parsed_out = []

    BW = 120.0 * (1024 ** 2)
    LAT = 0.003

    for name, cfg in bconf:
        sz = to_bytes(cfg['blocksize'])
        curr_lat = LAT * ((random.random() - 0.5) * 0.1 + 1)
        curr_ulat = curr_lat * 1000000
        curr_bw = BW * ((random.random() - 0.5) * 0.1 + 1)
        iops = estimate_iops(sz, curr_bw, curr_lat)
        bw = iops * sz

        res = {'ctx': 10683,
               'error': 0,
               'groupid': 0,
               'jobname': name,
               'majf': 0,
               'minf': 30,
               'read': _blank_io_stats(0.0),
               'sys_cpu': 0.64,
               'trim': _blank_io_stats(0.0),
               'usr_cpu': 0.23,
               'write': _blank_io_stats(0)}

        if cfg['rw'] in ('read', 'randread'):
            key = 'read'
        elif cfg['rw'] in ('write', 'randwrite'):
            key = 'write'
        else:
            # Report the offending option value; ``key`` is not set here.
            raise ValueError("Uknown op type {0}".format(cfg['rw']))

        stats = res[key]
        stats['bw'] = bw
        stats['iops'] = iops
        stats['runtime'] = 30
        stats['io_bytes'] = stats['runtime'] * bw
        stats['bw_agg'] = bw
        stats['bw_dev'] = bw / 30
        stats['bw_max'] = bw * 1.5
        stats['bw_min'] = bw / 1.5
        stats['bw_mean'] = bw
        stats['clat'] = {'max': curr_ulat * 10, 'mean': curr_ulat,
                         'min': curr_ulat / 2, 'stddev': curr_ulat}
        stats['lat'] = stats['clat'].copy()
        stats['slat'] = stats['clat'].copy()

        parsed_out.append(res)

    return zip(parsed_out, bconf)
300
301
def do_run_fio(bconf):
    """Run fio on the jobs in ``bconf`` and return ``zip(results, bconf)``.

    The jobs are rendered with ``format_fio_config`` and fed to
    ``fio --output-format=json -`` on stdin.  ``results`` is the ``jobs``
    list of fio's JSON report.

    Raises ``ValueError`` when fio's stdout cannot be parsed; the message
    includes the raw stdout and stderr.
    """
    benchmark_config = format_fio_config(bconf)
    cmd = ["fio", "--output-format=json", "-"]
    # stderr is captured separately: anything written there would otherwise
    # be mixed into the JSON document and break parsing.
    p = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)

    # TODO: set timeout
    raw_out, raw_err = p.communicate(benchmark_config)

    try:
        parsed_out = json.loads(raw_out)["jobs"]
    except Exception:
        msg = "Can't parse fio output: {0!r}\nfio stderr: {1!r}\nError: {2}"
        raise ValueError(msg.format(raw_out, raw_err,
                                    traceback.format_exc()))

    return zip(parsed_out, bconf)
319
# Upper bound on the summed numjobs of all sections packed into a single
# test portion by next_test_portion (limited by fio).
MAX_JOBS = 1000
322
323
def next_test_portion(whole_conf, runcycle):
    """Split ``whole_conf`` into consecutive batches of ``(name, section)``.

    A batch is closed when adding the next section would push the summed
    ``numjobs`` above ``MAX_JOBS`` or, if ``runcycle`` is not None, the
    summed execution time above ``runcycle`` seconds.  A batch always holds
    at least one section, even if that section alone exceeds ``runcycle``.

    Raises ``ValueError`` for a section whose ``numjobs`` alone exceeds
    ``MAX_JOBS``.
    """
    batch = []
    batch_jobs = 0
    batch_time = 0

    for name, sec in whole_conf:
        jobs = int(sec.get('numjobs', '1'))

        task_time = 0
        if runcycle is not None:
            task_time = calculate_execution_time([(name, sec)])

        if jobs > MAX_JOBS:
            err_templ = "Can't process job {0!r} - too large numjobs"
            raise ValueError(err_templ.format(name))

        # The time limit is only checked once the batch is non-empty.
        fits_cycle = (runcycle is None or
                      len(batch) == 0 or
                      task_time + batch_time <= runcycle)

        if jobs + batch_jobs <= MAX_JOBS and fits_cycle:
            batch_time += task_time
            batch_jobs += jobs
            batch.append((name, sec))
            continue

        assert len(batch) != 0
        yield batch

        # Start the next batch with the section that did not fit.
        batch = [(name, sec)]
        batch_jobs = jobs
        batch_time = task_time

    if batch:
        yield batch
361
362
def add_job_results(jname, job_output, jconfig, res):
    """Merge one fio job result into the accumulated results ``res``.

    The ``write`` stats of ``job_output`` are used when their ``iops`` is
    non-zero, otherwise the ``read`` stats.  On the first result for
    ``jname`` a record describing the job is created from ``jconfig``; on
    later results the stored description is asserted to match.  In both
    cases ``bw_mean``, ``iops`` and the mean ``lat``/``clat``/``slat`` are
    appended to the per-job lists, and the record is stored in
    ``res[jname]``.
    """
    direction = 'write' if job_output['write']['iops'] != 0 else 'read'
    raw_result = job_output[direction]

    # Static job description: (record key, how to compute it), in order.
    description = (
        ("action", lambda: jconfig["rw"]),
        ("direct_io", lambda: jconfig.get("direct", "0") == "1"),
        ("sync", lambda: jconfig.get("sync", "0") == "1"),
        ("concurence", lambda: int(jconfig.get("numjobs", 1))),
        ("size", lambda: jconfig["size"]),
        ("jobname", lambda: job_output["jobname"]),
        ("timings", lambda: (jconfig.get("runtime"),
                             jconfig.get("ramp_time"))),
    )

    if jname in res:
        j_res = res[jname]
        for field, compute in description:
            assert j_res[field] == compute()
    else:
        j_res = {}
        for field, compute in description:
            j_res[field] = compute()

    def record(metric, value):
        j_res.setdefault(metric, []).append(value)

    record("bw_mean", raw_result["bw_mean"])
    record("iops", raw_result["iops"])
    for lat_kind in ("lat", "clat", "slat"):
        record(lat_kind, raw_result[lat_kind]["mean"])

    res[jname] = j_res
404
405
def run_fio(benchmark_config,
            params,
            runcycle=None,
            raw_results_func=None,
            skip_tests=0,
            fake_fio=False):
    """Run all jobs described by ``benchmark_config`` and collect results.

    The config is expanded with ``params``, the first ``skip_tests`` jobs
    are dropped, and the rest is executed in portions produced by
    ``next_test_portion`` (using ``do_run_fio_fake`` when ``fake_fio`` is
    true).  ``raw_results_func(test_num, [job_output, jname, jconfig])`` is
    called for every job when given; ``test_num`` counts jobs consecutively
    starting at ``skip_tests``.  Jobs whose name starts with ``_`` are
    executed but not added to the results.

    Execution is best effort: on any error or interrupt the traceback (for
    errors) is printed and the results gathered so far are returned.

    Returns ``(results_dict, number_of_executed_jobs)``.
    """
    whole_conf = list(parse_fio_config_full(benchmark_config, params))
    whole_conf = whole_conf[skip_tests:]
    res = {}
    executed_tests = 0
    run_portion = do_run_fio_fake if fake_fio else do_run_fio

    try:
        for bconf in next_test_portion(whole_conf, runcycle):
            for job_output, (jname, jconfig) in run_portion(bconf):
                # Derive the number from the running total so that it keeps
                # increasing by one across portion boundaries.
                curr_test_num = skip_tests + executed_tests
                executed_tests += 1

                if raw_results_func is not None:
                    raw_results_func(curr_test_num,
                                     [job_output, jname, jconfig])

                assert jname == job_output["jobname"], \
                    "{0} != {1}".format(jname, job_output["jobname"])

                if jname.startswith('_'):
                    continue

                add_job_results(jname, job_output, jconfig, res)

    except (SystemExit, KeyboardInterrupt):
        # Deliberate: an interrupted run still reports partial results.
        pass

    except Exception:
        traceback.print_exc()

    return res, executed_tests
449
450
def run_benchmark(binary_tp, *argv, **kwargs):
    """Dispatch to the runner for benchmark type ``binary_tp``.

    Only ``'fio'`` is supported; its arguments are forwarded to ``run_fio``
    unchanged.  Raises ``ValueError`` for any other type.
    """
    if binary_tp != 'fio':
        raise ValueError("Unknown behcnmark {0}".format(binary_tp))
    return run_fio(*argv, **kwargs)
455
456
def parse_args(argv):
    """Parse the command line ``argv`` (without the program name).

    Returns the ``argparse`` namespace with the attributes ``type``,
    ``start_at``, ``json``, ``output``, ``estimate``, ``compile``,
    ``num_tests``, ``runcycle``, ``show_raw_results``, ``skip_tests``,
    ``faked_fio``, ``params`` and ``jobfile``.
    """
    parser = argparse.ArgumentParser(
        description="Run fio' and return result")
    parser.add_argument("--type", metavar="BINARY_TYPE",
                        choices=['fio'], default='fio',
                        help=argparse.SUPPRESS)
    parser.add_argument("--start-at", metavar="START_AT_UTC", type=int,
                        help="Start execution at START_AT_UTC")
    parser.add_argument("--json", action="store_true", default=False,
                        help="Json output format")
    parser.add_argument("--output", default='-', metavar="FILE_PATH",
                        help="Store results to FILE_PATH")
    parser.add_argument("--estimate", action="store_true", default=False,
                        help="Only estimate task execution time")
    parser.add_argument("--compile", action="store_true", default=False,
                        help="Compile config file to fio config")
    parser.add_argument("--num-tests", action="store_true", default=False,
                        help="Show total number of tests")
    parser.add_argument("--runcycle", type=int, default=None,
                        metavar="MAX_CYCLE_SECONDS",
                        help="Max cycle length in seconds")
    parser.add_argument("--show-raw-results", action='store_true',
                        default=False, help="Output raw input and results")
    parser.add_argument("--skip-tests", type=int, default=0, metavar="NUM",
                        help="Skip NUM tests")
    parser.add_argument("--faked-fio", action='store_true',
                        default=False, help="Emulate fio with 0 test time")
    # Note the trailing space: the two help fragments are concatenated.
    parser.add_argument("--params", nargs="*", metavar="PARAM=VAL",
                        default=[],
                        help="Provide set of pairs PARAM=VAL to " +
                             "format into job description")
    parser.add_argument("jobfile")
    return parser.parse_args(argv)
490
491
def read_config(fd, timeout=10):
    """Read everything from ``fd`` until EOF and return it as one string.

    ``fd`` must be usable with ``select.select``.  The whole read has to
    finish within ``timeout`` seconds; ``IOError`` is raised when the
    deadline passes or ``fd`` does not become readable in time.
    """
    pieces = []
    deadline = time.time() + timeout

    while True:
        remaining = deadline - time.time()
        if remaining <= 0:
            raise IOError("No config provided")

        readable, _, _ = select.select([fd], [], [], remaining)
        if not readable:
            raise IOError("No config provided")

        char = fd.read(1)
        if '' == char:
            # Empty read means EOF: the config is complete.
            return "".join(pieces)

        pieces.append(char)
509
510
def main(argv):
    """Command-line entry point; returns the process exit code.

    Reads the job config (from stdin when the job file is ``-``), then
    either prints the requested information (``--compile``,
    ``--num-tests``, ``--estimate``) or runs the benchmark and writes the
    results to the output file (stdout when ``--output`` is ``-``).

    Raises ``ValueError`` for a ``--params`` item that is not ``PARAM=VAL``.
    """
    argv_obj = parse_args(argv)

    if argv_obj.jobfile == '-':
        job_cfg = read_config(sys.stdin)
    else:
        with open(argv_obj.jobfile) as job_fd:
            job_cfg = job_fd.read()

    # Validate parameters before the output file is created/truncated.
    params = {}
    for param_val in argv_obj.params:
        if '=' not in param_val:
            raise ValueError(
                "Expected PARAM=VAL, got {0!r}".format(param_val))
        name, val = param_val.split("=", 1)
        params[name] = val

    if argv_obj.output == '-':
        out_fd = sys.stdout
    else:
        out_fd = open(argv_obj.output, "w")

    try:
        if argv_obj.num_tests or argv_obj.compile or argv_obj.estimate:
            bconf = list(parse_fio_config_full(job_cfg, params))
            bconf = bconf[argv_obj.skip_tests:]

            if argv_obj.compile:
                out_fd.write(format_fio_config(bconf))
                out_fd.write("\n")

            if argv_obj.num_tests:
                print(len(bconf))

            if argv_obj.estimate:
                seconds = calculate_execution_time(bconf)

                h = seconds // 3600
                m = (seconds % 3600) // 60
                s = seconds % 60

                print("{0}:{1}:{2}".format(h, m, s))
            return 0

        if argv_obj.start_at is not None:
            # Wait until the requested start time; never sleep a negative
            # amount if that time has already passed.
            delay = argv_obj.start_at - time.time()
            if delay > 0:
                time.sleep(delay)

        def raw_res_func(test_num, data):
            pref = "========= RAW_RESULTS({0}) =========\n".format(test_num)
            out_fd.write(pref)
            out_fd.write(json.dumps(data))
            out_fd.write("\n========= END OF RAW_RESULTS =========\n")
            out_fd.flush()

        rrfunc = raw_res_func if argv_obj.show_raw_results else None

        stime = time.time()
        job_res, num_tests = run_benchmark(argv_obj.type,
                                           job_cfg,
                                           params,
                                           argv_obj.runcycle,
                                           rrfunc,
                                           argv_obj.skip_tests,
                                           argv_obj.faked_fio)
        etime = time.time()

        res = {'__meta__': {'raw_cfg': job_cfg}, 'res': job_res}

        oformat = 'json' if argv_obj.json else 'eval'
        out_fd.write("\nRun {} tests in {} seconds\n".format(
            num_tests, int(etime - stime)))
        out_fd.write(
            "========= RESULTS(format={0}) =========\n".format(oformat))
        if argv_obj.json:
            out_fd.write(json.dumps(res))
        else:
            out_fd.write(pprint.pformat(res) + "\n")
        out_fd.write("\n========= END OF RESULTS =========\n")

        return 0
    finally:
        # Close only files opened here; stdout belongs to the interpreter.
        if out_fd is not sys.stdout:
            out_fd.close()
588
589
# Script entry point.  sys.exit is used rather than the ``exit`` helper,
# which is installed by the ``site`` module and is not always available.
if __name__ == '__main__':
    sys.exit(main(sys.argv[1:]))