Move common storage, plot and statistic code to cephlib
diff --git a/wally/report.py b/wally/report.py
index 3ac1292..d37ac60 100644
--- a/wally/report.py
+++ b/wally/report.py
@@ -11,17 +11,21 @@
import wally
-from . import html
+from cephlib import html
+from cephlib.units import b2ssize, b2ssize_10, unit_conversion_coef, unit_conversion_coef_f
+from cephlib.statistic import calc_norm_stat_props
+from cephlib.storage_selectors import summ_sensors, find_sensors_to_2d
+from cephlib.wally_storage import find_nodes_by_roles
+
+from .utils import STORAGE_ROLES
from .stage import Stage, StepOrder
from .test_run_class import TestRun
-from .hlstorage import ResultStorage
-from .utils import b2ssize, b2ssize_10, STORAGE_ROLES, unit_conversion_coef
-from .statistic import calc_norm_stat_props
+from .result_classes import IResultStorage
from .result_classes import DataSource, TimeSeries, SuiteConfig
from .suits.io.fio import FioTest, FioJobConfig
from .suits.io.fio_job import FioJobParams
from .suits.job import JobConfig
-from .data_selectors import get_aggregated, AGG_TAG, summ_sensors, find_sensors_to_2d, find_nodes_by_roles
+from .data_selectors import get_aggregated, AGG_TAG
from .report_profiles import (DefStyleProfile, DefColorProfile, StyleProfile, ColorProfile,
default_format, io_chart_format)
from .plot import (io_chart, plot_simple_bars, plot_hmap_from_2d, plot_lat_over_time, plot_simple_over_time,
@@ -139,7 +143,7 @@
# -------------------- REPORTS --------------------------------------------------------------------------------------
class ReporterBase:
- def __init__(self, rstorage: ResultStorage, style: StyleProfile, colors: ColorProfile) -> None:
+ def __init__(self, rstorage: IResultStorage, style: StyleProfile, colors: ColorProfile) -> None:
self.style = style
self.colors = colors
self.rstorage = rstorage
@@ -223,7 +227,7 @@
def get_divs(self, suite: SuiteConfig) -> Iterator[Tuple[str, str, HTMLBlock]]:
qd_grouped_jobs = {} # type: Dict[FioJobParams, List[FioJobConfig]]
- test_nc = len(list(find_nodes_by_roles(self.rstorage, ['testnode'])))
+ test_nc = len(list(find_nodes_by_roles(self.rstorage.storage, ['testnode'])))
for job in self.rstorage.iter_job(suite):
fjob = cast(FioJobConfig, job)
if fjob.bsize != 4:
@@ -260,7 +264,7 @@
tag=io_chart_format)
fpath = self.plt(plot_simple_bars, ds, jc_no_qd.long_summary, labels, vals, errs,
- xlabel="CPU time per IOP", ylabel="QD * Test nodes" if test_nc != 1 else "QD",
+ xlabel="CPU core time per IOP", ylabel="QD * Test nodes" if test_nc != 1 else "QD",
x_formatter=(lambda x, pos: b2ssize_10(x) + 's'),
one_point_zero_line=False)
@@ -277,7 +281,7 @@
io_sum = make_iosum(self.rstorage, suite, fjob, self.style.hist_boxes)
caption = "Test summary - " + job.params.long_summary
- test_nc = len(list(find_nodes_by_roles(self.rstorage, ['testnode'])))
+ test_nc = len(list(find_nodes_by_roles(self.rstorage.storage, ['testnode'])))
if test_nc > 1:
caption += " * {} nodes".format(test_nc)
@@ -297,7 +301,7 @@
bw_units = "B"
bw_target_units = bw_units + 'ps'
- bw_coef = float(unit_conversion_coef(io_sum.bw.units, bw_target_units))
+ bw_coef = unit_conversion_coef_f(io_sum.bw.units, bw_target_units)
adf_v, *_1, stats, _2 = adfuller(io_sum.bw.data)
@@ -323,7 +327,7 @@
stat_data = [bw_data]
if fjob.bsize < StyleProfile.large_blocks:
- iops_coef = float(unit_conversion_coef(io_sum.bw.units, 'KiBps')) / fjob.bsize
+ iops_coef = unit_conversion_coef_f(io_sum.bw.units, 'KiBps') / fjob.bsize
iops_data = ["IOPS",
b2ssize_10(io_sum.bw.data.sum() * iops_coef),
"{}IOPS ~ {}IOPS".format(b2ssize_10(io_sum.bw.average * iops_coef),
@@ -337,7 +341,7 @@
ad_test]
lat_target_unit = 's'
- lat_coef = unit_conversion_coef(io_sum.lat.units, lat_target_unit)
+ lat_coef = unit_conversion_coef_f(io_sum.lat.units, lat_target_unit)
# latency
lat_data = ["Latency",
"-",
@@ -416,8 +420,8 @@
for name in records.keys()
}
- short_name[ResourceNames.storage_cpu_s] = "CPU (s/IOP)"
- short_name[ResourceNames.storage_cpu_s_b] = "CPU (s/B)"
+ short_name[ResourceNames.storage_cpu_s] = "CPU core (s/IOP)"
+ short_name[ResourceNames.storage_cpu_s_b] = "CPU core (s/B)"
with doc.tbody:
with doc.tr:
@@ -531,24 +535,21 @@
def get_divs(self, suite: SuiteConfig, job: JobConfig) -> Iterator[Tuple[str, str, HTMLBlock]]:
- nodes = list(find_nodes_by_roles(self.rstorage, STORAGE_ROLES))
+ nodes = list(find_nodes_by_roles(self.rstorage.storage, STORAGE_ROLES))
sensor = 'block-io'
metric = 'io_queue'
bn_val = 16
- for node in nodes:
+ for node_id in nodes:
bn = 0
tot = 0
- for _, ds in self.rstorage.iter_sensors(node_id=node.node_id, sensor=sensor, metric=metric):
+ for _, ds in self.rstorage.iter_sensors(node_id=node_id, sensor=sensor, metric=metric):
if ds.dev in ('sdb', 'sdc', 'sdd', 'sde'):
- data = self.rstorage.load_sensor(ds)
- p1 = job.reliable_info_range_s[0] * unit_conversion_coef('s', data.time_units)
- p2 = job.reliable_info_range_s[1] * unit_conversion_coef('s', data.time_units)
- idx1, idx2 = numpy.searchsorted(data.times, (p1, p2))
- bn += (data.data[idx1: idx2] > bn_val).sum()
- tot += idx2 - idx1
- print(node, bn, tot)
+ ts = self.rstorage.get_sensor(ds, job.reliable_info_range_s)
+ bn += (ts.data > bn_val).sum()
+ tot += len(ts.data)
+ print(node_id, bn, tot)
yield Menu1st.per_job, job.summary, HTMLBlock("")
@@ -586,7 +587,7 @@
storage_devs = None
test_nodes_devs = ['rbd0']
- for node in find_nodes_by_roles(self.rstorage, STORAGE_ROLES):
+ for node in self.rstorage.find_nodes(STORAGE_ROLES):
cjd = set(node.params['ceph_journal_devs'])
if journal_devs is None:
journal_devs = cjd
@@ -609,8 +610,9 @@
HTMLBlock(html.H2(html.center("{} IO heatmaps".format(name.capitalize()))))
# QD heatmap
- ioq2d = find_sensors_to_2d(self.rstorage, roles, sensor='block-io', devs=devs,
- metric='io_queue', time_range=trange)
+ nodes = find_nodes_by_roles(self.rstorage.storage, roles)
+ ioq2d = find_sensors_to_2d(self.rstorage, trange, sensor='block-io', devs=devs,
+ node_id=nodes, metric='io_queue', )
ds = DataSource(suite.storage_id, job.storage_id, AGG_TAG, 'block-io', name, tag="hmap." + default_format)
@@ -619,11 +621,11 @@
yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fname))
# Block size heatmap
- wc2d = find_sensors_to_2d(self.rstorage, roles, sensor='block-io', devs=devs,
- metric='writes_completed', time_range=trange)
+ wc2d = find_sensors_to_2d(self.rstorage, trange, node_id=nodes, sensor='block-io', devs=devs,
+ metric='writes_completed')
wc2d[wc2d < 1E-3] = 1
- sw2d = find_sensors_to_2d(self.rstorage, roles, sensor='block-io', devs=devs,
- metric='sectors_written', time_range=trange)
+ sw2d = find_sensors_to_2d(self.rstorage, trange, node_id=nodes, sensor='block-io', devs=devs,
+ metric='sectors_written')
data2d = sw2d / wc2d / 1024
fname = self.plt(plot_hmap_from_2d, ds(metric='wr_block_size'),
data2d=data2d, title=name.capitalize() + " write block size",
@@ -631,8 +633,8 @@
yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fname))
# iotime heatmap
- wtime2d = find_sensors_to_2d(self.rstorage, roles, sensor='block-io', devs=devs,
- metric='io_time', time_range=trange)
+ wtime2d = find_sensors_to_2d(self.rstorage, trange, node_id=nodes, sensor='block-io', devs=devs,
+ metric='io_time')
fname = self.plt(plot_hmap_from_2d, ds(metric='io_time'), data2d=wtime2d,
xlabel='Time', ylabel="IO time (ms) per second",
title=name.capitalize() + " iotime", bins=StyleProfile.iotime_bins)
@@ -650,24 +652,25 @@
yield Menu1st.per_job, job.summary, HTMLBlock(html.H2(html.center("Load tool results")))
- agg_io = get_aggregated(self.rstorage, suite, fjob, "bw")
+ agg_io = get_aggregated(self.rstorage, suite.storage_id, fjob.storage_id, "bw", job.reliable_info_range_s)
if fjob.bsize >= DefStyleProfile.large_blocks:
title = "Fio measured Bandwidth over time"
units = "MiBps"
- agg_io.data //= int(unit_conversion_coef(units, agg_io.units))
+ agg_io.data //= int(unit_conversion_coef_f(units, agg_io.units))
else:
title = "Fio measured IOPS over time"
- agg_io.data //= (int(unit_conversion_coef("KiBps", agg_io.units)) * fjob.bsize)
+ agg_io.data //= (int(unit_conversion_coef_f("KiBps", agg_io.units)) * fjob.bsize)
units = "IOPS"
fpath = self.plt(plot_v_over_time, agg_io.source(tag='ts.' + default_format), title, units, agg_io)
yield Menu1st.per_job, fjob.summary, HTMLBlock(html.img(fpath))
if fjob.bsize < DefStyleProfile.large_blocks:
- agg_lat = get_aggregated(self.rstorage, suite, fjob, "lat").copy()
+ agg_lat = get_aggregated(self.rstorage, suite.storage_id, fjob.storage_id, "lat",
+ job.reliable_info_range_s)
TARGET_UNITS = 'ms'
- coef = unit_conversion_coef(agg_lat.units, TARGET_UNITS)
- agg_lat.histo_bins = agg_lat.histo_bins.copy() * float(coef)
+ coef = unit_conversion_coef_f(agg_lat.units, TARGET_UNITS)
+ agg_lat.histo_bins = agg_lat.histo_bins.copy() * coef
agg_lat.units = TARGET_UNITS
fpath = self.plt(plot_lat_over_time, agg_lat.source(tag='ts.' + default_format), "Latency", agg_lat,
@@ -681,15 +684,15 @@
fjob = cast(FioJobConfig, job)
- agg_io = get_aggregated(self.rstorage, suite, fjob, "bw")
+ agg_io = get_aggregated(self.rstorage, suite.storage_id, fjob.storage_id, "bw", job.reliable_info_range_s)
if fjob.bsize >= DefStyleProfile.large_blocks:
title = "BW distribution"
units = "MiBps"
- agg_io.data //= int(unit_conversion_coef(units, agg_io.units))
+ agg_io.data //= int(unit_conversion_coef_f(units, agg_io.units))
else:
title = "IOPS distribution"
- agg_io.data //= (int(unit_conversion_coef("KiBps", agg_io.units)) * fjob.bsize)
+ agg_io.data //= (int(unit_conversion_coef_f("KiBps", agg_io.units)) * fjob.bsize)
units = "IOPS"
io_stat_prop = calc_norm_stat_props(agg_io, bins_count=StyleProfile.hist_boxes)
@@ -716,34 +719,34 @@
sensors = []
max_iop = 0
max_bytes = 0
-
+ stor_nodes = find_nodes_by_roles(self.rstorage.storage, STORAGE_ROLES)
for sensor, metric, op, units in self.storage_sensors:
- ts = summ_sensors(self.rstorage, STORAGE_ROLES, sensor, metric, job.reliable_info_range_s)
- ds = DataSource(suite_id=suite.storage_id,
- job_id=job.storage_id,
- node_id="storage",
- sensor=sensor,
- dev=AGG_TAG,
- metric=metric,
- tag="ts." + default_format)
+ ts = summ_sensors(self.rstorage, job.reliable_info_range_s, node_id=stor_nodes, sensor=sensor,
+ metric=metric)
+ if ts is not None:
+ ds = DataSource(suite_id=suite.storage_id,
+ job_id=job.storage_id,
+ node_id="storage",
+ sensor=sensor,
+ dev=AGG_TAG,
+ metric=metric,
+ tag="ts." + default_format)
- data = ts.data if units != 'MiB' else ts.data * float(unit_conversion_coef(ts.units, 'MiB'))
- ts = TimeSeries(name="",
- times=numpy.arange(*job.reliable_info_range_s),
- data=data,
- raw=None,
- units=units if ts.units is None else ts.units,
- time_units=ts.time_units,
- source=ds,
- histo_bins=ts.histo_bins)
+ data = ts.data if units != 'MiB' else ts.data * unit_conversion_coef_f(ts.units, 'MiB')
+ ts = TimeSeries(times=numpy.arange(*job.reliable_info_range_s),
+ data=data,
+ units=units if ts.units is None else ts.units,
+ time_units=ts.time_units,
+ source=ds,
+ histo_bins=ts.histo_bins)
- sensors.append(("{} {}".format(op, units), ds, ts, units))
+ sensors.append(("{} {}".format(op, units), ds, ts, units))
- if units == 'iop':
- max_iop = max(max_iop, data.sum())
- else:
- assert units == 'MiB'
- max_bytes = max(max_bytes, data.sum())
+ if units == 'iop':
+ max_iop = max(max_iop, data.sum())
+ else:
+ assert units == 'MiB'
+ max_bytes = max(max_bytes, data.sum())
for title, ds, ts, units in sensors:
if ts.data.sum() >= (max_iop if units == 'iop' else max_bytes) * DefStyleProfile.min_load_diff:
@@ -761,13 +764,11 @@
priority = StepOrder.REPORT
def run(self, ctx: TestRun) -> None:
- rstorage = ResultStorage(ctx.storage)
-
job_reporters_cls = [StatInfo, Resources, LoadToolResults, ClusterLoad, CPULoadPlot, QDIOTimeHeatmap]
- job_reporters = [rcls(rstorage, DefStyleProfile, DefColorProfile) for rcls in job_reporters_cls]
+ job_reporters = [rcls(ctx.rstorage, DefStyleProfile, DefColorProfile) for rcls in job_reporters_cls]
suite_reporters_cls = [IOQD, ResourceQD]
- suite_reporters = [rcls(rstorage, DefStyleProfile, DefColorProfile) for rcls in suite_reporters_cls]
+ suite_reporters = [rcls(ctx.rstorage, DefStyleProfile, DefColorProfile) for rcls in suite_reporters_cls]
root_dir = os.path.dirname(os.path.dirname(wally.__file__))
doc_templ_path = os.path.join(root_dir, "report_templates/index.html")
@@ -788,8 +789,8 @@
job_summ_sort_order = []
# TODO: filter reporters
- for suite in rstorage.iter_suite(FioTest.name):
- all_jobs = list(rstorage.iter_job(suite))
+ for suite in ctx.rstorage.iter_suite(FioTest.name):
+ all_jobs = list(ctx.rstorage.iter_job(suite))
all_jobs.sort(key=lambda job: job.params)
new_jobs_in_order = [job.summary for job in all_jobs]
@@ -797,7 +798,7 @@
assert not same, "Job with same names in different suits found: " + ",".join(same)
job_summ_sort_order.extend(new_jobs_in_order)
- for job in all_jobs[:1]:
+ for job in all_jobs:
try:
for reporter in job_reporters:
logger.debug("Start reporter %s on job %s suite %s",
@@ -845,6 +846,6 @@
report = report_template.replace("{{{menu}}}", ("\n" + " " * 16).join(menu_block))
report = report.replace("{{{content}}}", ("\n" + " " * 16).join(content_block))
- report_path = rstorage.put_report(report, "index.html")
- rstorage.put_report(css_file, "main.css")
+ report_path = ctx.rstorage.put_report(report, "index.html")
+ ctx.rstorage.put_report(css_file, "main.css")
logger.info("Report is stored into %r", report_path)