blob: c8488afad5e11155be22861bf8dd56411a09c90a [file] [log] [blame]
import json
import os
import queue
import signal
import subprocess
from copy import deepcopy
from datetime import datetime, timezone
from platform import system, release, node
from threading import Thread
import threading
from time import sleep
from cfg_checker.common.exception import CheckerException
from cfg_checker.common.other import piped_shell
from cfg_checker.common.log import logger
_datetime_fmt = "%m/%d/%Y, %H:%M:%S%z"
fio_options_common = {
"name": "agent_run",
"filename": "/cephvol/testfile",
"status-interval": "500ms",
"randrepeat": 0,
"verify": 0,
"direct": 1,
"gtod_reduce": 0,
"bs": "32k",
"iodepth": 16,
"size": "10G",
"readwrite": "randrw",
"ramp_time": "5s",
"runtime": "30s",
"ioengine": "libaio"
}
fio_options_seq = {
"numjobs": 1,
"offset_increment": "500M"
}
fio_options_mix = {
"rwmixread": 50
}
def get_fio_options():
# Duplicate function for external option access
_opts = deepcopy(fio_options_common)
_opts.update(deepcopy(fio_options_seq))
_opts.update(deepcopy(fio_options_mix))
return _opts
def output_reader(_stdout, outq):
for line in iter(_stdout.readline, ''):
outq.put(line)
def _o(option, param, suffix=""):
return "--{}={}{}".format(option, param, suffix)
def get_time(timestamp=None):
if not timestamp:
_t = datetime.now(timezone.utc)
else:
_t = datetime.fromtimestamp(timestamp)
return _t.strftime(_datetime_fmt)
def _get_seconds(value):
# assume that we have symbol at the end
_suffix = value[-1]
if _suffix == 's':
return int(value[:-1])
elif _suffix == 'm':
return int(value[:-1])*60
elif _suffix == 'h':
return int(value[:-1])*60*60
else:
return -1
def wait_until(end_datetime):
while True:
diff = (end_datetime - datetime.now(timezone.utc)).total_seconds()
# In case end_datetime was in past to begin with
if diff < 0:
return
sleep(diff/2)
if diff <= 0.1:
return
class ShellThread(object):
def __init__(self, cmd, queue):
self.cmd = cmd
self.queue = queue
self._p = None
self.timeout = 15
self.output = []
def run_shell(self):
# Start
_cmd = " ".join(self.cmd)
logger.debug("... {}".format(_cmd))
self._p = subprocess.Popen(
_cmd,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
env={"PYTHONUNBUFFERED": "1"},
universal_newlines=True,
bufsize=1
)
self._t = threading.Thread(
target=output_reader,
args=(self._p.stdout, self.queue)
)
self._t.start()
if not self.wait_started():
self.kill_shell()
def is_alive(self):
if not self._p.poll():
return True
else:
return False
def wait_started(self):
while True:
if not self.queue.empty():
break
else:
logger.debug("... {} sec".format(self.timeout))
sleep(1)
self.timeout -= 1
if not self.timeout:
logger.debug(
"...timed out after {} sec".format(str(self.timeout))
)
return False
logger.debug("... got first fio output")
return True
def kill_shell(self):
# Run the poll
if not self._p.poll():
self._p.send_signal(signal.SIGINT)
self.get_output()
def get_output(self):
while True:
try:
line = self.queue.get(block=False)
line = str(line) if isinstance(line, bytes) else line
self.output.append(line)
except queue.Empty:
return self.output
return None
class FioProcess(Thread):
# init vars for status
_fio_options_list = [
"--time_based",
"--output-format=json+",
"--eta=always"
]
_fio_options_seq_list = [
"--thread"
]
_fio_options_common = fio_options_common
_fio_options_seq = fio_options_seq
_fio_options_mix = fio_options_mix
eta_sec = 0
total_time_sec = 0
elapsed_sec = 0
testrun = {}
mount_point = "/cephvol"
filename = "testfile"
# test modes: 'randread', 'randwrite', 'read', 'write', 'randrw'
mode = "randrw"
_seq_modes = ['read', 'write']
_mix_modes = ['randrw']
_rand_modes = ['randread', 'randwrite']
# results
results = {}
def _shell(self, cmd):
self._code, self._shell_output = piped_shell(cmd, code=True)
if self._code:
logger.error(
"# Shell error for '{}': [{}] {}".format(
cmd,
self._code,
self._shell_output
)
)
return False
else:
return True
def recalc_times(self):
_rt = _get_seconds(self._fio_options_common["runtime"])
_rup = _get_seconds(self._fio_options_common["ramp_time"])
if not _rt:
raise CheckerException("invalid 'runtime': '{}'".format(_rt))
elif not _rup:
raise CheckerException("invalid 'ramp_time': '{}'".format(_rt))
self.total_time_sec = _rt + _rup
self.eta_sec = self.total_time_sec
def __init__(self):
Thread.__init__(self)
logger.info("fio thread initialized")
# save system
self.system = system()
self.release = release()
self.hostname = node()
# create a clear var for last shell output
self._shell_output = ""
# prepare params
self.recalc_times()
# prepare the fio
self.fio_version = "unknown"
if not self._shell("fio --version"):
raise CheckerException(
"Error running fio: '{}'".format(self._shell_output)
)
else:
self.fio_version = self._shell_output
# all outputs timeline
self.timeline = {}
# setup target file
if not os.path.exists(self.mount_point):
logger.warning(
"WARNING: '{}' not exists, using tmp folder".format(
self.mount_point
)
)
self.mount_point = "/tmp"
self._fio_options_common["filename"] = os.path.join(
self.mount_point,
self.filename
)
if self.system == "Darwin":
self._fio_options_common["ioengine"] = "posixaio"
# Thread finish marker
self.finished = False
self.scheduled_datetime = None
def update_options(self, _dict):
# validate keys, do not validate numbers themselves
for k, v in _dict.items():
if k in self._fio_options_mix:
self._fio_options_mix[k] = v
elif k in self._fio_options_seq:
self._fio_options_seq[k] = v
elif k in self._fio_options_common:
self._fio_options_common[k] = v
else:
raise CheckerException(
"Unknown option: '{}': '{}'".format(k, v)
)
# recalc
self.recalc_times()
def run(self):
def _cut(_list, _s, _e):
_new = _list[_s:_e]
_pre = _list[:_s]
_list = _pre + _list[_e:]
return (_new, _list)
# create a cmd
_cmd = ["fio"]
_cmd += self._fio_options_list
_cmd += [_o(k, v) for k, v in self._fio_options_common.items()]
if self._fio_options_common["readwrite"] in self._seq_modes:
_sq = self._fio_options_seq_list
_cmd += _sq + [_o(k, v) for k, v in self._fio_options_seq.items()]
elif self._fio_options_common["readwrite"] in self._mix_modes:
_cmd += [_o(k, v) for k, v in self._fio_options_mix.items()]
_q = queue.Queue()
self.fiorun = ShellThread(_cmd, _q)
# Check if schedule is set
if self.scheduled_datetime:
_now = datetime.now(timezone.utc)
logger.debug(
"waiting for '{}', now is '{}', total of {} sec left".format(
self.scheduled_datetime.strftime(_datetime_fmt),
_now.strftime(_datetime_fmt),
(self.scheduled_datetime - _now).total_seconds()
)
)
wait_until(self.scheduled_datetime)
self.fiorun.run_shell()
_raw = []
_start = -1
_end = -1
while self.fiorun.is_alive() or not _q.empty():
while not _q.empty():
# processing
_bb = _q.get(block=False)
if isinstance(_bb, bytes):
_line = _bb.decode('utf-8')
else:
_line = _bb
if _start < 0 and _end < 0 and not _line.startswith("{"):
_time = get_time()
self.results[_time] = {
"error": _line
}
self.eta = -1
self.fiorun.kill_shell()
self.finished = True
return
_current = _line.splitlines()
_raw += _current
for ll in range(len(_raw)):
if _start < 0 and _raw[ll] == "{":
_start = ll
elif _end < 0 and _raw[ll] == "}":
_end = ll
# loop until we have full json
if _end < 0 or _start < 0:
continue
# if start and and found, cut json
(_json, _raw) = _cut(_raw, _start, _end+1)
_start = -1
_end = -1
# Try to parse json
_json = "\n".join(_json)
try:
_json = json.loads(_json)
_timestamp = _json["timestamp"]
self.timeline[_timestamp] = _json["jobs"][0]
# save last values
self.eta_sec = self.timeline[_timestamp]["eta"]
self.elapsed_sec = self.timeline[_timestamp]["elapsed"]
self.testrun = _json
except TypeError as e:
logger.error("ERROR: {}".format(e))
except json.decoder.JSONDecodeError as e:
logger.error("ERROR: {}".format(e))
if not self.eta_sec:
break
sleep(0.1)
# Save status to results dictionary
self.results[get_time(timestamp=self.testrun["timestamp"])] = {
"result": self.testrun,
"timeline": self.timeline
}
self.finished = True
return
def healthcheck(self):
_version = self.fio_version
_binary_path = self._shell_output if self._shell("which fio") else ""
if self._shell("fio --enghelp"):
_ioengines = self._shell_output
_ioengines = _ioengines.replace("\t", "")
_ioengines = _ioengines.splitlines()[1:]
self._shell_output = ""
else:
_ioengines = []
return {
"ready": all((_version, _binary_path, _ioengines)),
"version": _version,
"path": _binary_path,
"ioengines": _ioengines,
"system": self.system,
"release": self.release,
"hostname": self.hostname
}
def status(self):
_running = self.is_alive() and self.eta_sec >= 0
_scheduled = False
_diff = -1
if self.scheduled_datetime:
_now = datetime.now(timezone.utc)
_diff = (self.scheduled_datetime - _now).total_seconds()
if _diff > 0:
_scheduled = True
_s = "running" if _running else "idle"
_s = "scheduled" if _scheduled else _s
_s = "finished" if self.finished else _s
return {
"status": _s,
"progress": self.get_progress()
}
def end_fio(self):
if self.fiorun:
self.fiorun.kill_shell()
# Current run
def percent_done(self):
_total = self.elapsed_sec + self.eta_sec
return float(self.elapsed_sec) / float(_total) * 100.0
def get_progress(self):
return "{:.2f}".format(self.percent_done())
# latest parsed measurements
def get_last_measurements(self):
if self.timeline:
return self.timeline[max(list(self.timeline.keys()))]
else:
return {}
class FioProcessShellRun(object):
stats = {}
results = {}
def __init__(self, init_class=FioProcess):
self.init_class = init_class
self.actions = {
"do_singlerun": self.do_singlerun,
"do_scheduledrun": self.do_scheduledrun,
"get_options": self.get_options,
"get_result": self.get_result,
"get_resultlist": self.get_resultlist
}
self.fio_reset()
@staticmethod
def healthcheck(fio):
hchk = fio.healthcheck()
hchk_str = \
"# fio status: {}\n# {} at {}\n# Engines: {}".format(
"ready" if hchk["ready"] else "fail",
hchk["version"],
hchk["path"],
", ".join(hchk["ioengines"])
)
return hchk, hchk_str
def status(self):
return self.fio.status()
def fio_reset(self):
# Fancy way of handling fio class not even initialized yet
try:
_f = self.fio.finished
_r = self.fio.results
_o = self.fio.get_options()
except AttributeError:
_f = True
_r = None
_o = None
# Check if reset is needed
if not _f:
# No need to reset, fio is either idle or running
return
else:
# extract results if they present
if _r:
self.results.update(_r)
# re-init
_fio = self.init_class()
# Do healthcheck
self.hchk, self.hchk_str = self.healthcheck(_fio)
# restore options if they existed
if _o:
_fio.update_options(_o)
self.fio = _fio
def get_options(self):
_opts = deepcopy(self.fio._fio_options_common)
_opts.update(deepcopy(self.fio._fio_options_seq))
_opts.update(deepcopy(self.fio._fio_options_mix))
return _opts
def do_singlerun(self, options):
# Reset thread if it closed
self.fio_reset()
# Fill options
if "scheduled_to" in options:
# just ignore it
_k = "scheduled_to"
_v = options.pop(_k)
logger.warning("Ignoring option: '{}': '{}'".format(_k, _v))
self.fio.update_options(options)
# Start it
self.fio.start()
return True
def do_scheduledrun(self, options):
# Reset thread if it closed
self.fio_reset()
# Handle scheduled time
if "scheduled_to" not in options:
# required parameter not set
raise CheckerException("Parameter missing: 'scheduled_to'")
else:
# set time and get rid of it from options
_time = options.pop("scheduled_to")
self.fio.scheduled_datetime = datetime.strptime(
_time,
_datetime_fmt
)
# Fill options
self.fio.update_options(options)
# Start it
self.fio.start()
return True
def _get_result_object(self, obj_name, time):
if time in self.results:
if obj_name in self.results[time]:
return self.results[time][obj_name]
elif "error" in self.results[time]:
return self.results[time]["error"]
else:
return {
"error": "Empty {} for '{}'".format(obj_name, time)
}
else:
return {
"error": "Result not found for '{}'".format(time)
}
def _update_results(self):
# Update only in case of completed thread
if self.fio.finished:
_r_local = list(self.results.keys())
_r_fio = list(self.fio.results.keys())
for _r in _r_fio:
if _r not in _r_local:
self.results[_r] = self.fio.results.pop(_r)
def get_result(self, time):
self._update_results()
return self._get_result_object('result', time)
def get_result_timeline(self, time):
self._update_results()
return self._get_result_object('timeline', time)
# reporting
def get_resultlist(self):
self._update_results()
return list(self.results.keys())
def __call__(self):
if not self.fio.is_alive() and not self.fio.finished:
self.fio.start()
while self.fio.is_alive() and self.fio.eta_sec >= 0:
sleep(0.2)
self.stats = self.fio.get_last_measurements()
_r = self.stats.get('read', {})
_w = self.stats.get('write', {})
_r_bw = _r.get('bw_bytes', -1)
_r_iops = _r.get('iops', -1)
_w_bw = _w.get('bw_bytes', -1)
_w_iops = _w.get('iops', -1)
_s = self.fio.status()
if _s["status"] == "scheduled":
_t = self.fio.scheduled_datetime
_n = datetime.now(timezone.utc)
_delta = (_t - _n).total_seconds()
print(
"{}: waiting for '{}'; now '{}'; {} sec left".format(
_s["status"],
_t.strftime(_datetime_fmt),
_n.strftime(_datetime_fmt),
_delta
)
)
else:
stats = "{}: {:>7}% ({}/{}) " \
"(BW/IOPS: " \
"Read {:>9.2f} MB/{:>9.2f} " \
"Write {:>9.2f} MB/{:>9.2f})".format(
_s["status"],
_s["progress"],
self.fio.elapsed_sec,
self.fio.eta_sec,
_r_bw / 1024 / 1024,
_r_iops,
_w_bw / 1024 / 1024,
_w_iops
)
print(stats)
self.fio.end_fio()
if __name__ == '__main__':
# Debug shell to test FioProcessShellRun
_shell = FioProcessShellRun()
_opts = _shell.get_options()
_opts["readwrite"] = "read"
_opts["ramp_time"] = "1s"
_opts["runtime"] = "5s"
_opts["scheduled_to"] = "11/13/2021, 23:03:30+0000"
_shell.do_scheduledrun(_opts)
_shell()
_times = _shell.get_resultlist()
print("# results:\n{}".format("\n".join(_times)))
# print(
# "##### Dumping results\n{}".format(
# json.dumps(_shell.get_result(_times[0]), indent=2)
# )
# )
_shell.fio_reset()
_opts = _shell.get_options()
_opts["readwrite"] = "read"
_opts["ramp_time"] = "1s"
_opts["runtime"] = "10s"
_opts["scheduled_to"] = "11/13/2021, 23:04:20+0000"
_shell.do_scheduledrun(_opts)
_shell()
_times = _shell.get_resultlist()
print("# results:\n{}".format("\n".join(_times)))