import csv
import os
import json

from datetime import datetime, timedelta
from time import sleep

from cfg_checker.common import logger_cli
from cfg_checker.helpers.console_utils import Progress
# from cfg_checker.common.exception import InvalidReturnException
# from cfg_checker.common.exception import ConfigException
# from cfg_checker.common.exception import KubeException

from cfg_checker.nodes import KubeNodes
from cfg_checker.agent.fio_runner import _get_seconds, _datetime_fmt


class CephBench(object):
    _agent_template = "cfgagent-template.yaml"

    def __init__(self, config):
        self.env_config = config
        return


class SaltCephBench(CephBench):
    def __init__(
        self,
        config
    ):
        logger_cli.error("ERROR: Not implemented for Salt environment!")

        # self.master = SaltNodes(config)
        super(SaltCephBench, self).__init__(config)
        return


class KubeCephBench(CephBench):
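    """Kubernetes-based Ceph benchmark runner.

    Deploys fio agent pods (each backed by a PV/PVC), exposes them as
    services and drives benchmark runs through the agents' HTTP API.
    """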
    def __init__(self, config):
        self.agent_count = config.bench_agent_count
        self.master = KubeNodes(config)
        super(KubeCephBench, self).__init__(config)
        self.storage_class = config.bench_storage_class
        self.agent_pods = []
        self.services = []
        self.api_urls = []
        self.mode = config.bench_mode
        if config.bench_mode == "tasks":
            self.load_tasks(config.bench_task_file)

        self.cleanup_list = []

    def load_tasks(self, taskfile):
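        """Load benchmark tasks from a CSV file.

        Each row is expected to hold: readwrite, rwmixread, bs, iodepth, size.
        """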
        # Load csv file
        logger_cli.info("-> Loading taskfile '{}'".format(taskfile))
        self.tasks = []
        with open(taskfile) as f:
            _reader = csv.reader(f, delimiter=',')
            # load tasks
            for row in _reader:
                self.tasks.append({
                    "readwrite": row[0],
                    "rwmixread": row[1],
                    "bs": row[2],
                    "iodepth": row[3],
                    "size": row[4]
                })

    def add_for_deletion(self, obj, typ):
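        """Remember a created resource so that cleanup() can delete it later."""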
        _d = [typ, obj.metadata.namespace, obj.metadata.name]
        self.cleanup_list.append(_d)
        return

    def prepare_agents(self, options):
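        """Create an agent pod (with PV/PVC) per requested agent, expose each
        one as a service and collect the resulting API URLs.
        """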
        logger_cli.info("# Preparing {} agents".format(self.agent_count))
        for idx in range(self.agent_count):
            # create pvc/pv and pod
            logger_cli.info("-> creating agent '{:02}'".format(idx))
            _agent, _pv, _pvc = self.master.prepare_benchmark_agent(
                idx,
                os.path.split(options["filename"])[0],
                self.storage_class,
                options['size'] + 'i',
                self._agent_template
            )
            # save it to lists
            self.agent_pods.append(_agent)
            self.add_for_deletion(_pv, "pv")
            self.add_for_deletion(_pvc, "pvc")
            self.add_for_deletion(_agent, "pod")

            # expose it
            _svc = self.master.expose_benchmark_agent(_agent)
            self.add_for_deletion(_svc, "svc")
            # Save service
            self.services.append(_svc)

        # Build urls for agents
        for svc in self.services:
            self.api_urls.append(
                "http://{}:{}/api/".format(
                    svc.spec.cluster_ip,
                    8765
                )
            )
        logger_cli.info("-> Done creating agents")
        return

    def _poke_agent(self, url, body, action="GET"):
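        """Call an agent's HTTP API by running curl inside the first agent pod.

        If a body is given, it is first written to a temporary file inside the
        pod and passed to curl via '-d @file'. Returns the decoded JSON reply,
        or None when decoding fails.
        """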
        _datafile = "/tmp/data"
        _data = [
            "-d",
            "@" + _datafile
        ]
        _cmd = [
            "curl",
            "-s",
            "-H",
            "'Content-Type: application/json'",
            "-X",
            action,
            url
        ]
        if body:
            _cmd += _data
            _ret = self.master.prepare_json_in_pod(
                self.agent_pods[0].metadata.name,
                self.master._namespace,
                body,
                _datafile
            )
        _ret = self.master.exec_cmd_on_target_pod(
            self.agent_pods[0].metadata.name,
            self.master._namespace,
            " ".join(_cmd)
        )
        try:
            return json.loads(_ret)
        except TypeError as e:
            logger_cli.error(
                "ERROR: Status not decoded: {}\n{}".format(e, _ret)
            )
        except json.decoder.JSONDecodeError as e:
            logger_cli.error(
                "ERROR: Status not decoded: {}\n{}".format(e, _ret)
            )

        return None

    def run_benchmark(self, options):
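        """Verify that all agents are ready and run the benchmark in the
        configured mode ('tasks' or 'single').
        """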
        def get_status():
            return [self._poke_agent(_u + "fio", {}) for _u in self.api_urls]
        logger_cli.info("# Starting '{}' benchmark".format(self.mode))
        logger_cli.info("# Checking agents")
        # make sure agents idle
        _tt = []
        _rr = []
        for _s in get_status():
            if _s is None:
                logger_cli.error("ERROR: Agent status not available")
                return False
            _h = _s["healthcheck"]["hostname"]
            _t = _s['status']
            _r = _s["healthcheck"]["ready"]
            if _t not in ["idle", "finished"]:
                logger_cli.error("Agent status invalid {}:{}".format(_h, _t))
                _tt += [False]
            else:
                _tt += [True]
            if not _r:
                logger_cli.error("Agent is not ready {}".format(_h))
                _rr += [False]
            else:
                _rr += [True]
        # every agent must be idle/finished and ready before starting
        if not all(_tt) or not all(_rr):
            return False

        # Make sure that Ceph is at low load
        # TODO: Ceph status check

        # Do benchmark according to mode
        if self.mode == "tasks":
            # TODO: Implement 'tasks' mode
            # take next task

            # update options

            # init time to schedule

            # send next task to agent
            pass
            # wait for agents to finish
        elif self.mode == "single":
            logger_cli.info("# Running benchmark")
            # init time to schedule
            _time = datetime.now() + timedelta(seconds=10)
            _str_time = _time.strftime(_datetime_fmt)
            options["scheduled_to"] = _str_time
            logger_cli.info(
                "-> next benchmark scheduled to '{}'".format(_str_time)
            )
            # send single to agent
            _task = {
                "module": "fio",
                "action": "do_singlerun",
                "options": options
            }
            for _u in self.api_urls:
                logger_cli.info("-> sending task to '{}'".format(_u))
                _ret = self._poke_agent(_u, _task, action="POST")
                # guard against a None reply from _poke_agent
                if _ret and 'error' in _ret:
                    logger_cli.error(
                        "ERROR: Agent returned: '{}'".format(_ret['error'])
                    )

            _runtime = _get_seconds(options["runtime"])
            _ramptime = _get_seconds(options["ramp_time"])
            _timeout = _runtime + _ramptime + 5
            _end = datetime.now() + timedelta(seconds=_timeout)
            while True:
                # Print status
                _sts = get_status()
                _str = ""
                for _s in _sts:
                    _str += "{}: {} ({}); ".format(
                        _s["healthcheck"]["hostname"],
                        _s["status"],
                        _s["progress"]
                    )
                # recalc how much is left
                diff = (_end - datetime.now()).total_seconds()
                logger_cli.debug("... [{:.2f}s]: {}".format(diff, _str))
                # In case end_datetime was in past to begin with
                if diff < 0:
                    break
                logger_cli.info("-> Sleeping for {:.2f}s".format(diff/2))
                sleep(diff/2)
                if diff <= 0.1:
                    break

        logger_cli.info("-> Done")
        return True

    def cleanup(self):
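        """Delete all tracked resources in reverse creation order and wait
        (roughly two minutes at most) for them to disappear.
        """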
        self.cleanup_list.reverse()

        for _res in self.cleanup_list:
            self.master.cleanup_resource_by_name(_res[0], _res[2], ns=_res[1])

        # Wait for resources to be cleaned
        _timeout = 120
        _total = len(self.cleanup_list)
        logger_cli.info("-> Wait until {} resources cleaned".format(_total))
        _p = Progress(_total)
        while True:
            _g = self.master.get_resource_phase_by_name
            _l = [_g(r[0], r[2], ns=r[1]) for r in self.cleanup_list]
            _l = [item for item in _l if item]
            _idx = _total - len(_l)
            if len(_l) > 0:
                _p.write_progress(_idx)
            else:
                _p.end()
                logger_cli.info("# Done cleaning up")
                break
            if _timeout > 0:
                _timeout -= 1
            else:
                _p.end()
                logger_cli.info("# Timed out waiting after 120s.")
                break
            # poll roughly once per second so the 120-iteration budget
            # matches the reported 120s timeout
            sleep(1)

        return

    # Create report
    def create_report(self, filename):
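        """Stub for benchmark report generation; not implemented yet."""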
        return