blob: 36efcaaff2950ee5ecbeedd8bee3529c423c7ebe [file] [log] [blame]
Alex0989ecf2022-03-29 13:43:21 -05001# Author: Alex Savatieiev (osavatieiev@mirantis.com; a.savex@gmail.com)
2# Copyright 2019-2022 Mirantis, Inc.
Alex9a4ad212020-10-01 18:04:25 -05003"""
4Module to handle interaction with Kube
5"""
6import base64
7import os
8import urllib3
9import yaml
10
Alex5cace3b2021-11-10 16:40:37 -060011from kubernetes import client as kclient, config as kconfig, watch
Alex9a4ad212020-10-01 18:04:25 -050012from kubernetes.stream import stream
Alex7b0ee9a2021-09-21 17:16:17 -050013from kubernetes.client.rest import ApiException
Alex0bcf31b2022-03-29 17:38:58 -050014from urllib3.exceptions import MaxRetryError
Alex5cace3b2021-11-10 16:40:37 -060015from time import time, sleep
Alex9a4ad212020-10-01 18:04:25 -050016
17from cfg_checker.common import logger, logger_cli
Alex7b0ee9a2021-09-21 17:16:17 -050018from cfg_checker.common.decorators import retry
Alex5cace3b2021-11-10 16:40:37 -060019from cfg_checker.common.exception import CheckerException, \
20 InvalidReturnException, KubeException
Alex9a4ad212020-10-01 18:04:25 -050021from cfg_checker.common.file_utils import create_temp_file_with_content
22from cfg_checker.common.other import utils, shell
23from cfg_checker.common.ssh_utils import ssh_shell_p
Alex359e5752021-08-16 17:28:30 -050024from cfg_checker.common.const import ENV_LOCAL
Alex9a4ad212020-10-01 18:04:25 -050025
Alex7b0ee9a2021-09-21 17:16:17 -050026
# The checker may talk to API servers with certificate verification turned
# off (verify_ssl = False), so mute urllib3's per-request warning about it.
urllib3.disable_warnings(category=urllib3.exceptions.InsecureRequestWarning)
28
29
def _init_kube_conf_local(config):
    """Initialise the Kubernetes client from a kubeconfig on this host.

    :param config: checker configuration; ``kube_config_path`` and
        ``insecure`` are read from it.
    :returns: ``(kconfig_module, ApiClient, path_label)`` on success or
        ``(None, None, path_label)`` if loading the kubeconfig or probing
        the API server fails. ``path_label`` is
        ``"local:<kube_config_path>"`` in both cases.
    """
    _path = "local:{}".format(config.kube_config_path)
    try:
        kconfig.load_kube_config(config_file=config.kube_config_path)
        if config.insecure:
            # NOTE(review): these assign attributes on the
            # ``kubernetes.config`` module itself, not on a client
            # Configuration object, so they probably do not affect TLS
            # handling — confirm and move onto the default Configuration
            # if the insecure mode is actually needed here.
            kconfig.assert_hostname = False
            kconfig.client_side_validation = False
        # Probing the API versions doubles as a connectivity check: any
        # failure lands in the except branch below.
        logger_cli.debug(
            "... found Kube env: core, {}".format(
                ",".join(
                    kclient.CoreApi().get_api_versions().versions
                )
            )
        )
        return kconfig, kclient.ApiClient(), _path
    except Exception as e:
        # Best-effort: report and let the caller treat the client as
        # uninitialised. ``warning`` (not the deprecated ``warn`` alias)
        # matches the rest of this module.
        logger.warning("Failed to init local Kube client: {}".format(
            str(e)
        ))
        return None, None, _path
