cfg-checker benchmark module part 2
- fixes for fio-runner error handling
- fixes for web-server error handling
- proper handling of 'scheduled_to' option
- cleanup procedure for benchmark resources (new '--no-cleanup' option keeps them)
- kube can wait for specific phases of svc, pod, pvc, pv
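
Example run with the new flag (the top-level command name is taken from
setup.py; the 'ceph bench' subcommand layout is assumed from the parser
code below):

    mcp-checker ceph bench --agents 5 --no-cleanup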
Change-Id: I9b241597e6314fed1dbc3aba5e8dee1637eea1c7
diff --git a/cfg_checker/agent/fio_runner.py b/cfg_checker/agent/fio_runner.py
index 69ec661..50afeca 100644
--- a/cfg_checker/agent/fio_runner.py
+++ b/cfg_checker/agent/fio_runner.py
@@ -270,7 +270,7 @@
self._fio_options_common[k] = v
else:
raise CheckerException(
- "Error running fio: '{}'".format(self._shell_output)
+ "Unknown option: '{}': '{}'".format(k, v)
)
# recalc
self.recalc_times()
@@ -319,11 +319,12 @@
_line = _bb
if _start < 0 and _end < 0 and not _line.startswith("{"):
_time = get_time()
- self.result[_time] = {
+ self.results[_time] = {
"error": _line
}
self.eta = -1
self.fiorun.kill_shell()
+ self.finished = True
return
_current = _line.splitlines()
_raw += _current
@@ -372,6 +373,7 @@
_ioengines = self._shell_output
_ioengines = _ioengines.replace("\t", "")
_ioengines = _ioengines.splitlines()[1:]
+ self._shell_output = ""
else:
_ioengines = []
@@ -488,6 +490,11 @@
# Reset thread if it closed
self.fio_reset()
# Fill options
+ if "scheduled_to" in options:
+ # just ignore it
+ _k = "scheduled_to"
+ _v = options.pop(_k)
+ logger.warning("Ignoring option: '{}': '{}'".format(_k, _v))
self.fio.update_options(options)
# Start it
self.fio.start()
@@ -499,7 +506,7 @@
# Handle scheduled time
if "scheduled_to" not in options:
# required parameter not set
- return False
+ raise CheckerException("Parameter missing: 'scheduled_to'")
else:
# set time and get rid of it from options
_time = options.pop("scheduled_to")
diff --git a/cfg_checker/agent/webserver.py b/cfg_checker/agent/webserver.py
index 67a4567..eacce8f 100644
--- a/cfg_checker/agent/webserver.py
+++ b/cfg_checker/agent/webserver.py
@@ -8,7 +8,9 @@
from copy import deepcopy # noqa E402
from platform import system, release, node # noqa E402
+from cfg_checker.common.log import logger # noqa E402
from cfg_checker.common.settings import pkg_dir # noqa E402
+from cfg_checker.common.exception import CheckerException # noqa E402
from cfg_checker.helpers.falcon_jinja2 import FalconTemplate # noqa E402
from .fio_runner import FioProcessShellRun, get_time # noqa E402
@@ -91,19 +93,28 @@
resp.text = json.dumps(_status)
def on_post(self, req, resp):
- def _resp(resp, code, msg):
+ def _resp(resp, code, msg, opt=None):
resp.status = code
resp.content_type = "application/json"
- resp.text = json.dumps({"error": msg})
+ resp.text = json.dumps({"error": msg, "options": opt})
# Handle actions
- _m = req.get_media(default_when_empty={})
+ logger.info("Getting media")
+ try:
+ _m = req.get_media(default_when_empty={})
+ except falcon.MediaMalformedError:
+ _msg = "Incorrect input data"
+ logger.error(_msg)
+ _resp(resp, falcon.HTTP_400, _msg)
+ return
if _m:
+ logger.debug("got media object:\n{}".format(json.dumps(_m)))
# Validate action structure
_module = _m.get('module', "")
_action = _m.get('action', "")
_options = _m.get('options', {})
if not _module or _module not in list(_modules.keys()):
+ logger.error("invalid module '{}'".format(_module))
_resp(
resp,
falcon.HTTP_400,
@@ -111,6 +122,7 @@
)
return
elif not _action or _action not in _modules[_module]['actions']:
+ logger.error("invalid action '{}'".format(_action))
_resp(
resp,
falcon.HTTP_400,
@@ -120,52 +132,68 @@
else:
# Handle command
_a = _fio.actions[_action]
- if _action == 'get_options':
- resp.status = falcon.HTTP_200
- resp.content_type = "application/json"
- resp.text = json.dumps({"options": _a()})
- elif _action == 'get_resultlist':
- resp.status = falcon.HTTP_200
- resp.content_type = "application/json"
- resp.text = json.dumps({"resultlist": _a()})
- elif _action == 'get_result':
- if 'time' not in _options:
- _resp(
- resp,
- falcon.HTTP_400,
- "No 'time' found for '{}'".format(_action)
- )
- return
- _time = _options['time']
- resp.status = falcon.HTTP_200
- resp.content_type = "application/json"
- resp.text = json.dumps({_time: _a(_time)})
- elif _action == 'do_singlerun':
- _a(_options)
- resp.status = falcon.HTTP_200
- resp.content_type = "application/json"
- resp.text = json.dumps({"ok": True})
- elif _action == 'do_scheduledrun':
- # prepare scheduled run
+ try:
+ if _action == 'get_options':
+ logger.info("returning options")
+ resp.status = falcon.HTTP_200
+ resp.content_type = "application/json"
+ resp.text = json.dumps({"options": _a()})
+ elif _action == 'get_resultlist':
+ logger.info("getting results")
+ resp.status = falcon.HTTP_200
+ resp.content_type = "application/json"
+ resp.text = json.dumps({"resultlist": _a()})
+ elif _action == 'get_result':
+ if 'time' not in _options:
+ _msg = "No 'time' found for '{}'".format(_action)
+ logger.error(_msg)
+ _resp(
+ resp,
+ falcon.HTTP_400,
+ _msg
+ )
+ return
+ _time = _options['time']
+ logger.info("getting results for '{}'".format(_time))
+ resp.status = falcon.HTTP_200
+ resp.content_type = "application/json"
+ resp.text = json.dumps({_time: _a(_time)})
+ elif _action == 'do_singlerun':
+ logger.info("executing single run")
+ _a(_options)
+ resp.status = falcon.HTTP_200
+ resp.content_type = "application/json"
+ resp.text = json.dumps({"ok": True})
+ elif _action == 'do_scheduledrun':
+ logger.info("executing scheduled run")
+ # prepare scheduled run
- # Run it
- _a(_options)
- resp.status = falcon.HTTP_200
- resp.content_type = "application/json"
- resp.text = json.dumps({"ok": True})
- else:
- _resp(
- resp,
- falcon.HTTP_500,
- "Unknown error happened for '{}/{}/{}'".format(
- _module,
- _action,
- _options
- )
- )
+ # Run it
+ _a(_options)
+ resp.status = falcon.HTTP_200
+ resp.content_type = "application/json"
+ resp.text = json.dumps({"ok": True})
+ else:
+ _msg = "Unknown error happened for '{}/{}/{}'".format(
+ _module,
+ _action,
+ _options
+ )
+ logger.error(_msg)
+ _resp(resp, falcon.HTTP_500, _msg)
+ except CheckerException as e:
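+ # CheckerException is raised by the fio runner, e.g. for an
+ # unknown option or a missing 'scheduled_to' parameter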
+ _msg = "Error for '{}/{}':\n{}".format(
+ _module,
+ _action,
+ e
+ )
+ logger.error(_msg)
+ _resp(resp, falcon.HTTP_500, _msg, opt=_options)
return
else:
- _resp(resp, falcon.HTTP_400, "Empty request body")
+ _msg = "Empty request body"
+ logger.error(_msg)
+ _resp(resp, falcon.HTTP_400, _msg)
class Index:
diff --git a/cfg_checker/common/kube_utils.py b/cfg_checker/common/kube_utils.py
index 0ce573a..af22aa3 100644
--- a/cfg_checker/common/kube_utils.py
+++ b/cfg_checker/common/kube_utils.py
@@ -264,7 +264,7 @@
return None
- def wait_for(self, _func, phase, *args, **kwargs):
+ def wait_for_phase_on_start(self, _func, phase, *args, **kwargs):
w = watch.Watch()
start_time = time()
for event in w.stream(_func, *args, **kwargs):
@@ -285,6 +285,16 @@
w.stop()
return
+ def wait_for_event(self, _func, etype, *args, **kwargs):
+ w = watch.Watch()
+ for event in w.stream(_func, *args, **kwargs):
+ # event["type"] is one of ADDED, MODIFIED, DELETED
+ if event["type"] == etype:
+ # requested event type arrived, stop watching
+ logger_cli.debug("... got {} event".format(event["type"]))
+ w.stop()
+ return
+
def get_node_info(self, http=False):
# Query API for the nodes and do some presorting
_nodes = {}
@@ -752,14 +762,7 @@
)
def prepare_pv(self, pv_object):
- def _list_pv():
- return self.CoreV1.list_persistent_volume(
- label_selector='name={}'.format(pv_object.metadata.name)
- )
- _existing = self.safe_get_item_by_name(
- _list_pv(),
- pv_object.metadata.name
- )
+ _existing = self.get_pv_by_name(pv_object.metadata.name)
if _existing is not None:
self.CoreV1.replace_persistent_volume(
pv_object.metadata.name,
@@ -768,35 +771,23 @@
else:
self.CoreV1.create_persistent_volume(pv_object)
- _timeout = 60
- while _timeout > 0:
- _t = _list_pv()
- for item in _t.items:
- if item.status.phase in ["Available", "Bound"]:
- return item
- sleep(1)
- _timeout -= 1
- raise CheckerException(
- "Timed out creating PV '{}'".format(
- pv_object.metadata.name
- )
+ return self.wait_for_phase(
+ "pv",
+ pv_object.metadata.name,
+ None,
+ ["Available", "Bound"]
)
def prepare_pvc(self, pvc_object):
- def _list_pvc():
- return self.CoreV1.list_namespaced_persistent_volume_claim(
- pvc_object.metadata.namespace,
- label_selector='name={}'.format(pvc_object.metadata.name)
- )
- _existing = self.safe_get_item_by_name(
- _list_pvc(),
- pvc_object.metadata.name
+ _existing = self.get_pvc_by_name_and_ns(
+ pvc_object.metadata.name,
+ pvc_object.metadata.namespace
)
if _existing is not None:
_size_r = pvc_object.spec.resources.requests["storage"]
_size_e = _existing.spec.resources.requests["storage"]
- logger_cli.warn(
- "WARNING: Found PVC '{}/{}' with {}. Requested: {}'".format(
+ logger_cli.info(
+ "-> Found PVC '{}/{}' with {}. Requested: {}'".format(
pvc_object.metadata.namespace,
pvc_object.metadata.name,
_size_e,
@@ -817,36 +808,89 @@
pvc_object
)
- _timeout = 60
- while _timeout > 0:
- _t = _list_pvc()
- for item in _t.items:
- if item.status.phase in ["Available", "Bound"]:
- return item
+ return self.wait_for_phase(
+ "pvc",
+ pvc_object.metadata.name,
+ pvc_object.metadata.namespace,
+ ["Available", "Bound"]
+ )
+
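+ # NB: the lookup helpers below match on the 'name' label
+ # (via label_selector), not on metadata.name directly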
+ def get_pod_by_name_and_ns(self, name, ns):
+ return self.safe_get_item_by_name(
+ self.CoreV1.list_namespaced_pod(
+ ns,
+ label_selector='name={}'.format(name)
+ ),
+ name
+ )
+
+ def get_svc_by_name_and_ns(self, name, ns):
+ return self.safe_get_item_by_name(
+ self.CoreV1.list_namespaced_service(
+ ns,
+ label_selector='name={}'.format(name)
+ ),
+ name
+ )
+
+ def get_pvc_by_name_and_ns(self, name, ns):
+ return self.safe_get_item_by_name(
+ self.CoreV1.list_namespaced_persistent_volume_claim(
+ ns,
+ label_selector='name={}'.format(name)
+ ),
+ name
+ )
+
+ def get_pv_by_name(self, name):
+ return self.safe_get_item_by_name(
+ self.CoreV1.list_persistent_volume(
+ label_selector='name={}'.format(name)
+ ),
+ name
+ )
+
+ def wait_for_phase(self, ttype, name, ns, phase_list, timeout=120):
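+ # poll the named resource every 2s until status.phase matches
+ # one of phase_list, or raise once 'timeout' seconds have passed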
+ logger_cli.debug(
+ "... waiting '{}'s until {} is '{}'".format(
+ timeout,
+ ttype,
+ ", ".join(phase_list)
+ )
+ )
+ while timeout > 0:
+ if ttype == "pod":
+ _t = self.get_pod_by_name_and_ns(name, ns)
+ elif ttype == "svc":
+ _t = self.get_svc_by_name_and_ns(name, ns)
+ elif ttype == "pvc":
+ _t = self.get_pvc_by_name_and_ns(name, ns)
+ elif ttype == "pv":
+ _t = self.get_pv_by_name(name)
+ if "Terminated" in phase_list and not _t:
+ if ns:
+ _s = "... {} {}/{} not found".format(ttype, ns, name)
else:
- logger_cli.debug(
- "... pvcstatus: '{}', {} sec left".format(
- item.status.phase,
- _timeout
- )
- )
- sleep(5)
- _timeout -= 5
+ _s = "... {} '{}' not found".format(ttype, name)
+ logger_cli.debug(_s)
+ return None
+ logger_cli.debug("... {} is '{}'".format(ttype, _t.status.phase))
+ if _t.status.phase in phase_list:
+ return _t
+ sleep(2)
+ timeout -= 2
raise CheckerException(
- "Timed out creating PVC '{}'".format(
- pvc_object.metadata.name
+ "Timed out waiting for {} '{}' in '{}'".format(
+ ttype,
+ name,
+ ", ".join(ttype)
)
)
def prepare_pod_from_yaml(self, pod_yaml):
- def _list_pod():
- return self.CoreV1.list_namespaced_pod(
- pod_yaml['metadata']['namespace'],
- label_selector='name={}'.format(pod_yaml['metadata']['name'])
- )
- _existing = self.safe_get_item_by_name(
- _list_pod(),
- pod_yaml['metadata']['name']
+ _existing = self.get_pod_by_name_and_ns(
+ pod_yaml['metadata']['name'],
+ pod_yaml['metadata']['namespace']
)
if _existing is not None:
logger_cli.warn(
@@ -861,36 +905,22 @@
pod_yaml['metadata']['namespace'],
pod_yaml
)
- _timeout = 120
- logger_cli.debug("... waiting '{}'s for pod to start".format(_timeout))
- while _timeout > 0:
- _t = _list_pod()
- for item in _t.items:
- logger_cli.debug("... pod is '{}'".format(item.status.phase))
- if item.status.phase in ["Running"]:
- return item
- sleep(2)
- _timeout -= 2
- raise CheckerException(
- "Timed out creating pod '{}'".format(
- pod_yaml['metadata']['name']
- )
+ return self.wait_for_phase(
+ "pod",
+ pod_yaml['metadata']['name'],
+ pod_yaml['metadata']['namespace'],
+ ["Running"]
)
def expose_pod_port(self, pod_object, port, ns="qa-space"):
- def _list_svc():
- return self.CoreV1.list_namespaced_service(
- pod_object.metadata.namespace,
- label_selector='name={}'.format(pod_object.metadata.name)
- )
- _existing = self.safe_get_item_by_name(
- _list_svc(),
- pod_object.metadata.name
+ _existing = self.get_svc_by_name_and_ns(
+ pod_object.metadata.name,
+ pod_object.metadata.namespace
)
if _existing is not None:
# TODO: Check port number?
- logger_cli.warn(
- "WARNING: Pod already exposed '{}/{}:{}'. Reusing.".format(
+ logger_cli.info(
+ "-> Pod already exposed '{}/{}:{}'. Reusing.".format(
pod_object.metadata.namespace,
pod_object.metadata.name,
port
diff --git a/cfg_checker/modules/ceph/__init__.py b/cfg_checker/modules/ceph/__init__.py
index 9d0f923..d3f9581 100644
--- a/cfg_checker/modules/ceph/__init__.py
+++ b/cfg_checker/modules/ceph/__init__.py
@@ -92,6 +92,11 @@
metavar='task-file',
help="Task file for benchmark"
)
+ ceph_bench_parser.add_argument(
+ '--no-cleanup',
+ action="store_true", default=False,
+ help="Do not cleanup services, agents, pvc, and pv"
+ )
return _parser
@@ -149,9 +154,9 @@
args_utils.check_supported_env(ENV_TYPE_KUBE, args, config)
_filename = args_utils.get_arg(args, 'html')
# agents count option
- _agents = args_utils.get_arg(args, "agents")
- logger_cli.info("-> using {} agents".format(_agents))
- config.bench_agent_count = _agents
+ config.bench_agent_count = args_utils.get_arg(args, "agents")
+ logger_cli.info("-> using {} agents".format(config.bench_agent_count))
+ config.no_cleaning_after_benchmark = args_utils.get_arg(args, "no_cleanup")
# storage class
_storage_class = args_utils.get_arg(args, "storage_class")
logger_cli.info("-> using storage class of '{}'".format(_storage_class))
@@ -179,8 +184,10 @@
# Do the testrun
ceph_bench.prepare_agents(_opts)
if not ceph_bench.run_benchmark(_opts):
+ # No cleaning and/or report if benchmark was not finished
return
- ceph_bench.cleanup()
+ if not config.no_cleaning_after_benchmark:
+ ceph_bench.cleanup()
# Create report
ceph_bench.create_report(_filename)
diff --git a/cfg_checker/modules/ceph/bench.py b/cfg_checker/modules/ceph/bench.py
index 190076a..dd96a69 100644
--- a/cfg_checker/modules/ceph/bench.py
+++ b/cfg_checker/modules/ceph/bench.py
@@ -47,6 +47,8 @@
if config.bench_mode == "tasks":
self.load_tasks(config.bench_task_file)
+ self.cleanup_list = []
+
def load_tasks(self, taskfile):
# Load csv file
logger_cli.info("-> Loading taskfile '{}'".format(taskfile))
@@ -63,6 +65,11 @@
"size": row[4]
})
+ def add_for_deletion(self, obj, typ):
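+ # store as a [type, namespace, name] triplet so cleanup()
+ # can delete the resource by name later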
+ _d = [typ, obj.metadata.namespace, obj.metadata.name]
+ self.cleanup_list.append(_d)
+ return
+
def prepare_agents(self, options):
logger_cli.info("# Preparing {} agents".format(self.agent_count))
for idx in range(self.agent_count):
@@ -75,10 +82,15 @@
options['size'] + 'i',
self._agent_template
)
- # save it to list
+ # save it to lists
self.agent_pods.append(_agent)
+ self.add_for_deletion(_pv, "pv")
+ self.add_for_deletion(_pvc, "pvc")
+ self.add_for_deletion(_agent, "pod")
+
# expose it
_svc = self.master.expose_benchmark_agent(_agent)
+ self.add_for_deletion(_svc, "svc")
# Save service
self.services.append(_svc)
@@ -203,7 +215,9 @@
_runtime = _get_seconds(options["runtime"])
_ramptime = _get_seconds(options["ramp_time"])
_timeout = _runtime + _ramptime + 5
- while _timeout > 0:
+ _end = datetime.now() + timedelta(seconds=_timeout)
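+ # track a wall-clock deadline instead of counting loop iterations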
+ while True:
+ # Print status
_sts = get_status()
_str = ""
for _s in _sts:
@@ -212,17 +226,29 @@
_s["status"],
_s["progress"]
)
- logger_cli.debug("... {}".format(_str))
- sleep(1)
- _timeout -= 1
+ # recalc how much is left
+ diff = (_end - datetime.now()).total_seconds()
+ logger_cli.debug("... [{}s]: {}".format(diff, _str))
+ # In case end_datetime was in past to begin with
+ if diff < 0:
+ break
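+ # sleep half the remaining time, then re-check status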
+ logger_cli.info("-> Sleeping for {}s".format(diff/2))
+ sleep(diff/2)
+ if diff <= 0.1:
+ break
+ logger_cli.info("-> Done")
return True
def cleanup(self):
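+ # delete in reverse creation order: svc, pod, pvc, pv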
+ self.cleanup_list.reverse()
+ for _res in self.cleanup_list:
+ self.master.cleanup_resource_by_name(_res[0], _res[2], ns=_res[1])
+ logger_cli.info("# Done cleaning up")
return
# Create report
- def create_report(self):
+ def create_report(self, filename):
return
diff --git a/cfg_checker/nodes.py b/cfg_checker/nodes.py
index 9570978..c1b3d4c 100644
--- a/cfg_checker/nodes.py
+++ b/cfg_checker/nodes.py
@@ -1308,3 +1308,52 @@
def expose_benchmark_agent(self, agent):
return self.kube.expose_pod_port(agent, 8765)
+
+ def cleanup_resource_by_name(self, res_type, name, ns=None, wait=False):
+ """Cleansup resource using string res_type and the ns/name
+
+ Args:
+ res_type (string): resource type name: pod, pv, pvc, svc
+ name (string): resource name to cleanup
+ ns (string, optional): Namespace to use. Default is 'qa-space'
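+ wait (bool, optional): wait until the resource is gone. Default False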
+
+ Returns: (bool) True on success
+ """
+ # fill defaults
+ if not ns:
+ ns = self._namespace
+ # Handle res_type errors and choose resource type
+ if not res_type:
+ logger_cli.debug(
+ "... resource type invalid: '{}'".format(res_type)
+ )
+ return False
+ elif not name:
+ logger_cli.debug("... resource name invalid: '{}'".format(name))
+ return False
+ elif res_type == "svc":
+ # Delete service
+ logger_cli.info("-> deleting svc {}/{}".format(ns, name))
+ self.kube.CoreV1.delete_namespaced_service(name, ns)
+ # TODO: Check if successful
+ elif res_type == "pod":
+ # Delete a pod
+ logger_cli.info("-> deleting pod {}/{}".format(ns, name))
+ self.kube.CoreV1.delete_namespaced_pod(name, ns)
+ if wait:
+ self.kube.wait_for_phase(res_type, name, ns, ["Terminated"])
+ elif res_type == "pvc":
+ logger_cli.info("-> deleting pvc {}/{}".format(ns, name))
+ self.kube.CoreV1.delete_namespaced_persistent_volume_claim(
+ name,
+ ns
+ )
+ if wait:
+ self.kube.wait_for_phase(res_type, name, ns, ["Terminated"])
+ elif res_type == "pv":
+ logger_cli.info("-> deleting pv {}/{}".format(ns, name))
+ self.kube.CoreV1.delete_persistent_volume(name)
+ if wait:
+ self.kube.wait_for_phase(res_type, name, None, ["Terminated"])
+
+ return True
diff --git a/setup.py b/setup.py
index 4ae65c7..5c1e6b0 100644
--- a/setup.py
+++ b/setup.py
@@ -38,7 +38,7 @@
setup(
name="mcp-checker",
- version="0.61",
+ version="0.62",
author="Alex Savatieiev",
author_email="osavatieiev@mirantis.com",
classifiers=[
diff --git a/templates/cfgagent-template.yaml b/templates/cfgagent-template.yaml
index bc64af8..c6b305e 100644
--- a/templates/cfgagent-template.yaml
+++ b/templates/cfgagent-template.yaml
@@ -11,7 +11,7 @@
- checker-agent
imagePullPolicy: IfNotPresent
name: cfgagent-pod
- image: savex13/cfg-checker-agent:0.6
+ image: savex13/cfg-checker-agent:0.61
volumeMounts:
- mountPath: /cephvol
name: cfgagent-pv-placeholder