blob: 86e59a57890ef41759fba4b34bddba1d575fff77 [file] [log] [blame]
"""
Module to handle interaction with Kube
"""
import base64
import os
import urllib3
import yaml
from kubernetes import client as kclient, config as kconfig
from kubernetes.stream import stream
from cfg_checker.common import logger, logger_cli
from cfg_checker.common.exception import InvalidReturnException, KubeException
from cfg_checker.common.file_utils import create_temp_file_with_content
from cfg_checker.common.other import utils, shell
from cfg_checker.common.ssh_utils import ssh_shell_p
from cfg_checker.common.const import ENV_LOCAL
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def _init_kube_conf_local(config):
    """Initialize a Kube client from a locally available kubeconfig.

    :param config: checker config object; reads ``kube_config_path``
                   and ``insecure``
    :return: tuple of (kube config module, ApiClient, source path label);
             (None, None, path) when initialization fails
    """
    # Init kube library locally
    _path = "local:{}".format(config.kube_config_path)
    try:
        kconfig.load_kube_config(config_file=config.kube_config_path)
        if config.insecure:
            # NOTE(review): these assignments land on the *module* object,
            # not on a client Configuration instance; presumably meant to
            # relax SSL checks -- confirm they actually take effect
            kconfig.assert_hostname = False
            kconfig.client_side_validation = False
        logger_cli.debug(
            "... found Kube env: core, {}". format(
                ",".join(
                    kclient.CoreApi().get_api_versions().versions
                )
            )
        )
        return kconfig, kclient.ApiClient(), _path
    except Exception as e:
        # fix: `warn` is a deprecated alias of `warning` in stdlib logging
        logger.warning("Failed to init local Kube client: {}".format(
                str(e)
            )
        )
        return None, None, _path
def _init_kube_conf_remote(config):
    """Initialize a Kube client against a remote cluster.

    Loads kubeconfig data either over SSH from the remote node or from
    the local path (when already detected or env is local), materializes
    the CA and client cert/key into temp files, and points the client
    host at ``config.mcp_host``.

    :param config: checker config object
    :return: tuple of (kclient.Configuration, kclient.ApiClient,
             source path label); (None, None, path) when no kubeconfig
             data could be read
    """
    # init remote client
    # Preload Kube token
    # Shell-level equivalent, kept for reference:
    """
    APISERVER=$(kubectl config view --minify |
    grep server | cut -f 2- -d ":" | tr -d " ")
    SECRET_NAME=$(kubectl get secrets |
    grep ^default | cut -f1 -d ' ')
    TOKEN=$(kubectl describe secret $SECRET_NAME |
    grep -E '^token' | cut -f2 -d':' | tr -d " ")
    echo "Detected API Server at: '${APISERVER}'"
    echo "Got secret: '${SECRET_NAME}'"
    echo "Loaded token: '${TOKEN}'"
    curl $APISERVER/api
    --header "Authorization: Bearer $TOKEN" --insecure
    """
    # fix: removed redundant function-local `import yaml`
    # (already imported at module level)
    _path = ''
    # Try to load remote config only if it was not detected already
    if not config.kube_config_detected and not config.env_name == ENV_LOCAL:
        _path = "{}@{}:{}".format(
            config.ssh_user,
            config.ssh_host,
            config.kube_config_path
        )
        _c_data = ssh_shell_p(
            "cat " + config.kube_config_path,
            config.ssh_host,
            username=config.ssh_user,
            keypath=config.ssh_key,
            piped=False,
            use_sudo=config.ssh_uses_sudo,
        )
    else:
        _path = "local:{}".format(config.kube_config_path)
        with open(config.kube_config_path, 'r') as ff:
            _c_data = ff.read()

    if len(_c_data) < 1:
        # nothing was read -- no client can be built
        return None, None, _path

    _conf = yaml.load(_c_data, Loader=yaml.SafeLoader)

    _kube_conf = kclient.Configuration()
    # A remote host configuration
    # To work with remote cluster, we need to extract these
    # keys = ['host', 'ssl_ca_cert', 'cert_file', 'key_file', 'verify_ssl']
    # When v12 of the client will be release, we will use load_from_dict
    _kube_conf.ssl_ca_cert = create_temp_file_with_content(
        base64.standard_b64decode(
            _conf['clusters'][0]['cluster']['certificate-authority-data']
        )
    )
    _host = _conf['clusters'][0]['cluster']['server']
    _kube_conf.cert_file = create_temp_file_with_content(
        base64.standard_b64decode(
            _conf['users'][0]['user']['client-certificate-data']
        )
    )
    _kube_conf.key_file = create_temp_file_with_content(
        base64.standard_b64decode(
            _conf['users'][0]['user']['client-key-data']
        )
    )
    if "http" not in _host or "443" not in _host:
        # NOTE(review): on this branch `_kube_conf.host` stays unset while
        # execution continues below -- confirm whether it should abort
        logger_cli.error(
            "Failed to extract Kube host: '{}'".format(_host)
        )
    else:
        logger_cli.debug(
            "... 'context' host extracted: '{}' via SSH@{}".format(
                _host,
                config.ssh_host
            )
        )
        # Substitute context host to ours
        _tmp = _host.split(':')
        _kube_conf.host = \
            _tmp[0] + "://" + config.mcp_host + ":" + _tmp[2]
        config.kube_port = _tmp[2]
        logger_cli.debug(
            "... kube remote host updated to {}".format(
                _kube_conf.host
            )
        )
    _kube_conf.verify_ssl = False
    _kube_conf.debug = config.debug
    if config.insecure:
        _kube_conf.assert_hostname = False
        _kube_conf.client_side_validation = False

    # Nevertheless if you want to do it
    # you can with these 2 parameters
    # configuration.verify_ssl=True
    # ssl_ca_cert is the filepath
    # to the file that contains the certificate.
    # configuration.ssl_ca_cert="certificate"

    # _kube_conf.api_key = {
    #     "authorization": "Bearer " + config.kube_token
    # }

    # Create a ApiClient with our config
    _kube_api = kclient.ApiClient(_kube_conf)

    return _kube_conf, _kube_api, _path
