| """ |
| Module to handle interaction with Kube |
| """ |
| import base64 |
| import os |
| import urllib3 |
| import yaml |
| |
| from kubernetes import client as kclient, config as kconfig |
| from kubernetes.stream import stream |
| from kubernetes.client.rest import ApiException |
| |
| from cfg_checker.common import logger, logger_cli |
| from cfg_checker.common.decorators import retry |
| from cfg_checker.common.exception import InvalidReturnException, KubeException |
| from cfg_checker.common.file_utils import create_temp_file_with_content |
| from cfg_checker.common.other import utils, shell |
| from cfg_checker.common.ssh_utils import ssh_shell_p |
| from cfg_checker.common.const import ENV_LOCAL |
| |
| |
# Silence urllib3's InsecureRequestWarning for the whole process.
# The remote client configuration built below sets verify_ssl = False,
# so this warning is presumably expected noise rather than a signal.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
| |
| |
def _init_kube_conf_local(config):
    """
    Initialise the Kube client library from a kubeconfig on this machine.

    :param config: object providing 'kube_config_path' and 'insecure'
    :return: tuple of (kubernetes config module, ApiClient, path label),
             or (None, None, path label) when anything fails on the way
    """
    _path = "local:{}".format(config.kube_config_path)
    try:
        kconfig.load_kube_config(config_file=config.kube_config_path)
        if config.insecure:
            # NOTE(review): these attributes are set on the imported
            # 'kubernetes.config' module object, not on a client
            # Configuration instance; confirm they have the intended effect
            kconfig.assert_hostname = False
            kconfig.client_side_validation = False
        # Querying the API versions doubles as a connectivity probe
        _versions = kclient.CoreApi().get_api_versions().versions
        logger_cli.debug(
            "... found Kube env: core, {}".format(",".join(_versions))
        )
        return kconfig, kclient.ApiClient(), _path
    except Exception as e:
        # Best effort: any failure is reported as "not initialised"
        _msg = "Failed to init local Kube client: {}".format(str(e))
        logger.warn(_msg)
        return None, None, _path
