blob: 72c8b06d779c0eeafa34f2f84736c67c6745b2e9 [file] [log] [blame]
Ievgeniia Zadorozhnaa080ec02023-12-05 02:08:52 +01001import logging
2
3from kubernetes import client as kclient
4from kubernetes import config as kconfig
5
6from utils import exceptions
7
8logger = logging.getLogger(__name__)
9
10
class K8SClientManager(object):
    """Build and cache a Kubernetes CoreV1 API client from a kubeconfig file.

    The client is created on first access of ``k8s_v1`` and the same
    object is handed back on every later access.
    """

    def __init__(self, k8s_config_path=None):
        """Remember the kubeconfig path; no client is created yet.

        :param k8s_config_path: path to the kubeconfig file, or None.
        """
        self._k8s_v1 = None
        self.k8s_config_path = k8s_config_path

    def get_k8s_v1_client(self):
        """Load the kubeconfig and return a new ``CoreV1Api`` object.

        Every call loads the configuration again and builds a new API
        object; use the ``k8s_v1`` property for the cached one.

        :raises exceptions.InvalidConfigException: when no kubeconfig
            path was provided (None or empty).
        """
        config_path = self.k8s_config_path
        if config_path:
            kconfig.load_kube_config(config_file=config_path)
            return kclient.CoreV1Api()
        raise exceptions.InvalidConfigException(
            "Please provide the Kubernetes config file path at "
            "'mos_kubeconfig_path' config option."
        )

    @property
    def k8s_v1(self):
        """Return the cached CoreV1 client, creating it on first use."""
        cached_client = self._k8s_v1
        if cached_client is not None:
            return cached_client
        self._k8s_v1 = self.get_k8s_v1_client()
        return self._k8s_v1
30
31
class K8SCliActions(object):
    """Node queries built on top of a Kubernetes CoreV1 API client."""

    # Value of the address ``type`` field that marks a node's internal IP.
    INTERNAL_IP_TYPE = "InternalIP"

    def __init__(self, k8s_v1_client):
        """Store the client used for all queries.

        :param k8s_v1_client: object exposing ``list_node`` and
            ``read_node`` (the methods this class calls).
        """
        self.k8s_v1_client = k8s_v1_client

    def list_nodes_names(self, watch=False, label_selector=None):
        """Return the names of the nodes reported by ``list_node``.

        :param watch: forwarded unchanged to ``list_node``.
        :param label_selector: forwarded unchanged to ``list_node``.
        :returns: list of ``metadata.name`` values, in the order the
            client returned the nodes.
        """
        nodes = self.k8s_v1_client.list_node(
            watch=watch, label_selector=label_selector)
        return [node.metadata.name for node in nodes.items]

    def get_nodes_info_by_names(self, list_of_node_names):
        """Return the name and internal IP of each requested node.

        One ``read_node`` call is made per name, in the given order.

        :param list_of_node_names: iterable of node names to look up.
        :returns: list of ``{"name": ..., "address": ...}`` dicts where
            ``address`` is the node's first ``InternalIP`` address.
        :raises IndexError: when a node has no address of type
            ``INTERNAL_IP_TYPE`` (including a missing address list).
        """
        result_nodes_info = []
        for node_name in list_of_node_names:
            node_info = self.k8s_v1_client.read_node(name=node_name)
            result_nodes_info.append({
                "name": node_info.metadata.name,
                "address": self._get_internal_ip(node_info),
            })
        return result_nodes_info

    def _get_internal_ip(self, node_info):
        """Return the first address of type ``INTERNAL_IP_TYPE``."""
        # ``addresses`` may be None; treat that the same as "no addresses".
        for address in node_info.status.addresses or []:
            if address.type == self.INTERNAL_IP_TYPE:
                return address.address
        # IndexError is kept as the exception type so callers that handled
        # the previous unguarded ``[0]`` lookup keep working, but the
        # message now says which node is affected and what is missing.
        raise IndexError(
            "Node '{}' has no address of type '{}'".format(
                node_info.metadata.name, self.INTERNAL_IP_TYPE))
57 return result_nodes_info