cfg-checker bench part 1
- first single test debug portion
- updated fio option extraction
- updated date interaction
- fixed agent error reporting and handling
Related-PROD: PROD-36669
Change-Id: I7c1014c01b5b84429f112bff8db5ad34944c4644
diff --git a/cfg_checker/modules/ceph/bench.py b/cfg_checker/modules/ceph/bench.py
index 28c7929..190076a 100644
--- a/cfg_checker/modules/ceph/bench.py
+++ b/cfg_checker/modules/ceph/bench.py
@@ -1,48 +1,228 @@
+import csv
+import os
+import json
+
+from datetime import datetime, timedelta
+from time import sleep
+
from cfg_checker.common import logger_cli
# from cfg_checker.common.exception import InvalidReturnException
# from cfg_checker.common.exception import ConfigException
# from cfg_checker.common.exception import KubeException
from cfg_checker.nodes import KubeNodes
+from cfg_checker.agent.fio_runner import _get_seconds, _datetime_fmt
class CephBench(object):
- def __info__(
- self,
- config
- ):
+ _agent_template = "cfgagent-template.yaml"
+
+ def __init__(self, config):
self.env_config = config
return
- def prepare_pods(self):
-
- return
-
- def run_benchmark(self):
-
- return
-
- # Create report
- def create_report(self):
-
- return
-
class SaltCephBench(CephBench):
def __init__(
self,
config
):
- logger_cli.error("ERROR: Not impelented for Sale environment!")
+        logger_cli.error("ERROR: Not implemented for Salt environment!")
# self.master = SaltNodes(config)
- super(SaltCephBench, self).__init__(
- config
- )
+ super(SaltCephBench, self).__init__(config)
return
class KubeCephBench(CephBench):
def __init__(self, config):
+ self.agent_count = config.bench_agent_count
self.master = KubeNodes(config)
super(KubeCephBench, self).__init__(config)
+ self.storage_class = config.bench_storage_class
+ self.agent_pods = []
+ self.services = []
+ self.api_urls = []
+ self.mode = config.bench_mode
+ if config.bench_mode == "tasks":
+ self.load_tasks(config.bench_task_file)
+
+ def load_tasks(self, taskfile):
+ # Load csv file
+ logger_cli.info("-> Loading taskfile '{}'".format(taskfile))
+ self.tasks = []
+ with open(taskfile) as f:
+ _reader = csv.reader(f, delimiter=',')
+ # load packages
+ for row in _reader:
+ self.tasks.append({
+ "readwrite": row[0],
+ "rwmixread": row[1],
+ "bs": row[2],
+ "iodepth": row[3],
+ "size": row[4]
+ })
+
+ def prepare_agents(self, options):
+ logger_cli.info("# Preparing {} agents".format(self.agent_count))
+ for idx in range(self.agent_count):
+ # create pvc/pv and pod
+ logger_cli.info("-> creating agent '{:02}'".format(idx))
+ _agent, _pv, _pvc = self.master.prepare_benchmark_agent(
+ idx,
+ os.path.split(options["filename"])[0],
+ self.storage_class,
+ options['size'] + 'i',
+ self._agent_template
+ )
+ # save it to list
+ self.agent_pods.append(_agent)
+ # expose it
+ _svc = self.master.expose_benchmark_agent(_agent)
+ # Save service
+ self.services.append(_svc)
+
+ # Build urls for agents
+ for svc in self.services:
+ self.api_urls.append(
+ "http://{}:{}/api/".format(
+ svc.spec.cluster_ip,
+ 8765
+ )
+ )
+ logger_cli.info("-> Done creating agents")
+ return
+
+ def _poke_agent(self, url, body, action="GET"):
+ _datafile = "/tmp/data"
+ _data = [
+ "--data-binary",
+ "@" + _datafile,
+ ]
+ _cmd = [
+ "curl",
+ "-s",
+ "-H",
+ "'Content-Type: application/json'",
+ "-X",
+ action,
+ url
+ ]
+ if body:
+ _cmd += _data
+ _ret = self.master.prepare_json_in_pod(
+ self.agent_pods[0].metadata.name,
+ self.master._namespace,
+ body,
+ _datafile
+ )
+ _ret = self.master.exec_cmd_on_target_pod(
+ self.agent_pods[0].metadata.name,
+ self.master._namespace,
+ " ".join(_cmd)
+ )
+ try:
+ return json.loads(_ret)
+ except TypeError as e:
+ logger_cli.error(
+ "ERROR: Status not decoded: {}\n{}".format(e, _ret)
+ )
+ except json.decoder.JSONDecodeError as e:
+ logger_cli.error(
+ "ERROR: Status not decoded: {}\n{}".format(e, _ret)
+ )
+
+ return None
+
+ def run_benchmark(self, options):
+ def get_status():
+ return [self._poke_agent(_u + "fio", {}) for _u in self.api_urls]
+ logger_cli.info("# Starting '{}' benchmark".format(self.mode))
+ logger_cli.info("# Checking agents")
+ # make sure agents idle
+ _tt = []
+ _rr = []
+ for _s in get_status():
+ if _s is None:
+ logger_cli.error("ERROR: Agent status not available")
+ return False
+ _h = _s["healthcheck"]["hostname"]
+ _t = _s['status']
+ _r = _s["healthcheck"]["ready"]
+ if _t != "idle":
+ logger_cli.error("Agent status invalid {}:{}".format(_h, _t))
+ _tt += [False]
+ else:
+ _tt += [True]
+ if not _r:
+ logger_cli.error("Agent is not ready {}".format(_h))
+ _rr += [False]
+ else:
+ _rr += [True]
+ if not any(_tt) or not any(_rr):
+ return False
+
+ # Make sure that Ceph is at low load
+ # TODO: Ceph status check
+
+ # Do benchmark according to mode
+ if self.mode == "tasks":
+ # TODO: Implement 'tasks' mode
+ # take next task
+
+ # update options
+
+ # init time to schedule
+
+ # send next task to agent
+ pass
+ # wait for agents to finish
+ elif self.mode == "single":
+ logger_cli.info("# Running benchmark")
+ # init time to schedule
+ _time = datetime.now() + timedelta(seconds=10)
+ _str_time = _time.strftime(_datetime_fmt)
+ options["scheduled_to"] = _str_time
+ logger_cli.info(
+ "-> next benchmark scheduled to '{}'".format(_str_time)
+ )
+ # send single to agent
+ _task = {
+ "module": "fio",
+ "action": "do_singlerun",
+ "options": options
+ }
+ for _u in self.api_urls:
+ logger_cli.info("-> sending task to '{}'".format(_u))
+ _ret = self._poke_agent(_u, _task, action="POST")
+ if 'error' in _ret:
+ logger_cli.error(
+ "ERROR: Agent returned: '{}'".format(_ret['error'])
+ )
+
+ _runtime = _get_seconds(options["runtime"])
+ _ramptime = _get_seconds(options["ramp_time"])
+ _timeout = _runtime + _ramptime + 5
+ while _timeout > 0:
+ _sts = get_status()
+ _str = ""
+ for _s in _sts:
+ _str += "{}: {} ({}); ".format(
+ _s["healthcheck"]["hostname"],
+ _s["status"],
+ _s["progress"]
+ )
+ logger_cli.debug("... {}".format(_str))
+ sleep(1)
+ _timeout -= 1
+
+ return True
+
+ def cleanup(self):
+
+ return
+
+ # Create report
+ def create_report(self):
+
+ return