| |
| |
def _init_kube_conf_remote(config):
    """
    Build a Kube client configuration out of a kubeconfig file content.

    The kubeconfig is read over SSH (ssh_shell_p) when it was not detected
    locally and the environment is not ENV_LOCAL; otherwise it is read from
    the local filesystem. CA certificate, client certificate and client key
    are base64-decoded into temp files, and the API server host taken from
    the kubeconfig is replaced with 'config.mcp_host', keeping the original
    scheme and port.

    Manual way to reach the same API with a token, kept for reference:

    APISERVER=$(kubectl config view --minify |
            grep server | cut -f 2- -d ":" | tr -d " ")
    SECRET_NAME=$(kubectl get secrets |
            grep ^default | cut -f1 -d ' ')
    TOKEN=$(kubectl describe secret $SECRET_NAME |
            grep -E '^token' | cut -f2 -d':' | tr -d " ")

    echo "Detected API Server at: '${APISERVER}'"
    echo "Got secret: '${SECRET_NAME}'"
    echo "Loaded token: '${TOKEN}'"

    curl $APISERVER/api
        --header "Authorization: Bearer $TOKEN" --insecure

    Side effect: 'config.kube_port' is set to the port found in the
    kubeconfig server URL.

    :param config: object providing ssh_*, kube_config_*, mcp_host,
                   env_name, debug and insecure attributes
    :return: tuple of (Configuration, ApiClient, path label), or
             (None, None, path label) when no kubeconfig data was read
             or the server URL carries no explicit port
    """
    # Try to load remote config only if it was not detected already
    if not config.kube_config_detected and config.env_name != ENV_LOCAL:
        _path = "{}@{}:{}".format(
            config.ssh_user,
            config.ssh_host,
            config.kube_config_path
        )
        _c_data = ssh_shell_p(
            "cat " + config.kube_config_path,
            config.ssh_host,
            username=config.ssh_user,
            keypath=config.ssh_key,
            piped=False,
            use_sudo=config.ssh_uses_sudo,
        )
    else:
        _path = "local:{}".format(config.kube_config_path)
        with open(config.kube_config_path, 'r') as ff:
            _c_data = ff.read()

    # Covers both an empty read and a None result
    if not _c_data:
        return None, None, _path

    # 'yaml' comes from the module-level import
    _conf = yaml.load(_c_data, Loader=yaml.SafeLoader)
    _cluster = _conf['clusters'][0]['cluster']
    _user = _conf['users'][0]['user']

    _host = _cluster['server']
    if "http" not in _host or "443" not in _host:
        logger_cli.error(
            "Failed to extract Kube host: '{}'".format(_host)
        )
    else:
        logger_cli.debug(
            "... 'context' host extracted: '{}' via SSH@{}".format(
                _host,
                config.ssh_host
            )
        )

    # Expected form is '<scheme>://<host>:<port>', i.e. three ':' parts.
    # Without the port there is nothing to build the substituted host from.
    _tmp = _host.split(':')
    if len(_tmp) < 3:
        logger_cli.error(
            "Kube host '{}' has no explicit port, "
            "can't substitute host".format(_host)
        )
        return None, None, _path

    def _to_temp_file(_b64_data):
        # Decode base64 payload and store it in a temp file, return its path
        return create_temp_file_with_content(
            base64.standard_b64decode(_b64_data)
        )

    _kube_conf = kclient.Configuration()
    # A remote host configuration

    # To work with remote cluster, we need to extract these
    # keys = ['host', 'ssl_ca_cert', 'cert_file', 'key_file', 'verify_ssl']
    # When v12 of the client will be release, we will use load_from_dict
    _kube_conf.ssl_ca_cert = _to_temp_file(
        _cluster['certificate-authority-data']
    )
    _kube_conf.cert_file = _to_temp_file(_user['client-certificate-data'])
    _kube_conf.key_file = _to_temp_file(_user['client-key-data'])

    # Substitute context host to ours
    _kube_conf.host = \
        _tmp[0] + "://" + config.mcp_host + ":" + _tmp[2]
    config.kube_port = _tmp[2]
    logger_cli.debug(
        "... kube remote host updated to {}".format(
            _kube_conf.host
        )
    )
    _kube_conf.verify_ssl = False
    _kube_conf.debug = config.debug
    if config.insecure:
        _kube_conf.assert_hostname = False
        _kube_conf.client_side_validation = False

    # Nevertheless if you want to do it
    # you can with these 2 parameters
    # configuration.verify_ssl=True
    # ssl_ca_cert is the filepath
    # to the file that contains the certificate.
    # configuration.ssl_ca_cert="certificate"

    # _kube_conf.api_key = {
    #     "authorization": "Bearer " + config.kube_token
    # }

    # Create a ApiClient with our config
    _kube_api = kclient.ApiClient(_kube_conf)

    return _kube_conf, _kube_api, _path
| |
| |
class KubeApi(object):
    """
    Holds an initialised Kube client for the given checker config.

    After construction:
      - 'initialized' tells whether both kConf and kApi were obtained
      - 'is_local' tells which init path was taken
      - 'kConf', 'kApi', 'kConfigPath' carry what the init function returned
      - 'user_keypath' and 'yaml_conf' are set only on the local path and
        only when the kubeconfig file exists
    """

    def __init__(self, config):
        self.config = config
        self.initialized = self._init_kclient()
        self.last_response = None

    def _init_kclient(self):
        """
        Pick local or remote client init based on 'config.env_name'.

        :return: True when both configuration and API client are available
        """
        logger_cli.debug("... init kube config")
        if self.config.env_name != "local":
            self.kConf, self.kApi, self.kConfigPath = _init_kube_conf_remote(
                self.config
            )
            self.is_local = False
        else:
            self.kConf, self.kApi, self.kConfigPath = _init_kube_conf_local(
                self.config
            )
            self.is_local = True
            # Try to load local config data
            _kube_path = self.config.kube_config_path
            if _kube_path and os.path.exists(_kube_path):
                _raw = shell("cat " + _kube_path)
                _parsed = yaml.load(_raw, Loader=yaml.SafeLoader)
                # Client key is stored decoded in a temp file
                _key_b64 = _parsed['users'][0]['user']['client-key-data']
                self.user_keypath = create_temp_file_with_content(
                    base64.standard_b64decode(_key_b64)
                )
                self.yaml_conf = _raw

        return not (self.kConf is None or self.kApi is None)

    def get_versions_api(self):
        """
        :return: VersionApi object bound to this instance's API client
        """
        # client.CoreApi().get_api_versions().versions
        _version_api = kclient.VersionApi(self.kApi)
        return _version_api