Alex9a4ad212020-10-01 18:04:25 -050052
53
def _init_kube_conf_remote(config):
    """Build a Kubernetes client Configuration from a kubeconfig.

    Unless a kubeconfig was already detected or the environment is
    ``ENV_LOCAL``, the file ``config.kube_config_path`` is read on
    ``config.ssh_host`` over SSH; otherwise it is read from the local disk.
    The first cluster and first user entries are used: their base64 CA,
    client certificate and client key are decoded into temp files, and the
    API server URL is rewritten to ``config.mcp_host`` while keeping the
    original scheme and port (the port is also stored in
    ``config.kube_port``).

    Manual shell equivalent of the token-based access, kept for reference::

        APISERVER=$(kubectl config view --minify |
            grep server | cut -f 2- -d ":" | tr -d " ")
        SECRET_NAME=$(kubectl get secrets |
            grep ^default | cut -f1 -d ' ')
        TOKEN=$(kubectl describe secret $SECRET_NAME |
            grep -E '^token' | cut -f2 -d':' | tr -d " ")

        echo "Detected API Server at: '${APISERVER}'"
        echo "Got secret: '${SECRET_NAME}'"
        echo "Loaded token: '${TOKEN}'"

        curl $APISERVER/api
            --header "Authorization: Bearer $TOKEN" --insecure

    :returns: ``(Configuration, ApiClient, path_label)``, or
        ``(None, None, path_label)`` when nothing was read or the server
        URL has no ``scheme://host:port`` form.
    """
    _path = ''
    # Try to load remote config only if it was not detected already
    if not config.kube_config_detected and not config.env_name == ENV_LOCAL:
        _path = "{}@{}:{}".format(
            config.ssh_user,
            config.ssh_host,
            config.kube_config_path
        )
        _c_data = ssh_shell_p(
            "cat " + config.kube_config_path,
            config.ssh_host,
            username=config.ssh_user,
            keypath=config.ssh_key,
            piped=False,
            use_sudo=config.ssh_uses_sudo,
        )
    else:
        _path = "local:{}".format(config.kube_config_path)
        with open(config.kube_config_path, 'r') as ff:
            _c_data = ff.read()

    # ``not`` also covers a None result from the remote read, on which
    # the previous ``len()`` check raised TypeError.
    if not _c_data:
        return None, None, _path

    _conf = yaml.load(_c_data, Loader=yaml.SafeLoader)
    _cluster = _conf['clusters'][0]['cluster']
    _user = _conf['users'][0]['user']

    # Extracted keys: host, ssl_ca_cert, cert_file, key_file, verify_ssl
    _kube_conf = kclient.Configuration()
    _kube_conf.ssl_ca_cert = create_temp_file_with_content(
        base64.standard_b64decode(_cluster['certificate-authority-data'])
    )
    _kube_conf.cert_file = create_temp_file_with_content(
        base64.standard_b64decode(_user['client-certificate-data'])
    )
    _kube_conf.key_file = create_temp_file_with_content(
        base64.standard_b64decode(_user['client-key-data'])
    )

    _host = _cluster['server']
    # Expected "<scheme>://<host>:<port>" -> [scheme, "//host", port].
    # Without a port the substitution below used to fail with IndexError;
    # report it and return an uninitialised result instead. (The old
    # check also logged a false error for any port not containing "443".)
    _tmp = _host.split(':')
    if len(_tmp) < 3:
        logger_cli.error(
            "Failed to extract Kube host: '{}'".format(_host)
        )
        return None, None, _path
    logger_cli.debug(
        "... 'context' host extracted: '{}' via SSH@{}".format(
            _host,
            config.ssh_host
        )
    )

    # Substitute context host to ours, keeping scheme and port
    _kube_conf.host = \
        _tmp[0] + "://" + config.mcp_host + ":" + _tmp[2]
    config.kube_port = _tmp[2]
    logger_cli.debug(
        "... kube remote host updated to {}".format(
            _kube_conf.host
        )
    )
    # The host is replaced with mcp_host, so certificate verification
    # against the original server name is disabled.
    _kube_conf.verify_ssl = False
    _kube_conf.debug = config.debug
    if config.insecure:
        _kube_conf.assert_hostname = False
        _kube_conf.client_side_validation = False

    # Create a ApiClient with our config
    _kube_api = kclient.ApiClient(_kube_conf)

    return _kube_conf, _kube_api, _path
Alex9a4ad212020-10-01 18:04:25 -0500165
166
class KubeApi(object):
    """Holds the Kubernetes client configuration for the checker.

    Depending on ``config.env_name`` it initialises a local client
    (``"local"``) or a remote one (built from a kubeconfig read over SSH or
    from disk). ``initialized`` is True when both a configuration and an
    API client were obtained.
    """

    def __init__(self, config):
        self.config = config
        # Filled by the local branch of _init_kclient only when the
        # kubeconfig file exists; default them so later reads never raise
        # AttributeError.
        self.user_keypath = None
        self.yaml_conf = None
        self.initialized = self._init_kclient()
        self.last_response = None

    def _init_kclient(self):
        """Populate ``kConf``, ``kApi``, ``kConfigPath`` and ``is_local``.

        :returns: True if both ``kConf`` and ``kApi`` are set.
        """
        logger_cli.debug("... init kube config")
        if self.config.env_name == "local":
            self.kConf, self.kApi, self.kConfigPath = _init_kube_conf_local(
                self.config
            )
            self.is_local = True
            # Try to load local config data
            if self.config.kube_config_path and \
                    os.path.exists(self.config.kube_config_path):
                # Read the file directly rather than running
                # "cat <path>" through the shell helper: no subprocess and
                # no reliance on the path being shell-safe.
                with open(self.config.kube_config_path, 'r') as _f:
                    _c_data = _f.read()
                _conf = yaml.load(_c_data, Loader=yaml.SafeLoader)
                # Client key of the first kubeconfig user, as a temp file
                self.user_keypath = create_temp_file_with_content(
                    base64.standard_b64decode(
                        _conf['users'][0]['user']['client-key-data']
                    )
                )
                self.yaml_conf = _c_data
        else:
            self.kConf, self.kApi, self.kConfigPath = \
                _init_kube_conf_remote(self.config)
            self.is_local = False

        return self.kConf is not None and self.kApi is not None

    def get_versions_api(self):
        """Return a VersionApi bound to this instance's API client."""
        return kclient.VersionApi(self.kApi)
207
208
209class KubeRemote(KubeApi):
def __init__(self, config):
    """Initialise the base client and the lazily created API holders."""
    super().__init__(config)
    # Lazily created API wrappers, filled by their properties
    self._appsV1 = None
    # The PodsV1 property reads ``_podsV1``; only ``_podV1`` used to be
    # set, so the first PodsV1 access raised AttributeError.
    # ``_podV1`` is kept in case other code reads it.
    self._podV1 = None
    self._podsV1 = None
    self._custom = None
215
@property
def CustomObjects(self):
    """CustomObjectsApi bound to ``kApi``, created on first use and cached."""
    if self._custom:
        return self._custom
    self._custom = kclient.CustomObjectsApi(self.kApi)
    return self._custom
