blob: 69ec6613dbd3129d30e53f3531abdfa9a9521405 [file] [log] [blame]
Alexb78191f2021-11-02 16:35:46 -05001import json
2import os
3import queue
4import signal
5import subprocess
6
7
8from copy import deepcopy
9from datetime import datetime, timedelta
10from platform import system, release, node
11from threading import Thread
12import threading
13from time import sleep
14
15
16from cfg_checker.common.exception import CheckerException
17from cfg_checker.common.other import piped_shell
18from cfg_checker.common.log import logger
19
20
# Layout used for result keys and for parsing the "scheduled_to" option
_datetime_fmt = "%m/%d/%Y, %H:%M:%S"

# Options passed to every fio run, each rendered as "--<key>=<value>".
# Key order is the order of the options on the command line.
fio_options_common = {
    "name": "agent_run",
    "filename": "/cephvol/testfile",
    "status-interval": "500ms",
    "randrepeat": 0,
    "verify": 0,
    "direct": 1,
    "gtod_reduce": 0,
    "bs": "32k",
    "iodepth": 16,
    "size": "10G",
    "readwrite": "randrw",
    "ramp_time": "5s",
    "runtime": "30s",
    "ioengine": "libaio",
}

# Extra options, added when "readwrite" is a sequential mode
fio_options_seq = {
    "numjobs": 1,
    "offset_increment": "500M",
}

# Extra options, added when "readwrite" is a mixed mode
fio_options_mix = {
    "rwmixread": 50,
}
46
47
def get_fio_options():
    """Return one dict with the common, sequential and mixed options.

    Duplicate of the option getter for external access. Every group is
    deep-copied before merging, later groups overriding earlier ones, so
    the caller may modify the result without touching the module dicts.
    """
    merged = {}
    for _group in (fio_options_common, fio_options_seq, fio_options_mix):
        merged.update(deepcopy(_group))
    return merged
Alexb78191f2021-11-02 16:35:46 -050054
55
def output_reader(_stdout, outq):
    """Copy lines from ``_stdout`` into ``outq`` until an empty read.

    ``readline()`` returning '' marks the end of the stream.
    """
    read_line = _stdout.readline
    chunk = read_line()
    while chunk != '':
        outq.put(chunk)
        chunk = read_line()
59
60
61def _o(option, param, suffix=""):
62 return "--{}={}{}".format(option, param, suffix)
63
64
def get_time(timestamp=None):
    """Return a point in time formatted with the module datetime format.

    A falsy ``timestamp`` (None or 0) selects the current local time,
    any other value is passed to ``datetime.fromtimestamp``.
    """
    if timestamp:
        _moment = datetime.fromtimestamp(timestamp)
    else:
        _moment = datetime.now()
    return _moment.strftime(_datetime_fmt)
71
72
73def _get_seconds(value):
74 # assume that we have symbol at the end
75 _suffix = value[-1]
76 if _suffix == 's':
77 return int(value[:-1])
78 elif _suffix == 'm':
79 return int(value[:-1])*60
80 elif _suffix == 'h':
81 return int(value[:-1])*60*60
82 else:
83 return -1
84
85
def wait_until(end_datetime):
    """Block until ``end_datetime`` is reached.

    Sleeps for half of the remaining time on every pass and stops once
    the remaining time measured before the sleep was 0.1 sec or less.
    Returns right away when ``end_datetime`` is already in the past.
    """
    remaining = (end_datetime - datetime.now()).total_seconds()
    while remaining >= 0:
        sleep(remaining / 2)
        if remaining <= 0.1:
            break
        remaining = (end_datetime - datetime.now()).total_seconds()
