blob: 795c66b2278150a6462d6f71dc52bad3ae315625 [file] [log] [blame]
kdanylov aka kodercdfcdaf2017-04-29 10:03:39 +03001import logging
kdanylov aka koderb0833332017-05-13 20:39:17 +03002from typing import Tuple, Iterator
kdanylov aka kodercdfcdaf2017-04-29 10:03:39 +03003
4import numpy
5
kdanylov aka koderb0833332017-05-13 20:39:17 +03006from cephlib.numeric_types import DataSource, TimeSeries
7from cephlib.storage_selectors import c_interpolate_ts_on_seconds_border
kdanylov aka kodercdfcdaf2017-04-29 10:03:39 +03008
kdanylov aka koder026e5f22017-05-15 01:04:39 +03009from .result_classes import IWallyStorage
kdanylov aka kodercdfcdaf2017-04-29 10:03:39 +030010from .suits.io.fio_hist import expected_lat_bins
kdanylov aka kodercdfcdaf2017-04-29 10:03:39 +030011
12
13logger = logging.getLogger("wally")
14
15# Separately for each test heatmaps & agg acroos whole time histos:
16# * fio latency heatmap for all instances
17# * data dev iops across all osd
18# * data dev bw across all osd
# * data dev qd across all osd
20# * journal dev iops across all osd
21# * journal dev bw across all osd
22# * journal dev qd across all osd
23# * net dev pps across all hosts
24# * net dev bps across all hosts
25
26# Main API's
27# get sensors by pattern
# align values to seconds
29# cut ranges for particular test
30# transform into 2d histos (either make histos or rebin them) and clip outliers same time
31
32
33AGG_TAG = 'ALL'
34
35
def find_all_series(rstorage: IWallyStorage, suite_id: str, job_id: str, metric: str) -> Iterator[TimeSeries]:
    """Lazily yield the selected metric's time series for every node of the given suite/job."""
    selectors = rstorage.iter_ts(suite_id=suite_id, job_id=job_id, metric=metric)
    return map(rstorage.get_ts, selectors)
kdanylov aka kodercdfcdaf2017-04-29 10:03:39 +030039
40
def get_aggregated(rstorage: IWallyStorage, suite_id: str, job_id: str, metric: str,
                   trange: Tuple[int, int]) -> TimeSeries:
    """Sum the selected metric across all nodes for the given suite/job.

    Every per-node series is first interpolated onto one-second borders, then the
    values inside ``trange`` (inclusive on both ends) are summed element-wise.

    :param rstorage: storage to load the per-node time series from
    :param suite_id: suite selector
    :param job_id: job selector
    :param metric: sensor metric name; 'lat' series must be 2D histograms with
                   ``expected_lat_bins`` bins, every other metric must be 1D
    :param trange: (start, end) timestamps, in seconds, inclusive
    :returns: a new TimeSeries tagged with AGG_TAG as node and dev
    :raises NameError: if no time series match the selector
    :raises ValueError: if a series has wrong time units or wrong data shape
    """
    tss = list(find_all_series(rstorage, suite_id, job_id, metric))

    if not tss:
        # NOTE(review): NameError is an odd exception type for "nothing found"
        # (LookupError would fit better), but it is kept so existing callers
        # that catch NameError keep working.
        raise NameError("Can't find any TS for {},{},{}".format(suite_id, job_id, metric))

    ds = DataSource(suite_id=suite_id, job_id=job_id, node_id=AGG_TAG, sensor='fio',
                    dev=AGG_TAG, metric=metric, tag='csv')

    # 'lat' histogram series may have a non-uniform step - allow it only there
    tss_inp = [c_interpolate_ts_on_seconds_border(ts, tp='fio', allow_broken_step=(metric == 'lat'))
               for ts in tss]
    res = None

    for ts in tss_inp:
        if ts.time_units != 's':
            msg = "time_units must be 's' for fio sensor"
            logger.error(msg)
            raise ValueError(msg)

        if metric == 'lat' and (len(ts.data.shape) != 2 or ts.data.shape[1] != expected_lat_bins):
            msg = "Sensor {}.{} on node {} has shape={}. Can only process sensors with shape=[X, {}].".format(
                ts.source.dev, ts.source.sensor, ts.source.node_id, ts.data.shape, expected_lat_bins)
            logger.error(msg)
            raise ValueError(msg)

        if metric != 'lat' and len(ts.data.shape) != 1:
            msg = "Sensor {}.{} on node {} has shape={}. Can only process 1D sensors.".format(
                ts.source.dev, ts.source.sensor, ts.source.node_id, ts.data.shape)
            logger.error(msg)
            raise ValueError(msg)

        # The requested range must be fully covered by this series.
        # (Bug fix: the original message printed the operands swapped and
        # used trange[-1] where trange[1] was meant.)
        assert trange[0] >= ts.times[0] and trange[1] <= ts.times[-1], \
            "[{}, {}] not in [{}, {}]".format(trange[0], trange[1], ts.times[0], ts.times[-1])

        idx1, idx2 = numpy.searchsorted(ts.times, trange)
        idx2 += 1  # make the end index inclusive

        # After interpolation the step is exactly one second, so the slice
        # length must match the requested range length.
        assert (idx2 - idx1) == (trange[1] - trange[0] + 1), \
            "Broken time array at {} for {}".format(trange, ts.source)

        dt = ts.data[idx1: idx2]
        if res is None:
            # Bug fix: numpy slicing returns a *view*; accumulating with
            # 'res += dt' on that view silently corrupted the first input
            # series' data in place. Copy before accumulating.
            res = dt.copy()
        else:
            assert res.shape == dt.shape, "res.shape(={}) != dt.shape(={})".format(res.shape, dt.shape)
            res += dt

    agg_ts = TimeSeries(res, source=ds,
                        times=tss_inp[0].times.copy(),
                        units=tss_inp[0].units,
                        histo_bins=tss_inp[0].histo_bins,
                        time_units=tss_inp[0].time_units)

    return agg_ts
97