# blob: c8488afad5e11155be22861bf8dd56411a09c90a [file] [log] [blame]
import json
import os
import queue
import signal
import subprocess
import threading
from copy import deepcopy
from datetime import datetime, timedelta, timezone
from platform import system, release, node
from threading import Thread
from time import sleep

from cfg_checker.common.exception import CheckerException
from cfg_checker.common.other import piped_shell
from cfg_checker.common.log import logger
19
20
# Format used for every timestamp string produced or parsed in this module
_datetime_fmt = "%m/%d/%Y, %H:%M:%S%z"

# Options passed to fio on every run. Insertion order matters: FioProcess
# turns these into "--key=value" arguments in dictionary order.
fio_options_common = {
    "name": "agent_run",
    "filename": "/cephvol/testfile",
    "status-interval": "500ms",
    "randrepeat": 0,
    "verify": 0,
    "direct": 1,
    "gtod_reduce": 0,
    "bs": "32k",
    "iodepth": 16,
    "size": "10G",
    "readwrite": "randrw",
    "ramp_time": "5s",
    "runtime": "30s",
    "ioengine": "libaio",
}

# Extra options added only for the sequential modes ('read', 'write')
fio_options_seq = {
    "numjobs": 1,
    "offset_increment": "500M",
}
# Extra options added only for the mixed mode ('randrw')
fio_options_mix = {
    "rwmixread": 50,
}
46
47
def get_fio_options():
    """Return one merged, deep-copied dict of the module-level fio options.

    Keys come from fio_options_common first, then fio_options_seq, then
    fio_options_mix; a later dict wins on a duplicate key. The copy can be
    modified freely without touching the module-level dicts.
    """
    merged = {}
    for _part in (fio_options_common, fio_options_seq, fio_options_mix):
        merged.update(deepcopy(_part))
    return merged

55
def output_reader(_stdout, outq):
    """Copy every line read from ``_stdout`` into ``outq`` until EOF.

    EOF is detected as ``readline()`` returning the empty string, so
    ``_stdout`` is expected to be a text-mode stream.
    """
    while True:
        _chunk = _stdout.readline()
        if _chunk == '':
            break
        outq.put(_chunk)
59
60
def _o(option, param, suffix=""):
    """Render one long command-line option as ``--option=param<suffix>``."""
    return "--%s=%s%s" % (option, param, suffix)
63
64
def get_time(timestamp=None, fmt=None):
    """Format a point in time as a UTC string.

    :param timestamp: seconds since the Unix epoch (e.g. fio's JSON
        ``timestamp``); None means "now". 0 is a valid timestamp.
    :param fmt: strftime format; defaults to the module's ``_datetime_fmt``.
    :return: the formatted time. Both branches use UTC, so the ``%z`` offset
        is always present (``+0000``) and the string can be parsed back with
        the same format.
    """
    if fmt is None:
        fmt = _datetime_fmt
    if timestamp is None:
        _t = datetime.now(timezone.utc)
    else:
        # fromtimestamp() without a tz returns naive local time, for which
        # %z renders as an empty string
        _t = datetime.fromtimestamp(timestamp, timezone.utc)
    return _t.strftime(fmt)
71
72
def _get_seconds(value):
    """Convert an fio time value to whole seconds.

    Accepts an int, or a string with an optional unit suffix 's', 'm', 'h'
    or 'd'. A bare number is taken as seconds, which is how fio reads time
    options given without a unit.

    :return: the number of seconds, or -1 if the unit suffix is unknown
    :raises ValueError: if the numeric part is not an integer
    """
    _value = str(value).strip()
    if _value.isdigit():
        return int(_value)
    # otherwise assume that we have a unit symbol at the end
    _multipliers = {'s': 1, 'm': 60, 'h': 60 * 60, 'd': 24 * 60 * 60}
    _suffix = _value[-1]
    if _suffix not in _multipliers:
        return -1
    return int(_value[:-1]) * _multipliers[_suffix]
84
85
def wait_until(end_datetime):
    """Block until ``end_datetime`` (a timezone-aware UTC datetime) is reached.

    Sleeps half of the remaining time on each pass and stops once the
    remaining time is 0.1 s or less. Returns at once if the time has
    already passed.
    """
    remaining = (end_datetime - datetime.now(timezone.utc)).total_seconds()
    # A negative value means end_datetime was in the past to begin with
    while remaining >= 0:
        sleep(remaining / 2)
        if remaining <= 0.1:
            break
        remaining = (
            end_datetime - datetime.now(timezone.utc)
        ).total_seconds()
