blob: 50afeca531038f63207cbaba767fe1711724253c [file] [log] [blame]
Alexb78191f2021-11-02 16:35:46 -05001import json
2import os
3import queue
4import signal
5import subprocess
6
7
8from copy import deepcopy
9from datetime import datetime, timedelta
10from platform import system, release, node
11from threading import Thread
12import threading
13from time import sleep
14
15
16from cfg_checker.common.exception import CheckerException
17from cfg_checker.common.other import piped_shell
18from cfg_checker.common.log import logger
19
20
# strftime/strptime pattern: used to build result keys in get_time() and to
# parse the 'scheduled_to' value in FioProcessShellRun.do_scheduledrun()
_datetime_fmt = "%m/%d/%Y, %H:%M:%S"
# Options that FioProcess.run() renders as '--key=value' for every fio run.
# 'runtime' and 'ramp_time' are also parsed by _get_seconds(), so they need
# a trailing 's', 'm' or 'h' unit.
fio_options_common = {
    "name": "agent_run",
    "filename": "/cephvol/testfile",
    "status-interval": "500ms",
    "randrepeat": 0,
    "verify": 0,
    "direct": 1,
    "gtod_reduce": 0,
    "bs": "32k",
    "iodepth": 16,
    "size": "10G",
    "readwrite": "randrw",
    "ramp_time": "5s",
    "runtime": "30s",
    "ioengine": "libaio"
}

# Extra options appended only when 'readwrite' is one of the sequential
# modes listed in FioProcess._seq_modes
fio_options_seq = {
    "numjobs": 1,
    "offset_increment": "500M"
}
# Extra options appended only when 'readwrite' is one of the mixed modes
# listed in FioProcess._mix_modes
fio_options_mix = {
    "rwmixread": 50
}
46
47
def get_fio_options():
    """Return a single dict with every known fio option and its value.

    The common, sequential and mixed option sets are deep-copied and merged
    in that order, so the caller may modify the result freely without
    touching the module-level dictionaries.
    """
    merged = {}
    for source in (fio_options_common, fio_options_seq, fio_options_mix):
        merged.update(deepcopy(source))
    return merged
Alexb78191f2021-11-02 16:35:46 -050054
55
def output_reader(_stdout, outq):
    """Copy lines from a stream into a queue until the stream is exhausted.

    :param _stdout: object with a ``readline()`` method
    :param outq: queue-like object; every line read is passed to ``put()``

    Reading stops at the first ``readline()`` result equal to ``''``.
    """
    while True:
        chunk = _stdout.readline()
        if chunk == '':
            break
        outq.put(chunk)
59
60
61def _o(option, param, suffix=""):
62 return "--{}={}{}".format(option, param, suffix)
63
64
def get_time(timestamp=None):
    """Format a point in time using the module-wide ``_datetime_fmt``.

    :param timestamp: value accepted by ``datetime.fromtimestamp``; any
        falsy value (``None``, ``0``) means "current local time"
    :return: formatted date/time string
    """
    moment = datetime.fromtimestamp(timestamp) if timestamp \
        else datetime.now()
    return moment.strftime(_datetime_fmt)
71
72
73def _get_seconds(value):
74 # assume that we have symbol at the end
75 _suffix = value[-1]
76 if _suffix == 's':
77 return int(value[:-1])
78 elif _suffix == 'm':
79 return int(value[:-1])*60
80 elif _suffix == 'h':
81 return int(value[:-1])*60*60
82 else:
83 return -1
84
85
def wait_until(end_datetime):
    """Block the calling thread until *end_datetime* is (almost) reached.

    Each iteration sleeps for half of the remaining time, so the sleeps get
    shorter the closer the target is.  The function returns right away when
    the target is already in the past, and stops looping once the remaining
    time measured before the last sleep was 0.1 sec or less.
    """
    remaining = (end_datetime - datetime.now()).total_seconds()
    while remaining >= 0:
        sleep(remaining / 2)
        if remaining <= 0.1:
            return
        remaining = (end_datetime - datetime.now()).total_seconds()
