blob: 3e150958192500b40b2a92d783ef7afa3e6c73b2 [file] [log] [blame]
Alex9a4ad212020-10-01 18:04:25 -05001"""
2Module to handle interaction with Kube
3"""
4import base64
5import os
6import urllib3
7import yaml
8
Alex5cace3b2021-11-10 16:40:37 -06009from kubernetes import client as kclient, config as kconfig, watch
Alex9a4ad212020-10-01 18:04:25 -050010from kubernetes.stream import stream
Alex7b0ee9a2021-09-21 17:16:17 -050011from kubernetes.client.rest import ApiException
Alex5cace3b2021-11-10 16:40:37 -060012from time import time, sleep
Alex9a4ad212020-10-01 18:04:25 -050013
14from cfg_checker.common import logger, logger_cli
Alex7b0ee9a2021-09-21 17:16:17 -050015from cfg_checker.common.decorators import retry
Alex5cace3b2021-11-10 16:40:37 -060016from cfg_checker.common.exception import CheckerException, \
17 InvalidReturnException, KubeException
Alex9a4ad212020-10-01 18:04:25 -050018from cfg_checker.common.file_utils import create_temp_file_with_content
19from cfg_checker.common.other import utils, shell
20from cfg_checker.common.ssh_utils import ssh_shell_p
Alex359e5752021-08-16 17:28:30 -050021from cfg_checker.common.const import ENV_LOCAL
Alex9a4ad212020-10-01 18:04:25 -050022
Alex7b0ee9a2021-09-21 17:16:17 -050023
Alex9a4ad212020-10-01 18:04:25 -050024urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
25
26
27def _init_kube_conf_local(config):
28 # Init kube library locally
Alex359e5752021-08-16 17:28:30 -050029 _path = "local:{}".format(config.kube_config_path)
Alex9a4ad212020-10-01 18:04:25 -050030 try:
Alexc4f59622021-08-27 13:42:00 -050031 kconfig.load_kube_config(config_file=config.kube_config_path)
Alex33747812021-04-07 10:11:39 -050032 if config.insecure:
33 kconfig.assert_hostname = False
34 kconfig.client_side_validation = False
Alex9a4ad212020-10-01 18:04:25 -050035 logger_cli.debug(
Alexc4f59622021-08-27 13:42:00 -050036 "... found Kube env: core, {}". format(
Alex9a4ad212020-10-01 18:04:25 -050037 ",".join(
38 kclient.CoreApi().get_api_versions().versions
39 )
40 )
41 )
Alexc4f59622021-08-27 13:42:00 -050042 return kconfig, kclient.ApiClient(), _path
Alex9a4ad212020-10-01 18:04:25 -050043 except Exception as e:
44 logger.warn("Failed to init local Kube client: {}".format(
45 str(e)
46 )
47 )
Alex359e5752021-08-16 17:28:30 -050048 return None, None, _path
Alex9a4ad212020-10-01 18:04:25 -050049
50
51def _init_kube_conf_remote(config):
52 # init remote client
53 # Preload Kube token
54 """
55 APISERVER=$(kubectl config view --minify |
56 grep server | cut -f 2- -d ":" | tr -d " ")
57 SECRET_NAME=$(kubectl get secrets |
58 grep ^default | cut -f1 -d ' ')
59 TOKEN=$(kubectl describe secret $SECRET_NAME |
60 grep -E '^token' | cut -f2 -d':' | tr -d " ")
61
62 echo "Detected API Server at: '${APISERVER}'"
63 echo "Got secret: '${SECRET_NAME}'"
64 echo "Loaded token: '${TOKEN}'"
65
66 curl $APISERVER/api
67 --header "Authorization: Bearer $TOKEN" --insecure
68 """
69 import yaml
Alex359e5752021-08-16 17:28:30 -050070 _path = ''
Alexc4f59622021-08-27 13:42:00 -050071 # Try to load remote config only if it was not detected already
72 if not config.kube_config_detected and not config.env_name == ENV_LOCAL:
Alex359e5752021-08-16 17:28:30 -050073 _path = "{}@{}:{}".format(
74 config.ssh_user,
75 config.ssh_host,
76 config.kube_config_path
77 )
Alex9d913532021-03-24 18:01:45 -050078 _c_data = ssh_shell_p(
Alexc4f59622021-08-27 13:42:00 -050079 "cat " + config.kube_config_path,
Alex9d913532021-03-24 18:01:45 -050080 config.ssh_host,
81 username=config.ssh_user,
82 keypath=config.ssh_key,
83 piped=False,
84 use_sudo=config.ssh_uses_sudo,
85 )
86 else:
Alex359e5752021-08-16 17:28:30 -050087 _path = "local:{}".format(config.kube_config_path)
Alex9d913532021-03-24 18:01:45 -050088 with open(config.kube_config_path, 'r') as ff:
89 _c_data = ff.read()
Alex9a4ad212020-10-01 18:04:25 -050090
Alex359e5752021-08-16 17:28:30 -050091 if len(_c_data) < 1:
92 return None, None, _path
93
Alex9a4ad212020-10-01 18:04:25 -050094 _conf = yaml.load(_c_data, Loader=yaml.SafeLoader)
95
96 _kube_conf = kclient.Configuration()
97 # A remote host configuration
98
99 # To work with remote cluster, we need to extract these
100 # keys = ['host', 'ssl_ca_cert', 'cert_file', 'key_file', 'verify_ssl']
101 # When v12 of the client will be release, we will use load_from_dict
102
103 _kube_conf.ssl_ca_cert = create_temp_file_with_content(
104 base64.standard_b64decode(
105 _conf['clusters'][0]['cluster']['certificate-authority-data']
106 )
107 )
108 _host = _conf['clusters'][0]['cluster']['server']
109 _kube_conf.cert_file = create_temp_file_with_content(
110 base64.standard_b64decode(
111 _conf['users'][0]['user']['client-certificate-data']
112 )
113 )
114 _kube_conf.key_file = create_temp_file_with_content(
115 base64.standard_b64decode(
116 _conf['users'][0]['user']['client-key-data']
117 )
118 )
119 if "http" not in _host or "443" not in _host:
120 logger_cli.error(
121 "Failed to extract Kube host: '{}'".format(_host)
122 )
123 else:
124 logger_cli.debug(
Alexc4f59622021-08-27 13:42:00 -0500125 "... 'context' host extracted: '{}' via SSH@{}".format(
Alex9a4ad212020-10-01 18:04:25 -0500126 _host,
127 config.ssh_host
128 )
129 )
130
131 # Substitute context host to ours
132 _tmp = _host.split(':')
133 _kube_conf.host = \
134 _tmp[0] + "://" + config.mcp_host + ":" + _tmp[2]
135 config.kube_port = _tmp[2]
136 logger_cli.debug(
Alexc4f59622021-08-27 13:42:00 -0500137 "... kube remote host updated to {}".format(
Alex9a4ad212020-10-01 18:04:25 -0500138 _kube_conf.host
139 )
140 )
141 _kube_conf.verify_ssl = False
142 _kube_conf.debug = config.debug
Alex33747812021-04-07 10:11:39 -0500143 if config.insecure:
144 _kube_conf.assert_hostname = False
145 _kube_conf.client_side_validation = False
146
Alex9a4ad212020-10-01 18:04:25 -0500147 # Nevertheless if you want to do it
148 # you can with these 2 parameters
149 # configuration.verify_ssl=True
150 # ssl_ca_cert is the filepath
151 # to the file that contains the certificate.
152 # configuration.ssl_ca_cert="certificate"
153
154 # _kube_conf.api_key = {
155 # "authorization": "Bearer " + config.kube_token
156 # }
157
158 # Create a ApiClient with our config
159 _kube_api = kclient.ApiClient(_kube_conf)
160
Alex359e5752021-08-16 17:28:30 -0500161 return _kube_conf, _kube_api, _path
Alex9a4ad212020-10-01 18:04:25 -0500162
163
164class KubeApi(object):
165 def __init__(self, config):
166 self.config = config
Alex359e5752021-08-16 17:28:30 -0500167 self.initialized = self._init_kclient()
Alex9a4ad212020-10-01 18:04:25 -0500168 self.last_response = None
169
170 def _init_kclient(self):
171 # if there is no password - try to get local, if this available
Alex359e5752021-08-16 17:28:30 -0500172 logger_cli.debug("... init kube config")
Alex9a4ad212020-10-01 18:04:25 -0500173 if self.config.env_name == "local":
Alex359e5752021-08-16 17:28:30 -0500174 self.kConf, self.kApi, self.kConfigPath = _init_kube_conf_local(
175 self.config
176 )
Alex9a4ad212020-10-01 18:04:25 -0500177 self.is_local = True
Alexc4f59622021-08-27 13:42:00 -0500178 # Try to load local config data
179 if self.config.kube_config_path and \
180 os.path.exists(self.config.kube_config_path):
181 _cmd = "cat " + self.config.kube_config_path
182 _c_data = shell(_cmd)
Alex9a4ad212020-10-01 18:04:25 -0500183 _conf = yaml.load(_c_data, Loader=yaml.SafeLoader)
184 self.user_keypath = create_temp_file_with_content(
185 base64.standard_b64decode(
186 _conf['users'][0]['user']['client-key-data']
187 )
188 )
189 self.yaml_conf = _c_data
190 else:
Alex359e5752021-08-16 17:28:30 -0500191 self.kConf, self.kApi, self.kConfigPath = _init_kube_conf_remote(
192 self.config
193 )
Alex9a4ad212020-10-01 18:04:25 -0500194 self.is_local = False
195
Alex359e5752021-08-16 17:28:30 -0500196 if self.kConf is None or self.kApi is None:
197 return False
198 else:
199 return True
200
Alex9a4ad212020-10-01 18:04:25 -0500201 def get_versions_api(self):
202 # client.CoreApi().get_api_versions().versions
203 return kclient.VersionApi(self.kApi)
204
205
206class KubeRemote(KubeApi):
207 def __init__(self, config):
208 super(KubeRemote, self).__init__(config)
Alex1f90e7b2021-09-03 15:31:28 -0500209 self._appsV1 = None
210 self._podV1 = None
Alexdcb792f2021-10-04 14:24:21 -0500211 self._custom = None
212
213 @property
214 def CustomObjects(self):
215 if not self._custom:
216 self._custom = kclient.CustomObjectsApi(self.kApi)
217 return self._custom
Alex9a4ad212020-10-01 18:04:25 -0500218
219 @property
220 def CoreV1(self):
Alexb2129542021-11-23 15:49:42 -0600221 if self.is_local:
222 return kclient.CoreV1Api(kclient.ApiClient())
223 else:
224 return kclient.CoreV1Api(kclient.ApiClient(self.kConf))
Alex9a4ad212020-10-01 18:04:25 -0500225
Alex1f90e7b2021-09-03 15:31:28 -0500226 @property
227 def AppsV1(self):
228 if not self._appsV1:
229 self._appsV1 = kclient.AppsV1Api(self.kApi)
230 return self._appsV1
231
232 @property
233 def PodsV1(self):
234 if not self._podsV1:
235 self._podsV1 = kclient.V1Pod(self.kApi)
236 return self._podsV1
237
Alex9a4ad212020-10-01 18:04:25 -0500238 @staticmethod
239 def _typed_list_to_dict(i_list):
240 _dict = {}
241 for _item in i_list:
242 _d = _item.to_dict()
243 _type = _d.pop("type")
244 _dict[_type.lower()] = _d
245
246 return _dict
247
248 @staticmethod
249 def _get_listed_attrs(items, _path):
250 _list = []
251 for _n in items:
252 _list.append(utils.rgetattr(_n, _path))
253
254 return _list
255
Alex1f90e7b2021-09-03 15:31:28 -0500256 @staticmethod
257 def safe_get_item_by_name(api_resource, _name):
258 for item in api_resource.items:
259 if item.metadata.name == _name:
260 return item
261
262 return None
263
Alex2a7657c2021-11-10 20:51:34 -0600264 def wait_for_phase_on_start(self, _func, phase, *args, **kwargs):
Alex5cace3b2021-11-10 16:40:37 -0600265 w = watch.Watch()
266 start_time = time()
267 for event in w.stream(_func, *args, **kwargs):
268 if event["object"].status.phase == phase:
269 w.stop()
270 end_time = time()
271 logger_cli.debug(
272 "... bacame '{}' in {:0.2f} sec".format(
273 phase,
274 end_time-start_time
275 )
276 )
277 return
278 # event.type: ADDED, MODIFIED, DELETED
279 if event["type"] == "DELETED":
280 # Pod was deleted while we were waiting for it to start.
281 logger_cli.debug("... deleted before started")
282 w.stop()
283 return
284
Alex2a7657c2021-11-10 20:51:34 -0600285 def wait_for_event(self, _func, event, *args, **kwargs):
286 w = watch.Watch()
287 for event in w.stream(_func, *args, **kwargs):
288 # event.type: ADDED, MODIFIED, DELETED
289 if event["type"] == event:
290 # Pod was deleted while we were waiting for it to start.
291 logger_cli.debug("... got {} event".format(event["type"]))
292 w.stop()
293 return
294
Alex9a4ad212020-10-01 18:04:25 -0500295 def get_node_info(self, http=False):
296 # Query API for the nodes and do some presorting
297 _nodes = {}
298 if http:
299 _raw_nodes = self.CoreV1.list_node_with_http_info()
300 else:
301 _raw_nodes = self.CoreV1.list_node()
302
303 if not isinstance(_raw_nodes, kclient.models.v1_node_list.V1NodeList):
304 raise InvalidReturnException(
305 "Invalid return type: '{}'".format(type(_raw_nodes))
306 )
307
308 for _n in _raw_nodes.items:
309 _name = _n.metadata.name
310 _d = _n.to_dict()
311 # parse inner data classes as dicts
312 _d['addresses'] = self._typed_list_to_dict(_n.status.addresses)
313 _d['conditions'] = self._typed_list_to_dict(_n.status.conditions)
314 # Update 'status' type
315 if isinstance(_d['conditions']['ready']['status'], str):
316 _d['conditions']['ready']['status'] = utils.to_bool(
317 _d['conditions']['ready']['status']
318 )
319 # Parse image names?
320 # TODO: Here is the place where we can parse each node image names
321
322 # Parse roles
323 _d['labels'] = {}
324 for _label, _data in _d["metadata"]["labels"].items():
325 if _data.lower() in ["true", "false"]:
326 _d['labels'][_label] = utils.to_bool(_data)
327 else:
328 _d['labels'][_label] = _data
329
330 # Save
331 _nodes[_name] = _d
332
333 # debug report on how many nodes detected
334 logger_cli.debug("...node items returned '{}'".format(len(_nodes)))
335
336 return _nodes
337
Alexdcb792f2021-10-04 14:24:21 -0500338 def get_pod_names_by_partial_name(self, partial_name, ns):
339 logger_cli.debug('... searching for pods with {}'.format(partial_name))
340 _pods = self.CoreV1.list_namespaced_pod(ns)
341 _names = self._get_listed_attrs(_pods.items, "metadata.name")
342 _pnames = [n for n in _names if partial_name in n]
343 if len(_pnames) > 1:
344 logger_cli.debug(
345 "... more than one pod found for '{}': {}\n".format(
346 partial_name,
347 ", ".join(_pnames)
348 )
349 )
350 elif len(_pnames) < 1:
351 logger_cli.warning(
352 "WARNING: No pods found for '{}'".format(partial_name)
353 )
354
355 return _pnames
356
357 def get_pods_by_partial_name(self, partial_name, ns):
358 logger_cli.debug('... searching for pods with {}'.format(partial_name))
359 _all_pods = self.CoreV1.list_namespaced_pod(ns)
360 # _names = self._get_listed_attrs(_pods.items, "metadata.name")
361 _pods = [_pod for _pod in _all_pods.items
362 if partial_name in _pod.metadata.name]
363 if len(_pods) > 1:
364 logger_cli.debug(
365 "... more than one pod found for '{}': {}\n".format(
366 partial_name,
367 ", ".join(partial_name)
368 )
369 )
370 elif len(_pods) < 1:
371 logger_cli.warning(
372 "WARNING: No pods found for '{}'".format(partial_name)
373 )
374
375 return _pods
376
Alexb2129542021-11-23 15:49:42 -0600377 @retry(ApiException, initial_wait=5)
Alex9a4ad212020-10-01 18:04:25 -0500378 def exec_on_target_pod(
379 self,
380 cmd,
381 pod_name,
382 namespace,
383 strict=False,
384 _request_timeout=120,
Alexb78191f2021-11-02 16:35:46 -0500385 arguments=None,
Alex9a4ad212020-10-01 18:04:25 -0500386 **kwargs
387 ):
Alexdcb792f2021-10-04 14:24:21 -0500388 _pname = ""
Alex9a4ad212020-10-01 18:04:25 -0500389 if not strict:
Alex1f90e7b2021-09-03 15:31:28 -0500390 logger_cli.debug(
391 "... searching for pods with the name '{}'".format(pod_name)
392 )
393 _pods = {}
Alex7b0ee9a2021-09-21 17:16:17 -0500394 _pods = self.CoreV1.list_namespaced_pod(namespace)
Alex1f90e7b2021-09-03 15:31:28 -0500395 _names = self._get_listed_attrs(_pods.items, "metadata.name")
Alex33747812021-04-07 10:11:39 -0500396 _pnames = [n for n in _names if n.startswith(pod_name)]
397 if len(_pnames) > 1:
Alex9a4ad212020-10-01 18:04:25 -0500398 logger_cli.debug(
Alexc4f59622021-08-27 13:42:00 -0500399 "... more than one pod found for '{}': {}\n"
400 "... using first one".format(
Alex9a4ad212020-10-01 18:04:25 -0500401 pod_name,
Alex33747812021-04-07 10:11:39 -0500402 ", ".join(_pnames)
Alex9a4ad212020-10-01 18:04:25 -0500403 )
404 )
Alexdcb792f2021-10-04 14:24:21 -0500405 elif len(_pnames) < 1:
Alex9a4ad212020-10-01 18:04:25 -0500406 raise KubeException("No pods found for '{}'".format(pod_name))
Alexb78191f2021-11-02 16:35:46 -0500407 # in case of >1 and =1 we are taking 1st anyway
408 _pname = _pnames[0]
Alex9a4ad212020-10-01 18:04:25 -0500409 else:
410 _pname = pod_name
Alex33747812021-04-07 10:11:39 -0500411 logger_cli.debug(
Alexb78191f2021-11-02 16:35:46 -0500412 "... cmd: [CoreV1] exec {} -n {} -- {} '{}'".format(
Alex33747812021-04-07 10:11:39 -0500413 _pname,
414 namespace,
Alexb78191f2021-11-02 16:35:46 -0500415 cmd,
416 arguments
Alex33747812021-04-07 10:11:39 -0500417 )
418 )
Alex1f90e7b2021-09-03 15:31:28 -0500419 # Set preload_content to False to preserve JSON
420 # If not, output gets converted to str
421 # Which causes to change " to '
422 # After that json.loads(...) fail
Alex7b0ee9a2021-09-21 17:16:17 -0500423 cmd = cmd if isinstance(cmd, list) else cmd.split()
Alexb78191f2021-11-02 16:35:46 -0500424 if arguments:
425 cmd += [arguments]
Alexb2129542021-11-23 15:49:42 -0600426 # Make sure that CoreV1 is fresh before calling it
Alex1f90e7b2021-09-03 15:31:28 -0500427 _pod_stream = stream(
Alex9a4ad212020-10-01 18:04:25 -0500428 self.CoreV1.connect_get_namespaced_pod_exec,
429 _pname,
430 namespace,
Alex7b0ee9a2021-09-21 17:16:17 -0500431 command=cmd,
Alex9a4ad212020-10-01 18:04:25 -0500432 stderr=True,
433 stdin=False,
434 stdout=True,
435 tty=False,
436 _request_timeout=_request_timeout,
Alex1f90e7b2021-09-03 15:31:28 -0500437 _preload_content=False,
Alex9a4ad212020-10-01 18:04:25 -0500438 **kwargs
439 )
Alex1f90e7b2021-09-03 15:31:28 -0500440 # run for timeout
441 _pod_stream.run_forever(timeout=_request_timeout)
442 # read the output
Alex7b0ee9a2021-09-21 17:16:17 -0500443 _output = _pod_stream.read_stdout()
Alexb78191f2021-11-02 16:35:46 -0500444 _error = _pod_stream.read_stderr()
445 if _error:
446 # copy error to output
447 logger_cli.warning(
448 "WARNING: cmd of '{}' returned error:\n{}\n".format(
449 " ".join(cmd),
450 _error
451 )
452 )
453 if not _output:
454 _output = _error
Alex7b0ee9a2021-09-21 17:16:17 -0500455 # Send output
456 return _output
Alex9a4ad212020-10-01 18:04:25 -0500457
Alex1f90e7b2021-09-03 15:31:28 -0500458 def ensure_namespace(self, ns):
459 """
460 Ensure that given namespace exists
461 """
462 # list active namespaces
463 _v1NamespaceList = self.CoreV1.list_namespace()
464 _ns = self.safe_get_item_by_name(_v1NamespaceList, ns)
465
466 if _ns is None:
467 logger_cli.debug("... creating namespace '{}'".format(ns))
Alexdcb792f2021-10-04 14:24:21 -0500468 _new_ns = kclient.V1Namespace()
469 _new_ns.metadata = kclient.V1ObjectMeta(name=ns)
470 _r = self.CoreV1.create_namespace(_new_ns)
Alex1f90e7b2021-09-03 15:31:28 -0500471 # TODO: check return on fail
472 if not _r:
473 return False
474 else:
475 logger_cli.debug("... found existing namespace '{}'".format(ns))
476
477 return True
478
479 def get_daemon_set_by_name(self, ns, name):
480 return self.safe_get_item_by_name(
481 self.AppsV1.list_namespaced_daemon_set(ns),
482 name
483 )
484
485 def create_config_map(self, ns, name, source, recreate=True):
486 """
487 Creates/Overwrites ConfigMap in working namespace
488 """
489 # Prepare source
490 logger_cli.debug(
491 "... preparing config map '{}/{}' with files from '{}'".format(
492 ns,
493 name,
494 source
495 )
496 )
497 _data = {}
498 if os.path.isfile(source):
499 # populate data with one file
500 with open(source, 'rt') as fS:
501 _data[os.path.split(source)[1]] = fS.read()
502 elif os.path.isdir(source):
503 # walk dirs and populate all 'py' files
504 for path, dirs, files in os.walk(source):
505 _e = ('.py')
506 _subfiles = (_fl for _fl in files
507 if _fl.endswith(_e) and not _fl.startswith('.'))
508 for _file in _subfiles:
509 with open(os.path.join(path, _file), 'rt') as fS:
510 _data[_file] = fS.read()
511
512 _cm = kclient.V1ConfigMap()
513 _cm.metadata = kclient.V1ObjectMeta(name=name, namespace=ns)
514 _cm.data = _data
515 logger_cli.debug(
516 "... prepared config map with {} scripts".format(len(_data))
517 )
518 # Query existing configmap, delete if needed
519 _existing_cm = self.safe_get_item_by_name(
520 self.CoreV1.list_namespaced_config_map(namespace=ns),
521 name
522 )
523 if _existing_cm is not None:
524 self.CoreV1.replace_namespaced_config_map(
525 namespace=ns,
526 name=name,
527 body=_cm
528 )
529 logger_cli.debug(
530 "... replaced existing config map '{}/{}'".format(
531 ns,
532 name
533 )
534 )
535 else:
536 # Create it
537 self.CoreV1.create_namespaced_config_map(
538 namespace=ns,
539 body=_cm
540 )
541 logger_cli.debug("... created config map '{}/{}'".format(
542 ns,
543 name
544 ))
545
546 return _data.keys()
547
548 def prepare_daemonset_from_yaml(self, ns, ds_yaml):
549 _name = ds_yaml['metadata']['name']
550 _ds = self.get_daemon_set_by_name(ns, _name)
551
552 if _ds is not None:
553 logger_cli.debug(
554 "... found existing daemonset '{}'".format(_name)
555 )
556 _r = self.AppsV1.replace_namespaced_daemon_set(
557 _ds.metadata.name,
558 _ds.metadata.namespace,
559 body=ds_yaml
560 )
561 logger_cli.debug(
562 "... replacing existing daemonset '{}'".format(_name)
563 )
564 return _r
565 else:
566 logger_cli.debug(
567 "... creating daemonset '{}'".format(_name)
568 )
569 _r = self.AppsV1.create_namespaced_daemon_set(ns, body=ds_yaml)
570 return _r
571
572 def delete_daemon_set_by_name(self, ns, name):
573 return self.AppsV1.delete_namespaced_daemon_set(name, ns)
574
575 def exec_on_all_pods(self, pods):
576 """
577 Create multiple threads to execute script on all target pods
578 """
579 # Create map for threads: [[node_name, ns, pod_name]...]
580 _pod_list = []
581 for item in pods.items:
582 _pod_list.append(
583 [
584 item.spec.nodeName,
585 item.metadata.namespace,
586 item.metadata.name
587 ]
588 )
589
590 # map func and cmd
Alexdcb792f2021-10-04 14:24:21 -0500591 logger_cli.error("ERROR: 'exec_on_all_pods'is not implemented yet")
Alex1f90e7b2021-09-03 15:31:28 -0500592 # create result list
593
594 return []
Alex7b0ee9a2021-09-21 17:16:17 -0500595
Alexb2129542021-11-23 15:49:42 -0600596 @retry(ApiException, initial_wait=5)
Alex7b0ee9a2021-09-21 17:16:17 -0500597 def get_pods_for_daemonset(self, ds):
598 # get all pod names for daemonset
599 logger_cli.debug(
600 "... extracting pod names from daemonset '{}'".format(
601 ds.metadata.name
602 )
603 )
604 _ns = ds.metadata.namespace
605 _name = ds.metadata.name
606 _pods = self.CoreV1.list_namespaced_pod(
607 namespace=_ns,
608 label_selector='name={}'.format(_name)
609 )
610 return _pods
611
Alexb2129542021-11-23 15:49:42 -0600612 @retry(ApiException, initial_wait=5)
Alex7b0ee9a2021-09-21 17:16:17 -0500613 def put_string_buffer_to_pod_as_textfile(
614 self,
615 pod_name,
616 namespace,
617 buffer,
618 filepath,
619 _request_timeout=120,
620 **kwargs
621 ):
622 _command = ['/bin/sh']
623 response = stream(
624 self.CoreV1.connect_get_namespaced_pod_exec,
625 pod_name,
626 namespace,
627 command=_command,
628 stderr=True,
629 stdin=True,
630 stdout=True,
631 tty=False,
632 _request_timeout=_request_timeout,
633 _preload_content=False,
634 **kwargs
635 )
636
637 # if json
638 # buffer = json.dumps(_dict, indent=2).encode('utf-8')
639
640 commands = [
641 bytes("cat <<'EOF' >" + filepath + "\n", 'utf-8'),
642 buffer,
643 bytes("\n" + "EOF\n", 'utf-8')
644 ]
645
646 while response.is_open():
647 response.update(timeout=1)
648 if response.peek_stdout():
649 logger_cli.debug("... STDOUT: %s" % response.read_stdout())
650 if response.peek_stderr():
651 logger_cli.debug("... STDERR: %s" % response.read_stderr())
652 if commands:
653 c = commands.pop(0)
Alexb2129542021-11-23 15:49:42 -0600654 logger_cli.debug("... running command... {}".format(c))
Alex7b0ee9a2021-09-21 17:16:17 -0500655 response.write_stdin(str(c, encoding='utf-8'))
656 else:
657 break
658 response.close()
659
Alex7b0ee9a2021-09-21 17:16:17 -0500660 return
Alexdcb792f2021-10-04 14:24:21 -0500661
662 def get_custom_resource(self, group, version, plural):
663 # Get it
664 # Example:
665 # kubernetes.client.CustomObjectsApi().list_cluster_custom_object(
666 # group="networking.istio.io",
667 # version="v1alpha3",
668 # plural="serviceentries"
669 # )
670 return self.CustomObjects.list_cluster_custom_object(
671 group=group,
672 version=version,
673 plural=plural
674 )
Alex5cace3b2021-11-10 16:40:37 -0600675
676 def init_pvc_resource(
677 self,
678 name,
679 storage_class,
680 size,
681 ns="qa-space",
682 mode="ReadWriteOnce"
683 ):
684 """Return the Kubernetes PVC resource"""
685 return kclient.V1PersistentVolumeClaim(
686 api_version='v1',
687 kind='PersistentVolumeClaim',
688 metadata=kclient.V1ObjectMeta(
689 name=name,
690 namespace=ns,
691 labels={"name": name}
692 ),
693 spec=kclient.V1PersistentVolumeClaimSpec(
694 storage_class_name=storage_class,
695 access_modes=[mode],
696 resources=kclient.V1ResourceRequirements(
697 requests={'storage': size}
698 )
699 )
700 )
701
702 def init_pv_resource(
703 self,
704 name,
705 storage_class,
706 size,
707 path,
708 ns="qa-space",
709 mode="ReadWriteOnce"
710 ):
711 """Return the Kubernetes PVC resource"""
712 return kclient.V1PersistentVolume(
713 api_version='v1',
714 kind='PersistentVolume',
715 metadata=kclient.V1ObjectMeta(
716 name=name,
717 namespace=ns,
718 labels={"name": name}
719 ),
720 spec=kclient.V1PersistentVolumeSpec(
721 storage_class_name=storage_class,
722 access_modes=[mode],
723 capacity={'storage': size},
724 host_path=kclient.V1HostPathVolumeSource(path=path)
725 )
726 )
727
728 def init_service(
729 self,
730 name,
731 port,
732 clusterip=None,
733 ns="qa-space"
734 ):
735 """ Inits a V1Service object with data for benchmark agent"""
736 _meta = kclient.V1ObjectMeta(
737 name=name,
738 namespace=ns,
739 labels={"name": name}
740 )
741 _port = kclient.V1ServicePort(
742 port=port,
743 protocol="TCP",
744 target_port=port
745 )
746 _spec = kclient.V1ServiceSpec(
747 # cluster_ip=clusterip,
748 selector={"name": name},
749 # type="ClusterIP",
750 ports=[_port]
751 )
752 return kclient.V1Service(
753 api_version="v1",
754 kind="Service",
755 metadata=_meta,
756 spec=_spec
757 )
758
759 def prepare_pv(self, pv_object):
Alex2a7657c2021-11-10 20:51:34 -0600760 _existing = self.get_pv_by_name(pv_object.metadata.name)
Alex5cace3b2021-11-10 16:40:37 -0600761 if _existing is not None:
762 self.CoreV1.replace_persistent_volume(
763 pv_object.metadata.name,
764 pv_object
765 )
766 else:
767 self.CoreV1.create_persistent_volume(pv_object)
768
Alex2a7657c2021-11-10 20:51:34 -0600769 return self.wait_for_phase(
770 "pv",
771 pv_object.metadata.name,
772 None,
773 ["Available", "Bound"]
Alex5cace3b2021-11-10 16:40:37 -0600774 )
775
776 def prepare_pvc(self, pvc_object):
Alex2a7657c2021-11-10 20:51:34 -0600777 _existing = self.get_pvc_by_name_and_ns(
778 pvc_object.metadata.name,
779 pvc_object.metadata.namespace
Alex5cace3b2021-11-10 16:40:37 -0600780 )
781 if _existing is not None:
782 _size_r = pvc_object.spec.resources.requests["storage"]
783 _size_e = _existing.spec.resources.requests["storage"]
Alex2a7657c2021-11-10 20:51:34 -0600784 logger_cli.info(
785 "-> Found PVC '{}/{}' with {}. Requested: {}'".format(
Alex5cace3b2021-11-10 16:40:37 -0600786 pvc_object.metadata.namespace,
787 pvc_object.metadata.name,
788 _size_e,
789 _size_r
790 )
791 )
792 if _size_r != _size_e:
793 raise CheckerException(
794 "ERROR: PVC exists on the cloud with different size "
795 "than needed. Please cleanup!"
796 )
797 else:
798 logger_cli.debug(
799 "... creating pvc '{}'".format(pvc_object.metadata.name)
800 )
801 self.CoreV1.create_namespaced_persistent_volume_claim(
802 pvc_object.metadata.namespace,
803 pvc_object
804 )
805
Alex2a7657c2021-11-10 20:51:34 -0600806 return self.wait_for_phase(
807 "pvc",
808 pvc_object.metadata.name,
809 pvc_object.metadata.namespace,
810 ["Available", "Bound"]
811 )
812
813 def get_pod_by_name_and_ns(self, name, ns):
814 return self.safe_get_item_by_name(
815 self.CoreV1.list_namespaced_pod(
816 ns,
817 label_selector='name={}'.format(name)
818 ),
819 name
820 )
821
Alexb2129542021-11-23 15:49:42 -0600822 def list_pods(self, ns, label_str=None):
823 return self.CoreV1.list_namespaced_pod(
824 ns,
825 label_selector=label_str
826 )
827
Alex2a7657c2021-11-10 20:51:34 -0600828 def get_svc_by_name_and_ns(self, name, ns):
829 return self.safe_get_item_by_name(
830 self.CoreV1.list_namespaced_service(
831 ns,
832 label_selector='name={}'.format(name)
833 ),
834 name
835 )
836
Alexb2129542021-11-23 15:49:42 -0600837 def list_svc(self, ns, label_str=None):
838 return self.CoreV1.list_namespaced_service(
839 ns,
840 label_selector=label_str
841 )
842
Alex2a7657c2021-11-10 20:51:34 -0600843 def get_pvc_by_name_and_ns(self, name, ns):
844 return self.safe_get_item_by_name(
845 self.CoreV1.list_namespaced_persistent_volume_claim(
846 ns,
847 label_selector='name={}'.format(name)
848 ),
849 name
850 )
851
Alexb2129542021-11-23 15:49:42 -0600852 def list_pvc(self, ns, label_str=None):
853 return self.CoreV1.list_namespaced_persistent_volume_claim(
854 ns,
855 label_selector=label_str
856 )
857
Alex2a7657c2021-11-10 20:51:34 -0600858 def get_pv_by_name(self, name):
859 return self.safe_get_item_by_name(
860 self.CoreV1.list_persistent_volume(
861 label_selector='name={}'.format(name)
862 ),
863 name
864 )
865
Alexb2129542021-11-23 15:49:42 -0600866 def list_pv(self, label_str=None):
867 return self.CoreV1.list_persistent_volume(
868 label_selector=label_str
869 )
870
Alex2a7657c2021-11-10 20:51:34 -0600871 def wait_for_phase(self, ttype, name, ns, phase_list, timeout=120):
872 logger_cli.debug(
873 "... waiting '{}'s until {} is '{}'".format(
874 timeout,
875 ttype,
876 ", ".join(phase_list)
877 )
878 )
879 while timeout > 0:
880 if ttype == "pod":
881 _t = self.get_pod_by_name_and_ns(name, ns)
882 elif ttype == "svc":
883 _t = self.get_svc_by_name_and_ns(name, ns)
884 elif ttype == "pvc":
885 _t = self.get_pvc_by_name_and_ns(name, ns)
886 elif ttype == "pv":
887 _t = self.get_pv_by_name(name)
888 if "Terminated" in phase_list and not _t:
889 if ns:
890 _s = "... {} {}/{} not found".format(ttype, ns, name)
Alex5cace3b2021-11-10 16:40:37 -0600891 else:
Alex2a7657c2021-11-10 20:51:34 -0600892 _s = "... {} '{}' not found".format(ttype, name)
893 logger_cli.debug(_s)
894 return None
895 logger_cli.debug("... {} is '{}'".format(ttype, _t.status.phase))
896 if _t.status.phase in phase_list:
897 return _t
898 sleep(2)
899 timeout -= 2
Alex5cace3b2021-11-10 16:40:37 -0600900 raise CheckerException(
Alex2a7657c2021-11-10 20:51:34 -0600901 "Timed out waiting for {} '{}' in '{}'".format(
902 ttype,
903 name,
904 ", ".join(ttype)
Alex5cace3b2021-11-10 16:40:37 -0600905 )
906 )
907
908 def prepare_pod_from_yaml(self, pod_yaml):
Alex2a7657c2021-11-10 20:51:34 -0600909 _existing = self.get_pod_by_name_and_ns(
910 pod_yaml['metadata']['name'],
911 pod_yaml['metadata']['namespace']
Alex5cace3b2021-11-10 16:40:37 -0600912 )
913 if _existing is not None:
Alexbfa947c2021-11-11 18:14:28 -0600914 logger_cli.info(
915 "-> Found pod '{}/{}'. Reusing.".format(
Alex5cace3b2021-11-10 16:40:37 -0600916 pod_yaml['metadata']['namespace'],
917 pod_yaml['metadata']['name']
918 )
919 )
920 return _existing
921 else:
922 self.CoreV1.create_namespaced_pod(
923 pod_yaml['metadata']['namespace'],
924 pod_yaml
925 )
Alex2a7657c2021-11-10 20:51:34 -0600926 return self.wait_for_phase(
927 "pod",
928 pod_yaml['metadata']['name'],
929 pod_yaml['metadata']['namespace'],
930 ["Running"]
Alex5cace3b2021-11-10 16:40:37 -0600931 )
932
933 def expose_pod_port(self, pod_object, port, ns="qa-space"):
Alex2a7657c2021-11-10 20:51:34 -0600934 _existing = self.get_svc_by_name_and_ns(
935 pod_object.metadata.name,
936 pod_object.metadata.namespace
Alex5cace3b2021-11-10 16:40:37 -0600937 )
938 if _existing is not None:
939 # TODO: Check port number?
Alex2a7657c2021-11-10 20:51:34 -0600940 logger_cli.info(
941 "-> Pod already exposed '{}/{}:{}'. Reusing.".format(
Alex5cace3b2021-11-10 16:40:37 -0600942 pod_object.metadata.namespace,
943 pod_object.metadata.name,
944 port
945 )
946 )
947 return _existing
948 else:
949 logger_cli.debug(
950 "... creating service for pod {}/{}: {}:{}".format(
951 pod_object.metadata.namespace,
952 pod_object.metadata.name,
953 pod_object.status.pod_ip,
954 port
955 )
956 )
957 _svc = self.init_service(
958 pod_object.metadata.name,
959 port
960 )
961 return self.CoreV1.create_namespaced_service(
962 pod_object.metadata.namespace,
963 _svc
964 )