blob: 3cc6ca23d1ff5b3b3a42f0432b5ece3e72fb6543 [file] [log] [blame]
Alexb78191f2021-11-02 16:35:46 -05001import json
2import os
3import queue
4import signal
5import subprocess
6
7
8from copy import deepcopy
Alex3034ba52021-11-13 17:06:45 -06009from datetime import datetime, timezone
Alexb78191f2021-11-02 16:35:46 -050010from platform import system, release, node
11from threading import Thread
12import threading
13from time import sleep
14
15
16from cfg_checker.common.exception import CheckerException
17from cfg_checker.common.other import piped_shell
18from cfg_checker.common.log import logger
19
20
Alex3034ba52021-11-13 17:06:45 -060021_datetime_fmt = "%m/%d/%Y, %H:%M:%S%z"
Alex5cace3b2021-11-10 16:40:37 -060022fio_options_common = {
23 "name": "agent_run",
24 "filename": "/cephvol/testfile",
25 "status-interval": "500ms",
26 "randrepeat": 0,
27 "verify": 0,
28 "direct": 1,
29 "gtod_reduce": 0,
30 "bs": "32k",
31 "iodepth": 16,
32 "size": "10G",
33 "readwrite": "randrw",
34 "ramp_time": "5s",
35 "runtime": "30s",
36 "ioengine": "libaio"
37}
38
39fio_options_seq = {
40 "numjobs": 1,
41 "offset_increment": "500M"
42}
43fio_options_mix = {
44 "rwmixread": 50
45}
46
47
def get_fio_options():
    """Return one merged, deep-copied dict of all fio options.

    Duplicates FioProcessShellRun.get_options() for callers that have no
    fio instance. The common options come first; the sequential and then
    the mixed options are layered on top, so later dicts win on clashes.
    NOTE: FioProcess aliases the module-level dicts (it does not copy
    them), so the result reflects any changes an instance made to them.
    """
    _merged = {}
    for _source in (fio_options_common, fio_options_seq, fio_options_mix):
        _merged.update(deepcopy(_source))
    return _merged
Alexb78191f2021-11-02 16:35:46 -050054
55
def output_reader(_stdout, outq):
    """Copy lines from a text stream into a queue until end of stream.

    Each line is put on ``outq`` unchanged, including its trailing newline.
    Reading stops when ``readline()`` returns an empty string.
    """
    while True:
        _chunk = _stdout.readline()
        if _chunk == '':
            break
        outq.put(_chunk)
59
60
61def _o(option, param, suffix=""):
62 return "--{}={}{}".format(option, param, suffix)
63
64
def get_time(timestamp=None):
    """Format a point in time with the module-wide ``_datetime_fmt``.

    :param timestamp: POSIX timestamp in seconds. ``None`` (the default)
        means "now". ``0`` is a real timestamp (the epoch), not "now".
    :return: the time rendered in UTC, e.g. "11/23/2021, 21:48:20+0000".
    """
    if timestamp is None:
        _t = datetime.now(timezone.utc)
    else:
        # An aware UTC datetime keeps %z populated and matches the "now"
        # branch. A naive local datetime renders %z as an empty string.
        _t = datetime.fromtimestamp(timestamp, timezone.utc)
    return _t.strftime(_datetime_fmt)
71
72
73def _get_seconds(value):
74 # assume that we have symbol at the end
75 _suffix = value[-1]
76 if _suffix == 's':
77 return int(value[:-1])
78 elif _suffix == 'm':
79 return int(value[:-1])*60
80 elif _suffix == 'h':
81 return int(value[:-1])*60*60
82 else:
83 return -1
84
85
def wait_until(end_datetime):
    """Block the calling thread until (approximately) ``end_datetime``.

    Returns at once if ``end_datetime`` is already in the past. Otherwise
    it repeatedly sleeps for half of the remaining time. It returns after
    the sleep that started with at most 0.1 s remaining, so it may return
    up to ~0.05 s early. ``end_datetime`` must be timezone-aware, because
    it is compared with an aware UTC "now".
    """
    _remaining = (end_datetime - datetime.now(timezone.utc)).total_seconds()
    while _remaining >= 0:
        sleep(_remaining / 2)
        if _remaining <= 0.1:
            break
        _remaining = (
            end_datetime - datetime.now(timezone.utc)
        ).total_seconds()
