blob: 8f526040ebcfc6c2da80ce034098dc2bec26a72b [file] [log] [blame]
Ievgeniia Zadorozhna86ab37a2025-09-11 12:58:49 +02001import pytest
2
3from si_tests import settings
4from si_tests.clients import k8s as k8s_client
5from si_tests.fixtures.kubectl import kcm_manager
6
7
@pytest.fixture
def ready_child_cluster(request, kcm_manager):
    """Expose the parametrized cluster deployment as (namespace, name).

    The value comes from ``request.param`` and must unpack into exactly
    two items; they are returned unchanged as a tuple.
    """
    cld_namespace, cld_name = request.param
    return (cld_namespace, cld_name)
12
13
def pytest_generate_tests(metafunc):
    """Parametrize ``ready_child_cluster`` with the Ready deployments.

    Does nothing for tests that do not request ``ready_child_cluster``.
    Otherwise each ``(namespace, name)`` pair returned by
    ``get_ready_cluster_deployments`` becomes one indirect parameter.
    The fixture unpacks ``request.param`` into two values, so the pairs
    themselves are passed rather than their list indices.
    """
    if "ready_child_cluster" not in metafunc.fixturenames:
        return

    # NOTE(review): kcm_manager is imported from si_tests.fixtures.kubectl;
    # if it is a pytest fixture, pytest rejects calling it directly --
    # confirm it is a plain callable at collection time.
    clds = get_ready_cluster_deployments(kcm_manager())
    metafunc.parametrize(
        "ready_child_cluster",
        clds,
        indirect=True,
        # Readable test ids instead of pytest's auto-generated ones.
        ids=[f"{namespace}/{name}" for namespace, name in clds],
    )
19
20
def get_ready_cluster_deployments(kcm_manager):
    """Return (namespace, name) for each Ready cluster deployment.

    A deployment is included when its ``status.conditions`` list has an
    entry whose type is "Ready" and whose status is the string "True".
    Deployments without status or conditions are left out.
    """
    return [
        (deployment.namespace, deployment.name)
        for deployment in kcm_manager.list_all_clusterdeployments()
        if any(
            condition["type"] == "Ready" and condition["status"] == "True"
            for condition in
            deployment.data.get("status", {}).get("conditions", [])
        )
    ]
32
33
def check_cluster_deployment_exists(kcm_manager, namespace, name) -> bool:
    """Tell whether a cluster deployment with this namespace and name exists.

    Scans everything returned by ``list_all_clusterdeployments`` and
    returns True on the first entry matching both values.
    """
    for deployment in kcm_manager.list_all_clusterdeployments():
        if deployment.namespace == namespace and deployment.name == name:
            return True
    return False
37
38
def is_kof_installed(kcm_manager) -> bool:
    """Report whether kof looks installed on the management cluster.

    True only when a namespace named exactly "kof" exists and at least
    one Ready cluster deployment has "kof" somewhere in its name.
    """
    namespace_names = [ns.name for ns in kcm_manager.api.namespaces.list()]
    # Without the 'kof' namespace there is nothing else to look at.
    if "kof" not in namespace_names:
        return False

    # Index 1 of each entry is the cluster deployment name.
    ready_names = [
        entry[1] for entry in get_ready_cluster_deployments(kcm_manager)
    ]
    for cld_name in ready_names:
        if "kof" in cld_name:
            return True
    return False
49
50
@pytest.mark.sanity
def test_k0rdent_mgmt_object_is_ready(kcm_manager):
    """Check that ``kcm_manager.mgmt.ready`` compares equal to True."""
    mgmt_ready = kcm_manager.mgmt.ready
    # Equality (not truthiness) is kept on purpose so that only values
    # equal to True pass.
    assert mgmt_ready == True, (  # noqa: E712
        "Management 'kcm' object is not ready"
    )
55
56
@pytest.mark.sanity
def test_cluster_deployments_are_ready(kcm_manager):
    """Check that every cluster deployment reports Ready with status "True".

    A deployment is collected as not ready when its first "Ready"
    condition has a status other than "True", and also when it has no
    "Ready" condition at all (including missing ``status`` or
    ``conditions``). The latter matches how ``test_certificates_are_ready``
    treats a missing Ready condition; previously such deployments passed
    silently.
    """
    not_ready_clds = []
    for cld in kcm_manager.list_all_clusterdeployments():
        conditions = cld.data.get("status", {}).get("conditions", [])
        ready_condition = next(
            (c for c in conditions if c["type"] == "Ready"), None)
        # An absent Ready condition means readiness was never reported.
        if ready_condition is None or ready_condition["status"] != "True":
            not_ready_clds.append((cld.namespace, cld.name))

    assert not_ready_clds == [], \
        f"There are some cluster deployments not ready: {not_ready_clds}"
