blob: f47566bd31b9b4710b930cd4e15d27c8757597be [file] [log] [blame]
Alex0989ecf2022-03-29 13:43:21 -05001# Author: Alex Savatieiev (osavatieiev@mirantis.com; a.savex@gmail.com)
2# Copyright 2019-2022 Mirantis, Inc.
Alexb78191f2021-11-02 16:35:46 -05003import json
4import os
5import queue
6import signal
7import subprocess
8
9
10from copy import deepcopy
Alex3034ba52021-11-13 17:06:45 -060011from datetime import datetime, timezone
Alexb78191f2021-11-02 16:35:46 -050012from platform import system, release, node
13from threading import Thread
14import threading
15from time import sleep
16
17
18from cfg_checker.common.exception import CheckerException
19from cfg_checker.common.other import piped_shell
20from cfg_checker.common.log import logger
21
22
# Timestamp pattern for everything this module formats or parses (run
# start times, schedules); the trailing %z carries the UTC offset.
_datetime_fmt = "%m/%d/%Y, %H:%M:%S%z"

# Options applied to every fio run. Insertion order is preserved and is
# the order in which "--key=value" arguments end up on the command line.
fio_options_common = dict([
    ("name", "agent_run"),
    ("filename", "/cephvol/testfile"),
    ("status-interval", "500ms"),
    ("randrepeat", 0),
    ("verify", 0),
    ("direct", 1),
    ("gtod_reduce", 0),
    ("bs", "32k"),
    ("iodepth", 16),
    ("size", "10G"),
    ("readwrite", "randrw"),
    ("ramp_time", "5s"),
    ("runtime", "30s"),
    ("ioengine", "libaio"),
])
40
# Extra options appended only when 'readwrite' is one of seq_modes
fio_options_seq = dict(numjobs=1, offset_increment="500M")
# Extra options appended only when 'readwrite' is one of mix_modes
fio_options_mix = dict(rwmixread=50)

# fio 'readwrite' values, grouped by which extra option set they take
seq_modes = "read write".split()
mix_modes = "randrw".split()
rand_modes = "randread randwrite".split()
52
Alex5cace3b2021-11-10 16:40:37 -060053
def get_fio_options():
    """Return a fresh, merged copy of the module's default fio options.

    Mirrors FioProcessShellRun.get_options() for callers without a running
    instance: common options first, then the sequential and mixed-mode
    extras. Every value is deep-copied so the defaults cannot be modified
    through the result.
    """
    merged = {}
    for group in (fio_options_common, fio_options_seq, fio_options_mix):
        merged.update(deepcopy(group))
    return merged
Alexb78191f2021-11-02 16:35:46 -050060
61
def output_reader(_stdout, outq):
    """Copy lines from a readable stream into a queue until end of stream.

    Meant to run in its own thread. Each line (str or bytes, whatever the
    stream yields) is put on ``outq`` unchanged.

    :param _stdout: object with a ``readline()`` method (e.g. Popen.stdout)
    :param outq: queue receiving every line read
    """
    while True:
        line = _stdout.readline()
        # EOF is an empty read: '' for text streams, b'' for binary ones.
        # Comparing against '' alone would spin forever on a binary stream.
        if not line:
            break
        outq.put(line)
65
66
def _o(option, param, suffix=""):
    """Render one fio long option as ``--<option>=<param><suffix>``."""
    return "--%s=%s%s" % (option, param, suffix)
69
70
def get_time(timestamp=None, fmt=None):
    """Format a moment in time as a UTC string.

    :param timestamp: POSIX timestamp to format; ``None`` means "now".
        An explicit ``0`` is the epoch, not "now".
    :param fmt: strftime pattern; defaults to the module's ``_datetime_fmt``
    :returns: formatted string. The datetime is always timezone-aware (UTC),
        so ``%z`` renders "+0000" and the result can be parsed back with
        ``datetime.strptime(..., _datetime_fmt)``.
    """
    if fmt is None:
        fmt = _datetime_fmt
    if timestamp is None:
        _t = datetime.now(timezone.utc)
    else:
        # Without a tz argument fromtimestamp() returns a naive local time
        # and %z would render as an empty string.
        _t = datetime.fromtimestamp(timestamp, timezone.utc)
    return _t.strftime(fmt)
