import ctypes
import logging
import os.path
from typing import Tuple, List, Iterable, Iterator, Optional, Union
from fractions import Fraction

import numpy

from cephlib.numeric import auto_edges2

import wally
from .hlstorage import ResultStorage
from .node_interfaces import NodeInfo
from .result_classes import DataSource, TimeSeries, SuiteConfig, JobConfig
from .suits.io.fio import FioJobConfig
from .suits.io.fio_hist import expected_lat_bins
from .utils import unit_conversion_coef


logger = logging.getLogger("wally")

# Separately for each test: heatmaps & histograms aggregated across the whole run:
# * fio latency heatmap for all instances
# * data dev iops across all osd
# * data dev bw across all osd
# * data dev qd across all osd
# * journal dev iops across all osd
# * journal dev bw across all osd
# * journal dev qd across all osd
# * net dev pps across all hosts
# * net dev bps across all hosts

# Main APIs (see the sketch below):
# get sensors by pattern
# align values to second borders
# cut ranges for a particular test
# transform into 2d histograms (either build histograms or rebin them) and clip outliers at the same time
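
# A hypothetical composition sketch of the APIs above (role/sensor/metric names
# are example values, not fixed constants of this module):
#
#   tss = [c_interpolate_ts_on_seconds_border(ts)        # align to second borders
#          for ts in find_all_sensors(rstorage, ['storage'], 'block-io', 'sectors_written')]
#   tss = [get_ts_for_time_range(ts, (job_start, job_end)) for ts in tss]  # cut the test range
#   heatmap_ts = make_2d_histo(tss)                      # 2d histograms with outlier clipping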


AGG_TAG = 'ALL'


def find_nodes_by_roles(rstorage: ResultStorage, node_roles: Iterable[str]) -> List[NodeInfo]:
    nodes = rstorage.storage.load_list(NodeInfo, 'all_nodes')  # type: List[NodeInfo]
    node_roles_s = set(node_roles)
    return [node for node in nodes if node.roles.intersection(node_roles_s)]


def find_all_sensors(rstorage: ResultStorage,
                     node_roles: Iterable[str],
                     sensor: str,
                     metric: str) -> Iterator[TimeSeries]:
    all_nodes_rr = "|".join(node.node_id for node in find_nodes_by_roles(rstorage, node_roles))
    all_nodes_rr = "(?P<node>{})".format(all_nodes_rr)

    for path, ds in rstorage.iter_sensors(all_nodes_rr, sensor=sensor, metric=metric):
        ts = rstorage.load_sensor(ds)

        # for sensors, ts.times is an array of pairs - (collection_start_at, collection_finished_at).
        # to make this array consistent with the times in the loaded data, the second item of each pair is dropped
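        # e.g. (illustrative): [t0_start, t0_end, t1_start, t1_end, ...] -> [t0_start, t1_start, ...]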
        ts.times = ts.times[::2]
        yield ts


def find_all_series(rstorage: ResultStorage, suite: SuiteConfig, job: JobConfig, metric: str) -> Iterator[TimeSeries]:
    "Iterate over the selected metric for all nodes for the given suite/job"
    return rstorage.iter_ts(suite, job, metric=metric)


def get_aggregated(rstorage: ResultStorage, suite: SuiteConfig, job: FioJobConfig, metric: str) -> TimeSeries:
    "Sum the selected metric over all nodes for the given suite/job"

    tss = list(find_all_series(rstorage, suite, job, metric))

    if len(tss) == 0:
        raise NameError("Can't find any TS for {},{},{}".format(suite, job, metric))

    ds = DataSource(suite_id=suite.storage_id,
                    job_id=job.storage_id,
                    node_id=AGG_TAG,
                    sensor='fio',
                    dev=AGG_TAG,
                    metric=metric,
                    tag='csv')

    agg_ts = TimeSeries(metric,
                        raw=None,
                        source=ds,
                        data=numpy.zeros(tss[0].data.shape, dtype=tss[0].data.dtype),
                        times=tss[0].times.copy(),
                        units=tss[0].units,
                        histo_bins=tss[0].histo_bins,
                        time_units=tss[0].time_units)

    for ts in tss:
        if metric == 'lat' and (len(ts.data.shape) != 2 or ts.data.shape[1] != expected_lat_bins):
            msg = "Sensor {}.{} on node {} has shape={}. Can only process sensors with shape=[X, {}].".format(
                ts.source.dev, ts.source.sensor, ts.source.node_id, ts.data.shape, expected_lat_bins)
            logger.error(msg)
            raise ValueError(msg)

        if metric != 'lat' and len(ts.data.shape) != 1:
            msg = "Sensor {}.{} on node {} has shape={}. Can only process 1D sensors.".format(
                ts.source.dev, ts.source.sensor, ts.source.node_id, ts.data.shape)
            logger.error(msg)
            raise ValueError(msg)

        # TODO: match times on different ts
        agg_ts.data += ts.data

    return agg_ts
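
# Hypothetical usage ('bw' is an example fio metric name; suite and fio_job are
# loaded from the result storage):
#
#   bw_ts = get_aggregated(rstorage, suite, fio_job, 'bw')  # summed over all load nodes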


interpolated_cache = {}


def interpolate_ts_on_seconds_border(ts: TimeSeries, nc: bool = False) -> TimeSeries:
    "Interpolate a time series to values on second borders"

    if not nc and ts.source.tpl in interpolated_cache:
        return interpolated_cache[ts.source.tpl]

    assert len(ts.times) == len(ts.data), "Time(={}) and data(={}) sizes don't match for {!s}"\
        .format(len(ts.times), len(ts.data), ts.source)

    rcoef = 1 / unit_conversion_coef(ts.time_units, 's')  # type: Union[int, Fraction]

    if isinstance(rcoef, Fraction):
        assert rcoef.denominator == 1, "Incorrect conversion coef {!r}".format(rcoef)
        rcoef = rcoef.numerator

    assert rcoef >= 1 and isinstance(rcoef, int), "Incorrect conversion coef {!r}".format(rcoef)
    coef = int(rcoef)  # make typechecker happy
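    # e.g. (illustrative): for ts.time_units == 'ms', coef == 1000 - 1000 input ticks per second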

    # round to second borders
    begin = int(ts.times[0] / coef + 1) * coef
    end = int(ts.times[-1] / coef) * coef

    # current real data time chunk begin time
    edge_it = iter(ts.times)

    # current real data value
    val_it = iter(ts.data)

    # result array, cumulative value per second
    result = numpy.empty([(end - begin) // coef], dtype=ts.data.dtype)
    idx = 0
    curr_summ = 0

    # end of the current time slot
    results_cell_ends = begin + coef

    # hack to unify looping
    real_data_end = next(edge_it)
    while results_cell_ends <= end:
        real_data_start = real_data_end
        real_data_end = next(edge_it)
        real_val_left = next(val_it)

        # real data "speed" for the interval [real_data_start, real_data_end]
        real_val_ps = float(real_val_left) / (real_data_end - real_data_start)

        while real_data_end >= results_cell_ends and results_cell_ends <= end:
            # part of the current real value which fits into the current result cell
            curr_real_chunk = int((results_cell_ends - real_data_start) * real_val_ps)

            # calculate the rest of the real data for the next result cell
            real_val_left -= curr_real_chunk
            result[idx] = curr_summ + curr_real_chunk
            idx += 1
            curr_summ = 0

            # adjust real data start time
            real_data_start = results_cell_ends
            results_cell_ends += coef

        # don't lose any real data
        curr_summ += real_val_left

    assert idx == len(result), "Wrong output array size - idx(={}) != len(result)(={})".format(idx, len(result))

    res_ts = TimeSeries(ts.name, None, result,
                        times=int(begin // coef) + numpy.arange(idx, dtype=ts.times.dtype),
                        units=ts.units,
                        time_units='s',
                        source=ts.source(),
                        histo_bins=ts.histo_bins)

    if not nc:
        interpolated_cache[ts.source.tpl] = res_ts

    return res_ts
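
# Illustrative trace (hypothetical numbers): with time_units == 'ms' (coef == 1000),
# times == [0, 1000, 2000] and data == [10, 20, 30], begin == 1000 and end == 2000,
# so a single output cell is produced: data == [30] at times == [1]. Note that
# data[i] is spread over [times[i], times[i + 1]), the final value may never be
# consumed, and anything collected before the first whole-second border is folded
# into the first output cell.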


c_interp_func = None
cdll = None


def c_interpolate_ts_on_seconds_border(ts: TimeSeries, nc: bool = False) -> TimeSeries:
    "Interpolate a time series to values on second borders (C-accelerated version)"

    if not nc and ts.source.tpl in interpolated_cache:
        return interpolated_cache[ts.source.tpl]

    assert len(ts.times) == len(ts.data), "Time(={}) and data(={}) sizes don't match for {!s}"\
        .format(len(ts.times), len(ts.data), ts.source)

    rcoef = 1 / unit_conversion_coef(ts.time_units, 's')  # type: Union[int, Fraction]

    if isinstance(rcoef, Fraction):
        assert rcoef.denominator == 1, "Incorrect conversion coef {!r}".format(rcoef)
        rcoef = rcoef.numerator

    assert rcoef >= 1 and isinstance(rcoef, int), "Incorrect conversion coef {!r}".format(rcoef)
    coef = int(rcoef)  # make typechecker happy

    global cdll
    global c_interp_func
    uint64_p = ctypes.POINTER(ctypes.c_uint64)

    # lazily load the C library and bind the interpolation function on first use
    if c_interp_func is None:
        dirname = os.path.dirname(os.path.dirname(wally.__file__))
        path = os.path.join(dirname, 'clib', 'libwally.so')
        cdll = ctypes.CDLL(path)
        c_interp_func = cdll.interpolate_ts_on_seconds_border_v2
        c_interp_func.argtypes = [
            ctypes.c_uint,  # input_size
            ctypes.c_uint,  # output_size
            uint64_p,  # times
            uint64_p,  # values
            ctypes.c_uint,  # time_scale_coef
            uint64_p,  # output
        ]
        c_interp_func.restype = None

    assert ts.data.dtype.name == 'uint64', "Data dtype for {}=={} != uint64".format(ts.source, ts.data.dtype.name)
    assert ts.times.dtype.name == 'uint64', "Time dtype for {}=={} != uint64".format(ts.source, ts.times.dtype.name)

    output_sz = int(ts.times[-1]) // coef - int(ts.times[0]) // coef + 2
    result = numpy.zeros(output_sz, dtype=ts.data.dtype.name)

    c_interp_func(ts.data.size,
                  output_sz,
                  ts.times.ctypes.data_as(uint64_p),
                  ts.data.ctypes.data_as(uint64_p),
                  coef,
                  result.ctypes.data_as(uint64_p))

    res_ts = TimeSeries(ts.name, None, result,
                        times=int(ts.times[0] // coef) + numpy.arange(output_sz, dtype=ts.times.dtype),
                        units=ts.units,
                        time_units='s',
                        source=ts.source(),
                        histo_bins=ts.histo_bins)

    if not nc:
        interpolated_cache[ts.source.tpl] = res_ts
    return res_ts


def get_ts_for_time_range(ts: TimeSeries, time_range: Tuple[int, int]) -> TimeSeries:
    """Return sensor values for the given node for the given period, as a per-second estimated values array.
    Raise an error if the requested range is not fully covered by the data in storage"""

    assert ts.time_units == 's', "{} != s for {!s}".format(ts.time_units, ts.source)
    assert len(ts.times) == len(ts.data), "Time(={}) and data(={}) sizes don't match for {!s}"\
        .format(len(ts.times), len(ts.data), ts.source)

    if time_range[0] < ts.times[0] or time_range[1] > ts.times[-1]:
        raise AssertionError(("Incorrect data for get_sensor - time_range={!r}, collected_at=[{}, ..., {}], " +
                              "sensor = {}_{}.{}.{}").format(time_range, ts.times[0], ts.times[-1],
                                                             ts.source.node_id, ts.source.sensor, ts.source.dev,
                                                             ts.source.metric))
    idx1, idx2 = numpy.searchsorted(ts.times, time_range)
    return TimeSeries(ts.name, None,
                      ts.data[idx1:idx2],
                      times=ts.times[idx1:idx2],
                      units=ts.units,
                      time_units=ts.time_units,
                      source=ts.source,
                      histo_bins=ts.histo_bins)
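
# For example (hypothetical values): with times == [10, 11, ..., 20] and
# time_range == (12, 15), numpy.searchsorted returns (2, 5), so the slice covers
# seconds 12, 13 and 14 - the right edge of the requested range is exclusive.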


def make_2d_histo(tss: List[TimeSeries],
                  outliers_range: Tuple[float, float] = (0.02, 0.98),
                  bins_count: int = 20,
                  log_bins: bool = False) -> TimeSeries:

    # validate input data
    for ts in tss:
        assert len(ts.times) == len(ts.data), "Time(={}) and data(={}) sizes don't match for {!s}"\
            .format(len(ts.times), len(ts.data), ts.source)
        assert ts.time_units == 's', "All arrays should have time in seconds"
        assert ts.units == tss[0].units, "All arrays should have the same data units"
        assert ts.data.shape == tss[0].data.shape, "All arrays should have the same data size"
        assert len(ts.data.shape) == 1, "All arrays should be 1d"

    whole_arr = numpy.concatenate([ts.data for ts in tss])
    whole_arr.shape = [len(tss), -1]

    if outliers_range is not None:
        min_vl, begin, end, max_vl = numpy.percentile(whole_arr,
                                                      [0, outliers_range[0] * 100, outliers_range[1] * 100, 100])
        bins_edges = auto_edges2(begin, end, bins=bins_count, log_space=log_bins)
        fixed_bins_edges = bins_edges.copy()
        fixed_bins_edges[0] = begin
        fixed_bins_edges[-1] = end
    else:
        begin, end = numpy.percentile(whole_arr, [0, 100])
        bins_edges = auto_edges2(begin, end, bins=bins_count, log_space=log_bins)
        fixed_bins_edges = bins_edges

    # numpy.histogram returns a (counts, edges) tuple - only the counts are needed here
    res_data = numpy.concatenate([numpy.histogram(column, fixed_bins_edges)[0] for column in whole_arr.T])
    res_data.shape = (len(tss), -1)
    res = TimeSeries(name=tss[0].name,
                     raw=None,
                     data=res_data,
                     times=tss[0].times,
                     units=tss[0].units,
                     source=tss[0].source,
                     time_units=tss[0].time_units,
                     histo_bins=bins_edges)
    return res
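
# Hypothetical usage (role/sensor/metric names are example values): given per-node
# 1D series already aligned to seconds,
#
#   tss = [c_interpolate_ts_on_seconds_border(ts)
#          for ts in find_all_sensors(rstorage, ['storage'], 'block-io', 'sectors_written')]
#   heatmap_ts = make_2d_histo(tss, bins_count=40, log_bins=True)
#
# bins the per-second values into bins_count buckets, clipping to the
# 2%-98% percentile range by default.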


def aggregate_histograms(tss: List[TimeSeries],
                         outliers_range: Tuple[float, float] = (0.02, 0.98),
                         bins_count: int = 20,
                         log_bins: bool = False) -> TimeSeries:

    # validate input data
    for ts in tss:
        assert len(ts.times) == len(ts.data), "Need to use stripped time"
        assert ts.time_units == 's', "All arrays should have time in seconds"
        assert ts.units == tss[0].units, "All arrays should have the same data units"
        assert ts.data.shape == tss[0].data.shape, "All arrays should have the same data size"
        assert len(ts.data.shape) == 2, "All arrays should be 2d"
        assert ts.histo_bins is not None, "All arrays should have histo_bins"

    whole_arr = numpy.concatenate([ts.data for ts in tss])
    whole_arr.shape = [len(tss), -1]

    min_val = whole_arr.min()
    max_val = whole_arr.max()

    if outliers_range is not None:
        begin, end = numpy.percentile(whole_arr, [outliers_range[0] * 100, outliers_range[1] * 100])
    else:
        begin = min_val
        end = max_val

    bins_edges = auto_edges2(begin, end, bins=bins_count, log_space=log_bins)

    if outliers_range is not None:
        fixed_bins_edges = bins_edges.copy()
        fixed_bins_edges[0] = begin
        fixed_bins_edges[-1] = end
    else:
        fixed_bins_edges = bins_edges

    # numpy.histogram returns a (counts, edges) tuple - only the counts are needed here
    res_data = numpy.concatenate([numpy.histogram(column, fixed_bins_edges)[0] for column in whole_arr.T])
    res_data.shape = (len(tss), -1)
    return TimeSeries(name=tss[0].name,
                      raw=None,
                      data=res_data,
                      times=tss[0].times,
                      units=tss[0].units,
                      source=tss[0].source,
                      time_units=tss[0].time_units,
                      histo_bins=fixed_bins_edges)


def summ_sensors(rstorage: ResultStorage,
                 roles: List[str],
                 sensor: str,
                 metric: str,
                 time_range: Tuple[int, int]) -> Optional[TimeSeries]:

    res = None  # type: Optional[TimeSeries]
    for node in find_nodes_by_roles(rstorage, roles):
        for _, ds in rstorage.iter_sensors(node_id=node.node_id, sensor=sensor, metric=metric):
            data = rstorage.load_sensor(ds)
            data = c_interpolate_ts_on_seconds_border(data)
            data = get_ts_for_time_range(data, time_range)
            if res is None:
                res = data
                res.data = res.data.copy()
            else:
                res.data += data.data
    return res
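
# Hypothetical usage ('storage', 'block-io' and 'sectors_written' are example
# role/sensor/metric names): per-second total written across all storage nodes:
#
#   wr_ts = summ_sensors(rstorage, ['storage'], 'block-io', 'sectors_written',
#                        time_range=(job_start, job_end))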


def find_sensors_to_2d(rstorage: ResultStorage,
                       roles: List[str],
                       sensor: str,
                       devs: List[str],
                       metric: str,
                       time_range: Tuple[int, int]) -> numpy.ndarray:

    res = []  # type: List[numpy.ndarray]
    for node in find_nodes_by_roles(rstorage, roles):
        for dev in devs:
            for _, ds in rstorage.iter_sensors(node_id=node.node_id, sensor=sensor, dev=dev, metric=metric):
                data = rstorage.load_sensor(ds)
                data = c_interpolate_ts_on_seconds_border(data)
                data = get_ts_for_time_range(data, time_range)
                res.append(data.data)
    res2d = numpy.concatenate(res)
    res2d.shape = (len(res), -1)
    return res2d
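
# Hypothetical usage (device/metric names are example values): one row per matched
# (node, device) sensor, one column per second of time_range:
#
#   arr = find_sensors_to_2d(rstorage, ['storage'], 'block-io', ['sda', 'sdb'],
#                            'writes_completed', time_range=(job_start, job_end))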