# Author: Alex Savatieiev (osavatieiev@mirantis.com; a.savex@gmail.com)
# Copyright 2019-2022 Mirantis, Inc.
import csv
import os
import json

from copy import deepcopy
from datetime import datetime, timedelta, timezone

from cfg_checker.common import logger_cli
from cfg_checker.common.decorators import retry
from cfg_checker.common.file_utils import write_str_to_file
from cfg_checker.helpers.console_utils import Progress
from cfg_checker.reports import reporter
# from cfg_checker.common.exception import InvalidReturnException
# from cfg_checker.common.exception import ConfigException
# from cfg_checker.common.exception import KubeException

from cfg_checker.nodes import KubeNodes
from cfg_checker.agent.fio_runner import _get_seconds, _datetime_fmt


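# Timestamp format used in dump filenames; a value like
# "11232021154942+0000" packs month, day, year, time and tz offset
# (illustrative example)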
_file_datetime_fmt = "%m%d%Y%H%M%S%z"


def _reformat_timestr(_str, _chars=["/", ",", " ", ":", "+"], _tchar=""):
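    # Strips separator chars so a timestamp fits into a filename;
    # e.g. "11/23/2021, 15:49:42+0000" -> "112320211549420000"
    # (illustrative, assuming the agent's _datetime_fmt uses these
    # separators)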
    _new = ""
    for _c in _str:
        _new += _c if _c not in _chars else _tchar
    return _new


def _parse_json_output(buffer):
    try:
        return json.loads(buffer)
    except TypeError as e:
        logger_cli.error(
            "ERROR: Status not decoded: {}\n{}".format(e, buffer)
        )
    except json.decoder.JSONDecodeError as e:
        logger_cli.error(
            "ERROR: Status not decoded: {}\n{}".format(e, buffer)
        )
    return {}


def _split_vol_size(size):
    # I know, but it is faster than regex
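    # e.g. _split_vol_size("50G") -> (50, "G"); digits are collected
    # by ASCII code, everything else is treated as the unit suffix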
    _numbers = [48, 49, 50, 51, 52, 53, 54, 55, 56, 57]
    _s_int = "0"
    _s_type = ""
    for ch in size:
        if ord(ch) in _numbers:
            _s_int += ch
        else:
            _s_type += ch
    return int(_s_int), _s_type


class CephBench(object):
    _agent_template = "cfgagent-template.yaml"

    def __init__(self, config):
        self.env_config = config
        return


class SaltCephBench(CephBench):
    def __init__(
        self,
        config
    ):
        logger_cli.error("ERROR: Not implemented for Salt environment!")

        # self.master = SaltNodes(config)
        super(SaltCephBench, self).__init__(config)
        return


class KubeCephBench(CephBench):
    def __init__(self, config):
        self.agent_count = config.bench_agent_count
        self.master = KubeNodes(config)
        super(KubeCephBench, self).__init__(config)

        self.mode = config.bench_mode
        self.resource_prefix = config.resource_prefix

        if config.bench_mode == "tasks":
            self.taskfile = config.bench_task_file
            self.load_tasks(self.taskfile)

        if config.bench_mode == "cleanup":
            self.cleanup_list = []
            return

        self.bench_name = config.bench_name
        self.results_dump_path = config.bench_results_dump_path
        self.results = {}
        self.agent_results = {}
        self.cleanup_list = []
        self.agent_pods = []

        if config.bench_mode == "report":
            self.results = {}
            return

        self.storage_class = config.bench_storage_class
        self.services = []
        # By default,
        # 30 seconds should be enough to send tasks to 3-5 agents
        self.scheduled_delay = 30

    def set_ceph_info_class(self, ceph_info):
        self.ceph_info = ceph_info

    def load_tasks(self, taskfile):
        # Load csv file
        logger_cli.info("-> Loading taskfile '{}'".format(taskfile))
        self.tasks = []
        with open(taskfile) as f:
            _reader = csv.reader(f, delimiter=',')
            # load tasks
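            # each row gives: readwrite, rwmixread, bs, iodepth, size;
            # e.g. "randrw,75,4k,64,10G" (illustrative values)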
            for row in _reader:
                self.tasks.append({
                    "readwrite": row[0],
                    "rwmixread": row[1],
                    "bs": row[2],
                    "iodepth": row[3],
                    "size": row[4]
                })
        logger_cli.info("-> Loaded {} tasks".format(len(self.tasks)))

    def add_for_deletion(self, obj, typ):
        self.cleanup_list.append(
            [
                typ,
                obj.metadata.namespace,
                obj.metadata.name
            ]
        )
        return

    def prepare_cleanup(self):
        # Assume number of resources not given
        # list all svc, pod, pvc, pv and identify 'cfgagent-xx' ones
        _types = ["pv", "pvc", "pod", "svc"]
        _prefix = self.resource_prefix
        for _typ in _types:
            _list = self.master.list_resource_names_by_type_and_ns(_typ)
            for ns, name in _list:
                if name.startswith(_prefix):
                    if ns:
                        _msg = "{} {}/{}".format(_typ, ns, name)
                    else:
                        _msg = "{} {}".format(_typ, name)
                    logger_cli.info("-> Found {}".format(_msg))
                    self.cleanup_list.append([_typ, ns, name])
        return

    def prepare_agents(self, options):
        logger_cli.info("# Preparing {} agents".format(self.agent_count))
        # Increase volume size a bit, so datafile fits
        _quantizer = 1.3
        _v_size, _vol_size_units = _split_vol_size(options['size'])
        _v_size = round(_v_size * _quantizer)
        _vol_size = str(_v_size) + _vol_size_units + "i"
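        # e.g. a "10G" testfile would request a "13Gi" volume
        # (10 * 1.3 rounded, "i" added for the kubernetes binary-size
        # suffix; illustrative values)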
        logger_cli.info(
            "-> Testfile size: {0}, Volume size: {1} ({0}*{2})".format(
                options['size'],
                _vol_size,
                _quantizer
            )
        )
        # Start preparing
        for idx in range(self.agent_count):
            # create pvc/pv and pod
            logger_cli.info("-> creating agent '{:02}'".format(idx))
            # _agent, _pv, _pvc = self.master.prepare_benchmark_agent(
            _agent, _pvc = self.master.prepare_benchmark_agent(
                idx,
                os.path.split(options["filename"])[0],
                self.storage_class,
                _vol_size,
                self._agent_template
            )
            # save it to lists
            self.agent_pods.append(_agent)
            # self.add_for_deletion(_pv, "pv")
            self.add_for_deletion(_pvc, "pvc")
            self.add_for_deletion(_agent, "pod")

            # expose it
            _svc = self.master.expose_benchmark_agent(_agent)
            self.add_for_deletion(_svc, "svc")
            # Save service
            self.services.append(_svc)
            # prepopulate results
            self.agent_results[_agent.metadata.name] = {}
            self.agent_results[_agent.metadata.name]["url"] = \
                "http://{}:{}/api/".format(
                    _svc.spec.cluster_ip,
                    8765
                )
            self.agent_results[_agent.metadata.name]["storage_class"] = \
                self.storage_class
            self.agent_results[_agent.metadata.name]["volume_size"] = \
                options['size']

        logger_cli.info("-> Done creating agents")
        # TODO: Update after implementing pooled task sending
        self.scheduled_delay = self.agent_count * 10
        logger_cli.info(
            "-> Schedule delay set to {} sec".format(self.scheduled_delay)
        )
        return

    def _poke_agent(self, url, body, action="GET"):
        _datafile = "/tmp/data"
        _data = [
            "-d",
            "@" + _datafile
        ]
        _cmd = [
            "curl",
            "-s",
            "-H",
            "'Content-Type: application/json'",
            "-X",
            action,
            url
        ]
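        # with a body, the resulting in-pod call looks roughly like:
        #   curl -s -H 'Content-Type: application/json' -X POST <url> \
        #       -d @/tmp/data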
        if body:
            _cmd += _data
            _ret = self.master.prepare_json_in_pod(
                self.agent_pods[0].metadata.name,
                self.master._namespace,
                body,
                _datafile
            )
        _ret = self.master.exec_cmd_on_target_pod(
            self.agent_pods[0].metadata.name,
            self.master._namespace,
            " ".join(_cmd)
        )
        return _parse_json_output(_ret)

    def _ensure_agents_ready(self):
        # make sure agents idle
        _status_set = []
        _ready_set = []
        for _agent, _d in self.get_agents_status().items():
            # obviously, there should be some answer
            if _d is None:
                logger_cli.error("ERROR: Agent status not available")
                return False
            # status should be idle or finished
            if _d['status'] not in ["idle", "finished"]:
                logger_cli.error(
                    "Agent status invalid {}:{}".format(_agent, _d['status'])
                )
                _status_set += [False]
            else:
                # Good agent
                _status_set += [True]
            # agent's fio shell should be in 'ready'
            if not _d["healthcheck"]["ready"]:
                logger_cli.error("Agent is not ready {}".format(_agent))
                _ready_set += [False]
            else:
                # 'fio' shell for agent is ready
                _ready_set += [True]
        # all agents' statuses should be True
        # and all 'fio' shell modules should be 'ready'
        if not all(_status_set) or not all(_ready_set):
            # At least one is not ready and it was already logged above
            return False
        else:
            # All is good
            return True

    def get_agents_status(self):
        _status = {}
        _results = self.master.exec_on_labeled_pods_and_ns(
            "app=cfgagent",
            "curl -s http://localhost:8765/api/fio"
        )
        for _agent, _result in _results.items():
            _j = _parse_json_output(_result)
            _status[_agent] = _j
        return _status

    @retry(Exception, initial_wait=5)
    def get_agents_resultlist(self):
        _t = {"module": "fio", "action": "get_resultlist"}
        _status = {}
        for _agent, _d in self.agent_results.items():
            _status[_agent] = self._poke_agent(_d["url"], _t, action="POST")
        return _status

    @retry(Exception, initial_wait=5)
    def get_result_from_agent(self, agent, time):
        _t = {
            "module": "fio",
            "action": "get_result",
            "options": {
                "time": time
            }
        }
        return self._poke_agent(
            self.agent_results[agent]["url"],
            _t,
            action="POST"
        )

    def _get_next_scheduled_time(self):
        _now = datetime.now(timezone.utc)
        logger_cli.info("-> time is '{}'".format(_now.strftime(_datetime_fmt)))
        self.next_scheduled_time = _now + timedelta(
            seconds=self.scheduled_delay
        )
        _str_time = self.next_scheduled_time.strftime(_datetime_fmt)
        logger_cli.info(
            "-> next benchmark scheduled to '{}'".format(_str_time)
        )
        return _str_time

    def _send_scheduled_task(self, options):
        _task = {
            "module": "fio",
            "action": "do_scheduledrun",
            "options": options
        }
        for _agent, _d in self.agent_results.items():
            logger_cli.info(
                "-> sending task to '{}:{}'".format(_agent, _d["url"])
            )
            _ret = self._poke_agent(_d["url"], _task, action="POST")
            if 'error' in _ret:
                logger_cli.error(
                    "ERROR: Agent returned: '{}'".format(_ret['error'])
                )
                return False
        # No errors detected
        return True

    def track_benchmark(self, options):
        _runtime = _get_seconds(options["runtime"])
        _ramptime = _get_seconds(options["ramp_time"])
        # Sum up all timings that we must wait and double it
        _timeout = (self.scheduled_delay + _runtime + _ramptime) * 2
        # We should have no more than 65 measurements
        _stats_delay = round((_runtime + _ramptime) / 65)
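        # e.g. 600s runtime + 50s ramp samples Ceph status about every
        # 10 seconds (650 / 65; illustrative values)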
        _start = self.next_scheduled_time
        _end = datetime.now(timezone.utc) + timedelta(seconds=_timeout)
        while True:
            # Print status
            _sts = self.get_agents_status()
            diff = (_end - datetime.now(timezone.utc)).total_seconds()
            logger_cli.info("-> {:.2f}s left. Agent status:".format(diff))
            for _agent, _status in _sts.items():
                logger_cli.info(
                    "\t{}: {} ({}%)".format(
                        _agent,
                        _status["status"],
                        _status["progress"]
                    )
                )
            # Get Ceph status if _start time passed
            _elapsed = (datetime.now(timezone.utc) - _start).total_seconds()
            if _elapsed > _stats_delay:
                logger_cli.info("-> {:.2f}s elapsed".format(_elapsed))
                _sec = "{:0.1f}".format(_elapsed)
                self.results[options["scheduled_to"]]["ceph"][_sec] = \
                    self.ceph_info.get_cluster_status()
            # Check if agents finished
            finished = [True for _s in _sts.values()
                        if _s["status"] == 'finished']
            _fcnt = len(finished)
            _tcnt = len(_sts)
            if _fcnt < _tcnt:
                logger_cli.info("-> {}/{} finished".format(_fcnt, _tcnt))
            else:
                logger_cli.info("-> All agents finished run")
                return True
            # recalc how much is left
            diff = (_end - datetime.now(timezone.utc)).total_seconds()
            # In case end_datetime was in past to begin with
            if diff < 0:
                logger_cli.info("-> Timed out waiting for agents to finish")
                return False

    def _do_testrun(self, options):
        self.results[options["scheduled_to"]]["osd_df_before"] = \
            self.ceph_info.get_ceph_osd_df()
        # send single task to agents
        if not self._send_scheduled_task(options):
            return False
        # Track this benchmark progress
        if not self.track_benchmark(options):
            return False
        else:
            logger_cli.info("-> Finished testrun. Collecting results...")
            # get ceph osd stats
            self.results[options["scheduled_to"]]["osd_df_after"] = \
                self.ceph_info.get_ceph_osd_df()
            # Get results for each agent
            self.collect_results()
            logger_cli.info("-> Calculating totals and averages")
            self.calculate_totals()
            self.calculate_ceph_stats()
            self.osd_df_compare(options["scheduled_to"])
            logger_cli.info("-> Dumping results")
            for _time, _d in self.results.items():
                self.dump_result(
                    self._get_dump_filename(_time),
                    _d
                )
        return True

    def wait_ceph_cooldown(self):
        # TODO: Query Ceph once every 20 sec to make sure its load dropped

        # get ceph idle status
        self.ceph_idle_status = self.ceph_info.get_cluster_status()
        self.health_detail = self.ceph_info.get_health_detail()
        self.ceph_df = self.ceph_info.get_ceph_df()
        self.ceph_pg_dump = self.ceph_info.get_ceph_pg_dump()
        return

    def run_benchmark(self, options):
        logger_cli.info("# Starting '{}' benchmark".format(self.mode))
        # Check agent readiness
        logger_cli.info("# Checking agents")
        if not self._ensure_agents_ready():
            return False

        # Make sure that Ceph is at low load
        # TODO: Ceph status check
        # self.wait_ceph_cooldown()

        # Do benchmark according to mode
        if self.mode == "tasks":
            logger_cli.info(
                "# Running benchmark with tasks from '{}'".format(
                    self.taskfile
                )
            )
            # take next task
            _total_tasks = len(self.tasks)
            for idx in range(_total_tasks):
                # init time to schedule
                _task = self.tasks[idx]
                _r = self.results
                logger_cli.info(
                    "-> Starting next task ({}/{})".format(idx+1, _total_tasks)
                )
                logger_cli.info("-> Updating options with: {}".format(
                        ", ".join(
                            ["{} = {}".format(k, v) for k, v in _task.items()]
                        )
                    )
                )
                # update options
                options.update(_task)
                # Check if such result already exists
                o = "input_options"
                _existing = filter(
                    lambda t:
                    _r[t]["id"] == idx and
                    _r[t]["mode"] == "tasks" and
                    _r[t][o]["readwrite"] == options["readwrite"] and
                    _r[t][o]["rwmixread"] == options["rwmixread"] and
                    _r[t][o]["bs"] == options["bs"] and
                    _r[t][o]["iodepth"] == options["iodepth"] and
                    _r[t][o]["size"] == options["size"],
                    _r
                )
                if len(list(_existing)) > 0:
                    logger_cli.info(
                        "-> Skipped already performed task from {}: "
                        "line {}, {}({}), {}, {}, {}".format(
                            self.taskfile,
                            idx,
                            options["readwrite"],
                            options["rwmixread"],
                            options["bs"],
                            options["iodepth"],
                            options["size"]
                        )
                    )
                    continue
                _sch_time = self._get_next_scheduled_time()
                options["scheduled_to"] = _sch_time
                # init results table
                _r[_sch_time] = {
                    "id": idx,
                    "mode": self.mode,
                    "input_options": deepcopy(options),
                    "agents": {},
                    "ceph": {}
                }
                # exit on error
                if not self._do_testrun(options):
                    return False
                # Save ceph osd stats and wait cooldown
                self.wait_ceph_cooldown()
        elif self.mode == "single":
            logger_cli.info("# Running single benchmark")
            # init time to schedule
            _sch_time = self._get_next_scheduled_time()
            options["scheduled_to"] = _sch_time
            # init results table
            self.results[_sch_time] = {
                "id": "{:2}".format(0),
                "input_options": options,
                "agents": {},
                "ceph": {}
            }
            if not self._do_testrun(options):
                return False
            # Save ceph osd stats
        else:
            logger_cli.error("ERROR: Unknown mode '{}'".format(self.mode))
            return False

        # Normal exit
        logger_cli.info("# All benchmark tasks done")
        return True

    def cleanup(self):
        logger_cli.info("# Cleaning up")
        self.cleanup_list.reverse()

        for _res in self.cleanup_list:
            self.master.cleanup_resource_by_name(_res[0], _res[2], ns=_res[1])

        # Wait for resources to be cleaned
        _timeout = 120
        _total = len(self.cleanup_list)
        logger_cli.info("-> Wait until {} resources cleaned".format(_total))
        _p = Progress(_total)
        while True:
            _g = self.master.get_resource_phase_by_name
            _l = [_g(r[0], r[2], ns=r[1]) for r in self.cleanup_list]
            _l = [item for item in _l if item]
            _idx = _total - len(_l)
            if len(_l) > 0:
                _p.write_progress(_idx)
            else:
                _p.write_progress(_idx)
                _p.end()
                logger_cli.info("# Done cleaning up")
                break
            if _timeout > 0:
                _timeout -= 1
            else:
                _p.end()
                logger_cli.info("# Timed out waiting after 120s.")
                break

        return

    def collect_results(self):
        logger_cli.info("# Collecting results")
        # query agents for results
        _agents = self.get_agents_resultlist()
        for _agent, _l in _agents.items():
            # Check if we already have this locally
            for _time in _l["resultlist"]:
                # There is a file already for this task/time
                # Check if we need to load it
                if _time not in self.results:
                    # Some older results found
                    # do not process them
                    logger_cli.info(
                        "-> Skipped old results for '{}'".format(_time)
                    )
                    continue
                elif _agent not in self.results[_time]["agents"]:
                    # Load result and add it locally
                    logger_cli.info(
                        "-> Getting results for '{}' from '{}'".format(
                            _time,
                            _agent
                        )
                    )
                    _r = self.get_result_from_agent(_agent, _time)
                    self.results[_time]["agents"][_agent] = _r[_time]
                else:
                    # Should never happen, actually
                    logger_cli.info(
                        "-> Skipped loaded result for '{}' from '{}'".format(
                            _time,
                            _agent
                        )
                    )

    def _get_dump_filename(self, _time):
        _r = self.results[_time]
        _dirname = _r["input_options"]["name"]
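        # filename packs timestamp, agent count and key fio options;
        # e.g. "112320211549420000-02-randrw-4k-64.json"
        # (illustrative values)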
        _filename = "-".join([
            _reformat_timestr(_time),
            "{:02}".format(len(_r["agents"])),
            _r["input_options"]["readwrite"],
            _r["input_options"]["bs"],
            str(_r["input_options"]["iodepth"]),
        ]) + ".json"
        return os.path.join(
            self.results_dump_path,
            _dirname,
            _filename
        )

    def preload_results(self):
        logger_cli.info(
            "# Preloading results for '{}'".format(self.bench_name)
        )
        # get all dirs in folder
        _p = self.results_dump_path
        if not os.path.isdir(_p):
            logger_cli.warn(
                "WARNING: Dump path is not a folder '{}'".format(_p)
            )
            return
        for path, dirs, files in os.walk(_p):
            if path == os.path.join(_p, self.bench_name):
                logger_cli.info("-> Folder found '{}'".format(path))
                for _fname in files:
                    logger_cli.debug("... processing '{}'".format(_fname))
                    _ext = _fname.split('.')[-1]
                    if _ext != "json":
                        logger_cli.info(
                            "-> Extension invalid '{}', "
                            "'json' is expected".format(_ext)
                        )
                        continue
                    # get time from filename
                    # Ugly, but works
                    _t = _fname.split('-')[0]
                    _str_time = _t[:14] + "+" + _t[14:]
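                    # e.g. "112320211549420000" becomes
                    # "11232021154942+0000", parseable with
                    # _file_datetime_fmt (illustrative value)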
                    _t = datetime.strptime(_str_time, _file_datetime_fmt)
                    _time = _t.strftime(_datetime_fmt)
                    self.results[_time] = self.load_dumped_result(
                        os.path.join(path, _fname)
                    )
                    logger_cli.info(
                        "-> Loaded '{}' as '{}'".format(
                            _fname,
                            _time
                        )
                    )

    def dump_result(self, filename, data):
        # Function dumps all available results as jsons to the given path
        # overwriting if needed
        _folder, _file = os.path.split(filename)
        # Do dump
        if not os.path.exists(_folder):
            os.mkdir(_folder)
            logger_cli.info("-> Created folder '{}'".format(_folder))
        # Dump agent data for this test run
        write_str_to_file(filename, json.dumps(data, indent=2))
        logger_cli.info("-> Dumped '{}'".format(filename))
        return

    def load_dumped_result(self, filename):
        try:
            # read-only access is enough for loading a dump
            with open(filename, "rt") as f:
                return json.loads(f.read())
        except FileNotFoundError as e:
            logger_cli.error(
                "ERROR: {}".format(e)
            )
        except TypeError as e:
            logger_cli.error(
                "ERROR: Invalid file ({}): {}".format(filename, e)
            )
        except json.decoder.JSONDecodeError as e:
            logger_cli.error(
                "ERROR: Failed to decode json ({}): {}".format(filename, e)
            )
        return None

    def _lookup_storage_class_id_by_name(self, storage_class_name):
        # Assume that self has proper data
        for _pool in self.ceph_df["pools"]:
            if storage_class_name == _pool["name"]:
                return _pool["id"]
        return None

    def calculate_totals(self):
        # Calculate totals for Read and Write
        for _time, data in self.results.items():
            if "totals" not in data:
                data["totals"] = {}
            else:
                continue
            _totals = data["totals"]
            _r_bw = 0
            _r_avglat = []
            _r_95clat = []
            _r_iops = 0
            _w_bw = 0
            _w_avglat = []
            _w_95clat = []
            _w_iops = 0
            for _a, _d in data["agents"].items():
                # Hardcoded number of jobs param :(
                _j = _d["jobs"][0]
                _r_bw += _j["read"]["bw_bytes"]
                _r_avglat += [_j["read"]["lat_ns"]["mean"]]
                _r_95clat += [_j["read"]["clat_ns"]["percentile"]["95.000000"]]
                _r_iops += _j["read"]["iops"]
                _w_bw += _j["write"]["bw_bytes"]
                _w_avglat += [_j["write"]["lat_ns"]["mean"]]
                _w_95clat += \
                    [_j["write"]["clat_ns"]["percentile"]["95.000000"]]
                _w_iops += _j["write"]["iops"]
                # Save storage class name
                if "storage_class" not in _totals:
                    _totals["storage_class"] = \
                        self.agent_results[_a]["storage_class"]
                    # Lookup storage class id and num_pg
                    _totals["storage_class_stats"] = \
                        reporter.get_pool_stats_by_id(
                            self._lookup_storage_class_id_by_name(
                                self.agent_results[_a]["storage_class"]
                            ),
                            self.ceph_pg_dump
                        )

            _totals["read_bw_bytes"] = _r_bw
            _totals["read_avg_lat_us"] = \
                (sum(_r_avglat) / len(_r_avglat)) / 1000
            _totals["read_95p_clat_us"] = \
                (sum(_r_95clat) / len(_r_95clat)) / 1000
            _totals["read_iops"] = _r_iops
            _totals["write_bw_bytes"] = _w_bw
            _totals["write_avg_lat_us"] = \
                (sum(_w_avglat) / len(_w_avglat)) / 1000
            _totals["write_95p_clat_us"] = \
                (sum(_w_95clat) / len(_w_95clat)) / 1000
            _totals["write_iops"] = _w_iops

    def calculate_ceph_stats(self):
        # helper to find the timestamp holding the max value for a key
        def _get_max_value(key, stats):
            _max_time = 0
            _value = 0
            for _k, _v in stats.items():
                if key in _v and _value < _v[key]:
                    _max_time = _k
                    _value = _v[key]
            return _max_time, _value

        def _perc(n, m):
            if not n:
                return 0
            elif not m:
                return 0
            else:
                return "{:.0f}%".format((n / m) * 100)

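        # builds reference marks for the report barcharts;
        # e.g. _axis_vals(100) -> [100, 110, 75, 50, 15]
        # (illustrative input)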
        def _axis_vals(val):
            return [
                val, int(val*1.1), int(val*0.75), int(val*0.50), int(val*0.15)
            ]

        _stats = {}
        for _time, data in self.results.items():
            if "ceph" not in data:
                logger_cli.warning(
                    "WARNING: Ceph stats raw data not found in results"
                )
                continue
            if "ceph_stats" not in data:
                data["ceph_stats"] = {}
            else:
                continue
            # Copy pool stats data
            for _e, _d in data["ceph"].items():
                _stats[_e] = _d["pgmap"]
            # Maximums
            mrb_t, mrb = _get_max_value("read_bytes_sec", _stats)
            mwb_t, mwb = _get_max_value("write_bytes_sec", _stats)
            mri_t, mri = _get_max_value("read_op_per_sec", _stats)
            mwi_t, mwi = _get_max_value("write_op_per_sec", _stats)
            # Replace ceph with shorter data
            data["ceph"] = {
                "max_rbl": _axis_vals(mrb),
                "max_rbl_time": mrb_t,
                "max_wbl": _axis_vals(mwb),
                "max_wbl_time": mwb_t,
                "max_ril": _axis_vals(mri),
                "max_ril_time": mri_t,
                "max_wil": _axis_vals(mwi),
                "max_wil_time": mwi_t,
                "stats": _stats
            }
            # Calculate %% values for barchart
            for _e, _d in data["ceph"]["stats"].items():
                _d["read_bytes_sec_perc"] = \
                    _perc(_d.get("read_bytes_sec", 0), mrb)
                _d["write_bytes_sec_perc"] = \
                    _perc(_d.get("write_bytes_sec", 0), mwb)
                _d["read_op_per_sec_perc"] = \
                    _perc(_d.get("read_op_per_sec", 0), mri)
                _d["write_op_per_sec_perc"] = \
                    _perc(_d.get("write_op_per_sec", 0), mwi)
        return

    def osd_df_compare(self, _time):
        def _get_osd(osd_id, nodes):
            for osd in nodes:
                if osd["id"] == osd_id:
                    return osd
            return None

        logger_cli.info("# Comparing OSD stats")
        _osd = {}
        if _time not in self.results:
            logger_cli.warning(
                "WARNING: {} not found in results. Check data".format(_time)
            )
            return
        data = self.results[_time]
        # Save summary
        data["osd_summary"] = {}
        data["osd_summary"]["before"] = data["osd_df_before"]["summary"]
        data["osd_summary"]["after"] = data["osd_df_after"]["summary"]
        data["osd_summary"]["active"] = {
            "status": "",
            "device_class": "",
            "pgs": "",
            "kb_used": 0,
            "kb_used_data": 0,
            "kb_used_omap": 0,
            "kb_used_meta": 0,
            "utilization": 0,
            "var_down": 0,
            "var_up": 0
        }
        # Compare OSD counts
        osds_before = len(data["osd_df_before"]["nodes"])
        osds_after = len(data["osd_df_after"]["nodes"])
        if osds_before != osds_after:
            logger_cli.warning(
                "WARNING: Before/After bench OSD "
                "count mismatch for '{}'".format(_time)
            )
        # iterate osds from before
        _pgs = 0
        _classes = set()
        _nodes_up = 0
        for idx in range(osds_before):
            _osd_b = data["osd_df_before"]["nodes"][idx]
            # search for the same osd in after
            _osd_a = _get_osd(_osd_b["id"], data["osd_df_after"]["nodes"])
            # Save data to the new place
            _osd[_osd_b["name"]] = {}
            _osd[_osd_b["name"]]["before"] = _osd_b
            if not _osd_a:
                # If this happens, Ceph cluster is actually broken
                logger_cli.warning(
                    "WARNING: Wow! {} disappeared".format(_osd_b["name"])
                )
                _osd[_osd_b["name"]]["after"] = {}
                # nothing to compare against, skip to the next OSD
                continue
            else:
                _osd[_osd_b["name"]]["after"] = _osd_a
                _osd[_osd_b["name"]]["percent"] = {}
                # Calculate summary using "after" data
                _pgs += _osd_a["pgs"]
                _classes.update([_osd_a["device_class"]])
                if _osd_a["status"] == "up":
                    _nodes_up += 1
            # compare
            _keys_b = list(_osd_b.keys())
            _keys_a = list(_osd_a.keys())
            # To be safe, detect if some keys are different
            # ...and log it.
            _diff = set(_keys_b).symmetric_difference(_keys_a)
            if len(_diff) > 0:
                # This should never happen, actually
                logger_cli.warning(
                    "WARNING: Before/after keys mismatch "
                    "for OSD node {}: {}".format(idx, ", ".join(_diff))
                )
                continue
            # Compare each key and calculate how it changed
            for k in _keys_b:
                if _osd_b[k] != _osd_a[k]:
                    # Announce change
                    logger_cli.debug(
                        "-> {:4}: {}, {} -> {}".format(
                            idx,
                            k,
                            _osd_b[k],
                            _osd_a[k]
                        )
                    )
                    # calculate percent for numeric fields only,
                    # guarding against division by zero;
                    # e.g. kb_used going 100 -> 125 yields +25.0
                    if not isinstance(_osd_b[k], (int, float)) or \
                            not isinstance(_osd_a[k], (int, float)) or \
                            not _osd_b[k]:
                        continue
                    _change_perc = (_osd_a[k] / _osd_b[k]) * 100 - 100
                    _osd[_osd_b["name"]]["percent"][k] = _change_perc

                    # Increase counters
                    _p = data["osd_summary"]["active"]

                    if k not in _p:
                        _p[k] = 1
                    else:
                        _p[k] += 1
                    if k == "var":
                        if _change_perc > 0:
                            _p["var_up"] += 1
                        elif _change_perc < 0:
                            _p["var_down"] += 1
        # Save sorted data
        data["osds"] = _osd
        logger_cli.info("-> Removing redundant osd before/after data")
        data.pop("osd_df_before")
        data.pop("osd_df_after")
        # Save summary
        data["osd_summary"]["active"]["status"] = "{}".format(_nodes_up)
        data["osd_summary"]["active"]["device_class"] = \
            "{}".format(len(list(_classes)))
        data["osd_summary"]["active"]["pgs"] = _pgs
        return

    # Create report
    def create_report(self, filename):
        """
        Create static html showing ceph info report

        :return: none
        """
        logger_cli.info("### Generating report to '{}'".format(filename))
        _report = reporter.ReportToFile(
            reporter.HTMLCephBench(self),
            filename
        )
        _report(
            {
                "results": self.results,
                "idle_status": self.ceph_idle_status,
                "health_detail": self.health_detail,
                "ceph_df": self.ceph_df,
                "ceph_pg_dump": self.ceph_pg_dump,
                "info": self.ceph_info.ceph_info,
                "cluster": self.ceph_info.cluster_info,
                "ceph_version": self.ceph_info.ceph_version,
                "nodes": self.agent_pods
            }
        )
        logger_cli.info("-> Done")

        return