77
78
def _get_seconds(value):
    """Convert a fio-style duration to whole seconds.

    Accepted forms:
    - an int, taken as seconds;
    - a string of digits, taken as seconds (fio's default time unit);
    - digits followed by one of the suffixes 's', 'm', 'h' or 'd'.

    :returns: number of seconds, or -1 when ``value`` cannot be interpreted
        (empty string, unknown suffix, non-numeric prefix, other types)
    """
    if isinstance(value, int):
        return value
    if not isinstance(value, str) or not value:
        return -1
    if value.isdigit():
        return int(value)
    _multipliers = {"s": 1, "m": 60, "h": 60 * 60, "d": 24 * 60 * 60}
    _mult = _multipliers.get(value[-1])
    if _mult is None:
        return -1
    try:
        return int(value[:-1]) * _mult
    except ValueError:
        # e.g. "500ms" or "xs": keep the documented -1 contract
        return -1
90
91
def wait_until(end_datetime):
    """Block until ``end_datetime`` (a timezone-aware datetime) is reached.

    Sleeps half of the remaining time per iteration, converging on the
    target. Returns at once if the target already passed, and stops once
    the remaining time was at most 0.1 s.
    """
    remaining = (end_datetime - datetime.now(timezone.utc)).total_seconds()
    while remaining >= 0:
        sleep(remaining / 2)
        if remaining <= 0.1:
            break
        remaining = (
            end_datetime - datetime.now(timezone.utc)
        ).total_seconds()
101
102
class ShellThread(object):
    """Run a command and stream its combined stdout/stderr into a queue.

    A helper thread running ``output_reader`` copies each output line into
    ``queue`` as it is produced, so callers can consume output while the
    command is still running.
    """

    def __init__(self, cmd, queue):
        # cmd: list of argument strings; queue: receives output lines
        self.cmd = cmd
        self.queue = queue
        self._p = None
        self._t = None
        # Seconds wait_started() waits for the first line of output
        self.timeout = 15
        self.output = []

    def run_shell(self):
        """Start the command and wait for its first line of output.

        The argument list is executed directly (no shell), so option values
        are never interpreted by /bin/sh. The current environment is kept,
        with PYTHONUNBUFFERED=1 added. A launch failure is reported as a
        line on the queue. If no output arrives in time, the process is
        interrupted.
        """
        _args = [str(c) for c in self.cmd]
        logger.debug("... {}".format(" ".join(_args)))
        # Extend (not replace) the environment so PATH etc. stay available
        _env = dict(os.environ)
        _env["PYTHONUNBUFFERED"] = "1"
        try:
            self._p = subprocess.Popen(
                _args,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                env=_env,
                universal_newlines=True,
                bufsize=1
            )
        except OSError as e:
            # Surface the failure the way a shell would: as output text
            self.queue.put("{}\n".format(e))
            return
        self._t = threading.Thread(
            target=output_reader,
            args=(self._p.stdout, self.queue)
        )
        self._t.start()
        if not self.wait_started():
            self.kill_shell()

    def is_alive(self):
        """True while the process runs or its output is still being read."""
        if self._p is None:
            return False
        # poll() is None only while running; an exit code of 0 is falsy,
        # so truthiness must not be used here
        if self._p.poll() is None:
            return True
        return self._t is not None and self._t.is_alive()

    def wait_started(self):
        """Wait up to ``timeout`` seconds for the first output line.

        :returns: True once output is available, False on timeout or when
            the process finished without producing any output
        """
        _left = self.timeout
        while self.queue.empty():
            if not self.is_alive():
                # Output may have landed just before the reader exited
                if self.queue.empty():
                    logger.debug("... process exited without output")
                    return False
                break
            if _left <= 0:
                logger.debug(
                    "...timed out after {} sec".format(str(self.timeout))
                )
                return False
            logger.debug("... {} sec".format(_left))
            sleep(1)
            _left -= 1
        logger.debug("... got first fio output")
        return True

    def kill_shell(self):
        """Interrupt the process if still running; drain pending output."""
        if self._p is not None and self._p.poll() is None:
            self._p.send_signal(signal.SIGINT)
        self.get_output()

    def get_output(self):
        """Move every queued line into ``self.output`` and return that list.

        Bytes lines are decoded as UTF-8 (undecodable bytes replaced).
        """
        while True:
            try:
                line = self.queue.get(block=False)
            except queue.Empty:
                return self.output
            if isinstance(line, bytes):
                line = line.decode("utf-8", errors="replace")
            self.output.append(line)
