|  | import logging | 
|  |  | 
|  | from kubernetes import client as kclient | 
|  | from kubernetes import config as kconfig | 
|  |  | 
|  | from utils import exceptions | 
|  |  | 
|  | logger = logging.getLogger(__name__) | 
|  |  | 
|  |  | 
class K8SClientManager(object):
    """Build a Kubernetes CoreV1 API client on demand and keep it around.

    The client is created on the first access of ``k8s_v1`` and reused
    on every later access.
    """

    def __init__(self, k8s_config_path=None):
        """Remember the kubeconfig location; no client is created yet.

        :param k8s_config_path: path to the Kubernetes config file,
            or None when it has not been configured.
        """
        self._k8s_v1 = None
        self.k8s_config_path = k8s_config_path

    def get_k8s_v1_client(self):
        """Load the kubeconfig file and return a new ``CoreV1Api`` object.

        :raises exceptions.InvalidConfigException: when no config file
            path was provided.
        """
        config_path = self.k8s_config_path
        if config_path:
            kconfig.load_kube_config(config_file=config_path)
            return kclient.CoreV1Api()

        # Without a kubeconfig path there is nothing to load.
        message = (
            "Please provide the Kubernetes config file path at "
            "'mos_kubeconfig_path' config option."
        )
        raise exceptions.InvalidConfigException(message)

    @property
    def k8s_v1(self):
        """CoreV1 API client, created on first access and then reused."""
        cached_client = self._k8s_v1
        if cached_client is not None:
            return cached_client

        cached_client = self.get_k8s_v1_client()
        self._k8s_v1 = cached_client
        return cached_client
|  |  | 
|  |  | 
class K8SCliActions(object):
    """Node queries built on top of a Kubernetes CoreV1 API client."""

    # Value of a node address ``type`` field that marks the internal IP.
    INTERNAL_IP_TYPE = "InternalIP"

    def __init__(self, k8s_v1_client):
        """Store the client used by every query of this object.

        :param k8s_v1_client: object providing ``list_node`` and
            ``read_node`` (for example ``kubernetes.client.CoreV1Api``).
        """
        self.k8s_v1_client = k8s_v1_client

    def list_nodes_names(self, watch=False, label_selector=None):
        """Return the names of the nodes reported by ``list_node``.

        :param watch: forwarded unchanged to ``list_node``.
        :param label_selector: forwarded unchanged to ``list_node``;
            None means no label filtering is requested.
        :returns: list of node names, in the order the API returned them.
        """
        nodes = self.k8s_v1_client.list_node(
            watch=watch, label_selector=label_selector)
        return [node.metadata.name for node in nodes.items]

    def get_nodes_info_by_names(self, list_of_node_names):
        """Return the name and Internal IP of each requested node.

        One ``read_node`` call is issued per name, in the given order.

        :param list_of_node_names: iterable of K8S node names.
        :returns: list of ``{"name": ..., "address": ...}`` dicts where
            ``address`` is the node's first address of type
            ``INTERNAL_IP_TYPE``; an empty list for empty input.
        :raises IndexError: when a node has no address of that type.
        """
        result_nodes_info = []
        for node_name in list_of_node_names:
            node_info = self.k8s_v1_client.read_node(name=node_name)
            result_nodes_info.append({
                "name": node_info.metadata.name,
                "address": self._get_internal_ip(node_info),
            })
        return result_nodes_info

    def _get_internal_ip(self, node_info):
        """Return the first Internal IP address of a node object."""
        # The address list can be missing (None) on a node object; treat
        # that the same as "no addresses" instead of failing with an
        # unrelated TypeError.
        for address in node_info.status.addresses or ():
            if address.type == self.INTERNAL_IP_TYPE:
                return address.address
        # IndexError is kept as the exception type so existing callers
        # that catch it keep working, but the message now says which
        # node is affected and which address type was expected.
        raise IndexError(
            "K8S node '{}' has no address of type '{}'".format(
                node_info.metadata.name, self.INTERNAL_IP_TYPE))