95
96
class ShellThread(object):
    """Run a command through the shell and collect its output via a queue.

    stdout and stderr are merged. A helper thread (``output_reader``) reads
    them line by line and puts each line on ``queue``. Callers either
    consume the queue themselves or drain it with ``get_output``.
    """

    def __init__(self, cmd, queue):
        # cmd: list of command-line tokens; they are joined with spaces and
        #      handed to the shell by run_shell()
        # queue: queue receiving the command's output lines
        self.cmd = cmd
        self.queue = queue
        self._p = None
        self._t = None
        # seconds run_shell() waits for the first output line
        self.timeout = 15
        self.output = []

    def run_shell(self):
        """Start the command and wait for its first line of output.

        If nothing arrives within ``self.timeout`` seconds, the process is
        interrupted with SIGINT (see kill_shell).
        """
        _cmd = " ".join(self.cmd)
        logger.debug("... {}".format(_cmd))
        # NOTE(review): the joined string is interpreted by the shell, so
        # every token must come from a trusted source; shell metacharacters
        # in option values would be executed.
        self._p = subprocess.Popen(
            _cmd,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            # Extend the parent environment rather than replacing it, so the
            # shell still has PATH, HOME, locale settings, etc.
            env=dict(os.environ, PYTHONUNBUFFERED="1"),
            universal_newlines=True,
            bufsize=1
        )
        self._t = threading.Thread(
            target=output_reader,
            args=(self._p.stdout, self.queue)
        )
        self._t.start()
        if not self.wait_started():
            self.kill_shell()

    def is_alive(self):
        """Return True while the process is started and has not exited."""
        # poll() returns None while running and the exit code afterwards.
        # An exit code of 0 must not be mistaken for "still running".
        return self._p is not None and self._p.poll() is None

    def wait_started(self):
        """Wait up to ``self.timeout`` seconds for the first output line.

        :return: True once the queue is non-empty, False on timeout.
        """
        # count down a local copy so self.timeout stays valid for re-use
        # and for the log message
        _left = self.timeout
        while self.queue.empty():
            if _left <= 0:
                logger.debug(
                    "...timed out after {} sec".format(str(self.timeout))
                )
                return False
            logger.debug("... {} sec".format(_left))
            sleep(1)
            _left -= 1
        logger.debug("... got first fio output")
        return True

    def kill_shell(self):
        """Send SIGINT if the process is still running, then drain output.

        Lines drained from the queue are appended to ``self.output``.
        """
        if self.is_alive():
            self._p.send_signal(signal.SIGINT)
        self.get_output()

    def get_output(self):
        """Move every currently queued line into ``self.output``.

        :return: ``self.output``, the accumulated list of lines.
        """
        while True:
            try:
                line = self.queue.get(block=False)
            except queue.Empty:
                return self.output
            if isinstance(line, bytes):
                # decode rather than str(), which would give "b'...'"
                line = line.decode("utf-8", errors="replace")
            self.output.append(line)
