cfg-checker ceph bench module alpha version

 - Ceph benchmark report (beta)
 - Updated result time selection; results are now reported based on the scheduled start time
 - New methods for listing resources by type and namespace
 - Cleanup-only mode
 - Unified results processing
 - Additional Ceph info gathering
 - Experimental barchart graph example

Fixes:
 - Kube API client recreated each time for stability (HTTP/WebSocket specifics)
 - Argument naming fixes

Change-Id: Id541f789a00ab4ee827603c5b6f7f07899aaa7c5
diff --git a/cfg_checker/modules/ceph/bench.py b/cfg_checker/modules/ceph/bench.py
index 7640440..d804f4a 100644
--- a/cfg_checker/modules/ceph/bench.py
+++ b/cfg_checker/modules/ceph/bench.py
@@ -7,7 +7,9 @@
 
 from cfg_checker.common import logger_cli
 from cfg_checker.common.decorators import retry
+from cfg_checker.common.file_utils import write_str_to_file
 from cfg_checker.helpers.console_utils import Progress
+from cfg_checker.reports import reporter
 # from cfg_checker.common.exception import InvalidReturnException
 # from cfg_checker.common.exception import ConfigException
 # from cfg_checker.common.exception import KubeException
@@ -16,6 +18,27 @@
 from cfg_checker.agent.fio_runner import _get_seconds, _datetime_fmt
 
 
+def _reformat_timestr(_str, _chars=["/", ",", " ", ":", "+"], _tchar=""):
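+    # Replace separator chars so the time string is safe for file names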
+    _new = ""
+    for _c in _str:
+        _new += _c if _c not in _chars else _tchar
+    return _new
+
+
+def _parse_json_output(buffer):
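+    # Decode JSON from an agent reply; log and return {} on failure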
+    try:
+        return json.loads(buffer)
+    except (TypeError, json.decoder.JSONDecodeError) as e:
+        logger_cli.error(
+            "ERROR: Status not decoded: {}\n{}".format(e, buffer)
+        )
+    return {}
+
+
 class CephBench(object):
     _agent_template = "cfgagent-template.yaml"
 
@@ -41,17 +64,30 @@
         self.agent_count = config.bench_agent_count
         self.master = KubeNodes(config)
         super(KubeCephBench, self).__init__(config)
-        self.storage_class = config.bench_storage_class
-        self.agent_pods = []
-        self.services = []
-        self.scheduled_delay = 30
+
         self.mode = config.bench_mode
+        self.resource_prefix = config.resource_prefix
         if config.bench_mode == "tasks":
             self.taskfile = config.bench_task_file
             self.load_tasks(self.taskfile)
+        elif config.bench_mode == "cleanup":
+            self.cleanup_list = []
+            return
+
+        self.storage_class = config.bench_storage_class
+        self.results_dump_path = config.bench_results_dump_path
+        self.agent_pods = []
+        self.services = []
+        # By default, 30 seconds should be enough
+        # to send tasks to 3-5 agents
+        self.scheduled_delay = 30
 
         self.cleanup_list = []
         self.results = {}
+        self.agent_results = {}
+
+    def set_ceph_info_class(self, ceph_info):
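+        # Keep a handle to the ceph info gatherer for status/df/pg data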
+        self.ceph_info = ceph_info
 
     def load_tasks(self, taskfile):
         # Load csv file
@@ -68,10 +104,33 @@
                     "iodepth": row[3],
                     "size": row[4]
                 })
+        logger_cli.info("-> Loaded {} tasks".format(len(self.tasks)))
 
     def add_for_deletion(self, obj, typ):
-        _d = [typ, obj.metadata.namespace, obj.metadata.name]
-        self.cleanup_list.append(_d)
+        self.cleanup_list.append(
+            [
+                typ,
+                obj.metadata.namespace,
+                obj.metadata.name
+            ]
+        )
+        return
+
+    def prepare_cleanup(self):
+        # Assume the number of resources is not given:
+        # list all svc, pod, pvc, pv and identify the 'cfgagent-xx' ones
+        _types = ["pv", "pvc", "pod", "svc"]
+        _prefix = self.resource_prefix
+        for _typ in _types:
+            _list = self.master.list_resource_names_by_type_and_ns(_typ)
+            for ns, name in _list:
+                if name.startswith(_prefix):
+                    if ns:
+                        _msg = "{} {}/{}".format(_typ, ns, name)
+                    else:
+                        _msg = "{} {}".format(_typ, name)
+                    logger_cli.info("-> Found {}".format(_msg))
+                    self.cleanup_list.append([_typ, ns, name])
         return
 
     def prepare_agents(self, options):
@@ -98,19 +157,23 @@
             # Save service
             self.services.append(_svc)
             # prepopulate results
-            self.results[_agent.metadata.name] = {}
-            self.results[_agent.metadata.name]["list"] = {}
-            self.results[_agent.metadata.name]["url"] = \
+            self.agent_results[_agent.metadata.name] = {}
+            self.agent_results[_agent.metadata.name]["url"] = \
                 "http://{}:{}/api/".format(
                     _svc.spec.cluster_ip,
                     8765
                 )
-            self.results[_agent.metadata.name]["storage_class"] = \
+            self.agent_results[_agent.metadata.name]["storage_class"] = \
                 self.storage_class
-            self.results[_agent.metadata.name]["volume_size"] = \
+            self.agent_results[_agent.metadata.name]["volume_size"] = \
                 options['size']
 
         logger_cli.info("-> Done creating agents")
+        # TODO: Update after implementing pooled task sending
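+        # Rough estimate: ~6 seconds per agent to deliver a task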
+        self.scheduled_delay = self.agent_count * 6
+        logger_cli.info(
+            "-> Schedule delay set to {} sec".format(self.scheduled_delay)
+        )
         return
 
     def _poke_agent(self, url, body, action="GET"):
@@ -141,18 +204,7 @@
             self.master._namespace,
             " ".join(_cmd)
         )
-        try:
-            return json.loads(_ret)
-        except TypeError as e:
-            logger_cli.error(
-                "ERROR: Status not decoded: {}\n{}".format(e, _ret)
-            )
-        except json.decoder.JSONDecodeError as e:
-            logger_cli.error(
-                "ERROR: Status not decoded: {}\n{}".format(e, _ret)
-            )
-
-        return None
+        return _parse_json_output(_ret)
 
     def _ensure_agents_ready(self):
         # make sure agents idle
@@ -190,18 +242,24 @@
 
     def get_agents_status(self):
         _status = {}
-        for _agent, _d in self.results.items():
-            _status[_agent] = self._poke_agent(_d["url"] + "fio", {})
+        _results = self.master.exec_on_labeled_pods_and_ns(
+            "app=cfgagent",
+            "curl -s http://localhost:8765/api/fio"
+        )
+        for _agent, _result in _results.items():
+            _j = _parse_json_output(_result)
+            _status[_agent] = _j
         return _status
 
+    @retry(Exception, initial_wait=5)
     def get_agents_resultlist(self):
         _t = {"module": "fio", "action": "get_resultlist"}
         _status = {}
-        for _agent, _d in self.results.items():
+        for _agent, _d in self.agent_results.items():
             _status[_agent] = self._poke_agent(_d["url"], _t, action="POST")
         return _status
 
-    @retry(Exception)
+    @retry(Exception, initial_wait=5)
     def get_result_from_agent(self, agent, time):
         _t = {
             "module": "fio",
@@ -210,7 +268,11 @@
                 "time": time
             }
         }
-        return self._poke_agent(self.results[agent]["url"], _t, action="POST")
+        return self._poke_agent(
+            self.agent_results[agent]["url"],
+            _t,
+            action="POST"
+        )
 
     def _get_next_scheduled_time(self):
         _now = datetime.now(timezone.utc)
@@ -228,7 +290,7 @@
             "action": "do_scheduledrun",
             "options": options
         }
-        for _agent, _d in self.results.items():
+        for _agent, _d in self.agent_results.items():
             logger_cli.info(
                 "-> sending task to '{}:{}'".format(_agent, _d["url"])
             )
@@ -261,7 +323,7 @@
                         _status["progress"]
                     )
                 )
-            finished = [True for _s in _sts.values() 
+            finished = [True for _s in _sts.values()
                         if _s["status"] == 'finished']
             _fcnt = len(finished)
             _tcnt = len(_sts)
@@ -276,8 +338,9 @@
             if diff < 0:
                 logger_cli.info("-> Timed out waiting for agents to finish")
                 return False
-            logger_cli.info("-> Sleeping for {:.2f}s".format(diff/3))
-            sleep(diff/3)
+            else:
+                logger_cli.info("-> Sleeping for {:.2f}s".format(2))
+                sleep(2)
             if diff <= 0.1:
                 logger_cli.info("-> Timed out waiting for agents to finish")
                 return False
@@ -292,12 +355,17 @@
         else:
             logger_cli.info("-> Finished testrun")
             # Get results for each agent
-            self.collect_results()
+            self.collect_results(options)
             return True
 
-    def _wait_ceph_cooldown(self):
+    def wait_ceph_cooldown(self):
         # TODO: Query Ceph once every 20 sec to make sure its load has dropped
 
+        # get ceph idle status
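+        # and snapshot health, df and pg data for the report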
+        self.ceph_idle_status = self.ceph_info.get_cluster_status()
+        self.health_detail = self.ceph_info.get_health_detail()
+        self.ceph_df = self.ceph_info.get_ceph_df()
+        self.ceph_pg_dump = self.ceph_info.get_ceph_pg_dump()
         return
 
     def run_benchmark(self, options):
@@ -309,7 +377,9 @@
 
         # Make sure that Ceph is at low load
         # TODO: Ceph status check
-        self._wait_ceph_cooldown()
+        # self._wait_ceph_cooldown()
+
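+        # Shortcut for taking 'ceph osd df' snapshots around each run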
+        _get_df = self.ceph_info.get_ceph_osd_df
 
         # Do benchmark according to mode
         if self.mode == "tasks":
@@ -321,6 +391,8 @@
             # take next task
             _total_tasks = len(self.tasks)
             for idx in range(_total_tasks):
+                # capture OSD usage before the run
+                _osd_df_before = _get_df()
                 _task = self.tasks[idx]
                 logger_cli.info(
                     "-> Starting next task ({}/{})".format(idx+1, _total_tasks)
@@ -333,18 +405,36 @@
                 )
                 # update options
                 options.update(_task)
-                # init time to schedule
-                options["scheduled_to"] = self._get_next_scheduled_time()
+                _sch_time = self._get_next_scheduled_time()
+                options["scheduled_to"] = _sch_time
+                # init results table
+                self.results[_sch_time] = {
+                    "input_options": options,
+                    "agents": {},
+                    "osd_df_before": _osd_df_before
+                }
                 if not self._do_testrun(options):
                     return False
+                else:
+                    self.results[_sch_time]["osd_df_after"] = _get_df()
 
-                self._wait_ceph_cooldown()
+                self.wait_ceph_cooldown()
         elif self.mode == "single":
             logger_cli.info("# Running single benchmark")
+            _osd_df_before = _get_df()
             # init time to schedule
-            options["scheduled_to"] = self._get_next_scheduled_time()
+            _sch_time = self._get_next_scheduled_time()
+            options["scheduled_to"] = _sch_time
+            # init results table
+            self.results[_sch_time] = {
+                "input_options": options,
+                "agents": {},
+                "osd_df_before": _osd_df_before
+            }
             if not self._do_testrun(options):
                 return False
+            else:
+                self.results[_sch_time]["osd_df_after"] = _get_df()
         else:
             logger_cli.error("ERROR: Unknown mode '{}'".format(self.mode))
             return False
@@ -354,6 +444,7 @@
         return True
 
     def cleanup(self):
+        logger_cli.info("# Cleaning up")
         self.cleanup_list.reverse()
 
         for _res in self.cleanup_list:
@@ -385,39 +476,179 @@
 
         return
 
-    def collect_results(self):
+    def collect_results(self, options):
+        _sch_time = options["scheduled_to"]
         logger_cli.info("# Collecting results")
         # query agents for results
         _agents = self.get_agents_resultlist()
+        # Syntax shortcut
+        _ar = self.results[_sch_time]["agents"]
 
         for _agent, _l in _agents.items():
-            _list = _l["resultlist"]
-            _new = [r for r in _list if r not in self.results[_agent]["list"]]
-            logger_cli.debug(
-                "... agent '{}' has {} new results".format(_agent, len(_new))
-            )
-            #  get all new results
-            for _time in _new:
-                logger_cli.info(
-                    "-> loading results for '{}' from '{}'".format(
-                        _time,
-                        _agent
+            # Create a syntax shortcut
+            if _agent not in _ar:
+                _ar[_agent] = {}
+            _arl = _ar[_agent]
+            # Check if we already have this locally
+            for _time in _l["resultlist"]:
+                _filename = self._get_dump_filename(_sch_time, _agent, options)
+                if os.path.exists(_filename):
+                    # There is a file already for this task
+                    # Check if we need to load it
+                    if _sch_time in _arl:
+                        logger_cli.info(
+                            "-> Skipped already processed result '{}'".format(
+                                _filename
+                            )
+                        )
+                    else:
+                        # Load previously dumped result from disk
+                        logger_cli.info(
+                            "-> Loading already present result '{}'".format(
+                                _filename
+                            )
+                        )
+                        _arl[_sch_time] = self.load_dumped_result(_filename)
+                else:
+                    # Load the result, store it locally and dump it
+                    logger_cli.info(
+                        "-> Getting results for '{}' from '{}'".format(
+                            _sch_time,
+                            _agent
+                        )
                     )
-                )
-                self.results[_agent]["list"].update(
-                    self.get_result_from_agent(_agent, _time)
-                )
+                    _r = self.get_result_from_agent(_agent, _time)
+                    # Important to switch from result status time
+                    # to scheduled time
+                    _arl[_sch_time] = _r[_time]
+                    # Dump collected result
+                    self.dump_result(_filename, _arl[_sch_time])
         return
 
-    def dump_results(self, path):
-        # Function dumps all availabkle results as jsons to the given path
+    def _get_dump_filename(self, _time, agent, options):
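+        # Layout: <dump_path>/<time>/<time>-<agent>-<rw>-<bs>-<iodepth>.json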
+        _dirname = _reformat_timestr(_time)
+        _filename = "-".join([
+            _dirname,
+            agent,
+            options["readwrite"],
+            options["bs"],
+            str(options["iodepth"]),
+        ]) + ".json"
+        return os.path.join(
+            self.results_dump_path,
+            _dirname,
+            _filename
+        )
+
+    def dump_result(self, filename, data):
+        # Dump a single collected result as json to the given path,
         # overwriting if needed
-
-        # TODO: Conduct the dumping
-
+        _folder, _file = os.path.split(filename)
+        # Do dump
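+        # NOTE: only one directory level is created here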
+        if not os.path.exists(_folder):
+            os.mkdir(_folder)
+            logger_cli.info("-> Created folder '{}'".format(_folder))
+        # Dump agent data for this test run
+        write_str_to_file(filename, json.dumps(data, indent=2))
+        logger_cli.info("-> Dumped '{}'".format(filename))
         return
 
+    def load_dumped_result(self, filename):
+        try:
+            with open(filename, "rt+") as f:
+                return json.loads(f.read())
+        except FileNotFoundError as e:
+            logger_cli.error(
+                "ERROR: {}".format(e)
+            )
+        except TypeError as e:
+            logger_cli.error(
+                "ERROR: Invalid file ({}): {}".format(filename, e)
+            )
+        except json.decoder.JSONDecodeError as e:
+            logger_cli.error(
+                "ERROR: Failed to decode json ({}): {}".format(filename, e)
+            )
+        return None
+
+    def _lookup_storage_class_id_by_name(self, storage_class_name):
+        # Assume ceph_df was populated by wait_ceph_cooldown()
+        for _pool in self.ceph_df["pools"]:
+            if storage_class_name == _pool["name"]:
+                return _pool["id"]
+        return None
+
+    def calculate_totals(self):
+        # Calculate totals for Read and Write
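+        # bw and iops are summed across agents; latencies are averaged
+        # and converted from ns to us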
+        for _time, data in self.results.items():
+            if "totals" not in data:
+                data["totals"] = {}
+            else:
+                continue
+            _totals = data["totals"]
+            _r_bw = 0
+            _r_avglat = []
+            _r_iops = 0
+            _w_bw = 0
+            _w_avglat = []
+            _w_iops = 0
+            for _a, _d in data["agents"].items():
+                # Hardcoded number of jobs param :(
+                _j = _d[_time]["jobs"][0]
+                _r_bw += _j["read"]["bw_bytes"]
+                _r_avglat += [_j["read"]["lat_ns"]["mean"]]
+                _r_iops += _j["read"]["iops"]
+                _w_bw += _j["write"]["bw_bytes"]
+                _w_avglat += [_j["write"]["lat_ns"]["mean"]]
+                _w_iops += _j["write"]["iops"]
+                # Save storage class name
+                if "storage_class" not in _totals:
+                    _totals["storage_class"] = \
+                        self.agent_results[_a]["storage_class"]
+                    # Lookup storage class id and num_pg
+                    _totals["storage_class_stats"] = \
+                        reporter.get_pool_stats_by_id(
+                            self._lookup_storage_class_id_by_name(
+                                self.agent_results[_a]["storage_class"]
+                            ),
+                            self.ceph_pg_dump
+                        )
+
+            _totals["read_bw_bytes"] = _r_bw
+            _totals["read_avg_lat_us"] = \
+                (sum(_r_avglat) / len(_r_avglat)) / 1000
+            _totals["read_iops"] = _r_iops
+            _totals["write_bw_bytes"] = _w_bw
+            _totals["write_avg_lat_us"] = \
+                (sum(_w_avglat) / len(_w_avglat)) / 1000
+            _totals["write_iops"] = _w_iops
+
     # Create report
     def create_report(self, filename):
+        """
+        Create a static html report with benchmark results and ceph info
+
+        :return: none
+        """
+        logger_cli.info("### Generating report to '{}'".format(filename))
+        _report = reporter.ReportToFile(
+            reporter.HTMLCephBench(self),
+            filename
+        )
+        self.calculate_totals()
+        _report(
+            {
+                "results": self.results,
+                "idle_status": self.ceph_idle_status,
+                "health_detail": self.health_detail,
+                "ceph_df": self.ceph_df,
+                "ceph_pg_dump": self.ceph_pg_dump,
+                "info": self.ceph_info.ceph_info,
+                "cluster": self.ceph_info.cluster_info,
+                "ceph_version": self.ceph_info.ceph_version,
+                "nodes": self.agent_pods
+            }
+        )
+        logger_cli.info("-> Done")
 
         return