Alex9a4ad212020-10-01 18:04:25 -0500221
@property
def CoreV1(self):
    """Return a freshly built CoreV1Api on every access (never cached).

    Local mode builds the ApiClient without an explicit configuration
    (presumably picking up what load_kube_config set as default); remote
    mode passes ``kConf`` explicitly.
    """
    _api_client = (
        kclient.ApiClient() if self.is_local
        else kclient.ApiClient(self.kConf)
    )
    return kclient.CoreV1Api(_api_client)
Alex9a4ad212020-10-01 18:04:25 -0500228
@property
def AppsV1(self):
    """AppsV1Api bound to ``kApi``, created on first use and cached."""
    if self._appsV1:
        return self._appsV1
    self._appsV1 = kclient.AppsV1Api(self.kApi)
    return self._appsV1
234
@property
def PodsV1(self):
    """``kclient.V1Pod(self.kApi)``, created on first use and cached.

    NOTE(review): ``V1Pod`` is a model class, not an API client, so
    ``kApi`` most likely ends up as the model's first field rather than
    giving an API — confirm what callers expect before relying on this.
    """
    # getattr guards against instances where ``_podsV1`` was never
    # initialised (``__init__`` historically set ``_podV1`` instead), which
    # made the first access raise AttributeError.
    if not getattr(self, "_podsV1", None):
        self._podsV1 = kclient.V1Pod(self.kApi)
    return self._podsV1
240
@staticmethod
def _typed_list_to_dict(i_list):
    """Key ``to_dict()`` forms of typed model objects by lower-cased type.

    The ``type`` entry is removed from each converted dict and used,
    lower-cased, as its key; a later item with the same type replaces an
    earlier one.
    """
    _by_type = {}
    for _obj in i_list:
        _entry = _obj.to_dict()
        _key = _entry.pop("type").lower()
        _by_type[_key] = _entry
    return _by_type
250
@staticmethod
def _get_listed_attrs(items, _path):
    """Return ``utils.rgetattr(item, _path)`` for each item, in order."""
    return [utils.rgetattr(_item, _path) for _item in items]
258
@staticmethod
def safe_get_item_by_name(api_resource, _name):
    """Return the first entry of ``api_resource.items`` whose
    ``metadata.name`` equals ``_name``, or None when nothing matches."""
    return next(
        (_it for _it in api_resource.items if _it.metadata.name == _name),
        None
    )
266
def wait_for_phase_on_start(self, _func, phase, *args, **kwargs):
    """Watch ``_func`` until an object reports ``status.phase == phase``.

    :param _func: list function handed to ``watch.Watch().stream``;
        ``args``/``kwargs`` are forwarded to it.
    :param phase: phase to wait for, e.g. "Running".
    Returns None: on reaching the phase, on a DELETED event, or when the
    watch stream ends without either.
    """
    w = watch.Watch()
    start_time = time()
    for event in w.stream(_func, *args, **kwargs):
        if event["object"].status.phase == phase:
            w.stop()
            logger_cli.debug(
                "... became '{}' in {:0.2f} sec".format(
                    phase,
                    time() - start_time
                )
            )
            return
        # event.type: ADDED, MODIFIED, DELETED
        if event["type"] == "DELETED":
            # Object was deleted while we were waiting for it to start.
            logger_cli.debug("... deleted before started")
            w.stop()
            return
287
def wait_for_event(self, _func, event, *args, **kwargs):
    """Block until the watch on ``_func`` yields an event of type ``event``.

    :param _func: list function handed to ``watch.Watch().stream``;
        ``args``/``kwargs`` are forwarded to it.
    :param event: event type to wait for: "ADDED", "MODIFIED" or "DELETED".
    Returns None, either on the first matching event or when the stream
    ends without one.
    """
    w = watch.Watch()
    # The loop variable must not be called ``event``: shadowing the
    # parameter made the check compare the type string with the event dict
    # itself, so it never matched.
    for _ev in w.stream(_func, *args, **kwargs):
        if _ev["type"] == event:
            logger_cli.debug("... got {} event".format(_ev["type"]))
            w.stop()
            return
297
def get_k0rdent_release(self, mgmt_name="kcm"):
    """Return ``status.release`` of the k0rdent Management ``mgmt_name``.

    Cluster-wide ``managements`` objects of
    ``k0rdent.mirantis.com/v1beta1`` are listed and matched by name
    case-insensitively. "Unknown" is returned when the object is missing,
    has no release, or any error occurs; problems are logged as warnings
    rather than raised.
    """
    try:
        _mgmts = self.get_custom_resource(
            "k0rdent.mirantis.com", "v1beta1", "managements"
        ) or {}
        for _item in _mgmts.get("items", []):
            _item_name = _item.get("metadata", {}).get("name", "").lower()
            if _item_name != mgmt_name.lower():
                continue
            return _item.get("status", {}).get("release", "Unknown")
        logger.warning(
            f"Management object '{mgmt_name}' not found in CR list.")
    except Exception as e:
        logger.warning(
            f"Failed to get the k0rdent release from the '{mgmt_name}' "
            f"mgmt CRD: {e}")
    return "Unknown"
