blob: f16be6eebaa027d00e6bca1dc41b5230e66ac07b [file] [log] [blame]
Artem Panchenko0594cd72017-06-12 13:25:26 +03001# Copyright 2017 Mirantis, Inc.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain
5# a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations
13# under the License.
14
15import time
16
17import yaml
18
19from devops.helpers import helpers
20
21from tcp_tests import logger
22from tcp_tests.managers.execute_commands import ExecuteCommandsMixin
23from tcp_tests.managers.k8s import cluster
24
25
26LOG = logger.logger
27
28
29class K8SManager(ExecuteCommandsMixin):
30 """docstring for K8SManager"""
31
32 __config = None
33 __underlay = None
34
35 def __init__(self, config, underlay, salt):
36 self.__config = config
37 self.__underlay = underlay
38 self._salt = salt
39 self._api_client = None
40 super(K8SManager, self).__init__(
41 config=config, underlay=underlay)
42
43 def install(self, commands):
44 self.execute_commands(commands,
45 label='Install Kubernetes services')
46 self.__config.k8s.k8s_installed = True
47 self.__config.k8s.kube_host = self.get_proxy_api()
48
49 def get_proxy_api(self):
50 k8s_proxy_ip_pillars = self._salt.get_pillar(
vrovachev99228d32017-06-08 19:46:10 +040051 tgt='I@haproxy:proxy:enabled:true and I@kubernetes:master',
Artem Panchenko0594cd72017-06-12 13:25:26 +030052 pillar='haproxy:proxy:listen:k8s_secure:binds:address')
vrovachev99228d32017-06-08 19:46:10 +040053 k8s_hosts = self._salt.get_pillar(
54 tgt='I@haproxy:proxy:enabled:true and I@kubernetes:master',
55 pillar='kubernetes:pool:apiserver:host')
Artem Panchenko0594cd72017-06-12 13:25:26 +030056 k8s_proxy_ip = set([ip
57 for item in k8s_proxy_ip_pillars
vrovachev99228d32017-06-08 19:46:10 +040058 for node,ip in item.items() if ip])
59 k8s_hosts = set([ip
60 for item in k8s_hosts
61 for node,ip in item.items() if ip])
62 assert len(k8s_hosts) == 1, (
63 "Found more than one Kubernetes API hosts in pillars:{0}, "
64 "expected one!").format(k8s_hosts)
65 k8s_host = k8s_hosts.pop()
66 assert k8s_host in k8s_proxy_ip, (
67 "Kubernetes API host:{0} not found in proxies:{} "
68 "on k8s master nodes. K8s proxies are expected on "
69 "nodes with K8s master").format(k8s_host, k8s_proxy_ip)
70 return k8s_host
Artem Panchenko0594cd72017-06-12 13:25:26 +030071
72 @property
73 def api(self):
74 if self._api_client is None:
75 self._api_client = cluster.K8sCluster(
76 user=self.__config.k8s_deploy.kubernetes_admin_user,
77 password=self.__config.k8s_deploy.kubernetes_admin_password,
78 host=self.__config.k8s.kube_host,
79 port=self.__config.k8s.kube_apiserver_port,
80 default_namespace='default')
81 return self._api_client
82
83 def get_pod_phase(self, pod_name, namespace=None):
84 return self.api.pods.get(
85 name=pod_name, namespace=namespace).phase
86
87 def wait_pod_phase(self, pod_name, phase, namespace=None, timeout=60):
88 """Wait phase of pod_name from namespace while timeout
89
90 :param str: pod_name
91 :param str: namespace
92 :param list or str: phase
93 :param int: timeout
94
95 :rtype: None
96 """
97 if isinstance(phase, str):
98 phase = [phase]
99
100 def check():
101 return self.get_pod_phase(pod_name, namespace) in phase
102
103 helpers.wait(check, timeout=timeout,
104 timeout_msg='Timeout waiting, pod {pod_name} is not in '
105 '"{phase}" phase'.format(
106 pod_name=pod_name, phase=phase))
107
108 def wait_pods_phase(self, pods, phase, timeout=60):
109 """Wait timeout seconds for phase of pods
110
111 :param pods: list of K8sPod
112 :param phase: list or str
113 :param timeout: int
114
115 :rtype: None
116 """
117 if isinstance(phase, str):
118 phase = [phase]
119
120 def check(pod_name, namespace):
121 return self.get_pod_phase(pod_name, namespace) in phase
122
123 def check_all_pods():
124 return all(check(pod.name, pod.metadata.namespace) for pod in pods)
125
126 helpers.wait(
127 check_all_pods,
128 timeout=timeout,
129 timeout_msg='Timeout waiting, pods {0} are not in "{1}" '
130 'phase'.format([pod.name for pod in pods], phase))
131
132 def check_pod_create(self, body, namespace=None, timeout=300, interval=5):
133 """Check creating sample pod
134
135 :param k8s_pod: V1Pod
136 :param namespace: str
137 :rtype: V1Pod
138 """
139 LOG.info("Creating pod in k8s cluster")
140 LOG.debug(
141 "POD spec to create:\n{}".format(
142 yaml.dump(body, default_flow_style=False))
143 )
144 LOG.debug("Timeout for creation is set to {}".format(timeout))
145 LOG.debug("Checking interval is set to {}".format(interval))
146 pod = self.api.pods.create(body=body, namespace=namespace)
147 pod.wait_running(timeout=300, interval=5)
148 LOG.info("Pod '{0}' is created in '{1}' namespace".format(
149 pod.name, pod.namespace))
150 return self.api.pods.get(name=pod.name, namespace=pod.namespace)
151
152 def wait_pod_deleted(self, podname, timeout=60, interval=5):
153 helpers.wait(
154 lambda: podname not in [pod.name for pod in self.api.pods.list()],
155 timeout=timeout,
156 interval=interval,
157 timeout_msg="Pod deletion timeout reached!"
158 )
159
160 def check_pod_delete(self, k8s_pod, timeout=300, interval=5,
161 namespace=None):
162 """Deleting pod from k8s
163
164 :param k8s_pod: tcp_tests.managers.k8s.nodes.K8sNode
165 :param k8sclient: tcp_tests.managers.k8s.cluster.K8sCluster
166 """
167 LOG.info("Deleting pod '{}'".format(k8s_pod.name))
168 LOG.debug("Pod status:\n{}".format(k8s_pod.status))
169 LOG.debug("Timeout for deletion is set to {}".format(timeout))
170 LOG.debug("Checking interval is set to {}".format(interval))
171 self.api.pods.delete(body=k8s_pod, name=k8s_pod.name,
172 namespace=namespace)
173 self.wait_pod_deleted(k8s_pod.name, timeout, interval)
174 LOG.debug("Pod '{}' is deleted".format(k8s_pod.name))
175
176 def check_service_create(self, body, namespace=None):
177 """Check creating k8s service
178
179 :param body: dict, service spec
180 :param namespace: str
181 :rtype: K8sService object
182 """
183 LOG.info("Creating service in k8s cluster")
184 LOG.debug(
185 "Service spec to create:\n{}".format(
186 yaml.dump(body, default_flow_style=False))
187 )
188 service = self.api.services.create(body=body, namespace=namespace)
189 LOG.info("Service '{0}' is created in '{1}' namespace".format(
190 service.name, service.namespace))
191 return self.api.services.get(name=service.name,
192 namespace=service.namespace)
193
194 def check_ds_create(self, body, namespace=None):
195 """Check creating k8s DaemonSet
196
197 :param body: dict, DaemonSet spec
198 :param namespace: str
199 :rtype: K8sDaemonSet object
200 """
201 LOG.info("Creating DaemonSet in k8s cluster")
202 LOG.debug(
203 "DaemonSet spec to create:\n{}".format(
204 yaml.dump(body, default_flow_style=False))
205 )
206 ds = self.api.daemonsets.create(body=body, namespace=namespace)
207 LOG.info("DaemonSet '{0}' is created in '{1}' namespace".format(
208 ds.name, ds.namespace))
209 return self.api.daemonsets.get(name=ds.name, namespace=ds.namespace)
210
211 def check_ds_ready(self, dsname, namespace=None):
212 """Check if k8s DaemonSet is ready
213
214 :param dsname: str, ds name
215 :return: bool
216 """
217 ds = self.api.daemonsets.get(name=dsname, namespace=namespace)
218 return (ds.status.current_number_scheduled ==
219 ds.status.desired_number_scheduled)
220
221 def wait_ds_ready(self, dsname, namespace=None, timeout=60, interval=5):
222 """Wait until all pods are scheduled on nodes
223
224 :param dsname: str, ds name
225 :param timeout: int
226 :param interval: int
227 """
228 helpers.wait(
229 lambda: self.check_ds_ready(dsname, namespace=namespace),
230 timeout=timeout, interval=interval)
231
232 def check_namespace_create(self, name):
233 """Check creating k8s Namespace
234
235 :param name: str
236 :rtype: K8sNamespace object
237 """
238 LOG.info("Creating Namespace in k8s cluster")
239 ns = self.api.namespaces.create(body={'metadata': {'name': name}})
240 LOG.info("Namespace '{0}' is created".format(ns.name))
241 # wait 10 seconds until a token for new service account is created
242 time.sleep(10)
243 return self.api.namespaces.get(name=ns.name)
244
245 def create_objects(self, path):
246 if isinstance(path, str):
247 path = [path]
248 params = ' '.join(["-f {}".format(p) for p in path])
249 cmd = 'kubectl create {params}'.format(params=params)
250 with self.__underlay.remote(
251 host=self.__config.k8s.kube_host) as remote:
252 LOG.info("Running command '{cmd}' on node {node}".format(
253 cmd=cmd,
254 node=remote.hostname)
255 )
256 result = remote.check_call(cmd)
257 LOG.info(result['stdout'])
258
259 def get_running_pods(self, pod_name, namespace=None):
260 pods = [pod for pod in self.api.pods.list(namespace=namespace)
261 if (pod_name in pod.name and pod.status.phase == 'Running')]
262 return pods
263
264 def get_pods_number(self, pod_name, namespace=None):
265 pods = self.get_running_pods(pod_name, namespace)
266 return len(pods)
267
268 def get_running_pods_by_ssh(self, pod_name, namespace=None):
269 with self.__underlay.remote(
270 host=self.__config.k8s.kube_host) as remote:
271 result = remote.check_call("kubectl get pods --namespace {} |"
272 " grep {} | awk '{{print $1 \" \""
273 " $3}}'".format(namespace,
274 pod_name))['stdout']
275 running_pods = [data.strip().split()[0] for data in result
276 if data.strip().split()[1] == 'Running']
277 return running_pods
278
279 def get_pods_restarts(self, pod_name, namespace=None):
280 pods = [pod.status.container_statuses[0].restart_count
281 for pod in self.get_running_pods(pod_name, namespace)]
282 return sum(pods)