blob: 29173edf1632ed0de91478d5c6a012fb10f8b137 [file] [log] [blame]
Alexb78191f2021-11-02 16:35:46 -05001import json
2import os
3import queue
4import signal
5import subprocess
6
7
8from copy import deepcopy
Alex3034ba52021-11-13 17:06:45 -06009from datetime import datetime, timezone
Alexb78191f2021-11-02 16:35:46 -050010from platform import system, release, node
11from threading import Thread
12import threading
13from time import sleep
14
15
16from cfg_checker.common.exception import CheckerException
17from cfg_checker.common.other import piped_shell
18from cfg_checker.common.log import logger
19
20
# Timestamp layout used for result keys and for the 'scheduled_to' option;
# '%z' adds the UTC offset, e.g. "11/23/2021, 21:48:20+0000"
_datetime_fmt = "%m/%d/%Y, %H:%M:%S%z"

# Options passed to fio on every run, each one as '--<key>=<value>'.
# The order of the keys is the order of the arguments on the command line.
fio_options_common = {
    "name": "agent_run",
    "filename": "/cephvol/testfile",
    "status-interval": "500ms",
    "randrepeat": 0,
    "verify": 0,
    "direct": 1,
    "gtod_reduce": 0,
    "bs": "32k",
    "iodepth": 16,
    "size": "10G",
    "readwrite": "randrw",
    "ramp_time": "5s",
    "runtime": "30s",
    "ioengine": "libaio",
}

# Options added only when 'readwrite' is one of the sequential modes
fio_options_seq = {
    "numjobs": 1,
    "offset_increment": "500M",
}

# Options added only when 'readwrite' is one of the mixed modes
fio_options_mix = {
    "rwmixread": 50,
}

# Values of 'readwrite' grouped by the kind of test they select
seq_modes = ["read", "write"]
mix_modes = ["randrw"]
rand_modes = ["randread", "randwrite"]
50
Alex5cace3b2021-11-10 16:40:37 -060051
def get_fio_options():
    """Return all fio option groups merged into one independent dict.

    Duplicate function for external option access: the common options
    come first, the sequential and the mixed group are laid on top.
    Every group is deep-copied, so the caller may change the result freely.
    """
    merged = {}
    for group in (fio_options_common, fio_options_seq, fio_options_mix):
        merged.update(deepcopy(group))
    return merged
Alexb78191f2021-11-02 16:35:46 -050058
59
def output_reader(_stdout, outq):
    """Copy a text stream into a queue, line by line, until it ends.

    :param _stdout: object whose readline() returns '' at end of stream
    :param outq: object receiving each line through put()
    """
    while True:
        chunk = _stdout.readline()
        if chunk == '':
            # empty string is the end-of-stream marker
            return
        outq.put(chunk)
63
64
def _o(option, param, suffix=""):
    """Render one command line argument as '--<option>=<param><suffix>'.

    :param option: option name, used without the leading dashes
    :param param: option value
    :param suffix: optional text glued right after the value
    """
    return f"--{option}={param}{suffix}"
67
68
def get_time(timestamp=None):
    """Format a point in time with the module-wide ``_datetime_fmt``.

    :param timestamp: POSIX timestamp in seconds; any falsy value
        (None, 0) selects the current time
    :return: UTC time as a string, UTC offset included
    """
    if not timestamp:
        _t = datetime.now(timezone.utc)
    else:
        # Build an aware UTC datetime. A naive one renders '%z' as an
        # empty string: the result then disagrees with the 'now' branch
        # and cannot be parsed back using the same format.
        _t = datetime.fromtimestamp(timestamp, timezone.utc)
    return _t.strftime(_datetime_fmt)
75
76
def _get_seconds(value):
    """Convert a duration such as '30s', '5m' or '2h' into seconds.

    :param value: string holding an integer followed by a unit symbol
    :return: amount of seconds, or -1 when the unit symbol is not known
    """
    # assume that we have symbol at the end
    _seconds_per_unit = {'s': 1, 'm': 60, 'h': 60 * 60}
    _factor = _seconds_per_unit.get(value[-1])
    if _factor is None:
        return -1
    return int(value[:-1]) * _factor