315
def get_node_info(self, http=False):
    """Return cluster nodes as ``{node_name: node_dict}``.

    Each ``node_dict`` is ``V1Node.to_dict()`` extended with:

    * ``addresses`` / ``conditions``: the status lists keyed by lower-cased
      type (via ``_typed_list_to_dict``);
    * ``conditions['ready']['status']`` converted to bool when it is a str;
    * ``labels``: metadata labels, "true"/"false" values converted to bool.

    :param http: query with ``list_node_with_http_info`` instead of
        ``list_node``.
    :raises InvalidReturnException: if the API result is not a V1NodeList.
    """
    if http:
        _raw_nodes = self.CoreV1.list_node_with_http_info()
        # *_with_http_info returns (data, status, headers); checking the
        # whole tuple made http=True always raise InvalidReturnException.
        if isinstance(_raw_nodes, tuple):
            _raw_nodes = _raw_nodes[0]
    else:
        _raw_nodes = self.CoreV1.list_node()

    if not isinstance(_raw_nodes, kclient.models.v1_node_list.V1NodeList):
        raise InvalidReturnException(
            "Invalid return type: '{}'".format(type(_raw_nodes))
        )

    _nodes = {}
    for _n in _raw_nodes.items:
        _d = _n.to_dict()
        # parse inner data classes as dicts; tolerate absent lists
        _d['addresses'] = self._typed_list_to_dict(
            _n.status.addresses or []
        )
        _d['conditions'] = self._typed_list_to_dict(
            _n.status.conditions or []
        )
        # A node may briefly lack a Ready condition (e.g. while joining)
        _ready = _d['conditions'].get('ready')
        if _ready is not None and isinstance(_ready['status'], str):
            _ready['status'] = utils.to_bool(_ready['status'])
        # TODO: Here is the place where we can parse each node image names

        # Parse roles; labels may be None on a node without any
        _d['labels'] = {}
        for _label, _data in (_d["metadata"]["labels"] or {}).items():
            if _data.lower() in ["true", "false"]:
                _d['labels'][_label] = utils.to_bool(_data)
            else:
                _d['labels'][_label] = _data

        _nodes[_n.metadata.name] = _d

    # debug report on how many nodes detected
    logger_cli.debug("...node items returned '{}'".format(len(_nodes)))

    return _nodes
357 return _nodes
358
def get_pod_names_by_partial_name(self, partial_name, ns):
    """Return names of pods in ``ns`` that contain ``partial_name``.

    Logs a debug line when several pods match and a warning when none do;
    the (possibly empty) list is returned either way.
    """
    logger_cli.debug('... searching for pods with {}'.format(partial_name))
    _all_names = self._get_listed_attrs(
        self.CoreV1.list_namespaced_pod(ns).items,
        "metadata.name"
    )
    _matched = list(filter(lambda _nm: partial_name in _nm, _all_names))
    if not _matched:
        logger_cli.warning(
            "WARNING: No pods found for '{}'".format(partial_name)
        )
    elif len(_matched) > 1:
        logger_cli.debug(
            "... more than one pod found for '{}': {}\n".format(
                partial_name,
                ", ".join(_matched)
            )
        )
    return _matched
377
def get_pods_by_partial_name(self, partial_name, ns):
    """Return V1Pod objects in ``ns`` whose name contains ``partial_name``.

    Logs a debug line listing the matches when there are several, and a
    warning when there are none; the (possibly empty) list is returned.
    """
    logger_cli.debug('... searching for pods with {}'.format(partial_name))
    _all_pods = self.CoreV1.list_namespaced_pod(ns)
    _pods = [_pod for _pod in _all_pods.items
             if partial_name in _pod.metadata.name]
    if len(_pods) > 1:
        # List the matched pod names; the old code joined the characters
        # of ``partial_name`` instead.
        logger_cli.debug(
            "... more than one pod found for '{}': {}\n".format(
                partial_name,
                ", ".join(_pod.metadata.name for _pod in _pods)
            )
        )
    elif not _pods:
        logger_cli.warning(
            "WARNING: No pods found for '{}'".format(partial_name)
        )

    return _pods
397
@retry(ApiException, initial_wait=10)
def exec_on_target_pod(
    self,
    cmd,
    pod_name,
    namespace,
    strict=False,
    _request_timeout=120,
    arguments=None,
    **kwargs
):
    """Run ``cmd`` in a pod and return its stdout (stderr if stdout empty).

    :param cmd: command as a list, or a string split on whitespace.
    :param pod_name: exact pod name when ``strict`` is True; otherwise a
        name prefix, and the first matching pod in ``namespace`` is used.
    :param namespace: namespace of the pod.
    :param strict: treat ``pod_name`` as the exact name.
    :param _request_timeout: timeout for the exec request and for
        ``run_forever``.
    :param arguments: optional single argument appended verbatim (not
        split) to the command.
    :param kwargs: forwarded to ``connect_get_namespaced_pod_exec``.
    :raises KubeException: if not ``strict`` and no pod name matches.
    Retried on ApiException by the ``retry`` decorator.
    """
    if strict:
        _pname = pod_name
    else:
        logger_cli.debug(
            "... searching for pods with the name '{}'".format(pod_name)
        )
        _pods = self.CoreV1.list_namespaced_pod(namespace)
        _names = self._get_listed_attrs(_pods.items, "metadata.name")
        _pnames = [n for n in _names if n.startswith(pod_name)]
        if len(_pnames) > 1:
            logger_cli.debug(
                "... more than one pod found for '{}': {}\n"
                "... using first one".format(
                    pod_name,
                    ", ".join(_pnames)
                )
            )
        elif len(_pnames) < 1:
            raise KubeException("No pods found for '{}'".format(pod_name))
        # in case of >1 and =1 we are taking 1st anyway
        _pname = _pnames[0]
    logger_cli.debug(
        "... cmd: [CoreV1] exec {} -n {} -- {} '{}'".format(
            _pname,
            namespace,
            cmd,
            arguments
        )
    )
    # Copy list commands: ``+=`` on the caller's list mutated it, so every
    # @retry attempt appended ``arguments`` once more.
    cmd = list(cmd) if isinstance(cmd, list) else cmd.split()
    if arguments:
        cmd.append(arguments)
    # _preload_content=False keeps output raw; otherwise it is converted
    # to str, which turns " into ' and breaks later json.loads(...).
    _pod_stream = stream(
        self.CoreV1.connect_get_namespaced_pod_exec,
        _pname,
        namespace,
        command=cmd,
        stderr=True,
        stdin=False,
        stdout=True,
        tty=False,
        _request_timeout=_request_timeout,
        _preload_content=False,
        **kwargs
    )
    try:
        # run for timeout, then read what was collected
        _pod_stream.run_forever(timeout=_request_timeout)
        _output = _pod_stream.read_stdout()
        _error = _pod_stream.read_stderr()
    finally:
        # Release the exec websocket even if reading fails
        _pod_stream.close()
    if _error:
        logger.warning(
            "WARNING: cmd of '{}' returned error:\n{}\n".format(
                " ".join(cmd),
                _error
            )
        )
        # copy error to output when the command printed nothing else
        if not _output:
            _output = _error
    return _output
