Ceph report fixes and Ceph bench beta 0.1
- Ceph stats collection
- Updated calculation of Ceph result totals and averages
Fixes:
- Fixed copying of huge (>30MB) PG dump JSON output
- Fixes for the fio-runner constants
Related-PROD: PROD-36669
Change-Id: Id8e250f626dfdaecc12ad005b61d03a21c9e6c4e
diff --git a/cfg_checker/modules/ceph/__init__.py b/cfg_checker/modules/ceph/__init__.py
index f9bf3ca..dd483cf 100644
--- a/cfg_checker/modules/ceph/__init__.py
+++ b/cfg_checker/modules/ceph/__init__.py
@@ -1,4 +1,5 @@
 from cfg_checker.agent.fio_runner import get_fio_options
+from cfg_checker.agent.fio_runner import seq_modes, mix_modes
 from cfg_checker.common import logger_cli
 from cfg_checker.common.settings import ENV_TYPE_KUBE
 from cfg_checker.helpers import args_utils
@@ -29,6 +30,11 @@
 # else:
 # return _class
+def _get_param_and_log(arg, param_str):
+ _value = args_utils.get_arg(arg, param_str)
+ logger_cli.info(" {}={}".format(param_str, _value))
+ return _value
+
 def init_parser(_parser):
 # network subparser
@@ -107,6 +113,56 @@
 metavar="dump_results", default="/tmp",
 help="Dump result after each test run to use them later"
 )
+ ceph_bench_parser.add_argument(
+ '--name',
+ metavar="name", default="cephbench",
+ help="Benchmark name, used as fio job name and results sub-folder"
+ )
+ ceph_bench_parser.add_argument(
+ '--bs',
+ metavar="blocksize", default="16k",
+ help="Block size for single run"
+ )
+ ceph_bench_parser.add_argument(
+ '--iodepth',
+ metavar="iodepth", default="16",
+ help="IO Depth for single run"
+ )
+ ceph_bench_parser.add_argument(
+ '--size',
+ metavar="size", default="10G",
+ help="Persistent volume size (M, G)"
+ )
+ ceph_bench_parser.add_argument(
+ '--readwrite',
+ metavar="readwrite", default="randrw",
+ help="Test mode for single run"
+ )
+ ceph_bench_parser.add_argument(
+ '--rwmixread',
+ metavar="rwmixread", default="50",
+ help="Percent of reads in random mixed mode (randrw)"
+ )
+ ceph_bench_parser.add_argument(
+ '--ramp-time',
+ metavar="ramp_time", default="5s",
+ help="Warmup time before test"
+ )
+ ceph_bench_parser.add_argument(
+ '--runtime',
+ metavar="runtime", default="60s",
+ help="Time based test run longevity"
+ )
+ ceph_bench_parser.add_argument(
+ '--ioengine',
+ metavar="ioengine", default="libaio",
+ help="IO Engine used by fio. See 'fio --enghelp' for the list"
+ )
+ ceph_bench_parser.add_argument(
+ '--offset-increment',
+ metavar="offset_increment", default="500M",
+ help="Offset increment per fio job, used in sequential modes only"
+ )
 return _parser
@@ -183,10 +239,12 @@
 # Prepare the tasks and do synced testrun or a single one
 logger_cli.info("# Initializing ceph benchmark module")
 args_utils.check_supported_env(ENV_TYPE_KUBE, args, config)
+ # Report filename
 _filename = args_utils.get_arg(args, 'html')
 # agents count option
 config.bench_agent_count = args_utils.get_arg(args, "agents")
 logger_cli.info("-> using {} agents".format(config.bench_agent_count))
+ # Option to skip cleanup after the benchmark ('no_cleanup')
 config.no_cleaning_after_benchmark = args_utils.get_arg(args, "no_cleanup")
 # storage class
 _storage_class = args_utils.get_arg(args, "storage_class")
@@ -204,15 +262,38 @@
 )
 config.bench_results_dump_path = _dump_path
 # Task files or options
+ _opts = get_fio_options()
 _task_file = args_utils.get_arg(args, "task_file", nofail=True)
 if not _task_file:
