blob: 0ce573acdf94d1f097362c4291e3df306e7e4832 [file] [log] [blame]
"""
Module to handle interaction with Kubernetes (Kube)
"""
4import base64
5import os
6import urllib3
7import yaml
8
Alex5cace3b2021-11-10 16:40:37 -06009from kubernetes import client as kclient, config as kconfig, watch
Alex9a4ad212020-10-01 18:04:25 -050010from kubernetes.stream import stream
Alex7b0ee9a2021-09-21 17:16:17 -050011from kubernetes.client.rest import ApiException
Alex5cace3b2021-11-10 16:40:37 -060012from time import time, sleep
Alex9a4ad212020-10-01 18:04:25 -050013
14from cfg_checker.common import logger, logger_cli
Alex7b0ee9a2021-09-21 17:16:17 -050015from cfg_checker.common.decorators import retry
Alex5cace3b2021-11-10 16:40:37 -060016from cfg_checker.common.exception import CheckerException, \
17 InvalidReturnException, KubeException
Alex9a4ad212020-10-01 18:04:25 -050018from cfg_checker.common.file_utils import create_temp_file_with_content
19from cfg_checker.common.other import utils, shell
20from cfg_checker.common.ssh_utils import ssh_shell_p
Alex359e5752021-08-16 17:28:30 -050021from cfg_checker.common.const import ENV_LOCAL
Alex9a4ad212020-10-01 18:04:25 -050022
Alex7b0ee9a2021-09-21 17:16:17 -050023
Alex9a4ad212020-10-01 18:04:25 -050024urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
25
26
def _init_kube_conf_local(config):
    """
    Initialise the kubernetes client library from a local kubeconfig file.

    :param config: settings object; ``kube_config_path`` and ``insecure``
        are read here
    :return: tuple ``(kconfig, ApiClient, path_label)`` on success, or
        ``(None, None, path_label)`` if anything raised during init
    """
    _label = "local:{}".format(config.kube_config_path)
    try:
        kconfig.load_kube_config(config_file=config.kube_config_path)
        if config.insecure:
            # NOTE(review): these flags are assigned on the 'kconfig'
            # module object itself - confirm the client honours them
            kconfig.assert_hostname = False
            kconfig.client_side_validation = False
        # Log the versions returned by CoreApi.get_api_versions()
        _versions = kclient.CoreApi().get_api_versions().versions
        logger_cli.debug(
            "... found Kube env: core, {}".format(",".join(_versions))
        )
        _api = kclient.ApiClient()
    except Exception as e:
        # Any failure is reported and turned into an 'empty' result
        logger.warn(
            "Failed to init local Kube client: {}".format(str(e))
        )
        return None, None, _label
    return kconfig, _api, _label