69
70
@pytest.mark.sanity
def test_provider_templates_are_valid(kcm_manager):
    """Check that no provider template has a falsy ``status['valid']``.

    Templates are listed through a ``K8sCluster`` client built from
    ``settings.KUBECONFIG_PATH``; offending templates are reported by
    name together with their ``valid`` value.
    """
    cluster = k8s_client.K8sCluster(kubeconfig=settings.KUBECONFIG_PATH)
    invalid_res = [
        {"name": template.metadata.name, "valid": template.status['valid']}
        for template in cluster.k0rdent_provider_templates.list_raw().items
        if not template.status['valid']
    ]
    assert not invalid_res, f"Invalid provider templates found: {invalid_res}"
81
82
@pytest.mark.sanity
def test_cluster_templates_are_valid(kcm_manager):
    """Check that no cluster template has a falsy ``status['valid']``.

    Templates are listed through a ``K8sCluster`` client built from
    ``settings.KUBECONFIG_PATH``; offending templates are reported by
    name together with their ``valid`` value.
    """
    cluster = k8s_client.K8sCluster(kubeconfig=settings.KUBECONFIG_PATH)
    invalid_res = [
        {"name": template.metadata.name, "valid": template.status['valid']}
        for template in cluster.k0rdent_cluster_templates.list_raw().items
        if not template.status['valid']
    ]
    assert not invalid_res, f"Invalid cluster templates found: {invalid_res}"
93
94
@pytest.mark.sanity
def test_service_templates_are_valid(kcm_manager):
    """Check that no service template has a falsy ``status['valid']``.

    Templates are listed through a ``K8sCluster`` client built from
    ``settings.KUBECONFIG_PATH``; offending templates are reported by
    name together with their ``valid`` value.
    """
    cluster = k8s_client.K8sCluster(kubeconfig=settings.KUBECONFIG_PATH)
    invalid_res = [
        {"name": template.metadata.name, "valid": template.status['valid']}
        for template in cluster.k0rdent_service_templates.list_raw().items
        if not template.status['valid']
    ]
    assert not invalid_res, f"Invalid service templates found: {invalid_res}"
105
106
@pytest.mark.sanity
def test_k0rdent_mgmt_pods_are_ready(
        kcm_manager, namespaces=None, allowed_phases=None):
    """Check that pods in the management namespaces are in an allowed phase.

    When ``namespaces`` is omitted, "kcm-system" and "projectsveltos" are
    checked, plus "kof" if ``is_kof_installed`` returns True. When
    ``allowed_phases`` is omitted, "Running" and "Succeeded" are accepted.
    A namespace with no pods fails the test.
    """
    if namespaces is None:
        # NOTE(review): indentation was lost in the extracted source; the
        # kof check is reconstructed as part of this default branch --
        # confirm against the repository.
        kof_namespaces = ["kof"] if is_kof_installed(kcm_manager) else []
        namespaces = ["kcm-system", "projectsveltos", *kof_namespaces]
    if allowed_phases is None:
        allowed_phases = ("Running", "Succeeded")

    for ns in namespaces:
        ns_pods = kcm_manager.api.pods.list(namespace=ns)
        assert ns_pods, f"No pods found in namespace '{ns}'"

        for pod in ns_pods:
            phase = pod.data["status"]["phase"]
            message = (
                f"Pod '{pod.name}' in namespace '{ns}' is in phase "
                f"'{phase}', expected one of {allowed_phases}"
            )
            assert phase in allowed_phases, message
127
128
@pytest.mark.sanity
def test_k0rdent_mgmt_nodes_are_ready(kcm_manager):
    """Check that every node has a "Ready" condition with status "True".

    Fails when no nodes are listed, and on the first node that either
    lacks a Ready condition or reports a different status.
    """
    nodes = kcm_manager.api.nodes.list()
    assert nodes, "No nodes found in the cluster"

    for node in nodes:
        # Pick the first condition of type Ready, if there is one.
        ready_condition = None
        for condition in node.data["status"]["conditions"]:
            if condition["type"] == "Ready":
                ready_condition = condition
                break

        assert ready_condition is not None, (
            f"Node '{node.name}' has no Ready condition"
        )
        assert ready_condition["status"] == "True", (
            f"Node '{node.name}' is not Ready"
        )
