import csv
import os
import json

from datetime import datetime, timedelta, timezone
from time import sleep

from cfg_checker.common import logger_cli
from cfg_checker.common.decorators import retry
from cfg_checker.common.file_utils import write_str_to_file
from cfg_checker.helpers.console_utils import Progress
from cfg_checker.reports import reporter
# from cfg_checker.common.exception import InvalidReturnException
# from cfg_checker.common.exception import ConfigException
# from cfg_checker.common.exception import KubeException

from cfg_checker.nodes import KubeNodes
from cfg_checker.agent.fio_runner import _get_seconds, _datetime_fmt


def _reformat_timestr(_str, _chars=["/", ",", " ", ":", "+"], _tchar=""):
    _new = ""
    for _c in _str:
        _new += _c if _c not in _chars else _tchar
    return _new


def _parse_json_output(buffer):
    try:
        return json.loads(buffer)
    except TypeError as e:
        logger_cli.error(
            "ERROR: Status not decoded: {}\n{}".format(e, buffer)
        )
    except json.decoder.JSONDecodeError as e:
        logger_cli.error(
            "ERROR: Status not decoded: {}\n{}".format(e, buffer)
        )
    return {}


class CephBench(object):
    _agent_template = "cfgagent-template.yaml"

    def __init__(self, config):
        self.env_config = config
        return


class SaltCephBench(CephBench):
    def __init__(
        self,
        config
    ):
        logger_cli.error("ERROR: Not implemented for Salt environment!")

        # self.master = SaltNodes(config)
        super(SaltCephBench, self).__init__(config)
        return


class KubeCephBench(CephBench):
    def __init__(self, config):
        self.agent_count = config.bench_agent_count
        self.master = KubeNodes(config)
        super(KubeCephBench, self).__init__(config)

        self.mode = config.bench_mode
        self.resource_prefix = config.resource_prefix
        if config.bench_mode == "tasks":
            self.taskfile = config.bench_task_file
            self.load_tasks(self.taskfile)
        elif config.bench_mode == "cleanup":
            self.cleanup_list = []
            return

        self.storage_class = config.bench_storage_class
        self.results_dump_path = config.bench_results_dump_path
        self.agent_pods = []
        self.services = []
        # By default,
        # 30 seconds should be enough to send tasks to 3-5 agents
        self.scheduled_delay = 30

        self.cleanup_list = []
        self.results = {}
        self.agent_results = {}

    def set_ceph_info_class(self, ceph_info):
        self.ceph_info = ceph_info

    def load_tasks(self, taskfile):
        # Load csv file
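        # Each row maps positionally to fio options; an illustrative row
        # (values are examples, not taken from this repo) would be:
        #   randrw,75,4k,32,2G
        # i.e. readwrite, rwmixread, bs, iodepth, size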
        logger_cli.info("-> Loading taskfile '{}'".format(taskfile))
        self.tasks = []
        with open(taskfile) as f:
            _reader = csv.reader(f, delimiter=',')
            # load packages
            for row in _reader:
                self.tasks.append({
                    "readwrite": row[0],
                    "rwmixread": row[1],
                    "bs": row[2],
                    "iodepth": row[3],
                    "size": row[4]
                })
        logger_cli.info("-> Loaded {} tasks".format(len(self.tasks)))

    def add_for_deletion(self, obj, typ):
        self.cleanup_list.append(
            [
                typ,
                obj.metadata.namespace,
                obj.metadata.name
            ]
        )
        return

    def prepare_cleanup(self):
        # Assume number of resources not given
        # list all svc, pod, pvc, pv and identify 'cfgagent-xx' ones
        _types = ["pv", "pvc", "pod", "svc"]
        _prefix = self.resource_prefix
        for _typ in _types:
            _list = self.master.list_resource_names_by_type_and_ns(_typ)
            for ns, name in _list:
                if name.startswith(_prefix):
                    if ns:
                        _msg = "{} {}/{}".format(_typ, ns, name)
                    else:
                        _msg = "{} {}".format(_typ, name)
                    logger_cli.info("-> Found {}".format(_msg))
                    self.cleanup_list.append([_typ, ns, name])
        return

    def prepare_agents(self, options):
        logger_cli.info("# Preparing {} agents".format(self.agent_count))
        for idx in range(self.agent_count):
            # create pvc/pv and pod
            logger_cli.info("-> creating agent '{:02}'".format(idx))
            _agent, _pv, _pvc = self.master.prepare_benchmark_agent(
                idx,
                os.path.split(options["filename"])[0],
                self.storage_class,
                options['size'] + 'i',
                self._agent_template
            )
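            # Note: options['size'] is an fio-style size (e.g. '2G');
            # appending 'i' presumably forms the matching Kubernetes
            # quantity ('2Gi') used for the volume request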
            # save it to lists
            self.agent_pods.append(_agent)
            self.add_for_deletion(_pv, "pv")
            self.add_for_deletion(_pvc, "pvc")
            self.add_for_deletion(_agent, "pod")

            # expose it
            _svc = self.master.expose_benchmark_agent(_agent)
            self.add_for_deletion(_svc, "svc")
            # Save service
            self.services.append(_svc)
            # prepopulate results
            self.agent_results[_agent.metadata.name] = {}
            self.agent_results[_agent.metadata.name]["url"] = \
                "http://{}:{}/api/".format(
                    _svc.spec.cluster_ip,
                    8765
                )
            self.agent_results[_agent.metadata.name]["storage_class"] = \
                self.storage_class
            self.agent_results[_agent.metadata.name]["volume_size"] = \
                options['size']

        logger_cli.info("-> Done creating agents")
        # TODO: Update after implementing pooled task sending
        self.scheduled_delay = self.agent_count * 6
        logger_cli.info(
            "-> Schedule delay set to {} sec".format(self.scheduled_delay)
        )
        return

    def _poke_agent(self, url, body, action="GET"):
        _datafile = "/tmp/data"
        _data = [
            "-d",
            "@" + _datafile
        ]
        _cmd = [
            "curl",
            "-s",
            "-H",
            "'Content-Type: application/json'",
            "-X",
            action,
            url
        ]
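        # For reference, the composed command executed in the first agent
        # pod looks roughly like:
        #   curl -s -H 'Content-Type: application/json' -X POST \
        #     http://<svc_cluster_ip>:8765/api/ -d @/tmp/data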
        if body:
            _cmd += _data
            _ret = self.master.prepare_json_in_pod(
                self.agent_pods[0].metadata.name,
                self.master._namespace,
                body,
                _datafile
            )
        _ret = self.master.exec_cmd_on_target_pod(
            self.agent_pods[0].metadata.name,
            self.master._namespace,
            " ".join(_cmd)
        )
        return _parse_json_output(_ret)

    def _ensure_agents_ready(self):
        # make sure agents idle
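        # Illustrative agent answer shape consumed below (the exact
        # payload is defined by the agent's fio module):
        #   {"status": "idle", "progress": 0,
        #    "healthcheck": {"ready": true}}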
        _status_set = []
        _ready_set = []
        for _agent, _d in self.get_agents_status().items():
            # obviously, there should be some answer
            if _d is None:
                logger_cli.error("ERROR: Agent status not available")
                return False
            # status should be idle or finished
            if _d['status'] not in ["idle", "finished"]:
                logger_cli.error(
                    "Agent status invalid {}:{}".format(_agent, _d['status'])
                )
                _status_set += [False]
            else:
                # Good agent
                _status_set += [True]
            # agent's fio shell should be in 'ready'
            if not _d["healthcheck"]["ready"]:
                logger_cli.error("Agent is not ready {}".format(_agent))
                _ready_set += [False]
            else:
                # 'fio' shell for agent is ready
                _ready_set += [True]
        # all agents' statuses should be True
        # and all 'fio' shell modules should be 'ready'
        if not all(_status_set) or not all(_ready_set):
            # At least one is not ready and it was already logged above
            return False
        else:
            # All is good
            return True

    def get_agents_status(self):
        _status = {}
        _results = self.master.exec_on_labeled_pods_and_ns(
            "app=cfgagent",
            "curl -s http://localhost:8765/api/fio"
        )
        for _agent, _result in _results.items():
            _j = _parse_json_output(_result)
            _status[_agent] = _j
        return _status

    @retry(Exception, initial_wait=5)
    def get_agents_resultlist(self):
        _t = {"module": "fio", "action": "get_resultlist"}
        _status = {}
        for _agent, _d in self.agent_results.items():
            _status[_agent] = self._poke_agent(_d["url"], _t, action="POST")
        return _status

    @retry(Exception, initial_wait=5)
    def get_result_from_agent(self, agent, time):
        _t = {
            "module": "fio",
            "action": "get_result",
            "options": {
                "time": time
            }
        }
        return self._poke_agent(
            self.agent_results[agent]["url"],
            _t,
            action="POST"
        )

    def _get_next_scheduled_time(self):
        _now = datetime.now(timezone.utc)
        logger_cli.info("-> time is '{}'".format(_now.strftime(_datetime_fmt)))
        _time = _now + timedelta(seconds=self.scheduled_delay)
        _str_time = _time.strftime(_datetime_fmt)
        logger_cli.info(
            "-> next benchmark scheduled to '{}'".format(_str_time)
        )
        return _str_time

    def _send_scheduled_task(self, options):
        _task = {
            "module": "fio",
            "action": "do_scheduledrun",
            "options": options
        }
        for _agent, _d in self.agent_results.items():
            logger_cli.info(
                "-> sending task to '{}:{}'".format(_agent, _d["url"])
            )
            _ret = self._poke_agent(_d["url"], _task, action="POST")
            if 'error' in _ret:
                logger_cli.error(
                    "ERROR: Agent returned: '{}'".format(_ret['error'])
                )
                return False
        # No errors detected
        return True

    def track_benchmark(self, options):
        _runtime = _get_seconds(options["runtime"])
        _ramptime = _get_seconds(options["ramp_time"])
        # Sum up all timings that we must wait and double it
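        # e.g. with the default 30s delay, runtime=120s and ramp_time=30s:
        #   (30 + 120 + 30) * 2 = 360s of waiting budget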
        _timeout = (self.scheduled_delay + _runtime + _ramptime) * 2
        _end = datetime.now(timezone.utc) + timedelta(seconds=_timeout)
        while True:
            # Print status
            # TODO: do pooled status get
            _sts = self.get_agents_status()
            diff = (_end - datetime.now(timezone.utc)).total_seconds()
            logger_cli.info("-> {:.2f}s left. Agent status:".format(diff))
            for _agent, _status in _sts.items():
                logger_cli.info(
                    "\t{}: {} ({}%)".format(
                        _agent,
                        _status["status"],
                        _status["progress"]
                    )
                )
            finished = [True for _s in _sts.values()
                        if _s["status"] == 'finished']
            _fcnt = len(finished)
            _tcnt = len(_sts)
            if _fcnt < _tcnt:
                logger_cli.info("-> {}/{} finished".format(_fcnt, _tcnt))
            else:
                logger_cli.info("-> All agents finished run")
                return True
            # recalc how much is left
            diff = (_end - datetime.now(timezone.utc)).total_seconds()
            # In case end_datetime was in the past to begin with
            if diff < 0:
                logger_cli.info("-> Timed out waiting for agents to finish")
                return False
            else:
                logger_cli.info("-> Sleeping for {:.2f}s".format(2))
                sleep(2)
            if diff <= 0.1:
                logger_cli.info("-> Timed out waiting for agents to finish")
                return False

    def _do_testrun(self, options):
        # send single task to agents
        if not self._send_scheduled_task(options):
            return False
        # Track this benchmark progress
        if not self.track_benchmark(options):
            return False
        else:
            logger_cli.info("-> Finished testrun")
            # Get results for each agent
            self.collect_results(options)
            return True

    def wait_ceph_cooldown(self):
        # TODO: Query Ceph once every 20 sec to make sure its load dropped

        # get ceph idle status
        self.ceph_idle_status = self.ceph_info.get_cluster_status()
        self.health_detail = self.ceph_info.get_health_detail()
        self.ceph_df = self.ceph_info.get_ceph_df()
        self.ceph_pg_dump = self.ceph_info.get_ceph_pg_dump()
        return

    def run_benchmark(self, options):
        logger_cli.info("# Starting '{}' benchmark".format(self.mode))
        # Check agent readiness
        logger_cli.info("# Checking agents")
        if not self._ensure_agents_ready():
            return False

        # Make sure that Ceph is at low load
        # TODO: Ceph status check
        # self.wait_ceph_cooldown()

        _get_df = self.ceph_info.get_ceph_osd_df

        # Do benchmark according to mode
        if self.mode == "tasks":
            logger_cli.info(
                "# Running benchmark with tasks from '{}'".format(
                    self.taskfile
                )
            )
            # take next task
            _total_tasks = len(self.tasks)
            for idx in range(_total_tasks):
                # init time to schedule
                _osd_df_before = _get_df()
                _task = self.tasks[idx]
                logger_cli.info(
                    "-> Starting next task ({}/{})".format(idx+1, _total_tasks)
                )
                logger_cli.info(
                    "-> Updating options with: {}".format(
                        ", ".join(
                            ["{} = {}".format(k, v) for k, v in _task.items()]
                        )
                    )
                )
                # update options
                options.update(_task)
                _sch_time = self._get_next_scheduled_time()
                options["scheduled_to"] = _sch_time
                # init results table
                self.results[_sch_time] = {
                    "input_options": options,
                    "agents": {},
                    "osd_df_before": _osd_df_before
                }
                if not self._do_testrun(options):
                    return False
                else:
                    self.results[_sch_time]["osd_df_after"] = _get_df()

                self.wait_ceph_cooldown()
        elif self.mode == "single":
            logger_cli.info("# Running single benchmark")
            _osd_df_before = _get_df()
            # init time to schedule
            _sch_time = self._get_next_scheduled_time()
            options["scheduled_to"] = _sch_time
            # init results table
            self.results[_sch_time] = {
                "input_options": options,
                "agents": {},
                "osd_df_before": _osd_df_before
            }
            if not self._do_testrun(options):
                return False
            else:
                self.results[_sch_time]["osd_df_after"] = _get_df()
        else:
            logger_cli.error("ERROR: Unknown mode '{}'".format(self.mode))
            return False

        # Normal exit
        logger_cli.info("# All benchmark tasks done")
        return True

    def cleanup(self):
        logger_cli.info("# Cleaning up")
        self.cleanup_list.reverse()

        for _res in self.cleanup_list:
            self.master.cleanup_resource_by_name(_res[0], _res[2], ns=_res[1])

        # Wait for resources to be cleaned
        _timeout = 120
        _total = len(self.cleanup_list)
        logger_cli.info("-> Wait until {} resources cleaned".format(_total))
        _p = Progress(_total)
        while True:
            _g = self.master.get_resource_phase_by_name
            _l = [_g(r[0], r[2], ns=r[1]) for r in self.cleanup_list]
            _l = [item for item in _l if item]
            _idx = _total - len(_l)
            if len(_l) > 0:
                _p.write_progress(_idx)
            else:
                _p.write_progress(_idx)
                _p.end()
                logger_cli.info("# Done cleaning up")
                break
            if _timeout > 0:
                _timeout -= 1
            else:
                _p.end()
                logger_cli.info("# Timed out waiting after 120s.")
                break

        return

    def collect_results(self, options):
        _sch_time = options["scheduled_to"]
        logger_cli.info("# Collecting results")
        # query agents for results
        _agents = self.get_agents_resultlist()
        # Syntax shortcut
        _ar = self.results[_sch_time]["agents"]

        for _agent, _l in _agents.items():
            # Create a syntax shortcut
            if _agent not in _ar:
                _ar[_agent] = {}
            _arl = _ar[_agent]
            # Check if we already have this result locally
            for _time in _l["resultlist"]:
                _filename = self._get_dump_filename(_sch_time, _agent, options)
                if os.path.exists(_filename):
                    # There is a file already for this task
                    # Check if we need to load it
                    if _sch_time in _arl:
                        logger_cli.info(
                            "-> Skipped already processed result '{}'".format(
                                _filename
                            )
                        )
                    else:
                        # Load previously dumped result from disk
                        logger_cli.info(
                            "-> Loading already present result '{}'".format(
                                _filename
                            )
                        )
                        _arl[_sch_time] = self.load_dumped_result(_filename)
                else:
                    # Load the result, add it locally and dump it
                    logger_cli.info(
                        "-> Getting results for '{}' from '{}'".format(
                            _sch_time,
                            _agent
                        )
                    )
                    _r = self.get_result_from_agent(_agent, _time)
                    # Important to switch from result status time
                    # to scheduled time
                    _arl[_sch_time] = _r[_time]
                    # Dump collected result
                    self.dump_result(_filename, _arl[_sch_time])
        return

    def _get_dump_filename(self, _time, agent, options):
        _dirname = _reformat_timestr(_time)
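        # e.g. a scheduled time like "11/23/2021, 15:49:42+0000" (assuming
        # _datetime_fmt renders that shape) becomes "112320211549420000"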
        _filename = "-".join([
            _dirname,
            agent,
            options["readwrite"],
            options["bs"],
            str(options["iodepth"]),
        ]) + ".json"
        return os.path.join(
            self.results_dump_path,
            _dirname,
            _filename
        )

    def dump_result(self, filename, data):
        # Dumps all available results as json to the given path,
        # overwriting if needed
        _folder, _file = os.path.split(filename)
        # Do dump
        if not os.path.exists(_folder):
            os.mkdir(_folder)
            logger_cli.info("-> Created folder '{}'".format(_folder))
        # Dump agent data for this test run
        write_str_to_file(filename, json.dumps(data, indent=2))
        logger_cli.info("-> Dumped '{}'".format(filename))
        return

    def load_dumped_result(self, filename):
        try:
            with open(filename, "rt+") as f:
                return json.loads(f.read())
        except FileNotFoundError as e:
            logger_cli.error(
                "ERROR: {}".format(e)
            )
        except TypeError as e:
            logger_cli.error(
                "ERROR: Invalid file ({}): {}".format(filename, e)
            )
        except json.decoder.JSONDecodeError as e:
            logger_cli.error(
                "ERROR: Failed to decode json ({}): {}".format(filename, e)
            )
        return None

    def _lookup_storage_class_id_by_name(self, storage_class_name):
        # Assume that self has proper data
        for _pool in self.ceph_df["pools"]:
            if storage_class_name == _pool["name"]:
                return _pool["id"]
        return None

    def calculate_totals(self):
        # Calculate totals for Read and Write
        for _time, data in self.results.items():
            if "totals" not in data:
                data["totals"] = {}
            else:
                continue
            _totals = data["totals"]
            _r_bw = 0
            _r_avglat = []
            _r_iops = 0
            _w_bw = 0
            _w_avglat = []
            _w_iops = 0
            for _a, _d in data["agents"].items():
                # Hardcoded number of jobs param :(
                _j = _d[_time]["jobs"][0]
                _r_bw += _j["read"]["bw_bytes"]
                _r_avglat += [_j["read"]["lat_ns"]["mean"]]
                _r_iops += _j["read"]["iops"]
                _w_bw += _j["write"]["bw_bytes"]
                _w_avglat += [_j["write"]["lat_ns"]["mean"]]
                _w_iops += _j["write"]["iops"]
                # Save storage class name
                if "storage_class" not in _totals:
                    _totals["storage_class"] = \
                        self.agent_results[_a]["storage_class"]
                    # Lookup storage class id and num_pg
                    _totals["storage_class_stats"] = \
                        reporter.get_pool_stats_by_id(
                            self._lookup_storage_class_id_by_name(
                                self.agent_results[_a]["storage_class"]
                            ),
                            self.ceph_pg_dump
                        )

            _totals["read_bw_bytes"] = _r_bw
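            # lat_ns means are nanoseconds: averaging across agents and
            # dividing by 1000 yields microseconds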
            _totals["read_avg_lat_us"] = \
                (sum(_r_avglat) / len(_r_avglat)) / 1000
            _totals["read_iops"] = _r_iops
            _totals["write_bw_bytes"] = _w_bw
            _totals["write_avg_lat_us"] = \
                (sum(_w_avglat) / len(_w_avglat)) / 1000
            _totals["write_iops"] = _w_iops

    # Create report
    def create_report(self, filename):
        """
        Create a static HTML report with benchmark results and Ceph info

        :return: none
        """
        logger_cli.info("### Generating report to '{}'".format(filename))
        _report = reporter.ReportToFile(
            reporter.HTMLCephBench(self),
            filename
        )
        self.calculate_totals()
        _report(
            {
                "results": self.results,
                "idle_status": self.ceph_idle_status,
                "health_detail": self.health_detail,
                "ceph_df": self.ceph_df,
                "ceph_pg_dump": self.ceph_pg_dump,
                "info": self.ceph_info.ceph_info,
                "cluster": self.ceph_info.cluster_info,
                "ceph_version": self.ceph_info.ceph_version,
                "nodes": self.agent_pods
            }
        )
        logger_cli.info("-> Done")

        return