Ceph report fixes and Ceph bench beta 0.1
- Added Ceph cluster stats collection during benchmark runs
- Updated Ceph benchmark results averages calculations
Fixes:
- Fixed copying of huge PG dumps (>30 MB JSON files)
- Fixed fio-runner constants
Related-PROD: PROD-36669
Change-Id: Id8e250f626dfdaecc12ad005b61d03a21c9e6c4e
diff --git a/cfg_checker/modules/ceph/bench.py b/cfg_checker/modules/ceph/bench.py
index d804f4a..2eedcfb 100644
--- a/cfg_checker/modules/ceph/bench.py
+++ b/cfg_checker/modules/ceph/bench.py
@@ -3,7 +3,6 @@
import json
from datetime import datetime, timedelta, timezone
-from time import sleep
from cfg_checker.common import logger_cli
from cfg_checker.common.decorators import retry
@@ -18,6 +17,9 @@
from cfg_checker.agent.fio_runner import _get_seconds, _datetime_fmt
+_file_datetime_fmt = "%m%d%Y%H%M%S%z"
+
+
def _reformat_timestr(_str, _chars=["/", ",", " ", ":", "+"], _tchar=""):
_new = ""
for _c in _str:
@@ -76,6 +78,7 @@
self.storage_class = config.bench_storage_class
self.results_dump_path = config.bench_results_dump_path
+ self.bench_name = config.bench_name
self.agent_pods = []
self.services = []
# By default,
@@ -138,7 +141,8 @@
for idx in range(self.agent_count):
# create pvc/pv and pod
logger_cli.info("-> creating agent '{:02}'".format(idx))
- _agent, _pv, _pvc = self.master.prepare_benchmark_agent(
+ # _agent, _pv, _pvc = self.master.prepare_benchmark_agent(
+ _agent, _pvc = self.master.prepare_benchmark_agent(
idx,
os.path.split(options["filename"])[0],
self.storage_class,
@@ -147,7 +151,7 @@
)
# save it to lists
self.agent_pods.append(_agent)
- self.add_for_deletion(_pv, "pv")
+ # self.add_for_deletion(_pv, "pv")
self.add_for_deletion(_pvc, "pvc")
self.add_for_deletion(_agent, "pod")
@@ -170,7 +174,7 @@
logger_cli.info("-> Done creating agents")
# TODO: Update after implementing pooled task sending
- self.scheduled_delay = self.agent_count * 6
+ self.scheduled_delay = self.agent_count * 10
logger_cli.info(
"-> Schedule delay set to {} sec".format(self.scheduled_delay)
)
@@ -277,8 +281,10 @@
def _get_next_scheduled_time(self):
_now = datetime.now(timezone.utc)
logger_cli.info("-> time is '{}'".format(_now.strftime(_datetime_fmt)))
- _time = _now + timedelta(seconds=self.scheduled_delay)
- _str_time = _time.strftime(_datetime_fmt)
+ self.next_scheduled_time = _now + timedelta(
+ seconds=self.scheduled_delay
+ )
+ _str_time = self.next_scheduled_time.strftime(_datetime_fmt)
logger_cli.info(
"-> next benchmark scheduled to '{}'".format(_str_time)
)
@@ -308,10 +314,10 @@
_ramptime = _get_seconds(options["ramp_time"])
# Sum up all timings that we must wait and double it
_timeout = (self.scheduled_delay + _runtime + _ramptime) * 2
+ _start = self.next_scheduled_time
_end = datetime.now(timezone.utc) + timedelta(seconds=_timeout)
while True:
# Print status
- # TODO: do pooled status get
_sts = self.get_agents_status()
diff = (_end - datetime.now(timezone.utc)).total_seconds()
logger_cli.info("-> {:.2f}s left. Agent status:".format(diff))
@@ -323,6 +329,13 @@
_status["progress"]
)
)
+ # Get Ceph status if _start time passed
+ _elapsed = (datetime.now(timezone.utc) - _start).total_seconds()
+ if _elapsed > 0:
+ logger_cli.info("-> {:.2f}s elapsed".format(_elapsed))
+ self.results[options["scheduled_to"]]["ceph"][_elapsed] = \
+ self.ceph_info.get_cluster_status()
+ # Check if agents finished
finished = [True for _s in _sts.values()
if _s["status"] == 'finished']
_fcnt = len(finished)
@@ -338,12 +351,6 @@
if diff < 0:
logger_cli.info("-> Timed out waiting for agents to finish")
return False
- else:
- logger_cli.info("-> Sleeping for {:.2f}s".format(2))
- sleep(2)
- if diff <= 0.1:
- logger_cli.info("-> Timed out waiting for agents to finish")
- return False
def _do_testrun(self, options):
# send single to agent
@@ -353,9 +360,18 @@
if not self.track_benchmark(options):
return False
else:
- logger_cli.info("-> Finished testrun")
+ logger_cli.info("-> Finished testrun. Collecting results...")
# Get results for each agent
- self.collect_results(options)
+ self.collect_results()
+ logger_cli.info("-> Calculating totals and averages")
+ self.calculate_totals()
+ self.calculate_ceph_stats()
+ logger_cli.info("-> Dumping results")
+ for _time, _d in self.results.items():
+ self.dump_result(
+ self._get_dump_filename(_time),
+ _d
+ )
return True
def wait_ceph_cooldown(self):
@@ -411,6 +427,7 @@
self.results[_sch_time] = {
"input_options": options,
"agents": {},
+ "ceph": {},
"osd_df_before": _osd_df_before
}
if not self._do_testrun(options):
@@ -429,6 +446,7 @@
self.results[_sch_time] = {
"input_options": options,
"agents": {},
+ "ceph": {},
"osd_df_before": _osd_df_before
}
if not self._do_testrun(options):
@@ -476,63 +494,50 @@
return
- def collect_results(self, options):
- _sch_time = options["scheduled_to"]
+ def collect_results(self):
logger_cli.info("# Collecting results")
# query agents for results
_agents = self.get_agents_resultlist()
- # Syntax shortcut
- _ar = self.results[_sch_time]["agents"]
-
for _agent, _l in _agents.items():
- # Create a syntax shortcut
- if _agent not in _ar:
- _ar[_agent] = {}
- _arl = _ar[_agent]
# Check if we already have this locally
for _time in _l["resultlist"]:
- _filename = self._get_dump_filename(_sch_time, _agent, options)
- if os.path.exists(_filename):
- # There is a file already for this task
- # Check if we need to load it
- if _sch_time in _arl:
- logger_cli.info(
- "-> Skipped already processed result '{}'".format(
- _filename
- )
- )
- else:
- # Load previously dumped result from disk
- logger_cli.info(
- "-> Loading already present result '{}'".format(
- _filename
- )
- )
- _arl[_sch_time] = self.load_dumped_result(_filename)
- else:
- # Load result add it locally and dump it
+ # There is a file already for this task/time
+ # Check if we need to load it
+ if _time not in self.results:
+ # Some older results found
+ # do not process them
+ logger_cli.info(
+ "-> Skipped old results for '{}'".format(_time)
+ )
+ continue
+ elif _agent not in self.results[_time]["agents"]:
+ # Load result add it locally
logger_cli.info(
"-> Getting results for '{}' from '{}'".format(
- _sch_time,
+ _time,
_agent
)
)
_r = self.get_result_from_agent(_agent, _time)
- # Important to switch from result status time
- # to scheduled time
- _arl[_sch_time] = _r[_time]
- # Dump collected result
- self.dump_result(_filename, _arl[_sch_time])
- return
+ self.results[_time]["agents"][_agent] = _r[_time]
+ else:
+ # Should never happen, actually
+ logger_cli.info(
+ "-> Skipped loaded result for '{}' from '{}'".format(
+ _time,
+ _agent
+ )
+ )
- def _get_dump_filename(self, _time, agent, options):
- _dirname = _reformat_timestr(_time)
+ def _get_dump_filename(self, _time):
+ _r = self.results[_time]
+ _dirname = _r["input_options"]["name"]
_filename = "-".join([
- _dirname,
- agent,
- options["readwrite"],
- options["bs"],
- str(options["iodepth"]),
+ _reformat_timestr(_time),
+ "{:02}".format(len(_r["agents"])),
+ _r["input_options"]["readwrite"],
+ _r["input_options"]["bs"],
+ str(_r["input_options"]["iodepth"]),
]) + ".json"
return os.path.join(
self.results_dump_path,
@@ -540,6 +545,45 @@
_filename
)
+ def preload_results(self):
+ logger_cli.info(
+ "# Preloading results for '{}'".format(self.bench_name)
+ )
+ # get all dirs in folder
+ _p = self.results_dump_path
+ if not os.path.isdir(_p):
+ logger_cli.warn(
+ "WARNING: Dump path is not a folder '{}'".format(_p)
+ )
+ return
+ for path, dirs, files in os.walk(_p):
+ if path == os.path.join(_p, self.bench_name):
+ logger_cli.info("-> Folder found '{}'".format(path))
+ for _fname in files:
+ logger_cli.debug("... processing '{}'".format(_fname))
+ _ext = _fname.split('.')[-1]
+ if _ext != "json":
+ logger_cli.info(
+ "-> Extension invalid '{}', "
+ "'json' is expected".format(_ext)
+ )
+ continue
+ # get time from filename
+ # Ugly, but works
+ _t = _fname.split('-')[0]
+ _str_time = _t[:14] + "+" + _t[14:]
+ _t = datetime.strptime(_str_time, _file_datetime_fmt)
+ _time = _t.strftime(_datetime_fmt)
+ self.results[_time] = self.load_dumped_result(
+ os.path.join(path, _fname)
+ )
+ logger_cli.info(
+ "-> Loaded '{}' as '{}'".format(
+ _fname,
+ _time
+ )
+ )
+
def dump_result(self, filename, data):
# Function dumps all available results as jsons to the given path
# overwriting if needed
@@ -594,7 +638,7 @@
_w_iops = 0
for _a, _d in data["agents"].items():
# Hardcoded number of jobs param :(
- _j = _d[_time]["jobs"][0]
+ _j = _d["jobs"][0]
_r_bw += _j["read"]["bw_bytes"]
_r_avglat += [_j["read"]["lat_ns"]["mean"]]
_r_iops += _j["read"]["iops"]
@@ -623,6 +667,64 @@
(sum(_w_avglat) / len(_w_avglat)) / 1000
_totals["write_iops"] = _w_iops
+ def calculate_ceph_stats(self):
+ # func to get values as lists
+ def _as_list(key, stats):
+ _list = []
+ for _v in stats.values():
+ if key in _v:
+ _list += [_v[key]]
+ else:
+ _list += [0]
+ return _list
+
+ def _perc(n, m):
+ if not n:
+ return 0
+ elif not m:
+ return 0
+ else:
+ return (n / m) * 100
+
+ _stats = {}
+ for _time, data in self.results.items():
+ if "ceph" not in data:
+ logger_cli.warning(
+ "WARNING: Ceph stats raw data not found in results"
+ )
+ continue
+ if "ceph_stats" not in data:
+ data["ceph_stats"] = {}
+ else:
+ continue
+ # Copy pool stats data
+ for _e, _d in data["ceph"].items():
+ _stats[_e] = _d["pgmap"]
+ # Maximums
+ m_r_bytes = max(_as_list("read_bytes_sec", _stats))
+ m_w_bytes = max(_as_list("write_bytes_sec", _stats))
+ m_r_iops = max(_as_list("read_op_per_sec", _stats))
+ m_w_iops = max(_as_list("write_op_per_sec", _stats))
+ # Replace ceph with shorter data
+ data["ceph"] = {
+ "max_read_bytes_sec": m_r_bytes,
+ "max_write_bytes_sec": m_w_bytes,
+ "max_read_iops_sec": m_r_iops,
+ "max_write_iops_sec": m_w_iops,
+ "stats": _stats
+ }
+ # Calculate %% values for barchart
+ for _e, _d in data["ceph"]["stats"].items():
+ _d["read_bytes_sec_perc"] = \
+ _perc(_d.get("read_bytes_sec", 0), m_r_bytes)
+ _d["write_bytes_sec_perc"] = \
+ _perc(_d.get("write_bytes_sec", 0), m_w_bytes)
+ _d["read_op_per_sec_perc"] = \
+ _perc(_d.get("read_op_per_sec", 0), m_r_iops)
+ _d["write_op_per_sec_perc"] = \
+ _perc(_d.get("write_op_per_sec", 0), m_w_iops)
+ return
+
# Create report
def create_report(self, filename):
"""
@@ -635,7 +737,6 @@
reporter.HTMLCephBench(self),
filename
)
- self.calculate_totals()
_report(
{
"results": self.results,