163
164
class FioProcess(Thread):
    """Background thread that runs one fio job and collects its JSON output.

    The command line is assembled in run() from the common options, plus
    the sequential or mixed options depending on "readwrite". Periodic fio
    JSON reports are stored in ``timeline`` (keyed by the report's
    "timestamp"). When the loop ends, the last report and the timeline are
    stored in ``results`` under the run start time.
    """
    # fixed fio flags used for every run
    _fio_options_list = [
        "--time_based",
        "--output-format=json+",
        "--eta=always"
    ]
    # extra fixed flags for sequential modes
    _fio_options_seq_list = [
        "--thread"
    ]

    # NOTE: these are the module-level dicts themselves, not copies, so
    # changes made by __init__ and update_options() are visible to every
    # instance and to get_fio_options().
    _fio_options_common = fio_options_common
    _fio_options_seq = fio_options_seq
    _fio_options_mix = fio_options_mix

    # progress counters in seconds, refreshed from fio reports
    eta_sec = 0
    total_time_sec = 0
    elapsed_sec = 0
    # last parsed fio JSON report
    testrun = {}

    mount_point = "/cephvol"
    filename = "testfile"

    # test modes: 'randread', 'randwrite', 'read', 'write', 'randrw'
    mode = "randrw"
    _seq_modes = ['read', 'write']
    _mix_modes = ['randrw']
    _rand_modes = ['randread', 'randwrite']

    # results keyed by run start time (or by error time).
    # NOTE: class-level, so every FioProcess instance shares this dict.
    results = {}

    def _shell(self, cmd):
        """Run ``cmd`` via piped_shell and keep its code/output on self.

        :return: True when the returned code is falsy (success); otherwise
            log the failure and return False.
        """
        self._code, self._shell_output = piped_shell(cmd, code=True)
        if self._code:
            logger.error(
                "# Shell error for '{}': [{}] {}".format(
                    cmd,
                    self._code,
                    self._shell_output
                )
            )
            return False
        else:
            return True

    def recalc_times(self):
        """Recompute ``total_time_sec``/``eta_sec`` from runtime + ramp_time.

        :raises CheckerException: when runtime is not a positive duration
            or ramp_time is not a non-negative duration.
        """
        _runtime = self._fio_options_common["runtime"]
        _ramp = self._fio_options_common["ramp_time"]
        _rt = _get_seconds(_runtime)
        _rup = _get_seconds(_ramp)
        # _get_seconds() signals an unparsable value with -1, so check the
        # sign rather than truthiness. A zero ramp time is legitimate.
        if _rt <= 0:
            raise CheckerException("invalid 'runtime': '{}'".format(_runtime))
        elif _rup < 0:
            raise CheckerException("invalid 'ramp_time': '{}'".format(_ramp))

        self.total_time_sec = _rt + _rup
        self.eta_sec = self.total_time_sec

    def __init__(self):
        """Probe the system and the fio binary, and set the target file.

        :raises CheckerException: when "fio --version" fails, or when the
            configured times are invalid.
        """
        Thread.__init__(self)
        logger.info("fio thread initialized")
        # save system
        self.system = system()
        self.release = release()
        self.hostname = node()
        # create a clear var for last shell output
        self._shell_output = ""
        # prepare params
        self.recalc_times()
        # prepare the fio
        self.fio_version = "unknown"
        if not self._shell("fio --version"):
            raise CheckerException(
                "Error running fio: '{}'".format(self._shell_output)
            )
        else:
            self.fio_version = self._shell_output
        # all outputs timeline
        self.timeline = {}
        # setup target file
        if not os.path.exists(self.mount_point):
            logger.warning(
                "WARNING: '{}' not exists, using tmp folder".format(
                    self.mount_point
                )
            )
            self.mount_point = "/tmp"
        self._fio_options_common["filename"] = os.path.join(
            self.mount_point,
            self.filename
        )

        if self.system == "Darwin":
            self._fio_options_common["ioengine"] = "posixaio"
        # Thread finish marker
        self.finished = False
        self.testrun_starttime = None
        self.scheduled_datetime = None
        # ShellThread created by run(); None until then, so end_fio() is
        # safe to call before the thread starts
        self.fiorun = None

    def update_options(self, _dict):
        """Apply ``_dict`` to whichever option dict already has each key.

        Values are not validated. Times are recalculated afterwards.

        :raises CheckerException: on a key none of the dicts knows.
        """
        for k, v in _dict.items():
            if k in self._fio_options_mix:
                self._fio_options_mix[k] = v
            elif k in self._fio_options_seq:
                self._fio_options_seq[k] = v
            elif k in self._fio_options_common:
                self._fio_options_common[k] = v
            else:
                raise CheckerException(
                    "Unknown option: '{}': '{}'".format(k, v)
                )
        # recalc
        self.recalc_times()

    def run(self):
        """Thread body: build the command, optionally wait, run fio, parse.

        Output lines are collected until a complete JSON document is seen.
        A document starts with a line that is exactly "{" and ends with a
        line that is exactly "}". Each document updates ``timeline``,
        ``eta_sec``, ``elapsed_sec`` and ``testrun``. A line that arrives
        while no document is being collected and does not start with "{"
        is recorded as an error: fio is interrupted and ``eta_sec`` is set
        to -1.
        """
        def _cut(_list, _s, _e):
            # return (_list[_s:_e], _list with that slice removed)
            _new = _list[_s:_e]
            _pre = _list[:_s]
            _list = _pre + _list[_e:]
            return (_new, _list)

        # create a cmd
        _cmd = ["fio"]
        _cmd += self._fio_options_list
        _cmd += [_o(k, v) for k, v in self._fio_options_common.items()]

        if self._fio_options_common["readwrite"] in self._seq_modes:
            _sq = self._fio_options_seq_list
            _cmd += _sq + [_o(k, v) for k, v in self._fio_options_seq.items()]
        elif self._fio_options_common["readwrite"] in self._mix_modes:
            _cmd += [_o(k, v) for k, v in self._fio_options_mix.items()]

        _q = queue.Queue()
        self.fiorun = ShellThread(_cmd, _q)
        # Check if schedule is set
        _now = datetime.now(timezone.utc)
        if self.scheduled_datetime:
            logger.debug(
                "waiting for '{}', now is '{}', total of {} sec left".format(
                    self.scheduled_datetime.strftime(_datetime_fmt),
                    _now.strftime(_datetime_fmt),
                    (self.scheduled_datetime - _now).total_seconds()
                )
            )
            wait_until(self.scheduled_datetime)
        else:
            self.testrun_starttime = _now.strftime(_datetime_fmt)
        self.fiorun.run_shell()
        _raw = []
        _start = -1
        _end = -1
        while self.fiorun.is_alive() or not _q.empty():
            while not _q.empty():
                # processing
                _bb = _q.get(block=False)
                if isinstance(_bb, bytes):
                    _line = _bb.decode('utf-8')
                else:
                    _line = _bb
                if _start < 0 and _end < 0 and not _line.startswith("{"):
                    _time = get_time()
                    self.results[_time] = {
                        "error": _line
                    }
                    # -1 marks the run as aborted (status()/percent_done())
                    self.eta_sec = -1
                    self.fiorun.kill_shell()
                    self.finished = True
                    return
                _raw += _line.splitlines()
                for ll, _text in enumerate(_raw):
                    if _start < 0 and _text == "{":
                        _start = ll
                    elif _end < 0 and _text == "}":
                        _end = ll
                # loop until we have full json
                if _end < 0 or _start < 0:
                    continue
                # if start and end found, cut json
                (_json, _raw) = _cut(_raw, _start, _end+1)
                _start = -1
                _end = -1
                # Try to parse json
                _json = "\n".join(_json)
                try:
                    _json = json.loads(_json)
                    _timestamp = _json["timestamp"]
                    self.timeline[_timestamp] = _json["jobs"][0]

                    # save last values
                    self.eta_sec = self.timeline[_timestamp]["eta"]
                    self.elapsed_sec = self.timeline[_timestamp]["elapsed"]
                    self.testrun = _json
                except (TypeError, KeyError, IndexError) as e:
                    # a report without the expected fields must not kill
                    # the thread; log it and keep reading
                    logger.error("ERROR: {}".format(e))
                except json.decoder.JSONDecodeError as e:
                    logger.error("ERROR: {}".format(e))
            if not self.eta_sec:
                break
            sleep(0.1)
        # Save status to results dictionary
        self.results[self.testrun_starttime] = {
            "result": self.testrun,
            "timeline": self.timeline
        }
        self.finished = True
        self.scheduled_datetime = None
        self.testrun_starttime = None
        return

    def healthcheck(self):
        """Report whether fio is usable on this host.

        :return: dict with "ready" (True when version, binary path and
            engine list are all non-empty), "version", "path", "ioengines",
            "system", "release" and "hostname".
        """
        _version = self.fio_version
        _binary_path = self._shell_output if self._shell("which fio") else ""
        if self._shell("fio --enghelp"):
            _ioengines = self._shell_output
            _ioengines = _ioengines.replace("\t", "")
            # first line is a header, the rest are engine names
            _ioengines = _ioengines.splitlines()[1:]
            self._shell_output = ""
        else:
            _ioengines = []

        return {
            "ready": all((_version, _binary_path, _ioengines)),
            "version": _version,
            "path": _binary_path,
            "ioengines": _ioengines,
            "system": self.system,
            "release": self.release,
            "hostname": self.hostname
        }

    def status(self):
        """Return {"status": ..., "progress": ...} for the current run.

        Precedence: "finished" > "scheduled" (start time still in the
        future) > "running" > "idle".
        """
        _running = self.is_alive() and self.eta_sec >= 0
        _scheduled = False
        _diff = -1
        if self.scheduled_datetime:
            _now = datetime.now(timezone.utc)
            _diff = (self.scheduled_datetime - _now).total_seconds()
            if _diff > 0:
                _scheduled = True
        _s = "running" if _running else "idle"
        _s = "scheduled" if _scheduled else _s
        _s = "finished" if self.finished else _s
        return {
            "status": _s,
            "progress": self.get_progress()
        }

    def end_fio(self):
        """Interrupt the fio process if run() has started one."""
        if self.fiorun:
            self.fiorun.kill_shell()

    # Current run
    def percent_done(self):
        """Return progress of the current run as a percentage.

        Computed from the last reported elapsed and remaining seconds.
        Returns 0.0 when their sum is not positive (e.g. an aborted run with
        ``eta_sec == -1``) instead of dividing by zero.
        """
        _total = self.elapsed_sec + self.eta_sec
        if _total <= 0:
            return 0.0
        return float(self.elapsed_sec) / float(_total) * 100.0

    def get_progress(self):
        """Return percent_done() formatted with two decimals."""
        return "{:.2f}".format(self.percent_done())

    # latest parsed measurements
    def get_last_measurements(self):
        """Return the job data of the newest timeline entry, or {}."""
        if self.timeline:
            return self.timeline[max(self.timeline)]
        else:
            return {}
