blob: eb355abeb8e72e5fcbe917195a65e7e1dc90325d [file] [log] [blame]
Alex0989ecf2022-03-29 13:43:21 -05001# Author: Alex Savatieiev (osavatieiev@mirantis.com; a.savex@gmail.com)
2# Copyright 2019-2022 Mirantis, Inc.
Alex9a4ad212020-10-01 18:04:25 -05003"""
4Module to handle interaction with Kube
5"""
6import base64
7import os
8import urllib3
9import yaml
10
Alex5cace3b2021-11-10 16:40:37 -060011from kubernetes import client as kclient, config as kconfig, watch
Alex9a4ad212020-10-01 18:04:25 -050012from kubernetes.stream import stream
Alex7b0ee9a2021-09-21 17:16:17 -050013from kubernetes.client.rest import ApiException
Alex0bcf31b2022-03-29 17:38:58 -050014from urllib3.exceptions import MaxRetryError
Alex5cace3b2021-11-10 16:40:37 -060015from time import time, sleep
Alex9a4ad212020-10-01 18:04:25 -050016
17from cfg_checker.common import logger, logger_cli
Alex7b0ee9a2021-09-21 17:16:17 -050018from cfg_checker.common.decorators import retry
Alex5cace3b2021-11-10 16:40:37 -060019from cfg_checker.common.exception import CheckerException, \
20 InvalidReturnException, KubeException
Alex9a4ad212020-10-01 18:04:25 -050021from cfg_checker.common.file_utils import create_temp_file_with_content
22from cfg_checker.common.other import utils, shell
23from cfg_checker.common.ssh_utils import ssh_shell_p
Alex359e5752021-08-16 17:28:30 -050024from cfg_checker.common.const import ENV_LOCAL
Alex9a4ad212020-10-01 18:04:25 -050025
Alex7b0ee9a2021-09-21 17:16:17 -050026
Alex9a4ad212020-10-01 18:04:25 -050027urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
28
29
def _init_kube_conf_local(config):
    """Initialise the Kube client library from a local kubeconfig file.

    :param config: settings object; ``kube_config_path`` and ``insecure``
        are read here
    :return: tuple ``(kconfig, ApiClient, path)`` on success, or
        ``(None, None, path)`` when anything inside the try block fails;
        ``path`` is always ``"local:<kube_config_path>"``
    """
    _path = "local:{}".format(config.kube_config_path)
    try:
        kconfig.load_kube_config(config_file=config.kube_config_path)
        if config.insecure:
            # NOTE(review): these flags are assigned on the imported
            # 'config' module object, not on a client Configuration
            # instance - confirm this has the intended effect
            kconfig.assert_hostname = False
            kconfig.client_side_validation = False
        # The API call sits inside the try, so a failing request also
        # ends up in the "not initialised" return below
        _versions = kclient.CoreApi().get_api_versions().versions
        logger_cli.debug(
            "... found Kube env: core, {}".format(",".join(_versions))
        )
        return kconfig, kclient.ApiClient(), _path
    except Exception as e:
        # Best effort: any failure means the local client is unusable.
        # 'warning' replaces the deprecated 'warn' alias.
        logger.warning(
            "Failed to init local Kube client: {}".format(str(e))
        )
        return None, None, _path
Alex9a4ad212020-10-01 18:04:25 -050052
53
def _init_kube_conf_remote(config):
    """Build a Kube client configuration from a kubeconfig file.

    The kubeconfig is fetched over SSH unless it was already detected or
    the environment is local; in those cases the local file is read.
    The API host found in the file is replaced by ``config.mcp_host``
    while scheme and port are kept; the port is also stored in
    ``config.kube_port``.

    Manual equivalent, kept for reference::

        APISERVER=$(kubectl config view --minify |
            grep server | cut -f 2- -d ":" | tr -d " ")
        SECRET_NAME=$(kubectl get secrets |
            grep ^default | cut -f1 -d ' ')
        TOKEN=$(kubectl describe secret $SECRET_NAME |
            grep -E '^token' | cut -f2 -d':' | tr -d " ")

        echo "Detected API Server at: '${APISERVER}'"
        echo "Got secret: '${SECRET_NAME}'"
        echo "Loaded token: '${TOKEN}'"

        curl $APISERVER/api
            --header "Authorization: Bearer $TOKEN" --insecure

    :param config: settings object (ssh_*, kube_config_*, mcp_host,
        env_name, debug and insecure are read)
    :return: ``(Configuration, ApiClient, path)`` or
        ``(None, None, path)`` when no kubeconfig data was read or the
        server URL has no explicit port
    """
    _path = ''
    # Try to load remote config only if it was not detected already
    if not config.kube_config_detected and not config.env_name == ENV_LOCAL:
        _path = "{}@{}:{}".format(
            config.ssh_user,
            config.ssh_host,
            config.kube_config_path
        )
        _c_data = ssh_shell_p(
            "cat " + config.kube_config_path,
            config.ssh_host,
            username=config.ssh_user,
            keypath=config.ssh_key,
            piped=False,
            use_sudo=config.ssh_uses_sudo,
        )
    else:
        _path = "local:{}".format(config.kube_config_path)
        with open(config.kube_config_path, 'r') as ff:
            _c_data = ff.read()

    # Nothing was read (empty or None): report "not initialised"
    if not _c_data:
        return None, None, _path

    # 'yaml' is the module-level import; no local import is needed
    _conf = yaml.load(_c_data, Loader=yaml.SafeLoader)

    _kube_conf = kclient.Configuration()
    # A remote host configuration

    # To work with remote cluster, we need to extract these
    # keys = ['host', 'ssl_ca_cert', 'cert_file', 'key_file', 'verify_ssl']
    # When v12 of the client will be release, we will use load_from_dict

    # Only the first cluster and the first user entries are used
    _cluster = _conf['clusters'][0]['cluster']
    _kube_conf.ssl_ca_cert = create_temp_file_with_content(
        base64.standard_b64decode(_cluster['certificate-authority-data'])
    )
    _host = _cluster['server']
    _user = _conf['users'][0]['user']
    _kube_conf.cert_file = create_temp_file_with_content(
        base64.standard_b64decode(_user['client-certificate-data'])
    )
    _kube_conf.key_file = create_temp_file_with_content(
        base64.standard_b64decode(_user['client-key-data'])
    )
    if "http" not in _host or "443" not in _host:
        logger_cli.error(
            "Failed to extract Kube host: '{}'".format(_host)
        )
    else:
        logger_cli.debug(
            "... 'context' host extracted: '{}' via SSH@{}".format(
                _host,
                config.ssh_host
            )
        )

    # Substitute context host to ours.
    # Expected pieces are: scheme, '//host' and port
    _tmp = _host.split(':')
    if len(_tmp) < 3:
        # Without an explicit port the substitution below cannot be
        # done; fail cleanly instead of raising IndexError
        logger_cli.error(
            "Failed to extract Kube port: '{}'".format(_host)
        )
        return None, None, _path
    _kube_conf.host = \
        _tmp[0] + "://" + config.mcp_host + ":" + _tmp[2]
    config.kube_port = _tmp[2]
    logger_cli.debug(
        "... kube remote host updated to {}".format(
            _kube_conf.host
        )
    )
    _kube_conf.verify_ssl = False
    _kube_conf.debug = config.debug
    if config.insecure:
        _kube_conf.assert_hostname = False
        _kube_conf.client_side_validation = False

    # Nevertheless if you want to do it
    # you can with these 2 parameters
    # configuration.verify_ssl=True
    # ssl_ca_cert is the filepath
    # to the file that contains the certificate.
    # configuration.ssl_ca_cert="certificate"

    # Create a ApiClient with our config
    _kube_api = kclient.ApiClient(_kube_conf)

    return _kube_conf, _kube_api, _path
