blob: 86e59a57890ef41759fba4b34bddba1d575fff77 [file] [log] [blame]
Alex9a4ad212020-10-01 18:04:25 -05001"""
2Module to handle interaction with Kube
3"""
4import base64
5import os
6import urllib3
7import yaml
8
9from kubernetes import client as kclient, config as kconfig
10from kubernetes.stream import stream
11
12from cfg_checker.common import logger, logger_cli
13from cfg_checker.common.exception import InvalidReturnException, KubeException
14from cfg_checker.common.file_utils import create_temp_file_with_content
15from cfg_checker.common.other import utils, shell
16from cfg_checker.common.ssh_utils import ssh_shell_p
Alex359e5752021-08-16 17:28:30 -050017from cfg_checker.common.const import ENV_LOCAL
Alex9a4ad212020-10-01 18:04:25 -050018
# Silence urllib3's InsecureRequestWarning for the whole process: the Kube
# clients built in this module may talk to the API without TLS verification
urllib3.disable_warnings(
    category=urllib3.exceptions.InsecureRequestWarning
)
20
21
def _init_kube_conf_local(config):
    """
    Init the kube library from a local kubeconfig file.

    Loads ``config.kube_config_path`` using the kubernetes ``config``
    module, then queries ``CoreApi`` for its API versions and logs them
    at debug level.

    Returns a ``(kconfig, api_client, path)`` tuple on success or
    ``(None, None, path)`` if any step raised; ``path`` is always the
    ``local:<kube_config_path>`` label.
    """
    _origin = "local:{}".format(config.kube_config_path)
    try:
        kconfig.load_kube_config(config_file=config.kube_config_path)
        if config.insecure:
            # NOTE(review): these attributes are set on the imported
            # 'config' module object itself - confirm that the client
            # really picks them up from there
            kconfig.assert_hostname = False
            kconfig.client_side_validation = False
        _core_versions = kclient.CoreApi().get_api_versions().versions
        logger_cli.debug(
            "... found Kube env: core, {}".format(",".join(_core_versions))
        )
        return kconfig, kclient.ApiClient(), _origin
    except Exception as _err:
        # Best effort: report the failure and let the caller decide
        logger.warn(
            "Failed to init local Kube client: {}".format(str(_err))
        )
        return None, None, _origin
