cfg-checker ceph bench module alpha version
- Ceph benchmark report (beta)
- Updated result time selection; results are now reported based on test run start time
- New methods for listing Kube resources (pods, services, PVCs, PVs)
- Cleanup-only mode
- Unified results processing
- Additional Ceph info gathering
- Experimental barchart graph example
Fixes:
- Kube API client recreated each time for stability (HTTP/WebSocket specifics)
- args naming fixes
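
Example invocation (hypothetical sketch; the '--cleanup-only', '--task-file'
and '--dump-path' flags are added below, but the entry-point name and the
remaining options are assumed and may differ):

    # remove leftover benchmark resources (services, agents, pvc, pv) and exit
    # (entry-point name assumed)
    cfg-checker ceph bench --cleanup-only

    # task-file driven run, dumping each test result for later reporting
    cfg-checker ceph bench --task-file tasks.csv --dump-path /tmp/bench-results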
Change-Id: Id541f789a00ab4ee827603c5b6f7f07899aaa7c5
diff --git a/cfg_checker/agent/fio_runner.py b/cfg_checker/agent/fio_runner.py
index c8488af..3cc6ca2 100644
--- a/cfg_checker/agent/fio_runner.py
+++ b/cfg_checker/agent/fio_runner.py
@@ -257,6 +257,7 @@
self._fio_options_common["ioengine"] = "posixaio"
# Thread finish marker
self.finished = False
+ self.testrun_starttime = None
self.scheduled_datetime = None
def update_options(self, _dict):
@@ -296,8 +297,8 @@
_q = queue.Queue()
self.fiorun = ShellThread(_cmd, _q)
# Check if schedule is set
+ _now = datetime.now(timezone.utc)
if self.scheduled_datetime:
- _now = datetime.now(timezone.utc)
logger.debug(
"waiting for '{}', now is '{}', total of {} sec left".format(
self.scheduled_datetime.strftime(_datetime_fmt),
@@ -306,6 +307,8 @@
)
)
wait_until(self.scheduled_datetime)
+ else:
+ self.testrun_starttime = _now.strftime(_datetime_fmt)
self.fiorun.run_shell()
_raw = []
_start = -1
@@ -360,11 +363,13 @@
break
sleep(0.1)
# Save status to results dictionary
- self.results[get_time(timestamp=self.testrun["timestamp"])] = {
+ self.results[self.testrun_starttime] = {
"result": self.testrun,
"timeline": self.timeline
}
self.finished = True
+ self.scheduled_datetime = None
+ self.testrun_starttime = None
return
def healthcheck(self):
@@ -511,9 +516,9 @@
raise CheckerException("Parameter missing: 'scheduled_to'")
else:
# set time and get rid of it from options
- _time = options.pop("scheduled_to")
+ self.fio.testrun_starttime = options.pop("scheduled_to")
self.fio.scheduled_datetime = datetime.strptime(
- _time,
+ self.fio.testrun_starttime,
_datetime_fmt
)
# Fill options
@@ -612,7 +617,7 @@
_opts["readwrite"] = "read"
_opts["ramp_time"] = "1s"
_opts["runtime"] = "5s"
- _opts["scheduled_to"] = "11/13/2021, 23:03:30+0000"
+ _opts["scheduled_to"] = "11/23/2021, 21:48:20+0000"
_shell.do_scheduledrun(_opts)
_shell()
_times = _shell.get_resultlist()
@@ -627,8 +632,8 @@
_opts["readwrite"] = "read"
_opts["ramp_time"] = "1s"
_opts["runtime"] = "10s"
- _opts["scheduled_to"] = "11/13/2021, 23:04:20+0000"
- _shell.do_scheduledrun(_opts)
+ # _opts["scheduled_to"] = "11/23/2021, 21:40:30+0000"
+ _shell.do_singlerun(_opts)
_shell()
_times = _shell.get_resultlist()
print("# results:\n{}".format("\n".join(_times)))
diff --git a/cfg_checker/common/file_utils.py b/cfg_checker/common/file_utils.py
index 6fbb675..faf7cf0 100644
--- a/cfg_checker/common/file_utils.py
+++ b/cfg_checker/common/file_utils.py
@@ -102,7 +102,7 @@
os.mkdir(_folder)
return "... folder '{}' created".format(_folder)
else:
- return "... folder is at '{}'".format(_folder)
+ return "... folder exists at '{}'".format(_folder)
def ensure_folder_removed(_folder):
diff --git a/cfg_checker/common/kube_utils.py b/cfg_checker/common/kube_utils.py
index 22eee30..3e15095 100644
--- a/cfg_checker/common/kube_utils.py
+++ b/cfg_checker/common/kube_utils.py
@@ -206,7 +206,6 @@
class KubeRemote(KubeApi):
def __init__(self, config):
super(KubeRemote, self).__init__(config)
- self._coreV1 = None
self._appsV1 = None
self._podV1 = None
self._custom = None
@@ -219,12 +218,10 @@
@property
def CoreV1(self):
- if not self._coreV1:
- if self.is_local:
- self._coreV1 = kclient.CoreV1Api(kclient.ApiClient())
- else:
- self._coreV1 = kclient.CoreV1Api(kclient.ApiClient(self.kConf))
- return self._coreV1
+ if self.is_local:
+ return kclient.CoreV1Api(kclient.ApiClient())
+ else:
+ return kclient.CoreV1Api(kclient.ApiClient(self.kConf))
@property
def AppsV1(self):
@@ -377,6 +374,7 @@
return _pods
+ @retry(ApiException, initial_wait=5)
def exec_on_target_pod(
self,
cmd,
@@ -425,6 +423,7 @@
cmd = cmd if isinstance(cmd, list) else cmd.split()
if arguments:
cmd += [arguments]
+ # Make sure that CoreV1 is fresh before calling it
_pod_stream = stream(
self.CoreV1.connect_get_namespaced_pod_exec,
_pname,
@@ -453,8 +452,6 @@
)
if not _output:
_output = _error
- # Force recreate of api objects
- self._coreV1 = None
# Send output
return _output
@@ -596,7 +593,7 @@
return []
- @retry(ApiException)
+ @retry(ApiException, initial_wait=5)
def get_pods_for_daemonset(self, ds):
# get all pod names for daemonset
logger_cli.debug(
@@ -612,6 +609,7 @@
)
return _pods
+ @retry(ApiException, initial_wait=5)
def put_string_buffer_to_pod_as_textfile(
self,
pod_name,
@@ -653,15 +651,12 @@
logger_cli.debug("... STDERR: %s" % response.read_stderr())
if commands:
c = commands.pop(0)
- logger_cli.debug("... running command... {}\n".format(c))
+ logger_cli.debug("... running command... {}".format(c))
response.write_stdin(str(c, encoding='utf-8'))
else:
break
response.close()
- # Force recreate of Api objects
- self._coreV1 = None
-
return
def get_custom_resource(self, group, version, plural):
@@ -824,6 +819,12 @@
name
)
+ def list_pods(self, ns, label_str=None):
+ return self.CoreV1.list_namespaced_pod(
+ ns,
+ label_selector=label_str
+ )
+
def get_svc_by_name_and_ns(self, name, ns):
return self.safe_get_item_by_name(
self.CoreV1.list_namespaced_service(
@@ -833,6 +834,12 @@
name
)
+ def list_svc(self, ns, label_str=None):
+ return self.CoreV1.list_namespaced_service(
+ ns,
+ label_selector=label_str
+ )
+
def get_pvc_by_name_and_ns(self, name, ns):
return self.safe_get_item_by_name(
self.CoreV1.list_namespaced_persistent_volume_claim(
@@ -842,6 +849,12 @@
name
)
+ def list_pvc(self, ns, label_str=None):
+ return self.CoreV1.list_namespaced_persistent_volume_claim(
+ ns,
+ label_selector=label_str
+ )
+
def get_pv_by_name(self, name):
return self.safe_get_item_by_name(
self.CoreV1.list_persistent_volume(
@@ -850,6 +863,11 @@
name
)
+ def list_pv(self, label_str=None):
+ return self.CoreV1.list_persistent_volume(
+ label_selector=label_str
+ )
+
def wait_for_phase(self, ttype, name, ns, phase_list, timeout=120):
logger_cli.debug(
"... waiting '{}'s until {} is '{}'".format(
diff --git a/cfg_checker/modules/ceph/__init__.py b/cfg_checker/modules/ceph/__init__.py
index 0f1de01..f9bf3ca 100644
--- a/cfg_checker/modules/ceph/__init__.py
+++ b/cfg_checker/modules/ceph/__init__.py
@@ -89,7 +89,7 @@
)
ceph_bench_parser.add_argument(
'--task-file',
- metavar='task-file',
+ metavar='task_file',
help="Task file for benchmark"
)
ceph_bench_parser.add_argument(
@@ -97,6 +97,16 @@
action="store_true", default=False,
help="Do not cleanup services, agents, pvc, and pv"
)
+ ceph_bench_parser.add_argument(
+ '--cleanup-only',
+ action="store_true", default=False,
+ help="Cleanup resources related to benchmark"
+ )
+ ceph_bench_parser.add_argument(
+ '--dump-path',
+ metavar="dump_results", default="/tmp",
+ help="Dump result after each test run to use them later"
+ )
return _parser
@@ -149,8 +159,29 @@
def do_bench(args, config):
# Ceph Benchmark using multiple pods
+ # if only cleanup needed do it and exit
+ _cleanup_only = args_utils.get_arg(args, 'cleanup_only')
+ config.resource_prefix = "cfgagent"
+ if _cleanup_only:
+ # Do forced resource cleanup and exit
+ config.bench_mode = "cleanup"
+ config.bench_agent_count = -1
+ ceph_bench = bench.KubeCephBench(config)
+ logger_cli.info(
+ "# Discovering benchmark resources using prefix of '{}'".format(
+ config.resource_prefix
+ )
+ )
+ ceph_bench.prepare_cleanup()
+ ceph_bench.cleanup()
+ return
+
+ # gather Ceph info
+ logger_cli.info("# Collecting Ceph cluster information")
+ ceph_info = info.KubeCephInfo(config)
+
# Prepare the tasks and do synced testrun or a single one
- logger_cli.info("# Initializing benchmark run")
+ logger_cli.info("# Initializing ceph benchmark module")
args_utils.check_supported_env(ENV_TYPE_KUBE, args, config)
_filename = args_utils.get_arg(args, 'html')
# agents count option
@@ -161,6 +192,17 @@
_storage_class = args_utils.get_arg(args, "storage_class")
logger_cli.info("-> using storage class of '{}'".format(_storage_class))
config.bench_storage_class = _storage_class
+ # dump results options
+ _dump_path = args_utils.get_arg(args, "dump_path")
+ if _dump_path:
+ logger_cli.info("# Results will be dumped to '{}'".format(_dump_path))
+ config.bench_results_dump_path = _dump_path
+ else:
+ logger_cli.info(
+ "# No result dump path set. "
+ "Consider setting it if running long task_file based test runs"
+ )
+ config.bench_results_dump_path = _dump_path
# Task files or options
_task_file = args_utils.get_arg(args, "task_file", nofail=True)
if not _task_file:
@@ -180,12 +222,20 @@
# init the Bench class
ceph_bench = bench.KubeCephBench(config)
+ ceph_bench.set_ceph_info_class(ceph_info)
# Do the testrun
ceph_bench.prepare_agents(_opts)
+ ceph_bench.wait_ceph_cooldown()
+
+ # DEBUG of report in progress
if not ceph_bench.run_benchmark(_opts):
# No cleaning and/or report if benchmark was not finished
logger_cli.info("# Abnormal benchmark run, no cleaning performed")
return
+ # Remove after DEBUG
+ # ceph_bench.collect_results(_opts)
+ # END DEBUG
+
# Cleaning
if not config.no_cleaning_after_benchmark:
ceph_bench.cleanup()
diff --git a/cfg_checker/modules/ceph/bench.py b/cfg_checker/modules/ceph/bench.py
index 7640440..d804f4a 100644
--- a/cfg_checker/modules/ceph/bench.py
+++ b/cfg_checker/modules/ceph/bench.py
@@ -7,7 +7,9 @@
from cfg_checker.common import logger_cli
from cfg_checker.common.decorators import retry
+from cfg_checker.common.file_utils import write_str_to_file
from cfg_checker.helpers.console_utils import Progress
+from cfg_checker.reports import reporter
# from cfg_checker.common.exception import InvalidReturnException
# from cfg_checker.common.exception import ConfigException
# from cfg_checker.common.exception import KubeException
@@ -16,6 +18,27 @@
from cfg_checker.agent.fio_runner import _get_seconds, _datetime_fmt
+def _reformat_timestr(_str, _chars=["/", ",", " ", ":", "+"], _tchar=""):
+ _new = ""
+ for _c in _str:
+ _new += _c if _c not in _chars else _tchar
+ return _new
+
+
+def _parse_json_output(buffer):
+ try:
+ return json.loads(buffer)
+ except TypeError as e:
+ logger_cli.error(
+ "ERROR: Status not decoded: {}\n{}".format(e, buffer)
+ )
+ except json.decoder.JSONDecodeError as e:
+ logger_cli.error(
+ "ERROR: Status not decoded: {}\n{}".format(e, buffer)
+ )
+ return {}
+
+
class CephBench(object):
_agent_template = "cfgagent-template.yaml"
@@ -41,17 +64,30 @@
self.agent_count = config.bench_agent_count
self.master = KubeNodes(config)
super(KubeCephBench, self).__init__(config)
- self.storage_class = config.bench_storage_class
- self.agent_pods = []
- self.services = []
- self.scheduled_delay = 30
+
self.mode = config.bench_mode
+ self.resource_prefix = config.resource_prefix
if config.bench_mode == "tasks":
self.taskfile = config.bench_task_file
self.load_tasks(self.taskfile)
+ elif config.bench_mode == "cleanup":
+ self.cleanup_list = []
+ return
+
+ self.storage_class = config.bench_storage_class
+ self.results_dump_path = config.bench_results_dump_path
+ self.agent_pods = []
+ self.services = []
+ # By default,
+ # 30 seconds should be enough to send tasks to 3-5 agents
+ self.scheduled_delay = 30
self.cleanup_list = []
self.results = {}
+ self.agent_results = {}
+
+ def set_ceph_info_class(self, ceph_info):
+ self.ceph_info = ceph_info
def load_tasks(self, taskfile):
# Load csv file
@@ -68,10 +104,33 @@
"iodepth": row[3],
"size": row[4]
})
+ logger_cli.info("-> Loaded {} tasks".format(len(self.tasks)))
def add_for_deletion(self, obj, typ):
- _d = [typ, obj.metadata.namespace, obj.metadata.name]
- self.cleanup_list.append(_d)
+ self.cleanup_list.append(
+ [
+ typ,
+ obj.metadata.namespace,
+ obj.metadata.name
+ ]
+ )
+ return
+
+ def prepare_cleanup(self):
+ # Assume number of resources not given
+ # list all svc, pod, pvc, pv and identify 'cfgagent-xx' ones
+ _types = ["pv", "pvc", "pod", "svc"]
+ _prefix = self.resource_prefix
+ for _typ in _types:
+ _list = self.master.list_resource_names_by_type_and_ns(_typ)
+ for ns, name in _list:
+ if name.startswith(_prefix):
+ if ns:
+ _msg = "{} {}/{}".format(_typ, ns, name)
+ else:
+ _msg = "{} {}".format(_typ, name)
+ logger_cli.info("-> Found {}".format(_msg))
+ self.cleanup_list.append([_typ, ns, name])
return
def prepare_agents(self, options):
@@ -98,19 +157,23 @@
# Save service
self.services.append(_svc)
# prepopulate results
- self.results[_agent.metadata.name] = {}
- self.results[_agent.metadata.name]["list"] = {}
- self.results[_agent.metadata.name]["url"] = \
+ self.agent_results[_agent.metadata.name] = {}
+ self.agent_results[_agent.metadata.name]["url"] = \
"http://{}:{}/api/".format(
_svc.spec.cluster_ip,
8765
)
- self.results[_agent.metadata.name]["storage_class"] = \
+ self.agent_results[_agent.metadata.name]["storage_class"] = \
self.storage_class
- self.results[_agent.metadata.name]["volume_size"] = \
+ self.agent_results[_agent.metadata.name]["volume_size"] = \
options['size']
logger_cli.info("-> Done creating agents")
+ # TODO: Update after implementing pooled task sending
+ self.scheduled_delay = self.agent_count * 6
+ logger_cli.info(
+ "-> Schedule delay set to {} sec".format(self.scheduled_delay)
+ )
return
def _poke_agent(self, url, body, action="GET"):
@@ -141,18 +204,7 @@
self.master._namespace,
" ".join(_cmd)
)
- try:
- return json.loads(_ret)
- except TypeError as e:
- logger_cli.error(
- "ERROR: Status not decoded: {}\n{}".format(e, _ret)
- )
- except json.decoder.JSONDecodeError as e:
- logger_cli.error(
- "ERROR: Status not decoded: {}\n{}".format(e, _ret)
- )
-
- return None
+ return _parse_json_output(_ret)
def _ensure_agents_ready(self):
# make sure agents idle
@@ -190,18 +242,24 @@
def get_agents_status(self):
_status = {}
- for _agent, _d in self.results.items():
- _status[_agent] = self._poke_agent(_d["url"] + "fio", {})
+ _results = self.master.exec_on_labeled_pods_and_ns(
+ "app=cfgagent",
+ "curl -s http://localhost:8765/api/fio"
+ )
+ for _agent, _result in _results.items():
+ _j = _parse_json_output(_result)
+ _status[_agent] = _j
return _status
+ @retry(Exception, initial_wait=5)
def get_agents_resultlist(self):
_t = {"module": "fio", "action": "get_resultlist"}
_status = {}
- for _agent, _d in self.results.items():
+ for _agent, _d in self.agent_results.items():
_status[_agent] = self._poke_agent(_d["url"], _t, action="POST")
return _status
- @retry(Exception)
+ @retry(Exception, initial_wait=5)
def get_result_from_agent(self, agent, time):
_t = {
"module": "fio",
@@ -210,7 +268,11 @@
"time": time
}
}
- return self._poke_agent(self.results[agent]["url"], _t, action="POST")
+ return self._poke_agent(
+ self.agent_results[agent]["url"],
+ _t,
+ action="POST"
+ )
def _get_next_scheduled_time(self):
_now = datetime.now(timezone.utc)
@@ -228,7 +290,7 @@
"action": "do_scheduledrun",
"options": options
}
- for _agent, _d in self.results.items():
+ for _agent, _d in self.agent_results.items():
logger_cli.info(
"-> sending task to '{}:{}'".format(_agent, _d["url"])
)
@@ -261,7 +323,7 @@
_status["progress"]
)
)
- finished = [True for _s in _sts.values()
+ finished = [True for _s in _sts.values()
if _s["status"] == 'finished']
_fcnt = len(finished)
_tcnt = len(_sts)
@@ -276,8 +338,9 @@
if diff < 0:
logger_cli.info("-> Timed out waiting for agents to finish")
return False
- logger_cli.info("-> Sleeping for {:.2f}s".format(diff/3))
- sleep(diff/3)
+ else:
+ logger_cli.info("-> Sleeping for {:.2f}s".format(2))
+ sleep(2)
if diff <= 0.1:
logger_cli.info("-> Timed out waiting for agents to finish")
return False
@@ -292,12 +355,17 @@
else:
logger_cli.info("-> Finished testrun")
# Get results for each agent
- self.collect_results()
+ self.collect_results(options)
return True
- def _wait_ceph_cooldown(self):
+ def wait_ceph_cooldown(self):
        # TODO: Query Ceph once every 20 sec to make sure its load dropped
+ # get ceph idle status
+ self.ceph_idle_status = self.ceph_info.get_cluster_status()
+ self.health_detail = self.ceph_info.get_health_detail()
+ self.ceph_df = self.ceph_info.get_ceph_df()
+ self.ceph_pg_dump = self.ceph_info.get_ceph_pg_dump()
return
def run_benchmark(self, options):
@@ -309,7 +377,9 @@
# Make sure that Ceph is at low load
# TODO: Ceph status check
- self._wait_ceph_cooldown()
+ # self._wait_ceph_cooldown()
+
+ _get_df = self.ceph_info.get_ceph_osd_df
# Do benchmark according to mode
if self.mode == "tasks":
@@ -321,6 +391,8 @@
# take next task
_total_tasks = len(self.tasks)
for idx in range(_total_tasks):
+ # init time to schedule
+ _osd_df_before = _get_df()
_task = self.tasks[idx]
logger_cli.info(
"-> Starting next task ({}/{})".format(idx+1, _total_tasks)
@@ -333,18 +405,36 @@
)
# update options
options.update(_task)
- # init time to schedule
- options["scheduled_to"] = self._get_next_scheduled_time()
+ _sch_time = self._get_next_scheduled_time()
+ options["scheduled_to"] = _sch_time
+ # init results table
+ self.results[_sch_time] = {
+ "input_options": options,
+ "agents": {},
+ "osd_df_before": _osd_df_before
+ }
if not self._do_testrun(options):
return False
+ else:
+ self.results[_sch_time]["osd_df_after"] = _get_df()
- self._wait_ceph_cooldown()
+ self.wait_ceph_cooldown()
elif self.mode == "single":
logger_cli.info("# Running single benchmark")
+ _osd_df_before = _get_df()
# init time to schedule
- options["scheduled_to"] = self._get_next_scheduled_time()
+ _sch_time = self._get_next_scheduled_time()
+ options["scheduled_to"] = _sch_time
+ # init results table
+ self.results[_sch_time] = {
+ "input_options": options,
+ "agents": {},
+ "osd_df_before": _osd_df_before
+ }
if not self._do_testrun(options):
return False
+ else:
+ self.results[_sch_time]["osd_df_after"] = _get_df()
else:
logger_cli.error("ERROR: Unknown mode '{}'".format(self.mode))
return False
@@ -354,6 +444,7 @@
return True
def cleanup(self):
+ logger_cli.info("# Cleaning up")
self.cleanup_list.reverse()
for _res in self.cleanup_list:
@@ -385,39 +476,179 @@
return
- def collect_results(self):
+ def collect_results(self, options):
+ _sch_time = options["scheduled_to"]
logger_cli.info("# Collecting results")
# query agents for results
_agents = self.get_agents_resultlist()
+ # Syntax shortcut
+ _ar = self.results[_sch_time]["agents"]
for _agent, _l in _agents.items():
- _list = _l["resultlist"]
- _new = [r for r in _list if r not in self.results[_agent]["list"]]
- logger_cli.debug(
- "... agent '{}' has {} new results".format(_agent, len(_new))
- )
- # get all new results
- for _time in _new:
- logger_cli.info(
- "-> loading results for '{}' from '{}'".format(
- _time,
- _agent
+ # Create a syntax shortcut
+ if _agent not in _ar:
+ _ar[_agent] = {}
+ _arl = _ar[_agent]
+ # Check if we already have this locally
+ for _time in _l["resultlist"]:
+ _filename = self._get_dump_filename(_sch_time, _agent, options)
+ if os.path.exists(_filename):
+ # There is a file already for this task
+ # Check if we need to load it
+ if _sch_time in _arl:
+ logger_cli.info(
+ "-> Skipped already processed result '{}'".format(
+ _filename
+ )
+ )
+ else:
+ # Load previously dumped result from disk
+ logger_cli.info(
+ "-> Loading already present result '{}'".format(
+ _filename
+ )
+ )
+ _arl[_sch_time] = self.load_dumped_result(_filename)
+ else:
+ # Load the result, add it locally and dump it
+ logger_cli.info(
+ "-> Getting results for '{}' from '{}'".format(
+ _sch_time,
+ _agent
+ )
)
- )
- self.results[_agent]["list"].update(
- self.get_result_from_agent(_agent, _time)
- )
+ _r = self.get_result_from_agent(_agent, _time)
+ # Important to switch from result status time
+ # to scheduled time
+ _arl[_sch_time] = _r[_time]
+ # Dump collected result
+ self.dump_result(_filename, _arl[_sch_time])
return
- def dump_results(self, path):
- # Function dumps all availabkle results as jsons to the given path
+ def _get_dump_filename(self, _time, agent, options):
+ _dirname = _reformat_timestr(_time)
+ _filename = "-".join([
+ _dirname,
+ agent,
+ options["readwrite"],
+ options["bs"],
+ str(options["iodepth"]),
+ ]) + ".json"
+ return os.path.join(
+ self.results_dump_path,
+ _dirname,
+ _filename
+ )
+
+ def dump_result(self, filename, data):
+ # Function dumps all available results as jsons to the given path
# overwriting if needed
-
- # TODO: Conduct the dumping
-
+ _folder, _file = os.path.split(filename)
+ # Do dump
+ if not os.path.exists(_folder):
+ os.mkdir(_folder)
+ logger_cli.info("-> Created folder '{}'".format(_folder))
+ # Dump agent data for this test run
+ write_str_to_file(filename, json.dumps(data, indent=2))
+ logger_cli.info("-> Dumped '{}'".format(filename))
return
+ def load_dumped_result(self, filename):
+ try:
+ with open(filename, "rt+") as f:
+ return json.loads(f.read())
+ except FileNotFoundError as e:
+ logger_cli.error(
+ "ERROR: {}".format(e)
+ )
+ except TypeError as e:
+ logger_cli.error(
+ "ERROR: Invalid file ({}): {}".format(filename, e)
+ )
+ except json.decoder.JSONDecodeError as e:
+ logger_cli.error(
+ "ERROR: Failed to decode json ({}): {}".format(filename, e)
+ )
+ return None
+
+ def _lookup_storage_class_id_by_name(self, storage_class_name):
+ # Assume that self has proper data
+ for _pool in self.ceph_df["pools"]:
+ if storage_class_name == _pool["name"]:
+ return _pool["id"]
+ return None
+
+ def calculate_totals(self):
+ # Calculate totals for Read and Write
+ for _time, data in self.results.items():
+ if "totals" not in data:
+ data["totals"] = {}
+ else:
+ continue
+ _totals = data["totals"]
+ _r_bw = 0
+ _r_avglat = []
+ _r_iops = 0
+ _w_bw = 0
+ _w_avglat = []
+ _w_iops = 0
+ for _a, _d in data["agents"].items():
+ # Hardcoded number of jobs param :(
+ _j = _d[_time]["jobs"][0]
+ _r_bw += _j["read"]["bw_bytes"]
+ _r_avglat += [_j["read"]["lat_ns"]["mean"]]
+ _r_iops += _j["read"]["iops"]
+ _w_bw += _j["write"]["bw_bytes"]
+ _w_avglat += [_j["write"]["lat_ns"]["mean"]]
+ _w_iops += _j["write"]["iops"]
+ # Save storage class name
+ if "storage_class" not in _totals:
+ _totals["storage_class"] = \
+ self.agent_results[_a]["storage_class"]
+ # Lookup storage class id and num_pg
+ _totals["storage_class_stats"] = \
+ reporter.get_pool_stats_by_id(
+ self._lookup_storage_class_id_by_name(
+ self.agent_results[_a]["storage_class"]
+ ),
+ self.ceph_pg_dump
+ )
+
+ _totals["read_bw_bytes"] = _r_bw
+ _totals["read_avg_lat_us"] = \
+ (sum(_r_avglat) / len(_r_avglat)) / 1000
+ _totals["read_iops"] = _r_iops
+ _totals["write_bw_bytes"] = _w_bw
+ _totals["write_avg_lat_us"] = \
+ (sum(_w_avglat) / len(_w_avglat)) / 1000
+ _totals["write_iops"] = _w_iops
+
# Create report
def create_report(self, filename):
+ """
+ Create static html showing ceph benchmark report
+
+ :return: none
+ """
+ logger_cli.info("### Generating report to '{}'".format(filename))
+ _report = reporter.ReportToFile(
+ reporter.HTMLCephBench(self),
+ filename
+ )
+ self.calculate_totals()
+ _report(
+ {
+ "results": self.results,
+ "idle_status": self.ceph_idle_status,
+ "health_detail": self.health_detail,
+ "ceph_df": self.ceph_df,
+ "ceph_pg_dump": self.ceph_pg_dump,
+ "info": self.ceph_info.ceph_info,
+ "cluster": self.ceph_info.cluster_info,
+ "ceph_version": self.ceph_info.ceph_version,
+ "nodes": self.agent_pods
+ }
+ )
+ logger_cli.info("-> Done")
return
diff --git a/cfg_checker/modules/ceph/info.py b/cfg_checker/modules/ceph/info.py
index 9b55c3f..56e250e 100644
--- a/cfg_checker/modules/ceph/info.py
+++ b/cfg_checker/modules/ceph/info.py
@@ -355,6 +355,21 @@
))
return self.cluster_info['version']
+ def get_cluster_status(self):
+ return self._safe_get_cmd_output_as_json("ceph -s -f json")
+
+ def get_health_detail(self):
+ return self._safe_get_cmd_output_as_json("ceph -f json health detail")
+
+ def get_ceph_df(self):
+ return self._safe_get_cmd_output_as_json("ceph df -f json")
+
+ def get_ceph_pg_dump(self):
+ return self._safe_get_cmd_output_as_json("ceph pg dump -f json")
+
+ def get_ceph_osd_df(self):
+ return self._safe_get_cmd_output_as_json("ceph osd df -f json")
+
def gather_info(self):
logger_cli.info("# Gathering Ceph cluster info")
# Collect info
@@ -394,14 +409,14 @@
self._add_ceph_info_item(
"cluster_status",
"Cluster status",
- _cj("ceph -s -f json")
+ self.get_cluster_status()
)
logger_cli.info("-> Collecting health detail")
self._add_ceph_info_item(
"health_detail",
"Health details",
- _cj("ceph -f json health detail")
+ self.get_health_detail()
)
logger_cli.info("-> Collecting monmap")
@@ -415,14 +430,14 @@
self._add_ceph_info_item(
"ceph_df",
"Ceph DF",
- _cj("ceph df -f json")
+ self.get_ceph_df()
)
logger_cli.info("-> Collecting ceph osd df")
self._add_ceph_info_item(
"ceph_osd_df",
"Ceph OSD DF",
- _cj("ceph osd df -f json")
+ self.get_ceph_osd_df()
)
logger_cli.info("-> Collecting ceph osd dump")
@@ -463,7 +478,7 @@
self._add_ceph_info_item(
"ceph_pg_dump",
"Ceph PG dump",
- _cj("ceph pg dump -f json")
+ self.get_ceph_pg_dump()
)
logger_cli.info("-> Collecting ceph running configuration")
diff --git a/cfg_checker/modules/network/mapper.py b/cfg_checker/modules/network/mapper.py
index dea7d4e..c853724 100644
--- a/cfg_checker/modules/network/mapper.py
+++ b/cfg_checker/modules/network/mapper.py
@@ -789,7 +789,7 @@
self.daemonset = _d
return self.daemonset
- def get_script_output(self, script, args=None):
+ def get_script_output(self, script, _args=None):
"""
Get runtime network by creating DaemonSet with Host network parameter
"""
@@ -801,7 +801,7 @@
_result = self.master.execute_cmd_on_daemon_set(
_daemonset,
script,
- args=args,
+ _args=_args,
is_script=True
)
@@ -823,7 +823,7 @@
_networks = None
if source == self.RUNTIME:
logger_cli.info("# Mapping node runtime network data")
- _r = self.get_script_output("ifs_data.py", args="json")
+ _r = self.get_script_output("ifs_data.py", _args="json")
_networks = self._map_runtime_networks(_r)
else:
raise ConfigException(
diff --git a/cfg_checker/nodes.py b/cfg_checker/nodes.py
index 49284ca..a673842 100644
--- a/cfg_checker/nodes.py
+++ b/cfg_checker/nodes.py
@@ -1085,12 +1085,44 @@
self,
ds,
cmd,
- args=None,
+ _args=None,
is_script=False
):
"""
Query daemonset for pods and execute script on all of them
"""
+ _results = self.exec_cmd_on_pods(
+ self.kube.get_pods_for_daemonset(ds),
+ cmd,
+ _args=_args,
+ is_script=is_script
+ )
+ # Update results
+ _ds_results = {}
+ for _n, _, _v in _results:
+ _ds_results[_n] = _v
+ return _ds_results
+
+ def exec_on_labeled_pods_and_ns(self, label_str, cmd, _args=None, ns=None):
+ if not ns:
+ ns = self._namespace
+ _results = self.exec_cmd_on_pods(
+ self.kube.list_pods(ns, label_str=label_str),
+ cmd,
+ _args=_args
+ )
+ _pod_results = {}
+ for _, _p, _v in _results:
+ _pod_results[_p] = _v
+ return _pod_results
+
+ def exec_cmd_on_pods(
+ self,
+ pod_list,
+ cmd,
+ _args=None,
+ is_script=False
+ ):
def _kube_exec_on_pod(plist):
return [
plist[1], # node
@@ -1105,16 +1137,15 @@
)
]
- _pods = self.kube.get_pods_for_daemonset(ds)
# Create map for threads: [[node_name, ns, pod_name, cmd]...]
logger_cli.debug(
"... runnning script on {} pods using {} threads at a time".format(
- len(_pods.items),
+ len(pod_list.items),
self.env_config.threads
)
)
_plist = []
- _arguments = args if args else ""
+ _arguments = _args if _args else ""
if is_script:
_cmd = [
"python3",
@@ -1133,7 +1164,7 @@
_arguments = cmd
else:
_cmd = cmd
- for item in _pods.items:
+ for item in pod_list.items:
_plist.append(
[
self,
@@ -1147,7 +1178,7 @@
# map func and cmd
pool = Pool(self.env_config.threads)
- _results = {}
+ _results = []
self.not_responded = []
# create result list
_progress = Progress(len(_plist))
@@ -1157,7 +1188,7 @@
if not ii[1][1]:
self.not_responded.append(ii[1][0])
else:
- _results[ii[1][0]] = ii[1][2]
+ _results.append(ii[1])
_progress.write_progress(ii[0])
_progress.end()
@@ -1375,3 +1406,17 @@
return _t.status.phase
else:
return None
+
+ def list_resource_names_by_type_and_ns(self, typ, ns="qa-space"):
+ if typ == "pod":
+ _items = self.kube.list_pods(ns)
+ elif typ == "svc":
+ _items = self.kube.list_svc(ns)
+ elif typ == "pvc":
+ _items = self.kube.list_pvc(ns)
+ elif typ == "pv":
+ _items = self.kube.list_pv()
+ else:
+ logger_cli.error("ERROR: '{}' is not supported yet".format(typ))
+ return None
+ return [[i.metadata.namespace, i.metadata.name] for i in _items.items]
diff --git a/cfg_checker/reports/reporter.py b/cfg_checker/reports/reporter.py
index dc9a2cf..150ce65 100644
--- a/cfg_checker/reports/reporter.py
+++ b/cfg_checker/reports/reporter.py
@@ -6,6 +6,7 @@
from cfg_checker.common import const
from cfg_checker.common import logger_cli
from cfg_checker.common.file_utils import read_file_as_lines
+from cfg_checker.modules.ceph.bench import _reformat_timestr
import jinja2
@@ -164,6 +165,10 @@
return _steps
+def time_strip(timestring):
+ return _reformat_timestr(timestring, _tchar="")
+
+
def get_osdmap(cs):
_osdmap = cs
while True:
@@ -185,7 +190,7 @@
}
-def get_pool_stats(id, pgdump):
+def get_pool_stats_by_id(id, pgdump):
_stats = {}
for pool in pgdump["pg_map"]["pool_stats"]:
if id == pool["poolid"]:
@@ -248,8 +253,9 @@
self.jinja2_env.filters['to_mb'] = to_mb
self.jinja2_env.filters['get_bucket_item_name'] = get_bucket_item_name
self.jinja2_env.filters['get_rule_steps'] = get_rule_steps
- self.jinja2_env.filters['get_pool_stats'] = get_pool_stats
+ self.jinja2_env.filters['get_pool_stats'] = get_pool_stats_by_id
self.jinja2_env.filters['get_osdmap'] = get_osdmap
+ self.jinja2_env.filters['tstrip'] = time_strip
# render!
logger_cli.info("-> Using template: {}".format(self.tmpl))
@@ -283,6 +289,10 @@
tmpl = "ceph_info_html.j2"
+class HTMLCephBench(_TMPLBase):
+ tmpl = "ceph_bench_html.j2"
+
+
# Package versions report
class HTMLModelCompare(_TMPLBase):
tmpl = "model_tree_cmp_tmpl.j2"