- logger_cli.info("-> running single run")
+ logger_cli.info("-> Running single benchmark run")
 config.bench_mode = "single"
+ # Override default fio options with the single-run CLI arguments
+ _params = [
+ "bs",
+ "iodepth",
+ "size",
+ "readwrite",
+ "ramp_time",
+ "runtime",
+ "ioengine"
+ ]
+ for _p in _params:
+ _opts[_p] = _get_param_and_log(args, _p)
+ if _opts["readwrite"] in seq_modes:
+ _p = "offset_increment"
+ _opts[_p] = _get_param_and_log(args, _p)
+ elif _opts["readwrite"] in mix_modes:
+ _p = "rwmixread"
+ _opts[_p] = _get_param_and_log(args, _p)
 else:
 logger_cli.info("-> running with tasks from '{}'".format(_task_file))
 config.bench_task_file = _task_file
 config.bench_mode = "tasks"
- _opts = get_fio_options()
+ config.bench_name = args_utils.get_arg(args, "name")
+ _opts["name"] = config.bench_name
+ logger_cli.info(
+ "# Using '{}' as ceph bench jobs name".format(_opts["name"])
+ )
 logger_cli.debug("... default/selected options for fio:")
 for _k in _opts.keys():
 # TODO: Update options for single run
@@ -223,6 +304,8 @@
 # init the Bench class
 ceph_bench = bench.KubeCephBench(config)
 ceph_bench.set_ceph_info_class(ceph_info)
+ # Preload results dumped earlier for this bench name
+ ceph_bench.preload_results()
 # Do the testrun
 ceph_bench.prepare_agents(_opts)
 ceph_bench.wait_ceph_cooldown()
diff --git a/cfg_checker/modules/ceph/bench.py b/cfg_checker/modules/ceph/bench.py
index d804f4a..2eedcfb 100644
--- a/cfg_checker/modules/ceph/bench.py
+++ b/cfg_checker/modules/ceph/bench.py
@@ -3,7 +3,6 @@
 import json
 from datetime import datetime, timedelta, timezone
-from time import sleep
 from cfg_checker.common import logger_cli
 from cfg_checker.common.decorators import retry
@@ -18,6 +17,9 @@
 from cfg_checker.agent.fio_runner import _get_seconds, _datetime_fmt
+_file_datetime_fmt = "%m%d%Y%H%M%S%z"
+
+
 def _reformat_timestr(_str, _chars=["/", ",", " ", ":", "+"], _tchar=""):
 _new = ""
 for _c in _str:
@@ -76,6 +78,7 @@
 self.storage_class = config.bench_storage_class
 self.results_dump_path = config.bench_results_dump_path
+ self.bench_name = config.bench_name
 self.agent_pods = []
 self.services = []
 # By default,
@@ -138,7 +141,8 @@
 for idx in range(self.agent_count):
 # create pvc/pv and pod
 logger_cli.info("-> creating agent '{:02}'".format(idx))
- _agent, _pv, _pvc = self.master.prepare_benchmark_agent(
+ # prepare_benchmark_agent() now returns only the pod and the PVC
+ _agent, _pvc = self.master.prepare_benchmark_agent(
 idx,
 os.path.split(options["filename"])[0],
 self.storage_class,
@@ -147,7 +151,7 @@
 )
 # save it to lists
 self.agent_pods.append(_agent)
- self.add_for_deletion(_pv, "pv")
+ # No separate PV object is returned, so only PVC and pod are tracked
 self.add_for_deletion(_pvc, "pvc")
 self.add_for_deletion(_agent, "pod")
@@ -170,7 +174,7 @@
 logger_cli.info("-> Done creating agents")
 # TODO: Update after implementing pooled task sending
- self.scheduled_delay = self.agent_count * 6
+ self.scheduled_delay = self.agent_count * 10
 logger_cli.info(
 "-> Schedule delay set to {} sec".format(self.scheduled_delay)
 )
@@ -277,8 +281,10 @@
 def _get_next_scheduled_time(self):
 _now = datetime.now(timezone.utc)
 logger_cli.info("-> time is '{}'".format(_now.strftime(_datetime_fmt)))
- _time = _now + timedelta(seconds=self.scheduled_delay)
- _str_time = _time.strftime(_datetime_fmt)
+ self.next_scheduled_time = _now + timedelta(
+ seconds=self.scheduled_delay
+ )
+ _str_time = self.next_scheduled_time.strftime(_datetime_fmt)
 logger_cli.info(
 "-> next benchmark scheduled to '{}'".format(_str_time)
 )
@@ -308,10 +314,10 @@
 _ramptime = _get_seconds(options["ramp_time"])
 # Sum up all timings that we must wait and double it
 _timeout = (self.scheduled_delay + _runtime + _ramptime) * 2
+ _start = self.next_scheduled_time
 _end = datetime.now(timezone.utc) + timedelta(seconds=_timeout)
 while True:
 # Print status
- # TODO: do pooled status get
 _sts = self.get_agents_status()
 diff = (_end - datetime.now(timezone.utc)).total_seconds()
 logger_cli.info("-> {:.2f}s left. Agent status:".format(diff))
@@ -323,6 +329,13 @@
 _status["progress"]
 )
 )