95
96
class ShellThread(object):
    """Run a shell command and stream its output lines into a queue.

    The command is started with ``subprocess.Popen``; a helper thread
    copies every line of the combined stdout/stderr into ``queue``.
    """

    def __init__(self, cmd, queue):
        """
        :param cmd: list of command parts, joined with spaces on start
        :param queue: queue object that receives the output lines
        """
        self.cmd = cmd
        self.queue = queue
        # Popen handle and reader thread, both created by run_shell()
        self._p = None
        self._t = None
        # seconds to wait for the first line of output
        self.timeout = 15
        # lines moved out of the queue by get_output()
        self.output = []

    def run_shell(self):
        """Start the command and wait for its first line of output.

        The command is interrupted when no output shows up in time.
        """
        # Start
        _cmd = " ".join(self.cmd)
        logger.debug("... {}".format(_cmd))
        # NOTE(review): 'env' replaces the whole environment of the child
        # (PATH is not inherited) and the command goes through the shell
        # unescaped - confirm both are intended
        self._p = subprocess.Popen(
            _cmd,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            env={"PYTHONUNBUFFERED": "1"},
            universal_newlines=True,
            bufsize=1
        )
        self._t = threading.Thread(
            target=output_reader,
            args=(self._p.stdout, self.queue)
        )
        self._t.start()
        if not self.wait_started():
            self.kill_shell()

    def is_alive(self):
        """Tell whether the command runs or its output is still being read.

        ``Popen.poll()`` gives None for a running process and the exit
        code otherwise. Exit code 0 is falsy, so the result has to be
        compared with None explicitly.

        :return: False if the command was never started or is finished
                 and the reader thread has consumed all of its output
        """
        if self._p is None:
            return False
        if self._p.poll() is None:
            return True
        # process is gone, but the reader may still be queueing the
        # remaining lines; report alive until everything is in the queue
        return self._t is not None and self._t.is_alive()

    def wait_started(self):
        """Wait up to ``self.timeout`` seconds for the first output line.

        :return: True once the queue holds something, False on timeout
                 or when the command ended without printing anything
        """
        # count down on a local copy, so the configured timeout stays
        # intact for logging and for the next call
        _left = self.timeout
        while self.queue.empty():
            if not self.is_alive():
                # command is over and its output was fully read
                return not self.queue.empty()
            if _left <= 0:
                logger.debug(
                    "...timed out after {} sec".format(str(self.timeout))
                )
                return False
            logger.debug("... {} sec".format(_left))
            sleep(1)
            _left -= 1
        logger.debug("... got first fio output")
        return True

    def kill_shell(self):
        """Interrupt the command if it still runs, then collect output."""
        # signal only a process that did not exit yet
        if self._p is not None and self._p.poll() is None:
            self._p.send_signal(signal.SIGINT)
        self.get_output()

    def get_output(self):
        """Move all queued lines into ``self.output``.

        :return: the list with every line collected so far
        """
        while True:
            try:
                line = self.queue.get(block=False)
            except queue.Empty:
                return self.output
            if isinstance(line, bytes):
                # str(bytes) would produce the "b'...'" representation
                line = line.decode('utf-8', 'replace')
            self.output.append(line)
