| # Author: Alex Savatieiev (osavatieiev@mirantis.com; a.savex@gmail.com) |
| # Copyright 2019-2022 Mirantis, Inc. |
| import json |
| import os |
| import queue |
| import signal |
| import subprocess |
| |
| |
| from copy import deepcopy |
| from datetime import datetime, timezone |
| from platform import system, release, node |
| from threading import Thread |
| import threading |
| from time import sleep |
| |
| |
| from cfg_checker.common.exception import CheckerException |
| from cfg_checker.common.other import piped_shell |
| from cfg_checker.common.log import logger |
| |
| |
# strftime/strptime pattern shared by every timestamp this module formats
# or parses; it carries the UTC offset (%z)
_datetime_fmt = "%m/%d/%Y, %H:%M:%S%z"

# Options rendered as --<key>=<value> for every fio run
fio_options_common = {
    # job identity and target
    "name": "agent_run",
    "filename": "/cephvol/testfile",
    "status-interval": "500ms",
    # data handling
    "randrepeat": 0,
    "verify": 0,
    "direct": 1,
    "gtod_reduce": 0,
    # workload shape
    "bs": "32k",
    "iodepth": 16,
    "size": "10G",
    "readwrite": "randrw",
    # timing
    "ramp_time": "5s",
    "runtime": "30s",
    "ioengine": "libaio",
}

# Extra options appended only when "readwrite" is one of seq_modes
fio_options_seq = {
    "numjobs": 1,
    "offset_increment": "500M",
}

# Extra options appended only when "readwrite" is one of mix_modes
fio_options_mix = {
    "rwmixread": 50,
}

# "readwrite" values grouped by the kind of workload they describe
seq_modes = ["read", "write"]
mix_modes = ["randrw"]
rand_modes = ["randread", "randwrite"]
| |
| |
def get_fio_options():
    """Return one merged, independent copy of all module-level fio options.

    The groups are merged in the order common, sequential, mixed, so a key
    present in a later group overrides the same key from an earlier one.
    Every group is deep-copied, so changing the returned dict does not touch
    the module-level defaults.
    """
    merged = {}
    for group in (fio_options_common, fio_options_seq, fio_options_mix):
        merged.update(deepcopy(group))
    return merged
| |
| |
def output_reader(_stdout, outq):
    """Pump lines from a stream into a queue until the stream is exhausted.

    :param _stdout: object with a ``readline()`` method (text or binary)
    :param outq: queue-like object; every line read is passed to ``put()``

    Any empty read is treated as end of stream. Comparing against ``''``
    alone never matches the ``b''`` a binary stream returns at EOF, which
    would keep queueing empty chunks forever.
    """
    while True:
        line = _stdout.readline()
        if not line:
            return
        outq.put(line)
| |
| |
def _o(option, param, suffix=""):
    """Render a single long command line option as ``--option=param``.

    :param option: option name, placed after the leading ``--``
    :param param: option value, placed after ``=``
    :param suffix: optional text appended right after the value
    :return: the formatted option string
    """
    return "--%s=%s%s" % (option, param, suffix)
| |
| |
def get_time(timestamp=None):
    """Format a point in time using the module-wide ``_datetime_fmt``.

    :param timestamp: POSIX timestamp in seconds; any falsy value
        (``None``, ``0``) means "now"
    :return: the formatted time string, always expressed in UTC

    The timestamp is converted as an aware UTC datetime. A naive conversion
    would render ``%z`` as an empty string, so the result could not be parsed
    back with ``_datetime_fmt`` and would not match the "now" branch.
    """
    if not timestamp:
        _t = datetime.now(timezone.utc)
    else:
        _t = datetime.fromtimestamp(timestamp, timezone.utc)
    return _t.strftime(_datetime_fmt)
