blob: 22eee30b7838ba649e3ee9a8383c5749acb4a760 [file] [log] [blame]
"""
Module to handle interaction with Kubernetes (Kube) clusters
"""
import base64
import os
import urllib3
import yaml
from kubernetes import client as kclient, config as kconfig, watch
from kubernetes.stream import stream
from kubernetes.client.rest import ApiException
from time import time, sleep
from cfg_checker.common import logger, logger_cli
from cfg_checker.common.decorators import retry
from cfg_checker.common.exception import CheckerException, \
InvalidReturnException, KubeException
from cfg_checker.common.file_utils import create_temp_file_with_content
from cfg_checker.common.other import utils, shell
from cfg_checker.common.ssh_utils import ssh_shell_p
from cfg_checker.common.const import ENV_LOCAL
# Silence urllib3's InsecureRequestWarning: the remote client built in
# _init_kube_conf_remote sets verify_ssl = False, which would otherwise
# emit a warning on every API request.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def _init_kube_conf_local(config):
    """Initialise the kubernetes client from a kubeconfig on this host.

    Loads ``config.kube_config_path`` into the kubernetes library's default
    client configuration and probes the API server for its core API
    versions.

    :param config: checker configuration; reads ``kube_config_path`` and
        ``insecure``.
    :returns: ``(kconfig_module, ApiClient, path_label)`` on success, or
        ``(None, None, path_label)`` if loading or probing fails.
        ``path_label`` is ``"local:<kube_config_path>"``.
    """
    _path = "local:{}".format(config.kube_config_path)
    try:
        # Populates the library-wide default client Configuration
        kconfig.load_kube_config(config_file=config.kube_config_path)
        if config.insecure:
            # The relaxed checks must be set on the default *Configuration*
            # object; assigning them to the kconfig module has no effect on
            # the clients created below.
            # kubernetes>=12 provides get_default_copy(); older releases
            # return a copy of the default from the plain constructor.
            _get_copy = getattr(
                kclient.Configuration, "get_default_copy", None
            )
            _default = _get_copy() if _get_copy else kclient.Configuration()
            _default.assert_hostname = False
            _default.client_side_validation = False
            kclient.Configuration.set_default(_default)
        logger_cli.debug(
            "... found Kube env: core, {}".format(
                ",".join(
                    kclient.CoreApi().get_api_versions().versions
                )
            )
        )
        return kconfig, kclient.ApiClient(), _path
    except Exception as e:
        # Best effort: the caller treats (None, None) as "not initialised"
        logger.warning("Failed to init local Kube client: {}".format(
            str(e)
        ))
        return None, None, _path
