blob: 7537438c8a1980cc7e3a37c79efeb98791e98eb8 [file] [log] [blame]
koder aka kdanilovda45e882015-04-06 02:24:42 +03001import sys
2import time
3import json
4import select
5import pprint
6import argparse
7import traceback
8import subprocess
9import itertools
10from collections import OrderedDict
11
12
# Record kinds used as the first element of the tuples yielded by
# parse_fio_config_iter: SECTION marks a "[name]" header line, SETTING marks
# an option line (with or without "=value").
SECTION, SETTING = range(2)
15
16
def get_test_summary(params):
    """Build a short mnemonic describing a test's load profile.

    The result has the form ``<op><mode><blocksize>th<numjobs>``:

    * ``op`` is a two-letter code looked up from ``params["rw"]``;
    * ``mode`` is ``d`` when ``direct`` is ``'1'``, otherwise ``s`` when
      ``sync`` is ``'1'``, otherwise ``a``;
    * ``blocksize`` is copied verbatim from ``params["blocksize"]``;
    * ``numjobs`` defaults to 1 when absent.

    Raises KeyError when ``rw`` is missing or is not one of the four known
    operations, or when ``blocksize`` is missing.
    """
    operation_codes = {
        "randread": "rr",
        "randwrite": "rw",
        "read": "sr",
        "write": "sw",
    }
    operation = operation_codes[params["rw"]]

    # "direct" wins over "sync" when both are set
    mode = 'a'
    if params.get("direct") == '1':
        mode = 'd'
    elif params.get("sync") == '1':
        mode = 's'

    threads = int(params.get('numjobs', '1'))

    template = "{0}{1}{2}th{3}"
    return template.format(operation, mode, params['blocksize'], threads)
34
35
# Module-wide sequence number used to build the UNIQ format parameter; kept
# in a one-element list so process_section can update it in place.
counter = [0]


def process_section(name, vals, defaults, format_params):
    """Expand one config section into concrete ``(job_name, options)`` pairs.

    Expansion rules, as implemented below:

    * A section name of the form ``"NAME * N"`` is emitted ``N`` times
      (``N`` itself may contain ``{PARAM}`` placeholders).
    * An option whose value looks like ``{% a, b, c %}`` is a list of
      alternatives; one job is produced for every combination of the
      alternatives of all such options.
    * Every other non-None value is passed through ``str.format`` with the
      format parameters.

    For each produced job the format parameters ``UNIQ`` (a process-wide
    unique token) and ``TEST_SUMM`` (see get_test_summary) are set before the
    section name is formatted.

    Parameters:
        name: raw section name, possibly with a ``* N`` suffix.
        vals: mapping option name -> raw value (``None`` for flag options).
        defaults: accepted for interface compatibility; not used here.
        format_params: mapping used for ``{PARAM}`` substitution; not
            modified (a copy is taken).

    Yields:
        ``(formatted_name, options_dict)`` tuples. Every yielded dict is a
        separate object, so callers may collect or modify them freely.
    """
    vals = vals.copy()
    params = format_params.copy()

    if '*' in name:
        name, repeat = name.split('*')
        name = name.strip()
        repeat = int(repeat.format(**params))
    else:
        repeat = 1

    for _ in range(repeat):
        iterable_names = []
        iterable_values = []
        processed_vals = {}

        for val_name, val in vals.items():
            if val is None:
                processed_vals[val_name] = val
            # remove hardcode
            elif val.startswith('{%'):
                assert val.endswith("%}"), \
                    "unterminated {{% ... %}} value for {0!r}".format(val_name)
                content = val[2:-2].format(**params)
                iterable_names.append(val_name)
                iterable_values.append(
                    [item.strip() for item in content.split(',')])
            else:
                processed_vals[val_name] = val.format(**params)

        # itertools.product() called with no iterables yields exactly one
        # empty tuple, so a section without {% %} options still produces a
        # single job through the same code path.
        for it_vals in itertools.product(*iterable_values):
            # Build a fresh dict per job: yielding one shared dict and
            # mutating it on the next iteration would make every job already
            # collected by the caller show the last combination's values.
            job_vals = processed_vals.copy()
            job_vals.update(zip(iterable_names, it_vals))

            params['UNIQ'] = 'UN{0}'.format(counter[0])
            counter[0] += 1
            params['TEST_SUMM'] = get_test_summary(job_vals)
            yield name.format(**params), job_vals