142
143
@pytest.mark.sanity
def test_certificates_are_ready(kcm_manager):
    """Check that every cert-manager Certificate reports Ready "True".

    Lists ``certificates`` (group ``cert-manager.io``, version ``v1``)
    cluster-wide. Fails when none are found, or when any certificate has
    no Ready condition or a Ready status other than "True"; failures are
    reported as ``namespace/name``.
    """
    response = kcm_manager.api.api_custom.list_cluster_custom_object(
        group="cert-manager.io",
        version="v1",
        plural="certificates",
    )
    certs = response.get("items", [])
    assert certs, "No certificates found in the cluster"

    not_ready = []
    for cert in certs:
        metadata = cert["metadata"]
        name = metadata["name"]
        namespace = metadata["namespace"]

        # Pick the first condition of type Ready, if there is one.
        ready_condition = None
        for condition in cert.get("status", {}).get("conditions", []):
            if condition["type"] == "Ready":
                ready_condition = condition
                break

        is_ready = (bool(ready_condition)
                    and ready_condition.get("status") == "True")
        if not is_ready:
            not_ready.append(f"{namespace}/{name}")

    assert not not_ready, f"Some certificates are not Ready: {not_ready}"
165
166
@pytest.mark.sanity
def test_cluster_summaries_features_are_provisioned(kcm_manager):
    """Check that every ClusterSummary feature has status "Provisioned".

    Lists ``clustersummaries`` (group ``config.projectsveltos.io``,
    version ``v1beta1``) cluster-wide and inspects each entry of
    ``status.featureSummaries``. Every feature with another status is
    reported on its own line of the failure message.
    """
    response = kcm_manager.api.api_custom.list_cluster_custom_object(
        group="config.projectsveltos.io",
        version="v1beta1",
        plural="clustersummaries",
    )

    not_provisioned = []
    for summary in response.get("items", []):
        name = summary["metadata"]["name"]
        namespace = summary["metadata"].get("namespace", "")

        for feature in summary.get("status", {}).get("featureSummaries", []):
            status = feature.get("status")
            if status == "Provisioned":
                continue
            feature_id = feature.get("featureID")
            not_provisioned.append(
                f"{namespace}/{name} → {feature_id}: {status}")

    report = "\n".join(not_provisioned)
    assert not not_provisioned, (
        "Some ClusterSummaries have non-Provisioned features:\n" + report
    )
192
193
@pytest.mark.sanity_targeted
def test_target_child_cluster_is_ready(kcm_manager):
    """Run readiness checks on the cluster deployment named in settings.

    The target is ``settings.TARGET_CLD`` in ``settings.TARGET_NAMESPACE``.
    The test is skipped when no such deployment is listed. Otherwise it
    runs the cluster readiness check (600 s timeout) followed by the pod
    and node checks.
    """
    target_namespace = settings.TARGET_NAMESPACE
    target_cld = settings.TARGET_CLD

    # Check existence before fetching any object, so the skip is reached
    # even if the namespace/deployment lookups fail for a missing target.
    # The message uses the settings values for the same reason.
    if not check_cluster_deployment_exists(
            kcm_manager, target_namespace, target_cld):
        pytest.skip(f"Target cluster deployment '{target_cld}' is not found "
                    f"in namespace '{target_namespace}'. Please check "
                    f"TARGET_NAMESPACE and TARGET_CLD env vars.")

    ns = kcm_manager.get_namespace(target_namespace)
    cld = ns.get_cluster_deployment(target_cld)
    cld.check.check_cluster_readiness(timeout=600)
    cld.check.check_k8s_pods()
    cld.check.check_k8s_nodes()
206
207
@pytest.mark.sanity
def test_child_clusters_are_ready(kcm_manager, subtests):
    """Run readiness checks on every Ready cluster deployment.

    Fails outright when ``get_ready_cluster_deployments`` returns nothing.
    Each deployment is checked inside its own subtest, labelled
    ``"<name> (<namespace>)"``: cluster readiness (600 s timeout), then
    pods, then nodes.
    """
    ready_clds = get_ready_cluster_deployments(kcm_manager)
    assert ready_clds, "No ready child clusters found"

    for cld_namespace, cld_name in ready_clds:
        with subtests.test(cluster=f"{cld_name} ({cld_namespace})"):
            cld = (kcm_manager.get_namespace(cld_namespace)
                   .get_cluster_deployment(cld_name))

            cld.check.check_cluster_readiness(timeout=600)
            cld.check.check_k8s_pods()
            cld.check.check_k8s_nodes()