169
170
class FioProcess(Thread):
    """Thread that runs one fio job and collects its JSON status output.

    fio is started with ``--output-format=json+`` and ``--eta=always``.
    ``run()`` reassembles the JSON documents fio prints line by line, keeps
    each parsed status in ``timeline`` (keyed by the document's "timestamp")
    and finally stores the run in ``results`` keyed by the run start time
    string (formatted with ``_datetime_fmt``).
    """
    # Flags passed to every fio invocation
    _fio_options_list = [
        "--time_based",
        "--output-format=json+",
        "--eta=always"
    ]
    # Extra flags added only for sequential modes
    _fio_options_seq_list = [
        "--thread"
    ]

    # Default option sets; __init__ replaces them with per-instance copies
    # so update_options() never modifies the module-level dictionaries
    _fio_options_common = fio_options_common
    _fio_options_seq = fio_options_seq
    _fio_options_mix = fio_options_mix

    eta_sec = 0
    total_time_sec = 0
    elapsed_sec = 0
    # Class-level placeholder; __init__ gives each instance its own dict
    testrun = {}

    mount_point = "/cephvol"
    filename = "testfile"

    # test modes: 'randread', 'randwrite', 'read', 'write', 'randrw'
    mode = "randrw"
    _seq_modes = seq_modes
    _mix_modes = mix_modes
    _rand_modes = rand_modes

    # results; class-level placeholder, __init__ gives each instance its own
    results = {}

    def _shell(self, cmd):
        """Run ``cmd`` via piped_shell, keeping its exit code and output.

        :returns: True on a zero exit code, False otherwise (error logged)
        """
        self._code, self._shell_output = piped_shell(cmd, code=True)
        if self._code:
            logger.error(
                "# Shell error for '{}': [{}] {}".format(
                    cmd,
                    self._code,
                    self._shell_output
                )
            )
            return False
        else:
            return True

    def recalc_times(self):
        """Recompute expected run duration from 'runtime' and 'ramp_time'.

        :raises CheckerException: when 'runtime' is not a positive duration
            or 'ramp_time' is not a non-negative one
        """
        _runtime = self._fio_options_common["runtime"]
        _ramp = self._fio_options_common["ramp_time"]
        _rt = _get_seconds(_runtime)
        _rup = _get_seconds(_ramp)
        # _get_seconds signals an unparsable value with -1; a zero ramp is
        # valid, a zero runtime is not
        if _rt <= 0:
            raise CheckerException("invalid 'runtime': '{}'".format(_runtime))
        elif _rup < 0:
            raise CheckerException("invalid 'ramp_time': '{}'".format(_ramp))

        self.total_time_sec = _rt + _rup
        self.eta_sec = self.total_time_sec

    def __init__(self):
        Thread.__init__(self)
        logger.info("fio thread initialized")
        # Private copies of mutable state so this instance never shares
        # options or results with other instances or the module defaults
        self._fio_options_common = deepcopy(self._fio_options_common)
        self._fio_options_seq = deepcopy(self._fio_options_seq)
        self._fio_options_mix = deepcopy(self._fio_options_mix)
        self.results = {}
        self.testrun = {}
        # all outputs timeline
        self.timeline = {}
        # ShellThread for the current run; created in run()
        self.fiorun = None
        # save system
        self.system = system()
        self.release = release()
        self.hostname = node()
        # create a clear var for last shell output
        self._shell_output = ""
        # prepare params
        self.recalc_times()
        # prepare the fio
        self.fio_version = "unknown"
        if not self._shell("fio --version"):
            raise CheckerException(
                "Error running fio: '{}'".format(self._shell_output)
            )
        else:
            self.fio_version = self._shell_output
        # setup target file
        if not os.path.exists(self.mount_point):
            logger.warning(
                "WARNING: '{}' not exists, using tmp folder".format(
                    self.mount_point
                )
            )
            self.mount_point = "/tmp"
        self._fio_options_common["filename"] = os.path.join(
            self.mount_point,
            self.filename
        )

        if self.system == "Darwin":
            self._fio_options_common["ioengine"] = "posixaio"
        # Thread finish marker
        self.finished = False
        self.testrun_starttime = None
        self.scheduled_datetime = None

    def get_options(self):
        """Return a merged deep copy of all option groups of this instance.

        FioProcessShellRun.fio_reset() reads this to carry options over to
        a fresh instance.
        """
        _opts = deepcopy(self._fio_options_common)
        _opts.update(deepcopy(self._fio_options_seq))
        _opts.update(deepcopy(self._fio_options_mix))
        return _opts

    def update_options(self, _dict):
        """Apply option values to the group that already defines each key.

        Keys are validated (values are not) before anything is changed, so
        an unknown key leaves the options untouched.

        :raises CheckerException: on an unknown option key or invalid times
        """
        # Lookup order matters for keys present in several groups
        _groups = (
            self._fio_options_mix,
            self._fio_options_seq,
            self._fio_options_common
        )
        for k, v in _dict.items():
            if not any(k in _g for _g in _groups):
                raise CheckerException(
                    "Unknown option: '{}': '{}'".format(k, v)
                )
        for k, v in _dict.items():
            for _g in _groups:
                if k in _g:
                    _g[k] = v
                    break
        # recalc
        self.recalc_times()

    def _build_cmd(self):
        # Assemble the fio argument list for the configured 'readwrite' mode
        _cmd = ["fio"]
        _cmd += self._fio_options_list
        _cmd += [_o(k, v) for k, v in self._fio_options_common.items()]
        _mode = self._fio_options_common["readwrite"]
        if _mode in self._seq_modes:
            _cmd += self._fio_options_seq_list
            _cmd += [_o(k, v) for k, v in self._fio_options_seq.items()]
        elif _mode in self._mix_modes:
            _cmd += [_o(k, v) for k, v in self._fio_options_mix.items()]
        return _cmd

    def _store_status(self, _text):
        # Parse one complete fio JSON document and record it; malformed or
        # unexpected documents are logged and skipped
        try:
            _json = json.loads(_text)
            _timestamp = _json["timestamp"]
            _job = _json["jobs"][0]
            _eta = _job["eta"]
            _elapsed = _job["elapsed"]
        except (TypeError, KeyError, IndexError, ValueError) as e:
            # ValueError covers json.decoder.JSONDecodeError
            logger.error("ERROR: {}".format(e))
            return
        self.timeline[_timestamp] = _job
        # save last values
        self.eta_sec = _eta
        self.elapsed_sec = _elapsed
        self.testrun = _json

    def _finish(self):
        # Mark the run complete and clear the per-run schedule markers
        self.finished = True
        self.scheduled_datetime = None
        self.testrun_starttime = None

    def run(self):
        """Thread body: wait for the schedule, run fio and collect output.

        The first non-JSON line outside a JSON document is treated as an
        fio error: it is stored as ``{"error": line}`` and the run stops.
        """
        def _cut(_list, _s, _e):
            # Return (_list[_s:_e], _list with that slice removed)
            return (_list[_s:_e], _list[:_s] + _list[_e:])

        _q = queue.Queue()
        self.fiorun = ShellThread(self._build_cmd(), _q)
        # Check if schedule is set
        _now = datetime.now(timezone.utc)
        if self.scheduled_datetime:
            logger.debug(
                "waiting for '{}', now is '{}', total of {} sec left".format(
                    self.scheduled_datetime.strftime(_datetime_fmt),
                    _now.strftime(_datetime_fmt),
                    (self.scheduled_datetime - _now).total_seconds()
                )
            )
            wait_until(self.scheduled_datetime)
        else:
            self.testrun_starttime = _now.strftime(_datetime_fmt)
        self.fiorun.run_shell()
        _raw = []
        _start = -1
        _end = -1
        while self.fiorun.is_alive() or not _q.empty():
            while not _q.empty():
                _bb = _q.get(block=False)
                if isinstance(_bb, bytes):
                    _line = _bb.decode('utf-8')
                else:
                    _line = _bb
                if _start < 0 and _end < 0 and not _line.startswith("{"):
                    self.results[self.testrun_starttime] = {
                        "error": _line
                    }
                    # Negative eta marks the run as no longer progressing
                    self.eta_sec = -1
                    self.fiorun.kill_shell()
                    self._finish()
                    return
                _raw += _line.splitlines()
                # Top-level JSON braces are the only unindented ones
                for ll, _l in enumerate(_raw):
                    if _start < 0 and _l == "{":
                        _start = ll
                    elif _end < 0 and _l == "}":
                        _end = ll
                # loop until we have full json
                if _end < 0 or _start < 0:
                    continue
                # if start and end found, cut json
                (_json_lines, _raw) = _cut(_raw, _start, _end + 1)
                _start = -1
                _end = -1
                self._store_status("\n".join(_json_lines))
            if not self.eta_sec:
                break
            sleep(0.1)
        # Save status to results dictionary
        self.results[self.testrun_starttime] = {
            "result": self.testrun,
            "timeline": self.timeline
        }
        self._finish()
        return

    def healthcheck(self):
        """Report fio version, binary path, io engines and host details.

        :returns: dict with "ready" True only when version, path and at
            least one io engine were found
        """
        _version = self.fio_version
        _binary_path = self._shell_output if self._shell("which fio") else ""
        if self._shell("fio --enghelp"):
            _ioengines = self._shell_output
            _ioengines = _ioengines.replace("\t", "")
            # First line is a header, the rest are engine names
            _ioengines = _ioengines.splitlines()[1:]
            self._shell_output = ""
        else:
            _ioengines = []

        return {
            "ready": all((_version, _binary_path, _ioengines)),
            "version": _version,
            "path": _binary_path,
            "ioengines": _ioengines,
            "system": self.system,
            "release": self.release,
            "hostname": self.hostname
        }

    def status(self):
        """Return {"status": idle|running|scheduled|finished, "progress"}."""
        _running = self.is_alive() and self.eta_sec >= 0
        _scheduled = False
        _diff = -1
        if self.scheduled_datetime:
            _now = datetime.now(timezone.utc)
            _diff = (self.scheduled_datetime - _now).total_seconds()
            if _diff > 0:
                _scheduled = True
        # Later assignments take precedence: finished > scheduled > running
        _s = "running" if _running else "idle"
        _s = "scheduled" if _scheduled else _s
        _s = "finished" if self.finished else _s
        return {
            "status": _s,
            "progress": self.get_progress()
        }

    def end_fio(self):
        """Interrupt the fio process of the current run, if there is one."""
        _run = getattr(self, "fiorun", None)
        if _run is not None:
            _run.kill_shell()

    # Current run
    def percent_done(self):
        """Percentage of the run elapsed, based on fio's elapsed and eta."""
        # A negative eta marks an aborted run: treat it as nothing left
        _total = self.elapsed_sec + max(self.eta_sec, 0)
        if _total <= 0:
            return 0.0
        return float(self.elapsed_sec) / float(_total) * 100.0

    def get_progress(self):
        """Return percent_done() formatted with two decimals."""
        return "{:.2f}".format(self.percent_done())

    # latest parsed measurements
    def get_last_measurements(self):
        """Return the job status with the highest timestamp, or {}."""
        if self.timeline:
            return self.timeline[max(self.timeline)]
        else:
            return {}