Alex9a4ad212020-10-01 18:04:25 -050049
50
def _init_kube_conf_remote(config):
    """
    Build a kubernetes client configuration for a remote cluster.

    The kubeconfig text is fetched over SSH unless it was already detected
    or the environment is the local one; in those cases it is read from
    disk. CA certificate, client certificate and client key are decoded
    from the first 'clusters'/'users' entries into temp files. The API
    host is rewritten to ``config.mcp_host`` keeping the scheme and port
    found in the kubeconfig.

    Side effect: ``config.kube_port`` is set to the extracted port.

    For reference, a manual token based check of the API server:

        APISERVER=$(kubectl config view --minify |
            grep server | cut -f 2- -d ":" | tr -d " ")
        SECRET_NAME=$(kubectl get secrets |
            grep ^default | cut -f1 -d ' ')
        TOKEN=$(kubectl describe secret $SECRET_NAME |
            grep -E '^token' | cut -f2 -d':' | tr -d " ")

        echo "Detected API Server at: '${APISERVER}'"
        echo "Got secret: '${SECRET_NAME}'"
        echo "Loaded token: '${TOKEN}'"

        curl $APISERVER/api
            --header "Authorization: Bearer $TOKEN" --insecure

    :param config: settings object (ssh_*, kube_config_*, env_name,
        mcp_host, debug and insecure attributes are read)
    :return: tuple ``(Configuration, ApiClient, path_label)``, or
        ``(None, None, path_label)`` when the kubeconfig text is empty
    """
    import yaml

    def _b64_to_temp_file(_encoded):
        # Decode base64 payload and store it in a temp file
        return create_temp_file_with_content(
            base64.standard_b64decode(_encoded)
        )

    # Use the file on disk if it was detected already or env is local,
    # otherwise load it from the remote host
    if config.kube_config_detected or config.env_name == ENV_LOCAL:
        _label = "local:{}".format(config.kube_config_path)
        with open(config.kube_config_path, 'r') as _fd:
            _raw = _fd.read()
    else:
        _label = "{}@{}:{}".format(
            config.ssh_user,
            config.ssh_host,
            config.kube_config_path
        )
        _raw = ssh_shell_p(
            "cat " + config.kube_config_path,
            config.ssh_host,
            username=config.ssh_user,
            keypath=config.ssh_key,
            piped=False,
            use_sudo=config.ssh_uses_sudo,
        )

    if len(_raw) == 0:
        return None, None, _label

    _parsed = yaml.load(_raw, Loader=yaml.SafeLoader)
    _kube_conf = kclient.Configuration()

    # To work with remote cluster, we need to extract these
    # keys = ['host', 'ssl_ca_cert', 'cert_file', 'key_file', 'verify_ssl']
    # When v12 of the client will be release, we will use load_from_dict
    _cluster = _parsed['clusters'][0]['cluster']
    _kube_conf.ssl_ca_cert = _b64_to_temp_file(
        _cluster['certificate-authority-data']
    )
    _server = _cluster['server']
    _user = _parsed['users'][0]['user']
    _kube_conf.cert_file = _b64_to_temp_file(
        _user['client-certificate-data']
    )
    _kube_conf.key_file = _b64_to_temp_file(_user['client-key-data'])

    if "http" in _server and "443" in _server:
        logger_cli.debug(
            "... 'context' host extracted: '{}' via SSH@{}".format(
                _server,
                config.ssh_host
            )
        )
    else:
        # NOTE(review): failure is only logged, processing continues
        logger_cli.error(
            "Failed to extract Kube host: '{}'".format(_server)
        )

    # Substitute context host to ours: '<scheme>://<mcp_host>:<port>'
    _chunks = _server.split(':')
    _kube_conf.host = \
        _chunks[0] + "://" + config.mcp_host + ":" + _chunks[2]
    config.kube_port = _chunks[2]
    logger_cli.debug(
        "... kube remote host updated to {}".format(_kube_conf.host)
    )

    # Certificate verification is off; to turn it on set
    # 'verify_ssl' to True, 'ssl_ca_cert' is already a file path
    _kube_conf.verify_ssl = False
    _kube_conf.debug = config.debug
    if config.insecure:
        _kube_conf.assert_hostname = False
        _kube_conf.client_side_validation = False

    # Create a ApiClient with our config
    return _kube_conf, kclient.ApiClient(_kube_conf), _label
Alex9a4ad212020-10-01 18:04:25 -0500162
163
class KubeApi(object):
    """
    Holder of the kubernetes client configuration.

    Attributes set during init:
      - ``kConf``, ``kApi``, ``kConfigPath``: result of the local or
        remote config loader
      - ``is_local``: True when ``config.env_name`` equals "local"
      - ``initialized``: False if either ``kConf`` or ``kApi`` is None
      - ``user_keypath``, ``yaml_conf``: only set for a local env whose
        kubeconfig file exists
    """

    def __init__(self, config):
        """Store the config and initialise the kube client from it."""
        self.config = config
        self.initialized = self._init_kclient()
        self.last_response = None

    def _init_kclient(self):
        """
        Load kube configuration according to the environment name.

        :return: True when both config and api client were created
        """
        logger_cli.debug("... init kube config")
        _cfg = self.config
        if _cfg.env_name == "local":
            self.kConf, self.kApi, self.kConfigPath = _init_kube_conf_local(
                _cfg
            )
            self.is_local = True
            # Try to load local config data
            _kpath = _cfg.kube_config_path
            if _kpath and os.path.exists(_kpath):
                _text = shell("cat " + _kpath)
                _parsed = yaml.load(_text, Loader=yaml.SafeLoader)
                _key = _parsed['users'][0]['user']['client-key-data']
                # keep decoded client key of the first user in a temp file
                self.user_keypath = create_temp_file_with_content(
                    base64.standard_b64decode(_key)
                )
                self.yaml_conf = _text
        else:
            self.kConf, self.kApi, self.kConfigPath = _init_kube_conf_remote(
                _cfg
            )
            self.is_local = False

        return not (self.kConf is None or self.kApi is None)

    def get_versions_api(self):
        """Return a ``VersionApi`` object bound to ``self.kApi``."""
        # client.CoreApi().get_api_versions().versions
        _version_api = kclient.VersionApi(self.kApi)
        return _version_api
204
205
206class KubeRemote(KubeApi):
def __init__(self, config):
    """
    Initialise the base class and reset the lazily created API wrappers.

    :param config: settings object, handed over to ``KubeApi.__init__``
    """
    super(KubeRemote, self).__init__(config)
    # Caches for the API objects created on demand by the properties
    self._coreV1 = None
    self._appsV1 = None
    # Kept for backward compatibility with the former attribute name
    self._podV1 = None
    # Name that is actually read by the 'PodsV1' property; without it
    # the first access to that property raised AttributeError
    self._podsV1 = None
    self._custom = None
