Ceph report fixes and Ceph bench beta 0.1

- Ceph cluster stats collection during benchmark runs (see the sketch below)
- Updated the Ceph results totals and averages calculation
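
Each run now snapshots the cluster pgmap while agents are active, keyed
by seconds elapsed since the scheduled start; calculate_ceph_stats()
then reduces the raw snapshots to per-run maximums and per-snapshot
percentages for the report barchart. A minimal, self-contained sketch of
that normalization (snapshot values are made up):

    # Normalize pgmap throughput snapshots against the run maximum,
    # mirroring calculate_ceph_stats(); keys are seconds elapsed
    stats = {
        5.0: {"read_bytes_sec": 1048576, "write_bytes_sec": 524288},
        10.0: {"read_bytes_sec": 2097152},  # no writes in this snapshot
    }

    def _perc(n, m):
        # guard against missing counters and division by zero
        return (n / m) * 100 if n and m else 0

    m_read = max(d.get("read_bytes_sec", 0) for d in stats.values())
    m_write = max(d.get("write_bytes_sec", 0) for d in stats.values())
    for d in stats.values():
        d["read_bytes_sec_perc"] = _perc(d.get("read_bytes_sec", 0), m_read)
        d["write_bytes_sec_perc"] = _perc(d.get("write_bytes_sec", 0), m_write)

    print(stats[5.0]["read_bytes_sec_perc"])  # 50.0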

Fixes:
- Fixed copying of huge PG dumps (>30 MB JSONs)
- Fixed the fio-runner constants
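
Result dumps are now written once per run and can be preloaded on the
next invocation; the scheduled time is encoded in the filename with its
separators stripped. A minimal round-trip sketch of that encoding; the
exact _datetime_fmt value below is an assumption, inferred from the
characters _reformat_timestr() strips:

    from datetime import datetime, timezone

    _datetime_fmt = "%m/%d/%Y, %H:%M:%S%z"  # assumed fio-runner constant
    _file_datetime_fmt = "%m%d%Y%H%M%S%z"

    _now = datetime.now(timezone.utc).replace(microsecond=0)
    _str = _now.strftime(_datetime_fmt)  # e.g. "05/12/2021, 10:30:00+0000"
    # _reformat_timestr() drops "/", ",", " ", ":" and "+"
    _fname = "".join(c for c in _str if c not in "/, :+")
    # preload_results() re-inserts the "+" before the UTC offset
    _restored = _fname[:14] + "+" + _fname[14:]
    assert datetime.strptime(_restored, _file_datetime_fmt) == _now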

Related-PROD: PROD-36669

Change-Id: Id8e250f626dfdaecc12ad005b61d03a21c9e6c4e
diff --git a/cfg_checker/modules/ceph/bench.py b/cfg_checker/modules/ceph/bench.py
index d804f4a..2eedcfb 100644
--- a/cfg_checker/modules/ceph/bench.py
+++ b/cfg_checker/modules/ceph/bench.py
@@ -3,7 +3,6 @@
 import json
 
 from datetime import datetime, timedelta, timezone
-from time import sleep
 
 from cfg_checker.common import logger_cli
 from cfg_checker.common.decorators import retry
@@ -18,6 +17,9 @@
 from cfg_checker.agent.fio_runner import _get_seconds, _datetime_fmt
 
 
+_file_datetime_fmt = "%m%d%Y%H%M%S%z"
+
+
 def _reformat_timestr(_str, _chars=["/", ",", " ", ":", "+"], _tchar=""):
     _new = ""
     for _c in _str:
@@ -76,6 +78,7 @@
 
         self.storage_class = config.bench_storage_class
         self.results_dump_path = config.bench_results_dump_path
+        self.bench_name = config.bench_name
         self.agent_pods = []
         self.services = []
         # By default,
@@ -138,7 +141,8 @@
         for idx in range(self.agent_count):
             # create pvc/pv and pod
             logger_cli.info("-> creating agent '{:02}'".format(idx))
-            _agent, _pv, _pvc = self.master.prepare_benchmark_agent(
+            # _agent, _pv, _pvc = self.master.prepare_benchmark_agent(
+            _agent, _pvc = self.master.prepare_benchmark_agent(
                 idx,
                 os.path.split(options["filename"])[0],
                 self.storage_class,
@@ -147,7 +151,7 @@
             )
             # save it to lists
             self.agent_pods.append(_agent)
-            self.add_for_deletion(_pv, "pv")
+            # self.add_for_deletion(_pv, "pv")
             self.add_for_deletion(_pvc, "pvc")
             self.add_for_deletion(_agent, "pod")
 
@@ -170,7 +174,7 @@
 
         logger_cli.info("-> Done creating agents")
         # TODO: Update after implementing pooled task sending
-        self.scheduled_delay = self.agent_count * 6
+        self.scheduled_delay = self.agent_count * 10
         logger_cli.info(
             "-> Schedule delay set to {} sec".format(self.scheduled_delay)
         )
@@ -277,8 +281,10 @@
     def _get_next_scheduled_time(self):
         _now = datetime.now(timezone.utc)
         logger_cli.info("-> time is '{}'".format(_now.strftime(_datetime_fmt)))
-        _time = _now + timedelta(seconds=self.scheduled_delay)
-        _str_time = _time.strftime(_datetime_fmt)
+        self.next_scheduled_time = _now + timedelta(
+            seconds=self.scheduled_delay
+        )
+        _str_time = self.next_scheduled_time.strftime(_datetime_fmt)
         logger_cli.info(
             "-> next benchmark scheduled to '{}'".format(_str_time)
         )
@@ -308,10 +314,10 @@
         _ramptime = _get_seconds(options["ramp_time"])
         # Sum up all timings that we must wait and double it
         _timeout = (self.scheduled_delay + _runtime + _ramptime) * 2
+        _start = self.next_scheduled_time
         _end = datetime.now(timezone.utc) + timedelta(seconds=_timeout)
         while True:
             # Print status
-            # TODO: do pooled status get
             _sts = self.get_agents_status()
             diff = (_end - datetime.now(timezone.utc)).total_seconds()
             logger_cli.info("-> {:.2f}s left. Agent status:".format(diff))
@@ -323,6 +329,13 @@
                         _status["progress"]
                     )
                 )
+            # Get Ceph status if _start time passed
+            _elapsed = (datetime.now(timezone.utc) - _start).total_seconds()
+            if _elapsed > 0:
+                logger_cli.info("-> {:.2f}s elapsed".format(_elapsed))
+                self.results[options["scheduled_to"]]["ceph"][_elapsed] = \
+                    self.ceph_info.get_cluster_status()
+            # Check if agents finished
             finished = [True for _s in _sts.values()
                         if _s["status"] == 'finished']
             _fcnt = len(finished)
@@ -338,12 +351,6 @@
             if diff < 0:
                 logger_cli.info("-> Timed out waiting for agents to finish")
                 return False
-            else:
-                logger_cli.info("-> Sleeping for {:.2f}s".format(2))
-                sleep(2)
-            if diff <= 0.1:
-                logger_cli.info("-> Timed out waiting for agents to finish")
-                return False
 
     def _do_testrun(self, options):
         # send single to agent
@@ -353,9 +360,18 @@
         if not self.track_benchmark(options):
             return False
         else:
-            logger_cli.info("-> Finished testrun")
+            logger_cli.info("-> Finished testrun. Collecting results...")
             # Get results for each agent
-            self.collect_results(options)
+            self.collect_results()
+            logger_cli.info("-> Calculating totals and averages")
+            self.calculate_totals()
+            self.calculate_ceph_stats()
+            logger_cli.info("-> Dumping results")
+            for _time, _d in self.results.items():
+                self.dump_result(
+                    self._get_dump_filename(_time),
+                    _d
+                )
             return True
 
     def wait_ceph_cooldown(self):
@@ -411,6 +427,7 @@
                 self.results[_sch_time] = {
                     "input_options": options,
                     "agents": {},
+                    "ceph": {},
                     "osd_df_before": _osd_df_before
                 }
                 if not self._do_testrun(options):
@@ -429,6 +446,7 @@
             self.results[_sch_time] = {
                 "input_options": options,
                 "agents": {},
+                "ceph": {},
                 "osd_df_before": _osd_df_before
             }
             if not self._do_testrun(options):
@@ -476,63 +494,50 @@
 
         return
 
-    def collect_results(self, options):
-        _sch_time = options["scheduled_to"]
+    def collect_results(self):
         logger_cli.info("# Collecting results")
         # query agents for results
         _agents = self.get_agents_resultlist()
-        # Syntax shortcut
-        _ar = self.results[_sch_time]["agents"]
-
         for _agent, _l in _agents.items():
-            # Create a syntax shortcut
-            if _agent not in _ar:
-                _ar[_agent] = {}
-            _arl = _ar[_agent]
             # Check if we already have this locally
             for _time in _l["resultlist"]:
-                _filename = self._get_dump_filename(_sch_time, _agent, options)
-                if os.path.exists(_filename):
-                    # There is a file already for this task
-                    # Check if we need to load it
-                    if _sch_time in _arl:
-                        logger_cli.info(
-                            "-> Skipped already processed result '{}'".format(
-                                _filename
-                            )
-                        )
-                    else:
-                        # Load previously dumped result from disk
-                        logger_cli.info(
-                            "-> Loading already present result '{}'".format(
-                                _filename
-                            )
-                        )
-                        _arl[_sch_time] = self.load_dumped_result(_filename)
-                else:
-                    # Load result add it locally and dump it
+                # Check if this result belongs to a known run
+                # and whether it still needs to be loaded
+                if _time not in self.results:
+                    # Some older results found
+                    # do not process them
+                    logger_cli.info(
+                        "-> Skipped old results for '{}'".format(_time)
+                    )
+                    continue
+                elif _agent not in self.results[_time]["agents"]:
+                    # Load result add it locally
                     logger_cli.info(
                         "-> Getting results for '{}' from '{}'".format(
-                            _sch_time,
+                            _time,
                             _agent
                         )
                     )
                     _r = self.get_result_from_agent(_agent, _time)
-                    # Important to switch from result status time
-                    # to scheduled time
-                    _arl[_sch_time] = _r[_time]
-                    # Dump collected result
-                    self.dump_result(_filename, _arl[_sch_time])
-        return
+                    self.results[_time]["agents"][_agent] = _r[_time]
+                else:
+                    # Should never happen, actually
+                    logger_cli.info(
+                        "-> Skipped loaded result for '{}' from '{}'".format(
+                            _time,
+                            _agent
+                        )
+                    )
 
-    def _get_dump_filename(self, _time, agent, options):
-        _dirname = _reformat_timestr(_time)
+    def _get_dump_filename(self, _time):
+        _r = self.results[_time]
+        _dirname = _r["input_options"]["name"]
         _filename = "-".join([
-            _dirname,
-            agent,
-            options["readwrite"],
-            options["bs"],
-            str(options["iodepth"]),
+            _reformat_timestr(_time),
+            "{:02}".format(len(_r["agents"])),
+            _r["input_options"]["readwrite"],
+            _r["input_options"]["bs"],
+            str(_r["input_options"]["iodepth"]),
         ]) + ".json"
         return os.path.join(
             self.results_dump_path,
@@ -540,6 +545,45 @@
             _filename
         )
 
+    def preload_results(self):
+        logger_cli.info(
+            "# Preloading results for '{}'".format(self.bench_name)
+        )
+        # get all dirs in folder
+        _p = self.results_dump_path
+        if not os.path.isdir(_p):
+            logger_cli.warning(
+                "WARNING: Dump path is not a folder '{}'".format(_p)
+            )
+            return
+        for path, dirs, files in os.walk(_p):
+            if path == os.path.join(_p, self.bench_name):
+                logger_cli.info("-> Folder found '{}'".format(path))
+                for _fname in files:
+                    logger_cli.debug("... processing '{}'".format(_fname))
+                    _ext = _fname.split('.')[-1]
+                    if _ext != "json":
+                        logger_cli.info(
+                            "-> Extension invalid '{}', "
+                            "'json' is expected".format(_ext)
+                        )
+                        continue
+                    # get time from filename
+                    # Ugly, but works
+                    _t = _fname.split('-')[0]
+                    _str_time = _t[:14] + "+" + _t[14:]
+                    _t = datetime.strptime(_str_time, _file_datetime_fmt)
+                    _time = _t.strftime(_datetime_fmt)
+                    self.results[_time] = self.load_dumped_result(
+                        os.path.join(path, _fname)
+                    )
+                    logger_cli.info(
+                        "-> Loaded '{}' as '{}'".format(
+                            _fname,
+                            _time
+                        )
+                    )
+
     def dump_result(self, filename, data):
         # Function dumps all available results as jsons to the given path
         # overwriting if needed
@@ -594,7 +638,7 @@
             _w_iops = 0
             for _a, _d in data["agents"].items():
                 # Hardcoded number of jobs param :(
-                _j = _d[_time]["jobs"][0]
+                _j = _d["jobs"][0]
                 _r_bw += _j["read"]["bw_bytes"]
                 _r_avglat += [_j["read"]["lat_ns"]["mean"]]
                 _r_iops += _j["read"]["iops"]
@@ -623,6 +667,64 @@
                 (sum(_w_avglat) / len(_w_avglat)) / 1000
             _totals["write_iops"] = _w_iops
 
+    def calculate_ceph_stats(self):
+        # func to get values as lists
+        def _as_list(key, stats):
+            _list = []
+            for _v in stats.values():
+                if key in _v:
+                    _list += [_v[key]]
+                else:
+                    _list += [0]
+            return _list
+
+        def _perc(n, m):
+            if not n:
+                return 0
+            elif not m:
+                return 0
+            else:
+                return (n / m) * 100
+
+        for _time, data in self.results.items():
+            _stats = {}
+            if "ceph" not in data:
+                logger_cli.warning(
+                    "WARNING: Ceph stats raw data not found in results"
+                )
+                continue
+            if "ceph_stats" not in data:
+                data["ceph_stats"] = {}
+            else:
+                continue
+            # Copy pool stats data
+            for _e, _d in data["ceph"].items():
+                _stats[_e] = _d["pgmap"]
+            # Maximums
+            m_r_bytes = max(_as_list("read_bytes_sec", _stats))
+            m_w_bytes = max(_as_list("write_bytes_sec", _stats))
+            m_r_iops = max(_as_list("read_op_per_sec", _stats))
+            m_w_iops = max(_as_list("write_op_per_sec", _stats))
+            # Replace ceph with shorter data
+            data["ceph"] = {
+                "max_read_bytes_sec": m_r_bytes,
+                "max_write_bytes_sec": m_w_bytes,
+                "max_read_iops_sec": m_r_iops,
+                "max_write_iops_sec": m_w_iops,
+                "stats": _stats
+            }
+            # Calculate percent values for barchart
+            for _e, _d in data["ceph"]["stats"].items():
+                _d["read_bytes_sec_perc"] = \
+                    _perc(_d.get("read_bytes_sec", 0), m_r_bytes)
+                _d["write_bytes_sec_perc"] = \
+                    _perc(_d.get("write_bytes_sec", 0), m_w_bytes)
+                _d["read_op_per_sec_perc"] = \
+                    _perc(_d.get("read_op_per_sec", 0), m_r_iops)
+                _d["write_op_per_sec_perc"] = \
+                    _perc(_d.get("write_op_per_sec", 0), m_w_iops)
+        return
+
     # Create report
     def create_report(self, filename):
         """
@@ -635,7 +737,6 @@
             reporter.HTMLCephBench(self),
             filename
         )
-        self.calculate_totals()
         _report(
             {
                 "results": self.results,