80
81
def calculate_execution_time(combinations):
    """Return the total planned duration of the given jobs.

    ``combinations`` is an iterable of ``(name, options)`` pairs; for each
    one the ``ramp_time`` and ``runtime`` options (missing ones count as 0)
    are converted with ``int`` and added up.
    """
    return sum(
        int(options.get('ramp_time', 0)) + int(options.get('runtime', 0))
        for _, options in combinations
    )
88
89
def parse_fio_config_full(fio_cfg, params=None):
    """Parse an extended fio config and yield fully expanded job sections.

    The text is tokenised by parse_fio_config_iter. Settings of the
    ``defaults`` section are copied into every following section; settings
    whose name is all upper-case are only allowed in ``defaults`` and become
    format parameters instead of options. ``params`` (if given) is layered on
    top of those format parameters before a section is expanded by
    process_section.

    Yields whatever process_section yields for each non-``defaults``
    section, in file order.
    """
    defaults = {}
    format_params = {}
    ext_params = {} if params is None else params.copy()

    def expand(section_name, section):
        # externally supplied parameters override the ones from the config
        format_params.update(ext_params)
        return process_section(section_name, section,
                               defaults, format_params)

    curr_section = None
    curr_section_name = None

    for tp, name, val in parse_fio_config_iter(fio_cfg):
        if tp != SECTION:
            assert tp == SETTING
            assert curr_section_name is not None, "no section name"
            if name == name.upper():
                assert curr_section_name == 'defaults'
                format_params[name] = val
            else:
                curr_section[name] = val
            continue

        # a new header closes the previous section: emit it first
        if curr_section_name not in (None, 'defaults'):
            for sec in expand(curr_section_name, curr_section):
                yield sec

        if name == 'defaults':
            curr_section = defaults
        else:
            curr_section = OrderedDict()
            curr_section.update(defaults)
        curr_section_name = name

    # the last section has no following header to trigger its emission
    if curr_section_name not in (None, 'defaults'):
        for sec in expand(curr_section_name, curr_section):
            yield sec
136
137
def parse_fio_config_iter(fio_cfg):
    """Tokenise fio config text into section and setting records.

    Blank lines and lines starting with ``#`` or ``;`` are skipped. For the
    remaining lines (after stripping surrounding whitespace) it yields:

    * ``(SECTION, name, None)`` for a ``[name]`` header;
    * ``(SETTING, name, value)`` for ``name=value`` (split on the first
      ``=``, both sides stripped);
    * ``(SETTING, line, None)`` for a bare option without ``=``.

    Raises:
        ValueError: a line starts with ``[`` but does not end with ``]``.
            The message carries the 1-based line number.
    """
    for lineno, raw_line in enumerate(fio_cfg.split("\n"), 1):
        line = raw_line.strip()

        if line == "" or line.startswith(("#", ";")):
            continue

        if line.startswith('['):
            if not line.endswith(']'):
                # Raised directly instead of via assert + exc.message:
                # asserts vanish under -O and BaseException.message does not
                # exist on Python 3.
                pref = "During parsing line number {0}\n".format(lineno)
                raise ValueError(pref + "name should ends with ]")
            yield SECTION, line[1:-1], None
        elif '=' in line:
            opt_name, opt_val = line.split('=', 1)
            yield SETTING, opt_name.strip(), opt_val.strip()
        else:
            yield SETTING, line, None
159 raise ValueError(pref + exc.message)
160
161
def format_fio_config(fio_cfg):
    """Render ``(name, options)`` pairs as fio job-file text.

    Each section becomes a ``[name]`` line followed by one line per option:
    ``key=value``, or just ``key`` when the value is None. Sections are
    separated by one empty line; an empty input gives an empty string.
    """
    rendered_sections = []
    for name, section in fio_cfg:
        lines = ["[{0}]\n".format(name)]
        for opt_name, opt_val in section.items():
            if opt_val is None:
                lines.append(opt_name + "\n")
            else:
                lines.append("{0}={1}\n".format(opt_name, opt_val))
        rendered_sections.append("".join(lines))

    # every section already ends with a newline, so joining with "\n"
    # leaves exactly one blank line between consecutive sections
    return "\n".join(rendered_sections)
