blob: 2212025497a8b7f72a37de5d6b4c2f2d28f9bcf7 [file] [log] [blame]
import pytest
import json
import os
def test_k8s_get_cs_status(local_salt_client):
result = local_salt_client.cmd(
'etcd:server', 'cmd.run',
['kubectl get cs'],
expr_form='pillar'
)
errors = []
if not result:
pytest.skip("k8s is not found on this environment")
for node in result:
for line in result[node].split('\n'):
line = line.strip()
if 'MESSAGE' in line or 'proto' in line:
continue
else:
if 'Healthy' not in line:
errors.append(line)
break
assert not errors, 'k8s is not healthy: {}'.format(json.dumps(
errors,
indent=4))
def test_k8s_get_nodes_status(local_salt_client):
result = local_salt_client.cmd(
'etcd:server', 'cmd.run',
['kubectl get nodes'],
expr_form='pillar'
)
errors = []
if not result:
pytest.skip("k8s is not found on this environment")
for node in result:
for line in result[node].split('\n'):
line = line.strip()
if 'STATUS' in line or 'proto' in line:
continue
else:
if 'Ready' != line.split()[1]:
errors.append(line)
break
assert not errors, 'k8s is not healthy: {}'.format(json.dumps(
errors,
indent=4))
def test_k8s_get_calico_status(local_salt_client):
result = local_salt_client.cmd(
'kubernetes:pool', 'cmd.run',
['calicoctl node status'],
expr_form='pillar'
)
errors = []
if not result:
pytest.skip("k8s is not found on this environment")
for node in result:
for line in result[node].split('\n'):
line = line.strip('|')
if 'STATE' in line or '| ' not in line:
continue
else:
if 'up' not in line or 'Established' not in line:
errors.append(line)
assert not errors, 'Calico node status is not good: {}'.format(json.dumps(
errors,
indent=4))
def test_k8s_cluster_status(local_salt_client):
result = local_salt_client.cmd(
'kubernetes:master', 'cmd.run',
['kubectl cluster-info'],
expr_form='pillar'
)
errors = []
if not result:
pytest.skip("k8s is not found on this environment")
for node in result:
for line in result[node].split('\n'):
if 'proto' in line or 'further' in line or line == '':
continue
else:
if 'is running' not in line:
errors.append(line)
break
assert not errors, 'k8s cluster info is not good: {}'.format(json.dumps(
errors,
indent=4))
def test_k8s_kubelet_status(local_salt_client):
result = local_salt_client.cmd(
'kubernetes:pool', 'service.status',
['kubelet'],
expr_form='pillar'
)
errors = []
if not result:
pytest.skip("k8s is not found on this environment")
for node in result:
if not result[node]:
errors.append(node)
assert not errors, 'Kublete is not running on these nodes: {}'.format(
errors)
def test_k8s_check_system_pods_status(local_salt_client):
result = local_salt_client.cmd(
'etcd:server', 'cmd.run',
['kubectl --namespace="kube-system" get pods'],
expr_form='pillar'
)
errors = []
if not result:
pytest.skip("k8s is not found on this environment")
for node in result:
for line in result[node].split('\n'):
line = line.strip('|')
if 'STATUS' in line or 'proto' in line:
continue
else:
if 'Running' not in line:
errors.append(line)
break
assert not errors, 'Some system pods are not running: {}'.format(json.dumps(
errors,
indent=4))
def test_check_k8s_image_availability(local_salt_client):
# not a test actually
hostname = 'https://docker-dev-virtual.docker.mirantis.net/artifactory/webapp/'
response = os.system('curl -s --insecure {} > /dev/null'.format(hostname))
if response == 0:
print '{} is AVAILABLE'.format(hostname)
else:
print '{} IS NOT AVAILABLE'.format(hostname)
def test_k8s_dashboard_available(local_salt_client):
result = local_salt_client.cmd(
'etcd:server', 'cmd.run',
['kubectl get svc -n kube-system'],
expr_form='pillar'
)
if not result:
pytest.skip("k8s is not found on this environment")
# service name 'kubernetes-dashboard' is hardcoded in kubernetes formula
dashboard_enabled = local_salt_client.cmd(
'etcd:server', 'pillar.get',
['kubernetes:common:addons:dashboard:enabled'],
expr_form='pillar'
).values()[0]
if not dashboard_enabled:
pytest.skip("Kubernetes dashboard is not enabled in the cluster.")
external_ip = local_salt_client.cmd(
'etcd:server', 'pillar.get',
['kubernetes:common:addons:dashboard:public_ip'],
expr_form='pillar'
).values()[0]
# dashboard port 8443 is hardcoded in kubernetes formula
url = "https://{}:8443".format(external_ip)
check = local_salt_client.cmd(
'etcd:server', 'cmd.run',
['curl {} 2>&1 | grep kubernetesDashboard'.format(url)],
expr_form='pillar'
)
assert len(check.values()[0]) != 0, \
'Kubernetes dashboard is not reachable on {} ' \
'from ctl nodes'.format(url)