import ctypes
import logging
import os.path
from typing import Tuple, List, Iterable, Iterator, Optional, Union, Dict
from fractions import Fraction

import numpy

from cephlib.numeric import auto_edges2

import wally
from .hlstorage import ResultStorage
from .node_interfaces import NodeInfo
from .result_classes import DataSource, TimeSeries, SuiteConfig, JobConfig
from .suits.io.fio import FioJobConfig
from .suits.io.fio_hist import expected_lat_bins
from .utils import unit_conversion_coef


logger = logging.getLogger("wally")

# Separately for each test: heatmaps & histograms aggregated across the whole time range:
# * fio latency heatmap for all instances
# * data dev iops across all osd
# * data dev bw across all osd
# * data dev qd across all osd
# * journal dev iops across all osd
# * journal dev bw across all osd
# * journal dev qd across all osd
# * net dev pps across all hosts
# * net dev bps across all hosts

# Main API's:
# get sensors by pattern
# align values to seconds
# cut ranges for particular test
# transform into 2d histograms (either make histograms or rebin them) and clip outliers at the same time

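# A typical flow looks roughly like this (an illustrative sketch, not exercised
# code; assumes `rstorage`, `suite`, `job` and the time range were obtained
# elsewhere):
#
#     ts = get_aggregated(rstorage, suite, job, 'bw')
#     ts = c_interpolate_ts_on_seconds_border(ts, tp='agg')
#     ts = get_ts_for_time_range(ts, (begin, end))
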
AGG_TAG = 'ALL'


def find_nodes_by_roles(rstorage: ResultStorage, node_roles: Iterable[str]) -> List[NodeInfo]:
    nodes = rstorage.storage.load_list(NodeInfo, 'all_nodes')  # type: List[NodeInfo]
    node_roles_s = set(node_roles)
    return [node for node in nodes if node.roles.intersection(node_roles_s)]


def find_all_sensors(rstorage: ResultStorage,
                     node_roles: Iterable[str],
                     sensor: str,
                     metric: str) -> Iterator[TimeSeries]:
    all_nodes_rr = "|".join(node.node_id for node in find_nodes_by_roles(rstorage, node_roles))
    all_nodes_rr = "(?P<node>{})".format(all_nodes_rr)

    for path, ds in rstorage.iter_sensors(all_nodes_rr, sensor=sensor, metric=metric):
        ts = rstorage.load_sensor(ds)

        # for sensors, ts.times is an array of pairs: (collection_started_at, collection_finished_at).
        # to make this array consistent with the load-data times, the second item of each pair is dropped
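        # e.g. times == [t0_start, t0_stop, t1_start, t1_stop, ...] becomes [t0_start, t1_start, ...]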
        ts.times = ts.times[::2]
        yield ts


def find_all_series(rstorage: ResultStorage, suite: SuiteConfig, job: JobConfig, metric: str) -> Iterator[TimeSeries]:
    "Iterate over the selected metric for all nodes for the given suite/job"
    return rstorage.iter_ts(suite, job, metric=metric)


def get_aggregated(rstorage: ResultStorage, suite: SuiteConfig, job: FioJobConfig, metric: str) -> TimeSeries:
    "Sum the selected metric over all nodes for the given suite/job"

    tss = list(find_all_series(rstorage, suite, job, metric))

    # TODO: replace this with a universal interpolator
    # for ts in tss:
    #     from_s = float(unit_conversion_coef('s', ts.time_units))
    #     prev_time = ts.times[0]
    #     res = [ts.data[0]]
    #
    #     for ln, (tm, val) in enumerate(zip(ts.times[1:], ts.data[1:]), 1):
    #         assert tm > prev_time, "Failed tm > prev_time, src={}, ln={}".format(ts.source, ln)
    #         while tm - prev_time > from_s * 1.2:
    #             res.append(0)
    #             prev_time += from_s
    #         res.append(val)
    #         prev_time = tm

    if len(tss) == 0:
        raise ValueError("Can't find any TS for {},{},{}".format(suite, job, metric))

    ds = DataSource(suite_id=suite.storage_id,
                    job_id=job.storage_id,
                    node_id=AGG_TAG,
                    sensor='fio',
                    dev=AGG_TAG,
                    metric=metric,
                    tag='csv')

    agg_ts = TimeSeries(metric,
                        raw=None,
                        source=ds,
                        data=numpy.zeros(tss[0].data.shape, dtype=tss[0].data.dtype),
                        times=tss[0].times.copy(),
                        units=tss[0].units,
                        histo_bins=tss[0].histo_bins,
                        time_units=tss[0].time_units)

    for ts in tss:
        if metric == 'lat' and (len(ts.data.shape) != 2 or ts.data.shape[1] != expected_lat_bins):
            msg = "Sensor {}.{} on node {} has shape={}. Can only process sensors with shape=[X, {}].".format(
                ts.source.dev, ts.source.sensor, ts.source.node_id, ts.data.shape, expected_lat_bins)
            logger.error(msg)
            raise ValueError(msg)

        if metric != 'lat' and len(ts.data.shape) != 1:
            msg = "Sensor {}.{} on node {} has shape={}. Can only process 1D sensors.".format(
                ts.source.dev, ts.source.sensor, ts.source.node_id, ts.data.shape)
            logger.error(msg)
            raise ValueError(msg)

        # TODO: match times on different ts
        assert abs(len(agg_ts.data) - len(ts.data)) <= 1, \
            "len(agg_ts.data)={}, len(ts.data)={}, must be almost equal".format(len(agg_ts.data), len(ts.data))

        mlen = min(len(agg_ts.data), len(ts.data))
        agg_ts.data[:mlen] += ts.data[:mlen]

    return agg_ts


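# cache shared by both interpolation functions below; keyed by ts.source.tpl
# for the pure-Python version and by (ts.source.tpl, tp) for the C-backed one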
interpolated_cache = {}


def interpolate_ts_on_seconds_border(ts: TimeSeries, nc: bool = False) -> TimeSeries:
    "Interpolate a time series to values on whole-second borders"
    logger.warning("This implementation of interpolate_ts_on_seconds_border is deprecated and should be updated")

    if not nc and ts.source.tpl in interpolated_cache:
        return interpolated_cache[ts.source.tpl]

    assert len(ts.times) == len(ts.data), "Time(={}) and data(={}) sizes don't match for {!s}"\
        .format(len(ts.times), len(ts.data), ts.source)

    rcoef = 1 / unit_conversion_coef(ts.time_units, 's')  # type: Union[int, Fraction]

    if isinstance(rcoef, Fraction):
        assert rcoef.denominator == 1, "Incorrect conversion coef {!r}".format(rcoef)
        rcoef = rcoef.numerator

    assert rcoef >= 1 and isinstance(rcoef, int), "Incorrect conversion coef {!r}".format(rcoef)
    coef = int(rcoef)  # make typechecker happy

    # round to seconds border
    begin = int(ts.times[0] / coef + 1) * coef
    end = int(ts.times[-1] / coef) * coef

    # current real data time chunk begin time
    edge_it = iter(ts.times)

    # current real data value
    val_it = iter(ts.data)

    # result array, cumulative value per second
    result = numpy.empty([(end - begin) // coef], dtype=ts.data.dtype)
    idx = 0
    curr_summ = 0

    # end of current time slot
    results_cell_ends = begin + coef

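    # The loop below assumes each input sample's value was accumulated evenly
    # over [real_data_start, real_data_end] and re-bins it into whole-second
    # cells: the part of a sample overlapping the current cell is added to it,
    # the remainder is carried over (in curr_summ) into the next cell.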
    # hack to unify looping
    real_data_end = next(edge_it)
    while results_cell_ends <= end:
        real_data_start = real_data_end
        real_data_end = next(edge_it)
        real_val_left = next(val_it)

        # real data "speed" for the interval [real_data_start, real_data_end]
        real_val_ps = float(real_val_left) / (real_data_end - real_data_start)

        while real_data_end >= results_cell_ends and results_cell_ends <= end:
            # part of the current real value which fits into the current result cell
            curr_real_chunk = int((results_cell_ends - real_data_start) * real_val_ps)

            # calculate the rest of the real data for the next result cell
            real_val_left -= curr_real_chunk
            result[idx] = curr_summ + curr_real_chunk
            idx += 1
            curr_summ = 0

            # adjust real data start time
            real_data_start = results_cell_ends
            results_cell_ends += coef

        # don't lose any real data
        curr_summ += real_val_left

    assert idx == len(result), "Wrong output array size - idx(={}) != len(result)(={})".format(idx, len(result))

    res_ts = TimeSeries(ts.name, None, result,
                        times=int(begin // coef) + numpy.arange(idx, dtype=ts.times.dtype),
                        units=ts.units,
                        time_units='s',
                        source=ts.source(),
                        histo_bins=ts.histo_bins)

    if not nc:
        interpolated_cache[ts.source.tpl] = res_ts

    return res_ts


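# Lazily-loaded entry points from clib/libwally.so (bound inside
# c_interpolate_ts_on_seconds_border below). The 'agg' and 'qd' variants write
# per-second resampled values straight into the output buffer, while the 'fio'
# variant writes, for every output second, the index of the source row to use;
# the indexes are resolved to data rows on the Python side.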
c_interp_func_agg = None
c_interp_func_qd = None
c_interp_func_fio = None


def c_interpolate_ts_on_seconds_border(ts: TimeSeries, nc: bool = False, tp: str = 'agg') -> TimeSeries:
    "Interpolate a time series to values on whole-second borders"
    key = (ts.source.tpl, tp)
    if not nc and key in interpolated_cache:
        return interpolated_cache[key].copy()

    # both data and times must be 1d compact arrays
    assert len(ts.data.strides) == 1, "ts.data must be 1D, got strides {!r}".format(ts.data.strides)
    assert ts.data.dtype.itemsize == ts.data.strides[0], "ts.data array must be compact"
    assert len(ts.times.strides) == 1, "ts.times must be 1D, got strides {!r}".format(ts.times.strides)
    assert ts.times.dtype.itemsize == ts.times.strides[0], "ts.times array must be compact"

    assert len(ts.times) == len(ts.data), "Time(={}) and data(={}) sizes don't match for {!s}"\
        .format(len(ts.times), len(ts.data), ts.source)

    rcoef = 1 / unit_conversion_coef(ts.time_units, 's')  # type: Union[int, Fraction]

    if isinstance(rcoef, Fraction):
        assert rcoef.denominator == 1, "Incorrect conversion coef {!r}".format(rcoef)
        rcoef = rcoef.numerator

    assert rcoef >= 1 and isinstance(rcoef, int), "Incorrect conversion coef {!r}".format(rcoef)
    coef = int(rcoef)  # make typechecker happy

    global c_interp_func_agg
    global c_interp_func_qd
    global c_interp_func_fio

    uint64_p = ctypes.POINTER(ctypes.c_uint64)

    if c_interp_func_agg is None:
        dirname = os.path.dirname(os.path.dirname(wally.__file__))
        path = os.path.join(dirname, 'clib', 'libwally.so')
        cdll = ctypes.CDLL(path)

        c_interp_func_agg = cdll.interpolate_ts_on_seconds_border
        c_interp_func_qd = cdll.interpolate_ts_on_seconds_border_qd

        for func in (c_interp_func_agg, c_interp_func_qd):
            func.argtypes = [
                ctypes.c_uint,  # input_size
                ctypes.c_uint,  # output_size
                uint64_p,       # times
                uint64_p,       # values
                ctypes.c_uint,  # time_scale_coef
                uint64_p,       # output
            ]
            func.restype = ctypes.c_uint  # output array used size

        c_interp_func_fio = cdll.interpolate_ts_on_seconds_border_fio
        c_interp_func_fio.restype = ctypes.c_int
        c_interp_func_fio.argtypes = [
            ctypes.c_uint,    # input_size
            ctypes.c_uint,    # output_size
            uint64_p,         # times
            ctypes.c_uint,    # time_scale_coef
            uint64_p,         # output indexes
            ctypes.c_uint64,  # empty placeholder
        ]

    assert ts.data.dtype.name == 'uint64', "Data dtype for {}=={} != uint64".format(ts.source, ts.data.dtype.name)
    assert ts.times.dtype.name == 'uint64', "Time dtype for {}=={} != uint64".format(ts.source, ts.times.dtype.name)

    output_sz = int(ts.times[-1]) // coef - int(ts.times[0]) // coef + 2
    result = numpy.zeros(output_sz, dtype=ts.data.dtype.name)

    if tp in ('qd', 'agg'):
        func = c_interp_func_qd if tp == 'qd' else c_interp_func_agg
        sz = func(ts.data.size,
                  output_sz,
                  ts.times.ctypes.data_as(uint64_p),
                  ts.data.ctypes.data_as(uint64_p),
                  coef,
                  result.ctypes.data_as(uint64_p))

        result = result[:sz]
        output_sz = sz

        rtimes = int(ts.times[0] // coef) + numpy.arange(output_sz, dtype=ts.times.dtype)
    else:
        assert tp == 'fio'
        ridx = numpy.zeros(output_sz, dtype=ts.times.dtype)
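        # sentinel passed to the C code to mark output seconds that have no
        # source sample; such cells get a zero/empty value below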
        no_data = (output_sz + 1)
        sz_or_err = c_interp_func_fio(ts.times.size,
                                      output_sz,
                                      ts.times.ctypes.data_as(uint64_p),
                                      coef,
                                      ridx.ctypes.data_as(uint64_p),
                                      no_data)

        if sz_or_err <= 0:
            raise ValueError("Error in input array at index {}. {}".format(-sz_or_err, ts.source))

        rtimes = int(ts.times[0] // coef) + numpy.arange(sz_or_err, dtype=ts.times.dtype)

        empty = numpy.zeros(len(ts.histo_bins), dtype=ts.data.dtype) if ts.source.metric == 'lat' else 0
        res = []
        for idx in ridx[:sz_or_err]:
            if idx == no_data:
                res.append(empty)
            else:
                res.append(ts.data[idx])
        result = numpy.array(res, dtype=ts.data.dtype)

    res_ts = TimeSeries(ts.name, None, result,
                        times=rtimes,
                        units=ts.units,
                        time_units='s',
                        source=ts.source(),
                        histo_bins=ts.histo_bins)

    if not nc:
        interpolated_cache[key] = res_ts.copy()

    return res_ts


def get_ts_for_time_range(ts: TimeSeries, time_range: Tuple[int, int]) -> TimeSeries:
    """Return sensor values for the given node and period as a per-second estimated values array.
    Raise an error if the requested range is not fully covered by the data in storage"""

    assert ts.time_units == 's', "{} != s for {!s}".format(ts.time_units, ts.source)
    assert len(ts.times) == len(ts.data), "Time(={}) and data(={}) sizes don't match for {!s}"\
        .format(len(ts.times), len(ts.data), ts.source)

    if time_range[0] < ts.times[0] or time_range[1] > ts.times[-1]:
        raise AssertionError(("Incorrect data for get_sensor - time_range={!r}, collected_at=[{}, ..., {}], " +
                              "sensor = {}_{}.{}.{}").format(time_range, ts.times[0], ts.times[-1],
                                                             ts.source.node_id, ts.source.sensor, ts.source.dev,
                                                             ts.source.metric))
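    # searchsorted returns the insertion positions of the range bounds in the
    # sorted times array, i.e. the slice [idx1:idx2] covering the requested range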
    idx1, idx2 = numpy.searchsorted(ts.times, time_range)
    return TimeSeries(ts.name, None,
                      ts.data[idx1:idx2],
                      times=ts.times[idx1:idx2],
                      units=ts.units,
                      time_units=ts.time_units,
                      source=ts.source,
                      histo_bins=ts.histo_bins)


def make_2d_histo(tss: List[TimeSeries],
                  outliers_range: Tuple[float, float] = (0.02, 0.98),
                  bins_count: int = 20,
                  log_bins: bool = False) -> TimeSeries:

    # validate input data
    for ts in tss:
        assert len(ts.times) == len(ts.data), "Time(={}) and data(={}) sizes don't match for {!s}"\
            .format(len(ts.times), len(ts.data), ts.source)
        assert ts.time_units == 's', "All time series must have time in seconds"
        assert ts.units == tss[0].units, "All arrays should have the same data units"
        assert ts.data.shape == tss[0].data.shape, "All arrays should have the same data size"
        assert len(ts.data.shape) == 1, "All arrays should be 1d"

    whole_arr = numpy.concatenate([ts.data for ts in tss])
    whole_arr.shape = [len(tss), -1]

    if outliers_range is not None:
        min_vl, begin, end, max_vl = numpy.percentile(whole_arr,
                                                      [0, outliers_range[0] * 100, outliers_range[1] * 100, 100])
        bins_edges = auto_edges2(begin, end, bins=bins_count, log_space=log_bins)
        # widen the edge bins to the real data extremes, so outliers are clipped
        # into the first/last bin instead of being dropped
        fixed_bins_edges = bins_edges.copy()
        fixed_bins_edges[0] = min_vl
        fixed_bins_edges[-1] = max_vl
    else:
        begin, end = numpy.percentile(whole_arr, [0, 100])
        bins_edges = auto_edges2(begin, end, bins=bins_count, log_space=log_bins)
        fixed_bins_edges = bins_edges

    res_data = numpy.concatenate([numpy.histogram(column, fixed_bins_edges)[0] for column in whole_arr.T])
    res_data.shape = (len(tss[0].times), -1)
    res = TimeSeries(name=tss[0].name,
                     raw=None,
                     data=res_data,
                     times=tss[0].times,
                     units=tss[0].units,
                     source=tss[0].source,
                     time_units=tss[0].time_units,
                     histo_bins=bins_edges)
    return res


def aggregate_histograms(tss: List[TimeSeries],
                         outliers_range: Tuple[float, float] = (0.02, 0.98),
                         bins_count: int = 20,
                         log_bins: bool = False) -> TimeSeries:

    # validate input data
    for ts in tss:
        assert len(ts.times) == len(ts.data), "Need to use stripped time"
        assert ts.time_units == 's', "All time series must have time in seconds"
        assert ts.units == tss[0].units, "All arrays should have the same data units"
        assert ts.data.shape == tss[0].data.shape, "All arrays should have the same data size"
        assert len(ts.data.shape) == 2, "All arrays should be 2d"
        assert ts.histo_bins is not None, "All arrays should have histogram bins"

    whole_arr = numpy.concatenate([ts.data for ts in tss])
    whole_arr.shape = [len(tss), -1]

    min_val = whole_arr.min()
    max_val = whole_arr.max()

    if outliers_range is not None:
        begin, end = numpy.percentile(whole_arr, [outliers_range[0] * 100, outliers_range[1] * 100])
    else:
        begin = min_val
        end = max_val

    bins_edges = auto_edges2(begin, end, bins=bins_count, log_space=log_bins)

    if outliers_range is not None:
        # widen the edge bins to the real data extremes, so outliers are clipped
        # into the first/last bin instead of being dropped
        fixed_bins_edges = bins_edges.copy()
        fixed_bins_edges[0] = min_val
        fixed_bins_edges[-1] = max_val
    else:
        fixed_bins_edges = bins_edges

    res_data = numpy.concatenate([numpy.histogram(column, fixed_bins_edges)[0] for column in whole_arr.T])
    res_data.shape = (len(tss[0].times), -1)
    return TimeSeries(name=tss[0].name,
                      raw=None,
                      data=res_data,
                      times=tss[0].times,
                      units=tss[0].units,
                      source=tss[0].source,
                      time_units=tss[0].time_units,
                      histo_bins=fixed_bins_edges)


qd_metrics = {'io_queue'}
summ_sensors_cache = {}  # type: Dict[Tuple[Tuple[str, ...], str, str, Tuple[int, int], int], Optional[TimeSeries]]


def summ_sensors(rstorage: ResultStorage,
                 roles: List[str],
                 sensor: str,
                 metric: str,
                 time_range: Tuple[int, int],
                 nc: bool = False) -> Optional[TimeSeries]:

    key = (tuple(roles), sensor, metric, time_range, id(rstorage))
    if not nc and key in summ_sensors_cache:
        cached = summ_sensors_cache[key]
        return cached.copy() if cached is not None else None

    res = None  # type: Optional[TimeSeries]
    for node in find_nodes_by_roles(rstorage, roles):
        for _, ds in rstorage.iter_sensors(node_id=node.node_id, sensor=sensor, metric=metric):
            data = rstorage.load_sensor(ds)
            data = c_interpolate_ts_on_seconds_border(data, tp='qd' if metric in qd_metrics else 'agg')
            data = get_ts_for_time_range(data, time_range)
            if res is None:
                res = data
                res.data = res.data.copy()
            else:
                res.data += data.data

    if not nc:
        summ_sensors_cache[key] = res
        if len(summ_sensors_cache) > 1024:
            logger.warning("summ_sensors_cache is too large: %s > 1024", len(summ_sensors_cache))

    return res if res is None else res.copy()


def find_sensors_to_2d(rstorage: ResultStorage,
                       roles: List[str],
                       sensor: str,
                       devs: List[str],
                       metric: str,
                       time_range: Tuple[int, int]) -> numpy.ndarray:

    res = []  # type: List[numpy.ndarray]
    for node in find_nodes_by_roles(rstorage, roles):
        for dev in devs:
            for _, ds in rstorage.iter_sensors(node_id=node.node_id, sensor=sensor, dev=dev, metric=metric):
                data = rstorage.load_sensor(ds)
                data = c_interpolate_ts_on_seconds_border(data, tp='qd' if metric in qd_metrics else 'agg')
                data = get_ts_for_time_range(data, time_range)
                res.append(data.data)
    res2d = numpy.concatenate(res)
    res2d.shape = (len(res), -1)
    return res2d