| # Author: Alex Savatieiev (osavatieiev@mirantis.com; a.savex@gmail.com) |
| # Copyright 2019-2022 Mirantis, Inc. |
| import os |
| import re |
| |
| from datetime import datetime |
| from multiprocessing.dummy import Pool |
| from multiprocessing import TimeoutError |
| |
| from cfg_checker.common import logger_cli |
| # from cfg_checker.common.exception import KubeException |
| |
| from cfg_checker.helpers.console_utils import Progress |
| from cfg_checker.nodes import KubeNodes |
| |
| |
| _datetime_fmt = "%Y-%m-%d-%H-%M-%S.%f" |
| |
| # parsers = [ |
| # [ |
| # <split lines_parser>, |
| # <split data for a line parser> |
| # ] |
| # ] |
| kaas_timestamp_re = { |
| # kaas timestamp |
| # original ts comes from pandas: 2022-06-16T21:32:12.977674062Z |
| # since we do not have nanoseconds support in standard datetime, |
| # just throw away the 3 digits and 'Z' |
|     "re": r"(?P<kaas_date>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6})" |
|           r"(?:\d{3}Z) " |
|           r"(?P<message>.+?(" |
|           r"(?=\n\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6})|(?=\n$))" |
|           r")", |
| "date_fmt": "%Y-%m-%dT%H:%M:%S.%f" |
| } |
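| # Illustrative sketch (not executed): the pattern above pairs each kaas |
| # timestamp with the message following it, and "date_fmt" is meant for |
| # datetime.strptime() on the truncated value, e.g.: |
| # |
| #   _c = re.compile(kaas_timestamp_re["re"]) |
| #   _m = _c.findall("2022-06-16T21:32:12.977674062Z some log text\n")[0] |
| #   _kaas_date = _m[_c.groupindex["kaas_date"] - 1] |
| #   # "2022-06-16T21:32:12.977674", nanosecond digits and 'Z' dropped |
| #   datetime.strptime(_kaas_date, kaas_timestamp_re["date_fmt"]) |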
| log_item_re = [ |
| { |
| # (mariadb) YYYY-MM-DD hh:mm:ss |
| # 2022-06-08 01:38:54 DEBUG mariadb-controller Sleeping for 10 |
|         "re": r"(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) +(?P<type>INFO|DEBUG|ERROR|FAIL|WARNING) +(?P<message>.*)", |
| "groups": 3, |
| "date_fmt": "%Y-%m-%d %H:%M:%S" |
| }, |
| { |
| # (iam-proxy-alerta...) nnn.nnn.nnn.nnn:ppppp - uuid - - [YYYY/MM/DD hh:mm:ss] |
| # '172.16.35.69:35298 - 9b68130c-4c3b-4abd-bb04-7ff5329ad644 - - [2022/04/01 23:00:50] 10.233.118.232:4180 GET - "/ping" HTTP/1.1 "kube-probe/1.20+" 200 2 0.000' |
|         "re": r"(?P<src_ip>\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3}):(?P<src_port>\d{1,5}).\-.(?P<guid>\S{8}-\S{4}-\S{4}-\S{4}-\S{12}).-.-.\[(?P<date>\d{4}\/\d{2}\/\d{2} \d{2}:\d{2}:\d{2})\] (?P<dest_ip>\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3}):(?P<dest_port>\d{1,5}) (?P<message>.*)", |
| "groups": 7, |
| "date_fmt": "%Y/%m/%d %H:%M:%S" |
| }, |
| { |
| # (default1) YYYY-MM-DD hh:mm:ss,nnn |
| # |
|         "re": r"(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\,\d{3}) +(?P<type>INFO|DEBUG|ERROR|FAIL|WARNING) +(?P<message>.*)", |
| "groups": 3, |
| "date_fmt": "%Y-%m-%d %H:%M:%S,%f" |
| }, |
| { |
| # (default1a) YYYY-MM-DD hh:mm:ss,nnn |
| # 2022-06-27 23:34:51,845 - OpenStack-Helm Mariadb - INFO - Updating grastate configmap |
|         "re": r"(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\,\d{3}) \- (?P<process>.+) \- (?P<type>INFO|DEBUG|ERROR|FAIL|WARNING) \- (?P<message>.*)", |
| "groups": 4, |
| "date_fmt": "%Y-%m-%d %H:%M:%S,%f" |
| }, |
| { |
| # (default2) YYYY-MM-DD hh:mm:ss.nnn |
| # 2022-05-23 04:01:06.360 7 INFO barbican.model.clean [-] Cleaning up soft deletions in the barbican database[00m |
|         "re": r"(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}) +(?P<pid>\d{1,6}) +(?P<type>INFO|DEBUG|ERROR|FAIL|WARNING) +(?P<message>.*)", |
| "groups": 4, |
| "date_fmt": "%Y-%m-%d %H:%M:%S.%f" |
| }, |
| { |
| # (default3) YYYY-MM-DD hh:mm:ss.nnn |
| # <date> - <type> - <message> |
|         "re": r"(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}).\-.(?P<type>INFO|DEBUG|ERROR|FAIL|WARNING).\-.(?P<message>.*)", |
| "groups": 3, |
| "date_fmt": "%Y-%m-%d %H:%M:%S.%f" |
| }, |
| { |
| # libvirt |
| # 2022-06-16 12:48:59.509+0000: 53235: info : libvirt version: 6.0.0, package: 0ubuntu8.15~cloud0 (Openstack Ubuntu Testing Bot <openstack-testing-bot@ubuntu.com> Mon, 22 Nov 2021 16:37:15 +0000) |
|         "re": r"(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}\+\d{4}): (?P<pid>\d{1,6}):.(?P<type>info|debug|error|fail|warning)\s: (?P<message>.*)", |
| "groups": 4, |
| "date_fmt": "%Y-%m-%d %H:%M:%S.%f%z" |
| }, |
| { |
| # 2022-06-28 00:00:55.400745 2022-06-28 00:00:55.400 13 INFO cinder.api.openstack.wsgi [req-07ca8c70-f33d-406f-9427-5388b1656297 9e5e4502e0c34c0eabcc5bfbc499b059 343ab637681b4520bf4f5a7b826b9803 - default default] https://cinder.ic-eu.ssl.mirantis.net/v2/343ab637681b4520bf4f5a7b826b9803/volumes/detail?metadata=%7B%27KaaS%27%3A%274131b4dc-81ab-4d84-b991-7b63f225058c%27%7D returned with HTTP 200 |
|         "re": r"(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{6}) +(?P<date_alt>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}) +(?P<pid>\d{1,6}) +(?P<type>INFO|DEBUG|ERROR|FAIL|WARNING) +(?P<message>.*)", |
| "groups": 5, |
| "date_fmt": "%Y-%m-%d %H:%M:%S.%f" |
| }, |
| { |
| # 'stacklight/alerta-5fc6f5dfd-jj7ml' |
| # '2022/04/01 23:12:37 [info] 26#26: *124156 client 127.0.0.1 closed keepalive connection\n' |
|         "re": r"(?P<date>\d{4}\/\d{2}\/\d{2} \d{2}:\d{2}:\d{2}) +\[(?P<type>info|debug|error|fail|warning)\] +(?P<message>.*)", |
| "groups": 3, |
| "date_fmt": "%Y/%m/%d %H:%M:%S" |
| }, |
| { |
| # (nova-api-osapi-....) YYYY-MM-DD hh:mm:ss,nnnnnn |
| # '2022-04-01 23:08:11.806062 capabilities. Old policies are deprecated and silently going to be ignored' |
|         "re": r"(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{6}) +(?P<message>.*)", |
| "groups": 2, |
| "date_fmt": "%Y-%m-%d %H:%M:%S.%f" |
| }, |
| { |
| # (nova-api-metadata..) nnn.nnn.nnn.nnn - - [DD/MMM/YYYY:hh:mm:ss +nnnn] |
| # '172.16.35.67 - - [01/Apr/2022:22:23:14 +0000] "GET / HTTP/1.1" 200 98 1345 "-" "kube-probe/1.20+"' |
|         "re": r"(?P<src_ip>\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3}).+-.+-.+\[(?P<date>\d{1,2}\/\S{3}\/\d{4}:\d{2}:\d{2}:\d{2}.\+\d{4})\] +(?P<message>.*)", |
| "groups": 3, |
| "date_fmt": "%d/%b/%Y:%H:%M:%S %z" |
| }, |
| { |
| # mysqld exporter |
| # time="2022-06-15T16:16:36Z" level=info msg="Starting mysqld_exporter (version=0.11.0, branch=HEAD, revision=5d7179615695a61ecc3b5bf90a2a7c76a9592cdd)" source="mysqld_exporter.go:206" |
|         "re": r"time\=\"(?P<date>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})Z\" level\=(?P<type>info|debug|error|fail|warning|warn) msg\=\"(?P<message>.*)\" source\=\"(?P<source>.+)\"", |
| "groups": 4, |
| "date_fmt": "%Y-%m-%dT%H:%M:%S" |
| }, |
| { |
| # metrics |
| # 2022-06-24 20:55:19.752754+00:00 [info] <0.716.0> Setting segment_entry_count for vhost 'barbican' with 0 queues to '2048' |
|         "re": r"(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}.\d{6})\+00:00 \[(?P<type>info|debug|error|fail|warning|warn)\] (?P<message>.*)", |
| "groups": 3, |
| "date_fmt": "%Y-%m-%d %H:%M:%S.%f" |
| }, |
| { |
| # openvswitch |
| # 2022-06-27T23:12:52Z|25993|reconnect|WARN|unix#89422: connection dropped (Connection reset by peer) |
| # 2022-06-27T21:31:11Z|08582|connmgr|INFO|br-tun<->tcp:127.0.0.1:6633: 6 flow_mods in the 3 s starting 10 s ago (6 adds) |
|         "re": r"(?P<date>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})Z\|(?P<pid>\d{1,6})\|(?P<action>.+)\|(?P<type>INFO|DEBUG|ERROR|ERR|FAIL|WARNING|WARN)\|(?P<message>.*)", |
| "groups": 5, |
| "date_fmt": "%Y-%m-%dT%H:%M:%S" |
| } |
| ] |
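| # A small sanity-check sketch (hypothetical helper, not called anywhere in |
| # this module): it verifies that every parser entry compiles and that the |
| # declared "groups" count matches the capturing groups in its pattern. |
| def _selfcheck_log_item_re(): |
|     _errors = [] |
|     for _idx, _p in enumerate(log_item_re): |
|         _c = re.compile(_p["re"]) |
|         if _c.groups != _p["groups"]: |
|             _errors.append( |
|                 "parser #{}: declared {} groups, pattern has {}".format( |
|                     _idx, |
|                     _p["groups"], |
|                     _c.groups |
|                 ) |
|             ) |
|     return _errors |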
| |
| |
| def _re_groups_as_dict(compiled, re_match): |
| _d = {} |
| for _k in compiled.groupindex.keys(): |
| _d[_k] = re_match[compiled.groupindex[_k]-1] |
| return _d |
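| # For example, for the compiled kaas pattern groupindex is |
| # {"kaas_date": 1, "message": 2}, while a findall() match is a plain tuple, |
| # so groupindex[name] - 1 maps each named group to its tuple position. |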
| |
| |
| class MosLogger(object): |
| def __init__( |
| self, |
| config |
| ): |
| self.env_config = config |
| return |
| |
| |
| class KubeMosLogger(MosLogger): |
| def __init__(self, config): |
| self.master = KubeNodes(config) |
| super(KubeMosLogger, self).__init__(config) |
|         # Init logs storage and compiled parser regexes |
| self.logs = {} |
| self.merged_logs = {} |
| self.kaas_ts_regex = re.compile(kaas_timestamp_re["re"]) |
| self.item_regex = [] |
| self.dumppath = config.dumppath |
| self.tail_lines = config.tail_lines |
| for regex in log_item_re: |
| self.item_regex.append( |
| { |
| "compiled": re.compile(regex["re"]), |
| "date_fmt": regex["date_fmt"], |
| "groups": regex["groups"] |
| } |
| ) |
| |
| def _keywords(self, tstring, keywords): |
|         return [k in tstring for k in keywords] |
| |
| def _safe_parse_date(self, str_date, fmt, note="-"): |
| try: |
| _t = datetime.strptime(str_date, fmt) |
| except ValueError: |
| logger_cli.warning( |
| "WARNING ({}): Can't parse date '{}'" |
| " using '{}'".format( |
| note, |
| str_date, |
| fmt |
| ) |
| ) |
| _t = -1 |
| return _t |
| |
| def prepare_pods( |
| self, |
| ns_list, |
| kw_list, |
| inclusive_filter=True, |
|             exclude_kw=None |
|     ): |
|         # avoid the shared mutable default argument pitfall |
|         exclude_kw = exclude_kw or [] |
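|         # Keyword semantics, as implemented below: with inclusive_filter=True |
|         # a pod is kept when ANY keyword from kw_list occurs in its name; |
|         # with inclusive_filter=False ALL keywords must occur. Pods or |
|         # containers matching anything in exclude_kw are always skipped. |
|         # Illustrative call (namespaces and keywords are made up): |
|         #   logger.prepare_pods( |
|         #       ["openstack", "stacklight"], |
|         #       ["mariadb", "rabbitmq"], |
|         #       exclude_kw=["exporter"] |
|         #   ) |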
| def _list_with_containers(pod_item): |
| _list = [] |
| for c in pod_item[2]: |
| _list.append([ |
| pod_item[0], |
| pod_item[1], |
| c |
| ]) |
| return _list |
| logger_cli.info( |
| "# Preparing pods, ns: {}; keywords: {}".format( |
| ", ".join(ns_list) if ns_list else "*", |
| ", ".join(kw_list) if kw_list else "*" |
| ) |
| ) |
| # [ns, pod_name] |
| _target_pods = [] |
| # validate ns |
| _all_namespaces = self.master.list_namespaces() |
| for _ns in _all_namespaces: |
| if _ns in ns_list or not ns_list: |
| _tmp_pods = [] |
| logger_cli.info("-> processing namespace '{}'".format(_ns)) |
| # list pods using mask |
| logger_cli.debug("... getting pods list") |
| _pods = self.master.list_pod_names_with_containers(ns=_ns) |
| logger_cli.debug("... {} total pods found".format(len(_pods))) |
| for _pod in _pods: |
|                     # _pod is [ns, pod_name, containers] |
| _kw = self._keywords(_pod[1], kw_list) \ |
| if kw_list else [True] |
| if any(self._keywords(_pod[1], exclude_kw)) or \ |
| any(self._keywords(_pod[2], exclude_kw)): |
| logger_cli.debug("... skipped '{}'".format(_pod[1])) |
| continue |
| elif (not inclusive_filter and all(_kw)) or \ |
| (inclusive_filter and any(_kw)): |
| _cpods = _list_with_containers(_pod) |
| _tmp_pods += _cpods |
| logger_cli.debug( |
| "... added {} items for pod '{}'".format( |
| len(_cpods), |
| "/".join(_pod[:2]) |
| ) |
| ) |
| else: |
| # logger_cli.debug("... skipped pod '{}'".format(_pod)) |
| pass |
| logger_cli.info( |
| "-> {} pods processed, " |
| "{} log items to be collected".format( |
| len(_pods), |
| len(_tmp_pods) |
| ) |
| ) |
| _target_pods += _tmp_pods |
| |
| logger_cli.info( |
| "-> found {} log items total".format(len(_target_pods)) |
| ) |
| return _target_pods |
| |
| def _get_pod_log(self, params): |
| ns = params[0] |
| name = params[1] |
| container_name = params[2] |
| # Get target log |
| _log_data = self.master.get_logs_for_pod( |
| name, |
| container_name, |
| ns, |
| tail_lines=self.tail_lines |
| ) |
| if len(_log_data) < 10: |
| return None |
| else: |
| return _log_data |
| |
| def collect_logs(self, pods_list): |
| |
| # Prepare list of pods to collect |
| # cmd = """ |
| # kubectl get pods -A -o=jsonpath='{range .items[*]} |
| # {.metadata.namespace}{"/"} |
| # {.metadata.name}{"\n"}{range .spec.containers[*]} {.name}{"\n"} |
| # {end}' |
| # """ |
| |
| logger_cli.info( |
| "# Collecting logs using {} threads".format( |
| self.env_config.sage_threads |
| ) |
| ) |
| # Do collect using pooled threads |
| pool = Pool(self.env_config.sage_threads) |
| _params = [] |
| # Prepare params for getting logs |
| for _ns, _pod_name, _cont_name in pods_list: |
| _params.append([_ns, _pod_name, _cont_name]) |
| # Init progress bar |
| total_log_items = len(_params) |
| log_item_index = 0 |
| _progress = Progress(total_log_items) |
| # Start pooled processing |
| results = pool.imap(self._get_pod_log, _params) |
| # Catch and parse results |
|         while True: |
|             # resolve the namepath up front so it is also available |
|             # in the timeout handler below |
|             if log_item_index < total_log_items: |
|                 _namepath = "{}/{}:{}".format( |
|                     _params[log_item_index][0], |
|                     _params[log_item_index][1], |
|                     _params[log_item_index][2] |
|                 ) |
|             try: |
|                 # use timeout as some of the requests can hang |
|                 _r = results.next(timeout=10) |
| |
| except StopIteration: |
| # end of pool |
| break |
| |
| except TimeoutError: |
|                 # report the pod that hangs; the pending result |
|                 # will be retried on the next iteration |
| _progress.clearline() |
| logger_cli.warning( |
| "WARNING: Timeout waiting for log content {}".format( |
| _namepath |
| ) |
| ) |
| continue |
| if _r is not None: |
| _raw = _r |
| _size = len(_raw) |
| # Save detected data |
| self.logs[_namepath] = { |
| "ns": _params[log_item_index][0], |
| "pod_name": _params[log_item_index][1], |
| "container_name": _params[log_item_index][2], |
| "raw": _raw |
| } |
| |
| # print progress |
| _progress.write_progress( |
| log_item_index+1, |
| note="'{}': {} chars".format(_namepath, _size) |
| ) |
| |
| # track next param set |
| log_item_index += 1 |
| |
| _progress.end() |
| pool.close() |
| pool.join() |
| |
| # debug |
| # with open("logs_debug.json", "w+") as wf: |
| # wf.write(json.dumps(self.logs)) |
| # debug |
| return |
| |
| def _parse_kaas_timestamps(self): |
| # shortcut function to get array aligned index |
| def _get_group(match, key): |
| return match[self.kaas_ts_regex.groupindex[key]-1] |
| |
| logger_cli.info("-> Parsing kaas timestamps") |
| # iterate logs |
| _counter = 0 |
| _pbar = Progress(len(self.logs)) |
| for _namepath, _item in self.logs.items(): |
| # next progress bar item |
| _counter += 1 |
| _pbar.write_progress(_counter, note=_namepath) |
| # Get all lines from log matched |
| _matches = self.kaas_ts_regex.findall(_item.pop("raw")) |
| # iterate matches and parse timestamp |
| _log = [] |
| for _match in _matches: |
| # new log item |
| _log_line = _re_groups_as_dict(self.kaas_ts_regex, _match) |
| # parse ts from kaas |
| _pts = self._safe_parse_date( |
| _log_line["kaas_date"], |
| kaas_timestamp_re["date_fmt"], |
| note=_namepath |
| ) |
| _log_line["kaas_ts"] = _pts |
| |
| # save log item |
| _log.append(_log_line) |
| |
| # save pmessage and kaas_ts |
| _item["total_lines"] = len(_matches) |
| _item["log"] = _log |
| |
| _pbar.end() |
| return |
| |
| @staticmethod |
| def _get_matched(regex, line): |
|         # Check that the regex matches the line and yields |
|         # the expected number of groups |
| _c = regex["compiled"] |
| _m = _c.findall(line) |
| # calculate groups if there is a match |
| _group_count = len(_m[0]) if len(_m) > 0 else 0 |
|         # Check that the group count is at least 2 |
|         # and that all of the pattern's groups matched |
| if _group_count > 1 and _group_count == len(_c.groupindex): |
| return _m |
| else: |
| return [] |
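|         # E.g. for the mariadb parser the sample line |
|         # "2022-06-08 01:38:54 DEBUG mariadb-controller Sleeping for 10" |
|         # yields one 3-element match (date, type, message), which passes |
|         # the group-count check above; a non-matching line yields []. |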
| |
| def _parse_log_item(self, log_item): |
|         def _detect_re(): |
|             _re = None |
|             _m = [] |
|             for regex in self.item_regex: |
|                 _m = self._get_matched(regex, _message) |
|                 if _m: |
|                     _re = regex |
|                     break |
|             return _re, _m |
| |
| # parse whole log using detected pattern |
| l_parsed = 0 |
| l_not_parsed = 0 |
| l_skipped = 0 |
| _index = 0 |
| _li = log_item["log"] |
| _re = None |
| while _index < log_item["total_lines"]: |
| # pop message as there might be similar group name present |
| _message = _li[_index].pop("message") |
| # Parse line |
| _m = [] |
| # Try last regex for this item |
| if _re is not None: |
| _m = self._get_matched(_re, _message) |
| # if not matched |
| if not _m: |
| # Try every regex to match line format |
| # by counting groups detected |
| _re, _m = _detect_re() |
| if len(_m) == 1: |
| # get matches with group names as a dict |
| _new_line_items = \ |
| _re_groups_as_dict(_re["compiled"], _m[0]) |
| # update original dict |
| _li[_index].update(_new_line_items) |
| # Parse date |
| _pts = self._safe_parse_date( |
| _new_line_items["date"], |
| _re["date_fmt"] |
| ) |
| _li[_index]["ts"] = _pts |
| l_parsed += 1 |
| elif len(_m) == 0: |
| # put back message that failed to parse |
| _li[_index]["message"] = _message |
| l_not_parsed += 1 |
|             else: |
|                 # Should never happen |
|                 logger_cli.warning( |
|                     "WARNING: Skipping ambiguous log message: " |
|                     "'{}'".format(_message) |
|                 ) |
|                 l_skipped += 1 |
| # next line |
| _index += 1 |
| log_item["stats"] = { |
| "parsed": l_parsed, |
| "not_parsed": l_not_parsed, |
| "skipped": l_skipped |
| } |
| |
| def parse_logs(self): |
| |
| # debug: load precreated logs |
| # _ll = {} |
| # with open("logs_debug.json", "r+") as rf: |
| # _ll = json.loads(rf.read()) |
| # if _ll: |
| # self.logs = _ll |
| # debug: end |
| |
|         # Get kaas ts as a plan B in case the in-log timestamp |
|         # is missing or cannot be parsed |
| self._parse_kaas_timestamps() |
| |
| logger_cli.info("-> Parsing logs") |
| _p = Progress(len(self.logs)) |
| idx = 1 |
| totalParsed = 0 |
| totalNotParsed = 0 |
| # iterate raw logs and try to parse actual pod timing |
| for _namepath, _log in self.logs.items(): |
| # Update progress bar |
| _p.write_progress( |
| idx, |
| note="parsed: {}, not parsed: {}, => {}".format( |
| totalParsed, |
| totalNotParsed, |
| _namepath |
| ) |
| ) |
| # Parse log |
| self._parse_log_item(_log) |
| if self.dumppath != "null": |
| for line in _log["log"]: |
|                     if "date" not in line.keys(): |
|                         # line was not parsed by any item regex; |
|                         # dump it raw together with its kaas timestamp |
| _filename = os.path.join( |
| self.dumppath, |
| _log["pod_name"]+"-"+_log["container_name"]+".log" |
| ) |
| with open(_filename, "a") as rawlogfile: |
| rawlogfile.write( |
| "<KTS>{} <M>{}\n".format( |
| line["kaas_date"], |
| line["message"] |
| ) |
| ) |
| # Stats |
| totalParsed += _log["stats"]["parsed"] |
| totalNotParsed += _log["stats"]["not_parsed"] |
| idx += 1 |
| _p.end() |
| |
| def merge_logs(self): |
| logger_cli.info("# Merging logs") |
| _merged = {} |
| for _pod, _logs in self.logs.items(): |
| for _li in _logs["log"]: |
| # Prepare log entry |
| _li["namepath"] = _pod |
| |
| # check if timestamp is detected |
| if "ts" not in _li: |
| # Use kaas_ts as a timestamp |
| _timestamp = _li.pop("kaas_ts") |
| else: |
| # get parsed timestamp |
| _timestamp = _li.pop("ts") |
| |
|                 # save items keyed by timestamp; lines with identical |
|                 # timestamps overwrite each other here |
| _merged[float(_timestamp.timestamp())] = _li |
| self.merged_logs = _merged |
| return |
| |
| def save_logs(self, filename): |
| logger_cli.info("# Writing output file: '{}'".format(filename)) |
| with open(filename, 'w+') as ff: |
| _log_iter = sorted( |
| self.merged_logs.items(), key=lambda item: item[0] |
| ) |
| for k, v in _log_iter: |
| ff.write( |
| "{} {}: {}\n".format( |
| v.pop("namepath"), |
| datetime.fromtimestamp(k).strftime(_datetime_fmt), |
|                         " ".join( |
|                             ["{}={}".format(_k, _v) for _k, _v in v.items()] |
|                         ) |
| ) |
| ) |
| return |
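| # Typical call sequence (illustrative sketch; the config object is assumed |
| # to provide the attributes read above, i.e. dumppath, tail_lines and |
| # sage_threads, as well as whatever KubeNodes() requires): |
| # |
| #   logger = KubeMosLogger(config) |
| #   pods = logger.prepare_pods(["openstack"], ["mariadb"]) |
| #   logger.collect_logs(pods) |
| #   logger.parse_logs() |
| #   logger.merge_logs() |
| #   logger.save_logs("merged.log") |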