cfg-checker ceph bench module alpha version
- Ceph benchmark report (beta)
- Updated result time selection. Results are now reported based on their start time
- New methods for listing
- Cleanup-only mode
- Unified results processing
- Additional ceph info gather
- Experimental barchart graph example
Fixes:
- Kube API client recreated each time for stability (HTTP/WebSocket specifics)
- args naming fixes
Change-Id: Id541f789a00ab4ee827603c5b6f7f07899aaa7c5
diff --git a/cfg_checker/modules/ceph/bench.py b/cfg_checker/modules/ceph/bench.py
index 7640440..d804f4a 100644
--- a/cfg_checker/modules/ceph/bench.py
+++ b/cfg_checker/modules/ceph/bench.py
@@ -7,7 +7,9 @@
from cfg_checker.common import logger_cli
from cfg_checker.common.decorators import retry
+from cfg_checker.common.file_utils import write_str_to_file
from cfg_checker.helpers.console_utils import Progress
+from cfg_checker.reports import reporter
# from cfg_checker.common.exception import InvalidReturnException
# from cfg_checker.common.exception import ConfigException
# from cfg_checker.common.exception import KubeException
@@ -16,6 +18,27 @@
from cfg_checker.agent.fio_runner import _get_seconds, _datetime_fmt
+def _reformat_timestr(_str, _chars=["/", ",", " ", ":", "+"], _tchar=""):
+ _new = ""
+ for _c in _str:
+ _new += _c if _c not in _chars else _tchar
+ return _new
+
+
+def _parse_json_output(buffer):
+ try:
+ return json.loads(buffer)
+ except TypeError as e:
+ logger_cli.error(
+ "ERROR: Status not decoded: {}\n{}".format(e, buffer)
+ )
+ except json.decoder.JSONDecodeError as e:
+ logger_cli.error(
+ "ERROR: Status not decoded: {}\n{}".format(e, buffer)
+ )
+ return {}
+
+
class CephBench(object):
_agent_template = "cfgagent-template.yaml"
@@ -41,17 +64,30 @@
self.agent_count = config.bench_agent_count
self.master = KubeNodes(config)
super(KubeCephBench, self).__init__(config)
- self.storage_class = config.bench_storage_class
- self.agent_pods = []
- self.services = []
- self.scheduled_delay = 30
+
self.mode = config.bench_mode
+ self.resource_prefix = config.resource_prefix
if config.bench_mode == "tasks":
self.taskfile = config.bench_task_file
self.load_tasks(self.taskfile)
+ elif config.bench_mode == "cleanup":
+ self.cleanup_list = []
+ return
+
+ self.storage_class = config.bench_storage_class
+ self.results_dump_path = config.bench_results_dump_path
+ self.agent_pods = []
+ self.services = []
+ # By default,
+ # 30 seconds should be enough to send tasks to 3-5 agents
+ self.scheduled_delay = 30
self.cleanup_list = []
self.results = {}
+ self.agent_results = {}
+
+ def set_ceph_info_class(self, ceph_info):
+ self.ceph_info = ceph_info
def load_tasks(self, taskfile):
# Load csv file
@@ -68,10 +104,33 @@
"iodepth": row[3],
"size": row[4]
})
+ logger_cli.info("-> Loaded {} tasks".format(len(self.tasks)))
def add_for_deletion(self, obj, typ):
- _d = [typ, obj.metadata.namespace, obj.metadata.name]
- self.cleanup_list.append(_d)
+ self.cleanup_list.append(
+ [
+ typ,
+ obj.metadata.namespace,
+ obj.metadata.name
+ ]
+ )
+ return
+
+ def prepare_cleanup(self):
+ # Assume number of resources not given
+ # list all svc, pod, pvc, pv and identify 'cfgagent-xx ones
+ _types = ["pv", "pvc", "pod", "svc"]
+ _prefix = self.resource_prefix
+ for _typ in _types:
+ _list = self.master.list_resource_names_by_type_and_ns(_typ)
+ for ns, name in _list:
+ if name.startswith(_prefix):
+ if ns:
+ _msg = "{} {}/{}".format(_typ, ns, name)
+ else:
+ _msg = "{} {}".format(_typ, name)
+ logger_cli.info("-> Found {}".format(_msg))
+ self.cleanup_list.append([_typ, ns, name])
return
def prepare_agents(self, options):
@@ -98,19 +157,23 @@
# Save service
self.services.append(_svc)
# prepopulate results
- self.results[_agent.metadata.name] = {}
- self.results[_agent.metadata.name]["list"] = {}
- self.results[_agent.metadata.name]["url"] = \
+ self.agent_results[_agent.metadata.name] = {}
+ self.agent_results[_agent.metadata.name]["url"] = \
"http://{}:{}/api/".format(
_svc.spec.cluster_ip,
8765
)
- self.results[_agent.metadata.name]["storage_class"] = \
+ self.agent_results[_agent.metadata.name]["storage_class"] = \
self.storage_class
- self.results[_agent.metadata.name]["volume_size"] = \
+ self.agent_results[_agent.metadata.name]["volume_size"] = \
options['size']
logger_cli.info("-> Done creating agents")
+ # TODO: Update after implementing pooled task sending
+ self.scheduled_delay = self.agent_count * 6
+ logger_cli.info(
+ "-> Schedule delay set to {} sec".format(self.scheduled_delay)
+ )
return
def _poke_agent(self, url, body, action="GET"):
@@ -141,18 +204,7 @@
self.master._namespace,
" ".join(_cmd)
)
- try:
- return json.loads(_ret)
- except TypeError as e:
- logger_cli.error(
- "ERROR: Status not decoded: {}\n{}".format(e, _ret)
- )
- except json.decoder.JSONDecodeError as e:
- logger_cli.error(
- "ERROR: Status not decoded: {}\n{}".format(e, _ret)
- )
-
- return None
+ return _parse_json_output(_ret)
def _ensure_agents_ready(self):
# make sure agents idle
@@ -190,18 +242,24 @@
def get_agents_status(self):
_status = {}
- for _agent, _d in self.results.items():
- _status[_agent] = self._poke_agent(_d["url"] + "fio", {})
+ _results = self.master.exec_on_labeled_pods_and_ns(
+ "app=cfgagent",
+ "curl -s http://localhost:8765/api/fio"
+ )
+ for _agent, _result in _results.items():
+ _j = _parse_json_output(_result)
+ _status[_agent] = _j
return _status
+ @retry(Exception, initial_wait=5)
def get_agents_resultlist(self):
_t = {"module": "fio", "action": "get_resultlist"}
_status = {}
- for _agent, _d in self.results.items():
+ for _agent, _d in self.agent_results.items():
_status[_agent] = self._poke_agent(_d["url"], _t, action="POST")
return _status
- @retry(Exception)
+ @retry(Exception, initial_wait=5)
def get_result_from_agent(self, agent, time):
_t = {
"module": "fio",
@@ -210,7 +268,11 @@
"time": time
}
}
- return self._poke_agent(self.results[agent]["url"], _t, action="POST")
+ return self._poke_agent(
+ self.agent_results[agent]["url"],
+ _t,
+ action="POST"
+ )
def _get_next_scheduled_time(self):
_now = datetime.now(timezone.utc)
@@ -228,7 +290,7 @@
"action": "do_scheduledrun",
"options": options
}
- for _agent, _d in self.results.items():
+ for _agent, _d in self.agent_results.items():
logger_cli.info(
"-> sending task to '{}:{}'".format(_agent, _d["url"])
)
@@ -261,7 +323,7 @@
_status["progress"]
)
)
- finished = [True for _s in _sts.values()
+ finished = [True for _s in _sts.values()
if _s["status"] == 'finished']
_fcnt = len(finished)
_tcnt = len(_sts)
@@ -276,8 +338,9 @@
if diff < 0:
logger_cli.info("-> Timed out waiting for agents to finish")
return False
- logger_cli.info("-> Sleeping for {:.2f}s".format(diff/3))
- sleep(diff/3)
+ else:
+ logger_cli.info("-> Sleeping for {:.2f}s".format(2))
+ sleep(2)
if diff <= 0.1:
logger_cli.info("-> Timed out waiting for agents to finish")
return False
@@ -292,12 +355,17 @@
else:
logger_cli.info("-> Finished testrun")
# Get results for each agent
- self.collect_results()
+ self.collect_results(options)
return True
- def _wait_ceph_cooldown(self):
+ def wait_ceph_cooldown(self):
# TODO: Query Ceph ince a 20 sec to make sure its load dropped
+ # get ceph idle status
+ self.ceph_idle_status = self.ceph_info.get_cluster_status()
+ self.health_detail = self.ceph_info.get_health_detail()
+ self.ceph_df = self.ceph_info.get_ceph_df()
+ self.ceph_pg_dump = self.ceph_info.get_ceph_pg_dump()
return
def run_benchmark(self, options):
@@ -309,7 +377,9 @@
# Make sure that Ceph is at low load
# TODO: Ceph status check
- self._wait_ceph_cooldown()
+ # self._wait_ceph_cooldown()
+
+ _get_df = self.ceph_info.get_ceph_osd_df
# Do benchmark according to mode
if self.mode == "tasks":
@@ -321,6 +391,8 @@
# take next task
_total_tasks = len(self.tasks)
for idx in range(_total_tasks):
+ # init time to schedule
+ _osd_df_before = _get_df()
_task = self.tasks[idx]
logger_cli.info(
"-> Starting next task ({}/{})".format(idx+1, _total_tasks)
@@ -333,18 +405,36 @@
)
# update options
options.update(_task)
- # init time to schedule
- options["scheduled_to"] = self._get_next_scheduled_time()
+ _sch_time = self._get_next_scheduled_time()
+ options["scheduled_to"] = _sch_time
+ # init results table
+ self.results[_sch_time] = {
+ "input_options": options,
+ "agents": {},
+ "osd_df_before": _osd_df_before
+ }
if not self._do_testrun(options):
return False
+ else:
+ self.results[_sch_time]["osd_df_after"] = _get_df()
- self._wait_ceph_cooldown()
+ self.wait_ceph_cooldown()
elif self.mode == "single":
logger_cli.info("# Running single benchmark")
+ _osd_df_before = _get_df()
# init time to schedule
- options["scheduled_to"] = self._get_next_scheduled_time()
+ _sch_time = self._get_next_scheduled_time()
+ options["scheduled_to"] = _sch_time
+ # init results table
+ self.results[_sch_time] = {
+ "input_options": options,
+ "agents": {},
+ "osd_df_before": _osd_df_before
+ }
if not self._do_testrun(options):
return False
+ else:
+ self.results[_sch_time]["osd_df_after"] = _get_df()
else:
logger_cli.error("ERROR: Unknown mode '{}'".format(self.mode))
return False
@@ -354,6 +444,7 @@
return True
def cleanup(self):
+ logger_cli.info("# Cleaning up")
self.cleanup_list.reverse()
for _res in self.cleanup_list:
@@ -385,39 +476,179 @@
return
- def collect_results(self):
+ def collect_results(self, options):
+ _sch_time = options["scheduled_to"]
logger_cli.info("# Collecting results")
# query agents for results
_agents = self.get_agents_resultlist()
+ # Syntax shortcut
+ _ar = self.results[_sch_time]["agents"]
for _agent, _l in _agents.items():
- _list = _l["resultlist"]
- _new = [r for r in _list if r not in self.results[_agent]["list"]]
- logger_cli.debug(
- "... agent '{}' has {} new results".format(_agent, len(_new))
- )
- # get all new results
- for _time in _new:
- logger_cli.info(
- "-> loading results for '{}' from '{}'".format(
- _time,
- _agent
+ # Create a syntax shortcut
+ if _agent not in _ar:
+ _ar[_agent] = {}
+ _arl = _ar[_agent]
+ # Check if we already have this locally
+ for _time in _l["resultlist"]:
+ _filename = self._get_dump_filename(_sch_time, _agent, options)
+ if os.path.exists(_filename):
+ # There is a file already for this task
+ # Check if we need to load it
+ if _sch_time in _arl:
+ logger_cli.info(
+ "-> Skipped already processed result '{}'".format(
+ _filename
+ )
+ )
+ else:
+ # Load previously dumped result from disk
+ logger_cli.info(
+ "-> Loading already present result '{}'".format(
+ _filename
+ )
+ )
+ _arl[_sch_time] = self.load_dumped_result(_filename)
+ else:
+ # Load result add it locally and dump it
+ logger_cli.info(
+ "-> Getting results for '{}' from '{}'".format(
+ _sch_time,
+ _agent
+ )
)
- )
- self.results[_agent]["list"].update(
- self.get_result_from_agent(_agent, _time)
- )
+ _r = self.get_result_from_agent(_agent, _time)
+ # Important to switch from result status time
+ # to scheduled time
+ _arl[_sch_time] = _r[_time]
+ # Dump collected result
+ self.dump_result(_filename, _arl[_sch_time])
return
- def dump_results(self, path):
- # Function dumps all availabkle results as jsons to the given path
+ def _get_dump_filename(self, _time, agent, options):
+ _dirname = _reformat_timestr(_time)
+ _filename = "-".join([
+ _dirname,
+ agent,
+ options["readwrite"],
+ options["bs"],
+ str(options["iodepth"]),
+ ]) + ".json"
+ return os.path.join(
+ self.results_dump_path,
+ _dirname,
+ _filename
+ )
+
+ def dump_result(self, filename, data):
+ # Function dumps all available results as jsons to the given path
# overwriting if needed
-
- # TODO: Conduct the dumping
-
+ _folder, _file = os.path.split(filename)
+ # Do dump
+ if not os.path.exists(_folder):
+ os.mkdir(_folder)
+ logger_cli.info("-> Created folder '{}'".format(_folder))
+ # Dump agent data for this test run
+ write_str_to_file(filename, json.dumps(data, indent=2))
+ logger_cli.info("-> Dumped '{}'".format(filename))
return
+ def load_dumped_result(self, filename):
+ try:
+ with open(filename, "rt+") as f:
+ return json.loads(f.read())
+ except FileNotFoundError as e:
+ logger_cli.error(
+ "ERROR: {}".format(e)
+ )
+ except TypeError as e:
+ logger_cli.error(
+ "ERROR: Invalid file ({}): {}".format(filename, e)
+ )
+ except json.decoder.JSONDecodeError as e:
+ logger_cli.error(
+ "ERROR: Failed to decode json ({}): {}".format(filename, e)
+ )
+ return None
+
+ def _lookup_storage_class_id_by_name(self, storage_class_name):
+ # Assume that self had proper data
+ for _pool in self.ceph_df["pools"]:
+ if storage_class_name == _pool["name"]:
+ return _pool["id"]
+ return None
+
+ def calculate_totals(self):
+ # Calculate totals for Read and Write
+ for _time, data in self.results.items():
+ if "totals" not in data:
+ data["totals"] = {}
+ else:
+ continue
+ _totals = data["totals"]
+ _r_bw = 0
+ _r_avglat = []
+ _r_iops = 0
+ _w_bw = 0
+ _w_avglat = []
+ _w_iops = 0
+ for _a, _d in data["agents"].items():
+ # Hardcoded number of jobs param :(
+ _j = _d[_time]["jobs"][0]
+ _r_bw += _j["read"]["bw_bytes"]
+ _r_avglat += [_j["read"]["lat_ns"]["mean"]]
+ _r_iops += _j["read"]["iops"]
+ _w_bw += _j["write"]["bw_bytes"]
+ _w_avglat += [_j["write"]["lat_ns"]["mean"]]
+ _w_iops += _j["write"]["iops"]
+ # Save storage class name
+ if "storage_class" not in _totals:
+ _totals["storage_class"] = \
+ self.agent_results[_a]["storage_class"]
+ # Lookup storage class id and num_pg
+ _totals["storage_class_stats"] = \
+ reporter.get_pool_stats_by_id(
+ self._lookup_storage_class_id_by_name(
+ self.agent_results[_a]["storage_class"]
+ ),
+ self.ceph_pg_dump
+ )
+
+ _totals["read_bw_bytes"] = _r_bw
+ _totals["read_avg_lat_us"] = \
+ (sum(_r_avglat) / len(_r_avglat)) / 1000
+ _totals["read_iops"] = _r_iops
+ _totals["write_bw_bytes"] = _w_bw
+ _totals["write_avg_lat_us"] = \
+ (sum(_w_avglat) / len(_w_avglat)) / 1000
+ _totals["write_iops"] = _w_iops
+
# Create report
def create_report(self, filename):
+ """
+ Create static html showing ceph info report
+
+ :return: none
+ """
+ logger_cli.info("### Generating report to '{}'".format(filename))
+ _report = reporter.ReportToFile(
+ reporter.HTMLCephBench(self),
+ filename
+ )
+ self.calculate_totals()
+ _report(
+ {
+ "results": self.results,
+ "idle_status": self.ceph_idle_status,
+ "health_detail": self.health_detail,
+ "ceph_df": self.ceph_df,
+ "ceph_pg_dump": self.ceph_pg_dump,
+ "info": self.ceph_info.ceph_info,
+ "cluster": self.ceph_info.cluster_info,
+ "ceph_version": self.ceph_info.ceph_version,
+ "nodes": self.agent_pods
+ }
+ )
+ logger_cli.info("-> Done")
return