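"""Helpers for compiling wally fio job files.

The accepted format is a superset of the usual fio job file syntax:
{NAME} placeholders are substituted from externally supplied params,
{%a,b,c%} values expand into one job per listed value, "include"
directives pull in other files, and the NUM_ROUNDS option repeats a
section several times.  Compiled sections are grouped into slices that
respect a total runtime limit and a concurrent job-count limit.
"""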
import os
import sys
import copy
import os.path
import argparse
import itertools
from collections import OrderedDict, namedtuple


from wally.utils import sec_to_str


SECTION = 0
SETTING = 1
INCLUDE = 2


Var = namedtuple('Var', ('name',))
CfgLine = namedtuple('CfgLine', ('fname', 'lineno', 'oline',
                                 'tp', 'name', 'val'))


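# A FioJobSection holds one parsed [section]: its name plus an ordered
# mapping of option name -> value.  Values may be plain strings/numbers,
# lists (from {%...%} cycles) or Var placeholders that still have to be
# substituted via apply_params().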
class FioJobSection(object):
    def __init__(self, name):
        self.name = name
        self.vals = OrderedDict()

    def copy(self):
        return copy.deepcopy(self)

    def required_vars(self):
        for name, val in self.vals.items():
            if isinstance(val, Var):
                yield name, val

    def is_free(self):
        return len(list(self.required_vars())) == 0

    def __str__(self):
        res = "[{0}]\n".format(self.name)

        for name, val in self.vals.items():
            if name.startswith('_') or name == name.upper():
                continue
            if isinstance(val, Var):
                res += "{0}={{{1}}}\n".format(name, val.name)
            else:
                res += "{0}={1}\n".format(name, val)

        return res


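# Size strings use binary suffixes, e.g. to_bytes("4k") == 4096 and
# to_bytes("2m") == 2 * 1024 ** 2; plain integers are returned as-is and
# unknown suffixes re-raise the original ValueError.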
def to_bytes(sz):
    sz = sz.lower()
    try:
        return int(sz)
    except ValueError:
        if sz[-1] == 'm':
            return (1024 ** 2) * int(sz[:-1])
        if sz[-1] == 'k':
            return 1024 * int(sz[:-1])
        if sz[-1] == 'g':
            return (1024 ** 3) * int(sz[:-1])
        raise


class ParseError(ValueError):
    def __init__(self, msg, fname, lineno, line_cont=""):
        ValueError.__init__(self, msg)
        self.file_name = fname
        self.lineno = lineno
        self.line_cont = line_cont

    def __str__(self):
        msg = "In {0}:{1} ({2}) : {3}"
        return msg.format(self.file_name,
                          self.lineno,
                          self.line_cont,
                          super(ParseError, self).__str__())


def is_name(name):
    if len(name) == 0:
        return False

    if name[0] != '_' and not name[0].isalpha():
        return False

    for ch in name[1:]:
        # check every remaining character, not just the first one
        if ch != '_' and not ch.isalnum():
            return False

    return True


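# Option values are parsed recursively: plain ints/floats are converted,
# "{%1,2,4%}" becomes the list [1, 2, 4], "{SOMEVAR}" (any identifier)
# becomes Var('SOMEVAR') to be filled in later, anything else stays a
# string.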
def parse_value(val):
    try:
        return int(val)
    except ValueError:
        pass

    try:
        return float(val)
    except ValueError:
        pass

    if val.startswith('{%'):
        assert val.endswith("%}")
        content = val[2:-2]
        vals = list(i.strip() for i in content.split(','))
        return map(parse_value, vals)

    if val.startswith('{'):
        assert val.endswith("}")
        assert is_name(val[1:-1])
        return Var(val[1:-1])
    return val


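# The lexer yields one CfgLine per meaningful line: SECTION for "[name]",
# INCLUDE for "include <file>", and SETTING for "opt=val" (a bare option
# is treated as "opt=1").  Lines starting with '#' or ';' and blank lines
# are skipped.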
def fio_config_lexer(fio_cfg, fname):
    for lineno, oline in enumerate(fio_cfg.split("\n")):
        try:
            line = oline.strip()

            if line.startswith("#") or line.startswith(";"):
                continue

            if line == "":
                continue

            if '#' in line:
                raise ParseError("# isn't allowed inside line",
                                 fname, lineno, oline)

            if line.startswith('['):
                yield CfgLine(fname, lineno, oline, SECTION,
                              line[1:-1].strip(), None)
            elif '=' in line:
                opt_name, opt_val = line.split('=', 1)
                yield CfgLine(fname, lineno, oline, SETTING,
                              opt_name.strip(),
                              parse_value(opt_val.strip()))
            elif line.startswith("include "):
                yield CfgLine(fname, lineno, oline, INCLUDE,
                              line.split(" ", 1)[1], None)
            else:
                yield CfgLine(fname, lineno, oline, SETTING, line, '1')

        except Exception as exc:
            raise ParseError(str(exc), fname, lineno, oline)


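# The parser first expands include directives (re-lexing included files
# until none remain), then collects options from the single leading
# [global] section as defaults and yields one FioJobSection per remaining
# section with those defaults pre-filled.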
def fio_config_parse(lexer_iter):
    in_globals = False
    curr_section = None
    glob_vals = OrderedDict()
    sections_count = 0

    lexed_lines = list(lexer_iter)
    one_more = True
    includes = {}

    while one_more:
        new_lines = []
        one_more = False
        for line in lexed_lines:
            fname, lineno, oline, tp, name, val = line

            if INCLUDE == tp:
                if not os.path.exists(fname):
                    dirname = '.'
                else:
                    dirname = os.path.dirname(fname)

                new_fname = os.path.join(dirname, name)
                includes[new_fname] = (fname, lineno)

                try:
                    cont = open(new_fname).read()
                except IOError as err:
                    msg = "Error while including file {0}: {1}"
                    raise ParseError(msg.format(new_fname, err),
                                     fname, lineno, oline)

                new_lines.extend(fio_config_lexer(cont, new_fname))
                one_more = True
            else:
                new_lines.append(line)

        lexed_lines = new_lines

    for fname, lineno, oline, tp, name, val in lexed_lines:
        if tp == SECTION:
            if curr_section is not None:
                yield curr_section
                curr_section = None

            if name == 'global':
                if sections_count != 0:
                    raise ParseError("Only one [global] section is allowed"
                                     " and it must come first",
                                     fname, lineno, oline)
                in_globals = True
            else:
                in_globals = False
                curr_section = FioJobSection(name)
                curr_section.vals = glob_vals.copy()
            sections_count += 1
        else:
            assert tp == SETTING
            if in_globals:
                glob_vals[name] = val
            elif name == name.upper():
                raise ParseError("Param '" + name +
                                 "' not in [global] section",
                                 fname, lineno, oline)
            elif curr_section is None:
                raise ParseError("Data outside section",
                                 fname, lineno, oline)
            else:
                curr_section.vals[name] = val

    if curr_section is not None:
        yield curr_section


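# NUM_ROUNDS repeats a section N times; after the first copy ramp_time is
# stashed away as _ramp_time so the repeats do not ramp again
# (slice_config restores it when a section starts a new slice).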
def process_repeats(sec):
    sec = sec.copy()
    count = sec.vals.pop('NUM_ROUNDS', 1)
    assert isinstance(count, (int, long))

    for _ in range(count):
        yield sec.copy()

        if 'ramp_time' in sec.vals:
            sec.vals['_ramp_time'] = sec.vals.pop('ramp_time')


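# Every option whose name is not all-uppercase and whose value is a list
# (written as {%...%} in the job file) becomes an axis of a cartesian
# product, e.g. a section with blocksize={%4k,64k%} and numjobs={%1,4%}
# expands into 4 sections.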
def process_cycles(sec):
    cycles = OrderedDict()

    for name, val in sec.vals.items():
        if isinstance(val, list) and name.upper() != name:
            cycles[name] = val

    if len(cycles) == 0:
        yield sec
    else:
        for combination in itertools.product(*cycles.values()):
            new_sec = sec.copy()
            new_sec.vals.update(zip(cycles.keys(), combination))
            yield new_sec


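# apply_params overlays externally supplied params (e.g. from the -p
# command line option) on top of a section and resolves Var placeholders
# from those params or from values already processed in the same section.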
def apply_params(sec, params):
    processed_vals = OrderedDict()
    processed_vals.update(params)
    for name, val in sec.vals.items():
        if name in params:
            continue

        if isinstance(val, Var):
            if val.name in params:
                val = params[val.name]
            elif val.name in processed_vals:
                val = processed_vals[val.name]
        processed_vals[name] = val
    sec = sec.copy()
    sec.vals = processed_vals
    return sec


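# Final per-section fixups: enforce group_reporting for multi-job
# sections, enable unified_rw_reporting and substitute the service
# placeholders UNIQ, COUNTER and TEST_SUMM into the section name.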
def finall_process(sec, counter=[0]):
    sec = sec.copy()

    # numjobs defaults to 1 when not set explicitly
    if sec.vals.get('numjobs', 1) != 1:
        msg = "Group reporting should be set if numjobs != 1"
        assert 'group_reporting' in sec.vals, msg

    sec.vals['unified_rw_reporting'] = '1'

    params = sec.vals.copy()
    params['UNIQ'] = 'UN{0}'.format(counter[0])
    params['COUNTER'] = str(counter[0])
    params['TEST_SUMM'] = get_test_summary(sec)
    sec.name = sec.name.format(**params)
    counter[0] += 1

    return sec


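# The summary string encodes the test mode, e.g. a direct random read
# with blocksize=4k and numjobs=1 gives "rrd4kth1": rw shortcut, sync
# mode ('s' - sync, 'd' - direct, 'x' - both, 'a' - neither), blocksize
# and thread count.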
def get_test_sync_mode(sec):
    is_sync = str(sec.vals.get("sync", "0")) == "1"
    is_direct = str(sec.vals.get("direct", "0")) == "1"

    if is_sync and is_direct:
        return 'x'
    elif is_sync:
        return 's'
    elif is_direct:
        return 'd'
    else:
        return 'a'


def get_test_summary(sec):
    rw = {"randread": "rr",
          "randwrite": "rw",
          "read": "sr",
          "write": "sw"}[sec.vals["rw"]]

    sync_mode = get_test_sync_mode(sec)
    th_count = sec.vals.get('numjobs')

    if th_count is None:
        th_count = sec.vals.get('concurence', 1)

    return "{0}{1}{2}th{3}".format(rw,
                                   sync_mode,
                                   sec.vals['blocksize'],
                                   th_count)


def execution_time(sec):
    return sec.vals.get('ramp_time', 0) + sec.vals.get('runtime', 0)


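# slice_config groups consecutive sections into slices so that each slice
# stays within runcycle seconds of total execution time and max_jobs
# concurrent jobs; with split_on_names=True a section name change also
# starts a new slice.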
def slice_config(sec_iter, runcycle=None, max_jobs=1000, split_on_names=False):
    jcount = 0
    runtime = 0
    curr_slice = []
    prev_name = None

    for pos, sec in enumerate(sec_iter):

        if prev_name is not None:
            split_here = False

            if split_on_names and prev_name != sec.name:
                split_here = True

            if split_here:
                yield curr_slice
                curr_slice = []
                runtime = 0
                jcount = 0

        prev_name = sec.name

        jc = sec.vals.get('numjobs', 1)
        msg = "numjobs should be integer, not {0!r}".format(jc)
        assert isinstance(jc, int), msg

        curr_task_time = execution_time(sec)

        if jc > max_jobs:
            err_templ = "Can't process job {0!r} - too large numjobs"
            raise ValueError(err_templ.format(sec.name))

        if runcycle is not None and len(curr_slice) != 0:
            rc_ok = curr_task_time + runtime <= runcycle
        else:
            rc_ok = True

        if jc + jcount <= max_jobs and rc_ok:
            runtime += curr_task_time
            jcount += jc
            curr_slice.append(sec)
            continue

        assert len(curr_slice) != 0
        yield curr_slice

        if '_ramp_time' in sec.vals:
            sec.vals['ramp_time'] = sec.vals.pop('_ramp_time')
            curr_task_time = execution_time(sec)

        runtime = curr_task_time
        jcount = jc
        curr_slice = [sec]
        prev_name = None

    if curr_slice != []:
        yield curr_slice


def parse_all_in_1(source, fname=None):
    return fio_config_parse(fio_config_lexer(source, fname))


def flatmap(func, inp_iter):
    for val in inp_iter:
        for res in func(val):
            yield res


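# Full compilation pipeline: parse -> apply external params -> expand
# {%...%} cycles -> repeat NUM_ROUNDS times -> final fixups -> slice.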
def fio_cfg_compile(source, fname, test_params, **slice_params):
    it = parse_all_in_1(source, fname)
    it = (apply_params(sec, test_params) for sec in it)
    it = flatmap(process_cycles, it)
    it = flatmap(process_repeats, it)
    it = itertools.imap(finall_process, it)
    return slice_config(it, **slice_params)


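# Typical invocations (the script and job file names are illustrative,
# assuming this module is saved as fio.py):
#
#   python fio.py estimate job.cfg                 # total run time
#   python fio.py num_tests job.cfg                # number of jobs
#   python fio.py compile job.cfg -p SOMEVAR=4k    # print expanded jobs
#
# Passing "-" as the job file reads the job description from stdin.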
def parse_args(argv):
    parser = argparse.ArgumentParser(
        description="Compile fio job files, estimate run time or count tests")
    parser.add_argument("--runcycle", type=int, default=None,
                        metavar="MAX_CYCLE_SECONDS",
                        help="Max cycle length in seconds")
    parser.add_argument("-p", "--params", nargs="*", metavar="PARAM=VAL",
                        default=[],
                        help="Provide set of pairs PARAM=VAL to " +
                             "format into job description")
    parser.add_argument("action", choices=['estimate', 'compile', 'num_tests'])
    parser.add_argument("jobfile")
    return parser.parse_args(argv)


def main(argv):
    argv_obj = parse_args(argv)

    if argv_obj.jobfile == '-':
        job_cfg = sys.stdin.read()
    else:
        job_cfg = open(argv_obj.jobfile).read()

    params = {}
    for param_val in argv_obj.params:
        assert '=' in param_val
        name, val = param_val.split("=", 1)
        params[name] = parse_value(val)

    slice_params = {
        'runcycle': argv_obj.runcycle,
    }

    sliced_it = fio_cfg_compile(job_cfg, argv_obj.jobfile,
                                params, **slice_params)

    if argv_obj.action == 'estimate':
        sum_time = 0
        for cfg_slice in sliced_it:
            sum_time += sum(map(execution_time, cfg_slice))
        print sec_to_str(sum_time)
    elif argv_obj.action == 'num_tests':
        print sum(map(len, map(list, sliced_it)))
    elif argv_obj.action == 'compile':
        splitter = "\n#" + "-" * 70 + "\n\n"
        for cfg_slice in sliced_it:
            print splitter.join(map(str, cfg_slice))

    return 0


if __name__ == '__main__':
    exit(main(sys.argv[1:]))