| # Author: Alex Savatieiev (osavatieiev@mirantis.com; a.savex@gmail.com) |
| # Copyright 2019-2022 Mirantis, Inc. |
| import csv |
| import os |
| import json |
| |
| from copy import deepcopy |
| from datetime import datetime, timedelta, timezone |
| |
| from cfg_checker.common import logger_cli |
| from cfg_checker.common.decorators import retry |
| from cfg_checker.common.file_utils import write_str_to_file |
| from cfg_checker.helpers.console_utils import Progress |
| from cfg_checker.reports import reporter |
| # from cfg_checker.common.exception import InvalidReturnException |
| # from cfg_checker.common.exception import ConfigException |
| # from cfg_checker.common.exception import KubeException |
| |
| from cfg_checker.nodes import KubeNodes |
| from cfg_checker.agent.fio_runner import _get_seconds, _datetime_fmt |
| |
| |
| _file_datetime_fmt = "%m%d%Y%H%M%S%z" |
| |
| |
| def _reformat_timestr(_str, _chars=["/", ",", " ", ":", "+"], _tchar=""): |
| _new = "" |
| for _c in _str: |
| _new += _c if _c not in _chars else _tchar |
| return _new |
| |
| |
| def _parse_json_output(buffer): |
| try: |
| return json.loads(buffer) |
| except TypeError as e: |
| logger_cli.error( |
| "ERROR: Status not decoded: {}\n{}".format(e, buffer) |
| ) |
| except json.decoder.JSONDecodeError as e: |
| logger_cli.error( |
| "ERROR: Status not decoded: {}\n{}".format(e, buffer) |
| ) |
| return {} |
| |
| |
| def _split_vol_size(size): |
| # I know, but it is faster then regex |
| _numbers = [48, 49, 50, 51, 52, 53, 54, 55, 56, 57] |
| _s_int = "0" |
| _s_type = "" |
| for ch in size: |
| if ord(ch) in _numbers: |
| _s_int += ch |
| else: |
| _s_type += ch |
| return int(_s_int), _s_type |
| |
| |
class CephBench(object):
    """Base class for benchmark runners; keeps the environment config."""

    # File name of the agent template used by prepare_agents() of subclasses
    _agent_template = "cfgagent-template.yaml"

    def __init__(self, config):
        """Store ``config`` as ``self.env_config``."""
        self.env_config = config
| |
| |
class SaltCephBench(CephBench):
    """Placeholder for Salt environments: only logs an error on creation."""

    def __init__(self, config):
        """Log that Salt is not supported and store the config."""
        logger_cli.error("ERROR: Not impelented for Salt environment!")

        # self.master = SaltNodes(config)
        super().__init__(config)
| |
| |
| class KubeCephBench(CephBench): |
def __init__(self, config):
    """Set up the benchmark according to ``config.bench_mode``.

    Which attributes get initialized depends on the mode:
    'tasks' additionally loads the task file, 'cleanup' stops right after
    creating the cleanup list, 'report' stops after the result holders
    are created, any other mode also gets storage class, services list
    and the default scheduling delay.
    """
    self.agent_count = config.bench_agent_count
    self.master = KubeNodes(config)
    super(KubeCephBench, self).__init__(config)

    _mode = config.bench_mode
    self.mode = _mode
    self.resource_prefix = config.resource_prefix

    if _mode == "tasks":
        self.taskfile = config.bench_task_file
        self.load_tasks(self.taskfile)

    if _mode == "cleanup":
        # Cleanup needs nothing but the list of resources
        self.cleanup_list = []
        return

    self.bench_name = config.bench_name
    self.results_dump_path = config.bench_results_dump_path
    self.results = {}
    self.agent_results = {}
    self.cleanup_list = []
    self.agent_pods = []

    if _mode == "report":
        # Report mode works on preloaded results only
        self.results = {}
        return

    self.storage_class = config.bench_storage_class
    self.services = []
    # By default,
    # 30 seconds should be enough to send tasks to 3-5 agents
    self.scheduled_delay = 30
| |
def set_ceph_info_class(self, ceph_info):
    """Attach the object used to query Ceph as ``self.ceph_info``."""
    self.ceph_info = ceph_info
| |
def load_tasks(self, taskfile):
    """Load benchmark tasks from a comma-separated file into ``self.tasks``.

    Each non-empty row gives, in this order: readwrite, rwmixread, bs,
    iodepth, size. Values are kept as strings.

    :param taskfile: path to the csv file
    :raises IndexError: if a non-empty row has less than five columns
    """
    logger_cli.info("-> Loading taskfile '{}'".format(taskfile))
    _columns = ("readwrite", "rwmixread", "bs", "iodepth", "size")
    self.tasks = []
    # newline='' is what the csv module expects for files it reads
    with open(taskfile, newline='') as f:
        for row in csv.reader(f, delimiter=','):
            # csv.reader returns [] for a blank line (e.g. trailing one);
            # skip it instead of failing on row[0]
            if not row:
                continue
            self.tasks.append(
                {_key: row[_idx] for _idx, _key in enumerate(_columns)}
            )
    logger_cli.info("-> Loaded {} tasks".format(len(self.tasks)))
