Multiple fixes: improve visualization speed, add C++ code
diff --git a/wally/report.py b/wally/report.py
index 75b8028..b8a7713 100644
--- a/wally/report.py
+++ b/wally/report.py
@@ -1,14 +1,23 @@
import os
import abc
import logging
+import warnings
from io import BytesIO
from functools import wraps
-from typing import Dict, Any, Iterator, Tuple, cast, List, Callable, Set, Optional
from collections import defaultdict
+from typing import Dict, Any, Iterator, Tuple, cast, List, Callable, Set
import numpy
import scipy.stats
+
+import matplotlib
+# matplotlib.use('GTKAgg')
+
import matplotlib.pyplot as plt
+from matplotlib import gridspec
+
+from cephlib.common import float2str
+from cephlib.plot import plot_hmap_with_y_histo, hmap_from_2d
import wally
@@ -17,15 +26,19 @@
from .test_run_class import TestRun
from .hlstorage import ResultStorage
from .node_interfaces import NodeInfo
-from .utils import b2ssize, b2ssize_10, STORAGE_ROLES
+from .utils import b2ssize, b2ssize_10, STORAGE_ROLES, unit_conversion_coef
from .statistic import (calc_norm_stat_props, calc_histo_stat_props, moving_average, moving_dev,
- hist_outliers_perc, ts_hist_outliers_perc, find_ouliers_ts, approximate_curve)
-from .result_classes import (StatProps, DataSource, TimeSeries, NormStatProps, HistoStatProps, SuiteConfig,
- IResultStorage)
-from .suits.io.fio_hist import get_lat_vals, expected_lat_bins
+ hist_outliers_perc, find_ouliers_ts, approximate_curve)
+from .result_classes import (StatProps, DataSource, TimeSeries, NormStatProps, HistoStatProps, SuiteConfig)
from .suits.io.fio import FioTest, FioJobConfig
from .suits.io.fio_job import FioJobParams
from .suits.job import JobConfig
+from .data_selectors import get_aggregated, AGG_TAG, summ_sensors, find_sensors_to_2d, find_nodes_by_roles
+
+
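+# import seaborn with any warnings raised during import suppressed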
+with warnings.catch_warnings():
+ warnings.simplefilter("ignore")
+ import seaborn
logger = logging.getLogger("wally")
@@ -65,6 +78,7 @@
hist_boxes = 10
hist_lat_boxes = 25
hm_hist_bins_count = 25
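+    # maximum number of time slots (columns) in a heatmap; longer series are averaged down to this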
+ hm_x_slots = 25
min_points_for_dev = 5
dev_range_x = 2.0
@@ -86,11 +100,12 @@
extra_io_spine = True
legend_for_eng = True
- heatmap_interpolation = '1d'
+ # heatmap_interpolation = '1d'
+ heatmap_interpolation = None
heatmap_interpolation_points = 300
outliers_q_nd = 3.0
outliers_hide_q_nd = 4.0
- outliers_lat = (0.01, 0.995)
+ outliers_lat = (0.01, 0.9)
violin_instead_of_box = True
violin_point_count = 30000
@@ -105,6 +120,10 @@
'lat': ("ms", 1, "latency")
}
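+    # bin edges used by the QD, IO-time (ms per second) and write-block-size (KiB) heatmaps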
+ qd_bins = [0, 1, 2, 4, 6, 8, 12, 16, 20, 26, 32, 40, 48, 56, 64, 96, 128]
+ iotime_bins = list(range(0, 1030, 50))
+ block_size_bins = [0, 2, 4, 8, 16, 32, 48, 64, 96, 128, 192, 256, 384, 512, 1024, 2048]
+
# ---------------- STRUCTS -------------------------------------------------------------------------------------------
@@ -151,81 +170,14 @@
def make_iosum(rstorage: ResultStorage, suite: SuiteConfig, job: FioJobConfig) -> IOSummary:
lat = get_aggregated(rstorage, suite, job, "lat")
- bins_edges = numpy.array(get_lat_vals(lat.data.shape[1]), dtype='float32') / 1000
io = get_aggregated(rstorage, suite, job, "bw")
return IOSummary(job.qd,
nodes_count=len(suite.nodes_ids),
block_size=job.bsize,
- lat=calc_histo_stat_props(lat, bins_edges, rebins_count=StyleProfile.hist_boxes),
+ lat=calc_histo_stat_props(lat, rebins_count=StyleProfile.hist_boxes),
bw=calc_norm_stat_props(io, StyleProfile.hist_boxes))
-#
-# def iter_io_results(rstorage: ResultStorage,
-# qds: List[int] = None,
-# op_types: List[str] = None,
-# sync_types: List[str] = None,
-# block_sizes: List[int] = None) -> Iterator[Tuple[TestSuiteConfig, FioJobConfig]]:
-#
-# for suite in rstorage.iter_suite(FioTest.name):
-# for job in rstorage.iter_job(suite):
-# fjob = cast(FioJobConfig, job)
-# assert int(fjob.vals['numjobs']) == 1
-#
-# if sync_types is not None and fjob.sync_mode in sync_types:
-# continue
-#
-# if block_sizes is not None and fjob.bsize not in block_sizes:
-# continue
-#
-# if op_types is not None and fjob.op_type not in op_types:
-# continue
-#
-# if qds is not None and fjob.qd not in qds:
-# continue
-#
-# yield suite, fjob
-
-
-AGG_TAG = 'ALL'
-
-
-def get_aggregated(rstorage: ResultStorage, suite: SuiteConfig, job: FioJobConfig, metric: str) -> TimeSeries:
- tss = list(rstorage.iter_ts(suite, job, metric=metric))
- ds = DataSource(suite_id=suite.storage_id,
- job_id=job.storage_id,
- node_id=AGG_TAG,
- sensor='fio',
- dev=AGG_TAG,
- metric=metric,
- tag='csv')
-
- agg_ts = TimeSeries(metric,
- raw=None,
- source=ds,
- data=numpy.zeros(tss[0].data.shape, dtype=tss[0].data.dtype),
- times=tss[0].times.copy(),
- units=tss[0].units)
-
- for ts in tss:
- if metric == 'lat' and (len(ts.data.shape) != 2 or ts.data.shape[1] != expected_lat_bins):
- logger.error("Sensor %s.%s on node %s has" +
- "shape=%s. Can only process sensors with shape=[X, %s].",
- ts.source.dev, ts.source.sensor, ts.source.node_id,
- ts.data.shape, expected_lat_bins)
- raise ValueError()
-
- if metric != 'lat' and len(ts.data.shape) != 1:
- logger.error("Sensor %s.%s on node %s has " +
- "shape=%s. Can only process 1D sensors.",
- ts.source.dev, ts.source.sensor, ts.source.node_id, ts.data.shape)
- raise ValueError()
-
- # TODO: match times on different ts
- agg_ts.data += ts.data
-
- return agg_ts
-
def is_sensor_numarray(sensor: str, metric: str) -> bool:
"""Returns True if sensor provides one-dimension array of numeric values. One number per one measurement."""
@@ -246,86 +198,6 @@
"""Returns True if sensor provides deltas for cumulative value. E.g. io completed in given period"""
return not is_level_sensor(sensor, metric)
-
-def get_sensor_for_time_range(storage: IResultStorage,
- node_id: str,
- sensor: str,
- dev: str,
- metric: str,
- time_range: Tuple[int, int]) -> numpy.array:
- """Return sensor values for given node for given period. Return per second estimated values array
-
- Raise an error if required range is not full covered by data in storage.
- First it finds range of results from sensor, which fully covers requested range.
- ...."""
-
- ds = DataSource(node_id=node_id, sensor=sensor, dev=dev, metric=metric)
- sensor_data = storage.load_sensor(ds)
- assert sensor_data.time_units == 'us'
-
- # collected_at is array of pairs (collection_started_at, collection_finished_at)
- # extract start time from each pair
- collection_start_at = sensor_data.times[::2] # type: numpy.array
-
- MICRO = 1000000
-
- # convert seconds to us
- begin = time_range[0] * MICRO
- end = time_range[1] * MICRO
-
- if begin < collection_start_at[0] or end > collection_start_at[-1] or end <= begin:
- raise AssertionError(("Incorrect data for get_sensor - time_range={!r}, collected_at=[{}, ..., {}]," +
- "sensor = {}_{}.{}.{}").format(time_range,
- sensor_data.times[0] // MICRO,
- sensor_data.times[-1] // MICRO,
- node_id, sensor, dev, metric))
-
- pos1, pos2 = numpy.searchsorted(collection_start_at, (begin, end))
-
- # current real data time chunk begin time
- edge_it = iter(collection_start_at[pos1 - 1: pos2 + 1])
-
- # current real data value
- val_it = iter(sensor_data.data[pos1 - 1: pos2 + 1])
-
- # result array, cumulative value per second
- result = numpy.zeros(int(end - begin) // MICRO)
- idx = 0
- curr_summ = 0
-
- # end of current time slot
- results_cell_ends = begin + MICRO
-
- # hack to unify looping
- real_data_end = next(edge_it)
- while results_cell_ends <= end:
- real_data_start = real_data_end
- real_data_end = next(edge_it)
- real_val_left = next(val_it)
-
- # real data "speed" for interval [real_data_start, real_data_end]
- real_val_ps = float(real_val_left) / (real_data_end - real_data_start)
-
- while real_data_end >= results_cell_ends and results_cell_ends <= end:
- # part of current real value, which is fit into current result cell
- curr_real_chunk = int((results_cell_ends - real_data_start) * real_val_ps)
-
- # calculate rest of real data for next result cell
- real_val_left -= curr_real_chunk
- result[idx] = curr_summ + curr_real_chunk
- idx += 1
- curr_summ = 0
-
- # adjust real data start time
- real_data_start = results_cell_ends
- results_cell_ends += MICRO
-
- # don't lost any real data
- curr_summ += real_val_left
-
- return result
-
-
# -------------- PLOT HELPERS FUNCTIONS ------------------------------------------------------------------------------
def get_emb_data_svg(plt: Any, format: str = 'svg') -> bytes:
@@ -411,38 +283,76 @@
@provide_plot
-def plot_v_over_time(title: str, units: str,
+def plot_simple_over_time(tss: List[Tuple[str, numpy.ndarray]],
+ title: str,
+ ylabel: str,
+ xlabel: str = "time, s",
+ average: bool = False,
+ colors: Any = ColorProfile,
+ style: Any = StyleProfile) -> None:
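+    """Plot several named 1D series on one axis; optionally smooth each with a moving average."""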
+ fig, ax = plt.subplots(figsize=(12, 6))
+ for name, arr in tss:
+ if average:
+ avg_vals = moving_average(arr, style.avg_range)
+ if style.approx_average:
+ time_points = numpy.arange(len(avg_vals))
+ avg_vals = approximate_curve(time_points, avg_vals, time_points, style.curve_approx_level)
+ arr = avg_vals
+ ax.plot(arr, label=name)
+ ax.set_title(title)
+ ax.set_ylabel(ylabel)
+ ax.set_xlabel(xlabel)
+ apply_style(style, eng=True)
+
+
+@provide_plot
+def plot_hmap_from_2d(data2d: numpy.ndarray,
+ title: str, ylabel: str, xlabel: str = 'time, s', bins: numpy.ndarray = None,
+ colors: Any = ColorProfile, style: Any = StyleProfile) -> None:
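+    """Plot a 2D sensor array as a heatmap with a histogram of the values along the Y axis (cephlib plot helpers)."""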
+ ioq1d, ranges = hmap_from_2d(data2d)
+ ax, _ = plot_hmap_with_y_histo(ioq1d, ranges, bins=bins)
+ ax.set_ylabel(ylabel)
+ ax.set_xlabel(xlabel)
+ ax.set_title(title)
+
+
+@provide_plot
+def plot_v_over_time(title: str,
+ units: str,
ts: TimeSeries,
plot_avg_dev: bool = True,
+ plot_points: bool = True,
colors: Any = ColorProfile, style: Any = StyleProfile) -> None:
min_time = min(ts.times)
- # /1000 is us to ms conversion
- time_points = numpy.array([(val_time - min_time) / 1000 for val_time in ts.times])
+    # convert time to seconds
+ coef = float(unit_conversion_coef(ts.time_units, 's'))
+ time_points = numpy.array([(val_time - min_time) * coef for val_time in ts.times])
outliers_idxs = find_ouliers_ts(ts.data, cut_range=style.outliers_q_nd)
outliers_4q_idxs = find_ouliers_ts(ts.data, cut_range=style.outliers_hide_q_nd)
normal_idxs = numpy.logical_not(outliers_idxs)
outliers_idxs = outliers_idxs & numpy.logical_not(outliers_4q_idxs)
- hidden_outliers_count = numpy.count_nonzero(outliers_4q_idxs)
+ # hidden_outliers_count = numpy.count_nonzero(outliers_4q_idxs)
data = ts.data[normal_idxs]
data_times = time_points[normal_idxs]
outliers = ts.data[outliers_idxs]
outliers_times = time_points[outliers_idxs]
- alpha = colors.noise_alpha if plot_avg_dev else 1.0
- plt.plot(data_times, data, style.point_shape,
- color=colors.primary_color, alpha=alpha, label="Data")
- plt.plot(outliers_times, outliers, style.err_point_shape,
- color=colors.err_color, label="Outliers")
+ if plot_points:
+ alpha = colors.noise_alpha if plot_avg_dev else 1.0
+ plt.plot(data_times, data, style.point_shape,
+ color=colors.primary_color, alpha=alpha, label="Data")
+ plt.plot(outliers_times, outliers, style.err_point_shape,
+ color=colors.err_color, label="Outliers")
has_negative_dev = False
plus_minus = "\xb1"
if plot_avg_dev and len(data) < style.avg_range * 2:
- logger.warning("Array %r to small to plot average over %s points", title, style.avg_range)
+        logger.warning("Array %r too small to plot average over %s points", title, style.avg_range)
elif plot_avg_dev:
avg_vals = moving_average(data, style.avg_range)
dev_vals = moving_dev(data, style.avg_range)
@@ -467,7 +377,12 @@
plt.xlim(-5, max(time_points) + 5)
plt.xlabel("Time, seconds from test begin")
- plt.ylabel("{}. Average and {}stddev over {} points".format(units, plus_minus, style.avg_range))
+
+ if plot_avg_dev:
+ plt.ylabel("{}. Average and {}stddev over {} points".format(units, plus_minus, style.avg_range))
+ else:
+ plt.ylabel(units)
+
plt.title(title)
if has_negative_dev:
@@ -477,7 +392,9 @@
@provide_plot
-def plot_lat_over_time(title: str, ts: TimeSeries, bins_vals: List[int], samples: int = 5,
+def plot_lat_over_time(title: str, ts: TimeSeries,
+ ylabel: str,
+ samples: int = 5,
colors: Any = ColorProfile,
style: Any = StyleProfile) -> None:
@@ -499,16 +416,16 @@
# cut outliers
idx1, idx2 = hist_outliers_perc(agg_hist, style.outliers_lat)
agg_hist = agg_hist[idx1:idx2]
- curr_bins_vals = bins_vals[idx1:idx2]
+ curr_bins_vals = ts.histo_bins[idx1:idx2]
correct_coef = style.violin_point_count / sum(agg_hist)
if correct_coef > 1:
correct_coef = 1
else:
- curr_bins_vals = bins_vals
+ curr_bins_vals = ts.histo_bins
correct_coef = 1
- vals = numpy.empty(shape=(numpy.sum(agg_hist),), dtype='float32')
+ vals = numpy.empty(shape=[numpy.sum(agg_hist)], dtype='float32')
cidx = 0
non_zero, = agg_hist.nonzero()
@@ -541,36 +458,64 @@
plt.boxplot(agg_data, 0, '', positions=positions, labels=labels, widths=step / 4)
plt.xlim(min(times), max(times))
+ plt.ylabel(ylabel)
plt.xlabel("Time, seconds from test begin, sampled for ~{} seconds".format(int(step)))
- plt.ylabel("Latency, ms")
plt.title(title)
apply_style(style, eng=True, no_legend=True)
@provide_plot
-def plot_heatmap(title: str,
- ts: TimeSeries,
- bins_vals: List[int],
- colors: Any = ColorProfile,
- style: Any = StyleProfile) -> None:
+def plot_histo_heatmap(title: str,
+ ts: TimeSeries,
+ ylabel: str,
+ xlabel: str = "time, s",
+ colors: Any = ColorProfile,
+ style: Any = StyleProfile) -> None:
+    # only a histogram-based (2D) time series can be plotted here
assert len(ts.data.shape) == 2
- assert ts.data.shape[1] == len(bins_vals)
+    # Find global outliers. As the load is expected to be stable during a single job,
+    # the outlier range can be detected once for the whole series
total_hist = ts.data.sum(axis=0)
+ idx1, idx2 = hist_outliers_perc(total_hist,
+ bounds_perc=style.outliers_lat,
+ min_bins_left=style.hm_hist_bins_count)
- # idx1, idx2 = hist_outliers_perc(total_hist, style.outliers_lat)
- idx1, idx2 = ts_hist_outliers_perc(ts.data, bounds_perc=style.outliers_lat)
+    # merge outliers into the closest non-outlier bins
+ orig_data = ts.data[:, idx1:idx2].copy()
+ if idx1 > 0:
+ orig_data[:, 0] += ts.data[:, :idx1].sum(axis=1)
- # don't cut too many bins
- min_bins_left = style.hm_hist_bins_count
- if idx2 - idx1 < min_bins_left:
- missed = min_bins_left - (idx2 - idx1) // 2
- idx2 = min(len(total_hist), idx2 + missed)
- idx1 = max(0, idx1 - missed)
+ if idx2 < ts.data.shape[1]:
+ orig_data[:, -1] += ts.data[:, idx2:].sum(axis=1)
- data = ts.data[:, idx1:idx2]
- bins_vals = bins_vals[idx1:idx2]
+ bins_vals = ts.histo_bins[idx1:idx2]
+
+    # rebin over the X axis
+    # aggregate rows of ts.data so that at most style.hm_x_slots time slots are plotted
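+    # e.g. with 100 time rows and hm_x_slots = 25, agg_idx is 4, so every 4 consecutive rows are averaged into one column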
+ agg_idx = float(len(orig_data)) / style.hm_x_slots
+ if agg_idx >= 2:
+        data = numpy.zeros([style.hm_x_slots, orig_data.shape[1]], dtype=numpy.float32)  # type: numpy.ndarray
+        next_boundary = agg_idx
+        count = 0
+        data_idx = 0
+        for idx, arr in enumerate(orig_data):
+            if idx >= next_boundary:
+                data[data_idx] /= count
+                data_idx += 1
+                next_boundary += agg_idx
+                count = 0
+            data[data_idx] += arr
+            count += 1
+
+ if count > 1:
+ data[-1] /= count
+ else:
+ data = orig_data
+
+ # rebin over Y axis
+ # =================
# don't using rebin_histogram here, as we need apply same bins for many arrays
step = (bins_vals[-1] - bins_vals[0]) / style.hm_hist_bins_count
@@ -586,35 +531,38 @@
cmap.append(curr_bins)
ncmap = numpy.array(cmap)
- xmin = 0
- xmax = (ts.times[-1] - ts.times[0]) / 1000 + 1
- ymin = new_bins_edges[0]
- ymax = new_bins_edges[-1]
+ # plot data
+ # =========
- fig, ax = plt.subplots(figsize=style.figsize)
+ fig = plt.figure(figsize=(12, 6))
+ boxes = 3
+ gs = gridspec.GridSpec(1, boxes)
+ ax = fig.add_subplot(gs[0, :boxes - 1])
- if style.heatmap_interpolation == '1d':
- interpolation = 'none'
- res = []
- for column in ncmap:
- new_x = numpy.linspace(0, len(column), style.heatmap_interpolation_points)
- old_x = numpy.arange(len(column)) + 0.5
- new_vals = numpy.interp(new_x, old_x, column)
- res.append(new_vals)
- ncmap = numpy.array(res)
- else:
- interpolation = style.heatmap_interpolation
+ labels = list(map(float2str, (new_bins_edges[:-1] + new_bins_edges[1:]) / 2)) + \
+ [float2str(new_bins_edges[-1]) + "+"]
+ seaborn.heatmap(ncmap[:,::-1].T, xticklabels=False, cmap="Blues", ax=ax)
+ ax.set_yticklabels(labels, rotation='horizontal')
+ ax.set_xticklabels([])
- ax.imshow(ncmap[:,::-1].T,
- interpolation=interpolation,
- extent=(xmin, xmax, ymin, ymax),
- cmap=colors.imshow_colormap)
+ # plot overall histogram
+ # =======================
- ax.set_aspect((xmax - xmin) / (ymax - ymin) * (6 / 9))
- ax.set_ylabel("Latency, ms")
- ax.set_xlabel("Test time, s")
+ ax2 = fig.add_subplot(gs[0, boxes - 1])
+ ax2.set_yticklabels([])
+ ax2.set_xticklabels([])
- plt.title(title)
+ histo = ncmap.sum(axis=0).reshape((-1,))
+ ax2.set_ylim(top=histo.size, bottom=0)
+    ax2.barh(numpy.arange(histo.size) + 0.5, width=histo)
+
+ # Set labels
+ # ==========
+
+ ax.set_title(title)
+ ax.set_ylabel(ylabel)
+ ax.set_xlabel(xlabel)
+
@provide_plot
@@ -873,28 +821,6 @@
"""Creates graphs, which show how IOPS and Latency depend on block size"""
-def summ_sensors(rstorage: ResultStorage,
- nodes: List[str],
- sensor: str,
- metric: str,
- time_range: Tuple[int, int]) -> Optional[numpy.array]:
-
- res = None # type: Optional[numpy.array]
- for node_id in nodes:
- for _, groups in rstorage.iter_sensors(node_id=node_id, sensor=sensor, metric=metric):
- data = get_sensor_for_time_range(rstorage,
- node_id=node_id,
- sensor=sensor,
- dev=groups['dev'],
- metric=metric,
- time_range=time_range)
- if res is None:
- res = data
- else:
- res += data
- return res
-
-
# IOPS/latency distribution
class StatInfo(JobReporter):
"""Statistic info for job results"""
@@ -957,49 +883,44 @@
["Data transfered", b2ssize(io_transfered) + "B", "-"]
]
-
storage = rstorage.storage
nodes = storage.load_list(NodeInfo, 'all_nodes') # type: List[NodeInfo]
- storage_nodes = [node.node_id for node in nodes if node.roles.intersection(STORAGE_ROLES)]
- test_nodes = [node.node_id for node in nodes if "testnode" in node.roles]
-
- trange = (job.reliable_info_range[0] / 1000, job.reliable_info_range[1] / 1000)
+ trange = (job.reliable_info_range[0] // 1000, job.reliable_info_range[1] // 1000)
ops_done = io_transfered / fjob.bsize / KB
all_metrics = [
- ("Test nodes net send", 'net-io', 'send_bytes', b2ssize, test_nodes, "B", io_transfered),
- ("Test nodes net recv", 'net-io', 'recv_bytes', b2ssize, test_nodes, "B", io_transfered),
+ ("Test nodes net send", 'net-io', 'send_bytes', b2ssize, ['testnode'], "B", io_transfered),
+ ("Test nodes net recv", 'net-io', 'recv_bytes', b2ssize, ['testnode'], "B", io_transfered),
- ("Test nodes disk write", 'block-io', 'sectors_written', b2ssize, test_nodes, "B", io_transfered),
- ("Test nodes disk read", 'block-io', 'sectors_read', b2ssize, test_nodes, "B", io_transfered),
- ("Test nodes writes", 'block-io', 'writes_completed', b2ssize_10, test_nodes, "OP", ops_done),
- ("Test nodes reads", 'block-io', 'reads_completed', b2ssize_10, test_nodes, "OP", ops_done),
+ ("Test nodes disk write", 'block-io', 'sectors_written', b2ssize, ['testnode'], "B", io_transfered),
+ ("Test nodes disk read", 'block-io', 'sectors_read', b2ssize, ['testnode'], "B", io_transfered),
+ ("Test nodes writes", 'block-io', 'writes_completed', b2ssize_10, ['testnode'], "OP", ops_done),
+ ("Test nodes reads", 'block-io', 'reads_completed', b2ssize_10, ['testnode'], "OP", ops_done),
- ("Storage nodes net send", 'net-io', 'send_bytes', b2ssize, storage_nodes, "B", io_transfered),
- ("Storage nodes net recv", 'net-io', 'recv_bytes', b2ssize, storage_nodes, "B", io_transfered),
+ ("Storage nodes net send", 'net-io', 'send_bytes', b2ssize, STORAGE_ROLES, "B", io_transfered),
+ ("Storage nodes net recv", 'net-io', 'recv_bytes', b2ssize, STORAGE_ROLES, "B", io_transfered),
- ("Storage nodes disk write", 'block-io', 'sectors_written', b2ssize, storage_nodes, "B", io_transfered),
- ("Storage nodes disk read", 'block-io', 'sectors_read', b2ssize, storage_nodes, "B", io_transfered),
- ("Storage nodes writes", 'block-io', 'writes_completed', b2ssize_10, storage_nodes, "OP", ops_done),
- ("Storage nodes reads", 'block-io', 'reads_completed', b2ssize_10, storage_nodes, "OP", ops_done),
+ ("Storage nodes disk write", 'block-io', 'sectors_written', b2ssize, STORAGE_ROLES, "B", io_transfered),
+ ("Storage nodes disk read", 'block-io', 'sectors_read', b2ssize, STORAGE_ROLES, "B", io_transfered),
+ ("Storage nodes writes", 'block-io', 'writes_completed', b2ssize_10, STORAGE_ROLES, "OP", ops_done),
+ ("Storage nodes reads", 'block-io', 'reads_completed', b2ssize_10, STORAGE_ROLES, "OP", ops_done),
]
all_agg = {}
- for descr, sensor, metric, ffunc, nodes, units, denom in all_metrics:
+ for descr, sensor, metric, ffunc, roles, units, denom in all_metrics:
-        if not nodes:
+        if not roles:
continue
- res_arr = summ_sensors(rstorage, nodes=nodes, sensor=sensor, metric=metric, time_range=trange)
- if res_arr is None:
+ res_ts = summ_sensors(rstorage, roles, sensor=sensor, metric=metric, time_range=trange)
+ if res_ts is None:
continue
- agg = res_arr.sum()
+ agg = res_ts.data.sum()
resource_data.append([descr, ffunc(agg) + units, "{:.1f}".format(agg / denom)])
all_agg[descr] = agg
-
cums = [
("Test nodes writes", "Test nodes reads", "Total test ops", b2ssize_10, "OP", ops_done),
("Storage nodes writes", "Storage nodes reads", "Total storage ops", b2ssize_10, "OP", ops_done),
@@ -1018,6 +939,120 @@
yield Menu1st.per_job, job.summary, HTMLBlock(res)
+# CPU load
+class CPULoadPlot(JobReporter):
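+    """Plot CPU time split by category (user, sys, iowait, ...) as a percentage of the total for storage and test nodes"""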
+ def get_divs(self,
+ suite: SuiteConfig,
+ job: JobConfig,
+ rstorage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
+
+ trange = (job.reliable_info_range[0] // 1000, job.reliable_info_range[1] // 1000)
+
+ # plot CPU time
+ for rt, roles in [('storage', STORAGE_ROLES), ('test', ['testnode'])]:
+ cpu_ts = {}
+ cpu_metrics = "idle guest iowait irq nice sirq steal sys user".split()
+ for name in cpu_metrics:
+ cpu_ts[name] = summ_sensors(rstorage, roles, sensor='system-cpu', metric=name, time_range=trange)
+
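+            # sum all CPU categories to get the total, then plot each category as a percentage of it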
+ it = iter(cpu_ts.values())
+ total_over_time = next(it).data.copy()
+ for ts in it:
+ total_over_time += ts.data
+
+ fname = plot_simple_over_time(rstorage, cpu_ts['idle'].source(metric='allcpu', tag=rt + '.plt.svg'),
+ tss=[(name, ts.data * 100 / total_over_time) for name, ts in cpu_ts.items()],
+ average=True,
+ ylabel="CPU time %",
+ title="{} nodes CPU usage".format(rt.capitalize()))
+
+ yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fname))
+
+
+# IO time and QD
+class QDIOTimeHeatmap(JobReporter):
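+    """Plot heatmaps of IO queue depth, write block size and IO time for storage, journal and test node devices"""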
+ def get_divs(self,
+ suite: SuiteConfig,
+ job: JobConfig,
+ rstorage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
+
+        # TODO: remove this hardcoding; track which devices are actually used on the test and storage nodes,
+        # using the storage info saved in the node parameters
+
+ journal_devs = None
+ storage_devs = None
+ test_nodes_devs = ['rbd0']
+
+ for node in find_nodes_by_roles(rstorage, STORAGE_ROLES):
+ cjd = set(node.params['ceph_journal_devs'])
+ if journal_devs is None:
+ journal_devs = cjd
+ else:
+ assert journal_devs == cjd, "{!r} != {!r}".format(journal_devs, cjd)
+
+ csd = set(node.params['ceph_storage_devs'])
+ if storage_devs is None:
+ storage_devs = csd
+ else:
+ assert storage_devs == csd, "{!r} != {!r}".format(storage_devs, csd)
+
+ storage_nodes_devs = list(journal_devs) + list(storage_devs)
+ trange = (job.reliable_info_range[0] // 1000, job.reliable_info_range[1] // 1000)
+
+ for name, devs, roles in [('storage', storage_devs, STORAGE_ROLES),
+ ('journal', journal_devs, STORAGE_ROLES),
+ ('test', test_nodes_devs, ['testnode'])]:
+ # QD heatmap
+ ioq2d = find_sensors_to_2d(rstorage, roles, sensor='block-io', devs=devs,
+ metric='io_queue', time_range=trange)
+ fname = plot_hmap_from_2d(rstorage, DataSource(suite.storage_id,
+ job.storage_id,
+ AGG_TAG,
+ 'block-io',
+ name,
+ metric='io_queue',
+ tag="hmap.svg"),
+ ioq2d, ylabel="IO QD", title=name.capitalize() + " devs QD",
+ bins=StyleProfile.qd_bins,
+ xlabel='Time') # type: str
+ yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fname))
+
+ # Block size heatmap
+ wc2d = find_sensors_to_2d(rstorage, roles, sensor='block-io', devs=devs,
+ metric='writes_completed', time_range=trange)
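+            # avoid division by zero below for intervals with no completed writes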
+ wc2d[wc2d < 1E-3] = 1
+ sw2d = find_sensors_to_2d(rstorage, roles, sensor='block-io', devs=devs,
+ metric='sectors_written', time_range=trange)
+ data2d = sw2d / wc2d / 1024
+ fname = plot_hmap_from_2d(rstorage, DataSource(suite.storage_id,
+ job.storage_id,
+ AGG_TAG,
+ 'block-io',
+ name,
+ metric='wr_block_size',
+ tag="hmap.svg"),
+ data2d, ylabel="IO bsize, KiB", title=name.capitalize() + " write block size",
+ xlabel='Time',
+ bins=StyleProfile.block_size_bins) # type: str
+ yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fname))
+
+ # iotime heatmap
+ wtime2d = find_sensors_to_2d(rstorage, roles, sensor='block-io', devs=devs,
+ metric='io_time', time_range=trange)
+ fname = plot_hmap_from_2d(rstorage, DataSource(suite.storage_id,
+ job.storage_id,
+ AGG_TAG,
+ 'block-io',
+ name,
+ metric='io_time',
+ tag="hmap.svg"),
+ wtime2d, ylabel="IO time (ms) per second",
+ title=name.capitalize() + " iotime",
+ xlabel='Time',
+ bins=StyleProfile.iotime_bins) # type: str
+ yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fname))
+
+
# IOPS/latency distribution
class IOHist(JobReporter):
"""IOPS.latency distribution histogram"""
@@ -1032,20 +1067,17 @@
yield Menu1st.per_job, fjob.summary, HTMLBlock(html.H2(html.center("Load histograms")))
- agg_lat = get_aggregated(rstorage, suite, fjob, "lat")
- bins_edges = numpy.array(get_lat_vals(agg_lat.data.shape[1]), dtype='float32') / 1000 # convert us to ms
- lat_stat_prop = calc_histo_stat_props(agg_lat, bins_edges, rebins_count=StyleProfile.hist_lat_boxes)
-
- # import IPython
- # IPython.embed()
-
- long_summary = cast(FioJobParams, fjob.params).long_summary
-
- title = "Latency distribution"
- units = "ms"
-
- fpath = plot_hist(rstorage, agg_lat.source(tag='hist.svg'), title, units, lat_stat_prop) # type: str
- yield Menu1st.per_job, fjob.summary, HTMLBlock(html.img(fpath))
+ # agg_lat = get_aggregated(rstorage, suite, fjob, "lat")
+ # # bins_edges = numpy.array(get_lat_vals(agg_lat.data.shape[1]), dtype='float32') / 1000 # convert us to ms
+ # lat_stat_prop = calc_histo_stat_props(agg_lat, bins_edges=None, rebins_count=StyleProfile.hist_lat_boxes)
+ #
+ # long_summary = cast(FioJobParams, fjob.params).long_summary
+ #
+ # title = "Latency distribution"
+ # units = "ms"
+ #
+ # fpath = plot_hist(rstorage, agg_lat.source(tag='hist.svg'), title, units, lat_stat_prop) # type: str
+ # yield Menu1st.per_job, fjob.summary, HTMLBlock(html.img(fpath))
agg_io = get_aggregated(rstorage, suite, fjob, "bw")
@@ -1075,30 +1107,35 @@
fjob = cast(FioJobConfig, job)
- yield Menu1st.per_job, fjob.summary, HTMLBlock(html.H2(html.center("Load over time")))
-
agg_io = get_aggregated(rstorage, suite, fjob, "bw")
if fjob.bsize >= LARGE_BLOCKS:
- title = "Bandwidth"
+            title = "Fio measured bandwidth over time"
units = "MiBps"
agg_io.data //= MiB2KiB
else:
- title = "IOPS"
+ title = "Fio measured IOPS over time"
agg_io.data //= fjob.bsize
units = "IOPS"
fpath = plot_v_over_time(rstorage, agg_io.source(tag='ts.svg'), title, units, agg_io) # type: str
yield Menu1st.per_job, fjob.summary, HTMLBlock(html.img(fpath))
- agg_lat = get_aggregated(rstorage, suite, fjob, "lat")
- bins_edges = numpy.array(get_lat_vals(agg_lat.data.shape[1]), dtype='float32') / 1000
- title = "Latency"
+ agg_lat = get_aggregated(rstorage, suite, fjob, "lat").copy()
+ TARGET_UNITS = 'ms'
+ coef = unit_conversion_coef(agg_lat.units, TARGET_UNITS)
+ agg_lat.histo_bins = agg_lat.histo_bins.copy() * float(coef)
+ agg_lat.units = TARGET_UNITS
- fpath = plot_lat_over_time(rstorage, agg_lat.source(tag='ts.svg'), title, agg_lat, bins_edges) # type: str
+ fpath = plot_lat_over_time(rstorage, agg_lat.source(tag='ts.svg'), "Latency",
+ agg_lat, ylabel="Latency, " + agg_lat.units) # type: str
yield Menu1st.per_job, fjob.summary, HTMLBlock(html.img(fpath))
- title = "Latency heatmap"
- fpath = plot_heatmap(rstorage, agg_lat.source(tag='hmap.png'), title, agg_lat, bins_edges) # type: str
+ fpath = plot_histo_heatmap(rstorage,
+ agg_lat.source(tag='hmap.svg'),
+ "Latency heatmap",
+ agg_lat,
+ ylabel="Latency, " + agg_lat.units,
+ xlabel='Test time') # type: str
yield Menu1st.per_job, fjob.summary, HTMLBlock(html.img(fpath))
@@ -1128,40 +1165,21 @@
storage_sensors = [
('block-io', 'reads_completed', "Read ops", 'iops'),
('block-io', 'writes_completed', "Write ops", 'iops'),
- ('block-io', 'sectors_read', "Read kb", 'kb'),
- ('block-io', 'sectors_written', "Write kb", 'kb'),
+ ('block-io', 'sectors_read', "Read kb", 'KB'),
+ ('block-io', 'sectors_written', "Write kb", 'KB'),
]
def get_divs(self,
suite: SuiteConfig,
job: JobConfig,
rstorage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
- # split nodes on test and other
- storage = rstorage.storage
- nodes = storage.load_list(NodeInfo, "all_nodes") # type: List[NodeInfo]
-
yield Menu1st.per_job, job.summary, HTMLBlock(html.H2(html.center("Cluster load")))
- test_nodes = {node.node_id for node in nodes if 'testnode' in node.roles}
- cluster_nodes = {node.node_id for node in nodes if 'testnode' not in node.roles}
# convert ms to s
time_range = (job.reliable_info_range[0] // MS2S, job.reliable_info_range[1] // MS2S)
- len = time_range[1] - time_range[0]
- for sensor, metric, sensor_title, units in self.storage_sensors:
- sum_testnode = numpy.zeros((len,))
- sum_other = numpy.zeros((len,))
- for path, groups in rstorage.iter_sensors(sensor=sensor, metric=metric):
- # todo: should return sensor units
- data = get_sensor_for_time_range(rstorage,
- groups['node_id'],
- sensor,
- groups['dev'],
- metric, time_range)
- if groups['node_id'] in test_nodes:
- sum_testnode += data
- else:
- sum_other += data
+ for sensor, metric, sensor_title, units in self.storage_sensors:
+ ts = summ_sensors(rstorage, ['testnode'], sensor, metric, time_range)
ds = DataSource(suite_id=suite.storage_id,
job_id=job.storage_id,
node_id="test_nodes",
@@ -1170,14 +1188,16 @@
metric=metric,
tag="ts.svg")
- # s to ms
+ data = ts.data if units != 'KB' else ts.data * float(unit_conversion_coef(ts.units, 'KB'))
+
ts = TimeSeries(name="",
- times=numpy.arange(*time_range) * MS2S,
- data=sum_testnode,
+ times=numpy.arange(*time_range),
+ data=data,
raw=None,
- units=units,
- time_units="us",
- source=ds)
+ units=units if ts.units is None else ts.units,
+ time_units=ts.time_units,
+ source=ds,
+ histo_bins=ts.histo_bins)
fpath = plot_v_over_time(rstorage, ds, sensor_title, sensor_title, ts=ts) # type: str
yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fpath))
@@ -1211,11 +1231,12 @@
def run(self, ctx: TestRun) -> None:
rstorage = ResultStorage(ctx.storage)
- job_reporters = [StatInfo(), IOTime(), IOHist(), ClusterLoad()] # type: List[JobReporter]
- reporters = [IO_QD()] # type: List[Reporter]
+ job_reporters = [StatInfo(), IOTime(), IOHist(), ClusterLoad(), CPULoadPlot(),
+ QDIOTimeHeatmap()] # type: List[JobReporter]
+        reporters = []  # type: List[Reporter]
+ # reporters = [IO_QD()] # type: List[Reporter]
# job_reporters = [ClusterLoad()]
- # reporters = []
root_dir = os.path.dirname(os.path.dirname(wally.__file__))
doc_templ_path = os.path.join(root_dir, "report_templates/index.html")
@@ -1232,25 +1253,30 @@
# StyleProfile.__dict__.update(ctx.config.reporting.style.raw())
items = defaultdict(lambda: defaultdict(list)) # type: Dict[str, Dict[str, List[HTMLBlock]]]
-
+ DEBUG = False
# TODO: filter reporters
for suite in rstorage.iter_suite(FioTest.name):
all_jobs = list(rstorage.iter_job(suite))
all_jobs.sort(key=lambda job: job.params)
for job in all_jobs:
for reporter in job_reporters:
+ logger.debug("Start reporter %s on job %s suite %s",
+ reporter.__class__.__name__, job.summary, suite.test_type)
for block, item, html in reporter.get_divs(suite, job, rstorage):
items[block][item].append(html)
if DEBUG:
break
for reporter in reporters:
+ logger.debug("Start reporter %s on suite %s", reporter.__class__.__name__, suite.test_type)
for block, item, html in reporter.get_divs(suite, rstorage):
items[block][item].append(html)
if DEBUG:
break
+ logger.debug("Generating result html")
+
for idx_1st, menu_1st in enumerate(sorted(items, key=lambda x: menu_1st_order.index(x))):
menu_block.append(
'<a href="#item{}" class="nav-group" data-toggle="collapse" data-parent="#MainMenu">{}</a>'