163
164
class FioProcess(Thread):
    """Thread that runs one fio benchmark and tracks its progress.

    The command line is built from the option dicts. Json status
    documents printed by fio are parsed into ``timeline`` and the last
    one is stored in ``results`` when the run is over.
    """
    # flags passed to every fio run
    _fio_options_list = [
        "--time_based",
        "--output-format=json+",
        "--eta=always"
    ]
    # flags added for sequential modes only
    _fio_options_seq_list = [
        "--thread"
    ]

    # Defaults. __init__ replaces them with per-instance deep copies, so
    # changing options never alters the module level dicts.
    _fio_options_common = fio_options_common
    _fio_options_seq = fio_options_seq
    _fio_options_mix = fio_options_mix

    # progress values, refreshed from every parsed status document
    eta_sec = 0
    total_time_sec = 0
    elapsed_sec = 0
    # last parsed fio json document
    testrun = {}

    mount_point = "/cephvol"
    filename = "testfile"

    # test modes: 'randread', 'randwrite', 'read', 'write', 'randrw'
    mode = "randrw"
    _seq_modes = ['read', 'write']
    _mix_modes = ['randrw']
    _rand_modes = ['randread', 'randwrite']

    # results keyed by formatted time; __init__ creates a dict per instance
    results = {}

    def _shell(self, cmd):
        """Run ``cmd`` via piped_shell, keeping exit code and output.

        :return: True when the exit code is zero, False otherwise
        """
        self._code, self._shell_output = piped_shell(cmd, code=True)
        if self._code:
            logger.error(
                "# Shell error for '{}': [{}] {}".format(
                    cmd,
                    self._code,
                    self._shell_output
                )
            )
            return False
        else:
            return True

    def recalc_times(self):
        """Recalculate total run time and ETA from the current options.

        :raises CheckerException: if 'runtime' is not a positive time
            value or 'ramp_time' is not a valid time value
        """
        _runtime = self._fio_options_common["runtime"]
        _ramp_time = self._fio_options_common["ramp_time"]
        _rt = _get_seconds(_runtime)
        _rup = _get_seconds(_ramp_time)
        # _get_seconds() reports unparsable values as -1, which is truthy,
        # so compare explicitly; zero ramp-up is a valid setting
        if _rt <= 0:
            raise CheckerException(
                "invalid 'runtime': '{}'".format(_runtime)
            )
        elif _rup < 0:
            raise CheckerException(
                "invalid 'ramp_time': '{}'".format(_ramp_time)
            )

        self.total_time_sec = _rt + _rup
        self.eta_sec = self.total_time_sec

    def __init__(self):
        """Prepare options, check that fio runs and pick the target file.

        :raises CheckerException: if 'fio --version' fails or the
            default time options are invalid
        """
        Thread.__init__(self)
        logger.info("fio thread initialized")
        # work on private copies, the class level dicts stay pristine
        self._fio_options_common = deepcopy(self._fio_options_common)
        self._fio_options_seq = deepcopy(self._fio_options_seq)
        self._fio_options_mix = deepcopy(self._fio_options_mix)
        self.results = {}
        self.testrun = {}
        # ShellThread of the current run, created in run()
        self.fiorun = None
        # save system
        self.system = system()
        self.release = release()
        self.hostname = node()
        # create a clear var for last shell output
        self._shell_output = ""
        # prepare params
        self.recalc_times()
        # prepare the fio
        self.fio_version = "unknown"
        if not self._shell("fio --version"):
            raise CheckerException(
                "Error running fio: '{}'".format(self._shell_output)
            )
        else:
            self.fio_version = self._shell_output
        # all outputs timeline
        self.timeline = {}
        # setup target file
        if not os.path.exists(self.mount_point):
            logger.warning(
                "WARNING: '{}' not exists, using tmp folder".format(
                    self.mount_point
                )
            )
            self.mount_point = "/tmp"
        self._fio_options_common["filename"] = os.path.join(
            self.mount_point,
            self.filename
        )

        if self.system == "Darwin":
            self._fio_options_common["ioengine"] = "posixaio"
        # Thread finish marker
        self.finished = False
        self.scheduled_datetime = None

    def get_options(self):
        """Return a deep copy of all current options as a single dict."""
        _opts = deepcopy(self._fio_options_common)
        _opts.update(deepcopy(self._fio_options_seq))
        _opts.update(deepcopy(self._fio_options_mix))
        return _opts

    def update_options(self, _dict):
        """Set option values; only keys are validated, not the values.

        Nothing is changed when any key is unknown or when the resulting
        time options are rejected by recalc_times().

        :param _dict: option name to value mapping
        :raises CheckerException: on an unknown option name or invalid
            'runtime'/'ramp_time'
        """
        # lookup order matters if a key exists in more than one group
        _groups = (
            self._fio_options_mix,
            self._fio_options_seq,
            self._fio_options_common
        )
        _unknown = [
            str(k) for k in _dict if not any(k in _g for _g in _groups)
        ]
        if _unknown:
            raise CheckerException(
                "Unknown fio option(s): '{}'".format("', '".join(_unknown))
            )
        _backup = [(_g, dict(_g)) for _g in _groups]
        try:
            for k, v in _dict.items():
                for _g in _groups:
                    if k in _g:
                        _g[k] = v
                        break
            # recalc
            self.recalc_times()
        except Exception:
            # roll back, a half applied set of options must not stay
            for _g, _saved in _backup:
                _g.clear()
                _g.update(_saved)
            raise

    def _build_cmd(self):
        """Compose the fio command line for the selected 'readwrite'."""
        _cmd = ["fio"]
        _cmd += self._fio_options_list
        _cmd += [_o(k, v) for k, v in self._fio_options_common.items()]

        _mode = self._fio_options_common["readwrite"]
        if _mode in self._seq_modes:
            _cmd += self._fio_options_seq_list
            _cmd += [_o(k, v) for k, v in self._fio_options_seq.items()]
        elif _mode in self._mix_modes:
            _cmd += [_o(k, v) for k, v in self._fio_options_mix.items()]
        return _cmd

    def _store_status(self, _text):
        """Parse one fio json document and keep it as the latest status.

        Broken or incomplete documents are logged and skipped.
        """
        try:
            _json = json.loads(_text)
            _timestamp = _json["timestamp"]
            _job = _json["jobs"][0]
            self.timeline[_timestamp] = _job

            # save last values
            self.eta_sec = _job["eta"]
            self.elapsed_sec = _job["elapsed"]
            self.testrun = _json
        except (TypeError, KeyError, IndexError) as e:
            logger.error("ERROR: {}".format(e))
        except json.decoder.JSONDecodeError as e:
            logger.error("ERROR: {}".format(e))

    def _run_fio(self):
        """Start fio, follow its output and save the outcome to results."""
        _q = queue.Queue()
        self.fiorun = ShellThread(self._build_cmd(), _q)
        # Check if schedule is set
        if self.scheduled_datetime:
            logger.debug(
                "waiting for '{}', now is '{}', total of {} sec left".format(
                    self.scheduled_datetime.strftime(_datetime_fmt),
                    datetime.now().strftime(_datetime_fmt),
                    (self.scheduled_datetime - datetime.now()).total_seconds()
                )
            )
            wait_until(self.scheduled_datetime)
        self.fiorun.run_shell()
        # lines of the json document that is being received
        _raw = []
        while self.fiorun.is_alive() or not _q.empty():
            while not _q.empty():
                try:
                    _bb = _q.get(block=False)
                except queue.Empty:
                    # someone else (kill_shell) emptied the queue
                    break
                if isinstance(_bb, bytes):
                    _line = _bb.decode('utf-8')
                else:
                    _line = _bb
                if not _raw and not _line.startswith("{"):
                    if not _line.strip():
                        # blank line between documents, nothing to report
                        continue
                    # text outside of a json document is taken as an
                    # error message from fio: save it and stop the run
                    self.results[get_time()] = {
                        "error": _line
                    }
                    self.eta_sec = -1
                    self.fiorun.kill_shell()
                    return
                _current = _line.splitlines()
                _raw += _current
                # a lone "}" closes the top level object; nested ones
                # are indented in fio output and never match
                if "}" not in _current:
                    continue
                _end = _raw.index("}")
                _doc = _raw[:_end + 1]
                _raw = _raw[_end + 1:]
                self._store_status("\n".join(_doc))
            # fio reported that nothing is left to do
            if not self.eta_sec:
                break
            sleep(0.1)
        # Save status to results dictionary
        if self.testrun:
            self.results[get_time(timestamp=self.testrun["timestamp"])] = {
                "result": self.testrun,
                "timeline": self.timeline
            }
        else:
            self.results[get_time()] = {
                "error": "fio finished without any status output"
            }

    def run(self):
        """Thread body: run fio once and flag completion.

        ``finished`` is set on every exit path, including the error
        ones, so the owner can collect ``results`` and re-create the
        thread.
        """
        try:
            self._run_fio()
        finally:
            self.finished = True

    def healthcheck(self):
        """Collect fio version, binary path, io engines and host info.

        :return: dict with 'ready' set to True only when version, path
                 and engine list are all non-empty
        """
        _version = self.fio_version
        _binary_path = self._shell_output if self._shell("which fio") else ""
        if self._shell("fio --enghelp"):
            _ioengines = self._shell_output
            _ioengines = _ioengines.replace("\t", "")
            # first line of the output is skipped
            _ioengines = _ioengines.splitlines()[1:]
        else:
            _ioengines = []

        return {
            "ready": all((_version, _binary_path, _ioengines)),
            "version": _version,
            "path": _binary_path,
            "ioengines": _ioengines,
            "system": self.system,
            "release": self.release,
            "hostname": self.hostname
        }

    def status(self):
        """Return the state and the progress of the run.

        'status' is one of "idle", "running", "scheduled" or "finished";
        later checks override earlier ones in exactly that order.
        """
        _running = self.is_alive() and self.eta_sec >= 0
        _scheduled = False
        _diff = -1
        if self.scheduled_datetime:
            _diff = (self.scheduled_datetime - datetime.now()).total_seconds()
            if _diff > 0:
                _scheduled = True
        _s = "running" if _running else "idle"
        _s = "scheduled" if _scheduled else _s
        _s = "finished" if self.finished else _s
        return {
            "status": _s,
            "progress": self.get_progress()
        }

    def end_fio(self):
        """Interrupt fio if a run was started."""
        if self.fiorun:
            self.fiorun.kill_shell()

    # Current run
    def percent_done(self):
        """Return elapsed time as percentage of elapsed plus ETA."""
        _total = self.elapsed_sec + self.eta_sec
        if not _total:
            # nothing elapsed and nothing left, avoid division by zero
            return 0.0
        return float(self.elapsed_sec) / float(_total) * 100.0

    def get_progress(self):
        """Return percent_done() formatted with two decimals."""
        return "{:.2f}".format(self.percent_done())

    # latest parsed measurements
    def get_last_measurements(self):
        """Return the timeline entry with the highest timestamp, or {}."""
        if self.timeline:
            # snapshot the keys first, the run thread may add new ones
            return self.timeline[max(list(self.timeline.keys()))]
        else:
            return {}
