| import csv |
| import os |
| import json |
| |
| from datetime import datetime, timedelta |
| from time import sleep |
| |
| from cfg_checker.common import logger_cli |
| # from cfg_checker.common.exception import InvalidReturnException |
| # from cfg_checker.common.exception import ConfigException |
| # from cfg_checker.common.exception import KubeException |
| |
| from cfg_checker.nodes import KubeNodes |
| from cfg_checker.agent.fio_runner import _get_seconds, _datetime_fmt |
| |
| |
class CephBench(object):
    """Base class for Ceph benchmark runners.

    Only stores the environment configuration; environment-specific
    behavior lives in the subclasses (Salt, Kubernetes).
    """

    # Template used to create benchmark agent resources in the cluster
    _agent_template = "cfgagent-template.yaml"

    def __init__(self, config):
        # Keep the full environment config for subclasses to consume
        self.env_config = config
| |
| |
class SaltCephBench(CephBench):
    """Placeholder benchmark runner for Salt-managed environments.

    Benchmarking is not implemented for Salt: instantiation only logs an
    error so callers degrade gracefully instead of crashing.
    """

    def __init__(
        self,
        config
    ):
        # Fixed typo in user-facing message ("impelented")
        logger_cli.error("ERROR: Not implemented for Salt environment!")
        # TODO: create SaltNodes(config) master once Salt support lands
        super(SaltCephBench, self).__init__(config)
