Log collector module

New:
  - [Done] multiple namespace selector
  - [Done] keyword-based pod selector
  - [Done] per-pod logs syntax detection and parsing
  - [Deferred] in-place filtering for shorter logs
  - [Done] individual logs timestamp detection
  - [Done] Unix-time-based timestamp sorting
  - [Done] Single file logs output using common format
  - [Done] add all log types from all MOS namespaces and pods

Update:
  - resource preparation can be skipped per module
  - updated log collection using multiple threads
  - new setting LOG_COLLECT_THREADS

Fixes:
  - Network MTU fix
  - Faster cmd execution on single pod
  - Ceph benchmark validations
  - Ceph benchmark report sorting
  - Daemonset deployment with nodes skipped
  - Network tree debugging script
  - Tree depth limiter, i.e. stackoverflow prevention

  Related-PROD: PROD-36845

Change-Id: Icf229ac62078c6418ab4dbdff12b0d27ed42af1d
diff --git a/cfg_checker/modules/ceph/__init__.py b/cfg_checker/modules/ceph/__init__.py
index 5c9357b..31c6b7a 100644
--- a/cfg_checker/modules/ceph/__init__.py
+++ b/cfg_checker/modules/ceph/__init__.py
@@ -1,9 +1,13 @@
 #    Author: Alex Savatieiev (osavatieiev@mirantis.com; a.savex@gmail.com)
 #    Copyright 2019-2022 Mirantis, Inc.
+import os
+
 from cfg_checker.agent.fio_runner import get_fio_options
-from cfg_checker.agent.fio_runner import seq_modes, mix_modes
+from cfg_checker.agent.fio_runner import seq_modes, mix_modes, rand_modes
 from cfg_checker.common import logger_cli
+from cfg_checker.common.other import utils
 from cfg_checker.common.settings import ENV_TYPE_KUBE
+from cfg_checker.common.exception import CheckerException
 from cfg_checker.helpers import args_utils
 from cfg_checker.modules.ceph import info, bench
 
@@ -33,6 +37,32 @@
 #     else:
 #         return _class
 
+
+def _validate_option_type(value, type_list):
+    _s, _t = utils.split_option_type(value)
+    if _t not in type_list:
+        raise CheckerException(
+            "Invalid option type '{}'. Expected types: {}".format(
+                value,
+                ", ".join(type_list)
+            )
+        )
+    else:
+        return
+
+
+def _validate_option(value, type_list):
+    if value not in type_list:
+        raise CheckerException(
+            "Invalid option '{}'. Expected one of: {}".format(
+                value,
+                ", ".join(type_list)
+            )
+        )
+    else:
+        return
+
+
 def _get_param_and_log(arg, param_str):
     _value = args_utils.get_arg(arg, param_str)
     logger_cli.info("    {}={}".format(param_str, _value))
@@ -242,6 +272,10 @@
     # dump results options
     _dump_path = args_utils.get_arg(args, "dump_path")
     if _dump_path:
+        if not os.path.exists(_dump_path):
+            raise CheckerException(
+                "ERROR: Dump path invalid: '{}'".format(_dump_path)
+            )
         logger_cli.info("# Results will be dumped to '{}'".format(_dump_path))
         config.bench_results_dump_path = _dump_path
     else:
@@ -345,6 +379,38 @@
     # init the Bench class
     ceph_bench = bench.KubeCephBench(config)
     ceph_bench.set_ceph_info_class(ceph_info)
+
+    # Validate options
+    logger_cli.info("-> Validating options")
+    # size
+    _validate_option_type(_opts["size"], ["G", "M"])
+    _validate_option_type(_opts["ramp_time"], ["s", "m"])
+    _validate_option_type(_opts["runtime"], ["s", "m"])
+    _modes = seq_modes + mix_modes + rand_modes
+    _validate_option(_opts["readwrite"], _modes)
+
+    if _task_file:
+        _s, _ = utils.split_option_type(_opts["size"])
+        for idx in range(len(ceph_bench.tasks)):
+            # size
+            _ts, _ = utils.split_option_type(ceph_bench.tasks[idx]["size"])
+            if _s < _ts:
+                logger_cli.error(
+                    "ERROR: Task #{} file size is to big:"
+                    " {} (volume) < {} (testfile)".format(
+                        idx,
+                        _opts["size"],
+                        ceph_bench.tasks[idx]["size"]
+                    )
+                )
+            # other
+            _validate_option(ceph_bench.tasks[idx]["readwrite"], _modes)
+        # Print defaults
+        logger_cli.debug("... default/selected options for fio:")
+        for _k in _opts.keys():
+            # TODO: Update options for single run
+            logger_cli.debug("    {} = {}".format(_k, _opts[_k]))
+
     # Preload previous results for this name
     ceph_bench.preload_results()
     # Do the testrun
