# Author: Alex Savatieiev (osavatieiev@mirantis.com; a.savex@gmail.com)
# Copyright 2019-2022 Mirantis, Inc.
import csv
import os
import json

from copy import deepcopy
from datetime import datetime, timedelta, timezone

from cfg_checker.common import logger_cli
from cfg_checker.common.decorators import retry
from cfg_checker.common.file_utils import write_str_to_file
from cfg_checker.helpers.console_utils import Progress
from cfg_checker.helpers.console_utils import cl_typewriter
from cfg_checker.reports import reporter
# from cfg_checker.common.exception import InvalidReturnException
# from cfg_checker.common.exception import ConfigException
# from cfg_checker.common.exception import KubeException

from cfg_checker.nodes import KubeNodes
from cfg_checker.agent.fio_runner import _get_seconds, _datetime_fmt


_file_datetime_fmt = "%m%d%Y%H%M%S%z"


def _reformat_timestr(_str, _chars=["/", ",", " ", ":", "+"], _tchar=""):
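    """Strip separator characters from a timestamp string.

    Illustrative example, computed from the defaults above:
        _reformat_timestr("11/23/2021, 15:49:42 +0000")
        -> "112320211549420000"
    """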
    _new = ""
    for _c in _str:
        _new += _c if _c not in _chars else _tchar
    return _new


def _parse_json_output(buffer):
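    """Best-effort JSON parse: return the decoded object,
    or an empty dict after logging the decode error.
    """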
    try:
        return json.loads(buffer)
    except TypeError as e:
        logger_cli.error(
            "ERROR: Status not decoded: {}\n{}".format(e, buffer)
        )
    except json.decoder.JSONDecodeError as e:
        logger_cli.error(
            "ERROR: Status not decoded: {}\n{}".format(e, buffer)
        )
    return {}


def _split_vol_size(size):
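    """Split a size string into its numeric and unit parts.

    Illustrative example:
        _split_vol_size("10G") -> (10, "G")
    """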
    # I know, but it is faster than regex
    _numbers = [48, 49, 50, 51, 52, 53, 54, 55, 56, 57]
    _s_int = "0"
    _s_type = ""
    for ch in size:
        if ord(ch) in _numbers:
            _s_int += ch
        else:
            _s_type += ch
    return int(_s_int), _s_type


class CephBench(object):
    _agent_template = "cfgagent-template.yaml"

    def __init__(self, config):
        self.env_config = config
        return


class SaltCephBench(CephBench):
    def __init__(
        self,
        config
    ):
        logger_cli.error("ERROR: Not implemented for Salt environment!")

        # self.master = SaltNodes(config)
        super(SaltCephBench, self).__init__(config)
        return


class KubeCephBench(CephBench):
    def __init__(self, config):
        self.agent_count = config.bench_agent_count
        self.master = KubeNodes(config)
        super(KubeCephBench, self).__init__(config)

        self.mode = config.bench_mode
        self.resource_prefix = config.resource_prefix

        if config.bench_mode == "tasks":
            self.taskfile = config.bench_task_file
            self.load_tasks(self.taskfile)

        if config.bench_mode == "cleanup":
            self.cleanup_list = []
            return

        self.bench_name = config.bench_name
        self.results_dump_path = config.bench_results_dump_path
        self.results = {}
        self.agent_results = {}
        self.cleanup_list = []
        self.agent_pods = []

        if config.bench_mode == "report":
            self.results = {}
            return

        self.storage_class = config.bench_storage_class
        self.services = []
        # By default,
        # 30 seconds should be enough to send tasks to 3-5 agents
        self.scheduled_delay = 30

    def set_ceph_info_class(self, ceph_info):
        self.ceph_info = ceph_info

    def load_tasks(self, taskfile):
        # Load csv file
        logger_cli.info("-> Loading taskfile '{}'".format(taskfile))
        self.tasks = []
        with open(taskfile) as f:
            _reader = csv.reader(f, delimiter=',')
            # load tasks
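            # Illustrative row (values are hypothetical fio options;
            # columns map to the keys below):
            #   randwrite,75,4k,16,10G,5s,60s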
            for row in _reader:
                self.tasks.append({
                    "readwrite": row[0],
                    "rwmixread": row[1],
                    "bs": row[2],
                    "iodepth": row[3],
                    "size": row[4],
                    "ramp_time": row[5],
                    "runtime": row[6]
                })
        logger_cli.info("-> Loaded {} tasks".format(len(self.tasks)))

    def add_for_deletion(self, obj, typ):
        self.cleanup_list.append(
            [
                typ,
                obj.metadata.namespace,
                obj.metadata.name
            ]
        )
        return

    def prepare_cleanup(self):
        # Assume number of resources not given
        # list all svc, pod, pvc, pv and identify 'cfgagent-xx' ones
        _types = ["pv", "pvc", "pod", "svc"]
        _prefix = self.resource_prefix
        for _typ in _types:
            _list = self.master.list_resource_names_by_type_and_ns(_typ)
            for ns, name in _list:
                if name.startswith(_prefix):
                    if ns:
                        _msg = "{} {}/{}".format(_typ, ns, name)
                    else:
                        _msg = "{} {}".format(_typ, name)
                    logger_cli.info("-> Found {}".format(_msg))
                    self.cleanup_list.append([_typ, ns, name])
        return

    def prepare_agents(self, options):
        logger_cli.info("# Preparing {} agents".format(self.agent_count))
        # Increase volume size a bit, so datafile fits
        _quantizer = 1.3
        _v_size, _vol_size_units = _split_vol_size(options['size'])
        _v_size = round(_v_size * _quantizer)
        _vol_size = str(_v_size) + _vol_size_units + "i"
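        # e.g. a "10G" testfile yields round(10 * 1.3) == 13 -> "13Gi" volume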
        logger_cli.info(
            "-> Testfile size: {0}, Volume size: {1} ({0}*{2})".format(
                options['size'],
                _vol_size,
                _quantizer
            )
        )
        # Start preparing
        for idx in range(self.agent_count):
            # create pvc/pv and pod
            logger_cli.info("-> creating agent '{:02}'".format(idx))
            # _agent, _pv, _pvc = self.master.prepare_benchmark_agent(
            _agent, _pvc = self.master.prepare_benchmark_agent(
                idx,
                os.path.split(options["filename"])[0],
                self.storage_class,
                _vol_size,
                self._agent_template
            )
            # save it to lists
            self.agent_pods.append(_agent)
            # self.add_for_deletion(_pv, "pv")
            self.add_for_deletion(_pvc, "pvc")
            self.add_for_deletion(_agent, "pod")

            # expose it
            _svc = self.master.expose_benchmark_agent(_agent)
            self.add_for_deletion(_svc, "svc")
            # Save service
            self.services.append(_svc)
            # prepopulate results
            self.agent_results[_agent.metadata.name] = {}
            self.agent_results[_agent.metadata.name]["url"] = \
                "http://{}:{}/api/".format(
                    _svc.spec.cluster_ip,
                    8765
                )
            self.agent_results[_agent.metadata.name]["storage_class"] = \
                self.storage_class
            self.agent_results[_agent.metadata.name]["volume_size"] = \
                options['size']

        logger_cli.info("-> Done creating agents")
        # TODO: Update after implementing pooled task sending
        # idea is to have time to schedule task to each agent every 5 sec max
        self.scheduled_delay = self.agent_count * 5
        logger_cli.info(
            "-> Schedule delay set to {} sec".format(self.scheduled_delay)
        )
        return

    def _poke_agent(self, url, body, action="GET"):
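        """Call an agent's REST API via curl executed inside the first
        agent pod; 'body', when given, is uploaded to the pod as a JSON
        datafile and sent with the request. Returns the parsed JSON reply.
        """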
        _datafile = "/tmp/data"
        _data = [
            "-d",
            "@" + _datafile
        ]
        _cmd = [
            "curl",
            "-s",
            "-H",
            "'Content-Type: application/json'",
            "-X",
            action,
            url
        ]
        if body:
            _cmd += _data
            _ret = self.master.prepare_json_in_pod(
                self.agent_pods[0].metadata.name,
                self.master._namespace,
                body,
                _datafile
            )
        _ret = self.master.exec_cmd_on_target_pod(
            self.agent_pods[0].metadata.name,
            self.master._namespace,
            " ".join(_cmd)
        )
        return _parse_json_output(_ret)

    def _ensure_agents_ready(self):
        # make sure agents are idle
        _status_set = []
        _ready_set = []
        for _agent, _d in self.get_agents_status().items():
            # obviously, there should be some answer
            if _d is None:
                logger_cli.error("ERROR: Agent status not available")
                return False
            # status should be idle or finished
            if _d['status'] not in ["idle", "finished"]:
                logger_cli.error(
                    "Agent status invalid {}:{}".format(_agent, _d['status'])
                )
                _status_set += [False]
            else:
                # Good agent
                _status_set += [True]
            # agent's fio shell should be in 'ready'
            if not _d["healthcheck"]["ready"]:
                logger_cli.error("Agent is not ready {}".format(_agent))
                _ready_set += [False]
            else:
                # 'fio' shell for agent is ready
                _ready_set += [True]
        # all agents' statuses should be True
        # and all 'fio' shell modules should be 'ready'
        if not all(_status_set) or not all(_ready_set):
            # At least one is not ready and it was already logged above
            return False
        else:
            # All is good
            return True

    def get_agents_status(self, silent=True):
        _status = {}
        _results = self.master.exec_on_labeled_pods_and_ns(
            "app=cfgagent",
            "curl -s http://localhost:8765/api/fio",
            silent=silent
        )
        for _agent, _result in _results.items():
            _j = _parse_json_output(_result)
            _status[_agent] = _j
        return _status

    @retry(Exception, initial_wait=5)
    def get_agents_resultlist(self):
        _t = {"module": "fio", "action": "get_resultlist"}
        _status = {}
        for _agent, _d in self.agent_results.items():
            _status[_agent] = self._poke_agent(_d["url"], _t, action="POST")
        return _status

    @retry(Exception, initial_wait=5)
    def get_result_from_agent(self, agent, time):
        _t = {
            "module": "fio",
            "action": "get_result",
            "options": {
                "time": time
            }
        }
        return self._poke_agent(
            self.agent_results[agent]["url"],
            _t,
            action="POST"
        )

    def _get_next_scheduled_time(self):
        _now = datetime.now(timezone.utc)
        logger_cli.info("-> time is '{}'".format(_now.strftime(_datetime_fmt)))
        self.next_scheduled_time = _now + timedelta(
            seconds=self.scheduled_delay
        )
        _str_time = self.next_scheduled_time.strftime(_datetime_fmt)
        logger_cli.info(
            "-> next benchmark scheduled to '{}'".format(_str_time)
        )
        return _str_time

    def _send_scheduled_task(self, options):
        _task = {
            "module": "fio",
            "action": "do_scheduledrun",
            "options": options
        }
        for _agent, _d in self.agent_results.items():
            logger_cli.info(
                "-> sending task to '{}:{}'".format(_agent, _d["url"])
            )
            _ret = self._poke_agent(_d["url"], _task, action="POST")
            if 'error' in _ret:
                logger_cli.error(
                    "ERROR: Agent returned: '{}'".format(_ret['error'])
                )
                return False
        # No errors detected
        return True

    def track_benchmark(self, options):
        _runtime = _get_seconds(options["runtime"])
        _ramptime = _get_seconds(options["ramp_time"])
        # Sum up all timings that we must wait and double it
        _timeout = (self.scheduled_delay + _runtime + _ramptime) * 2
        # We should have no more than 65 measurements
        _stats_delay = round((_runtime + _ramptime) / 65)
        _start = self.next_scheduled_time
        _end = datetime.now(timezone.utc) + timedelta(seconds=_timeout)
        logger_cli.info(" ")
        tw = cl_typewriter()
        while True:
            # Print status
            tw.cl_start(" ")
            _sts = self.get_agents_status(silent=True)
            # Use same line
            diff = (_end - datetime.now(timezone.utc)).total_seconds()
            _startin = (_start - datetime.now(timezone.utc)).total_seconds()
            if _startin > 0:
                tw.cl_inline("-> starting in {:.2f}s ".format(_startin))
            else:
                tw.cl_inline("-> {:.2f}s; ".format(diff))
            _progress = [_st["progress"] for _st in _sts.values()]
            tw.cl_inline(
                "{}% <-> {}%; ".format(
                    min(_progress),
                    max(_progress)
                )
            )

            _a_sts = [_t["status"] for _t in _sts.values()]
            tw.cl_inline(
                ", ".join(
                    ["{} {}".format(_a_sts.count(_s), _s)
                     for _s in set(_a_sts)]
                )
            )

            # Get Ceph status if _start time passed
            _elapsed = (datetime.now(timezone.utc) - _start).total_seconds()
            if _elapsed > _stats_delay:
                # Use same line output
                tw.cl_inline(" {:.2f}s elapsed".format(_elapsed))
                _sec = "{:0.1f}".format(_elapsed)
                self.results[options["scheduled_to"]]["ceph"][_sec] = \
                    self.ceph_info.get_cluster_status()
            # Check if agents finished
            finished = [True for _s in _sts.values()
                        if _s["status"] == 'finished']
            _fcnt = len(finished)
            _tcnt = len(_sts)
            if _fcnt < _tcnt:
                tw.cl_inline("; {}/{}".format(_fcnt, _tcnt))
            else:
                tw.cl_flush(newline=True)
                logger_cli.info("-> All agents finished run")
                return True
            # recalc how much is left
            diff = (_end - datetime.now(timezone.utc)).total_seconds()
            # In case end_datetime was in the past to begin with
            if diff < 0:
                tw.cl_flush(newline=True)
                logger_cli.info("-> Timed out waiting for agents to finish")
                return False
            tw.cl_flush()

    def _do_testrun(self, options):
        self.results[options["scheduled_to"]]["osd_df_before"] = \
            self.ceph_info.get_ceph_osd_df()
Alex3034ba52021-11-13 17:06:45 -0600421 # send single to agent
        if not self._send_scheduled_task(options):
            return False
        # Track this benchmark progress
        if not self.track_benchmark(options):
            return False
        else:
            logger_cli.info("-> Finished testrun. Collecting results...")
            # get ceph osd stats
            self.results[options["scheduled_to"]]["osd_df_after"] = \
                self.ceph_info.get_ceph_osd_df()
            # Get results for each agent
            self.collect_results()
            logger_cli.info("-> Calculating totals and averages")
            self.calculate_totals()
            self.calculate_ceph_stats()
            self.osd_df_compare(options["scheduled_to"])
            logger_cli.info("-> Dumping results")
            for _time, _d in self.results.items():
                self.dump_result(
                    self._get_dump_filename(_time),
                    _d
                )
            return True

    def wait_ceph_cooldown(self):
Alex3034ba52021-11-13 17:06:45 -0600447 # TODO: Query Ceph ince a 20 sec to make sure its load dropped

        # get ceph idle status
        self.ceph_idle_status = self.ceph_info.get_cluster_status()
        self.health_detail = self.ceph_info.get_health_detail()
        self.ceph_df = self.ceph_info.get_ceph_df()
        self.ceph_pg_dump = self.ceph_info.get_ceph_pg_dump()
        return

    def run_benchmark(self, options):
        logger_cli.info("# Starting '{}' benchmark".format(self.mode))
        # Check agent readiness
        logger_cli.info("# Checking agents")
        if not self._ensure_agents_ready():
            return False

        # Make sure that Ceph is at low load
        # TODO: Ceph status check
        # self._wait_ceph_cooldown()

        # Do benchmark according to mode
        if self.mode == "tasks":
            logger_cli.info(
                "# Running benchmark with tasks from '{}'".format(
                    self.taskfile
                )
            )
            # take next task
            _total_tasks = len(self.tasks)
            for idx in range(_total_tasks):
                # init time to schedule
                _task = self.tasks[idx]
                _r = self.results
                logger_cli.info(
                    "-> Starting next task ({}/{})".format(idx+1, _total_tasks)
                )
                logger_cli.info("-> Updating options with: {}".format(
                        ", ".join(
                            ["{} = {}".format(k, v) for k, v in _task.items()]
                        )
                    )
                )
                # update options
                options.update(_task)
                # Check if such result already exists
                o = "input_options"
                _existing = filter(
                    lambda t:
                        _r[t]["id"] == idx and
                        _r[t]["mode"] == "tasks" and
                        _r[t][o]["readwrite"] == options["readwrite"] and
                        _r[t][o]["rwmixread"] == options["rwmixread"] and
                        _r[t][o]["bs"] == options["bs"] and
                        _r[t][o]["iodepth"] == options["iodepth"] and
                        _r[t][o]["size"] == options["size"],
                    _r
                )
                if len(list(_existing)) > 0:
                    logger_cli.info(
                        "-> Skipped already performed task from {}: "
                        "line {}, {}({}), {}, {}, {}".format(
                            self.taskfile,
                            idx,
                            options["readwrite"],
                            options["rwmixread"],
                            options["bs"],
                            options["iodepth"],
                            options["size"]
                        )
                    )
                    continue
                _sch_time = self._get_next_scheduled_time()
                options["scheduled_to"] = _sch_time
                # init results table
                _r[_sch_time] = {
                    "id": idx,
                    "mode": self.mode,
                    "input_options": deepcopy(options),
                    "agents": {},
                    "ceph": {}
                }
                # exit on error
                if not self._do_testrun(options):
                    return False
                # Save ceph osd stats and wait cooldown
                self.wait_ceph_cooldown()
        elif self.mode == "single":
            logger_cli.info("# Running single benchmark")
            # init time to schedule
            _sch_time = self._get_next_scheduled_time()
            options["scheduled_to"] = _sch_time
            # init results table
            self.results[_sch_time] = {
                "id": "{:2}".format(0),
                "input_options": options,
                "agents": {},
                "ceph": {}
            }
            if not self._do_testrun(options):
                return False
            # Save ceph osd stats
        else:
            logger_cli.error("ERROR: Unknown mode '{}'".format(self.mode))
            return False

        # Normal exit
        logger_cli.info("# All benchmark tasks done")
        return True

    def cleanup(self):
        logger_cli.info("# Cleaning up")
        self.cleanup_list.reverse()

        for _res in self.cleanup_list:
            self.master.cleanup_resource_by_name(_res[0], _res[2], ns=_res[1])

        # Wait for resources to be cleaned
        _timeout = 120
        _total = len(self.cleanup_list)
        logger_cli.info("-> Wait until {} resources cleaned".format(_total))
        _p = Progress(_total)
        while True:
            _g = self.master.get_resource_phase_by_name
            _l = [_g(r[0], r[2], ns=r[1]) for r in self.cleanup_list]
            _l = [item for item in _l if item]
            _idx = _total - len(_l)
            if len(_l) > 0:
                _p.write_progress(_idx)
            else:
                _p.write_progress(_idx)
                _p.end()
                logger_cli.info("# Done cleaning up")
                break
            if _timeout > 0:
                _timeout -= 1
            else:
                _p.end()
                logger_cli.info("# Timed out waiting after 120s.")
                break

        return

    def collect_results(self):
        logger_cli.info("# Collecting results")
        # query agents for results
        _agents = self.get_agents_resultlist()
        for _agent, _l in _agents.items():
            # Check if we already have this locally
            for _time in _l["resultlist"]:
                # There is a file already for this task/time
                # Check if we need to load it
                if _time not in self.results:
                    # Some older results found
                    # do not process them
                    logger_cli.debug(
                        "...skipped old results for '{}'".format(_time)
                    )
                    continue
                elif _agent not in self.results[_time]["agents"]:
                    # Load result and add it locally
                    logger_cli.info(
                        "-> Getting results for '{}' from '{}'".format(
                            _time,
                            _agent
                        )
                    )
                    _r = self.get_result_from_agent(_agent, _time)
                    self.results[_time]["agents"][_agent] = _r[_time]
                else:
                    # Should never happen, actually
                    logger_cli.info(
                        "-> Skipped loaded result for '{}' from '{}'".format(
                            _time,
                            _agent
                        )
                    )

    def _get_dump_filename(self, _time):
        _r = self.results[_time]
        _dirname = _r["input_options"]["name"]
        _filename = "-".join([
            _reformat_timestr(_time),
            "{:02}".format(len(_r["agents"])),
            _r["input_options"]["readwrite"],
            _r["input_options"]["bs"],
            str(_r["input_options"]["iodepth"]),
        ]) + ".json"
        return os.path.join(
            self.results_dump_path,
            _dirname,
            _filename
        )

    def preload_results(self):
        logger_cli.info(
            "# Preloading results for '{}'".format(self.bench_name)
        )
        # get all dirs in folder
        _p = self.results_dump_path
        if not os.path.isdir(_p):
            logger_cli.warn(
                "WARNING: Dump path is not a folder '{}'".format(_p)
            )
            return
        for path, dirs, files in os.walk(_p):
            if path == os.path.join(_p, self.bench_name):
                logger_cli.info("-> Folder found '{}'".format(path))
                for _fname in files:
                    logger_cli.debug("... processing '{}'".format(_fname))
                    _ext = _fname.split('.')[-1]
                    if _ext != "json":
                        logger_cli.info(
                            "-> Extension invalid '{}', "
                            "'json' is expected".format(_ext)
                        )
                        continue
                    # get time from filename
                    # Ugly, but works
                    _t = _fname.split('-')[0]
                    _str_time = _t[:14] + "+" + _t[14:]
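                    # e.g. "112320211549420000" -> "11232021154942+0000",
                    # matching _file_datetime_fmt above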
                    _t = datetime.strptime(_str_time, _file_datetime_fmt)
                    _time = _t.strftime(_datetime_fmt)
                    self.results[_time] = self.load_dumped_result(
                        os.path.join(path, _fname)
                    )
                    logger_cli.info(
                        "-> Loaded '{}' as '{}'".format(
                            _fname,
                            _time
                        )
                    )

    def dump_result(self, filename, data):
        # Function dumps all available results as jsons to the given path
        # overwriting if needed
        _folder, _file = os.path.split(filename)
        # Do dump
        if not os.path.exists(_folder):
            os.mkdir(_folder)
            logger_cli.info("-> Created folder '{}'".format(_folder))
        # Dump agent data for this test run
        write_str_to_file(filename, json.dumps(data, indent=2))
        logger_cli.info("-> Dumped '{}'".format(filename))
        return

    def load_dumped_result(self, filename):
        try:
            with open(filename, "rt+") as f:
                return json.loads(f.read())
        except FileNotFoundError as e:
            logger_cli.error(
                "ERROR: {}".format(e)
            )
        except TypeError as e:
            logger_cli.error(
                "ERROR: Invalid file ({}): {}".format(filename, e)
            )
        except json.decoder.JSONDecodeError as e:
            logger_cli.error(
                "ERROR: Failed to decode json ({}): {}".format(filename, e)
            )
        return None

    def _lookup_storage_class_id_by_name(self, storage_class_name):
        # Assume that self has proper data
        for _pool in self.ceph_df["pools"]:
            if storage_class_name == _pool["name"]:
                return _pool["id"]
        return None

    def calculate_totals(self):
        def _savg(vlist):
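            # mean of the given values; /1000 converts fio's ns to us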
            if len(vlist) > 0:
                return (sum(vlist) / len(vlist)) / 1000
            else:
                return 0
        # Calculate totals for Read and Write
        for _time, data in self.results.items():
            if "totals" not in data:
                data["totals"] = {}
            else:
                continue
            _totals = data["totals"]
            _r_bw = 0
            _r_avglat = []
            _r_95clat = []
            _r_iops = 0
            _w_bw = 0
            _w_avglat = []
            _w_95clat = []
            _w_iops = 0
            for _a, _d in data["agents"].items():
                # Hardcoded number of jobs param :(
                _j = _d["jobs"][0]
                _r_bw += _j["read"]["bw_bytes"]
                _r_avglat += [_j["read"]["lat_ns"]["mean"]]
                _r_iops += _j["read"]["iops"]
                _w_bw += _j["write"]["bw_bytes"]
                _w_avglat += [_j["write"]["lat_ns"]["mean"]]
                _w_iops += _j["write"]["iops"]
                # check for percentiles
                if "percentile" in _j["read"]["clat_ns"]:
                    _r_95clat += \
                        [_j["read"]["clat_ns"]["percentile"]["95.000000"]]
                else:
                    _r_95clat += []
                if "percentile" in _j["write"]["clat_ns"]:
                    _w_95clat += \
                        [_j["write"]["clat_ns"]["percentile"]["95.000000"]]
                else:
                    _w_95clat += []

                # Save storage class name
                if "storage_class" not in _totals:
                    _totals["storage_class"] = \
                        self.agent_results[_a]["storage_class"]
                # Lookup storage class id and num_pg
                _totals["storage_class_stats"] = \
                    reporter.get_pool_stats_by_id(
                        self._lookup_storage_class_id_by_name(
                            self.agent_results[_a]["storage_class"]
                        ),
                        self.ceph_pg_dump
                    )

            _totals["read_bw_bytes"] = _r_bw
            _totals["read_avg_lat_us"] = _savg(_r_avglat)
            _totals["read_95p_clat_us"] = _savg(_r_95clat)
            _totals["read_iops"] = _r_iops
            _totals["write_bw_bytes"] = _w_bw
            _totals["write_avg_lat_us"] = _savg(_w_avglat)
            _totals["write_95p_clat_us"] = _savg(_w_95clat)
            _totals["write_iops"] = _w_iops

    def calculate_ceph_stats(self):
        # helper to find the max value of a key and when it occurred
        def _get_max_value(key, stats):
            _max_time = 0
            _value = 0
            for _k, _v in stats.items():
                if key in _v and _value < _v[key]:
                    _max_time = _k
                    _value = _v[key]
            return _max_time, _value

        def _perc(n, m):
            if not n:
                return 0
            elif not m:
                return 0
            else:
                return "{:.0f}%".format((n / m) * 100)

        def _axis_vals(val):
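            # tick values for the report charts, derived from the
            # observed maximum: max itself plus 110/75/50/15% of it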
            return [
                val, int(val*1.1), int(val*0.75), int(val*0.50), int(val*0.15)
            ]

        _stats = {}
        for _time, data in self.results.items():
            if "ceph" not in data:
                logger_cli.warning(
                    "WARNING: Ceph stats raw data not found in results"
                )
                continue
            if "ceph_stats" not in data:
                data["ceph_stats"] = {}
            else:
                continue
            # Copy pool stats data
            for _e, _d in data["ceph"].items():
                _stats[_e] = _d["pgmap"]
            # Maximums
            mrb_t, mrb = _get_max_value("read_bytes_sec", _stats)
            mwb_t, mwb = _get_max_value("write_bytes_sec", _stats)
            mri_t, mri = _get_max_value("read_op_per_sec", _stats)
            mwi_t, mwi = _get_max_value("write_op_per_sec", _stats)
            # Replace ceph with shorter data
            data["ceph"] = {
                "max_rbl": _axis_vals(mrb),
                "max_rbl_time": mrb_t,
                "max_wbl": _axis_vals(mwb),
                "max_wbl_time": mwb_t,
                "max_ril": _axis_vals(mri),
                "max_ril_time": mri_t,
                "max_wil": _axis_vals(mwi),
                "max_wil_time": mwi_t,
                "stats": _stats
            }
            # Calculate %% values for barchart
            for _e, _d in data["ceph"]["stats"].items():
                _d["read_bytes_sec_perc"] = \
                    _perc(_d.get("read_bytes_sec", 0), mrb)
                _d["write_bytes_sec_perc"] = \
                    _perc(_d.get("write_bytes_sec", 0), mwb)
                _d["read_op_per_sec_perc"] = \
                    _perc(_d.get("read_op_per_sec", 0), mri)
                _d["write_op_per_sec_perc"] = \
                    _perc(_d.get("write_op_per_sec", 0), mwi)
        return

    def osd_df_compare(self, _time):
        def _get_osd(osd_id, nodes):
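            # linear search of the 'ceph osd df' nodes list by OSD id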
            for osd in nodes:
                if osd["id"] == osd_id:
                    return osd
            return None

        logger_cli.info("# Comparing OSD stats")
        _osd = {}
        if _time not in self.results:
            logger_cli.warning(
                "WARNING: {} not found in results. Check data".format(_time)
            )
            return
        data = self.results[_time]
        # Save summary
        data["osd_summary"] = {}
        data["osd_summary"]["before"] = data["osd_df_before"]["summary"]
        data["osd_summary"]["after"] = data["osd_df_after"]["summary"]
        data["osd_summary"]["active"] = {
            "status": "",
            "device_class": "",
            "pgs": 0,
            "kb_used": 0,
            "kb_used_data": 0,
            "kb_used_omap": 0,
            "kb_used_meta": 0,
            "utilization": 0,
            "var_down": 0,
            "var_up": 0
        }
        # Compare OSD counts
        osds_before = len(data["osd_df_before"]["nodes"])
        osds_after = len(data["osd_df_after"]["nodes"])
        if osds_before != osds_after:
            logger_cli.warning(
                "WARNING: Before/After bench OSD "
                "count mismatch for '{}'".format(_time)
            )
        # iterate osds from before
        _pgs = 0
        _classes = set()
        _nodes_up = 0
        for idx in range(osds_before):
            _osd_b = data["osd_df_before"]["nodes"][idx]
            # search for the same osd in after
            _osd_a = _get_osd(_osd_b["id"], data["osd_df_after"]["nodes"])
            # Save data to the new place
            _osd[_osd_b["name"]] = {}
            _osd[_osd_b["name"]]["before"] = _osd_b
            if not _osd_a:
                # If this happens, Ceph cluster is actually broken
                logger_cli.warning(
                    "WARNING: Wow! {} disappeared".format(_osd_b["name"])
                )
                _osd[_osd_b["name"]]["after"] = {}
                # nothing to compare against, skip to the next OSD
                continue
            else:
                _osd[_osd_b["name"]]["after"] = _osd_a
                _osd[_osd_b["name"]]["percent"] = {}
            # Calculate summary using "after" data
            _pgs += _osd_a["pgs"]
            _classes.update([_osd_a["device_class"]])
            if _osd_a["status"] == "up":
                _nodes_up += 1
            # compare
            _keys_b = list(_osd_b.keys())
            _keys_a = list(_osd_a.keys())
            # To be safe, detect if some keys are different
            # ...and log it.
            _diff = set(_keys_b).symmetric_difference(_keys_a)
            if len(_diff) > 0:
                # This should never happen, actually
                logger_cli.warning(
                    "WARNING: Before/after keys mismatch "
                    "for OSD node {}: {}".format(idx, ", ".join(_diff))
                )
                continue
            # Compare each key and calculate how it changed
            for k in _keys_b:
                if _osd_b[k] != _osd_a[k]:
                    # Announce change
                    logger_cli.debug(
                        "-> {:4}: {}, {} -> {}".format(
                            idx,
                            k,
                            _osd_b[k],
                            _osd_a[k]
                        )
                    )
                    # calculate percent
                    _change_perc = (_osd_a[k] / _osd_b[k]) * 100 - 100
                    _osd[_osd_b["name"]]["percent"][k] = _change_perc

                    # Increase counters
                    _p = data["osd_summary"]["active"]

                    if k not in _p:
                        _p[k] = 1
                    else:
                        _p[k] += 1
                    if k == "var":
                        if _change_perc > 0:
                            _p["var_up"] += 1
                        elif _change_perc < 0:
                            _p["var_down"] += 1
        # Save sorted data
        data["osds"] = _osd
        logger_cli.info("-> Removing redundant osd before/after data")
        data.pop("osd_df_before")
        data.pop("osd_df_after")
        # Save summary
        data["osd_summary"]["active"]["status"] = "{}".format(_nodes_up)
        data["osd_summary"]["active"]["device_class"] = \
            "{}".format(len(list(_classes)))
        data["osd_summary"]["active"]["pgs"] = _pgs
        return

    # Create report
    def create_report(self, filename):
        """
        Create static html showing ceph info report

        :return: none
        """
        logger_cli.info("### Generating report to '{}'".format(filename))
        _report = reporter.ReportToFile(
            reporter.HTMLCephBench(self),
            filename
        )
        _report(
            {
                "results": self.results,
                "idle_status": self.ceph_idle_status,
                "health_detail": self.health_detail,
                "ceph_df": self.ceph_df,
                "ceph_pg_dump": self.ceph_pg_dump,
                "info": self.ceph_info.ceph_info,
                "cluster": self.ceph_info.cluster_info,
                "ceph_version": self.ceph_info.ceph_version,
                "nodes": self.agent_pods
            }
        )
        logger_cli.info("-> Done")

        return