Alex9a4ad212020-10-01 18:04:25 -0500165
166
class KubeApi(object):
    """Holds the Kube client configuration and API client.

    Attributes assigned by ``_init_kclient``:
      kConf, kApi, kConfigPath - values returned by the init helper
      is_local - True when ``config.env_name`` equals "local"
      user_keypath, yaml_conf - assigned only for a local env whose
          kubeconfig file exists
    """

    def __init__(self, config):
        self.config = config
        # True only when both kConf and kApi were created
        self.initialized = self._init_kclient()
        self.last_response = None

    def _init_kclient(self):
        """Create the client objects; return True when both are set."""
        logger_cli.debug("... init kube config")
        if self.config.env_name == "local":
            self.kConf, self.kApi, self.kConfigPath = _init_kube_conf_local(
                self.config
            )
            self.is_local = True
            # Try to load local config data
            _cfg_path = self.config.kube_config_path
            if _cfg_path and os.path.exists(_cfg_path):
                # Read the file directly rather than building a
                # "cat <path>" shell string from the path
                with open(_cfg_path, 'r') as ff:
                    _c_data = ff.read()
                _conf = yaml.load(_c_data, Loader=yaml.SafeLoader)
                # Key of the first user entry, stored in a temp file
                self.user_keypath = create_temp_file_with_content(
                    base64.standard_b64decode(
                        _conf['users'][0]['user']['client-key-data']
                    )
                )
                self.yaml_conf = _c_data
        else:
            self.kConf, self.kApi, self.kConfigPath = _init_kube_conf_remote(
                self.config
            )
            self.is_local = False

        return self.kConf is not None and self.kApi is not None

    def get_versions_api(self):
        """Return a VersionApi object bound to ``self.kApi``."""
        # client.CoreApi().get_api_versions().versions
        return kclient.VersionApi(self.kApi)
207
208
209class KubeRemote(KubeApi):
def __init__(self, config):
    """Initialise the base class and the slots for cached API objects."""
    super(KubeRemote, self).__init__(config)
    # Filled on first access of the matching property
    self._appsV1 = None
    # '_podV1' is kept as it was; the PodsV1 property reads '_podsV1',
    # which therefore has to exist as well
    self._podV1 = None
    self._podsV1 = None
    self._custom = None
215
@property
def CustomObjects(self):
    """CustomObjectsApi bound to ``self.kApi``; created once, then reused."""
    _api = self._custom
    if not _api:
        _api = kclient.CustomObjectsApi(self.kApi)
        self._custom = _api
    return _api
Alex9a4ad212020-10-01 18:04:25 -0500221
@property
def CoreV1(self):
    """Return a CoreV1Api built on a new ApiClient at every access.

    A local env uses an ApiClient without arguments; otherwise the
    client is created from ``self.kConf``.
    """
    if not self.is_local:
        _client = kclient.ApiClient(self.kConf)
    else:
        _client = kclient.ApiClient()
    return kclient.CoreV1Api(_client)
Alex9a4ad212020-10-01 18:04:25 -0500228
@property
def AppsV1(self):
    """AppsV1Api bound to ``self.kApi``; created once, then reused."""
    _api = self._appsV1
    if not _api:
        _api = kclient.AppsV1Api(self.kApi)
        self._appsV1 = _api
    return _api
