Move to new sensor selector, fix some bugs
diff --git a/wally/ceph.py b/wally/ceph.py
index 5c2a4f0..ad3f461 100644
--- a/wally/ceph.py
+++ b/wally/ceph.py
@@ -35,12 +35,6 @@
def run(self, ctx: TestRun) -> None:
"""Return list of ceph's nodes NodeInfo"""
-
- if 'ceph' not in ctx.config.discover:
- print(ctx.config.discover)
- logger.debug("Skip ceph discovery due to config setting")
- return
-
if 'all_nodes' in ctx.storage:
logger.debug("Skip ceph discovery, use previously discovered nodes")
return
@@ -82,20 +76,16 @@
with setup_rpc(connect(info), ctx.rpc_code, ctx.default_rpc_plugins,
log_level=ctx.config.rpc_log_level) as node:
- # ssh_key = node.get_file_content("~/.ssh/id_rsa", expanduser=True)
-
try:
ips = set()
for ip, osds_info in get_osds_info(node, ceph_extra_args, thcount=16).items():
ip = ip_remap.get(ip, ip)
ips.add(ip)
- # creds = ConnCreds(to_ip(cast(str, ip)), user="root", key=ssh_key)
creds = ConnCreds(to_ip(cast(str, ip)), user="root")
info = ctx.merge_node(creds, {'ceph-osd'})
info.params.setdefault('ceph-osds', []).extend(info.__dict__.copy() for info in osds_info)
assert 'ceph' not in info.params or info.params['ceph'] == ceph_params
info.params['ceph'] = ceph_params
-
logger.debug("Found %s nodes with ceph-osd role", len(ips))
except Exception as exc:
if not ignore_errors:
@@ -108,7 +98,6 @@
counter = 0
for counter, ip in enumerate(get_mons_ips(node, ceph_extra_args)):
ip = ip_remap.get(ip, ip)
- # creds = ConnCreds(to_ip(cast(str, ip)), user="root", key=ssh_key)
creds = ConnCreds(to_ip(cast(str, ip)), user="root")
info = ctx.merge_node(creds, {'ceph-mon'})
assert 'ceph' not in info.params or info.params['ceph'] == ceph_params
diff --git a/wally/console_report.py b/wally/console_report.py
index 6490329..381d9ff 100644
--- a/wally/console_report.py
+++ b/wally/console_report.py
@@ -1,52 +1,73 @@
import logging
-
+from typing import cast, Iterator, List, Union
import numpy
from cephlib.common import float2str
-from cephlib import texttable
+from cephlib.texttable import Texttable
from cephlib.statistic import calc_norm_stat_props, calc_histo_stat_props
from .stage import Stage, StepOrder
from .test_run_class import TestRun
+from .result_classes import SuiteConfig
from .suits.io.fio import FioTest
+from .suits.io.fio_job import FioJobParams
from .suits.io.fio_hist import get_lat_vals
from .data_selectors import get_aggregated
+from .result_storage import IWallyStorage
logger = logging.getLogger("wally")
+
+console_report_headers = ["Description", "IOPS ~ Dev", "BW, MiBps", 'Skew/Kurt', 'lat med, ms', 'lat 95, ms']
+console_report_align = ['l', 'r', 'r', 'r', 'r', 'r']
+
+def get_console_report_table(suite: SuiteConfig, rstorage: IWallyStorage) -> List[Union[List[str], Texttable.HLINE]]:
+ table = [] # type: List[Union[List[str], Texttable.HLINE]]
+ prev_params = None
+ for job in sorted(rstorage.iter_job(suite), key=lambda job: job.params):
+ fparams = cast(FioJobParams, job.params)
+ fparams['qd'] = None
+
+ if prev_params is not None and fparams.char_tpl != prev_params:
+ table.append(Texttable.HLINE)
+
+ prev_params = fparams.char_tpl
+
+ bw_ts = get_aggregated(rstorage, suite.storage_id, job.storage_id, metric='bw',
+ trange=job.reliable_info_range_s)
+ props = calc_norm_stat_props(bw_ts)
+ avg_iops = props.average // job.params.params['bsize']
+ iops_dev = props.deviation // job.params.params['bsize']
+
+ lat_ts = get_aggregated(rstorage, suite.storage_id, job.storage_id, metric='lat',
+ trange=job.reliable_info_range_s)
+ bins_edges = numpy.array(get_lat_vals(lat_ts.data.shape[1]), dtype='float32') / 1000 # convert us to ms
+ lat_props = calc_histo_stat_props(lat_ts, bins_edges)
+ table.append([job.params.summary,
+ "{:>6s} ~ {:>6s}".format(float2str(avg_iops), float2str(iops_dev)),
+ float2str(props.average / 1024), # Ki -> Mi
+ "{:>5.1f}/{:>5.1f}".format(props.skew, props.kurt),
+ float2str(lat_props.perc_50), float2str(lat_props.perc_95)])
+ return table
+
+
class ConsoleReportStage(Stage):
priority = StepOrder.REPORT
def run(self, ctx: TestRun) -> None:
for suite in ctx.rstorage.iter_suite(FioTest.name):
- table = texttable.Texttable(max_width=200)
-
+ table = Texttable(max_width=200)
+ table.set_deco(Texttable.VLINES | Texttable.BORDER | Texttable.HEADER)
tbl = ctx.rstorage.get_txt_report(suite)
if tbl is None:
- table.header(["Description", "IOPS ~ Dev", "BW, MiBps", 'Skew/Kurt', 'lat med, ms', 'lat 95, ms'])
- table.set_cols_align(('l', 'r', 'r', 'r', 'r', 'r'))
-
- for job in sorted(ctx.rstorage.iter_job(suite), key=lambda job: job.params):
- bw_ts = get_aggregated(ctx.rstorage, suite.storage_id, job.storage_id, metric='bw',
- trange=job.reliable_info_range_s)
- props = calc_norm_stat_props(bw_ts)
- avg_iops = props.average // job.params.params['bsize']
- iops_dev = props.deviation // job.params.params['bsize']
-
- lat_ts = get_aggregated(ctx.rstorage, suite.storage_id, job.storage_id, metric='lat',
- trange=job.reliable_info_range_s)
- bins_edges = numpy.array(get_lat_vals(lat_ts.data.shape[1]), dtype='float32') / 1000 # convert us to ms
- lat_props = calc_histo_stat_props(lat_ts, bins_edges)
- table.add_row([job.params.summary,
- "{} ~ {}".format(float2str(avg_iops), float2str(iops_dev)),
- float2str(props.average / 1024), # Ki -> Mi
- "{}/{}".format(float2str(props.skew), float2str(props.kurt)),
- float2str(lat_props.perc_50), float2str(lat_props.perc_95)])
-
+ table.header(console_report_headers)
+ table.set_cols_align(console_report_align)
+ for line in get_console_report_table(suite, ctx.rstorage):
+ table.add_row(line)
tbl = table.draw()
ctx.rstorage.put_txt_report(suite, tbl)
print(tbl)
diff --git a/wally/data_selectors.py b/wally/data_selectors.py
index 795c66b..1950e95 100644
--- a/wally/data_selectors.py
+++ b/wally/data_selectors.py
@@ -1,10 +1,11 @@
import logging
-from typing import Tuple, Iterator
+from typing import Tuple, Iterator, List, Iterable, Dict, Union, Callable, Set
import numpy
from cephlib.numeric_types import DataSource, TimeSeries
from cephlib.storage_selectors import c_interpolate_ts_on_seconds_border
+from cephlib.node import NodeInfo
from .result_classes import IWallyStorage
from .suits.io.fio_hist import expected_lat_bins
@@ -40,18 +41,23 @@
def get_aggregated(rstorage: IWallyStorage, suite_id: str, job_id: str, metric: str,
trange: Tuple[int, int]) -> TimeSeries:
- "Sum selected metric for all nodes for given Suite/job"
+ "Sum selected fio metric for all nodes for given Suite/job"
+
+ key = (id(rstorage), suite_id, job_id, metric, trange)
+ aggregated_cache = rstorage.storage.other_caches['aggregated']
+ if key in aggregated_cache:
+ return aggregated_cache[key].copy()
tss = list(find_all_series(rstorage, suite_id, job_id, metric))
if len(tss) == 0:
raise NameError("Can't found any TS for {},{},{}".format(suite_id, job_id, metric))
- ds = DataSource(suite_id=suite_id, job_id=job_id, node_id=AGG_TAG, sensor='fio',
- dev=AGG_TAG, metric=metric, tag='csv')
-    tss_inp = [c_interpolate_ts_on_seconds_border(ts, tp='fio', allow_broken_step=(metric == 'lat')) for ts in tss]
+    c_intp = c_interpolate_ts_on_seconds_border
+    tss_inp = [c_intp(ts.select(trange), tp='fio', allow_broken_step=(metric == 'lat')) for ts in tss]
res = None
+ res_times = None
for ts in tss_inp:
if ts.time_units != 's':
@@ -82,16 +88,23 @@
dt = ts.data[idx1: idx2]
if res is None:
- res = dt
+ res = dt.copy()
+ res_times = ts.times[idx1: idx2].copy()
else:
assert res.shape == dt.shape, "res.shape(={}) != dt.shape(={})".format(res.shape, dt.shape)
res += dt
+ ds = DataSource(suite_id=suite_id, job_id=job_id, node_id=AGG_TAG, sensor='fio',
+ dev=AGG_TAG, metric=metric, tag='csv')
agg_ts = TimeSeries(res, source=ds,
- times=tss_inp[0].times.copy(),
+ times=res_times,
units=tss_inp[0].units,
histo_bins=tss_inp[0].histo_bins,
time_units=tss_inp[0].time_units)
-    return agg_ts
+    aggregated_cache[key] = agg_ts
+    return agg_ts.copy()
+
+def get_nodes(storage: IWallyStorage, roles: Iterable[str]) -> List[NodeInfo]:
+ return [node for node in storage.load_nodes() if node.roles.intersection(roles)]
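Note: get_aggregated now memoizes per (storage instance, suite, job, metric, time range) and hands out copies so callers cannot corrupt cached data. A minimal standalone sketch of that copy-on-read pattern (the names and the numpy stand-in are illustrative, not wally code):

    import numpy
    from typing import Any, Dict, Tuple

    _cache = {}  # type: Dict[Tuple, numpy.ndarray]

    def cached_sum(storage: Any, suite_id: str, job_id: str, metric: str,
                   trange: Tuple[int, int]) -> numpy.ndarray:
        # id(storage) scopes cache entries to a single storage instance
        key = (id(storage), suite_id, job_id, metric, trange)
        if key not in _cache:
            # stand-in for the real find_all_series + interpolate + sum pipeline
            _cache[key] = numpy.arange(trange[0], trange[1], dtype='float64')
        return _cache[key].copy()  # copy, so callers can't mutate the cached array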
diff --git a/wally/main.py b/wally/main.py
index 66e39db..497bdac 100644
--- a/wally/main.py
+++ b/wally/main.py
@@ -10,11 +10,9 @@
from typing import List, Tuple, Any, Callable, IO, cast, Optional, Iterator
from yaml import load as _yaml_load
-
YLoader = Callable[[IO], Any]
yaml_load = None # type: YLoader
-
try:
from yaml import CLoader
yaml_load = cast(YLoader, functools.partial(_yaml_load, Loader=CLoader))
@@ -31,6 +29,7 @@
from cephlib.common import setup_logging
from cephlib.storage import make_storage
+from cephlib.wally_storage import WallyDB
from cephlib.ssh import set_ssh_key_passwd
from cephlib.node import log_nodes_statistic
from cephlib.node_impl import get_rpc_server_code
@@ -69,29 +68,53 @@
raise
-def list_results(path: str) -> List[Tuple[str, str, str, str]]:
- results = [] # type: List[Tuple[float, str, str, str, str]]
-
+def list_results(path: str, limit: Optional[int] = None) -> List[Tuple[str, str, str, str, str]]:
+ dirs = []
for dir_name in os.listdir(path):
full_path = os.path.join(path, dir_name)
+ dirs.append((os.stat(full_path).st_ctime, full_path))
+ dirs.sort()
+ results = [] # type: List[Tuple[str, str, str, str, str]]
+ for _, full_path in dirs[::-1]:
try:
stor = make_storage(full_path, existing=True)
except Exception as exc:
logger.warning("Can't load folder {}. Error {}".format(full_path, exc))
- comment = cast(str, stor.get('info/comment'))
- run_uuid = cast(str, stor.get('info/run_uuid'))
- run_time = cast(float, stor.get('info/run_time'))
- test_types = ""
- results.append((run_time,
- run_uuid,
- test_types,
- time.ctime(run_time),
- '-' if comment is None else comment))
+ try:
+ try:
+ cfg = stor.load(Config, WallyDB.config)
+ except KeyError:
+ cfg = stor.load(Config, "config")
+ except Exception as exc:
+ print("Fail to load {}. {}".format(os.path.basename(full_path), exc))
+ continue
- results.sort()
- return [i[1:] for i in results]
+ if WallyDB.run_interval in stor:
+ run_time = stor.get(WallyDB.run_interval)[0]
+ else:
+ run_time = os.stat(full_path).st_ctime
+
+ ftime = time.strftime("%d %b %H:%M", time.localtime(run_time))
+
+ test_types = []
+ for suite_cfg in cfg.get('tests', []):
+ for suite_name, params in suite_cfg.items():
+ if suite_name == 'fio':
+ test_types.append("{}.{}".format(suite_name, params['load']))
+ else:
+ test_types.append(suite_name)
+ results.append((cfg.run_uuid,
+ ",".join(test_types),
+ ftime,
+ '-' if cfg.comment is None else cfg.comment,
+ '-'))
+
+ if limit and len(results) >= limit:
+ break
+
+ return results
def log_nodes_statistic_stage(ctx: TestRun) -> None:
@@ -103,7 +126,8 @@
parser = argparse.ArgumentParser(prog='wally', description=descr)
parser.add_argument("-l", '--log-level', help="print some extra log info")
parser.add_argument("--ssh-key-passwd", default=None, help="Pass ssh key password")
- parser.add_argument("--ssh-key-passwd-kbd", action="store_true", help="Enter ssh key password interactivelly")
+ parser.add_argument("--ssh-key-passwd-kbd", action="store_true", help="Enter ssh key password interactively")
+ parser.add_argument("--profile", action="store_true", help="Profile execution")
parser.add_argument("-s", '--settings-dir', default=None,
help="Folder to store key/settings/history files")
@@ -111,6 +135,8 @@
# ---------------------------------------------------------------------
report_parser = subparsers.add_parser('ls', help='list all results')
+ report_parser.add_argument("-l", "--limit", metavar='LIMIT', help="Show only LIMIT last results",
+ default=None, type=int)
report_parser.add_argument("result_storage", help="Folder with test results")
# ---------------------------------------------------------------------
@@ -236,6 +262,13 @@
config = None # type: Config
storage = None # type: IStorage
+ if opts.profile:
+ import cProfile
+ pr = cProfile.Profile()
+ pr.enable()
+ else:
+ pr = None
+
if opts.subparser_name == 'test':
config = load_config(opts.config_file)
config.storage_url, config.run_uuid = utils.get_uniq_path_uuid(config.results_storage)
@@ -250,7 +283,7 @@
config.discover = set(name for name in config.get('discover', '').split(",") if name)
storage = make_storage(config.storage_url)
- storage.put(config, 'config')
+ storage.put(config, WallyDB.config)
stages.extend(get_run_stages())
stages.append(SaveNodesStage())
@@ -267,7 +300,7 @@
elif opts.subparser_name == 'resume':
opts.resumed = True
storage = make_storage(opts.storage_dir, existing=True)
- config = storage.load(Config, 'config')
+ config = storage.load(Config, WallyDB.config)
stages.extend(get_run_stages())
stages.append(LoadStoredNodesStage())
prev_opts = storage.get('cli') # type: List[str]
@@ -281,9 +314,10 @@
elif opts.subparser_name == 'ls':
tab = Texttable(max_width=200)
- tab.set_cols_align(["l", "l", "l", "l"])
- tab.header(["Name", "Tests", "Run at", "Comment"])
- tab.add_rows(list_results(opts.result_storage))
+ tab.set_cols_align(["l", "l", "l", "l", 'c'])
+ tab.set_deco(Texttable.VLINES | Texttable.BORDER | Texttable.HEADER)
+ tab.header(["Name", "Tests", "Started at", "Comment", "Result"])
+ tab.add_rows(list_results(opts.result_storage, opts.limit), header=False)
print(tab.draw())
return 0
@@ -292,7 +326,7 @@
print(" --no-report option can't be used with 'report' cmd")
return 1
storage = make_storage(opts.data_dir, existing=True)
- config = storage.load(Config, 'config')
+ config = storage.load(Config, WallyDB.config)
report_profiles.default_format = opts.format
report.default_format = opts.format
stages.append(LoadStoredNodesStage())
@@ -327,6 +361,8 @@
print("Subparser {!r} is not supported".format(opts.subparser_name))
return 1
+ start_time = int(time.time())
+
report_stages = [] # type: List[Stage]
if not getattr(opts, "no_report", False):
reporters = opts.reporters.split(",")
@@ -346,6 +382,9 @@
ctx = TestRun(config, storage, WallyStorage(storage))
ctx.rpc_code, ctx.default_rpc_plugins = get_rpc_server_code()
+ if 'dev_roles' in ctx.config:
+ ctx.devs_locator = ctx.config.dev_roles
+
if opts.ssh_key_passwd is not None:
set_ssh_key_passwd(opts.ssh_key_passwd)
elif opts.ssh_key_passwd_kbd:
@@ -396,16 +435,30 @@
ctx.storage.sync()
logger.info("All info is stored into %r", config.storage_url)
+ end_time = int(time.time())
+ storage.put([start_time, end_time], WallyDB.run_interval)
if failed or cleanup_failed:
if opts.subparser_name == 'report':
logger.error("Report generation failed. See error details in log above")
else:
logger.error("Tests are failed. See error details in log above")
- return 1
+ code = 1
else:
if opts.subparser_name == 'report':
logger.info("Report successfully generated")
else:
logger.info("Tests finished successfully")
- return 0
+ code = 0
+
+ if opts.profile:
+ assert pr is not None
+ pr.disable()
+ import pstats
+ pstats.Stats(pr).sort_stats('tottime').print_stats(30)
+
+ if opts.subparser_name == 'test':
+ storage.put(code, WallyDB.res_code)
+
+ storage.sync()
+ return code
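Note: the new --profile flag brackets the whole run with cProfile and prints the 30 most expensive functions by total time. The same pattern in isolation, with a trivial stand-in workload:

    import cProfile
    import pstats

    pr = cProfile.Profile()
    pr.enable()
    sum(i * i for i in range(10 ** 6))  # stand-in for the profiled run
    pr.disable()
    pstats.Stats(pr).sort_stats('tottime').print_stats(30)  # top 30 by total time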
diff --git a/wally/report.py b/wally/report.py
index 9615461..f42c7c9 100644
--- a/wally/report.py
+++ b/wally/report.py
@@ -1,23 +1,30 @@
import os
import abc
import logging
+import collections
from collections import defaultdict
-from typing import Dict, Any, Iterator, Tuple, cast, List, Set, Optional, Union, Type
+from typing import Dict, Any, Iterator, Tuple, cast, List, Set, Optional, Union, Type, Iterable
import numpy
+import scipy.stats
from statsmodels.tsa.stattools import adfuller
import xmlbuilder3
import wally
+# import matplotlib
+# matplotlib.use('GTKAgg')
+
from cephlib import html
from cephlib.units import b2ssize, b2ssize_10, unit_conversion_coef, unit_conversion_coef_f
from cephlib.statistic import calc_norm_stat_props
-from cephlib.storage_selectors import summ_sensors, find_sensors_to_2d
+from cephlib.storage_selectors import sum_sensors, find_sensors_to_2d, update_storage_selector, DevRoles
from cephlib.wally_storage import find_nodes_by_roles
from cephlib.plot import (plot_simple_bars, plot_hmap_from_2d, plot_lat_over_time, plot_simple_over_time,
- plot_histo_heatmap, plot_v_over_time, plot_hist)
+ plot_histo_heatmap, plot_v_over_time, plot_hist, plot_dots_with_regression)
+from cephlib.numeric_types import ndarray2d
+from cephlib.node import NodeRole
from .utils import STORAGE_ROLES
from .stage import Stage, StepOrder
@@ -31,7 +38,8 @@
from .report_profiles import (DefStyleProfile, DefColorProfile, StyleProfile, ColorProfile,
default_format, io_chart_format)
from .plot import io_chart
-from .resources import ResourceNames, get_resources_usage, make_iosum, IOSummary, get_cluster_cpu_load
+from .resources import ResourceNames, get_resources_usage, make_iosum, get_cluster_cpu_load
+from .console_report import get_console_report_table, console_report_headers, console_report_align, Texttable
logger = logging.getLogger("wally")
@@ -43,31 +51,6 @@
DEBUG = False
-# ---------------- STRUCTS -------------------------------------------------------------------------------------------
-
-
-# TODO: need to be revised, have to user StatProps fields instead
-class StoragePerfSummary:
- def __init__(self) -> None:
- self.direct_iops_r_max = 0 # type: int
- self.direct_iops_w_max = 0 # type: int
-
- # 64 used instead of 4k to faster feed caches
- self.direct_iops_w64_max = 0 # type: int
-
- self.rws4k_10ms = 0 # type: int
- self.rws4k_30ms = 0 # type: int
- self.rws4k_100ms = 0 # type: int
- self.bw_write_max = 0 # type: int
- self.bw_read_max = 0 # type: int
-
- self.bw = None # type: float
- self.iops = None # type: float
- self.lat = None # type: float
- self.lat_50 = None # type: float
- self.lat_95 = None # type: float
-
-
# -------------- AGGREGATION AND STAT FUNCTIONS ----------------------------------------------------------------------
LEVEL_SENSORS = {("block-io", "io_queue"), ("system-cpu", "procs_blocked"), ("system-cpu", "procs_queue")}
@@ -124,23 +107,30 @@
class Menu1st:
- engineering = "Engineering"
summary = "Summary"
per_job = "Per Job"
+ engineering = "Engineering"
+ engineering_per_job = "Engineering per job"
+ order = [summary, per_job, engineering, engineering_per_job]
class Menu2ndEng:
+ summary = "Summary"
iops_time = "IOPS(time)"
hist = "IOPS/lat overall histogram"
lat_time = "Lat(time)"
+ resource_regression = "Resource usage LR"
+ order = [summary, iops_time, hist, lat_time, resource_regression]
class Menu2ndSumm:
+ summary = "Summary"
io_lat_qd = "IO & Lat vs QD"
- cpu_usage_qd = "CPU usage"
+ resources_usage_qd = "Resource usage"
+ order = [summary, io_lat_qd, resources_usage_qd]
-menu_1st_order = [Menu1st.summary, Menu1st.engineering, Menu1st.per_job]
+menu_1st_order = Menu1st.order
# -------------------- REPORTS --------------------------------------------------------------------------------------
@@ -176,9 +166,120 @@
# """Creates graphs, which show how IOPS and Latency depend on block size"""
#
#
-# # Main performance report
-# class PerformanceSummary(SuiteReporter):
-# """Aggregated summary fro storage"""
+
+
+class StoragePerfSummary:
+ iops_units = "KiBps"
+ bw_units = "Bps"
+ NO_VAL = -1
+
+ def __init__(self) -> None:
+ self.rw_iops_10ms = self.NO_VAL # type: int
+ self.rw_iops_30ms = self.NO_VAL # type: int
+ self.rw_iops_100ms = self.NO_VAL # type: int
+
+ self.rr_iops_10ms = self.NO_VAL # type: int
+ self.rr_iops_30ms = self.NO_VAL # type: int
+ self.rr_iops_100ms = self.NO_VAL # type: int
+
+ self.bw_write_max = self.NO_VAL # type: int
+ self.bw_read_max = self.NO_VAL # type: int
+
+ self.bw = None # type: Optional[float]
+ self.read_iops = None # type: Optional[float]
+ self.write_iops = None # type: Optional[float]
+
+
+def get_performance_summary(storage: IWallyStorage, suite: SuiteConfig,
+ hboxes: int, large_blocks: int) -> Tuple[StoragePerfSummary, StoragePerfSummary]:
+
+ psum95 = StoragePerfSummary()
+ psum50 = StoragePerfSummary()
+
+ for job in storage.iter_job(suite):
+ if isinstance(job, FioJobConfig):
+ fjob = cast(FioJobConfig, job)
+ io_sum = make_iosum(storage, suite, job, hboxes)
+
+ bw_avg = io_sum.bw.average * unit_conversion_coef(io_sum.bw.units, StoragePerfSummary.bw_units)
+
+ if fjob.bsize < large_blocks:
+ lat_95_ms = io_sum.lat.perc_95 * unit_conversion_coef(io_sum.lat.units, 'ms')
+ lat_50_ms = io_sum.lat.perc_50 * unit_conversion_coef(io_sum.lat.units, 'ms')
+
+ iops_avg = io_sum.bw.average * unit_conversion_coef(io_sum.bw.units, StoragePerfSummary.iops_units)
+ iops_avg /= fjob.bsize
+
+ if fjob.oper == 'randwrite' and fjob.sync_mode == 'd':
+ for lat, field in [(10, 'rw_iops_10ms'), (30, 'rw_iops_30ms'), (100, 'rw_iops_100ms')]:
+ if lat_95_ms <= lat:
+ setattr(psum95, field, max(getattr(psum95, field), iops_avg))
+ if lat_50_ms <= lat:
+ setattr(psum50, field, max(getattr(psum50, field), iops_avg))
+
+ if fjob.oper == 'randread' and fjob.sync_mode == 'd':
+ for lat, field in [(10, 'rr_iops_10ms'), (30, 'rr_iops_30ms'), (100, 'rr_iops_100ms')]:
+ if lat_95_ms <= lat:
+ setattr(psum95, field, max(getattr(psum95, field), iops_avg))
+ if lat_50_ms <= lat:
+ setattr(psum50, field, max(getattr(psum50, field), iops_avg))
+ elif fjob.sync_mode == 'd':
+ if fjob.oper in ('randwrite', 'write'):
+ psum50.bw_write_max = max(psum50.bw_write_max, bw_avg)
+ elif fjob.oper in ('randread', 'read'):
+ psum50.bw_read_max = max(psum50.bw_read_max, bw_avg)
+
+ return psum50, psum95
+
+
+# Main performance report
+class PerformanceSummary(SuiteReporter):
+ """Aggregated summary for storage"""
+ def get_divs(self, suite: SuiteConfig) -> Iterator[Tuple[str, str, HTMLBlock]]:
+ psum50, psum95 = get_performance_summary(self.rstorage, suite, self.style.hist_boxes, self.style.large_blocks)
+
+ caption = "Storage summary report"
+ res = html.H3(html.center(caption))
+
+ headers = ["Mode", "Stats", "Explanation"]
+ align = ['left', 'right', "left"]
+ data = []
+
+ if psum95.rr_iops_10ms != psum95.NO_VAL or psum95.rr_iops_30ms != psum95.NO_VAL or \
+ psum95.rr_iops_100ms != psum95.NO_VAL:
+ data.append("Average random read IOPS for small blocks")
+
+ if psum95.rr_iops_10ms != psum95.NO_VAL:
+ data.append(("Database", b2ssize_10(psum95.rr_iops_10ms), "Latency 95th percentile < 10ms"))
+ if psum95.rr_iops_30ms != psum95.NO_VAL:
+ data.append(("File system", b2ssize_10(psum95.rr_iops_30ms), "Latency 95th percentile < 30ms"))
+ if psum95.rr_iops_100ms != psum95.NO_VAL:
+ data.append(("File server", b2ssize_10(psum95.rr_iops_100ms), "Latency 95th percentile < 100ms"))
+
+ if psum95.rw_iops_10ms != psum95.NO_VAL or psum95.rw_iops_30ms != psum95.NO_VAL or \
+ psum95.rw_iops_100ms != psum95.NO_VAL:
+ data.append("Average random write IOPS for small blocks")
+
+ if psum95.rw_iops_10ms != psum95.NO_VAL:
+ data.append(("Database", b2ssize_10(psum95.rw_iops_10ms), "Latency 95th percentile < 10ms"))
+ if psum95.rw_iops_30ms != psum95.NO_VAL:
+ data.append(("File system", b2ssize_10(psum95.rw_iops_30ms), "Latency 95th percentile < 30ms"))
+ if psum95.rw_iops_100ms != psum95.NO_VAL:
+ data.append(("File server", b2ssize_10(psum95.rw_iops_100ms), "Latency 95th percentile < 100ms"))
+
+ if psum50.bw_write_max != psum50.NO_VAL or psum50.bw_read_max != psum50.NO_VAL:
+ data.append("Average sequention IO")
+
+        if psum50.bw_write_max != psum50.NO_VAL:
+ data.append(("Write", b2ssize(psum50.bw_write_max) + psum50.bw_units,
+ "Large blocks (>={}KiB)".format(self.style.large_blocks)))
+        if psum50.bw_read_max != psum50.NO_VAL:
+ data.append(("Read", b2ssize(psum50.bw_read_max) + psum50.bw_units,
+ "Large blocks (>={}KiB)".format(self.style.large_blocks)))
+
+ res += html.center(html.table("Performance", headers, data, align=align))
+ yield Menu1st.summary, Menu2ndSumm.summary, HTMLBlock(res)
+
# # Node load over test time
# class NodeLoad(SuiteReporter):
@@ -186,7 +287,6 @@
# # Ceph operation breakout report
# class CephClusterSummary(SuiteReporter):
-# """IOPS/latency during test"""
# Main performance report
@@ -204,15 +304,15 @@
str_summary[fjob_no_qd] = (fjob_no_qd.summary, fjob_no_qd.long_summary)
ts_map[fjob_no_qd].append((suite, fjob))
+ caption = "IOPS, bandwith, and latency as function of parallel IO request count (QD)"
+ yield Menu1st.summary, Menu2ndSumm.io_lat_qd, HTMLBlock(html.H3(html.center(caption)))
+
for tpl, suites_jobs in ts_map.items():
if len(suites_jobs) >= self.style.min_iops_vs_qd_jobs:
iosums = [make_iosum(self.rstorage, suite, job, self.style.hist_boxes) for suite, job in suites_jobs]
iosums.sort(key=lambda x: x.qd)
summary, summary_long = str_summary[tpl]
- yield Menu1st.summary, Menu2ndSumm.io_lat_qd, \
- HTMLBlock(html.H2(html.center("IOPS, BW, Lat = func(QD). " + summary_long)))
-
ds = DataSource(suite_id=suite.storage_id,
job_id=summary,
node_id=AGG_TAG,
@@ -221,8 +321,8 @@
metric="io_over_qd",
tag=io_chart_format)
- fpath = self.plt(io_chart, ds, title="", legend="IOPS/BW", iosums=iosums)
- yield Menu1st.summary, Menu2ndSumm.io_lat_qd, HTMLBlock(html.center(html.img(fpath)))
+ fpath = self.plt(io_chart, ds, title=summary_long, legend="IOPS/BW", iosums=iosums)
+ yield Menu1st.summary, Menu2ndSumm.io_lat_qd, HTMLBlock(html.img(fpath))
class ResourceQD(SuiteReporter):
@@ -239,6 +339,8 @@
fjob_no_qd = cast(FioJobParams, fjob.params.copy(qd=None))
qd_grouped_jobs.setdefault(fjob_no_qd, []).append(fjob)
+ yield Menu1st.summary, Menu2ndSumm.resources_usage_qd, HTMLBlock(html.center(html.H3("Resource usage summary")))
+
for jc_no_qd, jobs in sorted(qd_grouped_jobs.items()):
cpu_usage2qd = {}
for job in jobs:
@@ -266,12 +368,94 @@
metric="cpu_for_iop",
tag=io_chart_format)
- fpath = self.plt(plot_simple_bars, ds, jc_no_qd.long_summary, labels, vals, errs,
- xlabel="CPU core time per IOP", ylabel="QD * Test nodes" if test_nc != 1 else "QD",
+ title = "CPU time per IOP, " + jc_no_qd.long_summary
+ fpath = self.plt(plot_simple_bars, ds, title, labels, vals, errs,
+ xlabel="CPU core time per IOP",
+ ylabel="QD * Test nodes" if test_nc != 1 else "QD",
x_formatter=(lambda x, pos: b2ssize_10(x) + 's'),
one_point_zero_line=False)
- yield Menu1st.summary, Menu2ndSumm.cpu_usage_qd, HTMLBlock(html.center(html.img(fpath)))
+ yield Menu1st.summary, Menu2ndSumm.resources_usage_qd, HTMLBlock(html.img(fpath))
+
+
+def get_resources_usage2(suite: SuiteConfig, job: JobConfig, rstorage: IWallyStorage,
+                         roles: Iterable[str], sensor: str, metric: str, test_metric: str,
+                         agg_window: int = 5) -> ndarray2d:
+ assert test_metric == 'iops'
+ fjob = cast(FioJobConfig, job)
+ bw = get_aggregated(rstorage, suite.storage_id, job.storage_id, "bw", job.reliable_info_range_s)
+ io_transfered = bw.data * unit_conversion_coef_f(bw.units, "Bps")
+ ops_done = io_transfered / (fjob.bsize * unit_conversion_coef_f("KiBps", "Bps"))
+ nodes = [node for node in rstorage.load_nodes() if node.roles.intersection(STORAGE_ROLES)]
+
+ if sensor == 'system-cpu':
+ assert metric == 'used'
+ core_count = None
+ for node in nodes:
+ if core_count is None:
+ core_count = sum(cores for _, cores in node.hw_info.cpus)
+ else:
+ assert core_count == sum(cores for _, cores in node.hw_info.cpus)
+ cpu_ts = get_cluster_cpu_load(rstorage, roles, job.reliable_info_range_s)
+ metric_data = (1.0 - (cpu_ts['idle'].data + cpu_ts['iowait'].data) / cpu_ts['total'].data) * core_count
+ else:
+ metric_data = sum_sensors(rstorage, job.reliable_info_range_s,
+ node_id=[node.node_id for node in nodes], sensor=sensor, metric=metric)
+
+ res = []
+ for pos in range(0, len(ops_done) - agg_window, agg_window):
+ pe = pos + agg_window
+ res.append((numpy.average(ops_done[pos: pe]), numpy.average(metric_data.data[pos: pe])))
+
+ return res
+
+
+class ResourceConsumptionSummary(SuiteReporter):
+ suite_types = {'fio'}
+
+ def get_divs(self, suite: SuiteConfig) -> Iterator[Tuple[str, str, HTMLBlock]]:
+ vs = 'iops'
+ for job_tp in ('rwd4', 'rrd4'):
+ for sensor_metric in ('net-io.send_packets', 'system-cpu.used'):
+ sensor, metric = sensor_metric.split(".")
+ usage = []
+ for job in self.rstorage.iter_job(suite):
+ if job_tp in job.summary:
+ usage.extend(get_resources_usage2(suite, job, self.rstorage, STORAGE_ROLES,
+ sensor=sensor, metric=metric, test_metric=vs))
+
+ if not usage:
+ continue
+
+ iops, cpu = zip(*usage)
+ slope, intercept, r_value, p_value, std_err = scipy.stats.linregress(iops, cpu)
+ x = numpy.array([0.0, max(iops) * 1.1])
+
+ ds = DataSource(suite_id=suite.storage_id,
+ job_id=job_tp,
+ node_id="storage",
+ sensor='usage-regression',
+ dev=AGG_TAG,
+ metric=sensor_metric + '.VS.' + vs,
+ tag=default_format)
+
+ fname = self.plt(plot_dots_with_regression, ds,
+ "{}::{}.{}".format(job_tp, sensor_metric, vs),
+ x=iops, y=cpu,
+ xlabel=vs,
+ ylabel=sensor_metric,
+ x_approx=x, y_approx=intercept + slope * x)
+
+ yield Menu1st.engineering, Menu2ndEng.resource_regression, HTMLBlock(html.img(fname))
+
+
+class EngineeringSummary(SuiteReporter):
+ suite_types = {'fio'}
+
+ def get_divs(self, suite: SuiteConfig) -> Iterator[Tuple[str, str, HTMLBlock]]:
+ tbl = [line for line in get_console_report_table(suite, self.rstorage) if line is not Texttable.HLINE]
+ align = [{'l': 'left', 'r': 'right'}[al] for al in console_report_align]
+ res = html.center(html.table("Test results", console_report_headers, tbl, align=align))
+ yield Menu1st.engineering, Menu2ndEng.summary, HTMLBlock(res)
class StatInfo(JobReporter):
@@ -288,7 +472,7 @@
if test_nc > 1:
caption += " * {} nodes".format(test_nc)
- res = html.H2(html.center(caption))
+ res = html.H3(html.center(caption))
stat_data_headers = ["Name",
"Total done",
"Average ~ Dev",
@@ -360,7 +544,7 @@
# sensor usage
stat_data.extend([iops_data, lat_data])
- res += html.center(html.table("Load stats info", stat_data_headers, stat_data, align=align))
+ res += html.center(html.table("Test results", stat_data_headers, stat_data, align=align))
yield Menu1st.per_job, job.summary, HTMLBlock(res)
@@ -404,7 +588,7 @@
table_structure2.append((line[1],))
table_structure = table_structure2
- yield Menu1st.per_job, job.summary, HTMLBlock(html.H2(html.center("Resources usage")))
+ yield Menu1st.per_job, job.summary, HTMLBlock(html.H3(html.center("Resources usage")))
doc = xmlbuilder3.XMLBuilder("table",
**{"class": "table table-bordered table-striped table-condensed table-hover",
@@ -498,7 +682,7 @@
pairs.append(('Net packets per IOP', net_pkt_names))
yield Menu1st.per_job, job.summary, \
- HTMLBlock(html.H2(html.center("Resource consumption per service provided")))
+ HTMLBlock(html.H3(html.center("Resource consumption per service provided")))
for tp, names in pairs:
vals = [] # type: List[float]
@@ -548,7 +732,7 @@
for node_id in nodes:
bn = 0
tot = 0
- for _, ds in self.rstorage.iter_sensors(node_id=node_id, sensor=sensor, metric=metric):
+ for ds in self.rstorage.iter_sensors(node_id=node_id, sensor=sensor, metric=metric):
if ds.dev in ('sdb', 'sdc', 'sdd', 'sde'):
ts = self.rstorage.get_sensor(ds, job.reliable_info_range_s)
bn += (ts.data > bn_val).sum()
@@ -580,70 +764,116 @@
yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fname))
+class DevRoles:
+ client_disk = 'client_disk'
+ client_net = 'client_net'
+ client_cpu = 'client_cpu'
+
+ storage_disk = 'storage_disk'
+ storage_client_net = 'storage_client_net'
+ storage_replication_net = 'storage_replication_net'
+    storage_cpu = 'storage_cpu'
+ ceph_storage = 'ceph_storage'
+ ceph_journal = 'ceph_journal'
+
+ compute_disk = 'compute_disk'
+ compute_net = 'compute_net'
+ compute_cpu = 'compute_cpu'
+
+
+def roles_for_sensors(storage: IWallyStorage) -> Dict[str, List[DataSource]]:
+ role2ds = defaultdict(list)
+
+ for node in storage.load_nodes():
+ ds = DataSource(node_id=node.node_id)
+ if 'ceph-osd' in node.roles:
+ for jdev in node.params.get('ceph_journal_devs', []):
+ role2ds[DevRoles.ceph_journal].append(ds(dev=jdev))
+ role2ds[DevRoles.storage_disk].append(ds(dev=jdev))
+
+ for sdev in node.params.get('ceph_storage_devs', []):
+ role2ds[DevRoles.ceph_storage].append(ds(dev=sdev))
+ role2ds[DevRoles.storage_disk].append(ds(dev=sdev))
+
+ if node.hw_info:
+ for dev in node.hw_info.disks_info:
+ role2ds[DevRoles.storage_disk].append(ds(dev=dev))
+
+ if 'testnode' in node.roles:
+ role2ds[DevRoles.client_disk].append(ds(dev='rbd0'))
+
+ return role2ds
+
+
+def get_sources_for_roles(roles: Iterable[str]) -> List[DataSource]:
+ return []
+
+
# IO time and QD
class QDIOTimeHeatmap(JobReporter):
def get_divs(self, suite: SuiteConfig, job: JobConfig) -> Iterator[Tuple[str, str, HTMLBlock]]:
- # TODO: fix this hardcode, need to track what devices are actually used on test and storage nodes
- # use saved storage info in nodes
-
- journal_devs = None
- storage_devs = None
- test_nodes_devs = ['rbd0']
-
- for node in self.rstorage.load_nodes():
- if node.roles.intersection(STORAGE_ROLES):
- cjd = set(node.params['ceph_journal_devs'])
- if journal_devs is None:
- journal_devs = cjd
- else:
- assert journal_devs == cjd, "{!r} != {!r}".format(journal_devs, cjd)
-
- csd = set(node.params['ceph_storage_devs'])
- if storage_devs is None:
- storage_devs = csd
- else:
- assert storage_devs == csd, "{!r} != {!r}".format(storage_devs, csd)
+ # journal_devs = None
+ # storage_devs = None
+ # test_nodes_devs = ['rbd0']
+ #
+ # for node in self.rstorage.load_nodes():
+ # if node.roles.intersection(STORAGE_ROLES):
+ # cjd = set(node.params['ceph_journal_devs'])
+ # if journal_devs is None:
+ # journal_devs = cjd
+ # else:
+ # assert journal_devs == cjd, "{!r} != {!r}".format(journal_devs, cjd)
+ #
+ # csd = set(node.params['ceph_storage_devs'])
+ # if storage_devs is None:
+ # storage_devs = csd
+ # else:
+ # assert storage_devs == csd, "{!r} != {!r}".format(storage_devs, csd)
+ #
trange = (job.reliable_info_range[0] // 1000, job.reliable_info_range[1] // 1000)
+ test_nc = len(list(find_nodes_by_roles(self.rstorage.storage, ['testnode'])))
- for name, devs, roles in [('storage', storage_devs, STORAGE_ROLES),
- ('journal', journal_devs, STORAGE_ROLES),
- ('test', test_nodes_devs, ['testnode'])]:
+ for dev_role in (DevRoles.ceph_storage, DevRoles.ceph_journal, DevRoles.client_disk):
- yield Menu1st.per_job, job.summary, \
- HTMLBlock(html.H2(html.center("{} IO heatmaps".format(name.capitalize()))))
+ caption = "{} IO heatmaps - {}".format(dev_role.capitalize(), cast(FioJobParams, job).params.long_summary)
+ if test_nc != 1:
+ caption += " * {} nodes".format(test_nc)
+
+ yield Menu1st.engineering_per_job, job.summary, HTMLBlock(html.H3(html.center(caption)))
# QD heatmap
-            nodes = find_nodes_by_roles(self.rstorage.storage, roles)
-            ioq2d = find_sensors_to_2d(self.rstorage, trange, sensor='block-io', dev=devs,
-                                       node_id=nodes, metric='io_queue')
-            ds = DataSource(suite.storage_id, job.storage_id, AGG_TAG, 'block-io', name, tag="hmap." + default_format)
+            # nodes = find_nodes_by_roles(self.rstorage.storage, roles)
+            ioq2d = find_sensors_to_2d(self.rstorage, trange, dev_role=dev_role, sensor='block-io', metric='io_queue')
+
+            ds = DataSource(suite.storage_id, job.storage_id, AGG_TAG, 'block-io', dev_role,
+                            tag="hmap." + default_format)
fname = self.plt(plot_hmap_from_2d, ds(metric='io_queue'), data2d=ioq2d, xlabel='Time', ylabel="IO QD",
- title=name.capitalize() + " devs QD", bins=StyleProfile.qd_bins)
- yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fname))
+ title=dev_role.capitalize() + " devs QD", bins=StyleProfile.qd_bins)
+ yield Menu1st.engineering_per_job, job.summary, HTMLBlock(html.img(fname))
# Block size heatmap
- wc2d = find_sensors_to_2d(self.rstorage, trange, node_id=nodes, sensor='block-io', dev=devs,
+ wc2d = find_sensors_to_2d(self.rstorage, trange, dev_role=dev_role, sensor='block-io',
metric='writes_completed')
wc2d[wc2d < 1E-3] = 1
- sw2d = find_sensors_to_2d(self.rstorage, trange, node_id=nodes, sensor='block-io', dev=devs,
+ sw2d = find_sensors_to_2d(self.rstorage, trange, dev_role=dev_role, sensor='block-io',
metric='sectors_written')
data2d = sw2d / wc2d / 1024
fname = self.plt(plot_hmap_from_2d, ds(metric='wr_block_size'),
- data2d=data2d, title=name.capitalize() + " write block size",
+ data2d=data2d, title=dev_role.capitalize() + " write block size",
ylabel="IO bsize, KiB", xlabel='Time', bins=StyleProfile.block_size_bins)
- yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fname))
+ yield Menu1st.engineering_per_job, job.summary, HTMLBlock(html.img(fname))
# iotime heatmap
- wtime2d = find_sensors_to_2d(self.rstorage, trange, node_id=nodes, sensor='block-io', dev=devs,
+ wtime2d = find_sensors_to_2d(self.rstorage, trange, dev_role=dev_role, sensor='block-io',
metric='io_time')
fname = self.plt(plot_hmap_from_2d, ds(metric='io_time'), data2d=wtime2d,
xlabel='Time', ylabel="IO time (ms) per second",
- title=name.capitalize() + " iotime", bins=StyleProfile.iotime_bins)
- yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fname))
+ title=dev_role.capitalize() + " iotime", bins=StyleProfile.iotime_bins)
+ yield Menu1st.engineering_per_job, job.summary, HTMLBlock(html.img(fname))
# IOPS/latency over test time for each job
@@ -652,14 +882,16 @@
suite_types = {'fio'}
def get_divs(self, suite: SuiteConfig, job: JobConfig) -> Iterator[Tuple[str, str, HTMLBlock]]:
-
fjob = cast(FioJobConfig, job)
- yield Menu1st.per_job, job.summary, HTMLBlock(html.H2(html.center("Load tool results")))
+ # caption = "Load tool results, " + job.params.long_summary
+ caption = "Load tool results"
+ yield Menu1st.per_job, job.summary, HTMLBlock(html.H3(html.center(caption)))
agg_io = get_aggregated(self.rstorage, suite.storage_id, fjob.storage_id, "bw", job.reliable_info_range_s)
+
if fjob.bsize >= DefStyleProfile.large_blocks:
- title = "Fio measured Bandwidth over time"
+ title = "Fio measured bandwidth over time"
units = "MiBps"
agg_io.data //= int(unit_conversion_coef_f(units, agg_io.units))
else:
@@ -670,6 +902,11 @@
fpath = self.plt(plot_v_over_time, agg_io.source(tag='ts.' + default_format), title, units, agg_io)
yield Menu1st.per_job, fjob.summary, HTMLBlock(html.img(fpath))
+ title = "BW distribution" if fjob.bsize >= DefStyleProfile.large_blocks else "IOPS distribution"
+ io_stat_prop = calc_norm_stat_props(agg_io, bins_count=StyleProfile.hist_boxes)
+ fpath = self.plt(plot_hist, agg_io.source(tag='hist.' + default_format), title, units, io_stat_prop)
+ yield Menu1st.per_job, fjob.summary, HTMLBlock(html.img(fpath))
+
if fjob.bsize < DefStyleProfile.large_blocks:
agg_lat = get_aggregated(self.rstorage, suite.storage_id, fjob.storage_id, "lat",
job.reliable_info_range_s)
@@ -687,23 +924,6 @@
yield Menu1st.per_job, fjob.summary, HTMLBlock(html.img(fpath))
- fjob = cast(FioJobConfig, job)
-
- agg_io = get_aggregated(self.rstorage, suite.storage_id, fjob.storage_id, "bw", job.reliable_info_range_s)
-
- if fjob.bsize >= DefStyleProfile.large_blocks:
- title = "BW distribution"
- units = "MiBps"
- agg_io.data //= int(unit_conversion_coef_f(units, agg_io.units))
- else:
- title = "IOPS distribution"
- agg_io.data //= (int(unit_conversion_coef_f("KiBps", agg_io.units)) * fjob.bsize)
- units = "IOPS"
-
- io_stat_prop = calc_norm_stat_props(agg_io, bins_count=StyleProfile.hist_boxes)
- fpath = self.plt(plot_hist, agg_io.source(tag='hist.' + default_format), title, units, io_stat_prop)
- yield Menu1st.per_job, fjob.summary, HTMLBlock(html.img(fpath))
-
# Cluster load over test time
class ClusterLoad(JobReporter):
@@ -719,15 +939,14 @@
def get_divs(self, suite: SuiteConfig, job: JobConfig) -> Iterator[Tuple[str, str, HTMLBlock]]:
- yield Menu1st.per_job, job.summary, HTMLBlock(html.H2(html.center("Cluster load")))
+ yield Menu1st.per_job, job.summary, HTMLBlock(html.H3(html.center("Cluster load")))
sensors = []
max_iop = 0
max_bytes = 0
stor_nodes = find_nodes_by_roles(self.rstorage.storage, STORAGE_ROLES)
for sensor, metric, op, units in self.storage_sensors:
- ts = summ_sensors(self.rstorage, job.reliable_info_range_s, node_id=stor_nodes, sensor=sensor,
- metric=metric)
+ ts = sum_sensors(self.rstorage, job.reliable_info_range_s, node_id=stor_nodes, sensor=sensor, metric=metric)
if ts is not None:
ds = DataSource(suite_id=suite.storage_id,
job_id=job.storage_id,
@@ -758,22 +977,60 @@
fpath = self.plt(plot_v_over_time, ds, title, units, ts=ts)
yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fpath))
else:
- logger.info("Hide '%s' plot for %s, as it's cum load is less then %s%%",
+ logger.info("Hide '%s' plot for %s, as it's load is less then %s%% from maximum",
title, job.summary, int(DefStyleProfile.min_load_diff * 100))
# ------------------------------------------ REPORT STAGES -----------------------------------------------------------
+def add_devroles(ctx: TestRun) -> None:
+ # TODO: need to detect all devices for node on this stage using hw info
+ detected_selectors = collections.defaultdict(
+ lambda: collections.defaultdict(list)) # type: Dict[str, Dict[str, List[str]]]
+
+ for node in ctx.nodes:
+ if NodeRole.osd in node.info.roles:
+ all_devs = set()
+
+ jdevs = node.info.params.get('ceph_journal_devs')
+ if jdevs:
+ all_devs.update(jdevs)
+ detected_selectors[node.info.hostname]["|".join(jdevs)].append(DevRoles.osd_journal)
+
+ sdevs = node.info.params.get('ceph_storage_devs')
+ if sdevs:
+ all_devs.update(sdevs)
+ detected_selectors[node.info.hostname]["|".join(sdevs)].append(DevRoles.osd_storage)
+
+ if all_devs:
+ detected_selectors[node.info.hostname]["|".join(all_devs)].append(DevRoles.storage_block)
+
+ for hostname, dev_rules in detected_selectors.items():
+ dev_locs = [] # type: List[Dict[str, List[str]]]
+ ctx.devs_locator.append({hostname: dev_locs})
+ for dev_names, roles in dev_rules.items():
+ dev_locs.append({dev_names: roles})
+
+
class HtmlReportStage(Stage):
priority = StepOrder.REPORT
def run(self, ctx: TestRun) -> None:
- job_reporters_cls = [StatInfo, Resources, LoadToolResults, ClusterLoad, CPULoadPlot, QDIOTimeHeatmap]
+ nodes = ctx.rstorage.load_nodes()
+ update_storage_selector(ctx.rstorage, ctx.devs_locator, nodes)
+
+ job_reporters_cls = [StatInfo, LoadToolResults, Resources, ClusterLoad, CPULoadPlot, QDIOTimeHeatmap]
+ # job_reporters_cls = [QDIOTimeHeatmap]
job_reporters = [rcls(ctx.rstorage, DefStyleProfile, DefColorProfile)
for rcls in job_reporters_cls] # type: ignore
- suite_reporters_cls = [IOQD, ResourceQD] # type: List[Type[SuiteReporter]]
+ suite_reporters_cls = [IOQD,
+ ResourceQD,
+ PerformanceSummary,
+ EngineeringSummary,
+ ResourceConsumptionSummary] # type: List[Type[SuiteReporter]]
+ # suite_reporters_cls = [] # type: List[Type[SuiteReporter]]
suite_reporters = [rcls(ctx.rstorage, DefStyleProfile, DefColorProfile)
for rcls in suite_reporters_cls] # type: ignore
@@ -823,7 +1080,7 @@
for block, item, html in sreporter.get_divs(suite):
items[block][item].append(html)
except Exception:
- logger.exception("Failed to generate report for suite %s", suite)
+ logger.exception("Failed to generate report for suite %s", suite.storage_id)
if DEBUG:
break
@@ -837,10 +1094,16 @@
)
menu_block.append('<div class="collapse" id="item{}">'.format(idx_1st))
- if menu_1st == Menu1st.per_job:
- in_order = sorted(items[menu_1st], key=job_summ_sort_order.index)
+ if menu_1st in (Menu1st.per_job, Menu1st.engineering_per_job):
+ key = job_summ_sort_order.index
+ elif menu_1st == Menu1st.engineering:
+ key = Menu2ndEng.order.index
+ elif menu_1st == Menu1st.summary:
+ key = Menu2ndSumm.order.index
else:
- in_order = sorted(items[menu_1st])
+ key = lambda x: x
+
+ in_order = sorted(items[menu_1st], key=key)
for menu_2nd in in_order:
menu_block.append(' <a href="#content{}" class="nav-group-item">{}</a>'
diff --git a/wally/report_profiles.py b/wally/report_profiles.py
index 920c320..e83907e 100644
--- a/wally/report_profiles.py
+++ b/wally/report_profiles.py
@@ -9,6 +9,7 @@
suppl_color3 = 'orange'
box_color = 'y'
err_color = 'red'
+ super_outlier_color = 'orange'
noise_alpha = 0.3
subinfo_alpha = 0.7
@@ -43,6 +44,9 @@
point_shape = 'o'
err_point_shape = '*'
+ max_hidden_outliers_fraction = 0.05
+ super_outlier_point_shape_up = '^'
+ super_outlier_point_shape_down = 'v'
avg_range = 20
approx_average = True
@@ -71,9 +75,9 @@
# heatmap_interpolation_points = 300
heatmap_colorbar = False
- outliers_q_nd = 3.0
- outliers_hide_q_nd = 4.0
- outliers_lat = (0.01, 0.9)
+ outliers_q_nd = 3
+ outliers_hide_q_nd = 4
+ outliers_lat = (0.01, 0.95)
violin_instead_of_box = True
violin_point_count = 30000
diff --git a/wally/resources.py b/wally/resources.py
index dca9dcc..a08993d 100644
--- a/wally/resources.py
+++ b/wally/resources.py
@@ -7,9 +7,8 @@
from cephlib.units import b2ssize_10, b2ssize, unit_conversion_coef_f
from cephlib.statistic import NormStatProps, HistoStatProps, calc_norm_stat_props, calc_histo_stat_props
from cephlib.numeric_types import TimeSeries
-from cephlib.wally_storage import find_nodes_by_roles
-from cephlib.storage_selectors import summ_sensors
-from cephlib.node import HWInfo
+from cephlib.wally_storage import find_nodes_by_roles, WallyDB
+from cephlib.storage_selectors import sum_sensors
from .result_classes import IWallyStorage, SuiteConfig
from .utils import STORAGE_ROLES
@@ -120,8 +119,13 @@
cpu_ts = {}
cpu_metrics = "idle guest iowait sirq nice irq steal sys user".split()
nodes = find_nodes_by_roles(rstorage.storage, roles)
+
+ cores_per_node = {}
+ for node in rstorage.load_nodes():
+ cores_per_node[node.node_id] = sum(cores for _, cores in node.hw_info.cpus)
+
for name in cpu_metrics:
- cpu_ts[name] = summ_sensors(rstorage, time_range, node_id=nodes, sensor='system-cpu', metric=name)
+ cpu_ts[name] = sum_sensors(rstorage, time_range, node_id=nodes, sensor='system-cpu', metric=name)
it = iter(cpu_ts.values())
total_over_time = next(it).data.copy() # type: numpy.ndarray
@@ -148,7 +152,7 @@
records = {} # type: Dict[str, Tuple[str, float, float]]
if not nc:
- records = rstorage.get_job_info(suite, job, "resource_usage")
+ records = rstorage.get_job_info(suite, job, WallyDB.resource_usage_rel)
if records is not None:
records = records.copy()
iops_ok = records.pop('iops_ok')
@@ -201,7 +205,7 @@
continue
nodes = find_nodes_by_roles(rstorage.storage, roles)
- res_ts = summ_sensors(rstorage, job.reliable_info_range_s, node_id=nodes, sensor=sensor, metric=metric)
+ res_ts = sum_sensors(rstorage, job.reliable_info_range_s, node_id=nodes, sensor=sensor, metric=metric)
if res_ts is None:
continue
@@ -216,17 +220,15 @@
all_agg[vname] = data
# cpu usage
- stor_cores_count = 0
- all_stor_nodes = list(find_nodes_by_roles(rstorage.storage, STORAGE_ROLES))
- for node in all_stor_nodes:
- try:
- node_hw_info = rstorage.storage.load(HWInfo, 'hw_info', node)
- except KeyError:
- logger.warning("No hw_info available for node %s. Using 'NODE time' instead of " +
- "CPU core time for CPU consumption metrics", node)
- stor_cores_count = len(all_stor_nodes)
- break
- stor_cores_count += sum(cores for _, cores in node_hw_info.cores)
+ stor_cores_count = None
+ for node in rstorage.load_nodes():
+ if node.roles.intersection(STORAGE_ROLES):
+ if stor_cores_count is None:
+ stor_cores_count = sum(cores for _, cores in node.hw_info.cpus)
+ else:
+ assert stor_cores_count == sum(cores for _, cores in node.hw_info.cpus)
+
+    assert stor_cores_count
cpu_ts = get_cluster_cpu_load(rstorage, STORAGE_ROLES, job.reliable_info_range_s)
cpus_used_sec = (1.0 - (cpu_ts['idle'].data + cpu_ts['iowait'].data) / cpu_ts['total'].data) * stor_cores_count
@@ -275,6 +277,6 @@
srecords = records.copy()
srecords['iops_ok'] = iops_ok
- rstorage.put_job_info(suite, job, "resource_usage", srecords)
+ rstorage.put_job_info(suite, job, WallyDB.resource_usage_rel, srecords)
return records, iops_ok
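Note: the cluster CPU usage above is derived as (1 - (idle + iowait) / total) * cores, i.e. core-seconds consumed per second of wall time. A small worked example with made-up sample values:

    import numpy

    idle = numpy.array([70.0, 50.0])    # hypothetical per-second CPU counters
    iowait = numpy.array([10.0, 10.0])
    total = numpy.array([100.0, 100.0])
    stor_cores_count = 32

    # busy fraction excludes idle and iowait, scaled to core-seconds per second
    cpus_used_sec = (1.0 - (idle + iowait) / total) * stor_cores_count
    print(cpus_used_sec)  # [ 6.4 12.8]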
diff --git a/wally/result_classes.py b/wally/result_classes.py
index d1fd104..9d59d42 100644
--- a/wally/result_classes.py
+++ b/wally/result_classes.py
@@ -76,6 +76,10 @@
class IWallyStorage(ISensorStorage, IImagesStorage, metaclass=abc.ABCMeta):
@abc.abstractmethod
+ def flush(self) -> None:
+ pass
+
+ @abc.abstractmethod
def put_or_check_suite(self, suite: SuiteConfig) -> None:
pass
diff --git a/wally/result_storage.py b/wally/result_storage.py
index 70c46d7..de2f86d 100644
--- a/wally/result_storage.py
+++ b/wally/result_storage.py
@@ -7,7 +7,7 @@
import numpy
from cephlib.wally_storage import WallyDB
-from cephlib.sensor_storage import SensorStorageBase
+from cephlib.sensor_storage import SensorStorage
from cephlib.statistic import StatProps
from cephlib.numeric_types import DataSource, TimeSeries
from cephlib.node import NodeInfo
@@ -29,10 +29,12 @@
return path
-class WallyStorage(IWallyStorage, SensorStorageBase):
+class WallyStorage(IWallyStorage, SensorStorage):
def __init__(self, storage: Storage) -> None:
- SensorStorageBase.__init__(self, storage, WallyDB)
- self.cached_nodes = None
+ SensorStorage.__init__(self, storage, WallyDB)
+
+ def flush(self) -> None:
+ self.storage.flush()
# ------------- CHECK DATA IN STORAGE ----------------------------------------------------------------------------
def check_plot_file(self, source: DataSource) -> Optional[str]:
@@ -76,7 +78,7 @@
self.storage.put_raw(report.encode('utf8'), path)
def put_job_info(self, suite: SuiteConfig, job: JobConfig, key: str, data: Any) -> None:
- path = self.db_paths.job_extra.format(suite_id=suite.storage_id, job_id=job.storage_id, tag=key)
+ path = self.db_paths.job_extra.format(suite_id=suite.storage_id, job_id=job.storage_id, name=key)
if isinstance(data, bytes):
self.storage.put_raw(data, path)
else:
@@ -94,7 +96,7 @@
return None
def get_job_info(self, suite: SuiteConfig, job: JobConfig, key: str) -> Any:
- path = self.db_paths.job_extra.format(suite_id=suite.storage_id, job_id=job.storage_id, tag=key)
+ path = self.db_paths.job_extra.format(suite_id=suite.storage_id, job_id=job.storage_id, name=key)
return self.storage.get(path, None)
# ------------- ITER OVER STORAGE ------------------------------------------------------------------------------
@@ -102,7 +104,6 @@
for is_file, suite_info_path, groups in self.iter_paths(self.db_paths.suite_cfg_r):
assert is_file
suite = self.storage.load(SuiteConfig, suite_info_path)
- # suite = cast(SuiteConfig, self.storage.load(SuiteConfig, suite_info_path))
assert suite.storage_id == groups['suite_id']
if not suite_type or suite.test_type == suite_type:
yield suite
@@ -117,13 +118,16 @@
yield job
def load_nodes(self) -> List[NodeInfo]:
- if not self.cached_nodes:
- self.cached_nodes = self.storage.load_list(NodeInfo, WallyDB.all_nodes)
+ try:
+ return self.storage.other_caches['wally']['nodes']
+ except KeyError:
+ nodes = self.storage.load_list(NodeInfo, WallyDB.all_nodes)
if WallyDB.nodes_params in self.storage:
params = json.loads(self.storage.get_raw(WallyDB.nodes_params).decode('utf8'))
- for node in self.cached_nodes:
+ for node in nodes:
node.params = params.get(node.node_id, {})
- return self.cached_nodes
+ self.storage.other_caches['wally']['nodes'] = nodes
+ return nodes
# ----------------- TS ------------------------------------------------------------------------------------------
def get_ts(self, ds: DataSource) -> TimeSeries:
@@ -160,5 +164,5 @@
result = numpy.concatenate((tv, dv), axis=1)
self.storage.put_array(csv_path, result, header, header2=ts.histo_bins, append_on_exists=False)
- def iter_ts(self, **ds_parts) -> Iterator[DataSource]:
- return self.iter_objs(self.db_paths.ts_r, **ds_parts)
\ No newline at end of file
+ def iter_ts(self, **ds_parts: str) -> Iterator[DataSource]:
+ return self.iter_objs(self.db_paths.ts_r, **ds_parts)
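Note: load_nodes now parks the parsed node list in storage.other_caches instead of a per-wrapper attribute, so every WallyStorage built over the same Storage object shares one copy. A sketch of the idiom, assuming other_caches behaves like a defaultdict(dict) (which the assignment in load_nodes implies):

    from collections import defaultdict
    from typing import Any, Dict, List

    other_caches = defaultdict(dict)  # type: Dict[str, Dict[str, Any]]

    def load_nodes_cached() -> List[str]:
        try:
            return other_caches['wally']['nodes']
        except KeyError:
            nodes = ["node-1", "node-2"]  # stand-in for storage.load_list(NodeInfo, ...)
            other_caches['wally']['nodes'] = nodes
            return nodes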
diff --git a/wally/run_test.py b/wally/run_test.py
index 6fbefc4..578a65b 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -2,11 +2,10 @@
import json
import copy
import logging
-from concurrent.futures import Future
-from typing import List, Dict, Tuple, Optional, Union, cast
+from typing import List, Tuple, Optional, Union, cast
from cephlib.wally_storage import WallyDB
-from cephlib.node import NodeInfo, IRPCNode, get_hw_info, get_sw_info
+from cephlib.node import NodeInfo, IRPCNode, get_hw_info, get_sw_info, get_hostname
from cephlib.ssh import parse_ssh_uri
from cephlib.node_impl import setup_rpc, connect
@@ -80,7 +79,7 @@
delta = 0
if val > t_end:
delta = val - t_end
- elif t_start > val:
+ elif val < t_start:
delta = t_start - val
if delta > ctx.config.max_time_diff_ms:
@@ -91,9 +90,9 @@
ctx.config.max_time_diff_ms)
logger.error(msg)
raise utils.StopTestError(msg)
- if delta > 0:
- logger.warning("Node %s has time shift at least %s ms", node, delta)
+ if delta > 1:
+ logger.warning("Node %s has time shift at least %s ms", node, int(delta))
def cleanup(self, ctx: TestRun) -> None:
if ctx.config.get("download_rpc_logs", False):
@@ -123,43 +122,25 @@
class CollectInfoStage(Stage):
"""Collect node info"""
- priority = StepOrder.START_SENSORS - 2
+ priority = StepOrder.UPDATE_NODES_INFO
config_block = 'collect_info'
def run(self, ctx: TestRun) -> None:
- if not ctx.config.collect_info:
- return
-
- futures = {} # type: Dict[Tuple[str, str], Future]
-
with ctx.get_pool() as pool:
- # can't make next RPC request until finish with previous
- for node in ctx.nodes:
- nid = node.node_id
- hw_info_path = WallyDB.hw_info.format(node_id=nid)
- if hw_info_path not in ctx.storage:
- futures[(hw_info_path, nid)] = pool.submit(get_hw_info, node)
+ try:
+ # can't make next RPC request until finish with previous for same node
+ for node, hw_info in zip(ctx.nodes, pool.map(get_hw_info, ctx.nodes)):
+ node.info.hw_info = hw_info
+ for node, sw_info in zip(ctx.nodes, pool.map(get_sw_info, ctx.nodes)):
+ node.info.sw_info = sw_info
+ except Exception as exc:
+ logger.exception("During collecting cluster info")
+ raise utils.StopTestError() from exc
- for (path, nid), future in futures.items():
- try:
- ctx.storage.put(future.result(), path)
- except Exception:
- logger.exception("During collecting hardware info from %s", nid)
- raise utils.StopTestError()
-
- futures.clear()
- for node in ctx.nodes:
- nid = node.node_id
- sw_info_path = WallyDB.sw_info.format(node_id=nid)
- if sw_info_path not in ctx.storage:
- futures[(sw_info_path, nid)] = pool.submit(get_sw_info, node)
-
- for (path, nid), future in futures.items():
- try:
- ctx.storage.put(future.result(), path)
- except Exception:
- logger.exception("During collecting software info from %s", nid)
- raise utils.StopTestError()
+ logger.debug("Collecting hostnames")
+ hostnames = pool.map(get_hostname, ctx.nodes)
+ for node, hostname in zip(ctx.nodes, hostnames):
+ node.info.hostname = hostname
class ExplicitNodesStage(Stage):
diff --git a/wally/suits/io/fio_job.py b/wally/suits/io/fio_job.py
index 3d4e886..6676895 100644
--- a/wally/suits/io/fio_job.py
+++ b/wally/suits/io/fio_job.py
@@ -116,11 +116,12 @@
@property
def oper(self) -> str:
- return self.vals['rw']
+ vl = self.vals['rw']
+ return vl if ':' not in vl else vl.split(":")[0]
@property
def op_type_short(self) -> str:
- return self.op_type2short[self.vals['rw']]
+ return self.op_type2short[self.oper]
@property
def thcount(self) -> int:
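Note: the oper change strips fio's optional ':N' suffix from the rw value (e.g. 'randread:16' as used in verify.cfg), so op_type2short lookups keep working. The split behavior, checked in isolation:

    def oper(rw_val: str) -> str:
        # drop the optional ':N' suffix fio allows in rw values
        return rw_val if ':' not in rw_val else rw_val.split(":")[0]

    assert oper("randread:16") == "randread"
    assert oper("randwrite") == "randwrite"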
diff --git a/wally/suits/io/fio_task_parser.py b/wally/suits/io/fio_task_parser.py
index 9ffeed8..5b91885 100644
--- a/wally/suits/io/fio_task_parser.py
+++ b/wally/suits/io/fio_task_parser.py
@@ -210,6 +210,7 @@
def apply_params(sec: FioJobConfig, params: FioParams) -> FioJobConfig:
processed_vals = OrderedDict() # type: Dict[str, Any]
processed_vals.update(params)
+
for name, val in sec.vals.items():
if name in params:
continue
@@ -300,6 +301,12 @@
def fio_cfg_compile(source: str, fname: str, test_params: FioParams) -> Iterator[FioJobConfig]:
+ test_params = test_params.copy()
+
+ if 'RAMPTIME' not in test_params and 'RUNTIME' in test_params:
+ ramp = int(int(test_params['RUNTIME']) * 0.05)
+ test_params['RAMPTIME'] = min(30, max(5, ramp))
+
it = parse_all_in_1(source, fname)
it = (apply_params(sec, test_params) for sec in it)
it = flatmap(process_cycles, it)
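Note: when only RUNTIME is supplied, RAMPTIME now defaults to 5% of it, clamped to the 5..30 second range. Worked examples of that clamp:

    def default_ramptime(runtime: int) -> int:
        # 5% of runtime, clamped to [5, 30] seconds (mirrors fio_cfg_compile)
        return min(30, max(5, int(runtime * 0.05)))

    assert default_ramptime(600) == 30  # 5% = 30s, hits the cap
    assert default_ramptime(60) == 5    # 5% = 3s, raised to the floor
    assert default_ramptime(400) == 20  # 5% = 20s, used as-is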
diff --git a/wally/suits/io/verify.cfg b/wally/suits/io/verify.cfg
index 9433361..250fade 100644
--- a/wally/suits/io/verify.cfg
+++ b/wally/suits/io/verify.cfg
@@ -6,9 +6,9 @@
LQDW={% 1, 4, 16, 64 %}
LQDR={% 1, 4, 16, 64 %}
-runtime=600
+runtime={RUNTIME}
direct=1
-ramp_time=30
+ramp_time={RAMPTIME}
# ---------------------------------------------------------------------
@@ -19,24 +19,21 @@
[verify_{TEST_SUMM}]
blocksize=1m
-rw=read
+rw=randread:16
iodepth={LQDR}
[verify_{TEST_SUMM}]
blocksize=4k
rw=randwrite
-direct=1
iodepth={QDW}
[verify_{TEST_SUMM}]
blocksize=4k
rw=randread
-direct=1
iodepth={QDR}
[verify_{TEST_SUMM}]
blocksize=4k
rw=randwrite
sync=1
-direct=1
iodepth=1
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index b2b3a54..a997b02 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -105,8 +105,8 @@
run_times = list(map(self.get_expected_runtime, not_in_storage))
if None not in run_times:
- # +5% - is a rough estimation for additional operations
- expected_run_time = int(sum(run_times) * 1.05)
+ # +10s - is a rough estimation for additional operations per iteration
+ expected_run_time = int(sum(run_times) + 10 * len(not_in_storage))
exec_time_s, end_dt_s = get_time_interval_printable_info(expected_run_time)
logger.info("Entire test should takes around %s and finish at %s", exec_time_s, end_dt_s)
diff --git a/wally/test_run_class.py b/wally/test_run_class.py
index a7721e6..7aed795 100644
--- a/wally/test_run_class.py
+++ b/wally/test_run_class.py
@@ -1,9 +1,11 @@
+import collections
from typing import List, Callable, Any, Dict, Optional, Set
from concurrent.futures import ThreadPoolExecutor
from cephlib.istorage import IStorage
from cephlib.node import NodeInfo, IRPCNode
from cephlib.ssh import ConnCreds
+from cephlib.storage_selectors import DevRolesConfig
from .openstack_api import OSCreds, OSConnection
from .config import Config
@@ -37,6 +39,7 @@
self.config = config
self.sensors_run_on = set() # type: Set[str]
self.os_spawned_nodes_ids = None # type: List[int]
+ self.devs_locator = [] # type: DevRolesConfig
def get_pool(self):
return ThreadPoolExecutor(self.config.get('worker_pool_sz', 32))