blob: 380143648bbe540c0f0f7b2f881a1a8b9c2f24b3 [file] [log] [blame]
Artem Panchenko0594cd72017-06-12 13:25:26 +03001# Copyright 2017 Mirantis, Inc.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain
5# a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations
13# under the License.
14
15import time
16
17import yaml
18
19from devops.helpers import helpers
20
21from tcp_tests import logger
22from tcp_tests.managers.execute_commands import ExecuteCommandsMixin
23from tcp_tests.managers.k8s import cluster
24
25
26LOG = logger.logger
27
28
29class K8SManager(ExecuteCommandsMixin):
30 """docstring for K8SManager"""
31
32 __config = None
33 __underlay = None
34
35 def __init__(self, config, underlay, salt):
36 self.__config = config
37 self.__underlay = underlay
38 self._salt = salt
39 self._api_client = None
40 super(K8SManager, self).__init__(
41 config=config, underlay=underlay)
42
43 def install(self, commands):
44 self.execute_commands(commands,
45 label='Install Kubernetes services')
46 self.__config.k8s.k8s_installed = True
47 self.__config.k8s.kube_host = self.get_proxy_api()
48
49 def get_proxy_api(self):
50 k8s_proxy_ip_pillars = self._salt.get_pillar(
51 tgt='I@haproxy:proxy:enabled:true',
52 pillar='haproxy:proxy:listen:k8s_secure:binds:address')
53 k8s_proxy_ip = set([ip
54 for item in k8s_proxy_ip_pillars
55 for node,ip in item.items()])
56 assert len(k8s_proxy_ip) == 1, \
57 ("Found {0} Kubernetes endpoints in pillars,"
58 " expected one!").format(len(k8s_proxy_ip))
59 return k8s_proxy_ip.pop()
60
61 @property
62 def api(self):
63 if self._api_client is None:
64 self._api_client = cluster.K8sCluster(
65 user=self.__config.k8s_deploy.kubernetes_admin_user,
66 password=self.__config.k8s_deploy.kubernetes_admin_password,
67 host=self.__config.k8s.kube_host,
68 port=self.__config.k8s.kube_apiserver_port,
69 default_namespace='default')
70 return self._api_client
71
72 def get_pod_phase(self, pod_name, namespace=None):
73 return self.api.pods.get(
74 name=pod_name, namespace=namespace).phase
75
76 def wait_pod_phase(self, pod_name, phase, namespace=None, timeout=60):
77 """Wait phase of pod_name from namespace while timeout
78
79 :param str: pod_name
80 :param str: namespace
81 :param list or str: phase
82 :param int: timeout
83
84 :rtype: None
85 """
86 if isinstance(phase, str):
87 phase = [phase]
88
89 def check():
90 return self.get_pod_phase(pod_name, namespace) in phase
91
92 helpers.wait(check, timeout=timeout,
93 timeout_msg='Timeout waiting, pod {pod_name} is not in '
94 '"{phase}" phase'.format(
95 pod_name=pod_name, phase=phase))
96
97 def wait_pods_phase(self, pods, phase, timeout=60):
98 """Wait timeout seconds for phase of pods
99
100 :param pods: list of K8sPod
101 :param phase: list or str
102 :param timeout: int
103
104 :rtype: None
105 """
106 if isinstance(phase, str):
107 phase = [phase]
108
109 def check(pod_name, namespace):
110 return self.get_pod_phase(pod_name, namespace) in phase
111
112 def check_all_pods():
113 return all(check(pod.name, pod.metadata.namespace) for pod in pods)
114
115 helpers.wait(
116 check_all_pods,
117 timeout=timeout,
118 timeout_msg='Timeout waiting, pods {0} are not in "{1}" '
119 'phase'.format([pod.name for pod in pods], phase))
120
121 def check_pod_create(self, body, namespace=None, timeout=300, interval=5):
122 """Check creating sample pod
123
124 :param k8s_pod: V1Pod
125 :param namespace: str
126 :rtype: V1Pod
127 """
128 LOG.info("Creating pod in k8s cluster")
129 LOG.debug(
130 "POD spec to create:\n{}".format(
131 yaml.dump(body, default_flow_style=False))
132 )
133 LOG.debug("Timeout for creation is set to {}".format(timeout))
134 LOG.debug("Checking interval is set to {}".format(interval))
135 pod = self.api.pods.create(body=body, namespace=namespace)
136 pod.wait_running(timeout=300, interval=5)
137 LOG.info("Pod '{0}' is created in '{1}' namespace".format(
138 pod.name, pod.namespace))
139 return self.api.pods.get(name=pod.name, namespace=pod.namespace)
140
141 def wait_pod_deleted(self, podname, timeout=60, interval=5):
142 helpers.wait(
143 lambda: podname not in [pod.name for pod in self.api.pods.list()],
144 timeout=timeout,
145 interval=interval,
146 timeout_msg="Pod deletion timeout reached!"
147 )
148
149 def check_pod_delete(self, k8s_pod, timeout=300, interval=5,
150 namespace=None):
151 """Deleting pod from k8s
152
153 :param k8s_pod: tcp_tests.managers.k8s.nodes.K8sNode
154 :param k8sclient: tcp_tests.managers.k8s.cluster.K8sCluster
155 """
156 LOG.info("Deleting pod '{}'".format(k8s_pod.name))
157 LOG.debug("Pod status:\n{}".format(k8s_pod.status))
158 LOG.debug("Timeout for deletion is set to {}".format(timeout))
159 LOG.debug("Checking interval is set to {}".format(interval))
160 self.api.pods.delete(body=k8s_pod, name=k8s_pod.name,
161 namespace=namespace)
162 self.wait_pod_deleted(k8s_pod.name, timeout, interval)
163 LOG.debug("Pod '{}' is deleted".format(k8s_pod.name))
164
165 def check_service_create(self, body, namespace=None):
166 """Check creating k8s service
167
168 :param body: dict, service spec
169 :param namespace: str
170 :rtype: K8sService object
171 """
172 LOG.info("Creating service in k8s cluster")
173 LOG.debug(
174 "Service spec to create:\n{}".format(
175 yaml.dump(body, default_flow_style=False))
176 )
177 service = self.api.services.create(body=body, namespace=namespace)
178 LOG.info("Service '{0}' is created in '{1}' namespace".format(
179 service.name, service.namespace))
180 return self.api.services.get(name=service.name,
181 namespace=service.namespace)
182
183 def check_ds_create(self, body, namespace=None):
184 """Check creating k8s DaemonSet
185
186 :param body: dict, DaemonSet spec
187 :param namespace: str
188 :rtype: K8sDaemonSet object
189 """
190 LOG.info("Creating DaemonSet in k8s cluster")
191 LOG.debug(
192 "DaemonSet spec to create:\n{}".format(
193 yaml.dump(body, default_flow_style=False))
194 )
195 ds = self.api.daemonsets.create(body=body, namespace=namespace)
196 LOG.info("DaemonSet '{0}' is created in '{1}' namespace".format(
197 ds.name, ds.namespace))
198 return self.api.daemonsets.get(name=ds.name, namespace=ds.namespace)
199
200 def check_ds_ready(self, dsname, namespace=None):
201 """Check if k8s DaemonSet is ready
202
203 :param dsname: str, ds name
204 :return: bool
205 """
206 ds = self.api.daemonsets.get(name=dsname, namespace=namespace)
207 return (ds.status.current_number_scheduled ==
208 ds.status.desired_number_scheduled)
209
210 def wait_ds_ready(self, dsname, namespace=None, timeout=60, interval=5):
211 """Wait until all pods are scheduled on nodes
212
213 :param dsname: str, ds name
214 :param timeout: int
215 :param interval: int
216 """
217 helpers.wait(
218 lambda: self.check_ds_ready(dsname, namespace=namespace),
219 timeout=timeout, interval=interval)
220
221 def check_namespace_create(self, name):
222 """Check creating k8s Namespace
223
224 :param name: str
225 :rtype: K8sNamespace object
226 """
227 LOG.info("Creating Namespace in k8s cluster")
228 ns = self.api.namespaces.create(body={'metadata': {'name': name}})
229 LOG.info("Namespace '{0}' is created".format(ns.name))
230 # wait 10 seconds until a token for new service account is created
231 time.sleep(10)
232 return self.api.namespaces.get(name=ns.name)
233
234 def create_objects(self, path):
235 if isinstance(path, str):
236 path = [path]
237 params = ' '.join(["-f {}".format(p) for p in path])
238 cmd = 'kubectl create {params}'.format(params=params)
239 with self.__underlay.remote(
240 host=self.__config.k8s.kube_host) as remote:
241 LOG.info("Running command '{cmd}' on node {node}".format(
242 cmd=cmd,
243 node=remote.hostname)
244 )
245 result = remote.check_call(cmd)
246 LOG.info(result['stdout'])
247
248 def get_running_pods(self, pod_name, namespace=None):
249 pods = [pod for pod in self.api.pods.list(namespace=namespace)
250 if (pod_name in pod.name and pod.status.phase == 'Running')]
251 return pods
252
253 def get_pods_number(self, pod_name, namespace=None):
254 pods = self.get_running_pods(pod_name, namespace)
255 return len(pods)
256
257 def get_running_pods_by_ssh(self, pod_name, namespace=None):
258 with self.__underlay.remote(
259 host=self.__config.k8s.kube_host) as remote:
260 result = remote.check_call("kubectl get pods --namespace {} |"
261 " grep {} | awk '{{print $1 \" \""
262 " $3}}'".format(namespace,
263 pod_name))['stdout']
264 running_pods = [data.strip().split()[0] for data in result
265 if data.strip().split()[1] == 'Running']
266 return running_pods
267
268 def get_pods_restarts(self, pod_name, namespace=None):
269 pods = [pod.status.container_statuses[0].restart_count
270 for pod in self.get_running_pods(pod_name, namespace)]
271 return sum(pods)