cfg-checker ceph bench module part 4
- benchmark code refactoring/cleaning
- using UTC timezone for scheduling
- 'tasks' mode first version
- updated agent poking response structure
Related-PROD: PROD-36669
Change-Id: I8bf9d9b4d54a41cc04f7765e17efb675028c6262
diff --git a/cfg_checker/modules/ceph/__init__.py b/cfg_checker/modules/ceph/__init__.py
index 62941d5..0f1de01 100644
--- a/cfg_checker/modules/ceph/__init__.py
+++ b/cfg_checker/modules/ceph/__init__.py
@@ -176,17 +176,17 @@
# TODO: Update options for single run
logger_cli.debug(" {} = {}".format(_k, _opts[_k]))
- # handle option inavailability
+ # handle option inavailability from command line for single mode
+
+ # init the Bench class
ceph_bench = bench.KubeCephBench(config)
-
- # Load tasks
-
# Do the testrun
ceph_bench.prepare_agents(_opts)
if not ceph_bench.run_benchmark(_opts):
# No cleaning and/or report if benchmark was not finished
logger_cli.info("# Abnormal benchmark run, no cleaning performed")
return
+ # Cleaning
if not config.no_cleaning_after_benchmark:
ceph_bench.cleanup()
else:
diff --git a/cfg_checker/modules/ceph/bench.py b/cfg_checker/modules/ceph/bench.py
index 5fa4cfd..7640440 100644
--- a/cfg_checker/modules/ceph/bench.py
+++ b/cfg_checker/modules/ceph/bench.py
@@ -2,10 +2,11 @@
import os
import json
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
from time import sleep
from cfg_checker.common import logger_cli
+from cfg_checker.common.decorators import retry
from cfg_checker.helpers.console_utils import Progress
# from cfg_checker.common.exception import InvalidReturnException
# from cfg_checker.common.exception import ConfigException
@@ -43,12 +44,14 @@
self.storage_class = config.bench_storage_class
self.agent_pods = []
self.services = []
- self.api_urls = []
+ self.scheduled_delay = 30
self.mode = config.bench_mode
if config.bench_mode == "tasks":
- self.load_tasks(config.bench_task_file)
+ self.taskfile = config.bench_task_file
+ self.load_tasks(self.taskfile)
self.cleanup_list = []
+ self.results = {}
def load_tasks(self, taskfile):
# Load csv file
@@ -94,15 +97,19 @@
self.add_for_deletion(_svc, "svc")
# Save service
self.services.append(_svc)
-
- # Build urls for agents
- for svc in self.services:
- self.api_urls.append(
+ # prepopulate results
+ self.results[_agent.metadata.name] = {}
+ self.results[_agent.metadata.name]["list"] = {}
+ self.results[_agent.metadata.name]["url"] = \
"http://{}:{}/api/".format(
- svc.spec.cluster_ip,
+ _svc.spec.cluster_ip,
8765
)
- )
+ self.results[_agent.metadata.name]["storage_class"] = \
+ self.storage_class
+ self.results[_agent.metadata.name]["volume_size"] = \
+ options['size']
+
logger_cli.info("-> Done creating agents")
return
@@ -147,98 +154,203 @@
return None
- def run_benchmark(self, options):
- def get_status():
- return [self._poke_agent(_u + "fio", {}) for _u in self.api_urls]
- logger_cli.info("# Starting '{}' benchmark".format(self.mode))
- logger_cli.info("# Checking agents")
+ def _ensure_agents_ready(self):
# make sure agents idle
- _tt = []
- _rr = []
- for _s in get_status():
- if _s is None:
+ _status_set = []
+ _ready_set = []
+ for _agent, _d in self.get_agents_status().items():
+ # obviously, there should be some answer
+ if _d is None:
logger_cli.error("ERROR: Agent status not available")
return False
- _h = _s["healthcheck"]["hostname"]
- _t = _s['status']
- _r = _s["healthcheck"]["ready"]
- if _t not in ["idle", "finished"]:
- logger_cli.error("Agent status invalid {}:{}".format(_h, _t))
- _tt += [False]
+ # status should be idle or finished
+ if _d['status'] not in ["idle", "finished"]:
+ logger_cli.error(
+ "Agent status invalid {}:{}".format(_agent, _d['status'])
+ )
+ _status_set += [False]
else:
- _tt += [True]
- if not _r:
- logger_cli.error("Agent is not ready {}".format(_h))
- _rr += [False]
+ # Good agent
+ _status_set += [True]
+ # agent's fio shell should be in 'ready'
+ if not _d["healthcheck"]["ready"]:
+ logger_cli.error("Agent is not ready {}".format(_agent))
+ _ready_set += [False]
else:
- _rr += [True]
- if not any(_tt) or not any(_rr):
+ # 'fio' shell for agent is ready
+ _ready_set += [True]
+ # all agent's statuses should be True
+ # and all 'fio' shell modules should be 'ready'
+ if not any(_status_set) or not any(_ready_set):
+ # At least one is not ready and it was already logged above
+ return False
+ else:
+ # All is good
+ return True
+
+ def get_agents_status(self):
+ _status = {}
+ for _agent, _d in self.results.items():
+ _status[_agent] = self._poke_agent(_d["url"] + "fio", {})
+ return _status
+
+ def get_agents_resultlist(self):
+ _t = {"module": "fio", "action": "get_resultlist"}
+ _status = {}
+ for _agent, _d in self.results.items():
+ _status[_agent] = self._poke_agent(_d["url"], _t, action="POST")
+ return _status
+
+ @retry(Exception)
+ def get_result_from_agent(self, agent, time):
+ _t = {
+ "module": "fio",
+ "action": "get_result",
+ "options": {
+ "time": time
+ }
+ }
+ return self._poke_agent(self.results[agent]["url"], _t, action="POST")
+
+ def _get_next_scheduled_time(self):
+ _now = datetime.now(timezone.utc)
+ logger_cli.info("-> time is '{}'".format(_now.strftime(_datetime_fmt)))
+ _time = _now + timedelta(seconds=self.scheduled_delay)
+ _str_time = _time.strftime(_datetime_fmt)
+ logger_cli.info(
+ "-> next benchmark scheduled to '{}'".format(_str_time)
+ )
+ return _str_time
+
+ def _send_scheduled_task(self, options):
+ _task = {
+ "module": "fio",
+ "action": "do_scheduledrun",
+ "options": options
+ }
+ for _agent, _d in self.results.items():
+ logger_cli.info(
+ "-> sending task to '{}:{}'".format(_agent, _d["url"])
+ )
+ _ret = self._poke_agent(_d["url"], _task, action="POST")
+ if 'error' in _ret:
+ logger_cli.error(
+ "ERROR: Agent returned: '{}'".format(_ret['error'])
+ )
+ return False
+ # No errors detected
+ return True
+
+ def track_benchmark(self, options):
+ _runtime = _get_seconds(options["runtime"])
+ _ramptime = _get_seconds(options["ramp_time"])
+ # Sum up all timings that we must wait and double it
+ _timeout = (self.scheduled_delay + _runtime + _ramptime) * 2
+ _end = datetime.now(timezone.utc) + timedelta(seconds=_timeout)
+ while True:
+ # Print status
+ # TODO: do pooled status get
+ _sts = self.get_agents_status()
+ diff = (_end - datetime.now(timezone.utc)).total_seconds()
+ logger_cli.info("-> {:.2f}s left. Agent status:".format(diff))
+ for _agent, _status in _sts.items():
+ logger_cli.info(
+ "\t{}: {} ({}%)".format(
+ _agent,
+ _status["status"],
+ _status["progress"]
+ )
+ )
+ finished = [True for _s in _sts.values()
+ if _s["status"] == 'finished']
+ _fcnt = len(finished)
+ _tcnt = len(_sts)
+ if _fcnt < _tcnt:
+ logger_cli.info("-> {}/{} finished".format(_fcnt, _tcnt))
+ else:
+ logger_cli.info("-> All agents finished run")
+ return True
+ # recalc how much is left
+ diff = (_end - datetime.now(timezone.utc)).total_seconds()
+ # In case end_datetime was in past to begin with
+ if diff < 0:
+ logger_cli.info("-> Timed out waiting for agents to finish")
+ return False
+ logger_cli.info("-> Sleeping for {:.2f}s".format(diff/3))
+ sleep(diff/3)
+ if diff <= 0.1:
+ logger_cli.info("-> Timed out waiting for agents to finish")
+ return False
+
+ def _do_testrun(self, options):
+ # send single to agent
+ if not self._send_scheduled_task(options):
+ return False
+ # Track this benchmark progress
+ if not self.track_benchmark(options):
+ return False
+ else:
+ logger_cli.info("-> Finished testrun")
+ # Get results for each agent
+ self.collect_results()
+ return True
+
+ def _wait_ceph_cooldown(self):
+ # TODO: Query Ceph ince a 20 sec to make sure its load dropped
+
+ return
+
+ def run_benchmark(self, options):
+ logger_cli.info("# Starting '{}' benchmark".format(self.mode))
+ # Check agent readyness
+ logger_cli.info("# Checking agents")
+ if not self._ensure_agents_ready():
return False
# Make sure that Ceph is at low load
# TODO: Ceph status check
+ self._wait_ceph_cooldown()
# Do benchmark according to mode
if self.mode == "tasks":
- # TODO: Implement 'tasks' mode
- # take next task
-
- # update options
-
- # init time to schedule
-
- # send next task to agent
- pass
- # wait for agents to finish
- elif self.mode == "single":
- logger_cli.info("# Running benchmark")
- # init time to schedule
- _time = datetime.now() + timedelta(seconds=10)
- _str_time = _time.strftime(_datetime_fmt)
- options["scheduled_to"] = _str_time
logger_cli.info(
- "-> next benchmark scheduled to '{}'".format(_str_time)
- )
- # send single to agent
- _task = {
- "module": "fio",
- "action": "do_singlerun",
- "options": options
- }
- for _u in self.api_urls:
- logger_cli.info("-> sending task to '{}'".format(_u))
- _ret = self._poke_agent(_u, _task, action="POST")
- if 'error' in _ret:
- logger_cli.error(
- "ERROR: Agent returned: '{}'".format(_ret['error'])
- )
-
- _runtime = _get_seconds(options["runtime"])
- _ramptime = _get_seconds(options["ramp_time"])
- _timeout = _runtime + _ramptime + 5
- _end = datetime.now() + timedelta(seconds=_timeout)
- while True:
- # Print status
- _sts = get_status()
- _str = ""
- for _s in _sts:
- _str += "{}: {} ({}); ".format(
- _s["healthcheck"]["hostname"],
- _s["status"],
- _s["progress"]
+ "# Running benchmark with tasks from '{}'".format(
+ self.taskfile
)
- # recalc how much is left
- diff = (_end - datetime.now()).total_seconds()
- logger_cli.debug("... [{:.2f}s]: {}".format(diff, _str))
- # In case end_datetime was in past to begin with
- if diff < 0:
- break
- logger_cli.info("-> Sleeping for {:.2f}s".format(diff/2))
- sleep(diff/2)
- if diff <= 0.1:
- break
+ )
+ # take next task
+ _total_tasks = len(self.tasks)
+ for idx in range(_total_tasks):
+ _task = self.tasks[idx]
+ logger_cli.info(
+ "-> Starting next task ({}/{})".format(idx+1, _total_tasks)
+ )
+ logger_cli.info("-> Updating options with: {}".format(
+ ", ".join(
+ ["{} = {}".format(k, v) for k, v in _task.items()]
+ )
+ )
+ )
+ # update options
+ options.update(_task)
+ # init time to schedule
+ options["scheduled_to"] = self._get_next_scheduled_time()
+ if not self._do_testrun(options):
+ return False
- logger_cli.info("-> Done")
+ self._wait_ceph_cooldown()
+ elif self.mode == "single":
+ logger_cli.info("# Running single benchmark")
+ # init time to schedule
+ options["scheduled_to"] = self._get_next_scheduled_time()
+ if not self._do_testrun(options):
+ return False
+ else:
+ logger_cli.error("ERROR: Unknown mode '{}'".format(self.mode))
+ return False
+
+ # Normal exit
+ logger_cli.info("# All benchmark tasks done")
return True
def cleanup(self):
@@ -260,6 +372,7 @@
if len(_l) > 0:
_p.write_progress(_idx)
else:
+ _p.write_progress(_idx)
_p.end()
logger_cli.info("# Done cleaning up")
break
@@ -272,6 +385,38 @@
return
+ def collect_results(self):
+ logger_cli.info("# Collecting results")
+ # query agents for results
+ _agents = self.get_agents_resultlist()
+
+ for _agent, _l in _agents.items():
+ _list = _l["resultlist"]
+ _new = [r for r in _list if r not in self.results[_agent]["list"]]
+ logger_cli.debug(
+ "... agent '{}' has {} new results".format(_agent, len(_new))
+ )
+ # get all new results
+ for _time in _new:
+ logger_cli.info(
+ "-> loading results for '{}' from '{}'".format(
+ _time,
+ _agent
+ )
+ )
+ self.results[_agent]["list"].update(
+ self.get_result_from_agent(_agent, _time)
+ )
+ return
+
+ def dump_results(self, path):
+ # Function dumps all availabkle results as jsons to the given path
+ # overwriting if needed
+
+ # TODO: Conduct the dumping
+
+ return
+
# Create report
def create_report(self, filename):