95
96
class ShellThread(object):
    """Run a shell command and stream its output into a queue.

    The command is started with ``subprocess.Popen``; a helper thread
    running ``output_reader`` pushes every output line (stderr is merged
    into stdout) into the queue given to the constructor.

    :param cmd: list of strings, joined with spaces to form the command
    :param queue: queue object that receives the output lines
    """

    def __init__(self, cmd, queue):
        self.cmd = cmd
        self.queue = queue
        # process and reader thread handles; both stay None until run_shell
        self._p = None
        self._t = None
        # seconds to wait for the first line of output
        self.timeout = 15
        # lines collected by get_output()
        self.output = []

    def run_shell(self):
        """Start the command and wait for its first line of output.

        If nothing is received within ``self.timeout`` seconds the process
        is interrupted via ``kill_shell``.
        """
        # Start
        _cmd = " ".join(self.cmd)
        logger.debug("... {}".format(_cmd))
        # NOTE(review): 'env' replaces the whole environment of the child,
        # so the shell starts without the parent's PATH - confirm that this
        # is intended.
        self._p = subprocess.Popen(
            _cmd,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            env={"PYTHONUNBUFFERED": "1"},
            universal_newlines=True,
            bufsize=1
        )
        self._t = threading.Thread(
            target=output_reader,
            args=(self._p.stdout, self.queue)
        )
        self._t.start()
        if not self.wait_started():
            self.kill_shell()

    def _process_running(self):
        # poll() is None only while the process runs; exit code 0 is falsy,
        # so a plain truth test would report a cleanly finished process as
        # still running
        return self._p is not None and self._p.poll() is None

    def is_alive(self):
        """Return True while the process runs or its output is being read.

        The reader thread is taken into account so that callers polling the
        queue do not stop before the last lines have been queued.
        """
        if self._process_running():
            return True
        return self._t is not None and self._t.is_alive()

    def wait_started(self):
        """Wait until the queue holds at least one line.

        :return: True when output arrived, False after ``self.timeout``
            seconds without any
        """
        # count down on a local copy: self.timeout keeps its value for the
        # log message and for any later call
        _left = self.timeout
        while self.queue.empty():
            logger.debug("... {} sec".format(_left))
            sleep(1)
            _left -= 1
            if _left <= 0:
                logger.debug(
                    "...timed out after {} sec".format(str(self.timeout))
                )
                return False
        logger.debug("... got first fio output")
        return True

    def kill_shell(self):
        """Send SIGINT to the process if it still runs, then drain output."""
        if self._process_running():
            self._p.send_signal(signal.SIGINT)
        self.get_output()

    def get_output(self):
        """Move everything currently queued into ``self.output``.

        :return: the accumulated list of output lines
        """
        while True:
            try:
                line = self.queue.get(block=False)
            except queue.Empty:
                return self.output
            if isinstance(line, bytes):
                # str(bytes) would produce the "b'...'" repr, not the text
                line = line.decode('utf-8', errors='replace')
            self.output.append(line)
