# blob: fdda35877d2f59ab563c089653f7697b74ec1389
# Author: Alex Savatieiev (osavatieiev@mirantis.com; a.savex@gmail.com)
# Copyright 2019-2022 Mirantis, Inc.
import json
import os
import yaml
from copy import deepcopy
from multiprocessing.dummy import Pool
from time import sleep
from cfg_checker.clients import get_salt_remote, get_kube_remote
from cfg_checker.common.const import all_salt_roles_map, all_kube_roles_map
from cfg_checker.common.const import NODE_UP, NODE_DOWN, NODE_SKIP
from cfg_checker.common.const import ubuntu_versions, nova_openstack_versions
from cfg_checker.common import logger, logger_cli
from cfg_checker.common import utils
from cfg_checker.common.file_utils import create_temp_file_with_content
from cfg_checker.common.exception import SaltException, KubeException
from cfg_checker.common.ssh_utils import PortForward, SshShell
from cfg_checker.common.settings import pkg_dir, ENV_TYPE_KUBE, ENV_TYPE_SALT
from cfg_checker.helpers.console_utils import Progress
# Template record for a single node; deep-copied for every discovered node
# and then filled/trimmed by the environment-specific gatherers.
node_tmpl = dict(
    role='',
    node_group='',
    status=NODE_DOWN,
    pillars={},
    grains={},
    raw={},
)
def _prepare_skipped_nodes(_names, skip_list, skip_list_file):
_skipped_minions = []
# skip list file
if skip_list_file:
_valid, _invalid = utils.get_nodes_list(skip_list_file)
_skipped_minions.extend(_valid)
if len(_invalid) < 1:
logger_cli.info(
"\n# WARNING: Detected invalid entries "
"in nodes skip list:\n{}\n".format(
"\n".join(_invalid)
)
)
# process wildcard, create node list out of mask
if skip_list:
_list = []
_invalid = []
for _item in skip_list:
if '*' in _item:
_str = _item[:_item.index('*')]
_nodes = [_m for _m in _names if _m.startswith(_str)]
if not _nodes:
logger_cli.warn(
"# WARNING: No nodes found for {}".format(_item)
)
_list.extend(_nodes)
else:
if _item in _names:
_list += _item
else:
logger_cli.warn(
"# WARNING: No node found for {}".format(_item)
)
# removing duplicates
_list = list(set(_list))
_skipped_minions.extend(_list)
return _skipped_minions
class Nodes(object):
    """Base container for environment node information.

    Subclasses provide ``gather_node_info(skip_list, skip_list_file)``
    which fills ``self.nodes``, ``self.skip_list`` and the release
    attributes used by ``get_info``.
    """

    def __init__(self, config):
        self.nodes = None
        self.env_config = config
        # Defaults so availability checks work before node gathering
        # or remote execution have populated these lists
        self.skip_list = []
        self.not_responded = []

    def skip_node(self, node):
        """Add a known node to the skip list.

        For example, when the node fails to comply with the rules.

        :return: True if the node was added, False if it is unknown
                 or already in the skip list
        """
        if self.nodes and node in self.nodes and node not in self.skip_list:
            self.skip_list.append(node)
            return True
        return False

    def get_nodes(self, skip_list=None, skip_list_file=None):
        """Return node dict, gathering it on first use.

        When no *skip_list* is given, ``env_config.skip_nodes`` is used.
        """
        if not self.nodes:
            if not skip_list and self.env_config.skip_nodes:
                self.gather_node_info(
                    self.env_config.skip_nodes,
                    skip_list_file
                )
            else:
                self.gather_node_info(skip_list, skip_list_file)
        return self.nodes

    def get_info(self):
        """Return release information collected by gather_node_info()."""
        _info = {
            'mcp_release': self.mcp_release,
            'openstack_release': self.openstack_release
        }
        return _info

    def is_node_available(self, node, log=True):
        """Return False if *node* is skipped or did not respond last time.

        :param log: when True, log the reason of unavailability
        """
        if node in self.skip_list:
            if log:
                logger_cli.info("-> node '{}' not active".format(node))
            return False
        elif node in self.not_responded:
            if log:
                logger_cli.info("-> node '{}' not responded".format(node))
            return False
        return True
