cfg-checker benchmark module part 2

 - fixes for fio-runner error handling
 - fixes for web-server error handling
 - proper handling of 'scheduled_to' option
 - cleanup procedure
 - kube can wait for specific phases of svc, pod, pvc, pv

Change-Id: I9b241597e6314fed1dbc3aba5e8dee1637eea1c7
diff --git a/cfg_checker/agent/fio_runner.py b/cfg_checker/agent/fio_runner.py
index 69ec661..50afeca 100644
--- a/cfg_checker/agent/fio_runner.py
+++ b/cfg_checker/agent/fio_runner.py
@@ -270,7 +270,7 @@
                 self._fio_options_common[k] = v
             else:
                 raise CheckerException(
-                    "Error running fio: '{}'".format(self._shell_output)
+                    "Unknown option: '{}': '{}'".format(k, v)
                 )
         # recalc
         self.recalc_times()
@@ -319,11 +319,12 @@
                     _line = _bb
                 if _start < 0 and _end < 0 and not _line.startswith("{"):
                     _time = get_time()
-                    self.result[_time] = {
+                    self.results[_time] = {
                         "error": _line
                     }
                     self.eta = -1
                     self.fiorun.kill_shell()
+                    self.finished = True
                     return
                 _current = _line.splitlines()
                 _raw += _current
@@ -372,6 +373,7 @@
             _ioengines = self._shell_output
             _ioengines = _ioengines.replace("\t", "")
             _ioengines = _ioengines.splitlines()[1:]
+            self._shell_output = ""
         else:
             _ioengines = []
 
@@ -488,6 +490,11 @@
         # Reset thread if it closed
         self.fio_reset()
         # Fill options
+        if "scheduled_to" in options:
+            # just ignore it
+            _k = "scheduled_to"
+            _v = options.pop(_k)
+            logger.warning("Ignoring option: '{}': '{}'".format(_k, _v))
         self.fio.update_options(options)
         # Start it
         self.fio.start()
@@ -499,7 +506,7 @@
         # Handle scheduled time
         if "scheduled_to" not in options:
             # required parameter not set
-            return False
+            raise CheckerException("Parameter missing: 'scheduled_to'")
         else:
             # set time and get rid of it from options
             _time = options.pop("scheduled_to")
diff --git a/cfg_checker/agent/webserver.py b/cfg_checker/agent/webserver.py
index 67a4567..eacce8f 100644
--- a/cfg_checker/agent/webserver.py
+++ b/cfg_checker/agent/webserver.py
@@ -8,7 +8,9 @@
 from copy import deepcopy # noqa E402
 from platform import system, release, node # noqa E402
 
+from cfg_checker.common.log import logger # noqa E402
 from cfg_checker.common.settings import pkg_dir  # noqa E402
+from cfg_checker.common.exception import CheckerException  # noqa E402
 from cfg_checker.helpers.falcon_jinja2 import FalconTemplate  # noqa E402
 from .fio_runner import FioProcessShellRun, get_time # noqa E402
 
@@ -91,19 +93,28 @@
         resp.text = json.dumps(_status)
 
     def on_post(self, req, resp):
-        def _resp(resp, code, msg):
+        def _resp(resp, code, msg, opt={}):
             resp.status = code
             resp.content_type = "application/json"
-            resp.text = json.dumps({"error": msg})
+            resp.text = json.dumps({"error": msg, "options": opt})
         # Handle actions
-        _m = req.get_media(default_when_empty={})
+        logger.info("Getting media")
+        try:
+            _m = req.get_media(default_when_empty={})
+        except falcon.MediaMalformedError:
+            _msg = "Incorrect input data"
+            logger.error(_msg)
+            _resp(resp, falcon.HTTP_400, _msg)
+            return
         if _m:
+            logger.debug("got media object:\n{}".format(json.dumps(_m)))
             # Validate action structure
             _module = _m.get('module', "")
             _action = _m.get('action', "")
             _options = _m.get('options', {})
 
             if not _module or _module not in list(_modules.keys()):
+                logger.error("invalid module '{}'".format(_module))
                 _resp(
                     resp,
                     falcon.HTTP_400,
@@ -111,6 +122,7 @@
                 )
                 return
             elif not _action or _action not in _modules[_module]['actions']:
+                logger.error("invalid action '{}'".format(_action))
                 _resp(
                     resp,
                     falcon.HTTP_400,
@@ -120,52 +132,68 @@
             else:
                 # Handle command
                 _a = _fio.actions[_action]
-                if _action == 'get_options':
-                    resp.status = falcon.HTTP_200
-                    resp.content_type = "application/json"
-                    resp.text = json.dumps({"options": _a()})
-                elif _action == 'get_resultlist':
-                    resp.status = falcon.HTTP_200
-                    resp.content_type = "application/json"
-                    resp.text = json.dumps({"resultlist": _a()})
-                elif _action == 'get_result':
-                    if 'time' not in _options:
-                        _resp(
-                            resp,
-                            falcon.HTTP_400,
-                            "No 'time' found for '{}'".format(_action)
-                        )
-                        return
-                    _time = _options['time']
-                    resp.status = falcon.HTTP_200
-                    resp.content_type = "application/json"
-                    resp.text = json.dumps({_time: _a(_time)})
-                elif _action == 'do_singlerun':
-                    _a(_options)
-                    resp.status = falcon.HTTP_200
-                    resp.content_type = "application/json"
-                    resp.text = json.dumps({"ok": True})
-                elif _action == 'do_scheduledrun':
-                    # prepare scheduled run
+                try:
+                    if _action == 'get_options':
+                        logger.info("returning options")
+                        resp.status = falcon.HTTP_200
+                        resp.content_type = "application/json"
+                        resp.text = json.dumps({"options": _a()})
+                    elif _action == 'get_resultlist':
+                        logger.info("getting results")
+                        resp.status = falcon.HTTP_200
+                        resp.content_type = "application/json"
+                        resp.text = json.dumps({"resultlist": _a()})
+                    elif _action == 'get_result':
+                        if 'time' not in _options:
+                            _msg = "No 'time' found for '{}'".format(_action)
+                            logger.error(_msg)
+                            _resp(
+                                resp,
+                                falcon.HTTP_400,
+                                _msg
+                            )
+                            return
+                        _time = _options['time']
+                        logger.info("getting results for '{}'".format(_time))
+                        resp.status = falcon.HTTP_200
+                        resp.content_type = "application/json"
+                        resp.text = json.dumps({_time: _a(_time)})
+                    elif _action == 'do_singlerun':
+                        logger.info("executing single run")
+                        _a(_options)
+                        resp.status = falcon.HTTP_200
+                        resp.content_type = "application/json"
+                        resp.text = json.dumps({"ok": True})
+                    elif _action == 'do_scheduledrun':
+                        logger.info("executing scheduled run")
+                        # prepare scheduled run
 
-                    # Run it
-                    _a(_options)
-                    resp.status = falcon.HTTP_200
-                    resp.content_type = "application/json"
-                    resp.text = json.dumps({"ok": True})
-                else:
-                    _resp(
-                        resp,
-                        falcon.HTTP_500,
-                        "Unknown error happened for '{}/{}/{}'".format(
-                            _module,
-                            _action,
-                            _options
-                        )
-                    )
+                        # Run it
+                        _a(_options)
+                        resp.status = falcon.HTTP_200
+                        resp.content_type = "application/json"
+                        resp.text = json.dumps({"ok": True})
+                    else:
+                        _msg = "Unknown error happened for '{}/{}/{}'".format(
+                                _module,
+                                _action,
+                                _options
+                            )
+                        logger.error(_msg)
+                        _resp(resp, falcon.HTTP_500, _msg)
+                except CheckerException as e:
+                    _msg = "Error for '{}/{}':\n{}".format(
+                                _module,
+                                _action,
+                                e
+                            )
+                    logger.error(_msg)
+                    _resp(resp, falcon.HTTP_500, _msg, opt=_options)
                 return
         else:
-            _resp(resp, falcon.HTTP_400, "Empty request body")
+            _msg = "Empty request body"
+            logger.error(_msg)
+            _resp(resp, falcon.HTTP_400, _msg)
 
 
 class Index:
diff --git a/cfg_checker/common/kube_utils.py b/cfg_checker/common/kube_utils.py
index 0ce573a..af22aa3 100644
--- a/cfg_checker/common/kube_utils.py
+++ b/cfg_checker/common/kube_utils.py
@@ -264,7 +264,7 @@
 
         return None
 
-    def wait_for(self, _func, phase, *args, **kwargs):
+    def wait_for_phase_on_start(self, _func, phase, *args, **kwargs):
         w = watch.Watch()
         start_time = time()
         for event in w.stream(_func, *args, **kwargs):
@@ -285,6 +285,16 @@
                 w.stop()
                 return
 
+    def wait_for_event(self, _func, event, *args, **kwargs):
+        w = watch.Watch()
+        for _event in w.stream(_func, *args, **kwargs):
+            # _event["type"]: ADDED, MODIFIED, DELETED
+            if _event["type"] == event:
+                # Got the event type we were waiting for
+                logger_cli.debug("... got {} event".format(_event["type"]))
+                w.stop()
+                return
+
     def get_node_info(self, http=False):
         # Query API for the nodes and do some presorting
         _nodes = {}
@@ -752,14 +762,7 @@
         )
 
     def prepare_pv(self, pv_object):
-        def _list_pv():
-            return self.CoreV1.list_persistent_volume(
-                label_selector='name={}'.format(pv_object.metadata.name)
-            )
-        _existing = self.safe_get_item_by_name(
-            _list_pv(),
-            pv_object.metadata.name
-        )
+        _existing = self.get_pv_by_name(pv_object.metadata.name)
         if _existing is not None:
             self.CoreV1.replace_persistent_volume(
                 pv_object.metadata.name,
@@ -768,35 +771,23 @@
         else:
             self.CoreV1.create_persistent_volume(pv_object)
 
-        _timeout = 60
-        while _timeout > 0:
-            _t = _list_pv()
-            for item in _t.items:
-                if item.status.phase in ["Available", "Bound"]:
-                    return item
-            sleep(1)
-            _timeout -= 1
-        raise CheckerException(
-            "Timed out creating PV '{}'".format(
-                pv_object.metadata.name
-            )
+        return self.wait_for_phase(
+            "pv",
+            pv_object.metadata.name,
+            None,
+            ["Available", "Bound"]
         )
 
     def prepare_pvc(self, pvc_object):
-        def _list_pvc():
-            return self.CoreV1.list_namespaced_persistent_volume_claim(
-                pvc_object.metadata.namespace,
-                label_selector='name={}'.format(pvc_object.metadata.name)
-            )
-        _existing = self.safe_get_item_by_name(
-            _list_pvc(),
-            pvc_object.metadata.name
+        _existing = self.get_pvc_by_name_and_ns(
+            pvc_object.metadata.name,
+            pvc_object.metadata.namespace
         )
         if _existing is not None:
             _size_r = pvc_object.spec.resources.requests["storage"]
             _size_e = _existing.spec.resources.requests["storage"]
-            logger_cli.warn(
-                "WARNING: Found PVC '{}/{}' with {}. Requested: {}'".format(
+            logger_cli.info(
+                "-> Found PVC '{}/{}' with {}. Requested: {}".format(
                     pvc_object.metadata.namespace,
                     pvc_object.metadata.name,
                     _size_e,
@@ -817,36 +808,89 @@
                 pvc_object
             )
 
-        _timeout = 60
-        while _timeout > 0:
-            _t = _list_pvc()
-            for item in _t.items:
-                if item.status.phase in ["Available", "Bound"]:
-                    return item
+        return self.wait_for_phase(
+            "pvc",
+            pvc_object.metadata.name,
+            pvc_object.metadata.namespace,
+            ["Available", "Bound"]
+        )
+
+    def get_pod_by_name_and_ns(self, name, ns):
+        return self.safe_get_item_by_name(
+            self.CoreV1.list_namespaced_pod(
+                ns,
+                label_selector='name={}'.format(name)
+            ),
+            name
+        )
+
+    def get_svc_by_name_and_ns(self, name, ns):
+        return self.safe_get_item_by_name(
+            self.CoreV1.list_namespaced_service(
+                ns,
+                label_selector='name={}'.format(name)
+            ),
+            name
+        )
+
+    def get_pvc_by_name_and_ns(self, name, ns):
+        return self.safe_get_item_by_name(
+            self.CoreV1.list_namespaced_persistent_volume_claim(
+                ns,
+                label_selector='name={}'.format(name)
+            ),
+            name
+        )
+
+    def get_pv_by_name(self, name):
+        return self.safe_get_item_by_name(
+            self.CoreV1.list_persistent_volume(
+                label_selector='name={}'.format(name)
+            ),
+            name
+        )
+
+    def wait_for_phase(self, ttype, name, ns, phase_list, timeout=120):
+        logger_cli.debug(
+            "... waiting '{}'s until {} is '{}'".format(
+                timeout,
+                ttype,
+                ", ".join(phase_list)
+            )
+        )
+        while timeout > 0:
+            if ttype == "pod":
+                _t = self.get_pod_by_name_and_ns(name, ns)
+            elif ttype == "svc":
+                _t = self.get_svc_by_name_and_ns(name, ns)
+            elif ttype == "pvc":
+                _t = self.get_pvc_by_name_and_ns(name, ns)
+            elif ttype == "pv":
+                _t = self.get_pv_by_name(name)
+            if "Terminated" in phase_list and not _t:
+                if ns:
+                    _s = "... {} {}/{} not found".format(ttype, ns, name)
                 else:
-                    logger_cli.debug(
-                        "... pvcstatus: '{}', {} sec left".format(
-                            item.status.phase,
-                            _timeout
-                        )
-                    )
-            sleep(5)
-            _timeout -= 5
+                    _s = "... {} '{}' not found".format(ttype, name)
+                logger_cli.debug(_s)
+                return None
+            logger_cli.debug("... {} is '{}'".format(ttype, _t.status.phase))
+            if _t.status.phase in phase_list:
+                return _t
+            sleep(2)
+            timeout -= 2
         raise CheckerException(
-            "Timed out creating PVC '{}'".format(
-                pvc_object.metadata.name
+            "Timed out waiting for {} '{}' in '{}'".format(
+                ttype,
+                name,
+                ", ".join(phase_list)
             )
         )
 
     def prepare_pod_from_yaml(self, pod_yaml):
-        def _list_pod():
-            return self.CoreV1.list_namespaced_pod(
-                pod_yaml['metadata']['namespace'],
-                label_selector='name={}'.format(pod_yaml['metadata']['name'])
-            )
-        _existing = self.safe_get_item_by_name(
-            _list_pod(),
-            pod_yaml['metadata']['name']
+        _existing = self.get_pod_by_name_and_ns(
+            pod_yaml['metadata']['name'],
+            pod_yaml['metadata']['namespace']
         )
         if _existing is not None:
             logger_cli.warn(
@@ -861,36 +905,22 @@
                 pod_yaml['metadata']['namespace'],
                 pod_yaml
             )
-        _timeout = 120
-        logger_cli.debug("... waiting '{}'s for pod to start".format(_timeout))
-        while _timeout > 0:
-            _t = _list_pod()
-            for item in _t.items:
-                logger_cli.debug("... pod is '{}'".format(item.status.phase))
-                if item.status.phase in ["Running"]:
-                    return item
-            sleep(2)
-            _timeout -= 2
-        raise CheckerException(
-            "Timed out creating pod '{}'".format(
-                pod_yaml['metadata']['name']
-            )
+        return self.wait_for_phase(
+            "pod",
+            pod_yaml['metadata']['name'],
+            pod_yaml['metadata']['namespace'],
+            ["Running"]
         )
 
     def expose_pod_port(self, pod_object, port, ns="qa-space"):
-        def _list_svc():
-            return self.CoreV1.list_namespaced_service(
-                pod_object.metadata.namespace,
-                label_selector='name={}'.format(pod_object.metadata.name)
-            )
-        _existing = self.safe_get_item_by_name(
-            _list_svc(),
-            pod_object.metadata.name
+        _existing = self.get_svc_by_name_and_ns(
+            pod_object.metadata.name,
+            pod_object.metadata.namespace
         )
         if _existing is not None:
             # TODO: Check port number?
-            logger_cli.warn(
-                "WARNING: Pod already exposed '{}/{}:{}'. Reusing.".format(
+            logger_cli.info(
+                "-> Pod already exposed '{}/{}:{}'. Reusing.".format(
                     pod_object.metadata.namespace,
                     pod_object.metadata.name,
                     port
diff --git a/cfg_checker/modules/ceph/__init__.py b/cfg_checker/modules/ceph/__init__.py
index 9d0f923..d3f9581 100644
--- a/cfg_checker/modules/ceph/__init__.py
+++ b/cfg_checker/modules/ceph/__init__.py
@@ -92,6 +92,11 @@
         metavar='task-file',
         help="Task file for benchmark"
     )
+    ceph_bench_parser.add_argument(
+        '--no-cleanup',
+        action="store_true", default=False,
+        help="Do not cleanup services, agents, pvc, and pv"
+    )
 
     return _parser
 
@@ -149,9 +154,9 @@
     args_utils.check_supported_env(ENV_TYPE_KUBE, args, config)
     _filename = args_utils.get_arg(args, 'html')
     # agents count option
-    _agents = args_utils.get_arg(args, "agents")
-    logger_cli.info("-> using {} agents".format(_agents))
-    config.bench_agent_count = _agents
+    config.bench_agent_count = args_utils.get_arg(args, "agents")
+    logger_cli.info("-> using {} agents".format(config.bench_agent_count))
+    config.no_cleaning_after_benchmark = args_utils.get_arg(args, "no_cleanup")
     # storage class
     _storage_class = args_utils.get_arg(args, "storage_class")
     logger_cli.info("-> using storage class of '{}'".format(_storage_class))
@@ -179,8 +184,10 @@
     # Do the testrun
     ceph_bench.prepare_agents(_opts)
     if not ceph_bench.run_benchmark(_opts):
+        # No cleaning and/or report if benchmark was not finished
         return
-    ceph_bench.cleanup()
+    if not config.no_cleaning_after_benchmark:
+        ceph_bench.cleanup()
 
     # Create report
     ceph_bench.create_report(_filename)
diff --git a/cfg_checker/modules/ceph/bench.py b/cfg_checker/modules/ceph/bench.py
index 190076a..dd96a69 100644
--- a/cfg_checker/modules/ceph/bench.py
+++ b/cfg_checker/modules/ceph/bench.py
@@ -47,6 +47,8 @@
         if config.bench_mode == "tasks":
             self.load_tasks(config.bench_task_file)
 
+        self.cleanup_list = []
+
     def load_tasks(self, taskfile):
         # Load csv file
         logger_cli.info("-> Loading taskfile '{}'".format(taskfile))
@@ -63,6 +65,11 @@
                     "size": row[4]
                 })
 
+    def add_for_deletion(self, obj, typ):
+        _d = [typ, obj.metadata.namespace, obj.metadata.name]
+        self.cleanup_list.append(_d)
+        return
+
     def prepare_agents(self, options):
         logger_cli.info("# Preparing {} agents".format(self.agent_count))
         for idx in range(self.agent_count):
@@ -75,10 +82,15 @@
                 options['size'] + 'i',
                 self._agent_template
             )
-            # save it to list
+            # save it to lists
             self.agent_pods.append(_agent)
+            self.add_for_deletion(_pv, "pv")
+            self.add_for_deletion(_pvc, "pvc")
+            self.add_for_deletion(_agent, "pod")
+
             # expose it
             _svc = self.master.expose_benchmark_agent(_agent)
+            self.add_for_deletion(_svc, "svc")
             # Save service
             self.services.append(_svc)
 
@@ -203,7 +215,9 @@
         _runtime = _get_seconds(options["runtime"])
         _ramptime = _get_seconds(options["ramp_time"])
         _timeout = _runtime + _ramptime + 5
-        while _timeout > 0:
+        _end = datetime.now() + timedelta(seconds=_timeout)
+        while True:
+            # Print status
             _sts = get_status()
             _str = ""
             for _s in _sts:
@@ -212,17 +226,29 @@
                     _s["status"],
                     _s["progress"]
                 )
-            logger_cli.debug("... {}".format(_str))
-            sleep(1)
-            _timeout -= 1
+            # recalc how much is left
+            diff = (_end - datetime.now()).total_seconds()
+            logger_cli.debug("... [{}s]: {}".format(diff, _str))
+            # In case end_datetime was in past to begin with
+            if diff < 0:
+                break
+            logger_cli.info("-> Sleeping for {}s".format(diff/2))
+            sleep(diff/2)
+            if diff <= 0.1:
+                break
 
+        logger_cli.info("-> Done")
         return True
 
     def cleanup(self):
+        self.cleanup_list.reverse()
 
+        for _res in self.cleanup_list:
+            self.master.cleanup_resource_by_name(_res[0], _res[2], ns=_res[1])
+        logger_cli.info("# Done cleaning up")
         return
 
     # Create report
-    def create_report(self):
+    def create_report(self, filename):
 
         return
diff --git a/cfg_checker/nodes.py b/cfg_checker/nodes.py
index 9570978..c1b3d4c 100644
--- a/cfg_checker/nodes.py
+++ b/cfg_checker/nodes.py
@@ -1308,3 +1308,52 @@
 
     def expose_benchmark_agent(self, agent):
         return self.kube.expose_pod_port(agent, 8765)
+
+    def cleanup_resource_by_name(self, res_type, name, ns=None, wait=False):
+        """Cleans up resource using string res_type and the ns/name
+
+        Args:
+            res_type (string): resource type name: pod, pv, pvc, svc
+            name (string): resource name to cleanup
+            ns (string, optional): Namespace to use. Default is 'qa-space'
+
+            return: (Bool) Is Success?
+        """
+        # fill defaults
+        if not ns:
+            ns = self._namespace
+        # Handle res_type errors and choose resource type
+        if not res_type:
+            logger_cli.debug(
+                "... resource type invalid: '{}'".format(res_type)
+            )
+            return False
+        elif not name:
+            logger_cli.debug("... resource name invalid: '{}'".format(name))
+            return False
+        elif res_type == "svc":
+            # Delete service
+            logger_cli.info("-> deleting svc {}/{}".format(ns, name))
+            self.kube.CoreV1.delete_namespaced_service(name, ns)
+            # TODO: Check if successfull
+        elif res_type == "pod":
+            # Delete a pod
+            logger_cli.info("-> deleting pod {}/{}".format(ns, name))
+            self.kube.CoreV1.delete_namespaced_pod(name, ns)
+            if wait:
+                self.kube.wait_for_phase(res_type, name, ns, ["Terminated"])
+        elif res_type == "pvc":
+            logger_cli.info("-> deleting pvc {}/{}".format(ns, name))
+            self.kube.CoreV1.delete_namespaced_persistent_volume_claim(
+                name,
+                ns
+            )
+            if wait:
+                self.kube.wait_for_phase(res_type, name, ns, ["Terminated"])
+        elif res_type == "pv":
+            logger_cli.info("-> deleting pv {}/{}".format(ns, name))
+            self.kube.CoreV1.delete_persistent_volume(name)
+            if wait:
+                self.kube.wait_for_phase(res_type, name, None, ["Terminated"])
+
+        return True
diff --git a/setup.py b/setup.py
index 4ae65c7..5c1e6b0 100644
--- a/setup.py
+++ b/setup.py
@@ -38,7 +38,7 @@
 
 setup(
     name="mcp-checker",
-    version="0.61",
+    version="0.62",
     author="Alex Savatieiev",
     author_email="osavatieiev@mirantis.com",
     classifiers=[
diff --git a/templates/cfgagent-template.yaml b/templates/cfgagent-template.yaml
index bc64af8..c6b305e 100644
--- a/templates/cfgagent-template.yaml
+++ b/templates/cfgagent-template.yaml
@@ -11,7 +11,7 @@
     - checker-agent
     imagePullPolicy: IfNotPresent
     name: cfgagent-pod
-    image: savex13/cfg-checker-agent:0.6
+    image: savex13/cfg-checker-agent:0.61
     volumeMounts:
     - mountPath: /cephvol
       name: cfgagent-pv-placeholder