Log collector module
New:
- [Done] multiple namespace selector
- [Done] keyword-based pod selector
- [Done] per-pod logs syntax detection and parsing
- [Deferred] in-place filtering for shorter logs
- [Done] individual logs timestamp detection
- [Done] Unix time-based timestamp sorting
- [Done] Single file logs output using common format
- [Done] add all log types from all MOS namespaces and pods
Update:
- resource preparation can be skipped per module
- updated log collection using multiple threads
- new setting LOG_COLLECT_THREADS
Fixes:
- Network MTU fix
- Faster cmd execution on single pod
- Ceph benchmark validations
- Ceph benchmark report sorting
- Daemonset deployment with nodes skipped
- Network tree debugging script
- Tree depth limiter, i.e. stackoverflow prevention
Related-PROD: PROD-36845
Change-Id: Icf229ac62078c6418ab4dbdff12b0d27ed42af1d
diff --git a/cfg_checker/modules/ceph/__init__.py b/cfg_checker/modules/ceph/__init__.py
index 5c9357b..31c6b7a 100644
--- a/cfg_checker/modules/ceph/__init__.py
+++ b/cfg_checker/modules/ceph/__init__.py
@@ -1,9 +1,13 @@
# Author: Alex Savatieiev (osavatieiev@mirantis.com; a.savex@gmail.com)
# Copyright 2019-2022 Mirantis, Inc.
+import os
+
from cfg_checker.agent.fio_runner import get_fio_options
-from cfg_checker.agent.fio_runner import seq_modes, mix_modes
+from cfg_checker.agent.fio_runner import seq_modes, mix_modes, rand_modes
from cfg_checker.common import logger_cli
+from cfg_checker.common.other import utils
from cfg_checker.common.settings import ENV_TYPE_KUBE
+from cfg_checker.common.exception import CheckerException
from cfg_checker.helpers import args_utils
from cfg_checker.modules.ceph import info, bench
@@ -33,6 +37,32 @@
# else:
# return _class
+
+def _validate_option_type(value, type_list):
+ _s, _t = utils.split_option_type(value)
+ if _t not in type_list:
+ raise CheckerException(
+ "Invalid option type '{}'. Expected types: {}".format(
+ value,
+ ", ".join(type_list)
+ )
+ )
+ else:
+ return
+
+
+def _validate_option(value, type_list):
+ if value not in type_list:
+ raise CheckerException(
+ "Invalid option '{}'. Expected one of: {}".format(
+ value,
+ ", ".join(type_list)
+ )
+ )
+ else:
+ return
+
+
def _get_param_and_log(arg, param_str):
_value = args_utils.get_arg(arg, param_str)
logger_cli.info(" {}={}".format(param_str, _value))
@@ -242,6 +272,10 @@
# dump results options
_dump_path = args_utils.get_arg(args, "dump_path")
if _dump_path:
+ if not os.path.exists(_dump_path):
+ raise CheckerException(
+ "ERROR: Dump path invalid: '{}'".format(_dump_path)
+ )
logger_cli.info("# Results will be dumped to '{}'".format(_dump_path))
config.bench_results_dump_path = _dump_path
else:
@@ -345,6 +379,38 @@
# init the Bench class
ceph_bench = bench.KubeCephBench(config)
ceph_bench.set_ceph_info_class(ceph_info)
+
+ # Validate options
+ logger_cli.info("-> Validating options")
+ # size
+ _validate_option_type(_opts["size"], ["G", "M"])
+ _validate_option_type(_opts["ramp_time"], ["s", "m"])
+ _validate_option_type(_opts["runtime"], ["s", "m"])
+ _modes = seq_modes + mix_modes + rand_modes
+ _validate_option(_opts["readwrite"], _modes)
+
+ if _task_file:
+ _s, _ = utils.split_option_type(_opts["size"])
+ for idx in range(len(ceph_bench.tasks)):
+ # size
+ _ts, _ = utils.split_option_type(ceph_bench.tasks[idx]["size"])
+ if _s < _ts:
+ logger_cli.error(
+ "ERROR: Task #{} file size is to big:"
+ " {} (volume) < {} (testfile)".format(
+ idx,
+ _opts["size"],
+ ceph_bench.tasks[idx]["size"]
+ )
+ )
+ # other
+ _validate_option(ceph_bench.tasks[idx]["readwrite"], _modes)
+ # Print defaults
+ logger_cli.debug("... default/selected options for fio:")
+ for _k in _opts.keys():
+ # TODO: Update options for single run
+ logger_cli.debug(" {} = {}".format(_k, _opts[_k]))
+
# Preload previous results for this name
ceph_bench.preload_results()
# Do the testrun
diff --git a/cfg_checker/modules/ceph/bench.py b/cfg_checker/modules/ceph/bench.py
index f5af704..b79007c 100644
--- a/cfg_checker/modules/ceph/bench.py
+++ b/cfg_checker/modules/ceph/bench.py
@@ -10,6 +10,7 @@
from cfg_checker.common import logger_cli
from cfg_checker.common.decorators import retry
from cfg_checker.common.file_utils import write_str_to_file
+from cfg_checker.common.other import utils
from cfg_checker.helpers.console_utils import Progress
from cfg_checker.helpers.console_utils import cl_typewriter
from cfg_checker.reports import reporter
@@ -45,19 +46,6 @@
return {}
-def _split_vol_size(size):
- # I know, but it is faster then regex
- _numbers = [48, 49, 50, 51, 52, 53, 54, 55, 56, 57]
- _s_int = "0"
- _s_type = ""
- for ch in size:
- if ord(ch) in _numbers:
- _s_int += ch
- else:
- _s_type += ch
- return int(_s_int), _s_type
-
-
class CephBench(object):
_agent_template = "cfgagent-template.yaml"
@@ -165,7 +153,7 @@
logger_cli.info("# Preparing {} agents".format(self.agent_count))
# Increase volume size a bit, so datafile fits
_quanitizer = 1.3
- _v_size, _vol_size_units = _split_vol_size(options['size'])
+ _v_size, _vol_size_units = utils.split_option_type(options['size'])
_v_size = round(_v_size * _quanitizer)
_vol_size = str(_v_size) + _vol_size_units + "i"
logger_cli.info(
diff --git a/cfg_checker/modules/ceph/info.py b/cfg_checker/modules/ceph/info.py
index 2c62018..db3dd75 100644
--- a/cfg_checker/modules/ceph/info.py
+++ b/cfg_checker/modules/ceph/info.py
@@ -313,49 +313,58 @@
self._safe_tools_cmd("rm -f " + _tar_path)
return _json
- def _safe_get_cmd_output_as_json(self, cmd, zipped=False):
- if zipped:
- _buf = self._safe_tools_cmd_zipped_output(cmd)
- else:
- _buf = self._safe_tools_cmd(cmd)
+ @staticmethod
+ def _as_json(buf):
try:
- return json.loads(_buf)
+ return json.loads(buf)
except ValueError as e:
_out = ""
- if len(_buf) > 512:
- _out = _buf[:512]
+ if len(buf) > 512:
+ _out = buf[:512]
_out += "..."
else:
- _out = _buf
+ _out = buf
logger_cli.error(
"\nERROR: failed to parse json: '{}'. Data: '{}'".format(
e,
_out
)
)
- return _buf
+ return buf
+
+ def _safe_get_cmd_output_as_json(self, cmd, zipped=False):
+ if zipped:
+ _buf = self._safe_tools_cmd_zipped_output(cmd)
+ else:
+ _buf = self._safe_tools_cmd(cmd)
+ return self._as_json(_buf)
def _get_tools_pod_name(self):
# get ceph pod
- _names = self.master.kube.get_pod_names_by_partial_name(
+ _pods = self.master.kube.get_pods_by_partial_name(
self.ceph_app_label,
self.ceph_ns
)
- if not _names:
+ # _names = self.master.kube.get_pod_names_by_partial_name(
+ # self.ceph_app_label,
+ # self.ceph_ns
+ # )
+ if not _pods:
raise KubeException(
"Failed to find pod using '{}'".format(self.ceph_app_label)
)
- elif len(_names) > 1:
+ elif len(_pods) > 1:
logger_cli.warning(
"WARNING: Environment has more than one pod "
"with '{}' app: {}".format(
self.ceph_app_label,
- ", ".join(_names)
+ ", ".join([p.metadata.name for p in _pods])
)
)
else:
- logger_cli.debug("... found '{}'".format(_names[0]))
- return _names[0]
+ logger_cli.debug("... found '{}'".format(_pods[0].metadata.name))
+ self.ceph_pod = _pods[0]
+ return _pods[0].metadata.name
def _add_ceph_info_item(self, key, title, data, filename=None):
# handle data
@@ -572,8 +581,7 @@
_health_metrics = {}
_devices = _c("ceph device ls")
_devices = _devices.splitlines()
- _progress = Progress(len(_devices)-1)
- _index = 1
+ cmd_list = []
for device in _devices:
_t = device.split()
_dev = _t[0]
@@ -582,14 +590,31 @@
if _dev == "DEVICE":
continue
- _metric = _cj("ceph device get-health-metrics {}".format(_dev))
+ # _metric = _cj("ceph device get-health-metrics {}".format(_dev))
+ _cmd = "ceph device get-health-metrics {}".format(_dev)
+ cmd_list.append(_cmd)
_dev_name = "{}_{}".format(_osd, _dev)
- _health_metrics[_dev_name] = _metric
+ _health_metrics[_dev_name] = {}
_health_metrics[_dev_name]['node_name'] = _node
_health_metrics[_dev_name]['osd_name'] = _osd
- _progress.write_progress(_index, note=_dev_name)
- _index += 1
- _progress.end()
+ _health_metrics[_dev_name]['cmd'] = _cmd
+
+ results = self.master.exec_cmds_on_pod(
+ self.ceph_pod,
+ cmd_list
+ )
+
+ logger_cli.info("-> Processing results")
+ for _r in results:
+ _cmd = _r[3]
+ _j = self._as_json(_r[2])
+ for _dev_name in _health_metrics.keys():
+ if "cmd" in _health_metrics[_dev_name] and \
+ _health_metrics[_dev_name]["cmd"] == _cmd:
+ _health_metrics[_dev_name].update(_j)
+ _health_metrics[_dev_name].pop("cmd")
+ break
+
self._add_ceph_info_item(
"ceph_health",
"Ceph Health Metrics",
@@ -633,21 +658,29 @@
logger_cli.info(
"-> Gathering OSD configuration ({})".format(_total_osd)
)
- # Shortcuts
- # _c = self._safe_tools_cmd
- _cj = self._safe_get_cmd_output_as_json
- _progress = Progress(_total_osd)
- _idx = 1
- _cfgs = {}
+ cmds = {}
+ cmd_list = []
for _osd in self.ceph_info["ceph_osd_df"]["data"]["nodes"]:
- _progress.write_progress(_idx, note=_osd["name"])
- _cfgs[_osd["name"]] = _cj(
- "ceph config show-with-defaults -f json {}".format(
- _osd["name"]
- )
+ _cmd = "ceph config show-with-defaults -f json {}".format(
+ _osd["name"]
)
- _idx += 1
- _progress.end()
+ cmd_list.append(_cmd)
+ cmds[_osd["name"]] = _cmd
+
+ results = self.master.exec_cmds_on_pod(
+ self.ceph_pod,
+ cmd_list
+ )
+
+ logger_cli.info("-> Processing results")
+ _cfgs = {}
+ for _r in results:
+ _cmd = _r[3]
+ _j = self._as_json(_r[2])
+ for _osd_name in cmds.keys():
+ if cmds[_osd_name] == _cmd:
+ _cfgs[_osd_name] = _j
+ break
# Process configs
_base = {}