class SaltNodes(Nodes):
    """Node information and remote execution through the Salt REST API."""

    def __init__(self, config):
        super(SaltNodes, self).__init__(config)
        logger_cli.info("# Gathering environment information")
        # simple salt rest client, created on first gather_node_info() call
        self.salt = None
        self.env_type = ENV_TYPE_SALT

    def gather_node_info(self, skip_list, skip_list_file):
        """Collect minion names, statuses, roles and base pillars.

        Populates ``self.nodes``, ``self.skip_list``, ``self.domain``,
        ``self.active_nodes_compound``, ``self.salt.master_node``,
        ``self.mcp_release`` and ``self.openstack_release``.

        :raises SaltException: when no node with the 'cfg' role is found
        """
        # Keys for all nodes
        # this is not working in scope of 2016.8.3, will override with list
        logger_cli.debug("... collecting node names existing in the cloud")
        if not self.salt:
            self.salt = get_salt_remote(self.env_config)

        try:
            _keys = self.salt.list_keys()
            _str = [
                "{}: {}".format(_k, len(_v)) for _k, _v in _keys.items()
            ]
            logger_cli.info("-> keys collected: {}".format(", ".join(_str)))

            self.node_keys = {
                'minions': _keys['minions']
            }
        except Exception:
            # best effort: fall back to the API minion list or list file
            self.node_keys = None

        # List of minions with grains
        _minions = self.salt.list_minions()
        if _minions:
            logger_cli.info(
                "-> api reported {} active minions".format(len(_minions))
            )
        elif not self.node_keys:
            # this is the last resort
            _minions = self.env_config.load_nodes_list()
            logger_cli.info(
                "-> {} nodes loaded from list file".format(len(_minions))
            )
        else:
            _minions = self.node_keys['minions']

        # Skip nodes if needed
        _skipped_minions = \
            _prepare_skipped_nodes(_minions, skip_list, skip_list_file)

        # in case API not listed minions, we need all that answer ping
        _active = self.salt.get_active_nodes()
        logger_cli.info("-> nodes responded: {}".format(len(_active)))
        # iterate through all accepted nodes and create a dict for it
        self.nodes = {}
        self.skip_list = []
        # domain -> number of nodes in it
        _domain_counts = {}
        for _name in _minions:
            _nc = utils.get_node_code(_name)
            _rmap = all_salt_roles_map
            _role = _rmap[_nc] if _nc in _rmap else 'unknown'
            if _name in _skipped_minions:
                _status = NODE_SKIP
                self.skip_list.append(_name)
            else:
                _status = NODE_UP if _name in _active else NODE_DOWN
                if _status == NODE_DOWN:
                    self.skip_list.append(_name)
                    logger_cli.info(
                        "-> '{}' is down, "
                        "added to skip list".format(
                            _name
                        )
                    )
            _short, _, _domain = _name.partition(".")
            self.nodes[_name] = deepcopy(node_tmpl)
            self.nodes[_name]['shortname'] = _short
            # minion ids without a dot carry no domain part
            if _domain:
                _domain_counts[_domain] = _domain_counts.get(_domain, 0) + 1
            self.nodes[_name]['node_group'] = _nc
            self.nodes[_name]['role'] = _role
            self.nodes[_name]['status'] = _status

        if len(_domain_counts) > 1:
            logger_cli.warning(
                "Multiple domains detected: {}".format(
                    ",".join(_domain_counts)
                )
            )
        # Use domain with biggest node count
        # TODO: allow to force it via config option
        if _domain_counts:
            self.domain = max(_domain_counts, key=_domain_counts.get)
        else:
            self.domain = None

        logger_cli.info("-> {} nodes inactive".format(len(self.skip_list)))
        logger_cli.info("-> {} nodes collected".format(len(self.nodes)))

        # form an all nodes compound string to use in salt
        self.active_nodes_compound = self.salt.compound_string_from_list(
            filter(
                lambda nd: self.nodes[nd]['status'] == NODE_UP,
                self.nodes
            )
        )
        # get master node fqdn
        _role = all_salt_roles_map['cfg']
        _filtered = [n for n, v in self.nodes.items() if v['role'] == _role]
        if not _filtered:
            raise SaltException(
                "No master node detected! Check/Update node role map."
            )
        self.salt.master_node = _filtered[0]

        # OpenStack versions
        self.mcp_release = self.salt.pillar_get(
            self.salt.master_node,
            "_param:apt_mk_version"
        )[self.salt.master_node]
        self.openstack_release = self.salt.pillar_get(
            self.salt.master_node,
            "_param:openstack_version"
        )[self.salt.master_node]
        # Preload codenames
        # do additional queries to get linux codename and arch for each node
        self.get_specific_pillar_for_nodes("_param:linux_system_codename")
        self.get_specific_pillar_for_nodes("_param:linux_system_architecture")
        for _name, _n in self.nodes.items():
            if _name not in self.skip_list:
                _p = _n['pillars']['_param']
                _n['linux_codename'] = _p['linux_system_codename']
                _n['linux_arch'] = _p['linux_system_architecture']

    def get_cmd_for_nodes(self, cmd, target_key, target_dict=None, nodes=None):
        """Run *cmd* via cmd.run and store each output under *target_key*.

        :param cmd: shell command to execute
        :param target_key: key to store each node's output in
        :param target_dict: optional node -> data dict to fill instead of
                            ``self.nodes``
        :param nodes: optional salt compound target (defaults to all
                      active nodes)
        :return: no return value, data published internally
        """
        logger_cli.debug(
            "... collecting results for '{}'".format(cmd)
        )
        _nodes = target_dict if target_dict else self.nodes
        _result = self.execute_cmd_on_active_nodes(cmd, nodes=nodes)
        for node, data in _nodes.items():

            if node in self.skip_list:
                logger_cli.debug(
                    "... '{}' skipped while collecting '{}'".format(
                        node,
                        cmd
                    )
                )
                continue
            # Prepare target key
            if target_key not in data:
                data[target_key] = None
            # Save data
            if data['status'] in [NODE_DOWN, NODE_SKIP]:
                data[target_key] = None
            elif node not in _result:
                continue
            elif not _result[node]:
                logger_cli.debug(
                    "... '{}' not responded after '{}'".format(
                        node,
                        self.env_config.salt_timeout
                    )
                )
                data[target_key] = None
            else:
                data[target_key] = _result[node]

    def get_specific_pillar_for_nodes(self, pillar_path):
        """Get pillar value at ':'-separated *pillar_path* for all nodes.

        The value is stored in ``node['pillars']`` as a nested dict that
        mirrors the path. Nodes with an empty reply, or no reply at all,
        get None and are listed in ``self.not_responded``.

        :return: no return value, data published internally
        """
        logger_cli.debug(
            "... collecting node pillars for '{}'".format(pillar_path)
        )
        _result = self.salt.pillar_get(self.active_nodes_compound, pillar_path)
        self.not_responded = []
        _pillar_keys = pillar_path.split(':')
        for node, data in self.nodes.items():
            if node in self.skip_list:
                logger_cli.debug(
                    "... '{}' skipped while collecting '{}'".format(
                        node,
                        pillar_path
                    )
                )
                continue
            # pre-create nested dict
            _data = data['pillars']
            for _key in _pillar_keys[:-1]:
                _data = _data.setdefault(_key, {})

            # a node missing from the reply is treated as not responded
            _value = _result.get(node)
            if data['status'] in [NODE_DOWN, NODE_SKIP]:
                _data[_pillar_keys[-1]] = None
            elif not _value:
                logger_cli.debug(
                    "... '{}' not responded after '{}'".format(
                        node,
                        self.env_config.salt_timeout
                    )
                )
                _data[_pillar_keys[-1]] = None
                self.not_responded.append(node)
            else:
                _data[_pillar_keys[-1]] = _value

    def prepare_json_on_node(self, node, _dict, filename):
        """Upload *_dict* as JSON to the master file cache and sync to *node*.

        :return: target path of the file on the node
        """
        if node in self.skip_list:
            # NOTE(review): despite the message, execution continues and the
            # file is still synced; kept as is because callers use the
            # returned path - confirm intended behavior.
            logger_cli.debug(
                "... '{}' skipped while preparing json file of '{}'".format(
                    node,
                    filename
                )
            )

        # this function assumes that all folders are created
        _dumps = json.dumps(_dict, indent=2).splitlines()

        _storage_path = os.path.join(
            self.env_config.salt_file_root, self.env_config.salt_scripts_folder
        )
        logger_cli.debug(
            "... uploading data as '{}' "
            "to master's file cache folder: '{}'".format(
                filename,
                _storage_path
            )
        )
        _cache_path = os.path.join(_storage_path, filename)
        _source_path = os.path.join(
            'salt://',
            self.env_config.salt_scripts_folder,
            filename
        )
        _target_path = os.path.join(
            '/root',
            self.env_config.salt_scripts_folder,
            filename
        )

        logger_cli.debug("... creating file in cache '{}'".format(_cache_path))
        self.salt.f_touch_master(_cache_path)
        self.salt.f_append_master(_cache_path, _dumps)
        logger.debug("... syncing file to '{}'".format(node))
        self.salt.get_file(
            node,
            _source_path,
            _target_path,
            tgt_type="compound"
        )
        return _target_path

    def prepare_script_on_active_nodes(self, script_filename):
        """Upload a package script to the master cache and sync it to nodes.

        :return: path of the script on the nodes
        """
        # Prepare script
        _p = os.path.join(pkg_dir, 'scripts', script_filename)
        with open(_p, 'rt') as fd:
            _script = fd.read().splitlines()
        _storage_path = os.path.join(
            self.env_config.salt_file_root, self.env_config.salt_scripts_folder
        )
        logger_cli.debug(
            "... uploading script {} "
            "to master's file cache folder: '{}'".format(
                script_filename,
                _storage_path
            )
        )
        self.salt.mkdir(self.salt.master_node, _storage_path)
        # Form cache, source and target path
        _cache_path = os.path.join(_storage_path, script_filename)
        _source_path = os.path.join(
            'salt://',
            self.env_config.salt_scripts_folder,
            script_filename
        )
        _target_folder = os.path.join(
            '/root',
            self.env_config.salt_scripts_folder
        )
        _target_path = os.path.join(_target_folder, script_filename)

        logger_cli.debug("... creating file in cache '{}'".format(_cache_path))
        self.salt.f_touch_master(_cache_path)
        self.salt.f_append_master(_cache_path, _script)
        # command salt to copy file to minions
        logger_cli.debug(
            "... creating script target folder '{}'".format(
                _target_folder
            )
        )
        self.salt.mkdir(
            self.active_nodes_compound,
            _target_folder,
            tgt_type="compound"
        )
        logger.debug("... syncing file to nodes")
        self.salt.get_file(
            self.active_nodes_compound,
            _source_path,
            _target_path,
            tgt_type="compound"
        )
        # return path on nodes, just in case
        return _target_path

    def execute_script_on_node(self, node, script_filename, args=None):
        """Run a previously synced script on *node* via cmd.run.

        :param args: optional list of string arguments
        :return: salt reply dict; nodes with empty output are stored in
                 ``self.not_responded``
        """
        # Prepare path
        _target_path = os.path.join(
            '/root',
            self.env_config.salt_scripts_folder,
            script_filename
        )

        # execute script
        logger.debug("... running script on '{}'".format(node))
        # handle results for each node
        _script_arguments = " ".join(args) if args else ""
        _r = self.salt.cmd(
            node,
            'cmd.run',
            param='python {} {}'.format(_target_path, _script_arguments),
            expr_form="compound"
        )

        # all false returns means that there is no response
        self.not_responded = [_n for _n in _r.keys() if not _r[_n]]
        return _r

    def execute_script_on_active_nodes(self, script_filename, args=None):
        """Run a previously synced script on all active nodes.

        :param args: argument string, or list/tuple of string arguments
        :return: salt reply dict; nodes with empty output are stored in
                 ``self.not_responded``
        """
        # Prepare path
        _target_path = os.path.join(
            '/root',
            self.env_config.salt_scripts_folder,
            script_filename
        )

        # execute script
        logger_cli.debug("... running script")
        # lists are joined like in execute_script_on_node instead of
        # being formatted as their python repr
        if isinstance(args, (list, tuple)):
            _script_arguments = " ".join(args)
        else:
            _script_arguments = args if args else ""
        _r = self.salt.cmd(
            self.active_nodes_compound,
            'cmd.run',
            param='python {} {}'.format(_target_path, _script_arguments),
            expr_form="compound"
        )

        # all false returns means that there is no response
        self.not_responded = [_n for _n in _r.keys() if not _r[_n]]
        return _r

    def execute_cmd_on_active_nodes(self, cmd, nodes=None):
        """Run shell *cmd* via cmd.run on *nodes* (compound) or all active.

        :return: salt reply dict; nodes with empty output are stored in
                 ``self.not_responded``
        """
        _r = self.salt.cmd(
            nodes if nodes else self.active_nodes_compound,
            'cmd.run',
            param=cmd,
            expr_form="compound"
        )

        # all false returns means that there is no response
        self.not_responded = [_n for _n in _r.keys() if not _r[_n]]
        return _r