| |
def add_for_deletion(self, obj, typ):
    """Remember a created resource so cleanup() can remove it later.

    :param obj: object with ``metadata.namespace`` and ``metadata.name``
    :param typ: resource type string, stored as the first item
    """
    _entry = [typ, obj.metadata.namespace, obj.metadata.name]
    self.cleanup_list.append(_entry)
| |
def prepare_cleanup(self):
    """Fill ``self.cleanup_list`` with resources named with our prefix.

    Used when the resources to delete are not known: pv, pvc, pod and svc
    are listed and those whose name starts with ``self.resource_prefix``
    are queued for deletion.
    """
    _prefix = self.resource_prefix
    for _kind in ["pv", "pvc", "pod", "svc"]:
        _found = self.master.list_resource_names_by_type_and_ns(_kind)
        for _ns, _name in _found:
            if not _name.startswith(_prefix):
                continue
            # Namespace is shown only when the resource has one
            _msg = (
                "{} {}/{}".format(_kind, _ns, _name)
                if _ns else
                "{} {}".format(_kind, _name)
            )
            logger_cli.info("-> Found {}".format(_msg))
            self.cleanup_list.append([_kind, _ns, _name])
| |
def prepare_agents(self, options):
    """Create ``self.agent_count`` agents, expose them and register them.

    For each agent a pod with a pvc is created through ``self.master``,
    a service is created for it, all of them are queued for cleanup and
    an entry for the agent is added to ``self.agent_results``.

    :param options: dict, 'size' and 'filename' keys are used here
    """
    logger_cli.info("# Preparing {} agents".format(self.agent_count))
    # Volume is made a bit bigger than the test file, so datafile fits
    _quanitizer = 1.3
    _number, _units = _split_vol_size(options['size'])
    _vol_size = str(round(_number * _quanitizer)) + _units + "i"
    logger_cli.info(
        "-> Testfile size: {0}, Volume size: {1} ({0}*{2})".format(
            options['size'],
            _vol_size,
            _quanitizer
        )
    )
    for _num in range(self.agent_count):
        logger_cli.info("-> creating agent '{:02}'".format(_num))
        # pod and its pvc
        _pod, _claim = self.master.prepare_benchmark_agent(
            _num,
            os.path.split(options["filename"])[0],
            self.storage_class,
            _vol_size,
            self._agent_template
        )
        self.agent_pods.append(_pod)
        self.add_for_deletion(_claim, "pvc")
        self.add_for_deletion(_pod, "pod")

        # service for the agent
        _service = self.master.expose_benchmark_agent(_pod)
        self.add_for_deletion(_service, "svc")
        self.services.append(_service)

        # prepopulate results
        self.agent_results[_pod.metadata.name] = {
            "url": "http://{}:{}/api/".format(
                _service.spec.cluster_ip,
                8765
            ),
            "storage_class": self.storage_class,
            "volume_size": options['size']
        }

    logger_cli.info("-> Done creating agents")
    # TODO: Update after implementing pooled task sending
    self.scheduled_delay = self.agent_count * 10
    logger_cli.info(
        "-> Schedule delay set to {} sec".format(self.scheduled_delay)
    )
| |
def _poke_agent(self, url, body, action="GET"):
    """Call an agent URL using curl executed inside the first agent pod.

    :param url: agent API url
    :param body: request data; when not empty it is first saved in the pod
                 as a file and passed to curl with '-d @file'
    :param action: HTTP method for curl's '-X'
    :return: decoded JSON answer, see _parse_json_output()
    """
    _datafile = "/tmp/data"
    _pod_name = self.agent_pods[0].metadata.name
    _cmd = [
        "curl",
        "-s",
        "-H",
        "'Content-Type: application/json'",
        "-X",
        action,
        url
    ]
    if body:
        _cmd.extend(["-d", "@" + _datafile])
        self.master.prepare_json_in_pod(
            _pod_name,
            self.master._namespace,
            body,
            _datafile
        )
    _output = self.master.exec_cmd_on_target_pod(
        _pod_name,
        self.master._namespace,
        " ".join(_cmd)
    )
    return _parse_json_output(_output)
