blob: 7640440e17bc523141bc9f26df0c3488f96b0175 [file] [log] [blame]
Alex5cace3b2021-11-10 16:40:37 -06001import csv
2import os
3import json
4
Alex3034ba52021-11-13 17:06:45 -06005from datetime import datetime, timedelta, timezone
Alex5cace3b2021-11-10 16:40:37 -06006from time import sleep
7
Alexdcb792f2021-10-04 14:24:21 -05008from cfg_checker.common import logger_cli
Alex3034ba52021-11-13 17:06:45 -06009from cfg_checker.common.decorators import retry
Alexbfa947c2021-11-11 18:14:28 -060010from cfg_checker.helpers.console_utils import Progress
Alexdcb792f2021-10-04 14:24:21 -050011# from cfg_checker.common.exception import InvalidReturnException
12# from cfg_checker.common.exception import ConfigException
13# from cfg_checker.common.exception import KubeException
14
15from cfg_checker.nodes import KubeNodes
Alex5cace3b2021-11-10 16:40:37 -060016from cfg_checker.agent.fio_runner import _get_seconds, _datetime_fmt
Alexdcb792f2021-10-04 14:24:21 -050017
18
class CephBench(object):
    """Base class for Ceph benchmark runners.

    Keeps the shared environment configuration and the name of the
    template used to render benchmark agent resources.
    """

    # template rendered when creating benchmark agent pods
    _agent_template = "cfgagent-template.yaml"

    def __init__(self, config):
        """Remember the environment configuration for subclasses."""
        self.env_config = config
25
Alexdcb792f2021-10-04 14:24:21 -050026
class SaltCephBench(CephBench):
    """Ceph benchmark runner for Salt-managed environments.

    Benchmarking is only supported on Kubernetes (see KubeCephBench);
    instantiating this class logs an error and falls through to the
    base class so env_config is still recorded.
    """
    def __init__(
        self,
        config
    ):
        # Fixed typo in user-facing message ("impelented")
        logger_cli.error("ERROR: Not implemented for Salt environment!")

        # self.master = SaltNodes(config)
        super(SaltCephBench, self).__init__(config)
        return
37
38
class KubeCephBench(CephBench):
    """Ceph benchmark runner for Kubernetes environments."""
    def __init__(self, config):
        # number of fio agent pods to spawn
        self.agent_count = config.bench_agent_count
        # kube helper used for all cluster operations
        self.master = KubeNodes(config)
        super(KubeCephBench, self).__init__(config)
        # storage class used for the agents' PV/PVC pairs
        self.storage_class = config.bench_storage_class
        self.agent_pods = []
        self.services = []
        # seconds between "now" and the scheduled benchmark start time
        self.scheduled_delay = 30
        # benchmark mode: "tasks" (csv task list) or "single"
        self.mode = config.bench_mode
        if config.bench_mode == "tasks":
            # NOTE: self.taskfile is only set in "tasks" mode;
            # run_benchmark only reads it in that mode
            self.taskfile = config.bench_task_file
            self.load_tasks(self.taskfile)

        # created k8s resources to delete on cleanup(), in creation order
        self.cleanup_list = []
        # per-agent url/volume info and collected fio results, by pod name
        self.results = {}
