mcp-agent mode for mcp-checker with web-info and REST API
New:
- agent index page served on 0.0.0.0:8765
- REST API with a modular approach to its modules
- 'fio' module running in a thread-safe Thread that returns
real-time status info
- 'fio' module scheduled run option
- ability to preserve multiple test run results while the agent is active
- Dockerfile for the agent image
Fixed:
- Network report fixed to work in Kubernetes environments
- Fixed the function for running commands inside DaemonSet pods
Related-PROD: PROD-36669
Change-Id: I57e73001247af9187680bfc5744590eef219d93c
diff --git a/cfg_checker/agent/fio_runner.py b/cfg_checker/agent/fio_runner.py
new file mode 100644
index 0000000..f1cecab
--- /dev/null
+++ b/cfg_checker/agent/fio_runner.py
@@ -0,0 +1,601 @@
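+# fio_runner: runs the 'fio' binary as a subprocess from a background
+# thread, parses its json+ status output in real time and keeps the
+# per-interval measurements and final results of every test run.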
+import json
+import os
+import queue
+import signal
+import subprocess
+
+
+from copy import deepcopy
+from datetime import datetime, timedelta
+from platform import system, release, node
+from threading import Thread
+import threading
+from time import sleep
+
+
+from cfg_checker.common.exception import CheckerException
+from cfg_checker.common.other import piped_shell
+from cfg_checker.common.log import logger
+
+
+_datetime_fmt = "%m/%d/%Y, %H:%M:%S"
+
+
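+# Reader-thread target: drains the subprocess stdout line by line into a
+# queue so the main loop can consume fio output without blocking.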
+def output_reader(_stdout, outq):
+ for line in iter(_stdout.readline, ''):
+ outq.put(line)
+
+
+def _o(option, param, suffix=""):
+ return "--{}={}{}".format(option, param, suffix)
+
+
+def get_time(timestamp=None):
+ if not timestamp:
+ _t = datetime.now()
+ else:
+ _t = datetime.fromtimestamp(timestamp)
+ return _t.strftime(_datetime_fmt)
+
+
+def _get_seconds(value):
+    # assume the value ends with a unit suffix: 's', 'm' or 'h'
+ _suffix = value[-1]
+ if _suffix == 's':
+ return int(value[:-1])
+ elif _suffix == 'm':
+ return int(value[:-1])*60
+ elif _suffix == 'h':
+ return int(value[:-1])*60*60
+ else:
+ return -1
+
+
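+# Block until 'end_datetime' by repeatedly sleeping half of the remaining
+# interval; used to delay scheduled fio runs.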
+def wait_until(end_datetime):
+ while True:
+ diff = (end_datetime - datetime.now()).total_seconds()
+ # In case end_datetime was in past to begin with
+ if diff < 0:
+ return
+ sleep(diff/2)
+ if diff <= 0.1:
+ return
+
+
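+# Thin wrapper around subprocess.Popen: starts the fio command in a shell,
+# spawns an output_reader thread that feeds self.queue, and can interrupt
+# the process with SIGINT via kill_shell().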
+class ShellThread(object):
+ def __init__(self, cmd, queue):
+ self.cmd = cmd
+ self.queue = queue
+ self._p = None
+ self.timeout = 15
+ self.output = []
+
+ def run_shell(self):
+ # Start
+ _cmd = " ".join(self.cmd)
+ logger.debug("... {}".format(_cmd))
+ self._p = subprocess.Popen(
+ _cmd,
+ shell=True,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ env={"PYTHONUNBUFFERED": "1"},
+ universal_newlines=True,
+ bufsize=1
+ )
+ self._t = threading.Thread(
+ target=output_reader,
+ args=(self._p.stdout, self.queue)
+ )
+ self._t.start()
+ if not self.wait_started():
+ self.kill_shell()
+
+    def is_alive(self):
+        # poll() returns None only while the process is still running
+        return self._p.poll() is None
+
+    def wait_started(self):
+        _left = self.timeout
+        while True:
+            if not self.queue.empty():
+                break
+            else:
+                logger.debug("... {} sec left".format(_left))
+                sleep(1)
+                _left -= 1
+                if not _left:
+                    logger.debug(
+                        "...timed out after {} sec".format(self.timeout)
+                    )
+                    return False
+        logger.debug("... got first fio output")
+        return True
+
+    def kill_shell(self):
+        # interrupt fio only if the process is still running
+        if self._p.poll() is None:
+            self._p.send_signal(signal.SIGINT)
+        self.get_output()
+
+ def get_output(self):
+ while True:
+ try:
+ line = self.queue.get(block=False)
+ line = str(line) if isinstance(line, bytes) else line
+ self.output.append(line)
+ except queue.Empty:
+ return self.output
+ return None
+
+
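+# Background thread that builds the fio command line from the option
+# dictionaries below, executes it through ShellThread and accumulates the
+# periodic json+ status blocks into 'timeline' and the final 'results'.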
+class FioProcess(Thread):
+ # init vars for status
+ _fio_options_list = [
+ "--time_based",
+ "--output-format=json+",
+ "--eta=always"
+ ]
+ _fio_options_seq_list = [
+ "--thread"
+ ]
+
+ _fio_options_common = {
+ "name": "agent_run",
+ "filename": "/tmp/testfile",
+ "status-interval": "500ms",
+ "randrepeat": 0,
+ "verify": 0,
+ "direct": 1,
+ "gtod_reduce": 0,
+ "bs": "32k",
+ "iodepth": 16,
+ "size": "10G",
+ "readwrite": "randrw",
+ "ramp_time": "5s",
+ "runtime": "30s",
+ "ioengine": "libaio"
+ }
+
+ _fio_options_seq = {
+ "numjobs": 1,
+ "offset_increment": "500M"
+ }
+ _fio_options_mix = {
+ "rwmixread": 50
+ }
+
+ eta_sec = 0
+ total_time_sec = 0
+ elapsed_sec = 0
+ testrun = {}
+
+ mount_point = "/tmp"
+ filename = "testfile"
+
+ # test modes: 'randread', 'randwrite', 'read', 'write', 'randrw'
+ mode = "randrw"
+ _seq_modes = ['read', 'write']
+ _mix_modes = ['randrw']
+ _rand_modes = ['randread', 'randwrite']
+
+ # results
+ results = {}
+
+ def _shell(self, cmd):
+ self._code, self._shell_output = piped_shell(cmd, code=True)
+ if self._code:
+ logger.error(
+ "# Shell error for '{}': [{}] {}".format(
+ cmd,
+ self._code,
+ self._shell_output
+ )
+ )
+ return False
+ else:
+ return True
+
+ def recalc_times(self):
+ _rt = _get_seconds(self._fio_options_common["runtime"])
+ _rup = _get_seconds(self._fio_options_common["ramp_time"])
+        if _rt <= 0:
+            raise CheckerException("invalid 'runtime': '{}'".format(_rt))
+        elif _rup <= 0:
+            raise CheckerException("invalid 'ramp_time': '{}'".format(_rup))
+
+ self.total_time_sec = _rt + _rup
+ self.eta_sec = self.total_time_sec
+
+ def __init__(self):
+ Thread.__init__(self)
+ logger.info("fio thread initialized")
+ # save system
+ self.system = system()
+ self.release = release()
+ self.hostname = node()
+ # create a clear var for last shell output
+ self._shell_output = ""
+ # prepare params
+ self.recalc_times()
+ # prepare the fio
+ self.fio_version = "unknown"
+ if not self._shell("fio --version"):
+ raise CheckerException(
+ "Error running fio: '{}'".format(self._shell_output)
+ )
+ else:
+ self.fio_version = self._shell_output
+ # all outputs timeline
+ self.timeline = {}
+
+ self._fio_options_common["filename"] = os.path.join(
+ self.mount_point,
+ self.filename
+ )
+
+ if self.system == "Darwin":
+ self._fio_options_common["ioengine"] = "posixaio"
+ # Thread finish marker
+ self.finished = False
+ self.scheduled_datetime = None
+
+ def update_options(self, _dict):
+ # validate keys, do not validate numbers themselves
+ for k, v in _dict.items():
+ if k in self._fio_options_mix:
+ self._fio_options_mix[k] = v
+ elif k in self._fio_options_seq:
+ self._fio_options_seq[k] = v
+ elif k in self._fio_options_common:
+ self._fio_options_common[k] = v
+            else:
+                raise CheckerException(
+                    "Unknown fio option: '{}'".format(k)
+                )
+ # recalc
+ self.recalc_times()
+
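+    # Main thread body: build the fio command line, optionally wait for the
+    # scheduled start time, then read the queued stdout and cut complete
+    # JSON status objects (between bare "{" and "}" lines) to track progress.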
+ def run(self):
+ def _cut(_list, _s, _e):
+ _new = _list[_s:_e]
+ _pre = _list[:_s]
+ _list = _pre + _list[_e:]
+ return (_new, _list)
+
+ # create a cmd
+ _cmd = ["fio"]
+ _cmd += self._fio_options_list
+ _cmd += [_o(k, v) for k, v in self._fio_options_common.items()]
+
+ if self._fio_options_common["readwrite"] in self._seq_modes:
+ _sq = self._fio_options_seq_list
+ _cmd += _sq + [_o(k, v) for k, v in self._fio_options_seq.items()]
+ elif self._fio_options_common["readwrite"] in self._mix_modes:
+ _cmd += [_o(k, v) for k, v in self._fio_options_mix.items()]
+
+ _q = queue.Queue()
+ self.fiorun = ShellThread(_cmd, _q)
+ # Check if schedule is set
+ if self.scheduled_datetime:
+ logger.debug(
+ "waiting for '{}', now is '{}', total of {} sec left".format(
+ self.scheduled_datetime.strftime(_datetime_fmt),
+ datetime.now().strftime(_datetime_fmt),
+ (self.scheduled_datetime - datetime.now()).total_seconds()
+ )
+ )
+ wait_until(self.scheduled_datetime)
+ self.fiorun.run_shell()
+ _raw = []
+ _start = -1
+ _end = -1
+ while self.fiorun.is_alive() or not _q.empty():
+ while not _q.empty():
+ # processing
+ _bb = _q.get(block=False)
+ if isinstance(_bb, bytes):
+ _line = _bb.decode('utf-8')
+ else:
+ _line = _bb
+ if _start < 0 and _end < 0 and not _line.startswith("{"):
+ _time = get_time()
+                    # fio failed to start; save the error line as the result
+                    self.results[_time] = {
+                        "error": _line
+                    }
+                    self.eta_sec = -1
+ self.fiorun.kill_shell()
+ return
+ _current = _line.splitlines()
+ _raw += _current
+ for ll in range(len(_raw)):
+ if _start < 0 and _raw[ll] == "{":
+ _start = ll
+ elif _end < 0 and _raw[ll] == "}":
+ _end = ll
+ # loop until we have full json
+ if _end < 0 or _start < 0:
+ continue
+                # if both start and end were found, cut out the json
+ (_json, _raw) = _cut(_raw, _start, _end+1)
+ _start = -1
+ _end = -1
+ # Try to parse json
+ _json = "\n".join(_json)
+ try:
+ _json = json.loads(_json)
+ _timestamp = _json["timestamp"]
+ self.timeline[_timestamp] = _json["jobs"][0]
+
+ # save last values
+ self.eta_sec = self.timeline[_timestamp]["eta"]
+ self.elapsed_sec = self.timeline[_timestamp]["elapsed"]
+ self.testrun = _json
+ except TypeError as e:
+ logger.error("ERROR: {}".format(e))
+ except json.decoder.JSONDecodeError as e:
+ logger.error("ERROR: {}".format(e))
+ if not self.eta_sec:
+ break
+ sleep(0.1)
+ # Save status to results dictionary
+ self.results[get_time(timestamp=self.testrun["timestamp"])] = {
+ "result": self.testrun,
+ "timeline": self.timeline
+ }
+ self.finished = True
+ return
+
+ def healthcheck(self):
+ _version = self.fio_version
+ _binary_path = self._shell_output if self._shell("which fio") else ""
+ if self._shell("fio --enghelp"):
+ _ioengines = self._shell_output
+ _ioengines = _ioengines.replace("\t", "")
+ _ioengines = _ioengines.splitlines()[1:]
+ else:
+ _ioengines = []
+
+ return {
+ "ready": all((_version, _binary_path, _ioengines)),
+ "version": _version,
+ "path": _binary_path,
+ "ioengines": _ioengines,
+ "system": self.system,
+ "release": self.release,
+ "hostname": self.hostname
+ }
+
+ def status(self):
+ _running = self.is_alive() and self.eta_sec >= 0
+ _scheduled = False
+ _diff = -1
+ if self.scheduled_datetime:
+ _diff = (self.scheduled_datetime - datetime.now()).total_seconds()
+ if _diff > 0:
+ _scheduled = True
+ _s = "running" if _running else "idle"
+ _s = "scheduled" if _scheduled else _s
+ _s = "finished" if self.finished else _s
+ return {
+ "status": _s,
+ "progress": self.get_progress()
+ }
+
+    def end_fio(self):
+        # self.fiorun exists only after run() has started the shell thread
+        if getattr(self, "fiorun", None):
+            self.fiorun.kill_shell()
+
+ # Current run
+    def percent_done(self):
+        _total = self.elapsed_sec + self.eta_sec
+        if not _total:
+            # nothing measured yet (or error state); avoid division by zero
+            return 0.0
+        return float(self.elapsed_sec) / float(_total) * 100.0
+
+ def get_progress(self):
+ return "{:.2f}".format(self.percent_done())
+
+ # latest parsed measurements
+ def get_last_measurements(self):
+ if self.timeline:
+ return self.timeline[max(list(self.timeline.keys()))]
+ else:
+ return {}
+
+
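+# Wrapper that backs the agent's 'fio' module: maps action names (such as
+# "do_singlerun" and "get_result") to handlers, re-creates the FioProcess
+# thread after each finished run and keeps collected results between runs.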
+class FioProcessShellRun(object):
+ stats = {}
+ results = {}
+
+ def __init__(self, init_class=FioProcess):
+ self.init_class = init_class
+ self.actions = {
+ "do_singlerun": self.do_singlerun,
+ "do_scheduledrun": self.do_scheduledrun,
+ "get_options": self.get_options,
+ "get_result": self.get_result,
+ "get_resultlist": self.get_resultlist
+ }
+ self.fio_reset()
+
+ @staticmethod
+ def healthcheck(fio):
+ hchk = fio.healthcheck()
+ hchk_str = \
+ "# fio status: {}\n# {} at {}\n# Engines: {}".format(
+ "ready" if hchk["ready"] else "fail",
+ hchk["version"],
+ hchk["path"],
+ ", ".join(hchk["ioengines"])
+ )
+ return hchk, hchk_str
+
+ def status(self):
+ return self.fio.status()
+
+ def fio_reset(self):
+ # Fancy way of handling fio class not even initialized yet
+ try:
+ _f = self.fio.finished
+ _r = self.fio.results
+            # options live on the fio thread's dicts; read them via the
+            # wrapper's accessor
+            _o = self.get_options()
+ except AttributeError:
+ _f = True
+ _r = None
+ _o = None
+ # Check if reset is needed
+ if not _f:
+ # No need to reset, fio is either idle or running
+ return
+ else:
+ # extract results if they present
+ if _r:
+ self.results.update(_r)
+ # re-init
+ _fio = self.init_class()
+ # Do healthcheck
+ self.hchk, self.hchk_str = self.healthcheck(_fio)
+ # restore options if they existed
+ if _o:
+ _fio.update_options(_o)
+ self.fio = _fio
+
+ def get_options(self):
+ _opts = deepcopy(self.fio._fio_options_common)
+ _opts.update(deepcopy(self.fio._fio_options_seq))
+ _opts.update(deepcopy(self.fio._fio_options_mix))
+ return _opts
+
+ def do_singlerun(self, options):
+ # Reset thread if it closed
+ self.fio_reset()
+ # Fill options
+ self.fio.update_options(options)
+ # Start it
+ self.fio.start()
+ return True
+
+ def do_scheduledrun(self, options):
+ # Reset thread if it closed
+ self.fio_reset()
+ # Handle scheduled time
+ if "scheduled_to" not in options:
+ # required parameter not set
+ return False
+ else:
+ # set time and get rid of it from options
+ self.fio.scheduled_datetime = options.pop("scheduled_to")
+ # Fill options
+ self.fio.update_options(options)
+ # Start it
+ self.fio.start()
+ return True
+
+ def _get_result_object(self, obj_name, time):
+ if time in self.results:
+ if obj_name in self.results[time]:
+ return self.results[time][obj_name]
+ elif "error" in self.results[time]:
+ return self.results[time]["error"]
+ else:
+ return {
+ "error": "Empty {} for '{}'".format(obj_name, time)
+ }
+ else:
+ return {
+ "error": "Result not found for '{}'".format(time)
+ }
+
+ def _update_results(self):
+ # Update only in case of completed thread
+ if self.fio.finished:
+ _r_local = list(self.results.keys())
+ _r_fio = list(self.fio.results.keys())
+ for _r in _r_fio:
+ if _r not in _r_local:
+ self.results[_r] = self.fio.results.pop(_r)
+
+ def get_result(self, time):
+ self._update_results()
+ return self._get_result_object('result', time)
+
+ def get_result_timeline(self, time):
+ self._update_results()
+ return self._get_result_object('timeline', time)
+
+ # reporting
+ def get_resultlist(self):
+ self._update_results()
+ return list(self.results.keys())
+
+ def __call__(self):
+ if not self.fio.is_alive() and not self.fio.finished:
+ self.fio.start()
+
+ while self.fio.is_alive() and self.fio.eta_sec >= 0:
+ sleep(0.2)
+ self.stats = self.fio.get_last_measurements()
+
+ _r = self.stats.get('read', {})
+ _w = self.stats.get('write', {})
+
+ _r_bw = _r.get('bw_bytes', -1)
+ _r_iops = _r.get('iops', -1)
+ _w_bw = _w.get('bw_bytes', -1)
+ _w_iops = _w.get('iops', -1)
+ _s = self.fio.status()
+ if _s["status"] == "scheduled":
+ _t = self.fio.scheduled_datetime
+ _n = datetime.now()
+ _delta = (_t - _n).total_seconds()
+ print(
+ "waiting for '{}'; now '{}'; {} sec left".format(
+ _t.strftime(_datetime_fmt),
+ _n.strftime(_datetime_fmt),
+ _delta
+ )
+ )
+ else:
+ stats = "{}: {:>7}% ({}/{}) " \
+ "(BW/IOPS: " \
+ "Read {:>9.2f} MB/{:>9.2f} " \
+ "Write {:>9.2f} MB/{:>9.2f})".format(
+ _s["status"],
+ _s["progress"],
+ self.fio.elapsed_sec,
+ self.fio.eta_sec,
+ _r_bw / 1024 / 1024,
+ _r_iops,
+ _w_bw / 1024 / 1024,
+ _w_iops
+ )
+ print(stats)
+ self.fio.end_fio()
+
+
+if __name__ == '__main__':
+ # Debug shell to test FioProcessShellRun
+ _shell = FioProcessShellRun()
+ _opts = _shell.get_options()
+ _opts["readwrite"] = "read"
+ _opts["ramp_time"] = "1s"
+ _opts["runtime"] = "5s"
+ _shell.do_singlerun(_opts)
+ _shell()
+ _times = _shell.get_resultlist()
+ print("# results:\n{}".format("\n".join(_times)))
+ # print(
+ # "##### Dumping results\n{}".format(
+ # json.dumps(_shell.get_result(_times[0]), indent=2)
+ # )
+ # )
+ _shell.fio_reset()
+ _opts = _shell.get_options()
+ _opts["readwrite"] = "read"
+ _opts["ramp_time"] = "1s"
+ _opts["runtime"] = "10s"
+ _opts["scheduled_to"] = datetime.now() + timedelta(seconds=12)
+ _shell.do_scheduledrun(_opts)
+ _shell()
+ _times = _shell.get_resultlist()
+ print("# results:\n{}".format("\n".join(_times)))