blob: db012ac4a998d168541d7635292db621d458c2fe [file] [log] [blame]
Alexb78191f2021-11-02 16:35:46 -05001import json
2import os
3import queue
4import signal
5import subprocess
6
7
8from copy import deepcopy
Alex3034ba52021-11-13 17:06:45 -06009from datetime import datetime, timezone
Alexb78191f2021-11-02 16:35:46 -050010from platform import system, release, node
11from threading import Thread
12import threading
13from time import sleep
14
15
16from cfg_checker.common.exception import CheckerException
17from cfg_checker.common.other import piped_shell
18from cfg_checker.common.log import logger
19
20
# Layout used for timestamp strings in this module (result keys, schedules,
# status lines). "%z" keeps the UTC offset inside the string so a value can
# be turned back into a timezone-aware datetime with strptime().
_datetime_fmt = "%m/%d/%Y, %H:%M:%S%z"

# Options passed to fio on every run, each rendered as "--<key>=<value>"
fio_options_common = {
    "name": "agent_run",
    "filename": "/cephvol/testfile",
    "status-interval": "500ms",
    "randrepeat": 0,
    "verify": 0,
    "direct": 1,
    "gtod_reduce": 0,
    "bs": "32k",
    "iodepth": 16,
    "size": "10G",
    "readwrite": "randrw",
    "ramp_time": "5s",
    "runtime": "30s",
    "ioengine": "libaio",
}

# Extra options appended only when 'readwrite' is one of seq_modes
fio_options_seq = {
    "numjobs": 1,
    "offset_increment": "500M",
}
# Extra options appended only when 'readwrite' is one of mix_modes
fio_options_mix = {
    "rwmixread": 50,
}

# 'readwrite' values grouped by which extra option set they pull in;
# rand_modes pull in neither
seq_modes = ["read", "write"]
mix_modes = ["randrw"]
rand_modes = ["randread", "randwrite"]
50
Alex5cace3b2021-11-10 16:40:37 -060051
def get_fio_options():
    """Return an independent, merged copy of the module-level fio options.

    Keys from later groups override earlier ones: common, then seq, then
    mix. The result is deep-copied, so callers may modify it freely.
    """
    # Same merge as FioProcessShellRun.get_options(), without an instance
    merged = {}
    for group in (fio_options_common, fio_options_seq, fio_options_mix):
        merged.update(deepcopy(group))
    return merged
Alexb78191f2021-11-02 16:35:46 -050058
59
def output_reader(_stdout, outq):
    """Copy every line read from *_stdout* into the queue *outq*.

    Reading stops at end of stream, i.e. when readline() returns ''.
    """
    while True:
        chunk = _stdout.readline()
        if chunk == '':
            break
        outq.put(chunk)
63
64
def _o(option, param, suffix=""):
    """Render one long fio CLI option as ``--<option>=<param><suffix>``."""
    return "--%s=%s%s" % (option, param, suffix)
67
68
def get_time(timestamp=None):
    """Format a moment in time with the module's ``_datetime_fmt`` layout.

    :param timestamp: POSIX timestamp in seconds, or ``None`` (default) for
        "now". ``0`` is a real timestamp (the epoch), not "now".
    :return: the formatted string. Both branches use timezone-aware UTC
        datetimes, so the ``%z`` part is always filled ("+0000") and the
        string can be parsed back with ``strptime(value, _datetime_fmt)``.
    """
    if timestamp is None:
        _t = datetime.now(timezone.utc)
    else:
        # A naive datetime would render "%z" as an empty string and use
        # local time, unlike the "now" branch above
        _t = datetime.fromtimestamp(timestamp, timezone.utc)
    return _t.strftime(_datetime_fmt)