Alex2a7657c2021-11-10 20:51:34 -060055
Alex5cace3b2021-11-10 16:40:37 -060056 def load_tasks(self, taskfile):
57 # Load csv file
58 logger_cli.info("-> Loading taskfile '{}'".format(taskfile))
59 self.tasks = []
60 with open(taskfile) as f:
61 _reader = csv.reader(f, delimiter=',')
62 # load packages
63 for row in _reader:
64 self.tasks.append({
65 "readwrite": row[0],
66 "rwmixread": row[1],
67 "bs": row[2],
68 "iodepth": row[3],
69 "size": row[4]
70 })
71
Alex2a7657c2021-11-10 20:51:34 -060072 def add_for_deletion(self, obj, typ):
73 _d = [typ, obj.metadata.namespace, obj.metadata.name]
74 self.cleanup_list.append(_d)
75 return
76
    def prepare_agents(self, options):
        """Create, expose and register one benchmark agent pod per agent.

        For each of self.agent_count agents: creates the PV/PVC pair and
        the agent pod, registers all of them for cleanup, exposes the pod
        through a service, and pre-populates self.results with the
        agent's API url, storage class and volume size.

        options: benchmark options dict; only "filename" (its directory
        part becomes the volume mount path) and "size" are used here.
        """
        logger_cli.info("# Preparing {} agents".format(self.agent_count))
        for idx in range(self.agent_count):
            # create pvc/pv and pod
            logger_cli.info("-> creating agent '{:02}'".format(idx))
            # 'i' suffix turns e.g. "2G" into the kubernetes
            # quantity "2Gi" — presumably what the template expects;
            # TODO confirm against cfgagent-template.yaml
            _agent, _pv, _pvc = self.master.prepare_benchmark_agent(
                idx,
                os.path.split(options["filename"])[0],
                self.storage_class,
                options['size'] + 'i',
                self._agent_template
            )
            # save it to lists
            self.agent_pods.append(_agent)
            self.add_for_deletion(_pv, "pv")
            self.add_for_deletion(_pvc, "pvc")
            self.add_for_deletion(_agent, "pod")

            # expose it
            _svc = self.master.expose_benchmark_agent(_agent)
            self.add_for_deletion(_svc, "svc")
            # Save service
            self.services.append(_svc)
            # prepopulate results; 8765 is the agent's REST port
            self.results[_agent.metadata.name] = {}
            self.results[_agent.metadata.name]["list"] = {}
            self.results[_agent.metadata.name]["url"] = \
                "http://{}:{}/api/".format(
                    _svc.spec.cluster_ip,
                    8765
                )
            self.results[_agent.metadata.name]["storage_class"] = \
                self.storage_class
            self.results[_agent.metadata.name]["volume_size"] = \
                options['size']

        logger_cli.info("-> Done creating agents")
        return
115
116 def _poke_agent(self, url, body, action="GET"):
117 _datafile = "/tmp/data"
118 _data = [
Alexbfa947c2021-11-11 18:14:28 -0600119 "-d",
120 "@" + _datafile
Alex5cace3b2021-11-10 16:40:37 -0600121 ]
122 _cmd = [
123 "curl",
124 "-s",
125 "-H",
126 "'Content-Type: application/json'",
127 "-X",
128 action,
129 url
130 ]
131 if body:
132 _cmd += _data
133 _ret = self.master.prepare_json_in_pod(
134 self.agent_pods[0].metadata.name,
135 self.master._namespace,
136 body,
137 _datafile
138 )
139 _ret = self.master.exec_cmd_on_target_pod(
140 self.agent_pods[0].metadata.name,
141 self.master._namespace,
142 " ".join(_cmd)
143 )
144 try:
145 return json.loads(_ret)
146 except TypeError as e:
147 logger_cli.error(
148 "ERROR: Status not decoded: {}\n{}".format(e, _ret)
149 )
150 except json.decoder.JSONDecodeError as e:
151 logger_cli.error(
152 "ERROR: Status not decoded: {}\n{}".format(e, _ret)
153 )
154
155 return None
156
Alex3034ba52021-11-13 17:06:45 -0600157 def _ensure_agents_ready(self):
Alex5cace3b2021-11-10 16:40:37 -0600158 # make sure agents idle
Alex3034ba52021-11-13 17:06:45 -0600159 _status_set = []
160 _ready_set = []
161 for _agent, _d in self.get_agents_status().items():
162 # obviously, there should be some answer
163 if _d is None:
Alex5cace3b2021-11-10 16:40:37 -0600164 logger_cli.error("ERROR: Agent status not available")
165 return False
Alex3034ba52021-11-13 17:06:45 -0600166 # status should be idle or finished
167 if _d['status'] not in ["idle", "finished"]:
168 logger_cli.error(
169 "Agent status invalid {}:{}".format(_agent, _d['status'])
170 )
171 _status_set += [False]
Alex5cace3b2021-11-10 16:40:37 -0600172 else:
Alex3034ba52021-11-13 17:06:45 -0600173 # Good agent
174 _status_set += [True]
175 # agent's fio shell should be in 'ready'
176 if not _d["healthcheck"]["ready"]:
177 logger_cli.error("Agent is not ready {}".format(_agent))
178 _ready_set += [False]
Alex5cace3b2021-11-10 16:40:37 -0600179 else:
Alex3034ba52021-11-13 17:06:45 -0600180 # 'fio' shell for agent is ready
181 _ready_set += [True]
182 # all agent's statuses should be True
183 # and all 'fio' shell modules should be 'ready'
184 if not any(_status_set) or not any(_ready_set):
185 # At least one is not ready and it was already logged above
186 return False
187 else:
188 # All is good
189 return True
190
191 def get_agents_status(self):
192 _status = {}
193 for _agent, _d in self.results.items():
194 _status[_agent] = self._poke_agent(_d["url"] + "fio", {})
195 return _status
196
197 def get_agents_resultlist(self):
198 _t = {"module": "fio", "action": "get_resultlist"}
199 _status = {}
200 for _agent, _d in self.results.items():
201 _status[_agent] = self._poke_agent(_d["url"], _t, action="POST")
202 return _status
203
    @retry(Exception)
    def get_result_from_agent(self, agent, time):
        """Fetch one fio result (keyed by its time label) from an agent.

        Retried on any exception by the @retry decorator, since the
        in-pod curl call can fail transiently.
        """
        _t = {
            "module": "fio",
            "action": "get_result",
            "options": {
                "time": time
            }
        }
        return self._poke_agent(self.results[agent]["url"], _t, action="POST")