| |
| |
class KubeCephBench(CephBench):
    """Ceph benchmark runner for Kubernetes environments.

    Spawns fio agent pods in the cluster, exposes each one as a service
    and drives benchmark runs through the agents' HTTP API.
    """

    def __init__(self, config):
        # Number of fio agent pods to spawn
        self.agent_count = config.bench_agent_count
        self.master = KubeNodes(config)
        super(KubeCephBench, self).__init__(config)
        # Storage class used for each agent's PV/PVC pair
        self.storage_class = config.bench_storage_class
        self.agent_pods = []
        self.services = []
        # Per-agent API base urls, e.g. "http://<cluster-ip>:8765/api/"
        self.api_urls = []
        # Benchmark mode: "tasks" (csv-driven) or "single"
        self.mode = config.bench_mode
        if config.bench_mode == "tasks":
            self.load_tasks(config.bench_task_file)

    def load_tasks(self, taskfile):
        """Load benchmark task list from a csv file into self.tasks.

        Each row is expected as 'readwrite,rwmixread,bs,iodepth,size'.
        Blank or short rows (e.g. a trailing newline) are skipped
        instead of raising IndexError.
        """
        logger_cli.info("-> Loading taskfile '{}'".format(taskfile))
        self.tasks = []
        with open(taskfile) as f:
            _reader = csv.reader(f, delimiter=',')
            for row in _reader:
                # Guard against empty/short lines in the csv
                if len(row) < 5:
                    continue
                self.tasks.append({
                    "readwrite": row[0],
                    "rwmixread": row[1],
                    "bs": row[2],
                    "iodepth": row[3],
                    "size": row[4]
                })

    def prepare_agents(self, options):
        """Create agent pods with their PV/PVCs, expose them as services
        and build the list of per-agent API urls.

        :param options: dict with at least 'filename' (target path whose
                        dirname becomes the mount point) and 'size'
                        (volume size; 'i' is appended for the binary
                        Ki/Mi/Gi suffix form)
        """
        logger_cli.info("# Preparing {} agents".format(self.agent_count))
        for idx in range(self.agent_count):
            # create pvc/pv and pod
            logger_cli.info("-> creating agent '{:02}'".format(idx))
            _agent, _pv, _pvc = self.master.prepare_benchmark_agent(
                idx,
                os.path.split(options["filename"])[0],
                self.storage_class,
                options['size'] + 'i',
                self._agent_template
            )
            # save it to list
            self.agent_pods.append(_agent)
            # expose it
            _svc = self.master.expose_benchmark_agent(_agent)
            # Save service
            self.services.append(_svc)

        # Build urls for agents; 8765 is the agent's fixed API port
        for svc in self.services:
            self.api_urls.append(
                "http://{}:{}/api/".format(
                    svc.spec.cluster_ip,
                    8765
                )
            )
        logger_cli.info("-> Done creating agents")
        return

    def _poke_agent(self, url, body, action="GET"):
        """Send a request to an agent's API via curl executed inside the
        first agent pod and return the decoded JSON reply.

        :param url: full API url to hit
        :param body: dict payload; when truthy it is uploaded into the
                     pod as a json file and sent with --data-binary
        :param action: HTTP method for curl's -X flag
        :return: decoded JSON object, or None when decoding fails
        """
        _datafile = "/tmp/data"
        _data = [
            "--data-binary",
            "@" + _datafile,
        ]
        _cmd = [
            "curl",
            "-s",
            "-H",
            "'Content-Type: application/json'",
            "-X",
            action,
            url
        ]
        if body:
            _cmd += _data
            # Stage the payload as a file inside the pod first
            _ret = self.master.prepare_json_in_pod(
                self.agent_pods[0].metadata.name,
                self.master._namespace,
                body,
                _datafile
            )
        _ret = self.master.exec_cmd_on_target_pod(
            self.agent_pods[0].metadata.name,
            self.master._namespace,
            " ".join(_cmd)
        )
        try:
            return json.loads(_ret)
        except TypeError as e:
            logger_cli.error(
                "ERROR: Status not decoded: {}\n{}".format(e, _ret)
            )
        except json.decoder.JSONDecodeError as e:
            logger_cli.error(
                "ERROR: Status not decoded: {}\n{}".format(e, _ret)
            )

        return None

    def run_benchmark(self, options):
        """Run the configured benchmark on all agents.

        Verifies every agent is idle and ready, schedules the run,
        then polls agent status until the estimated runtime elapses.

        :param options: fio options dict; for 'single' mode must contain
                        'runtime' and 'ramp_time' (fio time strings)
        :return: False when agents are unavailable/busy, True otherwise
        """
        def get_status():
            # Poll every agent's fio endpoint; entries can be None
            # when an agent did not answer with valid JSON
            return [self._poke_agent(_u + "fio", {}) for _u in self.api_urls]
        logger_cli.info("# Starting '{}' benchmark".format(self.mode))
        logger_cli.info("# Checking agents")
        # make sure agents idle
        _tt = []
        _rr = []
        for _s in get_status():
            if _s is None:
                logger_cli.error("ERROR: Agent status not available")
                return False
            _h = _s["healthcheck"]["hostname"]
            _t = _s['status']
            _r = _s["healthcheck"]["ready"]
            if _t != "idle":
                logger_cli.error("Agent status invalid {}:{}".format(_h, _t))
                _tt += [False]
            else:
                _tt += [True]
            if not _r:
                logger_cli.error("Agent is not ready {}".format(_h))
                _rr += [False]
            else:
                _rr += [True]
        # Every agent must be idle AND ready before starting;
        # any() would let one healthy agent mask broken ones
        if not all(_tt) or not all(_rr):
            return False

        # Make sure that Ceph is at low load
        # TODO: Ceph status check

        # Do benchmark according to mode
        if self.mode == "tasks":
            # TODO: Implement 'tasks' mode
            # take next task

            # update options

            # init time to schedule

            # send next task to agent
            pass
            # wait for agents to finish
        elif self.mode == "single":
            logger_cli.info("# Running benchmark")
            # Schedule the run slightly in the future so all agents
            # receive the task before it starts
            _time = datetime.now() + timedelta(seconds=10)
            _str_time = _time.strftime(_datetime_fmt)
            options["scheduled_to"] = _str_time
            logger_cli.info(
                "-> next benchmark scheduled to '{}'".format(_str_time)
            )
            # send single to agent
            _task = {
                "module": "fio",
                "action": "do_singlerun",
                "options": options
            }
            for _u in self.api_urls:
                logger_cli.info("-> sending task to '{}'".format(_u))
                _ret = self._poke_agent(_u, _task, action="POST")
                # _poke_agent returns None on decode failure; guard it
                # before membership test to avoid TypeError
                if _ret is None:
                    logger_cli.error("ERROR: Agent not responding")
                elif 'error' in _ret:
                    logger_cli.error(
                        "ERROR: Agent returned: '{}'".format(_ret['error'])
                    )

            # Estimated wall time: run + ramp + small safety margin
            _runtime = _get_seconds(options["runtime"])
            _ramptime = _get_seconds(options["ramp_time"])
            _timeout = _runtime + _ramptime + 5
            while _timeout > 0:
                _sts = get_status()
                _str = ""
                for _s in _sts:
                    # Skip agents that failed to answer this poll
                    if _s is None:
                        _str += "(no status); "
                        continue
                    _str += "{}: {} ({}); ".format(
                        _s["healthcheck"]["hostname"],
                        _s["status"],
                        _s["progress"]
                    )
                logger_cli.debug("... {}".format(_str))
                sleep(1)
                _timeout -= 1

        return True

    def cleanup(self):
        # TODO: remove agent pods, services and PV/PVCs
        return

    # Create report
    def create_report(self):
        # TODO: collect agent results and build the report
        return