| |
| |
def _get_seconds(value):
    """Convert a duration string with a one-letter unit into seconds.

    :param value: number followed by ``s``, ``m`` or ``h`` (e.g. ``"30s"``)
    :return: the duration in whole seconds, or -1 when the last character
        is not one of the known units
    """
    # seconds per unit, keyed by the trailing symbol
    multipliers = {'s': 1, 'm': 60, 'h': 60 * 60}
    unit = value[-1]
    if unit not in multipliers:
        return -1
    return int(value[:-1]) * multipliers[unit]
| |
| |
def wait_until(end_datetime):
    """Block until the given timezone-aware moment has (almost) arrived.

    Each round sleeps for half of the time still left, then measures again.
    The wait ends after the round that started with at most 0.1 sec left,
    or immediately when the target is already in the past.

    :param end_datetime: aware datetime to wait for (compared to UTC now)
    """
    remaining = (end_datetime - datetime.now(timezone.utc)).total_seconds()
    # a negative value means the target has already passed
    while remaining >= 0:
        sleep(remaining / 2)
        if remaining <= 0.1:
            break
        remaining = (
            end_datetime - datetime.now(timezone.utc)
        ).total_seconds()
| |
| |
class ShellThread(object):
    """Run a shell command and collect its combined stdout/stderr via a queue.

    The command runs through ``subprocess.Popen`` and a helper thread pumps
    its output, line by line, into the queue given at construction time.
    """

    def __init__(self, cmd, queue):
        """
        :param cmd: list of command parts; joined with spaces before running
        :param queue: queue that receives every output line of the command
        """
        self.cmd = cmd
        self.queue = queue
        # process handle and reader thread, both created by run_shell()
        self._p = None
        self._t = None
        # seconds to wait for the first output line
        self.timeout = 15
        self.output = []

    def run_shell(self):
        """Start the command and its reader thread, then wait for output.

        If no output arrives within ``self.timeout`` seconds, the command is
        interrupted and whatever was queued is moved into ``self.output``.
        """
        # Start
        _cmd = " ".join(self.cmd)
        logger.debug("... {}".format(_cmd))
        # NOTE(review): 'env' replaces the whole environment (PATH included)
        # and the command string goes through a shell unquoted; both are
        # kept as-is, confirm they are intended.
        self._p = subprocess.Popen(
            _cmd,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            env={"PYTHONUNBUFFERED": "1"},
            universal_newlines=True,
            bufsize=1
        )
        self._t = threading.Thread(
            target=output_reader,
            args=(self._p.stdout, self.queue)
        )
        self._t.start()
        if not self.wait_started():
            self.kill_shell()

    def is_alive(self):
        """Tell whether more output can still be expected from the command.

        :return: True while the process has not exited or the reader thread
            is still pumping its output; False otherwise, including when the
            command was never started
        """
        if self._p is None:
            return False
        # poll() is None only while the process runs; an exit code of 0 is
        # falsy too, so a plain truth test would report a clean exit as alive
        if self._p.poll() is None:
            return True
        # the process is gone, but buffered lines may still be on their way
        return self._t is not None and self._t.is_alive()

    def wait_started(self):
        """Wait up to ``self.timeout`` seconds for the first output line.

        :return: True once the queue holds something, False on timeout
        """
        # count down a local copy so the configured timeout stays intact for
        # the log message and for any later start
        _left = self.timeout
        while self.queue.empty():
            if _left <= 0:
                logger.debug(
                    "...timed out after {} sec".format(str(self.timeout))
                )
                return False
            logger.debug("... {} sec".format(_left))
            sleep(1)
            _left -= 1
        logger.debug("... got first fio output")
        return True

    def kill_shell(self):
        """Interrupt the command if it still runs and drain queued output."""
        # only a process that has not exited yet needs the signal
        if self._p is not None and self._p.poll() is None:
            self._p.send_signal(signal.SIGINT)
        self.get_output()

    def get_output(self):
        """Move everything currently queued into ``self.output``.

        :return: the accumulated list of output lines (always ``str``)
        """
        while True:
            try:
                line = self.queue.get(block=False)
            except queue.Empty:
                return self.output
            if isinstance(line, bytes):
                # str(bytes) would give the "b'...'" repr, not the text
                line = line.decode("utf-8", errors="replace")
            self.output.append(line)