422
423
class FioProcessShellRun(object):
    """Front-end for fio runs: starts them and keeps their results.

    Holds one fio thread object at a time (``self.fio``), re-creating it
    after each completed run, and exposes the operations listed in
    ``self.actions``.
    """
    # NOTE(review): both dicts live on the class, so results are shared
    # by all instances - confirm this is intended
    stats = {}
    results = {}

    def __init__(self, init_class=FioProcess):
        """
        :param init_class: class used to create the fio thread object
        """
        self.init_class = init_class
        self.actions = {
            "do_singlerun": self.do_singlerun,
            "do_scheduledrun": self.do_scheduledrun,
            "get_options": self.get_options,
            "get_result": self.get_result,
            "get_resultlist": self.get_resultlist
        }
        self.fio_reset()

    @staticmethod
    def healthcheck(fio):
        """Run the healthcheck of ``fio``.

        :return: tuple of the healthcheck dict and its text summary
        """
        hchk = fio.healthcheck()
        hchk_str = \
            "# fio status: {}\n# {} at {}\n# Engines: {}".format(
                "ready" if hchk["ready"] else "fail",
                hchk["version"],
                hchk["path"],
                ", ".join(hchk["ioengines"])
            )
        return hchk, hchk_str

    def status(self):
        """Return the status dict of the current fio thread object."""
        return self.fio.status()

    def fio_reset(self):
        """Create a fresh fio thread object when the current one is used up.

        Nothing happens while the current object is idle (never started)
        or running. A finished or dead one is replaced: its results are
        taken over and its options are applied to the new object.
        """
        _old = getattr(self, "fio", None)
        _opts = None
        if _old is not None:
            # a thread can be started only once; one that was started
            # and is gone has to be replaced even if it never got to
            # setting its 'finished' flag
            _started = getattr(_old, "ident", None) is not None
            _dead = _started and not _old.is_alive()
            if not (_old.finished or _dead):
                # No need to reset, fio is either idle or running
                return
            # extract results if they present
            if _old.results:
                self.results.update(_old.results)
            _opts = self.get_options()
        # re-init
        _fio = self.init_class()
        # Do healthcheck
        self.hchk, self.hchk_str = self.healthcheck(_fio)
        # restore options if they existed
        if _opts:
            _fio.update_options(_opts)
        self.fio = _fio

    def get_options(self):
        """Return a deep copy of all current options as a single dict."""
        _opts = deepcopy(self.fio._fio_options_common)
        _opts.update(deepcopy(self.fio._fio_options_seq))
        _opts.update(deepcopy(self.fio._fio_options_mix))
        return _opts

    def do_singlerun(self, options):
        """Apply ``options`` and start a run immediately.

        :return: True when the run was started, False when another run
                 is still in progress
        """
        # Reset thread if it closed
        self.fio_reset()
        if self.fio.is_alive():
            # one run at a time, a running thread can't be started again
            return False
        # Fill options
        self.fio.update_options(options)
        # Start it
        self.fio.start()
        return True

    def do_scheduledrun(self, options):
        """Apply ``options`` and start a run that waits for its time.

        The start time is taken from the required "scheduled_to" key,
        which is removed from ``options``; it has to follow the module
        datetime format.

        :return: True when the run was started, False when
                 "scheduled_to" is missing or another run is in progress
        """
        # Reset thread if it closed
        self.fio_reset()
        # Handle scheduled time
        if "scheduled_to" not in options:
            # required parameter not set
            return False
        if self.fio.is_alive():
            # one run at a time, a running thread can't be started again
            return False
        # set time and get rid of it from options
        _time = options.pop("scheduled_to")
        self.fio.scheduled_datetime = datetime.strptime(
            _time,
            _datetime_fmt
        )
        # Fill options
        self.fio.update_options(options)
        # Start it
        self.fio.start()
        return True

    def _get_result_object(self, obj_name, time):
        """Return ``obj_name`` of the result saved for ``time``.

        Falls back to the saved "error" value, or to a dict with an
        "error" message when nothing suitable is found.
        """
        if time not in self.results:
            return {
                "error": "Result not found for '{}'".format(time)
            }
        _entry = self.results[time]
        if obj_name in _entry:
            return _entry[obj_name]
        elif "error" in _entry:
            return _entry["error"]
        else:
            return {
                "error": "Empty {} for '{}'".format(obj_name, time)
            }

    def _update_results(self):
        """Move results of a finished run into ``self.results``."""
        # Update only in case of completed thread
        if self.fio.finished:
            # iterate over a snapshot, entries are popped on the way
            for _r in list(self.fio.results.keys()):
                if _r not in self.results:
                    self.results[_r] = self.fio.results.pop(_r)

    def get_result(self, time):
        """Return the 'result' object saved for ``time``."""
        self._update_results()
        return self._get_result_object('result', time)

    def get_result_timeline(self, time):
        """Return the 'timeline' object saved for ``time``."""
        self._update_results()
        return self._get_result_object('timeline', time)

    # reporting
    def get_resultlist(self):
        """Return the list of keys (times) of all saved results."""
        self._update_results()
        return list(self.results.keys())

    def __call__(self):
        """Follow the run, printing its progress until it is over.

        Starts the fio thread if it was not started yet and interrupts
        fio at the end.
        """
        _never_started = getattr(self.fio, "ident", None) is None
        if _never_started and not self.fio.finished:
            self.fio.start()

        while self.fio.is_alive() and self.fio.eta_sec >= 0:
            sleep(0.2)
            self.stats = self.fio.get_last_measurements()

            _r = self.stats.get('read', {})
            _w = self.stats.get('write', {})

            # -1 marks values that are not measured yet
            _r_bw = _r.get('bw_bytes', -1)
            _r_iops = _r.get('iops', -1)
            _w_bw = _w.get('bw_bytes', -1)
            _w_iops = _w.get('iops', -1)
            _s = self.fio.status()
            if _s["status"] == "scheduled":
                _t = self.fio.scheduled_datetime
                _n = datetime.now()
                _delta = (_t - _n).total_seconds()
                print(
                    "waiting for '{}'; now '{}'; {} sec left".format(
                        _t.strftime(_datetime_fmt),
                        _n.strftime(_datetime_fmt),
                        _delta
                    )
                )
            else:
                stats = "{}: {:>7}% ({}/{}) " \
                        "(BW/IOPS: " \
                        "Read {:>9.2f} MB/{:>9.2f} " \
                        "Write {:>9.2f} MB/{:>9.2f})".format(
                            _s["status"],
                            _s["progress"],
                            self.fio.elapsed_sec,
                            self.fio.eta_sec,
                            _r_bw / 1024 / 1024,
                            _r_iops,
                            _w_bw / 1024 / 1024,
                            _w_iops
                        )
                print(stats)
        self.fio.end_fio()
596
597
if __name__ == '__main__':
    # Debug shell to test FioProcessShellRun: one immediate run followed
    # by one scheduled run, both doing sequential reads
    _shell = FioProcessShellRun()

    # immediate run
    _opts = _shell.get_options()
    _opts.update({"readwrite": "read", "ramp_time": "1s", "runtime": "5s"})
    _shell.do_singlerun(_opts)
    _shell()
    _times = _shell.get_resultlist()
    print("# results:\n{}".format("\n".join(_times)))
    # To inspect a complete result, dump it as json:
    #   json.dumps(_shell.get_result(_times[0]), indent=2)

    # scheduled run, starts 12 sec from now
    _shell.fio_reset()
    _opts = _shell.get_options()
    _opts.update({"readwrite": "read", "ramp_time": "1s", "runtime": "10s"})
    _start_at = datetime.now() + timedelta(seconds=12)
    _opts["scheduled_to"] = _start_at.strftime(_datetime_fmt)
    _shell.do_scheduledrun(_opts)
    _shell()
    _times = _shell.get_resultlist()
    print("# results:\n{}".format("\n".join(_times)))