blob: f5af704946992e3489bbd8cfd18f269ed2b5c08d [file] [log] [blame]
# Author: Alex Savatieiev (osavatieiev@mirantis.com; a.savex@gmail.com)
# Copyright 2019-2022 Mirantis, Inc.
import csv
import os
import json
import time
from copy import deepcopy
from datetime import datetime, timedelta, timezone

from cfg_checker.common import logger_cli
from cfg_checker.common.decorators import retry
from cfg_checker.common.file_utils import write_str_to_file
from cfg_checker.helpers.console_utils import Progress
from cfg_checker.helpers.console_utils import cl_typewriter
from cfg_checker.reports import reporter
# from cfg_checker.common.exception import InvalidReturnException
# from cfg_checker.common.exception import ConfigException
# from cfg_checker.common.exception import KubeException
from cfg_checker.nodes import KubeNodes
from cfg_checker.agent.fio_runner import _get_seconds, _datetime_fmt
_file_datetime_fmt = "%m%d%Y%H%M%S%z"
def _reformat_timestr(_str, _chars=["/", ",", " ", ":", "+"], _tchar=""):
_new = ""
for _c in _str:
_new += _c if _c not in _chars else _tchar
return _new
def _parse_json_output(buffer):
try:
return json.loads(buffer)
except TypeError as e:
logger_cli.error(
"ERROR: Status not decoded: {}\n{}".format(e, buffer)
)
except json.decoder.JSONDecodeError as e:
logger_cli.error(
"ERROR: Status not decoded: {}\n{}".format(e, buffer)
)
return {}
def _split_vol_size(size):
# I know, but it is faster then regex
_numbers = [48, 49, 50, 51, 52, 53, 54, 55, 56, 57]
_s_int = "0"
_s_type = ""
for ch in size:
if ord(ch) in _numbers:
_s_int += ch
else:
_s_type += ch
return int(_s_int), _s_type
class CephBench:
    """Common base for Ceph benchmark runners.

    Keeps the environment configuration handed over by the caller.
    """

    # Name of the agent pod template used when benchmark agents are created
    _agent_template = "cfgagent-template.yaml"

    def __init__(self, config):
        self.env_config = config
class SaltCephBench(CephBench):
    """Benchmark runner stub for Salt environments.

    Benchmarking is not implemented for Salt: construction only logs an
    error and stores the configuration.
    """

    def __init__(
        self,
        config
    ):
        logger_cli.error("ERROR: Not implemented for Salt environment!")
        super(SaltCephBench, self).__init__(config)