class KubeApi(object):
    """Base wrapper holding kube client state for local/remote envs."""

    def __init__(self, config):
        self.config = config
        # True when both Configuration and ApiClient got created
        self.initialized = self._init_kclient()
        self.last_response = None

    def _init_kclient(self):
        """Pick local or remote init path and load client data.

        :return: True when both kube config and ApiClient are available
        """
        # if there is no password - try to get local, if this available
        logger_cli.debug("... init kube config")
        # fix: compare against the shared ENV_LOCAL constant instead of a
        # bare "local" literal, consistent with _init_kube_conf_remote()
        if self.config.env_name == ENV_LOCAL:
            self.kConf, self.kApi, self.kConfigPath = _init_kube_conf_local(
                self.config
            )
            self.is_local = True
            # Try to load local config data
            if self.config.kube_config_path and \
                    os.path.exists(self.config.kube_config_path):
                _cmd = "cat " + self.config.kube_config_path
                _c_data = shell(_cmd)
                _conf = yaml.load(_c_data, Loader=yaml.SafeLoader)
                self.user_keypath = create_temp_file_with_content(
                    base64.standard_b64decode(
                        _conf['users'][0]['user']['client-key-data']
                    )
                )
                self.yaml_conf = _c_data
        else:
            self.kConf, self.kApi, self.kConfigPath = _init_kube_conf_remote(
                self.config
            )
            self.is_local = False

        # simplified: initialized iff both parts were built
        return self.kConf is not None and self.kApi is not None

    def get_versions_api(self):
        """Return a VersionApi bound to our ApiClient."""
        # client.CoreApi().get_api_versions().versions
        return kclient.VersionApi(self.kApi)