+ # Sample Ceph cluster status once the scheduled start time has passed
+ _elapsed = (datetime.now(timezone.utc) - _start).total_seconds()
+ if _elapsed > 0:
+ logger_cli.info("-> {:.2f}s elapsed".format(_elapsed))
+ self.results[options["scheduled_to"]]["ceph"][_elapsed] = \
+ self.ceph_info.get_cluster_status()
+ # Check if agents finished
 finished = [True for _s in _sts.values()
 if _s["status"] == 'finished']
 _fcnt = len(finished)
@@ -338,12 +351,6 @@
 if diff < 0:
 logger_cli.info("-> Timed out waiting for agents to finish")
 return False
- else:
- logger_cli.info("-> Sleeping for {:.2f}s".format(2))
- sleep(2)
- if diff <= 0.1:
- logger_cli.info("-> Timed out waiting for agents to finish")
- return False
 def _do_testrun(self, options):
 # send single to agent
@@ -353,9 +360,18 @@
 if not self.track_benchmark(options):
 return False
 else:
- logger_cli.info("-> Finished testrun")
+ logger_cli.info("-> Finished testrun. Collecting results...")
 # Get results for each agent
- self.collect_results(options)
+ self.collect_results()
+ logger_cli.info("-> Calculating totals and averages")
+ self.calculate_totals()
+ self.calculate_ceph_stats()
+ logger_cli.info("-> Dumping results")
+ for _time, _d in self.results.items():
+ self.dump_result(
+ self._get_dump_filename(_time),
+ _d
+ )
 return True
 def wait_ceph_cooldown(self):
@@ -411,6 +427,7 @@
 self.results[_sch_time] = {
 "input_options": options,
 "agents": {},
+ "ceph": {},
 "osd_df_before": _osd_df_before
 }
 if not self._do_testrun(options):
@@ -429,6 +446,7 @@
 self.results[_sch_time] = {
 "input_options": options,
 "agents": {},
+ "ceph": {},
 "osd_df_before": _osd_df_before
 }
 if not self._do_testrun(options):
@@ -476,63 +494,50 @@
 return
- def collect_results(self, options):
- _sch_time = options["scheduled_to"]
+ def collect_results(self):
 logger_cli.info("# Collecting results")
 # query agents for results
 _agents = self.get_agents_resultlist()
- # Syntax shortcut
- _ar = self.results[_sch_time]["agents"]
-
 for _agent, _l in _agents.items():
- # Create a syntax shortcut
- if _agent not in _ar:
- _ar[_agent] = {}
- _arl = _ar[_agent]
 # Check if we already have this locally
 for _time in _l["resultlist"]:
