Cfg-checker Ceph benchmark & info updates and fixes

- Added collection of Ceph global stats while the benchmark is running
- Added collection of OSD pg dump data
- Added report page with active OSD node stats
- Added --report-only option; Ceph info is still collected

Fixes:
- fio-runner uses the scheduled time when reporting errors
- Proper Ceph PV creation
- Updated retry decorator timeouts for overloaded envs
- Volume size is now calculated as size*1.3 so the test datafile fits
- Proper indication of maximum values
Related-PROD: PROD-36669
Change-Id: Ic518ddbb2ca0915b550e981d0b0fc7084000aa04
diff --git a/cfg_checker/agent/fio_runner.py b/cfg_checker/agent/fio_runner.py
index 29173ed..db012ac 100644
--- a/cfg_checker/agent/fio_runner.py
+++ b/cfg_checker/agent/fio_runner.py
@@ -326,8 +326,7 @@
else:
_line = _bb
if _start < 0 and _end < 0 and not _line.startswith("{"):
- _time = get_time()
- self.results[_time] = {
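+ # key the error by the run's scheduled start time instead of "now"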
+ self.results[self.testrun_starttime] = {
"error": _line
}
self.eta = -1
diff --git a/cfg_checker/common/decorators.py b/cfg_checker/common/decorators.py
index eed3fba..d83e469 100644
--- a/cfg_checker/common/decorators.py
+++ b/cfg_checker/common/decorators.py
@@ -6,7 +6,7 @@
from cfg_checker.common import logger, logger_cli
-def retry(exceptions, total_tries=4, initial_wait=0.5, backoff_factor=2):
+def retry(exceptions, total_tries=5, initial_wait=0.5, backoff_factor=2):
"""
calling the decorated function applying an exponential backoff.
Args:
diff --git a/cfg_checker/common/kube_utils.py b/cfg_checker/common/kube_utils.py
index 3e15095..f8c3469 100644
--- a/cfg_checker/common/kube_utils.py
+++ b/cfg_checker/common/kube_utils.py
@@ -374,7 +374,7 @@
return _pods
- @retry(ApiException, initial_wait=5)
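+ # longer initial wait helps on overloaded envs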
+ @retry(ApiException, initial_wait=10)
def exec_on_target_pod(
self,
cmd,
diff --git a/cfg_checker/modules/ceph/__init__.py b/cfg_checker/modules/ceph/__init__.py
index dd483cf..eee01ce 100644
--- a/cfg_checker/modules/ceph/__init__.py
+++ b/cfg_checker/modules/ceph/__init__.py
@@ -109,8 +109,13 @@
help="Cleanup resources related to benchmark"
)
ceph_bench_parser.add_argument(
+ '--report-only',
+ action="store_true", default=False,
+ help="Just create report using files in folder"
+ )
+ ceph_bench_parser.add_argument(
'--dump-path',
- metavar="dump_results", default="/tmp",
+ metavar="dump_results",
help="Dump result after each test run to use them later"
)
ceph_bench_parser.add_argument(
@@ -217,6 +222,7 @@
# Ceph Benchmark using multiple pods
# if only cleanup needed do it and exit
_cleanup_only = args_utils.get_arg(args, 'cleanup_only')
+ _report_only = args_utils.get_arg(args, 'report_only')
config.resource_prefix = "cfgagent"
if _cleanup_only:
# Do forced resource cleanup and exit
@@ -232,15 +238,57 @@
ceph_bench.cleanup()
return
+ # dump results options
+ _dump_path = args_utils.get_arg(args, "dump_path")
+ if _dump_path:
+ logger_cli.info("# Results will be dumped to '{}'".format(_dump_path))
+ config.bench_results_dump_path = _dump_path
+ else:
+ _p = "/tmp"
+ logger_cli.info(
+ "# No result dump path set. Defaulting to {}"
+ "Consider setting it if running long task_file "
+ "based test runs".format(_p)
+ )
+ config.bench_results_dump_path = _p
+
+ # Report filename
+ _filename = args_utils.get_arg(args, 'html')
# gather Ceph info
logger_cli.info("# Collecting Ceph cluster information")
ceph_info = info.KubeCephInfo(config)
+ # Task files or options
+ _opts = get_fio_options()
+ # Load name and announce it
+ config.bench_name = args_utils.get_arg(args, "name")
+ _opts["name"] = config.bench_name
+ logger_cli.info(
+ "# Using '{}' as ceph bench jobs name".format(_opts["name"])
+ )
+
+ if _report_only:
+ # Do forced report creation and exit
+ config.bench_mode = "report"
+ config.bench_agent_count = -1
+ ceph_bench = bench.KubeCephBench(config)
+ ceph_bench.set_ceph_info_class(ceph_info)
+ logger_cli.info(
+ "# Preparing to generate report '{}'".format(
+ config.resource_prefix
+ )
+ )
+ # Preload previous results for this name
+ ceph_bench.preload_results()
+ # Gather ceph data
+ ceph_bench.wait_ceph_cooldown()
+ # Generate report
+ ceph_bench.create_report(_filename)
+ return
+
# Prepare the tasks and do synced testrun or a single one
logger_cli.info("# Initializing ceph benchmark module")
args_utils.check_supported_env(ENV_TYPE_KUBE, args, config)
- # Report filename
- _filename = args_utils.get_arg(args, 'html')
# agents count option
config.bench_agent_count = args_utils.get_arg(args, "agents")
logger_cli.info("-> using {} agents".format(config.bench_agent_count))
@@ -250,8 +298,6 @@
_storage_class = args_utils.get_arg(args, "storage_class")
logger_cli.info("-> using storage class of '{}'".format(_storage_class))
config.bench_storage_class = _storage_class
- # dump results options
- _dump_path = args_utils.get_arg(args, "dump_path")
if _dump_path:
logger_cli.info("# Results will be dumped to '{}'".format(_dump_path))
config.bench_results_dump_path = _dump_path
@@ -261,8 +307,7 @@
"Consider setting it if running long task_file based test runs"
)
config.bench_results_dump_path = _dump_path
- # Task files or options
- _opts = get_fio_options()
+
_task_file = args_utils.get_arg(args, "task_file", nofail=True)
if not _task_file:
logger_cli.info("-> Running single benchmark run")
@@ -289,18 +334,11 @@
logger_cli.info("-> running with tasks from '{}'".format(_task_file))
config.bench_task_file = _task_file
config.bench_mode = "tasks"
- config.bench_name = args_utils.get_arg(args, "name")
- _opts["name"] = config.bench_name
- logger_cli.info(
- "# Using '{}' as ceph bench jobs name".format(_opts["name"])
- )
logger_cli.debug("... default/selected options for fio:")
for _k in _opts.keys():
# TODO: Update options for single run
logger_cli.debug(" {} = {}".format(_k, _opts[_k]))
- # handle option inavailability from command line for single mode
-
# init the Bench class
ceph_bench = bench.KubeCephBench(config)
ceph_bench.set_ceph_info_class(ceph_info)
diff --git a/cfg_checker/modules/ceph/bench.py b/cfg_checker/modules/ceph/bench.py
index 2eedcfb..0780596 100644
--- a/cfg_checker/modules/ceph/bench.py
+++ b/cfg_checker/modules/ceph/bench.py
@@ -41,6 +41,19 @@
return {}
+def _split_vol_size(size):
+ # I know, but it is faster than regex
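+ # e.g. "4G" -> (4, "G")
+ # 48..57 are the ASCII codes for digits '0'..'9'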
+ _numbers = [48, 49, 50, 51, 52, 53, 54, 55, 56, 57]
+ _s_int = "0"
+ _s_type = ""
+ for ch in size:
+ if ord(ch) in _numbers:
+ _s_int += ch
+ else:
+ _s_type += ch
+ return int(_s_int), _s_type
+
+
class CephBench(object):
_agent_template = "cfgagent-template.yaml"
@@ -69,26 +82,32 @@
self.mode = config.bench_mode
self.resource_prefix = config.resource_prefix
+
if config.bench_mode == "tasks":
self.taskfile = config.bench_task_file
self.load_tasks(self.taskfile)
- elif config.bench_mode == "cleanup":
+
+ if config.bench_mode == "cleanup":
self.cleanup_list = []
return
- self.storage_class = config.bench_storage_class
- self.results_dump_path = config.bench_results_dump_path
self.bench_name = config.bench_name
+ self.results_dump_path = config.bench_results_dump_path
+ self.results = {}
+ self.agent_results = {}
+ self.cleanup_list = []
self.agent_pods = []
+
+ if config.bench_mode == "report":
+ self.results = {}
+ return
+
+ self.storage_class = config.bench_storage_class
self.services = []
# By default,
# 30 seconds should be enough to send tasks to 3-5 agents
self.scheduled_delay = 30
- self.cleanup_list = []
- self.results = {}
- self.agent_results = {}
-
def set_ceph_info_class(self, ceph_info):
self.ceph_info = ceph_info
@@ -138,6 +157,19 @@
def prepare_agents(self, options):
logger_cli.info("# Preparing {} agents".format(self.agent_count))
+ # Increase volume size a bit, so datafile fits
+ _quantizer = 1.3
+ _v_size, _vol_size_units = _split_vol_size(options['size'])
+ _v_size = round(_v_size * _quantizer)
+ _vol_size = str(_v_size) + _vol_size_units + "i"
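+ # "i" suffix makes it a binary k8s quantity, e.g. "4G" -> "5Gi"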
+ logger_cli.info(
+ "-> Testfile size: {0}, Volume size: {1} ({0}*{2})".format(
+ options['size'],
+ _vol_size,
+ _quantizer
+ )
+ )
+ # Start preparing
for idx in range(self.agent_count):
# create pvc/pv and pod
logger_cli.info("-> creating agent '{:02}'".format(idx))
@@ -146,7 +178,7 @@
idx,
os.path.split(options["filename"])[0],
self.storage_class,
- options['size'] + 'i',
+ _vol_size,
self._agent_template
)
# save it to lists
@@ -314,6 +346,8 @@
_ramptime = _get_seconds(options["ramp_time"])
# Sum up all timings that we must wait and double it
_timeout = (self.scheduled_delay + _runtime + _ramptime) * 2
+ # We should have no more than 65 measurements
+ _stats_delay = round((_runtime + _ramptime) / 65)
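+ # gates how early ceph status sampling starts (see check below)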
_start = self.next_scheduled_time
_end = datetime.now(timezone.utc) + timedelta(seconds=_timeout)
while True:
@@ -331,9 +365,10 @@
)
# Get Ceph status if _start time passed
_elapsed = (datetime.now(timezone.utc) - _start).total_seconds()
- if _elapsed > 0:
+ if _elapsed > _stats_delay:
logger_cli.info("-> {:.2f}s elapsed".format(_elapsed))
- self.results[options["scheduled_to"]]["ceph"][_elapsed] = \
+ _sec = "{:0.1f}".format(_elapsed)
+ self.results[options["scheduled_to"]]["ceph"][_sec] = \
self.ceph_info.get_cluster_status()
# Check if agents finished
finished = [True for _s in _sts.values()
@@ -353,6 +388,8 @@
return False
def _do_testrun(self, options):
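+ # snapshot "ceph osd df" before the run for the before/after comparison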
+ self.results[options["scheduled_to"]]["osd_df_before"] = \
+ self.ceph_info.get_ceph_osd_df()
# send single to agent
if not self._send_scheduled_task(options):
return False
@@ -361,11 +398,15 @@
return False
else:
logger_cli.info("-> Finished testrun. Collecting results...")
+ # get ceph osd stats
+ self.results[options["scheduled_to"]]["osd_df_after"] = \
+ self.ceph_info.get_ceph_osd_df()
# Get results for each agent
self.collect_results()
logger_cli.info("-> Calculating totals and averages")
self.calculate_totals()
self.calculate_ceph_stats()
+ self.osd_df_compare(options["scheduled_to"])
logger_cli.info("-> Dumping results")
for _time, _d in self.results.items():
self.dump_result(
@@ -395,8 +436,6 @@
# TODO: Ceph status check
# self._wait_ceph_cooldown()
- _get_df = self.ceph_info.get_ceph_osd_df
-
# Do benchmark according to mode
if self.mode == "tasks":
logger_cli.info(
@@ -408,7 +447,6 @@
_total_tasks = len(self.tasks)
for idx in range(_total_tasks):
# init time to schedule
- _osd_df_before = _get_df()
_task = self.tasks[idx]
logger_cli.info(
"-> Starting next task ({}/{})".format(idx+1, _total_tasks)
@@ -427,18 +465,15 @@
self.results[_sch_time] = {
"input_options": options,
"agents": {},
- "ceph": {},
- "osd_df_before": _osd_df_before
+ "ceph": {}
}
+ # exit on error
if not self._do_testrun(options):
return False
- else:
- self.results[_sch_time]["osd_df_after"] = _get_df()
-
+ # Save ceph osd stats and wait cooldown
self.wait_ceph_cooldown()
elif self.mode == "single":
logger_cli.info("# Running single benchmark")
- _osd_df_before = _get_df()
# init time to schedule
_sch_time = self._get_next_scheduled_time()
options["scheduled_to"] = _sch_time
@@ -446,13 +481,11 @@
self.results[_sch_time] = {
"input_options": options,
"agents": {},
- "ceph": {},
- "osd_df_before": _osd_df_before
+ "ceph": {}
}
if not self._do_testrun(options):
return False
- else:
- self.results[_sch_time]["osd_df_after"] = _get_df()
+ # Save ceph osd stats
else:
logger_cli.error("ERROR: Unknown mode '{}'".format(self.mode))
return False
@@ -669,14 +702,14 @@
def calculate_ceph_stats(self):
- # func to get values as lists
+ # func to get the time and max value for a key
- def _as_list(key, stats):
- _list = []
- for _v in stats.values():
- if key in _v:
- _list += [_v[key]]
- else:
- _list += [0]
- return _list
+ def _get_max_value(key, stats):
+ _max_time = 0
+ _value = 0
+ for _k, _v in stats.items():
+ if key in _v and _value < _v[key]:
+ _max_time = _k
+ _value = _v[key]
+ return _max_time, _value
def _perc(n, m):
if not n:
@@ -684,7 +717,12 @@
elif not m:
return 0
else:
- return (n / m) * 100
+ return "{:.0f}%".format((n / m) * 100)
+
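+ # reference ticks for the report charts: max, +10%, 75%, 50%, 15%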
+ def _axis_vals(val):
+ return [
+ val, int(val*1.1), int(val*0.75), int(val*0.50), int(val*0.15)
+ ]
_stats = {}
for _time, data in self.results.items():
@@ -701,28 +739,148 @@
for _e, _d in data["ceph"].items():
_stats[_e] = _d["pgmap"]
# Maximums
- m_r_bytes = max(_as_list("read_bytes_sec", _stats))
- m_w_bytes = max(_as_list("write_bytes_sec", _stats))
- m_r_iops = max(_as_list("read_op_per_sec", _stats))
- m_w_iops = max(_as_list("write_op_per_sec", _stats))
+ mrb_t, mrb = _get_max_value("read_bytes_sec", _stats)
+ mwb_t, mwb = _get_max_value("write_bytes_sec", _stats)
+ mri_t, mri = _get_max_value("read_op_per_sec", _stats)
+ mwi_t, mwi = _get_max_value("write_op_per_sec", _stats)
# Replace ceph with shorter data
data["ceph"] = {
- "max_read_bytes_sec": m_r_bytes,
- "max_write_bytes_sec": m_w_bytes,
- "max_read_iops_sec": m_r_iops,
- "max_write_iops_sec": m_w_iops,
+ "max_rbl": _axis_vals(mrb),
+ "max_rbl_time": mrb_t,
+ "max_wbl": _axis_vals(mwb),
+ "max_wbl_time": mwb_t,
+ "max_ril": _axis_vals(mri),
+ "max_ril_time": mri_t,
+ "max_wil": _axis_vals(mwi),
+ "max_wil_time": mwi_t,
"stats": _stats
}
# Calculate %% values for barchart
for _e, _d in data["ceph"]["stats"].items():
_d["read_bytes_sec_perc"] = \
- _perc(_d.get("read_bytes_sec", 0), m_r_bytes)
+ _perc(_d.get("read_bytes_sec", 0), mrb)
_d["write_bytes_sec_perc"] = \
- _perc(_d.get("write_bytes_sec", 0), m_w_bytes)
+ _perc(_d.get("write_bytes_sec", 0), mwb)
_d["read_op_per_sec_perc"] = \
- _perc(_d.get("read_op_per_sec", 0), m_r_iops)
+ _perc(_d.get("read_op_per_sec", 0), mri)
_d["write_op_per_sec_perc"] = \
- _perc(_d.get("write_op_per_sec", 0), m_w_iops)
+ _perc(_d.get("write_op_per_sec", 0), mwi)
+ return
+
+ def osd_df_compare(self, _time):
+ def _get_osd(osd_id, nodes):
+ for osd in nodes:
+ if osd["id"] == osd_id:
+ return osd
+ return None
+
+ logger_cli.info("# Comparing OSD stats")
+ _osd = {}
+ if _time not in self.results:
+ logger_cli.warning("WARNING: {} not found in results. Check data")
+ return
+ data = self.results[_time]
+ # Save summary
+ data["osd_summary"] = {}
+ data["osd_summary"]["before"] = data["osd_df_before"]["summary"]
+ data["osd_summary"]["after"] = data["osd_df_after"]["summary"]
+ data["osd_summary"]["active"] = {
+ "status": "",
+ "device_class": "",
+ "pgs": "",
+ "kb_used": 0,
+ "kb_used_data": 0,
+ "kb_used_omap": 0,
+ "kb_used_meta": 0,
+ "utilization": 0,
+ "var_down": 0,
+ "var_up": 0
+ }
+ # Compare OSD counts
+ osds_before = len(data["osd_df_before"]["nodes"])
+ osds_after = len(data["osd_df_after"]["nodes"])
+ if osds_before != osds_after:
+ logger_cli.warning(
+ "WARNING: Before/After bench OSD "
+ "count mismatch for '{}'".format(_time)
+ )
+ # iterate osds from before
+ _pgs = 0
+ _classes = set()
+ _nodes_up = 0
+ for idx in range(osds_before):
+ _osd_b = data["osd_df_before"]["nodes"][idx]
+ # search for the same osd in after
+ _osd_a = _get_osd(_osd_b["id"], data["osd_df_after"]["nodes"])
+ # Save data to the new place
+ _osd[_osd_b["name"]] = {}
+ _osd[_osd_b["name"]]["before"] = _osd_b
+ if not _osd_a:
+ # If this happen, Ceph cluster is actually broken
+ logger_cli.warning(
+ "WARNING: Wow! {} dissapered".format(_osd_b["name"])
+ )
+ _osd[_osd_b["name"]]["after"] = {}
+ else:
+ _osd[_osd_b["name"]]["after"] = _osd_a
+ _osd[_osd_b["name"]]["percent"] = {}
+ # Calculate summary using "after" data
+ _pgs += _osd_a["pgs"]
+ _classes.update([_osd_a["device_class"]])
+ if _osd_a["status"] == "up":
+ _nodes_up += 1
+ # compare
+ _keys_b = list(_osd_b.keys())
+ _keys_a = list(_osd_a.keys())
+ # To be safe, detect if some keys are different
+ # ...and log it.
+ _diff = set(_keys_b).symmetric_difference(_keys_a)
+ if len(_diff) > 0:
+ # This should never happen, actually
+ logger_cli.warning(
+ "WARNING: Before/after keys mismatch "
+ "for OSD node {}: {}".format(idx, ", ".join(_diff))
+ )
+ continue
+ # Compare each key and calculate how it changed
+ for k in _keys_b:
+ if _osd_b[k] != _osd_a[k]:
+ # Announce change
+ logger_cli.debug(
+ "-> {:4}: {}, {} -> {}".format(
+ idx,
+ k,
+ _osd_b[k],
+ _osd_a[k]
+ )
+ )
+ # calculate percent change for numeric, non-zero values
+ # (string keys like "status" would crash the division)
+ if not isinstance(_osd_b[k], (int, float)) or not _osd_b[k]:
+ continue
+ _change_perc = (_osd_a[k] / _osd_b[k]) * 100 - 100
+ _osd[_osd_b["name"]]["percent"][k] = _change_perc
+
+ # Increase counters
+ _p = data["osd_summary"]["active"]
+
+ if k not in _p:
+ _p[k] = 1
+ else:
+ _p[k] += 1
+ if k == "var":
+ if _change_perc > 0:
+ _p["var_up"] += 1
+ elif _change_perc < 0:
+ _p["var_down"] += 1
+ # Save sorted data
+ data["osds"] = _osd
+ logger_cli.info("-> Removing redundand osd before/after data")
+ data.pop("osd_df_before")
+ data.pop("osd_df_after")
+ # Save summary
+ data["osd_summary"]["active"]["status"] = "{}".format(_nodes_up)
+ data["osd_summary"]["active"]["device_class"] = \
+ "{}".format(len(list(_classes)))
+ data["osd_summary"]["active"]["pgs"] = _pgs
return
# Create report
diff --git a/cfg_checker/nodes.py b/cfg_checker/nodes.py
index b908571..ebf0e00 100644
--- a/cfg_checker/nodes.py
+++ b/cfg_checker/nodes.py
@@ -1322,13 +1322,14 @@
_mnt["mountPath"] = path
# replace claim
for _v in _pod["spec"]["volumes"]:
- if "placeholder" in _v["name"]:
+ if "cfgagent-pv" in _v["name"]:
# _v["name"] = _pv_n
_v["persistentVolumeClaim"]["claimName"] = _pvc_n
# init volume resources
# _pv_object = self.kube.init_pv_resource(_pv_n, sc, size, path)
# _pv = self.kube.prepare_pv(_pv_object)
+ # size passed in is already grown ~30% by the caller (bench.prepare_agents)
_pvc_object = self.kube.init_pvc_resource(_pvc_n, sc, size)
_pvc = self.kube.prepare_pvc(_pvc_object)
diff --git a/cfg_checker/reports/reporter.py b/cfg_checker/reports/reporter.py
index 150ce65..de0c83c 100644
--- a/cfg_checker/reports/reporter.py
+++ b/cfg_checker/reports/reporter.py
@@ -123,15 +123,15 @@
def to_gb(bytes_str):
- _bytes = int(bytes_str)
- _gb = _bytes / 1024 / 1024 / 1024
- return "{}".format(round(_gb, 2))
+ return "{}".format(round(int(bytes_str) / 1024 / 1024 / 1024, 2))
def to_mb(bytes_str):
- _bytes = int(bytes_str)
- _mb = _bytes / 1024 / 1024
- return "{}".format(round(_mb, 2))
+ return "{}".format(round(int(bytes_str) / 1024 / 1024, 2))
+
+
+def to_kb(bytes_str):
+ return "{}".format(round(int(bytes_str) / 1024, 2))
def get_bucket_item_name(id, cmap):
@@ -251,6 +251,7 @@
self.jinja2_env.filters['pkg_repo_info'] = make_repo_info
self.jinja2_env.filters['to_gb'] = to_gb
self.jinja2_env.filters['to_mb'] = to_mb
+ self.jinja2_env.filters['to_kb'] = to_kb
self.jinja2_env.filters['get_bucket_item_name'] = get_bucket_item_name
self.jinja2_env.filters['get_rule_steps'] = get_rule_steps
self.jinja2_env.filters['get_pool_stats'] = get_pool_stats_by_id