95
96
class ShellThread(object):
    """Run a command as a child process and stream its combined output.

    The child's stdout and stderr are merged and pushed line by line into
    ``queue`` by a background thread running ``output_reader``.
    """

    def __init__(self, cmd, queue):
        # cmd: list of argv tokens, e.g. ["fio", "--name=agent_run", ...]
        self.cmd = cmd
        # queue: receives the output lines of the child process
        self.queue = queue
        self._p = None
        self._t = None
        # seconds wait_started() waits for the first output line
        self.timeout = 15
        # lines drained from the queue by get_output()
        self.output = []

    def run_shell(self):
        """Start the command and its output-reader thread.

        If no output arrives within ``self.timeout`` seconds, the process is
        interrupted via kill_shell().
        """
        _argv = [str(_arg) for _arg in self.cmd]
        logger.debug("... {}".format(" ".join(_argv)))
        # Inherit the caller's environment (PATH, LD_LIBRARY_PATH, ...) and
        # only add PYTHONUNBUFFERED; passing a bare dict would wipe it out
        _env = dict(os.environ)
        _env["PYTHONUNBUFFERED"] = "1"
        # argv list without a shell: option values are never re-parsed by
        # /bin/sh (no injection, no word splitting), and signals sent by
        # kill_shell() reach the command itself, not an intermediate shell
        self._p = subprocess.Popen(
            _argv,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            env=_env,
            universal_newlines=True,
            bufsize=1
        )
        self._t = threading.Thread(
            target=output_reader,
            args=(self._p.stdout, self.queue)
        )
        self._t.start()
        if not self.wait_started():
            self.kill_shell()

    def is_alive(self):
        """Return True while the child process is started and has not exited.

        Popen.poll() returns None while the child runs and its exit code
        afterwards, so a clean exit (code 0) must not count as alive.
        """
        return self._p is not None and self._p.poll() is None

    def wait_started(self):
        """Wait up to ``self.timeout`` seconds for the first output line.

        :return: True once the queue has data, False on timeout
        """
        # local countdown so self.timeout keeps its configured value
        _left = self.timeout
        while self.queue.empty():
            if _left <= 0:
                logger.debug(
                    "...timed out after {} sec".format(str(self.timeout))
                )
                return False
            logger.debug("... {} sec".format(_left))
            sleep(1)
            _left -= 1
        logger.debug("... got first fio output")
        return True

    def kill_shell(self):
        """Send SIGINT to the child if it still runs, then drain its output."""
        if self.is_alive():
            self._p.send_signal(signal.SIGINT)
        self.get_output()

    def get_output(self):
        """Move all queued lines into ``self.output`` and return that list.

        Byte lines are decoded as UTF-8; undecodable bytes are replaced.
        """
        while True:
            try:
                line = self.queue.get(block=False)
            except queue.Empty:
                return self.output
            if isinstance(line, bytes):
                line = line.decode("utf-8", errors="replace")
            self.output.append(line)