436
437
class FioProcessShellRun(object):
    """Front-end that owns a FioProcess thread and accumulates results.

    A FioProcess thread can run only once, so a finished instance is
    replaced by a fresh one (see fio_reset). Its results and options are
    carried over to the new instance.
    """
    # Class-level placeholders; __init__ gives each instance its own dicts
    stats = {}
    results = {}

    def __init__(self, init_class=FioProcess):
        self.init_class = init_class
        # Per-instance state (class-level dicts would be shared by all)
        self.stats = {}
        self.results = {}
        self.actions = {
            "do_singlerun": self.do_singlerun,
            "do_scheduledrun": self.do_scheduledrun,
            "get_options": self.get_options,
            "get_result": self.get_result,
            "get_resultlist": self.get_resultlist
        }
        self.fio_reset()

    @staticmethod
    def healthcheck(fio):
        """Return (healthcheck dict, human-readable summary) for ``fio``."""
        hchk = fio.healthcheck()
        hchk_str = \
            "# fio status: {}\n# {} at {}\n# Engines: {}".format(
                "ready" if hchk["ready"] else "fail",
                hchk["version"],
                hchk["path"],
                ", ".join(hchk["ioengines"])
            )
        return hchk, hchk_str

    def status(self):
        return self.fio.status()

    def fio_reset(self):
        """Replace a finished (or not yet created) fio thread with a new one.

        An idle or running thread is left alone. When a finished thread is
        replaced, its results are merged into ``self.results`` and its
        options are applied to the new instance.
        """
        _old = getattr(self, "fio", None)
        if _old is not None:
            if not _old.finished:
                # No need to reset, fio is either idle or running
                return
            _r = _old.results
            _o = self.get_options()
        else:
            _r = None
            _o = None
        # extract results if they present
        if _r:
            self.results.update(_r)
        # re-init
        _fio = self.init_class()
        # Do healthcheck
        self.hchk, self.hchk_str = self.healthcheck(_fio)
        # restore options if they existed
        if _o:
            _fio.update_options(_o)
        self.fio = _fio

    def get_options(self):
        """Return a merged deep copy of the current fio thread's options."""
        _opts = deepcopy(self.fio._fio_options_common)
        _opts.update(deepcopy(self.fio._fio_options_seq))
        _opts.update(deepcopy(self.fio._fio_options_mix))
        return _opts

    def do_singlerun(self, options):
        """Start an immediate run with ``options`` ('scheduled_to' ignored).

        The caller's dict is not modified.
        """
        # Reset thread if it closed
        self.fio_reset()
        options = dict(options)
        if "scheduled_to" in options:
            # just ignore it
            _k = "scheduled_to"
            _v = options.pop(_k)
            logger.warning("Ignoring option: '{}': '{}'".format(_k, _v))
        self.fio.update_options(options)
        # Start it
        self.fio.start()
        return True

    def do_scheduledrun(self, options):
        """Start a run that waits until options['scheduled_to'].

        'scheduled_to' must match ``_datetime_fmt`` (including the UTC
        offset). The caller's dict is not modified.

        :raises CheckerException: when 'scheduled_to' is missing or invalid
        """
        # Reset thread if it closed
        self.fio_reset()
        if "scheduled_to" not in options:
            # required parameter not set
            raise CheckerException("Parameter missing: 'scheduled_to'")
        options = dict(options)
        _when = options.pop("scheduled_to")
        try:
            _scheduled = datetime.strptime(_when, _datetime_fmt)
        except (TypeError, ValueError) as e:
            raise CheckerException(
                "Invalid 'scheduled_to': '{}' ({})".format(_when, e)
            ) from e
        self.fio.testrun_starttime = _when
        self.fio.scheduled_datetime = _scheduled
        # Fill options
        self.fio.update_options(options)
        # Start it
        self.fio.start()
        return True

    def _get_result_object(self, obj_name, time):
        # Returns the stored object, the stored error text for a failed run,
        # or an {"error": ...} dict when nothing is available
        if time not in self.results:
            return {
                "error": "Result not found for '{}'".format(time)
            }
        _entry = self.results[time]
        if obj_name in _entry:
            return _entry[obj_name]
        if "error" in _entry:
            return _entry["error"]
        return {
            "error": "Empty {} for '{}'".format(obj_name, time)
        }

    def _update_results(self):
        # Move new results out of a completed thread into self.results
        if self.fio.finished:
            for _r in list(self.fio.results):
                if _r not in self.results:
                    self.results[_r] = self.fio.results.pop(_r)

    def get_result(self, time):
        self._update_results()
        return self._get_result_object('result', time)

    def get_result_timeline(self, time):
        self._update_results()
        return self._get_result_object('timeline', time)

    # reporting
    def get_resultlist(self):
        self._update_results()
        return list(self.results.keys())

    def __call__(self):
        """Start the thread if needed and print progress until it ends."""
        if not self.fio.is_alive() and not self.fio.finished:
            self.fio.start()

        while self.fio.is_alive() and self.fio.eta_sec >= 0:
            sleep(0.2)
            self.stats = self.fio.get_last_measurements()

            _r = self.stats.get('read', {})
            _w = self.stats.get('write', {})

            _r_bw = _r.get('bw_bytes', -1)
            _r_iops = _r.get('iops', -1)
            _w_bw = _w.get('bw_bytes', -1)
            _w_iops = _w.get('iops', -1)
            _s = self.fio.status()
            if _s["status"] == "scheduled":
                _t = self.fio.scheduled_datetime
                _n = datetime.now(timezone.utc)
                _delta = (_t - _n).total_seconds()
                print(
                    "{}: waiting for '{}'; now '{}'; {} sec left".format(
                        _s["status"],
                        _t.strftime(_datetime_fmt),
                        _n.strftime(_datetime_fmt),
                        _delta
                    )
                )
            else:
                stats = "{}: {:>7}% ({}/{}) " \
                    "(BW/IOPS: " \
                    "Read {:>9.2f} MB/{:>9.2f} " \
                    "Write {:>9.2f} MB/{:>9.2f})".format(
                        _s["status"],
                        _s["progress"],
                        self.fio.elapsed_sec,
                        self.fio.eta_sec,
                        _r_bw / 1024 / 1024,
                        _r_iops,
                        _w_bw / 1024 / 1024,
                        _w_iops
                    )
                print(stats)
        self.fio.end_fio()
616
617
if __name__ == '__main__':
    # Debug harness for FioProcessShellRun: one scheduled run, then one
    # immediate run, printing live progress and collected result keys
    from datetime import timedelta

    _shell = FioProcessShellRun()
    _opts = _shell.get_options()
    _opts["readwrite"] = "read"
    _opts["ramp_time"] = "1s"
    _opts["runtime"] = "5s"
    # Schedule a few seconds ahead so the "scheduled" state is exercised;
    # a fixed date would already be in the past and start immediately
    _opts["scheduled_to"] = (
        datetime.now(timezone.utc) + timedelta(seconds=5)
    ).strftime(_datetime_fmt)
    _shell.do_scheduledrun(_opts)
    _shell()
    _times = _shell.get_resultlist()
    print("# results:\n{}".format("\n".join(_times)))

    _shell.fio_reset()
    _opts = _shell.get_options()
    _opts["readwrite"] = "read"
    _opts["ramp_time"] = "1s"
    _opts["runtime"] = "10s"
    _shell.do_singlerun(_opts)
    _shell()
    _times = _shell.get_resultlist()
    print("# results:\n{}".format("\n".join(_times)))
644 print("# results:\n{}".format("\n".join(_times)))