| |
| def _ensure_agents_ready(self): |
| # make sure agents idle |
| _status_set = [] |
| _ready_set = [] |
| for _agent, _d in self.get_agents_status().items(): |
| # obviously, there should be some answer |
| if _d is None: |
| logger_cli.error("ERROR: Agent status not available") |
| return False |
| # status should be idle or finished |
| if _d['status'] not in ["idle", "finished"]: |
| logger_cli.error( |
| "Agent status invalid {}:{}".format(_agent, _d['status']) |
| ) |
| _status_set += [False] |
| else: |
| # Good agent |
| _status_set += [True] |
| # agent's fio shell should be in 'ready' |
| if not _d["healthcheck"]["ready"]: |
| logger_cli.error("Agent is not ready {}".format(_agent)) |
| _ready_set += [False] |
| else: |
| # 'fio' shell for agent is ready |
| _ready_set += [True] |
| # all agent's statuses should be True |
| # and all 'fio' shell modules should be 'ready' |
| if not any(_status_set) or not any(_ready_set): |
| # At least one is not ready and it was already logged above |
| return False |
| else: |
| # All is good |
| return True |
| |
def get_agents_status(self):
    """Query fio status on all pods labeled 'app=cfgagent'.

    :return: dict of decoded answers keyed the same way as the result
             of ``exec_on_labeled_pods_and_ns``
    """
    _raw = self.master.exec_on_labeled_pods_and_ns(
        "app=cfgagent",
        "curl -s http://localhost:8765/api/fio"
    )
    return {
        _agent: _parse_json_output(_answer)
        for _agent, _answer in _raw.items()
    }
| |
@retry(Exception, initial_wait=5)
def get_agents_resultlist(self):
    """Ask each known agent for the list of results it has.

    :return: dict of agent name -> decoded answer
    """
    _request = {"module": "fio", "action": "get_resultlist"}
    return {
        _agent: self._poke_agent(_d["url"], _request, action="POST")
        for _agent, _d in self.agent_results.items()
    }
| |
@retry(Exception, initial_wait=5)
def get_result_from_agent(self, agent, time):
    """Get a single result from an agent.

    :param agent: agent name, a key of ``self.agent_results``
    :param time: value sent to the agent as the 'time' option
    :return: decoded answer of the agent
    """
    _request = {
        "module": "fio",
        "action": "get_result",
        "options": {"time": time}
    }
    _url = self.agent_results[agent]["url"]
    return self._poke_agent(_url, _request, action="POST")
| |
def _get_next_scheduled_time(self):
    """Schedule next run ``self.scheduled_delay`` seconds from now (UTC).

    The time is saved to ``self.next_scheduled_time``.

    :return: scheduled time formatted with ``_datetime_fmt``
    """
    _current = datetime.now(timezone.utc)
    logger_cli.info(
        "-> time is '{}'".format(_current.strftime(_datetime_fmt))
    )
    _delay = timedelta(seconds=self.scheduled_delay)
    self.next_scheduled_time = _current + _delay
    _scheduled = self.next_scheduled_time.strftime(_datetime_fmt)
    logger_cli.info(
        "-> next benchmark scheduled to '{}'".format(_scheduled)
    )
    return _scheduled
| |
def _send_scheduled_task(self, options):
    """Send a scheduled fio run request to every agent, one by one.

    :param options: dict passed to agents as task options
    :return: False as soon as an agent answers with 'error', True otherwise
    """
    _request = {
        "module": "fio",
        "action": "do_scheduledrun",
        "options": options
    }
    for _name, _info in self.agent_results.items():
        _url = _info["url"]
        logger_cli.info(
            "-> sending task to '{}:{}'".format(_name, _url)
        )
        _answer = self._poke_agent(_url, _request, action="POST")
        if 'error' not in _answer:
            continue
        logger_cli.error(
            "ERROR: Agent returned: '{}'".format(_answer['error'])
        )
        return False
    # No errors detected
    return True