163
164
class FioProcess(Thread):
    """Thread that runs a single fio job and collects its JSON reports.

    The fio command line is built from fixed arguments and three option
    dicts: common, sequential-only and mixed-only. While fio runs, each
    JSON report is parsed into ``self.timeline``, keyed by the report's
    ``timestamp``, and ``eta_sec``/``elapsed_sec`` are updated. When fio
    ends, the last report and the timeline are stored in ``self.results``.
    """
    # init vars for status
    _fio_options_list = [
        "--time_based",
        "--output-format=json+",
        "--eta=always"
    ]
    _fio_options_seq_list = [
        "--thread"
    ]

    # NOTE(review): these bind the module-level dicts themselves, not copies.
    # __init__() and update_options() therefore modify the module defaults,
    # and every FioProcess instance sees the same option values.
    _fio_options_common = fio_options_common
    _fio_options_seq = fio_options_seq
    _fio_options_mix = fio_options_mix

    eta_sec = 0
    total_time_sec = 0
    elapsed_sec = 0
    testrun = {}

    mount_point = "/cephvol"
    filename = "testfile"

    # test modes: 'randread', 'randwrite', 'read', 'write', 'randrw'
    mode = "randrw"
    _seq_modes = ['read', 'write']
    _mix_modes = ['randrw']
    _rand_modes = ['randread', 'randwrite']

    # results
    # NOTE(review): class-level dict, shared by all instances; results
    # written by one FioProcess stay visible through later instances.
    results = {}

    def _shell(self, cmd):
        """Run ``cmd`` via piped_shell and keep its output in _shell_output.

        :return: True on exit code 0, False (after logging) otherwise
        """
        self._code, self._shell_output = piped_shell(cmd, code=True)
        if self._code:
            logger.error(
                "# Shell error for '{}': [{}] {}".format(
                    cmd,
                    self._code,
                    self._shell_output
                )
            )
            return False
        else:
            return True

    def recalc_times(self):
        """Recompute total_time_sec and eta_sec from runtime + ramp_time.

        :raises CheckerException: if runtime is not a positive duration or
            ramp_time cannot be parsed
        """
        _runtime = self._fio_options_common["runtime"]
        _ramp = self._fio_options_common["ramp_time"]
        _rt = _get_seconds(_runtime)
        _rup = _get_seconds(_ramp)
        # _get_seconds() returns -1 for an unknown unit; a time based job
        # with no runtime is meaningless, while a zero ramp time is valid
        if _rt <= 0:
            raise CheckerException(
                "invalid 'runtime': '{}'".format(_runtime)
            )
        elif _rup < 0:
            raise CheckerException(
                "invalid 'ramp_time': '{}'".format(_ramp)
            )

        self.total_time_sec = _rt + _rup
        self.eta_sec = self.total_time_sec

    def __init__(self):
        Thread.__init__(self)
        logger.info("fio thread initialized")
        # save system
        self.system = system()
        self.release = release()
        self.hostname = node()
        # create a clear var for last shell output
        self._shell_output = ""
        # no fio process until run() creates one; end_fio() checks this
        self.fiorun = None
        # prepare params
        self.recalc_times()
        # prepare the fio
        self.fio_version = "unknown"
        if not self._shell("fio --version"):
            raise CheckerException(
                "Error running fio: '{}'".format(self._shell_output)
            )
        else:
            self.fio_version = self._shell_output
        # all outputs timeline
        self.timeline = {}
        # setup target file
        if not os.path.exists(self.mount_point):
            logger.warning(
                "WARNING: '{}' not exists, using tmp folder".format(
                    self.mount_point
                )
            )
            self.mount_point = "/tmp"
        self._fio_options_common["filename"] = os.path.join(
            self.mount_point,
            self.filename
        )

        if self.system == "Darwin":
            self._fio_options_common["ioengine"] = "posixaio"
        # Thread finish marker
        self.finished = False
        self.scheduled_datetime = None

    def update_options(self, _dict):
        """Set known fio options from ``_dict`` and recompute run times.

        Keys are checked against the mix, seq and common option dicts in
        that order; values themselves are not validated.

        :raises CheckerException: on an unknown option key
        """
        for k, v in _dict.items():
            if k in self._fio_options_mix:
                self._fio_options_mix[k] = v
            elif k in self._fio_options_seq:
                self._fio_options_seq[k] = v
            elif k in self._fio_options_common:
                self._fio_options_common[k] = v
            else:
                raise CheckerException(
                    "Unknown option: '{}': '{}'".format(k, v)
                )
        # recalc
        self.recalc_times()

    def _build_cmd(self):
        """Assemble the fio argv list from fixed and configured options."""
        _cmd = ["fio"]
        _cmd += self._fio_options_list
        _cmd += [_o(k, v) for k, v in self._fio_options_common.items()]
        _mode = self._fio_options_common["readwrite"]
        if _mode in self._seq_modes:
            _cmd += self._fio_options_seq_list
            _cmd += [_o(k, v) for k, v in self._fio_options_seq.items()]
        elif _mode in self._mix_modes:
            _cmd += [_o(k, v) for k, v in self._fio_options_mix.items()]
        return _cmd

    def run(self):
        """Thread body: wait for the schedule (if any), then run fio.

        ``self.finished`` is set on every exit path, including when an
        exception escapes, so the thread always counts as done afterwards.
        """
        try:
            self._run_fio()
        finally:
            self.finished = True

    def _run_fio(self):
        """Start fio, parse its JSON reports and store the final result."""
        def _cut(_list, _s, _e):
            # split off _list[_s:_e]; return (that slice, the remainder)
            return _list[_s:_e], _list[:_s] + _list[_e:]

        # create a cmd
        _cmd = self._build_cmd()
        _q = queue.Queue()
        # Check if schedule is set
        if self.scheduled_datetime:
            _now = datetime.now(timezone.utc)
            logger.debug(
                "waiting for '{}', now is '{}', total of {} sec left".format(
                    self.scheduled_datetime.strftime(_datetime_fmt),
                    _now.strftime(_datetime_fmt),
                    (self.scheduled_datetime - _now).total_seconds()
                )
            )
            wait_until(self.scheduled_datetime)
        # created only now, so end_fio() during the wait has nothing to kill
        self.fiorun = ShellThread(_cmd, _q)
        self.fiorun.run_shell()
        _raw = []
        _start = -1
        _end = -1
        while self.fiorun.is_alive() or not _q.empty():
            while not _q.empty():
                # processing
                _bb = _q.get(block=False)
                if isinstance(_bb, bytes):
                    _line = _bb.decode('utf-8')
                else:
                    _line = _bb
                # Between reports fio is expected to start a new JSON object;
                # any other output (e.g. an error message) aborts the run
                if _start < 0 and _end < 0 and not _line.startswith("{"):
                    self.results[get_time()] = {
                        "error": _line
                    }
                    # marks the run as aborted for status()/percent_done()
                    self.eta_sec = -1
                    self.fiorun.kill_shell()
                    return
                _raw += _line.splitlines()
                for ll, _l in enumerate(_raw):
                    if _start < 0 and _l == "{":
                        _start = ll
                    elif _end < 0 and _l == "}":
                        _end = ll
                # loop until we have full json
                if _end < 0 or _start < 0:
                    continue
                # if start and end found, cut json
                (_json, _raw) = _cut(_raw, _start, _end + 1)
                _start = -1
                _end = -1
                # Try to parse json
                try:
                    _json = json.loads("\n".join(_json))
                    _timestamp = _json["timestamp"]
                    self.timeline[_timestamp] = _json["jobs"][0]

                    # save last values
                    self.eta_sec = self.timeline[_timestamp]["eta"]
                    self.elapsed_sec = self.timeline[_timestamp]["elapsed"]
                    self.testrun = _json
                except (TypeError, KeyError, IndexError) as e:
                    # valid JSON, but not shaped like an fio report
                    logger.error("ERROR: {}".format(e))
                except json.decoder.JSONDecodeError as e:
                    logger.error("ERROR: {}".format(e))
                if not self.eta_sec:
                    break
            sleep(0.1)
        # Save status to results dictionary
        if self.testrun:
            self.results[get_time(timestamp=self.testrun["timestamp"])] = {
                "result": self.testrun,
                "timeline": self.timeline
            }
        else:
            # fio exited without a single usable report
            self.results[get_time()] = {
                "error": "fio ended without parsable output: '{}'".format(
                    "\n".join(_raw)
                )
            }

    def healthcheck(self):
        """Report fio version, binary path, io engines and host details.

        ``ready`` is True only when version, path and engine list are all
        non-empty.
        """
        _version = self.fio_version
        _binary_path = self._shell_output if self._shell("which fio") else ""
        if self._shell("fio --enghelp"):
            _ioengines = self._shell_output
            _ioengines = _ioengines.replace("\t", "")
            # the first line of the output is a header, not an engine
            _ioengines = _ioengines.splitlines()[1:]
            self._shell_output = ""
        else:
            _ioengines = []

        return {
            "ready": all((_version, _binary_path, _ioengines)),
            "version": _version,
            "path": _binary_path,
            "ioengines": _ioengines,
            "system": self.system,
            "release": self.release,
            "hostname": self.hostname
        }

    def status(self):
        """Return the run state and formatted progress percentage.

        The state is "finished", "scheduled" (start time still ahead),
        "running", or "idle", checked in that order of precedence.
        """
        _running = self.is_alive() and self.eta_sec >= 0
        _scheduled = False
        _diff = -1
        if self.scheduled_datetime:
            _now = datetime.now(timezone.utc)
            _diff = (self.scheduled_datetime - _now).total_seconds()
            if _diff > 0:
                _scheduled = True
        _s = "running" if _running else "idle"
        _s = "scheduled" if _scheduled else _s
        _s = "finished" if self.finished else _s
        return {
            "status": _s,
            "progress": self.get_progress()
        }

    def end_fio(self):
        """Interrupt the fio process if run() has started one."""
        if self.fiorun:
            self.fiorun.kill_shell()

    # Current run
    def percent_done(self):
        """Return elapsed time as a percentage of elapsed + remaining time.

        Returns 0.0 for an aborted run (eta_sec == -1) or when no time has
        been accounted yet, instead of dividing by zero.
        """
        _total = self.elapsed_sec + self.eta_sec
        if self.eta_sec < 0 or _total <= 0:
            return 0.0
        return float(self.elapsed_sec) / float(_total) * 100.0

    def get_progress(self):
        """Return percent_done() formatted with two decimals."""
        return "{:.2f}".format(self.percent_done())

    # latest parsed measurements
    def get_last_measurements(self):
        """Return the job data of the newest timeline entry, or {}."""
        if self.timeline:
            return self.timeline[max(self.timeline)]
        else:
            return {}
