import ctypes
import logging
import os.path
from typing import Tuple, List, Iterable, Iterator, Optional, Union, Dict
from fractions import Fraction

import numpy

from cephlib.numeric import auto_edges2

import wally
from .hlstorage import ResultStorage
from .node_interfaces import NodeInfo
from .result_classes import DataSource, TimeSeries, SuiteConfig, JobConfig
from .suits.io.fio import FioJobConfig
from .suits.io.fio_hist import expected_lat_bins
from .utils import unit_conversion_coef


logger = logging.getLogger("wally")

# Separately for each test: heatmaps & histograms aggregated across the whole time range:
# * fio latency heatmap for all instances
# * data dev iops across all osd
# * data dev bw across all osd
# * data dev qd across all osd
# * journal dev iops across all osd
# * journal dev bw across all osd
# * journal dev qd across all osd
# * net dev pps across all hosts
# * net dev bps across all hosts

# Main APIs (see the sketch below):
# get sensors by pattern
# align values to seconds
# cut ranges for particular test
# transform into 2d histos (either make histos or rebin them) and clip outliers at the same time
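#
# A sketch of that flow (assumes `rstorage` is an open ResultStorage; the role,
# sensor and metric names are hypothetical examples, not a fixed list):
#
#     for ts in find_all_sensors(rstorage, ['testnode'], 'net-io', 'recv_packets'):
#         ts = c_interpolate_ts_on_seconds_border(ts)   # align values to seconds
#         ts = get_ts_for_time_range(ts, (100, 200))    # cut range for the test
#         ...                                           # histo / aggregate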


AGG_TAG = 'ALL'


def find_nodes_by_roles(rstorage: ResultStorage, node_roles: Iterable[str]) -> List[NodeInfo]:
    nodes = rstorage.storage.load_list(NodeInfo, 'all_nodes')  # type: List[NodeInfo]
    node_roles_s = set(node_roles)
    return [node for node in nodes if node.roles.intersection(node_roles_s)]


def find_all_sensors(rstorage: ResultStorage,
                     node_roles: Iterable[str],
                     sensor: str,
                     metric: str) -> Iterator[TimeSeries]:
    all_nodes_rr = "|".join(node.node_id for node in find_nodes_by_roles(rstorage, node_roles))
    all_nodes_rr = "(?P<node>{})".format(all_nodes_rr)

    for path, ds in rstorage.iter_sensors(all_nodes_rr, sensor=sensor, metric=metric):
        ts = rstorage.load_sensor(ds)

        # For sensors, ts.times is an array of pairs - (collection_started_at, collection_finished_at).
        # To make this array consistent with the times in the load data, the second item of each pair is dropped.
        ts.times = ts.times[::2]
        yield ts


def find_all_series(rstorage: ResultStorage, suite: SuiteConfig, job: JobConfig, metric: str) -> Iterator[TimeSeries]:
    "Iterate over the selected metric for all nodes for the given suite/job"
    return rstorage.iter_ts(suite, job, metric=metric)


def get_aggregated(rstorage: ResultStorage, suite: SuiteConfig, job: FioJobConfig, metric: str) -> TimeSeries:
    "Sum the selected metric over all nodes for the given suite/job"

    tss = list(find_all_series(rstorage, suite, job, metric))

    if len(tss) == 0:
        raise NameError("Can't find any TS for {},{},{}".format(suite, job, metric))

    ds = DataSource(suite_id=suite.storage_id,
                    job_id=job.storage_id,
                    node_id=AGG_TAG,
                    sensor='fio',
                    dev=AGG_TAG,
                    metric=metric,
                    tag='csv')

    tss_inp = [c_interpolate_ts_on_seconds_border(ts, tp='fio', allow_broken_step=(metric == 'lat')) for ts in tss]
    res = None
    trange = job.reliable_info_range_s

    for ts in tss_inp:
        if ts.time_units != 's':
            msg = "time_units must be 's' for fio sensor"
            logger.error(msg)
            raise ValueError(msg)

        if metric == 'lat' and (len(ts.data.shape) != 2 or ts.data.shape[1] != expected_lat_bins):
            msg = "Sensor {}.{} on node {} has shape={}. Can only process sensors with shape=[X, {}].".format(
                ts.source.dev, ts.source.sensor, ts.source.node_id, ts.data.shape, expected_lat_bins)
            logger.error(msg)
            raise ValueError(msg)

        if metric != 'lat' and len(ts.data.shape) != 1:
            msg = "Sensor {}.{} on node {} has shape={}. Can only process 1D sensors.".format(
                ts.source.dev, ts.source.sensor, ts.source.node_id, ts.data.shape)
            logger.error(msg)
            raise ValueError(msg)

        assert trange[0] >= ts.times[0] and trange[1] <= ts.times[-1], \
            "[{}, {}] not in [{}, {}]".format(trange[0], trange[1], ts.times[0], ts.times[-1])

        idx1, idx2 = numpy.searchsorted(ts.times, trange)
        idx2 += 1

        assert (idx2 - idx1) == (trange[1] - trange[0] + 1), \
            "Broken time array at {} for {}".format(trange, ts.source)

        dt = ts.data[idx1: idx2]
        if res is None:
            res = dt
        else:
            assert res.shape == dt.shape, "res.shape(={}) != dt.shape(={})".format(res.shape, dt.shape)
            res += dt

    agg_ts = TimeSeries(metric,
                        raw=None,
                        source=ds,
                        data=res,
                        times=tss_inp[0].times.copy(),
                        units=tss_inp[0].units,
                        histo_bins=tss_inp[0].histo_bins,
                        time_units=tss_inp[0].time_units)

    return agg_ts

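# A minimal usage sketch for the fio path (assumes `suite` and `fio_job` were
# loaded from the same storage elsewhere):
#
#     agg_bw = get_aggregated(rstorage, suite, fio_job, 'bw')
#     # per-second totals over every node that ran the job, tagged with AGG_TAG
#     assert agg_bw.time_units == 's'
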
# cache of already interpolated series, keyed by (source tuple, interpolation type)
interpolated_cache = {}


# lazily loaded entry points of the libwally C helper library (see below)
c_interp_func_agg = None
c_interp_func_qd = None
c_interp_func_fio = None


def c_interpolate_ts_on_seconds_border(ts: TimeSeries, nc: bool = False, tp: str = 'agg',
                                       allow_broken_step: bool = False) -> TimeSeries:
    "Interpolate time series to values on second borders"
    key = (ts.source.tpl, tp)
    if not nc and key in interpolated_cache:
        return interpolated_cache[key].copy()

    if tp in ('qd', 'agg'):
        # both data and times must be 1d compact arrays
        assert len(ts.data.strides) == 1, "ts.data.strides must be 1D, not " + repr(ts.data.strides)
        assert ts.data.dtype.itemsize == ts.data.strides[0], "ts.data array must be compact"

    assert len(ts.times.strides) == 1, "ts.times.strides must be 1D, not " + repr(ts.times.strides)
    assert ts.times.dtype.itemsize == ts.times.strides[0], "ts.times array must be compact"

    assert len(ts.times) == len(ts.data), "len(times)={} != len(data)={} for {!s}"\
        .format(len(ts.times), len(ts.data), ts.source)

    rcoef = 1 / unit_conversion_coef(ts.time_units, 's')  # type: Union[int, Fraction]

    if isinstance(rcoef, Fraction):
        assert rcoef.denominator == 1, "Incorrect conversion coef {!r}".format(rcoef)
        rcoef = rcoef.numerator

    assert rcoef >= 1 and isinstance(rcoef, int), "Incorrect conversion coef {!r}".format(rcoef)
    coef = int(rcoef)  # make typechecker happy

    global c_interp_func_agg
    global c_interp_func_qd
    global c_interp_func_fio

    uint64_p = ctypes.POINTER(ctypes.c_uint64)

    if c_interp_func_agg is None:
        dirname = os.path.dirname(os.path.dirname(wally.__file__))
        path = os.path.join(dirname, 'clib', 'libwally.so')
        cdll = ctypes.CDLL(path)

        c_interp_func_agg = cdll.interpolate_ts_on_seconds_border
        c_interp_func_qd = cdll.interpolate_ts_on_seconds_border_qd

        for func in (c_interp_func_agg, c_interp_func_qd):
            func.argtypes = [
                ctypes.c_uint,  # input_size
                ctypes.c_uint,  # output_size
                uint64_p,  # times
                uint64_p,  # values
                ctypes.c_uint,  # time_scale_coef
                uint64_p,  # output
            ]
            func.restype = ctypes.c_uint  # output array used size

        c_interp_func_fio = cdll.interpolate_ts_on_seconds_border_fio
        c_interp_func_fio.restype = ctypes.c_int
        c_interp_func_fio.argtypes = [
            ctypes.c_uint,  # input_size
            ctypes.c_uint,  # output_size
            uint64_p,  # times
            ctypes.c_uint,  # time_scale_coef
            uint64_p,  # output indexes
            ctypes.c_uint64,  # empty placeholder
            ctypes.c_bool  # allow broken steps
        ]

    assert ts.data.dtype.name == 'uint64', "Data dtype for {}=={} != uint64".format(ts.source, ts.data.dtype.name)
    assert ts.times.dtype.name == 'uint64', "Time dtype for {}=={} != uint64".format(ts.source, ts.times.dtype.name)

    output_sz = int(ts.times[-1]) // coef - int(ts.times[0]) // coef + 2
    result = numpy.zeros(output_sz, dtype=ts.data.dtype.name)

    if tp in ('qd', 'agg'):
        assert not allow_broken_step, "Broken steps aren't supported for non-fio arrays"
        func = c_interp_func_qd if tp == 'qd' else c_interp_func_agg
        sz = func(ts.data.size,
                  output_sz,
                  ts.times.ctypes.data_as(uint64_p),
                  ts.data.ctypes.data_as(uint64_p),
                  coef,
                  result.ctypes.data_as(uint64_p))

        result = result[:sz]
        output_sz = sz

        rtimes = int(ts.times[0] // coef) + numpy.arange(output_sz, dtype=ts.times.dtype)
    else:
        assert tp == 'fio'
        ridx = numpy.zeros(output_sz, dtype=ts.times.dtype)
        no_data = (output_sz + 1)
        sz_or_err = c_interp_func_fio(ts.times.size,
                                      output_sz,
                                      ts.times.ctypes.data_as(uint64_p),
                                      coef,
                                      ridx.ctypes.data_as(uint64_p),
                                      no_data,
                                      allow_broken_step)
        if sz_or_err <= 0:
            raise ValueError("Error in input array at index {}. {}".format(-sz_or_err, ts.source))

        rtimes = int(ts.times[0] // coef) + numpy.arange(sz_or_err, dtype=ts.times.dtype)

        empty = numpy.zeros(len(ts.histo_bins), dtype=ts.data.dtype) if ts.source.metric == 'lat' else 0
        res = []
        for idx in ridx[:sz_or_err]:
            if idx == no_data:
                res.append(empty)
            else:
                res.append(ts.data[idx])
        result = numpy.array(res, dtype=ts.data.dtype)

    res_ts = TimeSeries(ts.name, None, result,
                        times=rtimes,
                        units=ts.units,
                        time_units='s',
                        source=ts.source(),
                        histo_bins=ts.histo_bins)

    if not nc:
        # store under the same (tpl, tp) key that the lookup above uses
        interpolated_cache[key] = res_ts.copy()

    return res_ts

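# A minimal usage sketch (assumes `ts` is a compact uint64 TimeSeries whose
# time units convert to seconds by an integer factor, e.g. 'ms' or 'us'):
#
#     per_sec = c_interpolate_ts_on_seconds_border(ts, tp='agg')
#     assert per_sec.time_units == 's'
#     fresh = c_interpolate_ts_on_seconds_border(ts, nc=True, tp='agg')  # skip the cache
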
def get_ts_for_time_range(ts: TimeSeries, time_range: Tuple[int, int]) -> TimeSeries:
    """Return sensor values for the given node for the given period. Returns a per-second
    array of estimated values. Raises an error if the required range is not fully covered
    by the data in storage."""

    assert ts.time_units == 's', "{} != s for {!s}".format(ts.time_units, ts.source)
    assert len(ts.times) == len(ts.data), "Time(={}) and data(={}) sizes don't match for {!s}"\
        .format(len(ts.times), len(ts.data), ts.source)

    if time_range[0] < ts.times[0] or time_range[1] > ts.times[-1]:
        raise AssertionError(("Incorrect data for get_sensor - time_range={!r}, collected_at=[{}, ..., {}], " +
                              "sensor = {}_{}.{}.{}").format(time_range, ts.times[0], ts.times[-1],
                                                             ts.source.node_id, ts.source.sensor, ts.source.dev,
                                                             ts.source.metric))
    idx1, idx2 = numpy.searchsorted(ts.times, time_range)
    return TimeSeries(ts.name, None,
                      ts.data[idx1:idx2],
                      times=ts.times[idx1:idx2],
                      units=ts.units,
                      time_units=ts.time_units,
                      source=ts.source,
                      histo_bins=ts.histo_bins)

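# A minimal usage sketch (continuing the example above; the bounds are
# hypothetical and must lie inside the interpolated range):
#
#     cut = get_ts_for_time_range(per_sec, (int(per_sec.times[0]) + 5,
#                                           int(per_sec.times[-1]) - 5))
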
def make_2d_histo(tss: List[TimeSeries],
                  outliers_range: Tuple[float, float] = (0.02, 0.98),
                  bins_count: int = 20,
                  log_bins: bool = False) -> TimeSeries:

    # validate input data
    for ts in tss:
        assert len(ts.times) == len(ts.data), "Time(={}) and data(={}) sizes don't match for {!s}"\
            .format(len(ts.times), len(ts.data), ts.source)
        assert ts.time_units == 's', "All arrays should have time units of seconds"
        assert ts.units == tss[0].units, "All arrays should have the same data units"
        assert ts.data.shape == tss[0].data.shape, "All arrays should have the same data size"
        assert len(ts.data.shape) == 1, "All arrays should be 1d"

    whole_arr = numpy.concatenate([ts.data for ts in tss])
    whole_arr.shape = [len(tss), -1]

    if outliers_range is not None:
        min_vl, begin, end, max_vl = numpy.percentile(whole_arr,
                                                      [0, outliers_range[0] * 100, outliers_range[1] * 100, 100])
        bins_edges = auto_edges2(begin, end, bins=bins_count, log_space=log_bins)
        fixed_bins_edges = bins_edges.copy()
        fixed_bins_edges[0] = begin
        fixed_bins_edges[-1] = end
    else:
        begin, end = numpy.percentile(whole_arr, [0, 100])
        bins_edges = auto_edges2(begin, end, bins=bins_count, log_space=log_bins)
        fixed_bins_edges = bins_edges

    # numpy.histogram returns a (counts, edges) tuple - only the counts are needed here
    res_data = numpy.concatenate([numpy.histogram(column, fixed_bins_edges)[0] for column in whole_arr.T])
    res_data.shape = (len(tss), -1)
    res = TimeSeries(name=tss[0].name,
                     raw=None,
                     data=res_data,
                     times=tss[0].times,
                     units=tss[0].units,
                     source=tss[0].source,
                     time_units=tss[0].time_units,
                     histo_bins=bins_edges)
    return res

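# A minimal usage sketch (hypothetical list of 1d per-node series, already
# aligned to the same per-second grid):
#
#     histo_ts = make_2d_histo(per_node_series, bins_count=25, log_bins=True)
#     # one histogram per time column, bins clipped to the 2%..98% percentiles
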
def aggregate_histograms(tss: List[TimeSeries],
                         outliers_range: Tuple[float, float] = (0.02, 0.98),
                         bins_count: int = 20,
                         log_bins: bool = False) -> TimeSeries:

    # validate input data
    for ts in tss:
        assert len(ts.times) == len(ts.data), "Need to use stripped time"
        assert ts.time_units == 's', "All arrays should have time units of seconds"
        assert ts.units == tss[0].units, "All arrays should have the same data units"
        assert ts.data.shape == tss[0].data.shape, "All arrays should have the same data size"
        assert len(ts.data.shape) == 2, "All arrays should be 2d"
        assert ts.histo_bins is not None, "All arrays should have histogram bins"

    whole_arr = numpy.concatenate([ts.data for ts in tss])
    whole_arr.shape = [len(tss), -1]

    min_val = whole_arr.min()
    max_val = whole_arr.max()

    if outliers_range is not None:
        begin, end = numpy.percentile(whole_arr, [outliers_range[0] * 100, outliers_range[1] * 100])
    else:
        begin = min_val
        end = max_val

    bins_edges = auto_edges2(begin, end, bins=bins_count, log_space=log_bins)

    if outliers_range is not None:
        fixed_bins_edges = bins_edges.copy()
        fixed_bins_edges[0] = begin
        fixed_bins_edges[-1] = end
    else:
        fixed_bins_edges = bins_edges

    # numpy.histogram returns a (counts, edges) tuple - only the counts are needed here
    res_data = numpy.concatenate([numpy.histogram(column, fixed_bins_edges)[0] for column in whole_arr.T])
    res_data.shape = (len(tss), -1)
    return TimeSeries(name=tss[0].name,
                      raw=None,
                      data=res_data,
                      times=tss[0].times,
                      units=tss[0].units,
                      source=tss[0].source,
                      time_units=tss[0].time_units,
                      histo_bins=fixed_bins_edges)

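# A minimal usage sketch (hypothetical per-node 2d latency histogram series,
# all sharing the same histo_bins):
#
#     lat_histo = aggregate_histograms(lat_series, bins_count=20)
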
qd_metrics = {'io_queue'}
summ_sensors_cache = {}  # type: Dict[Tuple[Tuple[str, ...], str, str, Tuple[int, int], int], Optional[TimeSeries]]


def summ_sensors(rstorage: ResultStorage,
                 roles: List[str],
                 sensor: str,
                 metric: str,
                 time_range: Tuple[int, int],
                 nc: bool = False) -> Optional[TimeSeries]:

    key = (tuple(roles), sensor, metric, time_range, id(rstorage))
    if not nc and key in summ_sensors_cache:
        cached = summ_sensors_cache[key]
        return None if cached is None else cached.copy()

    res = None  # type: Optional[TimeSeries]
    for node in find_nodes_by_roles(rstorage, roles):
        for _, ds in rstorage.iter_sensors(node_id=node.node_id, sensor=sensor, metric=metric):
            data = rstorage.load_sensor(ds)
            data = c_interpolate_ts_on_seconds_border(data, tp='qd' if metric in qd_metrics else 'agg')
            data = get_ts_for_time_range(data, time_range)
            if res is None:
                res = data
                res.data = res.data.copy()
            else:
                res.data += data.data

    if not nc:
        summ_sensors_cache[key] = res
        if len(summ_sensors_cache) > 1024:
            logger.warning("summ_sensors_cache is too large: %s > 1024", len(summ_sensors_cache))

    return res if res is None else res.copy()

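# A minimal usage sketch (hypothetical role/sensor/metric names):
#
#     qd = summ_sensors(rstorage, ['ceph-osd'], 'block-io', 'io_queue', (100, 200))
#     # per-second queue depth summed over every matching sensor, or None if
#     # no node with that role reported it
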
def find_sensors_to_2d(rstorage: ResultStorage,
                       roles: List[str],
                       sensor: str,
                       devs: List[str],
                       metric: str,
                       time_range: Tuple[int, int]) -> numpy.ndarray:

    res = []  # type: List[numpy.ndarray]
    for node in find_nodes_by_roles(rstorage, roles):
        for dev in devs:
            for _, ds in rstorage.iter_sensors(node_id=node.node_id, sensor=sensor, dev=dev, metric=metric):
                data = rstorage.load_sensor(ds)
                data = c_interpolate_ts_on_seconds_border(data, tp='qd' if metric in qd_metrics else 'agg')
                data = get_ts_for_time_range(data, time_range)
                res.append(data.data)
    res2d = numpy.concatenate(res)
    res2d.shape = ((len(res), -1))
    return res2d

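
# A minimal usage sketch (hypothetical device and metric names):
#
#     arr = find_sensors_to_2d(rstorage, ['ceph-osd'], 'block-io', ['sda', 'sdb'],
#                              'writes_completed', (100, 200))
#     # arr has one row per matched (node, device) sensor, one column per second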