import csv
import os
import json

from datetime import datetime, timedelta, timezone

from cfg_checker.common import logger_cli
from cfg_checker.common.decorators import retry
from cfg_checker.common.file_utils import write_str_to_file
from cfg_checker.helpers.console_utils import Progress
from cfg_checker.reports import reporter
# from cfg_checker.common.exception import InvalidReturnException
# from cfg_checker.common.exception import ConfigException
# from cfg_checker.common.exception import KubeException

from cfg_checker.nodes import KubeNodes
from cfg_checker.agent.fio_runner import _get_seconds, _datetime_fmt


_file_datetime_fmt = "%m%d%Y%H%M%S%z"


def _reformat_timestr(_str, _chars=["/", ",", " ", ":", "+"], _tchar=""):
    _new = ""
    for _c in _str:
        _new += _c if _c not in _chars else _tchar
    return _new
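# A quick example of the helper above, assuming result timestamps use a
# "%m/%d/%Y, %H:%M:%S%z"-style _datetime_fmt (the exact format comes from
# cfg_checker.agent.fio_runner):
#   _reformat_timestr("12/09/2021, 11:13:14+0000") -> "120920211113140000"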


def _parse_json_output(buffer):
    try:
        return json.loads(buffer)
    except TypeError as e:
        logger_cli.error(
            "ERROR: Status not decoded: {}\n{}".format(e, buffer)
        )
    except json.decoder.JSONDecodeError as e:
        logger_cli.error(
            "ERROR: Status not decoded: {}\n{}".format(e, buffer)
        )
    return {}


def _split_vol_size(size):
    # I know, but it is faster than regex
    _numbers = [48, 49, 50, 51, 52, 53, 54, 55, 56, 57]
    _s_int = "0"
    _s_type = ""
    for ch in size:
        if ord(ch) in _numbers:
            _s_int += ch
        else:
            _s_type += ch
    return int(_s_int), _s_type
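# e.g. _split_vol_size("10G") -> (10, "G"); the leading "0" in _s_int
# keeps int() from failing if the size string has no digits at all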


class CephBench(object):
    _agent_template = "cfgagent-template.yaml"

    def __init__(self, config):
        self.env_config = config
        return


class SaltCephBench(CephBench):
    def __init__(
        self,
        config
    ):
        logger_cli.error("ERROR: Not implemented for Salt environment!")

        # self.master = SaltNodes(config)
        super(SaltCephBench, self).__init__(config)
        return


class KubeCephBench(CephBench):
    def __init__(self, config):
        self.agent_count = config.bench_agent_count
        self.master = KubeNodes(config)
        super(KubeCephBench, self).__init__(config)

        self.mode = config.bench_mode
        self.resource_prefix = config.resource_prefix

        if config.bench_mode == "tasks":
            self.taskfile = config.bench_task_file
            self.load_tasks(self.taskfile)

        if config.bench_mode == "cleanup":
            self.cleanup_list = []
            return

        self.bench_name = config.bench_name
        self.results_dump_path = config.bench_results_dump_path
        self.results = {}
        self.agent_results = {}
        self.cleanup_list = []
        self.agent_pods = []

        if config.bench_mode == "report":
            self.results = {}
            return

        self.storage_class = config.bench_storage_class
        self.services = []
        # By default,
        # 30 seconds should be enough to send tasks to 3-5 agents
        self.scheduled_delay = 30

    def set_ceph_info_class(self, ceph_info):
        self.ceph_info = ceph_info

    def load_tasks(self, taskfile):
        # Load csv file
        logger_cli.info("-> Loading taskfile '{}'".format(taskfile))
        self.tasks = []
        with open(taskfile) as f:
            _reader = csv.reader(f, delimiter=',')
            # load tasks
            for row in _reader:
                self.tasks.append({
                    "readwrite": row[0],
                    "rwmixread": row[1],
                    "bs": row[2],
                    "iodepth": row[3],
                    "size": row[4]
                })
        logger_cli.info("-> Loaded {} tasks".format(len(self.tasks)))
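        # An example taskfile for the format parsed above: five
        # comma-separated fio options per line, no header row
        # (values here are hypothetical):
        #   randrw,75,4k,16,10G
        #   write,0,64k,32,10G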

    def add_for_deletion(self, obj, typ):
        self.cleanup_list.append(
            [
                typ,
                obj.metadata.namespace,
                obj.metadata.name
            ]
        )
        return

    def prepare_cleanup(self):
        # Assume number of resources not given
        # list all svc, pod, pvc, pv and identify 'cfgagent-xx' ones
        _types = ["pv", "pvc", "pod", "svc"]
        _prefix = self.resource_prefix
        for _typ in _types:
            _list = self.master.list_resource_names_by_type_and_ns(_typ)
            for ns, name in _list:
                if name.startswith(_prefix):
                    if ns:
                        _msg = "{} {}/{}".format(_typ, ns, name)
                    else:
                        _msg = "{} {}".format(_typ, name)
                    logger_cli.info("-> Found {}".format(_msg))
                    self.cleanup_list.append([_typ, ns, name])
        return

    def prepare_agents(self, options):
        logger_cli.info("# Preparing {} agents".format(self.agent_count))
        # Increase the volume size a bit so the datafile fits
        _quantizer = 1.3
        _v_size, _vol_size_units = _split_vol_size(options['size'])
        _v_size = round(_v_size * _quantizer)
        _vol_size = str(_v_size) + _vol_size_units + "i"
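        # Worked example of the sizing above: options['size'] of "10G"
        # splits to (10, "G"), 10 * 1.3 rounds to 13, giving a "13Gi"
        # volume request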
        logger_cli.info(
            "-> Testfile size: {0}, Volume size: {1} ({0}*{2})".format(
                options['size'],
                _vol_size,
                _quantizer
            )
        )
        # Start preparing
        for idx in range(self.agent_count):
            # create pvc/pv and pod
            logger_cli.info("-> creating agent '{:02}'".format(idx))
            # _agent, _pv, _pvc = self.master.prepare_benchmark_agent(
            _agent, _pvc = self.master.prepare_benchmark_agent(
                idx,
                os.path.split(options["filename"])[0],
                self.storage_class,
                _vol_size,
                self._agent_template
            )
            # save it to lists
            self.agent_pods.append(_agent)
            # self.add_for_deletion(_pv, "pv")
            self.add_for_deletion(_pvc, "pvc")
            self.add_for_deletion(_agent, "pod")

            # expose it
            _svc = self.master.expose_benchmark_agent(_agent)
            self.add_for_deletion(_svc, "svc")
            # Save service
            self.services.append(_svc)
            # prepopulate results
            self.agent_results[_agent.metadata.name] = {}
            self.agent_results[_agent.metadata.name]["url"] = \
                "http://{}:{}/api/".format(
                    _svc.spec.cluster_ip,
                    8765
                )
            self.agent_results[_agent.metadata.name]["storage_class"] = \
                self.storage_class
            self.agent_results[_agent.metadata.name]["volume_size"] = \
                options['size']

        logger_cli.info("-> Done creating agents")
        # TODO: Update after implementing pooled task sending
        self.scheduled_delay = self.agent_count * 10
        logger_cli.info(
            "-> Schedule delay set to {} sec".format(self.scheduled_delay)
        )
        return

    def _poke_agent(self, url, body, action="GET"):
        _datafile = "/tmp/data"
        _data = [
            "-d",
            "@" + _datafile
        ]
        _cmd = [
            "curl",
            "-s",
            "-H",
            "'Content-Type: application/json'",
            "-X",
            action,
            url
        ]
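        # The assembled command line looks like this (URL is hypothetical):
        #   curl -s -H 'Content-Type: application/json' -X POST \
        #       http://10.0.0.1:8765/api/ -d @/tmp/data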
        if body:
            _cmd += _data
            _ret = self.master.prepare_json_in_pod(
                self.agent_pods[0].metadata.name,
                self.master._namespace,
                body,
                _datafile
            )
        _ret = self.master.exec_cmd_on_target_pod(
            self.agent_pods[0].metadata.name,
            self.master._namespace,
            " ".join(_cmd)
        )
        return _parse_json_output(_ret)

    def _ensure_agents_ready(self):
        # make sure agents idle
        _status_set = []
        _ready_set = []
        for _agent, _d in self.get_agents_status().items():
            # obviously, there should be some answer
            if _d is None:
                logger_cli.error("ERROR: Agent status not available")
                return False
            # status should be idle or finished
            if _d['status'] not in ["idle", "finished"]:
                logger_cli.error(
                    "Agent status invalid {}:{}".format(_agent, _d['status'])
                )
                _status_set += [False]
            else:
                # Good agent
                _status_set += [True]
            # agent's fio shell should be in 'ready'
            if not _d["healthcheck"]["ready"]:
                logger_cli.error("Agent is not ready {}".format(_agent))
                _ready_set += [False]
            else:
                # 'fio' shell for agent is ready
                _ready_set += [True]
        # all agent statuses should be True
        # and all 'fio' shell modules should be 'ready'
        if not all(_status_set) or not all(_ready_set):
            # At least one is not ready and it was already logged above
            return False
        else:
            # All is good
            return True

    def get_agents_status(self):
        _status = {}
        _results = self.master.exec_on_labeled_pods_and_ns(
            "app=cfgagent",
            "curl -s http://localhost:8765/api/fio"
        )
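        # Each pod answers with the agent's fio module status as JSON;
        # the shape is inferred from the checks in _ensure_agents_ready
        # and track_benchmark (values here are hypothetical):
        #   {"status": "idle", "progress": 0,
        #    "healthcheck": {"ready": true}}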
        for _agent, _result in _results.items():
            _j = _parse_json_output(_result)
            _status[_agent] = _j
        return _status

    @retry(Exception, initial_wait=5)
    def get_agents_resultlist(self):
        _t = {"module": "fio", "action": "get_resultlist"}
        _status = {}
        for _agent, _d in self.agent_results.items():
            _status[_agent] = self._poke_agent(_d["url"], _t, action="POST")
        return _status

    @retry(Exception, initial_wait=5)
    def get_result_from_agent(self, agent, time):
        _t = {
            "module": "fio",
            "action": "get_result",
            "options": {
                "time": time
            }
        }
        return self._poke_agent(
            self.agent_results[agent]["url"],
            _t,
            action="POST"
        )

    def _get_next_scheduled_time(self):
        _now = datetime.now(timezone.utc)
        logger_cli.info("-> time is '{}'".format(_now.strftime(_datetime_fmt)))
        self.next_scheduled_time = _now + timedelta(
            seconds=self.scheduled_delay
        )
        _str_time = self.next_scheduled_time.strftime(_datetime_fmt)
        logger_cli.info(
            "-> next benchmark scheduled to '{}'".format(_str_time)
        )
        return _str_time
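        # e.g. with scheduled_delay=30 and _datetime_fmt assumed to be
        # "%m/%d/%Y, %H:%M:%S%z", a call at 11:12:44 UTC would return
        # something like "12/09/2021, 11:13:14+0000"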

    def _send_scheduled_task(self, options):
        _task = {
            "module": "fio",
            "action": "do_scheduledrun",
            "options": options
        }
        for _agent, _d in self.agent_results.items():
            logger_cli.info(
                "-> sending task to '{}:{}'".format(_agent, _d["url"])
            )
            _ret = self._poke_agent(_d["url"], _task, action="POST")
            if 'error' in _ret:
                logger_cli.error(
                    "ERROR: Agent returned: '{}'".format(_ret['error'])
                )
                return False
        # No errors detected
        return True

    def track_benchmark(self, options):
        _runtime = _get_seconds(options["runtime"])
        _ramptime = _get_seconds(options["ramp_time"])
        # Sum up all timings that we must wait and double it
        _timeout = (self.scheduled_delay + _runtime + _ramptime) * 2
        # We should have no more than 65 measurements
        _stats_delay = round((_runtime + _ramptime) / 65)
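        # Worked example of the two values above: runtime "300s" and
        # ramp_time "30s" with a 50 sec scheduled delay (5 agents) give a
        # timeout of (50 + 300 + 30) * 2 = 760s and a Ceph stats sample
        # roughly every round(330 / 65) = 5s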
        _start = self.next_scheduled_time
        _end = datetime.now(timezone.utc) + timedelta(seconds=_timeout)
        while True:
            # Print status
            _sts = self.get_agents_status()
            diff = (_end - datetime.now(timezone.utc)).total_seconds()
            logger_cli.info("-> {:.2f}s left. Agent status:".format(diff))
            for _agent, _status in _sts.items():
                logger_cli.info(
                    "\t{}: {} ({}%)".format(
                        _agent,
                        _status["status"],
                        _status["progress"]
                    )
                )
            # Get Ceph status if _start time passed
            _elapsed = (datetime.now(timezone.utc) - _start).total_seconds()
            if _elapsed > _stats_delay:
                logger_cli.info("-> {:.2f}s elapsed".format(_elapsed))
                _sec = "{:0.1f}".format(_elapsed)
                self.results[options["scheduled_to"]]["ceph"][_sec] = \
                    self.ceph_info.get_cluster_status()
            # Check if agents finished
            finished = [True for _s in _sts.values()
                        if _s["status"] == 'finished']
            _fcnt = len(finished)
            _tcnt = len(_sts)
            if _fcnt < _tcnt:
                logger_cli.info("-> {}/{} finished".format(_fcnt, _tcnt))
            else:
                logger_cli.info("-> All agents finished run")
                return True
            # recalc how much is left
            diff = (_end - datetime.now(timezone.utc)).total_seconds()
            # In case end_datetime was in past to begin with
            if diff < 0:
                logger_cli.info("-> Timed out waiting for agents to finish")
                return False

    def _do_testrun(self, options):
        self.results[options["scheduled_to"]]["osd_df_before"] = \
            self.ceph_info.get_ceph_osd_df()
        # send scheduled task to the agents
        if not self._send_scheduled_task(options):
            return False
        # Track this benchmark progress
        if not self.track_benchmark(options):
            return False
        else:
            logger_cli.info("-> Finished testrun. Collecting results...")
            # get ceph osd stats
            self.results[options["scheduled_to"]]["osd_df_after"] = \
                self.ceph_info.get_ceph_osd_df()
            # Get results for each agent
            self.collect_results()
            logger_cli.info("-> Calculating totals and averages")
            self.calculate_totals()
            self.calculate_ceph_stats()
            self.osd_df_compare(options["scheduled_to"])
            logger_cli.info("-> Dumping results")
            for _time, _d in self.results.items():
                self.dump_result(
                    self._get_dump_filename(_time),
                    _d
                )
            return True

    def wait_ceph_cooldown(self):
        # TODO: Query Ceph once every 20 sec to make sure its load dropped

        # get ceph idle status
        self.ceph_idle_status = self.ceph_info.get_cluster_status()
        self.health_detail = self.ceph_info.get_health_detail()
        self.ceph_df = self.ceph_info.get_ceph_df()
        self.ceph_pg_dump = self.ceph_info.get_ceph_pg_dump()
        return

    def run_benchmark(self, options):
        logger_cli.info("# Starting '{}' benchmark".format(self.mode))
        # Check agent readiness
        logger_cli.info("# Checking agents")
        if not self._ensure_agents_ready():
            return False

        # Make sure that Ceph is at low load
        # TODO: Ceph status check
        # self.wait_ceph_cooldown()

        # Do benchmark according to mode
        if self.mode == "tasks":
            logger_cli.info(
                "# Running benchmark with tasks from '{}'".format(
                    self.taskfile
                )
            )
            # take next task
            _total_tasks = len(self.tasks)
            for idx in range(_total_tasks):
                # init time to schedule
                _task = self.tasks[idx]
                logger_cli.info(
                    "-> Starting next task ({}/{})".format(idx+1, _total_tasks)
                )
                logger_cli.info("-> Updating options with: {}".format(
                    ", ".join(
                        ["{} = {}".format(k, v) for k, v in _task.items()]
                    )
                ))
                # update options
                options.update(_task)
                _sch_time = self._get_next_scheduled_time()
                options["scheduled_to"] = _sch_time
                # init results table
                # save a copy of the options: the dict is mutated on the
                # next iteration and all runs would end up sharing the
                # values of the last task
                self.results[_sch_time] = {
                    "input_options": options.copy(),
                    "agents": {},
                    "ceph": {}
                }
                # exit on error
                if not self._do_testrun(options):
                    return False
                # Save ceph osd stats and wait cooldown
                self.wait_ceph_cooldown()
        elif self.mode == "single":
            logger_cli.info("# Running single benchmark")
            # init time to schedule
            _sch_time = self._get_next_scheduled_time()
            options["scheduled_to"] = _sch_time
            # init results table
            self.results[_sch_time] = {
                "input_options": options.copy(),
                "agents": {},
                "ceph": {}
            }
            if not self._do_testrun(options):
                return False
        else:
            logger_cli.error("ERROR: Unknown mode '{}'".format(self.mode))
            return False

        # Normal exit
        logger_cli.info("# All benchmark tasks done")
        return True

    def cleanup(self):
        logger_cli.info("# Cleaning up")
        self.cleanup_list.reverse()

        for _res in self.cleanup_list:
            self.master.cleanup_resource_by_name(_res[0], _res[2], ns=_res[1])

        # Wait for resources to be cleaned
        _timeout = 120  # max number of status check iterations
        _total = len(self.cleanup_list)
        logger_cli.info("-> Wait until {} resources cleaned".format(_total))
        _p = Progress(_total)
        while True:
            _g = self.master.get_resource_phase_by_name
            _l = [_g(r[0], r[2], ns=r[1]) for r in self.cleanup_list]
            _l = [item for item in _l if item]
            _idx = _total - len(_l)
            if len(_l) > 0:
                _p.write_progress(_idx)
            else:
                _p.write_progress(_idx)
                _p.end()
                logger_cli.info("# Done cleaning up")
                break
            if _timeout > 0:
                _timeout -= 1
            else:
                _p.end()
                logger_cli.info("# Timed out waiting after 120 checks.")
                break

        return

    def collect_results(self):
        logger_cli.info("# Collecting results")
        # query agents for results
        _agents = self.get_agents_resultlist()
        for _agent, _l in _agents.items():
            # Check if we already have this locally
            for _time in _l["resultlist"]:
                # There is a result already for this task/time
                # Check if we need to load it
                if _time not in self.results:
                    # Some older results found
                    # do not process them
                    logger_cli.info(
                        "-> Skipped old results for '{}'".format(_time)
                    )
                    continue
                elif _agent not in self.results[_time]["agents"]:
                    # Load result and add it locally
                    logger_cli.info(
                        "-> Getting results for '{}' from '{}'".format(
                            _time,
                            _agent
                        )
                    )
                    _r = self.get_result_from_agent(_agent, _time)
                    self.results[_time]["agents"][_agent] = _r[_time]
                else:
                    # Should never happen, actually
                    logger_cli.info(
                        "-> Skipped loaded result for '{}' from '{}'".format(
                            _time,
                            _agent
                        )
                    )

    def _get_dump_filename(self, _time):
        _r = self.results[_time]
        _dirname = _r["input_options"]["name"]
        _filename = "-".join([
            _reformat_timestr(_time),
            "{:02}".format(len(_r["agents"])),
            _r["input_options"]["readwrite"],
            _r["input_options"]["bs"],
            str(_r["input_options"]["iodepth"]),
        ]) + ".json"
        return os.path.join(
            self.results_dump_path,
            _dirname,
            _filename
        )
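        # e.g. a 5-agent "randrw" run with bs=4k and iodepth=16 dumps to
        # <results_dump_path>/<name>/120920211113140000-05-randrw-4k-16.json
        # (timestamp and option values here are hypothetical)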

    def preload_results(self):
        logger_cli.info(
            "# Preloading results for '{}'".format(self.bench_name)
        )
        # get all dirs in folder
        _p = self.results_dump_path
        if not os.path.isdir(_p):
            logger_cli.warning(
                "WARNING: Dump path is not a folder '{}'".format(_p)
            )
            return
        for path, dirs, files in os.walk(_p):
            if path == os.path.join(_p, self.bench_name):
                logger_cli.info("-> Folder found '{}'".format(path))
                for _fname in files:
                    logger_cli.debug("... processing '{}'".format(_fname))
                    _ext = _fname.split('.')[-1]
                    if _ext != "json":
                        logger_cli.info(
                            "-> Extension invalid '{}', "
                            "'json' is expected".format(_ext)
                        )
                        continue
                    # get time from filename
                    # Ugly, but works
                    _t = _fname.split('-')[0]
                    _str_time = _t[:14] + "+" + _t[14:]
                    _t = datetime.strptime(_str_time, _file_datetime_fmt)
                    _time = _t.strftime(_datetime_fmt)
                    self.results[_time] = self.load_dumped_result(
                        os.path.join(path, _fname)
                    )
                    logger_cli.info(
                        "-> Loaded '{}' as '{}'".format(
                            _fname,
                            _time
                        )
                    )
    def dump_result(self, filename, data):
        # Dumps the given result data as json to the given path,
        # overwriting if needed
        _folder, _file = os.path.split(filename)
        # Do dump
        if not os.path.exists(_folder):
            os.mkdir(_folder)
            logger_cli.info("-> Created folder '{}'".format(_folder))
        # Dump agent data for this test run
        write_str_to_file(filename, json.dumps(data, indent=2))
        logger_cli.info("-> Dumped '{}'".format(filename))
        return

    def load_dumped_result(self, filename):
        try:
            with open(filename, "rt") as f:
                return json.loads(f.read())
        except FileNotFoundError as e:
            logger_cli.error(
                "ERROR: {}".format(e)
            )
        except TypeError as e:
            logger_cli.error(
                "ERROR: Invalid file ({}): {}".format(filename, e)
            )
        except json.decoder.JSONDecodeError as e:
            logger_cli.error(
                "ERROR: Failed to decode json ({}): {}".format(filename, e)
            )
        return None

    def _lookup_storage_class_id_by_name(self, storage_class_name):
        # Assume that self has proper data
        for _pool in self.ceph_df["pools"]:
            if storage_class_name == _pool["name"]:
                return _pool["id"]
        return None
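        # A minimal sketch of the ceph_df shape this lookup expects,
        # inferred from the loop above (pool name/id are hypothetical):
        #   {"pools": [{"name": "volumes-hdd", "id": 3, ...}, ...]}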

    def calculate_totals(self):
        # Calculate totals for Read and Write
        for _time, data in self.results.items():
            if "totals" not in data:
                data["totals"] = {}
            else:
                continue
            _totals = data["totals"]
            _r_bw = 0
            _r_avglat = []
            _r_iops = 0
            _w_bw = 0
            _w_avglat = []
            _w_iops = 0
            for _a, _d in data["agents"].items():
                # Hardcoded number of jobs param :(
                _j = _d["jobs"][0]
                _r_bw += _j["read"]["bw_bytes"]
                _r_avglat += [_j["read"]["lat_ns"]["mean"]]
                _r_iops += _j["read"]["iops"]
                _w_bw += _j["write"]["bw_bytes"]
                _w_avglat += [_j["write"]["lat_ns"]["mean"]]
                _w_iops += _j["write"]["iops"]
                # Save storage class name
                if "storage_class" not in _totals:
                    _totals["storage_class"] = \
                        self.agent_results[_a]["storage_class"]
                    # Lookup storage class id and num_pg
                    _totals["storage_class_stats"] = \
                        reporter.get_pool_stats_by_id(
                            self._lookup_storage_class_id_by_name(
                                self.agent_results[_a]["storage_class"]
                            ),
                            self.ceph_pg_dump
                        )

            _totals["read_bw_bytes"] = _r_bw
            _totals["read_avg_lat_us"] = \
                (sum(_r_avglat) / len(_r_avglat)) / 1000
            _totals["read_iops"] = _r_iops
            _totals["write_bw_bytes"] = _w_bw
            _totals["write_avg_lat_us"] = \
                (sum(_w_avglat) / len(_w_avglat)) / 1000
            _totals["write_iops"] = _w_iops
    def calculate_ceph_stats(self):
        # helper returning the timestamp and maximum value for a key
        def _get_max_value(key, stats):
            _max_time = 0
            _value = 0
            for _k, _v in stats.items():
                if key in _v and _value < _v[key]:
                    _max_time = _k
                    _value = _v[key]
            return _max_time, _value
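        # e.g. _get_max_value(
        #     "read_bytes_sec",
        #     {"5.0": {"read_bytes_sec": 100}, "10.0": {"read_bytes_sec": 250}}
        # ) -> ("10.0", 250)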

        def _perc(n, m):
            if not n:
                return 0
            elif not m:
                return 0
            else:
                return "{:.0f}%".format((n / m) * 100)
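        # e.g. _perc(50, 200) -> "25%"; zero inputs short-circuit to 0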

        def _axis_vals(val):
            return [
                val, int(val*1.1), int(val*0.75), int(val*0.50), int(val*0.15)
            ]
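        # e.g. _axis_vals(100) -> [100, 110, 75, 50, 15], i.e. candidate
        # tick values, presumably for the report's bar chart axis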

        for _time, data in self.results.items():
            if "ceph" not in data:
                logger_cli.warning(
                    "WARNING: Ceph stats raw data not found in results"
                )
                continue
            if "ceph_stats" not in data:
                data["ceph_stats"] = {}
            else:
                continue
            # Copy pool stats data; reset per run so samples from
            # previous runs do not leak into this one
            _stats = {}
            for _e, _d in data["ceph"].items():
                _stats[_e] = _d["pgmap"]
            # Maximums
            mrb_t, mrb = _get_max_value("read_bytes_sec", _stats)
            mwb_t, mwb = _get_max_value("write_bytes_sec", _stats)
            mri_t, mri = _get_max_value("read_op_per_sec", _stats)
            mwi_t, mwi = _get_max_value("write_op_per_sec", _stats)
            # Replace ceph with shorter data
            data["ceph"] = {
                "max_rbl": _axis_vals(mrb),
                "max_rbl_time": mrb_t,
                "max_wbl": _axis_vals(mwb),
                "max_wbl_time": mwb_t,
                "max_ril": _axis_vals(mri),
                "max_ril_time": mri_t,
                "max_wil": _axis_vals(mwi),
                "max_wil_time": mwi_t,
                "stats": _stats
            }
            # Calculate %% values for barchart
            for _e, _d in data["ceph"]["stats"].items():
                _d["read_bytes_sec_perc"] = \
                    _perc(_d.get("read_bytes_sec", 0), mrb)
                _d["write_bytes_sec_perc"] = \
                    _perc(_d.get("write_bytes_sec", 0), mwb)
                _d["read_op_per_sec_perc"] = \
                    _perc(_d.get("read_op_per_sec", 0), mri)
                _d["write_op_per_sec_perc"] = \
                    _perc(_d.get("write_op_per_sec", 0), mwi)
        return

    def osd_df_compare(self, _time):
        def _get_osd(osd_id, nodes):
            for osd in nodes:
                if osd["id"] == osd_id:
                    return osd
            return None

        logger_cli.info("# Comparing OSD stats")
        _osd = {}
        if _time not in self.results:
            logger_cli.warning(
                "WARNING: {} not found in results. Check data".format(_time)
            )
            return
        data = self.results[_time]
        # Save summary
        data["osd_summary"] = {}
        data["osd_summary"]["before"] = data["osd_df_before"]["summary"]
        data["osd_summary"]["after"] = data["osd_df_after"]["summary"]
        data["osd_summary"]["active"] = {
            "status": "",
            "device_class": "",
            "pgs": "",
            "kb_used": 0,
            "kb_used_data": 0,
            "kb_used_omap": 0,
            "kb_used_meta": 0,
            "utilization": 0,
            "var_down": 0,
            "var_up": 0
        }
        # Compare OSD counts
        osds_before = len(data["osd_df_before"]["nodes"])
        osds_after = len(data["osd_df_after"]["nodes"])
        if osds_before != osds_after:
            logger_cli.warning(
                "WARNING: Before/After bench OSD "
                "count mismatch for '{}'".format(_time)
            )
        # iterate osds from before
        _pgs = 0
        _classes = set()
        _nodes_up = 0
        for idx in range(osds_before):
            _osd_b = data["osd_df_before"]["nodes"][idx]
            # search for the same osd in after
            _osd_a = _get_osd(_osd_b["id"], data["osd_df_after"]["nodes"])
            # Save data to the new place
            _osd[_osd_b["name"]] = {}
            _osd[_osd_b["name"]]["before"] = _osd_b
            if not _osd_a:
                # If this happens, the Ceph cluster is actually broken
                logger_cli.warning(
                    "WARNING: Wow! {} disappeared".format(_osd_b["name"])
                )
                _osd[_osd_b["name"]]["after"] = {}
                # no "after" data to compare against, skip this OSD
                continue
            _osd[_osd_b["name"]]["after"] = _osd_a
            _osd[_osd_b["name"]]["percent"] = {}
            # Calculate summary using "after" data
            _pgs += _osd_a["pgs"]
            _classes.update([_osd_a["device_class"]])
            if _osd_a["status"] == "up":
                _nodes_up += 1
            # compare
            _keys_b = list(_osd_b.keys())
            _keys_a = list(_osd_a.keys())
            # To be safe, detect if some keys are different
            # ...and log it.
            _diff = set(_keys_b).symmetric_difference(_keys_a)
            if len(_diff) > 0:
                # This should never happen, actually
                logger_cli.warning(
                    "WARNING: Before/after keys mismatch "
                    "for OSD node {}: {}".format(idx, ", ".join(_diff))
                )
                continue
            # Compare each key and calculate how it changed
            for k in _keys_b:
                if _osd_b[k] != _osd_a[k]:
                    # Announce change
                    logger_cli.debug(
                        "-> {:4}: {}, {} -> {}".format(
                            idx,
                            k,
                            _osd_b[k],
                            _osd_a[k]
                        )
                    )
                    # calculate percent for numeric, non-zero values only
                    # (guards against strings and division by zero)
                    _change_perc = None
                    if isinstance(_osd_b[k], (int, float)) and _osd_b[k]:
                        _change_perc = (_osd_a[k] / _osd_b[k]) * 100 - 100
                        _osd[_osd_b["name"]]["percent"][k] = _change_perc

                    # Increase counters
                    _p = data["osd_summary"]["active"]

                    if k not in _p:
                        _p[k] = 1
                    else:
                        _p[k] += 1
                    if k == "var" and _change_perc is not None:
                        if _change_perc > 0:
                            _p["var_up"] += 1
                        elif _change_perc < 0:
                            _p["var_down"] += 1
        # Save sorted data
        data["osds"] = _osd
        logger_cli.info("-> Removing redundant osd before/after data")
        data.pop("osd_df_before")
        data.pop("osd_df_after")
        # Save summary
        data["osd_summary"]["active"]["status"] = "{}".format(_nodes_up)
        data["osd_summary"]["active"]["device_class"] = \
            "{}".format(len(list(_classes)))
        data["osd_summary"]["active"]["pgs"] = _pgs
        return

    # Create report
    def create_report(self, filename):
        """
        Create a static HTML report with the collected benchmark
        and Ceph info

        :return: none
        """
        logger_cli.info("### Generating report to '{}'".format(filename))
        _report = reporter.ReportToFile(
            reporter.HTMLCephBench(self),
            filename
        )
        _report(
            {
                "results": self.results,
                "idle_status": self.ceph_idle_status,
                "health_detail": self.health_detail,
                "ceph_df": self.ceph_df,
                "ceph_pg_dump": self.ceph_pg_dump,
                "info": self.ceph_info.ceph_info,
                "cluster": self.ceph_info.cluster_info,
                "ceph_version": self.ceph_info.ceph_version,
                "nodes": self.agent_pods
            }
        )
        logger_cli.info("-> Done")

        return