234
@property
def PodsV1(self):
    """Return the cached pods object, creating it on first access.

    The cache attribute is looked up with a default so the property
    also works when ``_podsV1`` was never assigned on the instance.
    """
    # NOTE(review): kclient.V1Pod is a model class, not an API class -
    # confirm what callers expect to receive from this property
    if not getattr(self, "_podsV1", None):
        self._podsV1 = kclient.V1Pod(self.kApi)
    return self._podsV1
240
@staticmethod
def _typed_list_to_dict(i_list):
    """Index typed objects by their lower-cased ``type`` value.

    :param i_list: iterable of objects exposing ``to_dict()`` whose
        result contains a "type" key; ``None`` is treated as empty
    :return: dict ``{type.lower(): to_dict() result without "type"}``;
        when a type repeats, the last item wins
    """
    _dict = {}
    # A missing list may be handed over as None; treat it as empty
    for _item in i_list or []:
        _d = _item.to_dict()
        _type = _d.pop("type")
        _dict[_type.lower()] = _d

    return _dict
250
@staticmethod
def _get_listed_attrs(items, _path):
    """Return ``utils.rgetattr(item, _path)`` for every item, in order."""
    return [utils.rgetattr(_entry, _path) for _entry in items]
258
@staticmethod
def safe_get_item_by_name(api_resource, _name):
    """Return the first item whose ``metadata.name`` equals ``_name``.

    :param api_resource: object with an iterable ``items`` attribute
    :param _name: name to look for
    :return: the matching item, or None when there is no match
    """
    _matches = (
        _entry for _entry in api_resource.items
        if _entry.metadata.name == _name
    )
    return next(_matches, None)
266
def wait_for_phase_on_start(self, _func, phase, *args, **kwargs):
    """Watch events of ``_func`` until the object reaches ``phase``.

    Extra positional and keyword arguments are passed to the watch
    stream. Returns None when the phase is seen, when a DELETED event
    arrives first, or when the stream ends.
    """
    _watch = watch.Watch()
    _started = time()
    for _event in _watch.stream(_func, *args, **kwargs):
        if _event["object"].status.phase == phase:
            _watch.stop()
            _elapsed = time() - _started
            logger_cli.debug(
                "... bacame '{}' in {:0.2f} sec".format(phase, _elapsed)
            )
            return
        # event type is one of: ADDED, MODIFIED, DELETED
        if _event["type"] != "DELETED":
            continue
        # The object was deleted while we were waiting for it to start
        logger_cli.debug("... deleted before started")
        _watch.stop()
        return
287
def wait_for_event(self, _func, event, *args, **kwargs):
    """Watch events of ``_func`` until one of type ``event`` arrives.

    :param _func: list function handed to the watch stream
    :param event: event type to wait for: ADDED, MODIFIED or DELETED
    :return: None, both on a match and when the stream ends without one
    """
    w = watch.Watch()
    # The loop variable must not be called 'event': reusing the name
    # replaced the requested type with the event dict itself, so the
    # comparison below could never succeed
    for _ev in w.stream(_func, *args, **kwargs):
        if _ev["type"] == event:
            logger_cli.debug("... got {} event".format(_ev["type"]))
            w.stop()
            return
297
def get_k0rdent_release(self, mgmt_name="kcm"):
    """Look up the release reported by a k0rdent Management object.

    Lists the ``managements`` custom resources of group
    ``k0rdent.mirantis.com``, version ``v1beta1`` and compares names
    case-insensitively.

    :param mgmt_name: name of the Management object to look for
    :return: ``status.release`` of the matching object; "Unknown" when
        the object or that field is missing, or when the lookup raised
    """
    try:
        _listing = self.get_custom_resource(
            "k0rdent.mirantis.com",
            "v1beta1",
            "managements"
        ) or {}
        for _mgmt in _listing.get("items", []):
            _found = _mgmt.get("metadata", {}).get("name", "").lower()
            if _found != mgmt_name.lower():
                continue
            return _mgmt.get("status", {}).get("release", "Unknown")
        logger.warning(
            f"Management object '{mgmt_name}' not found in CR list.")
    except Exception as e:
        # Best effort: the lookup must not break the caller
        logger.warning(
            f"Failed to get the k0rdent release from the '{mgmt_name}' "
            f"mgmt CRD: {e}")
    return "Unknown"
315
def get_cluster_name_from_kube_config(self):
    """Return the name of the first cluster listed in the kubeconfig.

    The file path is the part of ``self.kConfigPath`` after the first
    ':'. Any failure is logged as a warning and "" is returned.
    """
    try:
        # Split once only: the path part may itself contain ':'
        # NOTE(review): the path is opened on the local machine -
        # confirm this is intended when kConfigPath has the
        # "user@host:path" form
        _cfg_path = self.kConfigPath.split(":", 1)[1]
        with open(_cfg_path, "r") as f:
            config = yaml.safe_load(f)
        clusters = config.get("clusters", [])
        return clusters[0].get("name")
    except Exception as e:
        logger.warning(
            "Failed to get the cluster name from the loaded "
            f"kubeconfig: {e}")
        return ""