Alex9a4ad212020-10-01 18:04:25 -050044
45
def _init_kube_conf_remote(config):
    """
    Init a Kube client for a remote cluster from its kubeconfig file.

    The kubeconfig is fetched with 'cat' over SSH from ``config.ssh_host``
    unless ``config.kube_config_detected`` is set or the environment is
    ``ENV_LOCAL``; in those cases ``config.kube_config_path`` is read from
    the local filesystem. CA certificate, client certificate and client
    key are taken from the first cluster/user entries, base64-decoded and
    stored in temporary files. The API host from the kubeconfig is replaced
    with ``config.mcp_host`` while scheme and port are kept; the port is
    also saved to ``config.kube_port``.

    Returns a ``(configuration, api_client, path)`` tuple, or
    ``(None, None, path)`` when the kubeconfig is empty or the server
    address has no ``<scheme>:<host>:<port>`` form. ``path`` is a label
    describing where the kubeconfig was read from.

    Shell recipe for token based access, kept for reference:

        APISERVER=$(kubectl config view --minify |
            grep server | cut -f 2- -d ":" | tr -d " ")
        SECRET_NAME=$(kubectl get secrets |
            grep ^default | cut -f1 -d ' ')
        TOKEN=$(kubectl describe secret $SECRET_NAME |
            grep -E '^token' | cut -f2 -d':' | tr -d " ")

        echo "Detected API Server at: '${APISERVER}'"
        echo "Got secret: '${SECRET_NAME}'"
        echo "Loaded token: '${TOKEN}'"

        curl $APISERVER/api
            --header "Authorization: Bearer $TOKEN" --insecure
    """
    # Try to load remote config only if it was not detected already
    if not config.kube_config_detected and not config.env_name == ENV_LOCAL:
        _path = "{}@{}:{}".format(
            config.ssh_user,
            config.ssh_host,
            config.kube_config_path
        )
        _c_data = ssh_shell_p(
            "cat " + config.kube_config_path,
            config.ssh_host,
            username=config.ssh_user,
            keypath=config.ssh_key,
            piped=False,
            use_sudo=config.ssh_uses_sudo,
        )
    else:
        _path = "local:{}".format(config.kube_config_path)
        with open(config.kube_config_path, 'r') as ff:
            _c_data = ff.read()

    # Covers both empty output and a 'None' result
    if not _c_data:
        return None, None, _path

    _conf = yaml.load(_c_data, Loader=yaml.SafeLoader)
    _cluster = _conf['clusters'][0]['cluster']
    _user = _conf['users'][0]['user']

    # Validate the server address before any temp file is created
    _host = _cluster['server']
    if "http" not in _host or "443" not in _host:
        logger_cli.error(
            "Failed to extract Kube host: '{}'".format(_host)
        )
    else:
        logger_cli.debug(
            "... 'context' host extracted: '{}' via SSH@{}".format(
                _host,
                config.ssh_host
            )
        )
    # Expected form is '<scheme>://<host>:<port>', i.e. three ':' parts
    _tmp = _host.split(':')
    if len(_tmp) < 3:
        logger_cli.error(
            "Failed to extract Kube port from host: '{}'".format(_host)
        )
        return None, None, _path

    _kube_conf = kclient.Configuration()
    # A remote host configuration

    # To work with remote cluster, we need to extract these
    # keys = ['host', 'ssl_ca_cert', 'cert_file', 'key_file', 'verify_ssl']
    # When v12 of the client will be released, we will use load_from_dict
    _kube_conf.ssl_ca_cert = create_temp_file_with_content(
        base64.standard_b64decode(_cluster['certificate-authority-data'])
    )
    _kube_conf.cert_file = create_temp_file_with_content(
        base64.standard_b64decode(_user['client-certificate-data'])
    )
    _kube_conf.key_file = create_temp_file_with_content(
        base64.standard_b64decode(_user['client-key-data'])
    )

    # Substitute context host to ours
    _kube_conf.host = \
        _tmp[0] + "://" + config.mcp_host + ":" + _tmp[2]
    config.kube_port = _tmp[2]
    logger_cli.debug(
        "... kube remote host updated to {}".format(
            _kube_conf.host
        )
    )
    _kube_conf.verify_ssl = False
    _kube_conf.debug = config.debug
    if config.insecure:
        _kube_conf.assert_hostname = False
        _kube_conf.client_side_validation = False

    # Nevertheless if you want to do it
    # you can with these 2 parameters
    # configuration.verify_ssl=True
    # ssl_ca_cert is the filepath
    # to the file that contains the certificate.
    # configuration.ssl_ca_cert="certificate"

    # Create a ApiClient with our config
    _kube_api = kclient.ApiClient(_kube_conf)

    return _kube_conf, _kube_api, _path
Alex9a4ad212020-10-01 18:04:25 -0500157
158
class KubeApi(object):
    """
    Base Kube API holder: builds the client configuration and API client
    for either the local or the remote environment on creation.
    """

    def __init__(self, config):
        """Store the config and try to init the Kube client right away."""
        self.config = config
        # True only when both configuration and API client were created
        self.initialized = self._init_kclient()
        self.last_response = None

    def _init_kclient(self):
        """
        Create ``kConf``, ``kApi`` and ``kConfigPath`` for this instance.

        For the "local" environment the local init is used and, when the
        kubeconfig file exists, its raw content is kept in ``yaml_conf``
        and the decoded client key of its first user is written to a temp
        file whose path is kept in ``user_keypath``. Any other environment
        goes through the remote init.

        Returns True if both the configuration and the API client are set.
        """
        # if there is no password - try to get local, if this available
        logger_cli.debug("... init kube config")
        if self.config.env_name != "local":
            _remote = _init_kube_conf_remote(self.config)
            self.kConf, self.kApi, self.kConfigPath = _remote
            self.is_local = False
        else:
            _local = _init_kube_conf_local(self.config)
            self.kConf, self.kApi, self.kConfigPath = _local
            self.is_local = True
            # Try to load local config data
            _cfg_path = self.config.kube_config_path
            if _cfg_path and os.path.exists(_cfg_path):
                _raw = shell("cat " + _cfg_path)
                _parsed = yaml.load(_raw, Loader=yaml.SafeLoader)
                _key_data = _parsed['users'][0]['user']['client-key-data']
                self.user_keypath = create_temp_file_with_content(
                    base64.standard_b64decode(_key_data)
                )
                self.yaml_conf = _raw

        return not (self.kConf is None or self.kApi is None)

    def get_versions_api(self):
        """Return a ``VersionApi`` object bound to this instance's client."""
        # client.CoreApi().get_api_versions().versions
        return kclient.VersionApi(self.kApi)
