import csv
import os
import json

from copy import deepcopy
from datetime import datetime, timedelta, timezone
from time import sleep

from cfg_checker.common import logger_cli
from cfg_checker.common.decorators import retry
from cfg_checker.common.file_utils import write_str_to_file
from cfg_checker.helpers.console_utils import Progress
from cfg_checker.reports import reporter
# from cfg_checker.common.exception import InvalidReturnException
# from cfg_checker.common.exception import ConfigException
# from cfg_checker.common.exception import KubeException

from cfg_checker.nodes import KubeNodes
from cfg_checker.agent.fio_runner import _get_seconds, _datetime_fmt

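# Timestamp format embedded in result dump filenames,
# e.g. "12232021132605+0000" (parsed back in preload_results())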
_file_datetime_fmt = "%m%d%Y%H%M%S%z"


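# Strip separators from a timestamp string to make it filename-safe,
# e.g. "12/23/2021, 13:26:05+0000" -> "122320211326050000"
# (example assumes _datetime_fmt resembles "%m/%d/%Y, %H:%M:%S%z")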
def _reformat_timestr(_str, _chars=["/", ",", " ", ":", "+"], _tchar=""):
    _new = ""
    for _c in _str:
        _new += _c if _c not in _chars else _tchar
    return _new


def _parse_json_output(buffer):
    try:
        return json.loads(buffer)
    except (TypeError, json.decoder.JSONDecodeError) as e:
        logger_cli.error(
            "ERROR: Status not decoded: {}\n{}".format(e, buffer)
        )
    return {}


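# Split a volume size string into its number and unit parts,
# e.g. _split_vol_size("10G") -> (10, "G"); the leading "0" in
# _s_int keeps int() safe when the string has no digits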
def _split_vol_size(size):
    # I know, but it is faster than regex
    _numbers = [48, 49, 50, 51, 52, 53, 54, 55, 56, 57]
    _s_int = "0"
    _s_type = ""
    for ch in size:
        if ord(ch) in _numbers:
            _s_int += ch
        else:
            _s_type += ch
    return int(_s_int), _s_type


class CephBench(object):
    _agent_template = "cfgagent-template.yaml"

    def __init__(self, config):
        self.env_config = config
        return


class SaltCephBench(CephBench):
    def __init__(
        self,
        config
    ):
        logger_cli.error("ERROR: Not implemented for Salt environment!")

        # self.master = SaltNodes(config)
        super(SaltCephBench, self).__init__(config)
        return


class KubeCephBench(CephBench):
    def __init__(self, config):
        self.agent_count = config.bench_agent_count
        self.master = KubeNodes(config)
        super(KubeCephBench, self).__init__(config)

        self.mode = config.bench_mode
        self.resource_prefix = config.resource_prefix

        if config.bench_mode == "tasks":
            self.taskfile = config.bench_task_file
            self.load_tasks(self.taskfile)

        if config.bench_mode == "cleanup":
            self.cleanup_list = []
            return

        self.bench_name = config.bench_name
        self.results_dump_path = config.bench_results_dump_path
        self.results = {}
        self.agent_results = {}
        self.cleanup_list = []
        self.agent_pods = []

        if config.bench_mode == "report":
            self.results = {}
            return

        self.storage_class = config.bench_storage_class
        self.services = []
        # By default, 30 seconds should be enough
        # to send tasks to 3-5 agents
        self.scheduled_delay = 30

    def set_ceph_info_class(self, ceph_info):
        self.ceph_info = ceph_info

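    # Taskfile is a headerless CSV with one fio task per line:
    # readwrite,rwmixread,bs,iodepth,size
    # e.g. "randrw,75,4k,64,3G" (illustrative values only)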
    def load_tasks(self, taskfile):
        # Load csv file
        logger_cli.info("-> Loading taskfile '{}'".format(taskfile))
        self.tasks = []
        with open(taskfile) as f:
            _reader = csv.reader(f, delimiter=',')
            # load tasks
            for row in _reader:
                self.tasks.append({
                    "readwrite": row[0],
                    "rwmixread": row[1],
                    "bs": row[2],
                    "iodepth": row[3],
                    "size": row[4]
                })
        logger_cli.info("-> Loaded {} tasks".format(len(self.tasks)))

    def add_for_deletion(self, obj, typ):
        self.cleanup_list.append(
            [
                typ,
                obj.metadata.namespace,
                obj.metadata.name
            ]
        )
        return

    def prepare_cleanup(self):
        # Assume the number of resources is not given;
        # list all svc, pod, pvc, pv and identify 'cfgagent-xx' ones
        _types = ["pv", "pvc", "pod", "svc"]
        _prefix = self.resource_prefix
        for _typ in _types:
            _list = self.master.list_resource_names_by_type_and_ns(_typ)
            for ns, name in _list:
                if name.startswith(_prefix):
                    if ns:
                        _msg = "{} {}/{}".format(_typ, ns, name)
                    else:
                        _msg = "{} {}".format(_typ, name)
                    logger_cli.info("-> Found {}".format(_msg))
                    self.cleanup_list.append([_typ, ns, name])
        return

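    # Creates a pod, PVC and service per agent. The volume is oversized
    # by ~30% so the fio test file fits, e.g. size "10G" -> volume "13Gi"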
    def prepare_agents(self, options):
        logger_cli.info("# Preparing {} agents".format(self.agent_count))
        # Increase volume size a bit, so the datafile fits
        _quantizer = 1.3
        _v_size, _vol_size_units = _split_vol_size(options['size'])
        _v_size = round(_v_size * _quantizer)
        _vol_size = str(_v_size) + _vol_size_units + "i"
        logger_cli.info(
            "-> Testfile size: {0}, Volume size: {1} ({0}*{2})".format(
                options['size'],
                _vol_size,
                _quantizer
            )
        )
        # Start preparing
        for idx in range(self.agent_count):
            # create pvc/pv and pod
            logger_cli.info("-> creating agent '{:02}'".format(idx))
            # _agent, _pv, _pvc = self.master.prepare_benchmark_agent(
            _agent, _pvc = self.master.prepare_benchmark_agent(
                idx,
                os.path.split(options["filename"])[0],
                self.storage_class,
                _vol_size,
                self._agent_template
            )
            # save it to lists
            self.agent_pods.append(_agent)
            # self.add_for_deletion(_pv, "pv")
            self.add_for_deletion(_pvc, "pvc")
            self.add_for_deletion(_agent, "pod")

            # expose it
            _svc = self.master.expose_benchmark_agent(_agent)
            self.add_for_deletion(_svc, "svc")
            # Save service
            self.services.append(_svc)
            # prepopulate results
            self.agent_results[_agent.metadata.name] = {}
            self.agent_results[_agent.metadata.name]["url"] = \
                "http://{}:{}/api/".format(
                    _svc.spec.cluster_ip,
                    8765
                )
            self.agent_results[_agent.metadata.name]["storage_class"] = \
                self.storage_class
            self.agent_results[_agent.metadata.name]["volume_size"] = \
                options['size']

        logger_cli.info("-> Done creating agents")
        # TODO: Update after implementing pooled task sending
        self.scheduled_delay = self.agent_count * 10
        logger_cli.info(
            "-> Schedule delay set to {} sec".format(self.scheduled_delay)
        )
        return

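    # Calls an agent's REST API by exec'ing curl inside the first agent
    # pod; a POST with a body effectively runs:
    #   curl -s -H 'Content-Type: application/json' -X POST <url> -d @/tmp/data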
    def _poke_agent(self, url, body, action="GET"):
        _datafile = "/tmp/data"
        _data = [
            "-d",
            "@" + _datafile
        ]
        _cmd = [
            "curl",
            "-s",
            "-H",
            "'Content-Type: application/json'",
            "-X",
            action,
            url
        ]
        if body:
            _cmd += _data
            _ret = self.master.prepare_json_in_pod(
                self.agent_pods[0].metadata.name,
                self.master._namespace,
                body,
                _datafile
            )
        _ret = self.master.exec_cmd_on_target_pod(
            self.agent_pods[0].metadata.name,
            self.master._namespace,
            " ".join(_cmd)
        )
        return _parse_json_output(_ret)

    def _ensure_agents_ready(self):
        # make sure agents are idle
        _status_set = []
        _ready_set = []
        for _agent, _d in self.get_agents_status().items():
            # obviously, there should be some answer
            if _d is None:
                logger_cli.error("ERROR: Agent status not available")
                return False
            # status should be idle or finished
            if _d['status'] not in ["idle", "finished"]:
                logger_cli.error(
                    "Agent status invalid {}:{}".format(_agent, _d['status'])
                )
                _status_set += [False]
            else:
                # Good agent
                _status_set += [True]
            # agent's fio shell should be in 'ready'
            if not _d["healthcheck"]["ready"]:
                logger_cli.error("Agent is not ready {}".format(_agent))
                _ready_set += [False]
            else:
                # 'fio' shell for agent is ready
                _ready_set += [True]
        # all agent statuses should be True
        # and all 'fio' shell modules should be 'ready'
        if not all(_status_set) or not all(_ready_set):
            # At least one is not ready and it was already logged above
            return False
        else:
            # All is good
            return True

    def get_agents_status(self):
        _status = {}
        _results = self.master.exec_on_labeled_pods_and_ns(
            "app=cfgagent",
            "curl -s http://localhost:8765/api/fio"
        )
        for _agent, _result in _results.items():
            _j = _parse_json_output(_result)
            _status[_agent] = _j
        return _status

    @retry(Exception, initial_wait=5)
    def get_agents_resultlist(self):
        _t = {"module": "fio", "action": "get_resultlist"}
        _status = {}
        for _agent, _d in self.agent_results.items():
            _status[_agent] = self._poke_agent(_d["url"], _t, action="POST")
        return _status

    @retry(Exception, initial_wait=5)
    def get_result_from_agent(self, agent, time):
        _t = {
            "module": "fio",
            "action": "get_result",
            "options": {
                "time": time
            }
        }
        return self._poke_agent(
            self.agent_results[agent]["url"],
            _t,
            action="POST"
        )

    def _get_next_scheduled_time(self):
        _now = datetime.now(timezone.utc)
        logger_cli.info("-> time is '{}'".format(_now.strftime(_datetime_fmt)))
        self.next_scheduled_time = _now + timedelta(
            seconds=self.scheduled_delay
        )
        _str_time = self.next_scheduled_time.strftime(_datetime_fmt)
        logger_cli.info(
            "-> next benchmark scheduled to '{}'".format(_str_time)
        )
        return _str_time

    def _send_scheduled_task(self, options):
        _task = {
            "module": "fio",
            "action": "do_scheduledrun",
            "options": options
        }
        for _agent, _d in self.agent_results.items():
            logger_cli.info(
                "-> sending task to '{}:{}'".format(_agent, _d["url"])
            )
            _ret = self._poke_agent(_d["url"], _task, action="POST")
            if 'error' in _ret:
                logger_cli.error(
                    "ERROR: Agent returned: '{}'".format(_ret['error'])
                )
                return False
        # No errors detected
        return True

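    # Polls agents until all of them report 'finished' or the timeout
    # hits; the timeout doubles the expected total time, e.g. for a 30s
    # schedule delay, 60s runtime and 5s ramp_time: (30+60+5)*2 = 190s
    # (illustrative numbers, actual values come from the task options)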
    def track_benchmark(self, options):
        _runtime = _get_seconds(options["runtime"])
        _ramptime = _get_seconds(options["ramp_time"])
        # Sum up all timings that we must wait and double it
        _timeout = (self.scheduled_delay + _runtime + _ramptime) * 2
        # We should have no more than 65 measurements
        _stats_delay = round((_runtime + _ramptime) / 65)
        _start = self.next_scheduled_time
        _end = datetime.now(timezone.utc) + timedelta(seconds=_timeout)
        while True:
            # Print status
            _sts = self.get_agents_status()
            diff = (_end - datetime.now(timezone.utc)).total_seconds()
            logger_cli.info("-> {:.2f}s left. Agent status:".format(diff))
            for _agent, _status in _sts.items():
                logger_cli.info(
                    "\t{}: {} ({}%)".format(
                        _agent,
                        _status["status"],
                        _status["progress"]
                    )
                )
            # Get Ceph status if _start time passed
            _elapsed = (datetime.now(timezone.utc) - _start).total_seconds()
            if _elapsed > _stats_delay:
                logger_cli.info("-> {:.2f}s elapsed".format(_elapsed))
                _sec = "{:0.1f}".format(_elapsed)
                self.results[options["scheduled_to"]]["ceph"][_sec] = \
                    self.ceph_info.get_cluster_status()
            # Check if agents finished
            finished = [True for _s in _sts.values()
                        if _s["status"] == 'finished']
            _fcnt = len(finished)
            _tcnt = len(_sts)
            if _fcnt < _tcnt:
                logger_cli.info("-> {}/{} finished".format(_fcnt, _tcnt))
            else:
                logger_cli.info("-> All agents finished run")
                return True
            # recalc how much time is left
            diff = (_end - datetime.now(timezone.utc)).total_seconds()
            # In case end_datetime was in the past to begin with
            if diff < 0:
                logger_cli.info("-> Timed out waiting for agents to finish")
                return False

    def _do_testrun(self, options):
        self.results[options["scheduled_to"]]["osd_df_before"] = \
            self.ceph_info.get_ceph_osd_df()
        # send single run to agents
        if not self._send_scheduled_task(options):
            return False
        # Track this benchmark progress
        if not self.track_benchmark(options):
            return False
        else:
            logger_cli.info("-> Finished testrun. Collecting results...")
            # get ceph osd stats
            self.results[options["scheduled_to"]]["osd_df_after"] = \
                self.ceph_info.get_ceph_osd_df()
            # Get results for each agent
            self.collect_results()
            logger_cli.info("-> Calculating totals and averages")
            self.calculate_totals()
            self.calculate_ceph_stats()
            self.osd_df_compare(options["scheduled_to"])
            logger_cli.info("-> Dumping results")
            for _time, _d in self.results.items():
                self.dump_result(
                    self._get_dump_filename(_time),
                    _d
                )
            return True

    def wait_ceph_cooldown(self):
        # TODO: Query Ceph once every 20 sec to make sure its load dropped

        # get ceph idle status
        self.ceph_idle_status = self.ceph_info.get_cluster_status()
        self.health_detail = self.ceph_info.get_health_detail()
        self.ceph_df = self.ceph_info.get_ceph_df()
        self.ceph_pg_dump = self.ceph_info.get_ceph_pg_dump()
        return

    def run_benchmark(self, options):
        logger_cli.info("# Starting '{}' benchmark".format(self.mode))
        # Check agent readiness
        logger_cli.info("# Checking agents")
        if not self._ensure_agents_ready():
            return False

        # Make sure that Ceph is at low load
        # TODO: Ceph status check
        # self.wait_ceph_cooldown()

        # Do benchmark according to mode
        if self.mode == "tasks":
            logger_cli.info(
                "# Running benchmark with tasks from '{}'".format(
                    self.taskfile
                )
            )
            # take next task
            _total_tasks = len(self.tasks)
            for idx in range(_total_tasks):
                # init time to schedule
                _task = self.tasks[idx]
                _r = self.results
                logger_cli.info(
                    "-> Starting next task ({}/{})".format(idx+1, _total_tasks)
                )
                logger_cli.info(
                    "-> Updating options with: {}".format(
                        ", ".join(
                            ["{} = {}".format(k, v) for k, v in _task.items()]
                        )
                    )
                )
                # update options
                options.update(_task)
                # Check if such result already exists
                o = "input_options"
                _existing = filter(
                    lambda t:
                        _r[t]["id"] == idx and
                        _r[t].get("mode") == "tasks" and
                        _r[t][o]["readwrite"] == options["readwrite"] and
                        _r[t][o]["rwmixread"] == options["rwmixread"] and
                        _r[t][o]["bs"] == options["bs"] and
                        _r[t][o]["iodepth"] == options["iodepth"] and
                        _r[t][o]["size"] == options["size"],
                    _r
                )
                if len(list(_existing)) > 0:
                    logger_cli.info(
                        "-> Skipped already performed task from {}: "
                        "line {}, {}({}), {}, {}, {}".format(
                            self.taskfile,
                            idx,
                            options["readwrite"],
                            options["rwmixread"],
                            options["bs"],
                            options["iodepth"],
                            options["size"]
                        )
                    )
                    continue
                _sch_time = self._get_next_scheduled_time()
                options["scheduled_to"] = _sch_time
                # init results table
                _r[_sch_time] = {
                    "id": idx,
                    "mode": self.mode,
                    "input_options": deepcopy(options),
                    "agents": {},
                    "ceph": {}
                }
                # exit on error
                if not self._do_testrun(options):
                    return False
                # Save ceph osd stats and wait for cooldown
                self.wait_ceph_cooldown()
        elif self.mode == "single":
            logger_cli.info("# Running single benchmark")
            # init time to schedule
            _sch_time = self._get_next_scheduled_time()
            options["scheduled_to"] = _sch_time
            # init results table
            self.results[_sch_time] = {
                "id": "{:2}".format(0),
                "mode": self.mode,
                "input_options": options,
                "agents": {},
                "ceph": {}
            }
            if not self._do_testrun(options):
                return False
        else:
            logger_cli.error("ERROR: Unknown mode '{}'".format(self.mode))
            return False

        # Normal exit
        logger_cli.info("# All benchmark tasks done")
        return True

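    # Deletion happens in reverse order of creation, so services and
    # pods are removed before the PVCs they depend on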
    def cleanup(self):
        logger_cli.info("# Cleaning up")
        self.cleanup_list.reverse()

        for _res in self.cleanup_list:
            self.master.cleanup_resource_by_name(_res[0], _res[2], ns=_res[1])

        # Wait for resources to be cleaned
        _timeout = 120
        _total = len(self.cleanup_list)
        logger_cli.info("-> Wait until {} resources cleaned".format(_total))
        _p = Progress(_total)
        while True:
            _g = self.master.get_resource_phase_by_name
            _l = [_g(r[0], r[2], ns=r[1]) for r in self.cleanup_list]
            _l = [item for item in _l if item]
            _idx = _total - len(_l)
            if len(_l) > 0:
                _p.write_progress(_idx)
            else:
                _p.write_progress(_idx)
                _p.end()
                logger_cli.info("# Done cleaning up")
                break
            if _timeout > 0:
                _timeout -= 1
                # pause between API polls (assumed 1s per iteration,
                # so the 120-iteration limit roughly matches the
                # "120s" message below)
                sleep(1)
            else:
                _p.end()
                logger_cli.info("# Timed out waiting after 120s.")
                break

        return

    def collect_results(self):
        logger_cli.info("# Collecting results")
        # query agents for results
        _agents = self.get_agents_resultlist()
        for _agent, _l in _agents.items():
            # Check if we already have these results locally
            for _time in _l["resultlist"]:
                # Check if we need to load this result
                if _time not in self.results:
                    # Some older results found;
                    # do not process them
                    logger_cli.info(
                        "-> Skipped old results for '{}'".format(_time)
                    )
                    continue
                elif _agent not in self.results[_time]["agents"]:
                    # Load result and add it locally
                    logger_cli.info(
                        "-> Getting results for '{}' from '{}'".format(
                            _time,
                            _agent
                        )
                    )
                    _r = self.get_result_from_agent(_agent, _time)
                    self.results[_time]["agents"][_agent] = _r[_time]
                else:
                    # Should never happen, actually
                    logger_cli.info(
                        "-> Skipped loaded result for '{}' from '{}'".format(
                            _time,
                            _agent
                        )
                    )

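    # Builds the dump path; with illustrative values the result is e.g.
    #   <dump_path>/<name>/122320211326050000-03-randrw-4k-64.json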
    def _get_dump_filename(self, _time):
        _r = self.results[_time]
        _dirname = _r["input_options"]["name"]
        _filename = "-".join([
            _reformat_timestr(_time),
            "{:02}".format(len(_r["agents"])),
            _r["input_options"]["readwrite"],
            _r["input_options"]["bs"],
            str(_r["input_options"]["iodepth"]),
        ]) + ".json"
        return os.path.join(
            self.results_dump_path,
            _dirname,
            _filename
        )

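    # Rebuilds self.results from earlier dumps. The timestamp is taken
    # from the first dash-separated token of the filename, e.g.
    # "122320211326050000" -> "12232021132605+0000" -> _datetime_fmt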
    def preload_results(self):
        logger_cli.info(
            "# Preloading results for '{}'".format(self.bench_name)
        )
        # get all dirs in folder
        _p = self.results_dump_path
        if not os.path.isdir(_p):
            logger_cli.warning(
                "WARNING: Dump path is not a folder '{}'".format(_p)
            )
            return
        for path, dirs, files in os.walk(_p):
            if path == os.path.join(_p, self.bench_name):
                logger_cli.info("-> Folder found '{}'".format(path))
                for _fname in files:
                    logger_cli.debug("... processing '{}'".format(_fname))
                    _ext = _fname.split('.')[-1]
                    if _ext != "json":
                        logger_cli.info(
                            "-> Extension invalid '{}', "
                            "'json' is expected".format(_ext)
                        )
                        continue
                    # get time from filename
                    # Ugly, but works
                    _t = _fname.split('-')[0]
                    _str_time = _t[:14] + "+" + _t[14:]
                    _t = datetime.strptime(_str_time, _file_datetime_fmt)
                    _time = _t.strftime(_datetime_fmt)
                    self.results[_time] = self.load_dumped_result(
                        os.path.join(path, _fname)
                    )
                    logger_cli.info(
                        "-> Loaded '{}' as '{}'".format(
                            _fname,
                            _time
                        )
                    )

    def dump_result(self, filename, data):
        # Dumps the given result data as json to the given path,
        # overwriting if needed
        _folder, _file = os.path.split(filename)
        # Do dump
        if not os.path.exists(_folder):
            os.mkdir(_folder)
            logger_cli.info("-> Created folder '{}'".format(_folder))
        # Dump agent data for this test run
        write_str_to_file(filename, json.dumps(data, indent=2))
        logger_cli.info("-> Dumped '{}'".format(filename))
        return

    def load_dumped_result(self, filename):
        try:
            with open(filename, "rt+") as f:
                return json.loads(f.read())
        except FileNotFoundError as e:
            logger_cli.error(
                "ERROR: {}".format(e)
            )
        except TypeError as e:
            logger_cli.error(
                "ERROR: Invalid file ({}): {}".format(filename, e)
            )
        except json.decoder.JSONDecodeError as e:
            logger_cli.error(
                "ERROR: Failed to decode json ({}): {}".format(filename, e)
            )
        return None

    def _lookup_storage_class_id_by_name(self, storage_class_name):
        # Assume that self has proper data
        for _pool in self.ceph_df["pools"]:
            if storage_class_name == _pool["name"]:
                return _pool["id"]
        return None

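    # Bandwidth and iops are summed across agents; latencies are
    # averaged and converted ns -> us, e.g. mean of 2e6 and 4e6 ns
    # gives 3e6 / 1000 = 3000 us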
    def calculate_totals(self):
        # Calculate totals for Read and Write
        for _time, data in self.results.items():
            if "totals" not in data:
                data["totals"] = {}
            else:
                continue
            _totals = data["totals"]
            _r_bw = 0
            _r_avglat = []
            _r_iops = 0
            _w_bw = 0
            _w_avglat = []
            _w_iops = 0
            for _a, _d in data["agents"].items():
                # Hardcoded number of jobs param :(
                _j = _d["jobs"][0]
                _r_bw += _j["read"]["bw_bytes"]
                _r_avglat += [_j["read"]["lat_ns"]["mean"]]
                _r_iops += _j["read"]["iops"]
                _w_bw += _j["write"]["bw_bytes"]
                _w_avglat += [_j["write"]["lat_ns"]["mean"]]
                _w_iops += _j["write"]["iops"]
                # Save storage class name
                if "storage_class" not in _totals:
                    _totals["storage_class"] = \
                        self.agent_results[_a]["storage_class"]
                    # Lookup storage class id and num_pg
                    _totals["storage_class_stats"] = \
                        reporter.get_pool_stats_by_id(
                            self._lookup_storage_class_id_by_name(
                                self.agent_results[_a]["storage_class"]
                            ),
                            self.ceph_pg_dump
                        )

            _totals["read_bw_bytes"] = _r_bw
            _totals["read_avg_lat_us"] = \
                (sum(_r_avglat) / len(_r_avglat)) / 1000
            _totals["read_iops"] = _r_iops
            _totals["write_bw_bytes"] = _w_bw
            _totals["write_avg_lat_us"] = \
                (sum(_w_avglat) / len(_w_avglat)) / 1000
            _totals["write_iops"] = _w_iops

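    # Squashes the raw per-poll Ceph status into pgmap stats, keeping
    # peak read/write values plus precomputed axis marks for the report
    # barcharts, e.g. _axis_vals(100) -> [100, 110, 75, 50, 15]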
    def calculate_ceph_stats(self):
        # helper to find the peak value of a metric and when it occurred
        def _get_max_value(key, stats):
            _max_time = 0
            _value = 0
            for _k, _v in stats.items():
                if key in _v and _value < _v[key]:
                    _max_time = _k
                    _value = _v[key]
            return _max_time, _value

        def _perc(n, m):
            if not n or not m:
                return 0
            else:
                return "{:.0f}%".format((n / m) * 100)

        def _axis_vals(val):
            return [
                val, int(val*1.1), int(val*0.75), int(val*0.50), int(val*0.15)
            ]

        _stats = {}
        for _time, data in self.results.items():
            if "ceph" not in data:
                logger_cli.warning(
                    "WARNING: Ceph stats raw data not found in results"
                )
                continue
            if "ceph_stats" not in data:
                data["ceph_stats"] = {}
            else:
                continue
            # Copy pool stats data
            for _e, _d in data["ceph"].items():
                _stats[_e] = _d["pgmap"]
            # Maximums
            mrb_t, mrb = _get_max_value("read_bytes_sec", _stats)
            mwb_t, mwb = _get_max_value("write_bytes_sec", _stats)
            mri_t, mri = _get_max_value("read_op_per_sec", _stats)
            mwi_t, mwi = _get_max_value("write_op_per_sec", _stats)
            # Replace ceph with shorter data
            data["ceph"] = {
                "max_rbl": _axis_vals(mrb),
                "max_rbl_time": mrb_t,
                "max_wbl": _axis_vals(mwb),
                "max_wbl_time": mwb_t,
                "max_ril": _axis_vals(mri),
                "max_ril_time": mri_t,
                "max_wil": _axis_vals(mwi),
                "max_wil_time": mwi_t,
                "stats": _stats
            }
            # Calculate % values for barchart
            for _e, _d in data["ceph"]["stats"].items():
                _d["read_bytes_sec_perc"] = \
                    _perc(_d.get("read_bytes_sec", 0), mrb)
                _d["write_bytes_sec_perc"] = \
                    _perc(_d.get("write_bytes_sec", 0), mwb)
                _d["read_op_per_sec_perc"] = \
                    _perc(_d.get("read_op_per_sec", 0), mri)
                _d["write_op_per_sec_perc"] = \
                    _perc(_d.get("write_op_per_sec", 0), mwi)
        return

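    # Per-OSD change for each metric is computed as
    # (after / before) * 100 - 100,
    # e.g. 120 KiB used after vs 100 KiB before -> +20%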
    def osd_df_compare(self, _time):
        def _get_osd(osd_id, nodes):
            for osd in nodes:
                if osd["id"] == osd_id:
                    return osd
            return None

        logger_cli.info("# Comparing OSD stats")
        _osd = {}
        if _time not in self.results:
            logger_cli.warning(
                "WARNING: {} not found in results. Check data".format(_time)
            )
            return
        data = self.results[_time]
        # Save summary
        data["osd_summary"] = {}
        data["osd_summary"]["before"] = data["osd_df_before"]["summary"]
        data["osd_summary"]["after"] = data["osd_df_after"]["summary"]
        data["osd_summary"]["active"] = {
            "status": "",
            "device_class": "",
            "pgs": "",
            "kb_used": 0,
            "kb_used_data": 0,
            "kb_used_omap": 0,
            "kb_used_meta": 0,
            "utilization": 0,
            "var_down": 0,
            "var_up": 0
        }
        # Compare OSD counts
        osds_before = len(data["osd_df_before"]["nodes"])
        osds_after = len(data["osd_df_after"]["nodes"])
        if osds_before != osds_after:
            logger_cli.warning(
                "WARNING: Before/After bench OSD "
                "count mismatch for '{}'".format(_time)
            )
        # iterate osds from before
        _pgs = 0
        _classes = set()
        _nodes_up = 0
        for idx in range(osds_before):
            _osd_b = data["osd_df_before"]["nodes"][idx]
            # search for the same osd in after
            _osd_a = _get_osd(_osd_b["id"], data["osd_df_after"]["nodes"])
            # Save data to the new place
            _osd[_osd_b["name"]] = {}
            _osd[_osd_b["name"]]["before"] = _osd_b
            if not _osd_a:
                # If this happens, the Ceph cluster is actually broken
                logger_cli.warning(
                    "WARNING: Wow! {} disappeared".format(_osd_b["name"])
                )
                _osd[_osd_b["name"]]["after"] = {}
                continue
            _osd[_osd_b["name"]]["after"] = _osd_a
            _osd[_osd_b["name"]]["percent"] = {}
            # Calculate summary using "after" data
            _pgs += _osd_a["pgs"]
            _classes.update([_osd_a["device_class"]])
            if _osd_a["status"] == "up":
                _nodes_up += 1
            # compare
            _keys_b = list(_osd_b.keys())
            _keys_a = list(_osd_a.keys())
            # To be safe, detect if some keys are different
            # ...and log it.
            _diff = set(_keys_b).symmetric_difference(_keys_a)
            if len(_diff) > 0:
                # This should never happen, actually
                logger_cli.warning(
                    "WARNING: Before/after keys mismatch "
                    "for OSD node {}: {}".format(idx, ", ".join(_diff))
                )
                continue
            # Compare each key and calculate how it changed
            for k in _keys_b:
                if _osd_b[k] != _osd_a[k]:
                    # Announce change
                    logger_cli.debug(
                        "-> {:4}: {}, {} -> {}".format(
                            idx,
                            k,
                            _osd_b[k],
                            _osd_a[k]
                        )
                    )
                    # calculate percent; skip non-numeric and zero
                    # 'before' values to avoid division errors
                    if not isinstance(_osd_b[k], (int, float)) or \
                            not _osd_b[k]:
                        continue
                    _change_perc = (_osd_a[k] / _osd_b[k]) * 100 - 100
                    _osd[_osd_b["name"]]["percent"][k] = _change_perc

                    # Increase counters
                    _p = data["osd_summary"]["active"]

                    if k not in _p:
                        _p[k] = 1
                    else:
                        _p[k] += 1
                    if k == "var":
                        if _change_perc > 0:
                            _p["var_up"] += 1
                        elif _change_perc < 0:
                            _p["var_down"] += 1
        # Save sorted data
        data["osds"] = _osd
        logger_cli.info("-> Removing redundant osd before/after data")
        data.pop("osd_df_before")
        data.pop("osd_df_after")
        # Save summary
        data["osd_summary"]["active"]["status"] = "{}".format(_nodes_up)
        data["osd_summary"]["active"]["device_class"] = \
            "{}".format(len(list(_classes)))
        data["osd_summary"]["active"]["pgs"] = _pgs
        return

    # Create report
    def create_report(self, filename):
        """
        Create a static html report showing ceph benchmark results

        :return: none
        """
        logger_cli.info("### Generating report to '{}'".format(filename))
        _report = reporter.ReportToFile(
            reporter.HTMLCephBench(self),
            filename
        )
        _report(
            {
                "results": self.results,
                "idle_status": self.ceph_idle_status,
                "health_detail": self.health_detail,
                "ceph_df": self.ceph_df,
                "ceph_pg_dump": self.ceph_pg_dump,
                "info": self.ceph_info.ceph_info,
                "cluster": self.ceph_info.cluster_info,
                "ceph_version": self.ceph_info.ceph_version,
                "nodes": self.agent_pods
            }
        )
        logger_cli.info("-> Done")

        return