75
76
def _get_seconds(value):
    """Convert a fio-style duration ("30s", "2m", "1h", "45", 45) to seconds.

    Integers and unit-less digit strings are taken as seconds already.

    :return: the number of seconds, or -1 when the value is empty or has
        an unsupported unit suffix.
    :raises ValueError: when the numeric part before the unit is not an
        integer (e.g. "1.5s").
    """
    # bool is an int subclass but never a meaningful duration
    if isinstance(value, int) and not isinstance(value, bool):
        return value
    _value = str(value).strip()
    if not _value:
        return -1
    if _value.isdigit():
        return int(_value)
    _multipliers = {"s": 1, "m": 60, "h": 60 * 60}
    _suffix = _value[-1]
    if _suffix not in _multipliers:
        return -1
    return int(_value[:-1]) * _multipliers[_suffix]
88
89
def wait_until(end_datetime):
    """Block until the current UTC time reaches *end_datetime*.

    Sleeps half of the remaining time per step, so changes to the system
    clock are picked up. Once less than 0.1 s remains, it sleeps the exact
    remainder, so the call never returns before the target.

    :param end_datetime: timezone-aware datetime; a naive one cannot be
        compared with the aware "now" and raises TypeError.
    """
    while True:
        diff = (end_datetime - datetime.now(timezone.utc)).total_seconds()
        # Also covers end_datetime being in the past to begin with
        if diff <= 0:
            return
        if diff <= 0.1:
            sleep(diff)
        else:
            sleep(diff / 2)
99
100
class ShellThread(object):
    """Run a command through the shell and stream its output into a queue.

    stdout and stderr are merged; a helper thread (``output_reader``) puts
    each line into ``queue``, and ``get_output`` drains it into
    ``self.output``.
    """

    def __init__(self, cmd, queue):
        # cmd: sequence of command-line tokens, joined into one shell line
        self.cmd = cmd
        self.queue = queue
        self._p = None
        self._t = None
        # seconds to wait for the first line of output
        self.timeout = 15
        self.output = []

    def run_shell(self):
        """Start the command and wait until it prints its first line.

        If nothing is printed within ``self.timeout`` seconds the process
        is interrupted via ``kill_shell``.
        """
        import shlex
        # Quote every token so option values cannot inject shell syntax;
        # ordinary tokens such as "--bs=32k" are returned unchanged
        _cmd = " ".join(shlex.quote(str(_tok)) for _tok in self.cmd)
        logger.debug("... {}".format(_cmd))
        self._p = subprocess.Popen(
            _cmd,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            env={"PYTHONUNBUFFERED": "1"},
            universal_newlines=True,
            bufsize=1
        )
        self._t = threading.Thread(
            target=output_reader,
            args=(self._p.stdout, self.queue)
        )
        self._t.start()
        if not self.wait_started():
            self.kill_shell()

    def is_alive(self):
        """Return True while the process is started and has not exited."""
        # poll() is None while running; an exit code of 0 is falsy, so it
        # must be compared against None explicitly
        return self._p is not None and self._p.poll() is None

    def wait_started(self):
        """Wait (1 s steps) for the first queued output line.

        :return: True once the queue is non-empty, False if it is still
            empty after ``self.timeout`` seconds.
        """
        _left = self.timeout
        while self.queue.empty():
            if _left <= 0:
                logger.debug(
                    "...timed out after {} sec".format(str(self.timeout))
                )
                return False
            logger.debug("... {} sec".format(_left))
            sleep(1)
            _left -= 1
        logger.debug("... got first fio output")
        return True

    def kill_shell(self):
        """Send SIGINT if the process still runs, then drain its output."""
        if self.is_alive():
            self._p.send_signal(signal.SIGINT)
        self.get_output()

    def get_output(self):
        """Move all queued lines into ``self.output`` and return that list.

        Bytes items are decoded as UTF-8 (undecodable bytes replaced).
        """
        while True:
            try:
                line = self.queue.get(block=False)
            except queue.Empty:
                return self.output
            if isinstance(line, bytes):
                line = line.decode("utf-8", errors="replace")
            self.output.append(line)