88
89
def wait_until(end_datetime):
    """Block the calling thread until the given moment has been reached.

    Sleeps for half of the remaining time over and over, stopping once
    0.1 sec or less was left before the last nap.

    :param end_datetime: timezone-aware datetime to wait for
    """
    def _seconds_left():
        return (end_datetime - datetime.now(timezone.utc)).total_seconds()

    remaining = _seconds_left()
    # In case end_datetime was in past to begin with, skip the loop
    while remaining >= 0:
        sleep(remaining / 2)
        if remaining <= 0.1:
            break
        remaining = _seconds_left()
99
100
class ShellThread(object):
    """Run a shell command and hand its output over through a queue.

    The command is started with ``subprocess.Popen``; a helper thread
    copies every line of the combined stdout/stderr into ``queue``.
    """

    def __init__(self, cmd, queue):
        """
        :param cmd: list of command line parts, joined with spaces to run
        :param queue: queue object that receives the output lines
        """
        self.cmd = cmd
        self.queue = queue
        # process and reader thread exist only after run_shell()
        self._p = None
        self._t = None
        # seconds to wait for the first line of output
        self.timeout = 15
        self.output = []

    def run_shell(self):
        """Start the command and wait for its first line of output.

        The process gets interrupted when nothing is printed in time.
        """
        # Start
        # NOTE(review): parts are joined unquoted and run with shell=True,
        # so spaces or shell metacharacters in them are interpreted by the
        # shell -- only trusted values must end up in 'cmd'
        _cmd = " ".join(self.cmd)
        logger.debug("... {}".format(_cmd))
        self._p = subprocess.Popen(
            _cmd,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            env={"PYTHONUNBUFFERED": "1"},
            universal_newlines=True,
            bufsize=1
        )
        self._t = threading.Thread(
            target=output_reader,
            args=(self._p.stdout, self.queue)
        )
        self._t.start()
        if not self.wait_started():
            self.kill_shell()

    def is_alive(self):
        """Tell whether the command is running right now.

        :return: True while the process has not exited; False before it
            was started and after it ended with any exit code
        """
        # poll() is None only while the process runs. Exit code 0 is falsy
        # too, so a truthiness test would report a finished process as
        # still alive.
        return self._p is not None and self._p.poll() is None

    def wait_started(self):
        """Wait for the first line of output to arrive in the queue.

        :return: True once output is there, False when ``self.timeout``
            seconds went by without any
        """
        # Count down on a local copy, so the configured timeout stays
        # intact for the log message and for the next run
        _left = self.timeout
        while self.queue.empty():
            logger.debug("... {} sec".format(_left))
            sleep(1)
            _left -= 1
            if _left <= 0:
                logger.debug(
                    "...timed out after {} sec".format(str(self.timeout))
                )
                return False
        logger.debug("... got first fio output")
        return True

    def kill_shell(self):
        """Interrupt the command if it still runs, then drain the queue."""
        # Run the poll; only a process that did not exit yet gets SIGINT
        if self._p is not None and self._p.poll() is None:
            self._p.send_signal(signal.SIGINT)
        self.get_output()

    def get_output(self):
        """Move everything waiting in the queue into ``self.output``.

        :return: list of all output lines collected so far
        """
        while True:
            try:
                line = self.queue.get(block=False)
            except queue.Empty:
                return self.output
            if isinstance(line, bytes):
                # str() on bytes would store the "b'...'" repr, not text
                line = line.decode('utf-8', errors='replace')
            self.output.append(line)
