# blob: f1cecab1e458988b81181c4cae2fe74feff92690 [file] [log] [blame]
import json
import os
import queue
import signal
import subprocess
import threading
from copy import deepcopy
from datetime import datetime, timedelta
from platform import system, release, node
from threading import Thread
from time import sleep

from cfg_checker.common.exception import CheckerException
from cfg_checker.common.log import logger
from cfg_checker.common.other import piped_shell
19
20
# strftime pattern used by get_time() and by the "waiting for ..." messages,
# e.g. "11/02/2021, 16:35:46". Strings in this format serve as result keys.
_datetime_fmt = "%m/%d/%Y, %H:%M:%S"
22
23
def output_reader(_stdout, outq):
    """Copy every line read from ``_stdout`` into ``outq`` until EOF.

    Intended to be used as a thread target next to a child process.

    :param _stdout: readable stream providing ``readline()``, text or binary
    :param outq: queue-like object; each line is passed to ``outq.put()``
    """
    while True:
        line = _stdout.readline()
        # readline() returns '' (text) or b'' (binary) at EOF. Comparing
        # against '' only never matches b'' and would loop forever on a
        # binary stream, so test for any empty value instead.
        if not line:
            break
        outq.put(line)
27
28
29def _o(option, param, suffix=""):
30 return "--{}={}{}".format(option, param, suffix)
31
32
def get_time(timestamp=None):
    """Format a point in time using the module's ``_datetime_fmt`` pattern.

    :param timestamp: POSIX timestamp to format; any falsy value
        (``None``, ``0``) selects the current time instead
    :return: formatted date/time string
    """
    if timestamp:
        moment = datetime.fromtimestamp(timestamp)
    else:
        moment = datetime.now()
    return moment.strftime(_datetime_fmt)
39
40
41def _get_seconds(value):
42 # assume that we have symbol at the end
43 _suffix = value[-1]
44 if _suffix == 's':
45 return int(value[:-1])
46 elif _suffix == 'm':
47 return int(value[:-1])*60
48 elif _suffix == 'h':
49 return int(value[:-1])*60*60
50 else:
51 return -1
52
53
def wait_until(end_datetime):
    """Sleep until ``end_datetime`` is reached (within about 0.1 sec).

    Each pass sleeps for half of the remaining time, so the wait converges
    on the target. Returns immediately if the target is already in the past.

    :param end_datetime: ``datetime`` to wait for
    """
    remaining = (end_datetime - datetime.now()).total_seconds()
    # a negative value means the target time has already passed
    while remaining >= 0:
        sleep(remaining / 2)
        if remaining <= 0.1:
            break
        remaining = (end_datetime - datetime.now()).total_seconds()
63
64
class ShellThread(object):
    """Run a shell command and collect its output through a queue.

    Despite its name this is not a ``Thread`` subclass: it owns a
    ``subprocess.Popen`` child plus one helper thread that copies the
    child's combined stdout/stderr, line by line, into ``queue``.
    """

    def __init__(self, cmd, queue):
        """
        :param cmd: list of command line parts; they are joined with spaces
            and executed through the shell
        :param queue: queue object that receives the output lines
        """
        self.cmd = cmd
        self.queue = queue
        # Popen handle and reader thread, both created by run_shell()
        self._p = None
        self._t = None
        # seconds to wait for the first line of output
        self.timeout = 15
        # lines moved out of the queue by get_output()
        self.output = []

    def run_shell(self):
        """Start the command and wait for its first line of output.

        The child is interrupted when nothing arrives within
        ``self.timeout`` seconds.
        """
        _cmd = " ".join(self.cmd)
        logger.debug("... {}".format(_cmd))
        # NOTE(review): the command is a shell string assembled from its
        # parts without quoting; only trusted values must get in here.
        self._p = subprocess.Popen(
            _cmd,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            # the child receives exactly this environment
            env={"PYTHONUNBUFFERED": "1"},
            universal_newlines=True,
            bufsize=1
        )
        self._t = threading.Thread(
            target=output_reader,
            args=(self._p.stdout, self.queue)
        )
        self._t.start()
        if not self.wait_started():
            self.kill_shell()

    def is_alive(self):
        """Tell whether the command can still deliver output.

        :return: True while the child process is running or while the
            reader thread is still copying its last lines into the queue;
            False otherwise, also when the command was never started
        """
        if self._p is None:
            return False
        # poll() is None only while the child is running. Testing its
        # truthiness instead reports a child that exited with code 0
        # as still alive.
        if self._p.poll() is None:
            return True
        return self._t is not None and self._t.is_alive()

    def wait_started(self):
        """Block until the first output line arrives or the wait expires.

        The queue is checked once per second for up to ``self.timeout``
        seconds.

        :return: True when output showed up, False on time out
        """
        # Count down on a local copy. Decrementing self.timeout itself left
        # it at 0, so the time out message always said "0 sec" and a later
        # call could never time out again.
        _left = self.timeout
        while self.queue.empty():
            if _left <= 0:
                logger.debug(
                    "...timed out after {} sec".format(str(self.timeout))
                )
                return False
            logger.debug("... {} sec".format(_left))
            sleep(1)
            _left -= 1
        logger.debug("... got first fio output")
        return True

    def kill_shell(self):
        """Interrupt the child if it is still running and drain the queue."""
        # signal a live child only; poll() is None until it has exited
        if self._p is not None and self._p.poll() is None:
            self._p.send_signal(signal.SIGINT)
        self.get_output()

    def get_output(self):
        """Move all queued lines to ``self.output`` without blocking.

        :return: ``self.output``, i.e. every line collected so far
        """
        while True:
            try:
                line = self.queue.get(block=False)
            except queue.Empty:
                return self.output
            # str(bytes) would store the "b'...'" repr, so decode instead
            if isinstance(line, bytes):
                line = line.decode("utf-8", errors="replace")
            self.output.append(line)
