cfg-checker ceph bench module part 4

 - benchmark code refactoring/cleaning
 - using UTC timezone for scheduling
 - 'tasks' mode first version
 - updated agent poking response structure

 Related-PROD: PROD-36669

Change-Id: I8bf9d9b4d54a41cc04f7765e17efb675028c6262
diff --git a/cfg_checker/agent/fio_runner.py b/cfg_checker/agent/fio_runner.py
index 50afeca..c8488af 100644
--- a/cfg_checker/agent/fio_runner.py
+++ b/cfg_checker/agent/fio_runner.py
@@ -6,7 +6,7 @@
 
 
 from copy import deepcopy
-from datetime import datetime, timedelta
+from datetime import datetime, timezone
 from platform import system, release, node
 from threading import Thread
 import threading
@@ -18,7 +18,7 @@
 from cfg_checker.common.log import logger
 
 
-_datetime_fmt = "%m/%d/%Y, %H:%M:%S"
+_datetime_fmt = "%m/%d/%Y, %H:%M:%S%z"
 fio_options_common = {
     "name": "agent_run",
     "filename": "/cephvol/testfile",
@@ -64,7 +64,7 @@
 
 def get_time(timestamp=None):
     if not timestamp:
-        _t = datetime.now()
+        _t = datetime.now(timezone.utc)
     else:
         _t = datetime.fromtimestamp(timestamp)
     return _t.strftime(_datetime_fmt)
@@ -85,7 +85,7 @@
 
 def wait_until(end_datetime):
     while True:
-        diff = (end_datetime - datetime.now()).total_seconds()
+        diff = (end_datetime - datetime.now(timezone.utc)).total_seconds()
         # In case end_datetime was in past to begin with
         if diff < 0:
             return
@@ -297,11 +297,12 @@
         self.fiorun = ShellThread(_cmd, _q)
         # Check if schedule is set
         if self.scheduled_datetime:
+            _now = datetime.now(timezone.utc)
             logger.debug(
                 "waiting for '{}', now is '{}', total of {} sec left".format(
                     self.scheduled_datetime.strftime(_datetime_fmt),
-                    datetime.now().strftime(_datetime_fmt),
-                    (self.scheduled_datetime - datetime.now()).total_seconds()
+                    _now.strftime(_datetime_fmt),
+                    (self.scheduled_datetime - _now).total_seconds()
                 )
             )
             wait_until(self.scheduled_datetime)
@@ -392,7 +393,8 @@
         _scheduled = False
         _diff = -1
         if self.scheduled_datetime:
-            _diff = (self.scheduled_datetime - datetime.now()).total_seconds()
+            _now = datetime.now(timezone.utc)
+            _diff = (self.scheduled_datetime - _now).total_seconds()
             if _diff > 0:
                 _scheduled = True
         _s = "running" if _running else "idle"
@@ -575,10 +577,11 @@
             _s = self.fio.status()
             if _s["status"] == "scheduled":
                 _t = self.fio.scheduled_datetime
-                _n = datetime.now()
+                _n = datetime.now(timezone.utc)
                 _delta = (_t - _n).total_seconds()
                 print(
-                    "waiting for '{}'; now '{}'; {} sec left".format(
+                    "{}: waiting for '{}'; now '{}'; {} sec left".format(
+                        _s["status"],
                         _t.strftime(_datetime_fmt),
                         _n.strftime(_datetime_fmt),
                         _delta
@@ -609,7 +612,8 @@
     _opts["readwrite"] = "read"
     _opts["ramp_time"] = "1s"
     _opts["runtime"] = "5s"
-    _shell.do_singlerun(_opts)
+    _opts["scheduled_to"] = "11/13/2021, 23:03:30+0000"
+    _shell.do_scheduledrun(_opts)
     _shell()
     _times = _shell.get_resultlist()
     print("# results:\n{}".format("\n".join(_times)))
@@ -623,9 +627,7 @@
     _opts["readwrite"] = "read"
     _opts["ramp_time"] = "1s"
     _opts["runtime"] = "10s"
-    _opts["scheduled_to"] = (datetime.now() + timedelta(seconds=12)).strftime(
-        _datetime_fmt
-    )
+    _opts["scheduled_to"] = "11/13/2021, 23:04:20+0000"
     _shell.do_scheduledrun(_opts)
     _shell()
     _times = _shell.get_resultlist()
diff --git a/cfg_checker/agent/webserver.py b/cfg_checker/agent/webserver.py
index 235ba1a..7f7e53a 100644
--- a/cfg_checker/agent/webserver.py
+++ b/cfg_checker/agent/webserver.py
@@ -158,6 +158,7 @@
                         logger.info("getting results for '{}'".format(_time))
                         resp.status = falcon.HTTP_200
                         resp.content_type = "application/json"
+                        # TODO: get timeline too?
                         resp.text = json.dumps({_time: _a(_time)})
                     elif _action == 'do_singlerun':
                         logger.info("executing single run")
diff --git a/cfg_checker/modules/ceph/__init__.py b/cfg_checker/modules/ceph/__init__.py
index 62941d5..0f1de01 100644
--- a/cfg_checker/modules/ceph/__init__.py
+++ b/cfg_checker/modules/ceph/__init__.py
@@ -176,17 +176,17 @@
         # TODO: Update options for single run
         logger_cli.debug("    {} = {}".format(_k, _opts[_k]))
 
-    # handle option inavailability
+    # handle option inavailability from command line for single mode
+
+    # init the Bench class
     ceph_bench = bench.KubeCephBench(config)
-
-    # Load tasks
-
     # Do the testrun
     ceph_bench.prepare_agents(_opts)
     if not ceph_bench.run_benchmark(_opts):
         # No cleaning and/or report if benchmark was not finished
         logger_cli.info("# Abnormal benchmark run, no cleaning performed")
         return
+    # Cleaning
     if not config.no_cleaning_after_benchmark:
         ceph_bench.cleanup()
     else:
diff --git a/cfg_checker/modules/ceph/bench.py b/cfg_checker/modules/ceph/bench.py
index 5fa4cfd..7640440 100644
--- a/cfg_checker/modules/ceph/bench.py
+++ b/cfg_checker/modules/ceph/bench.py
@@ -2,10 +2,11 @@
 import os
 import json
 
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
 from time import sleep
 
 from cfg_checker.common import logger_cli
+from cfg_checker.common.decorators import retry
 from cfg_checker.helpers.console_utils import Progress
 # from cfg_checker.common.exception import InvalidReturnException
 # from cfg_checker.common.exception import ConfigException
@@ -43,12 +44,14 @@
         self.storage_class = config.bench_storage_class
         self.agent_pods = []
         self.services = []
-        self.api_urls = []
+        self.scheduled_delay = 30
         self.mode = config.bench_mode
         if config.bench_mode == "tasks":
-            self.load_tasks(config.bench_task_file)
+            self.taskfile = config.bench_task_file
+            self.load_tasks(self.taskfile)
 
         self.cleanup_list = []
+        self.results = {}
 
     def load_tasks(self, taskfile):
         # Load csv file
@@ -94,15 +97,19 @@
             self.add_for_deletion(_svc, "svc")
             # Save service
             self.services.append(_svc)
-
-        # Build urls for agents
-        for svc in self.services:
-            self.api_urls.append(
+            # prepopulate results
+            self.results[_agent.metadata.name] = {}
+            self.results[_agent.metadata.name]["list"] = {}
+            self.results[_agent.metadata.name]["url"] = \
                 "http://{}:{}/api/".format(
-                    svc.spec.cluster_ip,
+                    _svc.spec.cluster_ip,
                     8765
                 )
-            )
+            self.results[_agent.metadata.name]["storage_class"] = \
+                self.storage_class
+            self.results[_agent.metadata.name]["volume_size"] = \
+                options['size']
+
         logger_cli.info("-> Done creating agents")
         return
 
@@ -147,98 +154,203 @@
 
         return None
 
-    def run_benchmark(self, options):
-        def get_status():
-            return [self._poke_agent(_u + "fio", {}) for _u in self.api_urls]
-        logger_cli.info("# Starting '{}' benchmark".format(self.mode))
-        logger_cli.info("# Checking agents")
+    def _ensure_agents_ready(self):
         # make sure agents idle
-        _tt = []
-        _rr = []
-        for _s in get_status():
-            if _s is None:
+        _status_set = []
+        _ready_set = []
+        for _agent, _d in self.get_agents_status().items():
+            # obviously, there should be some answer
+            if _d is None:
                 logger_cli.error("ERROR: Agent status not available")
                 return False
-            _h = _s["healthcheck"]["hostname"]
-            _t = _s['status']
-            _r = _s["healthcheck"]["ready"]
-            if _t not in ["idle", "finished"]:
-                logger_cli.error("Agent status invalid {}:{}".format(_h, _t))
-                _tt += [False]
+            # status should be idle or finished
+            if _d['status'] not in ["idle", "finished"]:
+                logger_cli.error(
+                    "Agent status invalid {}:{}".format(_agent, _d['status'])
+                )
+                _status_set += [False]
             else:
-                _tt += [True]
-            if not _r:
-                logger_cli.error("Agent is not ready {}".format(_h))
-                _rr += [False]
+                # Good agent
+                _status_set += [True]
+            # agent's fio shell should be in 'ready'
+            if not _d["healthcheck"]["ready"]:
+                logger_cli.error("Agent is not ready {}".format(_agent))
+                _ready_set += [False]
             else:
-                _rr += [True]
-        if not any(_tt) or not any(_rr):
+                # 'fio' shell for agent is ready
+                _ready_set += [True]
+        # all agent's statuses should be True
+        # and all 'fio' shell modules should be 'ready'
+        if not any(_status_set) or not any(_ready_set):
+            # At least one is not ready and it was already logged above
+            return False
+        else:
+            # All is good
+            return True
+
+    def get_agents_status(self):
+        _status = {}
+        for _agent, _d in self.results.items():
+            _status[_agent] = self._poke_agent(_d["url"] + "fio", {})
+        return _status
+
+    def get_agents_resultlist(self):
+        _t = {"module": "fio", "action": "get_resultlist"}
+        _status = {}
+        for _agent, _d in self.results.items():
+            _status[_agent] = self._poke_agent(_d["url"], _t, action="POST")
+        return _status
+
+    @retry(Exception)
+    def get_result_from_agent(self, agent, time):
+        _t = {
+            "module": "fio",
+            "action": "get_result",
+            "options": {
+                "time": time
+            }
+        }
+        return self._poke_agent(self.results[agent]["url"], _t, action="POST")
+
+    def _get_next_scheduled_time(self):
+        _now = datetime.now(timezone.utc)
+        logger_cli.info("-> time is '{}'".format(_now.strftime(_datetime_fmt)))
+        _time = _now + timedelta(seconds=self.scheduled_delay)
+        _str_time = _time.strftime(_datetime_fmt)
+        logger_cli.info(
+            "-> next benchmark scheduled to '{}'".format(_str_time)
+        )
+        return _str_time
+
+    def _send_scheduled_task(self, options):
+        _task = {
+            "module": "fio",
+            "action": "do_scheduledrun",
+            "options": options
+        }
+        for _agent, _d in self.results.items():
+            logger_cli.info(
+                "-> sending task to '{}:{}'".format(_agent, _d["url"])
+            )
+            _ret = self._poke_agent(_d["url"], _task, action="POST")
+            if 'error' in _ret:
+                logger_cli.error(
+                    "ERROR: Agent returned: '{}'".format(_ret['error'])
+                )
+                return False
+        # No errors detected
+        return True
+
+    def track_benchmark(self, options):
+        _runtime = _get_seconds(options["runtime"])
+        _ramptime = _get_seconds(options["ramp_time"])
+        # Sum up all timings that we must wait and double it
+        _timeout = (self.scheduled_delay + _runtime + _ramptime) * 2
+        _end = datetime.now(timezone.utc) + timedelta(seconds=_timeout)
+        while True:
+            # Print status
+            # TODO: do pooled status get
+            _sts = self.get_agents_status()
+            diff = (_end - datetime.now(timezone.utc)).total_seconds()
+            logger_cli.info("-> {:.2f}s left. Agent status:".format(diff))
+            for _agent, _status in _sts.items():
+                logger_cli.info(
+                    "\t{}: {} ({}%)".format(
+                        _agent,
+                        _status["status"],
+                        _status["progress"]
+                    )
+                )
+            finished = [True for _s in _sts.values() 
+                        if _s["status"] == 'finished']
+            _fcnt = len(finished)
+            _tcnt = len(_sts)
+            if _fcnt < _tcnt:
+                logger_cli.info("-> {}/{} finished".format(_fcnt, _tcnt))
+            else:
+                logger_cli.info("-> All agents finished run")
+                return True
+            # recalc how much is left
+            diff = (_end - datetime.now(timezone.utc)).total_seconds()
+            # In case end_datetime was in past to begin with
+            if diff < 0:
+                logger_cli.info("-> Timed out waiting for agents to finish")
+                return False
+            logger_cli.info("-> Sleeping for {:.2f}s".format(diff/3))
+            sleep(diff/3)
+            if diff <= 0.1:
+                logger_cli.info("-> Timed out waiting for agents to finish")
+                return False
+
+    def _do_testrun(self, options):
+        # send single to agent
+        if not self._send_scheduled_task(options):
+            return False
+        # Track this benchmark progress
+        if not self.track_benchmark(options):
+            return False
+        else:
+            logger_cli.info("-> Finished testrun")
+            # Get results for each agent
+            self.collect_results()
+            return True
+
+    def _wait_ceph_cooldown(self):
+        # TODO: Query Ceph ince a 20 sec to make sure its load dropped
+
+        return
+
+    def run_benchmark(self, options):
+        logger_cli.info("# Starting '{}' benchmark".format(self.mode))
+        # Check agent readyness
+        logger_cli.info("# Checking agents")
+        if not self._ensure_agents_ready():
             return False
 
         # Make sure that Ceph is at low load
         # TODO: Ceph status check
+        self._wait_ceph_cooldown()
 
         # Do benchmark according to mode
         if self.mode == "tasks":
-            # TODO: Implement 'tasks' mode
-            # take next task
-
-            # update options
-
-            # init time to schedule
-
-            # send next task to agent
-            pass
-            # wait for agents to finish
-        elif self.mode == "single":
-            logger_cli.info("# Running benchmark")
-            # init time to schedule
-            _time = datetime.now() + timedelta(seconds=10)
-            _str_time = _time.strftime(_datetime_fmt)
-            options["scheduled_to"] = _str_time
             logger_cli.info(
-                "-> next benchmark scheduled to '{}'".format(_str_time)
-            )
-            # send single to agent
-            _task = {
-                "module": "fio",
-                "action": "do_singlerun",
-                "options": options
-            }
-            for _u in self.api_urls:
-                logger_cli.info("-> sending task to '{}'".format(_u))
-                _ret = self._poke_agent(_u, _task, action="POST")
-                if 'error' in _ret:
-                    logger_cli.error(
-                        "ERROR: Agent returned: '{}'".format(_ret['error'])
-                    )
-
-        _runtime = _get_seconds(options["runtime"])
-        _ramptime = _get_seconds(options["ramp_time"])
-        _timeout = _runtime + _ramptime + 5
-        _end = datetime.now() + timedelta(seconds=_timeout)
-        while True:
-            # Print status
-            _sts = get_status()
-            _str = ""
-            for _s in _sts:
-                _str += "{}: {} ({}); ".format(
-                    _s["healthcheck"]["hostname"],
-                    _s["status"],
-                    _s["progress"]
+                "# Running benchmark with tasks from '{}'".format(
+                    self.taskfile
                 )
-            # recalc how much is left
-            diff = (_end - datetime.now()).total_seconds()
-            logger_cli.debug("... [{:.2f}s]: {}".format(diff, _str))
-            # In case end_datetime was in past to begin with
-            if diff < 0:
-                break
-            logger_cli.info("-> Sleeping for {:.2f}s".format(diff/2))
-            sleep(diff/2)
-            if diff <= 0.1:
-                break
+            )
+            # take next task
+            _total_tasks = len(self.tasks)
+            for idx in range(_total_tasks):
+                _task = self.tasks[idx]
+                logger_cli.info(
+                    "-> Starting next task ({}/{})".format(idx+1, _total_tasks)
+                )
+                logger_cli.info("-> Updating options with: {}".format(
+                        ", ".join(
+                            ["{} = {}".format(k, v) for k, v in _task.items()]
+                        )
+                    )
+                )
+                # update options
+                options.update(_task)
+                # init time to schedule
+                options["scheduled_to"] = self._get_next_scheduled_time()
+                if not self._do_testrun(options):
+                    return False
 
-        logger_cli.info("-> Done")
+                self._wait_ceph_cooldown()
+        elif self.mode == "single":
+            logger_cli.info("# Running single benchmark")
+            # init time to schedule
+            options["scheduled_to"] = self._get_next_scheduled_time()
+            if not self._do_testrun(options):
+                return False
+        else:
+            logger_cli.error("ERROR: Unknown mode '{}'".format(self.mode))
+            return False
+
+        # Normal exit
+        logger_cli.info("# All benchmark tasks done")
         return True
 
     def cleanup(self):
@@ -260,6 +372,7 @@
             if len(_l) > 0:
                 _p.write_progress(_idx)
             else:
+                _p.write_progress(_idx)
                 _p.end()
                 logger_cli.info("# Done cleaning up")
                 break
@@ -272,6 +385,38 @@
 
         return
 
+    def collect_results(self):
+        logger_cli.info("# Collecting results")
+        # query agents for results
+        _agents = self.get_agents_resultlist()
+
+        for _agent, _l in _agents.items():
+            _list = _l["resultlist"]
+            _new = [r for r in _list if r not in self.results[_agent]["list"]]
+            logger_cli.debug(
+                "... agent '{}' has {} new results".format(_agent, len(_new))
+            )
+            #  get all new results
+            for _time in _new:
+                logger_cli.info(
+                    "-> loading results for '{}' from '{}'".format(
+                        _time,
+                        _agent
+                    )
+                )
+                self.results[_agent]["list"].update(
+                    self.get_result_from_agent(_agent, _time)
+                )
+        return
+
+    def dump_results(self, path):
+        # Function dumps all availabkle results as jsons to the given path
+        # overwriting if needed
+
+        # TODO: Conduct the dumping
+
+        return
+
     # Create report
     def create_report(self, filename):
 
diff --git a/setup.py b/setup.py
index c27fcf4..fd73a82 100644
--- a/setup.py
+++ b/setup.py
@@ -38,7 +38,7 @@
 
 setup(
     name="mcp-checker",
-    version="0.63",
+    version="0.64",
     author="Alex Savatieiev",
     author_email="osavatieiev@mirantis.com",
     classifiers=[
diff --git a/templates/cfgagent-template.yaml b/templates/cfgagent-template.yaml
index fe1065d..3152c5f 100644
--- a/templates/cfgagent-template.yaml
+++ b/templates/cfgagent-template.yaml
@@ -11,7 +11,7 @@
     - checker-agent
     imagePullPolicy: IfNotPresent
     name: cfgagent-pod
-    image: savex13/cfg-checker-agent:0.63
+    image: savex13/cfg-checker-agent:0.64
     volumeMounts:
     - mountPath: /cephvol
       name: cfgagent-pv-placeholder