import csv
import os
import json

from datetime import datetime, timedelta, timezone

from cfg_checker.common import logger_cli
from cfg_checker.common.decorators import retry
from cfg_checker.common.file_utils import write_str_to_file
from cfg_checker.helpers.console_utils import Progress
from cfg_checker.reports import reporter
# from cfg_checker.common.exception import InvalidReturnException
# from cfg_checker.common.exception import ConfigException
# from cfg_checker.common.exception import KubeException

from cfg_checker.nodes import KubeNodes
from cfg_checker.agent.fio_runner import _get_seconds, _datetime_fmt


_file_datetime_fmt = "%m%d%Y%H%M%S%z"

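# Strips separator characters from a timestamp string so it is safe to use
# in a filename. Illustrative example (assuming _datetime_fmt renders like
# "11/23/2021, 15:49:42+0000"): the result is "112320211549420000", which
# preload_results() later parses back via _file_datetime_fmt above.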
def _reformat_timestr(_str, _chars=["/", ",", " ", ":", "+"], _tchar=""):
    _new = ""
    for _c in _str:
        _new += _c if _c not in _chars else _tchar
    return _new

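# Tolerant JSON parsing for agent replies: an empty or garbled buffer is
# logged and turned into an empty dict, so status-polling loops can keep
# going without wrapping every call in try/except.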
def _parse_json_output(buffer):
    try:
        return json.loads(buffer)
    except TypeError as e:
        logger_cli.error(
            "ERROR: Status not decoded: {}\n{}".format(e, buffer)
        )
    except json.decoder.JSONDecodeError as e:
        logger_cli.error(
            "ERROR: Status not decoded: {}\n{}".format(e, buffer)
        )
    return {}

class CephBench(object):
    _agent_template = "cfgagent-template.yaml"

    def __init__(self, config):
        self.env_config = config
        return

class SaltCephBench(CephBench):
    def __init__(
        self,
        config
    ):
        logger_cli.error("ERROR: Not implemented for Salt environment!")

        # self.master = SaltNodes(config)
        super(SaltCephBench, self).__init__(config)
        return

class KubeCephBench(CephBench):
    def __init__(self, config):
        self.agent_count = config.bench_agent_count
        self.master = KubeNodes(config)
        super(KubeCephBench, self).__init__(config)

        self.mode = config.bench_mode
        self.resource_prefix = config.resource_prefix
        if config.bench_mode == "tasks":
            self.taskfile = config.bench_task_file
            self.load_tasks(self.taskfile)
        elif config.bench_mode == "cleanup":
            self.cleanup_list = []
            return

        self.storage_class = config.bench_storage_class
        self.results_dump_path = config.bench_results_dump_path
        self.bench_name = config.bench_name
        self.agent_pods = []
        self.services = []
        # By default,
        # 30 seconds should be enough to send tasks to 3-5 agents
        self.scheduled_delay = 30

        self.cleanup_list = []
        self.results = {}
        self.agent_results = {}

    def set_ceph_info_class(self, ceph_info):
        self.ceph_info = ceph_info

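    # Expected taskfile layout: plain CSV, one fio parameter set per row,
    # columns in this order: readwrite,rwmixread,bs,iodepth,size.
    # An illustrative row (not from a real taskfile): "randrw,75,4k,16,2G".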
    def load_tasks(self, taskfile):
        # Load csv file
        logger_cli.info("-> Loading taskfile '{}'".format(taskfile))
        self.tasks = []
        with open(taskfile) as f:
            _reader = csv.reader(f, delimiter=',')
            # load task rows
            for row in _reader:
                self.tasks.append({
                    "readwrite": row[0],
                    "rwmixread": row[1],
                    "bs": row[2],
                    "iodepth": row[3],
                    "size": row[4]
                })
        logger_cli.info("-> Loaded {} tasks".format(len(self.tasks)))

    def add_for_deletion(self, obj, typ):
        self.cleanup_list.append(
            [
                typ,
                obj.metadata.namespace,
                obj.metadata.name
            ]
        )
        return

    def prepare_cleanup(self):
        # Assume number of resources not given
        # list all svc, pod, pvc, pv and identify 'cfgagent-xx' ones
        _types = ["pv", "pvc", "pod", "svc"]
        _prefix = self.resource_prefix
        for _typ in _types:
            _list = self.master.list_resource_names_by_type_and_ns(_typ)
            for ns, name in _list:
                if name.startswith(_prefix):
                    if ns:
                        _msg = "{} {}/{}".format(_typ, ns, name)
                    else:
                        _msg = "{} {}".format(_typ, name)
                    logger_cli.info("-> Found {}".format(_msg))
                    self.cleanup_list.append([_typ, ns, name])
        return

    def prepare_agents(self, options):
        logger_cli.info("# Preparing {} agents".format(self.agent_count))
        for idx in range(self.agent_count):
            # create pvc/pv and pod
            logger_cli.info("-> creating agent '{:02}'".format(idx))
            # _agent, _pv, _pvc = self.master.prepare_benchmark_agent(
            _agent, _pvc = self.master.prepare_benchmark_agent(
                idx,
                os.path.split(options["filename"])[0],
                self.storage_class,
                options['size'] + 'i',
                self._agent_template
            )
            # save it to lists
            self.agent_pods.append(_agent)
            # self.add_for_deletion(_pv, "pv")
            self.add_for_deletion(_pvc, "pvc")
            self.add_for_deletion(_agent, "pod")

            # expose it
            _svc = self.master.expose_benchmark_agent(_agent)
            self.add_for_deletion(_svc, "svc")
            # Save service
            self.services.append(_svc)
            # prepopulate results
            self.agent_results[_agent.metadata.name] = {}
            self.agent_results[_agent.metadata.name]["url"] = \
                "http://{}:{}/api/".format(
                    _svc.spec.cluster_ip,
                    8765
                )
            self.agent_results[_agent.metadata.name]["storage_class"] = \
                self.storage_class
            self.agent_results[_agent.metadata.name]["volume_size"] = \
                options['size']

        logger_cli.info("-> Done creating agents")
        # TODO: Update after implementing pooled task sending
        self.scheduled_delay = self.agent_count * 10
        logger_cli.info(
            "-> Schedule delay set to {} sec".format(self.scheduled_delay)
        )
        return

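    # Talks to an agent's REST API by exec'ing curl inside the first agent
    # pod. A JSON body, if given, is first written to /tmp/data in that pod
    # and passed via '-d @/tmp/data', so the assembled command is roughly:
    #   curl -s -H 'Content-Type: application/json' -X POST <url> \
    #        -d @/tmp/data
    # (illustrative; built from _cmd/_data below)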
    def _poke_agent(self, url, body, action="GET"):
        _datafile = "/tmp/data"
        _data = [
            "-d",
            "@" + _datafile
        ]
        _cmd = [
            "curl",
            "-s",
            "-H",
            "'Content-Type: application/json'",
            "-X",
            action,
            url
        ]
        if body:
            _cmd += _data
            _ret = self.master.prepare_json_in_pod(
                self.agent_pods[0].metadata.name,
                self.master._namespace,
                body,
                _datafile
            )
        _ret = self.master.exec_cmd_on_target_pod(
            self.agent_pods[0].metadata.name,
            self.master._namespace,
            " ".join(_cmd)
        )
        return _parse_json_output(_ret)

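    # Agent status payload shape, as inferred from the checks below (the
    # authoritative definition lives in the agent's fio module):
    #   {"status": "idle"|"finished"|..., "progress": <int>,
    #    "healthcheck": {"ready": <bool>}}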
    def _ensure_agents_ready(self):
        # make sure agents idle
        _status_set = []
        _ready_set = []
        for _agent, _d in self.get_agents_status().items():
            # obviously, there should be some answer
            if _d is None:
                logger_cli.error("ERROR: Agent status not available")
                return False
            # status should be idle or finished
            if _d['status'] not in ["idle", "finished"]:
                logger_cli.error(
                    "Agent status invalid {}:{}".format(_agent, _d['status'])
                )
                _status_set += [False]
            else:
                # Good agent
                _status_set += [True]
            # agent's fio shell should be in 'ready'
            if not _d["healthcheck"]["ready"]:
                logger_cli.error("Agent is not ready {}".format(_agent))
                _ready_set += [False]
            else:
                # 'fio' shell for agent is ready
                _ready_set += [True]
        # all agent statuses should be True
        # and all 'fio' shell modules should be 'ready'
        if not all(_status_set) or not all(_ready_set):
            # At least one is not ready and it was already logged above
            return False
        else:
            # All is good
            return True

    def get_agents_status(self):
        _status = {}
        _results = self.master.exec_on_labeled_pods_and_ns(
            "app=cfgagent",
            "curl -s http://localhost:8765/api/fio"
        )
        for _agent, _result in _results.items():
            _j = _parse_json_output(_result)
            _status[_agent] = _j
        return _status

    @retry(Exception, initial_wait=5)
    def get_agents_resultlist(self):
        _t = {"module": "fio", "action": "get_resultlist"}
        _status = {}
        for _agent, _d in self.agent_results.items():
            _status[_agent] = self._poke_agent(_d["url"], _t, action="POST")
        return _status

    @retry(Exception, initial_wait=5)
    def get_result_from_agent(self, agent, time):
        _t = {
            "module": "fio",
            "action": "get_result",
            "options": {
                "time": time
            }
        }
        return self._poke_agent(
            self.agent_results[agent]["url"],
            _t,
            action="POST"
        )

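    # Scheduling note: agents are given one shared absolute timestamp
    # ('scheduled_to') instead of an immediate "run" command, so fio starts
    # simultaneously on all pods even though tasks are sent serially;
    # scheduled_delay is the budget for distributing them.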
    def _get_next_scheduled_time(self):
        _now = datetime.now(timezone.utc)
        logger_cli.info("-> time is '{}'".format(_now.strftime(_datetime_fmt)))
        self.next_scheduled_time = _now + timedelta(
            seconds=self.scheduled_delay
        )
        _str_time = self.next_scheduled_time.strftime(_datetime_fmt)
        logger_cli.info(
            "-> next benchmark scheduled to '{}'".format(_str_time)
        )
        return _str_time

    def _send_scheduled_task(self, options):
        _task = {
            "module": "fio",
            "action": "do_scheduledrun",
            "options": options
        }
        for _agent, _d in self.agent_results.items():
            logger_cli.info(
                "-> sending task to '{}:{}'".format(_agent, _d["url"])
            )
            _ret = self._poke_agent(_d["url"], _task, action="POST")
            if 'error' in _ret:
                logger_cli.error(
                    "ERROR: Agent returned: '{}'".format(_ret['error'])
                )
                return False
        # No errors detected
        return True

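    # Polls agent status until all report 'finished' or the deadline passes.
    # The deadline is (scheduled_delay + runtime + ramp_time) * 2, i.e.
    # double the worst-case schedule-plus-run time. While waiting, cluster
    # status snapshots are saved under results[scheduled_to]["ceph"], keyed
    # by seconds elapsed since the scheduled start.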
    def track_benchmark(self, options):
        _runtime = _get_seconds(options["runtime"])
        _ramptime = _get_seconds(options["ramp_time"])
        # Sum up all timings that we must wait and double it
        _timeout = (self.scheduled_delay + _runtime + _ramptime) * 2
        _start = self.next_scheduled_time
        _end = datetime.now(timezone.utc) + timedelta(seconds=_timeout)
        while True:
            # Print status
            _sts = self.get_agents_status()
            diff = (_end - datetime.now(timezone.utc)).total_seconds()
            logger_cli.info("-> {:.2f}s left. Agent status:".format(diff))
            for _agent, _status in _sts.items():
                logger_cli.info(
                    "\t{}: {} ({}%)".format(
                        _agent,
                        _status["status"],
                        _status["progress"]
                    )
                )
            # Get Ceph status if _start time passed
            _elapsed = (datetime.now(timezone.utc) - _start).total_seconds()
            if _elapsed > 0:
                logger_cli.info("-> {:.2f}s elapsed".format(_elapsed))
                self.results[options["scheduled_to"]]["ceph"][_elapsed] = \
                    self.ceph_info.get_cluster_status()
            # Check if agents finished
            finished = [True for _s in _sts.values()
                        if _s["status"] == 'finished']
            _fcnt = len(finished)
            _tcnt = len(_sts)
            if _fcnt < _tcnt:
                logger_cli.info("-> {}/{} finished".format(_fcnt, _tcnt))
            else:
                logger_cli.info("-> All agents finished run")
                return True
            # recalc how much is left
            diff = (_end - datetime.now(timezone.utc)).total_seconds()
            # In case end_datetime was in past to begin with
            if diff < 0:
                logger_cli.info("-> Timed out waiting for agents to finish")
                return False

    def _do_testrun(self, options):
        # send task to all agents
        if not self._send_scheduled_task(options):
            return False
        # Track this benchmark progress
        if not self.track_benchmark(options):
            return False
        else:
            logger_cli.info("-> Finished testrun. Collecting results...")
            # Get results for each agent
            self.collect_results()
            logger_cli.info("-> Calculating totals and averages")
            self.calculate_totals()
            self.calculate_ceph_stats()
            logger_cli.info("-> Dumping results")
            for _time, _d in self.results.items():
                self.dump_result(
                    self._get_dump_filename(_time),
                    _d
                )
            return True

    def wait_ceph_cooldown(self):
        # TODO: Query Ceph once every 20 sec to make sure its load dropped

        # get ceph idle status
        self.ceph_idle_status = self.ceph_info.get_cluster_status()
        self.health_detail = self.ceph_info.get_health_detail()
        self.ceph_df = self.ceph_info.get_ceph_df()
        self.ceph_pg_dump = self.ceph_info.get_ceph_pg_dump()
        return

    def run_benchmark(self, options):
        logger_cli.info("# Starting '{}' benchmark".format(self.mode))
        # Check agent readiness
        logger_cli.info("# Checking agents")
        if not self._ensure_agents_ready():
            return False

        # Make sure that Ceph is at low load
        # TODO: Ceph status check
        # self.wait_ceph_cooldown()

        _get_df = self.ceph_info.get_ceph_osd_df

        # Do benchmark according to mode
        if self.mode == "tasks":
            logger_cli.info(
                "# Running benchmark with tasks from '{}'".format(
                    self.taskfile
                )
            )
            # take next task
            _total_tasks = len(self.tasks)
            for idx in range(_total_tasks):
                # init time to schedule
                _osd_df_before = _get_df()
                _task = self.tasks[idx]
                logger_cli.info(
                    "-> Starting next task ({}/{})".format(idx+1, _total_tasks)
                )
                logger_cli.info("-> Updating options with: {}".format(
                    ", ".join(
                        ["{} = {}".format(k, v) for k, v in _task.items()]
                    )
                )
                )
                # update options
                options.update(_task)
                _sch_time = self._get_next_scheduled_time()
                options["scheduled_to"] = _sch_time
                # init results table
                self.results[_sch_time] = {
                    "input_options": options,
                    "agents": {},
                    "ceph": {},
                    "osd_df_before": _osd_df_before
                }
                if not self._do_testrun(options):
                    return False
                else:
                    self.results[_sch_time]["osd_df_after"] = _get_df()

                self.wait_ceph_cooldown()
        elif self.mode == "single":
            logger_cli.info("# Running single benchmark")
            _osd_df_before = _get_df()
            # init time to schedule
            _sch_time = self._get_next_scheduled_time()
            options["scheduled_to"] = _sch_time
            # init results table
            self.results[_sch_time] = {
                "input_options": options,
                "agents": {},
                "ceph": {},
                "osd_df_before": _osd_df_before
            }
            if not self._do_testrun(options):
                return False
            else:
                self.results[_sch_time]["osd_df_after"] = _get_df()
        else:
            logger_cli.error("ERROR: Unknown mode '{}'".format(self.mode))
            return False

        # Normal exit
        logger_cli.info("# All benchmark tasks done")
        return True

    def cleanup(self):
        logger_cli.info("# Cleaning up")
        self.cleanup_list.reverse()

        for _res in self.cleanup_list:
            self.master.cleanup_resource_by_name(_res[0], _res[2], ns=_res[1])

        # Wait for resources to be cleaned
        _timeout = 120
        _total = len(self.cleanup_list)
        logger_cli.info("-> Wait until {} resources cleaned".format(_total))
        _p = Progress(_total)
        while True:
            _g = self.master.get_resource_phase_by_name
            _l = [_g(r[0], r[2], ns=r[1]) for r in self.cleanup_list]
            _l = [item for item in _l if item]
            _idx = _total - len(_l)
            if len(_l) > 0:
                _p.write_progress(_idx)
            else:
                _p.write_progress(_idx)
                _p.end()
                logger_cli.info("# Done cleaning up")
                break
            if _timeout > 0:
                _timeout -= 1
            else:
                _p.end()
                logger_cli.info("# Timed out waiting after 120s.")
                break

        return

    def collect_results(self):
        logger_cli.info("# Collecting results")
        # query agents for results
        _agents = self.get_agents_resultlist()
        for _agent, _l in _agents.items():
            # Check if we already have this locally
            for _time in _l["resultlist"]:
                # There is a file already for this task/time
                # Check if we need to load it
                if _time not in self.results:
                    # Some older results found
                    # do not process them
                    logger_cli.info(
                        "-> Skipped old results for '{}'".format(_time)
                    )
                    continue
                elif _agent not in self.results[_time]["agents"]:
                    # Load result and add it locally
                    logger_cli.info(
                        "-> Getting results for '{}' from '{}'".format(
                            _time,
                            _agent
                        )
                    )
                    _r = self.get_result_from_agent(_agent, _time)
                    self.results[_time]["agents"][_agent] = _r[_time]
                else:
                    # Should never happen, actually
                    logger_cli.info(
                        "-> Skipped loaded result for '{}' from '{}'".format(
                            _time,
                            _agent
                        )
                    )

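    # Builds the per-run dump path:
    #   <results_dump_path>/<input_options["name"]>/<file>.json
    # where <file> joins timestamp, agent count, readwrite mode, block size
    # and iodepth, e.g. "112320211549420000-02-randwrite-4k-16.json"
    # (illustrative values).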
    def _get_dump_filename(self, _time):
        _r = self.results[_time]
        _dirname = _r["input_options"]["name"]
        _filename = "-".join([
            _reformat_timestr(_time),
            "{:02}".format(len(_r["agents"])),
            _r["input_options"]["readwrite"],
            _r["input_options"]["bs"],
            str(_r["input_options"]["iodepth"]),
        ]) + ".json"
        return os.path.join(
            self.results_dump_path,
            _dirname,
            _filename
        )

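    # Reloads previously dumped runs for this bench_name so reports can be
    # rebuilt without re-running the benchmark. The timestamp is recovered
    # from the first filename token: 14 digits of "%m%d%Y%H%M%S" plus the
    # UTC offset, re-joined with '+' and parsed via _file_datetime_fmt.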
    def preload_results(self):
        logger_cli.info(
            "# Preloading results for '{}'".format(self.bench_name)
        )
        # get all dirs in folder
        _p = self.results_dump_path
        if not os.path.isdir(_p):
            logger_cli.warn(
                "WARNING: Dump path is not a folder '{}'".format(_p)
            )
            return
        for path, dirs, files in os.walk(_p):
            if path == os.path.join(_p, self.bench_name):
                logger_cli.info("-> Folder found '{}'".format(path))
                for _fname in files:
                    logger_cli.debug("... processing '{}'".format(_fname))
                    _ext = _fname.split('.')[-1]
                    if _ext != "json":
                        logger_cli.info(
                            "-> Extension invalid '{}', "
                            "'json' is expected".format(_ext)
                        )
                        continue
                    # get time from filename
                    # Ugly, but works
                    _t = _fname.split('-')[0]
                    _str_time = _t[:14] + "+" + _t[14:]
                    _t = datetime.strptime(_str_time, _file_datetime_fmt)
                    _time = _t.strftime(_datetime_fmt)
                    self.results[_time] = self.load_dumped_result(
                        os.path.join(path, _fname)
                    )
                    logger_cli.info(
                        "-> Loaded '{}' as '{}'".format(
                            _fname,
                            _time
                        )
                    )

    def dump_result(self, filename, data):
        # Dumps the given result data as json to the given path,
        # overwriting if needed
        _folder, _file = os.path.split(filename)
        # Do dump
        if not os.path.exists(_folder):
            os.mkdir(_folder)
            logger_cli.info("-> Created folder '{}'".format(_folder))
        # Dump agent data for this test run
        write_str_to_file(filename, json.dumps(data, indent=2))
        logger_cli.info("-> Dumped '{}'".format(filename))
        return

    def load_dumped_result(self, filename):
        try:
            with open(filename, "rt") as f:
                return json.loads(f.read())
        except FileNotFoundError as e:
            logger_cli.error(
                "ERROR: {}".format(e)
            )
        except TypeError as e:
            logger_cli.error(
                "ERROR: Invalid file ({}): {}".format(filename, e)
            )
        except json.decoder.JSONDecodeError as e:
            logger_cli.error(
                "ERROR: Failed to decode json ({}): {}".format(filename, e)
            )
        return None

    def _lookup_storage_class_id_by_name(self, storage_class_name):
        # Assumes self.ceph_df was already populated (see wait_ceph_cooldown)
        for _pool in self.ceph_df["pools"]:
            if storage_class_name == _pool["name"]:
                return _pool["id"]
        return None

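    # Aggregates per-agent fio results for each run: bandwidth and IOPS are
    # summed across agents, latency means are averaged, and lat_ns values
    # are converted to microseconds (divided by 1000).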
    def calculate_totals(self):
        # Calculate totals for Read and Write
        for _time, data in self.results.items():
            if "totals" not in data:
                data["totals"] = {}
            else:
                continue
            _totals = data["totals"]
            _r_bw = 0
            _r_avglat = []
            _r_iops = 0
            _w_bw = 0
            _w_avglat = []
            _w_iops = 0
            for _a, _d in data["agents"].items():
                # Hardcoded number of jobs param :(
                _j = _d["jobs"][0]
                _r_bw += _j["read"]["bw_bytes"]
                _r_avglat += [_j["read"]["lat_ns"]["mean"]]
                _r_iops += _j["read"]["iops"]
                _w_bw += _j["write"]["bw_bytes"]
                _w_avglat += [_j["write"]["lat_ns"]["mean"]]
                _w_iops += _j["write"]["iops"]
                # Save storage class name
                if "storage_class" not in _totals:
                    _totals["storage_class"] = \
                        self.agent_results[_a]["storage_class"]
                # Lookup storage class id and num_pg
                _totals["storage_class_stats"] = \
                    reporter.get_pool_stats_by_id(
                        self._lookup_storage_class_id_by_name(
                            self.agent_results[_a]["storage_class"]
                        ),
                        self.ceph_pg_dump
                    )

            _totals["read_bw_bytes"] = _r_bw
            _totals["read_avg_lat_us"] = \
                (sum(_r_avglat) / len(_r_avglat)) / 1000
            _totals["read_iops"] = _r_iops
            _totals["write_bw_bytes"] = _w_bw
            _totals["write_avg_lat_us"] = \
                (sum(_w_avglat) / len(_w_avglat)) / 1000
            _totals["write_iops"] = _w_iops

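    # Condenses the raw cluster snapshots collected by track_benchmark into
    # pgmap throughput series and normalizes every sample against the run's
    # maximum (0-100%) for the report barcharts.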
    def calculate_ceph_stats(self):
        # func to get values as lists
        def _as_list(key, stats):
            _list = []
            for _v in stats.values():
                if key in _v:
                    _list += [_v[key]]
                else:
                    _list += [0]
            return _list

        def _perc(n, m):
            if not n:
                return 0
            elif not m:
                return 0
            else:
                return (n / m) * 100

        _stats = {}
        for _time, data in self.results.items():
            if "ceph" not in data:
                logger_cli.warning(
                    "WARNING: Ceph stats raw data not found in results"
                )
                continue
            if "ceph_stats" not in data:
                data["ceph_stats"] = {}
            else:
                continue
            # Copy pool stats data
            for _e, _d in data["ceph"].items():
                _stats[_e] = _d["pgmap"]
            # Maximums
            m_r_bytes = max(_as_list("read_bytes_sec", _stats))
            m_w_bytes = max(_as_list("write_bytes_sec", _stats))
            m_r_iops = max(_as_list("read_op_per_sec", _stats))
            m_w_iops = max(_as_list("write_op_per_sec", _stats))
            # Replace ceph with shorter data
            data["ceph"] = {
                "max_read_bytes_sec": m_r_bytes,
                "max_write_bytes_sec": m_w_bytes,
                "max_read_iops_sec": m_r_iops,
                "max_write_iops_sec": m_w_iops,
                "stats": _stats
            }
            # Calculate % values for barchart
            for _e, _d in data["ceph"]["stats"].items():
                _d["read_bytes_sec_perc"] = \
                    _perc(_d.get("read_bytes_sec", 0), m_r_bytes)
                _d["write_bytes_sec_perc"] = \
                    _perc(_d.get("write_bytes_sec", 0), m_w_bytes)
                _d["read_op_per_sec_perc"] = \
                    _perc(_d.get("read_op_per_sec", 0), m_r_iops)
                _d["write_op_per_sec_perc"] = \
                    _perc(_d.get("write_op_per_sec", 0), m_w_iops)
        return

    # Create report
    def create_report(self, filename):
        """
        Create static html report with ceph benchmark results

        :return: none
        """
        logger_cli.info("### Generating report to '{}'".format(filename))
        _report = reporter.ReportToFile(
            reporter.HTMLCephBench(self),
            filename
        )
        _report(
            {
                "results": self.results,
                "idle_status": self.ceph_idle_status,
                "health_detail": self.health_detail,
                "ceph_df": self.ceph_df,
                "ceph_pg_dump": self.ceph_pg_dump,
                "info": self.ceph_info.ceph_info,
                "cluster": self.ceph_info.cluster_info,
                "ceph_version": self.ceph_info.ceph_version,
                "nodes": self.agent_pods
            }
        )
        logger_cli.info("-> Done")

        return