| |
def track_benchmark(self, options):
    """Poll agents until all of them are finished or the timeout is hit.

    Agent statuses are logged on every pass. Once more than the stats
    delay has passed since the scheduled start, Ceph cluster status is
    saved on every pass to the 'ceph' part of this run's results.

    :param options: dict, 'runtime', 'ramp_time' and 'scheduled_to' are used
    :return: True when all agents finished, False on timeout
    """
    _run_sec = _get_seconds(options["runtime"])
    _ramp_sec = _get_seconds(options["ramp_time"])
    # Sum up all timings that we must wait and double it
    _timeout = (self.scheduled_delay + _run_sec + _ramp_sec) * 2
    # We should have no more than 65 measurements
    _stats_delay = round((_run_sec + _ramp_sec) / 65)
    _start = self.next_scheduled_time
    _end = datetime.now(timezone.utc) + timedelta(seconds=_timeout)
    while True:
        # Print status
        _sts = self.get_agents_status()
        _left = (_end - datetime.now(timezone.utc)).total_seconds()
        logger_cli.info("-> {:.2f}s left. Agent status:".format(_left))
        for _agent, _status in _sts.items():
            logger_cli.info(
                "\t{}: {} ({}%)".format(
                    _agent,
                    _status["status"],
                    _status["progress"]
                )
            )
        # Get Ceph status if _start time passed
        _elapsed = (datetime.now(timezone.utc) - _start).total_seconds()
        if _elapsed > _stats_delay:
            logger_cli.info("-> {:.2f}s elapsed".format(_elapsed))
            _sec = "{:0.1f}".format(_elapsed)
            _cluster_status = self.ceph_info.get_cluster_status()
            self.results[options["scheduled_to"]]["ceph"][_sec] = \
                _cluster_status
        # Check if agents finished
        _fcnt = sum(
            1 for _s in _sts.values() if _s["status"] == 'finished'
        )
        _tcnt = len(_sts)
        if _fcnt >= _tcnt:
            logger_cli.info("-> All agents finished run")
            return True
        logger_cli.info("-> {}/{} finished".format(_fcnt, _tcnt))
        # recalc how much is left
        # In case end_datetime was in past to begin with
        if (_end - datetime.now(timezone.utc)).total_seconds() < 0:
            logger_cli.info("-> Timed out waiting for agents to finish")
            return False
| |
def _do_testrun(self, options):
    """Do one benchmark run: send task, track it, collect and dump results.

    OSD df data is saved before and after the run, then totals, ceph
    stats and OSD comparison are calculated and all results are dumped.

    :param options: run options, 'scheduled_to' is the key in results
    :return: False if sending or tracking failed, True otherwise
    """
    _osd_df = self.ceph_info.get_ceph_osd_df()
    self.results[options["scheduled_to"]]["osd_df_before"] = _osd_df
    # send single to agent
    if not self._send_scheduled_task(options):
        return False
    # Track this benchmark progress
    if not self.track_benchmark(options):
        return False
    logger_cli.info("-> Finished testrun. Collecting results...")
    # get ceph osd stats
    _osd_df = self.ceph_info.get_ceph_osd_df()
    self.results[options["scheduled_to"]]["osd_df_after"] = _osd_df
    # Get results for each agent
    self.collect_results()
    logger_cli.info("-> Calculating totals and averages")
    self.calculate_totals()
    self.calculate_ceph_stats()
    self.osd_df_compare(options["scheduled_to"])
    logger_cli.info("-> Dumping results")
    for _time, _data in self.results.items():
        _filename = self._get_dump_filename(_time)
        self.dump_result(_filename, _data)
    return True
| |
def wait_ceph_cooldown(self):
    """Save current Ceph status, health detail, df and pg dump on self.

    No actual waiting is done at the moment.
    """
    # TODO: Query Ceph once in 20 sec to make sure its load dropped

    _info = self.ceph_info
    # get ceph idle status
    self.ceph_idle_status = _info.get_cluster_status()
    self.health_detail = _info.get_health_detail()
    self.ceph_df = _info.get_ceph_df()
    self.ceph_pg_dump = _info.get_ceph_pg_dump()
| |
def run_benchmark(self, options):
    """Run the benchmark in the mode given by ``self.mode``.

    'tasks' mode runs every loaded task, skipping those that already have
    a result with the same id and the same task values; 'single' mode
    does one run with the options given.

    :param options: dict of run options, updated in place
    :return: True when all runs are done, False on any failure or when
             the mode is unknown
    """
    logger_cli.info("# Starting '{}' benchmark".format(self.mode))
    # Check agent readyness
    logger_cli.info("# Checking agents")
    if not self._ensure_agents_ready():
        return False

    # Make sure that Ceph is at low load
    # TODO: Ceph status check
    # self._wait_ceph_cooldown()

    # Do benchmark according to mode
    if self.mode == "tasks":
        logger_cli.info(
            "# Running benchmark with tasks from '{}'".format(
                self.taskfile
            )
        )
        # values that identify a task in existing results
        _task_keys = ("readwrite", "rwmixread", "bs", "iodepth", "size")
        _total_tasks = len(self.tasks)
        _r = self.results
        for idx, _task in enumerate(self.tasks):
            logger_cli.info(
                "-> Starting next task ({}/{})".format(idx+1, _total_tasks)
            )
            _changes = ["{} = {}".format(k, v) for k, v in _task.items()]
            logger_cli.info(
                "-> Updating options with: {}".format(", ".join(_changes))
            )
            # update options
            options.update(_task)
            # Check if such result already exists
            _existing = [
                t for t in _r
                if _r[t]["id"] == idx
                and _r[t]["mode"] == "tasks"
                and all(
                    _r[t]["input_options"][_k] == options[_k]
                    for _k in _task_keys
                )
            ]
            if _existing:
                logger_cli.info(
                    "-> Skipped already performed task from {}: "
                    "line {}, {}({}), {}, {}, {}".format(
                        self.taskfile,
                        idx,
                        options["readwrite"],
                        options["rwmixread"],
                        options["bs"],
                        options["iodepth"],
                        options["size"]
                    )
                )
                continue
            # init time to schedule
            _sch_time = self._get_next_scheduled_time()
            options["scheduled_to"] = _sch_time
            # init results table
            _r[_sch_time] = {
                "id": idx,
                "mode": self.mode,
                "input_options": deepcopy(options),
                "agents": {},
                "ceph": {}
            }
            # exit on error
            if not self._do_testrun(options):
                return False
            # Save ceph osd stats and wait cooldown
            self.wait_ceph_cooldown()
    elif self.mode == "single":
        logger_cli.info("# Running single benchmark")
        # init time to schedule
        _sch_time = self._get_next_scheduled_time()
        options["scheduled_to"] = _sch_time
        # init results table
        self.results[_sch_time] = {
            "id": "{:2}".format(0),
            "input_options": options,
            "agents": {},
            "ceph": {}
        }
        if not self._do_testrun(options):
            return False
    else:
        logger_cli.error("ERROR: Unknown mode '{}'".format(self.mode))
        return False

    # Normal exit
    logger_cli.info("# All benchmark tasks done")
    return True
