Network check for MCC/MOS

- Network info gathering using a DaemonSet with 'hostNetwork=True'
- DaemonSet handling routines
- Mapper and Checker refactoring for Kube

Fixes
- SSH timeout handling via env vars:
  MCP_SSH_TIMEOUT when connecting
  MCP_SCRIPT_RUN_TIMEOUT when running commands
- Progress class supports 0 as an index
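
A minimal sketch of how the env vars are assumed to map onto the config
attributes used below (attribute names and the 300s default are illustrative;
the 15s default matches _get_ssh_shell()):

    import os

    # connect timeout, used when opening SSH shells / port forwards
    ssh_connect_timeout = int(os.environ.get('MCP_SSH_TIMEOUT', 15))
    # run timeout, used when executing commands and scripts over SSH
    script_execution_timeout = int(os.environ.get('MCP_SCRIPT_RUN_TIMEOUT', 300))
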
Related-PROD: PROD-36575
Change-Id: Ie03a9051007eeb788901acae3696ea2bfdfe33e2
diff --git a/cfg_checker/nodes.py b/cfg_checker/nodes.py
index f0b7b38..0559132 100644
--- a/cfg_checker/nodes.py
+++ b/cfg_checker/nodes.py
@@ -1,7 +1,9 @@
import json
import os
+import yaml
from copy import deepcopy
from multiprocessing.dummy import Pool
+from time import sleep
from cfg_checker.clients import get_salt_remote, get_kube_remote
from cfg_checker.common.const import all_salt_roles_map, all_kube_roles_map
@@ -455,7 +457,7 @@
self.not_responded = [_n for _n in _r.keys() if not _r[_n]]
return _r
- def execute_script_on_active_nodes(self, script_filename, args=[]):
+ def execute_script_on_active_nodes(self, script_filename, args=None):
# Prepare path
_target_path = os.path.join(
'/root',
@@ -466,7 +468,7 @@
# execute script
logger_cli.debug("... running script")
# handle results for each node
- _script_arguments = " ".join(args) if args else ""
+ _script_arguments = args if args else ""
self.not_responded = []
_r = self.salt.cmd(
self.active_nodes_compound,
@@ -501,6 +503,34 @@
# simple salt rest client
self.kube = get_kube_remote(self.env_config)
self.env_type = ENV_TYPE_KUBE
+ self._namespace = "qa-space"
+ self._configmap_name = self.env_config.kube_scripts_folder
+
+ # prepare needed resources
+ self._check_namespace()
+ self._scripts = self._check_config_map()
+
+ def _check_namespace(self):
+ # ensure namespace
+ logger_cli.debug(
+ "... checking namespace '{}'".format(self._namespace)
+ )
+ if not self.kube.ensure_namespace(self._namespace):
+ raise KubeException(
+ "Failed to manage namespace '{}'".format(self._namespace)
+ )
+
+ def _check_config_map(self):
+ # ensure config map exists
+ logger_cli.debug(
+ "... checking config map '{}'".format(self._configmap_name)
+ )
+ _source = os.path.join(pkg_dir, 'scripts')
+ return self.kube.create_config_map(
+ self._namespace,
+ self._configmap_name,
+ _source
+ )
def gather_node_info(self, skip_list, skip_list_file):
# Gather nodes info and query pod lists for each node
@@ -520,7 +550,6 @@
# iterate through all accepted nodes and create a dict for it
self.nodes = {}
self.skip_list = []
- # _domains = set()
for _name in _node_names:
if _name in _skipped_nodes:
_status = NODE_SKIP
@@ -552,7 +581,6 @@
_nodes[_name]['addresses']['hostname']['address']
self.nodes[_name]['internalip'] = \
_nodes[_name]['addresses']['internalip']['address']
- # _domains.add(_name.split(".", 1)[1])
self.nodes[_name]['node_group'] = None
self.nodes[_name]['labels'] = _labels
self.nodes[_name]['roles'] = _roles
@@ -579,7 +607,7 @@
# "Multiple domains detected: {}".format(",".join(_domains))
# )
# else:
- # self.domain = _domains[0]
+ self.domain = "no.domain.in.kube.yet"
logger_cli.info(
"-> {} nodes collected: {} - active, {} - not active".format(
len(self.nodes),
@@ -624,19 +652,20 @@
return
@staticmethod
- def _get_ssh_shell(_h, _u, _k, _p, _q, _pipe):
+ def _get_ssh_shell(_h, _u, _k, _p, _q, _pipe, timeout=15):
_ssh = SshShell(
_h,
user=_u,
keypath=_k,
port=_p,
silent=_q,
- piped=_pipe
+ piped=_pipe,
+ timeout=timeout
)
return _ssh.connect()
@staticmethod
- def _do_ssh_cmd(_cmd, _h, _u, _k, _p, _q, _pipe):
+ def _do_ssh_cmd(_cmd, _h, _u, _k, _p, _q, _pipe, timeout=None):
with SshShell(
_h,
user=_u,
@@ -645,7 +674,10 @@
silent=_q,
piped=_pipe
) as ssh:
- _r = ssh.do(_cmd)
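+ # fall back to SshShell.do() default timeout when none is given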
+ if timeout is None:
+ _r = ssh.do(_cmd)
+ else:
+ _r = ssh.do(_cmd, timeout=timeout)
logger_cli.debug("'{}'".format(_r))
return _r
@@ -662,7 +694,11 @@
_h = self.nodes[node]['internalip']
_p = 22
if self.kube.is_local or self.kube.config.ssh_direct:
- return None, self._get_ssh_shell(_h, _u, _k, _p, silent, piped)
+ return None, self._get_ssh_shell(
+ _h, _u, _k, _p, silent, piped,
+ timeout=self.kube.config.ssh_connect_timeout
+ )
else:
_fh = "localhost"
_p = 10022 if not fport else fport
@@ -671,10 +707,19 @@
_h,
user=_u,
keypath=self.env_config.ssh_key,
- loc_port=_p
+ loc_port=_p,
+ timeout=self.kube.config.ssh_connect_timeout
)
_pfwd.connect()
- _ssh = self._get_ssh_shell(_fh, _u, _k, _p, silent, piped)
+ _ssh = self._get_ssh_shell(
+ _fh,
+ _u,
+ _k,
+ _p,
+ silent,
+ piped,
+ timeout=self.kube.config.ssh_connect_timeout
+ )
return _pfwd, _ssh
def execute_script_on_node(self, node, script_filename, args=[]):
@@ -725,7 +770,7 @@
return _r
- def _exec_script(self, params):
+ def _ssh_exec_script(self, params):
"""
Threadsafe method to get shell to node,
check/copy script and get results
@@ -737,6 +782,7 @@
args
]
"""
+ _timeout = self.kube.config.script_execution_timeout
_name = params[0]
_src = params[1]
_tgt = params[2]
@@ -757,7 +803,7 @@
_sh.do(_check.format(_python))
)
if not _python:
- _sh.do("apt install python3", sudo=True)
+ _sh.do("apt install python3", sudo=True, timeout=_timeout)
# check if script already there
_folder = os.path.join(
self.env_config.kube_node_homepath,
@@ -789,7 +835,8 @@
_tgt,
_args
),
- sudo=True
+ sudo=True,
+ timeout=_timeout
)
if _fwd_sh:
@@ -798,7 +845,7 @@
return [_name, _out]
- def execute_script_on_active_nodes(self, script_filename, args=[]):
+ def execute_script_on_active_nodes(self, script_filename, args=None):
# Prepare script
_source_path = os.path.join(pkg_dir, 'scripts', script_filename)
_target_path = os.path.join(
@@ -843,7 +890,7 @@
_port += 1
_progress = Progress(len(_params))
- results = pool.imap_unordered(self._exec_script, _params)
+ results = pool.imap_unordered(self._ssh_exec_script, _params)
for ii in enumerate(results, start=1):
if not ii[1][1]:
@@ -904,3 +951,188 @@
_fwd_sh.kill()
_sh.kill()
return _target_path
+
+ def prepare_daemonset(self, template_filename, config_map=None):
+ # load template
+ _yaml_file = os.path.join(pkg_dir, 'templates', template_filename)
+ logger_cli.debug("... loading template '{}'".format(_yaml_file))
+ _ds = {}
+ with open(_yaml_file) as dsFile:
+ _ds = yaml.load(dsFile, Loader=yaml.SafeLoader)
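+ # the template is expected to set 'hostNetwork: true' so the checker
+ # pods observe the node's own network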
+
+ # Add scripts to pod template as volumeMounts
+ _tspec = _ds['spec']['template']['spec']
+ _tspec['containers'][0]['volumeMounts'] = [
+ {
+ "name": "scripts",
+ "mountPath": os.path.join(
+ "/",
+ self.env_config.kube_scripts_folder
+ )
+ }
+ ]
+
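+ # back the mount with the scripts config map prepared in _check_config_map()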
+ _tspec['volumes'] = [
+ {
+ "name": "scripts",
+ "configMap": {
+ "name": self._configmap_name
+ }
+ }
+ ]
+
+ # create daemonset
+ logger_cli.debug("... preparing daemonset")
+ return self.kube.prepare_daemonset_from_yaml(self._namespace, _ds)
+
+ def wait_for_daemonset(self, ds, timeout=120):
+ # iteration timeout
+ _sleep_time = 5
+ _timeout = timeout
+
+ # query daemonset and check that desired=scheduled=ready
+ _ds = self.kube.get_daemon_set_by_name(
+ ds.metadata.namespace,
+ ds.metadata.name
+ )
+
+ _total = len(self.nodes)
+ # _scheduled = _ds.status.scheduled
+ # _ready = _ds.status.ready
+
+ # Init Progress bar to show daemonset readiness
+ _progress = Progress(_total)
+ while _timeout > 0:
+ # get new status
+ _ds = self.kube.get_daemon_set_by_name(
+ ds.metadata.namespace,
+ ds.metadata.name
+ )
+ _desired = _ds.status.desired_number_scheduled
+ _scheduled = _ds.status.current_number_scheduled
+ _ready = _ds.status.number_ready
+ _updated = _ds.status.updated_number_scheduled
+ # print it
+ _progress.write_progress(
+ _ready,
+ note="desired: {}, scheduled: {}, ready: {},"
+ " up-to-date: {}".format(
+ _desired,
+ _scheduled,
+ _ready,
+ _updated
+ )
+ )
+
+ # check values and return
+ # In case of Update, also checking _updated value
+ if _ready == _updated and _ready == _total:
+ # close progress bar class
+ _progress.end()
+ logger_cli.debug("... daemonset is ready")
+ return True
+ # iterate
+ _timeout -= _sleep_time
+ # wait
+ sleep(_sleep_time)
+
+ # timed out
+ _progress.end()
+ # log it
+ logger_cli.error("Timed out waiting for Daemonset to be ready")
+ return False
+
+ def execute_script_on_daemon_set(self, ds, script_filename, args=None):
+ """
+ Query daemonset for pods and execute script on all of them
+ """
+ def _kube_exec_on_pod(plist):
+ return [
+ plist[1], # node
+ plist[3], # pod name
+ plist[0].kube.exec_on_target_pod( # pointer to function
+ plist[4], # cmd
+ plist[3], # pod name
+ plist[2], # namespace
+ strict=True,
+ _request_timeout=120,
+ )
+ ]
+
+ # get all pod names
+ logger_cli.debug("... extracting pod names from daemonset")
+ _pods = self.kube.CoreV1.list_namespaced_pod(
+ namespace=ds.metadata.namespace,
+ label_selector='name={}'.format(ds.metadata.name)
+ )
+ # Create map for threads: [[self, node_name, namespace, pod_name, cmd], ...]
+ logger_cli.debug(
+ "... running script on {} pods using {} threads at a time".format(
+ len(_pods.items),
+ self.env_config.threads
+ )
+ )
+ _plist = []
+ _arguments = args if args else ""
+ _cmd = [
+ "python3",
+ os.path.join(
+ "/",
+ self.env_config.kube_scripts_folder,
+ script_filename
+ ),
+ _arguments
+ ]
+ _cmd = " ".join(_cmd)
+ for item in _pods.items:
+ _plist.append(
+ [
+ self,
+ item.spec.node_name,
+ item.metadata.namespace,
+ item.metadata.name,
+ _cmd
+ ]
+ )
+
+ # map the exec helper over the pod list using the thread pool
+ pool = Pool(self.env_config.threads)
+ _results = {}
+ self.not_responded = []
+ # create result list
+ _progress = Progress(len(_plist))
+ ret = pool.imap_unordered(_kube_exec_on_pod, _plist)
+
+ for ii in enumerate(ret, start=1):
+ # ii[1] is [node_name, pod_name, output]; empty output means no response
+ if not ii[1][2]:
+ self.not_responded.append(ii[1][0])
+ else:
+ _results[ii[1][0]] = ii[1][2]
+ _progress.write_progress(ii[0])
+
+ _progress.end()
+ pool.close()
+ pool.join()
+ logger_cli.debug(
+ "... done, {} total outputs; {} not responded".format(
+ len(_results),
+ len(self.not_responded)
+ )
+ )
+ return _results
+
+ def delete_daemonset(self, ds):
+ # Try to delete daemonset
+ try:
+ _r = self.kube.delete_daemon_set_by_name(
+ ds.metadata.namespace,
+ ds.metadata.name
+ )
+ except Exception as e:
+ logger_cli.warning("Failed to delete daemonset '{}': {}".format(
+ ds.metadata.name,
+ e
+ ))
+ _r = None
+ return _r
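
A minimal usage sketch of the DaemonSet flow added above (the 'checker'
object, template and script names are illustrative, not part of this change):

    # assumed: 'checker' is the Kube-flavoured node manager patched above
    ds = checker.prepare_daemonset("some_ds_template.yaml")
    if checker.wait_for_daemonset(ds, timeout=120):
        results = checker.execute_script_on_daemon_set(
            ds,
            "some_script.py",
            args="--format json"  # passed through as a plain string
        )
    checker.delete_daemonset(ds)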