cfg-checker bench part 1

 - first debug portion of the single-run benchmark mode
 - updated fio option extraction
 - updated date handling
 - fixed agent error reporting and handling

 Related-PROD: PROD-36669

Change-Id: I7c1014c01b5b84429f112bff8db5ad34944c4644
diff --git a/cfg_checker/modules/ceph/__init__.py b/cfg_checker/modules/ceph/__init__.py
index 495e9b1..9d0f923 100644
--- a/cfg_checker/modules/ceph/__init__.py
+++ b/cfg_checker/modules/ceph/__init__.py
@@ -1,3 +1,4 @@
+from cfg_checker.agent.fio_runner import get_fio_options
 from cfg_checker.common import logger_cli
 from cfg_checker.common.settings import ENV_TYPE_KUBE
 from cfg_checker.helpers import args_utils
@@ -71,6 +72,26 @@
         metavar='ceph_tasks_filename',
         help="List file with data for Ceph bench testrun"
     )
+    ceph_bench_parser.add_argument(
+        '--agents',
+        type=int, metavar='agent_count', default=5,
+        help="Number of benchmark agents to create"
+    )
+    ceph_bench_parser.add_argument(
+        '--html',
+        metavar='ceph_html_filename',
+        help="HTML filename to save report"
+    )
+    ceph_bench_parser.add_argument(
+        '--storage-class',
+        metavar='storage_class',
+        help="Storage class to be used in benchmark"
+    )
+    ceph_bench_parser.add_argument(
+        '--task-file',
+        metavar='task-file',
+        help="Task file for benchmark"
+    )
 
     return _parser
 
@@ -123,21 +144,45 @@
 
 def do_bench(args, config):
     # Ceph Benchmark using multiple pods
-    # Prepare the tasks and do synced testrun
-    # TODO: html option to create a fancy report
+    # Prepare the tasks and do synced testrun or a single one
+    logger_cli.info("# Initializing benchmark run")
     args_utils.check_supported_env(ENV_TYPE_KUBE, args, config)
+    _filename = args_utils.get_arg(args, 'html')
+    # agents count option
+    _agents = args_utils.get_arg(args, "agents")
+    logger_cli.info("-> using {} agents".format(_agents))
+    config.bench_agent_count = _agents
+    # storage class
+    _storage_class = args_utils.get_arg(args, "storage_class")
+    logger_cli.info("-> using storage class of '{}'".format(_storage_class))
+    config.bench_storage_class = _storage_class
+    # Task files or options
+    _task_file = args_utils.get_arg(args, "task_file", nofail=True)
+    if not _task_file:
+        logger_cli.info("-> running single run")
+        config.bench_mode = "single"
+    else:
+        logger_cli.info("-> running with tasks from '{}'".format(_task_file))
+        config.bench_task_file = _task_file
+        config.bench_mode = "tasks"
+    _opts = get_fio_options()
+    logger_cli.debug("... default/selected options for fio:")
+    for _k in _opts.keys():
+        # TODO: Update options for single run
+        logger_cli.debug("    {} = {}".format(_k, _opts[_k]))
 
+    # handle option unavailability
     ceph_bench = bench.KubeCephBench(config)
 
-    logger_cli.error("ERROR: To be implemented...")
-
     # Load tasks
 
     # Do the testrun
-    ceph_bench.prepare_pods()
-    ceph_bench.run_benchmark()
+    ceph_bench.prepare_agents(_opts)
+    if not ceph_bench.run_benchmark(_opts):
+        return
+    ceph_bench.cleanup()
 
     # Create report
-    ceph_bench.create_report()
+    ceph_bench.create_report(_filename)
 
     return
diff --git a/cfg_checker/modules/ceph/bench.py b/cfg_checker/modules/ceph/bench.py
index 28c7929..190076a 100644
--- a/cfg_checker/modules/ceph/bench.py
+++ b/cfg_checker/modules/ceph/bench.py
@@ -1,48 +1,228 @@
+import csv
+import os
+import json
+
+from datetime import datetime, timedelta
+from time import sleep
+
 from cfg_checker.common import logger_cli
 # from cfg_checker.common.exception import InvalidReturnException
 # from cfg_checker.common.exception import ConfigException
 # from cfg_checker.common.exception import KubeException
 
 from cfg_checker.nodes import KubeNodes
+from cfg_checker.agent.fio_runner import _get_seconds, _datetime_fmt
 
 
 class CephBench(object):
-    def __info__(
-        self,
-        config
-    ):
+    _agent_template = "cfgagent-template.yaml"
+
+    def __init__(self, config):
         self.env_config = config
         return
 
-    def prepare_pods(self):
-
-        return
-
-    def run_benchmark(self):
-
-        return
-
-    # Create report
-    def create_report(self):
-
-        return
-
 
 class SaltCephBench(CephBench):
     def __init__(
         self,
         config
     ):
-        logger_cli.error("ERROR: Not impelented for Sale environment!")
+        logger_cli.error("ERROR: Not implemented for Salt environment!")
 
         # self.master = SaltNodes(config)
-        super(SaltCephBench, self).__init__(
-            config
-        )
+        super(SaltCephBench, self).__init__(config)
         return
 
 
 class KubeCephBench(CephBench):
     def __init__(self, config):
+        self.agent_count = config.bench_agent_count
         self.master = KubeNodes(config)
         super(KubeCephBench, self).__init__(config)
