Alex | 5cace3b | 2021-11-10 16:40:37 -0600 | [diff] [blame] | 1 | import csv |
| 2 | import os |
| 3 | import json |
| 4 | |
Alex | 3034ba5 | 2021-11-13 17:06:45 -0600 | [diff] [blame] | 5 | from datetime import datetime, timedelta, timezone |
Alex | 5cace3b | 2021-11-10 16:40:37 -0600 | [diff] [blame] | 6 | from time import sleep |
| 7 | |
Alex | dcb792f | 2021-10-04 14:24:21 -0500 | [diff] [blame] | 8 | from cfg_checker.common import logger_cli |
Alex | 3034ba5 | 2021-11-13 17:06:45 -0600 | [diff] [blame] | 9 | from cfg_checker.common.decorators import retry |
Alex | b212954 | 2021-11-23 15:49:42 -0600 | [diff] [blame^] | 10 | from cfg_checker.common.file_utils import write_str_to_file |
Alex | bfa947c | 2021-11-11 18:14:28 -0600 | [diff] [blame] | 11 | from cfg_checker.helpers.console_utils import Progress |
Alex | b212954 | 2021-11-23 15:49:42 -0600 | [diff] [blame^] | 12 | from cfg_checker.reports import reporter |
Alex | dcb792f | 2021-10-04 14:24:21 -0500 | [diff] [blame] | 13 | # from cfg_checker.common.exception import InvalidReturnException |
| 14 | # from cfg_checker.common.exception import ConfigException |
| 15 | # from cfg_checker.common.exception import KubeException |
| 16 | |
| 17 | from cfg_checker.nodes import KubeNodes |
Alex | 5cace3b | 2021-11-10 16:40:37 -0600 | [diff] [blame] | 18 | from cfg_checker.agent.fio_runner import _get_seconds, _datetime_fmt |
Alex | dcb792f | 2021-10-04 14:24:21 -0500 | [diff] [blame] | 19 | |
| 20 | |
Alex | b212954 | 2021-11-23 15:49:42 -0600 | [diff] [blame^] | 21 | def _reformat_timestr(_str, _chars=["/", ",", " ", ":", "+"], _tchar=""): |
| 22 | _new = "" |
| 23 | for _c in _str: |
| 24 | _new += _c if _c not in _chars else _tchar |
| 25 | return _new |
| 26 | |
| 27 | |
| 28 | def _parse_json_output(buffer): |
| 29 | try: |
| 30 | return json.loads(buffer) |
| 31 | except TypeError as e: |
| 32 | logger_cli.error( |
| 33 | "ERROR: Status not decoded: {}\n{}".format(e, buffer) |
| 34 | ) |
| 35 | except json.decoder.JSONDecodeError as e: |
| 36 | logger_cli.error( |
| 37 | "ERROR: Status not decoded: {}\n{}".format(e, buffer) |
| 38 | ) |
| 39 | return {} |
| 40 | |
| 41 | |
Alex | dcb792f | 2021-10-04 14:24:21 -0500 | [diff] [blame] | 42 | class CephBench(object): |
Alex | 5cace3b | 2021-11-10 16:40:37 -0600 | [diff] [blame] | 43 | _agent_template = "cfgagent-template.yaml" |
| 44 | |
| 45 | def __init__(self, config): |
Alex | dcb792f | 2021-10-04 14:24:21 -0500 | [diff] [blame] | 46 | self.env_config = config |
| 47 | return |
| 48 | |
Alex | dcb792f | 2021-10-04 14:24:21 -0500 | [diff] [blame] | 49 | |
class SaltCephBench(CephBench):
    """Placeholder benchmark runner for Salt environments.

    Benchmarking is not supported on Salt; instantiation only logs an
    error and stores the config via the base class.
    """
    def __init__(
        self,
        config
    ):
        # Fixed typo in user-facing message ("impelented" -> "implemented")
        logger_cli.error("ERROR: Not implemented for Salt environment!")
        super(SaltCephBench, self).__init__(config)
