# Author: Alex Savatieiev (osavatieiev@mirantis.com; a.savex@gmail.com)
# Copyright 2019-2022 Mirantis, Inc.
import os
import re
from datetime import datetime
from multiprocessing.dummy import Pool
from multiprocessing import TimeoutError
from cfg_checker.common import logger_cli
# from cfg_checker.common.exception import KubeException
from cfg_checker.helpers.console_utils import Progress
from cfg_checker.nodes import KubeNodes
_datetime_fmt = "%Y-%m-%d-%H-%M-%S.%f"
# Per-line parsing is configured by the two structures below:
#   kaas_timestamp_re - splits raw pod output into lines by the leading
#                       kaas timestamp;
#   log_item_re       - a list of known in-line log formats, each entry
#                       holding a "re" with named groups, the expected
#                       "groups" count and a "date_fmt" for strptime
#                       (see the illustrative example after the list).
kaas_timestamp_re = {
    # kaas timestamp
    # the raw ts carries nanoseconds: 2022-06-16T21:32:12.977674062Z
    # since standard datetime has no nanosecond support,
    # just throw away the last 3 digits and the 'Z'
    # (see the illustrative example after this dict)
    "re": r"(?P<kaas_date>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6})"
          r"(?:\d{3}Z) "
          r"(?P<message>.+?("
          r"(?=\n\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6})|(?=\n$))"
          r")",
    "date_fmt": "%Y-%m-%dT%H:%M:%S.%f"
}
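# Illustrative example (comments only, not executed; the sample line is
# hypothetical): matching one raw pod-log line with the pattern above
# would yield groups similar to
#   >>> _c = re.compile(kaas_timestamp_re["re"])
#   >>> _c.findall("2022-06-16T21:32:12.977674062Z sample message\n")[0][:2]
#   ('2022-06-16T21:32:12.977674', 'sample message')
# i.e. the nanosecond tail '062Z' is consumed but not captured, and the
# captured date fits the "date_fmt" given above.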
log_item_re = [
{
# (mariadb) YYYY-MM-DD hh:mm:ss
# 2022-06-08 01:38:54 DEBUG mariadb-controller Sleeping for 10
"re": "(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) +(?P<type>INFO|DEBUG|ERROR|FAIL|WARNING) +(?P<message>.*)",
"groups": 3,
"date_fmt": "%Y-%m-%d %H:%M:%S"
},
{
# (iam-proxy-alerta...) nnn.nnn.nnn.nnn:ppppp - uuid - - [YYYY/MM/DD hh:mm:ss]
# '172.16.35.69:35298 - 9b68130c-4c3b-4abd-bb04-7ff5329ad644 - - [2022/04/01 23:00:50] 10.233.118.232:4180 GET - "/ping" HTTP/1.1 "kube-probe/1.20+" 200 2 0.000'
"re": "(?P<src_ip>\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3}):(?P<src_port>\d{1,5}).\-.(?P<guid>\S{8}-\S{4}-\S{4}-\S{4}-\S{12}).-.-.\[(?P<date>\d{4}\/\d{2}\/\d{2} \d{2}:\d{2}:\d{2})\] (?P<dest_ip>\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3}):(?P<dest_port>\d{1,5}) (?P<message>.*)",
"groups": 7,
"date_fmt": "%Y/%m/%d %H:%M:%S"
},
{
# (default1) YYYY-MM-DD hh:mm:ss,nnn
#
"re": "(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\,\d{3}) +(?P<type>INFO|DEBUG|ERROR|FAIL|WARNING) +(?P<message>.*)",
"groups": 3,
"date_fmt": "%Y-%m-%d %H:%M:%S,%f"
},
{
# (default1a) YYYY-MM-DD hh:mm:ss,nnn
# 2022-06-27 23:34:51,845 - OpenStack-Helm Mariadb - INFO - Updating grastate configmap
"re": "(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\,\d{3}) \- (?P<process>.+) \- (?P<type>INFO|DEBUG|ERROR|FAIL|WARNING) \- (?P<message>.*)",
"groups": 4,
"date_fmt": "%Y-%m-%d %H:%M:%S,%f"
},
{
# (default2) YYYY-MM-DD hh:mm:ss.nnn
# 2022-05-23 04:01:06.360 7 INFO barbican.model.clean [-] Cleaning up soft deletions in the barbican database
"re": "(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}) +(?P<pid>\d{1,6}) +(?P<type>INFO|DEBUG|ERROR|FAIL|WARNING) +(?P<message>.*)",
"groups": 4,
"date_fmt": "%Y-%m-%d %H:%M:%S.%f"
},
{
# (default3) YYYY-MM-DD hh:mm:ss.nnn
# <date> - <type> - <message>
"re": "(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}).\-.(?P<type>INFO|DEBUG|ERROR|FAIL|WARNING).\-.(?P<message>.*)",
"groups": 3,
"date_fmt": "%Y-%m-%d %H:%M:%S.%f"
},
{
# libvirt
# 2022-06-16 12:48:59.509+0000: 53235: info : libvirt version: 6.0.0, package: 0ubuntu8.15~cloud0 (Openstack Ubuntu Testing Bot <openstack-testing-bot@ubuntu.com> Mon, 22 Nov 2021 16:37:15 +0000)
"re": "(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}\+\d{4}): (?P<pid>\d{1,6}):.(?P<type>info|debug|error|fail|warning)\s: (?P<message>.*)",
"groups": 4,
"date_fmt": "%Y-%m-%d %H:%M:%S.%f%z"
},
{
# 2022-06-28 00:00:55.400745 2022-06-28 00:00:55.400 13 INFO cinder.api.openstack.wsgi [req-07ca8c70-f33d-406f-9427-5388b1656297 9e5e4502e0c34c0eabcc5bfbc499b059 343ab637681b4520bf4f5a7b826b9803 - default default] https://cinder.ic-eu.ssl.mirantis.net/v2/343ab637681b4520bf4f5a7b826b9803/volumes/detail?metadata=%7B%27KaaS%27%3A%274131b4dc-81ab-4d84-b991-7b63f225058c%27%7D returned with HTTP 200
"re": "(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{6}) +(?P<date_alt>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}) +(?P<pid>\d{1,6}) +(?P<type>INFO|DEBUG|ERROR|FAIL|WARNING) +(?P<message>.*)",
"groups": 5,
"date_fmt": "%Y-%m-%d %H:%M:%S.%f"
},
{
# 'stacklight/alerta-5fc6f5dfd-jj7ml'
# '2022/04/01 23:12:37 [info] 26#26: *124156 client 127.0.0.1 closed keepalive connection\n'
"re": "(?P<date>\d{4}\/\d{2}\/\d{2} \d{2}:\d{2}:\d{2}) +\[(?P<type>info|debug|error|fail|warning)\] +(?P<message>.*)",
"groups": 3,
"date_fmt": "%Y/%m/%d %H:%M:%S"
},
{
        # (nova-api-osapi-....) YYYY-MM-DD hh:mm:ss.nnnnnn
        # '2022-04-01 23:08:11.806062 capabilities. Old policies are deprecated and silently going to be ignored'
        "re": r"(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{6}) +(?P<message>.*)",
"groups": 2,
"date_fmt": "%Y-%m-%d %H:%M:%S.%f"
},
{
# (nova-api-metadata..) nnn.nnn.nnn.nnn - - [DD/MMM/YYYY:hh:mm:ss +nnnn]
# '172.16.35.67 - - [01/Apr/2022:22:23:14 +0000] "GET / HTTP/1.1" 200 98 1345 "-" "kube-probe/1.20+"'
"re": "(?P<src_ip>\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3}).+-.+-.+\[(?P<date>\d{1,2}\/\S{3}\/\d{4}:\d{2}:\d{2}:\d{2}.\+\d{4})\] +(?P<message>.*)",
"groups": 3,
"date_fmt": "%d/%b/%Y:%H:%M:%S %z"
},
{
# mysqld exporter
# time="2022-06-15T16:16:36Z" level=info msg="Starting mysqld_exporter (version=0.11.0, branch=HEAD, revision=5d7179615695a61ecc3b5bf90a2a7c76a9592cdd)" source="mysqld_exporter.go:206"
"re": "time\=\"(?P<date>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})Z\" level\=(?P<type>info|debug|error|fail|warning|warn) msg\=\"(?P<message>.*)\" source\=\"(?P<source>.+)\"",
"groups": 4,
"date_fmt": "%Y-%m-%dT%H:%M:%S"
},
{
# metrics
# 2022-06-24 20:55:19.752754+00:00 [info] <0.716.0> Setting segment_entry_count for vhost 'barbican' with 0 queues to '2048'
"re": "(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}.\d{6})\+00:00 \[(?P<type>info|debug|error|fail|warning|warn)\] (?P<message>.*)",
"groups": 3,
"date_fmt": "%Y-%m-%d %H:%M:%S.%f"
},
{
# openvswitch
# 2022-06-27T23:12:52Z|25993|reconnect|WARN|unix#89422: connection dropped (Connection reset by peer)
# 2022-06-27T21:31:11Z|08582|connmgr|INFO|br-tun<->tcp:127.0.0.1:6633: 6 flow_mods in the 3 s starting 10 s ago (6 adds)
"re": "(?P<date>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})Z\|(?P<pid>\d{1,6})\|(?P<action>.+)\|(?P<type>INFO|DEBUG|ERROR|ERR|FAIL|WARNING|WARN)\|(?P<message>.*)",
"groups": 5,
"date_fmt": "%Y-%m-%dT%H:%M:%S"
}
]
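# Illustrative example (comments only, not executed): the first entry above
# applied to its own sample line would produce the named groups
#   date="2022-06-08 01:38:54", type="DEBUG",
#   message="mariadb-controller Sleeping for 10"
# and "date" is then parsed with that entry's "date_fmt".
# A new log format is supported by appending another dict with the same
# three keys; each pattern is expected to define at least a named "date"
# group (and usually "message") for the per-line parsing below.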
def _re_groups_as_dict(compiled, re_match):
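    """Map the named groups of a compiled regex onto a findall() tuple.

    findall() returns plain positional tuples; groupindex gives 1-based
    positions of the named groups, hence the '-1' offset below.
    """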
_d = {}
for _k in compiled.groupindex.keys():
_d[_k] = re_match[compiled.groupindex[_k]-1]
return _d
class MosLogger(object):
def __init__(
self,
config
):
self.env_config = config
return
class KubeMosLogger(MosLogger):
def __init__(self, config):
self.master = KubeNodes(config)
super(KubeMosLogger, self).__init__(config)
        # Init log storage and compiled per-line patterns
self.logs = {}
self.merged_logs = {}
self.kaas_ts_regex = re.compile(kaas_timestamp_re["re"])
self.item_regex = []
self.dumppath = config.dumppath
self.tail_lines = config.tail_lines
for regex in log_item_re:
self.item_regex.append(
{
"compiled": re.compile(regex["re"]),
"date_fmt": regex["date_fmt"],
"groups": regex["groups"]
}
)
def _keywords(self, tstring, keywords):
        return [k in tstring for k in keywords]
def _safe_parse_date(self, str_date, fmt, note="-"):
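        """Parse str_date with fmt, returning -1 instead of raising.

        The -1 sentinel lets callers detect and skip timestamps that
        could not be parsed without aborting the whole run.
        """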
# DEBUG
try:
_t = datetime.strptime(str_date, fmt)
except ValueError:
logger_cli.warning(
"WARNING ({}): Can't parse date '{}'"
" using '{}'".format(
note,
str_date,
fmt
)
)
_t = -1
return _t
    def prepare_pods(
        self,
        ns_list,
        kw_list,
        inclusive_filter=True,
        exclude_kw=None
    ):
        # avoid the shared mutable default argument
        exclude_kw = exclude_kw if exclude_kw is not None else []
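        # Filtering semantics (descriptive note):
        # - exclude_kw always wins: a pod whose name contains, or whose
        #   container list includes, an exclude keyword is skipped;
        # - with inclusive_filter=True a pod is kept when ANY kw_list
        #   keyword matches its name, with False only when ALL match;
        # - an empty ns_list or kw_list disables the respective filter.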
def _list_with_containers(pod_item):
_list = []
for c in pod_item[2]:
_list.append([
pod_item[0],
pod_item[1],
c
])
return _list
logger_cli.info(
"# Preparing pods, ns: {}; keywords: {}".format(
", ".join(ns_list) if ns_list else "*",
", ".join(kw_list) if kw_list else "*"
)
)
# [ns, pod_name]
_target_pods = []
# validate ns
_all_namespaces = self.master.list_namespaces()
for _ns in _all_namespaces:
if _ns in ns_list or not ns_list:
_tmp_pods = []
logger_cli.info("-> processing namespace '{}'".format(_ns))
# list pods using mask
logger_cli.debug("... getting pods list")
_pods = self.master.list_pod_names_with_containers(ns=_ns)
logger_cli.debug("... {} total pods found".format(len(_pods)))
for _pod in _pods:
# _pod[_ns, _name]
_kw = self._keywords(_pod[1], kw_list) \
if kw_list else [True]
if any(self._keywords(_pod[1], exclude_kw)) or \
any(self._keywords(_pod[2], exclude_kw)):
logger_cli.debug("... skipped '{}'".format(_pod[1]))
continue
elif (not inclusive_filter and all(_kw)) or \
(inclusive_filter and any(_kw)):
_cpods = _list_with_containers(_pod)
_tmp_pods += _cpods
logger_cli.debug(
"... added {} items for pod '{}'".format(
len(_cpods),
"/".join(_pod[:2])
)
)
else:
# logger_cli.debug("... skipped pod '{}'".format(_pod))
pass
logger_cli.info(
"-> {} pods processed, "
"{} log items to be collected".format(
len(_pods),
len(_tmp_pods)
)
)
_target_pods += _tmp_pods
logger_cli.info(
"-> found {} log items total".format(len(_target_pods))
)
return _target_pods
def _get_pod_log(self, params):
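        """Fetch logs for one [ns, pod, container] triple.

        Returns None for near-empty responses (less than 10 characters)
        so such pods are dropped from further parsing.
        """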
ns = params[0]
name = params[1]
container_name = params[2]
# Get target log
_log_data = self.master.get_logs_for_pod(
name,
container_name,
ns,
tail_lines=self.tail_lines
)
if len(_log_data) < 10:
return None
else:
return _log_data
def collect_logs(self, pods_list):
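        """Collect raw logs for the prepared pod list using a thread pool.

        Results are stored in self.logs keyed by "<ns>/<pod>:<container>";
        the pool size comes from the config's sage_threads value.
        """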
# Prepare list of pods to collect
# cmd = """
# kubectl get pods -A -o=jsonpath='{range .items[*]}
# {.metadata.namespace}{"/"}
# {.metadata.name}{"\n"}{range .spec.containers[*]} {.name}{"\n"}
# {end}'
# """
logger_cli.info(
"# Collecting logs using {} threads".format(
self.env_config.sage_threads
)
)
# Do collect using pooled threads
pool = Pool(self.env_config.sage_threads)
_params = []
# Prepare params for getting logs
for _ns, _pod_name, _cont_name in pods_list:
_params.append([_ns, _pod_name, _cont_name])
# Init progress bar
total_log_items = len(_params)
log_item_index = 0
_progress = Progress(total_log_items)
# Start pooled processing
results = pool.imap(self._get_pod_log, _params)
# Catch and parse results
        while True:
            # build the namepath for the item expected next, so it is
            # available for both success and timeout reporting below
            if log_item_index < total_log_items:
                _namepath = "{}/{}:{}".format(
                    _params[log_item_index][0],
                    _params[log_item_index][1],
                    _params[log_item_index][2]
                )
            try:
                # use a timeout as some of the requests can hang
                _r = results.next(timeout=10)
            except StopIteration:
                # end of pool
                break
            except TimeoutError:
                # report the pod which hung; imap keeps order, so the
                # same result is polled again on the next iteration
                _progress.clearline()
                logger_cli.warning(
                    "WARNING: Timeout waiting for log content {}".format(
                        _namepath
                    )
                )
                continue
if _r is not None:
_raw = _r
_size = len(_raw)
# Save detected data
self.logs[_namepath] = {
"ns": _params[log_item_index][0],
"pod_name": _params[log_item_index][1],
"container_name": _params[log_item_index][2],
"raw": _raw
}
# print progress
_progress.write_progress(
log_item_index+1,
note="'{}': {} chars".format(_namepath, _size)
)
# track next param set
log_item_index += 1
_progress.end()
pool.close()
pool.join()
# debug
# with open("logs_debug.json", "w+") as wf:
# wf.write(json.dumps(self.logs))
# debug
return
def _parse_kaas_timestamps(self):
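        """Split each raw log into lines by the kaas timestamp prefix.

        The raw text is replaced by a list of per-line dicts carrying the
        parsed 'kaas_ts'; the in-line timestamps are parsed later in
        _parse_log_item.
        """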
# shortcut function to get array aligned index
def _get_group(match, key):
return match[self.kaas_ts_regex.groupindex[key]-1]
logger_cli.info("-> Parsing kaas timestamps")
# iterate logs
_counter = 0
_pbar = Progress(len(self.logs))
for _namepath, _item in self.logs.items():
# next progress bar item
_counter += 1
_pbar.write_progress(_counter, note=_namepath)
# Get all lines from log matched
_matches = self.kaas_ts_regex.findall(_item.pop("raw"))
# iterate matches and parse timestamp
_log = []
for _match in _matches:
# new log item
_log_line = _re_groups_as_dict(self.kaas_ts_regex, _match)
# parse ts from kaas
_pts = self._safe_parse_date(
_log_line["kaas_date"],
kaas_timestamp_re["date_fmt"],
note=_namepath
)
_log_line["kaas_ts"] = _pts
# save log item
_log.append(_log_line)
            # save the parsed lines (message + kaas_ts) and the line count
_item["total_lines"] = len(_matches)
_item["log"] = _log
_pbar.end()
return
@staticmethod
def _get_matched(regex, line):
        # Check that the regex matches the line and that the match
        # yields the expected number of groups
_c = regex["compiled"]
_m = _c.findall(line)
# calculate groups if there is a match
_group_count = len(_m[0]) if len(_m) > 0 else 0
        # Check that the group count is at least 2
        # and equals the number of named groups in the pattern
if _group_count > 1 and _group_count == len(_c.groupindex):
return _m
else:
return []
def _parse_log_item(self, log_item):
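        """Parse every line of one collected log item in place.

        The regex that matched the previous line is tried first, since
        consecutive lines of one container usually share a format; other
        patterns are probed only on a miss.
        """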
        def _detect_re():
            # probe every known pattern and return the first that matches
            # together with its match (empty list when nothing matched)
            _re = None
            _m = []
            for regex in self.item_regex:
                _m = self._get_matched(regex, _message)
                if _m:
                    _re = regex
                    break
            return _re, _m
# parse whole log using detected pattern
l_parsed = 0
l_not_parsed = 0
l_skipped = 0
_index = 0
_li = log_item["log"]
_re = None
while _index < log_item["total_lines"]:
            # pop the message since a per-line parser may define
            # a group with the same name
_message = _li[_index].pop("message")
# Parse line
_m = []
# Try last regex for this item
if _re is not None:
_m = self._get_matched(_re, _message)
# if not matched
if not _m:
# Try every regex to match line format
# by counting groups detected
_re, _m = _detect_re()
if len(_m) == 1:
# get matches with group names as a dict
_new_line_items = \
_re_groups_as_dict(_re["compiled"], _m[0])
# update original dict
_li[_index].update(_new_line_items)
# Parse date
_pts = self._safe_parse_date(
_new_line_items["date"],
_re["date_fmt"]
)
_li[_index]["ts"] = _pts
l_parsed += 1
elif len(_m) == 0:
# put back message that failed to parse
_li[_index]["message"] = _message
l_not_parsed += 1
            else:
                # Should never happen
                logger_cli.warning(
                    "WARNING: Skipping ambiguous log message: "
                    "'{}'".format(_message)
                )
                l_skipped += 1
# next line
_index += 1
log_item["stats"] = {
"parsed": l_parsed,
"not_parsed": l_not_parsed,
"skipped": l_skipped
}
def parse_logs(self):
# debug: load precreated logs
# _ll = {}
# with open("logs_debug.json", "r+") as rf:
# _ll = json.loads(rf.read())
# if _ll:
# self.logs = _ll
# debug: end
        # Get kaas ts as a plan B in case the line's own timestamp
        # is missing or cannot be parsed
self._parse_kaas_timestamps()
logger_cli.info("-> Parsing logs")
_p = Progress(len(self.logs))
idx = 1
totalParsed = 0
totalNotParsed = 0
# iterate raw logs and try to parse actual pod timing
for _namepath, _log in self.logs.items():
# Update progress bar
_p.write_progress(
idx,
note="parsed: {}, not parsed: {}, => {}".format(
totalParsed,
totalNotParsed,
_namepath
)
)
# Parse log
self._parse_log_item(_log)
if self.dumppath != "null":
for line in _log["log"]:
if "date" not in line.keys():
# Log line parsed
_filename = os.path.join(
self.dumppath,
_log["pod_name"]+"-"+_log["container_name"]+".log"
)
with open(_filename, "a") as rawlogfile:
rawlogfile.write(
"<KTS>{} <M>{}\n".format(
line["kaas_date"],
line["message"]
)
)
# Stats
totalParsed += _log["stats"]["parsed"]
totalNotParsed += _log["stats"]["not_parsed"]
idx += 1
_p.end()
def merge_logs(self):
logger_cli.info("# Merging logs")
_merged = {}
for _pod, _logs in self.logs.items():
for _li in _logs["log"]:
# Prepare log entry
_li["namepath"] = _pod
                # check if a per-line timestamp was detected
                if "ts" not in _li:
                    # Use kaas_ts as a timestamp
                    _timestamp = _li.pop("kaas_ts")
                else:
                    # get the parsed timestamp
                    _timestamp = _li.pop("ts")
                # skip lines whose timestamp could not be parsed at all
                # (_safe_parse_date returns -1 on failure)
                if _timestamp == -1:
                    continue
                # save items using timestamps; note that entries sharing
                # the exact same timestamp overwrite each other here
                _merged[float(_timestamp.timestamp())] = _li
self.merged_logs = _merged
return
def save_logs(self, filename):
logger_cli.info("# Writing output file: '{}'".format(filename))
with open(filename, 'w+') as ff:
_log_iter = sorted(
self.merged_logs.items(), key=lambda item: item[0]
)
for k, v in _log_iter:
ff.write(
"{} {}: {}\n".format(
v.pop("namepath"),
datetime.fromtimestamp(k).strftime(_datetime_fmt),
" ".join(["{}={}".format(k, v) for k, v in v.items()])
)
)
return
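# Illustrative usage sketch (comments only, not executed): the expected call
# order is prepare -> collect -> parse -> merge -> save.  The 'config' object
# is assumed to expose at least dumppath, tail_lines and sage_threads, as
# referenced above; the namespace and keyword values below are hypothetical.
#
#   logger = KubeMosLogger(config)
#   pods = logger.prepare_pods(["openstack"], ["mariadb"])
#   logger.collect_logs(pods)
#   logger.parse_logs()
#   logger.merge_logs()
#   logger.save_logs("/tmp/merged.log")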