| |
| |
| class KubeRemote(KubeApi): |
def __init__(self, config):
    """
    Initialise the base client and the caches for lazily created
    API wrappers (see the CoreV1/AppsV1/PodsV1/CustomObjects properties).

    :param config: checker config object, passed to KubeApi.__init__
    """
    super(KubeRemote, self).__init__(config)
    self._coreV1 = None
    self._appsV1 = None
    # Kept as it was; nothing visible in this class reads it
    self._podV1 = None
    # This is the attribute the PodsV1 property actually reads
    self._podsV1 = None
    self._custom = None
| |
@property
def CustomObjects(self):
    """
    CustomObjectsApi bound to self.kApi, created on first access
    and cached in self._custom.
    """
    if self._custom:
        return self._custom
    self._custom = kclient.CustomObjectsApi(self.kApi)
    return self._custom
| |
@property
def CoreV1(self):
    """
    CoreV1Api created on first access and cached in self._coreV1.

    A fresh ApiClient is created for it: default one for the local
    environment, one built from self.kConf otherwise.
    """
    if self._coreV1:
        return self._coreV1
    _api_client = (
        kclient.ApiClient()
        if self.is_local
        else kclient.ApiClient(self.kConf)
    )
    self._coreV1 = kclient.CoreV1Api(_api_client)
    return self._coreV1
| |
@property
def AppsV1(self):
    """
    AppsV1Api bound to self.kApi, created on first access
    and cached in self._appsV1.
    """
    if self._appsV1:
        return self._appsV1
    self._appsV1 = kclient.AppsV1Api(self.kApi)
    return self._appsV1
| |
@property
def PodsV1(self):
    """
    V1Pod object created on first access and cached in self._podsV1.
    """
    # __init__ of this class does not create '_podsV1', so the attribute
    # may be missing on first access; treat that as "not created yet"
    if not getattr(self, "_podsV1", None):
        # NOTE(review): V1Pod is constructed with the API client as its
        # first positional argument; confirm this is the intended object
        self._podsV1 = kclient.V1Pod(self.kApi)
    return self._podsV1
| |
@staticmethod
def _typed_list_to_dict(i_list):
    """
    Turn a list of objects into a dict keyed by their lowercased 'type'.

    Each item is converted with its to_dict(); the 'type' key is removed
    from the resulting dict and used (lowercased) as the key for it.
    Items with the same type overwrite each other, last one wins.
    """
    _by_type = {}
    for _as_dict in (_entry.to_dict() for _entry in i_list):
        _key = _as_dict.pop("type").lower()
        _by_type[_key] = _as_dict
    return _by_type
| |
@staticmethod
def _get_listed_attrs(items, _path):
    """
    Collect utils.rgetattr(item, _path) for every item, keeping the order.
    """
    return [utils.rgetattr(_item, _path) for _item in items]
| |
@staticmethod
def safe_get_item_by_name(api_resource, _name):
    """
    Return the first item of api_resource.items whose metadata.name
    equals _name, or None when there is no such item.
    """
    _matching = (
        _candidate
        for _candidate in api_resource.items
        if _candidate.metadata.name == _name
    )
    return next(_matching, None)
