# Author: Alex Savatieiev (osavatieiev@mirantis.com; a.savex@gmail.com)
# Copyright 2019-2022 Mirantis, Inc.
import os
import re

from datetime import datetime
from multiprocessing.dummy import Pool
from multiprocessing import TimeoutError

from cfg_checker.common import logger_cli
# from cfg_checker.common.exception import KubeException

from cfg_checker.helpers.console_utils import Progress
from cfg_checker.nodes import KubeNodes


_datetime_fmt = "%Y-%m-%d-%H-%M-%S.%f"

# parsers = [
#   [
#       <split lines_parser>,
#       <split data for a line parser>
#   ]
# ]
kaas_timestamp_re = {
    # kaas timestamp
    # original ts comes from pandas: 2022-06-16T21:32:12.977674062Z
    # since we do not have nanoseconds support in standard datetime,
    # just throw away the 3 digits and 'Z'
    "re": "(?P<kaas_date>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6})"
          "(?:\d{3}Z) "
          "(?P<message>.+?("
          "(?=\\n\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6})|(?=\\n$))"
          ")",
    "date_fmt": "%Y-%m-%dT%H:%M:%S.%f"
}
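# Illustrative note (sketch, not part of the original logic): for a collected
# raw line such as
#   2022-06-16T21:32:12.977674062Z Starting probe for pod readiness
# the pattern above would yield kaas_date="2022-06-16T21:32:12.977674"
# (nanosecond tail and 'Z' dropped) and message="Starting probe for pod
# readiness"; the sample message text here is hypothetical.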
log_item_re = [
    {
        # (mariadb) YYYY-MM-DD hh:mm:ss
        # 2022-06-08 01:38:54 DEBUG mariadb-controller Sleeping for 10
        "re": "(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) +(?P<type>INFO|DEBUG|ERROR|FAIL|WARNING) +(?P<message>.*)",
        "groups": 3,
        "date_fmt": "%Y-%m-%d %H:%M:%S"
    },
    {
        # (iam-proxy-alerta...) nnn.nnn.nnn.nnn:ppppp - uuid - - [YYYY/MM/DD hh:mm:ss]
        # '172.16.35.69:35298 - 9b68130c-4c3b-4abd-bb04-7ff5329ad644 - - [2022/04/01 23:00:50] 10.233.118.232:4180 GET - "/ping" HTTP/1.1 "kube-probe/1.20+" 200 2 0.000'
        "re": "(?P<src_ip>\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3}):(?P<src_port>\d{1,5}).\-.(?P<guid>\S{8}-\S{4}-\S{4}-\S{4}-\S{12}).-.-.\[(?P<date>\d{4}\/\d{2}\/\d{2} \d{2}:\d{2}:\d{2})\] (?P<dest_ip>\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3}):(?P<dest_port>\d{1,5}) (?P<message>.*)",
        "groups": 7,
        "date_fmt": "%Y/%m/%d %H:%M:%S"
    },
    {
        # (default1) YYYY-MM-DD hh:mm:ss,nnn
        #
        "re": "(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\,\d{3}) +(?P<type>INFO|DEBUG|ERROR|FAIL|WARNING) +(?P<message>.*)",
        "groups": 3,
        "date_fmt": "%Y-%m-%d %H:%M:%S,%f"
    },
    {
        # (default1a) YYYY-MM-DD hh:mm:ss,nnn
        # 2022-06-27 23:34:51,845 - OpenStack-Helm Mariadb - INFO - Updating grastate configmap
        "re": "(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\,\d{3}) \- (?P<process>.+) \- (?P<type>INFO|DEBUG|ERROR|FAIL|WARNING) \- (?P<message>.*)",
        "groups": 4,
        "date_fmt": "%Y-%m-%d %H:%M:%S,%f"
    },
    {
        # (default2) YYYY-MM-DD hh:mm:ss.nnn
        # 2022-05-23 04:01:06.360 7 INFO barbican.model.clean [-] Cleaning up soft deletions in the barbican database
        "re": "(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}) +(?P<pid>\d{1,6}) +(?P<type>INFO|DEBUG|ERROR|FAIL|WARNING) +(?P<message>.*)",
        "groups": 4,
        "date_fmt": "%Y-%m-%d %H:%M:%S.%f"
    },
    {
        # (default3) YYYY-MM-DD hh:mm:ss.nnn
        # <date> - <type> - <message>
        "re": "(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}).\-.(?P<type>INFO|DEBUG|ERROR|FAIL|WARNING).\-.(?P<message>.*)",
        "groups": 3,
        "date_fmt": "%Y-%m-%d %H:%M:%S.%f"
    },
    {
        # libvirt
        # 2022-06-16 12:48:59.509+0000: 53235: info : libvirt version: 6.0.0, package: 0ubuntu8.15~cloud0 (Openstack Ubuntu Testing Bot <openstack-testing-bot@ubuntu.com> Mon, 22 Nov 2021 16:37:15 +0000)
        "re": "(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}\+\d{4}): (?P<pid>\d{1,6}):.(?P<type>info|debug|error|fail|warning)\s: (?P<message>.*)",
        "groups": 4,
        "date_fmt": "%Y-%m-%d %H:%M:%S.%f%z"
    },
    {
        # 2022-06-28 00:00:55.400745 2022-06-28 00:00:55.400 13 INFO cinder.api.openstack.wsgi [req-07ca8c70-f33d-406f-9427-5388b1656297 9e5e4502e0c34c0eabcc5bfbc499b059 343ab637681b4520bf4f5a7b826b9803 - default default] https://cinder.ic-eu.ssl.mirantis.net/v2/343ab637681b4520bf4f5a7b826b9803/volumes/detail?metadata=%7B%27KaaS%27%3A%274131b4dc-81ab-4d84-b991-7b63f225058c%27%7D returned with HTTP 200
        "re": "(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{6}) +(?P<date_alt>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}) +(?P<pid>\d{1,6}) +(?P<type>INFO|DEBUG|ERROR|FAIL|WARNING) +(?P<message>.*)",
        "groups": 5,
        "date_fmt": "%Y-%m-%d %H:%M:%S.%f"
    },
    {
        # 'stacklight/alerta-5fc6f5dfd-jj7ml'
        # '2022/04/01 23:12:37 [info] 26#26: *124156 client 127.0.0.1 closed keepalive connection\n'
        "re": "(?P<date>\d{4}\/\d{2}\/\d{2} \d{2}:\d{2}:\d{2}) +\[(?P<type>info|debug|error|fail|warning)\] +(?P<message>.*)",
        "groups": 3,
        "date_fmt": "%Y/%m/%d %H:%M:%S"
    },
    {
        # (nova-api-osapi-....) YYYY-MM-DD hh:mm:ss.nnnnnn
        # '2022-04-01 23:08:11.806062 capabilities. Old policies are deprecated and silently going to be ignored'
        "re": "(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{6}) +(?P<message>.*)",
        "groups": 2,
        "date_fmt": "%Y-%m-%d %H:%M:%S.%f"
    },
    {
        # (nova-api-metadata..) nnn.nnn.nnn.nnn - - [DD/MMM/YYYY:hh:mm:ss +nnnn]
        # '172.16.35.67 - - [01/Apr/2022:22:23:14 +0000] "GET / HTTP/1.1" 200 98 1345 "-" "kube-probe/1.20+"'
        "re": "(?P<src_ip>\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3}).+-.+-.+\[(?P<date>\d{1,2}\/\S{3}\/\d{4}:\d{2}:\d{2}:\d{2}.\+\d{4})\] +(?P<message>.*)",
        "groups": 3,
        "date_fmt": "%d/%b/%Y:%H:%M:%S %z"
    },
    {
        # mysqld exporter
        # time="2022-06-15T16:16:36Z" level=info msg="Starting mysqld_exporter (version=0.11.0, branch=HEAD, revision=5d7179615695a61ecc3b5bf90a2a7c76a9592cdd)" source="mysqld_exporter.go:206"
        "re": "time\=\"(?P<date>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})Z\" level\=(?P<type>info|debug|error|fail|warning|warn) msg\=\"(?P<message>.*)\" source\=\"(?P<source>.+)\"",
        "groups": 4,
        "date_fmt": "%Y-%m-%dT%H:%M:%S"
    },
    {
        # metrics
        # 2022-06-24 20:55:19.752754+00:00 [info] <0.716.0> Setting segment_entry_count for vhost 'barbican' with 0 queues to '2048'
        "re": "(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}.\d{6})\+00:00 \[(?P<type>info|debug|error|fail|warning|warn)\] (?P<message>.*)",
        "groups": 3,
        "date_fmt": "%Y-%m-%d %H:%M:%S.%f"
    },
    {
        # openvswitch
        # 2022-06-27T23:12:52Z|25993|reconnect|WARN|unix#89422: connection dropped (Connection reset by peer)
        # 2022-06-27T21:31:11Z|08582|connmgr|INFO|br-tun<->tcp:127.0.0.1:6633: 6 flow_mods in the 3 s starting 10 s ago (6 adds)
        "re": "(?P<date>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})Z\|(?P<pid>\d{1,6})\|(?P<action>.+)\|(?P<type>INFO|DEBUG|ERROR|ERR|FAIL|WARNING|WARN)\|(?P<message>.*)",
        "groups": 5,
        "date_fmt": "%Y-%m-%dT%H:%M:%S"
    }
]
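# Illustrative sketch (assumption, not part of the original module): each
# entry is matched with re.findall(), which returns one tuple per line, e.g.
# for the "default1a" pattern above:
#   _c = re.compile(log_item_re[3]["re"])
#   _c.findall("2022-06-27 23:34:51,845 - OpenStack-Helm Mariadb - INFO"
#              " - Updating grastate configmap")
#   # -> [('2022-06-27 23:34:51,845', 'OpenStack-Helm Mariadb', 'INFO',
#   #      'Updating grastate configmap')]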


def _re_groups_as_dict(compiled, re_match):
    _d = {}
    for _k in compiled.groupindex.keys():
        _d[_k] = re_match[compiled.groupindex[_k]-1]
    return _d
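# Illustrative sketch (assumption, not part of the original module):
# re.findall() yields plain tuples, so values are picked via groupindex-1, e.g.
#   _c = re.compile("(?P<a>\d+)-(?P<b>\d+)")
#   _re_groups_as_dict(_c, _c.findall("12-34")[0])  # -> {'a': '12', 'b': '34'}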


class MosLogger(object):
    def __init__(
        self,
        config
    ):
        self.env_config = config
        return


class KubeMosLogger(MosLogger):
    def __init__(self, config):
        self.master = KubeNodes(config)
        super(KubeMosLogger, self).__init__(config)
        # Init logs storage and compile regexes
        self.logs = {}
        self.merged_logs = {}
        self.kaas_ts_regex = re.compile(kaas_timestamp_re["re"])
        self.item_regex = []
        self.dumppath = config.dumppath
        self.tail_lines = config.tail_lines
        for regex in log_item_re:
            self.item_regex.append(
                {
                    "compiled": re.compile(regex["re"]),
                    "date_fmt": regex["date_fmt"],
                    "groups": regex["groups"]
                }
            )

    def _keywords(self, tstring, keywords):
        return [k in tstring for k in keywords]

    def _safe_parse_date(self, str_date, fmt, note="-"):
        # DEBUG
        try:
            _t = datetime.strptime(str_date, fmt)
        except ValueError:
            logger_cli.warning(
                "WARNING ({}): Can't parse date '{}'"
                " using '{}'".format(
                    note,
                    str_date,
                    fmt
                )
            )
            _t = -1
        return _t

    def prepare_pods(
        self,
        ns_list,
        kw_list,
        inclusive_filter=True,
        exclude_kw=[]
    ):
        def _list_with_containers(pod_item):
            _list = []
            for c in pod_item[2]:
                _list.append([
                    pod_item[0],
                    pod_item[1],
                    c
                ])
            return _list
        logger_cli.info(
            "# Preparing pods, ns: {}; keywords: {}".format(
                ", ".join(ns_list) if ns_list else "*",
                ", ".join(kw_list) if kw_list else "*"
            )
        )
        # [ns, pod_name]
        _target_pods = []
        # validate ns
        _all_namespaces = self.master.list_namespaces()
        for _ns in _all_namespaces:
            if _ns in ns_list or not ns_list:
                _tmp_pods = []
                logger_cli.info("-> processing namespace '{}'".format(_ns))
                # list pods using mask
                logger_cli.debug("... getting pods list")
                _pods = self.master.list_pod_names_with_containers(ns=_ns)
                logger_cli.debug("... {} total pods found".format(len(_pods)))
                for _pod in _pods:
                    # _pod[_ns, _name]
                    _kw = self._keywords(_pod[1], kw_list) \
                        if kw_list else [True]
                    if any(self._keywords(_pod[1], exclude_kw)) or \
                       any(self._keywords(_pod[2], exclude_kw)):
                        logger_cli.debug("... skipped '{}'".format(_pod[1]))
                        continue
                    elif (not inclusive_filter and all(_kw)) or \
                         (inclusive_filter and any(_kw)):
                        _cpods = _list_with_containers(_pod)
                        _tmp_pods += _cpods
                        logger_cli.debug(
                            "... added {} items for pod '{}'".format(
                                len(_cpods),
                                "/".join(_pod[:2])
                            )
                        )
                    else:
                        # logger_cli.debug("... skipped pod '{}'".format(_pod))
                        pass
                logger_cli.info(
                    "-> {} pods processed, "
                    "{} log items to be collected".format(
                        len(_pods),
                        len(_tmp_pods)
                    )
                )
                _target_pods += _tmp_pods

        logger_cli.info(
            "-> found {} log items total".format(len(_target_pods))
        )
        return _target_pods
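    # Illustrative note (example values are hypothetical): the returned list
    # holds one [namespace, pod_name, container_name] triple per container,
    # e.g. ["openstack", "mariadb-server-0", "mariadb"], which is the shape
    # that collect_logs() and _get_pod_log() below expect.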

    def _get_pod_log(self, params):
        ns = params[0]
        name = params[1]
        container_name = params[2]
        # Get target log
        _log_data = self.master.get_logs_for_pod(
            name,
            container_name,
            ns,
            tail_lines=self.tail_lines
        )
        if len(_log_data) < 10:
            return None
        else:
            return _log_data

    def collect_logs(self, pods_list):

        # Prepare list of pods to collect
        # cmd = """
        #     kubectl get pods -A -o=jsonpath='{range .items[*]}
        #     {.metadata.namespace}{"/"}
        #     {.metadata.name}{"\n"}{range .spec.containers[*]} {.name}{"\n"}
        #     {end}'
        # """

        logger_cli.info(
            "# Collecting logs using {} threads".format(
                self.env_config.sage_threads
            )
        )
        # Do collect using pooled threads
        pool = Pool(self.env_config.sage_threads)
        _params = []
        # Prepare params for getting logs
        for _ns, _pod_name, _cont_name in pods_list:
            _params.append([_ns, _pod_name, _cont_name])
        # Init progress bar
        total_log_items = len(_params)
        log_item_index = 0
        _progress = Progress(total_log_items)
        # Start pooled processing
        results = pool.imap(self._get_pod_log, _params)
        # Catch and parse results
        while True:
            try:
                # use a timeout as some of the requests can hang
                _r = results.next(timeout=10)
                _namepath = "{}/{}:{}".format(
                    _params[log_item_index][0],
                    _params[log_item_index][1],
                    _params[log_item_index][2]
                )

            except StopIteration:
                # end of pool
                break

            except TimeoutError:
                # report the pod that hung and ignore it
                _progress.clearline()
                # build the namepath for the item still being waited on
                _namepath = "{}/{}:{}".format(
                    _params[log_item_index][0],
                    _params[log_item_index][1],
                    _params[log_item_index][2]
                )
                logger_cli.warning(
                    "WARNING: Timeout waiting for log content {}".format(
                        _namepath
                    )
                )
                continue
            if _r is not None:
                _raw = _r
                _size = len(_raw)
                # Save detected data
                self.logs[_namepath] = {
                    "ns": _params[log_item_index][0],
                    "pod_name": _params[log_item_index][1],
                    "container_name": _params[log_item_index][2],
                    "raw": _raw
                }

                # print progress
                _progress.write_progress(
                    log_item_index+1,
                    note="'{}': {} chars".format(_namepath, _size)
                )

            # track next param set
            log_item_index += 1

        _progress.end()
        pool.close()
        pool.join()

        # debug
        # with open("logs_debug.json", "w+") as wf:
        #     wf.write(json.dumps(self.logs))
        # debug
        return

    def _parse_kaas_timestamps(self):
        # shortcut function to get array aligned index
        def _get_group(match, key):
            return match[self.kaas_ts_regex.groupindex[key]-1]

        logger_cli.info("-> Parsing kaas timestamps")
        # iterate logs
        _counter = 0
        _pbar = Progress(len(self.logs))
        for _namepath, _item in self.logs.items():
            # next progress bar item
            _counter += 1
            _pbar.write_progress(_counter, note=_namepath)
            # Get all lines from log matched
            _matches = self.kaas_ts_regex.findall(_item.pop("raw"))
            # iterate matches and parse timestamp
            _log = []
            for _match in _matches:
                # new log item
                _log_line = _re_groups_as_dict(self.kaas_ts_regex, _match)
                # parse ts from kaas
                _pts = self._safe_parse_date(
                    _log_line["kaas_date"],
                    kaas_timestamp_re["date_fmt"],
                    note=_namepath
                )
                _log_line["kaas_ts"] = _pts

                # save log item
                _log.append(_log_line)

            # save parsed message lines and kaas_ts
            _item["total_lines"] = len(_matches)
            _item["log"] = _log

        _pbar.end()
        return

    @staticmethod
    def _get_matched(regex, line):
        # Check if regex has matching groups number in last line
        _c = regex["compiled"]
        _m = _c.findall(line)
        # calculate groups if there is a match
        _group_count = len(_m[0]) if len(_m) > 0 else 0
        # Check that the group count is at least 2
        # and check that matched number of groups found
        if _group_count > 1 and _group_count == len(_c.groupindex):
            return _m
        else:
            return []

    def _parse_log_item(self, log_item):
        def _detect_re():
            _re = None
            for regex in self.item_regex:
                _m = self._get_matched(regex, _message)
                if _m:
                    _re = regex
                    break
            return _re, _m

        # parse whole log using detected pattern
        l_parsed = 0
        l_not_parsed = 0
        l_skipped = 0
        _index = 0
        _li = log_item["log"]
        _re = None
        while _index < log_item["total_lines"]:
            # pop message as there might be a similar group name present
            _message = _li[_index].pop("message")
            # Parse line
            _m = []
            # Try last regex for this item
            if _re is not None:
                _m = self._get_matched(_re, _message)
            # if not matched
            if not _m:
                # Try every regex to match line format
                # by counting groups detected
                _re, _m = _detect_re()
            if len(_m) == 1:
                # get matches with group names as a dict
                _new_line_items = \
                    _re_groups_as_dict(_re["compiled"], _m[0])
                # update original dict
                _li[_index].update(_new_line_items)
                # Parse date
                _pts = self._safe_parse_date(
                    _new_line_items["date"],
                    _re["date_fmt"]
                )
                _li[_index]["ts"] = _pts
                l_parsed += 1
            elif len(_m) == 0:
                # put back message that failed to parse
                _li[_index]["message"] = _message
                l_not_parsed += 1
            else:
                # Should never happen
                logger_cli.warning(
                    "WARNING: Skipping ambiguous log message: "
                    "'{}'".format(_message)
                )
                l_skipped += 1
            # next line
            _index += 1
        log_item["stats"] = {
            "parsed": l_parsed,
            "not_parsed": l_not_parsed,
            "skipped": l_skipped
        }
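    # Illustrative sketch (assumption, not part of the original module): after
    # _parse_log_item() a successfully parsed line dict looks roughly like
    #   {"kaas_date": "...", "kaas_ts": <datetime>, "date": "...",
    #    "type": "INFO", "message": "...", "ts": <datetime>}
    # while an unparsed line keeps only the kaas_* fields and "message".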

    def parse_logs(self):

        # debug: load precreated logs
        # _ll = {}
        # with open("logs_debug.json", "r+") as rf:
        #     _ll = json.loads(rf.read())
        # if _ll:
        #     self.logs = _ll
        # debug: end

        # Get kaas ts as a plan B if log time is missing or not parsed
        self._parse_kaas_timestamps()

        logger_cli.info("-> Parsing logs")
        _p = Progress(len(self.logs))
        idx = 1
        totalParsed = 0
        totalNotParsed = 0
        # iterate raw logs and try to parse actual pod timing
        for _namepath, _log in self.logs.items():
            # Update progress bar
            _p.write_progress(
                idx,
                note="parsed: {}, not parsed: {}, => {}".format(
                    totalParsed,
                    totalNotParsed,
                    _namepath
                )
            )
            # Parse log
            self._parse_log_item(_log)
            if self.dumppath != "null":
                for line in _log["log"]:
                    if "date" not in line.keys():
                        # Line was not parsed; dump it with its kaas timestamp
                        _filename = os.path.join(
                            self.dumppath,
                            _log["pod_name"]+"-"+_log["container_name"]+".log"
                        )
                        with open(_filename, "a") as rawlogfile:
                            rawlogfile.write(
                                "<KTS>{} <M>{}\n".format(
                                    line["kaas_date"],
                                    line["message"]
                                )
                            )
            # Stats
            totalParsed += _log["stats"]["parsed"]
            totalNotParsed += _log["stats"]["not_parsed"]
            # Update progress bar
            # _p.write_progress(
            #     idx,
            #     note="parsed: {}, not parsed: {}, => {}".format(
            #         totalParsed,
            #         totalNotParsed,
            #         _namepath
            #     )
            # )
            idx += 1
        _p.end()

    def merge_logs(self):
        logger_cli.info("# Merging logs")
        _merged = {}
        for _pod, _logs in self.logs.items():
            for _li in _logs["log"]:
                # Prepare log entry
                _li["namepath"] = _pod

                # check if timestamp is detected
                if "ts" not in _li:
                    # Use kaas_ts as a timestamp
                    _timestamp = _li.pop("kaas_ts")
                else:
                    # get parsed timestamp
                    _timestamp = _li.pop("ts")

                # and put undetected lines separately
                # save items using timestamps
                _merged[float(_timestamp.timestamp())] = _li
        self.merged_logs = _merged
        return

    def save_logs(self, filename):
        logger_cli.info("# Writing output file: '{}'".format(filename))
        with open(filename, 'w+') as ff:
            _log_iter = sorted(
                self.merged_logs.items(), key=lambda item: item[0]
            )
            for k, v in _log_iter:
                ff.write(
                    "{} {}: {}\n".format(
                        v.pop("namepath"),
                        datetime.fromtimestamp(k).strftime(_datetime_fmt),
                        " ".join(
                            ["{}={}".format(_k, _v) for _k, _v in v.items()]
                        )
                    )
                )
        return
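

# Typical call order (illustrative sketch, assuming a config object that
# carries sage_threads, tail_lines and dumppath as prepared by the CLI layer;
# the namespace and keyword values below are hypothetical examples):
#   logger = KubeMosLogger(config)
#   pods = logger.prepare_pods(["openstack"], ["mariadb"])
#   logger.collect_logs(pods)
#   logger.parse_logs()
#   logger.merge_logs()
#   logger.save_logs("merged.log")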