# Author: Alex Savatieiev (osavatieiev@mirantis.com; a.savex@gmail.com)
# Copyright 2019-2022 Mirantis, Inc.
import csv
import os
import json

from copy import deepcopy
from datetime import datetime, timedelta, timezone

from cfg_checker.common import logger_cli
from cfg_checker.common.decorators import retry
from cfg_checker.common.file_utils import write_str_to_file
from cfg_checker.common.other import utils
from cfg_checker.helpers.console_utils import Progress
from cfg_checker.helpers.console_utils import cl_typewriter
from cfg_checker.reports import reporter
# from cfg_checker.common.exception import InvalidReturnException
# from cfg_checker.common.exception import ConfigException
# from cfg_checker.common.exception import KubeException

from cfg_checker.nodes import KubeNodes
from cfg_checker.agent.fio_runner import _get_seconds, _datetime_fmt


_file_datetime_fmt = "%m%d%Y%H%M%S%z"


def _reformat_timestr(_str, _chars=["/", ",", " ", ":", "+"], _tchar=""):
    _new = ""
    for _c in _str:
        _new += _c if _c not in _chars else _tchar
    return _new
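
# Illustrative example for _reformat_timestr (values assumed; the exact
# _datetime_fmt comes from cfg_checker.agent.fio_runner): a timestamp
# rendered as "11/23/2021, 15:49:42+0000" is reduced to the filename-safe
# token "112320211549420000" once the separator characters are stripped.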


def _parse_json_output(buffer):
    try:
        return json.loads(buffer)
    except TypeError as e:
        logger_cli.error(
            "ERROR: Status not decoded: {}\n{}".format(e, buffer)
        )
    except json.decoder.JSONDecodeError as e:
        logger_cli.error(
            "ERROR: Status not decoded: {}\n{}".format(e, buffer)
        )
    return {}
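
# Illustrative behavior of _parse_json_output (inputs assumed):
#   _parse_json_output('{"status": "idle"}') -> {'status': 'idle'}
#   _parse_json_output("not-a-json") -> {} (the error is logged, not raised)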


class CephBench(object):
    _agent_template = "cfgagent-template.yaml"

    def __init__(self, config):
        self.env_config = config
        return


57class SaltCephBench(CephBench):
58 def __init__(
59 self,
60 config
61 ):
Alex5cace3b2021-11-10 16:40:37 -060062 logger_cli.error("ERROR: Not impelented for Salt environment!")
Alexdcb792f2021-10-04 14:24:21 -050063
64 # self.master = SaltNodes(config)
Alex5cace3b2021-11-10 16:40:37 -060065 super(SaltCephBench, self).__init__(config)
Alexdcb792f2021-10-04 14:24:21 -050066 return
67
68
class KubeCephBench(CephBench):
    def __init__(self, config):
        self.agent_count = config.bench_agent_count
        self.master = KubeNodes(config)
        super(KubeCephBench, self).__init__(config)

        self.mode = config.bench_mode
        self.resource_prefix = config.resource_prefix

        if config.bench_mode == "tasks":
            self.taskfile = config.bench_task_file
            self.load_tasks(self.taskfile)

        if config.bench_mode == "cleanup":
            self.cleanup_list = []
            return

        self.bench_name = config.bench_name
        self.results_dump_path = config.bench_results_dump_path
        self.results = {}
        self.agent_results = {}
        self.cleanup_list = []
        self.agent_pods = []

        if config.bench_mode == "report":
            self.results = {}
            return

        self.storage_class = config.bench_storage_class
        self.services = []
        # By default,
        # 30 seconds should be enough to send tasks to 3-5 agents
        self.scheduled_delay = 30

    def set_ceph_info_class(self, ceph_info):
        self.ceph_info = ceph_info

    def load_tasks(self, taskfile):
        # Load csv file
        logger_cli.info("-> Loading taskfile '{}'".format(taskfile))
        self.tasks = []
        with open(taskfile) as f:
            _reader = csv.reader(f, delimiter=',')
            # load tasks
            for row in _reader:
                self.tasks.append({
                    "readwrite": row[0],
                    "rwmixread": row[1],
                    "bs": row[2],
                    "iodepth": row[3],
                    "size": row[4],
                    "ramp_time": row[5],
                    "runtime": row[6]
                })
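        # Illustrative taskfile row (column order as parsed above; the
        # values are an assumption, any fio-valid combination works):
        #   randrw,75,4k,16,10G,30s,300s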
        logger_cli.info("-> Loaded {} tasks".format(len(self.tasks)))

    def add_for_deletion(self, obj, typ):
        self.cleanup_list.append(
            [
                typ,
                obj.metadata.namespace,
                obj.metadata.name
            ]
        )
        return

    def prepare_cleanup(self):
        # Assume number of resources not given
        # list all svc, pod, pvc, pv and identify 'cfgagent-xx' ones
        _types = ["pv", "pvc", "pod", "svc"]
        _prefix = self.resource_prefix
        for _typ in _types:
            _list = self.master.list_resource_names_by_type_and_ns(_typ)
            for ns, name in _list:
                if name.startswith(_prefix):
                    if ns:
                        _msg = "{} {}/{}".format(_typ, ns, name)
                    else:
                        _msg = "{} {}".format(_typ, name)
                    logger_cli.info("-> Found {}".format(_msg))
                    self.cleanup_list.append([_typ, ns, name])
        return

    def prepare_agents(self, options):
        logger_cli.info("# Preparing {} agents".format(self.agent_count))
        # Increase volume size a bit, so datafile fits
        _quantizer = 1.3
        _v_size, _vol_size_units = utils.split_option_type(options['size'])
        _v_size = round(_v_size * _quantizer)
        _vol_size = str(_v_size) + _vol_size_units + "i"
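        # Illustrative (assuming utils.split_option_type("4G") == (4, "G")):
        #   4 * 1.3 = 5.2 -> round() -> 5 -> "5" + "G" + "i" == "5Gi"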
        logger_cli.info(
            "-> Testfile size: {0}, Volume size: {1} ({0}*{2})".format(
                options['size'],
                _vol_size,
                _quantizer
            )
        )
        # Start preparing
        for idx in range(self.agent_count):
            # create pvc/pv and pod
            logger_cli.info("-> creating agent '{:02}'".format(idx))
            # _agent, _pv, _pvc = self.master.prepare_benchmark_agent(
            _agent, _pvc = self.master.prepare_benchmark_agent(
                idx,
                os.path.split(options["filename"])[0],
                self.storage_class,
                _vol_size,
                self._agent_template
            )
            # save it to lists
            self.agent_pods.append(_agent)
            # self.add_for_deletion(_pv, "pv")
            self.add_for_deletion(_pvc, "pvc")
            self.add_for_deletion(_agent, "pod")

            # expose it
            _svc = self.master.expose_benchmark_agent(_agent)
            self.add_for_deletion(_svc, "svc")
            # Save service
            self.services.append(_svc)
            # prepopulate results
            self.agent_results[_agent.metadata.name] = {}
            self.agent_results[_agent.metadata.name]["url"] = \
                "http://{}:{}/api/".format(
                    _svc.spec.cluster_ip,
                    8765
                )
            self.agent_results[_agent.metadata.name]["storage_class"] = \
                self.storage_class
            self.agent_results[_agent.metadata.name]["volume_size"] = \
                options['size']

        logger_cli.info("-> Done creating agents")
        # TODO: Update after implementing pooled task sending
        # idea is to allow up to 5 seconds per agent to schedule each task
        self.scheduled_delay = self.agent_count * 5
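        # e.g. 3 agents -> a 15 s scheduling window (count illustrative)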
        logger_cli.info(
            "-> Schedule delay set to {} sec".format(self.scheduled_delay)
        )
        return

    def _poke_agent(self, url, body, action="GET"):
        _datafile = "/tmp/data"
        _data = [
            "-d",
            "@" + _datafile
        ]
        _cmd = [
            "curl",
            "-s",
            "-H",
            "'Content-Type: application/json'",
            "-X",
            action,
            url
        ]
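        # Illustrative final command line (agent address assumed):
        #   curl -s -H 'Content-Type: application/json' -X POST \
        #       http://10.0.0.1:8765/api/ -d @/tmp/data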
        if body:
            _cmd += _data
            _ret = self.master.prepare_json_in_pod(
                self.agent_pods[0].metadata.name,
                self.master._namespace,
                body,
                _datafile
            )
        _ret = self.master.exec_cmd_on_target_pod(
            self.agent_pods[0].metadata.name,
            self.master._namespace,
            " ".join(_cmd)
        )
        return _parse_json_output(_ret)

    def _ensure_agents_ready(self):
        # make sure agents idle
        _status_set = []
        _ready_set = []
        for _agent, _d in self.get_agents_status().items():
            # obviously, there should be some answer
            if _d is None:
                logger_cli.error("ERROR: Agent status not available")
                return False
            # status should be idle or finished
            if _d['status'] not in ["idle", "finished"]:
                logger_cli.error(
                    "Agent status invalid {}:{}".format(_agent, _d['status'])
                )
                _status_set += [False]
            else:
                # Good agent
                _status_set += [True]
            # agent's fio shell should be in 'ready'
            if not _d["healthcheck"]["ready"]:
                logger_cli.error("Agent is not ready {}".format(_agent))
                _ready_set += [False]
            else:
                # 'fio' shell for agent is ready
                _ready_set += [True]
        # all agent's statuses should be True
        # and all 'fio' shell modules should be 'ready'
        if not all(_status_set) or not all(_ready_set):
            # At least one is not ready and it was already logged above
            return False
        else:
            # All is good
            return True

    def get_agents_status(self, silent=True):
        _status = {}
        _results = self.master.exec_on_labeled_pods_and_ns(
            "app=cfgagent",
            "curl -s http://localhost:8765/api/fio",
            silent=silent
        )
        for _agent, _result in _results.items():
            _j = _parse_json_output(_result)
            _status[_agent] = _j
        return _status

    @retry(Exception, initial_wait=5)
    def get_agents_resultlist(self):
        _t = {"module": "fio", "action": "get_resultlist"}
        _status = {}
        for _agent, _d in self.agent_results.items():
            _status[_agent] = self._poke_agent(_d["url"], _t, action="POST")
        return _status

    @retry(Exception, initial_wait=5)
    def get_result_from_agent(self, agent, time):
        _t = {
            "module": "fio",
            "action": "get_result",
            "options": {
                "time": time
            }
        }
        return self._poke_agent(
            self.agent_results[agent]["url"],
            _t,
            action="POST"
        )

    def _get_next_scheduled_time(self):
        _now = datetime.now(timezone.utc)
        logger_cli.info("-> time is '{}'".format(_now.strftime(_datetime_fmt)))
        self.next_scheduled_time = _now + timedelta(
            seconds=self.scheduled_delay
        )
        _str_time = self.next_scheduled_time.strftime(_datetime_fmt)
        logger_cli.info(
            "-> next benchmark scheduled to '{}'".format(_str_time)
        )
        return _str_time

    def _send_scheduled_task(self, options):
        _task = {
            "module": "fio",
            "action": "do_scheduledrun",
            "options": options
        }
        for _agent, _d in self.agent_results.items():
            logger_cli.info(
                "-> sending task to '{}:{}'".format(_agent, _d["url"])
            )
            _ret = self._poke_agent(_d["url"], _task, action="POST")
            if 'error' in _ret:
                logger_cli.error(
                    "ERROR: Agent returned: '{}'".format(_ret['error'])
                )
                return False
        # No errors detected
        return True

    def track_benchmark(self, options):
        _runtime = _get_seconds(options["runtime"])
        _ramptime = _get_seconds(options["ramp_time"])
        # Sum up all timings that we must wait and double it
        _timeout = (self.scheduled_delay + _runtime + _ramptime) * 2
        # We should have no more than 65 measurements
        _stats_delay = round((_runtime + _ramptime) / 65)
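        # e.g. runtime=300s and ramp_time=30s give round(330 / 65) == 5,
        # i.e. one Ceph status sample roughly every 5 seconds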
        _start = self.next_scheduled_time
        _end = datetime.now(timezone.utc) + timedelta(seconds=_timeout)
        logger_cli.info(" ")
        tw = cl_typewriter()
        while True:
            # Print status
            tw.cl_start(" ")
            _sts = self.get_agents_status(silent=True)
            # Use same line
            diff = (_end - datetime.now(timezone.utc)).total_seconds()
            _startin = (_start - datetime.now(timezone.utc)).total_seconds()
            if _startin > 0:
                tw.cl_inline("-> starting in {:.2f}s ".format(_startin))
            else:
                tw.cl_inline("-> {:.2f}s; ".format(diff))
            _progress = [_st["progress"] for _st in _sts.values()]
            tw.cl_inline(
                "{}% <-> {}%; ".format(
                    min(_progress),
                    max(_progress)
                )
            )

            _a_sts = [_t["status"] for _t in _sts.values()]
            tw.cl_inline(
                ", ".join(
                    ["{} {}".format(_a_sts.count(_s), _s)
                     for _s in set(_a_sts)]
                )
            )

            # Get Ceph status if _start time passed
            _elapsed = (datetime.now(timezone.utc) - _start).total_seconds()
            if _elapsed > _stats_delay:
                # Use same line output
                tw.cl_inline(" {:.2f}s elapsed".format(_elapsed))
                _sec = "{:0.1f}".format(_elapsed)
                self.results[options["scheduled_to"]]["ceph"][_sec] = \
                    self.ceph_info.get_cluster_status()
            # Check if agents finished
            finished = [True for _s in _sts.values()
                        if _s["status"] == 'finished']
            _fcnt = len(finished)
            _tcnt = len(_sts)
            if _fcnt < _tcnt:
                tw.cl_inline("; {}/{}".format(_fcnt, _tcnt))
            else:
                tw.cl_flush(newline=True)
                logger_cli.info("-> All agents finished run")
                return True
            # recalc how much is left
            diff = (_end - datetime.now(timezone.utc)).total_seconds()
            # In case end_datetime was in past to begin with
            if diff < 0:
                tw.cl_flush(newline=True)
                logger_cli.info("-> Timed out waiting for agents to finish")
                return False
            tw.cl_flush()

    def _do_testrun(self, options):
        self.results[options["scheduled_to"]]["osd_df_before"] = \
            self.ceph_info.get_ceph_osd_df()
        # send task to the agents
        if not self._send_scheduled_task(options):
            return False
        # Track this benchmark progress
        if not self.track_benchmark(options):
            return False
        else:
            logger_cli.info("-> Finished testrun. Collecting results...")
            # get ceph osd stats
            self.results[options["scheduled_to"]]["osd_df_after"] = \
                self.ceph_info.get_ceph_osd_df()
            # Get results for each agent
            self.collect_results()
            logger_cli.info("-> Calculating totals and averages")
            self.calculate_totals()
            self.calculate_ceph_stats()
            self.osd_df_compare(options["scheduled_to"])
            logger_cli.info("-> Dumping results")
            for _time, _d in self.results.items():
                self.dump_result(
                    self._get_dump_filename(_time),
                    _d
                )
            return True

    def wait_ceph_cooldown(self):
        # TODO: Query Ceph once every 20 sec to make sure its load dropped

        # get ceph idle status
        self.ceph_idle_status = self.ceph_info.get_cluster_status()
        self.health_detail = self.ceph_info.get_health_detail()
        self.ceph_df = self.ceph_info.get_ceph_df()
        self.ceph_pg_dump = self.ceph_info.get_ceph_pg_dump()
        return

    def run_benchmark(self, options):
        logger_cli.info("# Starting '{}' benchmark".format(self.mode))
        # Check agent readiness
        logger_cli.info("# Checking agents")
        if not self._ensure_agents_ready():
            return False

        # Make sure that Ceph is at low load
        # TODO: Ceph status check
        # self.wait_ceph_cooldown()

        # Do benchmark according to mode
        if self.mode == "tasks":
            logger_cli.info(
                "# Running benchmark with tasks from '{}'".format(
                    self.taskfile
                )
            )
            # take next task
            _total_tasks = len(self.tasks)
            for idx in range(_total_tasks):
                # init time to schedule
                _task = self.tasks[idx]
                _r = self.results
                logger_cli.info(
                    "-> Starting next task ({}/{})".format(idx+1, _total_tasks)
                )
                logger_cli.info(
                    "-> Updating options with: {}".format(
                        ", ".join(
                            ["{} = {}".format(k, v) for k, v in _task.items()]
                        )
                    )
                )
                # update options
                options.update(_task)
                # Check if such result already exists
                o = "input_options"
                _existing = filter(
                    lambda t:
                        _r[t]["id"] == idx and
                        _r[t]["mode"] == "tasks" and
                        _r[t][o]["readwrite"] == options["readwrite"] and
                        _r[t][o]["rwmixread"] == options["rwmixread"] and
                        _r[t][o]["bs"] == options["bs"] and
                        _r[t][o]["iodepth"] == options["iodepth"] and
                        _r[t][o]["size"] == options["size"],
                    _r
                )
                if len(list(_existing)) > 0:
                    logger_cli.info(
                        "-> Skipped already performed task from {}: "
                        "line {}, {}({}), {}, {}, {}".format(
                            self.taskfile,
                            idx,
                            options["readwrite"],
                            options["rwmixread"],
                            options["bs"],
                            options["iodepth"],
                            options["size"]
                        )
                    )
                    continue
                _sch_time = self._get_next_scheduled_time()
                options["scheduled_to"] = _sch_time
                # init results table
                _r[_sch_time] = {
                    "id": idx,
                    "mode": self.mode,
                    "input_options": deepcopy(options),
                    "agents": {},
                    "ceph": {}
                }
                # exit on error
                if not self._do_testrun(options):
                    return False
                # Save ceph osd stats and wait cooldown
                self.wait_ceph_cooldown()
        elif self.mode == "single":
            logger_cli.info("# Running single benchmark")
            # init time to schedule
            _sch_time = self._get_next_scheduled_time()
            options["scheduled_to"] = _sch_time
            # init results table
            self.results[_sch_time] = {
                "id": "{:2}".format(0),
                "input_options": options,
                "agents": {},
                "ceph": {}
            }
            if not self._do_testrun(options):
                return False
            # Save ceph osd stats
        else:
            logger_cli.error("ERROR: Unknown mode '{}'".format(self.mode))
            return False

        # Normal exit
        logger_cli.info("# All benchmark tasks done")
        return True

    def cleanup(self):
        logger_cli.info("# Cleaning up")
        self.cleanup_list.reverse()

        for _res in self.cleanup_list:
            self.master.cleanup_resource_by_name(_res[0], _res[2], ns=_res[1])

        # Wait for resources to be cleaned
        _timeout = 120
        _total = len(self.cleanup_list)
        logger_cli.info("-> Wait until {} resources cleaned".format(_total))
        _p = Progress(_total)
        while True:
            _g = self.master.get_resource_phase_by_name
            _l = [_g(r[0], r[2], ns=r[1]) for r in self.cleanup_list]
            _l = [item for item in _l if item]
            _idx = _total - len(_l)
            if len(_l) > 0:
                _p.write_progress(_idx)
            else:
                _p.write_progress(_idx)
                _p.end()
                logger_cli.info("# Done cleaning up")
                break
            if _timeout > 0:
                _timeout -= 1
            else:
                _p.end()
                logger_cli.info("# Timed out waiting after 120s.")
                break

        return

    def collect_results(self):
        logger_cli.info("# Collecting results")
        # query agents for results
        _agents = self.get_agents_resultlist()
        for _agent, _l in _agents.items():
            # Check if we already have this locally
            for _time in _l["resultlist"]:
                # There is a file already for this task/time
                # Check if we need to load it
                if _time not in self.results:
                    # Some older results found
                    # do not process them
                    logger_cli.debug(
                        "...skipped old results for '{}'".format(_time)
                    )
                    continue
                elif _agent not in self.results[_time]["agents"]:
                    # Load result and add it locally
                    logger_cli.info(
                        "-> Getting results for '{}' from '{}'".format(
                            _time,
                            _agent
                        )
                    )
                    _r = self.get_result_from_agent(_agent, _time)
                    self.results[_time]["agents"][_agent] = _r[_time]
                else:
                    # Should never happen, actually
                    logger_cli.info(
                        "-> Skipped loaded result for '{}' from '{}'".format(
                            _time,
                            _agent
                        )
                    )

    def _get_dump_filename(self, _time):
        _r = self.results[_time]
        _dirname = _r["input_options"]["name"]
        _filename = "-".join([
            _reformat_timestr(_time),
            "{:02}".format(len(_r["agents"])),
            _r["input_options"]["readwrite"],
            _r["input_options"]["bs"],
            str(_r["input_options"]["iodepth"]),
        ]) + ".json"
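        # Illustrative result (values assumed):
        #   112320211549420000-02-randrw-4k-16.json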
        return os.path.join(
            self.results_dump_path,
            _dirname,
            _filename
        )

    def preload_results(self):
        logger_cli.info(
            "# Preloading results for '{}'".format(self.bench_name)
        )
        # get all dirs in folder
        _p = self.results_dump_path
        if not os.path.isdir(_p):
            logger_cli.warn(
                "WARNING: Dump path is not a folder '{}'".format(_p)
            )
            return
        for path, dirs, files in os.walk(_p):
            if path == os.path.join(_p, self.bench_name):
                logger_cli.info("-> Folder found '{}'".format(path))
                for _fname in files:
                    logger_cli.debug("... processing '{}'".format(_fname))
                    _ext = _fname.split('.')[-1]
                    if _ext != "json":
                        logger_cli.info(
                            "-> Extension invalid '{}', "
                            "'json' is expected".format(_ext)
                        )
                        continue
                    # get time from filename
                    # Ugly, but works
                    _t = _fname.split('-')[0]
                    _str_time = _t[:14] + "+" + _t[14:]
                    _t = datetime.strptime(_str_time, _file_datetime_fmt)
                    _time = _t.strftime(_datetime_fmt)
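                    # Illustrative (filename assumed): for
                    # "112320211549420000-02-randrw-4k-16.json" the first
                    # "-" field is "112320211549420000", re-split into
                    # "11232021154942" + "+" + "0000", which strptime()
                    # reads as 2021-11-23 15:49:42+00:00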
                    self.results[_time] = self.load_dumped_result(
                        os.path.join(path, _fname)
                    )
                    logger_cli.info(
                        "-> Loaded '{}' as '{}'".format(
                            _fname,
                            _time
                        )
                    )

    def dump_result(self, filename, data):
        # Function dumps all available results as jsons to the given path
        # overwriting if needed
        _folder, _file = os.path.split(filename)
        # Do dump
        if not os.path.exists(_folder):
            os.mkdir(_folder)
            logger_cli.info("-> Created folder '{}'".format(_folder))
        # Dump agent data for this test run
        write_str_to_file(filename, json.dumps(data, indent=2))
        logger_cli.info("-> Dumped '{}'".format(filename))
        return

    def load_dumped_result(self, filename):
        try:
            with open(filename, "rt+") as f:
                return json.loads(f.read())
        except FileNotFoundError as e:
            logger_cli.error(
                "ERROR: {}".format(e)
            )
        except TypeError as e:
            logger_cli.error(
                "ERROR: Invalid file ({}): {}".format(filename, e)
            )
        except json.decoder.JSONDecodeError as e:
            logger_cli.error(
                "ERROR: Failed to decode json ({}): {}".format(filename, e)
            )
        return None

    def _lookup_storage_class_id_by_name(self, storage_class_name):
        # Assume that self had proper data
        for _pool in self.ceph_df["pools"]:
            if storage_class_name == _pool["name"]:
                return _pool["id"]
        return None

    def calculate_totals(self):
        def _savg(vlist):
            if len(vlist) > 0:
                return (sum(vlist) / len(vlist)) / 1000
            else:
                return 0
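        # _savg averages a list of nanosecond values and converts them to
        # microseconds, e.g. _savg([2000000, 4000000]) == 3000.0 (us)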
        # Calculate totals for Read and Write
        for _time, data in self.results.items():
            if "totals" not in data:
                data["totals"] = {}
            else:
                continue
            _totals = data["totals"]
            _r_bw = 0
            _r_avglat = []
            _r_95clat = []
            _r_iops = 0
            _w_bw = 0
            _w_avglat = []
            _w_95clat = []
            _w_iops = 0
            for _a, _d in data["agents"].items():
                # Hardcoded number of jobs param :(
                _j = _d["jobs"][0]
                _r_bw += _j["read"]["bw_bytes"]
                _r_avglat += [_j["read"]["lat_ns"]["mean"]]
                _r_iops += _j["read"]["iops"]
                _w_bw += _j["write"]["bw_bytes"]
                _w_avglat += [_j["write"]["lat_ns"]["mean"]]
                _w_iops += _j["write"]["iops"]
                # check for percentiles
                if "percentile" in _j["read"]["clat_ns"]:
                    _r_95clat += \
                        [_j["read"]["clat_ns"]["percentile"]["95.000000"]]
                else:
                    _r_95clat += []
                if "percentile" in _j["write"]["clat_ns"]:
                    _w_95clat += \
                        [_j["write"]["clat_ns"]["percentile"]["95.000000"]]
                else:
                    _w_95clat += []

                # Save storage class name
                if "storage_class" not in _totals:
                    _totals["storage_class"] = \
                        self.agent_results[_a]["storage_class"]
                    # Lookup storage class id and num_pg
                    _totals["storage_class_stats"] = \
                        reporter.get_pool_stats_by_id(
                            self._lookup_storage_class_id_by_name(
                                self.agent_results[_a]["storage_class"]
                            ),
                            self.ceph_pg_dump
                        )

            _totals["read_bw_bytes"] = _r_bw
            _totals["read_avg_lat_us"] = _savg(_r_avglat)
            _totals["read_95p_clat_us"] = _savg(_r_95clat)
            _totals["read_iops"] = _r_iops
            _totals["write_bw_bytes"] = _w_bw
            _totals["write_avg_lat_us"] = _savg(_w_avglat)
            _totals["write_95p_clat_us"] = _savg(_w_95clat)
            _totals["write_iops"] = _w_iops

    def calculate_ceph_stats(self):
        # helper to find the timestamp holding the maximum value for a key
        def _get_max_value(key, stats):
            _max_time = 0
            _value = 0
            for _k, _v in stats.items():
                if key in _v and _value < _v[key]:
                    _max_time = _k
                    _value = _v[key]
            return _max_time, _value

        def _perc(n, m):
            if not n:
                return 0
            elif not m:
                return 0
            else:
                return "{:.0f}%".format((n / m) * 100)

        def _axis_vals(val):
            return [
                val, int(val*1.1), int(val*0.75), int(val*0.50), int(val*0.15)
            ]
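        # Illustrative helper behavior:
        #   _perc(50, 200) == "25%"; _perc(0, 200) == 0
        #   _axis_vals(1000) == [1000, 1100, 750, 500, 150]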

        _stats = {}
        for _time, data in self.results.items():
            if "ceph" not in data:
                logger_cli.warning(
                    "WARNING: Ceph stats raw data not found in results"
                )
                continue
            if "ceph_stats" not in data:
                data["ceph_stats"] = {}
            else:
                continue
            # Copy pool stats data
            for _e, _d in data["ceph"].items():
                _stats[_e] = _d["pgmap"]
            # Maximums
            mrb_t, mrb = _get_max_value("read_bytes_sec", _stats)
            mwb_t, mwb = _get_max_value("write_bytes_sec", _stats)
            mri_t, mri = _get_max_value("read_op_per_sec", _stats)
            mwi_t, mwi = _get_max_value("write_op_per_sec", _stats)
            # Replace ceph with shorter data
            data["ceph"] = {
                "max_rbl": _axis_vals(mrb),
                "max_rbl_time": mrb_t,
                "max_wbl": _axis_vals(mwb),
                "max_wbl_time": mwb_t,
                "max_ril": _axis_vals(mri),
                "max_ril_time": mri_t,
                "max_wil": _axis_vals(mwi),
                "max_wil_time": mwi_t,
                "stats": _stats
            }
            # Calculate %% values for barchart
            for _e, _d in data["ceph"]["stats"].items():
                _d["read_bytes_sec_perc"] = \
                    _perc(_d.get("read_bytes_sec", 0), mrb)
                _d["write_bytes_sec_perc"] = \
                    _perc(_d.get("write_bytes_sec", 0), mwb)
                _d["read_op_per_sec_perc"] = \
                    _perc(_d.get("read_op_per_sec", 0), mri)
                _d["write_op_per_sec_perc"] = \
                    _perc(_d.get("write_op_per_sec", 0), mwi)
        return

    def osd_df_compare(self, _time):
        def _get_osd(osd_id, nodes):
            for osd in nodes:
                if osd["id"] == osd_id:
                    return osd
            return None

        logger_cli.info("# Comparing OSD stats")
        _osd = {}
        if _time not in self.results:
            logger_cli.warning(
                "WARNING: {} not found in results. Check data".format(_time)
            )
            return
        data = self.results[_time]
        # Save summary
        data["osd_summary"] = {}
        data["osd_summary"]["before"] = data["osd_df_before"]["summary"]
        data["osd_summary"]["after"] = data["osd_df_after"]["summary"]
        data["osd_summary"]["active"] = {
            "status": "",
            "device_class": "",
            "pgs": 0,
            "kb_used": 0,
            "kb_used_data": 0,
            "kb_used_omap": 0,
            "kb_used_meta": 0,
            "utilization": 0,
            "var_down": 0,
            "var_up": 0
        }
        # Compare OSD counts
        osds_before = len(data["osd_df_before"]["nodes"])
        osds_after = len(data["osd_df_after"]["nodes"])
        if osds_before != osds_after:
            logger_cli.warning(
                "WARNING: Before/After bench OSD "
                "count mismatch for '{}'".format(_time)
            )
        # iterate osds from before
        _pgs = 0
        _classes = set()
        _nodes_up = 0
        for idx in range(osds_before):
            _osd_b = data["osd_df_before"]["nodes"][idx]
            # search for the same osd in after
            _osd_a = _get_osd(_osd_b["id"], data["osd_df_after"]["nodes"])
            # Save data to the new place
            _osd[_osd_b["name"]] = {}
            _osd[_osd_b["name"]]["before"] = _osd_b
            if not _osd_a:
                # If this happens, Ceph cluster is actually broken
                logger_cli.warning(
                    "WARNING: Wow! {} disappeared".format(_osd_b["name"])
                )
                _osd[_osd_b["name"]]["after"] = {}
            else:
                _osd[_osd_b["name"]]["after"] = _osd_a
                _osd[_osd_b["name"]]["percent"] = {}
                # Calculate summary using "after" data
                _pgs += _osd_a["pgs"]
                _classes.update([_osd_a["device_class"]])
                if _osd_a["status"] == "up":
                    _nodes_up += 1
                # compare
                _keys_b = list(_osd_b.keys())
                _keys_a = list(_osd_a.keys())
                # To be safe, detect if some keys are different
                # ...and log it.
                _diff = set(_keys_b).symmetric_difference(_keys_a)
                if len(_diff) > 0:
                    # This should never happen, actually
                    logger_cli.warning(
                        "WARNING: Before/after keys mismatch "
                        "for OSD node {}: {}".format(idx, ", ".join(_diff))
                    )
                    continue
                # Compare each key and calculate how it changed
                for k in _keys_b:
                    if _osd_b[k] != _osd_a[k]:
                        # Announce change
                        logger_cli.debug(
                            "-> {:4}: {}, {} -> {}".format(
                                idx,
                                k,
                                _osd_b[k],
                                _osd_a[k]
                            )
                        )
                        # calculate percent; guard against zero and
                        # non-numeric "before" values
                        if isinstance(_osd_b[k], (int, float)) and _osd_b[k]:
                            _change_perc = \
                                (_osd_a[k] / _osd_b[k]) * 100 - 100
                        else:
                            _change_perc = 0
                        _osd[_osd_b["name"]]["percent"][k] = _change_perc
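                        # e.g. kb_used 1000 -> 1250 yields +25.0 here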

                        # Increase counters
                        _p = data["osd_summary"]["active"]

                        if k not in _p:
                            _p[k] = 1
                        else:
                            _p[k] += 1
                        if k == "var":
                            if _change_perc > 0:
                                _p["var_up"] += 1
                            elif _change_perc < 0:
                                _p["var_down"] += 1
        # Save sorted data
        data["osds"] = _osd
        logger_cli.info("-> Removing redundant osd before/after data")
        data.pop("osd_df_before")
        data.pop("osd_df_after")
        # Save summary
        data["osd_summary"]["active"]["status"] = "{}".format(_nodes_up)
        data["osd_summary"]["active"]["device_class"] = \
            "{}".format(len(list(_classes)))
        data["osd_summary"]["active"]["pgs"] = _pgs
        return

    # Create report
    def create_report(self, filename):
        """
        Create static html showing ceph info report

        :return: none
        """
        logger_cli.info("### Generating report to '{}'".format(filename))
        _report = reporter.ReportToFile(
            reporter.HTMLCephBench(self),
            filename
        )
        _report(
            {
                "results": self.results,
                "idle_status": self.ceph_idle_status,
                "health_detail": self.health_detail,
                "ceph_df": self.ceph_df,
                "ceph_pg_dump": self.ceph_pg_dump,
                "info": self.ceph_info.ceph_info,
                "cluster": self.ceph_info.cluster_info,
                "ceph_version": self.ceph_info.ceph_version,
                "nodes": self.agent_pods
            }
        )
        logger_cli.info("-> Done")

        return