326
def get_node_info(self, http=False):
    """Query the API for nodes and return them as pre-parsed dicts.

    :param http: call ``list_node_with_http_info`` instead of
        ``list_node``
    :return: dict keyed by node name; each value is the node's
        ``to_dict()`` extended with 'addresses' and 'conditions'
        (keyed by lower-cased type) and 'labels', where "true"/"false"
        strings are converted with ``utils.to_bool``
    :raises InvalidReturnException: the API result is not a V1NodeList
    """
    _nodes = {}
    if http:
        _raw_nodes = self.CoreV1.list_node_with_http_info()
        # The *_with_http_info call returns (data, status, headers);
        # only the data part is a node list
        if isinstance(_raw_nodes, tuple):
            _raw_nodes = _raw_nodes[0]
    else:
        _raw_nodes = self.CoreV1.list_node()

    if not isinstance(_raw_nodes, kclient.models.v1_node_list.V1NodeList):
        raise InvalidReturnException(
            "Invalid return type: '{}'".format(type(_raw_nodes))
        )

    for _n in _raw_nodes.items:
        _name = _n.metadata.name
        _d = _n.to_dict()
        # parse inner data classes as dicts; missing lists become empty
        _d['addresses'] = self._typed_list_to_dict(
            _n.status.addresses or []
        )
        _d['conditions'] = self._typed_list_to_dict(
            _n.status.conditions or []
        )
        # Update 'status' type, when a 'ready' condition is reported
        _ready = _d['conditions'].get('ready')
        if _ready is not None and isinstance(_ready['status'], str):
            _ready['status'] = utils.to_bool(_ready['status'])
        # Parse image names?
        # TODO: Here is the place where we can parse each node image names

        # Parse roles; a node without labels yields an empty dict
        _d['labels'] = {}
        for _label, _data in (_d["metadata"]["labels"] or {}).items():
            if _data.lower() in ["true", "false"]:
                _d['labels'][_label] = utils.to_bool(_data)
            else:
                _d['labels'][_label] = _data

        # Save
        _nodes[_name] = _d

    # debug report on how many nodes detected
    logger_cli.debug("...node items returned '{}'".format(len(_nodes)))

    return _nodes
369
def get_pod_names_by_partial_name(self, partial_name, ns):
    """Return names of pods in ``ns`` that contain ``partial_name``.

    Logs a warning when nothing matches and a debug line when more
    than one pod matches.
    """
    logger_cli.debug('... searching for pods with {}'.format(partial_name))
    _all_names = self._get_listed_attrs(
        self.CoreV1.list_namespaced_pod(ns).items,
        "metadata.name"
    )
    _matched = [_nm for _nm in _all_names if partial_name in _nm]
    _count = len(_matched)
    if _count < 1:
        logger_cli.warning(
            "WARNING: No pods found for '{}'".format(partial_name)
        )
    elif _count > 1:
        logger_cli.debug(
            "... more than one pod found for '{}': {}\n".format(
                partial_name,
                ", ".join(_matched)
            )
        )

    return _matched
388
def get_pods_by_partial_name(self, partial_name, ns):
    """Return pod objects in ``ns`` whose name contains ``partial_name``.

    :param partial_name: substring looked for in ``metadata.name``
    :param ns: namespace to list pods from
    :return: list of matching pod objects, possibly empty
    """
    logger_cli.debug('... searching for pods with {}'.format(partial_name))
    _all_pods = self.CoreV1.list_namespaced_pod(ns)
    _pods = [_pod for _pod in _all_pods.items
             if partial_name in _pod.metadata.name]
    if len(_pods) > 1:
        # Report the names of the pods found; joining 'partial_name'
        # itself only spelled out its characters
        logger_cli.debug(
            "... more than one pod found for '{}': {}\n".format(
                partial_name,
                ", ".join(_pod.metadata.name for _pod in _pods)
            )
        )
    elif len(_pods) < 1:
        logger_cli.warning(
            "WARNING: No pods found for '{}'".format(partial_name)
        )

    return _pods
408
@retry(ApiException, initial_wait=10)
def exec_on_target_pod(
    self,
    cmd,
    pod_name,
    namespace,
    strict=False,
    _request_timeout=120,
    arguments=None,
    **kwargs
):
    """Run a command inside a pod and return what it printed.

    :param cmd: command as a list, or a string that is split on
        whitespace
    :param pod_name: exact pod name when ``strict`` is set, otherwise a
        name prefix; the first pod matching the prefix is used
    :param namespace: namespace of the pod
    :param strict: use ``pod_name`` as is, without searching
    :param _request_timeout: used for the exec request and as the time
        limit for running the stream
    :param arguments: optional single extra argument appended to the
        command
    :param kwargs: passed through to the exec call
    :return: stdout of the command; stderr when stdout is empty
    :raises KubeException: no pod name starts with ``pod_name``
    """
    if strict:
        _pname = pod_name
    else:
        logger_cli.debug(
            "... searching for pods with the name '{}'".format(pod_name)
        )
        _pods = self.CoreV1.list_namespaced_pod(namespace)
        _names = self._get_listed_attrs(_pods.items, "metadata.name")
        _pnames = [n for n in _names if n.startswith(pod_name)]
        if len(_pnames) > 1:
            logger_cli.debug(
                "... more than one pod found for '{}': {}\n"
                "... using first one".format(
                    pod_name,
                    ", ".join(_pnames)
                )
            )
        elif len(_pnames) < 1:
            raise KubeException("No pods found for '{}'".format(pod_name))
        # in case of >1 and =1 we are taking 1st anyway
        _pname = _pnames[0]
    logger_cli.debug(
        "... cmd: [CoreV1] exec {} -n {} -- {} '{}'".format(
            _pname,
            namespace,
            cmd,
            arguments
        )
    )
    # Work on a private copy: appending to the caller's list would add
    # 'arguments' once more every time this is called with that list
    _cmd = list(cmd) if isinstance(cmd, list) else cmd.split()
    if arguments:
        _cmd.append(arguments)
    # Set preload_content to False to preserve JSON
    # If not, output gets converted to str
    # Which causes to change " to '
    # After that json.loads(...) fail
    _pod_stream = stream(
        self.CoreV1.connect_get_namespaced_pod_exec,
        _pname,
        namespace,
        command=_cmd,
        stderr=True,
        stdin=False,
        stdout=True,
        tty=False,
        _request_timeout=_request_timeout,
        _preload_content=False,
        **kwargs
    )
    try:
        # run for timeout
        _pod_stream.run_forever(timeout=_request_timeout)
        # read the output
        _output = _pod_stream.read_stdout()
        _error = _pod_stream.read_stderr()
    finally:
        # release the connection even when reading fails
        _pod_stream.close()
    if _error:
        logger.warning(
            "WARNING: cmd of '{}' returned error:\n{}\n".format(
                " ".join(_cmd),
                _error
            )
        )
        # copy error to output when there is nothing else to return
        if not _output:
            _output = _error
    # Send output
    return _output
