# Author: Alex Savatieiev (osavatieiev@mirantis.com; a.savex@gmail.com)
# Copyright 2019-2022 Mirantis, Inc.
"""
Module to handle interaction with Kube
"""
import base64
import os
import urllib3
import yaml
from kubernetes import client as kclient, config as kconfig, watch
from kubernetes.stream import stream
from kubernetes.client.rest import ApiException
from urllib3.exceptions import MaxRetryError
from time import time, sleep
from cfg_checker.common import logger, logger_cli
from cfg_checker.common.decorators import retry
from cfg_checker.common.exception import CheckerException, \
InvalidReturnException, KubeException
from cfg_checker.common.file_utils import create_temp_file_with_content
from cfg_checker.common.other import utils, shell
from cfg_checker.common.ssh_utils import ssh_shell_p
from cfg_checker.common.const import ENV_LOCAL
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def _init_kube_conf_local(config):
# Init kube library locally
_path = "local:{}".format(config.kube_config_path)
try:
kconfig.load_kube_config(config_file=config.kube_config_path)
if config.insecure:
kconfig.assert_hostname = False
kconfig.client_side_validation = False
        logger_cli.debug(
            "... found Kube env: core, {}".format(
                ",".join(
                    kclient.CoreApi().get_api_versions().versions
                )
            )
        )
return kconfig, kclient.ApiClient(), _path
except Exception as e:
        logger.warning(
            "Failed to init local Kube client: {}".format(str(e))
        )
return None, None, _path
def _init_kube_conf_remote(config):
# init remote client
# Preload Kube token
"""
APISERVER=$(kubectl config view --minify |
grep server | cut -f 2- -d ":" | tr -d " ")
SECRET_NAME=$(kubectl get secrets |
grep ^default | cut -f1 -d ' ')
TOKEN=$(kubectl describe secret $SECRET_NAME |
grep -E '^token' | cut -f2 -d':' | tr -d " ")
echo "Detected API Server at: '${APISERVER}'"
echo "Got secret: '${SECRET_NAME}'"
echo "Loaded token: '${TOKEN}'"
    curl $APISERVER/api \
        --header "Authorization: Bearer $TOKEN" --insecure
"""
_path = ''
# Try to load remote config only if it was not detected already
    if not config.kube_config_detected and config.env_name != ENV_LOCAL:
_path = "{}@{}:{}".format(
config.ssh_user,
config.ssh_host,
config.kube_config_path
)
_c_data = ssh_shell_p(
"cat " + config.kube_config_path,
config.ssh_host,
username=config.ssh_user,
keypath=config.ssh_key,
piped=False,
use_sudo=config.ssh_uses_sudo,
)
else:
_path = "local:{}".format(config.kube_config_path)
with open(config.kube_config_path, 'r') as ff:
_c_data = ff.read()
if len(_c_data) < 1:
return None, None, _path
_conf = yaml.load(_c_data, Loader=yaml.SafeLoader)
_kube_conf = kclient.Configuration()
    # A remote host configuration.
    # To work with a remote cluster, we need to extract these keys:
    # ['host', 'ssl_ca_cert', 'cert_file', 'key_file', 'verify_ssl']
    # When v12 of the client is released, we will use load_from_dict
_kube_conf.ssl_ca_cert = create_temp_file_with_content(
base64.standard_b64decode(
_conf['clusters'][0]['cluster']['certificate-authority-data']
)
)
_host = _conf['clusters'][0]['cluster']['server']
_kube_conf.cert_file = create_temp_file_with_content(
base64.standard_b64decode(
_conf['users'][0]['user']['client-certificate-data']
)
)
_kube_conf.key_file = create_temp_file_with_content(
base64.standard_b64decode(
_conf['users'][0]['user']['client-key-data']
)
)
if "http" not in _host or "443" not in _host:
logger_cli.error(
"Failed to extract Kube host: '{}'".format(_host)
)
else:
logger_cli.debug(
"... 'context' host extracted: '{}' via SSH@{}".format(
_host,
config.ssh_host
)
)
        # Substitute the context host with ours,
        # e.g. 'https://10.0.0.1:6443' -> 'https://<mcp_host>:6443'
        _tmp = _host.split(':')
        _kube_conf.host = \
            _tmp[0] + "://" + config.mcp_host + ":" + _tmp[2]
        config.kube_port = _tmp[2]
logger_cli.debug(
"... kube remote host updated to {}".format(
_kube_conf.host
)
)
_kube_conf.verify_ssl = False
_kube_conf.debug = config.debug
if config.insecure:
_kube_conf.assert_hostname = False
_kube_conf.client_side_validation = False
# Nevertheless if you want to do it
# you can with these 2 parameters
# configuration.verify_ssl=True
# ssl_ca_cert is the filepath
# to the file that contains the certificate.
# configuration.ssl_ca_cert="certificate"
# _kube_conf.api_key = {
# "authorization": "Bearer " + config.kube_token
# }
# Create a ApiClient with our config
_kube_api = kclient.ApiClient(_kube_conf)
return _kube_conf, _kube_api, _path
class KubeApi(object):
def __init__(self, config):
self.config = config
self.initialized = self._init_kclient()
self.last_response = None
def _init_kclient(self):
# if there is no password - try to get local, if this available
logger_cli.debug("... init kube config")
        if self.config.env_name == ENV_LOCAL:
self.kConf, self.kApi, self.kConfigPath = _init_kube_conf_local(
self.config
)
self.is_local = True
# Try to load local config data
if self.config.kube_config_path and \
os.path.exists(self.config.kube_config_path):
_cmd = "cat " + self.config.kube_config_path
_c_data = shell(_cmd)
_conf = yaml.load(_c_data, Loader=yaml.SafeLoader)
self.user_keypath = create_temp_file_with_content(
base64.standard_b64decode(
_conf['users'][0]['user']['client-key-data']
)
)
self.yaml_conf = _c_data
else:
self.kConf, self.kApi, self.kConfigPath = _init_kube_conf_remote(
self.config
)
self.is_local = False
if self.kConf is None or self.kApi is None:
return False
else:
return True
def get_versions_api(self):
# client.CoreApi().get_api_versions().versions
return kclient.VersionApi(self.kApi)
class KubeRemote(KubeApi):
def __init__(self, config):
super(KubeRemote, self).__init__(config)
        self._appsV1 = None
        self._podsV1 = None
        self._custom = None
@property
def CustomObjects(self):
if not self._custom:
self._custom = kclient.CustomObjectsApi(self.kApi)
return self._custom
@property
def CoreV1(self):
if self.is_local:
return kclient.CoreV1Api(kclient.ApiClient())
else:
return kclient.CoreV1Api(kclient.ApiClient(self.kConf))
@property
def AppsV1(self):
if not self._appsV1:
self._appsV1 = kclient.AppsV1Api(self.kApi)
return self._appsV1
    @property
    def PodsV1(self):
        # Pod operations are served by CoreV1Api;
        # kclient.V1Pod is only a data model, not an API class
        if not self._podsV1:
            self._podsV1 = kclient.CoreV1Api(self.kApi)
        return self._podsV1
@staticmethod
def _typed_list_to_dict(i_list):
_dict = {}
for _item in i_list:
_d = _item.to_dict()
_type = _d.pop("type")
_dict[_type.lower()] = _d
return _dict
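    # Sketch of what _typed_list_to_dict produces (values hypothetical):
    #   [V1NodeAddress(type="InternalIP", address="10.0.0.1")]
    #   -> {"internalip": {"address": "10.0.0.1"}}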
@staticmethod
def _get_listed_attrs(items, _path):
_list = []
for _n in items:
_list.append(utils.rgetattr(_n, _path))
return _list
@staticmethod
def safe_get_item_by_name(api_resource, _name):
for item in api_resource.items:
if item.metadata.name == _name:
return item
return None
def wait_for_phase_on_start(self, _func, phase, *args, **kwargs):
w = watch.Watch()
start_time = time()
for event in w.stream(_func, *args, **kwargs):
if event["object"].status.phase == phase:
w.stop()
end_time = time()
                logger_cli.debug(
                    "... became '{}' in {:0.2f} sec".format(
                        phase,
                        end_time - start_time
                    )
                )
return
# event.type: ADDED, MODIFIED, DELETED
if event["type"] == "DELETED":
# Pod was deleted while we were waiting for it to start.
logger_cli.debug("... deleted before started")
w.stop()
return
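    # A minimal usage sketch of the watch helper above
    # ('kube' is a KubeRemote instance; pod and namespace are hypothetical):
    #   kube.wait_for_phase_on_start(
    #       kube.CoreV1.list_namespaced_pod,
    #       "Running",
    #       namespace="qa-space",
    #       field_selector="metadata.name=my-pod"
    #   )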
    def wait_for_event(self, _func, event, *args, **kwargs):
        w = watch.Watch()
        for _event in w.stream(_func, *args, **kwargs):
            # _event["type"] is one of ADDED, MODIFIED, DELETED
            if _event["type"] == event:
                logger_cli.debug("... got {} event".format(_event["type"]))
                w.stop()
                return
def get_node_info(self, http=False):
# Query API for the nodes and do some presorting
_nodes = {}
        if http:
            # *_with_http_info returns a (data, status, headers) tuple;
            # keep only the deserialized V1NodeList
            _raw_nodes = self.CoreV1.list_node_with_http_info()[0]
else:
_raw_nodes = self.CoreV1.list_node()
if not isinstance(_raw_nodes, kclient.models.v1_node_list.V1NodeList):
raise InvalidReturnException(
"Invalid return type: '{}'".format(type(_raw_nodes))
)
for _n in _raw_nodes.items:
_name = _n.metadata.name
_d = _n.to_dict()
# parse inner data classes as dicts
_d['addresses'] = self._typed_list_to_dict(_n.status.addresses)
_d['conditions'] = self._typed_list_to_dict(_n.status.conditions)
# Update 'status' type
if isinstance(_d['conditions']['ready']['status'], str):
_d['conditions']['ready']['status'] = utils.to_bool(
_d['conditions']['ready']['status']
)
# Parse image names?
# TODO: Here is the place where we can parse each node image names
# Parse roles
_d['labels'] = {}
for _label, _data in _d["metadata"]["labels"].items():
if _data.lower() in ["true", "false"]:
_d['labels'][_label] = utils.to_bool(_data)
else:
_d['labels'][_label] = _data
# Save
_nodes[_name] = _d
# debug report on how many nodes detected
logger_cli.debug("...node items returned '{}'".format(len(_nodes)))
return _nodes
def get_pod_names_by_partial_name(self, partial_name, ns):
logger_cli.debug('... searching for pods with {}'.format(partial_name))
_pods = self.CoreV1.list_namespaced_pod(ns)
_names = self._get_listed_attrs(_pods.items, "metadata.name")
_pnames = [n for n in _names if partial_name in n]
if len(_pnames) > 1:
logger_cli.debug(
"... more than one pod found for '{}': {}\n".format(
partial_name,
", ".join(_pnames)
)
)
elif len(_pnames) < 1:
logger_cli.warning(
"WARNING: No pods found for '{}'".format(partial_name)
)
return _pnames
def get_pods_by_partial_name(self, partial_name, ns):
logger_cli.debug('... searching for pods with {}'.format(partial_name))
_all_pods = self.CoreV1.list_namespaced_pod(ns)
# _names = self._get_listed_attrs(_pods.items, "metadata.name")
_pods = [_pod for _pod in _all_pods.items
if partial_name in _pod.metadata.name]
if len(_pods) > 1:
            logger_cli.debug(
                "... more than one pod found for '{}': {}\n".format(
                    partial_name,
                    ", ".join([p.metadata.name for p in _pods])
                )
            )
elif len(_pods) < 1:
logger_cli.warning(
"WARNING: No pods found for '{}'".format(partial_name)
)
return _pods
@retry(ApiException, initial_wait=10)
def exec_on_target_pod(
self,
cmd,
pod_name,
namespace,
strict=False,
_request_timeout=120,
arguments=None,
**kwargs
):
_pname = ""
if not strict:
logger_cli.debug(
"... searching for pods with the name '{}'".format(pod_name)
)
_pods = {}
_pods = self.CoreV1.list_namespaced_pod(namespace)
_names = self._get_listed_attrs(_pods.items, "metadata.name")
_pnames = [n for n in _names if n.startswith(pod_name)]
if len(_pnames) > 1:
logger_cli.debug(
"... more than one pod found for '{}': {}\n"
"... using first one".format(
pod_name,
", ".join(_pnames)
)
)
elif len(_pnames) < 1:
raise KubeException("No pods found for '{}'".format(pod_name))
# in case of >1 and =1 we are taking 1st anyway
_pname = _pnames[0]
else:
_pname = pod_name
logger_cli.debug(
"... cmd: [CoreV1] exec {} -n {} -- {} '{}'".format(
_pname,
namespace,
cmd,
arguments
)
)
        # Set preload_content to False to preserve JSON.
        # If not, the output gets converted to str,
        # which turns double quotes into single quotes
        # and makes json.loads(...) fail afterwards
cmd = cmd if isinstance(cmd, list) else cmd.split()
if arguments:
cmd += [arguments]
# Make sure that CoreV1 is fresh before calling it
_pod_stream = stream(
self.CoreV1.connect_get_namespaced_pod_exec,
_pname,
namespace,
command=cmd,
stderr=True,
stdin=False,
stdout=True,
tty=False,
_request_timeout=_request_timeout,
_preload_content=False,
**kwargs
)
# run for timeout
_pod_stream.run_forever(timeout=_request_timeout)
# read the output
_output = _pod_stream.read_stdout()
_error = _pod_stream.read_stderr()
if _error:
# copy error to output
logger.warning(
"WARNING: cmd of '{}' returned error:\n{}\n".format(
" ".join(cmd),
_error
)
)
if not _output:
_output = _error
# Send output
return _output
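    # A minimal usage sketch ('kube' is a KubeRemote instance;
    # pod and namespace names are hypothetical):
    #   _out = kube.exec_on_target_pod("uname -a", "my-agent", "qa-space")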
def ensure_namespace(self, ns):
"""
Ensure that given namespace exists
"""
# list active namespaces
_v1NamespaceList = self.CoreV1.list_namespace()
_ns = self.safe_get_item_by_name(_v1NamespaceList, ns)
if _ns is None:
logger_cli.debug("... creating namespace '{}'".format(ns))
_new_ns = kclient.V1Namespace()
_new_ns.metadata = kclient.V1ObjectMeta(name=ns)
_r = self.CoreV1.create_namespace(_new_ns)
# TODO: check return on fail
if not _r:
return False
else:
logger_cli.debug("... found existing namespace '{}'".format(ns))
return True
def get_daemon_set_by_name(self, ns, name):
return self.safe_get_item_by_name(
self.AppsV1.list_namespaced_daemon_set(ns),
name
)
def create_config_map(self, ns, name, source, recreate=True):
"""
Creates/Overwrites ConfigMap in working namespace
"""
# Prepare source
logger_cli.debug(
"... preparing config map '{}/{}' with files from '{}'".format(
ns,
name,
source
)
)
_data = {}
if os.path.isfile(source):
# populate data with one file
with open(source, 'rt') as fS:
_data[os.path.split(source)[1]] = fS.read()
elif os.path.isdir(source):
# walk dirs and populate all 'py' files
for path, dirs, files in os.walk(source):
                _e = ".py"
_subfiles = (_fl for _fl in files
if _fl.endswith(_e) and not _fl.startswith('.'))
for _file in _subfiles:
with open(os.path.join(path, _file), 'rt') as fS:
_data[_file] = fS.read()
_cm = kclient.V1ConfigMap()
_cm.metadata = kclient.V1ObjectMeta(name=name, namespace=ns)
_cm.data = _data
logger_cli.debug(
"... prepared config map with {} scripts".format(len(_data))
)
# Query existing configmap, delete if needed
_existing_cm = self.safe_get_item_by_name(
self.CoreV1.list_namespaced_config_map(namespace=ns),
name
)
if _existing_cm is not None:
self.CoreV1.replace_namespaced_config_map(
namespace=ns,
name=name,
body=_cm
)
logger_cli.debug(
"... replaced existing config map '{}/{}'".format(
ns,
name
)
)
else:
# Create it
self.CoreV1.create_namespaced_config_map(
namespace=ns,
body=_cm
)
logger_cli.debug("... created config map '{}/{}'".format(
ns,
name
))
return _data.keys()
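    # A minimal usage sketch (namespace, name and path are hypothetical):
    #   _scripts = kube.create_config_map("qa-space", "qa-scripts",
    #                                     "/tmp/scripts")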
def prepare_daemonset_from_yaml(self, ns, ds_yaml):
_name = ds_yaml['metadata']['name']
_ds = self.get_daemon_set_by_name(ns, _name)
if _ds is not None:
logger_cli.debug(
"... found existing daemonset '{}'".format(_name)
)
_r = self.AppsV1.replace_namespaced_daemon_set(
_ds.metadata.name,
_ds.metadata.namespace,
body=ds_yaml
)
logger_cli.debug(
"... replacing existing daemonset '{}'".format(_name)
)
return _r
else:
logger_cli.debug(
"... creating daemonset '{}'".format(_name)
)
_r = self.AppsV1.create_namespaced_daemon_set(ns, body=ds_yaml)
return _r
def delete_daemon_set_by_name(self, ns, name):
return self.AppsV1.delete_namespaced_daemon_set(name, ns)
def exec_on_all_pods(self, pods):
"""
Create multiple threads to execute script on all target pods
"""
# Create map for threads: [[node_name, ns, pod_name]...]
_pod_list = []
        for item in pods.items:
            _pod_list.append(
                [
                    item.spec.node_name,
                    item.metadata.namespace,
                    item.metadata.name
                ]
            )
# map func and cmd
        logger_cli.error("ERROR: 'exec_on_all_pods' is not implemented yet")
# create result list
return []
@retry(ApiException, initial_wait=5)
def get_pods_for_daemonset(self, ds):
# get all pod names for daemonset
logger_cli.debug(
"... extracting pod names from daemonset '{}'".format(
ds.metadata.name
)
)
_ns = ds.metadata.namespace
_name = ds.metadata.name
_pods = self.CoreV1.list_namespaced_pod(
namespace=_ns,
label_selector='name={}'.format(_name)
)
return _pods
@retry(ApiException, initial_wait=10)
def put_string_buffer_to_pod_as_textfile(
self,
pod_name,
namespace,
buffer,
filepath,
_request_timeout=120,
**kwargs
):
_command = ['/bin/sh']
response = stream(
self.CoreV1.connect_get_namespaced_pod_exec,
pod_name,
namespace,
command=_command,
stderr=True,
stdin=True,
stdout=True,
tty=False,
_request_timeout=_request_timeout,
_preload_content=False,
**kwargs
)
# if json
# buffer = json.dumps(_dict, indent=2).encode('utf-8')
commands = [
bytes("cat <<'EOF' >" + filepath + "\n", 'utf-8'),
buffer,
bytes("\n" + "EOF\n", 'utf-8')
]
while response.is_open():
response.update(timeout=1)
if response.peek_stdout():
logger_cli.debug("... STDOUT: %s" % response.read_stdout())
if response.peek_stderr():
logger_cli.debug("... STDERR: %s" % response.read_stderr())
if commands:
c = commands.pop(0)
logger_cli.debug("... running command... {}".format(c))
response.write_stdin(str(c, encoding='utf-8'))
else:
break
response.close()
return
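    # A minimal usage sketch (pod, namespace and path are hypothetical):
    #   _buf = json.dumps({"key": "value"}, indent=2).encode('utf-8')
    #   kube.put_string_buffer_to_pod_as_textfile(
    #       "my-agent", "qa-space", _buf, "/tmp/data.json")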
def get_custom_resource(self, group, version, plural):
# Get it
# Example:
# kubernetes.client.CustomObjectsApi().list_cluster_custom_object(
# group="networking.istio.io",
# version="v1alpha3",
# plural="serviceentries"
# )
return self.CustomObjects.list_cluster_custom_object(
group=group,
version=version,
plural=plural
)
def init_pvc_resource(
self,
name,
storage_class,
size,
ns="qa-space",
mode="ReadWriteOnce"
):
"""Return the Kubernetes PVC resource"""
return kclient.V1PersistentVolumeClaim(
api_version='v1',
kind='PersistentVolumeClaim',
metadata=kclient.V1ObjectMeta(
name=name,
namespace=ns,
labels={"name": name}
),
spec=kclient.V1PersistentVolumeClaimSpec(
storage_class_name=storage_class,
access_modes=[mode],
resources=kclient.V1ResourceRequirements(
requests={'storage': size}
)
)
)
def init_pv_resource(
self,
name,
storage_class,
size,
path,
ns="qa-space",
mode="ReadWriteOnce"
):
"""Return the Kubernetes PVC resource"""
return kclient.V1PersistentVolume(
api_version='v1',
kind='PersistentVolume',
metadata=kclient.V1ObjectMeta(
name=name,
namespace=ns,
labels={"name": name}
),
spec=kclient.V1PersistentVolumeSpec(
storage_class_name=storage_class,
access_modes=[mode],
capacity={'storage': size},
host_path=kclient.V1HostPathVolumeSource(path=path)
)
)
def init_service(
self,
name,
port,
clusterip=None,
ns="qa-space"
):
""" Inits a V1Service object with data for benchmark agent"""
_meta = kclient.V1ObjectMeta(
name=name,
namespace=ns,
labels={"name": name}
)
_port = kclient.V1ServicePort(
port=port,
protocol="TCP",
target_port=port
)
_spec = kclient.V1ServiceSpec(
# cluster_ip=clusterip,
selector={"name": name},
# type="ClusterIP",
ports=[_port]
)
return kclient.V1Service(
api_version="v1",
kind="Service",
metadata=_meta,
spec=_spec
)
def prepare_pv(self, pv_object):
_existing = self.get_pv_by_name(pv_object.metadata.name)
if _existing is not None:
self.CoreV1.replace_persistent_volume(
pv_object.metadata.name,
pv_object
)
else:
self.CoreV1.create_persistent_volume(pv_object)
return self.wait_for_phase(
"pv",
pv_object.metadata.name,
None,
["Available", "Bound"]
)
def prepare_pvc(self, pvc_object):
_existing = self.get_pvc_by_name_and_ns(
pvc_object.metadata.name,
pvc_object.metadata.namespace
)
if _existing is not None:
_size_r = pvc_object.spec.resources.requests["storage"]
_size_e = _existing.spec.resources.requests["storage"]
logger_cli.info(
"-> Found PVC '{}/{}' with {}. Requested: {}'".format(
pvc_object.metadata.namespace,
pvc_object.metadata.name,
_size_e,
_size_r
)
)
if _size_r != _size_e:
raise CheckerException(
"ERROR: PVC exists on the cloud with different size "
"than needed. Please cleanup!"
)
else:
logger_cli.debug(
"... creating pvc '{}'".format(pvc_object.metadata.name)
)
self.CoreV1.create_namespaced_persistent_volume_claim(
pvc_object.metadata.namespace,
pvc_object
)
return self.wait_for_phase(
"pvc",
pvc_object.metadata.name,
pvc_object.metadata.namespace,
["Available", "Bound"]
)
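    # A minimal usage sketch (name, storage class and size are hypothetical):
    #   _pvc = kube.init_pvc_resource("qa-pvc", "standard", "1Gi")
    #   kube.prepare_pvc(_pvc)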
def get_pod_by_name_and_ns(self, name, ns):
return self.safe_get_item_by_name(
self.CoreV1.list_namespaced_pod(
ns,
label_selector='name={}'.format(name)
),
name
)
def list_pods(self, ns, label_str=None):
return self.CoreV1.list_namespaced_pod(
ns,
label_selector=label_str
)
def get_svc_by_name_and_ns(self, name, ns):
return self.safe_get_item_by_name(
self.CoreV1.list_namespaced_service(
ns,
label_selector='name={}'.format(name)
),
name
)
def list_svc(self, ns, label_str=None):
return self.CoreV1.list_namespaced_service(
ns,
label_selector=label_str
)
def get_pvc_by_name_and_ns(self, name, ns):
return self.safe_get_item_by_name(
self.CoreV1.list_namespaced_persistent_volume_claim(
ns,
label_selector='name={}'.format(name)
),
name
)
def list_pvc(self, ns, label_str=None):
return self.CoreV1.list_namespaced_persistent_volume_claim(
ns,
label_selector=label_str
)
def get_pv_by_name(self, name):
return self.safe_get_item_by_name(
self.CoreV1.list_persistent_volume(
label_selector='name={}'.format(name)
),
name
)
def list_pv(self, label_str=None):
return self.CoreV1.list_persistent_volume(
label_selector=label_str
)
def wait_for_phase(self, ttype, name, ns, phase_list, timeout=120):
        logger_cli.debug(
            "... waiting {}s until {} is one of: '{}'".format(
                timeout,
                ttype,
                ", ".join(phase_list)
            )
        )
while timeout > 0:
if ttype == "pod":
_t = self.get_pod_by_name_and_ns(name, ns)
elif ttype == "svc":
_t = self.get_svc_by_name_and_ns(name, ns)
elif ttype == "pvc":
_t = self.get_pvc_by_name_and_ns(name, ns)
elif ttype == "pv":
_t = self.get_pv_by_name(name)
if "Terminated" in phase_list and not _t:
if ns:
_s = "... {} {}/{} not found".format(ttype, ns, name)
else:
_s = "... {} '{}' not found".format(ttype, name)
logger_cli.debug(_s)
return None
logger_cli.debug("... {} is '{}'".format(ttype, _t.status.phase))
if _t.status.phase in phase_list:
return _t
sleep(2)
timeout -= 2
        raise CheckerException(
            "Timed out waiting for {} '{}' to become one of: '{}'".format(
                ttype,
                name,
                ", ".join(phase_list)
            )
        )
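    # A minimal usage sketch (names are hypothetical):
    #   kube.wait_for_phase("pod", "my-agent", "qa-space", ["Running"])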
def prepare_pod_from_yaml(self, pod_yaml):
_existing = self.get_pod_by_name_and_ns(
pod_yaml['metadata']['name'],
pod_yaml['metadata']['namespace']
)
if _existing is not None:
logger_cli.info(
"-> Found pod '{}/{}'. Reusing.".format(
pod_yaml['metadata']['namespace'],
pod_yaml['metadata']['name']
)
)
return _existing
else:
self.CoreV1.create_namespaced_pod(
pod_yaml['metadata']['namespace'],
pod_yaml
)
return self.wait_for_phase(
"pod",
pod_yaml['metadata']['name'],
pod_yaml['metadata']['namespace'],
["Running"]
)
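    # A minimal usage sketch (the manifest path is hypothetical):
    #   with open("/tmp/pod.yaml") as f:
    #       _pod = kube.prepare_pod_from_yaml(yaml.safe_load(f))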
def expose_pod_port(self, pod_object, port, ns="qa-space"):
_existing = self.get_svc_by_name_and_ns(
pod_object.metadata.name,
pod_object.metadata.namespace
)
if _existing is not None:
# TODO: Check port number?
logger_cli.info(
"-> Pod already exposed '{}/{}:{}'. Reusing.".format(
pod_object.metadata.namespace,
pod_object.metadata.name,
port
)
)
return _existing
else:
logger_cli.debug(
"... creating service for pod {}/{}: {}:{}".format(
pod_object.metadata.namespace,
pod_object.metadata.name,
pod_object.status.pod_ip,
port
)
)
_svc = self.init_service(
pod_object.metadata.name,
port
)
return self.CoreV1.create_namespaced_service(
pod_object.metadata.namespace,
_svc
)
def list_namespaces(self):
return self.CoreV1.list_namespace()
@retry(ApiException, initial_wait=2)
def get_pod_logs(self, podname, container, ns, tail_lines=50):
        # Params
        # Read the log of the specified Pod.
        # This method makes a synchronous HTTP request by default.
        # To make an asynchronous HTTP request, pass async_req=True:
# >>> thread = api.read_namespaced_pod_log(name, namespace,
# async_req=True)
# >>> result = thread.get()
# :param async_req bool: execute request asynchronously
# :param str name: name of the Pod (required)
# :param str namespace: object name and auth scope, such as for teams
# and projects (required)
# :param str container: The container for which to stream logs.
# Defaults to only container if there is one container in
# the pod.
# :param bool follow: Follow the log stream of the pod. Defaults to
# false.
# :param bool insecure_skip_tls_verify_backend:
# insecureSkipTLSVerifyBackend indicates that the apiserver
# should not confirm the validity of the serving certificate
# of the backend it is connecting to. This will make the
# HTTPS connection between the apiserver and the backend
# insecure. This means the apiserver cannot verify the log
# data it is receiving came from the real kubelet. If the
# kubelet is configured to verify the apiserver's TLS
# credentials, it does not mean the connection to the real
# kubelet is vulnerable to a man in the middle attack (e.g.
# an attacker could not intercept the actual log data coming
# from the real kubelet).
# :param int limit_bytes: If set, the number of bytes to read from the
# server before terminating the log output. This may not
# display a complete final line of logging, and may return
# slightly more or slightly less than the specified limit.
# :param str pretty: If 'true', then the output is pretty printed.
# :param bool previous: Return previous terminated container logs.
# Defaults to false.
# :param int since_seconds: A relative time in seconds before the
# current time from which to show logs. If this value precedes
# the time a pod was started, only logs since the pod start will
# be returned. If this value is in the future, no logs will be
# returned. Only one of sinceSeconds or sinceTime may be
# specified.
# :param int tail_lines: If set, the number of lines from the end of
# the logs to show. If not specified, logs are shown from the
# creation of the container or sinceSeconds or sinceTime
# :param bool timestamps: If true, add an RFC3339 or RFC3339Nano
# timestamp at the beginning of every line of log output.
# Defaults to false.
# :param _preload_content: if False, the urllib3.HTTPResponse object
# will be returned without reading/decoding response data.
# Default is True.
# :param _request_timeout: timeout setting for this request. If one
# number provided, it will be total request timeout. It can
# also be a pair (tuple) of (connection, read) timeouts.
# :return: str
# If the method is called asynchronously, returns the request
# thread.
try:
return self.CoreV1.read_namespaced_pod_log(
name=podname,
namespace=ns,
container=container,
timestamps=True,
tail_lines=tail_lines,
# pretty=True,
_request_timeout=(1, 5)
)
except MaxRetryError as e:
logger_cli.warning(
"WARNING: Failed to retrieve log {}/{}:{}:\n{}".format(
ns,
podname,
container,
e.reason
)
)
return ""