163
164
class FioProcess(Thread):
    """Worker thread that runs one fio job and parses its JSON reports.

    The fio command line is built from the option dictionaries below and
    started through ``ShellThread``.  Output lines are read from a queue;
    each complete JSON document is stored in ``self.timeline`` under its
    ``timestamp`` and the latest one is kept in ``self.testrun``.  When
    the run ends an entry is added to ``results`` and ``finished`` is set.
    """
    # flags that are always passed to fio
    _fio_options_list = [
        "--time_based",
        "--output-format=json+",
        "--eta=always"
    ]
    # flags added only for sequential modes
    _fio_options_seq_list = [
        "--thread"
    ]

    # NOTE: these are references to the module-level dictionaries, not
    # copies, so update_options() changes them for every instance and for
    # get_fio_options() as well
    _fio_options_common = fio_options_common
    _fio_options_seq = fio_options_seq
    _fio_options_mix = fio_options_mix

    eta_sec = 0
    total_time_sec = 0
    elapsed_sec = 0
    testrun = {}

    mount_point = "/cephvol"
    filename = "testfile"

    # test modes: 'randread', 'randwrite', 'read', 'write', 'randrw'
    mode = "randrw"
    _seq_modes = ['read', 'write']
    _mix_modes = ['randrw']
    _rand_modes = ['randread', 'randwrite']

    # results; class-level dict, i.e. shared by all instances
    results = {}

    # ShellThread of the current run; stays None until run() creates it,
    # which keeps end_fio() safe to call at any time
    fiorun = None

    def _shell(self, cmd):
        """Run *cmd* via ``piped_shell`` and remember code and output.

        :return: True when the exit code is falsy, False otherwise (the
            failure is logged)
        """
        self._code, self._shell_output = piped_shell(cmd, code=True)
        if self._code:
            logger.error(
                "# Shell error for '{}': [{}] {}".format(
                    cmd,
                    self._code,
                    self._shell_output
                )
            )
            return False
        return True

    def recalc_times(self):
        """Recompute ``total_time_sec`` and ``eta_sec`` from the options.

        :raises CheckerException: when 'runtime' is not a positive duration
            or 'ramp_time' cannot be parsed
        """
        _rt_raw = self._fio_options_common["runtime"]
        _rup_raw = self._fio_options_common["ramp_time"]
        _rt = _get_seconds(_rt_raw)
        _rup = _get_seconds(_rup_raw)
        # _get_seconds() reports an unknown unit as -1, which is truthy, so
        # the values are compared explicitly; a zero ramp time is accepted
        if _rt <= 0:
            raise CheckerException(
                "invalid 'runtime': '{}'".format(_rt_raw)
            )
        elif _rup < 0:
            raise CheckerException(
                "invalid 'ramp_time': '{}'".format(_rup_raw)
            )

        self.total_time_sec = _rt + _rup
        self.eta_sec = self.total_time_sec

    def __init__(self):
        """Collect host info, check that fio runs and set the target file.

        :raises CheckerException: when ``fio --version`` fails or the
            configured times are invalid
        """
        Thread.__init__(self)
        logger.info("fio thread initialized")
        # save system
        self.system = system()
        self.release = release()
        self.hostname = node()
        # create a clear var for last shell output
        self._shell_output = ""
        # prepare params
        self.recalc_times()
        # prepare the fio
        self.fio_version = "unknown"
        if not self._shell("fio --version"):
            raise CheckerException(
                "Error running fio: '{}'".format(self._shell_output)
            )
        else:
            self.fio_version = self._shell_output
        # all outputs timeline
        self.timeline = {}
        # setup target file
        if not os.path.exists(self.mount_point):
            logger.warning(
                "WARNING: '{}' not exists, using tmp folder".format(
                    self.mount_point
                )
            )
            self.mount_point = "/tmp"
        self._fio_options_common["filename"] = os.path.join(
            self.mount_point,
            self.filename
        )

        if self.system == "Darwin":
            self._fio_options_common["ioengine"] = "posixaio"
        # Thread finish marker
        self.finished = False
        self.scheduled_datetime = None

    def update_options(self, _dict):
        """Store new option values and recalculate the run times.

        Only the keys are validated, not the values.  All keys are checked
        before anything is stored, so an unknown key leaves the options
        untouched.

        :raises CheckerException: on a key that is not a known option
        """
        _targets = (
            self._fio_options_mix,
            self._fio_options_seq,
            self._fio_options_common
        )
        for k, v in _dict.items():
            if not any(k in _t for _t in _targets):
                raise CheckerException(
                    "Unknown option: '{}': '{}'".format(k, v)
                )
        for k, v in _dict.items():
            for _t in _targets:
                if k in _t:
                    _t[k] = v
                    break
        # recalc
        self.recalc_times()

    def run(self):
        """Thread body: run fio and parse its reports until it is done."""
        def _cut(_list, _s, _e):
            # split off _list[_s:_e]; return it together with the rest
            return (_list[_s:_e], _list[:_s] + _list[_e:])

        # create a cmd
        _cmd = ["fio"]
        _cmd += self._fio_options_list
        _cmd += [_o(k, v) for k, v in self._fio_options_common.items()]

        _mode = self._fio_options_common["readwrite"]
        if _mode in self._seq_modes:
            _cmd += self._fio_options_seq_list
            _cmd += [_o(k, v) for k, v in self._fio_options_seq.items()]
        elif _mode in self._mix_modes:
            _cmd += [_o(k, v) for k, v in self._fio_options_mix.items()]

        _q = queue.Queue()
        self.fiorun = ShellThread(_cmd, _q)
        # Check if schedule is set
        if self.scheduled_datetime:
            logger.debug(
                "waiting for '{}', now is '{}', total of {} sec left".format(
                    self.scheduled_datetime.strftime(_datetime_fmt),
                    datetime.now().strftime(_datetime_fmt),
                    (self.scheduled_datetime - datetime.now()).total_seconds()
                )
            )
            wait_until(self.scheduled_datetime)
        self.fiorun.run_shell()
        _raw = []
        _start = -1
        _end = -1
        while self.fiorun.is_alive() or not _q.empty():
            while not _q.empty():
                # processing
                try:
                    _bb = _q.get(block=False)
                except queue.Empty:
                    # end_fio() in another thread may drain the queue
                    break
                if isinstance(_bb, bytes):
                    _line = _bb.decode('utf-8')
                else:
                    _line = _bb
                if _start < 0 and _end < 0 and not _line.startswith("{"):
                    # anything outside of a json document is an error
                    _time = get_time()
                    self.results[_time] = {
                        "error": _line
                    }
                    # status() and FioProcessShellRun test eta_sec >= 0
                    self.eta_sec = -1
                    self.fiorun.kill_shell()
                    self.finished = True
                    return
                _raw += _line.splitlines()
                for ll, _text in enumerate(_raw):
                    if _start < 0 and _text == "{":
                        _start = ll
                    elif _end < 0 and _text == "}":
                        _end = ll
                # loop until we have full json
                if _end < 0 or _start < 0:
                    continue
                # if start and end found, cut json
                (_json, _raw) = _cut(_raw, _start, _end+1)
                _start = -1
                _end = -1
                # Try to parse json
                _json = "\n".join(_json)
                try:
                    _json = json.loads(_json)
                    _timestamp = _json["timestamp"]
                    self.timeline[_timestamp] = _json["jobs"][0]

                    # save last values
                    self.eta_sec = self.timeline[_timestamp]["eta"]
                    self.elapsed_sec = self.timeline[_timestamp]["elapsed"]
                    self.testrun = _json
                except (TypeError, KeyError, IndexError) as e:
                    # a report with an unexpected layout must not kill
                    # the thread
                    logger.error("ERROR: {}".format(e))
                except json.decoder.JSONDecodeError as e:
                    logger.error("ERROR: {}".format(e))
            if not self.eta_sec:
                break
            sleep(0.1)
        # Save status to results dictionary
        _last = self.testrun if isinstance(self.testrun, dict) else {}
        if "timestamp" in _last:
            self.results[get_time(timestamp=_last["timestamp"])] = {
                "result": self.testrun,
                "timeline": self.timeline
            }
        else:
            # no report was parsed at all; store an error entry instead of
            # failing on the missing key and never setting 'finished'
            self.results[get_time()] = {
                "error": "fio finished without a parsable report"
            }
        self.finished = True
        return

    def healthcheck(self):
        """Collect fio version, binary path, io engines and host info.

        :return: dict with keys 'ready', 'version', 'path', 'ioengines',
            'system', 'release' and 'hostname'; 'ready' is True only when
            version, path and engine list are all non-empty
        """
        _version = self.fio_version
        _binary_path = self._shell_output if self._shell("which fio") else ""
        if self._shell("fio --enghelp"):
            _ioengines = self._shell_output
            _ioengines = _ioengines.replace("\t", "")
            # the first line of the output is skipped
            _ioengines = _ioengines.splitlines()[1:]
            self._shell_output = ""
        else:
            _ioengines = []

        return {
            "ready": all((_version, _binary_path, _ioengines)),
            "version": _version,
            "path": _binary_path,
            "ioengines": _ioengines,
            "system": self.system,
            "release": self.release,
            "hostname": self.hostname
        }

    def status(self):
        """Return ``{"status": ..., "progress": ...}`` for this thread.

        Status is 'finished' if the finish marker is set, else 'scheduled'
        if the scheduled time is still ahead, else 'running' or 'idle'.
        """
        _running = self.is_alive() and self.eta_sec >= 0
        _scheduled = False
        _diff = -1
        if self.scheduled_datetime:
            _diff = (self.scheduled_datetime - datetime.now()).total_seconds()
            if _diff > 0:
                _scheduled = True
        _s = "running" if _running else "idle"
        _s = "scheduled" if _scheduled else _s
        _s = "finished" if self.finished else _s
        return {
            "status": _s,
            "progress": self.get_progress()
        }

    def end_fio(self):
        """Interrupt the fio shell if one has been created."""
        if self.fiorun:
            self.fiorun.kill_shell()

    # Current run
    def percent_done(self):
        """Return elapsed time as a percentage of elapsed + eta.

        0.0 is returned when that sum is not positive (nothing measured
        yet, or eta_sec set to -1 after an error).
        """
        _total = self.elapsed_sec + self.eta_sec
        if _total <= 0:
            return 0.0
        return float(self.elapsed_sec) / float(_total) * 100.0

    def get_progress(self):
        """Return ``percent_done()`` formatted with two decimals."""
        return "{:.2f}".format(self.percent_done())

    # latest parsed measurements
    def get_last_measurements(self):
        """Return the timeline entry with the highest key, or ``{}``."""
        if self.timeline:
            return self.timeline[max(self.timeline)]
        return {}