Alex9a4ad212020-10-01 18:04:25 -0500489
def ensure_namespace(self, ns):
    """
    Make sure namespace 'ns' exists, creating it when it is not listed

    Returns False when the create call gives a falsy result,
    True otherwise
    """
    # look the namespace up among the listed ones
    _found = self.safe_get_item_by_name(self.CoreV1.list_namespace(), ns)
    if _found is not None:
        logger_cli.debug("... found existing namespace '{}'".format(ns))
        return True

    logger_cli.debug("... creating namespace '{}'".format(ns))
    _body = kclient.V1Namespace()
    _body.metadata = kclient.V1ObjectMeta(name=ns)
    # TODO: check return on fail
    return bool(self.CoreV1.create_namespace(_body))
510
def get_daemon_set_by_name(self, ns, name):
    """Return daemonset ``name`` listed in namespace ``ns``, or None."""
    _listed = self.AppsV1.list_namespaced_daemon_set(ns)
    return self.safe_get_item_by_name(_listed, name)
516
def create_config_map(self, ns, name, source, recreate=True):
    """
    Build a ConfigMap from file(s) and create it or replace the
    existing one of the same name

    'source' is either one file, stored under its base name, or a
    folder that is walked for '*.py' files not starting with a dot;
    these are stored under their file name only.
    'recreate' is not used by this method.
    Returns the keys of the stored data.
    """
    logger_cli.debug(
        "... preparing config map '{}/{}' with files from '{}'".format(
            ns,
            name,
            source
        )
    )
    _data = {}
    if os.path.isfile(source):
        # single file: one entry
        with open(source, 'rt') as fS:
            _data[os.path.basename(source)] = fS.read()
    elif os.path.isdir(source):
        # folder: every visible 'py' file below it
        for _root, _dirs, _files in os.walk(source):
            for _file in _files:
                if _file.startswith('.') or not _file.endswith('.py'):
                    continue
                with open(os.path.join(_root, _file), 'rt') as fS:
                    _data[_file] = fS.read()

    _cm = kclient.V1ConfigMap()
    _cm.metadata = kclient.V1ObjectMeta(name=name, namespace=ns)
    _cm.data = _data
    logger_cli.debug(
        "... prepared config map with {} scripts".format(len(_data))
    )
    # Replace when a config map of that name is listed, create otherwise
    _existing_cm = self.safe_get_item_by_name(
        self.CoreV1.list_namespaced_config_map(namespace=ns),
        name
    )
    if _existing_cm is None:
        self.CoreV1.create_namespaced_config_map(
            namespace=ns,
            body=_cm
        )
        logger_cli.debug("... created config map '{}/{}'".format(
            ns,
            name
        ))
    else:
        self.CoreV1.replace_namespaced_config_map(
            namespace=ns,
            name=name,
            body=_cm
        )
        logger_cli.debug(
            "... replaced existing config map '{}/{}'".format(
                ns,
                name
            )
        )

    return _data.keys()
579
def prepare_daemonset_from_yaml(self, ns, ds_yaml):
    """Create the daemonset described by ``ds_yaml`` or replace it.

    :param ns: namespace used for the lookup and for creation
    :param ds_yaml: daemonset body; ``['metadata']['name']`` is read
    :return: result of the replace or create API call
    """
    _name = ds_yaml['metadata']['name']
    _ds = self.get_daemon_set_by_name(ns, _name)

    if _ds is None:
        logger_cli.debug(
            "... creating daemonset '{}'".format(_name)
        )
        return self.AppsV1.create_namespaced_daemon_set(ns, body=ds_yaml)

    logger_cli.debug(
        "... found existing daemonset '{}'".format(_name)
    )
    # name and namespace are taken from the daemonset that was found
    _result = self.AppsV1.replace_namespaced_daemon_set(
        _ds.metadata.name,
        _ds.metadata.namespace,
        body=ds_yaml
    )
    logger_cli.debug(
        "... replacing existing daemonset '{}'".format(_name)
    )
    return _result
603
def delete_daemon_set_by_name(self, ns, name):
    """Delete daemonset ``name`` in ``ns``; return the API call result."""
    _apps = self.AppsV1
    return _apps.delete_namespaced_daemon_set(name, ns)
