blob: 8f526040ebcfc6c2da80ce034098dc2bec26a72b [file] [log] [blame]
import pytest
from si_tests import settings
from si_tests.clients import k8s as k8s_client
from si_tests.fixtures.kubectl import kcm_manager
@pytest.fixture
def ready_child_cluster(request, kcm_manager):
namespace, name = request.param
return namespace, name
def pytest_generate_tests(metafunc):
if "ready_child_cluster" in metafunc.fixturenames:
clds = get_ready_cluster_deployments(kcm_manager())
metafunc.parametrize("ready_child_cluster",
range(len(clds)), indirect=True)
def get_ready_cluster_deployments(kcm_manager):
ready_clds = []
clds = kcm_manager.list_all_clusterdeployments()
for cld in clds:
conditions = cld.data.get("status", {}).get("conditions", [])
for cond in conditions:
if cond["type"] == "Ready" and cond["status"] == "True":
ready_clds.append((cld.namespace, cld.name))
break
return ready_clds
def check_cluster_deployment_exists(kcm_manager, namespace, name) -> bool:
clds = kcm_manager.list_all_clusterdeployments()
return any(cld.namespace == namespace and cld.name == name for cld in clds)
def is_kof_installed(kcm_manager) -> bool:
# Checks if 'kof' ns is present at mothership
all_ns = kcm_manager.api.namespaces.list()
# If yes, checks if there is any *kof* named cld deployed
if "kof" in [n.name for n in all_ns]:
ready_cld_names = [cluster[1] for cluster in
get_ready_cluster_deployments(kcm_manager)]
if any("kof" in name for name in ready_cld_names):
return True
return False
@pytest.mark.sanity
def test_k0rdent_mgmt_object_is_ready(kcm_manager):
assert kcm_manager.mgmt.ready == True,\
f"Management 'kcm' object is not ready"
@pytest.mark.sanity
def test_cluster_deployments_are_ready(kcm_manager):
not_ready_clds = []
clds = kcm_manager.list_all_clusterdeployments()
for cld in clds:
conditions = cld.data.get("status", {}).get("conditions", [])
for cond in conditions:
if cond["type"] == "Ready" and cond["status"] != "True":
not_ready_clds.append((cld.namespace, cld.name))
break
assert not_ready_clds == [],\
f"There are some cluster deployments not ready: {not_ready_clds}"
@pytest.mark.sanity
def test_provider_templates_are_valid(kcm_manager):
k8s = k8s_client.K8sCluster(kubeconfig=settings.KUBECONFIG_PATH)
provider_templates = k8s.k0rdent_provider_templates.list_raw().items
invalid_res = []
for pt in provider_templates:
if not pt.status['valid']:
invalid_res.append({"name": pt.metadata.name,
"valid": pt.status['valid']})
assert not invalid_res, f"Invalid provider templates found: {invalid_res}"
@pytest.mark.sanity
def test_cluster_templates_are_valid(kcm_manager):
k8s = k8s_client.K8sCluster(kubeconfig=settings.KUBECONFIG_PATH)
cluster_templates = k8s.k0rdent_cluster_templates.list_raw().items
invalid_res = []
for ct in cluster_templates:
if not ct.status['valid']:
invalid_res.append({"name": ct.metadata.name,
"valid": ct.status['valid']})
assert not invalid_res, f"Invalid cluster templates found: {invalid_res}"
@pytest.mark.sanity
def test_service_templates_are_valid(kcm_manager):
k8s = k8s_client.K8sCluster(kubeconfig=settings.KUBECONFIG_PATH)
service_templates = k8s.k0rdent_service_templates.list_raw().items
invalid_res = []
for st in service_templates:
if not st.status['valid']:
invalid_res.append({"name": st.metadata.name,
"valid": st.status['valid']})
assert not invalid_res, f"Invalid service templates found: {invalid_res}"
@pytest.mark.sanity
def test_k0rdent_mgmt_pods_are_ready(
kcm_manager, namespaces=None, allowed_phases=None):
if namespaces is None:
namespaces = ["kcm-system", "projectsveltos"]
if is_kof_installed(kcm_manager):
namespaces.append("kof")
if allowed_phases is None:
allowed_phases = ("Running", "Succeeded")
for ns in namespaces:
pods = kcm_manager.api.pods.list(namespace=ns)
assert pods, f"No pods found in namespace '{ns}'"
for pod in pods:
phase = pod.data["status"]["phase"]
assert phase in allowed_phases, (
f"Pod '{pod.name}' in namespace '{ns}' is in phase '{phase}', "
f"expected one of {allowed_phases}"
)
@pytest.mark.sanity
def test_k0rdent_mgmt_nodes_are_ready(kcm_manager):
nodes = kcm_manager.api.nodes.list()
assert nodes, "No nodes found in the cluster"
for node in nodes:
conditions = node.data["status"]["conditions"]
ready_condition = next((c for c in conditions if c["type"] == "Ready"),
None)
assert ready_condition is not None,\
f"Node '{node.name}' has no Ready condition"
assert ready_condition["status"] == "True",\
f"Node '{node.name}' is not Ready"
@pytest.mark.sanity
def test_certificates_are_ready(kcm_manager):
certs = kcm_manager.api.api_custom.list_cluster_custom_object(
group="cert-manager.io",
version="v1",
plural="certificates",
).get("items", [])
assert certs, "No certificates found in the cluster"
not_ready = []
for cert in certs:
name = cert["metadata"]["name"]
namespace = cert["metadata"]["namespace"]
conditions = cert.get("status", {}).get("conditions", [])
ready_condition = next((c for c in conditions if c["type"] == "Ready"),
None)
if not ready_condition or ready_condition.get("status") != "True":
not_ready.append(f"{namespace}/{name}")
assert not not_ready, f"Some certificates are not Ready: {not_ready}"
@pytest.mark.sanity
def test_cluster_summaries_features_are_provisioned(kcm_manager):
cluster_summaries = kcm_manager.api.api_custom.list_cluster_custom_object(
group="config.projectsveltos.io",
version="v1beta1",
plural="clustersummaries",
).get("items", [])
not_provisioned = []
for summary in cluster_summaries:
name = summary["metadata"]["name"]
namespace = summary["metadata"].get("namespace", "")
summaries = summary.get("status", {}).get("featureSummaries", [])
for feature in summaries:
feature_id = feature.get("featureID")
status = feature.get("status")
if status != "Provisioned":
not_provisioned.append(
f"{namespace}/{name} → {feature_id}: {status}")
assert not not_provisioned, (
"Some ClusterSummaries have non-Provisioned features:\n"
+ "\n".join(not_provisioned)
)
@pytest.mark.sanity_targeted
def test_target_child_cluster_is_ready(kcm_manager):
ns = kcm_manager.get_namespace(settings.TARGET_NAMESPACE)
cld = ns.get_cluster_deployment(settings.TARGET_CLD)
if not check_cluster_deployment_exists(
kcm_manager, settings.TARGET_NAMESPACE, settings.TARGET_CLD):
pytest.skip(f"Target cluster deployment '{cld.name}' is not found in "
f"namespace '{settings.TARGET_NAMESPACE}'. Please check "
f"TARGET_NAMESPACE and TARGET_CLD env vars.")
cld.check.check_cluster_readiness(timeout=600)
cld.check.check_k8s_pods()
cld.check.check_k8s_nodes()
@pytest.mark.sanity
def test_child_clusters_are_ready(kcm_manager, subtests):
ready_clds = get_ready_cluster_deployments(kcm_manager)
assert ready_clds, "No ready child clusters found"
for namespace, cld_name in ready_clds:
label = f"{cld_name} ({namespace})"
with subtests.test(cluster=label):
ns = kcm_manager.get_namespace(namespace)
cld = ns.get_cluster_deployment(cld_name)
cld.check.check_cluster_readiness(timeout=600)
cld.check.check_k8s_pods()
cld.check.check_k8s_nodes()