Multi-env support and Kube client integration
Kube-friendly Beta
Package versions check supports the Kube env
Added:
- Env type detection
- New option: --use-env, for selecting an env
  when a function supports multiple detected envs
- Updated config loading
- Each module and command type has a supported-env check
  and stops execution when run on an unsupported env
- Functions can support multiple envs
- Kubernetes dependency
- Kubernetes API detection: local and remote
- Package checking class hierarchy for using Salt or Kube
- Remote pod execution routine
- Flexible SSH/SSH forwarder classes: context-manager ("with") support,
  ssh.do(), etc. (usage sketch just before the diff)
- Multithreaded SSH script execution (see the sketch after this list)
- Number of workers parameter, default is 5
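
A minimal sketch of the worker-pool pattern behind the multithreaded runs, mirroring
execute_script_on_active_nodes() in this diff (multiprocessing.dummy.Pool plus
imap_unordered); run_on_node, its (node, port) parameter block and the port numbering
are illustrative placeholders, not the actual helpers:

    from multiprocessing.dummy import Pool  # thread-backed Pool, same API as multiprocessing

    def run_on_all_nodes(nodes, run_on_node, workers=5):
        # one parameter block per node; each worker gets its own forwarded port
        params = [(node, 10022 + i) for i, node in enumerate(nodes)]
        pool = Pool(workers)
        results = {}
        # run_on_node is expected to return (node, output); results arrive
        # in completion order, not submission order
        for name, out in pool.imap_unordered(run_on_node, params):
            results[name] = out
        pool.close()
        pool.join()
        return results
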
Fixed:
- Config dependency
- Command loading with supported envs list
- Unit test structure and execution flow updated
- Unit test fixes
- Debug mode handling
- Unified command type/support routine
- Nested attrs getter/setter (see the sketch below)
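
The nested attrs getter/setter itself is not part of this diff; a reduce-based sketch
of the idea, with all names hypothetical:

    from functools import reduce

    def nested_getattr(obj, path, default=None):
        # walk an "a.b.c" style attribute path, returning default if any link is missing
        try:
            return reduce(getattr, path.split('.'), obj)
        except AttributeError:
            return default

    def nested_setattr(obj, path, value):
        # resolve everything up to the last attribute, then set it
        *head, last = path.split('.')
        setattr(reduce(getattr, head, obj), last, value)
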
Change-Id: I3ade693ac21536e2b5dcee4b24d511749dc72759
Related-PROD: PROD-35811
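
The SSH helpers noted above are used by nodes.py below; a trimmed usage sketch based on
those call sites (host, user and key values are placeholders):

    from cfg_checker.common.ssh_utils import PortForward, SshShell

    # direct shell to a reachable node; SshShell works as a context manager
    with SshShell("10.0.0.5", user="ubuntu", keypath="/path/to/key",
                  port=22, silent=True, piped=True) as ssh:
        out = ssh.do("uname -a")

    # node behind a jump host: forward a local port first,
    # then open the shell against localhost
    pfwd = PortForward("jump.host.local", "10.0.0.5", user="ubuntu",
                       keypath="/path/to/key", loc_port=10022)
    pfwd.connect()
    with SshShell("localhost", user="ubuntu", keypath="/path/to/key",
                  port=10022, silent=True, piped=True) as ssh:
        out = ssh.do("uname -a", sudo=True)
    pfwd.kill()
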
diff --git a/cfg_checker/nodes.py b/cfg_checker/nodes.py
index e028be2..e568cec 100644
--- a/cfg_checker/nodes.py
+++ b/cfg_checker/nodes.py
@@ -1,38 +1,132 @@
import json
import os
from copy import deepcopy
+from multiprocessing.dummy import Pool
-from cfg_checker.clients import get_salt_remote, salt
-from cfg_checker.common import config
-from cfg_checker.common.const import all_roles_map
+from cfg_checker.clients import get_salt_remote, get_kube_remote
+from cfg_checker.common.const import all_salt_roles_map, all_kube_roles_map
from cfg_checker.common.const import NODE_UP, NODE_DOWN, NODE_SKIP
+from cfg_checker.common.const import ubuntu_versions, nova_openstack_versions
from cfg_checker.common import logger, logger_cli
from cfg_checker.common import utils
-from cfg_checker.common.exception import SaltException
-from cfg_checker.common.settings import pkg_dir
+from cfg_checker.common.file_utils import create_temp_file_with_content
+from cfg_checker.common.exception import SaltException, KubeException
+from cfg_checker.common.ssh_utils import PortForward, SshShell
+from cfg_checker.common.settings import pkg_dir, ENV_TYPE_KUBE, ENV_TYPE_SALT
+from cfg_checker.helpers.console_utils import Progress
+
node_tmpl = {
'role': '',
'node_group': '',
'status': NODE_DOWN,
'pillars': {},
- 'grains': {}
+ 'grains': {},
+ 'raw': {}
}
-class SaltNodes(object):
- def __init__(self):
+def _prepare_skipped_nodes(_names, skip_list, skip_list_file):
+ _skipped_minions = []
+ # skip list file
+ if skip_list_file:
+ _valid, _invalid = utils.get_nodes_list(skip_list_file)
+ logger_cli.info(
+ "\n# WARNING: Detected invalid entries "
+ "in nodes skip list: {}\n".format(
+ "\n".join(_invalid)
+ )
+ )
+ _skipped_minions.extend(_valid)
+ # process wildcard, create node list out of mask
+ if skip_list:
+ _list = []
+ _invalid = []
+ for _item in skip_list:
+ if '*' in _item:
+ _str = _item[:_item.index('*')]
+ _nodes = [_m for _m in _names if _m.startswith(_str)]
+ if not _nodes:
+ logger_cli.warn(
+ "# WARNING: No nodes found for {}".format(_item)
+ )
+ _list.extend(_nodes)
+ else:
+ if _item in _names:
+                    _list.append(_item)
+ else:
+ logger_cli.warn(
+ "# WARNING: No node found for {}".format(_item)
+ )
+ # removing duplicates
+ _list = list(set(_list))
+ _skipped_minions.extend(_list)
+
+ return _skipped_minions
+
+
+class Nodes(object):
+ def __init__(self, config):
+ self.nodes = None
+ self.env_config = config
+
+ def skip_node(self, node):
+ # Add node to skip list
+        # For example, if it fails to comply with the rules
+
+ # check if we know such node
+ if node in self.nodes.keys() and node not in self.skip_list:
+ # yes, add it
+ self.skip_list.append(node)
+ return True
+ else:
+ return False
+
+ def get_nodes(self, skip_list=None, skip_list_file=None):
+ if not self.nodes:
+ if not skip_list and self.env_config.skip_nodes:
+ self.gather_node_info(
+ self.env_config.skip_nodes,
+ skip_list_file
+ )
+ else:
+ self.gather_node_info(skip_list, skip_list_file)
+ return self.nodes
+
+ def get_info(self):
+ _info = {
+ 'mcp_release': self.mcp_release,
+ 'openstack_release': self.openstack_release
+ }
+ return _info
+
+ def is_node_available(self, node, log=True):
+ if node in self.skip_list:
+ if log:
+ logger_cli.info("-> node '{}' not active".format(node))
+ return False
+ elif node in self.not_responded:
+ if log:
+ logger_cli.info("-> node '{}' not responded".format(node))
+ return False
+ else:
+ return True
+
+
+class SaltNodes(Nodes):
+ def __init__(self, config):
+ super(SaltNodes, self).__init__(config)
logger_cli.info("# Gathering environment information")
# simple salt rest client
- self.salt = salt
- self.nodes = None
+ self.salt = None
+ self.env_type = ENV_TYPE_SALT
def gather_node_info(self, skip_list, skip_list_file):
# Keys for all nodes
# this is not working in scope of 2016.8.3, will overide with list
logger_cli.debug("... collecting node names existing in the cloud")
if not self.salt:
- self.salt = get_salt_remote(config)
+ self.salt = get_salt_remote(self.env_config)
try:
_keys = self.salt.list_keys()
@@ -56,7 +150,7 @@
)
elif not self.node_keys:
# this is the last resort
- _minions = config.load_nodes_list()
+ _minions = self.env_config.load_nodes_list()
logger_cli.info(
"-> {} nodes loaded from list file".format(len(_minions))
)
@@ -64,40 +158,8 @@
_minions = self.node_keys['minions']
# Skip nodes if needed
- _skipped_minions = []
- # skip list file
- if skip_list_file:
- _valid, _invalid = utils.get_nodes_list(skip_list_file)
- logger_cli.info(
- "\n# WARNING: Detected invalid entries "
- "in nodes skip list: {}\n".format(
- "\n".join(_invalid)
- )
- )
- _skipped_minions.extend(_valid)
- # process wildcard, create node list out of mask
- if skip_list:
- _list = []
- _invalid = []
- for _item in skip_list:
- if '*' in _item:
- _str = _item[:_item.index('*')]
- _nodes = [_m for _m in _minions if _m.startswith(_str)]
- if not _nodes:
- logger_cli.warn(
- "# WARNING: No nodes found for {}".format(_item)
- )
- _list.extend(_nodes)
- else:
- if _item in _minions:
- _list += _item
- else:
- logger_cli.warn(
- "# WARNING: No node found for {}".format(_item)
- )
- # removing duplicates
- _list = list(set(_list))
- _skipped_minions.extend(_list)
+ _skipped_minions = \
+ _prepare_skipped_nodes(_minions, skip_list, skip_list_file)
# in case API not listed minions, we need all that answer ping
_active = self.salt.get_active_nodes()
@@ -108,7 +170,7 @@
_domains = set()
for _name in _minions:
_nc = utils.get_node_code(_name)
- _rmap = all_roles_map
+ _rmap = all_salt_roles_map
_role = _rmap[_nc] if _nc in _rmap else 'unknown'
if _name in _skipped_minions:
_status = NODE_SKIP
@@ -151,7 +213,7 @@
# lambda nd: self.nodes[nd]['role'] == const.all_roles_map['cfg'],
# self.nodes
# )
- _role = all_roles_map['cfg']
+ _role = all_salt_roles_map['cfg']
_filtered = [n for n, v in self.nodes.items() if v['role'] == _role]
if len(_filtered) < 1:
raise SaltException(
@@ -180,33 +242,6 @@
_n['linux_codename'] = _p['linux_system_codename']
_n['linux_arch'] = _p['linux_system_architecture']
- def skip_node(self, node):
- # Add node to skip list
- # Fro example if it is fails to comply with the rules
-
- # check if we know such node
- if node in self.nodes.keys() and node not in self.skip_list:
- # yes, add it
- self.skip_list.append(node)
- return True
- else:
- return False
-
- def get_nodes(self, skip_list=None, skip_list_file=None):
- if not self.nodes:
- if not skip_list and config.skip_nodes:
- self.gather_node_info(config.skip_nodes, skip_list_file)
- else:
- self.gather_node_info(skip_list, skip_list_file)
- return self.nodes
-
- def get_info(self):
- _info = {
- 'mcp_release': self.mcp_release,
- 'openstack_release': self.openstack_release
- }
- return _info
-
def get_cmd_for_nodes(self, cmd, target_key, target_dict=None, nodes=None):
"""Function runs. cmd.run and parses result into place
or into dict structure provided
@@ -243,7 +278,7 @@
logger_cli.debug(
"... '{}' not responded after '{}'".format(
node,
- config.salt_timeout
+ self.env_config.salt_timeout
)
)
data[target_key] = None
@@ -283,7 +318,7 @@
logger_cli.debug(
"... '{}' not responded after '{}'".format(
node,
- config.salt_timeout
+ self.env_config.salt_timeout
)
)
_data[_pillar_keys[-1]] = None
@@ -295,7 +330,7 @@
# this function assumes that all folders are created
_dumps = json.dumps(_dict, indent=2).splitlines()
_storage_path = os.path.join(
- config.salt_file_root, config.salt_scripts_folder
+ self.env_config.salt_file_root, self.env_config.salt_scripts_folder
)
logger_cli.debug(
"... uploading data as '{}' "
@@ -307,12 +342,12 @@
_cache_path = os.path.join(_storage_path, filename)
_source_path = os.path.join(
'salt://',
- config.salt_scripts_folder,
+ self.env_config.salt_scripts_folder,
filename
)
_target_path = os.path.join(
'/root',
- config.salt_scripts_folder,
+ self.env_config.salt_scripts_folder,
filename
)
@@ -334,7 +369,7 @@
with open(_p, 'rt') as fd:
_script = fd.read().splitlines()
_storage_path = os.path.join(
- config.salt_file_root, config.salt_scripts_folder
+ self.env_config.salt_file_root, self.env_config.salt_scripts_folder
)
logger_cli.debug(
"... uploading script {} "
@@ -348,12 +383,12 @@
_cache_path = os.path.join(_storage_path, script_filename)
_source_path = os.path.join(
'salt://',
- config.salt_scripts_folder,
+ self.env_config.salt_scripts_folder,
script_filename
)
_target_path = os.path.join(
'/root',
- config.salt_scripts_folder,
+ self.env_config.salt_scripts_folder,
script_filename
)
@@ -370,7 +405,7 @@
self.active_nodes_compound,
os.path.join(
'/root',
- config.salt_scripts_folder
+ self.env_config.salt_scripts_folder
),
tgt_type="compound"
)
@@ -388,7 +423,7 @@
# Prepare path
_target_path = os.path.join(
'/root',
- config.salt_scripts_folder,
+ self.env_config.salt_scripts_folder,
script_filename
)
@@ -412,7 +447,7 @@
# Prepare path
_target_path = os.path.join(
'/root',
- config.salt_scripts_folder,
+ self.env_config.salt_scripts_folder,
script_filename
)
@@ -446,17 +481,410 @@
self.not_responded = [_n for _n in _r.keys() if not _r[_n]]
return _r
- def is_node_available(self, node, log=True):
- if node in self.skip_list:
- if log:
- logger_cli.info("-> node '{}' not active".format(node))
- return False
- elif node in self.not_responded:
- if log:
- logger_cli.info("-> node '{}' not responded".format(node))
- return False
+
+class KubeNodes(Nodes):
+ def __init__(self, config):
+ super(KubeNodes, self).__init__(config)
+ logger_cli.info("# Gathering environment information")
+        # kube remote client
+ self.kube = get_kube_remote(self.env_config)
+ self.env_type = ENV_TYPE_KUBE
+
+ def gather_node_info(self, skip_list, skip_list_file):
+ # Gather nodes info and query pod lists for each node
+ logger_cli.debug("... collecting node names existing in the cloud")
+
+ # Gather node names and info
+ _nodes = self.kube.get_node_info()
+ _node_names = list(_nodes.keys())
+ # Skip nodes if needed
+ _skipped_nodes = \
+ _prepare_skipped_nodes(_node_names, skip_list, skip_list_file)
+
+ # Count how many nodes active
+ self._active = [n for n, v in _nodes.items()
+ if v['conditions']['ready']['status']]
+
+ # iterate through all accepted nodes and create a dict for it
+ self.nodes = {}
+ self.skip_list = []
+ # _domains = set()
+ for _name in _node_names:
+ if _name in _skipped_nodes:
+ _status = NODE_SKIP
+ self.skip_list.append(_name)
+ else:
+ _status = NODE_UP if _name in self._active else NODE_DOWN
+ if _status == NODE_DOWN:
+ self.skip_list.append(_name)
+ logger_cli.info(
+ "-> '{}' shows 'Ready' as 'False', "
+ "added to skip list".format(
+ _name
+ )
+ )
+ _roles = {}
+ _labels = {}
+ for _label, _value in _nodes[_name]['labels'].items():
+ if _label in all_kube_roles_map:
+ _roles[all_kube_roles_map[_label]] = _value
+ else:
+ _labels[_label] = _value
+
+ self.nodes[_name] = deepcopy(node_tmpl)
+ self.nodes[_name].pop("grains")
+ self.nodes[_name].pop("pillars")
+
+ # hostname
+ self.nodes[_name]['shortname'] = \
+ _nodes[_name]['addresses']['hostname']['address']
+ self.nodes[_name]['internalip'] = \
+ _nodes[_name]['addresses']['internalip']['address']
+ # _domains.add(_name.split(".", 1)[1])
+ self.nodes[_name]['node_group'] = None
+ self.nodes[_name]['labels'] = _labels
+ self.nodes[_name]['roles'] = _roles
+ self.nodes[_name]['status'] = _status
+ # Backward compatibility
+ _info = _nodes[_name]['status']['node_info']
+ self.nodes[_name]['linux_image'] = _info['os_image']
+ self.nodes[_name]['linux_arch'] = _info['architecture']
+
+ _codename = "unknown"
+ _n, _v, _c = _info['os_image'].split()
+ if _n.lower() == 'ubuntu':
+ _v, _, _ = _v.rpartition('.') if '.' in _v else (_v, "", "")
+ if _v in ubuntu_versions:
+ _codename = ubuntu_versions[_v].split()[0].lower()
+ self.nodes[_name]['linux_codename'] = _codename
+
+ # Consider per-data type transfer
+ self.nodes[_name]["raw"] = _nodes[_name]
+ # TODO: Investigate how to handle domains in Kube, probably - skip
+ # _domains = list(_domains)
+ # if len(_domains) > 1:
+ # logger_cli.warning(
+ # "Multiple domains detected: {}".format(",".join(_domains))
+ # )
+ # else:
+ # self.domain = _domains[0]
+ logger_cli.info(
+ "-> {} nodes collected: {} - active, {} - not active".format(
+ len(self.nodes),
+ len(self._active),
+ len(self.skip_list)
+ )
+ )
+
+ _role = "k8s-master"
+ _filtered = [n for n, v in self.nodes.items() if _role in v['roles']]
+ if len(_filtered) < 1:
+ raise KubeException(
+ "No k8s-master nodes detected! Check/Update node role map."
+ )
else:
- return True
+ _r = [n for n, v in self.nodes.items()
+ if v['status'] != NODE_UP and _role in v['roles']]
+ if len(_r) > 0:
+ logger_cli.warn(
+                    "Master nodes are reporting 'NotReady':\n{}".format(
+ "\n".join(_r)
+ )
+ )
+ self.kube.master_node = _filtered[0]
+ # get specific data upfront
+ # OpenStack versions
+ self.mcp_release = ""
+ # Quick and Dirty way to detect OS release
+ _nova_version = self.kube.exec_on_target_pod(
+ "nova-manage --version",
+ "nova-api-osapi",
+ "openstack"
+ )
+ _nmajor = _nova_version.partition('.')[0]
+ self.openstack_release = nova_openstack_versions[_nmajor]
-salt_master = SaltNodes()
+ return
+
+ @staticmethod
+ def _get_ssh_shell(_h, _u, _k, _p, _q, _pipe):
+ _ssh = SshShell(
+ _h,
+ user=_u,
+ keypath=_k,
+ port=_p,
+ silent=_q,
+ piped=_pipe
+ )
+ return _ssh.connect()
+
+ @staticmethod
+ def _do_ssh_cmd(_cmd, _h, _u, _k, _p, _q, _pipe):
+ with SshShell(
+ _h,
+ user=_u,
+ keypath=_k,
+ port=_p,
+ silent=_q,
+ piped=_pipe
+ ) as ssh:
+ _r = ssh.do(_cmd)
+ logger_cli.debug("'{}'".format(_r))
+ return _r
+
+ def node_shell(
+ self,
+ node,
+ silent=True,
+ piped=True,
+ use_sudo=True,
+ fport=None
+ ):
+ _u = self.env_config.kube_node_user
+ _k = self.env_config.kube_node_keypath
+ _h = self.nodes[node]['internalip']
+ _p = 22
+ if self.kube.is_local:
+ return None, self._get_ssh_shell(_h, _u, _k, _p, silent, piped)
+ else:
+ _fh = "localhost"
+ _p = 10022 if not fport else fport
+ _pfwd = PortForward(
+ self.env_config.ssh_host,
+ _h,
+ user=_u,
+ keypath=self.env_config.ssh_key,
+ loc_port=_p
+ )
+ _pfwd.connect()
+ _ssh = self._get_ssh_shell(_fh, _u, _k, _p, silent, piped)
+ return _pfwd, _ssh
+
+ def execute_script_on_node(self, node, script_filename, args=[]):
+ # Prepare path
+ _target_path = os.path.join(
+ self.env_config.node_homepath,
+ self.env_config.kube_scripts_folder,
+ script_filename
+ )
+
+ # execute script
+ logger_cli.debug("... running script on '{}'".format(node))
+ # handle results for each node
+ _script_arguments = " ".join(args) if args else ""
+ self.not_responded = []
+ # get result
+ _nr = self.node_shell(
+ node,
+ "python {} {}".format(
+ _target_path,
+ _script_arguments
+ )
+ )
+
+ if not _nr:
+ self.not_responded.append(node)
+ return {}
+ else:
+ return {node: _nr}
+
+ def execute_cmd_on_active_nodes(self, cmd, nodes=None):
+ # execute script
+ logger_cli.debug("...running '{}' on active nodes".format(cmd))
+ # handle results for each node
+ self.not_responded = []
+ _r = {}
+ # TODO: Use threading and pool
+ for node in self._active:
+ _nr = self.node_shell(
+ node,
+ cmd
+ )
+
+ if not _nr:
+ self.not_responded.append(node)
+ else:
+ _r[node] = _nr
+
+ return _r
+
+ def _exec_script(self, params):
+ """
+ Threadsafe method to get shell to node,
+ check/copy script and get results
+ [
+ node_name,
+ src_path,
+ tgt_path,
+ conf,
+ args
+ ]
+ """
+ _name = params[0]
+ _src = params[1]
+ _tgt = params[2]
+ _conf = params[3]
+ _args = params[4]
+ _port = params[5]
+ _log_name = "["+_name+"]:"
+ _check = "echo $(if [[ -s '{}' ]]; then echo True; " \
+ "else echo False; fi)"
+ _fwd_sh, _sh = self.node_shell(
+ _name,
+ use_sudo=False,
+ fport=_port
+ )
+ # check python3
+ _python = _sh.do("which python3")
+ _python = utils.to_bool(
+ _sh.do(_check.format(_python))
+ )
+ if not _python:
+ _sh.do("apt install python3", sudo=True)
+ # check if script already there
+ _folder = os.path.join(
+ self.env_config.node_homepath,
+ _conf.kube_scripts_folder
+ )
+ # check if folder exists
+ _folder_exists = utils.to_bool(
+ _sh.do(_check.format(_folder))
+ )
+ if not _folder_exists:
+ _sh.do("mkdir " + _folder)
+ logger.info("{} Syncing file".format(_log_name))
+ _code, _r, _e = _sh.scp(
+ _src,
+ _sh.get_host_path(_tgt),
+ )
+ # handle error code
+ if _code:
+ logger_cli.warn(
+ "{} Error in scp:\n"
+ "\tstdout:'{}'\n"
+ "\tstderr:'{}'".format(_log_name, _r, _e)
+ )
+
+ # execute script
+ logger.debug("{} Running script".format(_log_name))
+ _out = _sh.do(
+ "python3 {}{}".format(
+ _tgt,
+ _args
+ ),
+ sudo=True
+ )
+
+ if _fwd_sh:
+ _fwd_sh.kill()
+ _sh.kill()
+
+ return [_name, _out]
+
+ def execute_script_on_active_nodes(self, script_filename, args=[]):
+ # Prepare script
+ _source_path = os.path.join(pkg_dir, 'scripts', script_filename)
+ _target_path = os.path.join(
+ self.env_config.node_homepath,
+ self.env_config.kube_scripts_folder,
+ script_filename
+ )
+ # handle results for each node
+ _script_arguments = " ".join(args) if args else ""
+ if _script_arguments:
+ _script_arguments = " " + _script_arguments
+ self.not_responded = []
+ _results = {}
+ logger_cli.debug(
+ "...running '{}' on active nodes, {} worker threads".format(
+ script_filename,
+ self.env_config.threads
+ )
+ )
+ # Workers pool
+ pool = Pool(self.env_config.threads)
+
+ # init the parameters
+ # node_name,
+ # src_path,
+ # tgt_path,
+ # conf,
+ # args
+ _params = []
+ _port = 10022
+ for node in self._active:
+ # build parameter blocks
+ _p_list = [
+ node,
+ _source_path,
+ _target_path,
+ self.env_config,
+ _script_arguments,
+ _port
+ ]
+ _params.append(_p_list)
+ _port += 1
+
+ _progress = Progress(len(_params))
+ results = pool.imap_unordered(self._exec_script, _params)
+
+ for ii in enumerate(results, start=1):
+ if not ii[1][1]:
+ self.not_responded.append(ii[1][0])
+ else:
+ _results[ii[1][0]] = ii[1][1]
+ _progress.write_progress(ii[0])
+
+ _progress.end()
+ pool.close()
+ pool.join()
+
+ # return path on nodes, just in case
+ return _results
+
+ def prepare_json_on_node(self, node, _dict, filename):
+ # this function assumes that all folders are created
+ _dumps = json.dumps(_dict, indent=2).splitlines()
+
+ _source_path = create_temp_file_with_content(_dumps)
+ _target_path = os.path.join(
+ self.env_config.node_homepath,
+ self.env_config.kube_scripts_folder,
+ filename
+ )
+ _folder = os.path.join(
+ self.env_config.node_homepath,
+ self.env_config.kube_scripts_folder
+ )
+ _check = "echo $(if [[ -s '{}' ]]; then echo True; " \
+ "else echo False; fi)"
+ _fwd_sh, _sh = self.node_shell(
+ node,
+ use_sudo=False
+ )
+
+ # check if folder exists
+ _folder_exists = utils.to_bool(
+ _sh.do(_check.format(_folder))
+ )
+ if not _folder_exists:
+ _sh.do("mkdir " + _folder)
+ logger_cli.debug(
+ "...create data on node '{}':'{}'".format(node, _target_path)
+ )
+ _code, _r, _e = _sh.scp(
+ _source_path,
+ _sh.get_host_path(_target_path),
+ )
+ # handle error code
+ if _code:
+ logger_cli.warn(
+ "Error in scp:\n"
+ "\tstdout:'{}'\n"
+ "\tstderr:'{}'".format(_r, _e)
+ )
+
+ _fwd_sh.kill()
+ _sh.kill()
+ return _target_path