class KubeNodes(Nodes):
    """Node information and remote execution for Kubernetes environments.

    Nodes are discovered through the kube client. Commands run either over
    SSH (directly or through a port forward via ``env_config.ssh_host``)
    or inside pods of the 'qa-space' namespace.
    """

    def __init__(self, config):
        super(KubeNodes, self).__init__(config)
        logger_cli.info("# Gathering environment information")
        # kube API client
        self.kube = get_kube_remote(self.env_config)
        self.env_type = ENV_TYPE_KUBE
        self._namespace = "qa-space"
        self._configmap_name = self.env_config.kube_scripts_folder

        # prepare needed resources
        self._check_namespace()
        self._scripts = self._check_config_map()
        self.prepared_daemonsets = []

    def _check_namespace(self):
        """Ensure the working namespace exists.

        :raises KubeException: when the namespace cannot be managed
        """
        logger_cli.debug(
            "... checking namespace '{}'".format(self._namespace)
        )
        if not self.kube.ensure_namespace(self._namespace):
            raise KubeException(
                "Failed to manage namespace '{}'".format(self._namespace)
            )

    def _check_config_map(self):
        """Create config map from package 'scripts' folder; return result."""
        logger_cli.debug(
            "... checking config map '{}'".format(self._configmap_name)
        )
        _source = os.path.join(pkg_dir, 'scripts')
        return self.kube.create_config_map(
            self._namespace,
            self._configmap_name,
            _source
        )

    @staticmethod
    def _get_ubuntu_codename(os_image):
        """Return the lowercase Ubuntu codename for an 'os_image' string.

        The version is reduced to 'major.minor' ('20.04.4' and '20.04'
        both give '20.04') and looked up in ubuntu_versions; the first
        word of the entry is returned. 'unknown' otherwise.
        """
        _parts = os_image.split()
        if len(_parts) < 2 or _parts[0].lower() != 'ubuntu':
            return "unknown"
        _version = ".".join(_parts[1].split(".")[:2])
        if _version in ubuntu_versions:
            return ubuntu_versions[_version].split()[0].lower()
        return "unknown"

    def gather_node_info(self, skip_list, skip_list_file):
        """Collect node names, statuses, roles, labels and OS details.

        Populates ``self.nodes``, ``self._active``, ``self.skip_list``,
        ``self.domain``, ``self.kube.master_node``, ``self.mcp_release``
        and ``self.openstack_release``.

        :raises KubeException: when no 'k8s-master' node is found
        """
        logger_cli.debug("... collecting node names existing in the cloud")
        # Gather node names and info
        _nodes = self.kube.get_node_info()
        _node_names = list(_nodes.keys())
        # Skip nodes if needed
        _skipped_nodes = \
            _prepare_skipped_nodes(_node_names, skip_list, skip_list_file)

        # Nodes reporting Ready; note that the skip list is not applied here
        self._active = [n for n, v in _nodes.items()
                        if v['conditions']['ready']['status']]

        # iterate through all accepted nodes and create a dict for it
        self.nodes = {}
        self.skip_list = []
        for _name in _node_names:
            if _name in _skipped_nodes:
                _status = NODE_SKIP
                self.skip_list.append(_name)
            else:
                _status = NODE_UP if _name in self._active else NODE_DOWN
                if _status == NODE_DOWN:
                    self.skip_list.append(_name)
                    logger_cli.info(
                        "-> '{}' shows 'Ready' as 'False', "
                        "added to skip list".format(
                            _name
                        )
                    )
            # split labels into known roles and everything else
            _roles = {}
            _labels = {}
            for _label, _value in _nodes[_name]['labels'].items():
                if _label in all_kube_roles_map:
                    _roles[all_kube_roles_map[_label]] = _value
                else:
                    _labels[_label] = _value

            _node = deepcopy(node_tmpl)
            self.nodes[_name] = _node
            # salt-only fields are not used for kube nodes
            _node.pop("grains")
            _node.pop("pillars")
            # hostname
            _node['shortname'] = \
                _nodes[_name]['addresses']['hostname']['address']
            _node['internalip'] = \
                _nodes[_name]['addresses']['internalip']['address']
            _node['node_group'] = None
            _node['labels'] = _labels
            _node['roles'] = _roles
            _node['status'] = _status
            # Backward compatibility
            _info = _nodes[_name]['status']['node_info']
            _node['linux_image'] = _info['os_image']
            _node['linux_arch'] = _info['architecture']
            _node['linux_codename'] = \
                self._get_ubuntu_codename(_info['os_image'])

            # Consider per-data type transfer
            _node["raw"] = _nodes[_name]

        # TODO: Investigate how to handle domains in Kube, probably - skip
        self.domain = "no.domain.in.kube.yet"
        logger_cli.info(
            "-> {} nodes collected: {} - active, {} - not active".format(
                len(self.nodes),
                len(self._active),
                len(self.skip_list)
            )
        )

        _role = "k8s-master"
        _filtered = [n for n, v in self.nodes.items() if _role in v['roles']]
        if not _filtered:
            raise KubeException(
                "No k8s-master nodes detected! Check/Update node role map."
            )
        _r = [n for n, v in self.nodes.items()
              if v['status'] != NODE_UP and _role in v['roles']]
        if _r:
            logger_cli.warn(
                "Master nodes are reporting 'NotReady:\n{}".format(
                    "\n".join(_r)
                )
            )
        self.kube.master_node = _filtered[0]

        # get specific data upfront
        # OpenStack versions
        self.mcp_release = ""
        # Quick and Dirty way to detect OS release
        try:
            _nova_version = self.kube.exec_on_target_pod(
                "nova-manage --version",
                "nova-api-osapi",
                "openstack"
            )
        except KubeException as e:
            # 'message' is not guaranteed on exceptions, fall back to str()
            logger_cli.warn(
                "Openstack not detected: {}".format(
                    getattr(e, "message", e)
                )
            )
            self.openstack_release = nova_openstack_versions["00"]
        else:
            _nmajor = str(_nova_version).partition('.')[0]
            if _nmajor in nova_openstack_versions:
                self.openstack_release = nova_openstack_versions[_nmajor]
            else:
                logger_cli.warn(
                    "Unknown nova version '{}', "
                    "openstack release not detected".format(_nova_version)
                )
                self.openstack_release = nova_openstack_versions["00"]

    @staticmethod
    def _get_ssh_shell(_h, _u, _k, _p, _q, _pipe, timeout=15):
        """Create an SshShell to host *_h* and return ``connect()`` result."""
        _ssh = SshShell(
            _h,
            user=_u,
            keypath=_k,
            port=_p,
            silent=_q,
            piped=_pipe,
            timeout=timeout
        )
        return _ssh.connect()

    @staticmethod
    def _do_ssh_cmd(_cmd, _h, _u, _k, _p, _q, _pipe, timeout=None):
        """Run *_cmd* in a short-lived SshShell and return its output."""
        with SshShell(
            _h,
            user=_u,
            keypath=_k,
            port=_p,
            silent=_q,
            piped=_pipe
        ) as ssh:
            if timeout is None:
                _r = ssh.do(_cmd)
            else:
                _r = ssh.do(_cmd, timeout=timeout)
            logger_cli.debug("'{}'".format(_r))
        return _r

    def node_shell(
        self,
        node,
        silent=True,
        piped=True,
        use_sudo=True,
        fport=None
    ):
        """Open an SSH shell to *node*.

        For local or ssh_direct configs the node is reached directly,
        otherwise through a PortForward on localhost:*fport* (10022 by
        default). ``use_sudo`` is accepted but not used here.

        :return: [port_forward_or_None, shell]; the caller must kill both
        """
        _u = self.env_config.kube_node_user
        _k = self.env_config.kube_node_keypath
        _h = self.nodes[node]['internalip']
        _p = 22
        if self.kube.is_local or self.kube.config.ssh_direct:
            logger.debug("Getting shell with no port forward")
            return [None, self._get_ssh_shell(
                _h, _u, _k, _p, silent, piped,
                timeout=self.kube.config.ssh_connect_timeout
            )]
        else:
            logger.debug("Getting shell with with forward")
            _fh = "localhost"
            _p = 10022 if not fport else fport
            _pfwd = PortForward(
                self.env_config.ssh_host,
                _h,
                user=_u,
                keypath=self.env_config.ssh_key,
                loc_port=_p,
                timeout=self.kube.config.ssh_connect_timeout
            )
            _pfwd.connect()
            _ssh = self._get_ssh_shell(
                _fh,
                _u,
                _k,
                _p,
                silent,
                piped,
                timeout=self.kube.config.ssh_connect_timeout
            )
            return [_pfwd, _ssh]

    def _run_cmd_on_node(self, node, cmd):
        """Run *cmd* in a shell on *node*; always close the shell(s)."""
        _fwd_sh, _sh = self.node_shell(node)
        try:
            return _sh.do(cmd)
        finally:
            # no port forward exists for local/direct connections
            if _fwd_sh:
                _fwd_sh.kill()
            _sh.kill()

    def execute_script_on_node(self, node, script_filename, args=None):
        """Run a script from the node's scripts folder on *node* over SSH.

        :param args: optional list of string arguments
        :return: {node: output}, or {} when output is empty (the node is
                 then listed in ``self.not_responded``)
        """
        # Prepare path
        _target_path = os.path.join(
            self.env_config.kube_node_homepath,
            self.env_config.kube_scripts_folder,
            script_filename
        )

        # execute script
        logger_cli.debug("... running script on '{}'".format(node))
        # handle results for each node
        _script_arguments = " ".join(args) if args else ""
        self.not_responded = []
        # previously the command was passed as node_shell's 'silent'
        # argument and never executed
        _nr = self._run_cmd_on_node(
            node,
            "python {} {}".format(
                _target_path,
                _script_arguments
            )
        )

        if not _nr:
            self.not_responded.append(node)
            return {}
        return {node: _nr}

    def execute_cmd_on_active_nodes(self, cmd, nodes=None):
        """Run shell *cmd* over SSH on every Ready node, one at a time.

        :param nodes: accepted for interface compatibility, not used
        :return: {node: output} for nodes with non-empty output; others
                 are listed in ``self.not_responded``
        """
        logger_cli.debug("...running '{}' on active nodes".format(cmd))
        # handle results for each node
        self.not_responded = []
        _r = {}
        # TODO: Use threading and pool
        for node in self._active:
            _nr = self._run_cmd_on_node(node, cmd)
            if not _nr:
                self.not_responded.append(node)
            else:
                _r[node] = _nr

        return _r

    def _ssh_exec_script(self, params):
        """Threadsafe: get shell to node, copy script and run it.

        :param params: [
                node_name,
                src_path,
                tgt_path,
                conf,
                args,
                port
            ]
        :return: [node_name, output]
        """
        _timeout = self.kube.config.script_execution_timeout
        _name = params[0]
        _src = params[1]
        _tgt = params[2]
        _conf = params[3]
        _args = params[4]
        _port = params[5]
        _log_name = "["+_name+"]:"
        _check = "echo $(if [[ -s '{}' ]]; then echo True; " \
                 "else echo False; fi)"
        _fwd_sh, _sh = self.node_shell(
            _name,
            use_sudo=False,
            fport=_port
        )
        try:
            # check python3
            _python = _sh.do("which python3")
            _python = utils.to_bool(
                _sh.do(_check.format(_python))
            )
            if not _python:
                # '-y' so apt does not block on an interactive prompt
                _sh.do(
                    "apt install -y python3",
                    sudo=True,
                    timeout=_timeout
                )
            # check if script already there
            _folder = os.path.join(
                self.env_config.kube_node_homepath,
                _conf.kube_scripts_folder
            )
            # check if folder exists
            _folder_exists = utils.to_bool(
                _sh.do(_check.format(_folder))
            )
            if not _folder_exists:
                _sh.do("mkdir " + _folder)
            logger.info("{} Syncing file".format(_log_name))
            _code, _r, _e = _sh.scp(
                _src,
                _sh.get_host_path(_tgt),
            )
            # handle error code
            if _code:
                logger_cli.warn(
                    "{} Error in scp:\n"
                    "\tstdout:'{}'\n"
                    "\tstderr:'{}'".format(_log_name, _r, _e)
                )

            # execute script
            logger.debug("{} Running script".format(_log_name))
            _out = _sh.do(
                "python3 {}{}".format(
                    _tgt,
                    _args
                ),
                sudo=True,
                timeout=_timeout
            )
        finally:
            if _fwd_sh:
                _fwd_sh.kill()
            _sh.kill()

        return [_name, _out]

    def execute_script_on_active_nodes(self, script_filename, args=None):
        """Copy and run a package script on all Ready nodes over SSH.

        Runs with ``env_config.threads`` workers; each node gets its own
        forward port starting from 10022.

        :param args: optional list of string arguments
        :return: {node: output}; nodes with empty output are listed in
                 ``self.not_responded``
        """
        # Prepare script
        _source_path = os.path.join(pkg_dir, 'scripts', script_filename)
        _target_path = os.path.join(
            self.env_config.kube_node_homepath,
            self.env_config.kube_scripts_folder,
            script_filename
        )
        # handle results for each node
        _script_arguments = " ".join(args) if args else ""
        if _script_arguments:
            _script_arguments = " " + _script_arguments
        self.not_responded = []
        _results = {}
        logger_cli.debug(
            "... running '{}' on active nodes, {} worker threads".format(
                script_filename,
                self.env_config.threads
            )
        )
        # parameter blocks, see _ssh_exec_script
        _params = []
        _port = 10022
        for node in self._active:
            _params.append([
                node,
                _source_path,
                _target_path,
                self.env_config,
                _script_arguments,
                _port
            ])
            _port += 1

        _progress = Progress(len(_params))
        # Workers pool
        pool = Pool(self.env_config.threads)
        try:
            results = pool.imap_unordered(self._ssh_exec_script, _params)
            for _idx, (_node, _out) in enumerate(results, start=1):
                if not _out:
                    self.not_responded.append(_node)
                else:
                    _results[_node] = _out
                _progress.write_progress(_idx)
            _progress.end()
        finally:
            pool.close()
            pool.join()

        return _results

    def prepare_json_on_node(self, node, _dict, filename):
        """Serialize *_dict* to JSON and copy it to *node* over scp.

        Creates the scripts folder in the node home path if missing.

        :return: path of the file on the node
        """
        _dumps = json.dumps(_dict, indent=2).splitlines()
        _source_path = create_temp_file_with_content(_dumps)
        _folder = os.path.join(
            self.env_config.kube_node_homepath,
            self.env_config.kube_scripts_folder
        )
        _target_path = os.path.join(_folder, filename)
        _check = "echo $(if [[ -s '{}' ]]; then echo True; " \
                 "else echo False; fi)"
        _fwd_sh, _sh = self.node_shell(
            node,
            use_sudo=False
        )
        try:
            # check if folder exists
            _folder_exists = utils.to_bool(
                _sh.do(_check.format(_folder))
            )
            if not _folder_exists:
                _sh.do("mkdir " + _folder)
            logger_cli.debug(
                "... create data on node '{}':'{}'".format(node, _target_path)
            )
            _code, _r, _e = _sh.scp(
                _source_path,
                _sh.get_host_path(_target_path),
            )
            # handle error code
            if _code:
                logger_cli.warn(
                    "Error in scp:\n"
                    "\tstdout:'{}'\n"
                    "\tstderr:'{}'".format(_r, _e)
                )
        finally:
            # no port forward exists for local/direct connections
            if _fwd_sh:
                _fwd_sh.kill()
            _sh.kill()
        return _target_path

    def prepare_daemonset(self, template_filename):
        """Create a daemonset from a package template with scripts mounted.

        The scripts config map is mounted at '/<kube_scripts_folder>' in
        the first container. The result is remembered in
        ``self.prepared_daemonsets``.
        """
        # load template
        _yaml_file = os.path.join(pkg_dir, 'templates', template_filename)
        logger_cli.debug("... loading template '{}'".format(_yaml_file))
        with open(_yaml_file) as dsFile:
            _ds = yaml.load(dsFile, Loader=yaml.SafeLoader)

        # Add scripts to pod template as volumeMounts
        _tspec = _ds['spec']['template']['spec']
        _tspec['containers'][0]['volumeMounts'] = [
            {
                "name": "scripts",
                "mountPath": os.path.join(
                    "/",
                    self.env_config.kube_scripts_folder
                )
            }
        ]

        _tspec['volumes'] = [
            {
                "name": "scripts",
                "configMap": {
                    "name": self._configmap_name
                }
            }
        ]

        # create daemonset
        logger_cli.debug("... preparing daemonset")
        _ds = self.kube.prepare_daemonset_from_yaml(self._namespace, _ds)
        # Save prepared daemonset
        self.prepared_daemonsets.append(_ds)
        return _ds

    def wait_for_daemonset(self, ds, timeout=120):
        """Poll every 5s until ready == up-to-date == number of nodes.

        :return: True when ready, False on timeout
        """
        # iteration timeout
        _sleep_time = 5
        _timeout = timeout

        _total = len(self.nodes)
        # Init Progress bar to show daemonset readiness
        _progress = Progress(_total)
        while _timeout > 0:
            # get new status
            _ds = self.kube.get_daemon_set_by_name(
                ds.metadata.namespace,
                ds.metadata.name
            )
            _desired = _ds.status.desired_number_scheduled
            _scheduled = _ds.status.current_number_scheduled
            _ready = _ds.status.number_ready
            _updated = _ds.status.updated_number_scheduled
            # print it
            _progress.write_progress(
                _ready,
                note="desired: {}, scheduled: {}, ready: {},"
                     " up-to-date: {}".format(
                         _desired,
                         _scheduled,
                         _ready,
                         _updated
                     )
            )

            # In case of Update, also checking _updated value
            if _ready == _updated and _ready == _total:
                _progress.end()
                logger_cli.debug("... daemonset is ready")
                return True
            # iterate
            _timeout -= _sleep_time
            # wait
            sleep(_sleep_time)

        # timed out
        _progress.end()
        logger_cli.error("Timed out waiting for Daemonset to be ready")
        return False

    def exec_script_on_target_pod(self, pod_name, script_filename, args=None):
        """Run a script from the config map on a pod in our namespace.

        :param args: optional list of arguments (a single string is
                     treated as one argument)
        """
        # None/"" previously produced 'list + str' TypeError
        if not args:
            _arguments = []
        elif isinstance(args, str):
            _arguments = [args]
        else:
            _arguments = list(args)
        _cmd = [
            "python3",
            os.path.join(
                "/",
                self.env_config.kube_scripts_folder,
                script_filename
            )
        ] + _arguments
        _result = self.kube.exec_on_target_pod(
            _cmd,
            pod_name,
            self._namespace,
            strict=True
        )
        return _result

    def exec_cmd_on_target_pod(self, pod_name, ns, command_str):
        """Run an arbitrary command on pod *pod_name* in namespace *ns*."""
        _result = self.kube.exec_on_target_pod(
            command_str,
            pod_name,
            ns,
            strict=True
        )
        return _result

    def execute_cmd_on_daemon_set(
        self,
        ds,
        cmd,
        _args=None,
        is_script=False
    ):
        """Execute command/script on all daemonset pods.

        :return: {node_name: output}
        """
        _results = self.exec_cmd_on_pods(
            self.kube.get_pods_for_daemonset(ds),
            cmd,
            _args=_args,
            is_script=is_script
        )
        return {_n: _v for _n, _, _v in _results}

    def exec_on_labeled_pods_and_ns(self, label_str, cmd, _args=None, ns=None):
        """Execute command on pods matching *label_str* in *ns*.

        :return: {pod_name: output}
        """
        if not ns:
            ns = self._namespace
        _results = self.exec_cmd_on_pods(
            self.kube.list_pods(ns, label_str=label_str),
            cmd,
            _args=_args
        )
        return {_p: _v for _, _p, _v in _results}

    def exec_cmd_on_pods(
        self,
        pod_list,
        cmd,
        _args=None,
        is_script=False
    ):
        """Execute a command (or config map script) on pods in parallel.

        Commands containing '|' are wrapped into 'bash -c'.

        :return: list of [node_name, pod_name, output] for every pod; nodes
                 with empty output are also listed in ``self.not_responded``
        """
        def _kube_exec_on_pod(plist):
            return [
                plist[1],  # node
                plist[3],  # pod name
                plist[0].kube.exec_on_target_pod(  # pointer to function
                    plist[4],  # cmd
                    plist[3],  # pod name
                    plist[2],  # namespace
                    strict=True,
                    _request_timeout=120,
                    arguments=plist[5]
                )
            ]

        # Create map for threads: [[self, node, ns, pod, cmd, args]...]
        logger_cli.debug(
            "... runnning script on {} pods using {} threads at a time".format(
                len(pod_list.items),
                self.env_config.threads
            )
        )
        _arguments = _args if _args else ""
        if is_script:
            _cmd = " ".join([
                "python3",
                os.path.join(
                    "/",
                    self.env_config.kube_scripts_folder,
                    cmd
                ),
                _arguments
            ])
        elif '|' in cmd:
            # decide if we are to wrap it to bash
            _cmd = "bash -c"
            _arguments = cmd
        else:
            _cmd = cmd
        _plist = [
            [
                self,
                item.spec.node_name,
                item.metadata.namespace,
                item.metadata.name,
                _cmd,
                _arguments
            ]
            for item in pod_list.items
        ]

        _results = []
        self.not_responded = []
        # create result list
        _progress = Progress(len(_plist))
        pool = Pool(self.env_config.threads)
        try:
            ret = pool.imap_unordered(_kube_exec_on_pod, _plist)
            for _idx, _r in enumerate(ret, start=1):
                # _r is [node, pod name, output]: check the output, not
                # the (always set) pod name
                if not _r[2]:
                    self.not_responded.append(_r[0])
                _results.append(_r)
                _progress.write_progress(_idx)
            _progress.end()
        finally:
            pool.close()
            pool.join()
        logger_cli.debug(
            "... done, {} total outputs; {} not responded".format(
                len(_results),
                len(self.not_responded)
            )
        )
        return _results

    def delete_daemonset(self, ds):
        """Delete daemonset *ds*; return API result or None on failure."""
        try:
            _r = self.kube.delete_daemon_set_by_name(
                ds.metadata.namespace,
                ds.metadata.name
            )
        except Exception as e:
            # only API exceptions carry 'reason'
            logger_cli.warning("Failed to delete daemonset '{}': {}".format(
                ds.metadata.name,
                getattr(e, "reason", e)
            ))
            _r = None
        return _r

    def get_pod_name_in_daemonset_by_node(self, nodename, daemonset):
        """Return name of the daemonset pod on *nodename* or None."""
        _podname = None
        _pods = self.kube.get_pods_for_daemonset(daemonset)
        for item in _pods.items:
            if item.spec.node_name == nodename:
                _podname = item.metadata.name

        return _podname

    def prepare_json_in_pod(self, podname, namespace, targets, filename):
        """Write *targets* as JSON to '/tmp/<filename>' inside a pod.

        :return: path of the file in the pod
        """
        _target_path = os.path.join(
            "/",
            "tmp",
            filename
        )
        buffer = json.dumps(targets, indent=2).encode('utf-8')

        # write data to pod using fancy websocket function
        self.kube.put_string_buffer_to_pod_as_textfile(
            podname,
            namespace,
            buffer,
            _target_path
        )

        # TODO: Exception handling

        return _target_path

    def get_cmd_for_nodes(self, cmd, target_key, target_dict=None, nodes=None):
        """Run *cmd* on the first prepared daemonset, store under *target_key*.

        :param nodes: accepted for interface compatibility, not used
        :return: no return value, data published internally
        :raises KubeException: when no daemonset was prepared
        """
        logger_cli.debug(
            "... collecting results for '{}'".format(cmd)
        )
        _nodes = target_dict if target_dict else self.nodes
        # Dirty way to get daemonset that was used in checker and not deleted
        if not self.prepared_daemonsets:
            raise KubeException(
                "No prepared daemonsets to collect '{}' results".format(cmd)
            )
        _ds = self.prepared_daemonsets[0]
        _result = self.execute_cmd_on_daemon_set(_ds, cmd)
        for node, data in _nodes.items():

            if node in self.skip_list:
                logger_cli.debug(
                    "... '{}' skipped while collecting '{}'".format(
                        node,
                        cmd
                    )
                )
                continue
            # Prepare target key
            if target_key not in data:
                data[target_key] = None
            # Save data
            if data['status'] in [NODE_DOWN, NODE_SKIP]:
                data[target_key] = None
            elif node not in _result:
                continue
            elif not _result[node]:
                logger_cli.debug(
                    "... '{}' not responded after '{}'".format(
                        node,
                        self.env_config.salt_timeout
                    )
                )
                data[target_key] = None
            else:
                data[target_key] = _result[node]

    def prepare_benchmark_agent(self, index, path, sc, size, template):
        """Create PVC and pod for benchmark agent number *index*.

        :return: (pod, pvc) as returned by the kube client
        """
        # Load pod template
        _yaml_file = os.path.join(pkg_dir, 'templates', template)
        logger_cli.debug("... loading template '{}'".format(_yaml_file))
        with open(_yaml_file) as podFile:
            _pod = yaml.load(podFile, Loader=yaml.SafeLoader)

        # set namings
        _n = "cfgagent-{:02}".format(index)
        _pvc_n = "cfgagent-pvc-{:02}".format(index)

        _pod["metadata"]["name"] = _n
        _pod["metadata"]["labels"]["name"] = _n
        # point placeholder volumeMounts to *path*
        for _c in _pod["spec"]["containers"]:
            for _mnt in _c["volumeMounts"]:
                if "placeholder" in _mnt["name"]:
                    _mnt["mountPath"] = path
        # replace claim
        for _v in _pod["spec"]["volumes"]:
            if "cfgagent-pv" in _v["name"]:
                _v["persistentVolumeClaim"]["claimName"] = _pvc_n

        # init volume resources; *size* is passed through unchanged
        _pvc_object = self.kube.init_pvc_resource(_pvc_n, sc, size)
        _pvc = self.kube.prepare_pvc(_pvc_object)

        # start pod
        _pod = self.kube.prepare_pod_from_yaml(_pod)

        return _pod, _pvc

    def expose_benchmark_agent(self, agent):
        """Expose port 8765 of the agent pod."""
        return self.kube.expose_pod_port(agent, 8765)

    def cleanup_resource_by_name(self, res_type, name, ns=None, wait=False):
        """Clean up resource using string res_type and the ns/name.

        Args:
            res_type (string): resource type name: pod, pv, pvc, svc
            name (string): resource name to cleanup
            ns (string, optional): Namespace to use. Default is 'qa-space'
            wait (bool, optional): wait for pod/pvc/pv to reach Terminated

        return: (Bool) Is Success? False for empty/unsupported arguments
        """
        # fill defaults
        if not ns:
            ns = self._namespace
        # Handle res_type errors and choose resource type
        if not res_type:
            logger_cli.debug(
                "... resource type invalid: '{}'".format(res_type)
            )
            return False
        elif not name:
            logger_cli.debug("... resource name invalid: '{}'".format(name))
            return False
        elif res_type == "svc":
            # Delete service
            logger_cli.info("-> deleting svc {}/{}".format(ns, name))
            self.kube.CoreV1.delete_namespaced_service(name, ns)
            # TODO: Check if successfull
        elif res_type == "pod":
            # Delete a pod
            logger_cli.info("-> deleting pod {}/{}".format(ns, name))
            self.kube.CoreV1.delete_namespaced_pod(name, ns)
            if wait:
                self.kube.wait_for_phase(res_type, name, ns, ["Terminated"])
        elif res_type == "pvc":
            logger_cli.info("-> deleting pvc {}/{}".format(ns, name))
            self.kube.CoreV1.delete_namespaced_persistent_volume_claim(
                name,
                ns
            )
            if wait:
                self.kube.wait_for_phase(res_type, name, ns, ["Terminated"])
        elif res_type == "pv":
            logger_cli.info("-> deleting pv {}/{}".format(ns, name))
            self.kube.CoreV1.delete_persistent_volume(name)
            if wait:
                self.kube.wait_for_phase(res_type, name, None, ["Terminated"])
        else:
            # nothing was deleted, do not report success
            logger_cli.debug(
                "... resource type not supported: '{}'".format(res_type)
            )
            return False

        return True

    def get_resource_phase_by_name(self, typ, name, ns="qa-space"):
        """Return status.phase of a pod/svc/pvc/pv, or None."""
        if typ == "pod":
            _t = self.kube.get_pod_by_name_and_ns(name, ns)
        elif typ == "svc":
            _t = self.kube.get_svc_by_name_and_ns(name, ns)
        elif typ == "pvc":
            _t = self.kube.get_pvc_by_name_and_ns(name, ns)
        elif typ == "pv":
            _t = self.kube.get_pv_by_name(name)
        else:
            logger_cli.error("ERROR: '{}' is not supported yet".format(typ))
            return None

        if not _t:
            return None
        # service status has no 'phase' field
        return getattr(_t.status, "phase", None)

    def list_resource_names_by_type_and_ns(self, typ, ns="qa-space"):
        """Return [[namespace, name], ...] of resources of type *typ*."""
        if typ == "pod":
            _items = self.kube.list_pods(ns)
        elif typ == "svc":
            _items = self.kube.list_svc(ns)
        elif typ == "pvc":
            _items = self.kube.list_pvc(ns)
        elif typ == "pv":
            _items = self.kube.list_pv()
        else:
            logger_cli.error("ERROR: '{}' is not supported yet".format(typ))
            return None

        return [[i.metadata.namespace, i.metadata.name] for i in _items.items]

    def get_logs_for_pod(self, podname, namespace):
        """Return logs of pod *podname* in *namespace*."""
        return self.kube.get_pod_logs(podname, namespace)