131
132
class FioProcess(Thread):
    """Worker thread that runs a single fio test and records its progress.

    The fio command line is assembled from the option tables below. The
    JSON status reports printed by fio are parsed in run(): each one is
    stored in ``timeline`` and the latest one in ``testrun``.

    NOTE(review): the option tables, ``testrun`` and ``results`` are
    class-level objects. __init__() and update_options() write into the
    option dicts and run() writes into ``results`` without creating
    per-instance copies, so this state is shared by all instances. Other
    code may rely on that - confirm before changing it.
    """
    # switches that are always passed to fio
    _fio_options_list = [
        "--time_based",
        "--output-format=json+",
        "--eta=always"
    ]
    # switches added for the sequential modes only
    _fio_options_seq_list = [
        "--thread"
    ]

    # key/value options that are always passed to fio
    _fio_options_common = {
        "name": "agent_run",
        "filename": "/tmp/testfile",
        "status-interval": "500ms",
        "randrepeat": 0,
        "verify": 0,
        "direct": 1,
        "gtod_reduce": 0,
        "bs": "32k",
        "iodepth": 16,
        "size": "10G",
        "readwrite": "randrw",
        "ramp_time": "5s",
        "runtime": "30s",
        "ioengine": "libaio"
    }

    # key/value options added for the sequential modes only
    _fio_options_seq = {
        "numjobs": 1,
        "offset_increment": "500M"
    }
    # key/value options added for the mixed modes only
    _fio_options_mix = {
        "rwmixread": 50
    }

    # progress of the current run, updated from the fio status reports
    eta_sec = 0
    total_time_sec = 0
    elapsed_sec = 0
    # last parsed fio status report
    testrun = {}

    # the test file is <mount_point>/<filename>
    mount_point = "/tmp"
    filename = "testfile"

    # test modes: 'randread', 'randwrite', 'read', 'write', 'randrw'
    mode = "randrw"
    _seq_modes = ['read', 'write']
    _mix_modes = ['randrw']
    _rand_modes = ['randread', 'randwrite']

    # results, keyed by formatted time
    results = {}

    def _shell(self, cmd):
        """Run ``cmd`` via piped_shell() and keep its code and output.

        The two values returned by piped_shell() are stored in
        ``self._code`` and ``self._shell_output``.

        :param cmd: command line to execute
        :return: True when the returned code is falsy, False (after logging
            an error) otherwise
        """
        self._code, self._shell_output = piped_shell(cmd, code=True)
        if self._code:
            logger.error(
                "# Shell error for '{}': [{}] {}".format(
                    cmd,
                    self._code,
                    self._shell_output
                )
            )
            return False
        return True

    def recalc_times(self):
        """Recompute ``total_time_sec``/``eta_sec`` from the time options.

        :raises CheckerException: if 'runtime' or 'ramp_time' does not
            convert to a positive number of seconds
        """
        _runtime = self._fio_options_common["runtime"]
        _ramp_time = self._fio_options_common["ramp_time"]
        _rt = _get_seconds(_runtime)
        _rup = _get_seconds(_ramp_time)
        # _get_seconds() signals an unknown unit with -1, which is truthy:
        # compare against zero instead of testing for a falsy value
        if _rt <= 0:
            raise CheckerException(
                "invalid 'runtime': '{}'".format(_runtime)
            )
        elif _rup <= 0:
            raise CheckerException(
                "invalid 'ramp_time': '{}'".format(_ramp_time)
            )

        self.total_time_sec = _rt + _rup
        self.eta_sec = self.total_time_sec

    def __init__(self):
        """Collect system info, validate options and check for fio.

        :raises CheckerException: if the time options are invalid or
            ``fio --version`` fails
        """
        Thread.__init__(self)
        logger.info("fio thread initialized")
        # save system
        self.system = system()
        self.release = release()
        self.hostname = node()
        # create a clear var for last shell output
        self._shell_output = ""
        # shell wrapper of the running fio; created in run(). Defined here
        # so that end_fio() is safe before the thread was started
        self.fiorun = None
        # prepare params
        self.recalc_times()
        # prepare the fio
        self.fio_version = "unknown"
        if not self._shell("fio --version"):
            raise CheckerException(
                "Error running fio: '{}'".format(self._shell_output)
            )
        else:
            self.fio_version = self._shell_output
        # all outputs timeline
        self.timeline = {}

        self._fio_options_common["filename"] = os.path.join(
            self.mount_point,
            self.filename
        )

        if self.system == "Darwin":
            self._fio_options_common["ioengine"] = "posixaio"
        # Thread finish marker
        self.finished = False
        self.scheduled_datetime = None

    def update_options(self, _dict):
        """Overwrite known fio options with the values from ``_dict``.

        Only the keys are validated, not the values themselves. Nothing is
        changed when an unknown key is present.

        :param _dict: mapping of option name to new value
        :raises CheckerException: on an unknown option name or when the
            resulting time options are invalid
        """
        _tables = (
            self._fio_options_mix,
            self._fio_options_seq,
            self._fio_options_common
        )
        _unknown = [
            k for k in _dict if not any(k in _t for _t in _tables)
        ]
        if _unknown:
            raise CheckerException(
                "Unknown fio option(s): '{}'".format(
                    "', '".join(str(k) for k in _unknown)
                )
            )
        # remember the time options to be able to roll them back
        _times = {
            k: self._fio_options_common[k] for k in ("runtime", "ramp_time")
        }
        for k, v in _dict.items():
            for _t in _tables:
                if k in _t:
                    _t[k] = v
                    break
        # recalc
        try:
            self.recalc_times()
        except Exception:
            # The option tables are shared; keeping an invalid time value
            # would make every later __init__() fail in recalc_times()
            self._fio_options_common.update(_times)
            raise

    def run(self):
        """Thread body: run fio and parse its periodic JSON status reports.

        Waits for ``scheduled_datetime`` first if it is set. The final
        outcome is stored in ``results`` under a formatted time key, either
        as {"result": ..., "timeline": ...} or as {"error": ...}.
        """
        def _cut(_list, _s, _e):
            # split _list into the slice [_s:_e] and everything around it
            return (_list[_s:_e], _list[:_s] + _list[_e:])

        # create a cmd
        _cmd = ["fio"]
        _cmd += self._fio_options_list
        _cmd += [_o(k, v) for k, v in self._fio_options_common.items()]

        _mode = self._fio_options_common["readwrite"]
        if _mode in self._seq_modes:
            _cmd += self._fio_options_seq_list
            _cmd += [_o(k, v) for k, v in self._fio_options_seq.items()]
        elif _mode in self._mix_modes:
            _cmd += [_o(k, v) for k, v in self._fio_options_mix.items()]

        _q = queue.Queue()
        self.fiorun = ShellThread(_cmd, _q)
        # Check if schedule is set
        if self.scheduled_datetime:
            logger.debug(
                "waiting for '{}', now is '{}', total of {} sec left".format(
                    self.scheduled_datetime.strftime(_datetime_fmt),
                    datetime.now().strftime(_datetime_fmt),
                    (self.scheduled_datetime - datetime.now()).total_seconds()
                )
            )
            wait_until(self.scheduled_datetime)
        self.fiorun.run_shell()
        # output lines not yet consumed, plus the positions of the lines
        # that open and close the next JSON document in there
        _raw = []
        _start = -1
        _end = -1
        while self.fiorun.is_alive() or not _q.empty():
            while not _q.empty():
                # processing
                _bb = _q.get(block=False)
                if isinstance(_bb, bytes):
                    _line = _bb.decode('utf-8')
                else:
                    _line = _bb
                if _start < 0 and _end < 0 and not _line.startswith("{"):
                    # Outside of a JSON document anything but its opening
                    # brace is taken as an error message: store it, flag
                    # the run as stopped and complete the thread
                    self.results[get_time()] = {
                        "error": _line
                    }
                    self.eta_sec = -1
                    self.fiorun.kill_shell()
                    self.finished = True
                    return
                _raw += _line.splitlines()
                for ll, _text in enumerate(_raw):
                    if _start < 0 and _text == "{":
                        _start = ll
                    elif _end < 0 and _text == "}":
                        _end = ll
                # loop until we have full json
                if _end < 0 or _start < 0:
                    continue
                # if start and end found, cut json
                (_json, _raw) = _cut(_raw, _start, _end + 1)
                _start = -1
                _end = -1
                # Try to parse json
                _json = "\n".join(_json)
                try:
                    _json = json.loads(_json)
                    _timestamp = _json["timestamp"]
                    self.timeline[_timestamp] = _json["jobs"][0]

                    # save last values
                    self.eta_sec = self.timeline[_timestamp]["eta"]
                    self.elapsed_sec = self.timeline[_timestamp]["elapsed"]
                    self.testrun = _json
                except (TypeError, KeyError, IndexError) as e:
                    # a report of unexpected shape must not kill the thread
                    logger.error("ERROR: {}".format(e))
                except json.decoder.JSONDecodeError as e:
                    logger.error("ERROR: {}".format(e))
            # eta of 0 means that fio reported the end of the test
            if not self.eta_sec:
                break
            sleep(0.1)
        # Save status to results dictionary
        if self.testrun:
            self.results[get_time(timestamp=self.testrun["timestamp"])] = {
                "result": self.testrun,
                "timeline": self.timeline
            }
        else:
            # fio went away without a single parsed status report
            self.results[get_time()] = {
                "error": "fio finished without producing a status report"
            }
        self.finished = True
        return

    def healthcheck(self):
        """Collect facts about the local fio installation.

        :return: dict with keys 'ready', 'version', 'path', 'ioengines',
            'system', 'release' and 'hostname'; 'ready' is True only if
            version, path and engine list are all non-empty
        """
        _version = self.fio_version
        _binary_path = self._shell_output if self._shell("which fio") else ""
        if self._shell("fio --enghelp"):
            # drop tabs and skip the first line of the output
            _ioengines = self._shell_output.replace("\t", "")
            _ioengines = _ioengines.splitlines()[1:]
        else:
            _ioengines = []

        return {
            "ready": all((_version, _binary_path, _ioengines)),
            "version": _version,
            "path": _binary_path,
            "ioengines": _ioengines,
            "system": self.system,
            "release": self.release,
            "hostname": self.hostname
        }

    def status(self):
        """Report the state of this thread.

        :return: dict with 'status' ('finished', 'scheduled', 'running' or
            'idle', checked in that order) and 'progress' (percent string)
        """
        _running = self.is_alive() and self.eta_sec >= 0
        _scheduled = False
        if self.scheduled_datetime:
            _diff = (self.scheduled_datetime - datetime.now()).total_seconds()
            _scheduled = _diff > 0
        if self.finished:
            _s = "finished"
        elif _scheduled:
            _s = "scheduled"
        elif _running:
            _s = "running"
        else:
            _s = "idle"
        return {
            "status": _s,
            "progress": self.get_progress()
        }

    def end_fio(self):
        """Stop the fio shell, if one was created by run()."""
        if self.fiorun:
            self.fiorun.kill_shell()

    # Current run
    def percent_done(self):
        """Return the progress of the current run in percent as float.

        :return: elapsed / (elapsed + eta) * 100, or 0.0 while that sum is
            not positive (nothing measured yet, or eta flagged with -1)
        """
        _total = self.elapsed_sec + self.eta_sec
        if _total <= 0:
            # avoid a division by zero / negative percentage
            return 0.0
        return float(self.elapsed_sec) / float(_total) * 100.0

    def get_progress(self):
        """Return percent_done() formatted with two decimals."""
        return "{:.2f}".format(self.percent_done())

    # latest parsed measurements
    def get_last_measurements(self):
        """Return the timeline entry with the highest timestamp.

        :return: the stored job report, or {} if the timeline is empty
        """
        if not self.timeline:
            return {}
        return self.timeline[max(self.timeline)]
