Add log collector module
New:
- [Done] multi-namespace selector
- [Done] keyword-based pod selector
- [Done] per-pod log syntax detection and parsing
- [Deferred] in-place filtering for shorter logs
- [Done] timestamp detection for individual logs
- [Done] Unix-time-based timestamp sorting (see the sketch after this list)
- [Done] single-file log output using a common format
- [Done] collect all log types from all MOS namespaces and pods
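
A minimal sketch of the timestamp detection and Unix-time sort listed
above (the regex, helper names, and sample lines are hypothetical; the
real module detects several log syntaxes):

    import re
    from datetime import datetime, timezone

    _TS = re.compile(r"^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})")

    def to_unix(line):
        # return a Unix timestamp for a detected prefix, or None
        m = _TS.match(line)
        if not m:
            return None
        dt = datetime.strptime(m.group(1), "%Y-%m-%dT%H:%M:%S")
        return dt.replace(tzinfo=timezone.utc).timestamp()

    # entries from different pods merge into one chronological stream
    pod_a = ["2023-01-01T00:00:02 a-second", "2023-01-01T00:00:00 a-first"]
    pod_b = ["2023-01-01T00:00:01 b-between"]
    entries = [(to_unix(line), line) for line in pod_a + pod_b]
    merged = [line for _, line in sorted(entries)]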
Update:
- resource preparation can be skipped per module
- log collection now runs in multiple threads
- new setting: LOG_COLLECT_THREADS (see the sketch below)
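
A hedged sketch of how the new setting could drive the pooled log
collection (the variable name comes from this changelog; the default
value, the worker, and the config plumbing are assumptions not shown
in this diff):

    import os
    from multiprocessing.dummy import Pool  # thread-based pool

    def fetch_pod_logs(pod):
        # hypothetical worker; the real code calls the kube API
        return pod, "<logs>"

    # assumption: the setting is read from the environment with a default
    threads = int(os.environ.get("LOG_COLLECT_THREADS", "5"))
    pool = Pool(threads)
    for name, logs in pool.imap_unordered(fetch_pod_logs, ["pod-a", "pod-b"]):
        pass  # each result is merged into the single-file output
    pool.close()
    pool.join()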
Fixes:
- Network MTU fix
- Faster cmd execution on a single pod
- Ceph benchmark validations
- Ceph benchmark report sorting
- Daemonset deployment now accounts for nodes in the skip list
- Network tree debugging script
- Tree depth limiter to prevent stack overflow (see the sketch after this list)
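
The depth limiter amounts to bounding recursion (sketch only; the cap
and names are hypothetical, the actual script is not part of this
diff):

    MAX_DEPTH = 100  # hypothetical cap

    def walk(node, children_of, depth=0):
        # stop descending instead of overflowing the stack
        # on deep or cyclic trees
        if depth > MAX_DEPTH:
            return
        for child in children_of(node):
            walk(child, children_of, depth + 1)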
Related-PROD: PROD-36845
Change-Id: Icf229ac62078c6418ab4dbdff12b0d27ed42af1d
diff --git a/cfg_checker/nodes.py b/cfg_checker/nodes.py
index 2e55b63..7c09690 100644
--- a/cfg_checker/nodes.py
+++ b/cfg_checker/nodes.py
@@ -509,9 +509,15 @@
self._configmap_name = self.env_config.kube_scripts_folder
# prepare needed resources
- self._check_namespace()
- self._scripts = self._check_config_map()
self.prepared_daemonsets = []
+ # Check if we need resources prepared
+ if not config.prepare_qa_resources:
+ logger_cli.debug("... skipped preparing resources")
+ self._scripts = None
+ return
+ else:
+ self._check_namespace()
+ self._scripts = self._check_config_map()
def _check_namespace(self):
# ensure namespace
@@ -1013,7 +1019,7 @@
ds.metadata.name
)
- _total = len(self.nodes)
+ _total = len(self.nodes) - len(self.skip_list)
# _scheduled = _ds.status.scheduled
# _ready = _ds.status.ready
@@ -1082,7 +1088,8 @@
def exec_cmd_on_target_pod(self, pod_name, ns, command_str):
"""
- Run script from configmap on target pod assuming it is present
+        Run a command on the target pod
+
"""
_result = self.kube.exec_on_target_pod(
command_str,
@@ -1110,7 +1117,9 @@
)
# Update results
_ds_results = {}
- for _n, _, _v in _results:
+        # only the node name and result are needed;
+        # the pod name and cmd are ignored
+ for _n, _, _v, _ in _results:
_ds_results[_n] = _v
return _ds_results
@@ -1135,14 +1144,7 @@
_pod_results[_p] = _v
return _pod_results
- def exec_cmd_on_pods(
- self,
- pod_list,
- cmd,
- _args=None,
- is_script=False,
- silent=False
- ):
+ def _pooled_exec_on_pod(self, plist, silent=False):
def _kube_exec_on_pod(plist):
return [
plist[1], # node
@@ -1154,8 +1156,47 @@
strict=True,
_request_timeout=120,
arguments=plist[5]
- )
+ ),
+ # save cmd used
+ plist[4]
]
+ # map func and cmd
+ pool = Pool(self.env_config.threads)
+ _results = []
+ self.not_responded = []
+ # create result list
+ if not silent:
+ _progress = Progress(len(plist))
+ ret = pool.imap_unordered(_kube_exec_on_pod, plist)
+
+ for ii in enumerate(ret, start=1):
+ if not ii[1][1]:
+ self.not_responded.append(ii[1][0])
+ else:
+ _results.append(ii[1])
+ if not silent:
+ _progress.write_progress(ii[0])
+
+ if not silent:
+ _progress.end()
+ pool.close()
+ pool.join()
+ logger_cli.debug(
+ "... done, {} total outputs; {} not responded".format(
+ len(_results),
+ len(self.not_responded)
+ )
+ )
+ return _results
+
+ def exec_cmd_on_pods(
+ self,
+ pod_list,
+ cmd,
+ _args=None,
+ is_script=False,
+ silent=False
+ ):
# Create map for threads: [[node_name, ns, pod_name, cmd]...]
logger_cli.debug(
@@ -1196,34 +1237,36 @@
]
)
- # map func and cmd
- pool = Pool(self.env_config.threads)
- _results = []
- self.not_responded = []
- # create result list
- if not silent:
- _progress = Progress(len(_plist))
- ret = pool.imap_unordered(_kube_exec_on_pod, _plist)
+ return self._pooled_exec_on_pod(_plist, silent=silent)
- for ii in enumerate(ret, start=1):
- if not ii[1][1]:
- self.not_responded.append(ii[1][0])
- else:
- _results.append(ii[1])
- if not silent:
- _progress.write_progress(ii[0])
-
- if not silent:
- _progress.end()
- pool.close()
- pool.join()
+ def exec_cmds_on_pod(self, pod, cmd_list):
logger_cli.debug(
- "... done, {} total outputs; {} not responded".format(
- len(_results),
- len(self.not_responded)
+        "... running {} cmds using {} threads at a time".format(
+ len(cmd_list),
+ self.env_config.threads
)
)
- return _results
+ _plist = []
+        # decide whether to wrap the command in "bash -c" (needed for pipes)
+ for item in cmd_list:
+ if '|' in item:
+ _cmd = "bash -c"
+ _arguments = item
+ else:
+ _cmd = item
+ _arguments = ""
+ _plist.append(
+ [
+ self,
+ pod.spec.node_name,
+ pod.metadata.namespace,
+ pod.metadata.name,
+ _cmd,
+ _arguments
+ ]
+ )
+
+ return self._pooled_exec_on_pod(_plist)
def delete_daemonset(self, ds):
# Try to delete daemonset
@@ -1446,5 +1489,40 @@
return None
return [[i.metadata.namespace, i.metadata.name] for i in _items.items]
- def get_logs_for_pod(self, podname, namespace):
- return self.kube.get_pod_logs(podname, namespace)
+ def list_pod_names_with_containers(self, ns="qa-space", running_only=True):
+ _result = []
+ _pods = self.kube.list_pods(ns)
+ if not running_only:
+ for i in _pods.items:
+ _result.append([
+ i.metadata.namespace,
+ i.metadata.name,
+ [c.name for c in i.spec.containers]
+ ])
+ else:
+ for i in _pods.items:
+ if i.status.phase == "Running":
+ _result.append([
+ i.metadata.namespace,
+ i.metadata.name,
+ [c.name for c in i.status.container_statuses
+ if c.state.running is not None]
+ ])
+ return _result
+
+ def get_logs_for_pod(self, podname, container, namespace, tail_lines):
+ try:
+ return self.kube.get_pod_logs(
+ podname,
+ container,
+ namespace,
+ tail_lines=tail_lines
+ )
+ except KubeException as e:
+ logger_cli.warning(
+ "WARNING: Log retrieval failed: '{}'".format(e.message)
+ )
+ return ""
+
+ def list_namespaces(self):
+ return [i.metadata.name for i in self.kube.list_namespaces().items]
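
For reference, a usage sketch chaining the three helpers added above
(`checker` is a hypothetical nodes-manager instance; only the method
names and signatures come from this patch):

    # hypothetical caller tying the new log-collection API together
    for ns in checker.list_namespaces():
        for pod_ns, pod_name, containers in \
                checker.list_pod_names_with_containers(ns=ns):
            for container in containers:
                logs = checker.get_logs_for_pod(
                    pod_name,
                    container,
                    pod_ns,
                    tail_lines=50000  # hypothetical limit
                )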