|  | #    Copyright 2017 Mirantis, Inc. | 
|  | # | 
|  | #    Licensed under the Apache License, Version 2.0 (the "License"); you may | 
|  | #    not use this file except in compliance with the License. You may obtain | 
|  | #    a copy of the License at | 
|  | # | 
|  | #         http://www.apache.org/licenses/LICENSE-2.0 | 
|  | # | 
|  | #    Unless required by applicable law or agreed to in writing, software | 
|  | #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | 
|  | #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | 
|  | #    License for the specific language governing permissions and limitations | 
|  | #    under the License. | 
|  |  | 
|  | import time | 
|  | from uuid import uuid4 | 
|  |  | 
|  | import yaml | 
|  |  | 
|  | from devops.helpers import helpers | 
|  | from devops.error import DevopsCalledProcessError | 
|  |  | 
|  | from tcp_tests import logger | 
|  | from tcp_tests.helpers import ext | 
|  | from tcp_tests.helpers.utils import retry | 
|  | from tcp_tests.managers.execute_commands import ExecuteCommandsMixin | 
|  | from tcp_tests.managers.k8s import cluster | 
|  | from k8sclient.client.rest import ApiException | 
|  |  | 
|  | LOG = logger.logger | 
|  |  | 
|  |  | 
|  | class K8SManager(ExecuteCommandsMixin): | 
|  | """docstring for K8SManager""" | 
|  |  | 
|  | __config = None | 
|  | __underlay = None | 
|  |  | 
|  | def __init__(self, config, underlay, salt): | 
|  | self.__config = config | 
|  | self.__underlay = underlay | 
|  | self._salt = salt | 
|  | self._api_client = None | 
|  | super(K8SManager, self).__init__( | 
|  | config=config, underlay=underlay) | 
|  |  | 
|  | def install(self, commands): | 
|  | self.execute_commands(commands, | 
|  | label='Install Kubernetes services') | 
|  | self.__config.k8s.k8s_installed = True | 
|  | self.__config.k8s.kube_host = self.get_proxy_api() | 
|  |  | 
|  | def get_proxy_api(self): | 
|  | k8s_proxy_ip_pillars = self._salt.get_pillar( | 
|  | tgt='I@haproxy:proxy:enabled:true and I@kubernetes:master', | 
|  | pillar='haproxy:proxy:listen:k8s_secure:binds:address') | 
|  | k8s_hosts = self._salt.get_pillar( | 
|  | tgt='I@haproxy:proxy:enabled:true and I@kubernetes:master', | 
|  | pillar='kubernetes:pool:apiserver:host') | 
|  | k8s_proxy_ip = set([ip | 
|  | for item in k8s_proxy_ip_pillars | 
|  | for node, ip in item.items() if ip]) | 
|  | k8s_hosts = set([ip | 
|  | for item in k8s_hosts | 
|  | for node, ip in item.items() if ip]) | 
|  | assert len(k8s_hosts) == 1, ( | 
|  | "Found more than one Kubernetes API hosts in pillars:{0}, " | 
|  | "expected one!").format(k8s_hosts) | 
|  | k8s_host = k8s_hosts.pop() | 
|  | assert k8s_host in k8s_proxy_ip, ( | 
|  | "Kubernetes API host:{0} not found in proxies:{} " | 
|  | "on k8s master nodes. K8s proxies are expected on " | 
|  | "nodes with K8s master").format(k8s_host, k8s_proxy_ip) | 
|  | return k8s_host | 
|  |  | 
|  | @property | 
|  | def api(self): | 
|  | if self._api_client is None: | 
|  | self._api_client = cluster.K8sCluster( | 
|  | user=self.__config.k8s_deploy.kubernetes_admin_user, | 
|  | password=self.__config.k8s_deploy.kubernetes_admin_password, | 
|  | host=self.__config.k8s.kube_host, | 
|  | port=self.__config.k8s.kube_apiserver_port, | 
|  | default_namespace='default') | 
|  | return self._api_client | 
|  |  | 
|  | @property | 
|  | def ctl_host(self): | 
|  | nodes = [node for node in self.__config.underlay.ssh if | 
|  | ext.UNDERLAY_NODE_ROLES.k8s_controller in node['roles']] | 
|  | return nodes[0]['node_name'] | 
|  |  | 
|  | def get_pod_phase(self, pod_name, namespace=None): | 
|  | return self.api.pods.get( | 
|  | name=pod_name, namespace=namespace).phase | 
|  |  | 
|  | def wait_pod_phase(self, pod_name, phase, namespace=None, timeout=60): | 
|  | """Wait phase of pod_name from namespace while timeout | 
|  |  | 
|  | :param str: pod_name | 
|  | :param str: namespace | 
|  | :param list or str: phase | 
|  | :param int: timeout | 
|  |  | 
|  | :rtype: None | 
|  | """ | 
|  | if isinstance(phase, str): | 
|  | phase = [phase] | 
|  |  | 
|  | def check(): | 
|  | return self.get_pod_phase(pod_name, namespace) in phase | 
|  |  | 
|  | helpers.wait(check, timeout=timeout, | 
|  | timeout_msg='Timeout waiting, pod {pod_name} is not in ' | 
|  | '"{phase}" phase'.format( | 
|  | pod_name=pod_name, phase=phase)) | 
|  |  | 
|  | def wait_pods_phase(self, pods, phase, timeout=60): | 
|  | """Wait timeout seconds for phase of pods | 
|  |  | 
|  | :param pods: list of K8sPod | 
|  | :param phase: list or str | 
|  | :param timeout: int | 
|  |  | 
|  | :rtype: None | 
|  | """ | 
|  | if isinstance(phase, str): | 
|  | phase = [phase] | 
|  |  | 
|  | def check(pod_name, namespace): | 
|  | return self.get_pod_phase(pod_name, namespace) in phase | 
|  |  | 
|  | def check_all_pods(): | 
|  | return all(check(pod.name, pod.metadata.namespace) for pod in pods) | 
|  |  | 
|  | helpers.wait( | 
|  | check_all_pods, | 
|  | timeout=timeout, | 
|  | timeout_msg='Timeout waiting, pods {0} are not in "{1}" ' | 
|  | 'phase'.format([pod.name for pod in pods], phase)) | 
|  |  | 
|  | def check_pod_create(self, body, namespace=None, timeout=300, interval=5): | 
|  | """Check creating sample pod | 
|  |  | 
|  | :param k8s_pod: V1Pod | 
|  | :param namespace: str | 
|  | :rtype: V1Pod | 
|  | """ | 
|  | LOG.info("Creating pod in k8s cluster") | 
|  | LOG.debug( | 
|  | "POD spec to create:\n{}".format( | 
|  | yaml.dump(body, default_flow_style=False)) | 
|  | ) | 
|  | LOG.debug("Timeout for creation is set to {}".format(timeout)) | 
|  | LOG.debug("Checking interval is set to {}".format(interval)) | 
|  | pod = self.api.pods.create(body=body, namespace=namespace) | 
|  | pod.wait_running(timeout=300, interval=5) | 
|  | LOG.info("Pod '{0}' is created in '{1}' namespace".format( | 
|  | pod.name, pod.namespace)) | 
|  | return self.api.pods.get(name=pod.name, namespace=pod.namespace) | 
|  |  | 
|  | def wait_pod_deleted(self, podname, timeout=60, interval=5): | 
|  | helpers.wait( | 
|  | lambda: podname not in [pod.name for pod in self.api.pods.list()], | 
|  | timeout=timeout, | 
|  | interval=interval, | 
|  | timeout_msg="Pod deletion timeout reached!" | 
|  | ) | 
|  |  | 
|  | def check_pod_delete(self, k8s_pod, timeout=300, interval=5, | 
|  | namespace=None): | 
|  | """Deleting pod from k8s | 
|  |  | 
|  | :param k8s_pod: tcp_tests.managers.k8s.nodes.K8sNode | 
|  | :param k8sclient: tcp_tests.managers.k8s.cluster.K8sCluster | 
|  | """ | 
|  | LOG.info("Deleting pod '{}'".format(k8s_pod.name)) | 
|  | LOG.debug("Pod status:\n{}".format(k8s_pod.status)) | 
|  | LOG.debug("Timeout for deletion is set to {}".format(timeout)) | 
|  | LOG.debug("Checking interval is set to {}".format(interval)) | 
|  | self.api.pods.delete(body=k8s_pod, name=k8s_pod.name, | 
|  | namespace=namespace) | 
|  | self.wait_pod_deleted(k8s_pod.name, timeout, interval) | 
|  | LOG.debug("Pod '{}' is deleted".format(k8s_pod.name)) | 
|  |  | 
|  | def check_service_create(self, body, namespace=None): | 
|  | """Check creating k8s service | 
|  |  | 
|  | :param body: dict, service spec | 
|  | :param namespace: str | 
|  | :rtype: K8sService object | 
|  | """ | 
|  | LOG.info("Creating service in k8s cluster") | 
|  | LOG.debug( | 
|  | "Service spec to create:\n{}".format( | 
|  | yaml.dump(body, default_flow_style=False)) | 
|  | ) | 
|  | service = self.api.services.create(body=body, namespace=namespace) | 
|  | LOG.info("Service '{0}' is created in '{1}' namespace".format( | 
|  | service.name, service.namespace)) | 
|  | return self.api.services.get(name=service.name, | 
|  | namespace=service.namespace) | 
|  |  | 
|  | def check_ds_create(self, body, namespace=None): | 
|  | """Check creating k8s DaemonSet | 
|  |  | 
|  | :param body: dict, DaemonSet spec | 
|  | :param namespace: str | 
|  | :rtype: K8sDaemonSet object | 
|  | """ | 
|  | LOG.info("Creating DaemonSet in k8s cluster") | 
|  | LOG.debug( | 
|  | "DaemonSet spec to create:\n{}".format( | 
|  | yaml.dump(body, default_flow_style=False)) | 
|  | ) | 
|  | ds = self.api.daemonsets.create(body=body, namespace=namespace) | 
|  | LOG.info("DaemonSet '{0}' is created  in '{1}' namespace".format( | 
|  | ds.name, ds.namespace)) | 
|  | return self.api.daemonsets.get(name=ds.name, namespace=ds.namespace) | 
|  |  | 
|  | def check_ds_ready(self, dsname, namespace=None): | 
|  | """Check if k8s DaemonSet is ready | 
|  |  | 
|  | :param dsname: str, ds name | 
|  | :return: bool | 
|  | """ | 
|  | ds = self.api.daemonsets.get(name=dsname, namespace=namespace) | 
|  | return (ds.status.current_number_scheduled == | 
|  | ds.status.desired_number_scheduled) | 
|  |  | 
|  | def wait_ds_ready(self, dsname, namespace=None, timeout=60, interval=5): | 
|  | """Wait until all pods are scheduled on nodes | 
|  |  | 
|  | :param dsname: str, ds name | 
|  | :param timeout: int | 
|  | :param interval: int | 
|  | """ | 
|  | helpers.wait( | 
|  | lambda: self.check_ds_ready(dsname, namespace=namespace), | 
|  | timeout=timeout, interval=interval) | 
|  |  | 
|  | def check_deploy_create(self, body, namespace=None): | 
|  | """Check creating k8s Deployment | 
|  |  | 
|  | :param body: dict, Deployment spec | 
|  | :param namespace: str | 
|  | :rtype: K8sDeployment object | 
|  | """ | 
|  | LOG.info("Creating Deployment in k8s cluster") | 
|  | LOG.debug( | 
|  | "Deployment spec to create:\n{}".format( | 
|  | yaml.dump(body, default_flow_style=False)) | 
|  | ) | 
|  | deploy = self.api.deployments.create(body=body, namespace=namespace) | 
|  | LOG.info("Deployment '{0}' is created  in '{1}' namespace".format( | 
|  | deploy.name, deploy.namespace)) | 
|  | return self.api.deployments.get(name=deploy.name, | 
|  | namespace=deploy.namespace) | 
|  |  | 
|  | def check_deploy_ready(self, deploy_name, namespace=None): | 
|  | """Check if k8s Deployment is ready | 
|  |  | 
|  | :param deploy_name: str, deploy name | 
|  | :return: bool | 
|  | """ | 
|  | deploy = self.api.deployments.get(name=deploy_name, | 
|  | namespace=namespace) | 
|  | return deploy.status.available_replicas == deploy.status.replicas | 
|  |  | 
|  | def wait_deploy_ready(self, deploy_name, namespace=None, timeout=60, | 
|  | interval=5): | 
|  | """Wait until all pods are scheduled on nodes | 
|  |  | 
|  | :param deploy_name: str, deploy name | 
|  | :param timeout: int | 
|  | :param interval: int | 
|  | """ | 
|  | helpers.wait( | 
|  | lambda: self.check_deploy_ready(deploy_name, namespace=namespace), | 
|  | timeout=timeout, interval=interval) | 
|  |  | 
|  | def check_namespace_create(self, name): | 
|  | """Check creating k8s Namespace | 
|  |  | 
|  | :param name: str | 
|  | :rtype: K8sNamespace object | 
|  | """ | 
|  | try: | 
|  | ns = self.api.namespaces.get(name=name) | 
|  | LOG.info("Namespace '{0}' is already exists".format(ns.name)) | 
|  | except ApiException as e: | 
|  | if hasattr(e, "status") and 404 == e.status: | 
|  | LOG.info("Creating Namespace in k8s cluster") | 
|  | ns = self.api.namespaces.create( | 
|  | body={'metadata': {'name': name}}) | 
|  | LOG.info("Namespace '{0}' is created".format(ns.name)) | 
|  | # wait 10 seconds until a token for new service account | 
|  | # is created | 
|  | time.sleep(10) | 
|  | ns = self.api.namespaces.get(name=ns.name) | 
|  | else: | 
|  | raise | 
|  | return ns | 
|  |  | 
|  | def create_objects(self, path): | 
|  | if isinstance(path, str): | 
|  | path = [path] | 
|  | params = ' '.join(["-f {}".format(p) for p in path]) | 
|  | cmd = 'kubectl create {params}'.format(params=params) | 
|  | with self.__underlay.remote( | 
|  | node_name=self.ctl_host) as remote: | 
|  | LOG.info("Running command '{cmd}' on node {node}".format( | 
|  | cmd=cmd, | 
|  | node=remote.hostname) | 
|  | ) | 
|  | result = remote.check_call(cmd) | 
|  | LOG.info(result['stdout']) | 
|  |  | 
|  | def get_running_pods(self, pod_name, namespace=None): | 
|  | pods = [pod for pod in self.api.pods.list(namespace=namespace) | 
|  | if (pod_name in pod.name and pod.status.phase == 'Running')] | 
|  | return pods | 
|  |  | 
|  | def get_pods_number(self, pod_name, namespace=None): | 
|  | pods = self.get_running_pods(pod_name, namespace) | 
|  | return len(pods) | 
|  |  | 
|  | def get_running_pods_by_ssh(self, pod_name, namespace=None): | 
|  | with self.__underlay.remote( | 
|  | node_name=self.ctl_host) as remote: | 
|  | result = remote.check_call("kubectl get pods --namespace {} |" | 
|  | " grep {} | awk '{{print $1 \" \"" | 
|  | " $3}}'".format(namespace, | 
|  | pod_name))['stdout'] | 
|  | running_pods = [data.strip().split()[0] for data in result | 
|  | if data.strip().split()[1] == 'Running'] | 
|  | return running_pods | 
|  |  | 
|  | def get_pods_restarts(self, pod_name, namespace=None): | 
|  | pods = [pod.status.container_statuses[0].restart_count | 
|  | for pod in self.get_running_pods(pod_name, namespace)] | 
|  | return sum(pods) | 
|  |  | 
|  | def run_conformance(self, timeout=60 * 60): | 
|  | with self.__underlay.remote( | 
|  | node_name=self.ctl_host) as remote: | 
|  | result = remote.check_call( | 
|  | "docker run --rm --net=host -e API_SERVER=" | 
|  | "'http://127.0.0.1:8080' {}".format( | 
|  | self.__config.k8s.k8s_conformance_image), | 
|  | timeout=timeout)['stdout'] | 
|  | return result | 
|  |  | 
|  | def get_k8s_masters(self): | 
|  | k8s_masters_fqdn = self._salt.get_pillar(tgt='I@kubernetes:master', | 
|  | pillar='linux:network:fqdn') | 
|  | return [self._K8SManager__underlay.host_by_node_name(node_name=v) | 
|  | for pillar in k8s_masters_fqdn for k, v in pillar.items()] | 
|  |  | 
|  | def kubectl_run(self, name, image, port): | 
|  | with self.__underlay.remote( | 
|  | node_name=self.ctl_host) as remote: | 
|  | result = remote.check_call( | 
|  | "kubectl run {0} --image={1} --port={2}".format( | 
|  | name, image, port | 
|  | ) | 
|  | ) | 
|  | return result | 
|  |  | 
|  | def kubectl_expose(self, resource, name, port, type): | 
|  | with self.__underlay.remote( | 
|  | node_name=self.ctl_host) as remote: | 
|  | result = remote.check_call( | 
|  | "kubectl expose {0} {1} --port={2} --type={3}".format( | 
|  | resource, name, port, type | 
|  | ) | 
|  | ) | 
|  | return result | 
|  |  | 
|  | def kubectl_annotate(self, resource, name, annotation): | 
|  | with self.__underlay.remote( | 
|  | node_name=self.ctl_host) as remote: | 
|  | result = remote.check_call( | 
|  | "kubectl annotate {0} {1} {2}".format( | 
|  | resource, name, annotation | 
|  | ) | 
|  | ) | 
|  | return result | 
|  |  | 
|  | def get_svc_ip(self, name, namespace='kube-system'): | 
|  | with self.__underlay.remote( | 
|  | node_name=self.ctl_host) as remote: | 
|  | result = remote.check_call( | 
|  | "kubectl get svc {0} -n {1} | " | 
|  | "awk '{{print $2}}' | tail -1".format(name, namespace) | 
|  | ) | 
|  | return result['stdout'][0].strip() | 
|  |  | 
|  | @retry(300, exception=DevopsCalledProcessError) | 
|  | def nslookup(self, host, src): | 
|  | with self.__underlay.remote( | 
|  | node_name=self.ctl_host) as remote: | 
|  | remote.check_call("nslookup {0} {1}".format(host, src)) | 
|  |  | 
|  | # ---------------------------- Virtlet methods ------------------------------- | 
|  | def install_jq(self): | 
|  | """Install JQuery on node. Required for changing yamls on the fly. | 
|  |  | 
|  | :return: | 
|  | """ | 
|  | cmd = "apt install jq -y" | 
|  | return self.__underlay.check_call(cmd, node_name=self.ctl_host) | 
|  |  | 
|  | def git_clone(self, project, target): | 
|  | cmd = "git clone {0} {1}".format(project, target) | 
|  | return self.__underlay.check_call(cmd, node_name=self.ctl_host) | 
|  |  | 
|  | def run_vm(self, name=None, yaml_path='~/virtlet/examples/cirros-vm.yaml'): | 
|  | if not name: | 
|  | name = 'virtlet-vm-{}'.format(uuid4()) | 
|  | cmd = ( | 
|  | "kubectl convert -f {0} --local " | 
|  | "-o json | jq '.metadata.name|=\"{1}\"' | kubectl create -f -") | 
|  | self.__underlay.check_call(cmd.format(yaml_path, name), | 
|  | node_name=self.ctl_host) | 
|  | return name | 
|  |  | 
|  | def get_vm_info(self, name, jsonpath="{.status.phase}", expected=None): | 
|  | cmd = "kubectl get po {} -n default".format(name) | 
|  | if jsonpath: | 
|  | cmd += " -o jsonpath={}".format(jsonpath) | 
|  | return self.__underlay.check_call( | 
|  | cmd, node_name=self.ctl_host, expected=expected) | 
|  |  | 
|  | def wait_active_state(self, name, timeout=180): | 
|  | helpers.wait( | 
|  | lambda: self.get_vm_info(name)['stdout'][0] == 'Running', | 
|  | timeout=timeout, | 
|  | timeout_msg="VM {} didn't Running state in {} sec. " | 
|  | "Current state: ".format( | 
|  | name, timeout, self.get_vm_info(name)['stdout'][0])) | 
|  |  | 
|  | def delete_vm(self, name, timeout=180): | 
|  | cmd = "kubectl delete po -n default {}".format(name) | 
|  | self.__underlay.check_call(cmd, node_name=self.ctl_host) | 
|  |  | 
|  | helpers.wait( | 
|  | lambda: | 
|  | "Error from server (NotFound):" in | 
|  | " ".join(self.get_vm_info(name, expected=[0, 1])['stderr']), | 
|  | timeout=timeout, | 
|  | timeout_msg="VM {} didn't Running state in {} sec. " | 
|  | "Current state: ".format( | 
|  | name, timeout, self.get_vm_info(name)['stdout'][0])) | 
|  |  | 
|  | def adjust_cirros_resources( | 
|  | self, cpu=2, memory='256', | 
|  | target_yaml='virtlet/examples/cirros-vm-exp.yaml'): | 
|  | # We will need to change params in case of example change | 
|  | cmd = ("cd ~/virtlet/examples && " | 
|  | "cp cirros-vm.yaml {2} && " | 
|  | "sed -r 's/^(\s*)(VirtletVCPUCount\s*:\s*\"1\"\s*$)/ " | 
|  | "\1VirtletVCPUCount: \"{0}\"/' {2} && " | 
|  | "sed -r 's/^(\s*)(memory\s*:\s*128Mi\s*$)/\1memory: " | 
|  | "{1}Mi/' {2}".format(cpu, memory, target_yaml)) | 
|  | self.__underlay.check_call(cmd, node_name=self.ctl_host) | 
|  |  | 
|  | def get_domain_name(self, vm_name): | 
|  | cmd = ("~/virtlet/examples/virsh.sh list --name | " | 
|  | "grep -i {0} ".format(vm_name)) | 
|  | result = self.__underlay.check_call(cmd, node_name=self.ctl_host) | 
|  | return result['stdout'].strip() | 
|  |  | 
|  | def get_vm_cpu_count(self, domain_name): | 
|  | cmd = ("~/virtlet/examples/virsh.sh dumpxml {0} | " | 
|  | "grep 'cpu' | grep -o '[[:digit:]]*'".format(domain_name)) | 
|  | result = self.__underlay.check_call(cmd, node_name=self.ctl_host) | 
|  | return int(result['stdout'].strip()) | 
|  |  | 
|  | def get_vm_memory_count(self, domain_name): | 
|  | cmd = ("~/virtlet/examples/virsh.sh dumpxml {0} | " | 
|  | "grep 'memory unit' | " | 
|  | "grep -o '[[:digit:]]*'".format(domain_name)) | 
|  | result = self.__underlay.check_call(cmd, node_name=self.ctl_host) | 
|  | return int(result['stdout'].strip()) | 
|  |  | 
|  | def get_domain_id(self, domain_name): | 
|  | cmd = ("virsh dumpxml {} | grep id=\' | " | 
|  | "grep -o [[:digit:]]*".format(domain_name)) | 
|  | result = self.__underlay.check_call(cmd, node_name=self.ctl_host) | 
|  | return int(result['stdout'].strip()) | 
|  |  | 
|  | def list_vm_volumes(self, domain_name): | 
|  | domain_id = self.get_domain_id(domain_name) | 
|  | cmd = ("~/virtlet/examples/virsh.sh domblklist {} | " | 
|  | "tail -n +3 | awk {{'print $2'}}".format(domain_id)) | 
|  | result = self.__underlay.check_call(cmd, node_name=self.ctl_host) | 
|  | return result['stdout'].strip() |