213
@property
def CustomObjects(self):
    """Return ``CustomObjectsApi`` for ``self.kApi``, created on demand."""
    if self._custom:
        return self._custom
    self._custom = kclient.CustomObjectsApi(self.kApi)
    return self._custom
Alex9a4ad212020-10-01 18:04:25 -0500219
@property
def CoreV1(self):
    """
    Return ``CoreV1Api``, created on demand.

    For a local env a default ``ApiClient`` is used, otherwise the client
    is built from ``self.kConf``.
    """
    if self._coreV1:
        return self._coreV1
    if self.is_local:
        _client = kclient.ApiClient()
    else:
        _client = kclient.ApiClient(self.kConf)
    self._coreV1 = kclient.CoreV1Api(_client)
    return self._coreV1
228
@property
def AppsV1(self):
    """Return ``AppsV1Api`` for ``self.kApi``, created on demand."""
    if self._appsV1:
        return self._appsV1
    self._appsV1 = kclient.AppsV1Api(self.kApi)
    return self._appsV1
234
@property
def PodsV1(self):
    """
    Return the cached pod object, created on demand from ``self.kApi``.
    """
    # getattr() with a default: '__init__' sets '_podV1' but this property
    # reads '_podsV1', so a plain attribute read raised AttributeError
    # on the very first access
    if not getattr(self, "_podsV1", None):
        # NOTE(review): 'V1Pod' looks like a model class rather than an
        # API wrapper (pod calls live in CoreV1Api) - confirm the intent
        self._podsV1 = kclient.V1Pod(self.kApi)
    return self._podsV1
240
@staticmethod
def _typed_list_to_dict(i_list):
    """
    Convert a list of typed objects to a dict keyed by lowercased type.

    Each item must provide ``to_dict()`` whose result has a "type" key;
    that key is removed from the stored value.
    """
    _by_type = {}
    for _entry in (_obj.to_dict() for _obj in i_list):
        _key = _entry.pop("type")
        _by_type[_key.lower()] = _entry
    return _by_type
250
@staticmethod
def _get_listed_attrs(items, _path):
    """
    Collect the attribute at ``_path`` from every item.

    The lookup itself is done by ``utils.rgetattr(item, _path)``.
    """
    return [utils.rgetattr(_item, _path) for _item in items]
258
@staticmethod
def safe_get_item_by_name(api_resource, _name):
    """
    Return the first item whose ``metadata.name`` equals ``_name``.

    :param api_resource: object with an ``items`` iterable
    :return: matching item or None if there is no match
    """
    _matches = (
        _candidate for _candidate in api_resource.items
        if _candidate.metadata.name == _name
    )
    return next(_matches, None)
266
def wait_for(self, _func, phase, *args, **kwargs):
    """
    Watch events produced for ``_func`` until an object reaches ``phase``.

    Returns None in all cases: when the phase is reached, when a
    "DELETED" event arrives first, or when the event stream ends.

    :param _func: list function handed to ``watch.Watch().stream``
    :param phase: value compared with ``event["object"].status.phase``
    :param args: positional arguments for the stream call
    :param kwargs: keyword arguments for the stream call
    """
    _watcher = watch.Watch()
    _started = time()
    for _event in _watcher.stream(_func, *args, **kwargs):
        if _event["object"].status.phase == phase:
            _watcher.stop()
            _elapsed = time() - _started
            logger_cli.debug(
                "... bacame '{}' in {:0.2f} sec".format(phase, _elapsed)
            )
            return
        # event.type: ADDED, MODIFIED, DELETED
        if _event["type"] != "DELETED":
            continue
        # Object was deleted while we were waiting for the phase
        logger_cli.debug("... deleted before started")
        _watcher.stop()
        return
287
def get_node_info(self, http=False):
    """
    Query the API for the nodes and return them as presorted dicts.

    Each node dict is ``to_dict()`` of the node extended with:
      - 'addresses' and 'conditions': status lists keyed by lowercased
        type
      - 'labels': copy of the metadata labels where "true"/"false"
        values are converted with ``utils.to_bool``

    :param http: use ``list_node_with_http_info()`` instead of
        ``list_node()``
    :return: dict ``{node_name: node_dict}``
    :raises InvalidReturnException: API returned something other than
        ``V1NodeList``
    """
    if http:
        _response = self.CoreV1.list_node_with_http_info()
        # '*_with_http_info' calls return (data, status, headers);
        # checking the tuple itself always failed the type check below
        if isinstance(_response, tuple):
            _raw_nodes = _response[0]
        else:
            _raw_nodes = _response
    else:
        _raw_nodes = self.CoreV1.list_node()

    if not isinstance(_raw_nodes, kclient.models.v1_node_list.V1NodeList):
        raise InvalidReturnException(
            "Invalid return type: '{}'".format(type(_raw_nodes))
        )

    _nodes = {}
    for _n in _raw_nodes.items:
        _name = _n.metadata.name
        _d = _n.to_dict()
        # parse inner data classes as dicts
        _d['addresses'] = self._typed_list_to_dict(_n.status.addresses)
        _d['conditions'] = self._typed_list_to_dict(_n.status.conditions)
        # Update 'status' type
        _ready = _d['conditions']['ready']
        if isinstance(_ready['status'], str):
            _ready['status'] = utils.to_bool(_ready['status'])
        # Parse image names?
        # TODO: Here is the place where we can parse each node image names

        # Parse roles; a node without labels yields an empty dict
        _d['labels'] = {}
        for _label, _data in (_d["metadata"]["labels"] or {}).items():
            if _data.lower() in ["true", "false"]:
                _d['labels'][_label] = utils.to_bool(_data)
            else:
                _d['labels'][_label] = _data

        # Save
        _nodes[_name] = _d

    # debug report on how many nodes detected
    logger_cli.debug("...node items returned '{}'".format(len(_nodes)))

    return _nodes
