blob: 72c8b06d779c0eeafa34f2f84736c67c6745b2e9 [file] [log] [blame]
import logging
from kubernetes import client as kclient
from kubernetes import config as kconfig
from utils import exceptions
logger = logging.getLogger(__name__)
class K8SClientManager(object):
    """Build and cache a Kubernetes CoreV1 API client from a kubeconfig file."""

    def __init__(self, k8s_config_path=None):
        """Remember the kubeconfig path; the client is created on first use.

        :param k8s_config_path: path to the Kubernetes config file, or None.
        """
        self._k8s_v1 = None
        self.k8s_config_path = k8s_config_path

    def get_k8s_v1_client(self):
        """Load the kubeconfig and return a new ``CoreV1Api`` client.

        :raises exceptions.InvalidConfigException: when no kubeconfig path
            was provided.
        """
        if self.k8s_config_path:
            kconfig.load_kube_config(config_file=self.k8s_config_path)
            return kclient.CoreV1Api()
        raise exceptions.InvalidConfigException(
            "Please provide the Kubernetes config file path at "
            "'mos_kubeconfig_path' config option."
        )

    @property
    def k8s_v1(self):
        """Return the cached client, creating it on the first access."""
        cached_client = self._k8s_v1
        if cached_client is None:
            cached_client = self._k8s_v1 = self.get_k8s_v1_client()
        return cached_client
class K8SCliActions(object):
    """Node-related helper queries on top of a Kubernetes CoreV1 client."""

    # Value of a node address ``type`` field that marks the internal IP.
    INTERNAL_IP_TYPE = "InternalIP"

    def __init__(self, k8s_v1_client):
        """Store the client used for every query.

        :param k8s_v1_client: object exposing ``list_node`` and
            ``read_node`` (e.g. a kubernetes ``CoreV1Api`` instance).
        """
        self.k8s_v1_client = k8s_v1_client

    def list_nodes_names(self, watch=False, label_selector=None):
        """Return the names of the nodes reported by ``list_node``.

        :param watch: forwarded as-is to ``list_node``.
        :param label_selector: forwarded as-is to ``list_node``.
        :returns: list of ``metadata.name`` values, in response order.
        """
        nodes = self.k8s_v1_client.list_node(
            watch=watch, label_selector=label_selector)
        return [node.metadata.name for node in nodes.items]

    def _get_internal_ip(self, node_info):
        """Return the first ``InternalIP`` address of the given node."""
        # Treat a missing (None) address list the same as an empty one so
        # both cases end up in the descriptive error below.
        for address in node_info.status.addresses or []:
            if address.type == self.INTERNAL_IP_TYPE:
                return address.address
        # IndexError is kept on purpose: it is what the former ``[...][0]``
        # lookup raised, so existing callers catching it keep working.
        raise IndexError(
            "Node '{}' has no address of type '{}'".format(
                node_info.metadata.name, self.INTERNAL_IP_TYPE))

    def get_nodes_info_by_names(self, list_of_node_names):
        """Get the names and Internal IPs for the given K8S node names.

        :param list_of_node_names: iterable of node names; each one is
            looked up with ``read_node``.
        :returns: list of ``{"name": ..., "address": ...}`` dicts, one per
            requested node, in the same order.
        :raises IndexError: when a node has no ``InternalIP`` address.
        """
        result_nodes_info = []
        for node_name in list_of_node_names:
            node_info = self.k8s_v1_client.read_node(name=node_name)
            result_nodes_info.append({
                "name": node_info.metadata.name,
                "address": self._get_internal_ip(node_info),
            })
        return result_nodes_info