def _init_kube_conf_remote(config):
    """Build a kubernetes API client from a kubeconfig for a remote cluster.

    The kubeconfig at ``config.kube_config_path`` is fetched over SSH from
    ``config.ssh_host`` unless it was already detected locally or the
    environment is ``ENV_LOCAL``; in those cases the local file is read.
    The first cluster and user entries are converted into a client
    ``Configuration`` whose server host is replaced by ``config.mcp_host``
    (the original scheme and port are kept).

    For reference, a manual token-based equivalent::

        APISERVER=$(kubectl config view --minify |
            grep server | cut -f 2- -d ":" | tr -d " ")
        SECRET_NAME=$(kubectl get secrets |
            grep ^default | cut -f1 -d ' ')
        TOKEN=$(kubectl describe secret $SECRET_NAME |
            grep -E '^token' | cut -f2 -d':' | tr -d " ")
        echo "Detected API Server at: '${APISERVER}'"
        echo "Got secret: '${SECRET_NAME}'"
        echo "Loaded token: '${TOKEN}'"
        curl $APISERVER/api
            --header "Authorization: Bearer $TOKEN" --insecure

    :param config: checker configuration; reads ``kube_config_detected``,
        ``env_name``, ``kube_config_path``, ``ssh_user``, ``ssh_host``,
        ``ssh_key``, ``ssh_uses_sudo``, ``mcp_host``, ``debug`` and
        ``insecure``; writes ``kube_port`` (as a string).
    :returns: ``(Configuration, ApiClient, path_label)`` on success, or
        ``(None, None, path_label)`` when the kubeconfig is empty, is not a
        mapping, or its server URL cannot be used.
    """
    from urllib.parse import urlsplit

    def _b64_to_temp_file(data):
        # Decode a base64 kubeconfig field into a temp file; the result is
        # used as a file path setting (ssl_ca_cert / cert_file / key_file)
        return create_temp_file_with_content(base64.standard_b64decode(data))

    # Try to load remote config only if it was not detected already
    if not config.kube_config_detected and not config.env_name == ENV_LOCAL:
        _path = "{}@{}:{}".format(
            config.ssh_user,
            config.ssh_host,
            config.kube_config_path
        )
        _c_data = ssh_shell_p(
            "cat " + config.kube_config_path,
            config.ssh_host,
            username=config.ssh_user,
            keypath=config.ssh_key,
            piped=False,
            use_sudo=config.ssh_uses_sudo,
        )
    else:
        _path = "local:{}".format(config.kube_config_path)
        with open(config.kube_config_path, 'r') as ff:
            _c_data = ff.read()
    # 'not' also covers a None result, should the SSH helper return one
    if not _c_data:
        return None, None, _path
    _conf = yaml.load(_c_data, Loader=yaml.SafeLoader)
    if not isinstance(_conf, dict):
        # e.g. a whitespace-only file parses to None
        return None, None, _path

    # NOTE: only the first cluster/user entries are used, regardless of the
    # kubeconfig's current-context. Inline '*-data' fields are required;
    # file references or token-only users raise KeyError here.
    _cluster = _conf['clusters'][0]['cluster']
    _user = _conf['users'][0]['user']
    _host = _cluster['server']

    _url = urlsplit(_host)
    try:
        _port = _url.port
    except ValueError:
        # A port is present but is not a valid number
        _port = None
    if "http" not in _host or "443" not in _host or _port is None:
        logger_cli.error(
            "Failed to extract Kube host: '{}'".format(_host)
        )
        # Without a usable host the client would be left pointing at the
        # Configuration's default host; report failure like empty data.
        return None, None, _path
    logger_cli.debug(
        "... 'context' host extracted: '{}' via SSH@{}".format(
            _host,
            config.ssh_host
        )
    )

    _kube_conf = kclient.Configuration()
    # Substitute context host to ours, keeping scheme and port
    _kube_conf.host = "{}://{}:{}".format(
        _url.scheme,
        config.mcp_host,
        _port
    )
    config.kube_port = str(_port)
    logger_cli.debug(
        "... kube remote host updated to {}".format(
            _kube_conf.host
        )
    )
    # The client takes file paths for TLS material, so the inline
    # base64 data is written to temp files
    _kube_conf.ssl_ca_cert = _b64_to_temp_file(
        _cluster['certificate-authority-data']
    )
    _kube_conf.cert_file = _b64_to_temp_file(
        _user['client-certificate-data']
    )
    _kube_conf.key_file = _b64_to_temp_file(_user['client-key-data'])
    # Server certificate verification is disabled; it can be enabled with
    # verify_ssl = True since ssl_ca_cert already points to the CA file.
    _kube_conf.verify_ssl = False
    _kube_conf.debug = config.debug
    if config.insecure:
        _kube_conf.assert_hostname = False
        _kube_conf.client_side_validation = False
    # Token auth alternative (unused):
    #   _kube_conf.api_key = {"authorization": "Bearer " + config.kube_token}
    # Create an ApiClient with our config
    _kube_api = kclient.ApiClient(_kube_conf)
    return _kube_conf, _kube_api, _path
class KubeApi(object):
    """Base wrapper that owns the kubernetes client configuration.

    On construction the client is initialised either from a local
    kubeconfig (``config.env_name == "local"``) or via
    ``_init_kube_conf_remote``; ``initialized`` tells whether it worked.
    """
    def __init__(self, config):
        self.config = config
        # Populated only for local environments with a readable kubeconfig
        self.user_keypath = None
        self.yaml_conf = None
        self.initialized = self._init_kclient()
        self.last_response = None

    def _init_kclient(self):
        """Set kConf/kApi/kConfigPath/is_local; return True on success."""
        # if there is no password - try to get local, if this available
        logger_cli.debug("... init kube config")
        if self.config.env_name == "local":
            self.kConf, self.kApi, self.kConfigPath = _init_kube_conf_local(
                self.config
            )
            self.is_local = True
            # Try to load local config data
            _path = self.config.kube_config_path
            if _path and os.path.exists(_path):
                # Read the file directly rather than shelling out to 'cat',
                # which broke on paths with spaces or shell metacharacters
                with open(_path, 'r') as _f:
                    _c_data = _f.read()
                _conf = yaml.load(_c_data, Loader=yaml.SafeLoader)
                try:
                    _key_data = _conf['users'][0]['user']['client-key-data']
                except (KeyError, IndexError, TypeError):
                    # e.g. token-based or empty kubeconfig: no client key
                    _key_data = None
                if _key_data:
                    self.user_keypath = create_temp_file_with_content(
                        base64.standard_b64decode(_key_data)
                    )
                self.yaml_conf = _c_data
        else:
            self.kConf, self.kApi, self.kConfigPath = _init_kube_conf_remote(
                self.config
            )
            self.is_local = False
        return self.kConf is not None and self.kApi is not None

    def get_versions_api(self):
        """Return a VersionApi bound to this wrapper's ApiClient."""
        # client.CoreApi().get_api_versions().versions
        return kclient.VersionApi(self.kApi)
