Alex | 5cace3b | 2021-11-10 16:40:37 -0600 | [diff] [blame] | 1 | import csv |
| 2 | import os |
| 3 | import json |
| 4 | |
Alex | 3034ba5 | 2021-11-13 17:06:45 -0600 | [diff] [blame^] | 5 | from datetime import datetime, timedelta, timezone |
Alex | 5cace3b | 2021-11-10 16:40:37 -0600 | [diff] [blame] | 6 | from time import sleep |
| 7 | |
Alex | dcb792f | 2021-10-04 14:24:21 -0500 | [diff] [blame] | 8 | from cfg_checker.common import logger_cli |
Alex | 3034ba5 | 2021-11-13 17:06:45 -0600 | [diff] [blame^] | 9 | from cfg_checker.common.decorators import retry |
Alex | bfa947c | 2021-11-11 18:14:28 -0600 | [diff] [blame] | 10 | from cfg_checker.helpers.console_utils import Progress |
Alex | dcb792f | 2021-10-04 14:24:21 -0500 | [diff] [blame] | 11 | # from cfg_checker.common.exception import InvalidReturnException |
| 12 | # from cfg_checker.common.exception import ConfigException |
| 13 | # from cfg_checker.common.exception import KubeException |
| 14 | |
| 15 | from cfg_checker.nodes import KubeNodes |
Alex | 5cace3b | 2021-11-10 16:40:37 -0600 | [diff] [blame] | 16 | from cfg_checker.agent.fio_runner import _get_seconds, _datetime_fmt |
Alex | dcb792f | 2021-10-04 14:24:21 -0500 | [diff] [blame] | 17 | |
| 18 | |
class CephBench(object):
    """Base class for Ceph benchmark runners.

    Stores the environment configuration and the name of the YAML
    template used to spawn benchmark agent pods.
    """
    # k8s pod template used by subclasses to create fio agent pods
    _agent_template = "cfgagent-template.yaml"

    def __init__(self, config):
        # full environment configuration object, kept as-is for subclasses
        self.env_config = config
| 25 | |
Alex | dcb792f | 2021-10-04 14:24:21 -0500 | [diff] [blame] | 26 | |
class SaltCephBench(CephBench):
    """Placeholder runner for Salt environments.

    Benchmarking is not implemented for Salt; construction only logs an
    error and initializes the base class so attributes exist.
    """
    def __init__(
        self,
        config
    ):
        # fixed typo in user-facing message: "impelented" -> "implemented"
        logger_cli.error("ERROR: Not implemented for Salt environment!")

        # TODO: self.master = SaltNodes(config) once Salt support exists
        super(SaltCephBench, self).__init__(config)
