blob: 3e6bc3e7168a38271f2b43eb14b7c62534598606 [file] [log] [blame]
import logging
from typing import Tuple, Iterator
import numpy
from cephlib.numeric_types import DataSource, TimeSeries
from cephlib.storage_selectors import c_interpolate_ts_on_seconds_border
from .result_classes import IResultStorage
from .suits.io.fio_hist import expected_lat_bins
logger = logging.getLogger("wally")
# Separately for each test heatmaps & agg acroos whole time histos:
# * fio latency heatmap for all instances
# * data dev iops across all osd
# * data dev bw across all osd
# * date dev qd across all osd
# * journal dev iops across all osd
# * journal dev bw across all osd
# * journal dev qd across all osd
# * net dev pps across all hosts
# * net dev bps across all hosts
# Main API's
# get sensors by pattern
# allign values to seconds
# cut ranges for particular test
# transform into 2d histos (either make histos or rebin them) and clip outliers same time
AGG_TAG = 'ALL'
def find_all_series(rstorage: IResultStorage, suite_id: str, job_id: str, metric: str) -> Iterator[TimeSeries]:
"Iterated over selected metric for all nodes for given Suite/job"
return (rstorage.get_ts(ds) for ds in rstorage.iter_ts(suite_id=suite_id, job_id=job_id, metric=metric))
def get_aggregated(rstorage: IResultStorage, suite_id: str, job_id: str, metric: str,
trange: Tuple[int, int]) -> TimeSeries:
"Sum selected metric for all nodes for given Suite/job"
tss = list(find_all_series(rstorage, suite_id, job_id, metric))
if len(tss) == 0:
raise NameError("Can't found any TS for {},{},{}".format(suite_id, job_id, metric))
ds = DataSource(suite_id=suite_id, job_id=job_id, node_id=AGG_TAG, sensor='fio',
dev=AGG_TAG, metric=metric, tag='csv')
tss_inp = [c_interpolate_ts_on_seconds_border(ts, tp='fio', allow_broken_step=(metric == 'lat')) for ts in tss]
res = None
for ts in tss_inp:
if ts.time_units != 's':
msg = "time_units must be 's' for fio sensor"
logger.error(msg)
raise ValueError(msg)
if metric == 'lat' and (len(ts.data.shape) != 2 or ts.data.shape[1] != expected_lat_bins):
msg = "Sensor {}.{} on node %s has shape={}. Can only process sensors with shape=[X, {}].".format(
ts.source.dev, ts.source.sensor, ts.source.node_id, ts.data.shape, expected_lat_bins)
logger.error(msg)
raise ValueError(msg)
if metric != 'lat' and len(ts.data.shape) != 1:
msg = "Sensor {}.{} on node {} has shape={}. Can only process 1D sensors.".format(
ts.source.dev, ts.source.sensor, ts.source.node_id, ts.data.shape)
logger.error(msg)
raise ValueError(msg)
assert trange[0] >= ts.times[0] and trange[1] <= ts.times[-1], \
"[{}, {}] not in [{}, {}]".format(ts.times[0], ts.times[-1], trange[0], trange[-1])
idx1, idx2 = numpy.searchsorted(ts.times, trange)
idx2 += 1
assert (idx2 - idx1) == (trange[1] - trange[0] + 1), \
"Broken time array at {} for {}".format(trange, ts.source)
dt = ts.data[idx1: idx2]
if res is None:
res = dt
else:
assert res.shape == dt.shape, "res.shape(={}) != dt.shape(={})".format(res.shape, dt.shape)
res += dt
agg_ts = TimeSeries(res, source=ds,
times=tss_inp[0].times.copy(),
units=tss_inp[0].units,
histo_bins=tss_inp[0].histo_bins,
time_units=tss_inp[0].time_units)
return agg_ts