404
405
class FioProcessShellRun(object):
    """Front end that owns one fio worker thread and keeps its results.

    ``actions`` maps action names to the bound methods implementing them
    (presumably for an external dispatcher - confirm against the caller).

    NOTE(review): ``results`` is a class-level dict that is only ever
    modified in place, so it is shared by all instances of this class.
    """
    # last measurements fetched by __call__()
    stats = {}
    # collected results, keyed by formatted time
    results = {}

    def __init__(self, init_class=FioProcess):
        """
        :param init_class: class used to create the fio worker; it is
            instantiated without arguments
        """
        self.init_class = init_class
        self.actions = {
            "do_singlerun": self.do_singlerun,
            "do_scheduledrun": self.do_scheduledrun,
            "get_options": self.get_options,
            "get_result": self.get_result,
            "get_resultlist": self.get_resultlist
        }
        self.fio_reset()

    @staticmethod
    def healthcheck(fio):
        """Run the worker's healthcheck and render a short summary.

        :param fio: worker object providing ``healthcheck()``
        :return: tuple of (healthcheck dict, multi-line summary string)
        """
        hchk = fio.healthcheck()
        hchk_str = \
            "# fio status: {}\n# {} at {}\n# Engines: {}".format(
                "ready" if hchk["ready"] else "fail",
                hchk["version"],
                hchk["path"],
                ", ".join(hchk["ioengines"])
            )
        return hchk, hchk_str

    def status(self):
        """Return the status dict of the current worker."""
        return self.fio.status()

    def fio_reset(self):
        """Replace a finished or dead worker with a fresh one.

        Nothing happens while the current worker is idle (never started)
        or running. Otherwise its results are merged into ``results``, a
        new worker is created and health-checked, and the previous options
        are applied to it. Also creates the very first worker.
        """
        _old = getattr(self, "fio", None)
        _r = None
        _o = None
        if _old is not None:
            # A thread that was started and is gone can not be started
            # again, even if it died without setting 'finished'
            _dead = (
                getattr(_old, "ident", None) is not None
                and not _old.is_alive()
            )
            # a worker without the marker is treated as finished
            if not (getattr(_old, "finished", True) or _dead):
                # No need to reset, fio is either idle or running
                return
            _r = getattr(_old, "results", None)
            # Options are collected by this class: the worker has no
            # get_options() of its own, and asking it for one raised an
            # AttributeError that used to force a reset on every call
            try:
                _o = self.get_options()
            except AttributeError:
                _o = None
        # extract results if they present
        if _r:
            self.results.update(_r)
        # re-init
        _fio = self.init_class()
        # Do healthcheck
        self.hchk, self.hchk_str = self.healthcheck(_fio)
        # restore options if they existed
        if _o:
            _fio.update_options(_o)
        self.fio = _fio

    def get_options(self):
        """Return a deep copy of all worker options merged in one dict."""
        _opts = deepcopy(self.fio._fio_options_common)
        _opts.update(deepcopy(self.fio._fio_options_seq))
        _opts.update(deepcopy(self.fio._fio_options_mix))
        return _opts

    def do_singlerun(self, options):
        """Apply ``options`` and start a run right away.

        :param options: mapping of fio option name to value
        :return: True if the run was started, False if a run is still
            active (a thread can be started only once)
        """
        # Reset thread if it closed
        self.fio_reset()
        if self.fio.is_alive():
            return False
        # Fill options
        self.fio.update_options(options)
        # Start it
        self.fio.start()
        return True

    def do_scheduledrun(self, options):
        """Apply ``options`` and start a run that waits for its schedule.

        :param options: mapping of fio option name to value; it must hold
            the start time under 'scheduled_to'. The mapping itself is not
            modified.
        :return: True if the run was started, False if 'scheduled_to' is
            missing or a run is still active
        """
        # Reset thread if it closed
        self.fio_reset()
        # Handle scheduled time
        if "scheduled_to" not in options:
            # required parameter not set
            return False
        if self.fio.is_alive():
            return False
        # work on a copy to leave the caller's dict untouched
        _opts = dict(options)
        _when = _opts.pop("scheduled_to")
        # Fill options first: if they are rejected, no schedule is left
        # behind on the worker
        self.fio.update_options(_opts)
        self.fio.scheduled_datetime = _when
        # Start it
        self.fio.start()
        return True

    def _get_result_object(self, obj_name, time):
        """Look up one part of the result stored under ``time``.

        :param obj_name: key inside the result entry
        :param time: result key
        :return: the requested object; the entry's 'error' value if the
            object is missing but an error was stored; otherwise a dict
            with an 'error' message
        """
        if time not in self.results:
            return {
                "error": "Result not found for '{}'".format(time)
            }
        _entry = self.results[time]
        if obj_name in _entry:
            return _entry[obj_name]
        if "error" in _entry:
            return _entry["error"]
        return {
            "error": "Empty {} for '{}'".format(obj_name, time)
        }

    def _update_results(self):
        """Move results of a finished worker into ``results``."""
        # Update only in case of completed thread
        if not self.fio.finished:
            return
        for _r in list(self.fio.results.keys()):
            if _r not in self.results:
                self.results[_r] = self.fio.results.pop(_r)

    def get_result(self, time):
        """Return the 'result' part stored under ``time``."""
        self._update_results()
        return self._get_result_object('result', time)

    def get_result_timeline(self, time):
        """Return the 'timeline' part stored under ``time``."""
        self._update_results()
        return self._get_result_object('timeline', time)

    # reporting
    def get_resultlist(self):
        """Return the list of keys of all collected results."""
        self._update_results()
        return list(self.results.keys())

    def __call__(self):
        """Start the worker if needed and print its progress until it ends.

        The worker is polled every 0.2 sec while it is alive and its
        ``eta_sec`` is not negative; afterwards its fio shell is stopped.
        """
        # start only a worker that was never started before: calling
        # start() twice on a thread raises RuntimeError
        if (
            not self.fio.is_alive()
            and not self.fio.finished
            and getattr(self.fio, "ident", None) is None
        ):
            self.fio.start()

        while self.fio.is_alive() and self.fio.eta_sec >= 0:
            sleep(0.2)
            self.stats = self.fio.get_last_measurements()

            _r = self.stats.get('read', {})
            _w = self.stats.get('write', {})

            # -1 stands for "no value reported yet"
            _r_bw = _r.get('bw_bytes', -1)
            _r_iops = _r.get('iops', -1)
            _w_bw = _w.get('bw_bytes', -1)
            _w_iops = _w.get('iops', -1)
            _s = self.fio.status()
            if _s["status"] == "scheduled":
                _t = self.fio.scheduled_datetime
                _n = datetime.now()
                _delta = (_t - _n).total_seconds()
                print(
                    "waiting for '{}'; now '{}'; {} sec left".format(
                        _t.strftime(_datetime_fmt),
                        _n.strftime(_datetime_fmt),
                        _delta
                    )
                )
            else:
                stats = "{}: {:>7}% ({}/{}) " \
                        "(BW/IOPS: " \
                        "Read {:>9.2f} MB/{:>9.2f} " \
                        "Write {:>9.2f} MB/{:>9.2f})".format(
                            _s["status"],
                            _s["progress"],
                            self.fio.elapsed_sec,
                            self.fio.eta_sec,
                            _r_bw / 1024 / 1024,
                            _r_iops,
                            _w_bw / 1024 / 1024,
                            _w_iops
                        )
                print(stats)
        self.fio.end_fio()
574
575
if __name__ == '__main__':
    # Debug shell to test FioProcessShellRun by hand: one immediate run
    # followed by one scheduled run
    _shell = FioProcessShellRun()

    # first pass: sequential read, started right away
    _opts = _shell.get_options()
    _opts.update({
        "readwrite": "read",
        "ramp_time": "1s",
        "runtime": "5s"
    })
    _shell.do_singlerun(_opts)
    _shell()
    _times = _shell.get_resultlist()
    print("# results:\n{}".format("\n".join(_times)))
    # print(
    #     "##### Dumping results\n{}".format(
    #         json.dumps(_shell.get_result(_times[0]), indent=2)
    #     )
    # )

    # second pass: the same kind of test, scheduled 12 seconds ahead
    _shell.fio_reset()
    _opts = _shell.get_options()
    _opts.update({
        "readwrite": "read",
        "ramp_time": "1s",
        "runtime": "10s"
    })
    _opts["scheduled_to"] = datetime.now() + timedelta(seconds=12)
    _shell.do_scheduledrun(_opts)
    _shell()
    _times = _shell.get_resultlist()
    print("# results:\n{}".format("\n".join(_times)))