cfg-checker bench part 1

 - first single test debug portion
 - updated fio option extraction
 - updated date interaction
 - fixed agent error showing and handling

 Related-PROD: PROD-36669

Change-Id: I7c1014c01b5b84429f112bff8db5ad34944c4644
diff --git a/cfg_checker/modules/ceph/bench.py b/cfg_checker/modules/ceph/bench.py
index 28c7929..190076a 100644
--- a/cfg_checker/modules/ceph/bench.py
+++ b/cfg_checker/modules/ceph/bench.py
@@ -1,48 +1,228 @@
+import csv
+import os
+import json
+
+from datetime import datetime, timedelta
+from time import sleep
+
 from cfg_checker.common import logger_cli
 # from cfg_checker.common.exception import InvalidReturnException
 # from cfg_checker.common.exception import ConfigException
 # from cfg_checker.common.exception import KubeException
 
 from cfg_checker.nodes import KubeNodes
+from cfg_checker.agent.fio_runner import _get_seconds, _datetime_fmt
 
 
 class CephBench(object):
-    def __info__(
-        self,
-        config
-    ):
+    _agent_template = "cfgagent-template.yaml"
+
+    def __init__(self, config):
         self.env_config = config
         return
 
-    def prepare_pods(self):
-
-        return
-
-    def run_benchmark(self):
-
-        return
-
-    # Create report
-    def create_report(self):
-
-        return
-
 
 class SaltCephBench(CephBench):
     def __init__(
         self,
         config
     ):
-        logger_cli.error("ERROR: Not impelented for Sale environment!")
+        logger_cli.error("ERROR: Not impelented for Salt environment!")
 
         # self.master = SaltNodes(config)
-        super(SaltCephBench, self).__init__(
-            config
-        )
+        super(SaltCephBench, self).__init__(config)
         return
 
 
 class KubeCephBench(CephBench):
     def __init__(self, config):
+        self.agent_count = config.bench_agent_count
         self.master = KubeNodes(config)
         super(KubeCephBench, self).__init__(config)
+        self.storage_class = config.bench_storage_class
+        self.agent_pods = []
+        self.services = []
+        self.api_urls = []
+        self.mode = config.bench_mode
+        if config.bench_mode == "tasks":
+            self.load_tasks(config.bench_task_file)
+
+    def load_tasks(self, taskfile):
+        # Load csv file
+        logger_cli.info("-> Loading taskfile '{}'".format(taskfile))
+        self.tasks = []
+        with open(taskfile) as f:
+            _reader = csv.reader(f, delimiter=',')
+            # load packages
+            for row in _reader:
+                self.tasks.append({
+                    "readwrite": row[0],
+                    "rwmixread": row[1],
+                    "bs": row[2],
+                    "iodepth": row[3],
+                    "size": row[4]
+                })
+
+    def prepare_agents(self, options):
+        logger_cli.info("# Preparing {} agents".format(self.agent_count))
+        for idx in range(self.agent_count):
+            # create pvc/pv and pod
+            logger_cli.info("-> creating agent '{:02}'".format(idx))
+            _agent, _pv, _pvc = self.master.prepare_benchmark_agent(
+                idx,
+                os.path.split(options["filename"])[0],
+                self.storage_class,
+                options['size'] + 'i',
+                self._agent_template
+            )
+            # save it to list
+            self.agent_pods.append(_agent)
+            # expose it
+            _svc = self.master.expose_benchmark_agent(_agent)
+            # Save service
+            self.services.append(_svc)
+
+        # Build urls for agents
+        for svc in self.services:
+            self.api_urls.append(
+                "http://{}:{}/api/".format(
+                    svc.spec.cluster_ip,
+                    8765
+                )
+            )
+        logger_cli.info("-> Done creating agents")
+        return
+
+    def _poke_agent(self, url, body, action="GET"):
+        _datafile = "/tmp/data"
+        _data = [
+            "--data-binary",
+            "@" + _datafile,
+        ]
+        _cmd = [
+            "curl",
+            "-s",
+            "-H",
+            "'Content-Type: application/json'",
+            "-X",
+            action,
+            url
+        ]
+        if body:
+            _cmd += _data
+            _ret = self.master.prepare_json_in_pod(
+                self.agent_pods[0].metadata.name,
+                self.master._namespace,
+                body,
+                _datafile
+            )
+        _ret = self.master.exec_cmd_on_target_pod(
+            self.agent_pods[0].metadata.name,
+            self.master._namespace,
+            " ".join(_cmd)
+        )
+        try:
+            return json.loads(_ret)
+        except TypeError as e:
+            logger_cli.error(
+                "ERROR: Status not decoded: {}\n{}".format(e, _ret)
+            )
+        except json.decoder.JSONDecodeError as e:
+            logger_cli.error(
+                "ERROR: Status not decoded: {}\n{}".format(e, _ret)
+            )
+
+        return None
+
+    def run_benchmark(self, options):
+        def get_status():
+            return [self._poke_agent(_u + "fio", {}) for _u in self.api_urls]
+        logger_cli.info("# Starting '{}' benchmark".format(self.mode))
+        logger_cli.info("# Checking agents")
+        # make sure agents idle
+        _tt = []
+        _rr = []
+        for _s in get_status():
+            if _s is None:
+                logger_cli.error("ERROR: Agent status not available")
+                return False
+            _h = _s["healthcheck"]["hostname"]
+            _t = _s['status']
+            _r = _s["healthcheck"]["ready"]
+            if _t != "idle":
+                logger_cli.error("Agent status invalid {}:{}".format(_h, _t))
+                _tt += [False]
+            else:
+                _tt += [True]
+            if not _r:
+                logger_cli.error("Agent is not ready {}".format(_h))
+                _rr += [False]
+            else:
+                _rr += [True]
+        if not any(_tt) or not any(_rr):
+            return False
+
+        # Make sure that Ceph is at low load
+        # TODO: Ceph status check
+
+        # Do benchmark according to mode
+        if self.mode == "tasks":
+            # TODO: Implement 'tasks' mode
+            # take next task
+
+            # update options
+
+            # init time to schedule
+
+            # send next task to agent
+            pass
+            # wait for agents to finish
+        elif self.mode == "single":
+            logger_cli.info("# Running benchmark")
+            # init time to schedule
+            _time = datetime.now() + timedelta(seconds=10)
+            _str_time = _time.strftime(_datetime_fmt)
+            options["scheduled_to"] = _str_time
+            logger_cli.info(
+                "-> next benchmark scheduled to '{}'".format(_str_time)
+            )
+            # send single to agent
+            _task = {
+                "module": "fio",
+                "action": "do_singlerun",
+                "options": options
+            }
+            for _u in self.api_urls:
+                logger_cli.info("-> sending task to '{}'".format(_u))
+                _ret = self._poke_agent(_u, _task, action="POST")
+                if 'error' in _ret:
+                    logger_cli.error(
+                        "ERROR: Agent returned: '{}'".format(_ret['error'])
+                    )
+
+        _runtime = _get_seconds(options["runtime"])
+        _ramptime = _get_seconds(options["ramp_time"])
+        _timeout = _runtime + _ramptime + 5
+        while _timeout > 0:
+            _sts = get_status()
+            _str = ""
+            for _s in _sts:
+                _str += "{}: {} ({}); ".format(
+                    _s["healthcheck"]["hostname"],
+                    _s["status"],
+                    _s["progress"]
+                )
+            logger_cli.debug("... {}".format(_str))
+            sleep(1)
+            _timeout -= 1
+
+        return True
+
+    def cleanup(self):
+
+        return
+
+    # Create report
+    def create_report(self):
+
+        return