| |
def cleanup(self):
    """Delete all resources from ``self.cleanup_list`` and wait for it.

    The list is reversed in place first, so resources are deleted in the
    order opposite to the one they were added in. Then resources are
    polled once a second until none of them reports a phase or until
    120 seconds have passed.
    """
    # Local import: keeps this method self-contained
    import time

    logger_cli.info("# Cleaning up")
    self.cleanup_list.reverse()

    for _typ, _ns, _name in self.cleanup_list:
        self.master.cleanup_resource_by_name(_typ, _name, ns=_ns)

    # Wait for resource to be cleaned
    _timeout = 120
    _poll_interval = 1
    _total = len(self.cleanup_list)
    logger_cli.info("-> Wait until {} resources cleaned".format(_total))
    _p = Progress(_total)
    _get_phase = self.master.get_resource_phase_by_name
    # Timeout is measured with a clock, not with number of iterations,
    # so it does not depend on how fast the API answers
    _deadline = time.monotonic() + _timeout
    while True:
        _alive = [
            r for r in self.cleanup_list
            if _get_phase(r[0], r[2], ns=r[1])
        ]
        _p.write_progress(_total - len(_alive))
        if not _alive:
            _p.end()
            logger_cli.info("# Done cleaning up")
            break
        if time.monotonic() >= _deadline:
            _p.end()
            logger_cli.info(
                "# Timed out waiting after {}s.".format(_timeout)
            )
            break
        # Do not hammer the API while resources are terminating
        time.sleep(_poll_interval)

    return
| |
def collect_results(self):
    """Load from agents the results of runs known to ``self.results``.

    Results listed by an agent for a time that is not in ``self.results``
    are treated as old ones and skipped; results already loaded for an
    agent are not loaded again.
    """
    logger_cli.info("# Collecting results")
    # query agents for results
    for _agent, _answer in self.get_agents_resultlist().items():
        for _time in _answer["resultlist"]:
            if _time not in self.results:
                # Some older results found
                # do not process them
                logger_cli.info(
                    "-> Skipped old results for '{}'".format(_time)
                )
                continue
            _loaded = self.results[_time]["agents"]
            if _agent in _loaded:
                # Should never happen, actually
                logger_cli.info(
                    "-> Skipped loaded result for '{}' from '{}'".format(
                        _time,
                        _agent
                    )
                )
                continue
            # Load result add it locally
            logger_cli.info(
                "-> Getting results for '{}' from '{}'".format(
                    _time,
                    _agent
                )
            )
            _result = self.get_result_from_agent(_agent, _time)
            _loaded[_agent] = _result[_time]
| |
def _get_dump_filename(self, _time):
    """Build the path of the json dump file for the result at ``_time``.

    Path is <results_dump_path>/<options name>/<file>, where file name is
    made of the time without separators, number of agents, readwrite,
    bs and iodepth joined with '-'.
    """
    _result = self.results[_time]
    _opts = _result["input_options"]
    _dirname = _opts["name"]
    _name_parts = [
        _reformat_timestr(_time),
        "{:02}".format(len(_result["agents"])),
        _opts["readwrite"],
        _opts["bs"],
        str(_opts["iodepth"]),
    ]
    _filename = "-".join(_name_parts) + ".json"
    return os.path.join(self.results_dump_path, _dirname, _filename)