330
def get_pod_names_by_partial_name(self, partial_name, ns):
    """
    Return names of pods in ``ns`` that contain ``partial_name``.

    Logs a debug line when more than one pod matches and a warning when
    nothing matches; the (possibly empty) list is returned in any case.
    """
    logger_cli.debug('... searching for pods with {}'.format(partial_name))
    _pod_list = self.CoreV1.list_namespaced_pod(ns)
    _all_names = self._get_listed_attrs(_pod_list.items, "metadata.name")
    _matched = [_n for _n in _all_names if partial_name in _n]
    _count = len(_matched)
    if _count < 1:
        logger_cli.warning(
            "WARNING: No pods found for '{}'".format(partial_name)
        )
    elif _count > 1:
        logger_cli.debug(
            "... more than one pod found for '{}': {}\n".format(
                partial_name,
                ", ".join(_matched)
            )
        )

    return _matched
349
def get_pods_by_partial_name(self, partial_name, ns):
    """
    Return pod objects in ``ns`` whose name contains ``partial_name``.

    Logs a debug line when more than one pod matches and a warning when
    nothing matches; the (possibly empty) list is returned in any case.

    :param partial_name: substring to look for in ``metadata.name``
    :param ns: namespace to list pods in
    :return: list of matching pod objects
    """
    logger_cli.debug('... searching for pods with {}'.format(partial_name))
    _all_pods = self.CoreV1.list_namespaced_pod(ns)
    _pods = [_pod for _pod in _all_pods.items
             if partial_name in _pod.metadata.name]
    if len(_pods) > 1:
        # List the names of the pods found; joining 'partial_name'
        # itself only printed the search string letter by letter
        logger_cli.debug(
            "... more than one pod found for '{}': {}\n".format(
                partial_name,
                ", ".join(_pod.metadata.name for _pod in _pods)
            )
        )
    elif len(_pods) < 1:
        logger_cli.warning(
            "WARNING: No pods found for '{}'".format(partial_name)
        )

    return _pods
369
def exec_on_target_pod(
    self,
    cmd,
    pod_name,
    namespace,
    strict=False,
    _request_timeout=120,
    arguments=None,
    **kwargs
):
    """
    Execute a command inside a pod and return what it printed.

    :param cmd: command as a list of tokens or as a string; a string is
        split on whitespace. A list given by the caller is not modified
    :param pod_name: exact pod name when ``strict`` is set, otherwise a
        name prefix; the first pod whose name starts with it is used
    :param namespace: namespace of the pod
    :param strict: skip the prefix lookup and use ``pod_name`` as is
    :param _request_timeout: seconds; passed to the exec call and used as
        the time limit when running the stream
    :param arguments: optional single extra token appended to the command
    :param kwargs: passed through to ``stream()``
    :return: stdout of the command, or stderr when stdout is empty
    :raises KubeException: no pod name starts with ``pod_name``
    """
    if strict:
        _pname = pod_name
    else:
        logger_cli.debug(
            "... searching for pods with the name '{}'".format(pod_name)
        )
        _pods = self.CoreV1.list_namespaced_pod(namespace)
        _names = self._get_listed_attrs(_pods.items, "metadata.name")
        _pnames = [n for n in _names if n.startswith(pod_name)]
        if len(_pnames) < 1:
            raise KubeException("No pods found for '{}'".format(pod_name))
        if len(_pnames) > 1:
            logger_cli.debug(
                "... more than one pod found for '{}': {}\n"
                "... using first one".format(
                    pod_name,
                    ", ".join(_pnames)
                )
            )
        # in case of >1 and =1 we are taking 1st anyway
        _pname = _pnames[0]
    logger_cli.debug(
        "... cmd: [CoreV1] exec {} -n {} -- {} '{}'".format(
            _pname,
            namespace,
            cmd,
            arguments
        )
    )
    # Work on a copy: extending 'cmd' in place would grow the caller's
    # list on every call
    _cmd = list(cmd) if isinstance(cmd, list) else cmd.split()
    if arguments:
        _cmd.append(arguments)
    # Set preload_content to False to preserve JSON
    # If not, output gets converted to str
    # Which causes to change " to '
    # After that json.loads(...) fail
    _pod_stream = stream(
        self.CoreV1.connect_get_namespaced_pod_exec,
        _pname,
        namespace,
        command=_cmd,
        stderr=True,
        stdin=False,
        stdout=True,
        tty=False,
        _request_timeout=_request_timeout,
        _preload_content=False,
        **kwargs
    )
    try:
        # run for timeout
        _pod_stream.run_forever(timeout=_request_timeout)
        # read the output
        _output = _pod_stream.read_stdout()
        _error = _pod_stream.read_stderr()
    finally:
        # release the connection, same as the file upload method does
        _pod_stream.close()
    if _error:
        # copy error to output
        logger_cli.warning(
            "WARNING: cmd of '{}' returned error:\n{}\n".format(
                " ".join(_cmd),
                _error
            )
        )
        if not _output:
            _output = _error
    # Force recreate of api objects
    self._coreV1 = None
    # Send output
    return _output