class KubeRemote(KubeApi):
    """Helpers for working with cluster resources through the kube API.

    API client objects are created lazily on first access and cached on
    the instance. ``exec_on_target_pod`` and
    ``put_string_buffer_to_pod_as_textfile`` drop the cached CoreV1 client
    afterwards so that the next access builds a fresh one.
    """
    def __init__(self, config):
        super(KubeRemote, self).__init__(config)
        self._coreV1 = None
        self._appsV1 = None
        # '_podV1' is kept for compatibility; the PodsV1 property caches
        # into '_podsV1', which must be initialised here as well
        # (otherwise the first PodsV1 access raises AttributeError).
        self._podV1 = None
        self._podsV1 = None
        self._custom = None

    @property
    def CustomObjects(self):
        """CustomObjectsApi bound to ``self.kApi`` (created on first use)."""
        if not self._custom:
            self._custom = kclient.CustomObjectsApi(self.kApi)
        return self._custom

    @property
    def CoreV1(self):
        """CoreV1Api, created on first use.

        Local mode uses the library's default ApiClient; remote mode builds
        an ApiClient from ``self.kConf``.
        """
        if not self._coreV1:
            if self.is_local:
                self._coreV1 = kclient.CoreV1Api(kclient.ApiClient())
            else:
                self._coreV1 = kclient.CoreV1Api(
                    kclient.ApiClient(self.kConf)
                )
        return self._coreV1

    @property
    def AppsV1(self):
        """AppsV1Api bound to ``self.kApi`` (created on first use)."""
        if not self._appsV1:
            self._appsV1 = kclient.AppsV1Api(self.kApi)
        return self._appsV1

    @property
    def PodsV1(self):
        # NOTE(review): V1Pod is a model class, not an API client; the
        # ApiClient passed positionally presumably lands in its first
        # field. Kept as-is for compatibility - confirm whether it is used.
        if not self._podsV1:
            self._podsV1 = kclient.V1Pod(self.kApi)
        return self._podsV1

    @staticmethod
    def _typed_list_to_dict(i_list):
        """Map typed API objects to ``{type.lower(): object_dict}``.

        Each item's ``to_dict()`` result is keyed by its lowercased
        ``type`` value, which is removed from the dict. ``None`` is treated
        as an empty list.
        """
        _dict = {}
        for _item in i_list or []:
            _d = _item.to_dict()
            _type = _d.pop("type")
            _dict[_type.lower()] = _d
        return _dict

    @staticmethod
    def _get_listed_attrs(items, _path):
        """Return ``utils.rgetattr(item, _path)`` for every item."""
        return [utils.rgetattr(_n, _path) for _n in items]

    @staticmethod
    def safe_get_item_by_name(api_resource, _name):
        """Return the first of ``api_resource.items`` named ``_name``.

        Returns None when no item has that ``metadata.name``.
        """
        return next(
            (item for item in api_resource.items
             if item.metadata.name == _name),
            None
        )

    def wait_for_phase_on_start(self, _func, phase, *args, **kwargs):
        """Watch ``_func`` until an object reaches ``phase``.

        Stops on the first event whose object has ``status.phase == phase``
        or on a DELETED event; also returns if the watch stream ends.
        Always returns None.
        """
        w = watch.Watch()
        start_time = time()
        for event in w.stream(_func, *args, **kwargs):
            if event["object"].status.phase == phase:
                w.stop()
                logger_cli.debug(
                    "... became '{}' in {:0.2f} sec".format(
                        phase,
                        time() - start_time
                    )
                )
                return
            # event.type: ADDED, MODIFIED, DELETED
            if event["type"] == "DELETED":
                # Object was deleted while we were waiting for it to start.
                logger_cli.debug("... deleted before started")
                w.stop()
                return

    def wait_for_event(self, _func, event, *args, **kwargs):
        """Watch ``_func`` until an event of type ``event`` arrives.

        :param event: event type to wait for ("ADDED", "MODIFIED" or
            "DELETED").
        """
        w = watch.Watch()
        # The loop variable must not shadow the 'event' parameter,
        # otherwise the comparison below can never match.
        for _event in w.stream(_func, *args, **kwargs):
            if _event["type"] == event:
                logger_cli.debug("... got {} event".format(_event["type"]))
                w.stop()
                return

    def get_node_info(self, http=False):
        """Return node data as ``{node_name: dict}``.

        Each node's ``to_dict()`` is extended with ``addresses`` and
        ``conditions`` keyed by lowercased type, the Ready condition status
        converted to bool, and a ``labels`` dict where "true"/"false" label
        values are converted to bool.

        :param http: query via ``list_node_with_http_info`` instead of
            ``list_node``.
        :raises InvalidReturnException: if the API did not return a
            V1NodeList.
        """
        # Query API for the nodes and do some presorting
        if http:
            # *_with_http_info variants return (data, status, headers)
            _raw_nodes = self.CoreV1.list_node_with_http_info()[0]
        else:
            _raw_nodes = self.CoreV1.list_node()
        if not isinstance(_raw_nodes, kclient.models.v1_node_list.V1NodeList):
            raise InvalidReturnException(
                "Invalid return type: '{}'".format(type(_raw_nodes))
            )
        _nodes = {}
        for _n in _raw_nodes.items:
            _name = _n.metadata.name
            _d = _n.to_dict()
            # parse inner data classes as dicts
            _d['addresses'] = self._typed_list_to_dict(_n.status.addresses)
            _d['conditions'] = self._typed_list_to_dict(_n.status.conditions)
            # Update 'status' type (a node may lack a Ready condition)
            _ready = _d['conditions'].get('ready')
            if _ready and isinstance(_ready['status'], str):
                _ready['status'] = utils.to_bool(_ready['status'])
            # TODO: parse each node's image names here
            # Parse roles; labels are None for a node without labels
            _d['labels'] = {}
            for _label, _data in (_d["metadata"]["labels"] or {}).items():
                if _data.lower() in ["true", "false"]:
                    _d['labels'][_label] = utils.to_bool(_data)
                else:
                    _d['labels'][_label] = _data
            _nodes[_name] = _d
        # debug report on how many nodes detected
        logger_cli.debug("...node items returned '{}'".format(len(_nodes)))
        return _nodes

    def get_pod_names_by_partial_name(self, partial_name, ns):
        """Return names of pods in ``ns`` that contain ``partial_name``."""
        logger_cli.debug('... searching for pods with {}'.format(partial_name))
        _pods = self.CoreV1.list_namespaced_pod(ns)
        _names = self._get_listed_attrs(_pods.items, "metadata.name")
        _pnames = [n for n in _names if partial_name in n]
        if len(_pnames) > 1:
            logger_cli.debug(
                "... more than one pod found for '{}': {}\n".format(
                    partial_name,
                    ", ".join(_pnames)
                )
            )
        elif len(_pnames) < 1:
            logger_cli.warning(
                "WARNING: No pods found for '{}'".format(partial_name)
            )
        return _pnames

    def get_pods_by_partial_name(self, partial_name, ns):
        """Return pod objects in ``ns`` whose name contains ``partial_name``."""
        logger_cli.debug('... searching for pods with {}'.format(partial_name))
        _all_pods = self.CoreV1.list_namespaced_pod(ns)
        _pods = [_pod for _pod in _all_pods.items
                 if partial_name in _pod.metadata.name]
        if len(_pods) > 1:
            logger_cli.debug(
                "... more than one pod found for '{}': {}\n".format(
                    partial_name,
                    # list the matching pod names, not the characters of
                    # the search string
                    ", ".join(_pod.metadata.name for _pod in _pods)
                )
            )
        elif len(_pods) < 1:
            logger_cli.warning(
                "WARNING: No pods found for '{}'".format(partial_name)
            )
        return _pods

    def exec_on_target_pod(
        self,
        cmd,
        pod_name,
        namespace,
        strict=False,
        _request_timeout=120,
        arguments=None,
        **kwargs
    ):
        """Run ``cmd`` inside a pod and return its output.

        :param cmd: command as a list, or a whitespace-separated string.
        :param pod_name: exact pod name when ``strict``; otherwise a name
            prefix, and the first matching pod is used.
        :param namespace: namespace of the pod.
        :param _request_timeout: request timeout, also used as the
            ``run_forever`` time limit.
        :param arguments: optional extra argument appended to ``cmd`` as a
            single element.
        :returns: stdout, or stderr when stdout is empty.
        :raises KubeException: if no pod matches in non-strict mode.
        """
        if not strict:
            logger_cli.debug(
                "... searching for pods with the name '{}'".format(pod_name)
            )
            _pods = self.CoreV1.list_namespaced_pod(namespace)
            _names = self._get_listed_attrs(_pods.items, "metadata.name")
            _pnames = [n for n in _names if n.startswith(pod_name)]
            if len(_pnames) > 1:
                logger_cli.debug(
                    "... more than one pod found for '{}': {}\n"
                    "... using first one".format(
                        pod_name,
                        ", ".join(_pnames)
                    )
                )
            elif len(_pnames) < 1:
                raise KubeException("No pods found for '{}'".format(pod_name))
            # in case of >1 and =1 we are taking 1st anyway
            _pname = _pnames[0]
        else:
            _pname = pod_name
        logger_cli.debug(
            "... cmd: [CoreV1] exec {} -n {} -- {} '{}'".format(
                _pname,
                namespace,
                cmd,
                arguments
            )
        )
        # Copy a list argument so appending 'arguments' does not mutate
        # the caller's list
        cmd = list(cmd) if isinstance(cmd, list) else cmd.split()
        if arguments:
            cmd.append(arguments)
        # Set preload_content to False to preserve JSON
        # If not, output gets converted to str
        # Which causes to change " to '
        # After that json.loads(...) fail
        _pod_stream = stream(
            self.CoreV1.connect_get_namespaced_pod_exec,
            _pname,
            namespace,
            command=cmd,
            stderr=True,
            stdin=False,
            stdout=True,
            tty=False,
            _request_timeout=_request_timeout,
            _preload_content=False,
            **kwargs
        )
        try:
            # run for timeout
            _pod_stream.run_forever(timeout=_request_timeout)
            # read the output
            _output = _pod_stream.read_stdout()
            _error = _pod_stream.read_stderr()
        finally:
            # Release the connection even if the command outlived the timeout
            _pod_stream.close()
        if _error:
            # copy error to output
            logger_cli.warning(
                "WARNING: cmd of '{}' returned error:\n{}\n".format(
                    " ".join(cmd),
                    _error
                )
            )
            if not _output:
                _output = _error
        # Force recreate of api objects
        self._coreV1 = None
        # Send output
        return _output

    def ensure_namespace(self, ns):
        """
        Ensure that given namespace exists

        Returns False only when creating the namespace gave a falsy
        result, True otherwise.
        """
        # list active namespaces
        _ns = self.safe_get_item_by_name(self.CoreV1.list_namespace(), ns)
        if _ns is not None:
            logger_cli.debug("... found existing namespace '{}'".format(ns))
            return True
        logger_cli.debug("... creating namespace '{}'".format(ns))
        _new_ns = kclient.V1Namespace()
        _new_ns.metadata = kclient.V1ObjectMeta(name=ns)
        _r = self.CoreV1.create_namespace(_new_ns)
        # TODO: check return on fail
        return bool(_r)

    def get_daemon_set_by_name(self, ns, name):
        """Return the DaemonSet ``name`` in ``ns`` or None."""
        return self.safe_get_item_by_name(
            self.AppsV1.list_namespaced_daemon_set(ns),
            name
        )

    def create_config_map(self, ns, name, source, recreate=True):
        """
        Creates/Overwrites ConfigMap in working namespace

        :param source: a single file, or a directory whose visible '.py'
            files (recursively) become the map entries, keyed by file name.
            Same-named files in different subdirectories overwrite each
            other.
        :param recreate: currently unused; an existing map is always
            replaced.
        :returns: the keys (file names) stored in the map.
        """
        # Prepare source
        logger_cli.debug(
            "... preparing config map '{}/{}' with files from '{}'".format(
                ns,
                name,
                source
            )
        )
        _data = {}
        if os.path.isfile(source):
            # populate data with one file
            with open(source, 'rt') as fS:
                _data[os.path.split(source)[1]] = fS.read()
        elif os.path.isdir(source):
            # walk dirs and populate all 'py' files
            for path, _dirs, files in os.walk(source):
                for _file in files:
                    if _file.endswith('.py') and not _file.startswith('.'):
                        with open(os.path.join(path, _file), 'rt') as fS:
                            _data[_file] = fS.read()
        _cm = kclient.V1ConfigMap()
        _cm.metadata = kclient.V1ObjectMeta(name=name, namespace=ns)
        _cm.data = _data
        logger_cli.debug(
            "... prepared config map with {} scripts".format(len(_data))
        )
        # Query existing configmap, replace if present
        _existing_cm = self.safe_get_item_by_name(
            self.CoreV1.list_namespaced_config_map(namespace=ns),
            name
        )
        if _existing_cm is not None:
            self.CoreV1.replace_namespaced_config_map(
                namespace=ns,
                name=name,
                body=_cm
            )
            logger_cli.debug(
                "... replaced existing config map '{}/{}'".format(
                    ns,
                    name
                )
            )
        else:
            # Create it
            self.CoreV1.create_namespaced_config_map(
                namespace=ns,
                body=_cm
            )
            logger_cli.debug("... created config map '{}/{}'".format(
                ns,
                name
            ))
        return _data.keys()

    def prepare_daemonset_from_yaml(self, ns, ds_yaml):
        """Replace the DaemonSet named in ``ds_yaml`` or create it in ``ns``.

        :returns: the API response of the replace/create call.
        """
        _name = ds_yaml['metadata']['name']
        _ds = self.get_daemon_set_by_name(ns, _name)
        if _ds is not None:
            logger_cli.debug(
                "... found existing daemonset '{}'".format(_name)
            )
            _r = self.AppsV1.replace_namespaced_daemon_set(
                _ds.metadata.name,
                _ds.metadata.namespace,
                body=ds_yaml
            )
            logger_cli.debug(
                "... replacing existing daemonset '{}'".format(_name)
            )
            return _r
        logger_cli.debug(
            "... creating daemonset '{}'".format(_name)
        )
        return self.AppsV1.create_namespaced_daemon_set(ns, body=ds_yaml)

    def delete_daemon_set_by_name(self, ns, name):
        """Delete DaemonSet ``name`` in ``ns``; returns the API response."""
        return self.AppsV1.delete_namespaced_daemon_set(name, ns)

    def exec_on_all_pods(self, pods):
        """
        Create multiple threads to execute script on all target pods

        NOTE: not implemented yet; it only builds the
        [node_name, namespace, pod_name] map, logs an error and returns [].
        """
        # Create map for threads: [[node_name, ns, pod_name]...]
        # (python client models use snake_case: 'node_name', not 'nodeName')
        _pod_list = [  # noqa: F841 - placeholder for the future thread map
            [
                item.spec.node_name,
                item.metadata.namespace,
                item.metadata.name
            ]
            for item in pods.items
        ]
        # map func and cmd
        logger_cli.error("ERROR: 'exec_on_all_pods'is not implemented yet")
        # create result list
        return []

    @retry(ApiException)
    def get_pods_for_daemonset(self, ds):
        """List pods labelled ``name=<daemonset name>`` in its namespace."""
        # get all pod names for daemonset
        logger_cli.debug(
            "... extracting pod names from daemonset '{}'".format(
                ds.metadata.name
            )
        )
        _ns = ds.metadata.namespace
        _name = ds.metadata.name
        _pods = self.CoreV1.list_namespaced_pod(
            namespace=_ns,
            label_selector='name={}'.format(_name)
        )
        return _pods

    def put_string_buffer_to_pod_as_textfile(
        self,
        pod_name,
        namespace,
        buffer,
        filepath,
        _request_timeout=120,
        **kwargs
    ):
        """Write ``buffer`` to ``filepath`` inside a pod.

        Opens an interactive ``/bin/sh`` exec session and feeds it a quoted
        here-document (``cat <<'EOF' >filepath``), the buffer and the
        terminator, one part per update cycle.

        :param buffer: file content as UTF-8 encoded bytes.

        NOTE(review): a line equal to ``EOF`` inside ``buffer`` ends the
        here-document early, and ``filepath`` is interpolated into the
        shell command unquoted.
        """
        _command = ['/bin/sh']
        response = stream(
            self.CoreV1.connect_get_namespaced_pod_exec,
            pod_name,
            namespace,
            command=_command,
            stderr=True,
            stdin=True,
            stdout=True,
            tty=False,
            _request_timeout=_request_timeout,
            _preload_content=False,
            **kwargs
        )
        # if json
        # buffer = json.dumps(_dict, indent=2).encode('utf-8')
        commands = [
            bytes("cat <<'EOF' >" + filepath + "\n", 'utf-8'),
            buffer,
            bytes("\n" + "EOF\n", 'utf-8')
        ]
        try:
            while response.is_open():
                response.update(timeout=1)
                if response.peek_stdout():
                    logger_cli.debug("... STDOUT: %s" % response.read_stdout())
                if response.peek_stderr():
                    logger_cli.debug("... STDERR: %s" % response.read_stderr())
                if commands:
                    c = commands.pop(0)
                    logger_cli.debug("... running command... {}\n".format(c))
                    response.write_stdin(str(c, encoding='utf-8'))
                else:
                    break
        finally:
            # Close the session even if writing fails midway
            response.close()
        # Force recreate of Api objects
        self._coreV1 = None
        return

    def get_custom_resource(self, group, version, plural):
        """List cluster-wide custom objects of ``group/version/plural``."""
        # Example:
        # kubernetes.client.CustomObjectsApi().list_cluster_custom_object(
        #   group="networking.istio.io",
        #   version="v1alpha3",
        #   plural="serviceentries"
        # )
        return self.CustomObjects.list_cluster_custom_object(
            group=group,
            version=version,
            plural=plural
        )

    def init_pvc_resource(
        self,
        name,
        storage_class,
        size,
        ns="qa-space",
        mode="ReadWriteOnce"
    ):
        """Return the Kubernetes PVC resource (labelled ``name=<name>``)"""
        return kclient.V1PersistentVolumeClaim(
            api_version='v1',
            kind='PersistentVolumeClaim',
            metadata=kclient.V1ObjectMeta(
                name=name,
                namespace=ns,
                labels={"name": name}
            ),
            spec=kclient.V1PersistentVolumeClaimSpec(
                storage_class_name=storage_class,
                access_modes=[mode],
                resources=kclient.V1ResourceRequirements(
                    requests={'storage': size}
                )
            )
        )

    def init_pv_resource(
        self,
        name,
        storage_class,
        size,
        path,
        ns="qa-space",
        mode="ReadWriteOnce"
    ):
        """Return the Kubernetes PV resource backed by a hostPath.

        ``ns`` is put into the metadata, but this class creates and lists
        PVs without a namespace (they are cluster-scoped).
        """
        return kclient.V1PersistentVolume(
            api_version='v1',
            kind='PersistentVolume',
            metadata=kclient.V1ObjectMeta(
                name=name,
                namespace=ns,
                labels={"name": name}
            ),
            spec=kclient.V1PersistentVolumeSpec(
                storage_class_name=storage_class,
                access_modes=[mode],
                capacity={'storage': size},
                host_path=kclient.V1HostPathVolumeSource(path=path)
            )
        )

    def init_service(
        self,
        name,
        port,
        clusterip=None,
        ns="qa-space"
    ):
        """Inits a V1Service object with data for benchmark agent.

        The service selects pods labelled ``name=<name>`` and maps TCP
        ``port`` to the same target port. ``clusterip`` is currently
        unused.
        """
        _meta = kclient.V1ObjectMeta(
            name=name,
            namespace=ns,
            labels={"name": name}
        )
        _port = kclient.V1ServicePort(
            port=port,
            protocol="TCP",
            target_port=port
        )
        _spec = kclient.V1ServiceSpec(
            # cluster_ip=clusterip,
            selector={"name": name},
            # type="ClusterIP",
            ports=[_port]
        )
        return kclient.V1Service(
            api_version="v1",
            kind="Service",
            metadata=_meta,
            spec=_spec
        )

    def prepare_pv(self, pv_object):
        """Replace or create a PV and wait until it is Available/Bound."""
        _existing = self.get_pv_by_name(pv_object.metadata.name)
        if _existing is not None:
            self.CoreV1.replace_persistent_volume(
                pv_object.metadata.name,
                pv_object
            )
        else:
            self.CoreV1.create_persistent_volume(pv_object)
        return self.wait_for_phase(
            "pv",
            pv_object.metadata.name,
            None,
            ["Available", "Bound"]
        )

    def prepare_pvc(self, pvc_object):
        """Reuse or create a PVC and wait until it is Available/Bound.

        :raises CheckerException: if an existing PVC requests a different
            storage size.
        """
        _existing = self.get_pvc_by_name_and_ns(
            pvc_object.metadata.name,
            pvc_object.metadata.namespace
        )
        if _existing is not None:
            _size_r = pvc_object.spec.resources.requests["storage"]
            _size_e = _existing.spec.resources.requests["storage"]
            logger_cli.info(
                "-> Found PVC '{}/{}' with {}. Requested: {}".format(
                    pvc_object.metadata.namespace,
                    pvc_object.metadata.name,
                    _size_e,
                    _size_r
                )
            )
            if _size_r != _size_e:
                raise CheckerException(
                    "ERROR: PVC exists on the cloud with different size "
                    "than needed. Please cleanup!"
                )
        else:
            logger_cli.debug(
                "... creating pvc '{}'".format(pvc_object.metadata.name)
            )
            self.CoreV1.create_namespaced_persistent_volume_claim(
                pvc_object.metadata.namespace,
                pvc_object
            )
        return self.wait_for_phase(
            "pvc",
            pvc_object.metadata.name,
            pvc_object.metadata.namespace,
            ["Available", "Bound"]
        )

    def get_pod_by_name_and_ns(self, name, ns):
        """Return pod ``name`` (labelled ``name=<name>``) in ``ns`` or None."""
        return self.safe_get_item_by_name(
            self.CoreV1.list_namespaced_pod(
                ns,
                label_selector='name={}'.format(name)
            ),
            name
        )

    def get_svc_by_name_and_ns(self, name, ns):
        """Return service ``name`` (labelled ``name=<name>``) or None."""
        return self.safe_get_item_by_name(
            self.CoreV1.list_namespaced_service(
                ns,
                label_selector='name={}'.format(name)
            ),
            name
        )

    def get_pvc_by_name_and_ns(self, name, ns):
        """Return PVC ``name`` (labelled ``name=<name>``) in ``ns`` or None."""
        return self.safe_get_item_by_name(
            self.CoreV1.list_namespaced_persistent_volume_claim(
                ns,
                label_selector='name={}'.format(name)
            ),
            name
        )

    def get_pv_by_name(self, name):
        """Return PV ``name`` (labelled ``name=<name>``) or None."""
        return self.safe_get_item_by_name(
            self.CoreV1.list_persistent_volume(
                label_selector='name={}'.format(name)
            ),
            name
        )

    def wait_for_phase(self, ttype, name, ns, phase_list, timeout=120):
        """Poll an object until its ``status.phase`` is in ``phase_list``.

        :param ttype: one of "pod", "svc", "pvc", "pv".
        :param name: object name (also used as the ``name`` label).
        :param ns: namespace; ignored for "pv".
        :param phase_list: accepted phases. If it contains "Terminated",
            the object not being found also ends the wait.
        :param timeout: seconds to wait; polled every 2 seconds.
        :returns: the object once its phase matches, or None when it is
            gone and "Terminated" was requested.
        :raises CheckerException: on timeout or an unknown ``ttype``.
        """
        _getters = {
            "pod": lambda: self.get_pod_by_name_and_ns(name, ns),
            "svc": lambda: self.get_svc_by_name_and_ns(name, ns),
            "pvc": lambda: self.get_pvc_by_name_and_ns(name, ns),
            "pv": lambda: self.get_pv_by_name(name),
        }
        if ttype not in _getters:
            raise CheckerException(
                "Unknown resource type to wait for: '{}'".format(ttype)
            )
        logger_cli.debug(
            "... waiting '{}'s until {} is '{}'".format(
                timeout,
                ttype,
                ", ".join(phase_list)
            )
        )
        while timeout > 0:
            _t = _getters[ttype]()
            if not _t and "Terminated" in phase_list:
                if ns:
                    _s = "... {} {}/{} not found".format(ttype, ns, name)
                else:
                    _s = "... {} '{}' not found".format(ttype, name)
                logger_cli.debug(_s)
                return None
            if not _t:
                # Not listed (yet); keep polling instead of failing on None
                logger_cli.debug(
                    "... {} '{}' not found yet".format(ttype, name)
                )
            else:
                logger_cli.debug(
                    "... {} is '{}'".format(ttype, _t.status.phase)
                )
                if _t.status.phase in phase_list:
                    return _t
            sleep(2)
            timeout -= 2
        raise CheckerException(
            "Timed out waiting for {} '{}' in '{}'".format(
                ttype,
                name,
                ", ".join(phase_list)
            )
        )

    def prepare_pod_from_yaml(self, pod_yaml):
        """Reuse the pod described by ``pod_yaml`` or create it.

        A newly created pod is waited on until it is Running.
        """
        _pod_ns = pod_yaml['metadata']['namespace']
        _pod_name = pod_yaml['metadata']['name']
        _existing = self.get_pod_by_name_and_ns(_pod_name, _pod_ns)
        if _existing is not None:
            logger_cli.info(
                "-> Found pod '{}/{}'. Reusing.".format(_pod_ns, _pod_name)
            )
            return _existing
        self.CoreV1.create_namespaced_pod(_pod_ns, pod_yaml)
        return self.wait_for_phase(
            "pod",
            _pod_name,
            _pod_ns,
            ["Running"]
        )

    def expose_pod_port(self, pod_object, port, ns="qa-space"):
        """Expose ``port`` of a pod through a same-named Service.

        The Service is looked up and created in the pod's own namespace;
        ``ns`` is kept for backward compatibility and is not used.

        :returns: the existing Service, or the create call's response.
        """
        _existing = self.get_svc_by_name_and_ns(
            pod_object.metadata.name,
            pod_object.metadata.namespace
        )
        if _existing is not None:
            # TODO: Check port number?
            logger_cli.info(
                "-> Pod already exposed '{}/{}:{}'. Reusing.".format(
                    pod_object.metadata.namespace,
                    pod_object.metadata.name,
                    port
                )
            )
            return _existing
        logger_cli.debug(
            "... creating service for pod {}/{}: {}:{}".format(
                pod_object.metadata.namespace,
                pod_object.metadata.name,
                pod_object.status.pod_ip,
                port
            )
        )
        # The Service body must carry the same namespace as the request,
        # otherwise the API rejects it for pods outside the default one
        _svc = self.init_service(
            pod_object.metadata.name,
            port,
            ns=pod_object.metadata.namespace
        )
        return self.CoreV1.create_namespaced_service(
            pod_object.metadata.namespace,
            _svc
        )