214
215 def _get_next_scheduled_time(self):
216 _now = datetime.now(timezone.utc)
217 logger_cli.info("-> time is '{}'".format(_now.strftime(_datetime_fmt)))
218 _time = _now + timedelta(seconds=self.scheduled_delay)
219 _str_time = _time.strftime(_datetime_fmt)
220 logger_cli.info(
221 "-> next benchmark scheduled to '{}'".format(_str_time)
222 )
223 return _str_time
224
225 def _send_scheduled_task(self, options):
226 _task = {
227 "module": "fio",
228 "action": "do_scheduledrun",
229 "options": options
230 }
231 for _agent, _d in self.results.items():
232 logger_cli.info(
233 "-> sending task to '{}:{}'".format(_agent, _d["url"])
234 )
235 _ret = self._poke_agent(_d["url"], _task, action="POST")
236 if 'error' in _ret:
237 logger_cli.error(
238 "ERROR: Agent returned: '{}'".format(_ret['error'])
239 )
240 return False
241 # No errors detected
242 return True
243
244 def track_benchmark(self, options):
245 _runtime = _get_seconds(options["runtime"])
246 _ramptime = _get_seconds(options["ramp_time"])
247 # Sum up all timings that we must wait and double it
248 _timeout = (self.scheduled_delay + _runtime + _ramptime) * 2
249 _end = datetime.now(timezone.utc) + timedelta(seconds=_timeout)
250 while True:
251 # Print status
252 # TODO: do pooled status get
253 _sts = self.get_agents_status()
254 diff = (_end - datetime.now(timezone.utc)).total_seconds()
255 logger_cli.info("-> {:.2f}s left. Agent status:".format(diff))
256 for _agent, _status in _sts.items():
257 logger_cli.info(
258 "\t{}: {} ({}%)".format(
259 _agent,
260 _status["status"],
261 _status["progress"]
262 )
263 )
264 finished = [True for _s in _sts.values()
265 if _s["status"] == 'finished']
266 _fcnt = len(finished)
267 _tcnt = len(_sts)
268 if _fcnt < _tcnt:
269 logger_cli.info("-> {}/{} finished".format(_fcnt, _tcnt))
270 else:
271 logger_cli.info("-> All agents finished run")
272 return True
273 # recalc how much is left
274 diff = (_end - datetime.now(timezone.utc)).total_seconds()
275 # In case end_datetime was in past to begin with
276 if diff < 0:
277 logger_cli.info("-> Timed out waiting for agents to finish")
278 return False
279 logger_cli.info("-> Sleeping for {:.2f}s".format(diff/3))
280 sleep(diff/3)
281 if diff <= 0.1:
282 logger_cli.info("-> Timed out waiting for agents to finish")
283 return False
284
285 def _do_testrun(self, options):
286 # send single to agent
287 if not self._send_scheduled_task(options):
288 return False
289 # Track this benchmark progress
290 if not self.track_benchmark(options):
291 return False
292 else:
293 logger_cli.info("-> Finished testrun")
294 # Get results for each agent
295 self.collect_results()
296 return True
297
298 def _wait_ceph_cooldown(self):
299 # TODO: Query Ceph ince a 20 sec to make sure its load dropped
300
301 return
302
    def run_benchmark(self, options):
        """Entry point: run benchmark(s) according to self.mode.

        In "tasks" mode every task from the loaded taskfile is merged
        into *options* and run in turn (options is mutated in place);
        "single" mode runs once with *options* as given.

        Returns True on success, False when agents are not ready, any
        run fails, or the mode is unknown.
        """
        logger_cli.info("# Starting '{}' benchmark".format(self.mode))
        # Check agent readyness
        logger_cli.info("# Checking agents")
        if not self._ensure_agents_ready():
            return False

        # Make sure that Ceph is at low load
        # TODO: Ceph status check
        self._wait_ceph_cooldown()

        # Do benchmark according to mode
        if self.mode == "tasks":
            logger_cli.info(
                "# Running benchmark with tasks from '{}'".format(
                    self.taskfile
                )
            )
            # take next task
            _total_tasks = len(self.tasks)
            for idx in range(_total_tasks):
                _task = self.tasks[idx]
                logger_cli.info(
                    "-> Starting next task ({}/{})".format(idx+1, _total_tasks)
                )
                logger_cli.info("-> Updating options with: {}".format(
                    ", ".join(
                        ["{} = {}".format(k, v) for k, v in _task.items()]
                    )
                )
                )
                # update options
                options.update(_task)
                # init time to schedule
                options["scheduled_to"] = self._get_next_scheduled_time()
                if not self._do_testrun(options):
                    return False

                # let Ceph settle before the next task
                self._wait_ceph_cooldown()
        elif self.mode == "single":
            logger_cli.info("# Running single benchmark")
            # init time to schedule
            options["scheduled_to"] = self._get_next_scheduled_time()
            if not self._do_testrun(options):
                return False
        else:
            logger_cli.error("ERROR: Unknown mode '{}'".format(self.mode))
            return False

        # Normal exit
        logger_cli.info("# All benchmark tasks done")
        return True