| |
def get_node_info(self, http=False):
    """
    Query API for the nodes and do some presorting

    For every node the to_dict() form is extended with:
      - 'addresses' and 'conditions': status lists re-keyed by
        lowercased type
      - 'labels': metadata labels with "true"/"false" strings converted
        using utils.to_bool

    :param http: when True, list_node_with_http_info() is used
    :return: dict of {node name: node data dict}
    :raises InvalidReturnException: when API returned not a V1NodeList
    """
    if http:
        # The *_with_http_info call returns a (data, status, headers)
        # tuple; only the data part can be a V1NodeList
        _resp = self.CoreV1.list_node_with_http_info()
        _raw_nodes = _resp[0] if isinstance(_resp, tuple) else _resp
    else:
        _raw_nodes = self.CoreV1.list_node()

    if not isinstance(_raw_nodes, kclient.models.v1_node_list.V1NodeList):
        raise InvalidReturnException(
            "Invalid return type: '{}'".format(type(_raw_nodes))
        )

    _nodes = {}
    for _n in _raw_nodes.items:
        _name = _n.metadata.name
        _d = _n.to_dict()
        # parse inner data classes as dicts
        _d['addresses'] = self._typed_list_to_dict(_n.status.addresses)
        _d['conditions'] = self._typed_list_to_dict(_n.status.conditions)
        # Update 'status' type
        _ready = _d['conditions']['ready']
        if isinstance(_ready['status'], str):
            _ready['status'] = utils.to_bool(_ready['status'])
        # Parse image names?
        # TODO: Here is the place where we can parse each node image names

        # Parse roles; labels can be None when the node has none set
        _labels = _d["metadata"]["labels"] or {}
        _d['labels'] = {}
        for _label, _data in _labels.items():
            if _data.lower() in ["true", "false"]:
                _d['labels'][_label] = utils.to_bool(_data)
            else:
                _d['labels'][_label] = _data

        # Save
        _nodes[_name] = _d

    # debug report on how many nodes detected
    logger_cli.debug("...node items returned '{}'".format(len(_nodes)))

    return _nodes
| |
def get_pod_names_by_partial_name(self, partial_name, ns):
    """
    List names of the pods in namespace 'ns' that contain 'partial_name'.

    Logs a debug message when more than one name matched and a warning
    when nothing matched.

    :return: list of matching pod names, can be empty
    """
    logger_cli.debug('... searching for pods with {}'.format(partial_name))
    _pod_list = self.CoreV1.list_namespaced_pod(ns)
    _all_names = self._get_listed_attrs(_pod_list.items, "metadata.name")
    _matched = [_pod_name for _pod_name in _all_names
                if partial_name in _pod_name]
    _count = len(_matched)
    if _count < 1:
        logger_cli.warning(
            "WARNING: No pods found for '{}'".format(partial_name)
        )
    elif _count > 1:
        logger_cli.debug(
            "... more than one pod found for '{}': {}\n".format(
                partial_name,
                ", ".join(_matched)
            )
        )

    return _matched
| |
def get_pods_by_partial_name(self, partial_name, ns):
    """
    List pod objects in namespace 'ns' whose name contains 'partial_name'.

    Logs a debug message with the matched names when more than one pod
    matched and a warning when nothing matched.

    :return: list of matching pod objects, can be empty
    """
    logger_cli.debug('... searching for pods with {}'.format(partial_name))
    _all_pods = self.CoreV1.list_namespaced_pod(ns)
    _pods = [_pod for _pod in _all_pods.items
             if partial_name in _pod.metadata.name]
    if len(_pods) > 1:
        # Report names of the pods found, not the letters of the query
        logger_cli.debug(
            "... more than one pod found for '{}': {}\n".format(
                partial_name,
                ", ".join(_pod.metadata.name for _pod in _pods)
            )
        )
    elif len(_pods) < 1:
        logger_cli.warning(
            "WARNING: No pods found for '{}'".format(partial_name)
        )

    return _pods
