import logging
from typing import Tuple, Iterator, List, Iterable

import numpy

from cephlib.numeric_types import DataSource, TimeSeries
from cephlib.storage_selectors import c_interpolate_ts_on_seconds_border
from cephlib.node import NodeInfo

from .result_classes import IWallyStorage
from .suits.io.fio_hist import expected_lat_bins


logger = logging.getLogger("wally")

# Separately for each test: heatmaps & histograms aggregated across the whole time range:
# * fio latency heatmap for all instances
# * data dev iops across all osd
# * data dev bw across all osd
# * data dev qd across all osd
# * journal dev iops across all osd
# * journal dev bw across all osd
# * journal dev qd across all osd
# * net dev pps across all hosts
# * net dev bps across all hosts

# Main APIs:
# * get sensors by pattern
# * align values to second boundaries
# * cut ranges for a particular test
# * transform into 2D histograms (either build histograms or rebin them) and
#   clip outliers at the same time (see the usage sketch below)
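
# A minimal sketch of how these steps compose (illustrative only: suite_id,
# job_id and the [start_s, stop_s] range are assumed to be taken from the
# loaded suite/job objects elsewhere in the report code):
#
#     lat = get_aggregated(rstorage, suite_id, job_id, 'lat', (start_s, stop_s))
#     heatmap_data = lat.data  # 2D: one latency histogram row per second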


AGG_TAG = 'ALL'


def find_all_series(rstorage: IWallyStorage, suite_id: str, job_id: str, metric: str) -> Iterator[TimeSeries]:
    "Iterate over the selected metric for all nodes for the given suite/job"
    return (rstorage.get_ts(ds) for ds in rstorage.iter_ts(suite_id=suite_id, job_id=job_id, metric=metric))


def get_aggregated(rstorage: IWallyStorage, suite_id: str, job_id: str, metric: str,
                   trange: Tuple[int, int]) -> TimeSeries:
    "Sum the selected fio metric across all nodes for the given suite/job"

    # the aggregated series is cached per (storage, suite, job, metric, time range)
    key = (id(rstorage), suite_id, job_id, metric, trange)
    aggregated_cache = rstorage.storage.other_caches['aggregated']
    if key in aggregated_cache:
        return aggregated_cache[key].copy()

    tss = list(find_all_series(rstorage, suite_id, job_id, metric))

    if len(tss) == 0:
        raise NameError(f"Can't find any TS for {suite_id},{job_id},{metric}")

    # align every series to one-second boundaries; latency series are allowed
    # to have a broken time step
    c_intp = c_interpolate_ts_on_seconds_border
    tss_inp = [c_intp(ts.select(trange), tp='fio', allow_broken_step=(metric == 'lat')) for ts in tss]

    res = None
    res_times = None

    for ts in tss_inp:
        if ts.time_units != 's':
            msg = "time_units must be 's' for fio sensor"
            logger.error(msg)
            raise ValueError(msg)

        if metric == 'lat' and (len(ts.data.shape) != 2 or ts.data.shape[1] != expected_lat_bins):
            msg = f"Sensor {ts.source.dev}.{ts.source.sensor} on node {ts.source.node_id} " + \
                  f"has shape={ts.data.shape}. Can only process sensors with shape=[X, {expected_lat_bins}]."
            logger.error(msg)
            raise ValueError(msg)

        if metric != 'lat' and len(ts.data.shape) != 1:
            msg = f"Sensor {ts.source.dev}.{ts.source.sensor} on node {ts.source.node_id} " + \
                  f"has shape={ts.data.shape}. Can only process 1D sensors."
            logger.error(msg)
            raise ValueError(msg)

        # the requested time range must be fully covered by this series
        assert trange[0] >= ts.times[0] and trange[1] <= ts.times[-1], \
            f"[{trange[0]}, {trange[1]}] not in [{ts.times[0]}, {ts.times[-1]}]"

        # slice out exactly the requested [trange[0], trange[1]] seconds
        idx1, idx2 = numpy.searchsorted(ts.times, trange)
        idx2 += 1

        assert (idx2 - idx1) == (trange[1] - trange[0] + 1), \
            f"Broken time array at {trange} for {ts.source}"

        dt = ts.data[idx1: idx2]
        if res is None:
            res = dt.copy()
            res_times = ts.times[idx1: idx2].copy()
        else:
            assert res.shape == dt.shape, f"res.shape(={res.shape}) != dt.shape(={dt.shape})"
            res += dt

    ds = DataSource(suite_id=suite_id, job_id=job_id, node_id=AGG_TAG, sensor='fio',
                    dev=AGG_TAG, metric=metric, tag='csv')
    agg_ts = TimeSeries(res, source=ds,
                        times=res_times,
                        units=tss_inp[0].units,
                        histo_bins=tss_inp[0].histo_bins,
                        time_units=tss_inp[0].time_units)
    aggregated_cache[key] = agg_ts
    return agg_ts.copy()
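
# A minimal usage sketch, assuming the suite/job ids and a reliable time range
# are already known to the caller (start_s/stop_s are hypothetical names):
#
#     bw = get_aggregated(rstorage, suite_id, job_id, 'bw', (start_s, stop_s))
#     per_second_bw = bw.data  # 1D: per-node values summed for every second
#
# For metric == 'lat' the result is 2D instead: one row per second with
# expected_lat_bins columns, ready to be rendered as a heatmap.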


def get_nodes(storage: IWallyStorage, roles: Iterable[str]) -> List[NodeInfo]:
    "Return all known nodes which have at least one of the given roles"
    return [node for node in storage.load_nodes() if node.roles.intersection(roles)]
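
# Example (role names are illustrative and depend on the cluster description):
#
#     osd_nodes = get_nodes(storage, ['osd'])
#     test_nodes = get_nodes(storage, ['testnode'])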