167
168
class FioProcess(Thread):
    """Thread that runs one fio test and keeps track of its progress.

    The fio command line is built from the option dictionaries, fio is
    started through ``ShellThread`` and every JSON status block it prints
    is stored in ``timeline``. The outcome of the run ends up in
    ``results``.
    """

    # init vars for status
    _fio_options_list = [
        "--time_based",
        "--output-format=json+",
        "--eta=always"
    ]
    _fio_options_seq_list = [
        "--thread"
    ]

    # NOTE(review): these are the module-level dictionaries themselves,
    # not copies: update_options() changes them for every instance and
    # for get_fio_options() too. Kept on purpose, as options surviving a
    # re-created instance may be relied upon by callers.
    _fio_options_common = fio_options_common
    _fio_options_seq = fio_options_seq
    _fio_options_mix = fio_options_mix

    eta_sec = 0
    total_time_sec = 0
    elapsed_sec = 0
    testrun = {}

    mount_point = "/cephvol"
    filename = "testfile"

    # test modes: 'randread', 'randwrite', 'read', 'write', 'randrw'
    mode = "randrw"
    _seq_modes = seq_modes
    _mix_modes = mix_modes
    _rand_modes = rand_modes

    # results
    # NOTE(review): class-level dictionary, shared by all instances
    results = {}

    def _shell(self, cmd):
        """Run a shell command, keeping exit code and output on self.

        :param cmd: command line to run
        :return: True on zero exit code, False otherwise (error is logged)
        """
        self._code, self._shell_output = piped_shell(cmd, code=True)
        if self._code:
            logger.error(
                "# Shell error for '{}': [{}] {}".format(
                    cmd,
                    self._code,
                    self._shell_output
                )
            )
            return False
        else:
            return True

    def recalc_times(self):
        """Recalculate total run time and ETA from the current options.

        :raises CheckerException: when 'runtime' or 'ramp_time' is not a
            positive duration with a known unit symbol
        """
        _runtime = self._fio_options_common["runtime"]
        _ramp_time = self._fio_options_common["ramp_time"]
        _rt = _get_seconds(_runtime)
        _rup = _get_seconds(_ramp_time)
        # _get_seconds() reports an unknown unit as -1, which is truthy:
        # compare against zero instead of testing for falsiness
        if _rt <= 0:
            raise CheckerException(
                "invalid 'runtime': '{}'".format(_runtime)
            )
        elif _rup <= 0:
            raise CheckerException(
                "invalid 'ramp_time': '{}'".format(_ramp_time)
            )

        self.total_time_sec = _rt + _rup
        self.eta_sec = self.total_time_sec

    def __init__(self):
        """Prepare the thread: detect system, check fio, pick target file.

        :raises CheckerException: when 'fio --version' fails or the
            configured run times are invalid
        """
        super().__init__()
        logger.info("fio thread initialized")
        # save system
        self.system = system()
        self.release = release()
        self.hostname = node()
        # create a clear var for last shell output
        self._shell_output = ""
        # no fio process yet; lets end_fio() be called at any moment
        self.fiorun = None
        # prepare params
        self.recalc_times()
        # prepare the fio
        self.fio_version = "unknown"
        if not self._shell("fio --version"):
            raise CheckerException(
                "Error running fio: '{}'".format(self._shell_output)
            )
        else:
            self.fio_version = self._shell_output
        # all outputs timeline
        self.timeline = {}
        # setup target file
        if not os.path.exists(self.mount_point):
            logger.warning(
                "WARNING: '{}' not exists, using tmp folder".format(
                    self.mount_point
                )
            )
            self.mount_point = "/tmp"
        self._fio_options_common["filename"] = os.path.join(
            self.mount_point,
            self.filename
        )

        if self.system == "Darwin":
            self._fio_options_common["ioengine"] = "posixaio"
        # Thread finish marker
        self.finished = False
        self.testrun_starttime = None
        self.scheduled_datetime = None

    def update_options(self, _dict):
        """Apply new option values and recalculate the run times.

        Either all given options are applied or none of them is.

        :param _dict: option names mapped to their new values
        :raises CheckerException: on an unknown option name or on run
            times rejected by recalc_times()
        """
        # lookup order decides which group gets a key
        _groups = (
            self._fio_options_mix,
            self._fio_options_seq,
            self._fio_options_common
        )
        # validate keys, do not validate numbers themselves;
        # all keys are checked before the first value is changed
        for k, v in _dict.items():
            if not any(k in _group for _group in _groups):
                raise CheckerException(
                    "Unknown option: '{}': '{}'".format(k, v)
                )
        _saved = [dict(_group) for _group in _groups]
        for k, v in _dict.items():
            for _group in _groups:
                if k in _group:
                    _group[k] = v
                    break
        # recalc
        try:
            self.recalc_times()
        except Exception:
            # The dictionaries are shared, so a rejected value left in
            # place would make every later recalc_times() fail as well:
            # put the previous values back before reporting the error
            for _group, _old in zip(_groups, _saved):
                _group.clear()
                _group.update(_old)
            raise

    def run(self):
        """Thread body: start fio and consume its output until the end.

        Waits for the scheduled time when one is set. A first output line
        that does not open a JSON block is stored as an error and ends
        the run right away.
        """
        def _cut(_list, _s, _e):
            # take items [_s:_e] out; return them and what is left
            _new = _list[_s:_e]
            _pre = _list[:_s]
            _list = _pre + _list[_e:]
            return (_new, _list)

        # create a cmd
        _cmd = ["fio"]
        _cmd += self._fio_options_list
        _cmd += [_o(k, v) for k, v in self._fio_options_common.items()]

        if self._fio_options_common["readwrite"] in self._seq_modes:
            _sq = self._fio_options_seq_list
            _cmd += _sq + [_o(k, v) for k, v in self._fio_options_seq.items()]
        elif self._fio_options_common["readwrite"] in self._mix_modes:
            _cmd += [_o(k, v) for k, v in self._fio_options_mix.items()]

        _q = queue.Queue()
        self.fiorun = ShellThread(_cmd, _q)
        # Check if schedule is set
        _now = datetime.now(timezone.utc)
        if self.scheduled_datetime:
            logger.debug(
                "waiting for '{}', now is '{}', total of {} sec left".format(
                    self.scheduled_datetime.strftime(_datetime_fmt),
                    _now.strftime(_datetime_fmt),
                    (self.scheduled_datetime - _now).total_seconds()
                )
            )
            wait_until(self.scheduled_datetime)
        else:
            self.testrun_starttime = _now.strftime(_datetime_fmt)
        self.fiorun.run_shell()
        _raw = []
        _start = -1
        _end = -1
        while self.fiorun.is_alive() or not _q.empty():
            while not _q.empty():
                # processing
                _bb = _q.get(block=False)
                if isinstance(_bb, bytes):
                    _line = _bb.decode('utf-8')
                else:
                    _line = _bb
                if _start < 0 and _end < 0 and not _line.startswith("{"):
                    _time = get_time()
                    self.results[_time] = {
                        "error": _line
                    }
                    # negative ETA marks the failed run for status()
                    self.eta_sec = -1
                    self.fiorun.kill_shell()
                    self.finished = True
                    return
                _current = _line.splitlines()
                _raw += _current
                # a JSON block opens with a bare '{' line and is closed
                # by a bare '}' line
                for ll, _text in enumerate(_raw):
                    if _start < 0 and _text == "{":
                        _start = ll
                    elif _end < 0 and _text == "}":
                        _end = ll
                # loop until we have full json
                if _end < 0 or _start < 0:
                    continue
                # if start and end found, cut json
                (_json, _raw) = _cut(_raw, _start, _end+1)
                _start = -1
                _end = -1
                # Try to parse json
                _json = "\n".join(_json)
                try:
                    _json = json.loads(_json)
                    _timestamp = _json["timestamp"]
                    self.timeline[_timestamp] = _json["jobs"][0]

                    # save last values
                    self.eta_sec = self.timeline[_timestamp]["eta"]
                    self.elapsed_sec = self.timeline[_timestamp]["elapsed"]
                    self.testrun = _json
                except (
                    TypeError,
                    KeyError,
                    IndexError,
                    json.decoder.JSONDecodeError
                ) as e:
                    # an incomplete status block must not kill the thread:
                    # 'finished' would never be set
                    logger.error("ERROR: {}".format(e))
            if not self.eta_sec:
                break
            sleep(0.1)
        # Save status to results dictionary
        self.results[self.testrun_starttime] = {
            "result": self.testrun,
            "timeline": self.timeline
        }
        self.finished = True
        self.scheduled_datetime = None
        self.testrun_starttime = None
        return

    def healthcheck(self):
        """Collect facts about the fio binary and the host.

        :return: dict with 'ready', 'version', 'path', 'ioengines',
            'system', 'release' and 'hostname'
        """
        _version = self.fio_version
        if self._shell("which fio"):
            _binary_path = self._shell_output
        else:
            _binary_path = ""
        if self._shell("fio --enghelp"):
            _ioengines = self._shell_output
            _ioengines = _ioengines.replace("\t", "")
            # first line of the output is skipped
            _ioengines = _ioengines.splitlines()[1:]
            self._shell_output = ""
        else:
            _ioengines = []

        return {
            "ready": all((_version, _binary_path, _ioengines)),
            "version": _version,
            "path": _binary_path,
            "ioengines": _ioengines,
            "system": self.system,
            "release": self.release,
            "hostname": self.hostname
        }

    def status(self):
        """Report the state of the run.

        :return: dict with 'status' ('idle', 'running', 'scheduled' or
            'finished') and 'progress' (percentage as a string)
        """
        _running = self.is_alive() and self.eta_sec >= 0
        _scheduled = False
        _diff = -1
        if self.scheduled_datetime:
            _now = datetime.now(timezone.utc)
            _diff = (self.scheduled_datetime - _now).total_seconds()
            if _diff > 0:
                _scheduled = True
        # later assignments win: finished over scheduled over running
        _s = "running" if _running else "idle"
        _s = "scheduled" if _scheduled else _s
        _s = "finished" if self.finished else _s
        return {
            "status": _s,
            "progress": self.get_progress()
        }

    def end_fio(self):
        """Stop the fio process, if one has been created."""
        if self.fiorun:
            self.fiorun.kill_shell()

    # Current run
    def percent_done(self):
        """Return the progress of the current run in percent.

        :return: float; 0.0 when no meaningful total time is known
        """
        _total = self.elapsed_sec + self.eta_sec
        if _total <= 0:
            # nothing measured yet or the run failed (negative ETA)
            return 0.0
        return float(self.elapsed_sec) / float(_total) * 100.0

    def get_progress(self):
        """Return percent_done() formatted with two decimals."""
        return "{:.2f}".format(self.percent_done())

    # latest parsed measurements
    def get_last_measurements(self):
        """Return the timeline entry with the greatest timestamp.

        :return: the stored entry, or an empty dict for an empty timeline
        """
        if self.timeline:
            return self.timeline[max(self.timeline)]
        else:
            return {}