class KubeRemote(KubeApi):
def __init__(self, config):
super(KubeRemote, self).__init__(config)
self._coreV1 = None
self._appsV1 = None
self._podV1 = None
@property
def CoreV1(self):
if not self._coreV1:
self._coreV1 = kclient.CoreV1Api(self.kApi)
return self._coreV1
@property
def AppsV1(self):
if not self._appsV1:
self._appsV1 = kclient.AppsV1Api(self.kApi)
return self._appsV1
@property
def PodsV1(self):
if not self._podsV1:
self._podsV1 = kclient.V1Pod(self.kApi)
return self._podsV1
@staticmethod
def _typed_list_to_dict(i_list):
_dict = {}
for _item in i_list:
_d = _item.to_dict()
_type = _d.pop("type")
_dict[_type.lower()] = _d
return _dict
@staticmethod
def _get_listed_attrs(items, _path):
_list = []
for _n in items:
_list.append(utils.rgetattr(_n, _path))
return _list
@staticmethod
def safe_get_item_by_name(api_resource, _name):
for item in api_resource.items:
if item.metadata.name == _name:
return item
return None
    def get_node_info(self, http=False):
        """Query cluster nodes and presort them into a plain dict.

        :param http: when True use ``list_node_with_http_info``;
                     NOTE(review): that call returns a (data, status,
                     headers) tuple, which would fail the V1NodeList
                     isinstance check below -- confirm intended usage
        :raises InvalidReturnException: when the API returns anything
                                        other than a V1NodeList
        :return: dict keyed by node name; each value is the node's dict
                 form with 'addresses'/'conditions' re-keyed by type and
                 bool-like strings converted to real bools
        """
        # Query API for the nodes and do some presorting
        _nodes = {}
        if http:
            _raw_nodes = self.CoreV1.list_node_with_http_info()
        else:
            _raw_nodes = self.CoreV1.list_node()
        if not isinstance(_raw_nodes, kclient.models.v1_node_list.V1NodeList):
            raise InvalidReturnException(
                "Invalid return type: '{}'".format(type(_raw_nodes))
            )
        for _n in _raw_nodes.items:
            _name = _n.metadata.name
            _d = _n.to_dict()
            # parse inner data classes as dicts
            _d['addresses'] = self._typed_list_to_dict(_n.status.addresses)
            _d['conditions'] = self._typed_list_to_dict(_n.status.conditions)
            # Update 'status' type
            if isinstance(_d['conditions']['ready']['status'], str):
                _d['conditions']['ready']['status'] = utils.to_bool(
                    _d['conditions']['ready']['status']
                )
            # Parse image names?
            # TODO: Here is the place where we can parse each node image names
            # Parse roles
            # labels with "true"/"false" values become real booleans
            _d['labels'] = {}
            for _label, _data in _d["metadata"]["labels"].items():
                if _data.lower() in ["true", "false"]:
                    _d['labels'][_label] = utils.to_bool(_data)
                else:
                    _d['labels'][_label] = _data
            # Save
            _nodes[_name] = _d
        # debug report on how many nodes detected
        logger_cli.debug("...node items returned '{}'".format(len(_nodes)))
        return _nodes
