import ctypes
import logging
import os.path
from typing import Tuple, List, Iterable, Iterator, Optional, Union
from fractions import Fraction

import numpy

from cephlib.numeric import auto_edges2

import wally
from .hlstorage import ResultStorage
from .node_interfaces import NodeInfo
from .result_classes import DataSource, TimeSeries, SuiteConfig, JobConfig
from .suits.io.fio import FioJobConfig
from .suits.io.fio_hist import expected_lat_bins
from .utils import unit_conversion_coef


logger = logging.getLogger("wally")

# Separately for each test: heatmaps & histograms aggregated across the whole time range:
# * fio latency heatmap for all instances
# * data dev iops across all osd
# * data dev bw across all osd
# * data dev qd across all osd
# * journal dev iops across all osd
# * journal dev bw across all osd
# * journal dev qd across all osd
# * net dev pps across all hosts
# * net dev bps across all hosts

# Main APIs (see the usage sketch below):
# get sensors by pattern
# align values to second boundaries
# cut ranges for a particular test
# transform into 2d histograms (either build histograms or rebin them) and clip outliers at the same time
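
# A minimal usage sketch of that pipeline (illustrative only: it assumes an
# already populated ResultStorage plus matching suite/job objects; the 'bw'
# metric name and the time range are placeholders, not fixed API values):
#
#     bw = get_aggregated(rstorage, suite, job, 'bw')        # sum over all nodes
#     bw_1s = c_interpolate_ts_on_seconds_border(bw)         # align to a 1s grid
#     part = get_ts_for_time_range(bw_1s, (begin_s, end_s))  # cut the test range
#     heatmap = make_2d_histo([part])                        # 2d histogram + outlier clipping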


AGG_TAG = 'ALL'


def find_nodes_by_roles(rstorage: ResultStorage, node_roles: Iterable[str]) -> List[NodeInfo]:
    nodes = rstorage.storage.load_list(NodeInfo, 'all_nodes')  # type: List[NodeInfo]
    node_roles_s = set(node_roles)
    return [node for node in nodes if node.roles.intersection(node_roles_s)]


def find_all_sensors(rstorage: ResultStorage,
                     node_roles: Iterable[str],
                     sensor: str,
                     metric: str) -> Iterator[TimeSeries]:
    all_nodes_rr = "|".join(node.node_id for node in find_nodes_by_roles(rstorage, node_roles))
    all_nodes_rr = "(?P<node>{})".format(all_nodes_rr)

    for path, ds in rstorage.iter_sensors(all_nodes_rr, sensor=sensor, metric=metric):
        ts = rstorage.load_sensor(ds)

        # For sensors, ts.times is an array of pairs - (collection_started_at, collection_finished_at).
        # To make this array consistent with the times in the load data, the second item of each pair
        # is dropped, e.g. [t0_start, t0_end, t1_start, t1_end, ...] becomes [t0_start, t1_start, ...].
        ts.times = ts.times[::2]
        yield ts


def find_all_series(rstorage: ResultStorage, suite: SuiteConfig, job: JobConfig, metric: str) -> Iterator[TimeSeries]:
    "Iterate over the selected metric for all nodes for the given suite/job"
    return rstorage.iter_ts(suite, job, metric=metric)


def get_aggregated(rstorage: ResultStorage, suite: SuiteConfig, job: FioJobConfig, metric: str) -> TimeSeries:
    "Sum the selected metric over all nodes for the given suite/job"

    tss = list(find_all_series(rstorage, suite, job, metric))

    if len(tss) == 0:
        raise NameError("Can't find any TS for {},{},{}".format(suite, job, metric))

    ds = DataSource(suite_id=suite.storage_id,
                    job_id=job.storage_id,
                    node_id=AGG_TAG,
                    sensor='fio',
                    dev=AGG_TAG,
                    metric=metric,
                    tag='csv')

    agg_ts = TimeSeries(metric,
                        raw=None,
                        source=ds,
                        data=numpy.zeros(tss[0].data.shape, dtype=tss[0].data.dtype),
                        times=tss[0].times.copy(),
                        units=tss[0].units,
                        histo_bins=tss[0].histo_bins,
                        time_units=tss[0].time_units)

    for ts in tss:
        if metric == 'lat' and (len(ts.data.shape) != 2 or ts.data.shape[1] != expected_lat_bins):
            msg = "Sensor {}.{} on node {} has shape={}. Can only process sensors with shape=[X, {}].".format(
                ts.source.dev, ts.source.sensor, ts.source.node_id, ts.data.shape, expected_lat_bins)
            logger.error(msg)
            raise ValueError(msg)

        if metric != 'lat' and len(ts.data.shape) != 1:
            msg = "Sensor {}.{} on node {} has shape={}. Can only process 1D sensors.".format(
                ts.source.dev, ts.source.sensor, ts.source.node_id, ts.data.shape)
            logger.error(msg)
            raise ValueError(msg)

        # TODO: match times on different ts; for now the sum below is elementwise
        # and assumes all series share the same time grid
        agg_ts.data += ts.data

    return agg_ts


interpolated_cache = {}


def interpolate_ts_on_seconds_border(ts: TimeSeries, nc: bool = False) -> TimeSeries:
    "Interpolate time series to values on second boundaries"
    logger.warning("This implementation of interpolate_ts_on_seconds_border is deprecated and should be updated")

    if not nc and ts.source.tpl in interpolated_cache:
        return interpolated_cache[ts.source.tpl]

    assert len(ts.times) == len(ts.data), "Time(={}) and data(={}) sizes don't match for {!s}"\
        .format(len(ts.times), len(ts.data), ts.source)

    rcoef = 1 / unit_conversion_coef(ts.time_units, 's')  # type: Union[int, Fraction]

    if isinstance(rcoef, Fraction):
        assert rcoef.denominator == 1, "Incorrect conversion coef {!r}".format(rcoef)
        rcoef = rcoef.numerator

    assert rcoef >= 1 and isinstance(rcoef, int), "Incorrect conversion coef {!r}".format(rcoef)
    coef = int(rcoef)  # make typechecker happy

    # round to second boundaries
    begin = int(ts.times[0] / coef + 1) * coef
    end = int(ts.times[-1] / coef) * coef

    # iterator over the begin times of the real data chunks
    edge_it = iter(ts.times)

    # iterator over the real data values
    val_it = iter(ts.data)

    # result array, cumulative value per second
    result = numpy.empty([(end - begin) // coef], dtype=ts.data.dtype)
    idx = 0
    curr_summ = 0

    # end of the current time slot
    results_cell_ends = begin + coef

    # hack to unify looping
    real_data_end = next(edge_it)
    while results_cell_ends <= end:
        real_data_start = real_data_end
        real_data_end = next(edge_it)
        real_val_left = next(val_it)

        # real data "speed" for the interval [real_data_start, real_data_end]
        real_val_ps = float(real_val_left) / (real_data_end - real_data_start)

        while real_data_end >= results_cell_ends and results_cell_ends <= end:
            # the part of the current real value which fits into the current result cell
            curr_real_chunk = int((results_cell_ends - real_data_start) * real_val_ps)

            # keep the rest of the real data for the next result cell
            real_val_left -= curr_real_chunk
            result[idx] = curr_summ + curr_real_chunk
            idx += 1
            curr_summ = 0

            # adjust the real data start time
            real_data_start = results_cell_ends
            results_cell_ends += coef

        # don't lose any real data
        curr_summ += real_val_left

    assert idx == len(result), "Wrong output array size - idx(={}) != len(result)(={})".format(idx, len(result))

    res_ts = TimeSeries(ts.name, None, result,
                        times=int(begin // coef) + numpy.arange(idx, dtype=ts.times.dtype),
                        units=ts.units,
                        time_units='s',
                        source=ts.source(),
                        histo_bins=ts.histo_bins)

    if not nc:
        interpolated_cache[ts.source.tpl] = res_ts

    return res_ts
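

# Note on the approach above: each data[i] is treated as a total accumulated over
# [times[i], times[i + 1]] and spread uniformly over that interval, so every
# one-second result cell receives a proportional share of each chunk that overlaps
# it (leftover data collected before the first whole-second boundary ends up in
# the first cell, which is part of why this implementation is deprecated).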


c_interp_func = None
c_interp_func_qd = None


def c_interpolate_ts_on_seconds_border(ts: TimeSeries, nc: bool = False, qd: bool = False) -> TimeSeries:
    "Interpolate time series to values on second boundaries"
    key = (ts.source.tpl, qd)
    if not nc and key in interpolated_cache:
        return interpolated_cache[key].copy()

    # both data and times must be 1d compact arrays
    assert len(ts.data.strides) == 1, "ts.data.strides must be 1D, not " + repr(ts.data.strides)
    assert ts.data.dtype.itemsize == ts.data.strides[0], "ts.data array must be compact"
    assert len(ts.times.strides) == 1, "ts.times.strides must be 1D, not " + repr(ts.times.strides)
    assert ts.times.dtype.itemsize == ts.times.strides[0], "ts.times array must be compact"

    assert len(ts.times) == len(ts.data), "Time(={}) and data(={}) sizes don't match for {!s}"\
        .format(len(ts.times), len(ts.data), ts.source)

    rcoef = 1 / unit_conversion_coef(ts.time_units, 's')  # type: Union[int, Fraction]

    if isinstance(rcoef, Fraction):
        assert rcoef.denominator == 1, "Incorrect conversion coef {!r}".format(rcoef)
        rcoef = rcoef.numerator

    assert rcoef >= 1 and isinstance(rcoef, int), "Incorrect conversion coef {!r}".format(rcoef)
    coef = int(rcoef)  # make typechecker happy

    global c_interp_func
    global c_interp_func_qd

    uint64_p = ctypes.POINTER(ctypes.c_uint64)

    if c_interp_func is None:
        dirname = os.path.dirname(os.path.dirname(wally.__file__))
        path = os.path.join(dirname, 'clib', 'libwally.so')
        cdll = ctypes.CDLL(path)

        c_interp_func = cdll.interpolate_ts_on_seconds_border
        c_interp_func.argtypes = [
            ctypes.c_uint,  # input_size
            ctypes.c_uint,  # output_size
            uint64_p,       # times
            uint64_p,       # values
            ctypes.c_uint,  # time_scale_coef
            uint64_p,       # output
        ]
        c_interp_func.restype = None

        c_interp_func_qd = cdll.interpolate_ts_on_seconds_border_qd
        c_interp_func_qd.argtypes = [
            ctypes.c_uint,  # input_size
            ctypes.c_uint,  # output_size
            uint64_p,       # times
            uint64_p,       # values
            ctypes.c_uint,  # time_scale_coef
            uint64_p,       # output
        ]
        c_interp_func_qd.restype = ctypes.c_uint

    assert ts.data.dtype.name == 'uint64', "Data dtype for {}=={} != uint64".format(ts.source, ts.data.dtype.name)
    assert ts.times.dtype.name == 'uint64', "Time dtype for {}=={} != uint64".format(ts.source, ts.times.dtype.name)

    output_sz = int(ts.times[-1]) // coef - int(ts.times[0]) // coef + 2
    result = numpy.zeros(output_sz, dtype=ts.data.dtype.name)

    if qd:
        func = c_interp_func_qd
    else:
        func = c_interp_func

    sz = func(ts.data.size,
              output_sz,
              ts.times.ctypes.data_as(uint64_p),
              ts.data.ctypes.data_as(uint64_p),
              coef,
              result.ctypes.data_as(uint64_p))

    if qd:
        # the qd variant returns the number of output cells actually filled
        result = result[:sz]
        output_sz = sz
    else:
        assert sz is None

    rtimes = int(ts.times[0] // coef) + numpy.arange(output_sz, dtype=ts.times.dtype)
    res_ts = TimeSeries(ts.name, None, result,
                        times=rtimes,
                        units=ts.units,
                        time_units='s',
                        source=ts.source(),
                        histo_bins=ts.histo_bins)

    if not nc:
        # store under the same (tpl, qd) key that is checked at the top of this function
        interpolated_cache[key] = res_ts.copy()

    return res_ts
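

# The C-side contract assumed by the ctypes setup above (inferred from the
# argtypes/restype declarations, not from the C sources): both functions take
# (input_size, output_size, uint64_t *times, uint64_t *values,
# unsigned time_scale_coef, uint64_t *output); the plain variant fills the
# output buffer and returns nothing, while the *_qd variant returns the number
# of output cells it actually wrote.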


def get_ts_for_time_range(ts: TimeSeries, time_range: Tuple[int, int]) -> TimeSeries:
    """Return sensor values for the given node for the given period as an array of per-second estimates.
    Raise an error if the requested range is not fully covered by the data in storage"""

    assert ts.time_units == 's', "{} != s for {!s}".format(ts.time_units, ts.source)
    assert len(ts.times) == len(ts.data), "Time(={}) and data(={}) sizes don't match for {!s}"\
        .format(len(ts.times), len(ts.data), ts.source)

    if time_range[0] < ts.times[0] or time_range[1] > ts.times[-1]:
        raise AssertionError(("Incorrect data for get_sensor - time_range={!r}, collected_at=[{}, ..., {}], " +
                              "sensor = {}_{}.{}.{}").format(time_range, ts.times[0], ts.times[-1],
                                                             ts.source.node_id, ts.source.sensor, ts.source.dev,
                                                             ts.source.metric))
    idx1, idx2 = numpy.searchsorted(ts.times, time_range)
    return TimeSeries(ts.name, None,
                      ts.data[idx1:idx2],
                      times=ts.times[idx1:idx2],
                      units=ts.units,
                      time_units=ts.time_units,
                      source=ts.source,
                      histo_bins=ts.histo_bins)
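

# Note: numpy.searchsorted with the default side='left' makes the cut half-open,
# e.g. for times == [10, 11, ..., 20] and time_range == (12, 15) the result
# contains the samples at 12, 13 and 14, but not the one at 15.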


def make_2d_histo(tss: List[TimeSeries],
                  outliers_range: Tuple[float, float] = (0.02, 0.98),
                  bins_count: int = 20,
                  log_bins: bool = False) -> TimeSeries:

    # validate input data
    for ts in tss:
        assert len(ts.times) == len(ts.data), "Time(={}) and data(={}) sizes don't match for {!s}"\
            .format(len(ts.times), len(ts.data), ts.source)
        assert ts.time_units == 's', "All arrays should have time in seconds"
        assert ts.units == tss[0].units, "All arrays should have the same data units"
        assert ts.data.shape == tss[0].data.shape, "All arrays should have the same data size"
        assert len(ts.data.shape) == 1, "All arrays should be 1d"

    whole_arr = numpy.concatenate([ts.data for ts in tss])
    whole_arr.shape = [len(tss), -1]

    if outliers_range is not None:
        min_vl, begin, end, max_vl = numpy.percentile(whole_arr,
                                                      [0, outliers_range[0] * 100, outliers_range[1] * 100, 100])
        bins_edges = auto_edges2(begin, end, bins=bins_count, log_space=log_bins)
        fixed_bins_edges = bins_edges.copy()
        fixed_bins_edges[0] = begin
        fixed_bins_edges[-1] = end
    else:
        begin, end = numpy.percentile(whole_arr, [0, 100])
        bins_edges = auto_edges2(begin, end, bins=bins_count, log_space=log_bins)
        fixed_bins_edges = bins_edges

    # one histogram per time point; numpy.histogram returns (counts, edges), only the counts are needed
    res_data = numpy.concatenate([numpy.histogram(column, fixed_bins_edges)[0] for column in whole_arr.T])
    res_data.shape = (len(tss[0].times), -1)
    res = TimeSeries(name=tss[0].name,
                     raw=None,
                     data=res_data,
                     times=tss[0].times,
                     units=tss[0].units,
                     source=tss[0].source,
                     time_units=tss[0].time_units,
                     histo_bins=bins_edges)
    return res


def aggregate_histograms(tss: List[TimeSeries],
                         outliers_range: Tuple[float, float] = (0.02, 0.98),
                         bins_count: int = 20,
                         log_bins: bool = False) -> TimeSeries:

    # validate input data
    for ts in tss:
        assert len(ts.times) == len(ts.data), "Need to use stripped time"
        assert ts.time_units == 's', "All arrays should have time in seconds"
        assert ts.units == tss[0].units, "All arrays should have the same data units"
        assert ts.data.shape == tss[0].data.shape, "All arrays should have the same data size"
        assert len(ts.data.shape) == 2, "All arrays should be 2d"
        assert ts.histo_bins is not None, "All arrays should have histo_bins"

    whole_arr = numpy.concatenate([ts.data for ts in tss])
    whole_arr.shape = [len(tss), -1]

    min_val = whole_arr.min()
    max_val = whole_arr.max()

    if outliers_range is not None:
        begin, end = numpy.percentile(whole_arr, [outliers_range[0] * 100, outliers_range[1] * 100])
    else:
        begin = min_val
        end = max_val

    bins_edges = auto_edges2(begin, end, bins=bins_count, log_space=log_bins)

    if outliers_range is not None:
        fixed_bins_edges = bins_edges.copy()
        fixed_bins_edges[0] = begin
        fixed_bins_edges[-1] = end
    else:
        fixed_bins_edges = bins_edges

    # numpy.histogram returns (counts, edges), only the counts are needed
    res_data = numpy.concatenate([numpy.histogram(column, fixed_bins_edges)[0] for column in whole_arr.T])
    res_data.shape = (len(tss), -1)
    return TimeSeries(name=tss[0].name,
                      raw=None,
                      data=res_data,
                      times=tss[0].times,
                      units=tss[0].units,
                      source=tss[0].source,
                      time_units=tss[0].time_units,
                      histo_bins=fixed_bins_edges)


qd_metrics = {'io_queue'}


def summ_sensors(rstorage: ResultStorage,
                 roles: List[str],
                 sensor: str,
                 metric: str,
                 time_range: Tuple[int, int]) -> Optional[TimeSeries]:

    res = None  # type: Optional[TimeSeries]
    for node in find_nodes_by_roles(rstorage, roles):
        for _, ds in rstorage.iter_sensors(node_id=node.node_id, sensor=sensor, metric=metric):
            data = rstorage.load_sensor(ds)
            data = c_interpolate_ts_on_seconds_border(data, qd=metric in qd_metrics)
            data = get_ts_for_time_range(data, time_range)
            if res is None:
                res = data
                res.data = res.data.copy()
            else:
                res.data += data.data
    return res
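

# Hedged usage sketch (the role/sensor/metric names are illustrative, they
# depend on the deployed sensor set and are not fixed API constants):
#
#     total_wio = summ_sensors(rstorage, ['ceph-osd'], sensor='block-io',
#                              metric='writes_completed', time_range=(begin_s, end_s))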


def find_sensors_to_2d(rstorage: ResultStorage,
                       roles: List[str],
                       sensor: str,
                       devs: List[str],
                       metric: str,
                       time_range: Tuple[int, int]) -> numpy.ndarray:

    res = []  # type: List[numpy.ndarray]
    for node in find_nodes_by_roles(rstorage, roles):
        for dev in devs:
            for _, ds in rstorage.iter_sensors(node_id=node.node_id, sensor=sensor, dev=dev, metric=metric):
                data = rstorage.load_sensor(ds)
                data = c_interpolate_ts_on_seconds_border(data, qd=metric in qd_metrics)
                data = get_ts_for_time_range(data, time_range)
                res.append(data.data)
    res2d = numpy.concatenate(res)
    res2d.shape = (len(res), -1)
    return res2d
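

# The result has one row per (node, device) series and one column per second of
# time_range, which suits the per-device heatmaps listed at the top of this
# module, e.g. (names illustrative, as above):
#
#     qd_2d = find_sensors_to_2d(rstorage, ['ceph-osd'], sensor='block-io',
#                                devs=['sda', 'sdb'], metric='io_queue',
#                                time_range=(begin_s, end_s))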