606
def exec_on_all_pods(self, pods):
    """
    Placeholder for executing a script on all target pods

    Only the [node_name, ns, pod_name] map is collected so far; an
    error about the missing implementation is logged and an empty
    list is returned.
    """
    # Create map for threads: [[node_name, ns, pod_name]...]
    # The pod spec attribute is 'node_name' ('nodeName' is the key
    # used in the serialized form)
    _pod_list = [
        [
            item.spec.node_name,
            item.metadata.namespace,
            item.metadata.name
        ]
        for item in pods.items
    ]

    # map func and cmd
    logger_cli.error("ERROR: 'exec_on_all_pods'is not implemented yet")
    # create result list

    return []
Alex7b0ee9a2021-09-21 17:16:17 -0500627
@retry(ApiException, initial_wait=5)
def get_pods_for_daemonset(self, ds):
    """List pods in the daemonset's namespace labelled ``name=<ds name>``.

    :param ds: daemonset object; ``metadata.name`` and
        ``metadata.namespace`` are read
    :return: result of the namespaced pod list call
    """
    _meta = ds.metadata
    logger_cli.debug(
        "... extracting pod names from daemonset '{}'".format(_meta.name)
    )
    _selector = 'name={}'.format(ds.metadata.name)
    return self.CoreV1.list_namespaced_pod(
        namespace=ds.metadata.namespace,
        label_selector=_selector
    )
643
@retry(ApiException, initial_wait=10)
def put_string_buffer_to_pod_as_textfile(
    self,
    pod_name,
    namespace,
    buffer,
    filepath,
    _request_timeout=120,
    **kwargs
):
    """Write ``buffer`` into ``filepath`` inside a pod.

    A '/bin/sh' session is opened in the pod and the content is sent
    to it as a quoted here-document redirected into ``filepath``.

    :param pod_name: name of the target pod
    :param namespace: namespace of the pod
    :param buffer: file content, utf-8 encoded bytes or str
    :param filepath: path of the file inside the pod
    :param _request_timeout: timeout handed to the exec request
    :param kwargs: passed through to the exec call
    :return: None
    """
    _command = ['/bin/sh']
    response = stream(
        self.CoreV1.connect_get_namespaced_pod_exec,
        pod_name,
        namespace,
        command=_command,
        stderr=True,
        stdin=True,
        stdout=True,
        tty=False,
        _request_timeout=_request_timeout,
        _preload_content=False,
        **kwargs
    )

    # if json
    # buffer = json.dumps(_dict, indent=2).encode('utf-8')

    # Accept text as well: the chunks below are decoded from bytes
    # before they are written, which fails for a str chunk
    if isinstance(buffer, str):
        buffer = buffer.encode('utf-8')

    commands = [
        bytes("cat <<'EOF' >" + filepath + "\n", 'utf-8'),
        buffer,
        bytes("\n" + "EOF\n", 'utf-8')
    ]

    try:
        # one chunk is written per loop pass, after draining output
        while response.is_open():
            response.update(timeout=1)
            if response.peek_stdout():
                logger_cli.debug("... STDOUT: %s" % response.read_stdout())
            if response.peek_stderr():
                logger_cli.debug("... STDERR: %s" % response.read_stderr())
            if commands:
                c = commands.pop(0)
                logger_cli.debug("... running command... {}".format(c))
                response.write_stdin(str(c, encoding='utf-8'))
            else:
                break
    finally:
        # close the stream even when the exchange above raised
        response.close()

    return
Alexdcb792f2021-10-04 14:24:21 -0500693
def get_custom_resource(self, group, version, plural):
    """List cluster-level custom objects of the given kind.

    Example arguments: group="networking.istio.io",
    version="v1alpha3", plural="serviceentries"

    :return: result of ``list_cluster_custom_object``
    """
    _query = {
        "group": group,
        "version": version,
        "plural": plural
    }
    return self.CustomObjects.list_cluster_custom_object(**_query)
Alex5cace3b2021-11-10 16:40:37 -0600707
def init_pvc_resource(
    self,
    name,
    storage_class,
    size,
    ns="qa-space",
    mode="ReadWriteOnce"
):
    """Build a V1PersistentVolumeClaim object; nothing is sent to the API

    The claim is labelled ``name=<name>`` and requests ``size`` of
    storage from ``storage_class`` with the single access mode ``mode``.
    """
    _meta = kclient.V1ObjectMeta(
        name=name,
        namespace=ns,
        labels={"name": name}
    )
    _requests = kclient.V1ResourceRequirements(
        requests={'storage': size}
    )
    _spec = kclient.V1PersistentVolumeClaimSpec(
        storage_class_name=storage_class,
        access_modes=[mode],
        resources=_requests
    )
    return kclient.V1PersistentVolumeClaim(
        api_version='v1',
        kind='PersistentVolumeClaim',
        metadata=_meta,
        spec=_spec
    )
733
def init_pv_resource(
    self,
    name,
    storage_class,
    size,
    path,
    ns="qa-space",
    mode="ReadWriteOnce"
):
    """Build a V1PersistentVolume object; nothing is sent to the API

    The volume is labelled ``name=<name>``, has capacity ``size``, the
    single access mode ``mode`` and is backed by host path ``path``.
    """
    _meta = kclient.V1ObjectMeta(
        name=name,
        namespace=ns,
        labels={"name": name}
    )
    _source = kclient.V1HostPathVolumeSource(path=path)
    _spec = kclient.V1PersistentVolumeSpec(
        storage_class_name=storage_class,
        access_modes=[mode],
        capacity={'storage': size},
        host_path=_source
    )
    return kclient.V1PersistentVolume(
        api_version='v1',
        kind='PersistentVolume',
        metadata=_meta,
        spec=_spec
    )