167
168
class FioProcess(Thread):
    """Thread that runs a single fio job and collects its JSON output.

    ``run`` builds the fio command line from the option dicts, optionally
    waits for ``scheduled_datetime``, then reassembles the JSON documents
    fio prints (delimited by lines that are exactly "{" and "}"). The first
    job of every document is stored in ``timeline`` keyed by the document's
    "timestamp". When fio exits, the last document and the timeline are
    stored in ``results[testrun_starttime]``. A Thread can only be started
    once, so a new instance is needed for every run.
    """
    # fixed flags added to every run
    _fio_options_list = [
        "--time_based",
        "--output-format=json+",
        "--eta=always"
    ]
    # extra flags for sequential modes only
    _fio_options_seq_list = [
        "--thread"
    ]

    # Module-level defaults; __init__ gives every instance its own deep
    # copies so option updates never leak into other runs or the module
    _fio_options_common = fio_options_common
    _fio_options_seq = fio_options_seq
    _fio_options_mix = fio_options_mix

    eta_sec = 0
    total_time_sec = 0
    elapsed_sec = 0
    testrun = {}

    mount_point = "/cephvol"
    filename = "testfile"

    # test modes: 'randread', 'randwrite', 'read', 'write', 'randrw'
    mode = "randrw"
    _seq_modes = seq_modes
    _mix_modes = mix_modes
    _rand_modes = rand_modes

    # results (replaced by a per-instance dict in __init__)
    results = {}

    def _shell(self, cmd):
        """Run *cmd* via piped_shell, keeping code/output on the instance.

        :return: True on exit code 0, False otherwise (error is logged).
        """
        self._code, self._shell_output = piped_shell(cmd, code=True)
        if self._code:
            logger.error(
                "# Shell error for '{}': [{}] {}".format(
                    cmd,
                    self._code,
                    self._shell_output
                )
            )
            return False
        return True

    def recalc_times(self):
        """Recompute total and ETA seconds from 'runtime' and 'ramp_time'.

        :raises CheckerException: when 'runtime' is not a positive duration
            or 'ramp_time' is not a non-negative one.
        """
        _runtime = self._fio_options_common["runtime"]
        _ramp = self._fio_options_common["ramp_time"]
        _rt = _get_seconds(_runtime)
        _rup = _get_seconds(_ramp)
        # _get_seconds() signals an unknown unit with -1, which is truthy,
        # so the values are compared numerically
        if _rt <= 0:
            raise CheckerException("invalid 'runtime': '{}'".format(_runtime))
        elif _rup < 0:
            raise CheckerException(
                "invalid 'ramp_time': '{}'".format(_ramp)
            )

        self.total_time_sec = _rt + _rup
        self.eta_sec = self.total_time_sec

    def __init__(self):
        super(FioProcess, self).__init__()
        logger.info("fio thread initialized")
        # save system
        self.system = system()
        self.release = release()
        self.hostname = node()
        # create a clear var for last shell output
        self._shell_output = ""
        # Private copies of everything that gets mutated: the class-level
        # dicts are shared by all instances, and the option dicts are the
        # module-level defaults themselves
        self._fio_options_common = deepcopy(self._fio_options_common)
        self._fio_options_seq = deepcopy(self._fio_options_seq)
        self._fio_options_mix = deepcopy(self._fio_options_mix)
        self.results = {}
        self.testrun = {}
        # all outputs timeline
        self.timeline = {}
        # ShellThread of the current run, created in run()
        self.fiorun = None
        # prepare params
        self.recalc_times()
        # prepare the fio
        self.fio_version = "unknown"
        if not self._shell("fio --version"):
            raise CheckerException(
                "Error running fio: '{}'".format(self._shell_output)
            )
        self.fio_version = self._shell_output
        # setup target file
        if not os.path.exists(self.mount_point):
            logger.warning(
                "WARNING: '{}' not exists, using tmp folder".format(
                    self.mount_point
                )
            )
            self.mount_point = "/tmp"
        self._fio_options_common["filename"] = os.path.join(
            self.mount_point,
            self.filename
        )

        if self.system == "Darwin":
            self._fio_options_common["ioengine"] = "posixaio"
        # Thread finish marker
        self.finished = False
        self.testrun_starttime = None
        self.scheduled_datetime = None

    def get_options(self):
        """Return a merged deep copy of this instance's fio options."""
        _opts = deepcopy(self._fio_options_common)
        _opts.update(deepcopy(self._fio_options_seq))
        _opts.update(deepcopy(self._fio_options_mix))
        return _opts

    def update_options(self, _dict):
        """Set known options from *_dict* and recompute the timings.

        Only keys are validated, values are passed to fio as given.

        :raises CheckerException: on an unknown key, or (via recalc_times)
            on an invalid 'runtime'/'ramp_time'.
        """
        for k, v in _dict.items():
            if k in self._fio_options_mix:
                self._fio_options_mix[k] = v
            elif k in self._fio_options_seq:
                self._fio_options_seq[k] = v
            elif k in self._fio_options_common:
                self._fio_options_common[k] = v
            else:
                raise CheckerException(
                    "Unknown option: '{}': '{}'".format(k, v)
                )
        # recalc
        self.recalc_times()

    def _parse_status(self, text):
        # Parse one JSON document from fio and update timeline/progress
        try:
            _json = json.loads(text)
            _timestamp = _json["timestamp"]
            self.timeline[_timestamp] = _json["jobs"][0]

            # save last values
            self.eta_sec = self.timeline[_timestamp]["eta"]
            self.elapsed_sec = self.timeline[_timestamp]["elapsed"]
            self.testrun = _json
        except (TypeError, KeyError, IndexError,
                json.decoder.JSONDecodeError) as e:
            logger.error("ERROR: {}".format(e))

    def _finish_run(self):
        # Clear per-run schedule state, then flag the thread as finished
        self.scheduled_datetime = None
        self.testrun_starttime = None
        self.finished = True

    def run(self):
        """Thread body: run fio once and record the outcome in ``results``.

        ``results[testrun_starttime]`` becomes either
        ``{"result": <last JSON document>, "timeline": <timeline>}`` or,
        when fio's output does not start with a JSON document,
        ``{"error": <first such line>}``.
        """
        def _cut(_list, _s, _e):
            # Remove _list[_s:_e]; return (removed_part, remaining_list)
            return _list[_s:_e], _list[:_s] + _list[_e:]

        # create a cmd
        _cmd = ["fio"]
        _cmd += self._fio_options_list
        _cmd += [_o(k, v) for k, v in self._fio_options_common.items()]
        _mode = self._fio_options_common["readwrite"]
        if _mode in self._seq_modes:
            _cmd += self._fio_options_seq_list
            _cmd += [_o(k, v) for k, v in self._fio_options_seq.items()]
        elif _mode in self._mix_modes:
            _cmd += [_o(k, v) for k, v in self._fio_options_mix.items()]

        _q = queue.Queue()
        self.fiorun = ShellThread(_cmd, _q)
        # Check if schedule is set
        _now = datetime.now(timezone.utc)
        if self.scheduled_datetime:
            logger.debug(
                "waiting for '{}', now is '{}', total of {} sec left".format(
                    self.scheduled_datetime.strftime(_datetime_fmt),
                    _now.strftime(_datetime_fmt),
                    (self.scheduled_datetime - _now).total_seconds()
                )
            )
            if self.testrun_starttime is None:
                # results need a key even if only the datetime was set
                self.testrun_starttime = \
                    self.scheduled_datetime.strftime(_datetime_fmt)
            wait_until(self.scheduled_datetime)
        else:
            self.testrun_starttime = _now.strftime(_datetime_fmt)
        self.fiorun.run_shell()
        _raw = []
        _start = -1
        _end = -1
        while self.fiorun.is_alive() or not _q.empty():
            while not _q.empty():
                # processing
                _bb = _q.get(block=False)
                if isinstance(_bb, bytes):
                    _line = _bb.decode('utf-8')
                else:
                    _line = _bb
                if _start < 0 and _end < 0 and not _line.startswith("{"):
                    # Output outside of a JSON document is treated as an
                    # error message: keep it and stop the run
                    self.results[self.testrun_starttime] = {
                        "error": _line
                    }
                    # negative ETA marks the run as no longer running
                    self.eta_sec = -1
                    self.fiorun.kill_shell()
                    self._finish_run()
                    return
                _raw += _line.splitlines()
                for _idx, _item in enumerate(_raw):
                    if _start < 0 and _item == "{":
                        _start = _idx
                    elif _end < 0 and _item == "}":
                        _end = _idx
                # loop until we have full json
                if _end < 0 or _start < 0:
                    continue
                # if start and end found, cut json
                (_json, _raw) = _cut(_raw, _start, _end + 1)
                _start = -1
                _end = -1
                self._parse_status("\n".join(_json))
                if not self.eta_sec:
                    break
            sleep(0.1)
        # Save status to results dictionary
        self.results[self.testrun_starttime] = {
            "result": self.testrun,
            "timeline": self.timeline
        }
        self._finish_run()

    def healthcheck(self):
        """Report fio version, binary path, io engines and host info.

        "ready" is True only when version, path and engine list are all
        non-empty.
        """
        _version = self.fio_version
        _binary_path = self._shell_output if self._shell("which fio") else ""
        if self._shell("fio --enghelp"):
            _ioengines = self._shell_output
            _ioengines = _ioengines.replace("\t", "")
            # first line is a header, the rest are engine names
            _ioengines = _ioengines.splitlines()[1:]
            self._shell_output = ""
        else:
            _ioengines = []

        return {
            "ready": all((_version, _binary_path, _ioengines)),
            "version": _version,
            "path": _binary_path,
            "ioengines": _ioengines,
            "system": self.system,
            "release": self.release,
            "hostname": self.hostname
        }

    def status(self):
        """Return {"status": idle|running|scheduled|finished, "progress"}."""
        _running = self.is_alive() and self.eta_sec >= 0
        _scheduled = False
        _diff = -1
        if self.scheduled_datetime:
            _now = datetime.now(timezone.utc)
            _diff = (self.scheduled_datetime - _now).total_seconds()
            if _diff > 0:
                _scheduled = True
        _s = "running" if _running else "idle"
        _s = "scheduled" if _scheduled else _s
        _s = "finished" if self.finished else _s
        return {
            "status": _s,
            "progress": self.get_progress()
        }

    def end_fio(self):
        """Interrupt the fio process of the current run, if any."""
        if self.fiorun:
            self.fiorun.kill_shell()

    # Current run
    def percent_done(self):
        """Return elapsed / (elapsed + ETA) in percent, 0.0 if undefined."""
        # a negative ETA marks an aborted run; do not let it shrink the total
        _total = self.elapsed_sec + max(self.eta_sec, 0)
        if _total <= 0:
            return 0.0
        return float(self.elapsed_sec) / float(_total) * 100.0

    def get_progress(self):
        """Return percent_done() formatted with two decimals."""
        return "{:.2f}".format(self.percent_done())

    # latest parsed measurements
    def get_last_measurements(self):
        """Return the timeline entry with the greatest timestamp, or {}."""
        if self.timeline:
            return self.timeline[max(self.timeline)]
        return {}
