cfg-checker bench part 1
- first portion of single-test debugging
- updated fio option extraction
- updated date interaction
- fixed agent error display and handling
Related-PROD: PROD-36669
Change-Id: I7c1014c01b5b84429f112bff8db5ad34944c4644
diff --git a/cfg_checker/common/kube_utils.py b/cfg_checker/common/kube_utils.py
index 95bb19c..0ce573a 100644
--- a/cfg_checker/common/kube_utils.py
+++ b/cfg_checker/common/kube_utils.py
@@ -6,13 +6,15 @@
import urllib3
import yaml
-from kubernetes import client as kclient, config as kconfig
+from kubernetes import client as kclient, config as kconfig, watch
from kubernetes.stream import stream
from kubernetes.client.rest import ApiException
+from time import time, sleep
from cfg_checker.common import logger, logger_cli
from cfg_checker.common.decorators import retry
-from cfg_checker.common.exception import InvalidReturnException, KubeException
+from cfg_checker.common.exception import CheckerException, \
+ InvalidReturnException, KubeException
from cfg_checker.common.file_utils import create_temp_file_with_content
from cfg_checker.common.other import utils, shell
from cfg_checker.common.ssh_utils import ssh_shell_p
@@ -262,6 +264,27 @@
return None
+ def wait_for(self, _func, phase, *args, **kwargs):
+ w = watch.Watch()
+ start_time = time()
+ for event in w.stream(_func, *args, **kwargs):
+ if event["object"].status.phase == phase:
+ w.stop()
+ end_time = time()
+ logger_cli.debug(
+ "... bacame '{}' in {:0.2f} sec".format(
+ phase,
+ end_time-start_time
+ )
+ )
+ return
+ # event.type: ADDED, MODIFIED, DELETED
+ if event["type"] == "DELETED":
+ # Pod was deleted while we were waiting for it to start.
+ logger_cli.debug("... deleted before started")
+ w.stop()
+ return
+
def get_node_info(self, http=False):
# Query API for the nodes and do some presorting
_nodes = {}
@@ -644,3 +667,250 @@
version=version,
plural=plural
)
+
+ def init_pvc_resource(
+ self,
+ name,
+ storage_class,
+ size,
+ ns="qa-space",
+ mode="ReadWriteOnce"
+ ):
+ """Return the Kubernetes PVC resource"""
+ return kclient.V1PersistentVolumeClaim(
+ api_version='v1',
+ kind='PersistentVolumeClaim',
+ metadata=kclient.V1ObjectMeta(
+ name=name,
+ namespace=ns,
+ labels={"name": name}
+ ),
+ spec=kclient.V1PersistentVolumeClaimSpec(
+ storage_class_name=storage_class,
+ access_modes=[mode],
+ resources=kclient.V1ResourceRequirements(
+ requests={'storage': size}
+ )
+ )
+ )
+
+ def init_pv_resource(
+ self,
+ name,
+ storage_class,
+ size,
+ path,
+ ns="qa-space",
+ mode="ReadWriteOnce"
+ ):
+ """Return the Kubernetes PVC resource"""
+ return kclient.V1PersistentVolume(
+ api_version='v1',
+ kind='PersistentVolume',
+ metadata=kclient.V1ObjectMeta(
+ name=name,
+ namespace=ns,
+ labels={"name": name}
+ ),
+ spec=kclient.V1PersistentVolumeSpec(
+ storage_class_name=storage_class,
+ access_modes=[mode],
+ capacity={'storage': size},
+ host_path=kclient.V1HostPathVolumeSource(path=path)
+ )
+ )
+
+ def init_service(
+ self,
+ name,
+ port,
+ clusterip=None,
+ ns="qa-space"
+ ):
+ """ Inits a V1Service object with data for benchmark agent"""
+ _meta = kclient.V1ObjectMeta(
+ name=name,
+ namespace=ns,
+ labels={"name": name}
+ )
+ _port = kclient.V1ServicePort(
+ port=port,
+ protocol="TCP",
+ target_port=port
+ )
+ _spec = kclient.V1ServiceSpec(
+ # cluster_ip=clusterip,
+ selector={"name": name},
+ # type="ClusterIP",
+ ports=[_port]
+ )
+ return kclient.V1Service(
+ api_version="v1",
+ kind="Service",
+ metadata=_meta,
+ spec=_spec
+ )
+
+ def prepare_pv(self, pv_object):
+ def _list_pv():
+ return self.CoreV1.list_persistent_volume(
+ label_selector='name={}'.format(pv_object.metadata.name)
+ )
+ _existing = self.safe_get_item_by_name(
+ _list_pv(),
+ pv_object.metadata.name
+ )
+ if _existing is not None:
+ self.CoreV1.replace_persistent_volume(
+ pv_object.metadata.name,
+ pv_object
+ )
+ else:
+ self.CoreV1.create_persistent_volume(pv_object)
+
+ _timeout = 60
+ while _timeout > 0:
+ _t = _list_pv()
+ for item in _t.items:
+ if item.status.phase in ["Available", "Bound"]:
+ return item
+ sleep(1)
+ _timeout -= 1
+ raise CheckerException(
+ "Timed out creating PV '{}'".format(
+ pv_object.metadata.name
+ )
+ )
+
+ def prepare_pvc(self, pvc_object):
+ def _list_pvc():
+ return self.CoreV1.list_namespaced_persistent_volume_claim(
+ pvc_object.metadata.namespace,
+ label_selector='name={}'.format(pvc_object.metadata.name)
+ )
+ _existing = self.safe_get_item_by_name(
+ _list_pvc(),
+ pvc_object.metadata.name
+ )
+ if _existing is not None:
+ _size_r = pvc_object.spec.resources.requests["storage"]
+ _size_e = _existing.spec.resources.requests["storage"]
+ logger_cli.warn(
+ "WARNING: Found PVC '{}/{}' with {}. Requested: {}'".format(
+ pvc_object.metadata.namespace,
+ pvc_object.metadata.name,
+ _size_e,
+ _size_r
+ )
+ )
+ if _size_r != _size_e:
+ raise CheckerException(
+ "ERROR: PVC exists on the cloud with different size "
+ "than needed. Please cleanup!"
+ )
+ else:
+ logger_cli.debug(
+ "... creating pvc '{}'".format(pvc_object.metadata.name)
+ )
+ self.CoreV1.create_namespaced_persistent_volume_claim(
+ pvc_object.metadata.namespace,
+ pvc_object
+ )
+
+ _timeout = 60
+ while _timeout > 0:
+ _t = _list_pvc()
+ for item in _t.items:
+ if item.status.phase in ["Available", "Bound"]:
+ return item
+ else:
+ logger_cli.debug(
+ "... pvcstatus: '{}', {} sec left".format(
+ item.status.phase,
+ _timeout
+ )
+ )
+ sleep(5)
+ _timeout -= 5
+ raise CheckerException(
+ "Timed out creating PVC '{}'".format(
+ pvc_object.metadata.name
+ )
+ )
+
+ def prepare_pod_from_yaml(self, pod_yaml):
+ def _list_pod():
+ return self.CoreV1.list_namespaced_pod(
+ pod_yaml['metadata']['namespace'],
+ label_selector='name={}'.format(pod_yaml['metadata']['name'])
+ )
+ _existing = self.safe_get_item_by_name(
+ _list_pod(),
+ pod_yaml['metadata']['name']
+ )
+ if _existing is not None:
+ logger_cli.warn(
+ "WARNING: Found pod '{}/{}'. Reusing.".format(
+ pod_yaml['metadata']['namespace'],
+ pod_yaml['metadata']['name']
+ )
+ )
+ return _existing
+ else:
+ self.CoreV1.create_namespaced_pod(
+ pod_yaml['metadata']['namespace'],
+ pod_yaml
+ )
+ _timeout = 120
+ logger_cli.debug("... waiting '{}'s for pod to start".format(_timeout))
+ while _timeout > 0:
+ _t = _list_pod()
+ for item in _t.items:
+ logger_cli.debug("... pod is '{}'".format(item.status.phase))
+ if item.status.phase in ["Running"]:
+ return item
+ sleep(2)
+ _timeout -= 2
+ raise CheckerException(
+ "Timed out creating pod '{}'".format(
+ pod_yaml['metadata']['name']
+ )
+ )
+
+ def expose_pod_port(self, pod_object, port, ns="qa-space"):
+ def _list_svc():
+ return self.CoreV1.list_namespaced_service(
+ pod_object.metadata.namespace,
+ label_selector='name={}'.format(pod_object.metadata.name)
+ )
+ _existing = self.safe_get_item_by_name(
+ _list_svc(),
+ pod_object.metadata.name
+ )
+ if _existing is not None:
+ # TODO: Check port number?
+ logger_cli.warn(
+ "WARNING: Pod already exposed '{}/{}:{}'. Reusing.".format(
+ pod_object.metadata.namespace,
+ pod_object.metadata.name,
+ port
+ )
+ )
+ return _existing
+ else:
+ logger_cli.debug(
+ "... creating service for pod {}/{}: {}:{}".format(
+ pod_object.metadata.namespace,
+ pod_object.metadata.name,
+ pod_object.status.pod_ip,
+ port
+ )
+ )
+ _svc = self.init_service(
+ pod_object.metadata.name,
+ port
+ )
+ return self.CoreV1.create_namespaced_service(
+ pod_object.metadata.namespace,
+ _svc
+ )