759
def init_service(
    self,
    name,
    port,
    clusterip=None,
    ns="qa-space"
):
    """Build a V1Service object; nothing is sent to the API

    The service selects objects labelled ``name=<name>`` and maps TCP
    ``port`` to the same target port. ``clusterip`` is accepted but
    not used.
    """
    return kclient.V1Service(
        api_version="v1",
        kind="Service",
        metadata=kclient.V1ObjectMeta(
            name=name,
            namespace=ns,
            labels={"name": name}
        ),
        spec=kclient.V1ServiceSpec(
            # cluster_ip=clusterip,
            selector={"name": name},
            # type="ClusterIP",
            ports=[
                kclient.V1ServicePort(
                    port=port,
                    protocol="TCP",
                    target_port=port
                )
            ]
        )
    )
790
def prepare_pv(self, pv_object):
    """Create the PV, or replace it when one with that name is found.

    :param pv_object: PV body; ``metadata.name`` is read
    :return: result of waiting for phase "Available" or "Bound"
    """
    if self.get_pv_by_name(pv_object.metadata.name) is None:
        self.CoreV1.create_persistent_volume(pv_object)
    else:
        self.CoreV1.replace_persistent_volume(
            pv_object.metadata.name,
            pv_object
        )

    _wanted = ["Available", "Bound"]
    # a PV is looked up without a namespace
    return self.wait_for_phase("pv", pv_object.metadata.name, None, _wanted)
807
def prepare_pvc(self, pvc_object):
    """Create the PVC unless one with that name is already found.

    :param pvc_object: PVC body; name, namespace and the requested
        storage size are read
    :return: result of waiting for phase "Available" or "Bound"
    :raises CheckerException: a found PVC requests a different size
    """
    _meta = pvc_object.metadata
    _existing = self.get_pvc_by_name_and_ns(_meta.name, _meta.namespace)
    if _existing is None:
        logger_cli.debug(
            "... creating pvc '{}'".format(pvc_object.metadata.name)
        )
        self.CoreV1.create_namespaced_persistent_volume_claim(
            pvc_object.metadata.namespace,
            pvc_object
        )
    else:
        _size_r = pvc_object.spec.resources.requests["storage"]
        _size_e = _existing.spec.resources.requests["storage"]
        logger_cli.info(
            "-> Found PVC '{}/{}' with {}. Requested: {}'".format(
                pvc_object.metadata.namespace,
                pvc_object.metadata.name,
                _size_e,
                _size_r
            )
        )
        # a found claim is reused only when the sizes are equal
        if _size_r != _size_e:
            raise CheckerException(
                "ERROR: PVC exists on the cloud with different size "
                "than needed. Please cleanup!"
            )

    return self.wait_for_phase(
        "pvc",
        pvc_object.metadata.name,
        pvc_object.metadata.namespace,
        ["Available", "Bound"]
    )
844
def get_pod_by_name_and_ns(self, name, ns):
    """Return the pod called ``name`` in ``ns``, or None.

    Only pods labelled ``name=<name>`` are listed for the lookup.
    """
    _selector = 'name={}'.format(name)
    _listed = self.CoreV1.list_namespaced_pod(ns, label_selector=_selector)
    return self.safe_get_item_by_name(_listed, name)
853
def list_pods(self, ns, label_str=None):
    """List pods in ``ns``, passing ``label_str`` as the label selector."""
    _core = self.CoreV1
    return _core.list_namespaced_pod(ns, label_selector=label_str)
859
def get_svc_by_name_and_ns(self, name, ns):
    """Return the service called ``name`` in ``ns``, or None.

    Only services labelled ``name=<name>`` are listed for the lookup.
    """
    _selector = 'name={}'.format(name)
    _listed = self.CoreV1.list_namespaced_service(
        ns,
        label_selector=_selector
    )
    return self.safe_get_item_by_name(_listed, name)
868
def list_svc(self, ns, label_str=None):
    """List services in ``ns``, passing ``label_str`` as label selector."""
    _core = self.CoreV1
    return _core.list_namespaced_service(ns, label_selector=label_str)
874
def get_pvc_by_name_and_ns(self, name, ns):
    """Return the PVC called ``name`` in ``ns``, or None.

    Only claims labelled ``name=<name>`` are listed for the lookup.
    """
    _selector = 'name={}'.format(name)
    _listed = self.CoreV1.list_namespaced_persistent_volume_claim(
        ns,
        label_selector=_selector
    )
    return self.safe_get_item_by_name(_listed, name)
883
def list_pvc(self, ns, label_str=None):
    """List PVCs in ``ns``, passing ``label_str`` as the label selector."""
    _core = self.CoreV1
    return _core.list_namespaced_persistent_volume_claim(
        ns,
        label_selector=label_str
    )
889
def get_pv_by_name(self, name):
    """Return the PV called ``name``, or None.

    Only volumes labelled ``name=<name>`` are listed for the lookup.
    """
    _selector = 'name={}'.format(name)
    _listed = self.CoreV1.list_persistent_volume(label_selector=_selector)
    return self.safe_get_item_by_name(_listed, name)
897
def list_pv(self, label_str=None):
    """List PVs, passing ``label_str`` as the label selector."""
    _core = self.CoreV1
    return _core.list_persistent_volume(label_selector=label_str)
