| import logging |
| |
| from kubernetes import client as kclient |
| from kubernetes import config as kconfig |
| |
| from utils import exceptions |
| |
| logger = logging.getLogger(__name__) |
| |
| |
| class K8SClientManager(object): |
| def __init__(self, k8s_config_path=None): |
| self.k8s_config_path = k8s_config_path |
| self._k8s_v1 = None |
| |
| def get_k8s_v1_client(self): |
| if not self.k8s_config_path: |
| raise exceptions.InvalidConfigException( |
| "Please provide the Kubernetes config file path at " |
| "'mos_kubeconfig_path' config option." |
| ) |
| kconfig.load_kube_config(config_file=self.k8s_config_path) |
| return kclient.CoreV1Api() |
| |
| @property |
| def k8s_v1(self): |
| if self._k8s_v1 is None: |
| self._k8s_v1 = self.get_k8s_v1_client() |
| return self._k8s_v1 |
| |
| |
| class K8SCliActions(object): |
| INTERNAL_IP_TYPE = "InternalIP" |
| |
| def __init__(self, k8s_v1_client): |
| self.k8s_v1_client = k8s_v1_client |
| |
| def list_nodes_names(self, watch=False, label_selector=None): |
| nodes = self.k8s_v1_client.list_node( |
| watch=watch, label_selector=label_selector) |
| return [node.metadata.name for node in nodes.items] |
| |
| def get_nodes_info_by_names(self, list_of_node_names): |
| # Get the nodes' info (just names and Internal IPs) for |
| # the specific list of the K8S nodes names |
| result_nodes_info = [] |
| for node in list_of_node_names: |
| node_info = self.k8s_v1_client.read_node(name=node) |
| result_nodes_info.append({ |
| "name": node_info.metadata.name, |
| "address": [ |
| address.address |
| for address in node_info.status.addresses |
| if address.type == self.INTERNAL_IP_TYPE |
| ][0], |
| }) |
| return result_nodes_info |