431
432
class FioProcessShellRun(object):
    """Front-end that owns a FioProcess and drives it through named actions.

    A finished FioProcess thread cannot be started again, so fio_reset()
    creates a new instance when the previous one has finished. Results of
    the finished instance are first merged into this object's ``results``,
    and its options are re-applied to the new instance.
    """

    def __init__(self, init_class=FioProcess):
        # per-instance storage; class-level dicts would be shared by every
        # FioProcessShellRun object
        self.stats = {}
        self.results = {}
        self.init_class = init_class
        self.actions = {
            "do_singlerun": self.do_singlerun,
            "do_scheduledrun": self.do_scheduledrun,
            "get_options": self.get_options,
            "get_result": self.get_result,
            "get_resultlist": self.get_resultlist
        }
        self.fio_reset()

    @staticmethod
    def healthcheck(fio):
        """Return (healthcheck dict, human-readable summary) for ``fio``."""
        hchk = fio.healthcheck()
        hchk_str = \
            "# fio status: {}\n# {} at {}\n# Engines: {}".format(
                "ready" if hchk["ready"] else "fail",
                hchk["version"],
                hchk["path"],
                ", ".join(hchk["ioengines"])
            )
        return hchk, hchk_str

    def status(self):
        """Return the current FioProcess status dict."""
        return self.fio.status()

    def fio_reset(self):
        """Replace ``self.fio`` with a fresh instance if it has finished.

        Does nothing while the current instance has not finished (idle or
        running). Otherwise its results are merged into ``self.results``, a
        new instance is created and health-checked, and the previous
        options are re-applied to it.
        """
        try:
            _f = self.fio.finished
            _r = self.fio.results
            # FioProcess has no get_options(); our own accessor reads the
            # option dicts of self.fio
            _o = self.get_options()
        except AttributeError:
            # self.fio does not exist yet (first call from __init__)
            _f = True
            _r = None
            _o = None
        # Check if reset is needed
        if not _f:
            # No need to reset, fio is either idle or running
            return
        # extract results if they present
        if _r:
            self.results.update(_r)
        # re-init
        _fio = self.init_class()
        # Do healthcheck
        self.hchk, self.hchk_str = self.healthcheck(_fio)
        # restore options if they existed
        if _o:
            _fio.update_options(_o)
        self.fio = _fio

    def get_options(self):
        """Return a merged deep copy of the fio option dicts."""
        _opts = deepcopy(self.fio._fio_options_common)
        _opts.update(deepcopy(self.fio._fio_options_seq))
        _opts.update(deepcopy(self.fio._fio_options_mix))
        return _opts

    def do_singlerun(self, options):
        """Start an immediate fio run with ``options`` applied.

        A "scheduled_to" key is ignored with a warning. The caller's dict
        is not modified.

        :return: True once the thread has been started.
        """
        # Reset thread if it closed
        self.fio_reset()
        # work on a copy so popping keys does not alter the caller's dict
        options = dict(options)
        if "scheduled_to" in options:
            # just ignore it
            _k = "scheduled_to"
            _v = options.pop(_k)
            logger.warning("Ignoring option: '{}': '{}'".format(_k, _v))
        self.fio.update_options(options)
        # Start it
        self.fio.start()
        return True

    def do_scheduledrun(self, options):
        """Start a fio run that waits until ``options["scheduled_to"]``.

        "scheduled_to" must match ``_datetime_fmt`` and becomes the key of
        the run's result. The caller's dict is not modified.

        :raises CheckerException: when "scheduled_to" is missing.
        :raises ValueError: when "scheduled_to" does not match the format.
        :return: True once the thread has been started.
        """
        # Reset thread if it closed
        self.fio_reset()
        options = dict(options)
        if "scheduled_to" not in options:
            # required parameter not set
            raise CheckerException("Parameter missing: 'scheduled_to'")
        _starttime = options.pop("scheduled_to")
        # parse before touching self.fio, so a malformed value leaves it
        # unchanged
        _scheduled = datetime.strptime(_starttime, _datetime_fmt)
        self.fio.testrun_starttime = _starttime
        self.fio.scheduled_datetime = _scheduled
        # Fill options
        self.fio.update_options(options)
        # Start it
        self.fio.start()
        return True

    def _get_result_object(self, obj_name, time):
        """Return results[time][obj_name].

        If that run has no ``obj_name`` but has an "error", return the
        stored error value. Otherwise return an {"error": ...} dict.
        """
        if time in self.results:
            if obj_name in self.results[time]:
                return self.results[time][obj_name]
            elif "error" in self.results[time]:
                return self.results[time]["error"]
            else:
                return {
                    "error": "Empty {} for '{}'".format(obj_name, time)
                }
        else:
            return {
                "error": "Result not found for '{}'".format(time)
            }

    def _update_results(self):
        """Move results of a finished fio thread into ``self.results``."""
        # Update only in case of completed thread
        if self.fio.finished:
            for _r in list(self.fio.results.keys()):
                if _r not in self.results:
                    self.results[_r] = self.fio.results.pop(_r)

    def get_result(self, time):
        """Return the final fio report stored for run ``time``."""
        self._update_results()
        return self._get_result_object('result', time)

    def get_result_timeline(self, time):
        """Return the timeline of reports stored for run ``time``."""
        self._update_results()
        return self._get_result_object('timeline', time)

    # reporting
    def get_resultlist(self):
        """Return the keys (start times) of all collected results."""
        self._update_results()
        return list(self.results.keys())

    def __call__(self):
        """Start fio if needed and print its status every 0.2 s until done.

        The loop runs while the thread is alive and ``eta_sec`` is not
        negative. fio is interrupted via end_fio() afterwards.
        """
        if not self.fio.is_alive() and not self.fio.finished:
            self.fio.start()

        while self.fio.is_alive() and self.fio.eta_sec >= 0:
            sleep(0.2)
            self.stats = self.fio.get_last_measurements()

            _r = self.stats.get('read', {})
            _w = self.stats.get('write', {})

            _r_bw = _r.get('bw_bytes', -1)
            _r_iops = _r.get('iops', -1)
            _w_bw = _w.get('bw_bytes', -1)
            _w_iops = _w.get('iops', -1)
            _s = self.fio.status()
            if _s["status"] == "scheduled":
                _t = self.fio.scheduled_datetime
                _n = datetime.now(timezone.utc)
                _delta = (_t - _n).total_seconds()
                print(
                    "{}: waiting for '{}'; now '{}'; {} sec left".format(
                        _s["status"],
                        _t.strftime(_datetime_fmt),
                        _n.strftime(_datetime_fmt),
                        _delta
                    )
                )
            else:
                stats = "{}: {:>7}% ({}/{}) " \
                        "(BW/IOPS: " \
                        "Read {:>9.2f} MB/{:>9.2f} " \
                        "Write {:>9.2f} MB/{:>9.2f})".format(
                            _s["status"],
                            _s["progress"],
                            self.fio.elapsed_sec,
                            self.fio.eta_sec,
                            _r_bw / 1024 / 1024,
                            _r_iops,
                            _w_bw / 1024 / 1024,
                            _w_iops
                        )
                print(stats)
        self.fio.end_fio()
611
612
if __name__ == '__main__':
    # Debug shell to test FioProcessShellRun: a short scheduled sequential
    # read, then an immediate one, listing the result keys after each.
    from datetime import timedelta

    _shell = FioProcessShellRun()
    _opts = _shell.get_options()
    _opts["readwrite"] = "read"
    _opts["ramp_time"] = "1s"
    _opts["runtime"] = "5s"
    # Schedule a few seconds ahead so the "scheduled" state is exercised;
    # a fixed date would already be in the past and start immediately.
    _opts["scheduled_to"] = (
        datetime.now(timezone.utc) + timedelta(seconds=5)
    ).strftime(_datetime_fmt)
    _shell.do_scheduledrun(_opts)
    _shell()
    _times = _shell.get_resultlist()
    print("# results:\n{}".format("\n".join(_times)))
    # For a full dump: json.dumps(_shell.get_result(_times[0]), indent=2)

    _shell.fio_reset()
    _opts = _shell.get_options()
    _opts["readwrite"] = "read"
    _opts["ramp_time"] = "1s"
    _opts["runtime"] = "10s"
    _shell.do_singlerun(_opts)
    _shell()
    _times = _shell.get_resultlist()
    print("# results:\n{}".format("\n".join(_times)))