| 60 | |
| 61 | |
class KubeCephBench(CephBench):
    """Ceph benchmark runner for Kubernetes environments.

    Deploys fio-based agent pods (plus pv/pvc/svc), schedules benchmark
    runs on them over HTTP, tracks progress, collects and dumps results
    to disk and renders an HTML report.
    """
    def __init__(self, config):
        # Number of benchmark agent pods to deploy
        self.agent_count = config.bench_agent_count
        self.master = KubeNodes(config)
        super(KubeCephBench, self).__init__(config)

        # Benchmark mode: "tasks", "single" or "cleanup"
        self.mode = config.bench_mode
        # Name prefix used to find our resources during cleanup
        self.resource_prefix = config.resource_prefix
        if config.bench_mode == "tasks":
            self.taskfile = config.bench_task_file
            self.load_tasks(self.taskfile)
        elif config.bench_mode == "cleanup":
            # Cleanup-only mode needs no agents/results setup
            self.cleanup_list = []
            return

        self.storage_class = config.bench_storage_class
        self.results_dump_path = config.bench_results_dump_path
        self.agent_pods = []
        self.services = []
        # By default,
        # 30 seconds should be enough to send tasks to 3-5 agents
        self.scheduled_delay = 30

        # Resources registered for deletion on cleanup()
        self.cleanup_list = []
        # Benchmark results keyed by scheduled start time
        self.results = {}
        # Per-agent metadata: url, storage_class, volume_size
        self.agent_results = {}

    def set_ceph_info_class(self, ceph_info):
        # Inject the collector used for cluster status/df/pg queries
        self.ceph_info = ceph_info

    def load_tasks(self, taskfile):
        """Load benchmark task definitions from a csv file into self.tasks.

        Each row: readwrite, rwmixread, bs, iodepth, size (fio options).
        """
        # Load csv file
        logger_cli.info("-> Loading taskfile '{}'".format(taskfile))
        self.tasks = []
        with open(taskfile) as f:
            _reader = csv.reader(f, delimiter=',')
            # one row -> one fio run specification
            for row in _reader:
                self.tasks.append({
                    "readwrite": row[0],
                    "rwmixread": row[1],
                    "bs": row[2],
                    "iodepth": row[3],
                    "size": row[4]
                })
        logger_cli.info("-> Loaded {} tasks".format(len(self.tasks)))

    def add_for_deletion(self, obj, typ):
        """Register a created k8s resource for removal during cleanup()."""
        self.cleanup_list.append(
            [
                typ,
                obj.metadata.namespace,
                obj.metadata.name
            ]
        )

    def prepare_cleanup(self):
        """Discover leftover benchmark resources by name prefix.

        Used in 'cleanup' mode when the resource list was not recorded.
        """
        # Assume number of resources not given
        # list all svc, pod, pvc, pv and identify 'cfgagent-xx ones
        _types = ["pv", "pvc", "pod", "svc"]
        _prefix = self.resource_prefix
        for _typ in _types:
            _list = self.master.list_resource_names_by_type_and_ns(_typ)
            for ns, name in _list:
                if name.startswith(_prefix):
                    # pv has no namespace; log accordingly
                    if ns:
                        _msg = "{} {}/{}".format(_typ, ns, name)
                    else:
                        _msg = "{} {}".format(_typ, name)
                    logger_cli.info("-> Found {}".format(_msg))
                    self.cleanup_list.append([_typ, ns, name])

    def prepare_agents(self, options):
        """Create agent pods with their pv/pvc/svc and prefill results.

        :param options: benchmark options; 'filename' and 'size' are used
                        for volume setup (size gets an 'i' suffix,
                        presumably to form a k8s quantity like 'Gi' --
                        confirm against template)
        """
        logger_cli.info("# Preparing {} agents".format(self.agent_count))
        for idx in range(self.agent_count):
            # create pvc/pv and pod
            logger_cli.info("-> creating agent '{:02}'".format(idx))
            _agent, _pv, _pvc = self.master.prepare_benchmark_agent(
                idx,
                os.path.split(options["filename"])[0],
                self.storage_class,
                options['size'] + 'i',
                self._agent_template
            )
            # save it to lists
            self.agent_pods.append(_agent)
            self.add_for_deletion(_pv, "pv")
            self.add_for_deletion(_pvc, "pvc")
            self.add_for_deletion(_agent, "pod")

            # expose it
            _svc = self.master.expose_benchmark_agent(_agent)
            self.add_for_deletion(_svc, "svc")
            # Save service
            self.services.append(_svc)
            # prepopulate results
            self.agent_results[_agent.metadata.name] = {}
            self.agent_results[_agent.metadata.name]["url"] = \
                "http://{}:{}/api/".format(
                    _svc.spec.cluster_ip,
                    8765
                )
            self.agent_results[_agent.metadata.name]["storage_class"] = \
                self.storage_class
            self.agent_results[_agent.metadata.name]["volume_size"] = \
                options['size']

        logger_cli.info("-> Done creating agents")
        # TODO: Update after implementing pooled task sending
        # Tasks are sent sequentially: budget ~6s per agent
        self.scheduled_delay = self.agent_count * 6
        logger_cli.info(
            "-> Schedule delay set to {} sec".format(self.scheduled_delay)
        )

    def _poke_agent(self, url, body, action="GET"):
        """Send an HTTP request to an agent by running curl inside a pod.

        The JSON body (if any) is first written to a temp file inside
        agent pod 0, then posted with 'curl -d @file'.

        :return: decoded JSON answer ({} on decode failure)
        """
        _datafile = "/tmp/data"
        _data = [
            "-d",
            "@" + _datafile
        ]
        _cmd = [
            "curl",
            "-s",
            "-H",
            "'Content-Type: application/json'",
            "-X",
            action,
            url
        ]
        if body:
            _cmd += _data
            _ret = self.master.prepare_json_in_pod(
                self.agent_pods[0].metadata.name,
                self.master._namespace,
                body,
                _datafile
            )
        _ret = self.master.exec_cmd_on_target_pod(
            self.agent_pods[0].metadata.name,
            self.master._namespace,
            " ".join(_cmd)
        )
        return _parse_json_output(_ret)

    def _ensure_agents_ready(self):
        """Return True only when every agent is idle/finished and ready."""
        # make sure agents idle
        _status_set = []
        _ready_set = []
        for _agent, _d in self.get_agents_status().items():
            # obviously, there should be some answer
            if _d is None:
                logger_cli.error("ERROR: Agent status not available")
                return False
            # status should be idle or finished
            if _d['status'] not in ["idle", "finished"]:
                logger_cli.error(
                    "Agent status invalid {}:{}".format(_agent, _d['status'])
                )
                _status_set += [False]
            else:
                # Good agent
                _status_set += [True]
            # agent's fio shell should be in 'ready'
            if not _d["healthcheck"]["ready"]:
                logger_cli.error("Agent is not ready {}".format(_agent))
                _ready_set += [False]
            else:
                # 'fio' shell for agent is ready
                _ready_set += [True]
        # all agent's statuses should be True
        # and all 'fio' shell modules should be 'ready'
        # (fixed: was 'any', which let one healthy agent mask failures)
        if not all(_status_set) or not all(_ready_set):
            # At least one is not ready and it was already logged above
            return False
        else:
            # All is good
            return True

    def get_agents_status(self):
        """Query fio status from all agent pods; returns {pod: status}."""
        _status = {}
        _results = self.master.exec_on_labeled_pods_and_ns(
            "app=cfgagent",
            "curl -s http://localhost:8765/api/fio"
        )
        for _agent, _result in _results.items():
            _j = _parse_json_output(_result)
            _status[_agent] = _j
        return _status

    @retry(Exception, initial_wait=5)
    def get_agents_resultlist(self):
        """Ask every agent for the list of its stored results."""
        _t = {"module": "fio", "action": "get_resultlist"}
        _status = {}
        for _agent, _d in self.agent_results.items():
            _status[_agent] = self._poke_agent(_d["url"], _t, action="POST")
        return _status

    @retry(Exception, initial_wait=5)
    def get_result_from_agent(self, agent, time):
        """Fetch one result (keyed by its time) from a specific agent."""
        _t = {
            "module": "fio",
            "action": "get_result",
            "options": {
                "time": time
            }
        }
        return self._poke_agent(
            self.agent_results[agent]["url"],
            _t,
            action="POST"
        )

    def _get_next_scheduled_time(self):
        """Return 'now + scheduled_delay' formatted with _datetime_fmt."""
        _now = datetime.now(timezone.utc)
        logger_cli.info("-> time is '{}'".format(_now.strftime(_datetime_fmt)))
        _time = _now + timedelta(seconds=self.scheduled_delay)
        _str_time = _time.strftime(_datetime_fmt)
        logger_cli.info(
            "-> next benchmark scheduled to '{}'".format(_str_time)
        )
        return _str_time

    def _send_scheduled_task(self, options):
        """Send a scheduled fio run to every agent; False on any error."""
        _task = {
            "module": "fio",
            "action": "do_scheduledrun",
            "options": options
        }
        for _agent, _d in self.agent_results.items():
            logger_cli.info(
                "-> sending task to '{}:{}'".format(_agent, _d["url"])
            )
            _ret = self._poke_agent(_d["url"], _task, action="POST")
            if 'error' in _ret:
                logger_cli.error(
                    "ERROR: Agent returned: '{}'".format(_ret['error'])
                )
                return False
        # No errors detected
        return True

    def track_benchmark(self, options):
        """Poll agent statuses until all finish or the timeout expires.

        :return: True when all agents report 'finished', False on timeout
        """
        _runtime = _get_seconds(options["runtime"])
        _ramptime = _get_seconds(options["ramp_time"])
        # Sum up all timings that we must wait and double it
        _timeout = (self.scheduled_delay + _runtime + _ramptime) * 2
        # poll interval between status checks
        _check_sleep = 2
        _end = datetime.now(timezone.utc) + timedelta(seconds=_timeout)
        while True:
            # Print status
            # TODO: do pooled status get
            _sts = self.get_agents_status()
            diff = (_end - datetime.now(timezone.utc)).total_seconds()
            logger_cli.info("-> {:.2f}s left. Agent status:".format(diff))
            for _agent, _status in _sts.items():
                logger_cli.info(
                    "\t{}: {} ({}%)".format(
                        _agent,
                        _status["status"],
                        _status["progress"]
                    )
                )
            finished = [True for _s in _sts.values()
                        if _s["status"] == 'finished']
            _fcnt = len(finished)
            _tcnt = len(_sts)
            if _fcnt < _tcnt:
                logger_cli.info("-> {}/{} finished".format(_fcnt, _tcnt))
            else:
                logger_cli.info("-> All agents finished run")
                return True
            # recalc how much is left
            diff = (_end - datetime.now(timezone.utc)).total_seconds()
            # In case end_datetime was in past to begin with
            if diff < 0:
                logger_cli.info("-> Timed out waiting for agents to finish")
                return False
            else:
                logger_cli.info(
                    "-> Sleeping for {:.2f}s".format(_check_sleep)
                )
                sleep(_check_sleep)
                # diff was sampled before the sleep, so this gives a
                # small grace margin before declaring a timeout
                if diff <= 0.1:
                    logger_cli.info(
                        "-> Timed out waiting for agents to finish"
                    )
                    return False

    def _do_testrun(self, options):
        """Run one benchmark: send task, wait, collect results."""
        # send single to agent
        if not self._send_scheduled_task(options):
            return False
        # Track this benchmark progress
        if not self.track_benchmark(options):
            return False
        else:
            logger_cli.info("-> Finished testrun")
            # Get results for each agent
            self.collect_results(options)
            return True

    def wait_ceph_cooldown(self):
        """Snapshot cluster health/df/pg state between runs."""
        # TODO: Query Ceph once a 20 sec to make sure its load dropped

        # get ceph idle status
        self.ceph_idle_status = self.ceph_info.get_cluster_status()
        self.health_detail = self.ceph_info.get_health_detail()
        self.ceph_df = self.ceph_info.get_ceph_df()
        self.ceph_pg_dump = self.ceph_info.get_ceph_pg_dump()

    def run_benchmark(self, options):
        """Execute benchmark(s) according to self.mode.

        :param options: base fio options, updated per-task in 'tasks' mode
        :return: True on success, False on any failure
        """
        logger_cli.info("# Starting '{}' benchmark".format(self.mode))
        # Check agent readiness
        logger_cli.info("# Checking agents")
        if not self._ensure_agents_ready():
            return False

        # Make sure that Ceph is at low load
        # TODO: Ceph status check
        # self.wait_ceph_cooldown()

        # shortcut for osd df snapshots taken around each run
        _get_df = self.ceph_info.get_ceph_osd_df

        # Do benchmark according to mode
        if self.mode == "tasks":
            logger_cli.info(
                "# Running benchmark with tasks from '{}'".format(
                    self.taskfile
                )
            )
            # take next task
            _total_tasks = len(self.tasks)
            for idx in range(_total_tasks):
                # init time to schedule
                _osd_df_before = _get_df()
                _task = self.tasks[idx]
                logger_cli.info(
                    "-> Starting next task ({}/{})".format(idx+1, _total_tasks)
                )
                logger_cli.info("-> Updating options with: {}".format(
                        ", ".join(
                            ["{} = {}".format(k, v) for k, v in _task.items()]
                        )
                    )
                )
                # update options
                options.update(_task)
                _sch_time = self._get_next_scheduled_time()
                options["scheduled_to"] = _sch_time
                # init results table
                self.results[_sch_time] = {
                    "input_options": options,
                    "agents": {},
                    "osd_df_before": _osd_df_before
                }
                if not self._do_testrun(options):
                    return False
                else:
                    self.results[_sch_time]["osd_df_after"] = _get_df()

                self.wait_ceph_cooldown()
        elif self.mode == "single":
            logger_cli.info("# Running single benchmark")
            _osd_df_before = _get_df()
            # init time to schedule
            _sch_time = self._get_next_scheduled_time()
            options["scheduled_to"] = _sch_time
            # init results table
            self.results[_sch_time] = {
                "input_options": options,
                "agents": {},
                "osd_df_before": _osd_df_before
            }
            if not self._do_testrun(options):
                return False
            else:
                self.results[_sch_time]["osd_df_after"] = _get_df()
        else:
            logger_cli.error("ERROR: Unknown mode '{}'".format(self.mode))
            return False

        # Normal exit
        logger_cli.info("# All benchmark tasks done")
        return True

    def cleanup(self):
        """Delete registered resources and wait until they are gone."""
        logger_cli.info("# Cleaning up")
        # delete in reverse creation order (svc/pod before pvc/pv)
        self.cleanup_list.reverse()

        for _res in self.cleanup_list:
            self.master.cleanup_resource_by_name(_res[0], _res[2], ns=_res[1])

        # Wait for resource to be cleaned
        _timeout = 120
        _total = len(self.cleanup_list)
        logger_cli.info("-> Wait until {} resources cleaned".format(_total))
        _p = Progress(_total)
        while True:
            _g = self.master.get_resource_phase_by_name
            _l = [_g(r[0], r[2], ns=r[1]) for r in self.cleanup_list]
            # resources that are fully gone report an empty phase
            _l = [item for item in _l if item]
            _idx = _total - len(_l)
            if len(_l) > 0:
                _p.write_progress(_idx)
            else:
                _p.write_progress(_idx)
                _p.end()
                logger_cli.info("# Done cleaning up")
                break
            if _timeout > 0:
                _timeout -= 1
            else:
                _p.end()
                logger_cli.info("# Timed out waiting after 120s.")
                break

    def collect_results(self, options):
        """Fetch results for this run from agents, caching them on disk.

        Results are stored under self.results[scheduled_time]["agents"]
        keyed by agent name; already-dumped files are loaded instead of
        re-fetched.
        """
        _sch_time = options["scheduled_to"]
        logger_cli.info("# Collecting results")
        # query agents for results
        _agents = self.get_agents_resultlist()
        # Syntax shortcut
        _ar = self.results[_sch_time]["agents"]

        for _agent, _l in _agents.items():
            # Create a syntax shortcut
            if _agent not in _ar:
                _ar[_agent] = {}
            _arl = _ar[_agent]
            # Check if we already have this locally
            for _time in _l["resultlist"]:
                _filename = self._get_dump_filename(_sch_time, _agent, options)
                if os.path.exists(_filename):
                    # There is a file already for this task
                    # Check if we need to load it
                    if _sch_time in _arl:
                        logger_cli.info(
                            "-> Skipped already processed result '{}'".format(
                                _filename
                            )
                        )
                    else:
                        # Load previously dumped result from disk
                        logger_cli.info(
                            "-> Loading already present result '{}'".format(
                                _filename
                            )
                        )
                        _arl[_sch_time] = self.load_dumped_result(_filename)
                else:
                    # Load result add it locally and dump it
                    logger_cli.info(
                        "-> Getting results for '{}' from '{}'".format(
                            _sch_time,
                            _agent
                        )
                    )
                    _r = self.get_result_from_agent(_agent, _time)
                    # Important to switch from result status time
                    # to scheduled time
                    _arl[_sch_time] = _r[_time]
                    # Dump collected result
                    self.dump_result(_filename, _arl[_sch_time])

    def _get_dump_filename(self, _time, agent, options):
        """Build '<dump_path>/<time-dir>/<time>-<agent>-<rw>-<bs>-<iodepth>.json'."""
        _dirname = _reformat_timestr(_time)
        _filename = "-".join([
            _dirname,
            agent,
            options["readwrite"],
            options["bs"],
            str(options["iodepth"]),
        ]) + ".json"
        return os.path.join(
            self.results_dump_path,
            _dirname,
            _filename
        )

    def dump_result(self, filename, data):
        """Dump one result as pretty-printed json to the given path.

        Existing files are overwritten; missing folders are created.
        """
        _folder, _file = os.path.split(filename)
        # Do dump
        if not os.path.exists(_folder):
            # makedirs (was mkdir) also creates missing parent folders
            os.makedirs(_folder)
            logger_cli.info("-> Created folder '{}'".format(_folder))
        # Dump agent data for this test run
        write_str_to_file(filename, json.dumps(data, indent=2))
        logger_cli.info("-> Dumped '{}'".format(filename))

    def load_dumped_result(self, filename):
        """Load a previously dumped json result; None on any failure."""
        try:
            # read-only access is enough; "rt+" needlessly required
            # write permission on the file
            with open(filename, "rt") as f:
                return json.loads(f.read())
        except FileNotFoundError as e:
            logger_cli.error(
                "ERROR: {}".format(e)
            )
        except TypeError as e:
            logger_cli.error(
                "ERROR: Invalid file ({}): {}".format(filename, e)
            )
        except json.decoder.JSONDecodeError as e:
            logger_cli.error(
                "ERROR: Failed to decode json ({}): {}".format(filename, e)
            )
        return None

    def _lookup_storage_class_id_by_name(self, storage_class_name):
        """Map a pool name from 'ceph df' to its id; None if not found."""
        # Assume that self had proper data
        for _pool in self.ceph_df["pools"]:
            if storage_class_name == _pool["name"]:
                return _pool["id"]
        return None

    def calculate_totals(self):
        """Aggregate per-agent fio numbers into per-run read/write totals."""
        # Calculate totals for Read and Write
        for _time, data in self.results.items():
            if "totals" not in data:
                data["totals"] = {}
            else:
                # already calculated on a previous call
                continue
            _totals = data["totals"]
            _r_bw = 0
            _r_avglat = []
            _r_iops = 0
            _w_bw = 0
            _w_avglat = []
            _w_iops = 0
            for _a, _d in data["agents"].items():
                # Hardcoded number of jobs param :(
                _j = _d[_time]["jobs"][0]
                _r_bw += _j["read"]["bw_bytes"]
                _r_avglat += [_j["read"]["lat_ns"]["mean"]]
                _r_iops += _j["read"]["iops"]
                _w_bw += _j["write"]["bw_bytes"]
                _w_avglat += [_j["write"]["lat_ns"]["mean"]]
                _w_iops += _j["write"]["iops"]
                # Save storage class name
                if "storage_class" not in _totals:
                    _totals["storage_class"] = \
                        self.agent_results[_a]["storage_class"]
                    # Lookup storage class id and num_pg
                    _totals["storage_class_stats"] = \
                        reporter.get_pool_stats_by_id(
                            self._lookup_storage_class_id_by_name(
                                self.agent_results[_a]["storage_class"]
                            ),
                            self.ceph_pg_dump
                        )

            # NOTE(review): raises ZeroDivisionError when a run has no
            # agent results -- TODO confirm that cannot happen here
            _totals["read_bw_bytes"] = _r_bw
            _totals["read_avg_lat_us"] = \
                (sum(_r_avglat) / len(_r_avglat)) / 1000
            _totals["read_iops"] = _r_iops
            _totals["write_bw_bytes"] = _w_bw
            _totals["write_avg_lat_us"] = \
                (sum(_w_avglat) / len(_w_avglat)) / 1000
            _totals["write_iops"] = _w_iops

    # Create report
    def create_report(self, filename):
        """
        Create static html showing ceph info report

        :return: none
        """
        logger_cli.info("### Generating report to '{}'".format(filename))
        _report = reporter.ReportToFile(
            reporter.HTMLCephBench(self),
            filename
        )
        self.calculate_totals()
        _report(
            {
                "results": self.results,
                "idle_status": self.ceph_idle_status,
                "health_detail": self.health_detail,
                "ceph_df": self.ceph_df,
                "ceph_pg_dump": self.ceph_pg_dump,
                "info": self.ceph_info.ceph_info,
                "cluster": self.ceph_info.cluster_info,
                "ceph_version": self.ceph_info.ceph_version,
                "nodes": self.agent_pods
            }
        )
        logger_cli.info("-> Done")