Working on reporting; this commit represents a broken code state
diff --git a/wally/report.py b/wally/report.py
index cf1289b..f8d8c5a 100644
--- a/wally/report.py
+++ b/wally/report.py
@@ -1,44 +1,90 @@
+import os
+import re
import abc
+import bisect
import logging
-from typing import Dict, Any, Iterator, Tuple, cast, List
+from io import BytesIO
+from functools import wraps
+from typing import Dict, Any, Iterator, Tuple, cast, List, Callable, Optional
+from collections import defaultdict
import numpy
-import scipy
import matplotlib
-
# has to be set before the pyplot import to avoid a tkinter (default graph frontend) import error
matplotlib.use('svg')
-
import matplotlib.pyplot as plt
+import scipy.stats
+import wally
-
-
-from .utils import ssize2b
+from . import html
+from .utils import b2ssize
from .stage import Stage, StepOrder
from .test_run_class import TestRun
-from .result_classes import NormStatProps
+from .hlstorage import ResultStorage
+from .node_interfaces import NodeInfo
+from .storage import Storage
+from .statistic import calc_norm_stat_props, calc_histo_stat_props, approximate_curve, average, dev
+from .result_classes import (StatProps, DataSource, TimeSeries, TestSuiteConfig,
+ NormStatProps, HistoStatProps, TestJobConfig)
+from .suits.io.fio_hist import get_lat_vals, expected_lat_bins
+from .suits.io.fio import FioTest, FioJobConfig
+from .suits.io.fio_task_parser import FioTestSumm
logger = logging.getLogger("wally")
-class ConsoleReportStage(Stage):
-
- priority = StepOrder.REPORT
-
- def run(self, ctx: TestRun) -> None:
- # TODO(koder): load data from storage
- raise NotImplementedError("...")
+# ---------------- CONSTS ---------------------------------------------------------------------------------------------
-class HtmlReportStage(Stage):
+DEBUG = False
+LARGE_BLOCKS = 256
+MiB2KiB = 1024
+MS2S = 1000
- priority = StepOrder.REPORT
- def run(self, ctx: TestRun) -> None:
- # TODO(koder): load data from storage
- raise NotImplementedError("...")
+# ---------------- PROFILES ------------------------------------------------------------------------------------------
+
+
+class ColorProfile:
+ primary_color = 'b'
+ suppl_color1 = 'teal'
+ suppl_color2 = 'magenta'
+ box_color = 'y'
+
+ noise_alpha = 0.3
+ subinfo_alpha = 0.7
+
+
+class StyleProfile:
+ grid = True
+    tight_layout = True
+ hist_boxes = 10
+ min_points_for_dev = 5
+
+ dev_range_x = 2.0
+ dev_perc = 95
+
+ avg_range = 20
+
+ curve_approx_level = 5
+ curve_approx_points = 100
+ assert avg_range >= min_points_for_dev
+
+ extra_io_spine = True
+
+ legend_for_eng = True
+
+ units = {
+        'bw': ("MiBps", MiB2KiB, "bandwidth"),
+ 'iops': ("IOPS", 1, "iops"),
+ 'lat': ("ms", 1, "latency")
+ }
+
+
+# ---------------- STRUCTS -------------------------------------------------------------------------------------------
# TODO: needs to be revised, should use StatProps fields instead
@@ -63,15 +109,497 @@
self.lat_95 = None # type: float
+class IOSummary:
+ def __init__(self,
+ qd: int,
+ block_size: int,
+                 nodes_count: int,
+ bw: NormStatProps,
+ lat: HistoStatProps) -> None:
+
+ self.qd = qd
+ self.nodes_count = nodes_count
+ self.block_size = block_size
+
+ self.bw = bw
+ self.lat = lat
+
+
+# -------------- AGGREGATION AND STAT FUNCTIONS ----------------------------------------------------------------------
+rexpr = {
+ 'sensor': r'(?P<sensor>[-a-z]+)',
+ 'dev': r'(?P<dev>[^.]+)',
+ 'metric': r'(?P<metric>[a-z_]+)',
+ 'node': r'(?P<node>\d+\.\d+\.\d+\.\d+:\d+)',
+}
+
+def iter_sensors(storage: Storage, node: Optional[str] = None, sensor: Optional[str] = None,
+                 dev: Optional[str] = None, metric: Optional[str] = None) -> Iterator[Tuple[str, Dict[str, str]]]:
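+    # Each part left as None is replaced by a capturing regex, so callers can
+    # fix some fields and receive the remaining ones via the match's groupdict.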
+ if node is None:
+ node = rexpr['node']
+ if sensor is None:
+ sensor = rexpr['sensor']
+ if dev is None:
+ dev = rexpr['dev']
+ if metric is None:
+ metric = rexpr['metric']
+
+ rr = r"{}_{}\.{}\.{}$".format(node, sensor, dev, metric)
+ sensor_name_re = re.compile(rr)
+
+ for is_file, sensor_data_name in storage.list("sensors"):
+ if is_file:
+ rr = sensor_name_re.match(sensor_data_name)
+ if rr:
+ yield 'sensors/' + sensor_data_name, rr.groupdict()
+
+
+def make_iosum(rstorage: ResultStorage, suite: TestSuiteConfig, job: FioJobConfig) -> IOSummary:
+ lat = get_aggregated(rstorage, suite, job, "lat")
+ bins_edges = numpy.array(get_lat_vals(lat.second_axis_size), dtype='float32') / 1000
+ io = get_aggregated(rstorage, suite, job, "bw")
+
+ return IOSummary(job.qd,
+ nodes_count=len(suite.nodes_ids),
+ block_size=job.bsize,
+ lat=calc_histo_stat_props(lat, bins_edges, StyleProfile.hist_boxes),
+ bw=calc_norm_stat_props(io, StyleProfile.hist_boxes))
+
+#
+# def iter_io_results(rstorage: ResultStorage,
+# qds: List[int] = None,
+# op_types: List[str] = None,
+# sync_types: List[str] = None,
+# block_sizes: List[int] = None) -> Iterator[Tuple[TestSuiteConfig, FioJobConfig]]:
+#
+# for suite in rstorage.iter_suite(FioTest.name):
+# for job in rstorage.iter_job(suite):
+# fjob = cast(FioJobConfig, job)
+# assert int(fjob.vals['numjobs']) == 1
+#
+#         if sync_types is not None and fjob.sync_mode not in sync_types:
+# continue
+#
+# if block_sizes is not None and fjob.bsize not in block_sizes:
+# continue
+#
+# if op_types is not None and fjob.op_type not in op_types:
+# continue
+#
+# if qds is not None and fjob.qd not in qds:
+# continue
+#
+# yield suite, fjob
+
+
+def get_aggregated(rstorage: ResultStorage, suite: TestSuiteConfig, job: FioJobConfig, sensor: str) -> TimeSeries:
+ tss = list(rstorage.iter_ts(suite, job, sensor=sensor))
+ ds = DataSource(suite_id=suite.storage_id,
+ job_id=job.storage_id,
+ node_id="__all__",
+ dev='fio',
+ sensor=sensor,
+ tag=None)
+
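+    # Aggregate by element-wise summing every per-node series into one synthetic
+    # "__all__" node series; latency series are 2D histograms, summed per bin.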
+ agg_ts = TimeSeries(sensor,
+ raw=None,
+ source=ds,
+ data=numpy.zeros(tss[0].data.shape, dtype=tss[0].data.dtype),
+ times=tss[0].times.copy(),
+ second_axis_size=tss[0].second_axis_size)
+
+ for ts in tss:
+ if sensor == 'lat' and ts.second_axis_size != expected_lat_bins:
+ logger.error("Sensor %s.%s on node %s has" +
+ "second_axis_size=%s. Can only process sensors with second_axis_size=%s.",
+ ts.source.dev, ts.source.sensor, ts.source.node_id,
+ ts.second_axis_size, expected_lat_bins)
+ continue
+
+ if sensor != 'lat' and ts.second_axis_size != 1:
+ logger.error("Sensor %s.%s on node %s has" +
+ "second_axis_size=%s. Can only process sensors with second_axis_size=1.",
+ ts.source.dev, ts.source.sensor, ts.source.node_id, ts.second_axis_size)
+ continue
+
+ # TODO: match times on different ts
+ agg_ts.data += ts.data
+
+ return agg_ts
+
+
+# -------------- PLOT HELPERS FUNCTIONS ------------------------------------------------------------------------------
+
+def get_emb_data_svg(plt: Any) -> bytes:
+ bio = BytesIO()
+ plt.savefig(bio, format='svg')
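+    # Matplotlib writes a complete SVG document; drop everything up to its header
+    # comment (XML declaration and DOCTYPE) so the result can be inlined into HTML.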
+ img_start = "<!-- Created with matplotlib (http://matplotlib.org/) -->"
+ return bio.getvalue().decode("utf8").split(img_start, 1)[1].encode("utf8")
+
+
+def provide_plot(func: Callable[..., None]) -> Callable[..., str]:
+ @wraps(func)
+ def closure1(storage: ResultStorage, path: DataSource, *args, **kwargs) -> str:
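+        # Render-once cache: reuse the plot file if it is already in storage,
+        # otherwise render it, store the SVG and return the storage path.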
+ fpath = storage.check_plot_file(path)
+ if not fpath:
+ func(*args, **kwargs)
+ fpath = storage.put_plot_file(get_emb_data_svg(plt), path)
+ plt.clf()
+ logger.debug("Save plot for %s to %r", path, fpath)
+ return fpath
+ return closure1
+
+
+def apply_style(style: StyleProfile, eng: bool = True, no_legend: bool = False) -> None:
+ if style.grid:
+ plt.grid(True)
+
+ if (style.legend_for_eng or not eng) and not no_legend:
+ legend_location = "center left"
+ legend_bbox_to_anchor = (1.03, 0.81)
+ plt.legend(loc=legend_location, bbox_to_anchor=legend_bbox_to_anchor)
+
+
+# -------------- PLOT FUNCTIONS --------------------------------------------------------------------------------------
+
+
+@provide_plot
+def plot_hist(title: str, units: str,
+ prop: StatProps,
+ colors: Any = ColorProfile,
+ style: Any = StyleProfile) -> None:
+
+    # TODO: units should come from ts
+ total = sum(prop.bins_populations)
+ mids = prop.bins_mids
+ normed_bins = [population / total for population in prop.bins_populations]
+ bar_width = mids[1] - mids[0]
+ plt.bar(mids - bar_width / 2, normed_bins, color=colors.box_color, width=bar_width, label="Real data")
+
+ plt.xlabel(units)
+ plt.ylabel("Value probability")
+ plt.title(title)
+
+ dist_plotted = False
+ if isinstance(prop, NormStatProps):
+ nprop = cast(NormStatProps, prop)
+ stats = scipy.stats.norm(nprop.average, nprop.deviation)
+
+ # xpoints = numpy.linspace(mids[0], mids[-1], style.curve_approx_points)
+ # ypoints = stats.pdf(xpoints) / style.curve_approx_points
+
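+        # Build the expected curve from CDF differences over a 10x finer grid
+        # than the histogram bins; the constant factor rescales per-sub-bin
+        # probabilities back to the height of the normalized bars (WIP, approximate).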
+ edges, step = numpy.linspace(mids[0], mids[-1], len(mids) * 10, retstep=True)
+
+ ypoints = stats.cdf(edges) * 11
+ ypoints = [next - prev for (next, prev) in zip(ypoints[1:], ypoints[:-1])]
+ xpoints = (edges[1:] + edges[:-1]) / 2
+
+ plt.plot(xpoints, ypoints, color=colors.primary_color, label="Expected from\nnormal distribution")
+ dist_plotted = True
+
+ apply_style(style, eng=True, no_legend=not dist_plotted)
+
+
+@provide_plot
+def plot_v_over_time(title: str, units: str,
+ ts: TimeSeries,
+ plot_avg_dev: bool = True,
+ colors: Any = ColorProfile, style: Any = StyleProfile) -> None:
+
+ min_time = min(ts.times)
+
+    # /1000 is ms to s conversion
+ time_points = [(val_time - min_time) / 1000 for val_time in ts.times]
+
+ alpha = colors.noise_alpha if plot_avg_dev else 1.0
+ plt.plot(time_points, ts.data, "o", color=colors.primary_color, alpha=alpha, label="Data")
+
+ if plot_avg_dev:
+ avg_vals = []
+ low_vals_dev = []
+        high_vals_dev = []
+ avg_times = []
+ dev_times = []
+
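+        # Split the series into windows of avg_range points (centering the
+        # leftover samples) and draw avg +- dev_range_x * stdev per window,
+        # smoothed with a polynomial approximation.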
+ start = (len(ts.data) % style.avg_range) // 2
+ points = list(range(start, len(ts.data) + 1, style.avg_range))
+
+ for begin, end in zip(points[:-1], points[1:]):
+ vals = ts.data[begin: end]
+
+ cavg = average(vals)
+ cdev = dev(vals)
+ tavg = average(time_points[begin: end])
+
+ avg_vals.append(cavg)
+ avg_times.append(tavg)
+
+ low_vals_dev.append(cavg - style.dev_range_x * cdev)
+            high_vals_dev.append(cavg + style.dev_range_x * cdev)
+ dev_times.append(tavg)
+
+ avg_timepoints = cast(List[float], numpy.linspace(avg_times[0], avg_times[-1], style.curve_approx_points))
+
+ low_vals_dev = approximate_curve(dev_times, low_vals_dev, avg_timepoints, style.curve_approx_level)
+        high_vals_dev = approximate_curve(dev_times, high_vals_dev, avg_timepoints, style.curve_approx_level)
+ new_vals_avg = approximate_curve(avg_times, avg_vals, avg_timepoints, style.curve_approx_level)
+
+ plt.plot(avg_timepoints, new_vals_avg, c=colors.suppl_color1,
+ label="Average\nover {}s".format(style.avg_range))
+ plt.plot(avg_timepoints, low_vals_dev, c=colors.suppl_color2,
+ label="Avg \xB1 {} * stdev\nover {}s".format(style.dev_range_x, style.avg_range))
+        plt.plot(avg_timepoints, high_vals_dev, c=colors.suppl_color2)
+
+ plt.xlim(-5, max(time_points) + 5)
+
+ plt.xlabel("Time, seconds from test begin")
+ plt.ylabel("{}. Average and \xB1stddev over {} points".format(units, style.avg_range))
+ plt.title(title)
+ apply_style(style, eng=True)
+
+
+@provide_plot
+def plot_lat_over_time(title: str, ts: TimeSeries, bins_vals: List[int], samples: int = 5,
+ colors: Any = ColorProfile, style: Any = StyleProfile) -> None:
+
+ min_time = min(ts.times)
+ times = [int(tm - min_time + 500) // 1000 for tm in ts.times]
+ ts_len = len(times)
+ step = ts_len / samples
+ points = [times[int(i * step + 0.5)] for i in range(samples)]
+ points.append(times[-1])
+ bounds = list(zip(points[:-1], points[1:]))
+ data = numpy.array(ts.data, dtype='int32')
+ data.shape = [len(ts.data) // ts.second_axis_size, ts.second_axis_size] # type: ignore
+ agg_data = []
+ positions = []
+ labels = []
+
+ min_idxs = []
+ max_idxs = []
+
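+    # For every sampled time interval, expand the aggregated latency histogram
+    # back into a flat array of values so plt.boxplot can compute the quartiles.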
+ for begin, end in bounds:
+ agg_hist = numpy.sum(data[begin:end], axis=0)
+
+ vals = numpy.empty(shape=(numpy.sum(agg_hist),), dtype='float32')
+ cidx = 0
+ non_zero = agg_hist.nonzero()[0]
+ min_idxs.append(non_zero[0])
+ max_idxs.append(non_zero[-1])
+
+ for pos in non_zero:
+ vals[cidx:cidx + agg_hist[pos]] = bins_vals[pos]
+ cidx += agg_hist[pos]
+
+ agg_data.append(vals)
+ positions.append((end + begin) / 2)
+ labels.append(str((end + begin) // 2))
+
+ min_y = bins_vals[min(min_idxs)]
+ max_y = bins_vals[max(max_idxs)]
+
+ min_y -= (max_y - min_y) * 0.05
+ max_y += (max_y - min_y) * 0.05
+
+ # plot box size adjust (only plot, not spines and legend)
+ plt.boxplot(agg_data, 0, '', positions=positions, labels=labels, widths=step / 4)
+ plt.xlim(min(times), max(times))
+ plt.ylim(min_y, max_y)
+ plt.xlabel("Time, seconds from test begin, sampled for ~{} seconds".format(int(step)))
+ plt.ylabel("Latency, ms")
+ plt.title(title)
+ apply_style(style, eng=True, no_legend=True)
+
+
+@provide_plot
+def plot_heatmap(title: str, ts: TimeSeries, bins_vals: List[int], samples: int = 5,
+ colors: Any = ColorProfile, style: Any = StyleProfile) -> None:
+ hist_bins_count = 20
+    bin_top = [100 * 2 ** i for i in range(hist_bins_count)]
+ bin_ranges = [[0, 0]]
+ cborder_it = iter(bin_top)
+ cborder = next(cborder_it)
+    for bin_val in bins_vals:
+        if bin_val < cborder:
+            pass  # TODO: unfinished - mapping of source bins is not implemented yet
+
+ # bins: [100us, 200us, ...., 104s]
+    # map original bin ranges to heatmap bins
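+    # A possible completion (sketch, untested): advance cborder whenever the
+    # current source bin crosses it, collecting source-bin index ranges:
+    #
+    #     for idx, bin_val in enumerate(bins_vals):
+    #         if bin_val >= cborder:
+    #             cborder = next(cborder_it)
+    #             bin_ranges.append([idx, idx])
+    #         else:
+    #             bin_ranges[-1][1] = idx
+    #
+    # then sum the ts columns per range and render the result with plt.imshow.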
+
+@provide_plot
+def io_chart(title: str,
+ legend: str,
+ iosums: List[IOSummary],
+ iops_log_spine: bool = False,
+ lat_log_spine: bool = False,
+ colors: Any = ColorProfile,
+ style: Any = StyleProfile) -> None:
+
+ # -------------- MAGIC VALUES ---------------------
+ # IOPS bar width
+ width = 0.35
+
+ # offset from center of bar to deviation/confidence range indicator
+ err_x_offset = 0.05
+
+ # figure size in inches
+ figsize = (12, 6)
+
+ # extra space on top and bottom, comparing to maximal tight layout
+ extra_y_space = 0.05
+
+ # additional spine for BW/IOPS on left side of plot
+ extra_io_spine_x_offset = -0.1
+
+ # extra space on left and right sides
+ extra_x_space = 0.5
+
+ # legend location settings
+ legend_location = "center left"
+ legend_bbox_to_anchor = (1.1, 0.81)
+
+ # plot box size adjust (only plot, not spines and legend)
+ plot_box_adjust = {'right': 0.66}
+ # -------------- END OF MAGIC VALUES ---------------------
+
+ block_size = iosums[0].block_size
+ lc = len(iosums)
+ xt = list(range(1, lc + 1))
+
+ # x coordinate of middle of the bars
+ xpos = [i - width / 2 for i in xt]
+
+ # import matplotlib.gridspec as gridspec
+ # gs = gridspec.GridSpec(1, 3, width_ratios=[1, 4, 1])
+ # p1 = plt.subplot(gs[1])
+
+ fig, p1 = plt.subplots(figsize=figsize)
+
+ # plot IOPS/BW bars
+ if block_size >= LARGE_BLOCKS:
+ iops_primary = False
+ coef = MiB2KiB
+ p1.set_ylabel("BW (MiBps)")
+ else:
+ iops_primary = True
+ coef = block_size
+ p1.set_ylabel("IOPS")
+
+ p1.bar(xpos, [iosum.bw.average / coef for iosum in iosums], width=width, color=colors.box_color, label=legend)
+
+ # set correct x limits for primary IO spine
+ min_io = min(iosum.bw.average - iosum.bw.deviation * style.dev_range_x for iosum in iosums)
+ max_io = max(iosum.bw.average + iosum.bw.deviation * style.dev_range_x for iosum in iosums)
+ border = (max_io - min_io) * extra_y_space
+ io_lims = (min_io - border, max_io + border)
+
+ p1.set_ylim(io_lims[0] / coef, io_lims[-1] / coef)
+
+ # plot deviation and confidence error ranges
+ err1_legend = err2_legend = None
+ for pos, iosum in zip(xpos, iosums):
+ err1_legend = p1.errorbar(pos + width / 2 - err_x_offset,
+ iosum.bw.average / coef,
+ iosum.bw.deviation * style.dev_range_x / coef,
+ alpha=colors.subinfo_alpha,
+                                  color=colors.suppl_color1) # 'teal'
+ err2_legend = p1.errorbar(pos + width / 2 + err_x_offset,
+ iosum.bw.average / coef,
+ iosum.bw.confidence / coef,
+ alpha=colors.subinfo_alpha,
+                                  color=colors.suppl_color2) # 'magenta'
+
+ if style.grid:
+ p1.grid(True)
+
+ handles1, labels1 = p1.get_legend_handles_labels()
+
+ handles1 += [err1_legend, err2_legend]
+ labels1 += ["{}% dev".format(style.dev_perc),
+ "{}% conf".format(int(100 * iosums[0].bw.confidence_level))]
+
+ # extra y spine for latency on right side
+ p2 = p1.twinx()
+
+ # plot median and 95 perc latency
+ p2.plot(xt, [iosum.lat.perc_50 for iosum in iosums], label="lat med")
+ p2.plot(xt, [iosum.lat.perc_95 for iosum in iosums], label="lat 95%")
+
+ # limit and label x spine
+ plt.xlim(extra_x_space, lc + extra_x_space)
+ plt.xticks(xt, ["{0} * {1}".format(iosum.qd, iosum.nodes_count) for iosum in iosums])
+ p1.set_xlabel("QD * Test node count")
+
+ # apply log scales for X spines, if set
+ if iops_log_spine:
+ p1.set_yscale('log')
+
+ if lat_log_spine:
+ p2.set_yscale('log')
+
+ # extra y spine for BW/IOPS on left side
+ if style.extra_io_spine:
+ p3 = p1.twinx()
+ if iops_log_spine:
+ p3.set_yscale('log')
+
+ if iops_primary:
+ p3.set_ylabel("BW (MiBps)")
+ p3.set_ylim(io_lims[0] / MiB2KiB, io_lims[1] / MiB2KiB)
+ else:
+ p3.set_ylabel("IOPS")
+ p3.set_ylim(io_lims[0] / block_size, io_lims[1] / block_size)
+
+ p3.spines["left"].set_position(("axes", extra_io_spine_x_offset))
+ p3.spines["left"].set_visible(True)
+ p3.yaxis.set_label_position('left')
+ p3.yaxis.set_ticks_position('left')
+
+ p2.set_ylabel("Latency (ms)")
+
+ plt.title(title)
+
+ # legend box
+ handles2, labels2 = p2.get_legend_handles_labels()
+ plt.legend(handles1 + handles2, labels1 + labels2, loc=legend_location, bbox_to_anchor=legend_bbox_to_anchor)
+
+ # adjust central box size to fit legend
+ plt.subplots_adjust(**plot_box_adjust)
+ apply_style(style, eng=False, no_legend=True)
+
+
+# -------------------- REPORT HELPERS --------------------------------------------------------------------------------
+
+
class HTMLBlock:
data = None # type: str
js_links = [] # type: List[str]
css_links = [] # type: List[str]
+class Menu1st:
+ engineering = "Engineering"
+ summary = "Summary"
+
+
+class Menu2ndEng:
+ iops_time = "IOPS(time)"
+ hist = "IOPS/lat overall histogram"
+ lat_time = "Lat(time)"
+
+
+class Menu2ndSumm:
+ io_lat_qd = "IO & Lat vs QD"
+
+
+menu_1st_order = [Menu1st.summary, Menu1st.engineering]
+
+
+# -------------------- REPORTS --------------------------------------------------------------------------------------
+
+
class Reporter(metaclass=abc.ABCMeta):
@abc.abstractmethod
- def get_divs(self, config, storage) -> Iterator[Tuple[str, str, HTMLBlock]]:
+ def get_divs(self, suite: TestSuiteConfig, storage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
pass
@@ -81,8 +609,38 @@
# Main performance report
-class IOPS_QD(Reporter):
+class IO_QD(Reporter):
"""Creates graph, which show how IOPS and Latency depend on QD"""
+ def get_divs(self, suite: TestSuiteConfig, rstorage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
+ ts_map = {} # type: Dict[FioTestSumm, List[IOSummary]]
+        str_summary = {}  # type: Dict[FioTestSumm, Tuple[str, str]]
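+        # Group jobs which differ only in queue depth: every group produces a
+        # single IOPS/BW/latency-vs-QD chart with one bar per QD value.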
+ for job in rstorage.iter_job(suite):
+ fjob = cast(FioJobConfig, job)
+ tpl_no_qd = fjob.characterized_tuple_no_qd()
+ io_summ = make_iosum(rstorage, suite, job)
+
+ if tpl_no_qd not in ts_map:
+ ts_map[tpl_no_qd] = [io_summ]
+ str_summary[tpl_no_qd] = (fjob.summary_no_qd(), fjob.long_summary_no_qd())
+ else:
+ ts_map[tpl_no_qd].append(io_summ)
+
+ for tpl, iosums in ts_map.items():
+ iosums.sort(key=lambda x: x.qd)
+            summary, summary_long = str_summary[tpl]
+
+ ds = DataSource(suite_id=suite.storage_id,
+ job_id="io_over_qd_".format(summary),
+ node_id="__all__",
+ dev='fio',
+ sensor="io_over_qd",
+ tag="svg")
+
+ title = "IOPS, BW, Lat vs. QD.\n" + summary_long
+ fpath = io_chart(rstorage, ds, title=title, legend="IOPS/BW", iosums=iosums)
+ yield Menu1st.summary, Menu2ndSumm.io_lat_qd, html.img(fpath)
+ if DEBUG:
+ return
# Linearization report
@@ -91,21 +649,231 @@
# IOPS/latency distribution
-class IOPSHist(Reporter):
+class IOHist(Reporter):
"""IOPS.latency distribution histogram"""
+ def get_divs(self, suite: TestSuiteConfig, rstorage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
+ for job in rstorage.iter_job(suite):
+ fjob = cast(FioJobConfig, job)
+ agg_lat = get_aggregated(rstorage, suite, fjob, "lat")
+ bins_edges = numpy.array(get_lat_vals(agg_lat.second_axis_size), dtype='float32') / 1000 # convert us to ms
+ lat_stat_prop = calc_histo_stat_props(agg_lat, bins_edges, bins_count=StyleProfile.hist_boxes)
+
+ title = "Latency distribution. " + fjob.long_summary
+ units = "ms"
+
+ fpath = plot_hist(rstorage, agg_lat.source(tag='hist.svg'), title, units, lat_stat_prop)
+ if DEBUG:
+ yield Menu1st.summary, Menu2ndSumm.io_lat_qd, html.img(fpath)
+ else:
+ yield Menu1st.engineering, Menu2ndEng.hist, html.img(fpath)
+
+ agg_io = get_aggregated(rstorage, suite, fjob, "bw")
+
+ if fjob.bsize >= LARGE_BLOCKS:
+ title = "BW distribution. " + fjob.long_summary
+ units = "MiBps"
+ agg_io.data /= MiB2KiB
+ else:
+ title = "IOPS distribution. " + fjob.long_summary
+ agg_io.data /= fjob.bsize
+ units = "IOPS"
+
+ io_stat_prop = calc_norm_stat_props(agg_io, bins_count=StyleProfile.hist_boxes)
+ fpath = plot_hist(rstorage, agg_io.source(tag='hist.svg'), title, units, io_stat_prop)
+ if DEBUG:
+ yield Menu1st.summary, Menu2ndSumm.io_lat_qd, html.img(fpath)
+ return
+ else:
+ yield Menu1st.engineering, Menu2ndEng.hist, html.img(fpath)
-# IOPS/latency over test time
-class IOPSTime(Reporter):
+# IOPS/latency over test time for each job
+class IOTime(Reporter):
"""IOPS/latency during test"""
- def get_divs(self, config, storage) -> Iterator[Tuple[str, str, HTMLBlock]]:
- pass
+ def get_divs(self, suite: TestSuiteConfig, rstorage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
+ for job in rstorage.iter_job(suite):
+ fjob = cast(FioJobConfig, job)
+ agg_lat = get_aggregated(rstorage, suite, fjob, "lat")
+ bins_edges = numpy.array(get_lat_vals(agg_lat.second_axis_size), dtype='float32') / 1000
+ title = "Latency during test. " + fjob.long_summary
+
+ fpath = plot_lat_over_time(rstorage, agg_lat.source(tag='ts.svg'), title, agg_lat, bins_edges)
+ if DEBUG:
+ yield Menu1st.summary, Menu2ndSumm.io_lat_qd, html.img(fpath)
+ else:
+ yield Menu1st.engineering, Menu2ndEng.lat_time, html.img(fpath)
+
+ fpath = plot_heatmap(rstorage, agg_lat.source(tag='hmap.svg'), title, agg_lat, bins_edges)
+ if DEBUG:
+ yield Menu1st.summary, Menu2ndSumm.io_lat_qd, html.img(fpath)
+ else:
+ yield Menu1st.engineering, Menu2ndEng.lat_time, html.img(fpath)
+
+ agg_io = get_aggregated(rstorage, suite, fjob, "bw")
+ if fjob.bsize >= LARGE_BLOCKS:
+ title = "BW during test. " + fjob.long_summary
+ units = "MiBps"
+ agg_io.data /= MiB2KiB
+ else:
+ title = "IOPS during test. " + fjob.long_summary
+ agg_io.data /= fjob.bsize
+ units = "IOPS"
+
+ fpath = plot_v_over_time(rstorage, agg_io.source(tag='ts.svg'), title, units, agg_io)
+
+ if DEBUG:
+ yield Menu1st.summary, Menu2ndSumm.io_lat_qd, html.img(fpath)
+ return
+ else:
+ yield Menu1st.engineering, Menu2ndEng.iops_time, html.img(fpath)
+
+
+def is_sensor_numarray(sensor: str, metric: str) -> bool:
+ """Returns True if sensor provides one-dimension array of numeric values. One number per one measurement."""
+ return True
+
+
+LEVEL_SENSORS = {("block-io", "io_queue"),
+ ("system-cpu", "procs_blocked"),
+ ("system-cpu", "procs_queue")}
+
+
+def is_level_sensor(sensor: str, metric: str) -> bool:
+ """Returns True if sensor measure level of any kind, E.g. queue depth."""
+ return (sensor, metric) in LEVEL_SENSORS
+
+
+def is_delta_sensor(sensor: str, metric: str) -> bool:
+ """Returns True if sensor provides deltas for cumulative value. E.g. io completed in given period"""
+ return not is_level_sensor(sensor, metric)
+
+
+def get_sensor(storage: Storage, node: str, sensor: str, dev: str, metric: str,
+ time_range: Tuple[int, int]) -> numpy.array:
+ """Return sensor values for given node for given period. Return per second estimated values array
+
+ Raise an error if required range is not full covered by data in storage.
+ First it finds range of results from sensor, which fully covers requested range.
+ ...."""
+
+ collected_at = numpy.array(storage.get_array("sensors/{}_collected_at".format(node)), dtype="int")
+ data = numpy.array(storage.get_array("sensors/{}_{}.{}.{}".format(node, sensor, dev, metric)))
+
+ # collected_at is array of pairs (collection_started_at, collection_finished_at)
+ collection_start_at = collected_at[::2]
+
+ MICRO = 1000000
+
+    # convert seconds to us
+ begin = time_range[0] * MICRO
+ end = time_range[1] * MICRO
+
+ if begin < collection_start_at[0] or end > collection_start_at[-1] or end <= begin:
+ raise AssertionError(("Incorrect data for get_sensor - time_range={!r}, collected_at=[{}, ..., {}]," +
+ "sensor = {}_{}.{}.{}").format(time_range,
+ collected_at[0] // MICRO,
+ collected_at[-1] // MICRO,
+ node, sensor, dev, metric))
+
+ pos1, pos2 = numpy.searchsorted(collection_start_at, (begin, end))
+ assert pos1 >= 1
+
+ time_bounds = collection_start_at[pos1 - 1: pos2]
+ edge_it = iter(time_bounds)
+ val_it = iter(data[pos1 - 1: pos2])
+
+ result = []
+ curr_summ = 0
+
+ results_cell_ends = begin + MICRO
+ curr_end = next(edge_it)
+
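+    # Resample the irregular collection intervals onto a 1-second grid: the
+    # value of each source interval is split proportionally between the result
+    # cells it overlaps, assuming the delta is spread evenly over the interval.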
+ while results_cell_ends <= end:
+ curr_start = curr_end
+ curr_end = next(edge_it)
+ curr_val = next(val_it)
+ while curr_end >= results_cell_ends and results_cell_ends <= end:
+ current_part = (results_cell_ends - curr_start) / (curr_end - curr_start) * curr_val
+ result.append(curr_summ + current_part)
+ curr_summ = 0
+ curr_val -= current_part
+ curr_start = results_cell_ends
+ results_cell_ends += MICRO
+ curr_summ += curr_val
+
+ assert len(result) == (end - begin) // MICRO
+    return numpy.array(result)
+
+
+class ResourceUsage:
+ def __init__(self, io_r_ops: int, io_w_ops: int, io_r_kb: int, io_w_kb: int) -> None:
+ self.io_w_ops = io_w_ops
+ self.io_r_ops = io_r_ops
+ self.io_w_kb = io_w_kb
+ self.io_r_kb = io_r_kb
+
+ self.cpu_used_user = None # type: int
+ self.cpu_used_sys = None # type: int
+ self.cpu_wait_io = None # type: int
+
+ self.net_send_packets = None # type: int
+ self.net_recv_packets = None # type: int
+ self.net_send_kb = None # type: int
+ self.net_recv_kb = None # type: int
# Cluster load over test time
class ClusterLoad(Reporter):
"""IOPS/latency during test"""
+ storage_sensors = [
+ ('block-io', 'reads_completed', "Read ops"),
+ ('block-io', 'writes_completed', "Write ops"),
+ ('block-io', 'sectors_read', "Read kb"),
+ ('block-io', 'sectors_written', "Write kb"),
+ ]
+
+ def get_divs(self, suite: TestSuiteConfig, rstorage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
+        # split nodes into test and non-test (cluster) groups
+ storage = rstorage.storage
+ nodes = storage.load_list(NodeInfo, "all_nodes") # type: List[NodeInfo]
+
+ test_nodes = {node.node_id for node in nodes if 'testnode' in node.roles}
+ cluster_nodes = {node.node_id for node in nodes if 'testnode' not in node.roles}
+
+ for job in rstorage.iter_job(suite):
+ # convert ms to s
+ time_range = (job.reliable_info_starts_at // MS2S, job.reliable_info_stops_at // MS2S)
+            duration = time_range[1] - time_range[0]
+
+            for sensor, metric, sensor_title in self.storage_sensors:
+                sum_testnode = numpy.zeros((duration,))
+                sum_other = numpy.zeros((duration,))
+
+ for path, groups in iter_sensors(rstorage.storage, sensor=sensor, metric=metric):
+ data = get_sensor(rstorage.storage, groups['node'], sensor, groups['dev'], metric, time_range)
+ if groups['node'] in test_nodes:
+ sum_testnode += data
+ else:
+ sum_other += data
+
+ ds = DataSource(suite_id=suite.storage_id, job_id=job.summary, node_id="cluster",
+ dev=sensor, sensor=metric, tag="ts.svg")
+
+ # s to ms
+ ts = TimeSeries(name="", times=numpy.arange(*time_range) * MS2S, data=sum_testnode, raw=None)
+ fpath = plot_v_over_time(rstorage, ds, "{}.{}".format(sensor, metric), sensor_title, ts=ts)
+ yield Menu1st.engineering, Menu2ndEng.iops_time, html.img(fpath)
+
+ if DEBUG:
+ return
+
+
+# Resource consumption summary
+class ResourceConsumption(Reporter):
+    """Resource consumption report, text only"""
+
# Node load over test time
class NodeLoad(Reporter):
@@ -117,12 +885,72 @@
"""IOPS/latency during test"""
-# TODO: Resource consumption report
# TODO: Ceph operation breakout report
# TODO: Resource consumption for different type of test
-#
+# ------------------------------------------ REPORT STAGES -----------------------------------------------------------
+
+
+class HtmlReportStage(Stage):
+ priority = StepOrder.REPORT
+
+ def run(self, ctx: TestRun) -> None:
+ rstorage = ResultStorage(ctx.storage)
+        # TODO: re-enable IO_QD(), IOTime(), IOHist()
+        reporters = [ClusterLoad()]  # type: List[Reporter]
+
+ root_dir = os.path.dirname(os.path.dirname(wally.__file__))
+ doc_templ_path = os.path.join(root_dir, "report_templates/index.html")
+ report_template = open(doc_templ_path, "rt").read()
+ css_file_src = os.path.join(root_dir, "report_templates/main.css")
+ css_file = open(css_file_src, "rt").read()
+
+ menu_block = []
+ content_block = []
+ link_idx = 0
+
+ matplotlib.rcParams.update({'font.size': 10})
+
+ items = defaultdict(lambda: defaultdict(list)) # type: Dict[str, Dict[str, list]]
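+        # items: 1st-level menu name -> 2nd-level menu name -> rendered html blocks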
+ for suite in rstorage.iter_suite(FioTest.name):
+ for reporter in reporters:
+                for block, item, html_block in reporter.get_divs(suite, rstorage):
+                    items[block][item].append(html_block)
+
+ for idx_1st, menu_1st in enumerate(sorted(items, key=lambda x: menu_1st_order.index(x))):
+ menu_block.append(
+ '<a href="#item{}" class="nav-group" data-toggle="collapse" data-parent="#MainMenu">{}</a>'
+ .format(idx_1st, menu_1st)
+ )
+ menu_block.append('<div class="collapse" id="item{}">'.format(idx_1st))
+ for menu_2nd in sorted(items[menu_1st]):
+ menu_block.append(' <a href="#content{}" class="nav-group-item">{}</a>'
+ .format(link_idx, menu_2nd))
+ content_block.append('<div id="content{}">'.format(link_idx))
+ content_block.extend(" " + x for x in items[menu_1st][menu_2nd])
+ content_block.append('</div>')
+ link_idx += 1
+ menu_block.append('</div>')
+
+ report = report_template.replace("{{{menu}}}", ("\n" + " " * 16).join(menu_block))
+ report = report.replace("{{{content}}}", ("\n" + " " * 16).join(content_block))
+ report_path = rstorage.put_report(report, "index.html")
+ rstorage.put_report(css_file, "main.css")
+ logger.info("Report is stored into %r", report_path)
+
+
+class ConsoleReportStage(Stage):
+
+ priority = StepOrder.REPORT
+
+ def run(self, ctx: TestRun) -> None:
+ # TODO(koder): load data from storage
+ raise NotImplementedError("...")
+
+
+# --------------------------- LEGACY --------------------------------------------------------------------------------
+
+
# # disk_info = None
# # base = None
# # linearity = None