| |
def preload_results(self):
    """Load previously dumped results of this benchmark to ``self.results``.

    Json files are taken from <results_dump_path>/<bench_name>. The key
    for each result is the time parsed from the beginning of the file
    name. Files that can't be parsed or loaded are skipped with a message.
    """
    logger_cli.info(
        "# Preloading results for '{}'".format(self.bench_name)
    )
    # get all dirs in folder
    _p = self.results_dump_path
    if not os.path.isdir(_p):
        logger_cli.warning(
            "WARNING: Dump path is not a folder '{}'".format(_p)
        )
        return
    _bench_folder = os.path.join(_p, self.bench_name)
    for path, dirs, files in os.walk(_p):
        if path != _bench_folder:
            continue
        logger_cli.info("-> Folder found '{}'".format(path))
        for _fname in files:
            logger_cli.debug("... processing '{}'".format(_fname))
            _ext = _fname.split('.')[-1]
            if _ext != "json":
                logger_cli.info(
                    "-> Extension invalid '{}', "
                    "'json' is expected".format(_ext)
                )
                continue
            # get time from filename: first 14 chars are date and time,
            # the rest before the first '-' is the UTC offset with
            # its '+' removed
            _t = _fname.split('-')[0]
            _str_time = _t[:14] + "+" + _t[14:]
            try:
                _t = datetime.strptime(_str_time, _file_datetime_fmt)
            except ValueError:
                # Not a file created by dump, don't fail on it
                logger_cli.warning(
                    "WARNING: Can't get time from filename '{}', "
                    "skipped".format(_fname)
                )
                continue
            _time = _t.strftime(_datetime_fmt)
            _data = self.load_dumped_result(os.path.join(path, _fname))
            if _data is None:
                # Load failed and was logged by the loader;
                # do not keep None among results
                continue
            self.results[_time] = _data
            logger_cli.info(
                "-> Loaded '{}' as '{}'".format(
                    _fname,
                    _time
                )
            )
| |
def dump_result(self, filename, data):
    """Dump ``data`` as indented json to ``filename``, overwriting it.

    The folder for the file is created if it does not exist, including
    all missing parent folders.

    :param filename: path to the file
    :param data: json-serializable data
    """
    _folder, _file = os.path.split(filename)
    # Empty folder part means current folder, nothing to create
    if _folder and not os.path.exists(_folder):
        # makedirs also creates missing parents and does not fail
        # if the folder appeared in between
        os.makedirs(_folder, exist_ok=True)
        logger_cli.info("-> Created folder '{}'".format(_folder))
    # Dump agent data for this test run
    write_str_to_file(filename, json.dumps(data, indent=2))
    logger_cli.info("-> Dumped '{}'".format(filename))
    return
| |
def load_dumped_result(self, filename):
    """Load json data from ``filename``.

    :param filename: path to the json file
    :return: decoded data; None if file is missing or can't be decoded
             (error is logged)
    """
    try:
        # Read-only mode: file is never written here and
        # read-only dumps must be loadable too
        with open(filename, "rt") as f:
            return json.loads(f.read())
    except FileNotFoundError as e:
        logger_cli.error(
            "ERROR: {}".format(e)
        )
    except TypeError as e:
        logger_cli.error(
            "ERROR: Invalid file ({}): {}".format(filename, e)
        )
    except json.decoder.JSONDecodeError as e:
        logger_cli.error(
            "ERROR: Failed to decode json ({}): {}".format(filename, e)
        )
    return None