| |
| |
class FioProcess(Thread):
    """Thread that runs one fio test and collects its JSON status output.

    The fio command line is built from the shared option dictionaries, the
    periodic JSON documents fio prints are parsed into ``timeline`` and the
    outcome is stored in ``results`` under the test run start time.
    """
    # init vars for status
    _fio_options_list = [
        "--time_based",
        "--output-format=json+",
        "--eta=always"
    ]
    _fio_options_seq_list = [
        "--thread"
    ]

    # These are the module-level dicts themselves, not copies: an option
    # changed through any instance is seen by all of them
    _fio_options_common = fio_options_common
    _fio_options_seq = fio_options_seq
    _fio_options_mix = fio_options_mix

    eta_sec = 0
    total_time_sec = 0
    elapsed_sec = 0
    testrun = {}

    mount_point = "/cephvol"
    filename = "testfile"

    # test modes: 'randread', 'randwrite', 'read', 'write', 'randrw'
    mode = "randrw"
    _seq_modes = seq_modes
    _mix_modes = mix_modes
    _rand_modes = rand_modes

    # results; a class-level dict, so it is shared by all instances
    results = {}

    def _shell(self, cmd):
        """Run ``cmd`` via piped_shell and remember its code and output.

        :return: True when the return code is falsy, False otherwise
        """
        self._code, self._shell_output = piped_shell(cmd, code=True)
        if self._code:
            logger.error(
                "# Shell error for '{}': [{}] {}".format(
                    cmd,
                    self._code,
                    self._shell_output
                )
            )
            return False
        return True

    def recalc_times(self):
        """Recompute ``total_time_sec`` and ``eta_sec`` from the options.

        :raises CheckerException: when 'runtime' or 'ramp_time' does not
            convert to a positive number of seconds
        """
        _runtime = self._fio_options_common["runtime"]
        _ramp_time = self._fio_options_common["ramp_time"]
        _rt = _get_seconds(_runtime)
        _rup = _get_seconds(_ramp_time)
        # _get_seconds() reports an unknown unit as -1, which is truthy, so
        # compare against zero instead of testing for falsiness
        if _rt <= 0:
            raise CheckerException(
                "invalid 'runtime': '{}'".format(_runtime)
            )
        elif _rup <= 0:
            raise CheckerException(
                "invalid 'ramp_time': '{}'".format(_ramp_time)
            )

        self.total_time_sec = _rt + _rup
        self.eta_sec = self.total_time_sec

    def __init__(self):
        """Probe the local fio binary and prepare the target file path.

        :raises CheckerException: when 'fio --version' fails or the
            configured times are invalid
        """
        Thread.__init__(self)
        logger.info("fio thread initialized")
        # save system
        self.system = system()
        self.release = release()
        self.hostname = node()
        # create a clear var for last shell output
        self._shell_output = ""
        # shell runner, created by run(); kept as None until then so that
        # end_fio() is safe to call at any time
        self.fiorun = None
        # prepare params
        self.recalc_times()
        # prepare the fio
        self.fio_version = "unknown"
        if not self._shell("fio --version"):
            raise CheckerException(
                "Error running fio: '{}'".format(self._shell_output)
            )
        else:
            self.fio_version = self._shell_output
        # all outputs timeline
        self.timeline = {}
        # setup target file
        if not os.path.exists(self.mount_point):
            logger.warning(
                "WARNING: '{}' not exists, using tmp folder".format(
                    self.mount_point
                )
            )
            self.mount_point = "/tmp"
        self._fio_options_common["filename"] = os.path.join(
            self.mount_point,
            self.filename
        )

        if self.system == "Darwin":
            self._fio_options_common["ioengine"] = "posixaio"
        # Thread finish marker
        self.finished = False
        self.testrun_starttime = None
        self.scheduled_datetime = None

    def update_options(self, _dict):
        """Apply new option values and recalculate the run times.

        Only keys are validated, not the values themselves. All keys are
        checked before anything is changed, so an unknown option leaves the
        current options untouched.

        :param _dict: option name -> new value
        :raises CheckerException: on an option name that is not known
        """
        # lookup order decides which dict gets a key present in several
        _targets = (
            self._fio_options_mix,
            self._fio_options_seq,
            self._fio_options_common
        )
        for k, v in _dict.items():
            if not any(k in _t for _t in _targets):
                raise CheckerException(
                    "Unknown option: '{}': '{}'".format(k, v)
                )
        for k, v in _dict.items():
            for _t in _targets:
                if k in _t:
                    _t[k] = v
                    break
        # recalc
        self.recalc_times()

    def run(self):
        """Thread body: run fio and parse its output until it completes.

        On success the last parsed JSON and the collected timeline are
        stored in ``results``; when unexpected text shows up where a JSON
        document should begin, that line is stored as the error instead.
        """
        def _cut(_list, _s, _e):
            # split _list into its [_s:_e] slice and everything around it
            return _list[_s:_e], _list[:_s] + _list[_e:]

        # create a cmd
        _mode = self._fio_options_common["readwrite"]
        _cmd = ["fio"] + self._fio_options_list
        _cmd += [_o(k, v) for k, v in self._fio_options_common.items()]
        if _mode in self._seq_modes:
            _cmd += self._fio_options_seq_list
            _cmd += [_o(k, v) for k, v in self._fio_options_seq.items()]
        elif _mode in self._mix_modes:
            _cmd += [_o(k, v) for k, v in self._fio_options_mix.items()]

        _q = queue.Queue()
        self.fiorun = ShellThread(_cmd, _q)
        # Check if schedule is set
        _now = datetime.now(timezone.utc)
        if self.scheduled_datetime:
            logger.debug(
                "waiting for '{}', now is '{}', total of {} sec left".format(
                    self.scheduled_datetime.strftime(_datetime_fmt),
                    _now.strftime(_datetime_fmt),
                    (self.scheduled_datetime - _now).total_seconds()
                )
            )
            wait_until(self.scheduled_datetime)
        else:
            self.testrun_starttime = _now.strftime(_datetime_fmt)
        self.fiorun.run_shell()
        _raw = []
        _start = -1
        _end = -1
        while self.fiorun.is_alive() or not _q.empty():
            while not _q.empty():
                # processing
                _bb = _q.get(block=False)
                _line = _bb.decode('utf-8') if isinstance(_bb, bytes) else _bb
                # outside of a JSON document only its opening brace is valid
                if _start < 0 and _end < 0 and not _line.startswith("{"):
                    self.results[self.testrun_starttime] = {
                        "error": _line
                    }
                    # status() and the console loop test eta_sec for failure
                    self.eta_sec = -1
                    self.fiorun.kill_shell()
                    self.finished = True
                    return
                _raw += _line.splitlines()
                # top level braces are the only lines that are exactly
                # "{" or "}"
                for _idx, _text in enumerate(_raw):
                    if _start < 0 and _text == "{":
                        _start = _idx
                    elif _end < 0 and _text == "}":
                        _end = _idx
                # loop until we have full json
                if _end < 0 or _start < 0:
                    continue
                # if start and end found, cut json
                (_chunk, _raw) = _cut(_raw, _start, _end + 1)
                _start = -1
                _end = -1
                # Try to parse json; pull every needed value out first so a
                # malformed document changes nothing
                try:
                    _json = json.loads("\n".join(_chunk))
                    _timestamp = _json["timestamp"]
                    _job = _json["jobs"][0]
                    _eta = _job["eta"]
                    _elapsed = _job["elapsed"]
                except (
                    TypeError,
                    KeyError,
                    IndexError,
                    json.decoder.JSONDecodeError
                ) as e:
                    logger.error("ERROR: {}".format(e))
                else:
                    self.timeline[_timestamp] = _job
                    # save last values
                    self.eta_sec = _eta
                    self.elapsed_sec = _elapsed
                    self.testrun = _json
            # fio reported no time left, nothing more to wait for
            if not self.eta_sec:
                break
            sleep(0.1)
        # Save status to results dictionary
        self.results[self.testrun_starttime] = {
            "result": self.testrun,
            "timeline": self.timeline
        }
        self.finished = True
        self.scheduled_datetime = None
        self.testrun_starttime = None
        return

    def healthcheck(self):
        """Collect facts about the local fio installation and the host.

        :return: dict with keys 'ready', 'version', 'path', 'ioengines',
            'system', 'release' and 'hostname'
        """
        _version = self.fio_version
        _binary_path = self._shell_output if self._shell("which fio") else ""
        if self._shell("fio --enghelp"):
            # drop tabs and the first line of the engine listing
            _ioengines = self._shell_output.replace("\t", "")
            _ioengines = _ioengines.splitlines()[1:]
            self._shell_output = ""
        else:
            _ioengines = []

        return {
            "ready": all((_version, _binary_path, _ioengines)),
            "version": _version,
            "path": _binary_path,
            "ioengines": _ioengines,
            "system": self.system,
            "release": self.release,
            "hostname": self.hostname
        }

    def status(self):
        """Report the thread state and the progress of the current run.

        :return: dict with 'status' ('idle', 'running', 'scheduled' or
            'finished') and 'progress' (percent as a formatted string)
        """
        _running = self.is_alive() and self.eta_sec >= 0
        _scheduled = False
        if self.scheduled_datetime:
            _now = datetime.now(timezone.utc)
            _diff = (self.scheduled_datetime - _now).total_seconds()
            _scheduled = _diff > 0
        # precedence: finished, then scheduled, then running, then idle
        if self.finished:
            _s = "finished"
        elif _scheduled:
            _s = "scheduled"
        elif _running:
            _s = "running"
        else:
            _s = "idle"
        return {
            "status": _s,
            "progress": self.get_progress()
        }

    def end_fio(self):
        """Interrupt the fio process if one was started."""
        if self.fiorun:
            self.fiorun.kill_shell()

    # Current run
    def percent_done(self):
        """Return the share of elapsed time in elapsed + eta, in percent.

        :return: float percentage; 0.0 when the eta is negative (failed
            run) or when there is no time to relate to
        """
        _total = self.elapsed_sec + self.eta_sec
        if self.eta_sec < 0 or _total <= 0:
            return 0.0
        return float(self.elapsed_sec) / float(_total) * 100.0

    def get_progress(self):
        """Return percent_done() formatted with two decimals."""
        return "{:.2f}".format(self.percent_done())

    # latest parsed measurements
    def get_last_measurements(self):
        """Return the timeline entry with the greatest timestamp, or {}."""
        if self.timeline:
            return self.timeline[max(self.timeline)]
        return {}