Alex9a4ad212020-10-01 18:04:25 -0500450
def ensure_namespace(self, ns):
    """
    Ensure that given namespace exists, create it if it does not.

    :return: True if the namespace was found or created, False when the
        create call returned a falsy value
    """
    # look the name up among the listed namespaces
    _found = self.safe_get_item_by_name(self.CoreV1.list_namespace(), ns)
    if _found is not None:
        logger_cli.debug("... found existing namespace '{}'".format(ns))
        return True

    logger_cli.debug("... creating namespace '{}'".format(ns))
    _body = kclient.V1Namespace()
    _body.metadata = kclient.V1ObjectMeta(name=ns)
    _created = self.CoreV1.create_namespace(_body)
    # TODO: check return on fail
    return bool(_created)
471
def get_daemon_set_by_name(self, ns, name):
    """Return the daemonset called ``name`` from ``ns`` or None."""
    _listed = self.AppsV1.list_namespaced_daemon_set(ns)
    return self.safe_get_item_by_name(_listed, name)
477
def create_config_map(self, ns, name, source, recreate=True):
    """
    Creates/Overwrites ConfigMap in working namespace

    :param ns: namespace of the config map
    :param name: name of the config map
    :param source: path to a single file, or to a folder that is walked
        for '.py' files not starting with a dot. Data keys are bare file
        names, so equally named files from different folders overwrite
        each other
    :param recreate: not used by this method
    :return: keys (file names) of the data placed into the config map
    """
    def _read_text(_fpath):
        with open(_fpath, 'rt') as _fd:
            return _fd.read()

    # Prepare source
    logger_cli.debug(
        "... preparing config map '{}/{}' with files from '{}'".format(
            ns,
            name,
            source
        )
    )
    _data = {}
    if os.path.isfile(source):
        # populate data with one file
        _data[os.path.basename(source)] = _read_text(source)
    elif os.path.isdir(source):
        # walk dirs and populate all 'py' files
        for _root, _dirs, _files in os.walk(source):
            for _fname in _files:
                if _fname.startswith('.') or not _fname.endswith('.py'):
                    continue
                _data[_fname] = _read_text(os.path.join(_root, _fname))

    _cm = kclient.V1ConfigMap()
    _cm.metadata = kclient.V1ObjectMeta(name=name, namespace=ns)
    _cm.data = _data
    logger_cli.debug(
        "... prepared config map with {} scripts".format(len(_data))
    )
    # Query existing configmap and replace it, create otherwise
    _current = self.safe_get_item_by_name(
        self.CoreV1.list_namespaced_config_map(namespace=ns),
        name
    )
    if _current is None:
        self.CoreV1.create_namespaced_config_map(namespace=ns, body=_cm)
        logger_cli.debug(
            "... created config map '{}/{}'".format(ns, name)
        )
    else:
        self.CoreV1.replace_namespaced_config_map(
            namespace=ns,
            name=name,
            body=_cm
        )
        logger_cli.debug(
            "... replaced existing config map '{}/{}'".format(ns, name)
        )

    return _data.keys()
