Network check for MCC/MOS
- Network info gathering using DaemonSet with 'hostNetwork=True'
- DaemonSet handling routines
- Mapper and Checker refactoring for Kube
Fixes
- SSH timeout handling using env vars:
  MCP_SSH_TIMEOUT when connecting
  MCP_SCRIPT_RUN_TIMEOUT when running a command
- Progress class supports 0 as an index
Related-PROD: PROD-36575
Change-Id: Ie03a9051007eeb788901acae3696ea2bfdfe33e2
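
For reference, a minimal sketch of how the new timeout env vars are consumed;
it mirrors the settings.py hunk below, and the defaults shown are the ones
introduced by this change:

    import os

    # SSH connection timeout in seconds (MCP_SSH_TIMEOUT, default 15)
    ssh_connect_timeout = int(os.environ.get('MCP_SSH_TIMEOUT', "15"))

    # script/command execution timeout in seconds
    # (MCP_SCRIPT_RUN_TIMEOUT, default 300)
    script_execution_timeout = int(
        os.environ.get('MCP_SCRIPT_RUN_TIMEOUT', "300")
    )
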
diff --git a/cfg_checker/common/kube_utils.py b/cfg_checker/common/kube_utils.py
index 315f5ee..86e59a5 100644
--- a/cfg_checker/common/kube_utils.py
+++ b/cfg_checker/common/kube_utils.py
@@ -202,6 +202,8 @@
def __init__(self, config):
super(KubeRemote, self).__init__(config)
self._coreV1 = None
+ self._appsV1 = None
+ self._podsV1 = None
@property
def CoreV1(self):
@@ -209,6 +211,18 @@
self._coreV1 = kclient.CoreV1Api(self.kApi)
return self._coreV1
+ @property
+ def AppsV1(self):
+ if not self._appsV1:
+ self._appsV1 = kclient.AppsV1Api(self.kApi)
+ return self._appsV1
+
+ @property
+ def PodsV1(self):
+ if not self._podsV1:
+ self._podsV1 = kclient.V1Pod(self.kApi)
+ return self._podsV1
+
@staticmethod
def _typed_list_to_dict(i_list):
_dict = {}
@@ -227,6 +241,14 @@
return _list
+ @staticmethod
+ def safe_get_item_by_name(api_resource, _name):
+ for item in api_resource.items:
+ if item.metadata.name == _name:
+ return item
+
+ return None
+
def get_node_info(self, http=False):
# Query API for the nodes and do some presorting
_nodes = {}
@@ -279,15 +301,14 @@
_request_timeout=120,
**kwargs
):
- logger_cli.debug(
- "... searching for pods with the name '{}'".format(pod_name)
- )
- _pods = {}
- _pods = self._coreV1.list_namespaced_pod(namespace)
- _names = self._get_listed_attrs(_pods.items, "metadata.name")
-
- _pname = ""
if not strict:
+ logger_cli.debug(
+ "... searching for pods with the name '{}'".format(pod_name)
+ )
+ _pods = {}
+ _pods = self._coreV1.list_namespaced_pod(namespace)
+ _names = self._get_listed_attrs(_pods.items, "metadata.name")
+ _pname = ""
_pnames = [n for n in _names if n.startswith(pod_name)]
if len(_pnames) > 1:
logger_cli.debug(
@@ -309,7 +330,11 @@
cmd
)
)
- _r = stream(
+ # Set preload_content to False to preserve JSON output.
+ # If not, the output gets converted to str,
+ # which turns " into '
+ # and makes json.loads(...) fail afterwards.
+ _pod_stream = stream(
self.CoreV1.connect_get_namespaced_pod_exec,
_pname,
namespace,
@@ -319,7 +344,146 @@
stdout=True,
tty=False,
_request_timeout=_request_timeout,
+ _preload_content=False,
**kwargs
)
+ # run for timeout
+ _pod_stream.run_forever(timeout=_request_timeout)
+ # read the output
+ return _pod_stream.read_stdout()
- return _r
+ def ensure_namespace(self, ns):
+ """
+ Ensure that given namespace exists
+ """
+ # list active namespaces
+ _v1NamespaceList = self.CoreV1.list_namespace()
+ _ns = self.safe_get_item_by_name(_v1NamespaceList, ns)
+
+ if _ns is None:
+ logger_cli.debug("... creating namespace '{}'".format(ns))
+ _r = self.CoreV1.create_namespace(
+ kclient.V1Namespace(
+ metadata=kclient.V1ObjectMeta(name=ns)
+ )
+ )
+ # TODO: check return on fail
+ if not _r:
+ return False
+ else:
+ logger_cli.debug("... found existing namespace '{}'".format(ns))
+
+ return True
+
+ def get_daemon_set_by_name(self, ns, name):
+ return self.safe_get_item_by_name(
+ self.AppsV1.list_namespaced_daemon_set(ns),
+ name
+ )
+
+ def create_config_map(self, ns, name, source, recreate=True):
+ """
+ Creates/Overwrites ConfigMap in working namespace
+ """
+ # Prepare source
+ logger_cli.debug(
+ "... preparing config map '{}/{}' with files from '{}'".format(
+ ns,
+ name,
+ source
+ )
+ )
+ _data = {}
+ if os.path.isfile(source):
+ # populate data with one file
+ with open(source, 'rt') as fS:
+ _data[os.path.split(source)[1]] = fS.read()
+ elif os.path.isdir(source):
+ # walk dirs and populate all 'py' files
+ for path, dirs, files in os.walk(source):
+ _e = ('.py')
+ _subfiles = (_fl for _fl in files
+ if _fl.endswith(_e) and not _fl.startswith('.'))
+ for _file in _subfiles:
+ with open(os.path.join(path, _file), 'rt') as fS:
+ _data[_file] = fS.read()
+
+ _cm = kclient.V1ConfigMap()
+ _cm.metadata = kclient.V1ObjectMeta(name=name, namespace=ns)
+ _cm.data = _data
+ logger_cli.debug(
+ "... prepared config map with {} scripts".format(len(_data))
+ )
+ # Query existing configmap, delete if needed
+ _existing_cm = self.safe_get_item_by_name(
+ self.CoreV1.list_namespaced_config_map(namespace=ns),
+ name
+ )
+ if _existing_cm is not None:
+ self.CoreV1.replace_namespaced_config_map(
+ namespace=ns,
+ name=name,
+ body=_cm
+ )
+ logger_cli.debug(
+ "... replaced existing config map '{}/{}'".format(
+ ns,
+ name
+ )
+ )
+ else:
+ # Create it
+ self.CoreV1.create_namespaced_config_map(
+ namespace=ns,
+ body=_cm
+ )
+ logger_cli.debug("... created config map '{}/{}'".format(
+ ns,
+ name
+ ))
+
+ return _data.keys()
+
+ def prepare_daemonset_from_yaml(self, ns, ds_yaml):
+ _name = ds_yaml['metadata']['name']
+ _ds = self.get_daemon_set_by_name(ns, _name)
+
+ if _ds is not None:
+ logger_cli.debug(
+ "... found existing daemonset '{}'".format(_name)
+ )
+ _r = self.AppsV1.replace_namespaced_daemon_set(
+ _ds.metadata.name,
+ _ds.metadata.namespace,
+ body=ds_yaml
+ )
+ logger_cli.debug(
+ "... replacing existing daemonset '{}'".format(_name)
+ )
+ return _r
+ else:
+ logger_cli.debug(
+ "... creating daemonset '{}'".format(_name)
+ )
+ _r = self.AppsV1.create_namespaced_daemon_set(ns, body=ds_yaml)
+ return _r
+
+ def delete_daemon_set_by_name(self, ns, name):
+ return self.AppsV1.delete_namespaced_daemon_set(name, ns)
+
+ def exec_on_all_pods(self, pods):
+ """
+ Create multiple threads to execute script on all target pods
+ """
+ # Create map for threads: [[node_name, ns, pod_name]...]
+ _pod_list = []
+ for item in pods.items:
+ _pod_list.append(
+ [
+ item.spec.node_name,
+ item.metadata.namespace,
+ item.metadata.name
+ ]
+ )
+
+ # map func and cmd
+
+ # create result list
+
+ return []
diff --git a/cfg_checker/common/settings.py b/cfg_checker/common/settings.py
index dac917e..eac81c1 100644
--- a/cfg_checker/common/settings.py
+++ b/cfg_checker/common/settings.py
@@ -194,10 +194,16 @@
self.ssh_key = os.environ.get('MCP_SSH_KEY', None)
self.ssh_user = os.environ.get('MCP_SSH_USER', None)
self.ssh_host = os.environ.get('MCP_SSH_HOST', None)
+ self.ssh_connect_timeout = int(
+ os.environ.get('MCP_SSH_TIMEOUT', "15")
+ )
self.mcp_host = os.environ.get('MCP_ENV_HOST', None)
self.salt_port = os.environ.get('MCP_SALT_PORT', '6969')
self.threads = int(os.environ.get('MCP_THREADS', "5"))
+ self.script_execution_timeout = int(
+ os.environ.get('MCP_SCRIPT_RUN_TIMEOUT', "300")
+ )
self.skip_nodes = utils.node_string_to_list(os.environ.get(
'CFG_SKIP_NODES',
diff --git a/cfg_checker/helpers/console_utils.py b/cfg_checker/helpers/console_utils.py
index 1a2c184..0cfdecf 100644
--- a/cfg_checker/helpers/console_utils.py
+++ b/cfg_checker/helpers/console_utils.py
@@ -16,8 +16,13 @@
new_size = len(note)
if self._note_size > new_size:
_suffix = ' '*(self._note_size - new_size)
- _percent = (100 * index) // self.total
- _index = (self.bar_size * index) // self.total
+ if index:
+ _percent = (100 * index) // self.total
+ _index = (self.bar_size * index) // self.total
+ else:
+ _percent = 0
+ _index = 0
+
# clear the line
sys.stdout.write('\r')
# print new progress
diff --git a/cfg_checker/modules/network/__init__.py b/cfg_checker/modules/network/__init__.py
index 005ca38..2b806d6 100644
--- a/cfg_checker/modules/network/__init__.py
+++ b/cfg_checker/modules/network/__init__.py
@@ -6,7 +6,7 @@
command_help = "Network infrastructure checks and reports"
-supported_envs = [ENV_TYPE_SALT]
+supported_envs = [ENV_TYPE_SALT, ENV_TYPE_KUBE]
def _selectClass(_env, strClassHint="checker"):
@@ -96,7 +96,11 @@
# should not print map, etc...
# Just bare summary and errors
# Check if there is supported env found
- _env = args_utils.check_supported_env(ENV_TYPE_SALT, args, config)
+ _env = args_utils.check_supported_env(
+ [ENV_TYPE_SALT, ENV_TYPE_KUBE],
+ args,
+ config
+ )
# Start command
logger_cli.info("# Network check to console")
_skip, _skip_file = args_utils.get_skip_args(args)
@@ -122,7 +126,11 @@
def do_report(args, config):
# Network Report
# Check if there is supported env found
- _env = args_utils.check_supported_env(ENV_TYPE_SALT, args, config)
+ _env = args_utils.check_supported_env(
+ [ENV_TYPE_SALT],
+ args,
+ config
+ )
# Start command
logger_cli.info("# Network report (check, node map")
@@ -146,7 +154,11 @@
def do_map(args, config):
# Network Map
# Check if there is supported env found
- _env = args_utils.check_supported_env(ENV_TYPE_SALT, args, config)
+ _env = args_utils.check_supported_env(
+ [ENV_TYPE_SALT, ENV_TYPE_KUBE],
+ args,
+ config
+ )
# Start command
logger_cli.info("# Network report")
_skip, _skip_file = args_utils.get_skip_args(args)
@@ -156,7 +168,7 @@
skip_list=_skip,
skip_list_file=_skip_file
)
- networkMap.prepare_all_maps()
+ networkMap.map_networks()
networkMap.create_map()
networkMap.print_map()
@@ -166,7 +178,11 @@
def do_list(args, config):
# Network List
# Check if there is supported env found
- _env = args_utils.check_supported_env(ENV_TYPE_SALT, args, config)
+ _env = args_utils.check_supported_env(
+ [ENV_TYPE_SALT, ENV_TYPE_KUBE],
+ args,
+ config
+ )
# Start command
_skip, _skip_file = args_utils.get_skip_args(args)
_class = _selectClass(_env, strClassHint="mapper")
@@ -175,12 +191,14 @@
skip_list=_skip,
skip_list_file=_skip_file
)
- reclass = _map.map_network(_map.RECLASS)
- runtime = _map.map_network(_map.RUNTIME)
+ logger_cli.info("# Mapping networks")
+ if _env == ENV_TYPE_SALT:
+ reclass = _map.map_network(_map.RECLASS)
+ _s = [str(_n) for _n in reclass.keys()]
+ logger_cli.info("\n# Reclass networks list")
+ logger_cli.info("\n".join(_s))
- _s = [str(_n) for _n in reclass.keys()]
- logger_cli.info("\n# Reclass networks list")
- logger_cli.info("\n".join(_s))
+ runtime = _map.map_network(_map.RUNTIME)
_s = [str(_n) for _n in runtime.keys()]
logger_cli.info("\n# Runtime networks list")
logger_cli.info("\n".join(_s))
diff --git a/cfg_checker/modules/network/checker.py b/cfg_checker/modules/network/checker.py
index c2de426..b5809f3 100644
--- a/cfg_checker/modules/network/checker.py
+++ b/cfg_checker/modules/network/checker.py
@@ -11,9 +11,7 @@
self.errors = NetworkErrors()
def check_networks(self, map=True):
- self.mapper.map_network(self.mapper.RECLASS)
- self.mapper.map_network(self.mapper.RUNTIME)
-
+ self.mapper.map_networks()
self.mapper.create_map()
if map:
self.mapper.print_map()
diff --git a/cfg_checker/modules/network/mapper.py b/cfg_checker/modules/network/mapper.py
index 2e925f2..fff6bb6 100644
--- a/cfg_checker/modules/network/mapper.py
+++ b/cfg_checker/modules/network/mapper.py
@@ -4,6 +4,8 @@
from cfg_checker.common import logger_cli
from cfg_checker.common.exception import InvalidReturnException
+from cfg_checker.common.exception import ConfigException
+from cfg_checker.common.exception import KubeException
from cfg_checker.modules.network.network_errors import NetworkErrors
from cfg_checker.nodes import SaltNodes, KubeNodes
@@ -57,11 +59,6 @@
logger_cli.debug("... init error logs folder")
self.errors = NetworkErrors()
- def prepare_all_maps(self):
- self.map_network(self.RECLASS)
- self.map_network(self.RUNTIME)
- self.map_network(self.CONFIG)
-
# adding net data to tree
def _add_data(self, _list, _n, _h, _d):
if _n not in _list:
@@ -168,23 +165,17 @@
return _confs
- def _map_runtime_networks(self):
+ def _map_runtime_networks(self, result):
# class uses nodes from self.nodes dict
_runtime = {}
- logger_cli.info("# Mapping node runtime network data")
- self.master.prepare_script_on_active_nodes("ifs_data.py")
- _result = self.master.execute_script_on_active_nodes(
- "ifs_data.py",
- args=["json"]
- )
for key in self.master.nodes.keys():
# check if we are to work with this node
if not self.master.is_node_available(key):
continue
# due to much data to be passed from master,
# it is happening in order
- if key in _result:
- _text = _result[key]
+ if key in result:
+ _text = result[key]
if '{' in _text and '}' in _text:
_text = _text[_text.find('{'):]
else:
@@ -415,6 +406,40 @@
node_data['networks'][_ifname]['lines'] = lines
return _runtime
+
+class SaltNetworkMapper(NetworkMapper):
+ def __init__(
+ self,
+ config,
+ errors_class=None,
+ skip_list=None,
+ skip_list_file=None
+ ):
+ self.master = SaltNodes(config)
+ super(SaltNetworkMapper, self).__init__(
+ config,
+ errors_class=errors_class,
+ skip_list=skip_list,
+ skip_list_file=skip_list_file
+ )
+
+ def get_script_output(self):
+ """
+ Get runtime networks by executing script on nodes
+ """
+ logger_cli.info("# Mapping node runtime network data")
+ self.master.prepare_script_on_active_nodes("ifs_data.py")
+ _result = self.master.execute_script_on_active_nodes(
+ "ifs_data.py",
+ args="json"
+ )
+
+ return _result
+
+ def map_networks(self):
+ self.map_network(self.RECLASS)
+ self.map_network(self.RUNTIME)
+
def map_network(self, source):
# maps target network using given source
_networks = None
@@ -424,7 +449,8 @@
elif source == self.CONFIG:
_networks = self._map_configured_networks()
elif source == self.RUNTIME:
- _networks = self._map_runtime_networks()
+ _r = self.get_script_output()
+ _networks = self._map_runtime_networks(_r)
self.networks[source] = _networks
return _networks
@@ -662,9 +688,6 @@
# save map
self.map = _map
- # other runtime networks found
- # docker, etc
-
return
def print_map(self):
@@ -732,23 +755,7 @@
# " {0:8} {1}".format(hostname.split('.')[0], _text)
# )
# logger_cli.info("\n")
-
-
-class SaltNetworkMapper(NetworkMapper):
- def __init__(
- self,
- config,
- errors_class=None,
- skip_list=None,
- skip_list_file=None
- ):
- self.master = SaltNodes(config)
- super(SaltNetworkMapper, self).__init__(
- config,
- errors_class=errors_class,
- skip_list=skip_list,
- skip_list_file=skip_list_file
- )
+ return
class KubeNetworkMapper(NetworkMapper):
@@ -766,3 +773,156 @@
skip_list=skip_list,
skip_list_file=skip_list_file
)
+
+ def get_script_output(self, script, args=None):
+ """
+ Get runtime network by creating DaemonSet with Host network parameter
+ """
+ # prepare daemonset
+ logger_cli.info("-> Preparing daemonset to get node info")
+ _daemonset = self.master.prepare_daemonset(
+ "daemonset_template.yaml",
+ config_map=script
+ )
+
+ # wait for the daemonset; normally it is ready in less than 60 sec,
+ # but still, let us give it 10 seconds per pod
+ _timeout = len(self.master.nodes) * 10
+ if not self.master.wait_for_daemonset(_daemonset, timeout=_timeout):
+ raise KubeException("Daemonset deployment fail")
+ logger_cli.info("-> Running script on daemonset")
+ # exec script on all pods in daemonset
+ _result = self.master.execute_script_on_daemon_set(
+ _daemonset,
+ script,
+ args=args
+ )
+
+ # delete daemonset
+ self.master.delete_daemonset(_daemonset)
+
+ return _result
+
+ def map_networks(self):
+ self.map_network(self.RUNTIME)
+
+ def map_network(self, source):
+ # maps target network using given source
+ _networks = None
+
+ if source == self.RUNTIME:
+ logger_cli.info("# Mapping node runtime network data")
+ _r = self.get_script_output("ifs_data.py", args="json")
+ _networks = self._map_runtime_networks(_r)
+ else:
+ raise ConfigException(
+ "Network type not supported in 'Kube': '{}'".format(source)
+ )
+
+ self.networks[source] = _networks
+ return _networks
+
+ def create_map(self):
+ """Create all needed elements for map output
+
+ :return: none
+ """
+ _runtime = self.networks[self.RUNTIME]
+
+ # main networks, target vars
+ _map = {}
+ # No matter the proto, at least one IP will be present for the network
+ # we are interested in, since we are to make sure that the L3 level
+ # is configured according to the reclass model
+ for network in _runtime:
+ # shortcuts
+ _net = str(network)
+ _map[_net] = {}
+ # hostnames
+ names = sorted(_runtime[network].keys())
+ for hostname in names:
+ _notes = []
+ node = hostname.split('.')[0]
+ if not self.master.is_node_available(hostname, log=False):
+ logger_cli.info(
+ " {0:8} {1}".format(node, "node not available")
+ )
+ # add non-responsive node error
+ self.errors.add_error(
+ self.errors.NET_NODE_NON_RESPONSIVE,
+ host=hostname
+ )
+ _notes.append(
+ self.errors.get_error_type_text(
+ self.errors.NET_NODE_NON_RESPONSIVE
+ )
+ )
+ continue
+ # lookup interface name on node using network CIDR
+ _if_name = _runtime[network][hostname][0]["name"]
+ _raw = self.interfaces[hostname][_if_name]['runtime']
+ _if_name_suffix = ""
+ _a = _runtime[network][hostname]
+ for _host in _a:
+ for _if in _host['ifs']:
+ _ip_str = str(_if.exploded)
+
+ # Save all data
+ _values = {
+ "interface": _if_name,
+ "interface_note": _if_name_suffix,
+ "interface_map": "\n".join(_host['lines']),
+ "interface_matrix": _host['matrix'],
+ "ip_address": _ip_str,
+ "rt_mtu": _host['mtu'],
+ "status": _host['state'],
+ "raw_data": _raw,
+ }
+ if node in _map[_net]:
+ # add if to host
+ _map[_net][node].append(_values)
+ else:
+ _map[_net][node] = [_values]
+ _notes = []
+
+ # save map
+ self.map = _map
+ return
+
+ def print_map(self):
+ """
+ Create text report for CLI
+
+ :return: none
+ """
+ logger_cli.info("# Networks")
+ logger_cli.info(
+ " {0:8} {1:25} {2:25} {3:10} {4:10}".format(
+ "Host",
+ "IF",
+ "IP",
+ "MTU",
+ "State"
+ )
+ )
+ for network in self.map.keys():
+ logger_cli.info("-> {}".format(network))
+ for hostname in self.map[network].keys():
+ node = hostname.split('.')[0]
+ _n = self.map[network][hostname]
+ for _i in _n:
+ # Host IF IP Proto MTU State Gate Def.Gate
+ _text = "{:7} {:17} {:25} {:5} {:10}".format(
+ _i['interface'],
+ _i['interface_note'],
+ _i['ip_address'],
+ _i['rt_mtu'],
+ _i['status']
+ )
+ logger_cli.info(
+ " {0:8} {1}".format(
+ node,
+ _text
+ )
+ )
+ return
diff --git a/cfg_checker/nodes.py b/cfg_checker/nodes.py
index f0b7b38..0559132 100644
--- a/cfg_checker/nodes.py
+++ b/cfg_checker/nodes.py
@@ -1,7 +1,9 @@
import json
import os
+import yaml
from copy import deepcopy
from multiprocessing.dummy import Pool
+from time import sleep
from cfg_checker.clients import get_salt_remote, get_kube_remote
from cfg_checker.common.const import all_salt_roles_map, all_kube_roles_map
@@ -455,7 +457,7 @@
self.not_responded = [_n for _n in _r.keys() if not _r[_n]]
return _r
- def execute_script_on_active_nodes(self, script_filename, args=[]):
+ def execute_script_on_active_nodes(self, script_filename, args=None):
# Prepare path
_target_path = os.path.join(
'/root',
@@ -466,7 +468,7 @@
# execute script
logger_cli.debug("... running script")
# handle results for each node
- _script_arguments = " ".join(args) if args else ""
+ _script_arguments = args if args else ""
self.not_responded = []
_r = self.salt.cmd(
self.active_nodes_compound,
@@ -501,6 +503,34 @@
# simple salt rest client
self.kube = get_kube_remote(self.env_config)
self.env_type = ENV_TYPE_KUBE
+ self._namespace = "qa-space"
+ self._configmap_name = self.env_config.kube_scripts_folder
+
+ # prepare needed resources
+ self._check_namespace()
+ self._scripts = self._check_config_map()
+
+ def _check_namespace(self):
+ # ensure namespace
+ logger_cli.debug(
+ "... checking namespace '{}'".format(self._namespace)
+ )
+ if not self.kube.ensure_namespace(self._namespace):
+ raise KubeException(
+ "Failed to manage namespace '{}'".format(self._namespace)
+ )
+
+ def _check_config_map(self):
+ # ensure config map exists
+ logger_cli.debug(
+ "... checking config map '{}'".format(self._configmap_name)
+ )
+ _source = os.path.join(pkg_dir, 'scripts')
+ return self.kube.create_config_map(
+ self._namespace,
+ self._configmap_name,
+ _source
+ )
def gather_node_info(self, skip_list, skip_list_file):
# Gather nodes info and query pod lists for each node
@@ -520,7 +550,6 @@
# iterate through all accepted nodes and create a dict for it
self.nodes = {}
self.skip_list = []
- # _domains = set()
for _name in _node_names:
if _name in _skipped_nodes:
_status = NODE_SKIP
@@ -552,7 +581,6 @@
_nodes[_name]['addresses']['hostname']['address']
self.nodes[_name]['internalip'] = \
_nodes[_name]['addresses']['internalip']['address']
- # _domains.add(_name.split(".", 1)[1])
self.nodes[_name]['node_group'] = None
self.nodes[_name]['labels'] = _labels
self.nodes[_name]['roles'] = _roles
@@ -579,7 +607,7 @@
# "Multiple domains detected: {}".format(",".join(_domains))
# )
# else:
- # self.domain = _domains[0]
+ self.domain = "no.domain.in.kube.yet"
logger_cli.info(
"-> {} nodes collected: {} - active, {} - not active".format(
len(self.nodes),
@@ -624,19 +652,20 @@
return
@staticmethod
- def _get_ssh_shell(_h, _u, _k, _p, _q, _pipe):
+ def _get_ssh_shell(_h, _u, _k, _p, _q, _pipe, timeout=15):
_ssh = SshShell(
_h,
user=_u,
keypath=_k,
port=_p,
silent=_q,
- piped=_pipe
+ piped=_pipe,
+ timeout=timeout
)
return _ssh.connect()
@staticmethod
- def _do_ssh_cmd(_cmd, _h, _u, _k, _p, _q, _pipe):
+ def _do_ssh_cmd(_cmd, _h, _u, _k, _p, _q, _pipe, timeout=None):
with SshShell(
_h,
user=_u,
@@ -645,7 +674,10 @@
silent=_q,
piped=_pipe
) as ssh:
- _r = ssh.do(_cmd)
+ if timeout is None:
+ _r = ssh.do(_cmd)
+ else:
+ _r = ssh.do(_cmd, timeout=timeout)
logger_cli.debug("'{}'".format(_r))
return _r
@@ -662,7 +694,11 @@
_h = self.nodes[node]['internalip']
_p = 22
if self.kube.is_local or self.kube.config.ssh_direct:
- return None, self._get_ssh_shell(_h, _u, _k, _p, silent, piped)
+ return None, self._get_ssh_shell(
+ _h, _u, _k, _p, silent, piped,
+ timeout=self.kube.config.ssh_connect_timeout
+ )
else:
_fh = "localhost"
_p = 10022 if not fport else fport
@@ -671,10 +707,19 @@
_h,
user=_u,
keypath=self.env_config.ssh_key,
- loc_port=_p
+ loc_port=_p,
+ timeout=self.kube.config.ssh_connect_timeout
)
_pfwd.connect()
- _ssh = self._get_ssh_shell(_fh, _u, _k, _p, silent, piped)
+ _ssh = self._get_ssh_shell(
+ _fh,
+ _u,
+ _k,
+ _p,
+ silent,
+ piped,
+ timeout=self.kube.config.ssh_connect_timeout
+ )
return _pfwd, _ssh
def execute_script_on_node(self, node, script_filename, args=[]):
@@ -725,7 +770,7 @@
return _r
- def _exec_script(self, params):
+ def _ssh_exec_script(self, params):
"""
Threadsafe method to get shell to node,
check/copy script and get results
@@ -737,6 +782,7 @@
args
]
"""
+ _timeout = self.kube.config.script_execution_timeout
_name = params[0]
_src = params[1]
_tgt = params[2]
@@ -757,7 +803,7 @@
_sh.do(_check.format(_python))
)
if not _python:
- _sh.do("apt install python3", sudo=True)
+ _sh.do("apt install python3", sudo=True, timeout=_timeout)
# check if script already there
_folder = os.path.join(
self.env_config.kube_node_homepath,
@@ -789,7 +835,8 @@
_tgt,
_args
),
- sudo=True
+ sudo=True,
+ timeout=_timeout
)
if _fwd_sh:
@@ -798,7 +845,7 @@
return [_name, _out]
- def execute_script_on_active_nodes(self, script_filename, args=[]):
+ def execute_script_on_active_nodes(self, script_filename, args=None):
# Prepare script
_source_path = os.path.join(pkg_dir, 'scripts', script_filename)
_target_path = os.path.join(
@@ -843,7 +890,7 @@
_port += 1
_progress = Progress(len(_params))
- results = pool.imap_unordered(self._exec_script, _params)
+ results = pool.imap_unordered(self._ssh_exec_script, _params)
for ii in enumerate(results, start=1):
if not ii[1][1]:
@@ -904,3 +951,188 @@
_fwd_sh.kill()
_sh.kill()
return _target_path
+
+ def prepare_daemonset(self, template_filename, config_map=None):
+ # load template
+ _yaml_file = os.path.join(pkg_dir, 'templates', template_filename)
+ logger_cli.debug("... loading template '{}'".format(_yaml_file))
+ _ds = {}
+ with open(_yaml_file) as dsFile:
+ _ds = yaml.load(dsFile, Loader=yaml.SafeLoader)
+
+ # Add scripts to pod template as volumeMounts
+ _tspec = _ds['spec']['template']['spec']
+ _tspec['containers'][0]['volumeMounts'] = [
+ {
+ "name": "scripts",
+ "mountPath": os.path.join(
+ "/",
+ self.env_config.kube_scripts_folder
+ )
+ }
+ ]
+
+ _tspec['volumes'] = [
+ {
+ "name": "scripts",
+ "configMap": {
+ "name": self._configmap_name
+ }
+ }
+ ]
+
+ # create daemonset
+ logger_cli.debug("... preparing daemonset")
+ return self.kube.prepare_daemonset_from_yaml(self._namespace, _ds)
+
+ def wait_for_daemonset(self, ds, timeout=120):
+ # iteration timeout
+ _sleep_time = 5
+ _timeout = timeout
+
+ # query daemonset and check that desired=scheduled=ready
+ _ds = self.kube.get_daemon_set_by_name(
+ ds.metadata.namespace,
+ ds.metadata.name
+ )
+
+ _total = len(self.nodes)
+ # _scheduled = _ds.status.scheduled
+ # _ready = _ds.status.ready
+
+ # Init Progress bar to show daemonset readiness
+ _progress = Progress(_total)
+ while _timeout > 0:
+ # get new status
+ _ds = self.kube.get_daemon_set_by_name(
+ ds.metadata.namespace,
+ ds.metadata.name
+ )
+ _desired = _ds.status.desired_number_scheduled
+ _scheduled = _ds.status.current_number_scheduled
+ _ready = _ds.status.number_ready
+ _updated = _ds.status.updated_number_scheduled
+ # print it
+ _progress.write_progress(
+ _ready,
+ note="desired: {}, scheduled: {}, ready: {},"
+ " up-to-date: {}".format(
+ _desired,
+ _scheduled,
+ _ready,
+ _updated
+ )
+ )
+
+ # check values and return
+ # In case of Update, also checking _updated value
+ if _ready == _updated and _ready == _total:
+ # close progress bar class
+ _progress.end()
+ logger_cli.debug("... daemonset is ready")
+ return True
+ # iterate
+ _timeout -= _sleep_time
+ # wait
+ sleep(_sleep_time)
+
+ # timed out
+ _progress.end()
+ # log it
+ logger_cli.error("Timed out waiting for Daemonset to be ready")
+ return False
+
+ def execute_script_on_daemon_set(self, ds, script_filename, args=None):
+ """
+ Query daemonset for pods and execute script on all of them
+ """
+ def _kube_exec_on_pod(plist):
+ return [
+ plist[1], # node
+ plist[3], # pod name
+ plist[0].kube.exec_on_target_pod( # pointer to function
+ plist[4], # cmd
+ plist[3], # pod name
+ plist[2], # namespace
+ strict=True,
+ _request_timeout=120,
+ )
+ ]
+
+ # get all pod names
+ logger_cli.debug("... extracting pod names from daemonset")
+ _pods = self.kube.CoreV1.list_namespaced_pod(
+ namespace=ds.metadata.namespace,
+ label_selector='name={}'.format(ds.metadata.name)
+ )
+ # Create map for threads: [[node_name, ns, pod_name, cmd]...]
+ logger_cli.debug(
+ "... runnning script on {} pods using {} threads at a time".format(
+ len(_pods.items),
+ self.env_config.threads
+ )
+ )
+ _plist = []
+ _arguments = args if args else ""
+ _cmd = [
+ "python3",
+ os.path.join(
+ "/",
+ self.env_config.kube_scripts_folder,
+ script_filename
+ ),
+ _arguments
+ ]
+ _cmd = " ".join(_cmd)
+ for item in _pods.items:
+ _plist.append(
+ [
+ self,
+ item.spec.node_name,
+ item.metadata.namespace,
+ item.metadata.name,
+ _cmd
+ ]
+ )
+
+ # map func and cmd
+ pool = Pool(self.env_config.threads)
+ _results = {}
+ self.not_responded = []
+ # create result list
+ _progress = Progress(len(_plist))
+ ret = pool.imap_unordered(_kube_exec_on_pod, _plist)
+
+ for ii in enumerate(ret, start=1):
+ if not ii[1][2]:
+ self.not_responded.append(ii[1][0])
+ else:
+ _results[ii[1][0]] = ii[1][2]
+ _progress.write_progress(ii[0])
+
+ _progress.end()
+ pool.close()
+ pool.join()
+ logger_cli.debug(
+ "... done, {} total outputs; {} not responded".format(
+ len(_results),
+ len(self.not_responded)
+ )
+ )
+ return _results
+
+ def delete_daemonset(self, ds):
+ # Try to delete daemonset
+ try:
+ _r = self.kube.delete_daemon_set_by_name(
+ ds.metadata.namespace,
+ ds.metadata.name
+ )
+ except Exception as e:
+ logger_cli.warning("Failed to delete daemonset '{}': {}".format(
+ ds.metadata.name,
+ e.reason
+ ))
+ _r = None
+ return _r
diff --git a/cfg_checker/reports/reporter.py b/cfg_checker/reports/reporter.py
index d103cb4..7ddbc4f 100644
--- a/cfg_checker/reports/reporter.py
+++ b/cfg_checker/reports/reporter.py
@@ -118,9 +118,9 @@
@six.add_metaclass(abc.ABCMeta)
class _Base(object):
- def __init__(self, salt_master=None):
+ def __init__(self, master=None):
self.jinja2_env = self.init_jinja2_env()
- self.salt_master = salt_master
+ self.master = master
@abc.abstractmethod
def __call__(self, payload):
@@ -230,7 +230,7 @@
def _dmidecode(_dict, type=0):
# _key = "dmi"
_key_r = "dmi_r"
- _f_cmd = self.salt_master.get_cmd_for_nodes
+ _f_cmd = self.master.get_cmd_for_nodes
_cmd = "dmidecode -t {}".format(type)
_f_cmd(_cmd, _key_r, target_dict=_dict)
# TODO: parse BIOS output or given type
@@ -239,7 +239,7 @@
def _lsblk(_dict):
# _key = "lsblk"
_key_r = "lsblk_raw"
- _f_cmd = self.salt_master.get_cmd_for_nodes
+ _f_cmd = self.master.get_cmd_for_nodes
_columns = [
"NAME",
"HCTL",
@@ -260,7 +260,7 @@
_key = "lscpu"
_key_r = "lscpu_raw"
# get all of the values
- _f_cmd = self.salt_master.get_cmd_for_nodes
+ _f_cmd = self.master.get_cmd_for_nodes
_cmd = "lscpu | sed -n '/\\:/s/ \\+/ /gp'"
_f_cmd(_cmd, _key_r, target_dict=_dict)
# parse them and put into dict
@@ -290,7 +290,7 @@
def _free(_dict):
_key = "ram"
_key_r = "ram_raw"
- _f_cmd = self.salt_master.get_cmd_for_nodes
+ _f_cmd = self.master.get_cmd_for_nodes
_cmd = "free -h | sed -n '/Mem/s/ \\+/ /gp'"
_f_cmd(_cmd, _key_r, target_dict=_dict)
# parse them and put into dict
@@ -323,7 +323,7 @@
def _services(_dict):
_key = "services"
_key_r = "services_raw"
- _f_cmd = self.salt_master.get_cmd_for_nodes
+ _f_cmd = self.master.get_cmd_for_nodes
_cmd = "service --status-all"
_f_cmd(_cmd, _key_r, target_dict=_dict)
for node, dt in _dict.items():
@@ -350,7 +350,7 @@
def _vcp_status(_dict):
_key = "virsh"
_key_r = "virsh_raw"
- self.salt_master.get_cmd_for_nodes(
+ self.master.get_cmd_for_nodes(
"virsh list --all | sed -n -e '/[0-9]/s/ \\+/ /gp'",
_key_r,
target_dict=_dict,
@@ -383,7 +383,7 @@
def _soft_net_stats(_dict):
_key = "net_stats"
_key_r = "net_stats_raw"
- _f_cmd = self.salt_master.get_cmd_for_nodes
+ _f_cmd = self.master.get_cmd_for_nodes
_cmd = "cat /proc/net/softnet_stat; echo \\#; " \
"sleep {}; cat /proc/net/softnet_stat".format(
_softnet_interval
@@ -465,7 +465,7 @@
}
# get kernel version
- self.salt_master.get_cmd_for_nodes(
+ self.master.get_cmd_for_nodes(
"uname -r",
"kernel",
target_dict=data["nodes"]
@@ -481,7 +481,7 @@
# sample: /dev/vda1 78G 33G 45G 43%
_key = "disk"
_key_r = "disk_raw"
- self.salt_master.get_cmd_for_nodes(
+ self.master.get_cmd_for_nodes(
"df -h | sed -n '/^\\/dev/s/ \\+/ /gp' | cut -d\" \" -f 1-5",
"disk_raw",
target_dict=data["nodes"]
@@ -526,12 +526,12 @@
for net, net_v in data['map'].items():
for node, ifs in net_v.items():
for d in ifs:
- _err = "fail"
- d['interface_error'] = _err if d['interface_error'] else ""
- d['mtu_error'] = _err if d['mtu_error'] else ""
- d['status_error'] = _err if d['status_error'] else ""
+ _e = "fail"
+ d['interface_error'] = _e if 'interface_error' in d else ""
+ d['mtu_error'] = _e if 'mtu_error' in d else ""
+ d['status_error'] = _e if 'status_error' in d else ""
d['subnet_gateway_error'] = \
- _err if d['subnet_gateway_error'] else ""
+ _e if 'subnet_gateway_error' in d else ""
_services(data["nodes"])
# vcp status