diff --git a/cfg_checker/modules/ceph/bench.py b/cfg_checker/modules/ceph/bench.py
index f5af704..b79007c 100644
--- a/cfg_checker/modules/ceph/bench.py
+++ b/cfg_checker/modules/ceph/bench.py
@@ -10,6 +10,7 @@
 from cfg_checker.common import logger_cli
 from cfg_checker.common.decorators import retry
 from cfg_checker.common.file_utils import write_str_to_file
+from cfg_checker.common.other import utils
 from cfg_checker.helpers.console_utils import Progress
 from cfg_checker.helpers.console_utils import cl_typewriter
 from cfg_checker.reports import reporter
@@ -45,19 +46,6 @@
     return {}
 
 
-def _split_vol_size(size):
-    # I know, but it is faster then regex
-    _numbers = [48, 49, 50, 51, 52, 53, 54, 55, 56, 57]
-    _s_int = "0"
-    _s_type = ""
-    for ch in size:
-        if ord(ch) in _numbers:
-            _s_int += ch
-        else:
-            _s_type += ch
-    return int(_s_int), _s_type
-
-
 class CephBench(object):
     _agent_template = "cfgagent-template.yaml"
 
@@ -165,7 +153,7 @@
         logger_cli.info("# Preparing {} agents".format(self.agent_count))
         # Increase volume size a bit, so datafile fits
         _quanitizer = 1.3