540
def prepare_daemonset_from_yaml(self, ns, ds_yaml):
    """
    Create the daemonset described by ``ds_yaml`` or replace existing one.

    :param ns: namespace to look the daemonset up in and to create it in
    :param ds_yaml: daemonset definition as a dict; 'metadata.name' is
        read from it
    :return: result of the replace or create API call
    """
    _name = ds_yaml['metadata']['name']
    _found = self.get_daemon_set_by_name(ns, _name)

    if _found is None:
        logger_cli.debug("... creating daemonset '{}'".format(_name))
        return self.AppsV1.create_namespaced_daemon_set(ns, body=ds_yaml)

    logger_cli.debug("... found existing daemonset '{}'".format(_name))
    _replaced = self.AppsV1.replace_namespaced_daemon_set(
        _found.metadata.name,
        _found.metadata.namespace,
        body=ds_yaml
    )
    logger_cli.debug("... replacing existing daemonset '{}'".format(_name))
    return _replaced
564
def delete_daemon_set_by_name(self, ns, name):
    """Delete daemonset ``name`` in ``ns``, return the API call result."""
    _apps = self.AppsV1
    _result = _apps.delete_namespaced_daemon_set(name, ns)
    return _result
567
def exec_on_all_pods(self, pods):
    """
    Create multiple threads to execute script on all target pods

    Not implemented yet: the pod map is built, an error is logged and an
    empty list is returned.

    :param pods: object with an ``items`` iterable of pods
    :return: empty list
    """
    # Create map for threads: [[node_name, ns, pod_name]...]
    _pod_list = []
    for item in pods.items:
        # Typed client models expose the node as 'node_name'; reading
        # 'nodeName' from them raised AttributeError. The camelCase name
        # is still tried for objects that provide it
        _node = getattr(item.spec, "node_name", None)
        if _node is None:
            _node = getattr(item.spec, "nodeName", None)
        _pod_list.append(
            [
                _node,
                item.metadata.namespace,
                item.metadata.name
            ]
        )

    # map func and cmd
    logger_cli.error("ERROR: 'exec_on_all_pods'is not implemented yet")
    # create result list

    return []
Alex7b0ee9a2021-09-21 17:16:17 -0500588
@retry(ApiException)
def get_pods_for_daemonset(self, ds):
    """
    List pods labeled with the name of the given daemonset.

    Pods are selected in the daemonset namespace using the label
    selector 'name=<daemonset name>'.

    :param ds: daemonset object, ``metadata.name`` and
        ``metadata.namespace`` are read
    :return: result of ``list_namespaced_pod``
    """
    logger_cli.debug(
        "... extracting pod names from daemonset '{}'".format(
            ds.metadata.name
        )
    )
    _meta = ds.metadata
    _selector = 'name={}'.format(_meta.name)
    return self.CoreV1.list_namespaced_pod(
        namespace=_meta.namespace,
        label_selector=_selector
    )
604
def put_string_buffer_to_pod_as_textfile(
    self,
    pod_name,
    namespace,
    buffer,
    filepath,
    _request_timeout=120,
    **kwargs
):
    """
    Write a text buffer to a file inside a pod.

    A '/bin/sh' session is opened in the pod and the content is fed to
    "cat <<'EOF' > filepath" through stdin.
    NOTE(review): content holding a line that is exactly 'EOF' would end
    the here-document early.

    :param pod_name: name of the pod
    :param namespace: namespace of the pod
    :param buffer: file content, utf-8 encoded bytes or str
    :param filepath: target path inside the pod
    :param _request_timeout: seconds, passed to the exec call
    :param kwargs: passed through to ``stream()``
    :return: None
    """
    # The write loop decodes every chunk with str(chunk, encoding=...),
    # which raises TypeError for 'str'; normalize the buffer to bytes
    if isinstance(buffer, str):
        buffer = buffer.encode('utf-8')

    _command = ['/bin/sh']
    response = stream(
        self.CoreV1.connect_get_namespaced_pod_exec,
        pod_name,
        namespace,
        command=_command,
        stderr=True,
        stdin=True,
        stdout=True,
        tty=False,
        _request_timeout=_request_timeout,
        _preload_content=False,
        **kwargs
    )

    # if json
    # buffer = json.dumps(_dict, indent=2).encode('utf-8')

    commands = [
        bytes("cat <<'EOF' >" + filepath + "\n", 'utf-8'),
        buffer,
        bytes("\n" + "EOF\n", 'utf-8')
    ]

    try:
        # one chunk is sent per update cycle, leave when all are sent
        while response.is_open():
            response.update(timeout=1)
            if response.peek_stdout():
                logger_cli.debug("... STDOUT: %s" % response.read_stdout())
            if response.peek_stderr():
                logger_cli.debug("... STDERR: %s" % response.read_stderr())
            if commands:
                c = commands.pop(0)
                logger_cli.debug("... running command... {}\n".format(c))
                response.write_stdin(str(c, encoding='utf-8'))
            else:
                break
    finally:
        # close the session even if reading or writing failed
        response.close()

    # Force recreate of Api objects
    self._coreV1 = None

    return