Alex9a4ad212020-10-01 18:04:25 -0500478
def ensure_namespace(self, ns):
    """
    Make sure namespace ``ns`` exists, creating it when it is missing.

    Returns False only when the create call returns a falsy result, True
    otherwise (including when the namespace already exists).
    """
    _existing = self.safe_get_item_by_name(self.CoreV1.list_namespace(), ns)
    if _existing is not None:
        logger_cli.debug("... found existing namespace '{}'".format(ns))
        return True

    logger_cli.debug("... creating namespace '{}'".format(ns))
    _body = kclient.V1Namespace()
    _body.metadata = kclient.V1ObjectMeta(name=ns)
    # TODO: check return on fail
    return bool(self.CoreV1.create_namespace(_body))
499
def get_daemon_set_by_name(self, ns, name):
    """Return DaemonSet ``name`` from namespace ``ns``, or None."""
    _daemon_sets = self.AppsV1.list_namespaced_daemon_set(ns)
    return self.safe_get_item_by_name(_daemon_sets, name)
505
def create_config_map(self, ns, name, source, recreate=True):
    """
    Create ConfigMap ``ns/name`` from local files, or replace it if present.

    :param source: a file (stored under its base name) or a directory; for
        a directory every non-hidden ``*.py`` file found by ``os.walk`` is
        stored under its base name, so equally named files in different
        subdirectories overwrite each other. Any other path yields an
        empty map.
    :param recreate: currently unused; an existing map is always replaced.
    :returns: the ConfigMap data keys (file names).
    """
    logger_cli.debug(
        "... preparing config map '{}/{}' with files from '{}'".format(
            ns,
            name,
            source
        )
    )
    _data = {}
    if os.path.isfile(source):
        with open(source, 'rt') as _fh:
            _data[os.path.basename(source)] = _fh.read()
    elif os.path.isdir(source):
        for _root, _dirs, _files in os.walk(source):
            for _fname in _files:
                if _fname.startswith('.') or not _fname.endswith('.py'):
                    continue
                with open(os.path.join(_root, _fname), 'rt') as _fh:
                    _data[_fname] = _fh.read()

    _cm = kclient.V1ConfigMap()
    _cm.metadata = kclient.V1ObjectMeta(name=name, namespace=ns)
    _cm.data = _data
    logger_cli.debug(
        "... prepared config map with {} scripts".format(len(_data))
    )

    _present = self.safe_get_item_by_name(
        self.CoreV1.list_namespaced_config_map(namespace=ns),
        name
    )
    if _present is None:
        self.CoreV1.create_namespaced_config_map(namespace=ns, body=_cm)
        logger_cli.debug("... created config map '{}/{}'".format(ns, name))
    else:
        self.CoreV1.replace_namespaced_config_map(
            namespace=ns,
            name=name,
            body=_cm
        )
        logger_cli.debug(
            "... replaced existing config map '{}/{}'".format(ns, name)
        )

    return _data.keys()
568
def prepare_daemonset_from_yaml(self, ns, ds_yaml):
    """Create the DaemonSet described by ``ds_yaml`` in ``ns``, or replace
    the existing one with the same ``metadata.name``.

    :returns: the create/replace API response.
    """
    _name = ds_yaml['metadata']['name']
    _current = self.get_daemon_set_by_name(ns, _name)
    if _current is None:
        logger_cli.debug("... creating daemonset '{}'".format(_name))
        return self.AppsV1.create_namespaced_daemon_set(ns, body=ds_yaml)

    logger_cli.debug("... found existing daemonset '{}'".format(_name))
    _result = self.AppsV1.replace_namespaced_daemon_set(
        _current.metadata.name,
        _current.metadata.namespace,
        body=ds_yaml
    )
    logger_cli.debug("... replacing existing daemonset '{}'".format(_name))
    return _result
592
def delete_daemon_set_by_name(self, ns, name):
    """Delete DaemonSet ``name`` in ``ns`` and return the API response."""
    return self.AppsV1.delete_namespaced_daemon_set(name=name, namespace=ns)
595
def exec_on_all_pods(self, pods):
    """
    Placeholder for running a script on every pod of ``pods`` (a pod list
    response) in parallel.

    Not implemented yet: it only builds the
    ``[node_name, namespace, pod_name]`` map, logs an error and returns [].
    """
    # Map for future threads: [[node_name, ns, pod_name]...]
    # The Python model exposes ``spec.node_name``; the JSON-style
    # ``nodeName`` raised AttributeError.
    _pod_list = [
        [
            item.spec.node_name,
            item.metadata.namespace,
            item.metadata.name
        ]
        for item in pods.items
    ]

    # map func and cmd
    logger_cli.error("ERROR: 'exec_on_all_pods'is not implemented yet")
    # create result list

    return []
Alex7b0ee9a2021-09-21 17:16:17 -0500616
@retry(ApiException, initial_wait=5)
def get_pods_for_daemonset(self, ds):
    """List pods in the DaemonSet's namespace labelled ``name=<ds name>``.

    Retried on ApiException by the ``retry`` decorator.
    """
    _ds_name = ds.metadata.name
    logger_cli.debug(
        "... extracting pod names from daemonset '{}'".format(_ds_name)
    )
    return self.CoreV1.list_namespaced_pod(
        namespace=ds.metadata.namespace,
        label_selector='name={}'.format(_ds_name)
    )
632
@retry(ApiException, initial_wait=10)
def put_string_buffer_to_pod_as_textfile(
    self,
    pod_name,
    namespace,
    buffer,
    filepath,
    _request_timeout=120,
    **kwargs
):
    """Write ``buffer`` to ``filepath`` inside pod ``namespace/pod_name``.

    Opens ``/bin/sh`` through exec and feeds it a quoted heredoc
    (``cat <<'EOF' >filepath``). ``buffer`` must be UTF-8 bytes, since
    every chunk is decoded before being written to stdin.

    NOTE(review): a line consisting solely of ``EOF`` inside ``buffer``
    ends the heredoc early, and ``filepath`` is put into the shell command
    unquoted, so callers must pass trusted, shell-safe paths.
    Retried on ApiException by the ``retry`` decorator.
    """
    _command = ['/bin/sh']
    response = stream(
        self.CoreV1.connect_get_namespaced_pod_exec,
        pod_name,
        namespace,
        command=_command,
        stderr=True,
        stdin=True,
        stdout=True,
        tty=False,
        _request_timeout=_request_timeout,
        _preload_content=False,
        **kwargs
    )

    # For a dict payload, callers may pass e.g.
    # json.dumps(_dict, indent=2).encode('utf-8')
    commands = [
        bytes("cat <<'EOF' >" + filepath + "\n", 'utf-8'),
        buffer,
        bytes("\n" + "EOF\n", 'utf-8')
    ]

    try:
        while response.is_open():
            response.update(timeout=1)
            if response.peek_stdout():
                logger_cli.debug("... STDOUT: %s" % response.read_stdout())
            if response.peek_stderr():
                logger_cli.debug("... STDERR: %s" % response.read_stderr())
            if not commands:
                break
            c = commands.pop(0)
            logger_cli.debug("... running command... {}".format(c))
            response.write_stdin(str(c, encoding='utf-8'))
    finally:
        # Always release the exec websocket, even if writing fails
        response.close()

    return
Alexdcb792f2021-10-04 14:24:21 -0500682
def get_custom_resource(self, group, version, plural):
    """List cluster-wide custom objects ``plural`` of ``group/version``.

    Example: group="networking.istio.io", version="v1alpha3",
    plural="serviceentries".
    """
    _api = self.CustomObjects
    return _api.list_cluster_custom_object(
        group=group, version=version, plural=plural
    )
Alex5cace3b2021-11-10 16:40:37 -0600696
def init_pvc_resource(
    self,
    name,
    storage_class,
    size,
    ns="qa-space",
    mode="ReadWriteOnce"
):
    """Build (but do not create) a V1PersistentVolumeClaim.

    The claim is labelled ``name=<name>`` and requests ``size`` of
    ``storage_class`` with the single access mode ``mode``.
    """
    _meta = kclient.V1ObjectMeta(
        name=name, namespace=ns, labels={"name": name}
    )
    _resources = kclient.V1ResourceRequirements(requests={'storage': size})
    _spec = kclient.V1PersistentVolumeClaimSpec(
        storage_class_name=storage_class,
        access_modes=[mode],
        resources=_resources
    )
    return kclient.V1PersistentVolumeClaim(
        api_version='v1',
        kind='PersistentVolumeClaim',
        metadata=_meta,
        spec=_spec
    )
722
def init_pv_resource(
    self,
    name,
    storage_class,
    size,
    path,
    ns="qa-space",
    mode="ReadWriteOnce"
):
    """Build (but do not create) a hostPath V1PersistentVolume.

    The volume is labelled ``name=<name>``, has capacity ``size`` in
    ``storage_class`` with access mode ``mode``, and is backed by the host
    directory ``path``.
    NOTE(review): PersistentVolumes are cluster-scoped, so the ``ns`` put
    into metadata is presumably ignored by the API server.
    """
    _meta = kclient.V1ObjectMeta(
        name=name, namespace=ns, labels={"name": name}
    )
    _spec = kclient.V1PersistentVolumeSpec(
        storage_class_name=storage_class,
        access_modes=[mode],
        capacity={'storage': size},
        host_path=kclient.V1HostPathVolumeSource(path=path)
    )
    return kclient.V1PersistentVolume(
        api_version='v1',
        kind='PersistentVolume',
        metadata=_meta,
        spec=_spec
    )