175
176
def do_run_fio(bconf):
    """Run one fio process for the given jobs and pair results with configs.

    ``bconf`` is a list of ``(name, options)`` pairs. It is rendered with
    format_fio_config and written to the stdin of a ``fio`` child process
    started with ``--output-format=json``; stderr is merged into stdout.

    Returns ``zip(jobs, bconf)`` where ``jobs`` is the ``"jobs"`` entry of
    the JSON document printed by fio.

    Raises:
        ValueError: the process output is not JSON or has no ``"jobs"`` key;
            the message includes the raw output and the traceback.
    """
    job_file_text = format_fio_config(bconf)

    fio_proc = subprocess.Popen(
        ["fio", "--output-format=json", "-"],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT)

    # set timeout
    fio_output = fio_proc.communicate(job_file_text)[0]

    try:
        jobs = json.loads(fio_output)["jobs"]
    except Exception:
        msg = "Can't parse fio output: {0!r}\nError: {1}"
        raise ValueError(msg.format(fio_output, traceback.format_exc()))

    return zip(jobs, bconf)
194
195
# limited by fio
MAX_JOBS = 1000


def next_test_portion(whole_conf, runcycle):
    """Split the job list into consecutive portions for separate fio runs.

    Jobs are taken in order and added to the current portion while

    * the sum of their ``numjobs`` stays within MAX_JOBS, and
    * when ``runcycle`` is not None, the summed execution time (see
      calculate_execution_time) stays within ``runcycle``. The first job of
      a portion is always accepted, whatever its duration.

    Yields lists of ``(name, options)`` pairs; every job appears in exactly
    one list.

    Raises:
        ValueError: a single job asks for more than MAX_JOBS ``numjobs``.
    """
    portion = []
    portion_jobs = 0
    portion_time = 0
    time_limited = runcycle is not None

    for name, sec in whole_conf:
        jc = int(sec.get('numjobs', '1'))

        task_time = 0
        if time_limited:
            task_time = calculate_execution_time([(name, sec)])

        if jc > MAX_JOBS:
            err_templ = "Can't process job {0!r} - too large numjobs"
            raise ValueError(err_templ.format(name))

        fits_cycle = True
        if time_limited and len(portion) != 0:
            fits_cycle = task_time + portion_time <= runcycle

        if jc + portion_jobs > MAX_JOBS or not fits_cycle:
            # the current portion is full: hand it out and start a new one
            assert len(portion) != 0
            yield portion
            portion = []
            portion_jobs = 0
            portion_time = 0

        portion.append((name, sec))
        portion_jobs += jc
        portion_time += task_time

    if portion != []:
        yield portion
237
238
def add_job_results(jname, job_output, jconfig, res):
    """Merge one fio job result into the accumulated results ``res``.

    The ``write`` statistics of ``job_output`` are used when their ``iops``
    is non-zero, otherwise the ``read`` statistics.

    On the first result for ``jname`` a record describing the job is created
    from ``jconfig``/``job_output``; on later results the same description
    is asserted to be unchanged. In both cases ``bw_mean``, ``iops`` and the
    mean ``lat``/``clat``/``slat`` values are appended to per-metric lists in
    the record, which is then stored in ``res[jname]``.

    Raises:
        AssertionError: a repeated ``jname`` arrives with a different
            description.
        KeyError: a required key is missing from the inputs.
    """
    direction = 'write' if job_output['write']['iops'] != 0 else 'read'
    raw_result = job_output[direction]

    # (record key, how to compute the expected value) in the order in which
    # the record fields are created and checked
    description = [
        ("action", lambda: jconfig["rw"]),
        ("direct_io", lambda: jconfig.get("direct", "0") == "1"),
        ("sync", lambda: jconfig.get("sync", "0") == "1"),
        ("concurence", lambda: int(jconfig.get("numjobs", 1))),
        ("size", lambda: jconfig["size"]),
        ("jobname", lambda: job_output["jobname"]),
        ("timings", lambda: (jconfig.get("runtime"),
                             jconfig.get("ramp_time"))),
    ]

    if jname in res:
        j_res = res[jname]
        for key, expected in description:
            assert j_res[key] == expected()
    else:
        j_res = {}
        for key, expected in description:
            j_res[key] = expected()

    # 'bw_dev bw_mean bw_max bw_min'.split()
    for key in ("bw_mean", "iops"):
        value = raw_result[key]
        j_res.setdefault(key, []).append(value)

    for key in ("lat", "clat", "slat"):
        value = raw_result[key]["mean"]
        j_res.setdefault(key, []).append(value)

    res[jname] = j_res