Alexdcb792f2021-10-04 14:24:21 -0500656
def get_custom_resource(self, group, version, plural):
    """
    List cluster scoped custom objects of the given kind.

    Example of the underlying call:
        kubernetes.client.CustomObjectsApi().list_cluster_custom_object(
            group="networking.istio.io",
            version="v1alpha3",
            plural="serviceentries"
        )

    :return: result of ``list_cluster_custom_object``
    """
    _query = dict(group=group, version=version, plural=plural)
    return self.CustomObjects.list_cluster_custom_object(**_query)
Alex5cace3b2021-11-10 16:40:37 -0600670
def init_pvc_resource(
    self,
    name,
    storage_class,
    size,
    ns="qa-space",
    mode="ReadWriteOnce"
):
    """
    Return the Kubernetes PVC resource

    The object is only built here, nothing is sent to the API.

    :param name: PVC name, also used as the value of the 'name' label
    :param storage_class: storage class name
    :param size: value for the 'storage' request
    :param ns: namespace placed into the metadata
    :param mode: the single access mode of the claim
    """
    _meta = kclient.V1ObjectMeta(
        name=name,
        namespace=ns,
        labels={"name": name}
    )
    _requirements = kclient.V1ResourceRequirements(
        requests={'storage': size}
    )
    _spec = kclient.V1PersistentVolumeClaimSpec(
        storage_class_name=storage_class,
        access_modes=[mode],
        resources=_requirements
    )
    return kclient.V1PersistentVolumeClaim(
        api_version='v1',
        kind='PersistentVolumeClaim',
        metadata=_meta,
        spec=_spec
    )
696
def init_pv_resource(
    self,
    name,
    storage_class,
    size,
    path,
    ns="qa-space",
    mode="ReadWriteOnce"
):
    """
    Return the Kubernetes PV resource backed by a host path

    The object is only built here, nothing is sent to the API.

    :param name: PV name, also used as the value of the 'name' label
    :param storage_class: storage class name
    :param size: value for the 'storage' capacity
    :param path: path for the host path volume source
    :param ns: namespace placed into the metadata
    :param mode: the single access mode of the volume
    """
    _meta = kclient.V1ObjectMeta(
        name=name,
        namespace=ns,
        labels={"name": name}
    )
    _source = kclient.V1HostPathVolumeSource(path=path)
    _spec = kclient.V1PersistentVolumeSpec(
        storage_class_name=storage_class,
        access_modes=[mode],
        capacity={'storage': size},
        host_path=_source
    )
    return kclient.V1PersistentVolume(
        api_version='v1',
        kind='PersistentVolume',
        metadata=_meta,
        spec=_spec
    )
722
def init_service(
    self,
    name,
    port,
    clusterip=None,
    ns="qa-space"
):
    """
    Inits a V1Service object with data for benchmark agent

    The service selects pods by label 'name=<name>' and maps TCP ``port``
    to the same target port. Nothing is sent to the API here.

    :param name: service name, label value and selector value
    :param port: port and target port number
    :param clusterip: not used at the moment (cluster_ip is not set)
    :param ns: namespace placed into the metadata
    """
    return kclient.V1Service(
        api_version="v1",
        kind="Service",
        metadata=kclient.V1ObjectMeta(
            name=name,
            namespace=ns,
            labels={"name": name}
        ),
        spec=kclient.V1ServiceSpec(
            # cluster_ip=clusterip,
            selector={"name": name},
            # type="ClusterIP",
            ports=[
                kclient.V1ServicePort(
                    port=port,
                    protocol="TCP",
                    target_port=port
                )
            ]
        )
    )
753
def prepare_pv(self, pv_object):
    """
    Create or replace the given PV and wait for it to become usable.

    The PV list (label selector 'name=<pv name>') is polled once a
    second, up to 60 times, until an item is "Available" or "Bound".

    :param pv_object: PV resource, ``metadata.name`` is read
    :return: first listed item in phase "Available" or "Bound"
    :raises CheckerException: no such item showed up in time
    """
    _ready_phases = ("Available", "Bound")

    def _list_pv():
        _selector = 'name={}'.format(pv_object.metadata.name)
        return self.CoreV1.list_persistent_volume(label_selector=_selector)

    _found = self.safe_get_item_by_name(_list_pv(), pv_object.metadata.name)
    if _found is None:
        self.CoreV1.create_persistent_volume(pv_object)
    else:
        self.CoreV1.replace_persistent_volume(
            pv_object.metadata.name,
            pv_object
        )

    for _attempt in range(60):
        for _pv in _list_pv().items:
            if _pv.status.phase in _ready_phases:
                return _pv
        sleep(1)

    raise CheckerException(
        "Timed out creating PV '{}'".format(pv_object.metadata.name)
    )
