Log collector module

  - [Done] multiple namespace selector
  - [Done] keyword-based pod selector
  - [Done] per-pod logs syntax detection and parsing
  - [Deferred] in-place filtering for shorter logs
  - [Done] individual logs timestamp detection
  - [Done] Unix time based timestamp sorting
  - [Done] Single file logs output using common format
  - [Done] add all log types from all MOS namespaces and pods

  - resource preparation can be skipped per module
  - updated log collection using multiple threads
  - new setting LOG_COLLECT_THREADS

  - Network MTU fix
  - Faster cmd execution on single pod
  - Ceph benchmark validations
  - Ceph benchmark report sorting
  - Daemonset deployment with nodes skipped
  - Network tree debugging script
  - Tree depth limiter, i.e. stackoverflow prevention

  Related-PROD: PROD-36845

Change-Id: Icf229ac62078c6418ab4dbdff12b0d27ed42af1d
diff --git a/cfg_checker/modules/logs/sage.py b/cfg_checker/modules/logs/sage.py
new file mode 100644
index 0000000..5375421
--- /dev/null
+++ b/cfg_checker/modules/logs/sage.py
@@ -0,0 +1,571 @@
+#    Author: Alex Savatieiev (osavatieiev@mirantis.com; a.savex@gmail.com)
+#    Copyright 2019-2022 Mirantis, Inc.
+import os
+import re
+from datetime import datetime
+from multiprocessing.dummy import Pool
+from multiprocessing import TimeoutError
+from cfg_checker.common import logger_cli
+# from cfg_checker.common.exception import KubeException
+from cfg_checker.helpers.console_utils import Progress
+from cfg_checker.nodes import KubeNodes
+_datetime_fmt = "%Y-%m-%d-%H-%M-%S.%f"
+# parsers = [
+#   [
+#       <split lines_parser>,
+#       <split data for a line parser>
+#   ]
+# ]
+kaas_timestamp_re = {
+        # kaas timestamp
+        # original ts comes from pandas: 2022-06-16T21:32:12.977674062Z
+        # since we do not have nanoseconds support in standard datetime,
+        # just throw away the 3 digits and 'Z'
+        "re": "(?P<kaas_date>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6})"
+              "(?:\d{3}Z) "
+              "(?P<message>.+?("
+              "(?=\\n\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6})|(?=\\n$))"
+              ")",
+        "date_fmt": "%Y-%m-%dT%H:%M:%S.%f"
+log_item_re = [
+    {
+        # (mariadb) YYYY-MM-DD hh:mm:ss
+        # 2022-06-08 01:38:54 DEBUG mariadb-controller Sleeping for 10
+        "re": "(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) +(?P<type>INFO|DEBUG|ERROR|FAIL|WARNING) +(?P<message>.*)",
+        "groups": 3,
+        "date_fmt": "%Y-%m-%d %H:%M:%S"
+    },
+    {
+        # (iam-proxy-alerta...) nnn.nnn.nnn.nnn:ppppp - uuid - - [YYYY/MM/DD hh:mm:ss]
+        # ' - 9b68130c-4c3b-4abd-bb04-7ff5329ad644 - - [2022/04/01 23:00:50] GET - "/ping" HTTP/1.1 "kube-probe/1.20+" 200 2 0.000'
+        "re": "(?P<src_ip>\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3}):(?P<src_port>\d{1,5}).\-.(?P<guid>\S{8}-\S{4}-\S{4}-\S{4}-\S{12}).-.-.\[(?P<date>\d{4}\/\d{2}\/\d{2} \d{2}:\d{2}:\d{2})\] (?P<dest_ip>\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3}):(?P<dest_port>\d{1,5}) (?P<message>.*)",
+        "groups": 7,
+        "date_fmt": "%Y/%m/%d %H:%M:%S"
+    },
+    {
+        # (default1) YYYY-MM-DD hh:mm:ss,nnn
+        #
+        "re": "(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\,\d{3}) +(?P<type>INFO|DEBUG|ERROR|FAIL|WARNING) +(?P<message>.*)",
+        "groups": 3,
+        "date_fmt": "%Y-%m-%d %H:%M:%S,%f"
+    },
+    {
+        # (default1a) YYYY-MM-DD hh:mm:ss,nnn
+        # 2022-06-27 23:34:51,845 - OpenStack-Helm Mariadb - INFO - Updating grastate configmap
+        "re": "(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\,\d{3}) \- (?P<process>.+) \- (?P<type>INFO|DEBUG|ERROR|FAIL|WARNING) \- (?P<message>.*)",
+        "groups": 4,
+        "date_fmt": "%Y-%m-%d %H:%M:%S,%f"
+    },
+    {
+        # (default2) YYYY-MM-DD hh:mm:ss.nnn
+        # 2022-05-23 04:01:06.360 7 INFO barbican.model.clean [-] Cleaning up soft deletions in the barbican database
+        "re": "(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}) +(?P<pid>\d{1,6}) +(?P<type>INFO|DEBUG|ERROR|FAIL|WARNING) +(?P<message>.*)",
+        "groups": 4,
+        "date_fmt": "%Y-%m-%d %H:%M:%S.%f"
+    },
+    {
+        # (default3) YYYY-MM-DD hh:mm:ss.nnn
+        # <date> - <type> - <message>
+        "re": "(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}).\-.(?P<type>INFO|DEBUG|ERROR|FAIL|WARNING).\-.(?P<message>.*)",
+        "groups": 3,
+        "date_fmt": "%Y-%m-%d %H:%M:%S.%f"
+    },
+    {
+        # libvirt
+        # 2022-06-16 12:48:59.509+0000: 53235: info : libvirt version: 6.0.0, package: 0ubuntu8.15~cloud0 (Openstack Ubuntu Testing Bot <openstack-testing-bot@ubuntu.com> Mon, 22 Nov 2021 16:37:15 +0000)
+        "re": "(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}\+\d{4}): (?P<pid>\d{1,6}):.(?P<type>info|debug|error|fail|warning)\s: (?P<message>.*)",
+        "groups": 4,
+        "date_fmt": "%Y-%m-%d %H:%M:%S.%f%z"
+    },
+    {
+        # 2022-06-28 00:00:55.400745 2022-06-28 00:00:55.400 13 INFO cinder.api.openstack.wsgi [req-07ca8c70-f33d-406f-9427-5388b1656297 9e5e4502e0c34c0eabcc5bfbc499b059 343ab637681b4520bf4f5a7b826b9803 - default default] https://cinder.ic-eu.ssl.mirantis.net/v2/343ab637681b4520bf4f5a7b826b9803/volumes/detail?metadata=%7B%27KaaS%27%3A%274131b4dc-81ab-4d84-b991-7b63f225058c%27%7D returned with HTTP 200
+        "re": "(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{6}) +(?P<date_alt>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}) +(?P<pid>\d{1,6}) +(?P<type>INFO|DEBUG|ERROR|FAIL|WARNING) +(?P<message>.*)",
+        "groups": 5,
+        "date_fmt": "%Y-%m-%d %H:%M:%S.%f"
+    },
+    {
+        # 'stacklight/alerta-5fc6f5dfd-jj7ml'
+        # '2022/04/01 23:12:37 [info] 26#26: *124156 client closed keepalive connection\n'
+        "re": "(?P<date>\d{4}\/\d{2}\/\d{2} \d{2}:\d{2}:\d{2}) +\[(?P<type>info|debug|error|fail|warning)\] +(?P<message>.*)",
+        "groups": 3,
+        "date_fmt": "%Y/%m/%d %H:%M:%S"
+    },
+    {
+        # (nova-api-osapi-....) YYYY-MM-DD hh:mm:ss.nnnnnn
+        # '2022-04-01 23:08:11.806062 capabilities. Old policies are deprecated and silently going to be ignored'
+        "re": "(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{6}) +(?P<message>.*)",
+        "groups": 2,
+        "date_fmt": "%Y-%m-%d %H:%M:%S.%f"
+    },
+    {
+        # (nova-api-metadata..) nnn.nnn.nnn.nnn - - [DD/MMM/YYYY:hh:mm:ss +nnnn]
+        # ' - - [01/Apr/2022:22:23:14 +0000] "GET / HTTP/1.1" 200 98 1345 "-" "kube-probe/1.20+"'
+        "re": "(?P<src_ip>\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3}).+-.+-.+\[(?P<date>\d{1,2}\/\S{3}\/\d{4}:\d{2}:\d{2}:\d{2}.\+\d{4})\] +(?P<message>.*)",
+        "groups": 3,
+        "date_fmt": "%d/%b/%Y:%H:%M:%S %z"
+    },
+    {
+        # mysqld exporter
+        # time="2022-06-15T16:16:36Z" level=info msg="Starting mysqld_exporter (version=0.11.0, branch=HEAD, revision=5d7179615695a61ecc3b5bf90a2a7c76a9592cdd)" source="mysqld_exporter.go:206"
+        "re": "time\=\"(?P<date>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})Z\" level\=(?P<type>info|debug|error|fail|warning|warn) msg\=\"(?P<message>.*)\" source\=\"(?P<source>.+)\"",
+        "groups": 4,
+        "date_fmt": "%Y-%m-%dT%H:%M:%S"
+    },
+    {
+        # metrics
+        # 2022-06-24 20:55:19.752754+00:00 [info] <0.716.0> Setting segment_entry_count for vhost 'barbican' with 0 queues to '2048'
+        "re": "(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}.\d{6})\+00:00 \[(?P<type>info|debug|error|fail|warning|warn)\] (?P<message>.*)",
+        "groups": 3,
+        "date_fmt": "%Y-%m-%d %H:%M:%S.%f"
+    },
+    {
+        # openvswitch
+        # 2022-06-27T23:12:52Z|25993|reconnect|WARN|unix#89422: connection dropped (Connection reset by peer)
+        # 2022-06-27T21:31:11Z|08582|connmgr|INFO|br-tun<->tcp: 6 flow_mods in the 3 s starting 10 s ago (6 adds)
+        "re": "(?P<date>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})Z\|(?P<pid>\d{1,6})\|(?P<action>.+)\|(?P<type>INFO|DEBUG|ERROR|ERR|FAIL|WARNING|WARN)\|(?P<message>.*)",
+        "groups": 5,
+        "date_fmt": "%Y-%m-%dT%H:%M:%S"
+    }
+def _re_groups_as_dict(compiled, re_match):
+    """Map every named group of `compiled` to its value in `re_match`.
+
+    `re_match` is indexed by position: a named group with 1-based
+    index N is read from `re_match[N-1]` (the layout of one
+    `findall()` item for a multi-group pattern).
+    """
+    return {
+        _name: re_match[_pos - 1]
+        for _name, _pos in compiled.groupindex.items()
+    }
+class MosLogger(object):
+    """Base log collector; it only keeps the environment configuration."""
+    def __init__(self, config):
+        # Stored as-is, read later via self.env_config
+        self.env_config = config
+class KubeMosLogger(MosLogger):
+    def __init__(self, config):
+        """Create cluster access, empty log storage and compiled regexes."""
+        # KubeNodes object is created before the base class init
+        self.master = KubeNodes(config)
+        super(KubeMosLogger, self).__init__(config)
+        # Collected logs and the merged result
+        self.logs = {}
+        self.merged_logs = {}
+        # Regex that splits raw log text by the kaas timestamp
+        self.kaas_ts_regex = re.compile(kaas_timestamp_re["re"])
+        # Settings read from the config object
+        self.dumppath = config.dumppath
+        self.tail_lines = config.tail_lines
+        # Compile every known log line format once
+        self.item_regex = [
+            {
+                "compiled": re.compile(_fmt["re"]),
+                "date_fmt": _fmt["date_fmt"],
+                "groups": _fmt["groups"]
+            }
+            for _fmt in log_item_re
+        ]
+    def _keywords(self, tstring, keywords):
+        """Return one flag per keyword: True when it is found in `tstring`."""
+        return [(_word in tstring) for _word in keywords]
+    def _safe_parse_date(self, str_date, fmt, note="-"):
+        """Parse `str_date` using `fmt`.
+
+        Returns the parsed datetime; on ValueError a warning tagged with
+        `note` is logged and -1 is returned instead.
+        """
+        try:
+            return datetime.strptime(str_date, fmt)
+        except ValueError:
+            _text = "WARNING ({}): Can't parse date '{}' using '{}'"
+            logger_cli.warning(_text.format(note, str_date, fmt))
+            return -1
+    def prepare_pods(
+        self,
+        ns_list,
+        kw_list,
+        inclusive_filter=True,
+        exclude_kw=None
+    ):
+        """Build the list of [ns, pod_name, container] items to collect.
+
+        :param ns_list: namespaces to process; empty/None selects all
+        :param kw_list: keywords looked up in the pod name;
+            empty/None makes every pod match
+        :param inclusive_filter: True - any keyword is enough,
+            False - all keywords must be found in the pod name
+        :param exclude_kw: keywords that exclude a pod when found in its
+            name or in its third item (presumably the list of container
+            names - verify against list_pod_names_with_containers);
+            None means no exclusions
+        :return: list of [ns, pod_name, container] lists
+        """
+        # None replaces a mutable default list shared between calls
+        exclude_kw = [] if exclude_kw is None else exclude_kw
+        logger_cli.info(
+            "# Preparing pods, ns: {}; keywords: {}".format(
+                ", ".join(ns_list) if ns_list else "*",
+                ", ".join(kw_list) if kw_list else "*"
+            )
+        )
+        # items of [ns, pod_name, container]
+        _target_pods = []
+        for _ns in self.master.list_namespaces():
+            # Emptiness is checked first, so ns_list=None never reaches
+            # the 'in' operator
+            if ns_list and _ns not in ns_list:
+                continue
+            _tmp_pods = []
+            logger_cli.info("-> processing namespace '{}'".format(_ns))
+            # list pods of this namespace
+            logger_cli.debug("... getting pods list")
+            _pods = self.master.list_pod_names_with_containers(ns=_ns)
+            logger_cli.debug("... {} total pods found".format(len(_pods)))
+            for _pod in _pods:
+                # _pod is indexed as [ns, name, containers]
+                if any(self._keywords(_pod[1], exclude_kw)) or \
+                   any(self._keywords(_pod[2], exclude_kw)):
+                    logger_cli.debug("... skipped '{}'".format(_pod[1]))
+                    continue
+                _kw = self._keywords(_pod[1], kw_list) \
+                    if kw_list else [True]
+                _selected = any(_kw) if inclusive_filter else all(_kw)
+                if not _selected:
+                    continue
+                # one item per container of the pod
+                _cpods = [[_pod[0], _pod[1], _c] for _c in _pod[2]]
+                _tmp_pods += _cpods
+                logger_cli.debug(
+                    "... added {} items for pod '{}'".format(
+                        len(_cpods),
+                        "/".join(_pod[:2])
+                    )
+                )
+            logger_cli.info(
+                "-> {} pods processed, "
+                "{} log items to be collected".format(
+                    len(_pods),
+                    len(_tmp_pods)
+                )
+            )
+            _target_pods += _tmp_pods
+        logger_cli.info(
+            "-> found {} log items total".format(len(_target_pods))
+        )
+        return _target_pods
+    def _get_pod_log(self, params):
+        """Fetch the log of one container; `params` is [ns, pod, container].
+
+        Returns None when len() of the fetched data is below 10,
+        otherwise the data itself.
+        """
+        _data = self.master.get_logs_for_pod(
+            params[1],
+            params[2],
+            params[0],
+            tail_lines=self.tail_lines
+        )
+        return _data if len(_data) >= 10 else None
+    def collect_logs(self, pods_list):
+        """Fetch logs for every item of `pods_list` using pooled threads.
+
+        :param pods_list: iterable of (ns, pod_name, container_name)
+
+        Each result that is not None is saved to self.logs under the
+        "<ns>/<pod_name>:<container_name>" key as a dict with "ns",
+        "pod_name", "container_name" and "raw" items. Thread count is
+        taken from self.env_config.sage_threads.
+        """
+        logger_cli.info(
+            "# Collecting logs using {} threads".format(
+                self.env_config.sage_threads
+            )
+        )
+        # Do collect using pooled threads
+        pool = Pool(self.env_config.sage_threads)
+        # Prepare params for getting logs
+        _params = [
+            [_ns, _pod_name, _cont_name]
+            for _ns, _pod_name, _cont_name in pods_list
+        ]
+        # Init progress bar
+        total_log_items = len(_params)
+        log_item_index = 0
+        _progress = Progress(total_log_items)
+        # Start pooled processing; imap yields results in _params order
+        results = pool.imap(self._get_pod_log, _params)
+        # Catch and save results
+        while log_item_index < total_log_items:
+            _ns, _pod_name, _cont_name = _params[log_item_index]
+            # Name is built before waiting for the result, so the timeout
+            # warning reports the item that is actually being awaited
+            _namepath = "{}/{}:{}".format(_ns, _pod_name, _cont_name)
+            try:
+                # use timeout as some of the request can hang
+                _r = results.next(timeout=10)
+            except StopIteration:
+                # end of pool
+                break
+            except TimeoutError:
+                # report the pod; index is not advanced, so the same
+                # item is awaited again on the next iteration
+                _progress.clearline()
+                logger_cli.warning(
+                    "WARNING: Timeout waiting for log content {}".format(
+                        _namepath
+                    )
+                )
+                continue
+            # None is returned for logs too short to be saved;
+            # report zero size for them instead of a stale value
+            _size = 0
+            if _r is not None:
+                _size = len(_r)
+                # Save detected data
+                self.logs[_namepath] = {
+                    "ns": _ns,
+                    "pod_name": _pod_name,
+                    "container_name": _cont_name,
+                    "raw": _r
+                }
+            # print progress
+            _progress.write_progress(
+                log_item_index+1,
+                note="'{}': {} chars".format(_namepath, _size)
+            )
+            # track next param set
+            log_item_index += 1
+        _progress.end()
+        pool.close()
+        pool.join()
+        return
+    def _parse_kaas_timestamps(self):
+        """Split raw text of every log into lines by the kaas timestamp.
+
+        For each item of self.logs the "raw" value is removed and
+        replaced with "log" (list of dicts made of the named regex
+        groups plus "kaas_ts") and "total_lines" (number of matches).
+        """
+        logger_cli.info("-> Parsing kaas timestamps")
+        _pbar = Progress(len(self.logs))
+        for _num, (_namepath, _item) in enumerate(self.logs.items(), 1):
+            # next progress bar item
+            _pbar.write_progress(_num, note=_namepath)
+            # Get all lines from log matched
+            _matches = self.kaas_ts_regex.findall(_item.pop("raw"))
+            _lines = []
+            for _match in _matches:
+                # new log item out of named groups
+                _entry = _re_groups_as_dict(self.kaas_ts_regex, _match)
+                # parse ts from kaas
+                _entry["kaas_ts"] = self._safe_parse_date(
+                    _entry["kaas_date"],
+                    kaas_timestamp_re["date_fmt"],
+                    note=_namepath
+                )
+                _lines.append(_entry)
+            # save parsed lines and their count
+            _item["total_lines"] = len(_matches)
+            _item["log"] = _lines
+        _pbar.end()
+        return
+    @staticmethod
+    def _get_matched(regex, line):
+        """Return findall() matches of `line` or [] when groups mismatch.
+
+        Matches are accepted only when the first match has more than one
+        item and that number equals the count of named groups of the
+        compiled regex stored in regex["compiled"].
+        """
+        _compiled = regex["compiled"]
+        _found = _compiled.findall(line)
+        if not _found:
+            return []
+        _count = len(_found[0])
+        if _count < 2 or _count != len(_compiled.groupindex):
+            return []
+        return _found
+    def _parse_log_item(self, log_item):
+        """Detect the format of every log line and extract its fields.
+
+        :param log_item: dict with "log" (list of line dicts, each having
+            a "message") and "total_lines" (how many of them to process)
+
+        A line matched by one of self.item_regex gets the named regex
+        groups merged in; "ts" is set only when its date was parsed.
+        A line that is not matched, or matched more than once, keeps
+        its original "message". Counters are saved to log_item["stats"]
+        as "parsed", "not_parsed" and "skipped".
+        """
+        def _detect_re(_text):
+            # Try every known regex; the first one that matches wins
+            for _regex in self.item_regex:
+                _found = self._get_matched(_regex, _text)
+                if _found:
+                    return _regex, _found
+            # nothing matched (or there are no regexes at all)
+            return None, []
+        l_parsed = 0
+        l_not_parsed = 0
+        l_skipped = 0
+        _re = None
+        for _line in log_item["log"][:log_item["total_lines"]]:
+            # pop message as there might be similar group name present
+            _message = _line.pop("message")
+            _m = []
+            # Try the regex that matched the previous line first
+            if _re is not None:
+                _m = self._get_matched(_re, _message)
+            if not _m:
+                # Try every regex to match line format
+                _re, _m = _detect_re(_message)
+            if len(_m) == 1:
+                # get matches with group names as a dict
+                _new_line_items = \
+                    _re_groups_as_dict(_re["compiled"], _m[0])
+                # update original dict
+                _line.update(_new_line_items)
+                # Parse date
+                _pts = self._safe_parse_date(
+                    _new_line_items["date"],
+                    _re["date_fmt"]
+                )
+                # A failed parse gives -1; "ts" is left unset then, so
+                # consumers fall back to the kaas timestamp
+                if isinstance(_pts, datetime):
+                    _line["ts"] = _pts
+                l_parsed += 1
+                continue
+            # put back message that was not parsed
+            _line["message"] = _message
+            if not _m:
+                l_not_parsed += 1
+            else:
+                # Should never happen
+                logger_cli.warning(
+                    "WARNING: Skipping ambigious log message: "
+                    "'{}'".format(_message)
+                )
+                l_skipped += 1
+        log_item["stats"] = {
+            "parsed": l_parsed,
+            "not_parsed": l_not_parsed,
+            "skipped": l_skipped
+        }
+    def parse_logs(self):
+        """Parse all collected logs and optionally dump unparsed lines.
+
+        Kaas timestamps are parsed first, then the format of each line
+        is detected. When self.dumppath is not "null", lines that got
+        no "date" key are appended to
+        "<dumppath>/<pod_name>-<container_name>.log".
+        """
+        # Get kaas ts as a plan B if log time either not present or not parsed
+        self._parse_kaas_timestamps()
+        logger_cli.info("-> Parsing logs")
+        _p = Progress(len(self.logs))
+        totalParsed = 0
+        totalNotParsed = 0
+        # iterate raw logs and try to parse actual pod timing
+        for idx, (_namepath, _log) in enumerate(self.logs.items(), 1):
+            # Update progress bar (totals are for the logs parsed so far)
+            _p.write_progress(
+                idx,
+                note="parsed: {}, not parsed: {}, => {}".format(
+                    totalParsed,
+                    totalNotParsed,
+                    _namepath
+                )
+            )
+            # Parse log
+            self._parse_log_item(_log)
+            if self.dumppath != "null":
+                # Lines with no "date" key were not matched by any format
+                _unparsed = [
+                    "<KTS>{} <M>{}\n".format(
+                        _line["kaas_date"],
+                        _line.get("message", "")
+                    )
+                    for _line in _log["log"] if "date" not in _line
+                ]
+                # File is created only when there is something to dump
+                # and is opened once per log, not once per line
+                if _unparsed:
+                    _filename = os.path.join(
+                        self.dumppath,
+                        _log["pod_name"]+"-"+_log["container_name"]+".log"
+                    )
+                    with open(_filename, "a") as rawlogfile:
+                        rawlogfile.writelines(_unparsed)
+            # Stats
+            totalParsed += _log["stats"]["parsed"]
+            totalNotParsed += _log["stats"]["not_parsed"]
+        _p.end()
+    def merge_logs(self):
+        """Merge lines of all logs into self.merged_logs by unix time.
+
+        The key is the float unix time of the parsed "ts" or, when it
+        is absent or not a datetime, of "kaas_ts". Lines that share a
+        timestamp get consecutive microsecond offsets instead of
+        overwriting each other. Lines with no usable timestamp at all
+        are kept under the 0.0 base.
+        """
+        logger_cli.info("# Merging logs")
+        _merged = {}
+        # next free microsecond offset per already used timestamp
+        _next_offset = {}
+        for _pod, _logs in self.logs.items():
+            for _li in _logs["log"]:
+                # Prepare log entry
+                _li["namepath"] = _pod
+                # parsed timestamp goes first; a failed date parse is
+                # stored as -1, which has no .timestamp()
+                _timestamp = _li.pop("ts", None)
+                if not isinstance(_timestamp, datetime):
+                    # Use kaas_ts as a timestamp
+                    _timestamp = _li.pop("kaas_ts", None)
+                if isinstance(_timestamp, datetime):
+                    _base = float(_timestamp.timestamp())
+                else:
+                    _base = 0.0
+                # first line of a timestamp keeps the exact value,
+                # following ones are shifted by 1 microsecond each
+                _shift = _next_offset.get(_base, 0)
+                _key = _base + _shift * 1e-6
+                while _key in _merged:
+                    _shift += 1
+                    _key = _base + _shift * 1e-6
+                _next_offset[_base] = _shift + 1
+                # save items using timestamps
+                _merged[_key] = _li
+        self.merged_logs = _merged
+        return
+    def save_logs(self, filename):
+        """Write merged logs to `filename` ordered by timestamp key.
+
+        Each output line is "<namepath> <time>: key=value key=value ...",
+        where <time> is the key formatted using _datetime_fmt and the
+        key=value pairs are all items of the entry except "namepath".
+        """
+        logger_cli.info("# Writing output file: '{}'".format(filename))
+        with open(filename, 'w') as ff:
+            _log_iter = sorted(
+                self.merged_logs.items(), key=lambda item: item[0]
+            )
+            for _ts, _entry in _log_iter:
+                # "namepath" is read, not popped: entries stay intact,
+                # so the method can be called more than once
+                _fields = " ".join(
+                    "{}={}".format(_k, _v)
+                    for _k, _v in _entry.items() if _k != "namepath"
+                )
+                ff.write(
+                    "{} {}: {}\n".format(
+                        _entry["namepath"],
+                        datetime.fromtimestamp(_ts).strftime(_datetime_fmt),
+                        _fields
+                    )
+                )
+        return