ceph bench module hotfixes after client run
Related-PROD: PROD-37028
Change-Id: Ied20709e367877ca0be1c9bd531476070059de10
diff --git a/cfg_checker/cli/arguments.py b/cfg_checker/cli/arguments.py
index 387ed59..ac014ff 100644
--- a/cfg_checker/cli/arguments.py
+++ b/cfg_checker/cli/arguments.py
@@ -75,3 +75,10 @@
metavar='skip_nodes_file', default=None,
help="Filename with nodes to skip. Note: use fqdn node names."
)
+
+ parser.add_argument(
+     '--force-node-network',
+     metavar='force_node_network', default=None,
+     help="When creating node shell, use this network instead of internal "
+          "IP. Format: '10.10.10.'"
+ )
diff --git a/cfg_checker/common/kube_utils.py b/cfg_checker/common/kube_utils.py
index 195e791..f4c38ef 100644
--- a/cfg_checker/common/kube_utils.py
+++ b/cfg_checker/common/kube_utils.py
@@ -446,7 +446,7 @@
_error = _pod_stream.read_stderr()
if _error:
# copy error to output
- logger_cli.warning(
+ logger.warning(
"WARNING: cmd of '{}' returned error:\n{}\n".format(
" ".join(cmd),
_error
diff --git a/cfg_checker/helpers/console_utils.py b/cfg_checker/helpers/console_utils.py
index fa770a3..b45445e 100644
--- a/cfg_checker/helpers/console_utils.py
+++ b/cfg_checker/helpers/console_utils.py
@@ -3,6 +3,53 @@
import sys
+class cl_typewriter(object):
+    """Single-line console writer that redraws a status line in place.
+
+    Tracks how many characters were written on the current line
+    (``carret``) and on the line it replaced (``previous``) so that
+    ``cl_flush`` can blank out leftovers of a longer, older line.
+    """
+    previous = 0
+    carret = 0
+
+    def cl_start(self, sttr):
+        """Return to column 0 and begin a new status line with ``sttr``."""
+        # Remember the old line length, then restart the counter: '\r'
+        # moves the cursor to column 0, so the count must not accumulate
+        # across redraws or the padding in cl_flush never triggers.
+        self.previous = self.carret
+        self.carret = len(sttr)
+        sys.stdout.write("\r{}".format(sttr))
+
+    def cl_inline(self, sttr):
+        """Append ``sttr`` to the current line without flushing."""
+        self.carret += len(sttr)
+        sys.stdout.write("{}".format(sttr))
+
+    def cl_sameline(self, sttr):
+        """Replace the current line with ``sttr`` and flush it."""
+        # Go through cl_start so '\r' is not counted as a printed char
+        # and the old line length is kept for padding.
+        self.cl_start(sttr)
+        self.cl_flush()
+
+    def cl_flush(self, newline=False):
+        """Flush stdout; end the line or overwrite old leftovers.
+
+        :param newline: when True, emit a newline and reset the counters;
+            otherwise pad with spaces up to the previous line length.
+        """
+        if newline:
+            sys.stdout.write("\n")
+            # Fresh line: nothing is left over to overwrite.
+            self.previous = 0
+            self.carret = 0
+        elif self.previous > self.carret:
+            self.cl_inline(" " * (self.previous - self.carret))
+        sys.stdout.flush()
+
+
class Progress(object):
_strsize = 0
_note_size = 0
diff --git a/cfg_checker/modules/ceph/__init__.py b/cfg_checker/modules/ceph/__init__.py
index e2f0049..5c9357b 100644
--- a/cfg_checker/modules/ceph/__init__.py
+++ b/cfg_checker/modules/ceph/__init__.py
@@ -1,7 +1,5 @@
# Author: Alex Savatieiev (osavatieiev@mirantis.com; a.savex@gmail.com)
# Copyright 2019-2022 Mirantis, Inc.
-from datetime import datetime
-
from cfg_checker.agent.fio_runner import get_fio_options
from cfg_checker.agent.fio_runner import seq_modes, mix_modes
from cfg_checker.common import logger_cli
@@ -337,6 +335,8 @@
logger_cli.info("-> running with tasks from '{}'".format(_task_file))
config.bench_task_file = _task_file
config.bench_mode = "tasks"
+ # Add default size to options
+ _opts["size"] = _get_param_and_log(args, "size")
logger_cli.debug("... default/selected options for fio:")
for _k in _opts.keys():
# TODO: Update options for single run
diff --git a/cfg_checker/modules/ceph/bench.py b/cfg_checker/modules/ceph/bench.py
index fbb0a13..95e39bf 100644
--- a/cfg_checker/modules/ceph/bench.py
+++ b/cfg_checker/modules/ceph/bench.py
@@ -11,6 +11,7 @@
from cfg_checker.common.decorators import retry
from cfg_checker.common.file_utils import write_str_to_file
from cfg_checker.helpers.console_utils import Progress
+from cfg_checker.helpers.console_utils import cl_typewriter
from cfg_checker.reports import reporter
# from cfg_checker.common.exception import InvalidReturnException
# from cfg_checker.common.exception import ConfigException
@@ -127,7 +128,9 @@
"rwmixread": row[1],
"bs": row[2],
"iodepth": row[3],
- "size": row[4]
+ "size": row[4],
+ "ramp_time": row[5],
+ "runtime": row[6]
})
logger_cli.info("-> Loaded {} tasks".format(len(self.tasks)))
@@ -279,11 +282,12 @@
# All is good
return True
- def get_agents_status(self):
+ def get_agents_status(self, silent=True):
_status = {}
_results = self.master.exec_on_labeled_pods_and_ns(
"app=cfgagent",
- "curl -s http://localhost:8765/api/fio"
+ "curl -s http://localhost:8765/api/fio",
+ silent=silent
)
for _agent, _result in _results.items():
_j = _parse_json_output(_result)
@@ -353,23 +357,40 @@
_stats_delay = round((_runtime + _ramptime) / 65)
_start = self.next_scheduled_time
_end = datetime.now(timezone.utc) + timedelta(seconds=_timeout)
+ logger_cli.info(" ")
+ tw = cl_typewriter()
while True:
# Print status
- _sts = self.get_agents_status()
+ tw.cl_start(" ")
+ _sts = self.get_agents_status(silent=True)
+ # Use same line
diff = (_end - datetime.now(timezone.utc)).total_seconds()
- logger_cli.info("-> {:.2f}s left. Agent status:".format(diff))
- for _agent, _status in _sts.items():
- logger_cli.info(
- "\t{}: {} ({}%)".format(
- _agent,
- _status["status"],
- _status["progress"]
+ _startin = (_start - datetime.now(timezone.utc)).total_seconds()
+ if _startin > 0:
+ tw.cl_inline("-> starting in {:.2f}s ".format(_startin))
+ else:
+ tw.cl_inline("-> {:.2f}s; ".format(diff))
+ _progress = [_st["progress"] for _st in _sts.values()]
+ tw.cl_inline(
+ "{}% <-> {}%; ".format(
+ min(_progress),
+ max(_progress)
)
)
+
+ _a_sts = [_t["status"] for _t in _sts.values()]
+ tw.cl_inline(
+ ", ".join(
+ ["{} {}".format(_a_sts.count(_s), _s)
+ for _s in set(_a_sts)]
+ )
+ )
+
# Get Ceph status if _start time passed
_elapsed = (datetime.now(timezone.utc) - _start).total_seconds()
if _elapsed > _stats_delay:
- logger_cli.info("-> {:.2f}s elapsed".format(_elapsed))
+ # Use same line output
+ tw.cl_inline(" {:.2f}s elapsed".format(_elapsed))
_sec = "{:0.1f}".format(_elapsed)
self.results[options["scheduled_to"]]["ceph"][_sec] = \
self.ceph_info.get_cluster_status()
@@ -379,16 +400,19 @@
_fcnt = len(finished)
_tcnt = len(_sts)
if _fcnt < _tcnt:
- logger_cli.info("-> {}/{} finished".format(_fcnt, _tcnt))
+ tw.cl_inline(" {}/{}".format(_fcnt, _tcnt))
else:
+ tw.cl_flush(newline=True)
logger_cli.info("-> All agents finished run")
return True
# recalc how much is left
diff = (_end - datetime.now(timezone.utc)).total_seconds()
# In case end_datetime was in past to begin with
if diff < 0:
+ tw.cl_flush(newline=True)
logger_cli.info("-> Timed out waiting for agents to finish")
return False
+ tw.cl_flush()
def _do_testrun(self, options):
self.results[options["scheduled_to"]]["osd_df_before"] = \
@@ -573,8 +597,8 @@
if _time not in self.results:
# Some older results found
# do not process them
- logger_cli.info(
- "-> Skipped old results for '{}'".format(_time)
+ logger_cli.debug(
+ "...skipped old results for '{}'".format(_time)
)
continue
elif _agent not in self.results[_time]["agents"]:
@@ -690,6 +714,11 @@
return None
def calculate_totals(self):
+ def _savg(vlist):
+     # Mean of the values scaled down by 1000; 0 when there are none
+     if len(vlist) == 0:
+         return 0
+     return (sum(vlist) / len(vlist)) / 1000
# Calculate totals for Read and Write
for _time, data in self.results.items():
if "totals" not in data:
@@ -731,16 +760,12 @@
)
_totals["read_bw_bytes"] = _r_bw
- _totals["read_avg_lat_us"] = \
- (sum(_r_avglat) / len(_r_avglat)) / 1000
- _totals["read_95p_clat_us"] = \
- (sum(_r_95clat) / len(_r_95clat)) / 1000
+ _totals["read_avg_lat_us"] = _savg(_r_avglat)
+ _totals["read_95p_clat_us"] = _savg(_r_95clat)
_totals["read_iops"] = _r_iops
_totals["write_bw_bytes"] = _w_bw
- _totals["write_avg_lat_us"] = \
- (sum(_w_avglat) / len(_w_avglat)) / 1000
- _totals["write_95p_clat_us"] = \
- (sum(_w_95clat) / len(_w_95clat)) / 1000
+ _totals["write_avg_lat_us"] = _savg(_w_avglat)
+ _totals["write_95p_clat_us"] = _savg(_w_95clat)
_totals["write_iops"] = _w_iops
def calculate_ceph_stats(self):
@@ -830,7 +855,7 @@
data["osd_summary"]["active"] = {
"status": "",
"device_class": "",
- "pgs": "",
+ "pgs": 0,
"kb_used": 0,
"kb_used_data": 0,
"kb_used_omap": 0,
diff --git a/cfg_checker/modules/ceph/info.py b/cfg_checker/modules/ceph/info.py
index 1f3b8cb..1fb9205 100644
--- a/cfg_checker/modules/ceph/info.py
+++ b/cfg_checker/modules/ceph/info.py
@@ -8,7 +8,7 @@
from time import sleep
from datetime import datetime
-from cfg_checker.common import logger_cli
+from cfg_checker.common import logger_cli, logger
from cfg_checker.common.exception import KubeException
from cfg_checker.helpers.console_utils import Progress
@@ -204,15 +204,15 @@
_fname = d["filename"]
if isinstance(d["data"], dict) or isinstance(d["data"], list):
_buf = json.dumps(d["data"], indent=2)
- # _filename = key + ".json" if _fname is not None else _fname
+ # _filename = key+".json" if _fname is not None else _fname
_filename = _ensure_fname(".json")
elif isinstance(d["data"], str):
_buf = d["data"]
- # _filename = key + ".txt"
+ # _filename = key+".txt"
_filename = _ensure_fname(".txt")
else:
_buf = str(d["data"])
- # _filename = key + ".txt"
+ # _filename = key+".txt"
_filename = _ensure_fname(".txt")
logger_cli.debug("... writing '{}'".format(_filename))
_tgz.add_file(_filename, buf=_buf, replace=True)
@@ -279,9 +279,9 @@
cmd_str
)
if expect_output and not _r:
- logger_cli.debug("... got empty output for '{}'".format(cmd_str))
+ logger.debug("... got empty output for '{}'".format(cmd_str))
elif not expect_output and _r:
- logger_cli.warning(
+ logger.warning(
"WARNING: Unexpected output for '{}':\n"
"===== Start\n{}\n===== End".format(cmd_str, _r)
)
diff --git a/cfg_checker/nodes.py b/cfg_checker/nodes.py
index fdda358..80994e2 100644
--- a/cfg_checker/nodes.py
+++ b/cfg_checker/nodes.py
@@ -582,8 +582,15 @@
# hostname
self.nodes[_name]['shortname'] = \
_nodes[_name]['addresses']['hostname']['address']
+ # internal
self.nodes[_name]['internalip'] = \
_nodes[_name]['addresses']['internalip']['address']
+ # alternate
+ if self.env_config.force_node_network is not None:
+ iIP = self.nodes[_name]['internalip']
+ # use last number
+ aIP = self.env_config.force_node_network + iIP.split('.')[-1]
+ self.nodes[_name]["altip"] = aIP
self.nodes[_name]['node_group'] = None
self.nodes[_name]['labels'] = _labels
self.nodes[_name]['roles'] = _roles
@@ -694,7 +701,9 @@
):
_u = self.env_config.kube_node_user
_k = self.env_config.kube_node_keypath
- _h = self.nodes[node]['internalip']
+
+ _n = self.nodes[node]
+ _h = _n['altip'] if "altip" in _n else _n['internalip']
_p = 22
if self.kube.is_local or self.kube.config.ssh_direct:
logger.debug("Getting shell with no port forward")
@@ -1105,13 +1114,21 @@
_ds_results[_n] = _v
return _ds_results
- def exec_on_labeled_pods_and_ns(self, label_str, cmd, _args=None, ns=None):
+ def exec_on_labeled_pods_and_ns(
+ self,
+ label_str,
+ cmd,
+ _args=None,
+ ns=None,
+ silent=False
+ ):
if not ns:
ns = self._namespace
_results = self.exec_cmd_on_pods(
self.kube.list_pods(ns, label_str=label_str),
cmd,
- _args=_args
+ _args=_args,
+ silent=silent
)
_pod_results = {}
for _, _p, _v in _results:
@@ -1123,7 +1140,8 @@
pod_list,
cmd,
_args=None,
- is_script=False
+ is_script=False,
+ silent=False
):
def _kube_exec_on_pod(plist):
return [
@@ -1183,7 +1201,8 @@
_results = []
self.not_responded = []
# create result list
- _progress = Progress(len(_plist))
+ if not silent:
+ _progress = Progress(len(_plist))
ret = pool.imap_unordered(_kube_exec_on_pod, _plist)
for ii in enumerate(ret, start=1):
@@ -1191,9 +1210,11 @@
self.not_responded.append(ii[1][0])
else:
_results.append(ii[1])
- _progress.write_progress(ii[0])
+ if not silent:
+ _progress.write_progress(ii[0])
- _progress.end()
+ if not silent:
+ _progress.end()
pool.close()
pool.join()
logger_cli.debug(