784
def prepare_pvc(self, pvc_object):
    """
    Make sure the given PVC exists and wait for it to become usable.

    An existing PVC is reused only if its 'storage' request equals the
    requested one. The PVC list (label selector 'name=<pvc name>') is
    then polled every 5 seconds within a 60 second budget.

    :param pvc_object: PVC resource; metadata name/namespace and the
        'storage' request are read
    :return: first listed item in phase "Available" or "Bound"
    :raises CheckerException: existing PVC has a different size, or no
        usable item showed up in time
    """
    _meta = pvc_object.metadata

    def _list_pvc():
        return self.CoreV1.list_namespaced_persistent_volume_claim(
            _meta.namespace,
            label_selector='name={}'.format(_meta.name)
        )

    _found = self.safe_get_item_by_name(_list_pvc(), _meta.name)
    if _found is None:
        logger_cli.debug("... creating pvc '{}'".format(_meta.name))
        self.CoreV1.create_namespaced_persistent_volume_claim(
            _meta.namespace,
            pvc_object
        )
    else:
        _requested = pvc_object.spec.resources.requests["storage"]
        _present = _found.spec.resources.requests["storage"]
        logger_cli.warn(
            "WARNING: Found PVC '{}/{}' with {}. Requested: {}'".format(
                _meta.namespace,
                _meta.name,
                _present,
                _requested
            )
        )
        if _requested != _present:
            raise CheckerException(
                "ERROR: PVC exists on the cloud with different size "
                "than needed. Please cleanup!"
            )

    _left = 60
    while _left > 0:
        for _pvc in _list_pvc().items:
            if _pvc.status.phase in ("Available", "Bound"):
                return _pvc
            logger_cli.debug(
                "... pvcstatus: '{}', {} sec left".format(
                    _pvc.status.phase,
                    _left
                )
            )
        sleep(5)
        _left -= 5

    raise CheckerException(
        "Timed out creating PVC '{}'".format(_meta.name)
    )
840
def prepare_pod_from_yaml(self, pod_yaml):
    """
    Create the pod described by ``pod_yaml`` or reuse the existing one.

    An already existing pod is returned right away whatever its phase
    is. A new pod is polled every 2 seconds within a 120 second budget
    (label selector 'name=<pod name>') until it is "Running".

    :param pod_yaml: pod definition as a dict; 'metadata.name' and
        'metadata.namespace' are read
    :return: existing pod, or the listed pod once it is "Running"
    :raises CheckerException: created pod is not "Running" in time
    """
    _ns = pod_yaml['metadata']['namespace']
    _name = pod_yaml['metadata']['name']

    def _list_pod():
        return self.CoreV1.list_namespaced_pod(
            _ns,
            label_selector='name={}'.format(_name)
        )

    _found = self.safe_get_item_by_name(_list_pod(), _name)
    if _found is not None:
        logger_cli.warn(
            "WARNING: Found pod '{}/{}'. Reusing.".format(_ns, _name)
        )
        return _found

    self.CoreV1.create_namespaced_pod(_ns, pod_yaml)
    _left = 120
    logger_cli.debug("... waiting '{}'s for pod to start".format(_left))
    while _left > 0:
        for _pod in _list_pod().items:
            logger_cli.debug("... pod is '{}'".format(_pod.status.phase))
            if _pod.status.phase == "Running":
                return _pod
        sleep(2)
        _left -= 2

    raise CheckerException(
        "Timed out creating pod '{}'".format(_name)
    )
879
def expose_pod_port(self, pod_object, port, ns="qa-space"):
    """
    Expose a pod port with a service named after the pod.

    A service with the pod's name found in the pod's namespace (label
    selector 'name=<pod name>') is reused as is, its port is not checked.

    :param pod_object: pod to expose; metadata name/namespace and
        ``status.pod_ip`` are read
    :param port: port number used as port and target port
    :param ns: not used, the namespace of the pod is used instead; kept
        for interface compatibility
    :return: existing service, or the result of the create call
    """
    _pod_ns = pod_object.metadata.namespace
    _pod_name = pod_object.metadata.name

    def _list_svc():
        return self.CoreV1.list_namespaced_service(
            _pod_ns,
            label_selector='name={}'.format(_pod_name)
        )
    _existing = self.safe_get_item_by_name(_list_svc(), _pod_name)
    if _existing is not None:
        # TODO: Check port number?
        logger_cli.warn(
            "WARNING: Pod already exposed '{}/{}:{}'. Reusing.".format(
                _pod_ns,
                _pod_name,
                port
            )
        )
        return _existing

    logger_cli.debug(
        "... creating service for pod {}/{}: {}:{}".format(
            _pod_ns,
            _pod_name,
            pod_object.status.pod_ip,
            port
        )
    )
    # The namespace in the service body has to be the one used for the
    # create call; relying on the 'init_service' default ("qa-space")
    # produced a mismatching body for pods from any other namespace
    _svc = self.init_service(_pod_name, port, ns=_pod_ns)
    return self.CoreV1.create_namespaced_service(_pod_ns, _svc)