- _filename = self._get_dump_filename(_sch_time, _agent, options)
- if os.path.exists(_filename):
- # There is a file already for this task
- # Check if we need to load it
- if _sch_time in _arl:
- logger_cli.info(
- "-> Skipped already processed result '{}'".format(
- _filename
- )
- )
- else:
- # Load previously dumped result from disk
- logger_cli.info(
- "-> Loading already present result '{}'".format(
- _filename
- )
- )
- _arl[_sch_time] = self.load_dumped_result(_filename)
- else:
- # Load result add it locally and dump it
+ # Only times present in self.results (scheduled or preloaded)
+ # are collected, and each agent result is fetched only once
+ if _time not in self.results:
+ # Some older results found
+ # do not process them
+ logger_cli.info(
+ "-> Skipped old results for '{}'".format(_time)
+ )
+ continue
+ elif _agent not in self.results[_time]["agents"]:
+ # Not fetched yet: get it from the agent and store locally
 logger_cli.info(
 "-> Getting results for '{}' from '{}'".format(
- _sch_time,
+ _time,
 _agent
 )
 )
 _r = self.get_result_from_agent(_agent, _time)
- # Important to switch from result status time
- # to scheduled time
- _arl[_sch_time] = _r[_time]
- # Dump collected result
- self.dump_result(_filename, _arl[_sch_time])
- return
+ self.results[_time]["agents"][_agent] = _r[_time]
+ else:
+ # Already collected, e.g. by a previous testrun in this session
+ logger_cli.info(
+ "-> Skipped loaded result for '{}' from '{}'".format(
+ _time,
+ _agent
+ )
+ )
- def _get_dump_filename(self, _time, agent, options):
- _dirname = _reformat_timestr(_time)
+ def _get_dump_filename(self, _time):
+ _r = self.results[_time]
+ _dirname = _r["input_options"]["name"]
 _filename = "-".join([
- _dirname,
- agent,
- options["readwrite"],
- options["bs"],
- str(options["iodepth"]),
+ _reformat_timestr(_time),
+ "{:02}".format(len(_r["agents"])),
+ _r["input_options"]["readwrite"],
+ _r["input_options"]["bs"],
+ str(_r["input_options"]["iodepth"]),
 ]) + ".json"
 return os.path.join(
 self.results_dump_path,
@@ -540,6 +545,62 @@
 _filename
 )
+    def preload_results(self):
+        """Load results dumped earlier under the current bench name.
+
+        Each '*.json' file in '<results_dump_path>/<bench_name>' is stored
+        in self.results under the scheduled time restored from its file
+        name prefix. Files whose prefix does not parse are skipped.
+        """
+        logger_cli.info(
+            "# Preloading results for '{}'".format(self.bench_name)
+        )
+        _p = self.results_dump_path
+        if not os.path.isdir(_p):
+            logger_cli.warning(
+                "WARNING: Dump path is not a folder '{}'".format(_p)
+            )
+            return
+        # Only the bench name folder is used; walking the whole dump path
+        # (default is /tmp) recursively is not needed
+        path = os.path.join(_p, self.bench_name)
+        if not os.path.isdir(path):
+            return
+        logger_cli.info("-> Folder found '{}'".format(path))
+        for _fname in sorted(os.listdir(path)):
+            if not os.path.isfile(os.path.join(path, _fname)):
+                continue
+            logger_cli.debug("... processing '{}'".format(_fname))
+            _ext = _fname.split('.')[-1]
+            if _ext != "json":
+                logger_cli.info(
+                    "-> Extension invalid '{}', "
+                    "'json' is expected".format(_ext)
+                )
+                continue
+            # Time is the file name prefix: "%m%d%Y%H%M%S" followed by the
+            # offset digits whose '+' was stripped by _reformat_timestr
+            _t = _fname.split('-')[0]
+            try:
+                _t = datetime.strptime(
+                    _t[:14] + "+" + _t[14:], _file_datetime_fmt
+                )
+            except ValueError:
+                logger_cli.warning(
+                    "WARNING: Unable to get time from '{}'".format(_fname)
+                )
+                continue
+            _time = _t.strftime(_datetime_fmt)
+            self.results[_time] = self.load_dumped_result(
+                os.path.join(path, _fname)
+            )
+            logger_cli.info(
+                "-> Loaded '{}' as '{}'".format(
+                    _fname,
+                    _time
+                )
+            )
+
 def dump_result(self, filename, data):
 # Function dumps all available results as jsons to the given path
 # overwriting if needed
