Log collector module
New:
- [Done] multiple namespace selector
- [Done] keyword-based pod selector
- [Done] per-pod log syntax detection and parsing
- [Deferred] in-place filtering for shorter logs
- [Done] per-line log timestamp detection
- [Done] Unix-time-based timestamp sorting
- [Done] single-file log output using a common format
- [Done] add all log types from all MOS namespaces and pods
  (see the example invocation below)
Update:
- resource preparation can be skipped per module
- log collection now uses multiple threads
- new setting LOG_COLLECT_THREADS
Fixes:
- Network MTU argument parsing and validation
- Faster cmd execution on a single pod
- Ceph benchmark option validation
- Ceph benchmark report sorting
- DaemonSet deployment with skipped nodes
- Network tree debugging script
- Tree depth limiter, i.e. stack overflow prevention
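
Example invocation of the new module (illustrative only: "mcp-checker"
stands in for the tool's actual entry point, and the namespaces, masks
and paths are placeholders; the thread pool size comes from the new
LOG_COLLECT_THREADS setting):

    mcp-checker logs collect \
        --ns openstack --ns stacklight \
        --pod-mask alerta --pod-mask nova-api \
        --exclude cleaner \
        --tail 100 \
        --dump-undetected /tmp/undetected \
        --file mos_logs.log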
Related-PROD: PROD-36845
Change-Id: Icf229ac62078c6418ab4dbdff12b0d27ed42af1d
diff --git a/cfg_checker/modules/ceph/__init__.py b/cfg_checker/modules/ceph/__init__.py
index 5c9357b..31c6b7a 100644
--- a/cfg_checker/modules/ceph/__init__.py
+++ b/cfg_checker/modules/ceph/__init__.py
@@ -1,9 +1,13 @@
# Author: Alex Savatieiev (osavatieiev@mirantis.com; a.savex@gmail.com)
# Copyright 2019-2022 Mirantis, Inc.
+import os
+
from cfg_checker.agent.fio_runner import get_fio_options
-from cfg_checker.agent.fio_runner import seq_modes, mix_modes
+from cfg_checker.agent.fio_runner import seq_modes, mix_modes, rand_modes
from cfg_checker.common import logger_cli
+from cfg_checker.common.other import utils
from cfg_checker.common.settings import ENV_TYPE_KUBE
+from cfg_checker.common.exception import CheckerException
from cfg_checker.helpers import args_utils
from cfg_checker.modules.ceph import info, bench
@@ -33,6 +37,32 @@
# else:
# return _class
+
+def _validate_option_type(value, type_list):
+ _s, _t = utils.split_option_type(value)
+ if _t not in type_list:
+ raise CheckerException(
+ "Invalid option type '{}'. Expected types: {}".format(
+ value,
+ ", ".join(type_list)
+ )
+ )
+
+
+def _validate_option(value, type_list):
+ if value not in type_list:
+ raise CheckerException(
+ "Invalid option '{}'. Expected one of: {}".format(
+ value,
+ ", ".join(type_list)
+ )
+ )
+
+
def _get_param_and_log(arg, param_str):
_value = args_utils.get_arg(arg, param_str)
logger_cli.info(" {}={}".format(param_str, _value))
@@ -242,6 +272,10 @@
# dump results options
_dump_path = args_utils.get_arg(args, "dump_path")
if _dump_path:
+ if not os.path.exists(_dump_path):
+ raise CheckerException(
+ "ERROR: Dump path invalid: '{}'".format(_dump_path)
+ )
logger_cli.info("# Results will be dumped to '{}'".format(_dump_path))
config.bench_results_dump_path = _dump_path
else:
@@ -345,6 +379,38 @@
# init the Bench class
ceph_bench = bench.KubeCephBench(config)
ceph_bench.set_ceph_info_class(ceph_info)
+
+ # Validate options
+ logger_cli.info("-> Validating options")
+ # size
+ _validate_option_type(_opts["size"], ["G", "M"])
+ _validate_option_type(_opts["ramp_time"], ["s", "m"])
+ _validate_option_type(_opts["runtime"], ["s", "m"])
+ _modes = seq_modes + mix_modes + rand_modes
+ _validate_option(_opts["readwrite"], _modes)
+
+ if _task_file:
+ _s, _ = utils.split_option_type(_opts["size"])
+ for idx in range(len(ceph_bench.tasks)):
+ # size
+ _ts, _ = utils.split_option_type(ceph_bench.tasks[idx]["size"])
+ if _s < _ts:
+ logger_cli.error(
+ "ERROR: Task #{} file size is to big:"
+ " {} (volume) < {} (testfile)".format(
+ idx,
+ _opts["size"],
+ ceph_bench.tasks[idx]["size"]
+ )
+ )
+ # other
+ _validate_option(ceph_bench.tasks[idx]["readwrite"], _modes)
+ # Print defaults
+ logger_cli.debug("... default/selected options for fio:")
+ for _k in _opts.keys():
+ # TODO: Update options for single run
+ logger_cli.debug(" {} = {}".format(_k, _opts[_k]))
+
# Preload previous results for this name
ceph_bench.preload_results()
# Do the testrun
diff --git a/cfg_checker/modules/ceph/bench.py b/cfg_checker/modules/ceph/bench.py
index f5af704..b79007c 100644
--- a/cfg_checker/modules/ceph/bench.py
+++ b/cfg_checker/modules/ceph/bench.py
@@ -10,6 +10,7 @@
from cfg_checker.common import logger_cli
from cfg_checker.common.decorators import retry
from cfg_checker.common.file_utils import write_str_to_file
+from cfg_checker.common.other import utils
from cfg_checker.helpers.console_utils import Progress
from cfg_checker.helpers.console_utils import cl_typewriter
from cfg_checker.reports import reporter
@@ -45,19 +46,6 @@
return {}
-def _split_vol_size(size):
- # I know, but it is faster then regex
- _numbers = [48, 49, 50, 51, 52, 53, 54, 55, 56, 57]
- _s_int = "0"
- _s_type = ""
- for ch in size:
- if ord(ch) in _numbers:
- _s_int += ch
- else:
- _s_type += ch
- return int(_s_int), _s_type
-
-
class CephBench(object):
_agent_template = "cfgagent-template.yaml"
@@ -165,7 +153,7 @@
logger_cli.info("# Preparing {} agents".format(self.agent_count))
# Increase volume size a bit, so datafile fits
_quanitizer = 1.3
- _v_size, _vol_size_units = _split_vol_size(options['size'])
+ _v_size, _vol_size_units = utils.split_option_type(options['size'])
_v_size = round(_v_size * _quanitizer)
_vol_size = str(_v_size) + _vol_size_units + "i"
logger_cli.info(
diff --git a/cfg_checker/modules/ceph/info.py b/cfg_checker/modules/ceph/info.py
index 2c62018..db3dd75 100644
--- a/cfg_checker/modules/ceph/info.py
+++ b/cfg_checker/modules/ceph/info.py
@@ -313,49 +313,58 @@
self._safe_tools_cmd("rm -f " + _tar_path)
return _json
- def _safe_get_cmd_output_as_json(self, cmd, zipped=False):
- if zipped:
- _buf = self._safe_tools_cmd_zipped_output(cmd)
- else:
- _buf = self._safe_tools_cmd(cmd)
+ @staticmethod
+ def _as_json(buf):
try:
- return json.loads(_buf)
+ return json.loads(buf)
except ValueError as e:
_out = ""
- if len(_buf) > 512:
- _out = _buf[:512]
+ if len(buf) > 512:
+ _out = buf[:512]
_out += "..."
else:
- _out = _buf
+ _out = buf
logger_cli.error(
"\nERROR: failed to parse json: '{}'. Data: '{}'".format(
e,
_out
)
)
- return _buf
+ return buf
+
+ def _safe_get_cmd_output_as_json(self, cmd, zipped=False):
+ if zipped:
+ _buf = self._safe_tools_cmd_zipped_output(cmd)
+ else:
+ _buf = self._safe_tools_cmd(cmd)
+ return self._as_json(_buf)
def _get_tools_pod_name(self):
# get ceph pod
- _names = self.master.kube.get_pod_names_by_partial_name(
+ _pods = self.master.kube.get_pods_by_partial_name(
self.ceph_app_label,
self.ceph_ns
)
- if not _names:
+ # _names = self.master.kube.get_pod_names_by_partial_name(
+ # self.ceph_app_label,
+ # self.ceph_ns
+ # )
+ if not _pods:
raise KubeException(
"Failed to find pod using '{}'".format(self.ceph_app_label)
)
- elif len(_names) > 1:
+ elif len(_pods) > 1:
logger_cli.warning(
"WARNING: Environment has more than one pod "
"with '{}' app: {}".format(
self.ceph_app_label,
- ", ".join(_names)
+ ", ".join([p.metadata.name for p in _pods])
)
)
else:
- logger_cli.debug("... found '{}'".format(_names[0]))
- return _names[0]
+ logger_cli.debug("... found '{}'".format(_pods[0].metadata.name))
+ self.ceph_pod = _pods[0]
+ return _pods[0].metadata.name
def _add_ceph_info_item(self, key, title, data, filename=None):
# handle data
@@ -572,8 +581,7 @@
_health_metrics = {}
_devices = _c("ceph device ls")
_devices = _devices.splitlines()
- _progress = Progress(len(_devices)-1)
- _index = 1
+ cmd_list = []
for device in _devices:
_t = device.split()
_dev = _t[0]
@@ -582,14 +590,31 @@
if _dev == "DEVICE":
continue
- _metric = _cj("ceph device get-health-metrics {}".format(_dev))
+ # _metric = _cj("ceph device get-health-metrics {}".format(_dev))
+ _cmd = "ceph device get-health-metrics {}".format(_dev)
+ cmd_list.append(_cmd)
_dev_name = "{}_{}".format(_osd, _dev)
- _health_metrics[_dev_name] = _metric
+ _health_metrics[_dev_name] = {}
_health_metrics[_dev_name]['node_name'] = _node
_health_metrics[_dev_name]['osd_name'] = _osd
- _progress.write_progress(_index, note=_dev_name)
- _index += 1
- _progress.end()
+ _health_metrics[_dev_name]['cmd'] = _cmd
+
+ results = self.master.exec_cmds_on_pod(
+ self.ceph_pod,
+ cmd_list
+ )
+
+ logger_cli.info("-> Processing results")
+ for _r in results:
+ _cmd = _r[3]
+ _j = self._as_json(_r[2])
+ for _dev_name in _health_metrics.keys():
+ if "cmd" in _health_metrics[_dev_name] and \
+ _health_metrics[_dev_name]["cmd"] == _cmd:
+ _health_metrics[_dev_name].update(_j)
+ _health_metrics[_dev_name].pop("cmd")
+ break
+
self._add_ceph_info_item(
"ceph_health",
"Ceph Health Metrics",
@@ -633,21 +658,29 @@
logger_cli.info(
"-> Gathering OSD configuration ({})".format(_total_osd)
)
- # Shortcuts
- # _c = self._safe_tools_cmd
- _cj = self._safe_get_cmd_output_as_json
- _progress = Progress(_total_osd)
- _idx = 1
- _cfgs = {}
+ cmds = {}
+ cmd_list = []
for _osd in self.ceph_info["ceph_osd_df"]["data"]["nodes"]:
- _progress.write_progress(_idx, note=_osd["name"])
- _cfgs[_osd["name"]] = _cj(
- "ceph config show-with-defaults -f json {}".format(
- _osd["name"]
- )
+ _cmd = "ceph config show-with-defaults -f json {}".format(
+ _osd["name"]
)
- _idx += 1
- _progress.end()
+ cmd_list.append(_cmd)
+ cmds[_osd["name"]] = _cmd
+
+ results = self.master.exec_cmds_on_pod(
+ self.ceph_pod,
+ cmd_list
+ )
+
+ logger_cli.info("-> Processing results")
+ _cfgs = {}
+ for _r in results:
+ _cmd = _r[3]
+ _j = self._as_json(_r[2])
+ for _osd_name in cmds.keys():
+ if cmds[_osd_name] == _cmd:
+ _cfgs[_osd_name] = _j
+ break
# Process configs
_base = {}
diff --git a/cfg_checker/modules/logs/__init__.py b/cfg_checker/modules/logs/__init__.py
new file mode 100644
index 0000000..8003e54
--- /dev/null
+++ b/cfg_checker/modules/logs/__init__.py
@@ -0,0 +1,122 @@
+# Author: Alex Savatieiev (osavatieiev@mirantis.com; a.savex@gmail.com)
+# Copyright 2019-2022 Mirantis, Inc.
+import os
+
+from cfg_checker.common import logger_cli
+from cfg_checker.common.settings import ENV_TYPE_KUBE
+from cfg_checker.helpers import args_utils
+from cfg_checker.modules.logs import sage
+
+command_help = "Logs collecting and organizing"
+supported_envs = [ENV_TYPE_KUBE]
+
+
+def init_parser(_parser):
+ # network subparser
+ logs_subparsers = _parser.add_subparsers(dest='type')
+
+ collect_parser = logs_subparsers.add_parser(
+ 'collect',
+ help="Collect logs according to filters and/or given criteria"
+ )
+
+ collect_parser.add_argument(
+ '--ns',
+ metavar='namespace',
+ action="append",
+ help="Namespace to get pods from. Can be used multiple times"
+ )
+
+ collect_parser.add_argument(
+ '--pod-mask',
+ metavar='pod_mask',
+ action="append",
+ help="Mask/Keyword to filter pods. Can be used multiple times"
+ )
+
+ collect_parser.add_argument(
+ '--pods-inclusive',
+ action="store_true", default=True,
+ help="Inclusive pod mask filtering, "
+ "i.e. OR for filters for 'True' or AND for 'False"
+ )
+
+ collect_parser.add_argument(
+ '--file',
+ metavar='logs_filename',
+ help="Filename for logs to be saved to"
+ )
+
+ collect_parser.add_argument(
+ '--exclude',
+ metavar='exclude_mask',
+ action="append",
+ help="Mask/Keyword to exclude pods from final results. "
+ "Can be used multiple times"
+ )
+
+ collect_parser.add_argument(
+ '--dump-undetected',
+ metavar="dumppath", default="null",
+ help="Give dump path to store not parser log lines separatelly. "
+ "Default: null"
+ )
+
+ collect_parser.add_argument(
+ '--tail',
+ metavar='tail', default=50, type=int,
+ help="Number of lines to capture. Default: 50"
+ )
+
+ return _parser
+
+
+def do_collect(args, config):
+ # Collect logs according to the given filters
+ # and merge them into a single file
+ args_utils.check_supported_env(ENV_TYPE_KUBE, args, config)
+ # resolve output filename
+ _logsfile = "mos_logs.log" if not args.file else args.file
+ logger_cli.info("# Output file is '{}'".format(_logsfile))
+
+ # _class = _selectClass(_env)
+ config.prepare_qa_resources = False
+ # path to dump logs that are not detected by any regex
+ config.dumppath = args_utils.get_arg(args, "dump_undetected")
+ if config.dumppath != "null" and \
+ not os.path.exists(config.dumppath):
+ logger_cli.error(
+ "ERROR: Path to dump not parsable logs not found: '{}'".format(
+ config.dumppath
+ )
+ )
+ return
+ config.tail_lines = args_utils.get_arg(args, "tail")
+ ml = sage.KubeMosLogger(config)
+
+ # namespaces = ["openstack", "stacklight"]
+ # pod_masks = ["alerta", "nova-api"]
+ namespaces = args_utils.get_arg(args, "ns")
+ pod_masks = args_utils.get_arg(args, "pod_mask")
+ pods_inclusive = args_utils.get_arg(args, "pods_inclusive")
+ exclude_keywords = args_utils.get_arg(args, "exclude")
+ exclude_keywords = exclude_keywords if exclude_keywords else []
+ exclude_keywords += ["cleaner"]
+
+ # Prepare pod names list for log collection
+ _plist = ml.prepare_pods(
+ namespaces,
+ pod_masks,
+ inclusive_filter=pods_inclusive,
+ exclude_kw=exclude_keywords
+ )
+ # Collect logs
+ ml.collect_logs(_plist)
+ # Parse logs
+ ml.parse_logs()
+ # Merge them using timestamp
+ ml.merge_logs()
+ # Save resulting file
+ ml.save_logs(_logsfile)
+
+ return
diff --git a/cfg_checker/modules/logs/sage.py b/cfg_checker/modules/logs/sage.py
new file mode 100644
index 0000000..5375421
--- /dev/null
+++ b/cfg_checker/modules/logs/sage.py
@@ -0,0 +1,571 @@
+# Author: Alex Savatieiev (osavatieiev@mirantis.com; a.savex@gmail.com)
+# Copyright 2019-2022 Mirantis, Inc.
+import os
+import re
+
+from datetime import datetime
+from multiprocessing.dummy import Pool
+from multiprocessing import TimeoutError
+
+from cfg_checker.common import logger_cli
+# from cfg_checker.common.exception import KubeException
+
+from cfg_checker.helpers.console_utils import Progress
+from cfg_checker.nodes import KubeNodes
+
+
+_datetime_fmt = "%Y-%m-%d-%H-%M-%S.%f"
+
+# parsers = [
+# [
+# <split lines_parser>,
+# <split data for a line parser>
+# ]
+# ]
+kaas_timestamp_re = {
+ # kaas timestamp
+ # original ts comes from pandas: 2022-06-16T21:32:12.977674062Z
+ # since we do not have nanoseconds support in standard datetime,
+ # just throw away the 3 digits and 'Z'
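+ # e.g. "2022-06-16T21:32:12.977674062Z" is captured as
+ # kaas_date="2022-06-16T21:32:12.977674"; the trailing "062Z"
+ # is consumed by the non-capturing group and dropped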
+ "re": "(?P<kaas_date>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6})"
+ "(?:\d{3}Z) "
+ "(?P<message>.+?("
+ "(?=\\n\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6})|(?=\\n$))"
+ ")",
+ "date_fmt": "%Y-%m-%dT%H:%M:%S.%f"
+}
+log_item_re = [
+ {
+ # (mariadb) YYYY-MM-DD hh:mm:ss
+ # 2022-06-08 01:38:54 DEBUG mariadb-controller Sleeping for 10
+ "re": "(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) +(?P<type>INFO|DEBUG|ERROR|FAIL|WARNING) +(?P<message>.*)",
+ "groups": 3,
+ "date_fmt": "%Y-%m-%d %H:%M:%S"
+ },
+ {
+ # (iam-proxy-alerta...) nnn.nnn.nnn.nnn:ppppp - uuid - - [YYYY/MM/DD hh:mm:ss]
+ # '172.16.35.69:35298 - 9b68130c-4c3b-4abd-bb04-7ff5329ad644 - - [2022/04/01 23:00:50] 10.233.118.232:4180 GET - "/ping" HTTP/1.1 "kube-probe/1.20+" 200 2 0.000'
+ "re": "(?P<src_ip>\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3}):(?P<src_port>\d{1,5}).\-.(?P<guid>\S{8}-\S{4}-\S{4}-\S{4}-\S{12}).-.-.\[(?P<date>\d{4}\/\d{2}\/\d{2} \d{2}:\d{2}:\d{2})\] (?P<dest_ip>\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3}):(?P<dest_port>\d{1,5}) (?P<message>.*)",
+ "groups": 7,
+ "date_fmt": "%Y/%m/%d %H:%M:%S"
+ },
+ {
+ # (default1) YYYY-MM-DD hh:mm:ss,nnn
+ #
+ "re": "(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\,\d{3}) +(?P<type>INFO|DEBUG|ERROR|FAIL|WARNING) +(?P<message>.*)",
+ "groups": 3,
+ "date_fmt": "%Y-%m-%d %H:%M:%S,%f"
+ },
+ {
+ # (default1a) YYYY-MM-DD hh:mm:ss,nnn
+ # 2022-06-27 23:34:51,845 - OpenStack-Helm Mariadb - INFO - Updating grastate configmap
+ "re": "(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\,\d{3}) \- (?P<process>.+) \- (?P<type>INFO|DEBUG|ERROR|FAIL|WARNING) \- (?P<message>.*)",
+ "groups": 4,
+ "date_fmt": "%Y-%m-%d %H:%M:%S,%f"
+ },
+ {
+ # (default2) YYYY-MM-DD hh:mm:ss.nnn
+ # 2022-05-23 04:01:06.360 7 INFO barbican.model.clean [-] Cleaning up soft deletions in the barbican database[00m
+ "re": "(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}) +(?P<pid>\d{1,6}) +(?P<type>INFO|DEBUG|ERROR|FAIL|WARNING) +(?P<message>.*)",
+ "groups": 4,
+ "date_fmt": "%Y-%m-%d %H:%M:%S.%f"
+ },
+ {
+ # (default3) YYYY-MM-DD hh:mm:ss.nnn
+ # <date> - <type> - <message>
+ "re": "(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}).\-.(?P<type>INFO|DEBUG|ERROR|FAIL|WARNING).\-.(?P<message>.*)",
+ "groups": 3,
+ "date_fmt": "%Y-%m-%d %H:%M:%S.%f"
+ },
+ {
+ # libvirt
+ # 2022-06-16 12:48:59.509+0000: 53235: info : libvirt version: 6.0.0, package: 0ubuntu8.15~cloud0 (Openstack Ubuntu Testing Bot <openstack-testing-bot@ubuntu.com> Mon, 22 Nov 2021 16:37:15 +0000)
+ "re": "(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}\+\d{4}): (?P<pid>\d{1,6}):.(?P<type>info|debug|error|fail|warning)\s: (?P<message>.*)",
+ "groups": 4,
+ "date_fmt": "%Y-%m-%d %H:%M:%S.%f%z"
+ },
+ {
+ # 2022-06-28 00:00:55.400745 2022-06-28 00:00:55.400 13 INFO cinder.api.openstack.wsgi [req-07ca8c70-f33d-406f-9427-5388b1656297 9e5e4502e0c34c0eabcc5bfbc499b059 343ab637681b4520bf4f5a7b826b9803 - default default] https://cinder.ic-eu.ssl.mirantis.net/v2/343ab637681b4520bf4f5a7b826b9803/volumes/detail?metadata=%7B%27KaaS%27%3A%274131b4dc-81ab-4d84-b991-7b63f225058c%27%7D returned with HTTP 200
+ "re": "(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{6}) +(?P<date_alt>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}) +(?P<pid>\d{1,6}) +(?P<type>INFO|DEBUG|ERROR|FAIL|WARNING) +(?P<message>.*)",
+ "groups": 5,
+ "date_fmt": "%Y-%m-%d %H:%M:%S.%f"
+ },
+ {
+ # 'stacklight/alerta-5fc6f5dfd-jj7ml'
+ # '2022/04/01 23:12:37 [info] 26#26: *124156 client 127.0.0.1 closed keepalive connection\n'
+ "re": "(?P<date>\d{4}\/\d{2}\/\d{2} \d{2}:\d{2}:\d{2}) +\[(?P<type>info|debug|error|fail|warning)\] +(?P<message>.*)",
+ "groups": 3,
+ "date_fmt": "%Y/%m/%d %H:%M:%S"
+ },
+ {
+ # (nova-api-osapi-....) YYYY-MM-DD hh:mm:ss,nnnnnn
+ # '2022-04-01 23:08:11.806062 capabilities. Old policies are deprecated and silently going to be ignored'
+ "re": "(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{6}) +(?P<message>.*)",
+ "groups": 2,
+ "date_fmt": "%Y-%m-%d %H:%M:%S.%f"
+ },
+ {
+ # (nova-api-metadata..) nnn.nnn.nnn.nnn - - [DD/MMM/YYYY:hh:mm:ss +nnnn]
+ # '172.16.35.67 - - [01/Apr/2022:22:23:14 +0000] "GET / HTTP/1.1" 200 98 1345 "-" "kube-probe/1.20+"'
+ "re": "(?P<src_ip>\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3}).+-.+-.+\[(?P<date>\d{1,2}\/\S{3}\/\d{4}:\d{2}:\d{2}:\d{2}.\+\d{4})\] +(?P<message>.*)",
+ "groups": 3,
+ "date_fmt": "%d/%b/%Y:%H:%M:%S %z"
+ },
+ {
+ # mysqld exporter
+ # time="2022-06-15T16:16:36Z" level=info msg="Starting mysqld_exporter (version=0.11.0, branch=HEAD, revision=5d7179615695a61ecc3b5bf90a2a7c76a9592cdd)" source="mysqld_exporter.go:206"
+ "re": "time\=\"(?P<date>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})Z\" level\=(?P<type>info|debug|error|fail|warning|warn) msg\=\"(?P<message>.*)\" source\=\"(?P<source>.+)\"",
+ "groups": 4,
+ "date_fmt": "%Y-%m-%dT%H:%M:%S"
+ },
+ {
+ # metrics
+ # 2022-06-24 20:55:19.752754+00:00 [info] <0.716.0> Setting segment_entry_count for vhost 'barbican' with 0 queues to '2048'
+ "re": "(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}.\d{6})\+00:00 \[(?P<type>info|debug|error|fail|warning|warn)\] (?P<message>.*)",
+ "groups": 3,
+ "date_fmt": "%Y-%m-%d %H:%M:%S.%f"
+ },
+ {
+ # openvswitch
+ # 2022-06-27T23:12:52Z|25993|reconnect|WARN|unix#89422: connection dropped (Connection reset by peer)
+ # 2022-06-27T21:31:11Z|08582|connmgr|INFO|br-tun<->tcp:127.0.0.1:6633: 6 flow_mods in the 3 s starting 10 s ago (6 adds)
+ "re": "(?P<date>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})Z\|(?P<pid>\d{1,6})\|(?P<action>.+)\|(?P<type>INFO|DEBUG|ERROR|ERR|FAIL|WARNING|WARN)\|(?P<message>.*)",
+ "groups": 5,
+ "date_fmt": "%Y-%m-%dT%H:%M:%S"
+ }
+]
+
+
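+# findall() returns plain tuples ordered by group position, while
+# compiled.groupindex maps each group name to its 1-based index;
+# this helper rebuilds a {name: value} dict from the two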
+def _re_groups_as_dict(compiled, re_match):
+ _d = {}
+ for _k in compiled.groupindex.keys():
+ _d[_k] = re_match[compiled.groupindex[_k]-1]
+ return _d
+
+
+class MosLogger(object):
+ def __init__(
+ self,
+ config
+ ):
+ self.env_config = config
+ return
+
+
+class KubeMosLogger(MosLogger):
+ def __init__(self, config):
+ self.master = KubeNodes(config)
+ super(KubeMosLogger, self).__init__(config)
+ # Init ceph tools pod
+ self.logs = {}
+ self.merged_logs = {}
+ self.kaas_ts_regex = re.compile(kaas_timestamp_re["re"])
+ self.item_regex = []
+ self.dumppath = config.dumppath
+ self.tail_lines = config.tail_lines
+ for regex in log_item_re:
+ self.item_regex.append(
+ {
+ "compiled": re.compile(regex["re"]),
+ "date_fmt": regex["date_fmt"],
+ "groups": regex["groups"]
+ }
+ )
+
+ def _keywords(self, tstring, keywords):
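+ # one boolean per keyword: True when it occurs in tstring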
+ return [k in tstring for k in keywords]
+
+ def _safe_parse_date(self, str_date, fmt, note="-"):
+ # DEBUG
+ try:
+ _t = datetime.strptime(str_date, fmt)
+ except ValueError:
+ logger_cli.warning(
+ "WARNING ({}): Can't parse date '{}'"
+ " using '{}'".format(
+ note,
+ str_date,
+ fmt
+ )
+ )
+ _t = -1
+ return _t
+
+ def prepare_pods(
+ self,
+ ns_list,
+ kw_list,
+ inclusive_filter=True,
+ exclude_kw=[]
+ ):
+ def _list_with_containers(pod_item):
+ _list = []
+ for c in pod_item[2]:
+ _list.append([
+ pod_item[0],
+ pod_item[1],
+ c
+ ])
+ return _list
+ logger_cli.info(
+ "# Preparing pods, ns: {}; keywords: {}".format(
+ ", ".join(ns_list) if ns_list else "*",
+ ", ".join(kw_list) if kw_list else "*"
+ )
+ )
+ # [ns, pod_name]
+ _target_pods = []
+ # validate ns
+ _all_namespaces = self.master.list_namespaces()
+ for _ns in _all_namespaces:
+ if _ns in ns_list or not ns_list:
+ _tmp_pods = []
+ logger_cli.info("-> processing namespace '{}'".format(_ns))
+ # list pods using mask
+ logger_cli.debug("... getting pods list")
+ _pods = self.master.list_pod_names_with_containers(ns=_ns)
+ logger_cli.debug("... {} total pods found".format(len(_pods)))
+ for _pod in _pods:
+ # _pod[_ns, _name]
+ _kw = self._keywords(_pod[1], kw_list) \
+ if kw_list else [True]
+ if any(self._keywords(_pod[1], exclude_kw)) or \
+ any(self._keywords(_pod[2], exclude_kw)):
+ logger_cli.debug("... skipped '{}'".format(_pod[1]))
+ continue
+ elif (not inclusive_filter and all(_kw)) or \
+ (inclusive_filter and any(_kw)):
+ _cpods = _list_with_containers(_pod)
+ _tmp_pods += _cpods
+ logger_cli.debug(
+ "... added {} items for pod '{}'".format(
+ len(_cpods),
+ "/".join(_pod[:2])
+ )
+ )
+ else:
+ # logger_cli.debug("... skipped pod '{}'".format(_pod))
+ pass
+ logger_cli.info(
+ "-> {} pods processed, "
+ "{} log items to be collected".format(
+ len(_pods),
+ len(_tmp_pods)
+ )
+ )
+ _target_pods += _tmp_pods
+
+ logger_cli.info(
+ "-> found {} log items total".format(len(_target_pods))
+ )
+ return _target_pods
+
+ def _get_pod_log(self, params):
+ ns = params[0]
+ name = params[1]
+ container_name = params[2]
+ # Get target log
+ _log_data = self.master.get_logs_for_pod(
+ name,
+ container_name,
+ ns,
+ tail_lines=self.tail_lines
+ )
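+ # treat near-empty output (under 10 chars) as no log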
+ if len(_log_data) < 10:
+ return None
+ else:
+ return _log_data
+
+ def collect_logs(self, pods_list):
+
+ # Prepare list of pods to collect
+ # cmd = """
+ # kubectl get pods -A -o=jsonpath='{range .items[*]}
+ # {.metadata.namespace}{"/"}
+ # {.metadata.name}{"\n"}{range .spec.containers[*]} {.name}{"\n"}
+ # {end}'
+ # """
+
+ logger_cli.info(
+ "# Collecting logs using {} threads".format(
+ self.env_config.sage_threads
+ )
+ )
+ # Do collect using pooled threads
+ pool = Pool(self.env_config.sage_threads)
+ _params = []
+ # Prepare params for getting logs
+ for _ns, _pod_name, _cont_name in pods_list:
+ _params.append([_ns, _pod_name, _cont_name])
+ # Init progress bar
+ total_log_items = len(_params)
+ log_item_index = 0
+ _progress = Progress(total_log_items)
+ # Start pooled processing
+ results = pool.imap(self._get_pod_log, _params)
+ # Catch and parse results
+ while True:
+ try:
+ # use timeout as some of the request can hang
+ _r = results.next(timeout=10)
+ _namepath = "{}/{}:{}".format(
+ _params[log_item_index][0],
+ _params[log_item_index][1],
+ _params[log_item_index][2]
+ )
+
+ except StopIteration:
+ # end of pool
+ break
+
+ except TimeoutError:
+ # report the pod that hung and keep waiting;
+ # _namepath is only set after a successful next() call,
+ # so rebuild it here for the item being waited on
+ _hung_namepath = "{}/{}:{}".format(
+ _params[log_item_index][0],
+ _params[log_item_index][1],
+ _params[log_item_index][2]
+ )
+ _progress.clearline()
+ logger_cli.warning(
+ "WARNING: Timeout waiting for log content {}".format(
+ _hung_namepath
+ )
+ )
+ continue
+ if _r is not None:
+ _raw = _r
+ _size = len(_raw)
+ # Save detected data
+ self.logs[_namepath] = {
+ "ns": _params[log_item_index][0],
+ "pod_name": _params[log_item_index][1],
+ "container_name": _params[log_item_index][2],
+ "raw": _raw
+ }
+
+ # print progress
+ _progress.write_progress(
+ log_item_index+1,
+ note="'{}': {} chars".format(_namepath, _size)
+ )
+
+ # track next param set
+ log_item_index += 1
+
+ _progress.end()
+ pool.close()
+ pool.join()
+
+ # debug
+ # with open("logs_debug.json", "w+") as wf:
+ # wf.write(json.dumps(self.logs))
+ # debug
+ return
+
+ def _parse_kaas_timestamps(self):
+ # shortcut function to get array aligned index
+ def _get_group(match, key):
+ return match[self.kaas_ts_regex.groupindex[key]-1]
+
+ logger_cli.info("-> Parsing kaas timestamps")
+ # iterate logs
+ _counter = 0
+ _pbar = Progress(len(self.logs))
+ for _namepath, _item in self.logs.items():
+ # next progress bar item
+ _counter += 1
+ _pbar.write_progress(_counter, note=_namepath)
+ # Get all lines from log matched
+ _matches = self.kaas_ts_regex.findall(_item.pop("raw"))
+ # iterate matches and parse timestamp
+ _log = []
+ for _match in _matches:
+ # new log item
+ _log_line = _re_groups_as_dict(self.kaas_ts_regex, _match)
+ # parse ts from kaas
+ _pts = self._safe_parse_date(
+ _log_line["kaas_date"],
+ kaas_timestamp_re["date_fmt"],
+ note=_namepath
+ )
+ _log_line["kaas_ts"] = _pts
+
+ # save log item
+ _log.append(_log_line)
+
+ # save pmessage and kaas_ts
+ _item["total_lines"] = len(_matches)
+ _item["log"] = _log
+
+ _pbar.end()
+ return
+
+ @staticmethod
+ def _get_matched(regex, line):
+ # Check if regex has matching groups number in last line
+ _c = regex["compiled"]
+ _m = _c.findall(line)
+ # calculate groups if there is a match
+ _group_count = len(_m[0]) if len(_m) > 0 else 0
+ # Check than group count at least 2
+ # and check that matched number of groups found
+ if _group_count > 1 and _group_count == len(_c.groupindex):
+ return _m
+ else:
+ return []
+
+ def _parse_log_item(self, log_item):
+ def _detect_re():
+ _re = None
+ for regex in self.item_regex:
+ _m = self._get_matched(regex, _message)
+ if _m:
+ _re = regex
+ break
+ return _re, _m
+
+ # parse whole log using detected pattern
+ l_parsed = 0
+ l_not_parsed = 0
+ l_skipped = 0
+ _index = 0
+ _li = log_item["log"]
+ _re = None
+ while _index < log_item["total_lines"]:
+ # pop message as there might be similar group name present
+ _message = _li[_index].pop("message")
+ # Parse line
+ _m = []
+ # Try last regex for this item
+ if _re is not None:
+ _m = self._get_matched(_re, _message)
+ # if not matched
+ if not _m:
+ # Try every regex to match line format
+ # by counting groups detected
+ _re, _m = _detect_re()
+ if len(_m) == 1:
+ # get matches with group names as a dict
+ _new_line_items = \
+ _re_groups_as_dict(_re["compiled"], _m[0])
+ # update original dict
+ _li[_index].update(_new_line_items)
+ # Parse date
+ _pts = self._safe_parse_date(
+ _new_line_items["date"],
+ _re["date_fmt"]
+ )
+ _li[_index]["ts"] = _pts
+ l_parsed += 1
+ elif len(_m) == 0:
+ # put back message that failed to parse
+ _li[_index]["message"] = _message
+ l_not_parsed += 1
+ else:
+ # Should never happen
+ logger_cli.warning(
+ "WARNING: Skipping ambiguous log message: "
+ "'{}'".format(_message)
+ )
+ l_skipped += 1
+ # next line
+ _index += 1
+ log_item["stats"] = {
+ "parsed": l_parsed,
+ "not_parsed": l_not_parsed,
+ "skipped": l_skipped
+ }
+
+ def parse_logs(self):
+
+ # debug: load precreated logs
+ # _ll = {}
+ # with open("logs_debug.json", "r+") as rf:
+ # _ll = json.loads(rf.read())
+ # if _ll:
+ # self.logs = _ll
+ # debug: end
+
+ # Get kaas ts as a plan B if log time either not present or not parsed
+ self._parse_kaas_timestamps()
+
+ logger_cli.info("-> Parsing logs")
+ _p = Progress(len(self.logs))
+ idx = 1
+ totalParsed = 0
+ totalNotParsed = 0
+ # iterate raw logs and try to parse actual pod timing
+ for _namepath, _log in self.logs.items():
+ # Update progress bar
+ _p.write_progress(
+ idx,
+ note="parsed: {}, not parsed: {}, => {}".format(
+ totalParsed,
+ totalNotParsed,
+ _namepath
+ )
+ )
+ # Parse log
+ self._parse_log_item(_log)
+ if self.dumppath != "null":
+ for line in _log["log"]:
+ if "date" not in line.keys():
+ # Log line parsed
+ _filename = os.path.join(
+ self.dumppath,
+ _log["pod_name"]+"-"+_log["container_name"]+".log"
+ )
+ with open(_filename, "a") as rawlogfile:
+ rawlogfile.write(
+ "<KTS>{} <M>{}\n".format(
+ line["kaas_date"],
+ line["message"]
+ )
+ )
+ # Stats
+ totalParsed += _log["stats"]["parsed"]
+ totalNotParsed += _log["stats"]["not_parsed"]
+ # Update progress bar
+ # _p.write_progress(
+ # idx,
+ # note="parsed: {}, not parsed: {}, => {}".format(
+ # totalParsed,
+ # totalNotParsed,
+ # _namepath
+ # )
+ # )
+ idx += 1
+ _p.end()
+
+ def merge_logs(self):
+ logger_cli.info("# Merging logs")
+ _merged = {}
+ for _pod, _logs in self.logs.items():
+ for _li in _logs["log"]:
+ # Prepare log entry
+ _li["namepath"] = _pod
+
+ # check if timestamp is detected
+ if "ts" not in _li:
+ # Use kaas_ts as a fallback timestamp
+ _timestamp = _li.pop("kaas_ts")
+ else:
+ # get parsed timestamp
+ _timestamp = _li.pop("ts")
+
+ # skip lines whose date failed to parse (-1 marker)
+ if not isinstance(_timestamp, datetime):
+ continue
+
+ # save items keyed by timestamp
+ # NOTE: entries sharing an identical timestamp
+ # overwrite each other here
+ _merged[float(_timestamp.timestamp())] = _li
+ self.merged_logs = _merged
+ return
+
+ def save_logs(self, filename):
+ logger_cli.info("# Writing output file: '{}'".format(filename))
+ with open(filename, 'w+') as ff:
+ _log_iter = sorted(
+ self.merged_logs.items(), key=lambda item: item[0]
+ )
+ for k, v in _log_iter:
+ ff.write(
+ "{} {}: {}\n".format(
+ v.pop("namepath"),
+ datetime.fromtimestamp(k).strftime(_datetime_fmt),
+ " ".join(["{}={}".format(k, v) for k, v in v.items()])
+ )
+ )
+ return
diff --git a/cfg_checker/modules/network/__init__.py b/cfg_checker/modules/network/__init__.py
index 19c0b6b..807d05f 100644
--- a/cfg_checker/modules/network/__init__.py
+++ b/cfg_checker/modules/network/__init__.py
@@ -11,6 +11,14 @@
supported_envs = [ENV_TYPE_SALT, ENV_TYPE_KUBE]
+def mtu_type(value):
+ try:
+ _mtu = int(value)
+ except ValueError:
+ _mtu = value
+ return _mtu
+
+
def _selectClass(_env, strClassHint="checker"):
_class = None
if _env == ENV_TYPE_SALT:
@@ -92,7 +100,7 @@
)
net_ping_parser.add_argument(
'--mtu',
- metavar='network_ping_mtu', default=64,
+ metavar='network_ping_mtu', default=64, type=mtu_type,
help="MTU size to use. Ping will be done for MTU - 20 - 8. Default: 64"
)
net_ping_parser.add_argument(
@@ -306,6 +314,10 @@
"Defaulting to '9100'".format(_mtu)
)
_mtu = 9100
+ elif _mtu > 63 and _mtu < 9101:
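+ # in-range MTU (64..9100): keep the requested value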
+ logger_cli.info(
+ "# MTU set to '{}'.".format(_mtu)
+ )
else:
raise CheckerException(
"Negative MTU values not supported: '{}'".format(
diff --git a/cfg_checker/modules/network/mapper.py b/cfg_checker/modules/network/mapper.py
index 5c18148..3072c68 100644
--- a/cfg_checker/modules/network/mapper.py
+++ b/cfg_checker/modules/network/mapper.py
@@ -197,6 +197,8 @@
len(self.master.nodes[key]['networks'].keys())
))
logger_cli.info("-> done collecting networks data")
+ # Save the count for cutting recursion depth later
+ _total_networks = len(self.master.nodes[key]['networks'])
logger_cli.info("-> mapping runtime network IPs")
# match interfaces by IP subnets
@@ -249,6 +251,15 @@
net_data['ifs'].append(_if)
def process_interface(lvl, interface, tree, res):
+ if abs(lvl) > _total_networks:
+ logger_cli.warning(
+ "WARNING: Probable cyclic dependency, "
+ "tree path discovery was capped at {}".format(
+ _total_networks
+ )
+ )
+ return
+
# get childs for each root
# tree row item (<if_name>, [<parents>], [<childs>])
if lvl > 50 or lvl < -50: