blob: f4c38ef64c971e7fd909853d3b5e88c8d56b82b8 [file] [log] [blame]
Alex0989ecf2022-03-29 13:43:21 -05001# Author: Alex Savatieiev (osavatieiev@mirantis.com; a.savex@gmail.com)
2# Copyright 2019-2022 Mirantis, Inc.
Alex9a4ad212020-10-01 18:04:25 -05003"""
4Module to handle interaction with Kube
5"""
6import base64
7import os
8import urllib3
9import yaml
10
Alex5cace3b2021-11-10 16:40:37 -060011from kubernetes import client as kclient, config as kconfig, watch
Alex9a4ad212020-10-01 18:04:25 -050012from kubernetes.stream import stream
Alex7b0ee9a2021-09-21 17:16:17 -050013from kubernetes.client.rest import ApiException
Alex5cace3b2021-11-10 16:40:37 -060014from time import time, sleep
Alex9a4ad212020-10-01 18:04:25 -050015
16from cfg_checker.common import logger, logger_cli
Alex7b0ee9a2021-09-21 17:16:17 -050017from cfg_checker.common.decorators import retry
Alex5cace3b2021-11-10 16:40:37 -060018from cfg_checker.common.exception import CheckerException, \
19 InvalidReturnException, KubeException
Alex9a4ad212020-10-01 18:04:25 -050020from cfg_checker.common.file_utils import create_temp_file_with_content
21from cfg_checker.common.other import utils, shell
22from cfg_checker.common.ssh_utils import ssh_shell_p
Alex359e5752021-08-16 17:28:30 -050023from cfg_checker.common.const import ENV_LOCAL
Alex9a4ad212020-10-01 18:04:25 -050024
Alex7b0ee9a2021-09-21 17:16:17 -050025
Alex9a4ad212020-10-01 18:04:25 -050026urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
27
28
def _init_kube_conf_local(config):
    """
    Init the kube client library from a kubeconfig file on this host.

    :param config: checker config; 'kube_config_path' and 'insecure'
        are read here
    :return: tuple of (kconfig module, ApiClient, "local:<path>") on
        success, (None, None, "local:<path>") if anything fails
    """
    _path = "local:{}".format(config.kube_config_path)
    try:
        kconfig.load_kube_config(config_file=config.kube_config_path)
        if config.insecure:
            # NOTE(review): these two are set on the imported 'config'
            # module object, not on a client Configuration instance;
            # confirm they have the intended effect
            kconfig.assert_hostname = False
            kconfig.client_side_validation = False
        # Querying API versions also proves that the cluster is reachable
        logger_cli.debug(
            "... found Kube env: core, {}". format(
                ",".join(
                    kclient.CoreApi().get_api_versions().versions
                )
            )
        )
        return kconfig, kclient.ApiClient(), _path
    except Exception as e:
        # 'warn' is a deprecated alias of 'warning' in stdlib logging
        logger.warning("Failed to init local Kube client: {}".format(str(e)))
        return None, None, _path
Alex9a4ad212020-10-01 18:04:25 -050051
52
def _init_kube_conf_remote(config):
    """
    Init a kube client from a kubeconfig that is read over SSH
    (or locally, when config was already detected or env is local).

    Manual way to preload the Kube token, kept for reference:

        APISERVER=$(kubectl config view --minify |
            grep server | cut -f 2- -d ":" | tr -d " ")
        SECRET_NAME=$(kubectl get secrets |
            grep ^default | cut -f1 -d ' ')
        TOKEN=$(kubectl describe secret $SECRET_NAME |
            grep -E '^token' | cut -f2 -d':' | tr -d " ")

        echo "Detected API Server at: '${APISERVER}'"
        echo "Got secret: '${SECRET_NAME}'"
        echo "Loaded token: '${TOKEN}'"

        curl $APISERVER/api
            --header "Authorization: Bearer $TOKEN" --insecure

    :param config: checker config; ssh_*, kube_config_path, mcp_host,
        debug and insecure are read, kube_port is written
    :return: tuple of (Configuration, ApiClient, config path label);
        (None, None, label) when no config data was read
    """
    def _b64_to_temp_file(_b64_data):
        # Decode a base64 kubeconfig field and save it to a temp file
        return create_temp_file_with_content(
            base64.standard_b64decode(_b64_data)
        )

    _path = ''
    # Try to load remote config only if it was not detected already
    if not config.kube_config_detected and not config.env_name == ENV_LOCAL:
        _path = "{}@{}:{}".format(
            config.ssh_user,
            config.ssh_host,
            config.kube_config_path
        )
        _c_data = ssh_shell_p(
            "cat " + config.kube_config_path,
            config.ssh_host,
            username=config.ssh_user,
            keypath=config.ssh_key,
            piped=False,
            use_sudo=config.ssh_uses_sudo,
        )
    else:
        _path = "local:{}".format(config.kube_config_path)
        with open(config.kube_config_path, 'r') as ff:
            _c_data = ff.read()

    # Nothing to parse; truthiness check also covers a None result
    if not _c_data:
        return None, None, _path

    _conf = yaml.load(_c_data, Loader=yaml.SafeLoader)
    # Only the first cluster and the first user are used
    _cluster = _conf['clusters'][0]['cluster']
    _user = _conf['users'][0]['user']

    _kube_conf = kclient.Configuration()
    # A remote host configuration

    # To work with remote cluster, we need to extract these
    # keys = ['host', 'ssl_ca_cert', 'cert_file', 'key_file', 'verify_ssl']
    # When v12 of the client will be release, we will use load_from_dict

    _kube_conf.ssl_ca_cert = _b64_to_temp_file(
        _cluster['certificate-authority-data']
    )
    _host = _cluster['server']
    _kube_conf.cert_file = _b64_to_temp_file(
        _user['client-certificate-data']
    )
    _kube_conf.key_file = _b64_to_temp_file(_user['client-key-data'])

    if "http" not in _host or "443" not in _host:
        logger_cli.error(
            "Failed to extract Kube host: '{}'".format(_host)
        )
    else:
        logger_cli.debug(
            "... 'context' host extracted: '{}' via SSH@{}".format(
                _host,
                config.ssh_host
            )
        )

    # Substitute context host to ours
    # '<scheme>://<host>:<port>' splits into [scheme, '//host', port]
    _tmp = _host.split(':')
    _kube_conf.host = \
        _tmp[0] + "://" + config.mcp_host + ":" + _tmp[2]
    config.kube_port = _tmp[2]
    logger_cli.debug(
        "... kube remote host updated to {}".format(
            _kube_conf.host
        )
    )
    _kube_conf.verify_ssl = False
    _kube_conf.debug = config.debug
    if config.insecure:
        _kube_conf.assert_hostname = False
        _kube_conf.client_side_validation = False

    # Nevertheless if you want to do it
    # you can with these 2 parameters
    # configuration.verify_ssl=True
    # ssl_ca_cert is the filepath
    # to the file that contains the certificate.
    # configuration.ssl_ca_cert="certificate"

    # _kube_conf.api_key = {
    #     "authorization": "Bearer " + config.kube_token
    # }

    # Create a ApiClient with our config
    _kube_api = kclient.ApiClient(_kube_conf)

    return _kube_conf, _kube_api, _path
Alex9a4ad212020-10-01 18:04:25 -0500164
165
class KubeApi(object):
    """
    Holds the checker config together with the kube configuration
    and the ApiClient created from it.
    """
    def __init__(self, config):
        self.config = config
        # True only when both configuration and api client were created
        self.initialized = self._init_kclient()
        self.last_response = None

    def _init_kclient(self):
        """
        Create the kube client for local or remote env.

        Sets kConf, kApi, kConfigPath and is_local. For local env with
        an existing kubeconfig file also sets user_keypath and yaml_conf.

        :return: False if configuration or api client is missing
        """
        logger_cli.debug("... init kube config")
        _cfg = self.config
        if _cfg.env_name != "local":
            self.kConf, self.kApi, self.kConfigPath = \
                _init_kube_conf_remote(_cfg)
            self.is_local = False
        else:
            self.kConf, self.kApi, self.kConfigPath = \
                _init_kube_conf_local(_cfg)
            self.is_local = True
            # Try to load local config data
            _kpath = _cfg.kube_config_path
            if _kpath and os.path.exists(_kpath):
                _raw = shell("cat " + _kpath)
                _parsed = yaml.load(_raw, Loader=yaml.SafeLoader)
                # user key goes to a temp file, path is saved
                _key_b64 = _parsed['users'][0]['user']['client-key-data']
                self.user_keypath = create_temp_file_with_content(
                    base64.standard_b64decode(_key_b64)
                )
                self.yaml_conf = _raw

        return not (self.kConf is None or self.kApi is None)

    def get_versions_api(self):
        """Return VersionApi bound to the stored api client"""
        # client.CoreApi().get_api_versions().versions
        _api = kclient.VersionApi(self.kApi)
        return _api
206
207
208class KubeRemote(KubeApi):
209 def __init__(self, config):
210 super(KubeRemote, self).__init__(config)
Alex1f90e7b2021-09-03 15:31:28 -0500211 self._appsV1 = None
212 self._podV1 = None
Alexdcb792f2021-10-04 14:24:21 -0500213 self._custom = None
214
215 @property
216 def CustomObjects(self):
217 if not self._custom:
218 self._custom = kclient.CustomObjectsApi(self.kApi)
219 return self._custom
Alex9a4ad212020-10-01 18:04:25 -0500220
221 @property
222 def CoreV1(self):
Alexb2129542021-11-23 15:49:42 -0600223 if self.is_local:
224 return kclient.CoreV1Api(kclient.ApiClient())
225 else:
226 return kclient.CoreV1Api(kclient.ApiClient(self.kConf))
Alex9a4ad212020-10-01 18:04:25 -0500227
Alex1f90e7b2021-09-03 15:31:28 -0500228 @property
229 def AppsV1(self):
230 if not self._appsV1:
231 self._appsV1 = kclient.AppsV1Api(self.kApi)
232 return self._appsV1
233
234 @property
235 def PodsV1(self):
236 if not self._podsV1:
237 self._podsV1 = kclient.V1Pod(self.kApi)
238 return self._podsV1
239
Alex9a4ad212020-10-01 18:04:25 -0500240 @staticmethod
241 def _typed_list_to_dict(i_list):
242 _dict = {}
243 for _item in i_list:
244 _d = _item.to_dict()
245 _type = _d.pop("type")
246 _dict[_type.lower()] = _d
247
248 return _dict
249
250 @staticmethod
251 def _get_listed_attrs(items, _path):
252 _list = []
253 for _n in items:
254 _list.append(utils.rgetattr(_n, _path))
255
256 return _list
257
Alex1f90e7b2021-09-03 15:31:28 -0500258 @staticmethod
259 def safe_get_item_by_name(api_resource, _name):
260 for item in api_resource.items:
261 if item.metadata.name == _name:
262 return item
263
264 return None
265
Alex2a7657c2021-11-10 20:51:34 -0600266 def wait_for_phase_on_start(self, _func, phase, *args, **kwargs):
Alex5cace3b2021-11-10 16:40:37 -0600267 w = watch.Watch()
268 start_time = time()
269 for event in w.stream(_func, *args, **kwargs):
270 if event["object"].status.phase == phase:
271 w.stop()
272 end_time = time()
273 logger_cli.debug(
274 "... bacame '{}' in {:0.2f} sec".format(
275 phase,
276 end_time-start_time
277 )
278 )
279 return
280 # event.type: ADDED, MODIFIED, DELETED
281 if event["type"] == "DELETED":
282 # Pod was deleted while we were waiting for it to start.
283 logger_cli.debug("... deleted before started")
284 w.stop()
285 return
286
Alex2a7657c2021-11-10 20:51:34 -0600287 def wait_for_event(self, _func, event, *args, **kwargs):
288 w = watch.Watch()
289 for event in w.stream(_func, *args, **kwargs):
290 # event.type: ADDED, MODIFIED, DELETED
291 if event["type"] == event:
292 # Pod was deleted while we were waiting for it to start.
293 logger_cli.debug("... got {} event".format(event["type"]))
294 w.stop()
295 return
296
Alex9a4ad212020-10-01 18:04:25 -0500297 def get_node_info(self, http=False):
298 # Query API for the nodes and do some presorting
299 _nodes = {}
300 if http:
301 _raw_nodes = self.CoreV1.list_node_with_http_info()
302 else:
303 _raw_nodes = self.CoreV1.list_node()
304
305 if not isinstance(_raw_nodes, kclient.models.v1_node_list.V1NodeList):
306 raise InvalidReturnException(
307 "Invalid return type: '{}'".format(type(_raw_nodes))
308 )
309
310 for _n in _raw_nodes.items:
311 _name = _n.metadata.name
312 _d = _n.to_dict()
313 # parse inner data classes as dicts
314 _d['addresses'] = self._typed_list_to_dict(_n.status.addresses)
315 _d['conditions'] = self._typed_list_to_dict(_n.status.conditions)
316 # Update 'status' type
317 if isinstance(_d['conditions']['ready']['status'], str):
318 _d['conditions']['ready']['status'] = utils.to_bool(
319 _d['conditions']['ready']['status']
320 )
321 # Parse image names?
322 # TODO: Here is the place where we can parse each node image names
323
324 # Parse roles
325 _d['labels'] = {}
326 for _label, _data in _d["metadata"]["labels"].items():
327 if _data.lower() in ["true", "false"]:
328 _d['labels'][_label] = utils.to_bool(_data)
329 else:
330 _d['labels'][_label] = _data
331
332 # Save
333 _nodes[_name] = _d
334
335 # debug report on how many nodes detected
336 logger_cli.debug("...node items returned '{}'".format(len(_nodes)))
337
338 return _nodes
339
Alexdcb792f2021-10-04 14:24:21 -0500340 def get_pod_names_by_partial_name(self, partial_name, ns):
341 logger_cli.debug('... searching for pods with {}'.format(partial_name))
342 _pods = self.CoreV1.list_namespaced_pod(ns)
343 _names = self._get_listed_attrs(_pods.items, "metadata.name")
344 _pnames = [n for n in _names if partial_name in n]
345 if len(_pnames) > 1:
346 logger_cli.debug(
347 "... more than one pod found for '{}': {}\n".format(
348 partial_name,
349 ", ".join(_pnames)
350 )
351 )
352 elif len(_pnames) < 1:
353 logger_cli.warning(
354 "WARNING: No pods found for '{}'".format(partial_name)
355 )
356
357 return _pnames
358
359 def get_pods_by_partial_name(self, partial_name, ns):
360 logger_cli.debug('... searching for pods with {}'.format(partial_name))
361 _all_pods = self.CoreV1.list_namespaced_pod(ns)
362 # _names = self._get_listed_attrs(_pods.items, "metadata.name")
363 _pods = [_pod for _pod in _all_pods.items
364 if partial_name in _pod.metadata.name]
365 if len(_pods) > 1:
366 logger_cli.debug(
367 "... more than one pod found for '{}': {}\n".format(
368 partial_name,
369 ", ".join(partial_name)
370 )
371 )
372 elif len(_pods) < 1:
373 logger_cli.warning(
374 "WARNING: No pods found for '{}'".format(partial_name)
375 )
376
377 return _pods
378
Alex30380a42021-12-20 16:11:20 -0600379 @retry(ApiException, initial_wait=10)
Alex9a4ad212020-10-01 18:04:25 -0500380 def exec_on_target_pod(
381 self,
382 cmd,
383 pod_name,
384 namespace,
385 strict=False,
386 _request_timeout=120,
Alexb78191f2021-11-02 16:35:46 -0500387 arguments=None,
Alex9a4ad212020-10-01 18:04:25 -0500388 **kwargs
389 ):
Alexdcb792f2021-10-04 14:24:21 -0500390 _pname = ""
Alex9a4ad212020-10-01 18:04:25 -0500391 if not strict:
Alex1f90e7b2021-09-03 15:31:28 -0500392 logger_cli.debug(
393 "... searching for pods with the name '{}'".format(pod_name)
394 )
395 _pods = {}
Alex7b0ee9a2021-09-21 17:16:17 -0500396 _pods = self.CoreV1.list_namespaced_pod(namespace)
Alex1f90e7b2021-09-03 15:31:28 -0500397 _names = self._get_listed_attrs(_pods.items, "metadata.name")
Alex33747812021-04-07 10:11:39 -0500398 _pnames = [n for n in _names if n.startswith(pod_name)]
399 if len(_pnames) > 1:
Alex9a4ad212020-10-01 18:04:25 -0500400 logger_cli.debug(
Alexc4f59622021-08-27 13:42:00 -0500401 "... more than one pod found for '{}': {}\n"
402 "... using first one".format(
Alex9a4ad212020-10-01 18:04:25 -0500403 pod_name,
Alex33747812021-04-07 10:11:39 -0500404 ", ".join(_pnames)
Alex9a4ad212020-10-01 18:04:25 -0500405 )
406 )
Alexdcb792f2021-10-04 14:24:21 -0500407 elif len(_pnames) < 1:
Alex9a4ad212020-10-01 18:04:25 -0500408 raise KubeException("No pods found for '{}'".format(pod_name))
Alexb78191f2021-11-02 16:35:46 -0500409 # in case of >1 and =1 we are taking 1st anyway
410 _pname = _pnames[0]
Alex9a4ad212020-10-01 18:04:25 -0500411 else:
412 _pname = pod_name
Alex33747812021-04-07 10:11:39 -0500413 logger_cli.debug(
Alexb78191f2021-11-02 16:35:46 -0500414 "... cmd: [CoreV1] exec {} -n {} -- {} '{}'".format(
Alex33747812021-04-07 10:11:39 -0500415 _pname,
416 namespace,
Alexb78191f2021-11-02 16:35:46 -0500417 cmd,
418 arguments
Alex33747812021-04-07 10:11:39 -0500419 )
420 )
Alex1f90e7b2021-09-03 15:31:28 -0500421 # Set preload_content to False to preserve JSON
422 # If not, output gets converted to str
423 # Which causes to change " to '
424 # After that json.loads(...) fail
Alex7b0ee9a2021-09-21 17:16:17 -0500425 cmd = cmd if isinstance(cmd, list) else cmd.split()
Alexb78191f2021-11-02 16:35:46 -0500426 if arguments:
427 cmd += [arguments]
Alexb2129542021-11-23 15:49:42 -0600428 # Make sure that CoreV1 is fresh before calling it
Alex1f90e7b2021-09-03 15:31:28 -0500429 _pod_stream = stream(
Alex9a4ad212020-10-01 18:04:25 -0500430 self.CoreV1.connect_get_namespaced_pod_exec,
431 _pname,
432 namespace,
Alex7b0ee9a2021-09-21 17:16:17 -0500433 command=cmd,
Alex9a4ad212020-10-01 18:04:25 -0500434 stderr=True,
435 stdin=False,
436 stdout=True,
437 tty=False,
438 _request_timeout=_request_timeout,
Alex1f90e7b2021-09-03 15:31:28 -0500439 _preload_content=False,
Alex9a4ad212020-10-01 18:04:25 -0500440 **kwargs
441 )
Alex1f90e7b2021-09-03 15:31:28 -0500442 # run for timeout
443 _pod_stream.run_forever(timeout=_request_timeout)
444 # read the output
Alex7b0ee9a2021-09-21 17:16:17 -0500445 _output = _pod_stream.read_stdout()
Alexb78191f2021-11-02 16:35:46 -0500446 _error = _pod_stream.read_stderr()
447 if _error:
448 # copy error to output
Alexe4de1142022-11-04 19:26:03 -0500449 logger.warning(
Alexb78191f2021-11-02 16:35:46 -0500450 "WARNING: cmd of '{}' returned error:\n{}\n".format(
451 " ".join(cmd),
452 _error
453 )
454 )
455 if not _output:
456 _output = _error
Alex7b0ee9a2021-09-21 17:16:17 -0500457 # Send output
458 return _output
Alex9a4ad212020-10-01 18:04:25 -0500459
Alex1f90e7b2021-09-03 15:31:28 -0500460 def ensure_namespace(self, ns):
461 """
462 Ensure that given namespace exists
463 """
464 # list active namespaces
465 _v1NamespaceList = self.CoreV1.list_namespace()
466 _ns = self.safe_get_item_by_name(_v1NamespaceList, ns)
467
468 if _ns is None:
469 logger_cli.debug("... creating namespace '{}'".format(ns))
Alexdcb792f2021-10-04 14:24:21 -0500470 _new_ns = kclient.V1Namespace()
471 _new_ns.metadata = kclient.V1ObjectMeta(name=ns)
472 _r = self.CoreV1.create_namespace(_new_ns)
Alex1f90e7b2021-09-03 15:31:28 -0500473 # TODO: check return on fail
474 if not _r:
475 return False
476 else:
477 logger_cli.debug("... found existing namespace '{}'".format(ns))
478
479 return True
480
481 def get_daemon_set_by_name(self, ns, name):
482 return self.safe_get_item_by_name(
483 self.AppsV1.list_namespaced_daemon_set(ns),
484 name
485 )
486
487 def create_config_map(self, ns, name, source, recreate=True):
488 """
489 Creates/Overwrites ConfigMap in working namespace
490 """
491 # Prepare source
492 logger_cli.debug(
493 "... preparing config map '{}/{}' with files from '{}'".format(
494 ns,
495 name,
496 source
497 )
498 )
499 _data = {}
500 if os.path.isfile(source):
501 # populate data with one file
502 with open(source, 'rt') as fS:
503 _data[os.path.split(source)[1]] = fS.read()
504 elif os.path.isdir(source):
505 # walk dirs and populate all 'py' files
506 for path, dirs, files in os.walk(source):
507 _e = ('.py')
508 _subfiles = (_fl for _fl in files
509 if _fl.endswith(_e) and not _fl.startswith('.'))
510 for _file in _subfiles:
511 with open(os.path.join(path, _file), 'rt') as fS:
512 _data[_file] = fS.read()
513
514 _cm = kclient.V1ConfigMap()
515 _cm.metadata = kclient.V1ObjectMeta(name=name, namespace=ns)
516 _cm.data = _data
517 logger_cli.debug(
518 "... prepared config map with {} scripts".format(len(_data))
519 )
520 # Query existing configmap, delete if needed
521 _existing_cm = self.safe_get_item_by_name(
522 self.CoreV1.list_namespaced_config_map(namespace=ns),
523 name
524 )
525 if _existing_cm is not None:
526 self.CoreV1.replace_namespaced_config_map(
527 namespace=ns,
528 name=name,
529 body=_cm
530 )
531 logger_cli.debug(
532 "... replaced existing config map '{}/{}'".format(
533 ns,
534 name
535 )
536 )
537 else:
538 # Create it
539 self.CoreV1.create_namespaced_config_map(
540 namespace=ns,
541 body=_cm
542 )
543 logger_cli.debug("... created config map '{}/{}'".format(
544 ns,
545 name
546 ))
547
548 return _data.keys()
549
550 def prepare_daemonset_from_yaml(self, ns, ds_yaml):
551 _name = ds_yaml['metadata']['name']
552 _ds = self.get_daemon_set_by_name(ns, _name)
553
554 if _ds is not None:
555 logger_cli.debug(
556 "... found existing daemonset '{}'".format(_name)
557 )
558 _r = self.AppsV1.replace_namespaced_daemon_set(
559 _ds.metadata.name,
560 _ds.metadata.namespace,
561 body=ds_yaml
562 )
563 logger_cli.debug(
564 "... replacing existing daemonset '{}'".format(_name)
565 )
566 return _r
567 else:
568 logger_cli.debug(
569 "... creating daemonset '{}'".format(_name)
570 )
571 _r = self.AppsV1.create_namespaced_daemon_set(ns, body=ds_yaml)
572 return _r
573
574 def delete_daemon_set_by_name(self, ns, name):
575 return self.AppsV1.delete_namespaced_daemon_set(name, ns)
576
577 def exec_on_all_pods(self, pods):
578 """
579 Create multiple threads to execute script on all target pods
580 """
581 # Create map for threads: [[node_name, ns, pod_name]...]
582 _pod_list = []
583 for item in pods.items:
584 _pod_list.append(
585 [
586 item.spec.nodeName,
587 item.metadata.namespace,
588 item.metadata.name
589 ]
590 )
591
592 # map func and cmd
Alexdcb792f2021-10-04 14:24:21 -0500593 logger_cli.error("ERROR: 'exec_on_all_pods'is not implemented yet")
Alex1f90e7b2021-09-03 15:31:28 -0500594 # create result list
595
596 return []
Alex7b0ee9a2021-09-21 17:16:17 -0500597
Alexb2129542021-11-23 15:49:42 -0600598 @retry(ApiException, initial_wait=5)
Alex7b0ee9a2021-09-21 17:16:17 -0500599 def get_pods_for_daemonset(self, ds):
600 # get all pod names for daemonset
601 logger_cli.debug(
602 "... extracting pod names from daemonset '{}'".format(
603 ds.metadata.name
604 )
605 )
606 _ns = ds.metadata.namespace
607 _name = ds.metadata.name
608 _pods = self.CoreV1.list_namespaced_pod(
609 namespace=_ns,
610 label_selector='name={}'.format(_name)
611 )
612 return _pods
613
Alexbdc72742021-12-23 13:26:05 -0600614 @retry(ApiException, initial_wait=10)
Alex7b0ee9a2021-09-21 17:16:17 -0500615 def put_string_buffer_to_pod_as_textfile(
616 self,
617 pod_name,
618 namespace,
619 buffer,
620 filepath,
621 _request_timeout=120,
622 **kwargs
623 ):
624 _command = ['/bin/sh']
625 response = stream(
626 self.CoreV1.connect_get_namespaced_pod_exec,
627 pod_name,
628 namespace,
629 command=_command,
630 stderr=True,
631 stdin=True,
632 stdout=True,
633 tty=False,
634 _request_timeout=_request_timeout,
635 _preload_content=False,
636 **kwargs
637 )
638
639 # if json
640 # buffer = json.dumps(_dict, indent=2).encode('utf-8')
641
642 commands = [
643 bytes("cat <<'EOF' >" + filepath + "\n", 'utf-8'),
644 buffer,
645 bytes("\n" + "EOF\n", 'utf-8')
646 ]
647
648 while response.is_open():
649 response.update(timeout=1)
650 if response.peek_stdout():
651 logger_cli.debug("... STDOUT: %s" % response.read_stdout())
652 if response.peek_stderr():
653 logger_cli.debug("... STDERR: %s" % response.read_stderr())
654 if commands:
655 c = commands.pop(0)
Alexb2129542021-11-23 15:49:42 -0600656 logger_cli.debug("... running command... {}".format(c))
Alex7b0ee9a2021-09-21 17:16:17 -0500657 response.write_stdin(str(c, encoding='utf-8'))
658 else:
659 break
660 response.close()
661
Alex7b0ee9a2021-09-21 17:16:17 -0500662 return
Alexdcb792f2021-10-04 14:24:21 -0500663
664 def get_custom_resource(self, group, version, plural):
665 # Get it
666 # Example:
667 # kubernetes.client.CustomObjectsApi().list_cluster_custom_object(
668 # group="networking.istio.io",
669 # version="v1alpha3",
670 # plural="serviceentries"
671 # )
672 return self.CustomObjects.list_cluster_custom_object(
673 group=group,
674 version=version,
675 plural=plural
676 )
Alex5cace3b2021-11-10 16:40:37 -0600677
678 def init_pvc_resource(
679 self,
680 name,
681 storage_class,
682 size,
683 ns="qa-space",
684 mode="ReadWriteOnce"
685 ):
686 """Return the Kubernetes PVC resource"""
687 return kclient.V1PersistentVolumeClaim(
688 api_version='v1',
689 kind='PersistentVolumeClaim',
690 metadata=kclient.V1ObjectMeta(
691 name=name,
692 namespace=ns,
693 labels={"name": name}
694 ),
695 spec=kclient.V1PersistentVolumeClaimSpec(
696 storage_class_name=storage_class,
697 access_modes=[mode],
698 resources=kclient.V1ResourceRequirements(
699 requests={'storage': size}
700 )
701 )
702 )
703
704 def init_pv_resource(
705 self,
706 name,
707 storage_class,
708 size,
709 path,
710 ns="qa-space",
711 mode="ReadWriteOnce"
712 ):
713 """Return the Kubernetes PVC resource"""
714 return kclient.V1PersistentVolume(
715 api_version='v1',
716 kind='PersistentVolume',
717 metadata=kclient.V1ObjectMeta(
718 name=name,
719 namespace=ns,
720 labels={"name": name}
721 ),
722 spec=kclient.V1PersistentVolumeSpec(
723 storage_class_name=storage_class,
724 access_modes=[mode],
725 capacity={'storage': size},
726 host_path=kclient.V1HostPathVolumeSource(path=path)
727 )
728 )
729
730 def init_service(
731 self,
732 name,
733 port,
734 clusterip=None,
735 ns="qa-space"
736 ):
737 """ Inits a V1Service object with data for benchmark agent"""
738 _meta = kclient.V1ObjectMeta(
739 name=name,
740 namespace=ns,
741 labels={"name": name}
742 )
743 _port = kclient.V1ServicePort(
744 port=port,
745 protocol="TCP",
746 target_port=port
747 )
748 _spec = kclient.V1ServiceSpec(
749 # cluster_ip=clusterip,
750 selector={"name": name},
751 # type="ClusterIP",
752 ports=[_port]
753 )
754 return kclient.V1Service(
755 api_version="v1",
756 kind="Service",
757 metadata=_meta,
758 spec=_spec
759 )
760
761 def prepare_pv(self, pv_object):
Alex2a7657c2021-11-10 20:51:34 -0600762 _existing = self.get_pv_by_name(pv_object.metadata.name)
Alex5cace3b2021-11-10 16:40:37 -0600763 if _existing is not None:
764 self.CoreV1.replace_persistent_volume(
765 pv_object.metadata.name,
766 pv_object
767 )
768 else:
769 self.CoreV1.create_persistent_volume(pv_object)
770
Alex2a7657c2021-11-10 20:51:34 -0600771 return self.wait_for_phase(
772 "pv",
773 pv_object.metadata.name,
774 None,
775 ["Available", "Bound"]
Alex5cace3b2021-11-10 16:40:37 -0600776 )
777
778 def prepare_pvc(self, pvc_object):
Alex2a7657c2021-11-10 20:51:34 -0600779 _existing = self.get_pvc_by_name_and_ns(
780 pvc_object.metadata.name,
781 pvc_object.metadata.namespace
Alex5cace3b2021-11-10 16:40:37 -0600782 )
783 if _existing is not None:
784 _size_r = pvc_object.spec.resources.requests["storage"]
785 _size_e = _existing.spec.resources.requests["storage"]
Alex2a7657c2021-11-10 20:51:34 -0600786 logger_cli.info(
787 "-> Found PVC '{}/{}' with {}. Requested: {}'".format(
Alex5cace3b2021-11-10 16:40:37 -0600788 pvc_object.metadata.namespace,
789 pvc_object.metadata.name,
790 _size_e,
791 _size_r
792 )
793 )
794 if _size_r != _size_e:
795 raise CheckerException(
796 "ERROR: PVC exists on the cloud with different size "
797 "than needed. Please cleanup!"
798 )
799 else:
800 logger_cli.debug(
801 "... creating pvc '{}'".format(pvc_object.metadata.name)
802 )
803 self.CoreV1.create_namespaced_persistent_volume_claim(
804 pvc_object.metadata.namespace,
805 pvc_object
806 )
807
Alex2a7657c2021-11-10 20:51:34 -0600808 return self.wait_for_phase(
809 "pvc",
810 pvc_object.metadata.name,
811 pvc_object.metadata.namespace,
812 ["Available", "Bound"]
813 )
814
815 def get_pod_by_name_and_ns(self, name, ns):
816 return self.safe_get_item_by_name(
817 self.CoreV1.list_namespaced_pod(
818 ns,
819 label_selector='name={}'.format(name)
820 ),
821 name
822 )
823
Alexb2129542021-11-23 15:49:42 -0600824 def list_pods(self, ns, label_str=None):
825 return self.CoreV1.list_namespaced_pod(
826 ns,
827 label_selector=label_str
828 )
829
Alex2a7657c2021-11-10 20:51:34 -0600830 def get_svc_by_name_and_ns(self, name, ns):
831 return self.safe_get_item_by_name(
832 self.CoreV1.list_namespaced_service(
833 ns,
834 label_selector='name={}'.format(name)
835 ),
836 name
837 )
838
Alexb2129542021-11-23 15:49:42 -0600839 def list_svc(self, ns, label_str=None):
840 return self.CoreV1.list_namespaced_service(
841 ns,
842 label_selector=label_str
843 )
844
Alex2a7657c2021-11-10 20:51:34 -0600845 def get_pvc_by_name_and_ns(self, name, ns):
846 return self.safe_get_item_by_name(
847 self.CoreV1.list_namespaced_persistent_volume_claim(
848 ns,
849 label_selector='name={}'.format(name)
850 ),
851 name
852 )
853
Alexb2129542021-11-23 15:49:42 -0600854 def list_pvc(self, ns, label_str=None):
855 return self.CoreV1.list_namespaced_persistent_volume_claim(
856 ns,
857 label_selector=label_str
858 )
859
Alex2a7657c2021-11-10 20:51:34 -0600860 def get_pv_by_name(self, name):
861 return self.safe_get_item_by_name(
862 self.CoreV1.list_persistent_volume(
863 label_selector='name={}'.format(name)
864 ),
865 name
866 )
867
Alexb2129542021-11-23 15:49:42 -0600868 def list_pv(self, label_str=None):
869 return self.CoreV1.list_persistent_volume(
870 label_selector=label_str
871 )
872
Alex2a7657c2021-11-10 20:51:34 -0600873 def wait_for_phase(self, ttype, name, ns, phase_list, timeout=120):
874 logger_cli.debug(
875 "... waiting '{}'s until {} is '{}'".format(
876 timeout,
877 ttype,
878 ", ".join(phase_list)
879 )
880 )
881 while timeout > 0:
882 if ttype == "pod":
883 _t = self.get_pod_by_name_and_ns(name, ns)
884 elif ttype == "svc":
885 _t = self.get_svc_by_name_and_ns(name, ns)
886 elif ttype == "pvc":
887 _t = self.get_pvc_by_name_and_ns(name, ns)
888 elif ttype == "pv":
889 _t = self.get_pv_by_name(name)
890 if "Terminated" in phase_list and not _t:
891 if ns:
892 _s = "... {} {}/{} not found".format(ttype, ns, name)
Alex5cace3b2021-11-10 16:40:37 -0600893 else:
Alex2a7657c2021-11-10 20:51:34 -0600894 _s = "... {} '{}' not found".format(ttype, name)
895 logger_cli.debug(_s)
896 return None
897 logger_cli.debug("... {} is '{}'".format(ttype, _t.status.phase))
898 if _t.status.phase in phase_list:
899 return _t
900 sleep(2)
901 timeout -= 2
Alex5cace3b2021-11-10 16:40:37 -0600902 raise CheckerException(
Alex2a7657c2021-11-10 20:51:34 -0600903 "Timed out waiting for {} '{}' in '{}'".format(
904 ttype,
905 name,
906 ", ".join(ttype)
Alex5cace3b2021-11-10 16:40:37 -0600907 )
908 )
909
910 def prepare_pod_from_yaml(self, pod_yaml):
Alex2a7657c2021-11-10 20:51:34 -0600911 _existing = self.get_pod_by_name_and_ns(
912 pod_yaml['metadata']['name'],
913 pod_yaml['metadata']['namespace']
Alex5cace3b2021-11-10 16:40:37 -0600914 )
915 if _existing is not None:
Alexbfa947c2021-11-11 18:14:28 -0600916 logger_cli.info(
917 "-> Found pod '{}/{}'. Reusing.".format(
Alex5cace3b2021-11-10 16:40:37 -0600918 pod_yaml['metadata']['namespace'],
919 pod_yaml['metadata']['name']
920 )
921 )
922 return _existing
923 else:
924 self.CoreV1.create_namespaced_pod(
925 pod_yaml['metadata']['namespace'],
926 pod_yaml
927 )
Alex2a7657c2021-11-10 20:51:34 -0600928 return self.wait_for_phase(
929 "pod",
930 pod_yaml['metadata']['name'],
931 pod_yaml['metadata']['namespace'],
932 ["Running"]
Alex5cace3b2021-11-10 16:40:37 -0600933 )
934
935 def expose_pod_port(self, pod_object, port, ns="qa-space"):
Alex2a7657c2021-11-10 20:51:34 -0600936 _existing = self.get_svc_by_name_and_ns(
937 pod_object.metadata.name,
938 pod_object.metadata.namespace
Alex5cace3b2021-11-10 16:40:37 -0600939 )
940 if _existing is not None:
941 # TODO: Check port number?
Alex2a7657c2021-11-10 20:51:34 -0600942 logger_cli.info(
943 "-> Pod already exposed '{}/{}:{}'. Reusing.".format(
Alex5cace3b2021-11-10 16:40:37 -0600944 pod_object.metadata.namespace,
945 pod_object.metadata.name,
946 port
947 )
948 )
949 return _existing
950 else:
951 logger_cli.debug(
952 "... creating service for pod {}/{}: {}:{}".format(
953 pod_object.metadata.namespace,
954 pod_object.metadata.name,
955 pod_object.status.pod_ip,
956 port
957 )
958 )
959 _svc = self.init_service(
960 pod_object.metadata.name,
961 port
962 )
963 return self.CoreV1.create_namespaced_service(
964 pod_object.metadata.namespace,
965 _svc
966 )
Alex0989ecf2022-03-29 13:43:21 -0500967
968 def get_pod_logs(self, podname, ns):
969 # Params
970 # read log of the specified Pod # noqa: E501 This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True
971
972 # >>> thread = api.read_namespaced_pod_log(name, namespace,
973 # async_req=True)
974 # >>> result = thread.get()
975 # :param async_req bool: execute request asynchronously
976 # :param str name: name of the Pod (required)
977 # :param str namespace: object name and auth scope, such as for teams
978 # and projects (required)
979 # :param str container: The container for which to stream logs.
980 # Defaults to only container if there is one container in
981 # the pod.
982 # :param bool follow: Follow the log stream of the pod. Defaults to
983 # false.
984 # :param bool insecure_skip_tls_verify_backend:
985 # insecureSkipTLSVerifyBackend indicates that the apiserver
986 # should not confirm the validity of the serving certificate
987 # of the backend it is connecting to. This will make the
988 # HTTPS connection between the apiserver and the backend
989 # insecure. This means the apiserver cannot verify the log
990 # data it is receiving came from the real kubelet. If the
991 # kubelet is configured to verify the apiserver's TLS
992 # credentials, it does not mean the connection to the real
993 # kubelet is vulnerable to a man in the middle attack (e.g.
994 # an attacker could not intercept the actual log data coming
995 # from the real kubelet).
996 # :param int limit_bytes: If set, the number of bytes to read from the
997 # server before terminating the log output. This may not
998 # display a complete final line of logging, and may return
999 # slightly more or slightly less than the specified limit.
1000 # :param str pretty: If 'true', then the output is pretty printed.
1001 # :param bool previous: Return previous terminated container logs.
1002 # Defaults to false.
1003 # :param int since_seconds: A relative time in seconds before the
1004 # current time from which to show logs. If this value precedes
1005 # the time a pod was started, only logs since the pod start will
1006 # be returned. If this value is in the future, no logs will be
1007 # returned. Only one of sinceSeconds or sinceTime may be
1008 # specified.
1009 # :param int tail_lines: If set, the number of lines from the end of
1010 # the logs to show. If not specified, logs are shown from the
1011 # creation of the container or sinceSeconds or sinceTime
1012 # :param bool timestamps: If true, add an RFC3339 or RFC3339Nano
1013 # timestamp at the beginning of every line of log output.
1014 # Defaults to false.
1015 # :param _preload_content: if False, the urllib3.HTTPResponse object
1016 # will be returned without reading/decoding response data.
1017 # Default is True.
1018 # :param _request_timeout: timeout setting for this request. If one
1019 # number provided, it will be total request timeout. It can
1020 # also be a pair (tuple) of (connection, read) timeouts.
1021 # :return: str
1022 # If the method is called asynchronously, returns the request
1023 # thread.
1024
1025 return self.CoreV1.read_namespaced_pod_log(
1026 podname,
1027 ns,
1028 # timestamps=True,
1029 tail_lines=50,
1030 # pretty=True
1031 )