434
435
class FioProcessShellRun(object):
    """Drive FioProcess runs and keep the results of finished ones.

    A FioProcess thread can only run once, so ``fio_reset`` swaps in a new
    instance after a run has finished. The finished run's results are
    moved into ``self.results`` and its options are carried over.
    ``actions`` maps command names to the public entry points.
    """
    stats = {}
    results = {}

    def __init__(self, init_class=FioProcess):
        self.init_class = init_class
        # Instance-level containers; the class-level dicts above would be
        # shared by every FioProcessShellRun object
        self.stats = {}
        self.results = {}
        self.actions = {
            "do_singlerun": self.do_singlerun,
            "do_scheduledrun": self.do_scheduledrun,
            "get_options": self.get_options,
            "get_result": self.get_result,
            "get_resultlist": self.get_resultlist
        }
        self.fio_reset()

    @staticmethod
    def healthcheck(fio):
        """Return fio.healthcheck() and a printable summary of it."""
        hchk = fio.healthcheck()
        hchk_str = \
            "# fio status: {}\n# {} at {}\n# Engines: {}".format(
                "ready" if hchk["ready"] else "fail",
                hchk["version"],
                hchk["path"],
                ", ".join(hchk["ioengines"])
            )
        return hchk, hchk_str

    def status(self):
        """Return the status dict of the current fio instance."""
        return self.fio.status()

    def fio_reset(self):
        """Create a fresh fio instance unless the current one is unfinished.

        On reset, results of the finished run are merged into
        ``self.results`` and its options are applied to the new instance.
        """
        _current = getattr(self, "fio", None)
        if _current is not None and not _current.finished:
            # No need to reset, fio is either idle or running
            return
        _results = None
        _options = None
        if _current is not None:
            _results = _current.results
            # read options via this class; the fio object itself may not
            # provide a get_options() method
            _options = self.get_options()
        # extract results if they present
        if _results:
            self.results.update(_results)
        # re-init
        _fio = self.init_class()
        # Do healthcheck
        self.hchk, self.hchk_str = self.healthcheck(_fio)
        # restore options if they existed
        if _options:
            _fio.update_options(_options)
        self.fio = _fio

    def get_options(self):
        """Return a merged deep copy of the current instance's options."""
        _opts = deepcopy(self.fio._fio_options_common)
        _opts.update(deepcopy(self.fio._fio_options_seq))
        _opts.update(deepcopy(self.fio._fio_options_mix))
        return _opts

    def do_singlerun(self, options):
        """Apply *options* and start fio immediately.

        A "scheduled_to" key is ignored with a warning. The caller's dict
        is not modified.

        :raises CheckerException: if fio is already running or an option
            is invalid.
        """
        # Reset thread if it closed
        self.fio_reset()
        if self.fio.is_alive():
            raise CheckerException("fio is already running")
        options = dict(options)
        # Fill options
        if "scheduled_to" in options:
            # just ignore it
            _k = "scheduled_to"
            _v = options.pop(_k)
            logger.warning("Ignoring option: '{}': '{}'".format(_k, _v))
        self.fio.update_options(options)
        # Start it
        self.fio.start()
        return True

    def do_scheduledrun(self, options):
        """Apply *options* and start fio at ``options["scheduled_to"]``.

        "scheduled_to" must match ``_datetime_fmt`` (including the UTC
        offset) and becomes the result key of the run. The caller's dict
        is not modified.

        :raises CheckerException: if "scheduled_to" is missing, fio is
            already running or an option is invalid.
        :raises ValueError: if "scheduled_to" does not match the format.
        """
        # Reset thread if it closed
        self.fio_reset()
        if self.fio.is_alive():
            raise CheckerException("fio is already running")
        # Handle scheduled time
        if "scheduled_to" not in options:
            # required parameter not set
            raise CheckerException("Parameter missing: 'scheduled_to'")
        options = dict(options)
        _starttime = options.pop("scheduled_to")
        _scheduled = datetime.strptime(_starttime, _datetime_fmt)
        # Validate options before marking the instance as scheduled
        self.fio.update_options(options)
        self.fio.testrun_starttime = _starttime
        self.fio.scheduled_datetime = _scheduled
        # Start it
        self.fio.start()
        return True

    def _get_result_object(self, obj_name, time):
        """Return results[time][obj_name], its "error" value, or an error
        dict when the run or the object is missing."""
        if time in self.results:
            if obj_name in self.results[time]:
                return self.results[time][obj_name]
            elif "error" in self.results[time]:
                return self.results[time]["error"]
            else:
                return {
                    "error": "Empty {} for '{}'".format(obj_name, time)
                }
        else:
            return {
                "error": "Result not found for '{}'".format(time)
            }

    def _update_results(self):
        """Move results of a finished run from the fio object to self."""
        # Update only in case of completed thread
        if not self.fio.finished:
            return
        for _r in list(self.fio.results.keys()):
            if _r not in self.results:
                self.results[_r] = self.fio.results.pop(_r)

    def get_result(self, time):
        """Return the final fio JSON document of the run keyed by *time*."""
        self._update_results()
        return self._get_result_object('result', time)

    def get_result_timeline(self, time):
        """Return the per-timestamp job data of the run keyed by *time*."""
        self._update_results()
        return self._get_result_object('timeline', time)

    # reporting
    def get_resultlist(self):
        """Return the keys (start times) of all collected runs."""
        self._update_results()
        return list(self.results.keys())

    def __call__(self):
        """Start fio if needed and print progress until the run ends."""
        if not self.fio.is_alive() and not self.fio.finished:
            self.fio.start()

        while self.fio.is_alive() and self.fio.eta_sec >= 0:
            sleep(0.2)
            self.stats = self.fio.get_last_measurements()

            _r = self.stats.get('read', {})
            _w = self.stats.get('write', {})

            _r_bw = _r.get('bw_bytes', -1)
            _r_iops = _r.get('iops', -1)
            _w_bw = _w.get('bw_bytes', -1)
            _w_iops = _w.get('iops', -1)
            _s = self.fio.status()
            # read once: the fio thread sets scheduled_datetime to None
            # when its run ends, possibly right after status() above
            _t = self.fio.scheduled_datetime
            if _s["status"] == "scheduled" and _t is not None:
                _n = datetime.now(timezone.utc)
                _delta = (_t - _n).total_seconds()
                print(
                    "{}: waiting for '{}'; now '{}'; {} sec left".format(
                        _s["status"],
                        _t.strftime(_datetime_fmt),
                        _n.strftime(_datetime_fmt),
                        _delta
                    )
                )
            else:
                stats = "{}: {:>7}% ({}/{}) " \
                        "(BW/IOPS: " \
                        "Read {:>9.2f} MB/{:>9.2f} " \
                        "Write {:>9.2f} MB/{:>9.2f})".format(
                            _s["status"],
                            _s["progress"],
                            self.fio.elapsed_sec,
                            self.fio.eta_sec,
                            _r_bw / 1024 / 1024,
                            _r_iops,
                            _w_bw / 1024 / 1024,
                            _w_iops
                        )
                print(stats)
        self.fio.end_fio()
614
615
if __name__ == '__main__':
    # Debug shell to test FioProcessShellRun: one scheduled run followed by
    # one immediate run, both short sequential reads
    from datetime import timedelta

    _shell = FioProcessShellRun()
    _opts = _shell.get_options()
    _opts["readwrite"] = "read"
    _opts["ramp_time"] = "1s"
    _opts["runtime"] = "5s"
    # Schedule a few seconds ahead; a fixed date would already be in the
    # past and the scheduling path would not actually wait
    _opts["scheduled_to"] = (
        datetime.now(timezone.utc) + timedelta(seconds=10)
    ).strftime(_datetime_fmt)
    _shell.do_scheduledrun(_opts)
    _shell()
    _times = _shell.get_resultlist()
    print("# results:\n{}".format("\n".join(_times)))

    _shell.fio_reset()
    _opts = _shell.get_options()
    _opts["readwrite"] = "read"
    _opts["ramp_time"] = "1s"
    _opts["runtime"] = "10s"
    _shell.do_singlerun(_opts)
    _shell()
    _times = _shell.get_resultlist()
    print("# results:\n{}".format("\n".join(_times)))