# Author: Alex Savatieiev (osavatieiev@mirantis.com; a.savex@gmail.com)
# Copyright 2019-2022 Mirantis, Inc.
import os
import re

from datetime import datetime
from multiprocessing.dummy import Pool
from multiprocessing import TimeoutError

from cfg_checker.common import logger_cli
# from cfg_checker.common.exception import KubeException

from cfg_checker.helpers.console_utils import Progress
from cfg_checker.nodes import KubeNodes


_datetime_fmt = "%Y-%m-%d-%H-%M-%S.%f"

# parsers = [
#   [
#       <split lines_parser>,
#       <split data for a line parser>
#   ]
# ]
kaas_timestamp_re = {
    # kaas timestamp
    # original ts comes from pandas: 2022-06-16T21:32:12.977674062Z
    # since we do not have nanosecond support in standard datetime,
    # just throw away the last 3 digits and the 'Z'
    "re": r"(?P<kaas_date>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6})"
          r"(?:\d{3}Z) "
          r"(?P<message>.+?("
          r"(?=\n\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6})|(?=\n$))"
          r")",
    "date_fmt": "%Y-%m-%dT%H:%M:%S.%f"
}
log_item_re = [
    {
        # (mariadb) YYYY-MM-DD hh:mm:ss
        # 2022-06-08 01:38:54 DEBUG mariadb-controller Sleeping for 10
        "re": r"(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) +(?P<type>INFO|DEBUG|ERROR|FAIL|WARNING) +(?P<message>.*)",
        "groups": 3,
        "date_fmt": "%Y-%m-%d %H:%M:%S"
    },
    {
        # (iam-proxy-alerta...) nnn.nnn.nnn.nnn:ppppp - uuid - - [YYYY/MM/DD hh:mm:ss]
        # '172.16.35.69:35298 - 9b68130c-4c3b-4abd-bb04-7ff5329ad644 - - [2022/04/01 23:00:50] 10.233.118.232:4180 GET - "/ping" HTTP/1.1 "kube-probe/1.20+" 200 2 0.000'
        "re": r"(?P<src_ip>\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3}):(?P<src_port>\d{1,5}).\-.(?P<guid>\S{8}-\S{4}-\S{4}-\S{4}-\S{12}).-.-.\[(?P<date>\d{4}\/\d{2}\/\d{2} \d{2}:\d{2}:\d{2})\] (?P<dest_ip>\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3}):(?P<dest_port>\d{1,5}) (?P<message>.*)",
        "groups": 7,
        "date_fmt": "%Y/%m/%d %H:%M:%S"
    },
    {
        # (default1) YYYY-MM-DD hh:mm:ss,nnn
        #
        "re": r"(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\,\d{3}) +(?P<type>INFO|DEBUG|ERROR|FAIL|WARNING) +(?P<message>.*)",
        "groups": 3,
        "date_fmt": "%Y-%m-%d %H:%M:%S,%f"
    },
    {
        # (default1a) YYYY-MM-DD hh:mm:ss,nnn
        # 2022-06-27 23:34:51,845 - OpenStack-Helm Mariadb - INFO - Updating grastate configmap
        "re": r"(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\,\d{3}) \- (?P<process>.+) \- (?P<type>INFO|DEBUG|ERROR|FAIL|WARNING) \- (?P<message>.*)",
        "groups": 4,
        "date_fmt": "%Y-%m-%d %H:%M:%S,%f"
    },
    {
        # (default2) YYYY-MM-DD hh:mm:ss.nnn
        # 2022-05-23 04:01:06.360 7 INFO barbican.model.clean [-] Cleaning up soft deletions in the barbican database[00m
        "re": r"(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}) +(?P<pid>\d{1,6}) +(?P<type>INFO|DEBUG|ERROR|FAIL|WARNING) +(?P<message>.*)",
        "groups": 4,
        "date_fmt": "%Y-%m-%d %H:%M:%S.%f"
    },
    {
        # (default3) YYYY-MM-DD hh:mm:ss.nnn
        # <date> - <type> - <message>
        "re": r"(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}).\-.(?P<type>INFO|DEBUG|ERROR|FAIL|WARNING).\-.(?P<message>.*)",
        "groups": 3,
        "date_fmt": "%Y-%m-%d %H:%M:%S.%f"
    },
    {
        # libvirt
        # 2022-06-16 12:48:59.509+0000: 53235: info : libvirt version: 6.0.0, package: 0ubuntu8.15~cloud0 (Openstack Ubuntu Testing Bot <openstack-testing-bot@ubuntu.com> Mon, 22 Nov 2021 16:37:15 +0000)
        "re": r"(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}\+\d{4}): (?P<pid>\d{1,6}):.(?P<type>info|debug|error|fail|warning)\s: (?P<message>.*)",
        "groups": 4,
        "date_fmt": "%Y-%m-%d %H:%M:%S.%f%z"
    },
    {
        # 2022-06-28 00:00:55.400745 2022-06-28 00:00:55.400 13 INFO cinder.api.openstack.wsgi [req-07ca8c70-f33d-406f-9427-5388b1656297 9e5e4502e0c34c0eabcc5bfbc499b059 343ab637681b4520bf4f5a7b826b9803 - default default] https://cinder.ic-eu.ssl.mirantis.net/v2/343ab637681b4520bf4f5a7b826b9803/volumes/detail?metadata=%7B%27KaaS%27%3A%274131b4dc-81ab-4d84-b991-7b63f225058c%27%7D returned with HTTP 200
        "re": r"(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{6}) +(?P<date_alt>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}) +(?P<pid>\d{1,6}) +(?P<type>INFO|DEBUG|ERROR|FAIL|WARNING) +(?P<message>.*)",
        "groups": 5,
        "date_fmt": "%Y-%m-%d %H:%M:%S.%f"
    },
    {
        # 'stacklight/alerta-5fc6f5dfd-jj7ml'
        # '2022/04/01 23:12:37 [info] 26#26: *124156 client 127.0.0.1 closed keepalive connection\n'
        "re": r"(?P<date>\d{4}\/\d{2}\/\d{2} \d{2}:\d{2}:\d{2}) +\[(?P<type>info|debug|error|fail|warning)\] +(?P<message>.*)",
        "groups": 3,
        "date_fmt": "%Y/%m/%d %H:%M:%S"
    },
    {
        # (nova-api-osapi-....) YYYY-MM-DD hh:mm:ss.nnnnnn
        # '2022-04-01 23:08:11.806062 capabilities. Old policies are deprecated and silently going to be ignored'
        "re": r"(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{6}) +(?P<message>.*)",
        "groups": 2,
        "date_fmt": "%Y-%m-%d %H:%M:%S.%f"
    },
    {
        # (nova-api-metadata..) nnn.nnn.nnn.nnn - - [DD/MMM/YYYY:hh:mm:ss +nnnn]
        # '172.16.35.67 - - [01/Apr/2022:22:23:14 +0000] "GET / HTTP/1.1" 200 98 1345 "-" "kube-probe/1.20+"'
        "re": r"(?P<src_ip>\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3}).+-.+-.+\[(?P<date>\d{1,2}\/\S{3}\/\d{4}:\d{2}:\d{2}:\d{2}.\+\d{4})\] +(?P<message>.*)",
        "groups": 3,
        "date_fmt": "%d/%b/%Y:%H:%M:%S %z"
    },
    {
        # mysqld exporter
        # time="2022-06-15T16:16:36Z" level=info msg="Starting mysqld_exporter (version=0.11.0, branch=HEAD, revision=5d7179615695a61ecc3b5bf90a2a7c76a9592cdd)" source="mysqld_exporter.go:206"
        "re": r"time\=\"(?P<date>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})Z\" level\=(?P<type>info|debug|error|fail|warning|warn) msg\=\"(?P<message>.*)\" source\=\"(?P<source>.+)\"",
        "groups": 4,
        "date_fmt": "%Y-%m-%dT%H:%M:%S"
    },
    {
        # metrics
        # 2022-06-24 20:55:19.752754+00:00 [info] <0.716.0> Setting segment_entry_count for vhost 'barbican' with 0 queues to '2048'
        "re": r"(?P<date>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}.\d{6})\+00:00 \[(?P<type>info|debug|error|fail|warning|warn)\] (?P<message>.*)",
        "groups": 3,
        "date_fmt": "%Y-%m-%d %H:%M:%S.%f"
    },
    {
        # openvswitch
        # 2022-06-27T23:12:52Z|25993|reconnect|WARN|unix#89422: connection dropped (Connection reset by peer)
        # 2022-06-27T21:31:11Z|08582|connmgr|INFO|br-tun<->tcp:127.0.0.1:6633: 6 flow_mods in the 3 s starting 10 s ago (6 adds)
        "re": r"(?P<date>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})Z\|(?P<pid>\d{1,6})\|(?P<action>.+)\|(?P<type>INFO|DEBUG|ERROR|ERR|FAIL|WARNING|WARN)\|(?P<message>.*)",
        "groups": 5,
        "date_fmt": "%Y-%m-%dT%H:%M:%S"
    }
]


def _re_groups_as_dict(compiled, re_match):
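    # Build a {group_name: value} dict from a single findall() match tuple,
    # using the group indexes of the compiled pattern to pick the right
    # tuple element for each named group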
    _d = {}
    for _k in compiled.groupindex.keys():
        _d[_k] = re_match[compiled.groupindex[_k]-1]
    return _d


class MosLogger(object):
    def __init__(
        self,
        config
    ):
        self.env_config = config
        return


class KubeMosLogger(MosLogger):
    def __init__(self, config):
        self.master = KubeNodes(config)
        super(KubeMosLogger, self).__init__(config)
        # Prepare state and compiled regexes
        self.logs = {}
        self.merged_logs = {}
        self.kaas_ts_regex = re.compile(kaas_timestamp_re["re"])
        self.item_regex = []
        self.dumppath = config.dumppath
        self.tail_lines = config.tail_lines
        for regex in log_item_re:
            self.item_regex.append(
                {
                    "compiled": re.compile(regex["re"]),
                    "date_fmt": regex["date_fmt"],
                    "groups": regex["groups"]
                }
            )

    def _keywords(self, tstring, keywords):
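        # Return a list of booleans, one per keyword, telling whether that
        # keyword is found in tstring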
        return [k in tstring for k in keywords]

    def _safe_parse_date(self, str_date, fmt, note="-"):
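        # Parse str_date with the given format; on failure log a warning
        # (tagged with `note`) and return -1 instead of raising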
        try:
            _t = datetime.strptime(str_date, fmt)
        except ValueError:
            logger_cli.warning(
                "WARNING ({}): Can't parse date '{}'"
                " using '{}'".format(
                    note,
                    str_date,
                    fmt
                )
            )
            _t = -1
        return _t

    def prepare_pods(
        self,
        ns_list,
        kw_list,
        inclusive_filter=True,
        exclude_kw=[]
    ):
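        # Build the list of [namespace, pod_name, container_name] items to
        # collect logs from. Namespaces are limited to ns_list (empty means
        # all), pods or containers matching exclude_kw are skipped, and
        # kw_list is matched against pod names: any match is enough when
        # inclusive_filter is True, otherwise all keywords must match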
        def _list_with_containers(pod_item):
            _list = []
            for c in pod_item[2]:
                _list.append([
                    pod_item[0],
                    pod_item[1],
                    c
                ])
            return _list
        logger_cli.info(
            "# Preparing pods, ns: {}; keywords: {}".format(
                ", ".join(ns_list) if ns_list else "*",
                ", ".join(kw_list) if kw_list else "*"
            )
        )
        # [ns, pod_name]
        _target_pods = []
        # validate ns
        _all_namespaces = self.master.list_namespaces()
        for _ns in _all_namespaces:
            if _ns in ns_list or not ns_list:
                _tmp_pods = []
                logger_cli.info("-> processing namespace '{}'".format(_ns))
                # list pods using mask
                logger_cli.debug("... getting pods list")
                _pods = self.master.list_pod_names_with_containers(ns=_ns)
                logger_cli.debug("... {} total pods found".format(len(_pods)))
                for _pod in _pods:
                    # _pod is [_ns, _name, [containers]]
                    _kw = self._keywords(_pod[1], kw_list) \
                        if kw_list else [True]
                    if any(self._keywords(_pod[1], exclude_kw)) or \
                            any(self._keywords(_pod[2], exclude_kw)):
                        logger_cli.debug("... skipped '{}'".format(_pod[1]))
                        continue
                    elif (not inclusive_filter and all(_kw)) or \
                            (inclusive_filter and any(_kw)):
                        _cpods = _list_with_containers(_pod)
                        _tmp_pods += _cpods
                        logger_cli.debug(
                            "... added {} items for pod '{}'".format(
                                len(_cpods),
                                "/".join(_pod[:2])
                            )
                        )
                    else:
                        # logger_cli.debug("... skipped pod '{}'".format(_pod))
                        pass
                logger_cli.info(
                    "-> {} pods processed, "
                    "{} log items to be collected".format(
                        len(_pods),
                        len(_tmp_pods)
                    )
                )
                _target_pods += _tmp_pods

        logger_cli.info(
            "-> found {} log items total".format(len(_target_pods))
        )
        return _target_pods

    def _get_pod_log(self, params):
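        # Fetch the log for a single [ns, pod_name, container_name] item;
        # returns None for logs shorter than 10 characters so that empty
        # or near-empty logs are dropped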
        ns = params[0]
        name = params[1]
        container_name = params[2]
        # Get target log
        _log_data = self.master.get_logs_for_pod(
            name,
            container_name,
            ns,
            tail_lines=self.tail_lines
        )
        if len(_log_data) < 10:
            return None
        else:
            return _log_data

    def collect_logs(self, pods_list):
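        # Collect raw logs for all prepared pod/container items using a
        # thread pool and store them in self.logs keyed by "ns/pod:container"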

        # Prepare list of pods to collect
        # cmd = """
        #   kubectl get pods -A -o=jsonpath='{range .items[*]}
        #   {.metadata.namespace}{"/"}
        #   {.metadata.name}{"\n"}{range .spec.containers[*]} {.name}{"\n"}
        #   {end}'
        # """

        logger_cli.info(
            "# Collecting logs using {} threads".format(
                self.env_config.sage_threads
            )
        )
        # Do collect using pooled threads
        pool = Pool(self.env_config.sage_threads)
        _params = []
        # Prepare params for getting logs
        for _ns, _pod_name, _cont_name in pods_list:
            _params.append([_ns, _pod_name, _cont_name])
        # Init progress bar
        total_log_items = len(_params)
        log_item_index = 0
        _progress = Progress(total_log_items)
        # Start pooled processing
        results = pool.imap(self._get_pod_log, _params)
        # Catch and parse results
        while True:
            # build the namepath for the current item up front so it is
            # available for the timeout warning below
            if log_item_index < total_log_items:
                _namepath = "{}/{}:{}".format(
                    _params[log_item_index][0],
                    _params[log_item_index][1],
                    _params[log_item_index][2]
                )
            try:
                # use timeout as some of the requests can hang
                _r = results.next(timeout=10)

            except StopIteration:
                # end of pool
                break

            except TimeoutError:
                # report the pod that hung and ignore it
                _progress.clearline()
                logger_cli.warning(
                    "WARNING: Timeout waiting for log content {}".format(
                        _namepath
                    )
                )
                continue
            if _r is not None:
                _raw = _r
                _size = len(_raw)
                # Save detected data
                self.logs[_namepath] = {
                    "ns": _params[log_item_index][0],
                    "pod_name": _params[log_item_index][1],
                    "container_name": _params[log_item_index][2],
                    "raw": _raw
                }

                # print progress
                _progress.write_progress(
                    log_item_index+1,
                    note="'{}': {} chars".format(_namepath, _size)
                )

            # track next param set
            log_item_index += 1

        _progress.end()
        pool.close()
        pool.join()

        # debug
        # with open("logs_debug.json", "w+") as wf:
        #     wf.write(json.dumps(self.logs))
        # debug
        return

    def _parse_kaas_timestamps(self):
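        # Split every collected raw log into per-line items using the
        # leading kaas timestamp regex and parse that timestamp; it is
        # later used as a fallback when a line has no parseable own date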
        # shortcut function to get array aligned index
        def _get_group(match, key):
            return match[self.kaas_ts_regex.groupindex[key]-1]

        logger_cli.info("-> Parsing kaas timestamps")
        # iterate logs
        _counter = 0
        _pbar = Progress(len(self.logs))
        for _namepath, _item in self.logs.items():
            # next progress bar item
            _counter += 1
            _pbar.write_progress(_counter, note=_namepath)
            # Get all lines from log matched
            _matches = self.kaas_ts_regex.findall(_item.pop("raw"))
            # iterate matches and parse timestamp
            _log = []
            for _match in _matches:
                # new log item
                _log_line = _re_groups_as_dict(self.kaas_ts_regex, _match)
                # parse ts from kaas
                _pts = self._safe_parse_date(
                    _log_line["kaas_date"],
                    kaas_timestamp_re["date_fmt"],
                    note=_namepath
                )
                _log_line["kaas_ts"] = _pts

                # save log item
                _log.append(_log_line)

            # save message and kaas_ts
            _item["total_lines"] = len(_matches)
            _item["log"] = _log

        _pbar.end()
        return

    @staticmethod
    def _get_matched(regex, line):
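        # Run the compiled pattern against a line and return the findall()
        # result only when the number of captured groups matches the
        # pattern's named group count (and is at least 2); otherwise return
        # an empty list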
        # Check if regex has matching groups number in last line
        _c = regex["compiled"]
        _m = _c.findall(line)
        # calculate groups if there is a match
        _group_count = len(_m[0]) if len(_m) > 0 else 0
        # Check that group count is at least 2
        # and that the matched number of groups is the expected one
        if _group_count > 1 and _group_count == len(_c.groupindex):
            return _m
        else:
            return []

    def _parse_log_item(self, log_item):
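        # Parse every line of a single log item: try the regex that matched
        # the previous line first, fall back to probing all known patterns,
        # merge the named groups into the line dict and parse its date;
        # lines that match nothing keep their raw message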
        def _detect_re():
            _re = None
            for regex in self.item_regex:
                _m = self._get_matched(regex, _message)
                if _m:
                    _re = regex
                    break
            return _re, _m

        # parse whole log using detected pattern
        l_parsed = 0
        l_not_parsed = 0
        l_skipped = 0
        _index = 0
        _li = log_item["log"]
        _re = None
        while _index < log_item["total_lines"]:
            # pop message as there might be a similar group name present
            _message = _li[_index].pop("message")
            # Parse line
            _m = []
            # Try last regex for this item
            if _re is not None:
                _m = self._get_matched(_re, _message)
            # if not matched
            if not _m:
                # Try every regex to match line format
                # by counting groups detected
                _re, _m = _detect_re()
            if len(_m) == 1:
                # get matches with group names as a dict
                _new_line_items = \
                    _re_groups_as_dict(_re["compiled"], _m[0])
                # update original dict
                _li[_index].update(_new_line_items)
                # Parse date
                _pts = self._safe_parse_date(
                    _new_line_items["date"],
                    _re["date_fmt"]
                )
                _li[_index]["ts"] = _pts
                l_parsed += 1
            elif len(_m) == 0:
                # put back message that failed to parse
                _li[_index]["message"] = _message
                l_not_parsed += 1
            else:
                # Should never happen
                logger_cli.warning(
                    "WARNING: Skipping ambiguous log message: "
                    "'{}'".format(_message)
                )
                l_skipped += 1
            # next line
            _index += 1
        log_item["stats"] = {
            "parsed": l_parsed,
            "not_parsed": l_not_parsed,
            "skipped": l_skipped
        }

    def parse_logs(self):
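        # Parse all collected logs: extract kaas timestamps as a fallback,
        # then parse each line with the per-service patterns; lines that
        # could not be parsed are optionally dumped to self.dumppath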

        # debug: load precreated logs
        # _ll = {}
        # with open("logs_debug.json", "r+") as rf:
        #     _ll = json.loads(rf.read())
        # if _ll:
        #     self.logs = _ll
        # debug: end

        # Get kaas ts as a plan B if the log time is either not present
        # or not parsed
        self._parse_kaas_timestamps()

        logger_cli.info("-> Parsing logs")
        _p = Progress(len(self.logs))
        idx = 1
        totalParsed = 0
        totalNotParsed = 0
        # iterate raw logs and try to parse actual pod timing
        for _namepath, _log in self.logs.items():
            # Update progress bar
            _p.write_progress(
                idx,
                note="parsed: {}, not parsed: {}, => {}".format(
                    totalParsed,
                    totalNotParsed,
                    _namepath
                )
            )
            # Parse log
            self._parse_log_item(_log)
            if self.dumppath != "null":
                for line in _log["log"]:
                    if "date" not in line.keys():
                        # Line was not parsed; dump it with its
                        # kaas timestamp
                        _filename = os.path.join(
                            self.dumppath,
                            _log["pod_name"]+"-"+_log["container_name"]+".log"
                        )
                        with open(_filename, "a") as rawlogfile:
                            rawlogfile.write(
                                "<KTS>{} <M>{}\n".format(
                                    line["kaas_date"],
                                    line["message"]
                                )
                            )
            # Stats
            totalParsed += _log["stats"]["parsed"]
            totalNotParsed += _log["stats"]["not_parsed"]
            # Update progress bar
            # _p.write_progress(
            #     idx,
            #     note="parsed: {}, not parsed: {}, => {}".format(
            #         totalParsed,
            #         totalNotParsed,
            #         _namepath
            #     )
            # )
            idx += 1
        _p.end()

    def merge_logs(self):
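        # Flatten all parsed logs into a single dict keyed by POSIX
        # timestamp, preferring a line's own parsed time over the
        # kaas timestamp fallback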
        logger_cli.info("# Merging logs")
        _merged = {}
        for _pod, _logs in self.logs.items():
            for _li in _logs["log"]:
                # Prepare log entry
                _li["namepath"] = _pod

                # check if timestamp is detected
                if "ts" not in _li:
                    # Use kaas_ts as a timestamp
                    _timestamp = _li.pop("kaas_ts")
                else:
                    # get parsed timestamp
                    _timestamp = _li.pop("ts")

                # skip lines whose timestamp could not be parsed at all
                # (_safe_parse_date returns -1 in that case)
                if not isinstance(_timestamp, datetime):
                    continue

                # save items using timestamps
                _merged[float(_timestamp.timestamp())] = _li
        self.merged_logs = _merged
        return

    def save_logs(self, filename):
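        # Write the merged log lines to `filename`, sorted by timestamp,
        # one line per entry: "<namepath> <timestamp>: key=value ..."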
        logger_cli.info("# Writing output file: '{}'".format(filename))
        with open(filename, 'w+') as ff:
            _log_iter = sorted(
                self.merged_logs.items(), key=lambda item: item[0]
            )
            for k, v in _log_iter:
                ff.write(
                    "{} {}: {}\n".format(
                        v.pop("namepath"),
                        datetime.fromtimestamp(k).strftime(_datetime_fmt),
                        " ".join(
                            ["{}={}".format(_k, _v) for _k, _v in v.items()]
                        )
                    )
                )
        return