426
427
class FioProcessShellRun(object):
    """Front end that owns a FioProcess thread and keeps its results.

    ``actions`` maps action names to bound methods so requests can be
    dispatched by name. Results of finished runs are collected into
    ``self.results``, keyed by formatted time.
    """

    def __init__(self, init_class=FioProcess):
        # per-instance state; class-level dicts would be shared by every
        # FioProcessShellRun object
        self.stats = {}
        self.results = {}
        self.init_class = init_class
        self.actions = {
            "do_singlerun": self.do_singlerun,
            "do_scheduledrun": self.do_scheduledrun,
            "get_options": self.get_options,
            "get_result": self.get_result,
            "get_resultlist": self.get_resultlist
        }
        self.fio_reset()

    @staticmethod
    def healthcheck(fio):
        """Return fio's healthcheck dict plus a short printable summary."""
        hchk = fio.healthcheck()
        hchk_str = \
            "# fio status: {}\n# {} at {}\n# Engines: {}".format(
                "ready" if hchk["ready"] else "fail",
                hchk["version"],
                hchk["path"],
                ", ".join(hchk["ioengines"])
            )
        return hchk, hchk_str

    def status(self):
        """Return the status dict of the current fio thread."""
        return self.fio.status()

    def fio_reset(self):
        """Create a fresh fio thread unless the current one can still start.

        A thread that was never started, or is still waiting or running, is
        kept as is. For a finished thread, its results are merged into
        ``self.results`` and its options are copied to the new instance.
        """
        _old = getattr(self, "fio", None)
        _opts = None
        if _old is not None:
            # ident stays None until Thread.start(); a started thread that
            # is no longer alive is done even if it never set its own flag
            _done = _old.finished or (
                _old.ident is not None and not _old.is_alive()
            )
            if not _done:
                # No need to reset, fio is either idle or running
                return
            # extract results if they present
            if _old.results:
                self.results.update(_old.results)
            _opts = self.get_options()
        # re-init
        _fio = self.init_class()
        # Do healthcheck
        self.hchk, self.hchk_str = self.healthcheck(_fio)
        # restore options if they existed
        if _opts:
            _fio.update_options(_opts)
        self.fio = _fio

    def get_options(self):
        """Return a merged deep copy of the current fio thread's options."""
        _opts = deepcopy(self.fio._fio_options_common)
        _opts.update(deepcopy(self.fio._fio_options_seq))
        _opts.update(deepcopy(self.fio._fio_options_mix))
        return _opts

    def _ensure_idle(self):
        # Thread.start() can only be called once per thread; refuse to start
        # a new run while the current thread is still waiting or running
        if self.fio.is_alive():
            raise CheckerException("fio is already scheduled or running")

    def do_singlerun(self, options):
        """Start an immediate fio run with ``options``.

        A ``scheduled_to`` key is ignored with a warning. The caller's dict
        is not modified.

        :raises CheckerException: if a run is already active or an option
            is unknown
        """
        # Reset thread if it closed
        self.fio_reset()
        self._ensure_idle()
        # work on a copy so the caller's dict is left untouched
        options = dict(options)
        # Fill options
        if "scheduled_to" in options:
            # just ignore it
            _k = "scheduled_to"
            _v = options.pop(_k)
            logger.warning("Ignoring option: '{}': '{}'".format(_k, _v))
        self.fio.update_options(options)
        # Start it
        self.fio.start()
        return True

    def do_scheduledrun(self, options):
        """Start a fio run that waits until ``options["scheduled_to"]``.

        ``scheduled_to`` must be formatted with ``_datetime_fmt``, including
        the UTC offset. The caller's dict is not modified.

        :raises CheckerException: if ``scheduled_to`` is missing, a run is
            already active or an option is unknown
        """
        # Reset thread if it closed
        self.fio_reset()
        # Handle scheduled time
        if "scheduled_to" not in options:
            # required parameter not set
            raise CheckerException("Parameter missing: 'scheduled_to'")
        self._ensure_idle()
        # work on a copy so the caller's dict is left untouched
        options = dict(options)
        # set time and get rid of it from options
        _time = options.pop("scheduled_to")
        self.fio.scheduled_datetime = datetime.strptime(
            _time,
            _datetime_fmt
        )
        # Fill options
        self.fio.update_options(options)
        # Start it
        self.fio.start()
        return True

    def _get_result_object(self, obj_name, time):
        """Return ``results[time][obj_name]`` or an ``{"error": ...}`` dict."""
        if time in self.results:
            if obj_name in self.results[time]:
                return self.results[time][obj_name]
            elif "error" in self.results[time]:
                # same {"error": ...} shape as the other failure paths
                return {"error": self.results[time]["error"]}
            else:
                return {
                    "error": "Empty {} for '{}'".format(obj_name, time)
                }
        else:
            return {
                "error": "Result not found for '{}'".format(time)
            }

    def _update_results(self):
        """Move new results from a finished fio thread into self.results."""
        # Update only in case of completed thread
        if self.fio.finished:
            for _r in list(self.fio.results):
                if _r not in self.results:
                    self.results[_r] = self.fio.results.pop(_r)

    def get_result(self, time):
        """Return the final fio report stored for ``time``."""
        self._update_results()
        return self._get_result_object('result', time)

    def get_result_timeline(self, time):
        """Return the per-report timeline stored for ``time``."""
        self._update_results()
        return self._get_result_object('timeline', time)

    # reporting
    def get_resultlist(self):
        """Return the time keys of all collected results."""
        self._update_results()
        return list(self.results.keys())

    def __call__(self):
        """Start the thread if needed and print progress until it ends."""
        if not self.fio.is_alive() and not self.fio.finished:
            self.fio.start()

        while self.fio.is_alive() and self.fio.eta_sec >= 0:
            sleep(0.2)
            self.stats = self.fio.get_last_measurements()

            _r = self.stats.get('read', {})
            _w = self.stats.get('write', {})

            _r_bw = _r.get('bw_bytes', -1)
            _r_iops = _r.get('iops', -1)
            _w_bw = _w.get('bw_bytes', -1)
            _w_iops = _w.get('iops', -1)
            _s = self.fio.status()
            if _s["status"] == "scheduled":
                _t = self.fio.scheduled_datetime
                _n = datetime.now(timezone.utc)
                _delta = (_t - _n).total_seconds()
                print(
                    "{}: waiting for '{}'; now '{}'; {} sec left".format(
                        _s["status"],
                        _t.strftime(_datetime_fmt),
                        _n.strftime(_datetime_fmt),
                        _delta
                    )
                )
            else:
                stats = "{}: {:>7}% ({}/{}) " \
                        "(BW/IOPS: " \
                        "Read {:>9.2f} MB/{:>9.2f} " \
                        "Write {:>9.2f} MB/{:>9.2f})".format(
                            _s["status"],
                            _s["progress"],
                            self.fio.elapsed_sec,
                            self.fio.eta_sec,
                            _r_bw / 1024 / 1024,
                            _r_iops,
                            _w_bw / 1024 / 1024,
                            _w_iops
                        )
                print(stats)
        self.fio.end_fio()
606
607
if __name__ == '__main__':
    # Debug shell to test FioProcessShellRun: two short sequential-read
    # runs, each scheduled a few seconds ahead of "now" so that the
    # scheduling path is actually exercised (a fixed date soon lies in the
    # past and makes wait_until() return immediately).
    _shell = FioProcessShellRun()
    for _runtime, _delay_sec in (("5s", 10), ("10s", 10)):
        # Re-create the fio thread if the previous run has finished
        _shell.fio_reset()
        _opts = _shell.get_options()
        _opts["readwrite"] = "read"
        _opts["ramp_time"] = "1s"
        _opts["runtime"] = _runtime
        _start_at = datetime.now(timezone.utc) + timedelta(seconds=_delay_sec)
        _opts["scheduled_to"] = _start_at.strftime(_datetime_fmt)
        _shell.do_scheduledrun(_opts)
        _shell()
        _times = _shell.get_resultlist()
        print("# results:\n{}".format("\n".join(_times)))