| |
| |
class FioProcessShellRun(object):
    """Front end that owns an fio worker thread and keeps its results.

    The worker is an instance of ``init_class``. A fresh instance replaces
    the current one once that one has finished. Calling the object itself
    runs the worker in the foreground, printing progress until it ends.
    """
    # last measurements shown by __call__
    stats = {}
    # collected results keyed by test run start time; a class-level dict,
    # so it is shared by all instances
    results = {}

    def __init__(self, init_class=FioProcess):
        """
        :param init_class: class instantiated (without arguments) for each
            new worker
        """
        self.init_class = init_class
        # action name -> handler
        self.actions = {
            "do_singlerun": self.do_singlerun,
            "do_scheduledrun": self.do_scheduledrun,
            "get_options": self.get_options,
            "get_result": self.get_result,
            "get_resultlist": self.get_resultlist
        }
        self.fio_reset()

    @staticmethod
    def healthcheck(fio):
        """Run the worker's healthcheck and render a short summary of it.

        :param fio: worker object providing ``healthcheck()``
        :return: tuple of (healthcheck dict, summary string)
        """
        hchk = fio.healthcheck()
        hchk_str = \
            "# fio status: {}\n# {} at {}\n# Engines: {}".format(
                "ready" if hchk["ready"] else "fail",
                hchk["version"],
                hchk["path"],
                ", ".join(hchk["ioengines"])
            )
        return hchk, hchk_str

    def status(self):
        """Return the status dict of the current worker."""
        return self.fio.status()

    def fio_reset(self):
        """Replace a finished (or missing) worker with a fresh one.

        A worker that has not finished yet, idle or running, is left in
        place. Before a finished worker is replaced, its results are merged
        into ``self.results`` and its options are captured, then applied to
        the replacement.
        """
        _current = getattr(self, "fio", None)
        _r = None
        _o = None
        if _current is not None:
            # a worker without the marker is treated as finished
            if not getattr(_current, "finished", True):
                # No need to reset, fio is either idle or running
                return
            _r = getattr(_current, "results", None)
            # options are read through this class: the worker keeps them in
            # its option dicts and need not offer a get_options() itself
            try:
                _o = self.get_options()
            except AttributeError:
                _o = None
        # extract results if they present
        if _r:
            self.results.update(_r)
        # re-init
        _fio = self.init_class()
        # Do healthcheck
        self.hchk, self.hchk_str = self.healthcheck(_fio)
        # restore options if they existed
        if _o:
            _fio.update_options(_o)
        self.fio = _fio

    def get_options(self):
        """Return a merged deep copy of the worker's option dictionaries."""
        _opts = deepcopy(self.fio._fio_options_common)
        _opts.update(deepcopy(self.fio._fio_options_seq))
        _opts.update(deepcopy(self.fio._fio_options_mix))
        return _opts

    def do_singlerun(self, options):
        """Start a run right away with the given options.

        :param options: option name -> value; a 'scheduled_to' key is
            removed from it and ignored with a warning
        :return: True
        """
        # Reset thread if it closed
        self.fio_reset()
        # Fill options
        if "scheduled_to" in options:
            # just ignore it
            _k = "scheduled_to"
            _v = options.pop(_k)
            logger.warning("Ignoring option: '{}': '{}'".format(_k, _v))
        self.fio.update_options(options)
        # Start it
        self.fio.start()
        return True

    def do_scheduledrun(self, options):
        """Start a worker that waits for 'scheduled_to' before running fio.

        :param options: option name -> value; must hold 'scheduled_to' in
            the ``_datetime_fmt`` format, which is removed from it
        :return: True
        :raises CheckerException: when 'scheduled_to' is missing
        :raises ValueError: when 'scheduled_to' does not match the format
        """
        # Reset thread if it closed
        self.fio_reset()
        # Handle scheduled time
        if "scheduled_to" not in options:
            # required parameter not set
            raise CheckerException("Parameter missing: 'scheduled_to'")
        # get rid of it from options and parse it before touching the
        # worker, so a malformed value leaves no half-set schedule behind
        _scheduled_to = options.pop("scheduled_to")
        _when = datetime.strptime(_scheduled_to, _datetime_fmt)
        self.fio.testrun_starttime = _scheduled_to
        self.fio.scheduled_datetime = _when
        # Fill options
        self.fio.update_options(options)
        # Start it
        self.fio.start()
        return True

    def _get_result_object(self, obj_name, time):
        """Look up one part of a stored result.

        :param obj_name: key inside the result entry ('result', 'timeline')
        :param time: test run start time the result is stored under
        :return: the requested object; the stored error value when the
            entry holds an error instead; otherwise a dict with an 'error'
            message
        """
        if time not in self.results:
            return {
                "error": "Result not found for '{}'".format(time)
            }
        _entry = self.results[time]
        if obj_name in _entry:
            return _entry[obj_name]
        elif "error" in _entry:
            return _entry["error"]
        return {
            "error": "Empty {} for '{}'".format(obj_name, time)
        }

    def _update_results(self):
        """Move results not yet known from a finished worker to this one."""
        # Update only in case of completed thread
        if self.fio.finished:
            for _r in list(self.fio.results.keys()):
                if _r not in self.results:
                    self.results[_r] = self.fio.results.pop(_r)

    def get_result(self, time):
        """Return the 'result' part stored for the given start time."""
        self._update_results()
        return self._get_result_object('result', time)

    def get_result_timeline(self, time):
        """Return the 'timeline' part stored for the given start time."""
        self._update_results()
        return self._get_result_object('timeline', time)

    # reporting
    def get_resultlist(self):
        """Return the start times of all stored results."""
        self._update_results()
        return list(self.results.keys())

    def __call__(self):
        """Run the worker in the foreground and print its progress.

        Starts the worker if it neither runs nor has finished, prints one
        line every 0.2 sec while it is alive with a non-negative eta, then
        interrupts fio.
        """
        if not self.fio.is_alive() and not self.fio.finished:
            self.fio.start()

        while self.fio.is_alive() and self.fio.eta_sec >= 0:
            sleep(0.2)
            self.stats = self.fio.get_last_measurements()

            _r = self.stats.get('read', {})
            _w = self.stats.get('write', {})

            # -1 marks a value that is not available yet
            _r_bw = _r.get('bw_bytes', -1)
            _r_iops = _r.get('iops', -1)
            _w_bw = _w.get('bw_bytes', -1)
            _w_iops = _w.get('iops', -1)
            _s = self.fio.status()
            if _s["status"] == "scheduled":
                _t = self.fio.scheduled_datetime
                _n = datetime.now(timezone.utc)
                _delta = (_t - _n).total_seconds()
                print(
                    "{}: waiting for '{}'; now '{}'; {} sec left".format(
                        _s["status"],
                        _t.strftime(_datetime_fmt),
                        _n.strftime(_datetime_fmt),
                        _delta
                    )
                )
            else:
                stats = "{}: {:>7}% ({}/{}) " \
                        "(BW/IOPS: " \
                        "Read {:>9.2f} MB/{:>9.2f} " \
                        "Write {:>9.2f} MB/{:>9.2f})".format(
                            _s["status"],
                            _s["progress"],
                            self.fio.elapsed_sec,
                            self.fio.eta_sec,
                            _r_bw / 1024 / 1024,
                            _r_iops,
                            _w_bw / 1024 / 1024,
                            _w_iops
                        )
                print(stats)
        self.fio.end_fio()
| |
| |
if __name__ == '__main__':
    # Debug shell to test FioProcessShellRun: one scheduled run followed by
    # one immediate run, listing the stored results after each of them
    _shell = FioProcessShellRun()

    # scheduled run
    _opts = _shell.get_options()
    _opts.update({
        "readwrite": "read",
        "ramp_time": "1s",
        "runtime": "5s",
        "scheduled_to": "11/23/2021, 21:48:20+0000"
    })
    _shell.do_scheduledrun(_opts)
    _shell()
    _times = _shell.get_resultlist()
    print("# results:\n{}".format("\n".join(_times)))

    # immediate run on a fresh worker
    _shell.fio_reset()
    _opts = _shell.get_options()
    _opts.update({
        "readwrite": "read",
        "ramp_time": "1s",
        "runtime": "10s"
    })
    _shell.do_singlerun(_opts)
    _shell()
    _times = _shell.get_resultlist()
    print("# results:\n{}".format("\n".join(_times)))