+        self.storage_class = config.bench_storage_class
+        self.agent_pods = []
+        self.services = []
+        self.api_urls = []
+        self.mode = config.bench_mode
+        if config.bench_mode == "tasks":
+            self.load_tasks(config.bench_task_file)
+
+    def load_tasks(self, taskfile):
+        # Load csv file
+        logger_cli.info("-> Loading taskfile '{}'".format(taskfile))
+        self.tasks = []
+        with open(taskfile) as f:
+            _reader = csv.reader(f, delimiter=',')
+            # load packages
+            for row in _reader:
+                self.tasks.append({
+                    "readwrite": row[0],
+                    "rwmixread": row[1],
+                    "bs": row[2],
+                    "iodepth": row[3],
+                    "size": row[4]
+                })
+
+    def prepare_agents(self, options):
+        logger_cli.info("# Preparing {} agents".format(self.agent_count))
+        for idx in range(self.agent_count):
+            # create pvc/pv and pod
+            logger_cli.info("-> creating agent '{:02}'".format(idx))
+            _agent, _pv, _pvc = self.master.prepare_benchmark_agent(
+                idx,
+                os.path.split(options["filename"])[0],
+                self.storage_class,
+                options['size'] + 'i',
+                self._agent_template
+            )
+            # save it to list
+            self.agent_pods.append(_agent)
+            # expose it
+            _svc = self.master.expose_benchmark_agent(_agent)
+            # Save service
+            self.services.append(_svc)
+
+        # Build urls for agents
+        for svc in self.services:
+            self.api_urls.append(
+                "http://{}:{}/api/".format(
+                    svc.spec.cluster_ip,
+                    8765
+                )
+            )
+        logger_cli.info("-> Done creating agents")
+        return
+
+    def _poke_agent(self, url, body, action="GET"):
+        _datafile = "/tmp/data"
+        _data = [
+            "--data-binary",
+            "@" + _datafile,
+        ]
+        _cmd = [
+            "curl",
+            "-s",
+            "-H",
+            "'Content-Type: application/json'",
+            "-X",
+            action,
+            url
+        ]
+        if body:
+            _cmd += _data
+            _ret = self.master.prepare_json_in_pod(
+                self.agent_pods[0].metadata.name,
+                self.master._namespace,
+                body,
+                _datafile
+            )
+        _ret = self.master.exec_cmd_on_target_pod(
+            self.agent_pods[0].metadata.name,
+            self.master._namespace,
+            " ".join(_cmd)
+        )
+        try:
+            return json.loads(_ret)
+        except TypeError as e:
+            logger_cli.error(
+                "ERROR: Status not decoded: {}\n{}".format(e, _ret)
+            )
+        except json.decoder.JSONDecodeError as e:
+            logger_cli.error(
+                "ERROR: Status not decoded: {}\n{}".format(e, _ret)
+            )
+
+        return None
+
+    def run_benchmark(self, options):
+        def get_status():
+            return [self._poke_agent(_u + "fio", {}) for _u in self.api_urls]
+        logger_cli.info("# Starting '{}' benchmark".format(self.mode))
+        logger_cli.info("# Checking agents")
+        # every agent must be idle and ready before the run starts
+        _tt = []
+        _rr = []
+        for _s in get_status():
+            if _s is None:
+                logger_cli.error("ERROR: Agent status not available")
+                return False
+            _h = _s["healthcheck"]["hostname"]
+            _t = _s['status']
+            _r = _s["healthcheck"]["ready"]
+            if _t != "idle":
+                logger_cli.error("Agent status invalid {}:{}".format(_h, _t))
+                _tt += [False]
+            else:
+                _tt += [True]
+            if not _r:
+                logger_cli.error("Agent is not ready {}".format(_h))
+                _rr += [False]
+            else:
+                _rr += [True]
+        if not (_tt and all(_tt) and all(_rr)):
+            return False
+
+        # Make sure that Ceph is at low load
+        # TODO: Ceph status check
+
+        # Do benchmark according to mode
+        if self.mode == "tasks":
+            # TODO: Implement 'tasks' mode
+            # take next task
+
+            # update options
+
+            # init time to schedule
+
+            # send next task to agent
+            pass
+            # wait for agents to finish
+        elif self.mode == "single":
+            logger_cli.info("# Running benchmark")
+            # init time to schedule
+            _time = datetime.now() + timedelta(seconds=10)
+            _str_time = _time.strftime(_datetime_fmt)
+            options["scheduled_to"] = _str_time
+            logger_cli.info(
+                "-> next benchmark scheduled to '{}'".format(_str_time)
+            )
+            # send single run; failed (None) replies are treated as {}
+            _task = {
+                "module": "fio",
+                "action": "do_singlerun",
+                "options": options
+            }
+            for _u in self.api_urls:
+                logger_cli.info("-> sending task to '{}'".format(_u))
+                _ret = self._poke_agent(_u, _task, action="POST") or {}
+                if 'error' in _ret:
+                    logger_cli.error(
+                        "ERROR: Agent returned: '{}'".format(_ret['error'])
+                    )
+
+        _runtime = _get_seconds(options["runtime"])
+        _ramptime = _get_seconds(options["ramp_time"])
+        _timeout = _runtime + _ramptime + 5
+        while _timeout > 0:
+            _sts = [_s for _s in get_status() if _s is not None]
+            _str = ""
+            for _s in _sts:
+                _str += "{}: {} ({}); ".format(
+                    _s["healthcheck"]["hostname"],
+                    _s["status"],
+                    _s["progress"]
+                )
+            logger_cli.debug("... {}".format(_str))
+            sleep(1)
+            _timeout -= 1
+
+        return True
+
+    def cleanup(self):
+
+        return
+
+    # Create report
+    def create_report(self, filename=None):
+        # 'filename' is the report path passed by do_bench(); unused yet
+        return