| |
| def _lookup_storage_class_id_by_name(self, storage_class_name): |
| # Assume that self had proper data |
| for _pool in self.ceph_df["pools"]: |
| if storage_class_name == _pool["name"]: |
| return _pool["id"] |
| return None |
| |
def calculate_totals(self):
    """Calculate read/write totals for each result that has none yet.

    Bandwidth and iops are summed over agents; mean latency and 95th
    percentile of completion latency are averaged over agents and divided
    by 1000 (ns to us). Only the first fio job of each agent is used.
    Storage class name and its pool stats are taken for the first agent.
    Results that already have 'totals' are left as is.
    """
    def _avg_us(values):
        # Average of ns values in us; 0 when there are no agents at all
        if not values:
            return 0
        return (sum(values) / len(values)) / 1000

    # Calculate totals for Read and Write
    for _time, data in self.results.items():
        if "totals" in data:
            continue
        # Totals are attached to the result only when fully calculated,
        # so a failure here does not leave an empty 'totals' behind
        _totals = {}
        _r_bw = 0
        _r_avglat = []
        _r_95clat = []
        _r_iops = 0
        _w_bw = 0
        _w_avglat = []
        _w_95clat = []
        _w_iops = 0
        for _a, _d in data["agents"].items():
            # Hardcoded number of jobs param :(
            _j = _d["jobs"][0]
            _read = _j["read"]
            _write = _j["write"]
            _r_bw += _read["bw_bytes"]
            _r_avglat.append(_read["lat_ns"]["mean"])
            _r_95clat.append(_read["clat_ns"]["percentile"]["95.000000"])
            _r_iops += _read["iops"]
            _w_bw += _write["bw_bytes"]
            _w_avglat.append(_write["lat_ns"]["mean"])
            _w_95clat.append(_write["clat_ns"]["percentile"]["95.000000"])
            _w_iops += _write["iops"]
            # Save storage class name
            if "storage_class" not in _totals:
                _sc_name = self.agent_results[_a]["storage_class"]
                _totals["storage_class"] = _sc_name
                # Lookup storage class id and num_pg
                _totals["storage_class_stats"] = \
                    reporter.get_pool_stats_by_id(
                        self._lookup_storage_class_id_by_name(_sc_name),
                        self.ceph_pg_dump
                    )

        _totals["read_bw_bytes"] = _r_bw
        _totals["read_avg_lat_us"] = _avg_us(_r_avglat)
        _totals["read_95p_clat_us"] = _avg_us(_r_95clat)
        _totals["read_iops"] = _r_iops
        _totals["write_bw_bytes"] = _w_bw
        _totals["write_avg_lat_us"] = _avg_us(_w_avglat)
        _totals["write_95p_clat_us"] = _avg_us(_w_95clat)
        _totals["write_iops"] = _w_iops
        data["totals"] = _totals
| |
def calculate_ceph_stats(self):
    """Replace raw Ceph samples of each result with data for the report.

    For every result not processed yet, 'pgmap' of each saved Ceph status
    is taken, maximums of read/write bytes and ops per second are found
    and the 'ceph' part of the result is replaced with these maximums,
    axis values for them and per-sample stats with percent of maximum.
    Processed results are marked with the 'ceph_stats' key.
    """
    def _get_max_value(key, stats):
        # Returns (sample key, value) of the biggest 'key' value;
        # (0, 0) when there is no positive value
        _max_time = 0
        _value = 0
        for _k, _v in stats.items():
            if key in _v and _value < _v[key]:
                _max_time = _k
                _value = _v[key]
        return _max_time, _value

    def _perc(n, m):
        # Percent string of n in m; plain 0 when either of them is empty
        if not n or not m:
            return 0
        return "{:.0f}%".format((n / m) * 100)

    def _axis_vals(val):
        return [
            val, int(val*1.1), int(val*0.75), int(val*0.50), int(val*0.15)
        ]

    for _time, data in self.results.items():
        if "ceph" not in data:
            logger_cli.warning(
                "WARNING: Ceph stats raw data not found in results"
            )
            continue
        if "ceph_stats" in data:
            continue
        data["ceph_stats"] = {}
        # Copy pool stats data.
        # The dict must be new for each result, otherwise samples of one
        # result end up in all results processed after it
        _stats = {_e: _d["pgmap"] for _e, _d in data["ceph"].items()}
        # Maximums
        mrb_t, mrb = _get_max_value("read_bytes_sec", _stats)
        mwb_t, mwb = _get_max_value("write_bytes_sec", _stats)
        mri_t, mri = _get_max_value("read_op_per_sec", _stats)
        mwi_t, mwi = _get_max_value("write_op_per_sec", _stats)
        # Replace ceph with shorter data
        data["ceph"] = {
            "max_rbl": _axis_vals(mrb),
            "max_rbl_time": mrb_t,
            "max_wbl": _axis_vals(mwb),
            "max_wbl_time": mwb_t,
            "max_ril": _axis_vals(mri),
            "max_ril_time": mri_t,
            "max_wil": _axis_vals(mwi),
            "max_wil_time": mwi_t,
            "stats": _stats
        }
        # Calculate %% values for barchart
        for _e, _d in data["ceph"]["stats"].items():
            _d["read_bytes_sec_perc"] = \
                _perc(_d.get("read_bytes_sec", 0), mrb)
            _d["write_bytes_sec_perc"] = \
                _perc(_d.get("write_bytes_sec", 0), mwb)
            _d["read_op_per_sec_perc"] = \
                _perc(_d.get("read_op_per_sec", 0), mri)
            _d["write_op_per_sec_perc"] = \
                _perc(_d.get("write_op_per_sec", 0), mwi)
    return