def exec_on_target_pod(
self,
cmd,
pod_name,
namespace,
strict=False,
_request_timeout=120,
**kwargs
):
if not strict:
logger_cli.debug(
"... searching for pods with the name '{}'".format(pod_name)
)
_pods = {}
_pods = self._coreV1.list_namespaced_pod(namespace)
_names = self._get_listed_attrs(_pods.items, "metadata.name")
_pname = ""
_pnames = [n for n in _names if n.startswith(pod_name)]
if len(_pnames) > 1:
logger_cli.debug(
"... more than one pod found for '{}': {}\n"
"... using first one".format(
pod_name,
", ".join(_pnames)
)
)
_pname = _pnames[0]
elif len(_pname) < 1:
raise KubeException("No pods found for '{}'".format(pod_name))
else:
_pname = pod_name
logger_cli.debug(
"... cmd: [CoreV1] exec {} -n {} -- {}".format(
_pname,
namespace,
cmd
)
)
# Set preload_content to False to preserve JSON
# If not, output gets converted to str
# Which causes to change " to '
# After that json.loads(...) fail
_pod_stream = stream(
self.CoreV1.connect_get_namespaced_pod_exec,
_pname,
namespace,
command=cmd.split(),
stderr=True,
stdin=False,
stdout=True,
tty=False,
_request_timeout=_request_timeout,
_preload_content=False,
**kwargs
)
# run for timeout
_pod_stream.run_forever(timeout=_request_timeout)
# read the output
return _pod_stream.read_stdout()
def ensure_namespace(self, ns):
"""
Ensure that given namespace exists
"""
# list active namespaces
_v1NamespaceList = self.CoreV1.list_namespace()
_ns = self.safe_get_item_by_name(_v1NamespaceList, ns)
if _ns is None:
logger_cli.debug("... creating namespace '{}'".format(ns))
_r = self.CoreV1.create_namespace(ns)
# TODO: check return on fail
if not _r:
return False
else:
logger_cli.debug("... found existing namespace '{}'".format(ns))
return True
def get_daemon_set_by_name(self, ns, name):
return self.safe_get_item_by_name(
self.AppsV1.list_namespaced_daemon_set(ns),
name
)
def create_config_map(self, ns, name, source, recreate=True):
"""
Creates/Overwrites ConfigMap in working namespace
"""
# Prepare source
logger_cli.debug(
"... preparing config map '{}/{}' with files from '{}'".format(
ns,
name,
source
)
)
_data = {}
if os.path.isfile(source):
# populate data with one file
with open(source, 'rt') as fS:
_data[os.path.split(source)[1]] = fS.read()
elif os.path.isdir(source):
# walk dirs and populate all 'py' files
for path, dirs, files in os.walk(source):
_e = ('.py')
_subfiles = (_fl for _fl in files
if _fl.endswith(_e) and not _fl.startswith('.'))
for _file in _subfiles:
with open(os.path.join(path, _file), 'rt') as fS:
_data[_file] = fS.read()
_cm = kclient.V1ConfigMap()
_cm.metadata = kclient.V1ObjectMeta(name=name, namespace=ns)
_cm.data = _data
logger_cli.debug(
"... prepared config map with {} scripts".format(len(_data))
)
# Query existing configmap, delete if needed
_existing_cm = self.safe_get_item_by_name(
self.CoreV1.list_namespaced_config_map(namespace=ns),
name
)
if _existing_cm is not None:
self.CoreV1.replace_namespaced_config_map(
namespace=ns,
name=name,
body=_cm
)
logger_cli.debug(
"... replaced existing config map '{}/{}'".format(
ns,
name
)
)
else:
# Create it
self.CoreV1.create_namespaced_config_map(
namespace=ns,
body=_cm
)
logger_cli.debug("... created config map '{}/{}'".format(
ns,
name
))
return _data.keys()
def prepare_daemonset_from_yaml(self, ns, ds_yaml):
_name = ds_yaml['metadata']['name']
_ds = self.get_daemon_set_by_name(ns, _name)
if _ds is not None:
logger_cli.debug(
"... found existing daemonset '{}'".format(_name)
)
_r = self.AppsV1.replace_namespaced_daemon_set(
_ds.metadata.name,
_ds.metadata.namespace,
body=ds_yaml
)
logger_cli.debug(
"... replacing existing daemonset '{}'".format(_name)
)
return _r
else:
logger_cli.debug(
"... creating daemonset '{}'".format(_name)
)
_r = self.AppsV1.create_namespaced_daemon_set(ns, body=ds_yaml)
return _r
def delete_daemon_set_by_name(self, ns, name):
return self.AppsV1.delete_namespaced_daemon_set(name, ns)
def exec_on_all_pods(self, pods):
"""
Create multiple threads to execute script on all target pods
"""
# Create map for threads: [[node_name, ns, pod_name]...]
_pod_list = []
for item in pods.items:
_pod_list.append(
[
item.spec.nodeName,
item.metadata.namespace,
item.metadata.name
]
)
# map func and cmd
# create result list
return []