748
def init_service(
    self,
    name,
    port,
    clusterip=None,
    ns="qa-space"
):
    """Build (but do not create) a V1Service for a benchmark agent.

    The service selects pods labelled ``name=<name>`` and forwards TCP
    ``port`` to the same target port. ``clusterip`` is accepted but
    currently unused.
    """
    _tcp_port = kclient.V1ServicePort(
        port=port, protocol="TCP", target_port=port
    )
    return kclient.V1Service(
        api_version="v1",
        kind="Service",
        metadata=kclient.V1ObjectMeta(
            name=name, namespace=ns, labels={"name": name}
        ),
        spec=kclient.V1ServiceSpec(
            selector={"name": name},
            ports=[_tcp_port]
        )
    )
779
def prepare_pv(self, pv_object):
    """Create ``pv_object`` or replace the existing PV with the same name,
    then wait (via ``wait_for_phase``) until it is Available or Bound and
    return it."""
    _pv_name = pv_object.metadata.name
    if self.get_pv_by_name(_pv_name) is None:
        self.CoreV1.create_persistent_volume(pv_object)
    else:
        self.CoreV1.replace_persistent_volume(_pv_name, pv_object)
    return self.wait_for_phase("pv", _pv_name, None, ["Available", "Bound"])
796
def prepare_pvc(self, pvc_object):
    """Create ``pvc_object`` unless a PVC with its name already exists,
    then wait (via ``wait_for_phase``) until it is Available or Bound and
    return it.

    :raises CheckerException: if an existing PVC requests a different
        storage size than ``pvc_object``.
    """
    _name = pvc_object.metadata.name
    _ns = pvc_object.metadata.namespace
    _existing = self.get_pvc_by_name_and_ns(_name, _ns)
    if _existing is not None:
        _size_r = pvc_object.spec.resources.requests["storage"]
        _size_e = _existing.spec.resources.requests["storage"]
        # (The message used to end with a stray, unmatched quote.)
        logger_cli.info(
            "-> Found PVC '{}/{}' with {}. Requested: {}".format(
                _ns,
                _name,
                _size_e,
                _size_r
            )
        )
        if _size_r != _size_e:
            raise CheckerException(
                "ERROR: PVC exists on the cloud with different size "
                "than needed. Please cleanup!"
            )
    else:
        logger_cli.debug("... creating pvc '{}'".format(_name))
        self.CoreV1.create_namespaced_persistent_volume_claim(
            _ns,
            pvc_object
        )

    return self.wait_for_phase(
        "pvc",
        _name,
        _ns,
        ["Available", "Bound"]
    )
833
def get_pod_by_name_and_ns(self, name, ns):
    """Return pod ``name`` in ``ns``, or None.

    Only pods labelled ``name=<name>`` are listed, so a pod without that
    label is not found even if its name matches.
    """
    _candidates = self.CoreV1.list_namespaced_pod(
        ns, label_selector='name={}'.format(name)
    )
    return self.safe_get_item_by_name(_candidates, name)
842
def list_pods(self, ns, label_str=None):
    """List pods in ``ns``, optionally filtered by label selector."""
    return self.CoreV1.list_namespaced_pod(ns, label_selector=label_str)
848
def get_svc_by_name_and_ns(self, name, ns):
    """Return service ``name`` in ``ns``, or None.

    Only services labelled ``name=<name>`` are considered.
    """
    _candidates = self.CoreV1.list_namespaced_service(
        ns, label_selector='name={}'.format(name)
    )
    return self.safe_get_item_by_name(_candidates, name)
857
def list_svc(self, ns, label_str=None):
    """List services in ``ns``, optionally filtered by label selector."""
    return self.CoreV1.list_namespaced_service(ns, label_selector=label_str)
863
def get_pvc_by_name_and_ns(self, name, ns):
    """Return PVC ``name`` in ``ns``, or None.

    Only claims labelled ``name=<name>`` are considered.
    """
    _candidates = self.CoreV1.list_namespaced_persistent_volume_claim(
        ns, label_selector='name={}'.format(name)
    )
    return self.safe_get_item_by_name(_candidates, name)
872
def list_pvc(self, ns, label_str=None):
    """List PVCs in ``ns``, optionally filtered by label selector."""
    _api = self.CoreV1
    return _api.list_namespaced_persistent_volume_claim(
        ns, label_selector=label_str
    )
878
def get_pv_by_name(self, name):
    """Return PersistentVolume ``name``, or None.

    Only volumes labelled ``name=<name>`` are considered.
    """
    _candidates = self.CoreV1.list_persistent_volume(
        label_selector='name={}'.format(name)
    )
    return self.safe_get_item_by_name(_candidates, name)
886
def list_pv(self, label_str=None):
    """List PersistentVolumes, optionally filtered by label selector."""
    _api = self.CoreV1
    return _api.list_persistent_volume(label_selector=label_str)