| |
def exec_on_target_pod(
    self,
    cmd,
    pod_name,
    namespace,
    strict=False,
    _request_timeout=120,
    **kwargs
):
    """
    Execute a command inside a pod and return what it wrote to stdout.

    :param cmd: command as a list of args, or a string which is split
                on whitespace
    :param pod_name: exact pod name when 'strict', name prefix otherwise
    :param namespace: namespace of the pod
    :param strict: when False, pods in the namespace are listed and the
                   first one whose name starts with 'pod_name' is used
    :param _request_timeout: passed to the exec call and used as the
                             time limit for reading the stream
    :param kwargs: passed through to the exec call
    :return: stdout of the command as returned by the stream
    :raises KubeException: when not 'strict' and no pod name matched
    """
    _pname = ""
    if not strict:
        logger_cli.debug(
            "... searching for pods with the name '{}'".format(pod_name)
        )
        _pods = self.CoreV1.list_namespaced_pod(namespace)
        _names = self._get_listed_attrs(_pods.items, "metadata.name")
        _pnames = [n for n in _names if n.startswith(pod_name)]
        if len(_pnames) > 1:
            logger_cli.debug(
                "... more than one pod found for '{}': {}\n"
                "... using first one".format(
                    pod_name,
                    ", ".join(_pnames)
                )
            )
        elif len(_pnames) < 1:
            raise KubeException("No pods found for '{}'".format(pod_name))
        # At this point there is at least one match
        _pname = _pnames[0]
    else:
        _pname = pod_name
    logger_cli.debug(
        "... cmd: [CoreV1] exec {} -n {} -- {}".format(
            _pname,
            namespace,
            cmd
        )
    )
    # Set preload_content to False to preserve JSON
    # If not, output gets converted to str
    # Which causes to change " to '
    # After that json.loads(...) fail
    cmd = cmd if isinstance(cmd, list) else cmd.split()
    _pod_stream = None
    try:
        _pod_stream = stream(
            self.CoreV1.connect_get_namespaced_pod_exec,
            _pname,
            namespace,
            command=cmd,
            stderr=True,
            stdin=False,
            stdout=True,
            tty=False,
            _request_timeout=_request_timeout,
            _preload_content=False,
            **kwargs
        )
        # run for timeout
        _pod_stream.run_forever(timeout=_request_timeout)
        # read the output
        _output = _pod_stream.read_stdout()
    finally:
        # Do not leave the connection open, whether the command
        # finished, timed out or the read failed
        if _pod_stream is not None:
            _pod_stream.close()
        # Force recreate of api objects
        self._coreV1 = None
    # Send output
    return _output
| |
def ensure_namespace(self, ns):
    """
    Ensure that given namespace exists

    Namespace is looked up by name in the list of namespaces
    and created if it is not there.

    :return: True when namespace exists or was created,
             False when create call returned a falsy result
    """
    # list active namespaces and look for ours
    _existing = self.safe_get_item_by_name(
        self.CoreV1.list_namespace(),
        ns
    )
    if _existing is not None:
        logger_cli.debug("... found existing namespace '{}'".format(ns))
        return True

    logger_cli.debug("... creating namespace '{}'".format(ns))
    _body = kclient.V1Namespace()
    _body.metadata = kclient.V1ObjectMeta(name=ns)
    _created = self.CoreV1.create_namespace(_body)
    # TODO: check return on fail
    return bool(_created)
| |
def get_daemon_set_by_name(self, ns, name):
    """
    Find daemonset named 'name' among daemonsets listed in namespace 'ns'.

    :return: daemonset object or None if there is no such name
    """
    _daemon_sets = self.AppsV1.list_namespaced_daemon_set(ns)
    return self.safe_get_item_by_name(_daemon_sets, name)