| 37 | |
| 38 | |
class KubeCephBench(CephBench):
    """Ceph benchmark runner for Kubernetes environments.

    Spawns `bench_agent_count` fio agent pods (each with its own PV/PVC),
    exposes each via a service and drives scheduled fio runs through the
    agents' HTTP API, collecting results per agent into `self.results`.
    """
    def __init__(self, config):
        self.agent_count = config.bench_agent_count
        self.master = KubeNodes(config)
        super(KubeCephBench, self).__init__(config)
        self.storage_class = config.bench_storage_class
        # created agent pods and the services exposing them
        self.agent_pods = []
        self.services = []
        # seconds between "now" and the scheduled start of a benchmark run
        self.scheduled_delay = 30
        self.mode = config.bench_mode
        if config.bench_mode == "tasks":
            self.taskfile = config.bench_task_file
            self.load_tasks(self.taskfile)

        # k8s resources to delete on cleanup(), recorded in creation order
        self.cleanup_list = []
        # per-agent data: result list, API url, storage class, volume size
        self.results = {}

    def load_tasks(self, taskfile):
        """Load fio task definitions from a csv file into `self.tasks`.

        Each row is expected as: readwrite,rwmixread,bs,iodepth,size
        (no header row).
        """
        # Load csv file
        logger_cli.info("-> Loading taskfile '{}'".format(taskfile))
        self.tasks = []
        with open(taskfile) as f:
            _reader = csv.reader(f, delimiter=',')
            # load packages
            for row in _reader:
                self.tasks.append({
                    "readwrite": row[0],
                    "rwmixread": row[1],
                    "bs": row[2],
                    "iodepth": row[3],
                    "size": row[4]
                })

    def add_for_deletion(self, obj, typ):
        """Remember a k8s resource as [type, namespace, name] for cleanup()."""
        _d = [typ, obj.metadata.namespace, obj.metadata.name]
        self.cleanup_list.append(_d)
        return

    def prepare_agents(self, options):
        """Create one agent pod (with PV/PVC) per agent, expose it and
        prepopulate its entry in `self.results` with the API url and
        storage metadata.

        `options` must provide "filename" (path whose dirname is used as
        the mount target) and "size" (volume size without the 'i' suffix).
        """
        logger_cli.info("# Preparing {} agents".format(self.agent_count))
        for idx in range(self.agent_count):
            # create pvc/pv and pod
            logger_cli.info("-> creating agent '{:02}'".format(idx))
            _agent, _pv, _pvc = self.master.prepare_benchmark_agent(
                idx,
                os.path.split(options["filename"])[0],
                self.storage_class,
                options['size'] + 'i',
                self._agent_template
            )
            # save it to lists
            self.agent_pods.append(_agent)
            self.add_for_deletion(_pv, "pv")
            self.add_for_deletion(_pvc, "pvc")
            self.add_for_deletion(_agent, "pod")

            # expose it
            _svc = self.master.expose_benchmark_agent(_agent)
            self.add_for_deletion(_svc, "svc")
            # Save service
            self.services.append(_svc)
            # prepopulate results
            self.results[_agent.metadata.name] = {}
            self.results[_agent.metadata.name]["list"] = {}
            self.results[_agent.metadata.name]["url"] = \
                "http://{}:{}/api/".format(
                    _svc.spec.cluster_ip,
                    8765
                )
            self.results[_agent.metadata.name]["storage_class"] = \
                self.storage_class
            self.results[_agent.metadata.name]["volume_size"] = \
                options['size']

        logger_cli.info("-> Done creating agents")
        return

    def _poke_agent(self, url, body, action="GET"):
        """Call an agent API endpoint by running curl inside a pod.

        If `body` is truthy it is uploaded as a json file into the pod
        first and sent with the request. Returns the decoded json reply,
        or None when the reply could not be decoded.
        """
        _datafile = "/tmp/data"
        _data = [
            "-d",
            "@" + _datafile
        ]
        _cmd = [
            "curl",
            "-s",
            "-H",
            "'Content-Type: application/json'",
            "-X",
            action,
            url
        ]
        if body:
            _cmd += _data
            _ret = self.master.prepare_json_in_pod(
                self.agent_pods[0].metadata.name,
                self.master._namespace,
                body,
                _datafile
            )
        _ret = self.master.exec_cmd_on_target_pod(
            self.agent_pods[0].metadata.name,
            self.master._namespace,
            " ".join(_cmd)
        )
        try:
            return json.loads(_ret)
        # the two original handlers were byte-identical; merged into one
        except (TypeError, json.decoder.JSONDecodeError) as e:
            logger_cli.error(
                "ERROR: Status not decoded: {}\n{}".format(e, _ret)
            )

        return None

    def _ensure_agents_ready(self):
        """Return True only when every agent reports an idle/finished
        status AND its fio shell reports ready; any failure is logged."""
        # make sure agents idle
        _status_set = []
        _ready_set = []
        for _agent, _d in self.get_agents_status().items():
            # obviously, there should be some answer
            if _d is None:
                logger_cli.error("ERROR: Agent status not available")
                return False
            # status should be idle or finished
            if _d['status'] not in ["idle", "finished"]:
                logger_cli.error(
                    "Agent status invalid {}:{}".format(_agent, _d['status'])
                )
                _status_set += [False]
            else:
                # Good agent
                _status_set += [True]
                # agent's fio shell should be in 'ready'
                if not _d["healthcheck"]["ready"]:
                    logger_cli.error("Agent is not ready {}".format(_agent))
                    _ready_set += [False]
                else:
                    # 'fio' shell for agent is ready
                    _ready_set += [True]
        # all agent's statuses should be True
        # and all 'fio' shell modules should be 'ready'
        # BUGFIX: original used any(), which let one healthy agent mask
        # failed ones; all() enforces the contract stated above
        if not all(_status_set) or not all(_ready_set):
            # At least one is not ready and it was already logged above
            return False
        else:
            # All is good
            return True

    def get_agents_status(self):
        """Query each agent's fio status endpoint; map agent name -> reply
        (reply can be None when the agent did not answer properly)."""
        _status = {}
        for _agent, _d in self.results.items():
            _status[_agent] = self._poke_agent(_d["url"] + "fio", {})
        return _status

    def get_agents_resultlist(self):
        """Ask each agent for the list of available results; map agent
        name -> reply."""
        _t = {"module": "fio", "action": "get_resultlist"}
        _status = {}
        for _agent, _d in self.results.items():
            _status[_agent] = self._poke_agent(_d["url"], _t, action="POST")
        return _status

    @retry(Exception)
    def get_result_from_agent(self, agent, time):
        """Fetch a single timed result from one agent; retried on any
        exception via the @retry decorator."""
        _t = {
            "module": "fio",
            "action": "get_result",
            "options": {
                "time": time
            }
        }
        return self._poke_agent(self.results[agent]["url"], _t, action="POST")

    def _get_next_scheduled_time(self):
        """Return 'now + scheduled_delay' (UTC) formatted with the agents'
        datetime format string."""
        _now = datetime.now(timezone.utc)
        logger_cli.info("-> time is '{}'".format(_now.strftime(_datetime_fmt)))
        _time = _now + timedelta(seconds=self.scheduled_delay)
        _str_time = _time.strftime(_datetime_fmt)
        logger_cli.info(
            "-> next benchmark scheduled to '{}'".format(_str_time)
        )
        return _str_time

    def _send_scheduled_task(self, options):
        """Send a scheduled fio run to every agent.

        Returns False as soon as one agent fails to answer or answers
        with an error; True when all agents accepted the task.
        """
        _task = {
            "module": "fio",
            "action": "do_scheduledrun",
            "options": options
        }
        for _agent, _d in self.results.items():
            logger_cli.info(
                "-> sending task to '{}:{}'".format(_agent, _d["url"])
            )
            _ret = self._poke_agent(_d["url"], _task, action="POST")
            # BUGFIX: _poke_agent returns None on undecodable replies;
            # original crashed on `'error' in None`
            if _ret is None:
                logger_cli.error(
                    "ERROR: Agent returned no answer: '{}'".format(_agent)
                )
                return False
            if 'error' in _ret:
                logger_cli.error(
                    "ERROR: Agent returned: '{}'".format(_ret['error'])
                )
                return False
        # No errors detected
        return True

    def track_benchmark(self, options):
        """Poll agent statuses until all report 'finished' or a timeout
        of 2*(delay+runtime+ramp_time) seconds expires.

        Sleeps a third of the remaining time between polls. Returns True
        on success, False on timeout.
        """
        _runtime = _get_seconds(options["runtime"])
        _ramptime = _get_seconds(options["ramp_time"])
        # Sum up all timings that we must wait and double it
        _timeout = (self.scheduled_delay + _runtime + _ramptime) * 2
        _end = datetime.now(timezone.utc) + timedelta(seconds=_timeout)
        while True:
            # Print status
            # TODO: do pooled status get
            # NOTE(review): statuses can be None if an agent reply failed
            # to decode — the lookups below would raise then; verify
            _sts = self.get_agents_status()
            diff = (_end - datetime.now(timezone.utc)).total_seconds()
            logger_cli.info("-> {:.2f}s left. Agent status:".format(diff))
            for _agent, _status in _sts.items():
                logger_cli.info(
                    "\t{}: {} ({}%)".format(
                        _agent,
                        _status["status"],
                        _status["progress"]
                    )
                )
            finished = [True for _s in _sts.values()
                        if _s["status"] == 'finished']
            _fcnt = len(finished)
            _tcnt = len(_sts)
            if _fcnt < _tcnt:
                logger_cli.info("-> {}/{} finished".format(_fcnt, _tcnt))
            else:
                logger_cli.info("-> All agents finished run")
                return True
            # recalc how much is left
            diff = (_end - datetime.now(timezone.utc)).total_seconds()
            # In case end_datetime was in past to begin with
            if diff < 0:
                logger_cli.info("-> Timed out waiting for agents to finish")
                return False
            logger_cli.info("-> Sleeping for {:.2f}s".format(diff/3))
            sleep(diff/3)
            if diff <= 0.1:
                logger_cli.info("-> Timed out waiting for agents to finish")
                return False

    def _do_testrun(self, options):
        """Run one scheduled benchmark: send the task, track it to
        completion and collect the results. Returns True on success."""
        # send single to agent
        if not self._send_scheduled_task(options):
            return False
        # Track this benchmark progress
        if not self.track_benchmark(options):
            return False
        else:
            logger_cli.info("-> Finished testrun")
            # Get results for each agent
            self.collect_results()
            return True

    def _wait_ceph_cooldown(self):
        # TODO: Query Ceph once a 20 sec to make sure its load dropped

        return

    def run_benchmark(self, options):
        """Run the benchmark according to `self.mode`.

        "tasks" mode iterates over the loaded taskfile rows, merging each
        row into `options`; "single" mode runs `options` once. Returns
        True when all runs finished, False on any failure or unknown mode.
        """
        logger_cli.info("# Starting '{}' benchmark".format(self.mode))
        # Check agent readyness
        logger_cli.info("# Checking agents")
        if not self._ensure_agents_ready():
            return False

        # Make sure that Ceph is at low load
        # TODO: Ceph status check
        self._wait_ceph_cooldown()

        # Do benchmark according to mode
        if self.mode == "tasks":
            logger_cli.info(
                "# Running benchmark with tasks from '{}'".format(
                    self.taskfile
                )
            )
            # take next task
            _total_tasks = len(self.tasks)
            for idx in range(_total_tasks):
                _task = self.tasks[idx]
                logger_cli.info(
                    "-> Starting next task ({}/{})".format(idx+1, _total_tasks)
                )
                logger_cli.info("-> Updating options with: {}".format(
                        ", ".join(
                            ["{} = {}".format(k, v) for k, v in _task.items()]
                        )
                    )
                )
                # update options
                options.update(_task)
                # init time to schedule
                options["scheduled_to"] = self._get_next_scheduled_time()
                if not self._do_testrun(options):
                    return False

                self._wait_ceph_cooldown()
        elif self.mode == "single":
            logger_cli.info("# Running single benchmark")
            # init time to schedule
            options["scheduled_to"] = self._get_next_scheduled_time()
            if not self._do_testrun(options):
                return False
        else:
            logger_cli.error("ERROR: Unknown mode '{}'".format(self.mode))
            return False

        # Normal exit
        logger_cli.info("# All benchmark tasks done")
        return True

    def cleanup(self):
        """Delete all registered k8s resources (in reverse creation
        order) and wait up to ~120s for them to disappear."""
        self.cleanup_list.reverse()

        for _res in self.cleanup_list:
            self.master.cleanup_resource_by_name(_res[0], _res[2], ns=_res[1])

        # Wait for resource to be cleaned
        _timeout = 120
        _total = len(self.cleanup_list)
        logger_cli.info("-> Wait until {} resources cleaned".format(_total))
        _p = Progress(_total)
        while True:
            _g = self.master.get_resource_phase_by_name
            _l = [_g(r[0], r[2], ns=r[1]) for r in self.cleanup_list]
            # resources that no longer report a phase are gone
            _l = [item for item in _l if item]
            _idx = _total - len(_l)
            if len(_l) > 0:
                _p.write_progress(_idx)
            else:
                _p.write_progress(_idx)
                _p.end()
                logger_cli.info("# Done cleaning up")
                break
            if _timeout > 0:
                # BUGFIX: poll roughly once per second so the counter
                # matches the advertised 120s timeout instead of
                # hammering the API in a tight loop
                sleep(1)
                _timeout -= 1
            else:
                _p.end()
                logger_cli.info("# Timed out waiting after 120s.")
                break

        return

    def collect_results(self):
        """Fetch every result not yet present in `self.results[agent]
        ["list"]` from each agent and merge it in."""
        logger_cli.info("# Collecting results")
        # query agents for results
        _agents = self.get_agents_resultlist()

        for _agent, _l in _agents.items():
            _list = _l["resultlist"]
            _new = [r for r in _list if r not in self.results[_agent]["list"]]
            logger_cli.debug(
                "... agent '{}' has {} new results".format(_agent, len(_new))
            )
            # get all new results
            for _time in _new:
                logger_cli.info(
                    "-> loading results for '{}' from '{}'".format(
                        _time,
                        _agent
                    )
                )
                self.results[_agent]["list"].update(
                    self.get_result_from_agent(_agent, _time)
                )
        return

    def dump_results(self, path):
        # Function dumps all available results as jsons to the given path
        # overwriting if needed

        # TODO: Conduct the dumping

        return

    # Create report
    def create_report(self, filename):

        return