435
436
class FioProcessShellRun(object):
    """Front-end for the fio thread: starts runs and keeps the results.

    A thread object can be started only once, so the fio thread is
    replaced by a fresh instance of ``init_class`` after each run.
    """
    # NOTE(review): class-level dictionaries, shared by all instances
    stats = {}
    results = {}

    def __init__(self, init_class=FioProcess):
        """
        :param init_class: class to create the fio thread objects from
        """
        self.init_class = init_class
        self.actions = {
            "do_singlerun": self.do_singlerun,
            "do_scheduledrun": self.do_scheduledrun,
            "get_options": self.get_options,
            "get_result": self.get_result,
            "get_resultlist": self.get_resultlist
        }
        self.fio_reset()

    @staticmethod
    def healthcheck(fio):
        """Run the healthcheck of a fio object.

        :param fio: object providing healthcheck()
        :return: tuple of the healthcheck dict and a printable summary
        """
        hchk = fio.healthcheck()
        hchk_str = \
            "# fio status: {}\n# {} at {}\n# Engines: {}".format(
                "ready" if hchk["ready"] else "fail",
                hchk["version"],
                hchk["path"],
                ", ".join(hchk["ioengines"])
            )
        return hchk, hchk_str

    def status(self):
        """Return the status dict of the current fio thread."""
        return self.fio.status()

    def fio_reset(self):
        """Replace the fio thread by a fresh one when it is used up.

        Nothing is done while the current thread is idle or running.
        Otherwise its results are collected, a new thread object is
        created and the options of the old one are applied to it.
        """
        _old = getattr(self, "fio", None)
        _o = None
        if _old is not None:
            # A started thread that died without setting 'finished' can
            # not be started again, so it has to be replaced as well
            _started = getattr(_old, "ident", None) is not None
            _dead = _started and not _old.is_alive()
            if not (_old.finished or _dead):
                # No need to reset, fio is either idle or running
                return
            # extract results if they present
            if _old.results:
                self.results.update(_old.results)
            # options are read by this class, the fio thread has no
            # get_options() of its own
            _o = self.get_options()
        # re-init
        _fio = self.init_class()
        # Do healthcheck
        self.hchk, self.hchk_str = self.healthcheck(_fio)
        # restore options if they existed
        if _o:
            _fio.update_options(_o)
        self.fio = _fio

    def get_options(self):
        """Return a merged, independent copy of the fio thread options."""
        _opts = deepcopy(self.fio._fio_options_common)
        _opts.update(deepcopy(self.fio._fio_options_seq))
        _opts.update(deepcopy(self.fio._fio_options_mix))
        return _opts

    def _check_not_running(self):
        """Refuse to touch a fio thread that is running or waiting."""
        if self.fio.is_alive():
            raise CheckerException("fio is already running")

    def do_singlerun(self, options):
        """Start a run right away.

        :param options: fio options for the run; 'scheduled_to' is
            ignored, the given dict is left untouched
        :return: True once the thread has been started
        :raises CheckerException: on unknown options or while another
            run is in progress
        """
        # Reset thread if it closed
        self.fio_reset()
        self._check_not_running()
        # work on a copy: the caller's dict must not lose keys
        options = dict(options)
        # Fill options
        if "scheduled_to" in options:
            # just ignore it
            _k = "scheduled_to"
            _v = options.pop(_k)
            logger.warning("Ignoring option: '{}': '{}'".format(_k, _v))
        self.fio.update_options(options)
        # Start it
        self.fio.start()
        return True

    def do_scheduledrun(self, options):
        """Start a run that waits for the time given in 'scheduled_to'.

        :param options: fio options plus 'scheduled_to', a time string
            in ``_datetime_fmt``; the given dict is left untouched
        :return: True once the thread has been started
        :raises CheckerException: when 'scheduled_to' is missing, on
            unknown options or while another run is in progress
        """
        # Reset thread if it closed
        self.fio_reset()
        self._check_not_running()
        # Handle scheduled time
        if "scheduled_to" not in options:
            # required parameter not set
            raise CheckerException("Parameter missing: 'scheduled_to'")
        # work on a copy: the caller's dict must not lose keys
        options = dict(options)
        _scheduled_to = options.pop("scheduled_to")
        _scheduled_datetime = datetime.strptime(
            _scheduled_to,
            _datetime_fmt
        )
        # Fill options
        self.fio.update_options(options)
        # set the schedule only after everything else was accepted
        self.fio.testrun_starttime = _scheduled_to
        self.fio.scheduled_datetime = _scheduled_datetime
        # Start it
        self.fio.start()
        return True

    def _get_result_object(self, obj_name, time):
        """Look up one part of a stored result.

        :param obj_name: part to return, 'result' or 'timeline'
        :param time: key of the result, its start time string
        :return: the stored part; the stored error text for a failed
            run; otherwise a dict with an 'error' message
        """
        if time not in self.results:
            return {
                "error": "Result not found for '{}'".format(time)
            }
        _stored = self.results[time]
        if obj_name in _stored:
            return _stored[obj_name]
        if "error" in _stored:
            # NOTE(review): bare error text, not an {'error': ...} dict
            # as in the other branches -- kept for compatibility
            return _stored["error"]
        return {
            "error": "Empty {} for '{}'".format(obj_name, time)
        }

    def _update_results(self):
        """Take over results of a finished fio thread not stored yet."""
        # Update only in case of completed thread
        if self.fio.finished:
            for _r in list(self.fio.results.keys()):
                if _r not in self.results:
                    self.results[_r] = self.fio.results.pop(_r)

    def get_result(self, time):
        """Return the 'result' part stored for the given start time."""
        self._update_results()
        return self._get_result_object('result', time)

    def get_result_timeline(self, time):
        """Return the 'timeline' part stored for the given start time."""
        self._update_results()
        return self._get_result_object('timeline', time)

    # reporting
    def get_resultlist(self):
        """Return the keys (start times) of all stored results."""
        self._update_results()
        return list(self.results.keys())

    def __call__(self):
        """Run fio in the foreground, printing the progress until done."""
        if not self.fio.is_alive() and not self.fio.finished:
            self.fio.start()

        while self.fio.is_alive() and self.fio.eta_sec >= 0:
            sleep(0.2)
            self.stats = self.fio.get_last_measurements()

            _r = self.stats.get('read', {})
            _w = self.stats.get('write', {})

            # -1 stands for "no value yet"
            _r_bw = _r.get('bw_bytes', -1)
            _r_iops = _r.get('iops', -1)
            _w_bw = _w.get('bw_bytes', -1)
            _w_iops = _w.get('iops', -1)
            _s = self.fio.status()
            if _s["status"] == "scheduled":
                _t = self.fio.scheduled_datetime
                _n = datetime.now(timezone.utc)
                _delta = (_t - _n).total_seconds()
                print(
                    "{}: waiting for '{}'; now '{}'; {} sec left".format(
                        _s["status"],
                        _t.strftime(_datetime_fmt),
                        _n.strftime(_datetime_fmt),
                        _delta
                    )
                )
            else:
                stats = "{}: {:>7}% ({}/{}) " \
                        "(BW/IOPS: " \
                        "Read {:>9.2f} MB/{:>9.2f} " \
                        "Write {:>9.2f} MB/{:>9.2f})".format(
                            _s["status"],
                            _s["progress"],
                            self.fio.elapsed_sec,
                            self.fio.eta_sec,
                            _r_bw / 1024 / 1024,
                            _r_iops,
                            _w_bw / 1024 / 1024,
                            _w_iops
                        )
                print(stats)
        self.fio.end_fio()
615
616
if __name__ == '__main__':
    # Debug shell to test FioProcessShellRun
    _shell = FioProcessShellRun()

    # First pass: sequential read started through the scheduler
    _opts = _shell.get_options()
    _opts.update({
        "readwrite": "read",
        "ramp_time": "1s",
        "runtime": "5s",
        "scheduled_to": "11/23/2021, 21:48:20+0000"
    })
    _shell.do_scheduledrun(_opts)
    _shell()
    _times = _shell.get_resultlist()
    print("# results:\n{}".format("\n".join(_times)))
    # To dump the first result as well:
    # print(
    #     "##### Dumping results\n{}".format(
    #         json.dumps(_shell.get_result(_times[0]), indent=2)
    #     )
    # )

    # Second pass: sequential read started immediately
    _shell.fio_reset()
    _opts = _shell.get_options()
    _opts.update({
        "readwrite": "read",
        "ramp_time": "1s",
        "runtime": "10s"
    })
    # _opts["scheduled_to"] = "11/23/2021, 21:40:30+0000"
    _shell.do_singlerun(_opts)
    _shell()
    _times = _shell.get_resultlist()
    print("# results:\n{}".format("\n".join(_times)))