cfg-checker ceph bench module part 4
- benchmark code refactoring/cleaning
- using UTC timezone for scheduling
- 'tasks' mode first version
- updated agent poking response structure
Related-PROD: PROD-36669
Change-Id: I8bf9d9b4d54a41cc04f7765e17efb675028c6262
diff --git a/cfg_checker/agent/fio_runner.py b/cfg_checker/agent/fio_runner.py
index 50afeca..c8488af 100644
--- a/cfg_checker/agent/fio_runner.py
+++ b/cfg_checker/agent/fio_runner.py
@@ -6,7 +6,7 @@
from copy import deepcopy
-from datetime import datetime, timedelta
+from datetime import datetime, timezone
from platform import system, release, node
from threading import Thread
import threading
@@ -18,7 +18,7 @@
from cfg_checker.common.log import logger
-_datetime_fmt = "%m/%d/%Y, %H:%M:%S"
+_datetime_fmt = "%m/%d/%Y, %H:%M:%S%z"
fio_options_common = {
"name": "agent_run",
"filename": "/cephvol/testfile",
@@ -64,7 +64,7 @@
def get_time(timestamp=None):
if not timestamp:
- _t = datetime.now()
+ _t = datetime.now(timezone.utc)
else:
_t = datetime.fromtimestamp(timestamp)
return _t.strftime(_datetime_fmt)
@@ -85,7 +85,7 @@
def wait_until(end_datetime):
while True:
- diff = (end_datetime - datetime.now()).total_seconds()
+ diff = (end_datetime - datetime.now(timezone.utc)).total_seconds()
# In case end_datetime was in past to begin with
if diff < 0:
return
@@ -297,11 +297,12 @@
self.fiorun = ShellThread(_cmd, _q)
# Check if schedule is set
if self.scheduled_datetime:
+ _now = datetime.now(timezone.utc)
logger.debug(
"waiting for '{}', now is '{}', total of {} sec left".format(
self.scheduled_datetime.strftime(_datetime_fmt),
- datetime.now().strftime(_datetime_fmt),
- (self.scheduled_datetime - datetime.now()).total_seconds()
+ _now.strftime(_datetime_fmt),
+ (self.scheduled_datetime - _now).total_seconds()
)
)
wait_until(self.scheduled_datetime)
@@ -392,7 +393,8 @@
_scheduled = False
_diff = -1
if self.scheduled_datetime:
- _diff = (self.scheduled_datetime - datetime.now()).total_seconds()
+ _now = datetime.now(timezone.utc)
+ _diff = (self.scheduled_datetime - _now).total_seconds()
if _diff > 0:
_scheduled = True
_s = "running" if _running else "idle"
@@ -575,10 +577,11 @@
_s = self.fio.status()
if _s["status"] == "scheduled":
_t = self.fio.scheduled_datetime
- _n = datetime.now()
+ _n = datetime.now(timezone.utc)
_delta = (_t - _n).total_seconds()
print(
- "waiting for '{}'; now '{}'; {} sec left".format(
+ "{}: waiting for '{}'; now '{}'; {} sec left".format(
+ _s["status"],
_t.strftime(_datetime_fmt),
_n.strftime(_datetime_fmt),
_delta
@@ -609,7 +612,8 @@
_opts["readwrite"] = "read"
_opts["ramp_time"] = "1s"
_opts["runtime"] = "5s"
- _shell.do_singlerun(_opts)
+ _opts["scheduled_to"] = "11/13/2021, 23:03:30+0000"
+ _shell.do_scheduledrun(_opts)
_shell()
_times = _shell.get_resultlist()
print("# results:\n{}".format("\n".join(_times)))
@@ -623,9 +627,7 @@
_opts["readwrite"] = "read"
_opts["ramp_time"] = "1s"
_opts["runtime"] = "10s"
- _opts["scheduled_to"] = (datetime.now() + timedelta(seconds=12)).strftime(
- _datetime_fmt
- )
+ _opts["scheduled_to"] = "11/13/2021, 23:04:20+0000"
_shell.do_scheduledrun(_opts)
_shell()
_times = _shell.get_resultlist()
diff --git a/cfg_checker/agent/webserver.py b/cfg_checker/agent/webserver.py
index 235ba1a..7f7e53a 100644
--- a/cfg_checker/agent/webserver.py
+++ b/cfg_checker/agent/webserver.py
@@ -158,6 +158,7 @@
logger.info("getting results for '{}'".format(_time))
resp.status = falcon.HTTP_200
resp.content_type = "application/json"
+ # TODO: get timeline too?
resp.text = json.dumps({_time: _a(_time)})
elif _action == 'do_singlerun':
logger.info("executing single run")
diff --git a/cfg_checker/modules/ceph/__init__.py b/cfg_checker/modules/ceph/__init__.py
index 62941d5..0f1de01 100644
--- a/cfg_checker/modules/ceph/__init__.py
+++ b/cfg_checker/modules/ceph/__init__.py
@@ -176,17 +176,17 @@
# TODO: Update options for single run
logger_cli.debug(" {} = {}".format(_k, _opts[_k]))
- # handle option inavailability
+ # handle option unavailability from command line for single mode
+
+ # init the Bench class
ceph_bench = bench.KubeCephBench(config)
-
- # Load tasks
-
# Do the testrun
ceph_bench.prepare_agents(_opts)
if not ceph_bench.run_benchmark(_opts):
# No cleaning and/or report if benchmark was not finished
logger_cli.info("# Abnormal benchmark run, no cleaning performed")
return
+ # Cleaning
if not config.no_cleaning_after_benchmark:
ceph_bench.cleanup()
else:
diff --git a/cfg_checker/modules/ceph/bench.py b/cfg_checker/modules/ceph/bench.py
index 5fa4cfd..7640440 100644
--- a/cfg_checker/modules/ceph/bench.py
+++ b/cfg_checker/modules/ceph/bench.py
@@ -2,10 +2,11 @@
import os
import json
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone
from time import sleep
from cfg_checker.common import logger_cli
+from cfg_checker.common.decorators import retry
from cfg_checker.helpers.console_utils import Progress
# from cfg_checker.common.exception import InvalidReturnException
# from cfg_checker.common.exception import ConfigException
@@ -43,12 +44,14 @@
self.storage_class = config.bench_storage_class
self.agent_pods = []
self.services = []
- self.api_urls = []
+ self.scheduled_delay = 30
self.mode = config.bench_mode
if config.bench_mode == "tasks":
- self.load_tasks(config.bench_task_file)
+ self.taskfile = config.bench_task_file
+ self.load_tasks(self.taskfile)
self.cleanup_list = []
+ self.results = {}
def load_tasks(self, taskfile):
# Load csv file
@@ -94,15 +97,19 @@
self.add_for_deletion(_svc, "svc")
# Save service
self.services.append(_svc)
-
- # Build urls for agents
- for svc in self.services:
- self.api_urls.append(
+ # prepopulate results
+ self.results[_agent.metadata.name] = {}
+ self.results[_agent.metadata.name]["list"] = {}
+ self.results[_agent.metadata.name]["url"] = \
"http://{}:{}/api/".format(
- svc.spec.cluster_ip,
+ _svc.spec.cluster_ip,
8765
)
- )
+ self.results[_agent.metadata.name]["storage_class"] = \
+ self.storage_class
+ self.results[_agent.metadata.name]["volume_size"] = \
+ options['size']
+
logger_cli.info("-> Done creating agents")
return
@@ -147,98 +154,203 @@
return None
- def run_benchmark(self, options):
- def get_status():
- return [self._poke_agent(_u + "fio", {}) for _u in self.api_urls]
- logger_cli.info("# Starting '{}' benchmark".format(self.mode))
- logger_cli.info("# Checking agents")
+ def _ensure_agents_ready(self):
# make sure agents are idle
- _tt = []
- _rr = []
- for _s in get_status():
- if _s is None:
+ _status_set = []
+ _ready_set = []
+ for _agent, _d in self.get_agents_status().items():
+ # obviously, there should be some answer
+ if _d is None:
logger_cli.error("ERROR: Agent status not available")
return False
- _h = _s["healthcheck"]["hostname"]
- _t = _s['status']
- _r = _s["healthcheck"]["ready"]
- if _t not in ["idle", "finished"]:
- logger_cli.error("Agent status invalid {}:{}".format(_h, _t))
- _tt += [False]
+ # status should be idle or finished
+ if _d['status'] not in ["idle", "finished"]:
+ logger_cli.error(
+ "Agent status invalid {}:{}".format(_agent, _d['status'])
+ )
+ _status_set += [False]
else:
- _tt += [True]
- if not _r:
- logger_cli.error("Agent is not ready {}".format(_h))
- _rr += [False]
+ # Good agent
+ _status_set += [True]
+ # agent's fio shell should be in 'ready'
+ if not _d["healthcheck"]["ready"]:
+ logger_cli.error("Agent is not ready {}".format(_agent))
+ _ready_set += [False]
else:
- _rr += [True]
- if not any(_tt) or not any(_rr):
+ # 'fio' shell for agent is ready
+ _ready_set += [True]
+ # all agents' statuses should be True
+ # and all 'fio' shell modules should be 'ready'
+ if not any(_status_set) or not any(_ready_set):
+ # At least one is not ready and it was already logged above
+ return False
+ else:
+ # All is good
+ return True
+
+ def get_agents_status(self):
+ _status = {}
+ for _agent, _d in self.results.items():
+ _status[_agent] = self._poke_agent(_d["url"] + "fio", {})
+ return _status
+
+ def get_agents_resultlist(self):
+ _t = {"module": "fio", "action": "get_resultlist"}
+ _status = {}
+ for _agent, _d in self.results.items():
+ _status[_agent] = self._poke_agent(_d["url"], _t, action="POST")
+ return _status
+
+ @retry(Exception)
+ def get_result_from_agent(self, agent, time):
+ _t = {
+ "module": "fio",
+ "action": "get_result",
+ "options": {
+ "time": time
+ }
+ }
+ return self._poke_agent(self.results[agent]["url"], _t, action="POST")
+
+ def _get_next_scheduled_time(self):
+ _now = datetime.now(timezone.utc)
+ logger_cli.info("-> time is '{}'".format(_now.strftime(_datetime_fmt)))
+ _time = _now + timedelta(seconds=self.scheduled_delay)
+ _str_time = _time.strftime(_datetime_fmt)
+ logger_cli.info(
+ "-> next benchmark scheduled to '{}'".format(_str_time)
+ )
+ return _str_time
+
+ def _send_scheduled_task(self, options):
+ _task = {
+ "module": "fio",
+ "action": "do_scheduledrun",
+ "options": options
+ }
+ for _agent, _d in self.results.items():
+ logger_cli.info(
+ "-> sending task to '{}:{}'".format(_agent, _d["url"])
+ )
+ _ret = self._poke_agent(_d["url"], _task, action="POST")
+ if 'error' in _ret:
+ logger_cli.error(
+ "ERROR: Agent returned: '{}'".format(_ret['error'])
+ )
+ return False
+ # No errors detected
+ return True
+
+ def track_benchmark(self, options):
+ _runtime = _get_seconds(options["runtime"])
+ _ramptime = _get_seconds(options["ramp_time"])
+ # Sum up all timings that we must wait and double it
+ _timeout = (self.scheduled_delay + _runtime + _ramptime) * 2
+ _end = datetime.now(timezone.utc) + timedelta(seconds=_timeout)
+ while True:
+ # Print status
+ # TODO: do pooled status get
+ _sts = self.get_agents_status()
+ diff = (_end - datetime.now(timezone.utc)).total_seconds()
+ logger_cli.info("-> {:.2f}s left. Agent status:".format(diff))
+ for _agent, _status in _sts.items():
+ logger_cli.info(
+ "\t{}: {} ({}%)".format(
+ _agent,
+ _status["status"],
+ _status["progress"]
+ )
+ )
+ finished = [True for _s in _sts.values()
+ if _s["status"] == 'finished']
+ _fcnt = len(finished)
+ _tcnt = len(_sts)
+ if _fcnt < _tcnt:
+ logger_cli.info("-> {}/{} finished".format(_fcnt, _tcnt))
+ else:
+ logger_cli.info("-> All agents finished run")
+ return True
+ # recalc how much is left
+ diff = (_end - datetime.now(timezone.utc)).total_seconds()
+ # In case end_datetime was in past to begin with
+ if diff < 0:
+ logger_cli.info("-> Timed out waiting for agents to finish")
+ return False
+ logger_cli.info("-> Sleeping for {:.2f}s".format(diff/3))
+ sleep(diff/3)
+ if diff <= 0.1:
+ logger_cli.info("-> Timed out waiting for agents to finish")
+ return False
+
+ def _do_testrun(self, options):
+ # send single to agent
+ if not self._send_scheduled_task(options):
+ return False
+ # Track this benchmark progress
+ if not self.track_benchmark(options):
+ return False
+ else:
+ logger_cli.info("-> Finished testrun")
+ # Get results for each agent
+ self.collect_results()
+ return True
+
+ def _wait_ceph_cooldown(self):
+ # TODO: Query Ceph once every 20 sec to make sure its load has dropped
+
+ return
+
+ def run_benchmark(self, options):
+ logger_cli.info("# Starting '{}' benchmark".format(self.mode))
+ # Check agent readiness
+ logger_cli.info("# Checking agents")
+ if not self._ensure_agents_ready():
return False
# Make sure that Ceph is at low load
# TODO: Ceph status check
+ self._wait_ceph_cooldown()
# Do benchmark according to mode
if self.mode == "tasks":
- # TODO: Implement 'tasks' mode
- # take next task
-
- # update options
-
- # init time to schedule
-
- # send next task to agent
- pass
- # wait for agents to finish
- elif self.mode == "single":
- logger_cli.info("# Running benchmark")
- # init time to schedule
- _time = datetime.now() + timedelta(seconds=10)
- _str_time = _time.strftime(_datetime_fmt)
- options["scheduled_to"] = _str_time
logger_cli.info(
- "-> next benchmark scheduled to '{}'".format(_str_time)
- )
- # send single to agent
- _task = {
- "module": "fio",
- "action": "do_singlerun",
- "options": options
- }
- for _u in self.api_urls:
- logger_cli.info("-> sending task to '{}'".format(_u))
- _ret = self._poke_agent(_u, _task, action="POST")
- if 'error' in _ret:
- logger_cli.error(
- "ERROR: Agent returned: '{}'".format(_ret['error'])
- )
-
- _runtime = _get_seconds(options["runtime"])
- _ramptime = _get_seconds(options["ramp_time"])
- _timeout = _runtime + _ramptime + 5
- _end = datetime.now() + timedelta(seconds=_timeout)
- while True:
- # Print status
- _sts = get_status()
- _str = ""
- for _s in _sts:
- _str += "{}: {} ({}); ".format(
- _s["healthcheck"]["hostname"],
- _s["status"],
- _s["progress"]
+ "# Running benchmark with tasks from '{}'".format(
+ self.taskfile
)
- # recalc how much is left
- diff = (_end - datetime.now()).total_seconds()
- logger_cli.debug("... [{:.2f}s]: {}".format(diff, _str))
- # In case end_datetime was in past to begin with
- if diff < 0:
- break
- logger_cli.info("-> Sleeping for {:.2f}s".format(diff/2))
- sleep(diff/2)
- if diff <= 0.1:
- break
+ )
+ # take next task
+ _total_tasks = len(self.tasks)
+ for idx in range(_total_tasks):
+ _task = self.tasks[idx]
+ logger_cli.info(
+ "-> Starting next task ({}/{})".format(idx+1, _total_tasks)
+ )
+ logger_cli.info("-> Updating options with: {}".format(
+ ", ".join(
+ ["{} = {}".format(k, v) for k, v in _task.items()]
+ )
+ )
+ )
+ # update options
+ options.update(_task)
+ # init time to schedule
+ options["scheduled_to"] = self._get_next_scheduled_time()
+ if not self._do_testrun(options):
+ return False
- logger_cli.info("-> Done")
+ self._wait_ceph_cooldown()
+ elif self.mode == "single":
+ logger_cli.info("# Running single benchmark")
+ # init time to schedule
+ options["scheduled_to"] = self._get_next_scheduled_time()
+ if not self._do_testrun(options):
+ return False
+ else:
+ logger_cli.error("ERROR: Unknown mode '{}'".format(self.mode))
+ return False
+
+ # Normal exit
+ logger_cli.info("# All benchmark tasks done")
return True
def cleanup(self):
@@ -260,6 +372,7 @@
if len(_l) > 0:
_p.write_progress(_idx)
else:
+ _p.write_progress(_idx)
_p.end()
logger_cli.info("# Done cleaning up")
break
@@ -272,6 +385,38 @@
return
+ def collect_results(self):
+ logger_cli.info("# Collecting results")
+ # query agents for results
+ _agents = self.get_agents_resultlist()
+
+ for _agent, _l in _agents.items():
+ _list = _l["resultlist"]
+ _new = [r for r in _list if r not in self.results[_agent]["list"]]
+ logger_cli.debug(
+ "... agent '{}' has {} new results".format(_agent, len(_new))
+ )
+ # get all new results
+ for _time in _new:
+ logger_cli.info(
+ "-> loading results for '{}' from '{}'".format(
+ _time,
+ _agent
+ )
+ )
+ self.results[_agent]["list"].update(
+ self.get_result_from_agent(_agent, _time)
+ )
+ return
+
+ def dump_results(self, path):
+ # Function dumps all available results as JSON files to the given path,
+ # overwriting if needed
+
+ # TODO: Conduct the dumping
+
+ return
+
# Create report
def create_report(self, filename):
diff --git a/setup.py b/setup.py
index c27fcf4..fd73a82 100644
--- a/setup.py
+++ b/setup.py
@@ -38,7 +38,7 @@
setup(
name="mcp-checker",
- version="0.63",
+ version="0.64",
author="Alex Savatieiev",
author_email="osavatieiev@mirantis.com",
classifiers=[
diff --git a/templates/cfgagent-template.yaml b/templates/cfgagent-template.yaml
index fe1065d..3152c5f 100644
--- a/templates/cfgagent-template.yaml
+++ b/templates/cfgagent-template.yaml
@@ -11,7 +11,7 @@
- checker-agent
imagePullPolicy: IfNotPresent
name: cfgagent-pod
- image: savex13/cfg-checker-agent:0.63
+ image: savex13/cfg-checker-agent:0.64
volumeMounts:
- mountPath: /cephvol
name: cfgagent-pv-placeholder