280
281
def run_fio(benchmark_config,
            params,
            runcycle=None,
            raw_results_func=None,
            skip_tests=0):
    """Run all tests of an extended fio config and collect their results.

    Parameters:
        benchmark_config: config text, parsed by parse_fio_config_full.
        params: format parameters passed to the parser (may be None).
        runcycle: optional limit, in seconds, for one fio invocation; see
            next_test_portion.
        raw_results_func: optional callback invoked for every finished job
            as ``raw_results_func(test_num, [job_output, jname, jconfig])``.
        skip_tests: number of leading tests to skip; test numbering starts
            at this value.

    Returns:
        ``(results, executed_tests)`` where ``results`` maps job names to
        the records built by add_job_results (jobs whose name starts with
        ``_`` are executed but not recorded).

    Errors while running are deliberately not propagated: SystemExit and
    KeyboardInterrupt stop the run silently, any other exception is printed
    with its traceback, and the results gathered so far are returned.
    """
    whole_conf = list(parse_fio_config_full(benchmark_config, params))
    whole_conf = whole_conf[skip_tests:]
    res = {}
    executed_tests = 0
    try:
        for bconf in next_test_portion(whole_conf, runcycle):
            # Continue numbering right after the last test already handled.
            # Restarting from the last number used by the previous portion
            # would report that number twice.
            first_test_num = skip_tests + executed_tests
            numbered = enumerate(do_run_fio(bconf), first_test_num)

            for test_num, (job_output, (jname, jconfig)) in numbered:
                executed_tests += 1
                if raw_results_func is not None:
                    raw_results_func(test_num,
                                     [job_output, jname, jconfig])

                assert jname == job_output["jobname"]

                # jobs named "_..." are run but not reported
                if jname.startswith('_'):
                    continue

                add_job_results(jname, job_output, jconfig, res)

    except (SystemExit, KeyboardInterrupt):
        # interrupted by the user: return what has been collected so far
        pass

    except Exception:
        traceback.print_exc()

    return res, executed_tests
318
319
def run_benchmark(binary_tp, *argv, **kwargs):
    """Dispatch to the runner for the given benchmark type.

    Only ``'fio'`` is supported; the remaining arguments are forwarded to
    run_fio unchanged and its result is returned.

    Raises:
        ValueError: ``binary_tp`` is not a supported benchmark type.
    """
    if binary_tp != 'fio':
        raise ValueError("Unknown behcnmark {0}".format(binary_tp))
    return run_fio(*argv, **kwargs)
324
325
def parse_args(argv):
    """Parse the command line (without the program name).

    Returns the ``argparse.Namespace`` with the attributes ``type``,
    ``start_at``, ``json``, ``output``, ``estimate``, ``compile``,
    ``num_tests``, ``runcycle``, ``show_raw_results``, ``skip_tests``,
    ``params`` (list of raw ``PARAM=VAL`` strings) and ``jobfile``.
    """
    parser = argparse.ArgumentParser(
        description="Run fio and return result")
    parser.add_argument("--type", metavar="BINARY_TYPE",
                        choices=['fio'], default='fio',
                        help=argparse.SUPPRESS)
    parser.add_argument("--start-at", metavar="START_AT_UTC", type=int,
                        help="Start execution at START_AT_UTC")
    parser.add_argument("--json", action="store_true", default=False,
                        help="Json output format")
    parser.add_argument("--output", default='-', metavar="FILE_PATH",
                        help="Store results to FILE_PATH")
    parser.add_argument("--estimate", action="store_true", default=False,
                        help="Only estimate task execution time")
    parser.add_argument("--compile", action="store_true", default=False,
                        help="Compile config file to fio config")
    parser.add_argument("--num-tests", action="store_true", default=False,
                        help="Show total number of tests")
    parser.add_argument("--runcycle", type=int, default=None,
                        metavar="MAX_CYCLE_SECONDS",
                        help="Max cycle length in seconds")
    parser.add_argument("--show-raw-results", action='store_true',
                        default=False, help="Output raw input and results")
    parser.add_argument("--skip-tests", type=int, default=0, metavar="NUM",
                        help="Skip NUM tests")
    # the two help fragments are joined with an explicit space; without it
    # the help text reads "...PARAM=VAL toformat into..."
    parser.add_argument("--params", nargs="*", metavar="PARAM=VAL",
                        default=[],
                        help="Provide set of pairs PARAM=VAL to " +
                             "format into job description")
    parser.add_argument("jobfile")
    return parser.parse_args(argv)