| |
def create_config_map(self, ns, name, source, recreate=True):
    """
    Creates/Overwrites ConfigMap in working namespace

    When 'source' is a file, it becomes the only entry of the map.
    When 'source' is a folder, it is walked recursively and every
    non-hidden '*.py' file is added under its bare file name, so files
    with the same name in different subfolders overwrite each other
    (a warning is logged for that).
    When 'source' is neither, an empty map is created and a warning
    is logged.

    :param ns: namespace for the config map
    :param name: config map name
    :param source: path to a file or a folder
    :param recreate: not used by this method
    :return: keys (file names) placed into the config map
    """
    # Prepare source
    logger_cli.debug(
        "... preparing config map '{}/{}' with files from '{}'".format(
            ns,
            name,
            source
        )
    )
    _data = {}
    if os.path.isfile(source):
        # populate data with one file
        with open(source, 'rt') as fS:
            _data[os.path.basename(source)] = fS.read()
    elif os.path.isdir(source):
        # walk dirs and populate all 'py' files
        for path, _dirs, files in os.walk(source):
            for _file in files:
                if not _file.endswith('.py') or _file.startswith('.'):
                    continue
                if _file in _data:
                    # Keys are bare file names, later file wins
                    logger_cli.warning(
                        "WARNING: file name '{}' met more than once "
                        "in '{}', using '{}'".format(
                            _file,
                            source,
                            os.path.join(path, _file)
                        )
                    )
                with open(os.path.join(path, _file), 'rt') as fS:
                    _data[_file] = fS.read()
    else:
        logger_cli.warning(
            "WARNING: source '{}' is not a file or a folder, "
            "config map '{}/{}' will be empty".format(source, ns, name)
        )

    _cm = kclient.V1ConfigMap()
    _cm.metadata = kclient.V1ObjectMeta(name=name, namespace=ns)
    _cm.data = _data
    logger_cli.debug(
        "... prepared config map with {} scripts".format(len(_data))
    )
    # Query existing configmap, replace it if found
    _existing_cm = self.safe_get_item_by_name(
        self.CoreV1.list_namespaced_config_map(namespace=ns),
        name
    )
    if _existing_cm is not None:
        self.CoreV1.replace_namespaced_config_map(
            namespace=ns,
            name=name,
            body=_cm
        )
        logger_cli.debug(
            "... replaced existing config map '{}/{}'".format(
                ns,
                name
            )
        )
    else:
        # Create it
        self.CoreV1.create_namespaced_config_map(
            namespace=ns,
            body=_cm
        )
        logger_cli.debug("... created config map '{}/{}'".format(
            ns,
            name
        ))

    return _data.keys()
| |
def prepare_daemonset_from_yaml(self, ns, ds_yaml):
    """
    Create daemonset from the given definition, or replace the one
    that already exists in namespace 'ns' under the same name.

    :param ns: namespace to look up / create daemonset in
    :param ds_yaml: daemonset definition, name is taken from
                    ds_yaml['metadata']['name']
    :return: result of the create or replace API call
    """
    _ds_name = ds_yaml['metadata']['name']
    _existing = self.get_daemon_set_by_name(ns, _ds_name)

    if _existing is None:
        logger_cli.debug(
            "... creating daemonset '{}'".format(_ds_name)
        )
        return self.AppsV1.create_namespaced_daemon_set(ns, body=ds_yaml)

    logger_cli.debug(
        "... found existing daemonset '{}'".format(_ds_name)
    )
    # Name and namespace are taken from the object found in the cluster
    _result = self.AppsV1.replace_namespaced_daemon_set(
        _existing.metadata.name,
        _existing.metadata.namespace,
        body=ds_yaml
    )
    logger_cli.debug(
        "... replacing existing daemonset '{}'".format(_ds_name)
    )
    return _result
| |
def delete_daemon_set_by_name(self, ns, name):
    """
    Delete daemonset 'name' in namespace 'ns'.

    :return: result of the delete API call
    """
    _apps_api = self.AppsV1
    return _apps_api.delete_namespaced_daemon_set(name, ns)