199
200
class KubeRemote(KubeApi):
    """
    Kube API wrapper with lazily created typed API clients and helpers
    for nodes, pods, namespaces, config maps and daemon sets.
    """

    def __init__(self, config):
        """Init the base Kube client and reset the cached API objects."""
        super(KubeRemote, self).__init__(config)
        self._coreV1 = None
        self._appsV1 = None
        # '_podV1' was the only pod attribute set here while the 'PodsV1'
        # property reads '_podsV1'; both are kept so nothing breaks
        self._podV1 = None
        self._podsV1 = None

    @property
    def CoreV1(self):
        """``CoreV1Api`` for this client, created on first access."""
        if not self._coreV1:
            self._coreV1 = kclient.CoreV1Api(self.kApi)
        return self._coreV1

    @property
    def AppsV1(self):
        """``AppsV1Api`` for this client, created on first access."""
        if not self._appsV1:
            self._appsV1 = kclient.AppsV1Api(self.kApi)
        return self._appsV1

    @property
    def PodsV1(self):
        """``V1Pod`` object, created on first access."""
        if not self._podsV1:
            # NOTE(review): V1Pod looks like a model class, not an API
            # class; passing the API client to it is kept as it was -
            # confirm what was intended here
            self._podsV1 = kclient.V1Pod(self.kApi)
        return self._podsV1

    @staticmethod
    def _typed_list_to_dict(i_list):
        """
        Convert a list of objects having a 'type' field into a dict keyed
        by the lowercased type; each value is the object's ``to_dict()``
        result without the 'type' key. ``None`` gives an empty dict.
        """
        _dict = {}
        for _item in i_list or []:
            _d = _item.to_dict()
            _type = _d.pop("type")
            _dict[_type.lower()] = _d

        return _dict

    @staticmethod
    def _get_listed_attrs(items, _path):
        """Return the attribute at dotted ``_path`` for each of ``items``."""
        return [utils.rgetattr(_n, _path) for _n in items]

    @staticmethod
    def safe_get_item_by_name(api_resource, _name):
        """
        Return the first item of ``api_resource.items`` whose
        ``metadata.name`` equals ``_name`` or None if there is no such.
        """
        for item in api_resource.items:
            if item.metadata.name == _name:
                return item

        return None

    def get_node_info(self, http=False):
        """
        Query API for the nodes and do some presorting.

        :param http: use the '*_with_http_info' variant of the list call
        :return: dict of node name -> node dict with additional
                 'addresses', 'conditions' (both keyed by lowercased type)
                 and 'labels' keys; "true"/"false" strings in the 'ready'
                 condition status and in label values become booleans
        :raises InvalidReturnException: if API returned not a V1NodeList
        """
        if http:
            _raw_nodes = self.CoreV1.list_node_with_http_info()
        else:
            _raw_nodes = self.CoreV1.list_node()

        # '*_with_http_info' calls return (data, status, headers);
        # only the data part is of interest here
        if isinstance(_raw_nodes, tuple):
            _raw_nodes = _raw_nodes[0]

        if not isinstance(_raw_nodes, kclient.models.v1_node_list.V1NodeList):
            raise InvalidReturnException(
                "Invalid return type: '{}'".format(type(_raw_nodes))
            )

        _nodes = {}
        for _n in _raw_nodes.items:
            _name = _n.metadata.name
            _d = _n.to_dict()
            # parse inner data classes as dicts
            _d['addresses'] = self._typed_list_to_dict(_n.status.addresses)
            _d['conditions'] = self._typed_list_to_dict(_n.status.conditions)
            # Update 'status' type, node may have no 'Ready' condition yet
            _ready = _d['conditions'].get('ready')
            if _ready is not None and isinstance(_ready['status'], str):
                _ready['status'] = utils.to_bool(_ready['status'])
            # Parse image names?
            # TODO: Here is the place where we can parse each node image names

            # Parse roles, labels can be None when node has none of them
            _d['labels'] = {}
            for _label, _data in (_d["metadata"]["labels"] or {}).items():
                if _data.lower() in ["true", "false"]:
                    _d['labels'][_label] = utils.to_bool(_data)
                else:
                    _d['labels'][_label] = _data

            # Save
            _nodes[_name] = _d

        # debug report on how many nodes detected
        logger_cli.debug("...node items returned '{}'".format(len(_nodes)))

        return _nodes

    def exec_on_target_pod(
        self,
        cmd,
        pod_name,
        namespace,
        strict=False,
        _request_timeout=120,
        **kwargs
    ):
        """
        Execute a command inside a pod and return its stdout.

        :param cmd: command line, split on whitespace before execution
        :param pod_name: exact pod name if 'strict' is set, otherwise
                         a name prefix; first matching pod is used
        :param namespace: namespace of the pod
        :param strict: do not search for the pod, use 'pod_name' as is
        :param _request_timeout: timeout for the request and for the
                                 wait on the stream
        :param kwargs: passed to the stream call as is
        :return: stdout data read from the stream
        :raises KubeException: if no pod name starts with 'pod_name'
        """
        if not strict:
            logger_cli.debug(
                "... searching for pods with the name '{}'".format(pod_name)
            )
            # Use the property: '_coreV1' is None until first access
            _pods = self.CoreV1.list_namespaced_pod(namespace)
            _names = self._get_listed_attrs(_pods.items, "metadata.name")
            _pnames = [n for n in _names if n.startswith(pod_name)]
            if not _pnames:
                raise KubeException("No pods found for '{}'".format(pod_name))
            if len(_pnames) > 1:
                logger_cli.debug(
                    "... more than one pod found for '{}': {}\n"
                    "... using first one".format(
                        pod_name,
                        ", ".join(_pnames)
                    )
                )
            # in case of >1 and =1 we are taking the first one anyway
            _pname = _pnames[0]
        else:
            _pname = pod_name
        logger_cli.debug(
            "... cmd: [CoreV1] exec {} -n {} -- {}".format(
                _pname,
                namespace,
                cmd
            )
        )
        # Set preload_content to False to preserve JSON
        # If not, output gets converted to str
        # Which causes to change " to '
        # After that json.loads(...) fail
        _pod_stream = stream(
            self.CoreV1.connect_get_namespaced_pod_exec,
            _pname,
            namespace,
            command=cmd.split(),
            stderr=True,
            stdin=False,
            stdout=True,
            tty=False,
            _request_timeout=_request_timeout,
            _preload_content=False,
            **kwargs
        )
        # run for timeout
        _pod_stream.run_forever(timeout=_request_timeout)
        # read the output
        return _pod_stream.read_stdout()

    def ensure_namespace(self, ns):
        """
        Ensure that given namespace exists

        :param ns: name of the namespace
        :return: False if create call returned nothing, True otherwise
        """
        # list active namespaces
        _v1NamespaceList = self.CoreV1.list_namespace()
        _ns = self.safe_get_item_by_name(_v1NamespaceList, ns)

        if _ns is None:
            logger_cli.debug("... creating namespace '{}'".format(ns))
            # API expects a V1Namespace object as a body, not a bare name
            _new_ns = kclient.V1Namespace(
                metadata=kclient.V1ObjectMeta(name=ns)
            )
            _r = self.CoreV1.create_namespace(_new_ns)
            # TODO: check return on fail
            if not _r:
                return False
        else:
            logger_cli.debug("... found existing namespace '{}'".format(ns))

        return True

    def get_daemon_set_by_name(self, ns, name):
        """Return daemon set 'name' from namespace 'ns' or None."""
        return self.safe_get_item_by_name(
            self.AppsV1.list_namespaced_daemon_set(ns),
            name
        )

    def create_config_map(self, ns, name, source, recreate=True):
        """
        Creates/Overwrites ConfigMap in working namespace

        :param ns: namespace for the config map
        :param name: name of the config map
        :param source: path to a file or to a folder; folder is walked
                       recursively for non-hidden '*.py' files. Data is
                       keyed by bare file name, so same named files from
                       different subfolders overwrite each other
        :param recreate: not used by the current implementation
        :return: keys (file names) of the config map data
        """
        # Prepare source
        logger_cli.debug(
            "... preparing config map '{}/{}' with files from '{}'".format(
                ns,
                name,
                source
            )
        )
        _data = {}
        if os.path.isfile(source):
            # populate data with one file
            with open(source, 'rt') as fS:
                _data[os.path.split(source)[1]] = fS.read()
        elif os.path.isdir(source):
            # walk dirs and populate all 'py' files
            for path, _dirs, files in os.walk(source):
                for _file in files:
                    if not _file.endswith('.py') or _file.startswith('.'):
                        continue
                    with open(os.path.join(path, _file), 'rt') as fS:
                        _data[_file] = fS.read()

        _cm = kclient.V1ConfigMap()
        _cm.metadata = kclient.V1ObjectMeta(name=name, namespace=ns)
        _cm.data = _data
        logger_cli.debug(
            "... prepared config map with {} scripts".format(len(_data))
        )
        # Query existing configmap, replace it if found
        _existing_cm = self.safe_get_item_by_name(
            self.CoreV1.list_namespaced_config_map(namespace=ns),
            name
        )
        if _existing_cm is not None:
            self.CoreV1.replace_namespaced_config_map(
                namespace=ns,
                name=name,
                body=_cm
            )
            logger_cli.debug(
                "... replaced existing config map '{}/{}'".format(
                    ns,
                    name
                )
            )
        else:
            # Create it
            self.CoreV1.create_namespaced_config_map(
                namespace=ns,
                body=_cm
            )
            logger_cli.debug("... created config map '{}/{}'".format(
                ns,
                name
            ))

        return _data.keys()

    def prepare_daemonset_from_yaml(self, ns, ds_yaml):
        """
        Create daemon set from parsed yaml or replace the existing one.

        :param ns: namespace to look for the daemon set and to create in
        :param ds_yaml: parsed daemon set definition, must have
                        ['metadata']['name']
        :return: result of the replace or create API call
        """
        _name = ds_yaml['metadata']['name']
        _ds = self.get_daemon_set_by_name(ns, _name)

        if _ds is not None:
            logger_cli.debug(
                "... found existing daemonset '{}'".format(_name)
            )
            _r = self.AppsV1.replace_namespaced_daemon_set(
                _ds.metadata.name,
                _ds.metadata.namespace,
                body=ds_yaml
            )
            logger_cli.debug(
                "... replacing existing daemonset '{}'".format(_name)
            )
            return _r
        else:
            logger_cli.debug(
                "... creating daemonset '{}'".format(_name)
            )
            _r = self.AppsV1.create_namespaced_daemon_set(ns, body=ds_yaml)
            return _r

    def delete_daemon_set_by_name(self, ns, name):
        """Delete daemon set 'name' in namespace 'ns', return API result."""
        return self.AppsV1.delete_namespaced_daemon_set(name, ns)

    def exec_on_all_pods(self, pods):
        """
        Create multiple threads to execute script on all target pods

        Not implemented yet: only the per-pod map is built and an empty
        list is returned.

        :param pods: pod list object, its 'items' are iterated
        :return: empty list
        """
        # Create map for threads: [[node_name, ns, pod_name]...]
        # python client exposes pod's node as 'spec.node_name'
        _pod_list = [
            [
                item.spec.node_name,
                item.metadata.namespace,
                item.metadata.name
            ]
            for item in pods.items
        ]

        # map func and cmd

        # create result list

        return []