-        _v_size, _vol_size_units = _split_vol_size(options['size'])
+        _v_size, _vol_size_units = utils.split_option_type(options['size'])
         _v_size = round(_v_size * _quanitizer)
         _vol_size = str(_v_size) + _vol_size_units + "i"
         logger_cli.info(
diff --git a/cfg_checker/modules/ceph/info.py b/cfg_checker/modules/ceph/info.py
index 2c62018..db3dd75 100644
--- a/cfg_checker/modules/ceph/info.py
+++ b/cfg_checker/modules/ceph/info.py
@@ -313,49 +313,58 @@
         self._safe_tools_cmd("rm -f " + _tar_path)
         return _json
 
-    def _safe_get_cmd_output_as_json(self, cmd, zipped=False):
-        if zipped:
-            _buf = self._safe_tools_cmd_zipped_output(cmd)
-        else:
-            _buf = self._safe_tools_cmd(cmd)
+    @staticmethod
+    def _as_json(buf):
         try:
-            return json.loads(_buf)
+            return json.loads(buf)
         except ValueError as e:
             _out = ""
-            if len(_buf) > 512:
-                _out = _buf[:512]
+            if len(buf) > 512:
+                _out = buf[:512]
                 _out += "..."
             else:
-                _out = _buf
+                _out = buf
             logger_cli.error(
                 "\nERROR: failed to parse json: '{}'. Data: '{}'".format(
                     e,
                     _out
                 )
             )
-            return _buf
+            return buf
+
+    def _safe_get_cmd_output_as_json(self, cmd, zipped=False):
+        if zipped:
+            _buf = self._safe_tools_cmd_zipped_output(cmd)
+        else:
+            _buf = self._safe_tools_cmd(cmd)
+        return self._as_json(_buf)
 
     def _get_tools_pod_name(self):
         # get ceph pod
-        _names = self.master.kube.get_pod_names_by_partial_name(
+        _pods = self.master.kube.get_pods_by_partial_name(
             self.ceph_app_label,
             self.ceph_ns
         )
-        if not _names:
+        # _names = self.master.kube.get_pod_names_by_partial_name(
+        #     self.ceph_app_label,
+        #     self.ceph_ns
+        # )
+        if not _pods:
             raise KubeException(
                 "Failed to find pod using '{}'".format(self.ceph_app_label)
             )
-        elif len(_names) > 1:
+        elif len(_pods) > 1:
             logger_cli.warning(
                 "WARNING: Environment has more than one pod "
                 "with '{}' app: {}".format(
                     self.ceph_app_label,
-                    ", ".join(_names)
+                    ", ".join([p.metadata.name for p in _pods])
                 )
             )
         else:
-            logger_cli.debug("... found '{}'".format(_names[0]))
-        return _names[0]
+            logger_cli.debug("... found '{}'".format(_pods[0].metadata.name))
+        self.ceph_pod = _pods[0]
+        return _pods[0].metadata.name
 
     def _add_ceph_info_item(self, key, title, data, filename=None):
         # handle data
@@ -572,8 +581,7 @@
         _health_metrics = {}
         _devices = _c("ceph device ls")
         _devices = _devices.splitlines()
-        _progress = Progress(len(_devices)-1)
-        _index = 1
+        cmd_list = []
         for device in _devices:
             _t = device.split()
             _dev = _t[0]
@@ -582,14 +590,31 @@
 
             if _dev == "DEVICE":
                 continue
-            _metric = _cj("ceph device get-health-metrics {}".format(_dev))
+            # _metric = _cj("ceph device get-health-metrics {}".format(_dev))
+            _cmd = "ceph device get-health-metrics {}".format(_dev)
+            cmd_list.append(_cmd)
             _dev_name = "{}_{}".format(_osd, _dev)
-            _health_metrics[_dev_name] = _metric
+            _health_metrics[_dev_name] = {}
             _health_metrics[_dev_name]['node_name'] = _node
             _health_metrics[_dev_name]['osd_name'] = _osd
-            _progress.write_progress(_index, note=_dev_name)
-            _index += 1
-        _progress.end()
+            _health_metrics[_dev_name]['cmd'] = _cmd
+
+        results = self.master.exec_cmds_on_pod(
+            self.ceph_pod,
+            cmd_list
+        )
+
+        logger_cli.info("-> Processing results")
+        for _r in results:
+            _cmd = _r[3]
+            _j = self._as_json(_r[2])
+            for _dev_name in _health_metrics.keys():
+                if "cmd" in _health_metrics[_dev_name] and \
+                  _health_metrics[_dev_name]["cmd"] == _cmd:
+                    _health_metrics[_dev_name].update(_j)
+                    _health_metrics[_dev_name].pop("cmd")
+                    break
+
         self._add_ceph_info_item(
             "ceph_health",
             "Ceph Health Metrics",
@@ -633,21 +658,29 @@
         logger_cli.info(
             "-> Gathering OSD configuration ({})".format(_total_osd)
         )
-        # Shortcuts
-        # _c = self._safe_tools_cmd
-        _cj = self._safe_get_cmd_output_as_json
-        _progress = Progress(_total_osd)
-        _idx = 1
-        _cfgs = {}
+        cmds = {}
+        cmd_list = []
         for _osd in self.ceph_info["ceph_osd_df"]["data"]["nodes"]:
-            _progress.write_progress(_idx, note=_osd["name"])
-            _cfgs[_osd["name"]] = _cj(
-                "ceph config show-with-defaults -f json {}".format(
-                    _osd["name"]
-                )
+            _cmd = "ceph config show-with-defaults -f json {}".format(
+                _osd["name"]
             )
-            _idx += 1
-        _progress.end()
+            cmd_list.append(_cmd)
+            cmds[_osd["name"]] = _cmd
+
+        results = self.master.exec_cmds_on_pod(
+            self.ceph_pod,
+            cmd_list
+        )
+
+        logger_cli.info("-> Processing results")
+        _cfgs = {}
+        for _r in results:
+            _cmd = _r[3]
+            _j = self._as_json(_r[2])
+            for _osd_name in cmds.keys():
+                if cmds[_osd_name] == _cmd:
+                    _cfgs[_osd_name] = _j
+                    break
 
         # Process configs
         _base = {}