| |
def exec_on_all_pods(self, pods):
    """
    Create multiple threads to execute script on all target pods

    Not implemented yet: target list is prepared, an error is logged
    and an empty list is returned.

    :param pods: object with '.items' holding pod objects
    :return: empty list
    """
    # Create map for threads: [[node_name, ns, pod_name]...]
    # Client models expose fields in snake_case, hence 'node_name'
    _pod_list = [
        [
            item.spec.node_name,
            item.metadata.namespace,
            item.metadata.name
        ]
        for item in pods.items
    ]

    # map func and cmd
    logger_cli.error("ERROR: 'exec_on_all_pods'is not implemented yet")
    # create result list

    return []
| |
@retry(ApiException)
def get_pods_for_daemonset(self, ds):
    """
    List pods of the daemonset: pods in its namespace
    labeled with 'name=<daemonset name>'.

    Wrapped with retry(ApiException); presumably the call is repeated
    when ApiException is raised, see cfg_checker.common.decorators.

    :param ds: daemonset object, its metadata name/namespace are used
    :return: pod list as returned by the API
    """
    _meta = ds.metadata
    # get all pod names for daemonset
    logger_cli.debug(
        "... extracting pod names from daemonset '{}'".format(_meta.name)
    )
    _selector = 'name={}'.format(_meta.name)
    return self.CoreV1.list_namespaced_pod(
        namespace=_meta.namespace,
        label_selector=_selector
    )
| |
def put_string_buffer_to_pod_as_textfile(
    self,
    pod_name,
    namespace,
    buffer,
    filepath,
    _request_timeout=120,
    **kwargs
):
    """
    Write text content to a file inside a pod.

    Starts '/bin/sh' in the pod and feeds it a "cat <<'EOF' > filepath"
    here-document with the buffer as its body. Anything the shell
    prints to stdout/stderr is logged on debug level.
    Note that a line consisting of 'EOF' inside the buffer ends the
    here-document early.

    :param pod_name: exact pod name
    :param namespace: namespace of the pod
    :param buffer: content to write, utf-8 bytes or str
    :param filepath: target path inside the pod, used in the shell
                     command as is
    :param _request_timeout: passed to the exec call
    :param kwargs: passed through to the exec call
    :return: None
    """
    # Chunks below are kept as bytes and decoded right before sending,
    # so text given as str is encoded first
    if isinstance(buffer, str):
        buffer = buffer.encode('utf-8')

    # if json
    # buffer = json.dumps(_dict, indent=2).encode('utf-8')

    commands = [
        bytes("cat <<'EOF' >" + filepath + "\n", 'utf-8'),
        buffer,
        bytes("\n" + "EOF\n", 'utf-8')
    ]

    _command = ['/bin/sh']
    response = None
    try:
        response = stream(
            self.CoreV1.connect_get_namespaced_pod_exec,
            pod_name,
            namespace,
            command=_command,
            stderr=True,
            stdin=True,
            stdout=True,
            tty=False,
            _request_timeout=_request_timeout,
            _preload_content=False,
            **kwargs
        )
        # One chunk is sent per iteration, output is drained in between
        while response.is_open():
            response.update(timeout=1)
            if response.peek_stdout():
                logger_cli.debug("... STDOUT: %s" % response.read_stdout())
            if response.peek_stderr():
                logger_cli.debug("... STDERR: %s" % response.read_stderr())
            if commands:
                c = commands.pop(0)
                logger_cli.debug("... running command... {}\n".format(c))
                response.write_stdin(str(c, encoding='utf-8'))
            else:
                break
    finally:
        # Close the connection even if sending failed half way
        if response is not None:
            response.close()
        # Force recreate of Api objects
        self._coreV1 = None

    return
| |
def get_custom_resource(self, group, version, plural):
    """
    List cluster-wide custom objects of the given kind.

    Example of the underlying call:
        kubernetes.client.CustomObjectsApi().list_cluster_custom_object(
            group="networking.istio.io",
            version="v1alpha3",
            plural="serviceentries"
        )

    :return: result of list_cluster_custom_object
    """
    _query = {
        "group": group,
        "version": version,
        "plural": plural,
    }
    return self.CustomObjects.list_cluster_custom_object(**_query)