902
def wait_for_phase(self, ttype, name, ns, phase_list, timeout=120):
    """Poll an object every 2 sec until its phase is in ``phase_list``.

    :param ttype: kind of object: "pod", "svc", "pvc" or "pv"
    :param name: object name
    :param ns: namespace; not used for "pv"
    :param phase_list: list of acceptable ``status.phase`` values; when
        it contains "Terminated", a missing object ends the wait
    :param timeout: time budget in seconds, reduced by 2 per poll
    :return: the object once its phase matches; None when it is gone
        and "Terminated" was requested
    :raises CheckerException: unknown ``ttype`` or time budget used up
    """
    logger_cli.debug(
        "... waiting '{}'s until {} is '{}'".format(
            timeout,
            ttype,
            ", ".join(phase_list)
        )
    )
    _getters = {
        "pod": lambda: self.get_pod_by_name_and_ns(name, ns),
        "svc": lambda: self.get_svc_by_name_and_ns(name, ns),
        "pvc": lambda: self.get_pvc_by_name_and_ns(name, ns),
        "pv": lambda: self.get_pv_by_name(name),
    }
    if ttype not in _getters:
        # fail with a clear message instead of an unbound local below
        raise CheckerException(
            "Unsupported object type '{}'".format(ttype)
        )
    _get = _getters[ttype]
    while timeout > 0:
        _t = _get()
        if not _t:
            if "Terminated" in phase_list:
                if ns:
                    _s = "... {} {}/{} not found".format(ttype, ns, name)
                else:
                    _s = "... {} '{}' not found".format(ttype, name)
                logger_cli.debug(_s)
                return None
            # Not listed (yet): keep polling rather than failing on
            # the status of a missing object
            logger_cli.debug("... {} '{}' not listed yet".format(ttype, name))
        else:
            logger_cli.debug("... {} is '{}'".format(ttype, _t.status.phase))
            if _t.status.phase in phase_list:
                return _t
        sleep(2)
        timeout -= 2
    # Report the phases that were awaited, not the object type
    raise CheckerException(
        "Timed out waiting for {} '{}' in '{}'".format(
            ttype,
            name,
            ", ".join(phase_list)
        )
    )
939
def prepare_pod_from_yaml(self, pod_yaml):
    """Return the pod from ``pod_yaml``, creating it when not found.

    :param pod_yaml: pod body; ``['metadata']['name']`` and
        ``['metadata']['namespace']`` are read
    :return: the pod that was found, or the result of waiting for the
        created pod to reach phase "Running"
    """
    _meta = pod_yaml['metadata']
    _existing = self.get_pod_by_name_and_ns(_meta['name'], _meta['namespace'])
    if _existing is None:
        self.CoreV1.create_namespaced_pod(
            pod_yaml['metadata']['namespace'],
            pod_yaml
        )
        return self.wait_for_phase(
            "pod",
            pod_yaml['metadata']['name'],
            pod_yaml['metadata']['namespace'],
            ["Running"]
        )

    logger_cli.info(
        "-> Found pod '{}/{}'. Reusing.".format(
            pod_yaml['metadata']['namespace'],
            pod_yaml['metadata']['name']
        )
    )
    return _existing
964
def expose_pod_port(self, pod_object, port, ns="qa-space"):
    """Return a service exposing ``port`` of the pod, creating it if needed.

    The service has the pod's name and lives in the pod's namespace.

    :param pod_object: pod to expose; name, namespace and pod IP are
        read from it
    :param port: port number used as service port and target port
    :param ns: kept for compatibility; lookup and creation both use
        the namespace of ``pod_object``
    :return: the service that was found, or the result of the create
        call
    """
    _existing = self.get_svc_by_name_and_ns(
        pod_object.metadata.name,
        pod_object.metadata.namespace
    )
    if _existing is not None:
        # TODO: Check port number?
        logger_cli.info(
            "-> Pod already exposed '{}/{}:{}'. Reusing.".format(
                pod_object.metadata.namespace,
                pod_object.metadata.name,
                port
            )
        )
        return _existing
    else:
        logger_cli.debug(
            "... creating service for pod {}/{}: {}:{}".format(
                pod_object.metadata.namespace,
                pod_object.metadata.name,
                pod_object.status.pod_ip,
                port
            )
        )
        # Build the body for the namespace it is created in; without
        # 'ns' the body would carry init_service's default namespace
        _svc = self.init_service(
            pod_object.metadata.name,
            port,
            ns=pod_object.metadata.namespace
        )
        return self.CoreV1.create_namespaced_service(
            pod_object.metadata.namespace,
            _svc
        )
Alex0989ecf2022-03-29 13:43:21 -0500997
def list_namespaces(self):
    """Return the result of CoreV1.list_namespace() unchanged."""
    _namespaces = self.CoreV1.list_namespace()
    return _namespaces
1000
@retry(ApiException, initial_wait=2)
def get_pod_logs(self, podname, container, ns, tail_lines=50):
    """Read the tail of a container log via read_namespaced_pod_log.

    The request is made synchronously with timestamps enabled, limited to
    the last 'tail_lines' lines, and with _request_timeout=(1, 5), which
    is a (connection, read) timeout pair. The method is wrapped with
    retry(ApiException, initial_wait=2).

    :param podname: name of the pod
    :param container: container for which to read the log
    :param ns: namespace of the pod
    :param tail_lines: number of lines from the end of the log to show
    :return: log text, or "" when the request ends with MaxRetryError
    """
    try:
        _log = self.CoreV1.read_namespaced_pod_log(
            name=podname,
            namespace=ns,
            container=container,
            timestamps=True,
            tail_lines=tail_lines,
            _request_timeout=(1, 5)
        )
    except MaxRetryError as e:
        # urllib3 gave up on the request: warn and hand back an empty log
        _msg = "WARNING: Failed to retrieve log {}/{}:{}:\n{}".format(
            ns, podname, container, e.reason
        )
        logger_cli.warning(_msg)
        return ""
    return _log