355
356 def cleanup(self):
Alex2a7657c2021-11-10 20:51:34 -0600357 self.cleanup_list.reverse()
Alex5cace3b2021-11-10 16:40:37 -0600358
Alex2a7657c2021-11-10 20:51:34 -0600359 for _res in self.cleanup_list:
360 self.master.cleanup_resource_by_name(_res[0], _res[2], ns=_res[1])
Alexbfa947c2021-11-11 18:14:28 -0600361
362 # Wait for resource to be cleaned
363 _timeout = 120
364 _total = len(self.cleanup_list)
365 logger_cli.info("-> Wait until {} resources cleaned".format(_total))
366 _p = Progress(_total)
367 while True:
368 _g = self.master.get_resource_phase_by_name
369 _l = [_g(r[0], r[2], ns=r[1]) for r in self.cleanup_list]
370 _l = [item for item in _l if item]
371 _idx = _total - len(_l)
372 if len(_l) > 0:
373 _p.write_progress(_idx)
374 else:
Alex3034ba52021-11-13 17:06:45 -0600375 _p.write_progress(_idx)
Alexbfa947c2021-11-11 18:14:28 -0600376 _p.end()
377 logger_cli.info("# Done cleaning up")
378 break
379 if _timeout > 0:
380 _timeout -= 1
381 else:
382 _p.end()
383 logger_cli.info("# Timed out waiting after 120s.")
384 break
385
Alex5cace3b2021-11-10 16:40:37 -0600386 return
387
Alex3034ba52021-11-13 17:06:45 -0600388 def collect_results(self):
389 logger_cli.info("# Collecting results")
390 # query agents for results
391 _agents = self.get_agents_resultlist()
392
393 for _agent, _l in _agents.items():
394 _list = _l["resultlist"]
395 _new = [r for r in _list if r not in self.results[_agent]["list"]]
396 logger_cli.debug(
397 "... agent '{}' has {} new results".format(_agent, len(_new))
398 )
399 # get all new results
400 for _time in _new:
401 logger_cli.info(
402 "-> loading results for '{}' from '{}'".format(
403 _time,
404 _agent
405 )
406 )
407 self.results[_agent]["list"].update(
408 self.get_result_from_agent(_agent, _time)
409 )
410 return
411
412 def dump_results(self, path):
413 # Function dumps all availabkle results as jsons to the given path
414 # overwriting if needed
415
416 # TODO: Conduct the dumping
417
418 return
419
Alex5cace3b2021-11-10 16:40:37 -0600420 # Create report
Alex2a7657c2021-11-10 20:51:34 -0600421 def create_report(self, filename):
Alex5cace3b2021-11-10 16:40:37 -0600422
423 return