357
358
def read_config(fd, timeout=10):
    """Read the whole config from ``fd`` with an overall deadline.

    Parameters:
        fd: open file object with a ``fileno()`` (e.g. ``sys.stdin``).
        timeout: maximum number of seconds to wait for the complete input,
            measured from the start of the call.

    Returns:
        Everything read up to end-of-file, as ``str``.

    Raises:
        IOError: end-of-file was not reached before the deadline.

    The data is read straight from the file descriptor in chunks. select()
    only knows about the descriptor, so combining it with buffered
    ``fd.read(1)`` calls can report "nothing to read" while unread data sits
    in the file object's own buffer.
    """
    import os  # local import: not among this module's top-level imports

    fileno = fd.fileno()
    chunks = []
    deadline = time.time() + timeout

    while True:
        remaining = deadline - time.time()
        if remaining <= 0:
            raise IOError("No config provided")

        readable, _, _ = select.select([fileno], [], [], remaining)
        if not readable:
            raise IOError("No config provided")

        chunk = os.read(fileno, 4096)
        if not chunk:
            # empty read means end-of-file
            break
        chunks.append(chunk)

    data = b"".join(chunks)
    if isinstance(data, str):
        # Python 2: os.read already returns str
        return data
    # Python 3: os.read returns bytes, decode like the text stream would
    return data.decode(getattr(fd, "encoding", None) or "utf-8")
376
377
def main(argv):
    """Command-line entry point; returns the process exit code (0).

    Reads the job config from the file named on the command line (``-``
    means stdin), then either

    * handles the informational modes ``--compile`` / ``--num-tests`` /
      ``--estimate`` and returns, or
    * optionally waits until ``--start-at``, runs the benchmark and writes
      the results (JSON or pprint form) between marker lines to the output.
    """
    argv_obj = parse_args(argv)

    if argv_obj.jobfile == '-':
        job_cfg = read_config(sys.stdin)
    else:
        with open(argv_obj.jobfile) as job_fd:
            job_cfg = job_fd.read()

    params = {}
    for param_val in argv_obj.params:
        assert '=' in param_val, \
            "--params entries must look like PARAM=VAL: {0!r}".format(
                param_val)
        name, val = param_val.split("=", 1)
        params[name] = val

    # only close the output when this function opened it
    owns_out_fd = argv_obj.output != '-'
    out_fd = open(argv_obj.output, "w") if owns_out_fd else sys.stdout

    try:
        if argv_obj.num_tests or argv_obj.compile or argv_obj.estimate:
            bconf = list(parse_fio_config_full(job_cfg, params))
            bconf = bconf[argv_obj.skip_tests:]

            if argv_obj.compile:
                out_fd.write(format_fio_config(bconf))
                out_fd.write("\n")

            # print(<single expression>) behaves the same on Python 2 and 3
            if argv_obj.num_tests:
                print(len(bconf))

            if argv_obj.estimate:
                seconds = calculate_execution_time(bconf)

                h = seconds // 3600
                m = (seconds % 3600) // 60
                s = seconds % 60

                print("{0}:{1}:{2}".format(h, m, s))
            return 0

        if argv_obj.start_at is not None:
            # wait for the remaining time until the requested start moment;
            # nothing to wait for when it is already in the past
            delay = argv_obj.start_at - time.time()
            if delay > 0:
                time.sleep(delay)

        def raw_res_func(test_num, data):
            pref = "========= RAW_RESULTS({0}) =========\n".format(test_num)
            out_fd.write(pref)
            out_fd.write(json.dumps(data))
            out_fd.write("\n========= END OF RAW_RESULTS =========\n")
            out_fd.flush()

        rrfunc = raw_res_func if argv_obj.show_raw_results else None

        stime = time.time()
        job_res, num_tests = run_benchmark(argv_obj.type,
                                           job_cfg,
                                           params,
                                           argv_obj.runcycle,
                                           rrfunc,
                                           argv_obj.skip_tests)
        etime = time.time()

        res = {'__meta__': {'raw_cfg': job_cfg}, 'res': job_res}

        oformat = 'json' if argv_obj.json else 'eval'
        out_fd.write("\nRun {0} tests in {1} seconds\n".format(
            num_tests, int(etime - stime)))
        out_fd.write(
            "========= RESULTS(format={0}) =========\n".format(oformat))
        if argv_obj.json:
            out_fd.write(json.dumps(res))
        else:
            out_fd.write(pprint.pformat(res) + "\n")
        out_fd.write("\n========= END OF RESULTS =========\n")

        return 0
    finally:
        if owns_out_fd:
            out_fd.close()
454
455
if __name__ == '__main__':
    # sys.exit is the supported way to set the exit status; the bare
    # exit() helper is injected by the site module for interactive use and
    # is absent when the interpreter runs without it.
    sys.exit(main(sys.argv[1:]))