@@ -594,7 +638,7 @@
 _w_iops = 0
 for _a, _d in data["agents"].items():
 # Hardcoded number of jobs param :(
- _j = _d[_time]["jobs"][0]
+ _j = _d["jobs"][0]
 _r_bw += _j["read"]["bw_bytes"]
 _r_avglat += [_j["read"]["lat_ns"]["mean"]]
 _r_iops += _j["read"]["iops"]
@@ -623,6 +667,64 @@
 (sum(_w_avglat) / len(_w_avglat)) / 1000
 _totals["write_iops"] = _w_iops
+    def calculate_ceph_stats(self):
+        """Reduce raw Ceph status samples of each run to peak/share stats.
+
+        Raw samples are stored in data["ceph"] as {elapsed_sec: status}.
+        They are replaced by the samples' "pgmap" sections plus maximum
+        read/write bytes/sec and ops/sec; every kept sample also gets its
+        share of the maximum (in %). Runs that already have "ceph_stats"
+        are treated as processed and skipped.
+        """
+        def _as_list(key, stats):
+            # Values of 'key' across all samples; a missing key counts as 0
+            return [_v.get(key, 0) for _v in stats.values()]
+
+        def _perc(n, m):
+            # n as a percentage of m; 0 when either value is 0/missing
+            return (n / m) * 100 if n and m else 0
+
+        for _time, data in self.results.items():
+            if "ceph" not in data:
+                logger_cli.warning(
+                    "WARNING: Ceph stats raw data not found in results"
+                )
+                continue
+            if "ceph_stats" not in data:
+                data["ceph_stats"] = {}
+            else:
+                continue
+            # Fresh dict per run: a shared one leaks samples between runs
+            # and makes every run reference the very same stats object
+            _stats = {}
+            # Copy pool stats data, skipping samples that failed to parse
+            for _e, _d in data["ceph"].items():
+                if isinstance(_d, dict) and "pgmap" in _d:
+                    _stats[_e] = _d["pgmap"]
+            # Maximums (0 when no samples were collected)
+            m_r_bytes = max(_as_list("read_bytes_sec", _stats), default=0)
+            m_w_bytes = max(_as_list("write_bytes_sec", _stats), default=0)
+            m_r_iops = max(_as_list("read_op_per_sec", _stats), default=0)
+            m_w_iops = max(_as_list("write_op_per_sec", _stats), default=0)
+            # Replace ceph with shorter data
+            data["ceph"] = {
+                "max_read_bytes_sec": m_r_bytes,
+                "max_write_bytes_sec": m_w_bytes,
+                "max_read_iops_sec": m_r_iops,
+                "max_write_iops_sec": m_w_iops,
+                "stats": _stats
+            }
+            # Share of each sample in the maximum (%), used for bar charts
+            for _d in _stats.values():
+                _d["read_bytes_sec_perc"] = \
+                    _perc(_d.get("read_bytes_sec", 0), m_r_bytes)
+                _d["write_bytes_sec_perc"] = \
+                    _perc(_d.get("write_bytes_sec", 0), m_w_bytes)
+                _d["read_op_per_sec_perc"] = \
+                    _perc(_d.get("read_op_per_sec", 0), m_r_iops)
+                _d["write_op_per_sec_perc"] = \
+                    _perc(_d.get("write_op_per_sec", 0), m_w_iops)
+
 # Create report
 def create_report(self, filename):
 """
@@ -635,7 +737,6 @@
 reporter.HTMLCephBench(self),
 filename
 )
- self.calculate_totals()
 _report(
 {
 "results": self.results,
diff --git a/cfg_checker/modules/ceph/info.py b/cfg_checker/modules/ceph/info.py
index 56e250e..f6d5758 100644
--- a/cfg_checker/modules/ceph/info.py
+++ b/cfg_checker/modules/ceph/info.py
@@ -1,4 +1,8 @@
+import base64
+import io
 import json
+import os
+import tarfile
 from time import sleep
@@ -59,6 +63,10 @@
 # TODO: Consider filtering out or prepare data for the table
 _osd = _d.pop("osd_name")
 _node_name = _d.pop("node_name")
+ # Additional check for empty data
+ if not _d:
+ self.ceph_info['ceph_health']['latest'][_n] = {}
+ continue
 _date = sorted(_d.keys(), reverse=True)[0]
 self.ceph_info['ceph_health']['date'] = _date
 self.ceph_info['ceph_health']['latest'][_n] = _d[_date]
@@ -238,28 +246,65 @@
 self.cluster_info = {}
 self.ceph_version = self.get_ceph_cluster_config()
- def _safe_tools_cmd(self, cmd, expect_output=True):
+    def _safe_tools_cmd(self, cmd_str, expect_output=True):
 _r = self.master.exec_cmd_on_target_pod(
 self.pod_name,
 self.ceph_ns,
- cmd
+            cmd_str
 )
 if expect_output and not _r:
- logger_cli.debug("... got empty output for '{}'".format(cmd))
+            logger_cli.debug("... got empty output for '{}'".format(cmd_str))
 elif not expect_output and _r:
 logger_cli.warning(
 "WARNING: Unexpected output for '{}':\n"
- "===== Start\n{}\n===== End".format(cmd, _r)
+                "===== Start\n{}\n===== End".format(cmd_str, _r)
 )
 return _r
- def _safe_get_cmd_output_as_json(self, cmd):
- _buf = self._safe_tools_cmd(cmd)
+    def _safe_tools_cmd_zipped_output(self, cmd_str):
+        # Runs cmd_str (a ceph CLI call, as '-o <file>' gets appended) and
+        # returns the file content as bytes. The file is passed back as
+        # base64 of a gzipped tar to handle huge outputs like 'ceph pg dump'
+        _tmp_path = "/tmp"
+        _tar_path = os.path.join(_tmp_path, "checker_cmd.tgz")
+        _path = os.path.join(_tmp_path, "checker_cmd_output")
+        try:
+            # Run original cmd with redirect, then zip it and base64 encode
+            self._safe_tools_cmd(
+                " ".join([cmd_str, "-o", _path]), expect_output=False
+            )
+            self._safe_tools_cmd(" ".join(["tar", "-zcvf", _tar_path, _path]))
+            _b64 = self._safe_tools_cmd("base64 " + _tar_path)
+            if not _b64:
+                # Nothing to decode; parsing '' as json fails and is logged
+                return ""
+            _io = io.BytesIO(base64.standard_b64decode(_b64))
+            with tarfile.open(fileobj=_io) as _tar:
+                return _tar.extractfile(_tar.getmembers()[0]).read()
+        finally:
+            # Remove temp files even when a step above failed
+            self._safe_tools_cmd("rm -f " + _path)
+            self._safe_tools_cmd("rm -f " + _tar_path)
+
+    def _safe_get_cmd_output_as_json(self, cmd, zipped=False):
+        if zipped:
+            _buf = self._safe_tools_cmd_zipped_output(cmd)
+        else:
+            _buf = self._safe_tools_cmd(cmd)
 try:
 return json.loads(_buf)
- except ValueError:
+        except (ValueError, TypeError) as e:
+            # Log at most 512 chars; zipped output arrives as bytes
+            if isinstance(_buf, bytes):
+                _out = _buf.decode(errors="replace")
+            else:
+                _out = str(_buf)
+            _out = _out[:512] + "..." if len(_out) > 512 else _out
 logger_cli.error(
- "\nERROR: failed to parse json: '{}'".format(_buf)
+                "\nERROR: failed to parse json: '{}'. Data: '{}'".format(
+                    e,
+                    _out
+                )
 )
 return _buf
@@ -365,7 +410,10 @@
 return self._safe_get_cmd_output_as_json("ceph df -f json")
 def get_ceph_pg_dump(self):
- return self._safe_get_cmd_output_as_json("ceph pg dump -f json")
+        return self._safe_get_cmd_output_as_json(
+            "ceph pg dump -f json",
+            zipped=True
+        )
 def get_ceph_osd_df(self):
 return self._safe_get_cmd_output_as_json("ceph osd df -f json")