class KubeCephBench(CephBench):
    """Ceph benchmark runner for Kubernetes environments.

    Deploys benchmark agent pods (each with a PVC and a service), sends fio
    tasks to them, tracks their progress, collects per-agent results along
    with Ceph cluster stats, dumps results to JSON files and renders an
    HTML report.
    """

    def __init__(self, config):
        """Initialize the runner from bench-related config values.

        What is initialized depends on ``config.bench_mode``: 'tasks' also
        loads the task file, 'cleanup' stops right after creating an empty
        cleanup list, and 'report' stops once the result containers exist.
        """
        self.agent_count = config.bench_agent_count
        self.master = KubeNodes(config)
        super(KubeCephBench, self).__init__(config)

        self.mode = config.bench_mode
        self.resource_prefix = config.resource_prefix
        if config.bench_mode == "tasks":
            self.taskfile = config.bench_task_file
            self.load_tasks(self.taskfile)

        if config.bench_mode == "cleanup":
            # Filled later by prepare_cleanup()
            self.cleanup_list = []
            return

        self.bench_name = config.bench_name
        self.results_dump_path = config.bench_results_dump_path
        self.results = {}
        self.agent_results = {}
        self.cleanup_list = []
        self.agent_pods = []

        if config.bench_mode == "report":
            return

        self.storage_class = config.bench_storage_class
        self.services = []
        # By default,
        # 30 seconds should be enough to send tasks to 3-5 agents
        self.scheduled_delay = 30

    def set_ceph_info_class(self, ceph_info):
        """Set the object used to query Ceph cluster information."""
        self.ceph_info = ceph_info

    def load_tasks(self, taskfile):
        """Load benchmark tasks from a CSV file into ``self.tasks``.

        Each row holds: readwrite, rwmixread, bs, iodepth, size, ramp_time,
        runtime. Values are kept as strings; blank lines are skipped.

        :param taskfile: path to the CSV task file
        """
        logger_cli.info("-> Loading taskfile '{}'".format(taskfile))
        _keys = [
            "readwrite",
            "rwmixread",
            "bs",
            "iodepth",
            "size",
            "ramp_time",
            "runtime"
        ]
        self.tasks = []
        # newline='' is what the csv module expects for files it reads
        with open(taskfile, newline='') as f:
            for row in csv.reader(f, delimiter=','):
                if not row:
                    # csv.reader yields an empty list for blank lines
                    continue
                # Index explicitly so short rows still fail loudly
                self.tasks.append(
                    {_k: row[_i] for _i, _k in enumerate(_keys)}
                )
        logger_cli.info("-> Loaded {} tasks".format(len(self.tasks)))

    def add_for_deletion(self, obj, typ):
        """Remember a created Kubernetes object for cleanup().

        :param obj: object with ``metadata.namespace`` and ``metadata.name``
        :param typ: resource type string, e.g. "pvc", "pod" or "svc"
        """
        self.cleanup_list.append(
            [typ, obj.metadata.namespace, obj.metadata.name]
        )

    def prepare_cleanup(self):
        """Find existing benchmark resources for cleanup() by name prefix.

        Lists pv, pvc, pod and svc resources and adds every one whose name
        starts with ``self.resource_prefix`` to ``self.cleanup_list``.
        """
        _prefix = self.resource_prefix
        for _typ in ["pv", "pvc", "pod", "svc"]:
            _list = self.master.list_resource_names_by_type_and_ns(_typ)
            for ns, name in _list:
                if not name.startswith(_prefix):
                    continue
                if ns:
                    _msg = "{} {}/{}".format(_typ, ns, name)
                else:
                    _msg = "{} {}".format(_typ, name)
                logger_cli.info("-> Found {}".format(_msg))
                self.cleanup_list.append([_typ, ns, name])

    def prepare_agents(self, options):
        """Create agent pods together with their PVCs and services.

        The volume size is the requested test file size ('size' option)
        scaled by 1.3 so the data file fits, with "i" appended to its
        units. Each agent's API URL, storage class and requested size are
        recorded in ``self.agent_results``.

        :param options: benchmark options; 'size' and 'filename' are used
        """
        logger_cli.info("# Preparing {} agents".format(self.agent_count))
        # Increase volume size a bit, so datafile fits
        _quantizer = 1.3
        _v_size, _vol_size_units = _split_vol_size(options['size'])
        _v_size = round(_v_size * _quantizer)
        _vol_size = str(_v_size) + _vol_size_units + "i"
        logger_cli.info(
            "-> Testfile size: {0}, Volume size: {1} ({0}*{2})".format(
                options['size'],
                _vol_size,
                _quantizer
            )
        )
        # Start preparing
        for idx in range(self.agent_count):
            # create pvc and pod
            logger_cli.info("-> creating agent '{:02}'".format(idx))
            _agent, _pvc = self.master.prepare_benchmark_agent(
                idx,
                os.path.split(options["filename"])[0],
                self.storage_class,
                _vol_size,
                self._agent_template
            )
            # save it to lists
            self.agent_pods.append(_agent)
            self.add_for_deletion(_pvc, "pvc")
            self.add_for_deletion(_agent, "pod")

            # expose it
            _svc = self.master.expose_benchmark_agent(_agent)
            self.add_for_deletion(_svc, "svc")
            # Save service
            self.services.append(_svc)
            # prepopulate results
            self.agent_results[_agent.metadata.name] = {
                "url": "http://{}:{}/api/".format(
                    _svc.spec.cluster_ip,
                    8765
                ),
                "storage_class": self.storage_class,
                "volume_size": options['size']
            }

        logger_cli.info("-> Done creating agents")
        # TODO: Update after implementing pooled task sending
        # idea is to have time to schedule task to each agent every 5 sec max
        self.scheduled_delay = self.agent_count * 5
        logger_cli.info(
            "-> Schedule delay set to {} sec".format(self.scheduled_delay)
        )

    def _poke_agent(self, url, body, action="GET"):
        """Call an agent API using curl executed in the first agent pod.

        When ``body`` is given it is first placed into /tmp/data inside
        that pod (via ``prepare_json_in_pod``) and sent as the payload.

        :param url: agent API URL
        :param body: request payload, or a falsy value for none
        :param action: HTTP method
        :return: decoded JSON reply, ``{}`` if it cannot be decoded
        """
        _datafile = "/tmp/data"
        _cmd = [
            "curl",
            "-s",
            "-H",
            "'Content-Type: application/json'",
            "-X",
            action,
            url
        ]
        if body:
            _cmd += ["-d", "@" + _datafile]
            self.master.prepare_json_in_pod(
                self.agent_pods[0].metadata.name,
                self.master._namespace,
                body,
                _datafile
            )
        _ret = self.master.exec_cmd_on_target_pod(
            self.agent_pods[0].metadata.name,
            self.master._namespace,
            " ".join(_cmd)
        )
        return _parse_json_output(_ret)

    def _ensure_agents_ready(self):
        """Check that all agents are idle and their fio shells are ready.

        Every agent must answer with status 'idle' or 'finished' and
        report ``healthcheck.ready``; each problem found is logged.

        :return: True only if at least one agent answered and all of them
            passed both checks
        """
        _statuses = self.get_agents_status()
        if not _statuses:
            logger_cli.error("ERROR: No agents found")
            return False
        _all_good = True
        for _agent, _d in _statuses.items():
            # obviously, there should be some answer;
            # undecodable replies come back as {}
            if not _d:
                logger_cli.error("ERROR: Agent status not available")
                return False
            # status should be idle or finished
            _st = _d.get('status')
            if _st not in ["idle", "finished"]:
                logger_cli.error(
                    "Agent status invalid {}:{}".format(_agent, _st)
                )
                _all_good = False
            # agent's fio shell should be in 'ready'
            if not (_d.get("healthcheck") or {}).get("ready"):
                logger_cli.error("Agent is not ready {}".format(_agent))
                _all_good = False
        # every agent status must be good, not just one of them
        return _all_good

    def get_agents_status(self, silent=True):
        """Query the fio API on every pod labeled 'app=cfgagent'.

        :return: dict of agent -> decoded status (``{}`` if undecodable)
        """
        _results = self.master.exec_on_labeled_pods_and_ns(
            "app=cfgagent",
            "curl -s http://localhost:8765/api/fio",
            silent=silent
        )
        return {
            _agent: _parse_json_output(_result)
            for _agent, _result in _results.items()
        }

    @retry(Exception, initial_wait=5)
    def get_agents_resultlist(self):
        """Ask every agent for the list of its stored fio results."""
        _t = {"module": "fio", "action": "get_resultlist"}
        _status = {}
        for _agent, _d in self.agent_results.items():
            _status[_agent] = self._poke_agent(_d["url"], _t, action="POST")
        return _status

    @retry(Exception, initial_wait=5)
    def get_result_from_agent(self, agent, time):
        """Fetch the fio result stored for ``time`` from one agent."""
        _t = {
            "module": "fio",
            "action": "get_result",
            "options": {
                "time": time
            }
        }
        return self._poke_agent(
            self.agent_results[agent]["url"],
            _t,
            action="POST"
        )

    def _get_next_scheduled_time(self):
        """Set and return the next run time: now (UTC) + scheduled_delay.

        :return: ``self.next_scheduled_time`` formatted with _datetime_fmt
        """
        _now = datetime.now(timezone.utc)
        logger_cli.info("-> time is '{}'".format(_now.strftime(_datetime_fmt)))
        self.next_scheduled_time = _now + timedelta(
            seconds=self.scheduled_delay
        )
        _str_time = self.next_scheduled_time.strftime(_datetime_fmt)
        logger_cli.info(
            "-> next benchmark scheduled to '{}'".format(_str_time)
        )
        return _str_time

    def _send_scheduled_task(self, options):
        """Send a 'do_scheduledrun' fio task to every agent.

        :return: False as soon as an agent reply contains 'error'
        """
        _task = {
            "module": "fio",
            "action": "do_scheduledrun",
            "options": options
        }
        for _agent, _d in self.agent_results.items():
            logger_cli.info(
                "-> sending task to '{}:{}'".format(_agent, _d["url"])
            )
            _ret = self._poke_agent(_d["url"], _task, action="POST")
            if 'error' in _ret:
                logger_cli.error(
                    "ERROR: Agent returned: '{}'".format(_ret['error'])
                )
                return False
        # No errors detected
        return True

    def track_benchmark(self, options):
        """Poll agents until they all finish the run or time runs out.

        Prints an inline progress line and, once ``_stats_delay`` seconds
        passed since the scheduled start, stores a Ceph cluster status
        sample on each poll under
        ``self.results[options["scheduled_to"]]["ceph"]``.

        :return: True when all agents finished, False on timeout
        """
        _runtime = _get_seconds(options["runtime"])
        _ramptime = _get_seconds(options["ramp_time"])
        # Sum up all timings that we must wait and double it
        _timeout = (self.scheduled_delay + _runtime + _ramptime) * 2
        # NOTE(review): meant to keep measurements under ~65, but samples
        # are taken on every poll once this delay has passed
        _stats_delay = round((_runtime + _ramptime) / 65)
        _start = self.next_scheduled_time
        _end = datetime.now(timezone.utc) + timedelta(seconds=_timeout)
        logger_cli.info(" ")
        tw = cl_typewriter()
        while True:
            # Print status
            tw.cl_start(" ")
            _sts = self.get_agents_status(silent=True)
            # Use same line
            diff = (_end - datetime.now(timezone.utc)).total_seconds()
            _startin = (_start - datetime.now(timezone.utc)).total_seconds()
            if _startin > 0:
                tw.cl_inline("-> starting in {:.2f}s ".format(_startin))
            else:
                tw.cl_inline("-> {:.2f}s; ".format(diff))
                # Undecodable statuses ({}) must not abort tracking
                _progress = [
                    _st.get("progress", 0) for _st in _sts.values()
                ] or [0]
                tw.cl_inline(
                    "{}% <-> {}%; ".format(
                        min(_progress),
                        max(_progress)
                    )
                )
                _a_sts = [_t.get("status") for _t in _sts.values()]
                tw.cl_inline(
                    ", ".join(
                        ["{} {}".format(_a_sts.count(_s), _s)
                         for _s in set(_a_sts)]
                    )
                )

            # Get Ceph status if _start time passed
            _elapsed = (datetime.now(timezone.utc) - _start).total_seconds()
            if _elapsed > _stats_delay:
                # Use same line output
                tw.cl_inline(" {:.2f}s elapsed".format(_elapsed))
                _sec = "{:0.1f}".format(_elapsed)
                self.results[options["scheduled_to"]]["ceph"][_sec] = \
                    self.ceph_info.get_cluster_status()
            # Check if agents finished
            _fcnt = sum(
                1 for _s in _sts.values() if _s.get("status") == 'finished'
            )
            _tcnt = len(_sts)
            if _fcnt < _tcnt:
                tw.cl_inline("; {}/{}".format(_fcnt, _tcnt))
            else:
                tw.cl_flush(newline=True)
                logger_cli.info("-> All agents finished run")
                return True
            # recalc how much is left
            diff = (_end - datetime.now(timezone.utc)).total_seconds()
            # In case end_datetime was in past to begin with
            if diff < 0:
                tw.cl_flush(newline=True)
                logger_cli.info("-> Timed out waiting for agents to finish")
                return False
            tw.cl_flush()

    def _do_testrun(self, options):
        """Run one scheduled benchmark and process its results.

        Records 'ceph osd df' before and after the run, sends the task to
        the agents and waits for them, then collects, aggregates and dumps
        all results.

        :return: False if sending or tracking failed, True otherwise
        """
        _key = options["scheduled_to"]
        self.results[_key]["osd_df_before"] = \
            self.ceph_info.get_ceph_osd_df()
        # send single to agent
        if not self._send_scheduled_task(options):
            return False
        # Track this benchmark progress
        if not self.track_benchmark(options):
            return False
        logger_cli.info("-> Finished testrun. Collecting results...")
        # get ceph osd stats
        self.results[_key]["osd_df_after"] = \
            self.ceph_info.get_ceph_osd_df()
        # Get results for each agent
        self.collect_results()
        logger_cli.info("-> Calculating totals and averages")
        self.calculate_totals()
        self.calculate_ceph_stats()
        self.osd_df_compare(_key)
        logger_cli.info("-> Dumping results")
        for _time, _d in self.results.items():
            self.dump_result(self._get_dump_filename(_time), _d)
        return True

    def wait_ceph_cooldown(self):
        """Store current Ceph status, health detail, df and pg dump.

        Despite its name it does not wait yet (see TODO).
        """
        # TODO: Query Ceph once in 20 sec to make sure its load dropped
        # get ceph idle status
        self.ceph_idle_status = self.ceph_info.get_cluster_status()
        self.health_detail = self.ceph_info.get_health_detail()
        self.ceph_df = self.ceph_info.get_ceph_df()
        self.ceph_pg_dump = self.ceph_info.get_ceph_pg_dump()

    @staticmethod
    def _is_same_task(result, idx, options):
        """Check if a stored result came from task ``idx`` with ``options``.

        Compares result id and mode plus the readwrite, rwmixread, bs,
        iodepth and size input options; malformed entries never match.
        """
        if not isinstance(result, dict):
            return False
        _inputs = result.get("input_options") or {}
        return (
            result.get("id") == idx
            and result.get("mode") == "tasks"
            and all(
                _inputs.get(_k) == options[_k]
                for _k in ["readwrite", "rwmixread", "bs", "iodepth", "size"]
            )
        )

    def run_benchmark(self, options):
        """Run the benchmark in the configured mode ('tasks' or 'single').

        In 'tasks' mode each task is merged into ``options`` and run in
        turn; tasks whose results are already in ``self.results`` are
        skipped.

        :param options: fio options; updated in place with task values and
            the 'scheduled_to' time
        :return: True when all runs succeeded, False otherwise
        """
        logger_cli.info("# Starting '{}' benchmark".format(self.mode))
        # Check agent readiness
        logger_cli.info("# Checking agents")
        if not self._ensure_agents_ready():
            return False

        # Make sure that Ceph is at low load
        # TODO: Ceph status check before starting

        # Do benchmark according to mode
        if self.mode == "tasks":
            logger_cli.info(
                "# Running benchmark with tasks from '{}'".format(
                    self.taskfile
                )
            )
            _r = self.results
            _total_tasks = len(self.tasks)
            for idx, _task in enumerate(self.tasks):
                logger_cli.info(
                    "-> Starting next task ({}/{})".format(idx+1, _total_tasks)
                )
                logger_cli.info("-> Updating options with: {}".format(
                        ", ".join(
                            ["{} = {}".format(k, v) for k, v in _task.items()]
                        )
                    )
                )
                # update options
                options.update(_task)
                # Check if such result already exists
                if any(
                    self._is_same_task(_res, idx, options)
                    for _res in _r.values()
                ):
                    logger_cli.info(
                        "-> Skipped already performed task from {}: "
                        "line {}, {}({}), {}, {}, {}".format(
                            self.taskfile,
                            idx,
                            options["readwrite"],
                            options["rwmixread"],
                            options["bs"],
                            options["iodepth"],
                            options["size"]
                        )
                    )
                    continue
                _sch_time = self._get_next_scheduled_time()
                options["scheduled_to"] = _sch_time
                # init results table
                _r[_sch_time] = {
                    "id": idx,
                    "mode": self.mode,
                    "input_options": deepcopy(options),
                    "agents": {},
                    "ceph": {}
                }
                # exit on error
                if not self._do_testrun(options):
                    return False
                # Save ceph osd stats and wait cooldown
                self.wait_ceph_cooldown()
        elif self.mode == "single":
            logger_cli.info("# Running single benchmark")
            # init time to schedule
            _sch_time = self._get_next_scheduled_time()
            options["scheduled_to"] = _sch_time
            # init results table
            self.results[_sch_time] = {
                "id": "{:2}".format(0),
                # Snapshot, as in 'tasks' mode, so later option edits by
                # the caller do not alter stored results
                "input_options": deepcopy(options),
                "agents": {},
                "ceph": {}
            }
            if not self._do_testrun(options):
                return False
        else:
            logger_cli.error("ERROR: Unknown mode '{}'".format(self.mode))
            return False

        # Normal exit
        logger_cli.info("# All benchmark tasks done")
        return True

    def cleanup(self):
        """Delete all tracked resources and wait until they are gone.

        Resources are deleted in reverse order of ``self.cleanup_list``,
        then polled with a one second pause between checks for up to 120
        seconds.
        """
        logger_cli.info("# Cleaning up")
        for _typ, _ns, _name in reversed(self.cleanup_list):
            self.master.cleanup_resource_by_name(_typ, _name, ns=_ns)

        # Wait for resource to be cleaned
        _timeout = 120
        _deadline = time.monotonic() + _timeout
        _total = len(self.cleanup_list)
        logger_cli.info("-> Wait until {} resources cleaned".format(_total))
        _p = Progress(_total)
        _g = self.master.get_resource_phase_by_name
        while True:
            _left = [r for r in self.cleanup_list if _g(r[0], r[2], ns=r[1])]
            _p.write_progress(_total - len(_left))
            if not _left:
                _p.end()
                logger_cli.info("# Done cleaning up")
                break
            if time.monotonic() >= _deadline:
                _p.end()
                logger_cli.info(
                    "# Timed out waiting after {}s.".format(_timeout)
                )
                break
            # Without a pause the timeout would count iterations, not time
            time.sleep(1)

    def collect_results(self):
        """Fetch missing per-agent results for runs in ``self.results``.

        Each agent is asked for its result times; times not present in
        ``self.results`` (older runs) are ignored, and results already
        loaded for an agent are not fetched again.
        """
        logger_cli.info("# Collecting results")
        # query agents for results
        _agents = self.get_agents_resultlist()
        for _agent, _l in _agents.items():
            # Undecodable replies come back as {}
            for _time in _l.get("resultlist") or []:
                if _time not in self.results:
                    # Some older results found
                    # do not process them
                    logger_cli.debug(
                        "...skipped old results for '{}'".format(_time)
                    )
                    continue
                if _agent in self.results[_time]["agents"]:
                    # Should never happen, actually
                    logger_cli.info(
                        "-> Skipped loaded result for '{}' from '{}'".format(
                            _time,
                            _agent
                        )
                    )
                    continue
                # Load result add it locally
                logger_cli.info(
                    "-> Getting results for '{}' from '{}'".format(
                        _time,
                        _agent
                    )
                )
                _r = self.get_result_from_agent(_agent, _time)
                if _time not in _r:
                    logger_cli.error(
                        "ERROR: No result for '{}' from '{}'".format(
                            _time,
                            _agent
                        )
                    )
                    continue
                self.results[_time]["agents"][_agent] = _r[_time]

    def _get_dump_filename(self, _time):
        """Build the dump file path for the run stored under ``_time``.

        Path is <results_dump_path>/<options name>/
        <time>-<agents>-<readwrite>-<bs>-<iodepth>.json, where <time> has
        its separators removed by _reformat_timestr().
        """
        _r = self.results[_time]
        _dirname = _r["input_options"]["name"]
        _filename = "-".join([
            _reformat_timestr(_time),
            "{:02}".format(len(_r["agents"])),
            _r["input_options"]["readwrite"],
            _r["input_options"]["bs"],
            str(_r["input_options"]["iodepth"]),
        ]) + ".json"
        return os.path.join(
            self.results_dump_path,
            _dirname,
            _filename
        )

    def preload_results(self):
        """Load previously dumped results for ``self.bench_name``.

        Reads every .json file in <results_dump_path>/<bench_name>; its
        results key is restored from the timestamp prefix of the filename
        (see _get_dump_filename). Unparsable files are skipped.
        """
        logger_cli.info(
            "# Preloading results for '{}'".format(self.bench_name)
        )
        _p = self.results_dump_path
        if not os.path.isdir(_p):
            logger_cli.warning(
                "WARNING: Dump path is not a folder '{}'".format(_p)
            )
            return
        _dir = os.path.join(_p, self.bench_name)
        if not os.path.isdir(_dir):
            return
        logger_cli.info("-> Folder found '{}'".format(_dir))
        for _fname in sorted(os.listdir(_dir)):
            _fpath = os.path.join(_dir, _fname)
            if not os.path.isfile(_fpath):
                continue
            logger_cli.debug("... processing '{}'".format(_fname))
            _ext = _fname.split('.')[-1]
            if _ext != "json":
                logger_cli.info(
                    "-> Extension invalid '{}', "
                    "'json' is expected".format(_ext)
                )
                continue
            # get time from filename
            # The '+' of the UTC offset was stripped when the name was
            # built; re-insert it (assumes a non-negative offset)
            _t = _fname.split('-')[0]
            _str_time = _t[:14] + "+" + _t[14:]
            try:
                _t = datetime.strptime(_str_time, _file_datetime_fmt)
            except ValueError:
                logger_cli.warning(
                    "WARNING: Unable to get time from filename '{}'".format(
                        _fname
                    )
                )
                continue
            _time = _t.strftime(_datetime_fmt)
            _data = self.load_dumped_result(_fpath)
            if _data is None:
                # Error is already logged by load_dumped_result()
                continue
            self.results[_time] = _data
            logger_cli.info(
                "-> Loaded '{}' as '{}'".format(
                    _fname,
                    _time
                )
            )

    def dump_result(self, filename, data):
        """Write ``data`` as indented JSON to ``filename``, overwriting it.

        Missing parent folders are created.
        """
        _folder = os.path.dirname(filename)
        if _folder and not os.path.exists(_folder):
            # makedirs: the dump root itself may not exist yet
            os.makedirs(_folder)
            logger_cli.info("-> Created folder '{}'".format(_folder))
        # Dump agent data for this test run
        write_str_to_file(filename, json.dumps(data, indent=2))
        logger_cli.info("-> Dumped '{}'".format(filename))

    def load_dumped_result(self, filename):
        """Read a JSON results file.

        :return: decoded data, or None if the file is missing or invalid
        """
        try:
            # Read-only mode: read-write would fail on read-only files
            with open(filename, "rt") as f:
                return json.load(f)
        except FileNotFoundError as e:
            logger_cli.error(
                "ERROR: {}".format(e)
            )
        except TypeError as e:
            logger_cli.error(
                "ERROR: Invalid file ({}): {}".format(filename, e)
            )
        except json.decoder.JSONDecodeError as e:
            logger_cli.error(
                "ERROR: Failed to decode json ({}): {}".format(filename, e)
            )
        return None

    def _lookup_storage_class_id_by_name(self, storage_class_name):
        """Return the id of the 'ceph df' pool named like the storage class.

        Requires ``self.ceph_df`` (set by wait_ceph_cooldown()).

        :return: pool id, or None if no pool matches
        """
        for _pool in self.ceph_df["pools"]:
            if storage_class_name == _pool["name"]:
                return _pool["id"]
        return None

    def calculate_totals(self):
        """Aggregate per-agent fio results into data["totals"] per run.

        Bandwidth and IOPS are summed over agents; mean latency and 95th
        percentile completion latency are averaged over agents and
        converted from ns to us. Runs that already have 'totals' are
        skipped.
        """
        def _savg(vlist):
            if len(vlist) > 0:
                return (sum(vlist) / len(vlist)) / 1000
            else:
                return 0

        # Calculate totals for Read and Write
        for _time, data in self.results.items():
            if "totals" in data:
                continue
            data["totals"] = {}
            _totals = data["totals"]
            _r_bw = 0
            _r_avglat = []
            _r_95clat = []
            _r_iops = 0
            _w_bw = 0
            _w_avglat = []
            _w_95clat = []
            _w_iops = 0
            for _a, _d in data["agents"].items():
                # Hardcoded number of jobs param :(
                _j = _d["jobs"][0]
                _r_bw += _j["read"]["bw_bytes"]
                _r_avglat.append(_j["read"]["lat_ns"]["mean"])
                _r_iops += _j["read"]["iops"]
                _w_bw += _j["write"]["bw_bytes"]
                _w_avglat.append(_j["write"]["lat_ns"]["mean"])
                _w_iops += _j["write"]["iops"]
                # percentiles are optional in fio output
                if "percentile" in _j["read"]["clat_ns"]:
                    _r_95clat.append(
                        _j["read"]["clat_ns"]["percentile"]["95.000000"]
                    )
                if "percentile" in _j["write"]["clat_ns"]:
                    _w_95clat.append(
                        _j["write"]["clat_ns"]["percentile"]["95.000000"]
                    )

                # Save storage class name
                if "storage_class" not in _totals:
                    _totals["storage_class"] = \
                        self.agent_results[_a]["storage_class"]
                    # Lookup storage class id and num_pg
                    _totals["storage_class_stats"] = \
                        reporter.get_pool_stats_by_id(
                            self._lookup_storage_class_id_by_name(
                                self.agent_results[_a]["storage_class"]
                            ),
                            self.ceph_pg_dump
                        )

            _totals["read_bw_bytes"] = _r_bw
            _totals["read_avg_lat_us"] = _savg(_r_avglat)
            _totals["read_95p_clat_us"] = _savg(_r_95clat)
            _totals["read_iops"] = _r_iops
            _totals["write_bw_bytes"] = _w_bw
            _totals["write_avg_lat_us"] = _savg(_w_avglat)
            _totals["write_95p_clat_us"] = _savg(_w_95clat)
            _totals["write_iops"] = _w_iops

    def calculate_ceph_stats(self):
        """Summarize the raw Ceph status samples collected during runs.

        For every run with raw 'ceph' samples and no 'ceph_stats' marker,
        replaces data["ceph"] with the per-sample pgmap stats, the peak
        read/write bandwidth and IOPS (with derived axis values) and each
        sample's percentage of the peak.
        """
        def _get_max_value(key, stats):
            _max_time = 0
            _value = 0
            for _k, _v in stats.items():
                if key in _v and _value < _v[key]:
                    _max_time = _k
                    _value = _v[key]
            return _max_time, _value

        def _perc(n, m):
            if not n:
                return 0
            elif not m:
                return 0
            else:
                return "{:.0f}%".format((n / m) * 100)

        def _axis_vals(val):
            return [
                val, int(val*1.1), int(val*0.75), int(val*0.50), int(val*0.15)
            ]

        for _time, data in self.results.items():
            if "ceph" not in data:
                logger_cli.warning(
                    "WARNING: Ceph stats raw data not found in results"
                )
                continue
            if "ceph_stats" in data:
                continue
            data["ceph_stats"] = {}
            # Per-run dict: sharing one across runs would mix their
            # samples and alias the same object into every result
            _stats = {}
            # Copy pool stats data
            for _e, _d in data["ceph"].items():
                _stats[_e] = _d["pgmap"]
            # Maximums
            mrb_t, mrb = _get_max_value("read_bytes_sec", _stats)
            mwb_t, mwb = _get_max_value("write_bytes_sec", _stats)
            mri_t, mri = _get_max_value("read_op_per_sec", _stats)
            mwi_t, mwi = _get_max_value("write_op_per_sec", _stats)
            # Replace ceph with shorter data
            data["ceph"] = {
                "max_rbl": _axis_vals(mrb),
                "max_rbl_time": mrb_t,
                "max_wbl": _axis_vals(mwb),
                "max_wbl_time": mwb_t,
                "max_ril": _axis_vals(mri),
                "max_ril_time": mri_t,
                "max_wil": _axis_vals(mwi),
                "max_wil_time": mwi_t,
                "stats": _stats
            }
            # Calculate % values for barchart
            for _e, _d in data["ceph"]["stats"].items():
                _d["read_bytes_sec_perc"] = \
                    _perc(_d.get("read_bytes_sec", 0), mrb)
                _d["write_bytes_sec_perc"] = \
                    _perc(_d.get("write_bytes_sec", 0), mwb)
                _d["read_op_per_sec_perc"] = \
                    _perc(_d.get("read_op_per_sec", 0), mri)
                _d["write_op_per_sec_perc"] = \
                    _perc(_d.get("write_op_per_sec", 0), mwi)

    def osd_df_compare(self, _time):
        """Compare 'ceph osd df' data taken before and after a run.

        Builds data["osds"] (per-OSD before/after values and percent change
        of numeric fields) and data["osd_summary"], then drops the raw
        'osd_df_before'/'osd_df_after' entries.

        :param _time: results key (scheduled time) of the run
        """
        def _get_osd(osd_id, nodes):
            for osd in nodes:
                if osd["id"] == osd_id:
                    return osd
            return None

        def _is_number(value):
            # bool is a subclass of int, but is not a measurement
            return isinstance(value, (int, float)) and \
                not isinstance(value, bool)

        logger_cli.info("# Comparing OSD stats")
        _osd = {}
        if _time not in self.results:
            logger_cli.warning(
                "WARNING: {} not found in results. Check data".format(_time)
            )
            return
        data = self.results[_time]
        # Save summary
        data["osd_summary"] = {}
        data["osd_summary"]["before"] = data["osd_df_before"]["summary"]
        data["osd_summary"]["after"] = data["osd_df_after"]["summary"]
        data["osd_summary"]["active"] = {
            "status": "",
            "device_class": "",
            "pgs": 0,
            "kb_used": 0,
            "kb_used_data": 0,
            "kb_used_omap": 0,
            "kb_used_meta": 0,
            "utilization": 0,
            "var_down": 0,
            "var_up": 0
        }
        _active = data["osd_summary"]["active"]
        _nodes_before = data["osd_df_before"]["nodes"]
        _nodes_after = data["osd_df_after"]["nodes"]
        # Compare OSD counts
        if len(_nodes_before) != len(_nodes_after):
            logger_cli.warning(
                "WARNING: Before/After bench OSD "
                "count mismatch for '{}'".format(_time)
            )
        # iterate osds from before
        _pgs = 0
        _classes = set()
        _nodes_up = 0
        for idx, _osd_b in enumerate(_nodes_before):
            _name = _osd_b["name"]
            # search for the same osd in after
            _osd_a = _get_osd(_osd_b["id"], _nodes_after)
            # Save data to the new place
            _osd[_name] = {"before": _osd_b}
            if not _osd_a:
                # If this happens, Ceph cluster is actually broken
                logger_cli.warning(
                    "WARNING: Wow! {} disappeared".format(_name)
                )
                _osd[_name]["after"] = {}
                _osd[_name]["percent"] = {}
                # Nothing to compare against
                continue
            _osd[_name]["after"] = _osd_a
            _osd[_name]["percent"] = {}
            # Calculate summary using "after" data
            _pgs += _osd_a["pgs"]
            _classes.add(_osd_a["device_class"])
            if _osd_a["status"] == "up":
                _nodes_up += 1

            # To be safe, detect if some keys are different
            # ...and log it.
            _keys_b = list(_osd_b.keys())
            _diff = set(_keys_b).symmetric_difference(_osd_a.keys())
            if len(_diff) > 0:
                # This should never happen, actually
                logger_cli.warning(
                    "WARNING: Before/after keys mismatch "
                    "for OSD node {}: {}".format(idx, ", ".join(_diff))
                )
                continue
            # Compare each key and calculate how it changed
            for k in _keys_b:
                _vb = _osd_b[k]
                _va = _osd_a[k]
                if _vb == _va:
                    continue
                # Announce change
                logger_cli.debug(
                    "-> {:4}: {}, {} -> {}".format(idx, k, _vb, _va)
                )
                # Only numeric fields have a meaningful percent change;
                # names, status or nested dicts would raise TypeError
                if not (_is_number(_vb) and _is_number(_va)):
                    continue
                if _vb:
                    # avoid ZeroDivisionError for fields that were 0
                    _osd[_name]["percent"][k] = (_va / _vb) * 100 - 100
                # Increase counters
                _active[k] = _active.get(k, 0) + 1
                if k == "var":
                    if _va > _vb:
                        _active["var_up"] += 1
                    elif _va < _vb:
                        _active["var_down"] += 1

        # Save sorted data
        data["osds"] = _osd
        logger_cli.info("-> Removing redundant osd before/after data")
        data.pop("osd_df_before")
        data.pop("osd_df_after")
        # Save summary
        _active["status"] = "{}".format(_nodes_up)
        _active["device_class"] = "{}".format(len(_classes))
        _active["pgs"] = _pgs

    # Create report
    def create_report(self, filename):
        """
        Create static html showing ceph info report
        :param filename: output file path
        :return: none
        """
        logger_cli.info("### Generating report to '{}'".format(filename))
        _report = reporter.ReportToFile(
            reporter.HTMLCephBench(self),
            filename
        )
        _report(
            {
                "results": self.results,
                "idle_status": self.ceph_idle_status,
                "health_detail": self.health_detail,
                "ceph_df": self.ceph_df,
                "ceph_pg_dump": self.ceph_pg_dump,
                "info": self.ceph_info.ceph_info,
                "cluster": self.ceph_info.cluster_info,
                "ceph_version": self.ceph_info.ceph_version,
                "nodes": self.agent_pods
            }
        )
        logger_cli.info("-> Done")