Many updates to the report code and the storage structure; this commit is broken (work in progress)
diff --git a/wally/report.py b/wally/report.py
index f8d8c5a..0b0540e 100644
--- a/wally/report.py
+++ b/wally/report.py
@@ -1,36 +1,32 @@
import os
-import re
import abc
-import bisect
import logging
from io import BytesIO
from functools import wraps
-from typing import Dict, Any, Iterator, Tuple, cast, List, Callable
+from typing import Dict, Any, Iterator, Tuple, cast, List, Callable, Set, Optional
from collections import defaultdict
import numpy
-import matplotlib
-# have to be before pyplot import to avoid tkinter(default graph frontend) import error
-matplotlib.use('svg')
-import matplotlib.pyplot as plt
import scipy.stats
+import matplotlib.pyplot as plt
import wally
from . import html
-from .utils import b2ssize
from .stage import Stage, StepOrder
from .test_run_class import TestRun
from .hlstorage import ResultStorage
from .node_interfaces import NodeInfo
-from .storage import Storage
-from .statistic import calc_norm_stat_props, calc_histo_stat_props
-from .result_classes import (StatProps, DataSource, TimeSeries, TestSuiteConfig,
- NormStatProps, HistoStatProps, TestJobConfig)
+from .utils import b2ssize, b2ssize_10, STORAGE_ROLES
+from .statistic import (calc_norm_stat_props, calc_histo_stat_props, moving_average, moving_dev,
+ hist_outliers_perc, ts_hist_outliers_perc, find_ouliers_ts, approximate_curve,
+ rebin_histogram)
+from .result_classes import (StatProps, DataSource, TimeSeries, NormStatProps, HistoStatProps, SuiteConfig,
+ IResultStorage)
from .suits.io.fio_hist import get_lat_vals, expected_lat_bins
from .suits.io.fio import FioTest, FioJobConfig
-from .suits.io.fio_task_parser import FioTestSumm
-from .statistic import approximate_curve, average, dev
+from .suits.io.fio_job import FioJobParams
+from .suits.job import JobConfig
logger = logging.getLogger("wally")
@@ -48,34 +44,61 @@
# ---------------- PROFILES ------------------------------------------------------------------------------------------
+# these are the default values; the real values are loaded from the config
+
class ColorProfile:
primary_color = 'b'
suppl_color1 = 'teal'
suppl_color2 = 'magenta'
+ suppl_color3 = 'orange'
box_color = 'y'
+ err_color = 'red'
noise_alpha = 0.3
subinfo_alpha = 0.7
+ imshow_colormap = None # type: str
+
class StyleProfile:
grid = True
tide_layout = True
hist_boxes = 10
+ hist_lat_boxes = 25
+ hm_hist_bins_count = 25
min_points_for_dev = 5
dev_range_x = 2.0
dev_perc = 95
- avg_range = 20
+ point_shape = 'o'
+ err_point_shape = '*'
- curve_approx_level = 5
+ avg_range = 20
+ approx_average = True
+
+ curve_approx_level = 6
curve_approx_points = 100
assert avg_range >= min_points_for_dev
+ # figure size in inches
+ figsize = (10, 6)
+
extra_io_spine = True
legend_for_eng = True
+ heatmap_interpolation = '1d'
+ heatmap_interpolation_points = 300
+ outliers_q_nd = 3.0
+ outliers_hide_q_nd = 4.0
+ outliers_lat = (0.01, 0.995)
+
+ violin_instead_of_box = True
+ violin_point_count = 30000
+
+ heatmap_colorbar = False
+
+ min_iops_vs_qd_jobs = 3
units = {
'bw': ("MiBps", MiB2KiB, "bandwith"),
@@ -126,36 +149,10 @@
# -------------- AGGREGATION AND STAT FUNCTIONS ----------------------------------------------------------------------
-rexpr = {
- 'sensor': r'(?P<sensor>[-a-z]+)',
- 'dev': r'(?P<dev>[^.]+)',
- 'metric': r'(?P<metric>[a-z_]+)',
- 'node': r'(?P<node>\d+\.\d+\.\d+\.\d+:\d+)',
-}
-def iter_sensors(storage: Storage, node: str = None, sensor: str = None, dev: str = None, metric: str = None):
- if node is None:
- node = rexpr['node']
- if sensor is None:
- sensor = rexpr['sensor']
- if dev is None:
- dev = rexpr['dev']
- if metric is None:
- metric = rexpr['metric']
-
- rr = r"{}_{}\.{}\.{}$".format(node, sensor, dev, metric)
- sensor_name_re = re.compile(rr)
-
- for is_file, sensor_data_name in storage.list("sensors"):
- if is_file:
- rr = sensor_name_re.match(sensor_data_name)
- if rr:
- yield 'sensors/' + sensor_data_name, rr.groupdict()
-
-
-def make_iosum(rstorage: ResultStorage, suite: TestSuiteConfig, job: FioJobConfig) -> IOSummary:
+def make_iosum(rstorage: ResultStorage, suite: SuiteConfig, job: FioJobConfig) -> IOSummary:
lat = get_aggregated(rstorage, suite, job, "lat")
- bins_edges = numpy.array(get_lat_vals(lat.second_axis_size), dtype='float32') / 1000
+ bins_edges = numpy.array(get_lat_vals(lat.data.shape[1]), dtype='float32') / 1000
io = get_aggregated(rstorage, suite, job, "bw")
return IOSummary(job.qd,
@@ -191,34 +188,38 @@
# yield suite, fjob
-def get_aggregated(rstorage: ResultStorage, suite: TestSuiteConfig, job: FioJobConfig, sensor: str) -> TimeSeries:
- tss = list(rstorage.iter_ts(suite, job, sensor=sensor))
+AGG_TAG = 'ALL'
+
+
+def get_aggregated(rstorage: ResultStorage, suite: SuiteConfig, job: FioJobConfig, metric: str) -> TimeSeries:
+ tss = list(rstorage.iter_ts(suite, job, sensor=metric))
ds = DataSource(suite_id=suite.storage_id,
job_id=job.storage_id,
- node_id="__all__",
- dev='fio',
- sensor=sensor,
- tag=None)
+ node_id=AGG_TAG,
+ sensor='fio',
+ dev=AGG_TAG,
+ metric=metric,
+ tag='csv')
- agg_ts = TimeSeries(sensor,
+ agg_ts = TimeSeries(metric,
raw=None,
source=ds,
data=numpy.zeros(tss[0].data.shape, dtype=tss[0].data.dtype),
times=tss[0].times.copy(),
- second_axis_size=tss[0].second_axis_size)
+ units=tss[0].units)
for ts in tss:
- if sensor == 'lat' and ts.second_axis_size != expected_lat_bins:
+ if metric == 'lat' and (len(ts.data.shape) != 2 or ts.data.shape[1] != expected_lat_bins):
logger.error("Sensor %s.%s on node %s has" +
- "second_axis_size=%s. Can only process sensors with second_axis_size=%s.",
+                     " shape=%s. Can only process sensors with shape=[X, %s].",
ts.source.dev, ts.source.sensor, ts.source.node_id,
- ts.second_axis_size, expected_lat_bins)
+ ts.data.shape, expected_lat_bins)
continue
- if sensor != 'lat' and ts.second_axis_size != 1:
+ if metric != 'lat' and len(ts.data.shape) != 1:
logger.error("Sensor %s.%s on node %s has" +
- "second_axis_size=%s. Can only process sensors with second_axis_size=1.",
- ts.source.dev, ts.source.sensor, ts.source.node_id, ts.second_axis_size)
+                     " shape=%s. Can only process 1D sensors.",
+ ts.source.dev, ts.source.sensor, ts.source.node_id, ts.data.shape)
continue
# TODO: match times on different ts
@@ -227,24 +228,135 @@
return agg_ts
+def is_sensor_numarray(sensor: str, metric: str) -> bool:
+ """Returns True if sensor provides one-dimension array of numeric values. One number per one measurement."""
+ return True
+
+
+LEVEL_SENSORS = {("block-io", "io_queue"),
+ ("system-cpu", "procs_blocked"),
+ ("system-cpu", "procs_queue")}
+
+
+def is_level_sensor(sensor: str, metric: str) -> bool:
+ """Returns True if sensor measure level of any kind, E.g. queue depth."""
+ return (sensor, metric) in LEVEL_SENSORS
+
+
+def is_delta_sensor(sensor: str, metric: str) -> bool:
+ """Returns True if sensor provides deltas for cumulative value. E.g. io completed in given period"""
+ return not is_level_sensor(sensor, metric)
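+
+# E.g. ('block-io', 'io_queue') is a level sensor (current queue depth), while
+# ('block-io', 'writes_completed') is a delta sensor: each sample is the
+# number of writes completed since the previous sample.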
+
+
+def get_sensor_for_time_range(storage: IResultStorage,
+ node_id: str,
+ sensor: str,
+ dev: str,
+ metric: str,
+ time_range: Tuple[int, int]) -> numpy.array:
+ """Return sensor values for given node for given period. Return per second estimated values array
+
+ Raise an error if required range is not full covered by data in storage.
+ First it finds range of results from sensor, which fully covers requested range.
+ ...."""
+
+ ds = DataSource(node_id=node_id, sensor=sensor, dev=dev, metric=metric)
+ sensor_data = storage.load_sensor(ds)
+ assert sensor_data.time_units == 'us'
+
+ # collected_at is array of pairs (collection_started_at, collection_finished_at)
+ # extract start time from each pair
+ collection_start_at = sensor_data.times[::2] # type: numpy.array
+
+ MICRO = 1000000
+
+ # convert seconds to us
+ begin = time_range[0] * MICRO
+ end = time_range[1] * MICRO
+
+ if begin < collection_start_at[0] or end > collection_start_at[-1] or end <= begin:
+        raise AssertionError(("Incorrect data for get_sensor - time_range={!r}, collected_at=[{}, ..., {}]," +
+                                              " sensor = {}_{}.{}.{}").format(time_range,
+ sensor_data.times[0] // MICRO,
+ sensor_data.times[-1] // MICRO,
+ node_id, sensor, dev, metric))
+
+ pos1, pos2 = numpy.searchsorted(collection_start_at, (begin, end))
+
+ # current real data time chunk begin time
+ edge_it = iter(collection_start_at[pos1 - 1: pos2 + 1])
+
+ # current real data value
+ val_it = iter(sensor_data.data[pos1 - 1: pos2 + 1])
+
+ # result array, cumulative value per second
+ result = numpy.zeros((end - begin) // MICRO)
+ idx = 0
+ curr_summ = 0
+
+ # end of current time slot
+ results_cell_ends = begin + MICRO
+
+ # hack to unify looping
+ real_data_end = next(edge_it)
+ while results_cell_ends <= end:
+ real_data_start = real_data_end
+ real_data_end = next(edge_it)
+ real_val_left = next(val_it)
+
+ # real data "speed" for interval [real_data_start, real_data_end]
+ real_val_ps = float(real_val_left) / (real_data_end - real_data_start)
+
+ while real_data_end >= results_cell_ends and results_cell_ends <= end:
+ # part of current real value, which is fit into current result cell
+ curr_real_chunk = int((results_cell_ends - real_data_start) * real_val_ps)
+
+ # calculate rest of real data for next result cell
+ real_val_left -= curr_real_chunk
+ result[idx] = curr_summ + curr_real_chunk
+ idx += 1
+ curr_summ = 0
+
+ # adjust real data start time
+ real_data_start = results_cell_ends
+ results_cell_ends += MICRO
+
+        # don't lose any real data
+ curr_summ += real_val_left
+
+ return result
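+
+# Usage sketch (hypothetical node/device ids): estimate per-second
+# 'sectors_written' for one device on one node over seconds 100..110 of the
+# test; the result is a numpy array of 10 values.
+#
+#   writes = get_sensor_for_time_range(storage, node_id="192.168.0.1:22",
+#                                      sensor="block-io", dev="sda",
+#                                      metric="sectors_written",
+#                                      time_range=(100, 110))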
+
+
# -------------- PLOT HELPERS FUNCTIONS ------------------------------------------------------------------------------
-def get_emb_data_svg(plt: Any) -> bytes:
+def get_emb_data_svg(plt: Any, format: str = 'svg') -> bytes:
bio = BytesIO()
- plt.savefig(bio, format='svg')
- img_start = "<!-- Created with matplotlib (http://matplotlib.org/) -->"
- return bio.getvalue().decode("utf8").split(img_start, 1)[1].encode("utf8")
+ if format in ('png', 'jpg'):
+ plt.savefig(bio, format=format)
+ return bio.getvalue()
+ elif format == 'svg':
+ plt.savefig(bio, format='svg')
+ img_start = "<!-- Created with matplotlib (http://matplotlib.org/) -->"
+ return bio.getvalue().decode("utf8").split(img_start, 1)[1].encode("utf8")
def provide_plot(func: Callable[..., None]) -> Callable[..., str]:
@wraps(func)
- def closure1(storage: ResultStorage, path: DataSource, *args, **kwargs) -> str:
+ def closure1(storage: ResultStorage,
+ path: DataSource,
+ *args, **kwargs) -> str:
fpath = storage.check_plot_file(path)
if not fpath:
+ format = path.tag.split(".")[-1]
+
+ plt.figure(figsize=StyleProfile.figsize)
+ plt.subplots_adjust(right=0.66)
+
func(*args, **kwargs)
- fpath = storage.put_plot_file(get_emb_data_svg(plt), path)
+ fpath = storage.put_plot_file(get_emb_data_svg(plt, format=format), path)
+ logger.debug("Plot %s saved to %r", path, fpath)
plt.clf()
- logger.debug("Save plot for %s to %r", path, fpath)
+ plt.close('all')
return fpath
return closure1
@@ -269,11 +381,9 @@
style: Any = StyleProfile) -> None:
# TODO: unit should came from ts
- total = sum(prop.bins_populations)
- mids = prop.bins_mids
- normed_bins = [population / total for population in prop.bins_populations]
- bar_width = mids[1] - mids[0]
- plt.bar(mids - bar_width / 2, normed_bins, color=colors.box_color, width=bar_width, label="Real data")
+ normed_bins = prop.bins_populations / prop.bins_populations.sum()
+ bar_width = prop.bins_edges[1] - prop.bins_edges[0]
+ plt.bar(prop.bins_edges, normed_bins, color=colors.box_color, width=bar_width, label="Real data")
plt.xlabel(units)
plt.ylabel("Value probability")
@@ -284,18 +394,20 @@
nprop = cast(NormStatProps, prop)
stats = scipy.stats.norm(nprop.average, nprop.deviation)
- # xpoints = numpy.linspace(mids[0], mids[-1], style.curve_approx_points)
- # ypoints = stats.pdf(xpoints) / style.curve_approx_points
+ new_edges, step = numpy.linspace(prop.bins_edges[0], prop.bins_edges[-1],
+ len(prop.bins_edges) * 10, retstep=True)
- edges, step = numpy.linspace(mids[0], mids[-1], len(mids) * 10, retstep=True)
-
- ypoints = stats.cdf(edges) * 11
+ ypoints = stats.cdf(new_edges) * 11
ypoints = [next - prev for (next, prev) in zip(ypoints[1:], ypoints[:-1])]
- xpoints = (edges[1:] + edges[:-1]) / 2
+ xpoints = (new_edges[1:] + new_edges[:-1]) / 2
- plt.plot(xpoints, ypoints, color=colors.primary_color, label="Expected from\nnormal distribution")
+ plt.plot(xpoints, ypoints, color=colors.primary_color, label="Expected from\nnormal\ndistribution")
dist_plotted = True
+ plt.gca().set_xlim(left=prop.bins_edges[0])
+ if prop.log_bins:
+ plt.xscale('log')
+
apply_style(style, eng=True, no_legend=not dist_plotted)
@@ -308,58 +420,67 @@
min_time = min(ts.times)
# /1000 is us to ms conversion
- time_points = [(val_time - min_time) / 1000 for val_time in ts.times]
+ time_points = numpy.array([(val_time - min_time) / 1000 for val_time in ts.times])
+
+ outliers_idxs = find_ouliers_ts(ts.data, cut_range=style.outliers_q_nd)
+ outliers_4q_idxs = find_ouliers_ts(ts.data, cut_range=style.outliers_hide_q_nd)
+ normal_idxs = numpy.logical_not(outliers_idxs)
+ outliers_idxs = outliers_idxs & numpy.logical_not(outliers_4q_idxs)
+ hidden_outliers_count = numpy.count_nonzero(outliers_4q_idxs)
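+    # points flagged by the outliers_q_nd cut but not by the stricter
+    # outliers_hide_q_nd cut are drawn as outliers; anything beyond the
+    # outliers_hide_q_nd cut is dropped from the plot entirely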
+
+ data = ts.data[normal_idxs]
+ data_times = time_points[normal_idxs]
+ outliers = ts.data[outliers_idxs]
+ outliers_times = time_points[outliers_idxs]
alpha = colors.noise_alpha if plot_avg_dev else 1.0
- plt.plot(time_points, ts.data, "o", color=colors.primary_color, alpha=alpha, label="Data")
+ plt.plot(data_times, data, style.point_shape,
+ color=colors.primary_color, alpha=alpha, label="Data")
+ plt.plot(outliers_times, outliers, style.err_point_shape,
+ color=colors.err_color, label="Outliers")
- if plot_avg_dev:
- avg_vals = []
- low_vals_dev = []
- hight_vals_dev = []
- avg_times = []
- dev_times = []
+ has_negative_dev = False
+ plus_minus = "\xb1"
- start = (len(ts.data) % style.avg_range) // 2
- points = list(range(start, len(ts.data) + 1, style.avg_range))
+ if plot_avg_dev and len(data) < style.avg_range * 2:
+        logger.warning("Array %r is too small to plot an average over %s points", title, style.avg_range)
+ elif plot_avg_dev:
+ avg_vals = moving_average(data, style.avg_range)
+ dev_vals = moving_dev(data, style.avg_range)
+ avg_times = moving_average(data_times, style.avg_range)
- for begin, end in zip(points[:-1], points[1:]):
- vals = ts.data[begin: end]
+ if style.approx_average:
+ avg_vals = approximate_curve(avg_times, avg_vals, avg_times, style.curve_approx_level)
+ dev_vals = approximate_curve(avg_times, dev_vals, avg_times, style.curve_approx_level)
- cavg = average(vals)
- cdev = dev(vals)
- tavg = average(time_points[begin: end])
+ plt.plot(avg_times, avg_vals, c=colors.suppl_color1, label="Average")
- avg_vals.append(cavg)
- avg_times.append(tavg)
-
- low_vals_dev.append(cavg - style.dev_range_x * cdev)
- hight_vals_dev.append(cavg + style.dev_range_x * cdev)
- dev_times.append(tavg)
-
- avg_timepoints = cast(List[float], numpy.linspace(avg_times[0], avg_times[-1], style.curve_approx_points))
-
- low_vals_dev = approximate_curve(dev_times, low_vals_dev, avg_timepoints, style.curve_approx_level)
- hight_vals_dev = approximate_curve(dev_times, hight_vals_dev, avg_timepoints, style.curve_approx_level)
- new_vals_avg = approximate_curve(avg_times, avg_vals, avg_timepoints, style.curve_approx_level)
-
- plt.plot(avg_timepoints, new_vals_avg, c=colors.suppl_color1,
- label="Average\nover {}s".format(style.avg_range))
- plt.plot(avg_timepoints, low_vals_dev, c=colors.suppl_color2,
- label="Avg \xB1 {} * stdev\nover {}s".format(style.dev_range_x, style.avg_range))
- plt.plot(avg_timepoints, hight_vals_dev, c=colors.suppl_color2)
+ low_vals_dev = avg_vals - dev_vals * style.dev_range_x
+ hight_vals_dev = avg_vals + dev_vals * style.dev_range_x
+ if style.dev_range_x - int(style.dev_range_x) < 0.01:
+ plt.plot(avg_times, low_vals_dev, c=colors.suppl_color2,
+ label="{}{}*stdev".format(plus_minus, int(style.dev_range_x)))
+ else:
+ plt.plot(avg_times, low_vals_dev, c=colors.suppl_color2,
+ label="{}{}*stdev".format(plus_minus, style.dev_range_x))
+ plt.plot(avg_times, hight_vals_dev, c=colors.suppl_color2)
+ has_negative_dev = low_vals_dev.min() < 0
plt.xlim(-5, max(time_points) + 5)
-
plt.xlabel("Time, seconds from test begin")
- plt.ylabel("{}. Average and \xB1stddev over {} points".format(units, style.avg_range))
+ plt.ylabel("{}. Average and {}stddev over {} points".format(units, plus_minus, style.avg_range))
plt.title(title)
+
+ if has_negative_dev:
+ plt.gca().set_ylim(bottom=0)
+
apply_style(style, eng=True)
@provide_plot
def plot_lat_over_time(title: str, ts: TimeSeries, bins_vals: List[int], samples: int = 5,
- colors: Any = ColorProfile, style: Any = StyleProfile) -> None:
+ colors: Any = ColorProfile,
+ style: Any = StyleProfile) -> None:
min_time = min(ts.times)
times = [int(tm - min_time + 500) // 1000 for tm in ts.times]
@@ -368,42 +489,59 @@
points = [times[int(i * step + 0.5)] for i in range(samples)]
points.append(times[-1])
bounds = list(zip(points[:-1], points[1:]))
- data = numpy.array(ts.data, dtype='int32')
- data.shape = [len(ts.data) // ts.second_axis_size, ts.second_axis_size] # type: ignore
agg_data = []
positions = []
labels = []
- min_idxs = []
- max_idxs = []
-
for begin, end in bounds:
- agg_hist = numpy.sum(data[begin:end], axis=0)
+ agg_hist = ts.data[begin:end].sum(axis=0)
+
+ if style.violin_instead_of_box:
+ # cut outliers
+ idx1, idx2 = hist_outliers_perc(agg_hist, style.outliers_lat)
+ agg_hist = agg_hist[idx1:idx2]
+ curr_bins_vals = bins_vals[idx1:idx2]
+
+ correct_coef = style.violin_point_count / sum(agg_hist)
+ if correct_coef > 1:
+ correct_coef = 1
+ else:
+ curr_bins_vals = bins_vals
+ correct_coef = 1
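+        # correct_coef scales the histogram counts below, so each violin is
+        # built from at most roughly violin_point_count synthetic samples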
vals = numpy.empty(shape=(numpy.sum(agg_hist),), dtype='float32')
cidx = 0
- non_zero = agg_hist.nonzero()[0]
- min_idxs.append(non_zero[0])
- max_idxs.append(non_zero[-1])
+ non_zero, = agg_hist.nonzero()
for pos in non_zero:
- vals[cidx:cidx + agg_hist[pos]] = bins_vals[pos]
- cidx += agg_hist[pos]
+ count = int(agg_hist[pos] * correct_coef + 0.5)
- agg_data.append(vals)
+ if count != 0:
+ vals[cidx: cidx + count] = curr_bins_vals[pos]
+ cidx += count
+
+ agg_data.append(vals[:cidx])
positions.append((end + begin) / 2)
labels.append(str((end + begin) // 2))
- min_y = bins_vals[min(min_idxs)]
- max_y = bins_vals[max(max_idxs)]
+ if style.violin_instead_of_box:
+ patches = plt.violinplot(agg_data,
+ positions=positions,
+ showmeans=True,
+ showmedians=True,
+ widths=step / 2)
- min_y -= (max_y - min_y) * 0.05
- max_y += (max_y - min_y) * 0.05
+ patches['cmeans'].set_color("blue")
+ patches['cmedians'].set_color("green")
+ if style.legend_for_eng:
+ legend_location = "center left"
+ legend_bbox_to_anchor = (1.03, 0.81)
+ plt.legend([patches['cmeans'], patches['cmedians']], ["mean", "median"],
+ loc=legend_location, bbox_to_anchor=legend_bbox_to_anchor)
+ else:
+ plt.boxplot(agg_data, 0, '', positions=positions, labels=labels, widths=step / 4)
- # plot box size adjust (only plot, not spines and legend)
- plt.boxplot(agg_data, 0, '', positions=positions, labels=labels, widths=step / 4)
plt.xlim(min(times), max(times))
- plt.ylim(min_y, max_y)
plt.xlabel("Time, seconds from test begin, sampled for ~{} seconds".format(int(step)))
plt.ylabel("Latency, ms")
plt.title(title)
@@ -411,19 +549,74 @@
@provide_plot
-def plot_heatmap(title: str, ts: TimeSeries, bins_vals: List[int], samples: int = 5,
- colors: Any = ColorProfile, style: Any = StyleProfile) -> None:
- hist_bins_count = 20
- bin_top = [100 * 2 ** i for i in range(20)]
- bin_ranges = [[0, 0]]
- cborder_it = iter(bin_top)
- cborder = next(cborder_it)
- for bin_val in bins_vals:
- if bin_val < cborder:
- bin_ranges
+def plot_heatmap(title: str,
+ ts: TimeSeries,
+ bins_vals: List[int],
+ colors: Any = ColorProfile,
+ style: Any = StyleProfile) -> None:
- # bins: [100us, 200us, ...., 104s]
- # msp origin bins ranges to heatmap bins
+ assert len(ts.data.shape) == 2
+ assert ts.data.shape[1] == len(bins_vals)
+
+ total_hist = ts.data.sum(axis=0)
+
+ # idx1, idx2 = hist_outliers_perc(total_hist, style.outliers_lat)
+ idx1, idx2 = ts_hist_outliers_perc(ts.data, bounds_perc=style.outliers_lat)
+
+ # don't cut too many bins
+ min_bins_left = style.hm_hist_bins_count
+ if idx2 - idx1 < min_bins_left:
+        missed = (min_bins_left - (idx2 - idx1)) // 2
+ idx2 = min(len(total_hist), idx2 + missed)
+ idx1 = max(0, idx1 - missed)
+
+ data = ts.data[:, idx1:idx2]
+ bins_vals = bins_vals[idx1:idx2]
+
+    # not using rebin_histogram here, as we need to apply the same bins to many arrays
+ step = (bins_vals[-1] - bins_vals[0]) / style.hm_hist_bins_count
+ new_bins_edges = numpy.arange(style.hm_hist_bins_count) * step + bins_vals[0]
+ bin_mapping = numpy.clip(numpy.searchsorted(new_bins_edges, bins_vals) - 1, 0, len(new_bins_edges) - 1)
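+    # e.g. with new_bins_edges = [0, 2, 4], source bins at values 0.5, 2.1
+    # and 5.0 map to heatmap bins 0, 1 and 2 (values past the last edge are
+    # clipped into the last bin)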
+
+    # map original bin ranges to heatmap bins, iterating over rows
+ cmap = []
+ for line in data:
+ curr_bins = [0] * style.hm_hist_bins_count
+ for idx, count in zip(bin_mapping, line):
+ curr_bins[idx] += count
+ cmap.append(curr_bins)
+ ncmap = numpy.array(cmap)
+
+ xmin = 0
+ xmax = (ts.times[-1] - ts.times[0]) / 1000 + 1
+ ymin = new_bins_edges[0]
+ ymax = new_bins_edges[-1]
+
+ fig, ax = plt.subplots(figsize=style.figsize)
+
+ if style.heatmap_interpolation == '1d':
+ interpolation = 'none'
+ res = []
+ for column in ncmap:
+ new_x = numpy.linspace(0, len(column), style.heatmap_interpolation_points)
+ old_x = numpy.arange(len(column)) + 0.5
+ new_vals = numpy.interp(new_x, old_x, column)
+ res.append(new_vals)
+ ncmap = numpy.array(res)
+ else:
+ interpolation = style.heatmap_interpolation
+
+ ax.imshow(ncmap[:,::-1].T,
+ interpolation=interpolation,
+ extent=(xmin, xmax, ymin, ymax),
+ cmap=colors.imshow_colormap)
+
+ ax.set_aspect((xmax - xmin) / (ymax - ymin) * (6 / 9))
+ ax.set_ylabel("Latency, ms")
+ ax.set_xlabel("Test time, s")
+
+ plt.title(title)
+
@provide_plot
def io_chart(title: str,
@@ -441,9 +634,6 @@
# offset from center of bar to deviation/confidence range indicator
err_x_offset = 0.05
- # figure size in inches
- figsize = (12, 6)
-
# extra space on top and bottom, comparing to maximal tight layout
extra_y_space = 0.05
@@ -472,7 +662,7 @@
# gs = gridspec.GridSpec(1, 3, width_ratios=[1, 4, 1])
# p1 = plt.subplot(gs[1])
- fig, p1 = plt.subplots(figsize=figsize)
+ fig, p1 = plt.subplots(figsize=StyleProfile.figsize)
# plot IOPS/BW bars
if block_size >= LARGE_BLOCKS:
@@ -560,7 +750,9 @@
# legend box
handles2, labels2 = p2.get_legend_handles_labels()
- plt.legend(handles1 + handles2, labels1 + labels2, loc=legend_location, bbox_to_anchor=legend_bbox_to_anchor)
+ plt.legend(handles1 + handles2, labels1 + labels2,
+ loc=legend_location,
+ bbox_to_anchor=legend_bbox_to_anchor)
# adjust central box size to fit legend
plt.subplots_adjust(**plot_box_adjust)
@@ -574,11 +766,35 @@
data = None # type: str
js_links = [] # type: List[str]
css_links = [] # type: List[str]
+ order_attr = None # type: Any
+
+ def __init__(self, data: str, order_attr: Any = None) -> None:
+ self.data = data
+ self.order_attr = order_attr
+
+ def __eq__(self, o: object) -> bool:
+ return o.order_attr == self.order_attr # type: ignore
+
+ def __lt__(self, o: object) -> bool:
+ return o.order_attr > self.order_attr # type: ignore
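+
+    # __eq__/__lt__ order blocks by order_attr, so lists of HTMLBlock
+    # instances can be sorted directly, e.g. sorted(list_of_blocks)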
+
+
+class Table:
+ def __init__(self, header: List[str]) -> None:
+ self.header = header
+ self.data = []
+
+ def add_line(self, values: List[str]) -> None:
+ self.data.append(values)
+
+ def html(self):
+ return html.table("", self.header, self.data)
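+
+# A small usage sketch (names from this module): build a table and wrap the
+# rendered html in an HTMLBlock.
+#
+#   tbl = Table(["Name", "Value"])
+#   tbl.add_line(["IOPS", "1200"])
+#   block = HTMLBlock(tbl.html())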
class Menu1st:
engineering = "Engineering"
summary = "Summary"
+ per_job = "Per Job"
class Menu2ndEng:
@@ -591,56 +807,66 @@
io_lat_qd = "IO & Lat vs QD"
-menu_1st_order = [Menu1st.summary, Menu1st.engineering]
+menu_1st_order = [Menu1st.summary, Menu1st.engineering, Menu1st.per_job]
# -------------------- REPORTS --------------------------------------------------------------------------------------
class Reporter(metaclass=abc.ABCMeta):
+ suite_types = set() # type: Set[str]
+
@abc.abstractmethod
- def get_divs(self, suite: TestSuiteConfig, storage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
+ def get_divs(self, suite: SuiteConfig, storage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
+ pass
+
+
+class JobReporter(metaclass=abc.ABCMeta):
+    suite_types = set()  # type: Set[str]
+
+ @abc.abstractmethod
+ def get_divs(self,
+ suite: SuiteConfig,
+ job: JobConfig,
+ storage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
pass
# Main performance report
class PerformanceSummary(Reporter):
- """Creates graph, which show how IOPS and Latency depend on QD"""
+ """Aggregated summary fro storage"""
# Main performance report
class IO_QD(Reporter):
"""Creates graph, which show how IOPS and Latency depend on QD"""
- def get_divs(self, suite: TestSuiteConfig, rstorage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
- ts_map = {} # type: Dict[FioTestSumm, List[IOSummary]]
- str_summary = {} # type: Dict[FioTestSumm, List[IOSummary]]
+ suite_types = {'fio'}
+
+ def get_divs(self, suite: SuiteConfig, rstorage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
+ ts_map = defaultdict(list) # type: Dict[FioJobParams, List[Tuple[SuiteConfig, FioJobConfig]]]
+        str_summary = {}  # type: Dict[FioJobParams, Tuple[str, str]]
for job in rstorage.iter_job(suite):
fjob = cast(FioJobConfig, job)
- tpl_no_qd = fjob.characterized_tuple_no_qd()
- io_summ = make_iosum(rstorage, suite, job)
+ fjob_no_qd = cast(FioJobParams, fjob.params.copy(qd=None))
+ str_summary[fjob_no_qd] = (fjob_no_qd.summary, fjob_no_qd.long_summary)
+ ts_map[fjob_no_qd].append((suite, fjob))
- if tpl_no_qd not in ts_map:
- ts_map[tpl_no_qd] = [io_summ]
- str_summary[tpl_no_qd] = (fjob.summary_no_qd(), fjob.long_summary_no_qd())
- else:
- ts_map[tpl_no_qd].append(io_summ)
+ for tpl, suites_jobs in ts_map.items():
+ if len(suites_jobs) > StyleProfile.min_iops_vs_qd_jobs:
+ iosums = [make_iosum(rstorage, suite, job) for suite, job in suites_jobs]
+ iosums.sort(key=lambda x: x.qd)
+ summary, summary_long = str_summary[tpl]
+ ds = DataSource(suite_id=suite.storage_id,
+ job_id=summary,
+ node_id=AGG_TAG,
+ sensor="fio",
+ dev=AGG_TAG,
+ metric="io_over_qd",
+ tag="svg")
- for tpl, iosums in ts_map.items():
- iosums.sort(key=lambda x: x.qd)
- summary, summary_long = str_summary[tlp]
-
- ds = DataSource(suite_id=suite.storage_id,
- job_id="io_over_qd_".format(summary),
- node_id="__all__",
- dev='fio',
- sensor="io_over_qd",
- tag="svg")
-
- title = "IOPS, BW, Lat vs. QD.\n" + summary_long
- fpath = io_chart(rstorage, ds, title=title, legend="IOPS/BW", iosums=iosums)
- yield Menu1st.summary, Menu2ndSumm.io_lat_qd, html.img(fpath)
- if DEBUG:
- return
+ title = "IOPS, BW, Lat vs. QD.\n" + summary_long
+ fpath = io_chart(rstorage, ds, title=title, legend="IOPS/BW", iosums=iosums) # type: str
+ yield Menu1st.summary, Menu2ndSumm.io_lat_qd, HTMLBlock(html.img(fpath))
# Linearization report
@@ -648,162 +874,234 @@
"""Creates graphs, which show how IOPS and Latency depend on block size"""
+def summ_sensors(rstorage: ResultStorage,
+ nodes: List[str],
+ sensor: str,
+ metric: str,
+ time_range: Tuple[int, int]) -> Optional[numpy.array]:
+
+ res = None # type: Optional[numpy.array]
+ for node_id in nodes:
+ for _, groups in rstorage.iter_sensors(node_id=node_id, sensor=sensor, metric=metric):
+ data = get_sensor_for_time_range(rstorage,
+ node_id=node_id,
+ sensor=sensor,
+ dev=groups['dev'],
+ metric=metric,
+ time_range=time_range)
+ if res is None:
+ res = data
+ else:
+ res += data
+ return res
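+
+# Usage sketch: sum 'block-io.sectors_written' over the given nodes for
+# seconds 10..60 of the run (node ids are whatever ResultStorage uses);
+# returns None when no matching sensor data is found.
+#
+#   total = summ_sensors(rstorage, nodes=storage_nodes, sensor='block-io',
+#                        metric='sectors_written', time_range=(10, 60))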
+
+
# IOPS/latency distribution
-class IOHist(Reporter):
+class StatInfo(JobReporter):
+ """Statistic info for job results"""
+ suite_types = {'fio'}
+
+ def get_divs(self, suite: SuiteConfig, job: JobConfig,
+ rstorage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
+
+ fjob = cast(FioJobConfig, job)
+ io_sum = make_iosum(rstorage, suite, fjob)
+
+ summary_data = [
+ ["Summary", job.params.long_summary],
+ ]
+
+ res = html.H2(html.center("Test summary"))
+ res += html.table("Test info", None, summary_data)
+        stat_data_headers = ["Name", "Average ~ Dev", "Conf interval", "Median", "Mode", "Kurt / Skew", "95%", "99%"]
+
+ KB = 1024
+ bw_data = ["Bandwidth",
+ "{}Bps ~ {}Bps".format(b2ssize(io_sum.bw.average * KB), b2ssize(io_sum.bw.deviation * KB)),
+ b2ssize(io_sum.bw.confidence * KB) + "Bps",
+ b2ssize(io_sum.bw.perc_50 * KB) + "Bps",
+ "-",
+ "{:.2f} / {:.2f}".format(io_sum.bw.kurt, io_sum.bw.skew),
+ b2ssize(io_sum.bw.perc_5 * KB) + "Bps",
+ b2ssize(io_sum.bw.perc_1 * KB) + "Bps"]
+
+ iops_data = ["IOPS",
+ "{}IOPS ~ {}IOPS".format(b2ssize_10(io_sum.bw.average / fjob.bsize),
+ b2ssize_10(io_sum.bw.deviation / fjob.bsize)),
+ b2ssize_10(io_sum.bw.confidence / fjob.bsize) + "IOPS",
+ b2ssize_10(io_sum.bw.perc_50 / fjob.bsize) + "IOPS",
+ "-",
+ "{:.2f} / {:.2f}".format(io_sum.bw.kurt, io_sum.bw.skew),
+ b2ssize_10(io_sum.bw.perc_5 / fjob.bsize) + "IOPS",
+ b2ssize_10(io_sum.bw.perc_1 / fjob.bsize) + "IOPS"]
+
+ MICRO = 1000000
+ # latency
+ lat_data = ["Latency",
+ "-",
+ "-",
+ b2ssize_10(io_sum.bw.perc_50 / MICRO) + "s",
+ "-",
+ "-",
+ b2ssize_10(io_sum.bw.perc_95 / MICRO) + "s",
+ b2ssize_10(io_sum.bw.perc_99 / MICRO) + "s"]
+
+ # sensor usage
+ stat_data = [iops_data, bw_data, lat_data]
+ res += html.table("Load stats info", stat_data_headers, stat_data)
+
+ resource_headers = ["Resource", "Usage count", "Proportional to work done"]
+
+ io_transfered = io_sum.bw.data.sum() * KB
+ resource_data = [
+ ["IO made", b2ssize_10(io_transfered / KB / fjob.bsize) + "OP", "-"],
+            ["Data transferred", b2ssize(io_transfered) + "B", "-"]
+ ]
+
+
+ storage = rstorage.storage
+ nodes = storage.load_list(NodeInfo, 'all_nodes') # type: List[NodeInfo]
+
+ storage_nodes = [node.node_id for node in nodes if node.roles.intersection(STORAGE_ROLES)]
+ test_nodes = [node.node_id for node in nodes if "testnode" in node.roles]
+
+ trange = [job.reliable_info_range[0] / 1000, job.reliable_info_range[1] / 1000]
+ ops_done = io_transfered / fjob.bsize / KB
+
+ all_metrics = [
+ ("Test nodes net send", 'net-io', 'send_bytes', b2ssize, test_nodes, "B", io_transfered),
+ ("Test nodes net recv", 'net-io', 'recv_bytes', b2ssize, test_nodes, "B", io_transfered),
+
+ ("Test nodes disk write", 'block-io', 'sectors_written', b2ssize, test_nodes, "B", io_transfered),
+ ("Test nodes disk read", 'block-io', 'sectors_read', b2ssize, test_nodes, "B", io_transfered),
+ ("Test nodes writes", 'block-io', 'writes_completed', b2ssize_10, test_nodes, "OP", ops_done),
+ ("Test nodes reads", 'block-io', 'reads_completed', b2ssize_10, test_nodes, "OP", ops_done),
+
+ ("Storage nodes net send", 'net-io', 'send_bytes', b2ssize, storage_nodes, "B", io_transfered),
+ ("Storage nodes net recv", 'net-io', 'recv_bytes', b2ssize, storage_nodes, "B", io_transfered),
+
+ ("Storage nodes disk write", 'block-io', 'sectors_written', b2ssize, storage_nodes, "B", io_transfered),
+ ("Storage nodes disk read", 'block-io', 'sectors_read', b2ssize, storage_nodes, "B", io_transfered),
+ ("Storage nodes writes", 'block-io', 'writes_completed', b2ssize_10, storage_nodes, "OP", ops_done),
+ ("Storage nodes reads", 'block-io', 'reads_completed', b2ssize_10, storage_nodes, "OP", ops_done),
+ ]
+
+ all_agg = {}
+
+ for descr, sensor, metric, ffunc, nodes, units, denom in all_metrics:
+ if not nodes:
+ continue
+
+ res_arr = summ_sensors(rstorage, nodes=nodes, sensor=sensor, metric=metric, time_range=trange)
+ if res_arr is None:
+ continue
+
+ agg = res_arr.sum()
+ resource_data.append([descr, ffunc(agg) + units, "{:.1f}".format(agg / denom)])
+ all_agg[descr] = agg
+
+
+ cums = [
+ ("Test nodes writes", "Test nodes reads", "Total test ops", b2ssize_10, "OP", ops_done),
+ ("Storage nodes writes", "Storage nodes reads", "Total storage ops", b2ssize_10, "OP", ops_done),
+ ("Storage nodes disk write", "Storage nodes disk read", "Total storage IO size", b2ssize,
+ "B", io_transfered),
+ ("Test nodes disk write", "Test nodes disk read", "Total test nodes IO size", b2ssize, "B", io_transfered),
+ ]
+
+ for name1, name2, descr, ffunc, units, denom in cums:
+ if name1 in all_agg and name2 in all_agg:
+ agg = all_agg[name1] + all_agg[name2]
+ resource_data.append([descr, ffunc(agg) + units, "{:.1f}".format(agg / denom)])
+
+ res += html.table("Resources usage", resource_headers, resource_data)
+
+ yield Menu1st.per_job, job.summary, HTMLBlock(res)
+
+
+# IOPS/latency distribution
+class IOHist(JobReporter):
"""IOPS.latency distribution histogram"""
- def get_divs(self, suite: TestSuiteConfig, rstorage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
- for job in rstorage.iter_job(suite):
- fjob = cast(FioJobConfig, job)
- agg_lat = get_aggregated(rstorage, suite, fjob, "lat")
- bins_edges = numpy.array(get_lat_vals(agg_lat.second_axis_size), dtype='float32') / 1000 # convert us to ms
- lat_stat_prop = calc_histo_stat_props(agg_lat, bins_edges, bins_count=StyleProfile.hist_boxes)
+ suite_types = {'fio'}
- title = "Latency distribution. " + fjob.long_summary
- units = "ms"
+ def get_divs(self,
+ suite: SuiteConfig,
+ job: JobConfig,
+ rstorage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
- fpath = plot_hist(rstorage, agg_lat.source(tag='hist.svg'), title, units, lat_stat_prop)
- if DEBUG:
- yield Menu1st.summary, Menu2ndSumm.io_lat_qd, html.img(fpath)
- else:
- yield Menu1st.engineering, Menu2ndEng.hist, html.img(fpath)
+ fjob = cast(FioJobConfig, job)
- agg_io = get_aggregated(rstorage, suite, fjob, "bw")
+ yield Menu1st.per_job, fjob.summary, HTMLBlock(html.H2(html.center("Load histograms")))
- if fjob.bsize >= LARGE_BLOCKS:
- title = "BW distribution. " + fjob.long_summary
- units = "MiBps"
- agg_io.data /= MiB2KiB
- else:
- title = "IOPS distribution. " + fjob.long_summary
- agg_io.data /= fjob.bsize
- units = "IOPS"
+ agg_lat = get_aggregated(rstorage, suite, fjob, "lat")
+ bins_edges = numpy.array(get_lat_vals(agg_lat.data.shape[1]), dtype='float32') / 1000 # convert us to ms
+ lat_stat_prop = calc_histo_stat_props(agg_lat, bins_edges, bins_count=StyleProfile.hist_lat_boxes)
- io_stat_prop = calc_norm_stat_props(agg_io, bins_count=StyleProfile.hist_boxes)
- fpath = plot_hist(rstorage, agg_io.source(tag='hist.svg'), title, units, io_stat_prop)
- if DEBUG:
- yield Menu1st.summary, Menu2ndSumm.io_lat_qd, html.img(fpath)
- return
- else:
- yield Menu1st.engineering, Menu2ndEng.hist, html.img(fpath)
+
+ long_summary = cast(FioJobParams, fjob.params).long_summary
+
+ title = "Latency distribution"
+ units = "ms"
+
+ fpath = plot_hist(rstorage, agg_lat.source(tag='hist.svg'), title, units, lat_stat_prop) # type: str
+ yield Menu1st.per_job, fjob.summary, HTMLBlock(html.img(fpath))
+
+ agg_io = get_aggregated(rstorage, suite, fjob, "bw")
+
+ if fjob.bsize >= LARGE_BLOCKS:
+ title = "BW distribution"
+ units = "MiBps"
+ agg_io.data //= MiB2KiB
+ else:
+ title = "IOPS distribution"
+ agg_io.data //= fjob.bsize
+ units = "IOPS"
+
+ io_stat_prop = calc_norm_stat_props(agg_io, bins_count=StyleProfile.hist_boxes)
+ fpath = plot_hist(rstorage, agg_io.source(tag='hist.svg'), title, units, io_stat_prop) # type: str
+ yield Menu1st.per_job, fjob.summary, HTMLBlock(html.img(fpath))
# IOPS/latency over test time for each job
-class IOTime(Reporter):
+class IOTime(JobReporter):
"""IOPS/latency during test"""
- def get_divs(self, suite: TestSuiteConfig, rstorage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
- for job in rstorage.iter_job(suite):
- fjob = cast(FioJobConfig, job)
- agg_lat = get_aggregated(rstorage, suite, fjob, "lat")
- bins_edges = numpy.array(get_lat_vals(agg_lat.second_axis_size), dtype='float32') / 1000
- title = "Latency during test. " + fjob.long_summary
+ suite_types = {'fio'}
- fpath = plot_lat_over_time(rstorage, agg_lat.source(tag='ts.svg'), title, agg_lat, bins_edges)
- if DEBUG:
- yield Menu1st.summary, Menu2ndSumm.io_lat_qd, html.img(fpath)
- else:
- yield Menu1st.engineering, Menu2ndEng.lat_time, html.img(fpath)
+ def get_divs(self,
+ suite: SuiteConfig,
+ job: JobConfig,
+ rstorage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
- fpath = plot_heatmap(rstorage, agg_lat.source(tag='hmap.svg'), title, agg_lat, bins_edges)
- if DEBUG:
- yield Menu1st.summary, Menu2ndSumm.io_lat_qd, html.img(fpath)
- else:
- yield Menu1st.engineering, Menu2ndEng.lat_time, html.img(fpath)
+ fjob = cast(FioJobConfig, job)
- agg_io = get_aggregated(rstorage, suite, fjob, "bw")
- if fjob.bsize >= LARGE_BLOCKS:
- title = "BW during test. " + fjob.long_summary
- units = "MiBps"
- agg_io.data /= MiB2KiB
- else:
- title = "IOPS during test. " + fjob.long_summary
- agg_io.data /= fjob.bsize
- units = "IOPS"
+ yield Menu1st.per_job, fjob.summary, HTMLBlock(html.H2(html.center("Load over time")))
- fpath = plot_v_over_time(rstorage, agg_io.source(tag='ts.svg'), title, units, agg_io)
+ agg_io = get_aggregated(rstorage, suite, fjob, "bw")
+ if fjob.bsize >= LARGE_BLOCKS:
+ title = "Bandwidth"
+ units = "MiBps"
+ agg_io.data //= MiB2KiB
+ else:
+ title = "IOPS"
+ agg_io.data //= fjob.bsize
+ units = "IOPS"
- if DEBUG:
- yield Menu1st.summary, Menu2ndSumm.io_lat_qd, html.img(fpath)
- return
- else:
- yield Menu1st.engineering, Menu2ndEng.iops_time, html.img(fpath)
+ fpath = plot_v_over_time(rstorage, agg_io.source(tag='ts.svg'), title, units, agg_io) # type: str
+ yield Menu1st.per_job, fjob.summary, HTMLBlock(html.img(fpath))
+ agg_lat = get_aggregated(rstorage, suite, fjob, "lat")
+ bins_edges = numpy.array(get_lat_vals(agg_lat.data.shape[1]), dtype='float32') / 1000
+ title = "Latency"
-def is_sensor_numarray(sensor: str, metric: str) -> bool:
- """Returns True if sensor provides one-dimension array of numeric values. One number per one measurement."""
- return True
+ fpath = plot_lat_over_time(rstorage, agg_lat.source(tag='ts.svg'), title, agg_lat, bins_edges) # type: str
+ yield Menu1st.per_job, fjob.summary, HTMLBlock(html.img(fpath))
+ title = "Latency heatmap"
+ fpath = plot_heatmap(rstorage, agg_lat.source(tag='hmap.png'), title, agg_lat, bins_edges) # type: str
-LEVEL_SENSORS = {("block-io", "io_queue"),
- ("system-cpu", "procs_blocked"),
- ("system-cpu", "procs_queue")}
-
-
-def is_level_sensor(sensor: str, metric: str) -> bool:
- """Returns True if sensor measure level of any kind, E.g. queue depth."""
- return (sensor, metric) in LEVEL_SENSORS
-
-
-def is_delta_sensor(sensor: str, metric: str) -> bool:
- """Returns True if sensor provides deltas for cumulative value. E.g. io completed in given period"""
- return not is_level_sensor(sensor, metric)
-
-
-
-def get_sensor(storage: Storage, node: str, sensor: str, dev: str, metric: str,
- time_range: Tuple[int, int]) -> numpy.array:
- """Return sensor values for given node for given period. Return per second estimated values array
-
- Raise an error if required range is not full covered by data in storage.
- First it finds range of results from sensor, which fully covers requested range.
- ...."""
-
- collected_at = numpy.array(storage.get_array("sensors/{}_collected_at".format(node)), dtype="int")
- data = numpy.array(storage.get_array("sensors/{}_{}.{}.{}".format(node, sensor, dev, metric)))
-
- # collected_at is array of pairs (collection_started_at, collection_finished_at)
- collection_start_at = collected_at[::2]
-
- MICRO = 1000000
-
- # convert secods to us
- begin = time_range[0] * MICRO
- end = time_range[1] * MICRO
-
- if begin < collection_start_at[0] or end > collection_start_at[-1] or end <= begin:
- raise AssertionError(("Incorrect data for get_sensor - time_range={!r}, collected_at=[{}, ..., {}]," +
- "sensor = {}_{}.{}.{}").format(time_range,
- collected_at[0] // MICRO,
- collected_at[-1] // MICRO,
- node, sensor, dev, metric))
-
- pos1, pos2 = numpy.searchsorted(collection_start_at, (begin, end))
- assert pos1 >= 1
-
- time_bounds = collection_start_at[pos1 - 1: pos2]
- edge_it = iter(time_bounds)
- val_it = iter(data[pos1 - 1: pos2])
-
- result = []
- curr_summ = 0
-
- results_cell_ends = begin + MICRO
- curr_end = next(edge_it)
-
- while results_cell_ends <= end:
- curr_start = curr_end
- curr_end = next(edge_it)
- curr_val = next(val_it)
- while curr_end >= results_cell_ends and results_cell_ends <= end:
- current_part = (results_cell_ends - curr_start) / (curr_end - curr_start) * curr_val
- result.append(curr_summ + current_part)
- curr_summ = 0
- curr_val -= current_part
- curr_start = results_cell_ends
- results_cell_ends += MICRO
- curr_summ += curr_val
-
- assert len(result) == (end - begin) // MICRO
- return result
+ yield Menu1st.per_job, fjob.summary, HTMLBlock(html.img(fpath))
class ResourceUsage:
@@ -824,50 +1122,65 @@
# Cluster load over test time
-class ClusterLoad(Reporter):
+class ClusterLoad(JobReporter):
"""IOPS/latency during test"""
+    # TODO: units should come from the sensor
storage_sensors = [
- ('block-io', 'reads_completed', "Read ops"),
- ('block-io', 'writes_completed', "Write ops"),
- ('block-io', 'sectors_read', "Read kb"),
- ('block-io', 'sectors_written', "Write kb"),
+ ('block-io', 'reads_completed', "Read ops", 'iops'),
+ ('block-io', 'writes_completed', "Write ops", 'iops'),
+ ('block-io', 'sectors_read', "Read kb", 'kb'),
+ ('block-io', 'sectors_written', "Write kb", 'kb'),
]
- def get_divs(self, suite: TestSuiteConfig, rstorage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
+ def get_divs(self,
+ suite: SuiteConfig,
+ job: JobConfig,
+ rstorage: ResultStorage) -> Iterator[Tuple[str, str, HTMLBlock]]:
# split nodes on test and other
storage = rstorage.storage
nodes = storage.load_list(NodeInfo, "all_nodes") # type: List[NodeInfo]
+ yield Menu1st.per_job, job.summary, HTMLBlock(html.H2(html.center("Cluster load")))
test_nodes = {node.node_id for node in nodes if 'testnode' in node.roles}
cluster_nodes = {node.node_id for node in nodes if 'testnode' not in node.roles}
- for job in rstorage.iter_job(suite):
- # convert ms to s
- time_range = (job.reliable_info_starts_at // MS2S, job.reliable_info_stops_at // MS2S)
- len = time_range[1] - time_range[0]
+ # convert ms to s
+ time_range = (job.reliable_info_range[0] // MS2S, job.reliable_info_range[1] // MS2S)
+    duration = time_range[1] - time_range[0]
+    for sensor, metric, sensor_title, units in self.storage_sensors:
+        sum_testnode = numpy.zeros((duration,))
+        sum_other = numpy.zeros((duration,))
+ for path, groups in rstorage.iter_sensors(sensor=sensor, metric=metric):
+ # todo: should return sensor units
+ data = get_sensor_for_time_range(rstorage,
+ groups['node_id'],
+ sensor,
+ groups['dev'],
+ metric, time_range)
+ if groups['node_id'] in test_nodes:
+ sum_testnode += data
+ else:
+ sum_other += data
- for sensor, metric, sensor_title in self.storage_sensors:
- sum_testnode = numpy.zeros((len,))
- sum_other = numpy.zeros((len,))
+ ds = DataSource(suite_id=suite.storage_id,
+ job_id=job.storage_id,
+ node_id="test_nodes",
+ sensor=sensor,
+ dev=AGG_TAG,
+ metric=metric,
+ tag="ts.svg")
- for path, groups in iter_sensors(rstorage.storage, sensor=sensor, metric=metric):
- data = get_sensor(rstorage.storage, groups['node'], sensor, groups['dev'], metric, time_range)
- if groups['node'] in test_nodes:
- sum_testnode += data
- else:
- sum_other += data
-
- ds = DataSource(suite_id=suite.storage_id, job_id=job.summary, node_id="cluster",
- dev=sensor, sensor=metric, tag="ts.svg")
-
- # s to ms
- ts = TimeSeries(name="", times=numpy.arange(*time_range) * MS2S, data=sum_testnode, raw=None)
- fpath = plot_v_over_time(rstorage, ds, "{}.{}".format(sensor, metric), sensor_title, ts=ts)
- yield Menu1st.engineering, Menu2ndEng.iops_time, html.img(fpath)
-
- if DEBUG:
- return
+ # s to ms
+ ts = TimeSeries(name="",
+ times=numpy.arange(*time_range) * MS2S,
+ data=sum_testnode,
+ raw=None,
+ units=units,
+ time_units="us",
+ source=ds)
+ fpath = plot_v_over_time(rstorage, ds, sensor_title, sensor_title, ts=ts) # type: str
+ yield Menu1st.per_job, job.summary, HTMLBlock(html.img(fpath))
# Ceph cluster summary
@@ -897,7 +1210,12 @@
def run(self, ctx: TestRun) -> None:
rstorage = ResultStorage(ctx.storage)
- reporters = [ClusterLoad()] # IO_QD(), IOTime(), IOHist()] # type: List[Reporter]
+
+ job_reporters = [StatInfo(), IOTime(), IOHist(), ClusterLoad()] # type: List[JobReporter]
+ reporters = [IO_QD()] # type: List[Reporter]
+
root_dir = os.path.dirname(os.path.dirname(wally.__file__))
doc_templ_path = os.path.join(root_dir, "report_templates/index.html")
@@ -909,14 +1227,30 @@
content_block = []
link_idx = 0
- matplotlib.rcParams.update({'font.size': 10})
+ # matplotlib.rcParams.update(ctx.config.reporting.matplotlib_params.raw())
+ # ColorProfile.__dict__.update(ctx.config.reporting.colors.raw())
+ # StyleProfile.__dict__.update(ctx.config.reporting.style.raw())
- items = defaultdict(lambda: defaultdict(list)) # type: Dict[str, Dict[str, list]]
+ items = defaultdict(lambda: defaultdict(list)) # type: Dict[str, Dict[str, List[HTMLBlock]]]
+
+ # TODO: filter reporters
for suite in rstorage.iter_suite(FioTest.name):
+ all_jobs = list(rstorage.iter_job(suite))
+ all_jobs.sort(key=lambda job: job.params)
+ for job in all_jobs:
+ for reporter in job_reporters:
+ for block, item, html in reporter.get_divs(suite, job, rstorage):
+ items[block][item].append(html)
+ if DEBUG:
+ break
+
for reporter in reporters:
for block, item, html in reporter.get_divs(suite, rstorage):
items[block][item].append(html)
+ if DEBUG:
+ break
+
for idx_1st, menu_1st in enumerate(sorted(items, key=lambda x: menu_1st_order.index(x))):
menu_block.append(
'<a href="#item{}" class="nav-group" data-toggle="collapse" data-parent="#MainMenu">{}</a>'
@@ -927,7 +1261,7 @@
menu_block.append(' <a href="#content{}" class="nav-group-item">{}</a>'
.format(link_idx, menu_2nd))
content_block.append('<div id="content{}">'.format(link_idx))
- content_block.extend(" " + x for x in items[menu_1st][menu_2nd])
+ content_block.extend(" " + x.data for x in items[menu_1st][menu_2nd])
content_block.append('</div>')
link_idx += 1
menu_block.append('</div>')
@@ -946,1504 +1280,3 @@
def run(self, ctx: TestRun) -> None:
# TODO(koder): load data from storage
raise NotImplementedError("...")
-
-
-# --------------------------- LEGASY --------------------------------------------------------------------------------
-
-
-# # disk_info = None
-# # base = None
-# # linearity = None
-#
-#
-# def group_by_name(test_data):
-# name_map = collections.defaultdict(lambda: [])
-#
-# for data in test_data:
-# name_map[(data.name, data.summary())].append(data)
-#
-# return name_map
-#
-#
-# def report(name, required_fields):
-# def closure(func):
-# report_funcs.append((required_fields.split(","), name, func))
-# return func
-# return closure
-#
-#
-# def get_test_lcheck_params(pinfo):
-# res = [{
-# 's': 'sync',
-# 'd': 'direct',
-# 'a': 'async',
-# 'x': 'sync direct'
-# }[pinfo.sync_mode]]
-#
-# res.append(pinfo.p.rw)
-#
-# return " ".join(res)
-#
-#
-# def get_emb_data_svg(plt):
-# sio = StringIO()
-# plt.savefig(sio, format='svg')
-# img_start = "<!-- Created with matplotlib (http://matplotlib.org/) -->"
-# return sio.getvalue().split(img_start, 1)[1]
-#
-#
-# def get_template(templ_name):
-# very_root_dir = os.path.dirname(os.path.dirname(wally.__file__))
-# templ_dir = os.path.join(very_root_dir, 'report_templates')
-# templ_file = os.path.join(templ_dir, templ_name)
-# return open(templ_file, 'r').read()
-#
-#
-# def group_by(data, func):
-# if len(data) < 2:
-# yield data
-# return
-#
-# ndata = [(func(dt), dt) for dt in data]
-# ndata.sort(key=func)
-# pkey, dt = ndata[0]
-# curr_list = [dt]
-#
-# for key, val in ndata[1:]:
-# if pkey != key:
-# yield curr_list
-# curr_list = [val]
-# else:
-# curr_list.append(val)
-# pkey = key
-#
-# yield curr_list
-#
-#
-# @report('linearity', 'linearity_test')
-# def linearity_report(processed_results, lab_info, comment):
-# labels_and_data_mp = collections.defaultdict(lambda: [])
-# vls = {}
-#
-# # plot io_time = func(bsize)
-# for res in processed_results.values():
-# if res.name.startswith('linearity_test'):
-# iotimes = [1000. / val for val in res.iops.raw]
-#
-# op_summ = get_test_summary(res.params)[:3]
-#
-# labels_and_data_mp[op_summ].append(
-# [res.p.blocksize, res.iops.raw, iotimes])
-#
-# cvls = res.params.vals.copy()
-# del cvls['blocksize']
-# del cvls['rw']
-#
-# cvls.pop('sync', None)
-# cvls.pop('direct', None)
-# cvls.pop('buffered', None)
-#
-# if op_summ not in vls:
-# vls[op_summ] = cvls
-# else:
-# assert cvls == vls[op_summ]
-#
-# all_labels = None
-# _, ax1 = plt.subplots()
-# for name, labels_and_data in labels_and_data_mp.items():
-# labels_and_data.sort(key=lambda x: ssize2b(x[0]))
-#
-# labels, _, iotimes = zip(*labels_and_data)
-#
-# if all_labels is None:
-# all_labels = labels
-# else:
-# assert all_labels == labels
-#
-# plt.boxplot(iotimes)
-# if len(labels_and_data) > 2 and \
-# ssize2b(labels_and_data[-2][0]) >= 4096:
-#
-# xt = range(1, len(labels) + 1)
-#
-# def io_time(sz, bw, initial_lat):
-# return sz / bw + initial_lat
-#
-# x = numpy.array(map(ssize2b, labels))
-# y = numpy.array([sum(dt) / len(dt) for dt in iotimes])
-# popt, _ = scipy.optimize.curve_fit(io_time, x, y, p0=(100., 1.))
-#
-# y1 = io_time(x, *popt)
-# plt.plot(xt, y1, linestyle='--',
-# label=name + ' LS linear approx')
-#
-# for idx, (sz, _, _) in enumerate(labels_and_data):
-# if ssize2b(sz) >= 4096:
-# break
-#
-# bw = (x[-1] - x[idx]) / (y[-1] - y[idx])
-# lat = y[-1] - x[-1] / bw
-# y2 = io_time(x, bw, lat)
-# plt.plot(xt, y2, linestyle='--',
-# label=abbv_name_to_full(name) +
-# ' (4k & max) linear approx')
-#
-# plt.setp(ax1, xticklabels=labels)
-#
-# plt.xlabel("Block size")
-# plt.ylabel("IO time, ms")
-#
-# plt.subplots_adjust(top=0.85)
-# plt.legend(bbox_to_anchor=(0.5, 1.15),
-# loc='upper center',
-# prop={'size': 10}, ncol=2)
-# plt.grid()
-# iotime_plot = get_emb_data_svg(plt)
-# plt.clf()
-#
-# # plot IOPS = func(bsize)
-# _, ax1 = plt.subplots()
-#
-# for name, labels_and_data in labels_and_data_mp.items():
-# labels_and_data.sort(key=lambda x: ssize2b(x[0]))
-# _, data, _ = zip(*labels_and_data)
-# plt.boxplot(data)
-# avg = [float(sum(arr)) / len(arr) for arr in data]
-# xt = range(1, len(data) + 1)
-# plt.plot(xt, avg, linestyle='--',
-# label=abbv_name_to_full(name) + " avg")
-#
-# plt.setp(ax1, xticklabels=labels)
-# plt.xlabel("Block size")
-# plt.ylabel("IOPS")
-# plt.legend(bbox_to_anchor=(0.5, 1.15),
-# loc='upper center',
-# prop={'size': 10}, ncol=2)
-# plt.grid()
-# plt.subplots_adjust(top=0.85)
-#
-# iops_plot = get_emb_data_svg(plt)
-#
-# res = set(get_test_lcheck_params(res) for res in processed_results.values())
-# ncount = list(set(res.testnodes_count for res in processed_results.values()))
-# conc = list(set(res.concurence for res in processed_results.values()))
-#
-# assert len(conc) == 1
-# assert len(ncount) == 1
-#
-# descr = {
-# 'vm_count': ncount[0],
-# 'concurence': conc[0],
-# 'oper_descr': ", ".join(res).capitalize()
-# }
-#
-# params_map = {'iotime_vs_size': iotime_plot,
-# 'iops_vs_size': iops_plot,
-# 'descr': descr}
-#
-# return get_template('report_linearity.html').format(**params_map)
-#
-#
-# @report('lat_vs_iops', 'lat_vs_iops')
-# def lat_vs_iops(processed_results, lab_info, comment):
-# lat_iops = collections.defaultdict(lambda: [])
-# requsted_vs_real = collections.defaultdict(lambda: {})
-#
-# for res in processed_results.values():
-# if res.name.startswith('lat_vs_iops'):
-# lat_iops[res.concurence].append((res.lat,
-# 0,
-# res.iops.average,
-# res.iops.deviation))
-# # lat_iops[res.concurence].append((res.lat.average / 1000.0,
-# # res.lat.deviation / 1000.0,
-# # res.iops.average,
-# # res.iops.deviation))
-# requested_iops = res.p.rate_iops * res.concurence
-# requsted_vs_real[res.concurence][requested_iops] = \
-# (res.iops.average, res.iops.deviation)
-#
-# colors = ['red', 'green', 'blue', 'orange', 'magenta', "teal"]
-# colors_it = iter(colors)
-# for conc, lat_iops in sorted(lat_iops.items()):
-# lat, dev, iops, iops_dev = zip(*lat_iops)
-# plt.errorbar(iops, lat, xerr=iops_dev, yerr=dev, fmt='ro',
-# label=str(conc) + " threads",
-# color=next(colors_it))
-#
-# plt.xlabel("IOPS")
-# plt.ylabel("Latency, ms")
-# plt.grid()
-# plt.legend(loc=0)
-# plt_iops_vs_lat = get_emb_data_svg(plt)
-# plt.clf()
-#
-# colors_it = iter(colors)
-# for conc, req_vs_real in sorted(requsted_vs_real.items()):
-# req, real = zip(*sorted(req_vs_real.items()))
-# iops, dev = zip(*real)
-# plt.errorbar(req, iops, yerr=dev, fmt='ro',
-# label=str(conc) + " threads",
-# color=next(colors_it))
-# plt.xlabel("Requested IOPS")
-# plt.ylabel("Get IOPS")
-# plt.grid()
-# plt.legend(loc=0)
-# plt_iops_vs_requested = get_emb_data_svg(plt)
-#
-# res1 = processed_results.values()[0]
-# params_map = {'iops_vs_lat': plt_iops_vs_lat,
-# 'iops_vs_requested': plt_iops_vs_requested,
-# 'oper_descr': get_test_lcheck_params(res1).capitalize()}
-#
-# return get_template('report_iops_vs_lat.html').format(**params_map)
-#
-#
-# def render_all_html(comment, info, lab_description, images, templ_name):
-# data = info.__dict__.copy()
-# for name, val in data.items():
-# if not name.startswith('__'):
-# if val is None:
-# if name in ('direct_iops_w64_max', 'direct_iops_w_max'):
-# data[name] = ('-', '-', '-')
-# else:
-# data[name] = '-'
-# elif isinstance(val, (int, float, long)):
-# data[name] = round_3_digit(val)
-#
-# data['bw_read_max'] = (data['bw_read_max'][0] // 1024,
-# data['bw_read_max'][1],
-# data['bw_read_max'][2])
-#
-# data['bw_write_max'] = (data['bw_write_max'][0] // 1024,
-# data['bw_write_max'][1],
-# data['bw_write_max'][2])
-#
-# images.update(data)
-# templ = get_template(templ_name)
-# return templ.format(lab_info=lab_description,
-# comment=comment,
-# **images)
-#
-#
-# def io_chart(title, concurence,
-# latv, latv_min, latv_max,
-# iops_or_bw, iops_or_bw_err,
-# legend,
-# log_iops=False,
-# log_lat=False,
-# boxplots=False,
-# latv_50=None,
-# latv_95=None,
-# error2=None):
-#
-# matplotlib.rcParams.update({'font.size': 10})
-# points = " MiBps" if legend == 'BW' else ""
-# lc = len(concurence)
-# width = 0.35
-# xt = range(1, lc + 1)
-#
-# op_per_vm = [v / (vm * th) for v, (vm, th) in zip(iops_or_bw, concurence)]
-# fig, p1 = plt.subplots()
-# xpos = [i - width / 2 for i in xt]
-#
-# p1.bar(xpos, iops_or_bw,
-# width=width,
-# color='y',
-# label=legend)
-#
-# err1_leg = None
-# for pos, y, err in zip(xpos, iops_or_bw, iops_or_bw_err):
-# err1_leg = p1.errorbar(pos + width / 2,
-# y,
-# err,
-# color='magenta')
-#
-# err2_leg = None
-# if error2 is not None:
-# for pos, y, err in zip(xpos, iops_or_bw, error2):
-# err2_leg = p1.errorbar(pos + width / 2 + 0.08,
-# y,
-# err,
-# lw=2,
-# alpha=0.5,
-# color='teal')
-#
-# p1.grid(True)
-# p1.plot(xt, op_per_vm, '--', label=legend + "/thread", color='black')
-# handles1, labels1 = p1.get_legend_handles_labels()
-#
-# handles1 += [err1_leg]
-# labels1 += ["95% conf"]
-#
-# if err2_leg is not None:
-# handles1 += [err2_leg]
-# labels1 += ["95% dev"]
-#
-# p2 = p1.twinx()
-#
-# if latv_50 is None:
-# p2.plot(xt, latv_max, label="lat max")
-# p2.plot(xt, latv, label="lat avg")
-# p2.plot(xt, latv_min, label="lat min")
-# else:
-# p2.plot(xt, latv_50, label="lat med")
-# p2.plot(xt, latv_95, label="lat 95%")
-#
-# plt.xlim(0.5, lc + 0.5)
-# plt.xticks(xt, ["{0} * {1}".format(vm, th) for (vm, th) in concurence])
-# p1.set_xlabel("VM Count * Thread per VM")
-# p1.set_ylabel(legend + points)
-# p2.set_ylabel("Latency ms")
-# plt.title(title)
-# handles2, labels2 = p2.get_legend_handles_labels()
-#
-# plt.legend(handles1 + handles2, labels1 + labels2,
-# loc='center left', bbox_to_anchor=(1.1, 0.81))
-#
-# if log_iops:
-# p1.set_yscale('log')
-#
-# if log_lat:
-# p2.set_yscale('log')
-#
-# plt.subplots_adjust(right=0.68)
-#
-# return get_emb_data_svg(plt)
-#
-#
-# def make_plots(processed_results, plots):
-# """
-# processed_results: [PerfInfo]
-# plots = [(test_name_prefix:str, fname:str, description:str)]
-# """
-# files = {}
-# for name_pref, fname, desc in plots:
-# chart_data = []
-#
-# for res in processed_results:
-# summ = res.name + "_" + res.summary
-# if summ.startswith(name_pref):
-# chart_data.append(res)
-#
-# if len(chart_data) == 0:
-# raise ValueError("Can't found any date for " + name_pref)
-#
-# use_bw = ssize2b(chart_data[0].p.blocksize) > 16 * 1024
-#
-# chart_data.sort(key=lambda x: x.params['vals']['numjobs'])
-#
-# lat = None
-# lat_min = None
-# lat_max = None
-#
-# lat_50 = [x.lat_50 for x in chart_data]
-# lat_95 = [x.lat_95 for x in chart_data]
-#
-# lat_diff_max = max(x.lat_95 / x.lat_50 for x in chart_data)
-# lat_log_scale = (lat_diff_max > 10)
-#
-# testnodes_count = x.testnodes_count
-# concurence = [(testnodes_count, x.concurence)
-# for x in chart_data]
-#
-# if use_bw:
-# data = [x.bw.average / 1000 for x in chart_data]
-# data_conf = [x.bw.confidence / 1000 for x in chart_data]
-# data_dev = [x.bw.deviation * 2.5 / 1000 for x in chart_data]
-# name = "BW"
-# else:
-# data = [x.iops.average for x in chart_data]
-# data_conf = [x.iops.confidence for x in chart_data]
-# data_dev = [x.iops.deviation * 2 for x in chart_data]
-# name = "IOPS"
-#
-# fc = io_chart(title=desc,
-# concurence=concurence,
-#
-# latv=lat,
-# latv_min=lat_min,
-# latv_max=lat_max,
-#
-# iops_or_bw=data,
-# iops_or_bw_err=data_conf,
-#
-# legend=name,
-# log_lat=lat_log_scale,
-#
-# latv_50=lat_50,
-# latv_95=lat_95,
-#
-# error2=data_dev)
-# files[fname] = fc
-#
-# return files
-#
-#
-# def find_max_where(processed_results, sync_mode, blocksize, rw, iops=True):
-# result = None
-# attr = 'iops' if iops else 'bw'
-# for measurement in processed_results:
-# ok = measurement.sync_mode == sync_mode
-# ok = ok and (measurement.p.blocksize == blocksize)
-# ok = ok and (measurement.p.rw == rw)
-#
-# if ok:
-# field = getattr(measurement, attr)
-#
-# if result is None:
-# result = field
-# elif field.average > result.average:
-# result = field
-#
-# return result
-#
-#
-# def get_disk_info(processed_results):
-# di = DiskInfo()
-# di.direct_iops_w_max = find_max_where(processed_results,
-# 'd', '4k', 'randwrite')
-# di.direct_iops_r_max = find_max_where(processed_results,
-# 'd', '4k', 'randread')
-#
-# di.direct_iops_w64_max = find_max_where(processed_results,
-# 'd', '64k', 'randwrite')
-#
-# for sz in ('16m', '64m'):
-# di.bw_write_max = find_max_where(processed_results,
-# 'd', sz, 'randwrite', False)
-# if di.bw_write_max is not None:
-# break
-#
-# if di.bw_write_max is None:
-# for sz in ('1m', '2m', '4m', '8m'):
-# di.bw_write_max = find_max_where(processed_results,
-# 'd', sz, 'write', False)
-# if di.bw_write_max is not None:
-# break
-#
-# for sz in ('16m', '64m'):
-# di.bw_read_max = find_max_where(processed_results,
-# 'd', sz, 'randread', False)
-# if di.bw_read_max is not None:
-# break
-#
-# if di.bw_read_max is None:
-# di.bw_read_max = find_max_where(processed_results,
-# 'd', '1m', 'read', False)
-#
-# rws4k_iops_lat_th = []
-# for res in processed_results:
-# if res.sync_mode in 'xs' and res.p.blocksize == '4k':
-# if res.p.rw != 'randwrite':
-# continue
-# rws4k_iops_lat_th.append((res.iops.average,
-# res.lat,
-# # res.lat.average,
-# res.concurence))
-#
-# rws4k_iops_lat_th.sort(key=lambda x: x[2])
-#
-# latv = [lat for _, lat, _ in rws4k_iops_lat_th]
-#
-# for tlat in [10, 30, 100]:
-# pos = bisect.bisect_left(latv, tlat)
-# if 0 == pos:
-# setattr(di, 'rws4k_{}ms'.format(tlat), 0)
-# elif pos == len(latv):
-# iops3, _, _ = rws4k_iops_lat_th[-1]
-# iops3 = int(round_3_digit(iops3))
-# setattr(di, 'rws4k_{}ms'.format(tlat), ">=" + str(iops3))
-# else:
-# lat1 = latv[pos - 1]
-# lat2 = latv[pos]
-#
-# iops1, _, th1 = rws4k_iops_lat_th[pos - 1]
-# iops2, _, th2 = rws4k_iops_lat_th[pos]
-#
-# th_lat_coef = (th2 - th1) / (lat2 - lat1)
-# th3 = th_lat_coef * (tlat - lat1) + th1
-#
-# th_iops_coef = (iops2 - iops1) / (th2 - th1)
-# iops3 = th_iops_coef * (th3 - th1) + iops1
-# iops3 = int(round_3_digit(iops3))
-# setattr(di, 'rws4k_{}ms'.format(tlat), iops3)
-#
-# hdi = DiskInfo()
-#
-# def pp(x):
-# med, conf = x.rounded_average_conf()
-# conf_perc = int(float(conf) / med * 100)
-# dev_perc = int(float(x.deviation) / med * 100)
-# return (round_3_digit(med), conf_perc, dev_perc)
-#
-# hdi.direct_iops_r_max = pp(di.direct_iops_r_max)
-#
-# if di.direct_iops_w_max is not None:
-# hdi.direct_iops_w_max = pp(di.direct_iops_w_max)
-# else:
-# hdi.direct_iops_w_max = None
-#
-# if di.direct_iops_w64_max is not None:
-# hdi.direct_iops_w64_max = pp(di.direct_iops_w64_max)
-# else:
-# hdi.direct_iops_w64_max = None
-#
-# hdi.bw_write_max = pp(di.bw_write_max)
-# hdi.bw_read_max = pp(di.bw_read_max)
-#
-# hdi.rws4k_10ms = di.rws4k_10ms if 0 != di.rws4k_10ms else None
-# hdi.rws4k_30ms = di.rws4k_30ms if 0 != di.rws4k_30ms else None
-# hdi.rws4k_100ms = di.rws4k_100ms if 0 != di.rws4k_100ms else None
-# return hdi
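-#
-# # A minimal standalone sketch of the latency-threshold interpolation performed
-# # in get_disk_info() above (hypothetical helper, illustrative names only):
-# # given (iops, lat_ms, threads) points sorted by thread count, first interpolate
-# # the thread count at which latency crosses tlat, then the IOPS at that count.
-# #
-# # def iops_at_latency(points, tlat):
-# #     lats = [lat for _, lat, _ in points]
-# #     pos = bisect.bisect_left(lats, tlat)
-# #     if pos == 0:
-# #         return 0                       # latency above tlat even at min load
-# #     if pos == len(lats):
-# #         return points[-1][0]           # latency never reaches tlat
-# #     (iops1, lat1, th1), (iops2, lat2, th2) = points[pos - 1], points[pos]
-# #     th3 = th1 + (th2 - th1) * (tlat - lat1) / (lat2 - lat1)
-# #     return iops1 + (iops2 - iops1) * (th3 - th1) / (th2 - th1)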
-#
-#
-# @report('hdd', 'hdd')
-# def make_hdd_report(processed_results, lab_info, comment):
-# plots = [
-# ('hdd_rrd4k', 'rand_read_4k', 'Random read 4k direct IOPS'),
-# ('hdd_rwx4k', 'rand_write_4k', 'Random write 4k sync IOPS')
-# ]
-# perf_infos = [res.disk_perf_info() for res in processed_results]
-# images = make_plots(perf_infos, plots)
-# di = get_disk_info(perf_infos)
-# return render_all_html(comment, di, lab_info, images, "report_hdd.html")
-#
-#
-# @report('cinder_iscsi', 'cinder_iscsi')
-# def make_cinder_iscsi_report(processed_results, lab_info, comment):
-# plots = [
-# ('cinder_iscsi_rrd4k', 'rand_read_4k', 'Random read 4k direct IOPS'),
-# ('cinder_iscsi_rwx4k', 'rand_write_4k', 'Random write 4k sync IOPS')
-# ]
-# perf_infos = [res.disk_perf_info() for res in processed_results]
-# try:
-# images = make_plots(perf_infos, plots)
-# except ValueError:
-# plots = [
-# ('cinder_iscsi_rrd4k', 'rand_read_4k', 'Random read 4k direct IOPS'),
-# ('cinder_iscsi_rws4k', 'rand_write_4k', 'Random write 4k sync IOPS')
-# ]
-# images = make_plots(perf_infos, plots)
-# di = get_disk_info(perf_infos)
-#
-# return render_all_html(comment, di, lab_info, images, "report_cinder_iscsi.html")
-#
-#
-# @report('ceph', 'ceph')
-# def make_ceph_report(processed_results, lab_info, comment):
-# plots = [
-# ('ceph_rrd4k', 'rand_read_4k', 'Random read 4k direct IOPS'),
-# ('ceph_rws4k', 'rand_write_4k', 'Random write 4k sync IOPS'),
-# ('ceph_rrd16m', 'rand_read_16m', 'Random read 16m direct MiBps'),
-# ('ceph_rwd16m', 'rand_write_16m',
-# 'Random write 16m direct MiBps'),
-# ]
-#
-# perf_infos = [res.disk_perf_info() for res in processed_results]
-# images = make_plots(perf_infos, plots)
-# di = get_disk_info(perf_infos)
-# return render_all_html(comment, di, lab_info, images, "report_ceph.html")
-#
-#
-# @report('mixed', 'mixed')
-# def make_mixed_report(processed_results, lab_info, comment):
-# #
-#     # IOPS(X% read) = 100 / ( X / IOPS_R + (100 - X) / IOPS_W )
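-#     # e.g. with IOPS_R = 10000 and IOPS_W = 2500 (illustrative numbers),
-#     # a 70%-read mix gives IOPS(70) = 100 / (70/10000 + 30/2500) ~= 5263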
-# #
-#
-# perf_infos = [res.disk_perf_info() for res in processed_results]
-#     mixed = collections.defaultdict(list)
-#
-# is_ssd = False
-# for res in perf_infos:
-# if res.name.startswith('mixed'):
-# if res.name.startswith('mixed-ssd'):
-# is_ssd = True
-# mixed[res.concurence].append((res.p.rwmixread,
-# res.lat,
-# 0,
-# # res.lat.average / 1000.0,
-# # res.lat.deviation / 1000.0,
-# res.iops.average,
-# res.iops.deviation))
-#
-# if len(mixed) == 0:
-# raise ValueError("No mixed load found")
-#
-# fig, p1 = plt.subplots()
-# p2 = p1.twinx()
-#
-# colors = ['red', 'green', 'blue', 'orange', 'magenta', "teal"]
-# colors_it = iter(colors)
-# for conc, mix_lat_iops in sorted(mixed.items()):
-# mix_lat_iops = sorted(mix_lat_iops)
-# read_perc, lat, dev, iops, iops_dev = zip(*mix_lat_iops)
-# p1.errorbar(read_perc, iops, color=next(colors_it),
-# yerr=iops_dev, label=str(conc) + " th")
-#
-# p2.errorbar(read_perc, lat, color=next(colors_it),
-# ls='--', yerr=dev, label=str(conc) + " th lat")
-#
-# if is_ssd:
-# p1.set_yscale('log')
-# p2.set_yscale('log')
-#
-# p1.set_xlim(-5, 105)
-#
-# read_perc = set(read_perc)
-# read_perc.add(0)
-# read_perc.add(100)
-# read_perc = sorted(read_perc)
-#
-# plt.xticks(read_perc, map(str, read_perc))
-#
-# p1.grid(True)
-# p1.set_xlabel("% of reads")
-# p1.set_ylabel("Mixed IOPS")
-# p2.set_ylabel("Latency, ms")
-#
-# handles1, labels1 = p1.get_legend_handles_labels()
-# handles2, labels2 = p2.get_legend_handles_labels()
-# plt.subplots_adjust(top=0.85)
-# plt.legend(handles1 + handles2, labels1 + labels2,
-# bbox_to_anchor=(0.5, 1.15),
-# loc='upper center',
-# prop={'size': 12}, ncol=3)
-# plt.show()
-#
-#
-# def make_load_report(idx, results_dir, fname):
-# dpath = os.path.join(results_dir, "io_" + str(idx))
-# files = sorted(os.listdir(dpath))
-# gf = lambda x: "_".join(x.rsplit(".", 1)[0].split('_')[:3])
-#
-# for key, group in itertools.groupby(files, gf):
-# fname = os.path.join(dpath, key + ".fio")
-#
-# cfgs = list(parse_all_in_1(open(fname).read(), fname))
-#
-# fname = os.path.join(dpath, key + "_lat.log")
-#
-# curr = []
-# arrays = []
-#
-# with open(fname) as fd:
-# for offset, lat, _, _ in csv.reader(fd):
-# offset = int(offset)
-# lat = int(lat)
-# if len(curr) > 0 and curr[-1][0] > offset:
-# arrays.append(curr)
-# curr = []
-# curr.append((offset, lat))
-# arrays.append(curr)
-# conc = int(cfgs[0].vals.get('numjobs', 1))
-#
-# if conc != 5:
-# continue
-#
-# assert len(arrays) == len(cfgs) * conc
-#
-# garrays = [[(0, 0)] for _ in range(conc)]
-#
-# for offset in range(len(cfgs)):
-# for acc, new_arr in zip(garrays, arrays[offset * conc:(offset + 1) * conc]):
-# last = acc[-1][0]
-# for off, lat in new_arr:
-# acc.append((off / 1000. + last, lat / 1000.))
-#
-# for cfg, arr in zip(cfgs, garrays):
-# plt.plot(*zip(*arr[1:]))
-# plt.show()
-# exit(1)
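-#
-# # make_load_report() splits a concatenated fio latency log on drops of the
-# # time-offset column (curr[-1][0] > offset): e.g. offsets 10, 20, 30, 5, 15
-# # (illustrative) become the two per-job series [10, 20, 30] and [5, 15].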
-#
-#
-# def make_io_report(dinfo, comment, path, lab_info=None):
-#     # fall back to placeholder lab info only when the caller passed none
-#     if lab_info is None:
-#         lab_info = {
-#             "total_disk": "None",
-#             "total_memory": "None",
-#             "nodes_count": "None",
-#             "processor_count": "None"
-#         }
-#
-# try:
-# res_fields = sorted(v.name for v in dinfo)
-#
-# found = False
-# for fields, name, func in report_funcs:
-# for field in fields:
-# pos = bisect.bisect_left(res_fields, field)
-#
-# if pos == len(res_fields):
-# break
-#
-# if not res_fields[pos].startswith(field):
-# break
-# else:
-# found = True
-# hpath = path.format(name)
-#
-# try:
-# report = func(dinfo, lab_info, comment)
-#                 except Exception:
-#                     logger.exception("During {0} report generation".format(name))
-# continue
-#
-# if report is not None:
-# try:
-# with open(hpath, "w") as fd:
-# fd.write(report)
-#                     except Exception:
-#                         logger.exception("While saving {0} report".format(name))
-#                         continue
-# logger.info("Report {0} saved into {1}".format(name, hpath))
-# else:
-# logger.warning("No report produced by {0!r}".format(name))
-#
-# if not found:
-# logger.warning("No report generator found for this load")
-#
-# except Exception as exc:
-# import traceback
-# traceback.print_exc()
-# logger.error("Failed to generate html report:" + str(exc))
-#
-#
-# # @classmethod
-# # def prepare_data(cls, results) -> List[Dict[str, Any]]:
-# # """create a table with io performance report for console"""
-# #
-# # def key_func(data: FioRunResult) -> Tuple[str, str, str, str, int]:
-# # tpl = data.summary_tpl()
-# # return (data.name,
-# # tpl.oper,
-# # tpl.mode,
-# # ssize2b(tpl.bsize),
-# # int(tpl.th_count) * int(tpl.vm_count))
-# # res = []
-# #
-# # for item in sorted(results, key=key_func):
-# # test_dinfo = item.disk_perf_info()
-# # testnodes_count = len(item.config.nodes)
-# #
-# # iops, _ = test_dinfo.iops.rounded_average_conf()
-# #
-# # if test_dinfo.iops_sys is not None:
-# # iops_sys, iops_sys_conf = test_dinfo.iops_sys.rounded_average_conf()
-# # _, iops_sys_dev = test_dinfo.iops_sys.rounded_average_dev()
-# # iops_sys_per_vm = round_3_digit(iops_sys / testnodes_count)
-# # iops_sys = round_3_digit(iops_sys)
-# # else:
-# # iops_sys = None
-# # iops_sys_per_vm = None
-# # iops_sys_dev = None
-# # iops_sys_conf = None
-# #
-# # bw, bw_conf = test_dinfo.bw.rounded_average_conf()
-# # _, bw_dev = test_dinfo.bw.rounded_average_dev()
-# # conf_perc = int(round(bw_conf * 100 / bw))
-# # dev_perc = int(round(bw_dev * 100 / bw))
-# #
-# # lat_50 = round_3_digit(int(test_dinfo.lat_50))
-# # lat_95 = round_3_digit(int(test_dinfo.lat_95))
-# # lat_avg = round_3_digit(int(test_dinfo.lat_avg))
-# #
-# # iops_per_vm = round_3_digit(iops / testnodes_count)
-# # bw_per_vm = round_3_digit(bw / testnodes_count)
-# #
-# # iops = round_3_digit(iops)
-# # bw = round_3_digit(bw)
-# #
-# # summ = "{0.oper}{0.mode} {0.bsize:>4} {0.th_count:>3}th {0.vm_count:>2}vm".format(item.summary_tpl())
-# #
-# # res.append({"name": key_func(item)[0],
-# # "key": key_func(item)[:4],
-# # "summ": summ,
-# # "iops": int(iops),
-# # "bw": int(bw),
-# # "conf": str(conf_perc),
-# # "dev": str(dev_perc),
-# # "iops_per_vm": int(iops_per_vm),
-# # "bw_per_vm": int(bw_per_vm),
-# # "lat_50": lat_50,
-# # "lat_95": lat_95,
-# # "lat_avg": lat_avg,
-# #
-# # "iops_sys": iops_sys,
-# # "iops_sys_per_vm": iops_sys_per_vm,
-# # "sys_conf": iops_sys_conf,
-# # "sys_dev": iops_sys_dev})
-# #
-# # return res
-# #
-# # Field = collections.namedtuple("Field", ("header", "attr", "allign", "size"))
-# # fiels_and_header = [
-# # Field("Name", "name", "l", 7),
-# # Field("Description", "summ", "l", 19),
-# # Field("IOPS\ncum", "iops", "r", 3),
-# # # Field("IOPS_sys\ncum", "iops_sys", "r", 3),
-# # Field("KiBps\ncum", "bw", "r", 6),
-# # Field("Cnf %\n95%", "conf", "r", 3),
-# # Field("Dev%", "dev", "r", 3),
-# # Field("iops\n/vm", "iops_per_vm", "r", 3),
-# # Field("KiBps\n/vm", "bw_per_vm", "r", 6),
-# # Field("lat ms\nmedian", "lat_50", "r", 3),
-# # Field("lat ms\n95%", "lat_95", "r", 3),
-# # Field("lat\navg", "lat_avg", "r", 3),
-# # ]
-# #
-# # fiels_and_header_dct = dict((item.attr, item) for item in fiels_and_header)
-# #
-# # @classmethod
-# # def format_for_console(cls, results) -> str:
-# # """create a table with io performance report for console"""
-# #
-# # tab = texttable.Texttable(max_width=120)
-# # tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
-# # tab.set_cols_align([f.allign for f in cls.fiels_and_header])
-# # sep = ["-" * f.size for f in cls.fiels_and_header]
-# # tab.header([f.header for f in cls.fiels_and_header])
-# # prev_k = None
-# # for item in cls.prepare_data(results):
-# # if prev_k is not None:
-# # if prev_k != item["key"]:
-# # tab.add_row(sep)
-# #
-# # prev_k = item["key"]
-# # tab.add_row([item[f.attr] for f in cls.fiels_and_header])
-# #
-# # return tab.draw()
-# #
-# # @classmethod
-# # def format_diff_for_console(cls, list_of_results: List[Any]) -> str:
-# # """create a table with io performance report for console"""
-# #
-# # tab = texttable.Texttable(max_width=200)
-# # tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
-# #
-# # header = [
-# # cls.fiels_and_header_dct["name"].header,
-# # cls.fiels_and_header_dct["summ"].header,
-# # ]
-# # allign = ["l", "l"]
-# #
-# # header.append("IOPS ~ Cnf% ~ Dev%")
-# # allign.extend(["r"] * len(list_of_results))
-# # header.extend(
-# # "IOPS_{0} %".format(i + 2) for i in range(len(list_of_results[1:]))
-# # )
-# #
-# # header.append("BW")
-# # allign.extend(["r"] * len(list_of_results))
-# # header.extend(
-# # "BW_{0} %".format(i + 2) for i in range(len(list_of_results[1:]))
-# # )
-# #
-# # header.append("LAT")
-# # allign.extend(["r"] * len(list_of_results))
-# # header.extend(
-# # "LAT_{0}".format(i + 2) for i in range(len(list_of_results[1:]))
-# # )
-# #
-# # tab.header(header)
-# # sep = ["-" * 3] * len(header)
-# #     processed_results = list(map(cls.prepare_data, list_of_results))
-# #
-# # key2results = []
-# # for res in processed_results:
-# # key2results.append(dict(
-# # ((item["name"], item["summ"]), item) for item in res
-# # ))
-# #
-# # prev_k = None
-# # iops_frmt = "{0[iops]} ~ {0[conf]:>2} ~ {0[dev]:>2}"
-# # for item in processed_results[0]:
-# # if prev_k is not None:
-# # if prev_k != item["key"]:
-# # tab.add_row(sep)
-# #
-# # prev_k = item["key"]
-# #
-# # key = (item['name'], item['summ'])
-# # line = list(key)
-# # base = key2results[0][key]
-# #
-# # line.append(iops_frmt.format(base))
-# #
-# # for test_results in key2results[1:]:
-# # val = test_results.get(key)
-# # if val is None:
-# # line.append("-")
-# # elif base['iops'] == 0:
-# #                 line.append("NaN")
-# # else:
-# # prc_val = {'dev': val['dev'], 'conf': val['conf']}
-# # prc_val['iops'] = int(100 * val['iops'] / base['iops'])
-# # line.append(iops_frmt.format(prc_val))
-# #
-# # line.append(base['bw'])
-# #
-# # for test_results in key2results[1:]:
-# # val = test_results.get(key)
-# # if val is None:
-# # line.append("-")
-# # elif base['bw'] == 0:
-# #                 line.append("NaN")
-# # else:
-# # line.append(int(100 * val['bw'] / base['bw']))
-# #
-# # for test_results in key2results:
-# # val = test_results.get(key)
-# # if val is None:
-# # line.append("-")
-# # else:
-# # line.append("{0[lat_50]} - {0[lat_95]}".format(val))
-# #
-# # tab.add_row(line)
-# #
-# # tab.set_cols_align(allign)
-# # return tab.draw()
-#
-#
-# # READ_IOPS_DISCSTAT_POS = 3
-# # WRITE_IOPS_DISCSTAT_POS = 7
-# #
-# #
-# # def load_sys_log_file(ftype: str, fname: str) -> TimeSeriesValue:
-# # assert ftype == 'iops'
-# # pval = None
-# # with open(fname) as fd:
-# # iops = []
-# # for ln in fd:
-# # params = ln.split()
-# # cval = int(params[WRITE_IOPS_DISCSTAT_POS]) + \
-# # int(params[READ_IOPS_DISCSTAT_POS])
-# # if pval is not None:
-# # iops.append(cval - pval)
-# # pval = cval
-# #
-# # vals = [(idx * 1000, val) for idx, val in enumerate(iops)]
-# # return TimeSeriesValue(vals)
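-# #
-# # # load_sys_log_file() differentiates cumulative read+write IO counters: e.g.
-# # # (illustrative numbers) cumulative totals 100, 340, 800 yield the deltas
-# # # [240, 460], which become TimeSeriesValue samples (0, 240), (1000, 460)
-# # # stamped at 1000 ms steps.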
-# #
-# #
-# # def load_test_results(folder: str, run_num: int) -> 'FioRunResult':
-# # res = {}
-# # params = None
-# #
-# # fn = os.path.join(folder, str(run_num) + '_params.yaml')
-# #     params = yaml.safe_load(open(fn).read())
-# #
-# # conn_ids_set = set()
-# # rr = r"{}_(?P<conn_id>.*?)_(?P<type>[^_.]*)\.\d+\.log$".format(run_num)
-# # for fname in os.listdir(folder):
-# # rm = re.match(rr, fname)
-# # if rm is None:
-# # continue
-# #
-# # conn_id_s = rm.group('conn_id')
-# # conn_id = conn_id_s.replace('_', ':')
-# # ftype = rm.group('type')
-# #
-# # if ftype not in ('iops', 'bw', 'lat'):
-# # continue
-# #
-# # ts = load_fio_log_file(os.path.join(folder, fname))
-# # res.setdefault(ftype, {}).setdefault(conn_id, []).append(ts)
-# #
-# # conn_ids_set.add(conn_id)
-# #
-# # rr = r"{}_(?P<conn_id>.*?)_(?P<type>[^_.]*)\.sys\.log$".format(run_num)
-# # for fname in os.listdir(folder):
-# # rm = re.match(rr, fname)
-# # if rm is None:
-# # continue
-# #
-# # conn_id_s = rm.group('conn_id')
-# # conn_id = conn_id_s.replace('_', ':')
-# # ftype = rm.group('type')
-# #
-# # if ftype not in ('iops', 'bw', 'lat'):
-# # continue
-# #
-# # ts = load_sys_log_file(ftype, os.path.join(folder, fname))
-# # res.setdefault(ftype + ":sys", {}).setdefault(conn_id, []).append(ts)
-# #
-# # conn_ids_set.add(conn_id)
-# #
-# # mm_res = {}
-# #
-# # if len(res) == 0:
-# # raise ValueError("No data was found")
-# #
-# # for key, data in res.items():
-# # conn_ids = sorted(conn_ids_set)
-# # awail_ids = [conn_id for conn_id in conn_ids if conn_id in data]
-# # matr = [data[conn_id] for conn_id in awail_ids]
-# # mm_res[key] = MeasurementMatrix(matr, awail_ids)
-# #
-# # raw_res = {}
-# # for conn_id in conn_ids:
-# #         fn = os.path.join(folder, "{0}_{1}_rawres.json".format(run_num, conn_id.replace(':', '_')))
-# #
-# #         # drop any log noise printed before the first '{' of the JSON body
-# #         fc = "{" + open(fn).read().split('{', 1)[1]
-# # raw_res[conn_id] = json.loads(fc)
-# #
-# # fio_task = FioJobSection(params['name'])
-# # fio_task.vals.update(params['vals'])
-# #
-# # config = TestConfig('io', params, None, params['nodes'], folder, None)
-# # return FioRunResult(config, fio_task, mm_res, raw_res, params['intervals'], run_num)
-# #
-#
-# # class DiskPerfInfo:
-# # def __init__(self, name: str, summary: str, params: Dict[str, Any], testnodes_count: int) -> None:
-# # self.name = name
-# # self.bw = None
-# # self.iops = None
-# # self.lat = None
-# # self.lat_50 = None
-# # self.lat_95 = None
-# # self.lat_avg = None
-# #
-# # self.raw_bw = []
-# # self.raw_iops = []
-# # self.raw_lat = []
-# #
-# # self.params = params
-# # self.testnodes_count = testnodes_count
-# # self.summary = summary
-# #
-# # self.sync_mode = get_test_sync_mode(self.params['vals'])
-# # self.concurence = self.params['vals'].get('numjobs', 1)
-# #
-# #
-# # class IOTestResults:
-# # def __init__(self, suite_name: str, fio_results: 'FioRunResult', log_directory: str):
-# # self.suite_name = suite_name
-# # self.fio_results = fio_results
-# # self.log_directory = log_directory
-# #
-# # def __iter__(self):
-# # return iter(self.fio_results)
-# #
-# # def __len__(self):
-# # return len(self.fio_results)
-# #
-# # def get_yamable(self) -> Dict[str, List[str]]:
-# # items = [(fio_res.summary(), fio_res.idx) for fio_res in self]
-# # return {self.suite_name: [self.log_directory] + items}
-#
-#
-# # class FioRunResult(TestResults):
-# # """
-# # Fio run results
-# # config: TestConfig
-# # fio_task: FioJobSection
-# # ts_results: {str: MeasurementMatrix[TimeSeriesValue]}
-# # raw_result: ????
-# #     run_interval:(float, float) - test run time, used for sensors
-# # """
-# # def __init__(self, config, fio_task, ts_results, raw_result, run_interval, idx):
-# #
-# # self.name = fio_task.name.rsplit("_", 1)[0]
-# # self.fio_task = fio_task
-# # self.idx = idx
-# #
-# # self.bw = ts_results['bw']
-# # self.lat = ts_results['lat']
-# # self.iops = ts_results['iops']
-# #
-# # if 'iops:sys' in ts_results:
-# # self.iops_sys = ts_results['iops:sys']
-# # else:
-# # self.iops_sys = None
-# #
-# # res = {"bw": self.bw,
-# # "lat": self.lat,
-# # "iops": self.iops,
-# # "iops:sys": self.iops_sys}
-# #
-# # self.sensors_data = None
-# # self._pinfo = None
-# # TestResults.__init__(self, config, res, raw_result, run_interval)
-# #
-# # def get_params_from_fio_report(self):
-# # nodes = self.bw.connections_ids
-# #
-# # iops = [self.raw_result[node]['jobs'][0]['mixed']['iops'] for node in nodes]
-# # total_ios = [self.raw_result[node]['jobs'][0]['mixed']['total_ios'] for node in nodes]
-# # runtime = [self.raw_result[node]['jobs'][0]['mixed']['runtime'] / 1000 for node in nodes]
-# # flt_iops = [float(ios) / rtime for ios, rtime in zip(total_ios, runtime)]
-# #
-# # bw = [self.raw_result[node]['jobs'][0]['mixed']['bw'] for node in nodes]
-# # total_bytes = [self.raw_result[node]['jobs'][0]['mixed']['io_bytes'] for node in nodes]
-# # flt_bw = [float(tbytes) / rtime for tbytes, rtime in zip(total_bytes, runtime)]
-# #
-# # return {'iops': iops,
-# # 'flt_iops': flt_iops,
-# # 'bw': bw,
-# # 'flt_bw': flt_bw}
-# #
-# # def summary(self):
-# # return get_test_summary(self.fio_task, len(self.config.nodes))
-# #
-# # def summary_tpl(self):
-# # return get_test_summary_tuple(self.fio_task, len(self.config.nodes))
-# #
-# # def get_lat_perc_50_95_multy(self):
-# # lat_mks = collections.defaultdict(lambda: 0)
-# # num_res = 0
-# #
-# # for result in self.raw_result.values():
-# # num_res += len(result['jobs'])
-# # for job_info in result['jobs']:
-# # for k, v in job_info['latency_ms'].items():
-# #                     if isinstance(k, str) and k.startswith('>='):
-# # lat_mks[int(k[2:]) * 1000] += v
-# # else:
-# # lat_mks[int(k) * 1000] += v
-# #
-# # for k, v in job_info['latency_us'].items():
-# # lat_mks[int(k)] += v
-# #
-# # for k, v in lat_mks.items():
-# # lat_mks[k] = float(v) / num_res
-# # return get_lat_perc_50_95(lat_mks)
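-# #
-# #     # get_lat_perc_50_95() itself is not shown here; a minimal sketch of
-# #     # pulling a percentile out of such a {latency_us: count} histogram
-# #     # (hypothetical, not the actual wally implementation):
-# #     #
-# #     #     def hist_percentile(hist, perc):
-# #     #         need = sum(hist.values()) * perc / 100.0
-# #     #         seen = 0.0
-# #     #         for lat_us, cnt in sorted(hist.items()):
-# #     #             seen += cnt
-# #     #             if seen >= need:
-# #     #                 return lat_us / 1000.0   # us -> ms
-# #     #         return max(hist) / 1000.0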
-# #
-# # def disk_perf_info(self, avg_interval=2.0):
-# #
-# # if self._pinfo is not None:
-# # return self._pinfo
-# #
-# # testnodes_count = len(self.config.nodes)
-# #
-# # pinfo = DiskPerfInfo(self.name,
-# # self.summary(),
-# # self.params,
-# # testnodes_count)
-# #
-# # def prepare(data, drop=1):
-# # if data is None:
-# # return data
-# #
-# # res = []
-# # for ts_data in data:
-# # if ts_data.average_interval() < avg_interval:
-# # ts_data = ts_data.derived(avg_interval)
-# #
-# # # drop last value on bounds
-# # # as they may contains ranges without activities
-# # assert len(ts_data.values) >= drop + 1, str(drop) + " " + str(ts_data.values)
-# #
-# # if drop > 0:
-# # res.append(ts_data.values[:-drop])
-# # else:
-# # res.append(ts_data.values)
-# #
-# # return res
-# #
-# # def agg_data(matr):
-# # arr = sum(matr, [])
-# # min_len = min(map(len, arr))
-# # res = []
-# # for idx in range(min_len):
-# # res.append(sum(dt[idx] for dt in arr))
-# # return res
-# #
-# #         pinfo.raw_lat = list(map(prepare, self.lat.per_vm()))
-# # num_th = sum(map(len, pinfo.raw_lat))
-# # lat_avg = [val / num_th for val in agg_data(pinfo.raw_lat)]
-# # pinfo.lat_avg = data_property(lat_avg).average / 1000 # us to ms
-# #
-# # pinfo.lat_50, pinfo.lat_95 = self.get_lat_perc_50_95_multy()
-# # pinfo.lat = pinfo.lat_50
-# #
-# #         pinfo.raw_bw = list(map(prepare, self.bw.per_vm()))
-# #         pinfo.raw_iops = list(map(prepare, self.iops.per_vm()))
-# #
-# # if self.iops_sys is not None:
-# #             pinfo.raw_iops_sys = list(map(prepare, self.iops_sys.per_vm()))
-# # pinfo.iops_sys = data_property(agg_data(pinfo.raw_iops_sys))
-# # else:
-# # pinfo.raw_iops_sys = None
-# # pinfo.iops_sys = None
-# #
-# # fparams = self.get_params_from_fio_report()
-# # fio_report_bw = sum(fparams['flt_bw'])
-# # fio_report_iops = sum(fparams['flt_iops'])
-# #
-# # agg_bw = agg_data(pinfo.raw_bw)
-# # agg_iops = agg_data(pinfo.raw_iops)
-# #
-# # log_bw_avg = average(agg_bw)
-# # log_iops_avg = average(agg_iops)
-# #
-# # # update values to match average from fio report
-# # coef_iops = fio_report_iops / float(log_iops_avg)
-# # coef_bw = fio_report_bw / float(log_bw_avg)
-# #
-# # bw_log = data_property([val * coef_bw for val in agg_bw])
-# # iops_log = data_property([val * coef_iops for val in agg_iops])
-# #
-# # bw_report = data_property([fio_report_bw])
-# # iops_report = data_property([fio_report_iops])
-# #
-# # # When IOPS/BW per thread is too low
-# # # data from logs is rounded to match
-# # iops_per_th = sum(sum(pinfo.raw_iops, []), [])
-# # if average(iops_per_th) > 10:
-# # pinfo.iops = iops_log
-# # pinfo.iops2 = iops_report
-# # else:
-# # pinfo.iops = iops_report
-# # pinfo.iops2 = iops_log
-# #
-# # bw_per_th = sum(sum(pinfo.raw_bw, []), [])
-# # if average(bw_per_th) > 10:
-# # pinfo.bw = bw_log
-# # pinfo.bw2 = bw_report
-# # else:
-# # pinfo.bw = bw_report
-# # pinfo.bw2 = bw_log
-# #
-# # self._pinfo = pinfo
-# #
-# # return pinfo
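-# #
-# #     # The coef_* rescaling above pins the log-derived series to fio's own
-# #     # totals: e.g. (illustrative numbers) if fio reports 1200 IOPS in total
-# #     # while the aggregated log averages 1000, coef_iops = 1.2 and every
-# #     # aggregated log sample is scaled by 1.2 before stats are computed.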
-#
-# # class TestResult:
-# # """Hold all information for a given test - test info,
-# # sensors data and performance results for test period from all nodes"""
-# # run_id = None # type: int
-# # test_info = None # type: Any
-# # begin_time = None # type: int
-# # end_time = None # type: int
-# # sensors = None # Dict[Tuple[str, str, str], TimeSeries]
-# # performance = None # Dict[Tuple[str, str], TimeSeries]
-# #
-# # class TestResults:
-# # """
-# #     this class describes test results
-# #
-# # config:TestConfig - test config object
-# # params:dict - parameters from yaml file for this test
-# # results:{str:MeasurementMesh} - test results object
-# # raw_result:Any - opaque object to store raw results
-# #     run_interval:(float, float) - test run time, used for sensors
-# # """
-# #
-# # def __init__(self,
-# # config: TestConfig,
-# # results: Dict[str, Any],
-# # raw_result: Any,
-# # run_interval: Tuple[float, float]) -> None:
-# # self.config = config
-# # self.params = config.params
-# # self.results = results
-# # self.raw_result = raw_result
-# # self.run_interval = run_interval
-# #
-# # def __str__(self) -> str:
-# # res = "{0}({1}):\n results:\n".format(
-# # self.__class__.__name__,
-# # self.summary())
-# #
-# # for name, val in self.results.items():
-# # res += " {0}={1}\n".format(name, val)
-# #
-# # res += " params:\n"
-# #
-# # for name, val in self.params.items():
-# # res += " {0}={1}\n".format(name, val)
-# #
-# # return res
-# #
-# # def summary(self) -> str:
-# #         raise NotImplementedError()
-# #
-# # def get_yamable(self) -> Any:
-# #         raise NotImplementedError()
-#
-#
-#
-# # class MeasurementMatrix:
-# # """
-# # data:[[MeasurementResult]] - VM_COUNT x TH_COUNT matrix of MeasurementResult
-# # """
-# # def __init__(self, data, connections_ids):
-# # self.data = data
-# # self.connections_ids = connections_ids
-# #
-# # def per_vm(self):
-# # return self.data
-# #
-# # def per_th(self):
-# # return sum(self.data, [])
-#
-#
-# # class MeasurementResults:
-# # data = None # type: List[Any]
-# #
-# # def stat(self) -> StatProps:
-# # return data_property(self.data)
-# #
-# # def __str__(self) -> str:
-# # return 'TS([' + ", ".join(map(str, self.data)) + '])'
-# #
-# #
-# # class SimpleVals(MeasurementResults):
-# # """
-# # data:[float] - list of values
-# # """
-# # def __init__(self, data: List[float]) -> None:
-# # self.data = data
-# #
-# #
-# # class TimeSeriesValue(MeasurementResults):
-# # """
-# #     data:[(float, float, float)] - list of (start_time, length, average_value_for_interval)
-# # odata: original values
-# # """
-# # def __init__(self, data: List[Tuple[float, float]]) -> None:
-# # assert len(data) > 0
-# # self.odata = data[:]
-# # self.data = [] # type: List[Tuple[float, float, float]]
-# #
-# # cstart = 0.0
-# # for nstart, nval in data:
-# # self.data.append((cstart, nstart - cstart, nval))
-# # cstart = nstart
-# #
-# # @property
-# # def values(self) -> List[float]:
-# # return [val[2] for val in self.data]
-# #
-# # def average_interval(self) -> float:
-# # return float(sum([val[1] for val in self.data])) / len(self.data)
-# #
-# # def skip(self, seconds) -> 'TimeSeriesValue':
-# # nres = []
-# # for start, ln, val in self.data:
-# # nstart = start + ln - seconds
-# # if nstart > 0:
-# # nres.append([nstart, val])
-# # return self.__class__(nres)
-# #
-# # def derived(self, tdelta) -> 'TimeSeriesValue':
-# # end = self.data[-1][0] + self.data[-1][1]
-# # tdelta = float(tdelta)
-# #
-# # ln = end / tdelta
-# #
-# # if ln - int(ln) > 0:
-# # ln += 1
-# #
-# # res = [[tdelta * i, 0.0] for i in range(int(ln))]
-# #
-# #         for start, length, val in self.data:
-# #             start_idx = int(start / tdelta)
-# #             end_idx = int((start + length) / tdelta)
-# #
-# #             for idx in range(start_idx, end_idx + 1):
-# #                 rstart = tdelta * idx
-# #                 rend = tdelta * (idx + 1)
-# #
-# #                 intersection_ln = min(rend, start + length) - max(start, rstart)
-# #                 if intersection_ln > 0:
-# #                     res[idx][1] += val * intersection_ln / tdelta
-# #
-# # return self.__class__(res)
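-# #
-# #     # e.g. TimeSeriesValue([(1.0, 100), (3.0, 50)]) stores the intervals
-# #     # (0, 1, 100) and (1, 2, 50); derived(2.0) time-weights them into 2 s
-# #     # bins: res[0] = 100 * 1/2 + 50 * 1/2 = 75 and res[1] = 50 * 1/2 = 25
-# #     # (the last bin is only half covered; numbers are illustrative).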
-#
-#
-# def console_report_stage(ctx: TestRun) -> None:
-# # TODO(koder): load data from storage
-# raise NotImplementedError("...")
-# # first_report = True
-# # text_rep_fname = ctx.config.text_report_file
-# #
-# # with open(text_rep_fname, "w") as fd:
-# # for tp, data in ctx.results.items():
-# # if 'io' == tp and data is not None:
-# # rep_lst = []
-# # for result in data:
-# # rep_lst.append(
-# # IOPerfTest.format_for_console(list(result)))
-# # rep = "\n\n".join(rep_lst)
-# # elif tp in ['mysql', 'pgbench'] and data is not None:
-# # rep = MysqlTest.format_for_console(data)
-# # elif tp == 'omg':
-# # rep = OmgTest.format_for_console(data)
-# # else:
-# # logger.warning("Can't generate text report for " + tp)
-# # continue
-# #
-# # fd.write(rep)
-# # fd.write("\n")
-# #
-# # if first_report:
-# #             logger.info("Text report was stored in " + text_rep_fname)
-# # first_report = False
-# #
-# # print("\n" + rep + "\n")
-#
-#
-# # def test_load_report_stage(cfg: Config, ctx: TestRun) -> None:
-# # load_rep_fname = cfg.load_report_file
-# # found = False
-# # for idx, (tp, data) in enumerate(ctx.results.items()):
-# # if 'io' == tp and data is not None:
-# # if found:
-# # logger.error("Making reports for more than one " +
-# # "io block isn't supported! All " +
-# #                          "reports except the first are skipped")
-# # continue
-# # found = True
-# # report.make_load_report(idx, cfg['results'], load_rep_fname)
-# #
-# #
-#
-# # def html_report_stage(ctx: TestRun) -> None:
-# #     # TODO(koder): load data from storage
-# #     raise NotImplementedError("...")
-# # html_rep_fname = cfg.html_report_file
-# # found = False
-# # for tp, data in ctx.results.items():
-# # if 'io' == tp and data is not None:
-# # if found or len(data) > 1:
-# # logger.error("Making reports for more than one " +
-# # "io block isn't supported! All " +
-# #                          "reports except the first are skipped")
-# # continue
-# # found = True
-# # report.make_io_report(list(data[0]),
-# # cfg.get('comment', ''),
-# # html_rep_fname,
-# # lab_info=ctx.nodes)
-#
-# #
-# # def load_data_from_path(test_res_dir: str) -> Mapping[str, List[Any]]:
-# # files = get_test_files(test_res_dir)
-# # raw_res = yaml_load(open(files['raw_results']).read())
-# # res = collections.defaultdict(list)
-# #
-# # for tp, test_lists in raw_res:
-# # for tests in test_lists:
-# # for suite_name, suite_data in tests.items():
-# # result_folder = suite_data[0]
-# # res[tp].append(TOOL_TYPE_MAPPER[tp].load(suite_name, result_folder))
-# #
-# # return res
-# #
-# #
-# # def load_data_from_path_stage(var_dir: str, _, ctx: TestRun) -> None:
-# # for tp, vals in load_data_from_path(var_dir).items():
-# # ctx.results.setdefault(tp, []).extend(vals)
-# #
-# #
-# # def load_data_from(var_dir: str) -> Callable[[TestRun], None]:
-# # return functools.partial(load_data_from_path_stage, var_dir)