891
def wait_for_phase(self, ttype, name, ns, phase_list, timeout=120):
    """Poll an object every 2 s until its ``status.phase`` is accepted.

    :param ttype: object kind: "pod", "svc", "pvc" or "pv".
    :param name: object name.
    :param ns: namespace (ignored for "pv").
    :param phase_list: accepted phases. If it contains "Terminated", a
        missing object also counts as done and None is returned.
    :param timeout: total seconds to wait.
    :returns: the object once its phase is in ``phase_list``, or None as
        described above.
    :raises CheckerException: on an unknown ``ttype`` or on timeout.
    """
    _getters = {
        "pod": lambda: self.get_pod_by_name_and_ns(name, ns),
        "svc": lambda: self.get_svc_by_name_and_ns(name, ns),
        "pvc": lambda: self.get_pvc_by_name_and_ns(name, ns),
        "pv": lambda: self.get_pv_by_name(name),
    }
    # Previously an unknown type left ``_t`` unbound (UnboundLocalError)
    if ttype not in _getters:
        raise CheckerException(
            "Unknown object type to wait for: '{}'".format(ttype)
        )
    logger_cli.debug(
        "... waiting '{}'s until {} is '{}'".format(
            timeout,
            ttype,
            ", ".join(phase_list)
        )
    )
    while timeout > 0:
        _t = _getters[ttype]()
        if not _t:
            if "Terminated" in phase_list:
                if ns:
                    _s = "... {} {}/{} not found".format(ttype, ns, name)
                else:
                    _s = "... {} '{}' not found".format(ttype, name)
                logger_cli.debug(_s)
                return None
            # Not visible yet: keep polling instead of crashing on
            # ``None.status``.
            logger_cli.debug(
                "... {} '{}' not found yet".format(ttype, name)
            )
        else:
            logger_cli.debug(
                "... {} is '{}'".format(ttype, _t.status.phase)
            )
            if _t.status.phase in phase_list:
                return _t
        sleep(2)
        timeout -= 2
    # Report the awaited phases (the old message joined ttype's letters)
    raise CheckerException(
        "Timed out waiting for {} '{}' in '{}'".format(
            ttype,
            name,
            ", ".join(phase_list)
        )
    )
928
def prepare_pod_from_yaml(self, pod_yaml):
    """Reuse the pod described by ``pod_yaml`` if it exists, otherwise
    create it and wait (via ``wait_for_phase``) until it is Running.

    :returns: the existing or newly running pod object.
    """
    _meta = pod_yaml['metadata']
    _pod = self.get_pod_by_name_and_ns(_meta['name'], _meta['namespace'])
    if _pod is None:
        self.CoreV1.create_namespaced_pod(_meta['namespace'], pod_yaml)
        return self.wait_for_phase(
            "pod",
            _meta['name'],
            _meta['namespace'],
            ["Running"]
        )

    logger_cli.info(
        "-> Found pod '{}/{}'. Reusing.".format(
            _meta['namespace'],
            _meta['name']
        )
    )
    return _pod
953
def expose_pod_port(self, pod_object, port, ns="qa-space"):
    """Expose ``port`` of ``pod_object`` through a Service named after it.

    An existing service with the pod's name and namespace is reused as-is
    (its port is not checked). Otherwise a new service is created in the
    pod's namespace.

    :param ns: unused; the pod's own namespace is used. Kept for
        backward compatibility.
    :returns: the existing or newly created V1Service.
    """
    _pod_ns = pod_object.metadata.namespace
    _pod_name = pod_object.metadata.name
    _existing = self.get_svc_by_name_and_ns(_pod_name, _pod_ns)
    if _existing is not None:
        # TODO: Check port number?
        logger_cli.info(
            "-> Pod already exposed '{}/{}:{}'. Reusing.".format(
                _pod_ns,
                _pod_name,
                port
            )
        )
        return _existing

    logger_cli.debug(
        "... creating service for pod {}/{}: {}:{}".format(
            _pod_ns,
            _pod_name,
            pod_object.status.pod_ip,
            port
        )
    )
    # Build the Service in the pod's namespace: init_service otherwise
    # defaults to "qa-space", and the API server rejects an object whose
    # namespace differs from the one in the create request.
    _svc = self.init_service(_pod_name, port, ns=_pod_ns)
    return self.CoreV1.create_namespaced_service(_pod_ns, _svc)
Alex0989ecf2022-03-29 13:43:21 -0500986
def list_namespaces(self):
    """Return the result of ``CoreV1.list_namespace()``.

    This is a plain pass-through: no filtering, retrying or error
    handling is applied here.
    """
    _core_api = self.CoreV1
    return _core_api.list_namespace()
989
@retry(ApiException, initial_wait=2)
def get_pod_logs(self, podname, container, ns, tail_lines=50,
                 *, request_timeout=(1, 5)):
    """Return the last ``tail_lines`` log lines of a pod container.

    Wraps ``CoreV1.read_namespaced_pod_log`` with ``timestamps=True``.
    Each returned line is therefore prefixed with an RFC3339 timestamp.

    ``retry`` is applied for ``ApiException``. Presumably it retries
    with an initial 2s wait; see ``cfg_checker.common.decorators``.

    A ``MaxRetryError`` (e.g. the API server could not be reached within
    ``request_timeout``) is logged as a warning, and an empty string is
    returned instead of raising.

    :param podname: name of the pod
    :param container: container in the pod to read logs from
    :param ns: namespace of the pod
    :param tail_lines: number of lines from the end of the log to return
    :param request_timeout: forwarded as ``_request_timeout``. Either a
        single number (total request timeout) or a ``(connect, read)``
        tuple. Defaults to ``(1, 5)``, the previously hard-coded value.
    :return: the log text, or "" if the request failed with
        ``MaxRetryError``
    """
    try:
        return self.CoreV1.read_namespaced_pod_log(
            name=podname,
            namespace=ns,
            container=container,
            timestamps=True,
            tail_lines=tail_lines,
            _request_timeout=request_timeout
        )
    except MaxRetryError as e:
        # Best-effort: callers get an empty log rather than an exception
        logger_cli.warning(
            "WARNING: Failed to retrieve log {}/{}:{}:\n{}".format(
                ns,
                podname,
                container,
                e.reason
            )
        )
        return ""