| |
def osd_df_compare(self, _time):
    """Compare OSD df data saved before and after the run at ``_time``.

    Per-OSD before/after data and percent of change for changed values
    are saved to 'osds' of the result, summary goes to 'osd_summary'.
    In 'osd_summary'/'active', numbers for value names are counts of OSDs
    where that value changed, while 'status', 'device_class' and 'pgs'
    hold totals from the "after" data. Raw 'osd_df_before' and
    'osd_df_after' are removed from the result in the end.
    """
    def _get_osd(osd_id, nodes):
        for osd in nodes:
            if osd["id"] == osd_id:
                return osd
        return None

    def _is_number(value):
        return isinstance(value, (int, float))

    logger_cli.info("# Comparing OSD stats")
    _osd = {}
    if _time not in self.results:
        logger_cli.warning(
            "WARNING: {} not found in results. Check data".format(_time)
        )
        return
    data = self.results[_time]
    # Save summary
    data["osd_summary"] = {}
    data["osd_summary"]["before"] = data["osd_df_before"]["summary"]
    data["osd_summary"]["after"] = data["osd_df_after"]["summary"]
    data["osd_summary"]["active"] = {
        "status": "",
        "device_class": "",
        "pgs": "",
        "kb_used": 0,
        "kb_used_data": 0,
        "kb_used_omap": 0,
        "kb_used_meta": 0,
        "utilization": 0,
        "var_down": 0,
        "var_up": 0
    }
    _p = data["osd_summary"]["active"]
    # These are filled with totals at the end and are not counters
    _total_keys = ("status", "device_class", "pgs")
    # Compare OSD counts
    _nodes_before = data["osd_df_before"]["nodes"]
    _nodes_after = data["osd_df_after"]["nodes"]
    if len(_nodes_before) != len(_nodes_after):
        logger_cli.warning(
            "WARNING: Before/After bench OSD "
            "count mismatch for '{}'".format(_time)
        )
    # iterate osds from before
    _pgs = 0
    _classes = set()
    _nodes_up = 0
    for idx, _osd_b in enumerate(_nodes_before):
        # search for the same osd in after
        _osd_a = _get_osd(_osd_b["id"], _nodes_after)
        # Save data to the new place
        _entry = {"before": _osd_b, "after": {}, "percent": {}}
        _osd[_osd_b["name"]] = _entry
        if not _osd_a:
            # If this happen, Ceph cluster is actually broken
            logger_cli.warning(
                "WARNING: Wow! {} dissapered".format(_osd_b["name"])
            )
            # There is nothing to compare with
            continue
        _entry["after"] = _osd_a
        # Calculate summary using "after" data
        _pgs += _osd_a["pgs"]
        _classes.update([_osd_a["device_class"]])
        if _osd_a["status"] == "up":
            _nodes_up += 1
        # compare
        _keys_b = list(_osd_b.keys())
        # To be safe, detect if some keys are different
        # ...and log it.
        _diff = set(_keys_b).symmetric_difference(_osd_a.keys())
        if _diff:
            # This should never happen, actually
            logger_cli.warning(
                "WARNING: Before/after keys mismatch "
                "for OSD node {}: {}".format(idx, ", ".join(sorted(_diff)))
            )
            continue
        # Compare each key and calculate how it changed
        for k in _keys_b:
            _before, _after = _osd_b[k], _osd_a[k]
            if _before == _after:
                continue
            # Announce change
            logger_cli.debug(
                "-> {:4}: {}, {} -> {}".format(idx, k, _before, _after)
            )
            # calculate percent; possible only for numbers and
            # when there was something to compare with
            _change_perc = None
            if _is_number(_before) and _is_number(_after) and _before:
                _change_perc = (_after / _before) * 100 - 100
                _entry["percent"][k] = _change_perc

            # Increase counters
            if k not in _total_keys:
                _p[k] = _p.get(k, 0) + 1
            if k == "var" and _change_perc is not None:
                if _change_perc > 0:
                    _p["var_up"] += 1
                elif _change_perc < 0:
                    _p["var_down"] += 1
    # Save sorted data
    data["osds"] = _osd
    logger_cli.info("-> Removing redundand osd before/after data")
    data.pop("osd_df_before")
    data.pop("osd_df_after")
    # Save summary
    _p["status"] = "{}".format(_nodes_up)
    _p["device_class"] = "{}".format(len(list(_classes)))
    _p["pgs"] = _pgs
    return
| |
| # Create report |
def create_report(self, filename):
    """
    Create static html report with benchmark results and ceph info

    :param filename: file to save the report to
    :return: none
    """
    logger_cli.info("### Generating report to '{}'".format(filename))
    _writer = reporter.ReportToFile(
        reporter.HTMLCephBench(self),
        filename
    )
    _info = self.ceph_info
    _payload = {
        "results": self.results,
        "idle_status": self.ceph_idle_status,
        "health_detail": self.health_detail,
        "ceph_df": self.ceph_df,
        "ceph_pg_dump": self.ceph_pg_dump,
        "info": _info.ceph_info,
        "cluster": _info.cluster_info,
        "ceph_version": _info.ceph_version,
        "nodes": self.agent_pods
    }
    _writer(_payload)
    logger_cli.info("-> Done")