424
425
class FioProcessShellRun(object):
    """Front end that owns a fio worker thread and collects its results.

    A worker of type ``init_class`` is created on construction and is
    replaced by a fresh one (see ``fio_reset``) once it has finished.
    ``self.actions`` maps action names to the bound methods handling them.
    """
    # NOTE: class-level dicts; 'results' is updated in place and is
    # therefore shared by all instances of this class
    stats = {}
    results = {}

    def __init__(self, init_class=FioProcess):
        self.init_class = init_class
        self.actions = {
            "do_singlerun": self.do_singlerun,
            "do_scheduledrun": self.do_scheduledrun,
            "get_options": self.get_options,
            "get_result": self.get_result,
            "get_resultlist": self.get_resultlist
        }
        self.fio_reset()

    @staticmethod
    def healthcheck(fio):
        """Run ``fio.healthcheck()`` and build a printable summary.

        :return: tuple of (healthcheck dict, summary string)
        """
        hchk = fio.healthcheck()
        hchk_str = \
            "# fio status: {}\n# {} at {}\n# Engines: {}".format(
                "ready" if hchk["ready"] else "fail",
                hchk["version"],
                hchk["path"],
                ", ".join(hchk["ioengines"])
            )
        return hchk, hchk_str

    def status(self):
        """Return the status dict of the current worker."""
        return self.fio.status()

    def fio_reset(self):
        """Replace a finished (or not yet created) worker with a new one.

        Nothing is done while the current worker is idle or running.
        Results and options of a finished worker are carried over.
        """
        _current = getattr(self, "fio", None)
        if _current is None:
            # first call: nothing to carry over
            _r = None
            _o = None
        elif not _current.finished:
            # No need to reset, fio is either idle or running
            return
        else:
            _r = _current.results
            # the worker class has no get_options(); calling it there
            # raised AttributeError, which used to force a reset on every
            # call and dropped both results and options
            _o = self.get_options()
        # extract results if they present
        if _r:
            self.results.update(_r)
        # re-init
        _fio = self.init_class()
        # Do healthcheck
        self.hchk, self.hchk_str = self.healthcheck(_fio)
        # restore options if they existed
        if _o:
            _fio.update_options(_o)
        self.fio = _fio

    def get_options(self):
        """Return a deep copy of all options of the current worker."""
        _opts = deepcopy(self.fio._fio_options_common)
        _opts.update(deepcopy(self.fio._fio_options_seq))
        _opts.update(deepcopy(self.fio._fio_options_mix))
        return _opts

    def _ensure_not_running(self):
        # a Thread object can be started only once; fail with a clear
        # message instead of reconfiguring a worker that is already active
        if self.fio.is_alive():
            raise CheckerException("fio is already running or scheduled")

    def do_singlerun(self, options):
        """Apply *options* and start the worker immediately.

        A 'scheduled_to' key is ignored with a warning.  The dict passed
        in is not modified.

        :return: True
        :raises CheckerException: on unknown options or when a run is
            already active
        """
        # Reset thread if it closed
        self.fio_reset()
        self._ensure_not_running()
        # work on a copy so the caller's dict stays intact
        options = dict(options)
        # Fill options
        if "scheduled_to" in options:
            # just ignore it
            _k = "scheduled_to"
            _v = options.pop(_k)
            logger.warning("Ignoring option: '{}': '{}'".format(_k, _v))
        self.fio.update_options(options)
        # Start it
        self.fio.start()
        return True

    def do_scheduledrun(self, options):
        """Apply *options* and start a worker that waits for its time.

        'scheduled_to' is required and must match ``_datetime_fmt``.  The
        dict passed in is not modified.

        :return: True
        :raises CheckerException: when 'scheduled_to' is missing, an
            option is unknown or a run is already active
        """
        # Reset thread if it closed
        self.fio_reset()
        self._ensure_not_running()
        # Handle scheduled time
        if "scheduled_to" not in options:
            # required parameter not set
            raise CheckerException("Parameter missing: 'scheduled_to'")
        # work on a copy so the caller's dict stays intact
        options = dict(options)
        # set time and get rid of it from options
        _time = options.pop("scheduled_to")
        self.fio.scheduled_datetime = datetime.strptime(
            _time,
            _datetime_fmt
        )
        # Fill options
        self.fio.update_options(options)
        # Start it
        self.fio.start()
        return True

    def _get_result_object(self, obj_name, time):
        """Return ``results[time][obj_name]`` or a description of failure.

        If the entry has no *obj_name* but has an 'error' key, the stored
        error value itself is returned; otherwise a dict with an 'error'
        message is returned.
        """
        if time in self.results:
            if obj_name in self.results[time]:
                return self.results[time][obj_name]
            elif "error" in self.results[time]:
                return self.results[time]["error"]
            else:
                return {
                    "error": "Empty {} for '{}'".format(obj_name, time)
                }
        else:
            return {
                "error": "Result not found for '{}'".format(time)
            }

    def _update_results(self):
        """Move results of a finished worker into ``self.results``."""
        # Update only in case of completed thread
        if self.fio.finished:
            _r_local = list(self.results.keys())
            _r_fio = list(self.fio.results.keys())
            for _r in _r_fio:
                if _r not in _r_local:
                    self.results[_r] = self.fio.results.pop(_r)

    def get_result(self, time):
        """Return the 'result' object stored for *time*."""
        self._update_results()
        return self._get_result_object('result', time)

    def get_result_timeline(self, time):
        """Return the 'timeline' object stored for *time*."""
        self._update_results()
        return self._get_result_object('timeline', time)

    # reporting
    def get_resultlist(self):
        """Return the list of keys (time strings) of all stored results."""
        self._update_results()
        return list(self.results.keys())

    def __call__(self):
        """Start the worker if needed and print progress until it ends."""
        if not self.fio.is_alive() and not self.fio.finished:
            self.fio.start()

        while self.fio.is_alive() and self.fio.eta_sec >= 0:
            sleep(0.2)
            self.stats = self.fio.get_last_measurements()

            _r = self.stats.get('read', {})
            _w = self.stats.get('write', {})

            # -1 marks a value that has not been reported yet
            _r_bw = _r.get('bw_bytes', -1)
            _r_iops = _r.get('iops', -1)
            _w_bw = _w.get('bw_bytes', -1)
            _w_iops = _w.get('iops', -1)
            _s = self.fio.status()
            if _s["status"] == "scheduled":
                _t = self.fio.scheduled_datetime
                _n = datetime.now()
                _delta = (_t - _n).total_seconds()
                print(
                    "waiting for '{}'; now '{}'; {} sec left".format(
                        _t.strftime(_datetime_fmt),
                        _n.strftime(_datetime_fmt),
                        _delta
                    )
                )
            else:
                stats = "{}: {:>7}% ({}/{}) " \
                    "(BW/IOPS: " \
                    "Read {:>9.2f} MB/{:>9.2f} " \
                    "Write {:>9.2f} MB/{:>9.2f})".format(
                        _s["status"],
                        _s["progress"],
                        self.fio.elapsed_sec,
                        self.fio.eta_sec,
                        _r_bw / 1024 / 1024,
                        _r_iops,
                        _w_bw / 1024 / 1024,
                        _w_iops
                    )
                print(stats)
        self.fio.end_fio()
602 self.fio.end_fio()
603
604
if __name__ == '__main__':
    # Debug shell to test FioProcessShellRun
    runner = FioProcessShellRun()

    # first pass: sequential read that starts right away
    run_opts = runner.get_options()
    run_opts.update(
        {"readwrite": "read", "ramp_time": "1s", "runtime": "5s"}
    )
    runner.do_singlerun(run_opts)
    runner()
    print("# results:\n{}".format("\n".join(runner.get_resultlist())))

    # second pass: same kind of run, scheduled 12 seconds ahead
    runner.fio_reset()
    run_opts = runner.get_options()
    run_opts.update(
        {"readwrite": "read", "ramp_time": "1s", "runtime": "10s"}
    )
    start_at = datetime.now() + timedelta(seconds=12)
    run_opts["scheduled_to"] = start_at.strftime(_datetime_fmt)
    runner.do_scheduledrun(run_opts)
    runner()
    print("# results:\n{}".format("\n".join(runner.get_resultlist())))
632 print("# results:\n{}".format("\n".join(_times)))