Fix TS aggregation length mismatch, add fio interpolation mode, cache summ_sensors results
diff --git a/wally/data_selectors.py b/wally/data_selectors.py
index 66a6ee5..02d5075 100644
--- a/wally/data_selectors.py
+++ b/wally/data_selectors.py
@@ -1,7 +1,7 @@
import ctypes
import logging
import os.path
-from typing import Tuple, List, Iterable, Iterator, Optional, Union
+from typing import Tuple, List, Iterable, Iterator, Optional, Union, Dict
from fractions import Fraction
import numpy
@@ -72,6 +72,20 @@
tss = list(find_all_series(rstorage, suite, job, metric))
+ # TODO replace this with universal interpolator
+ # for ts in tss:
+ # from_s = float(unit_conversion_coef('s', ts.time_units))
+ # prev_time = ts.times[0]
+ # res = [ts.data[0]]
+ #
+ # for ln, (tm, val) in enumerate(zip(ts.times[1:], ts.data[1:]), 1):
+ # assert tm > prev_time, "Failed tm > prev_time, src={}, ln={}".format(ts.source, ln)
+ # while tm - prev_time > from_s * 1.2:
+ # res.append(0)
+ # prev_time += from_s
+ # res.append(val)
+ # prev_time = tm
+
if len(tss) == 0:
raise NameError("Can't found any TS for {},{},{}".format(suite, job, metric))
@@ -106,7 +120,15 @@
raise ValueError(msg)
# TODO: match times on different ts
- agg_ts.data += ts.data
+ if abs(len(agg_ts.data) - len(ts.data)) > 1:
+ # import IPython
+ # IPython.embed()
+ pass
+ assert abs(len(agg_ts.data) - len(ts.data)) <= 1, \
+ "len(agg_ts.data)={}, len(ts.data)={}, need to be almost equals".format(len(agg_ts.data), len(ts.data))
+
+ mlen = min(len(agg_ts.data), len(ts.data))
+ agg_ts.data[:mlen] += ts.data[:mlen]
return agg_ts
@@ -193,13 +215,14 @@
return res_ts
-c_interp_func = None
+c_interp_func_agg = None
c_interp_func_qd = None
+c_interp_func_fio = None
-def c_interpolate_ts_on_seconds_border(ts: TimeSeries, nc: bool = False, qd: bool = False) -> TimeSeries:
+def c_interpolate_ts_on_seconds_border(ts: TimeSeries, nc: bool = False, tp: str = 'agg') -> TimeSeries:
"Interpolate time series to values on seconds borders"
- key = (ts.source.tpl, qd)
+ key = (ts.source.tpl, tp)
if not nc and key in interpolated_cache:
return interpolated_cache[key].copy()
@@ -221,64 +244,86 @@
assert rcoef >= 1 and isinstance(rcoef, int), "Incorrect conversion coef {!r}".format(rcoef)
coef = int(rcoef) # make typechecker happy
- global c_interp_func
+ global c_interp_func_agg
global c_interp_func_qd
+ global c_interp_func_fio
uint64_p = ctypes.POINTER(ctypes.c_uint64)
- if c_interp_func is None:
+ if c_interp_func_agg is None:
dirname = os.path.dirname(os.path.dirname(wally.__file__))
path = os.path.join(dirname, 'clib', 'libwally.so')
cdll = ctypes.CDLL(path)
- c_interp_func = cdll.interpolate_ts_on_seconds_border
- c_interp_func.argtypes = [
- ctypes.c_uint, # input_size
- ctypes.c_uint, # output_size
- uint64_p, # times
- uint64_p, # values
- ctypes.c_uint, # time_scale_coef
- uint64_p, # output
- ]
- c_interp_func.restype = None
-
+ c_interp_func_agg = cdll.interpolate_ts_on_seconds_border
c_interp_func_qd = cdll.interpolate_ts_on_seconds_border_qd
- c_interp_func_qd.argtypes = [
- ctypes.c_uint, # input_size
- ctypes.c_uint, # output_size
- uint64_p, # times
- uint64_p, # values
- ctypes.c_uint, # time_scale_coef
- uint64_p, # output
- ]
- c_interp_func_qd.restype = ctypes.c_uint
+
+ for func in (c_interp_func_agg, c_interp_func_qd):
+ func.argtypes = [
+ ctypes.c_uint, # input_size
+ ctypes.c_uint, # output_size
+ uint64_p, # times
+ uint64_p, # values
+ ctypes.c_uint, # time_scale_coef
+ uint64_p, # output
+ ]
+ func.restype = ctypes.c_uint # output array used size
+
+ c_interp_func_fio = cdll.interpolate_ts_on_seconds_border_fio
+ c_interp_func_fio.restype = ctypes.c_int
+ c_interp_func_fio.argtypes = [
+ ctypes.c_uint, # input_size
+ ctypes.c_uint, # output_size
+ uint64_p, # times
+ ctypes.c_uint, # time_scale_coef
+ uint64_p, # output indexes
+ ctypes.c_uint64, # empty placeholder
+ ]
assert ts.data.dtype.name == 'uint64', "Data dtype for {}=={} != uint64".format(ts.source, ts.data.dtype.name)
assert ts.times.dtype.name == 'uint64', "Time dtype for {}=={} != uint64".format(ts.source, ts.times.dtype.name)
output_sz = int(ts.times[-1]) // coef - int(ts.times[0]) // coef + 2
- # print("output_sz =", output_sz, "coef =", coef)
result = numpy.zeros(output_sz, dtype=ts.data.dtype.name)
- if qd:
- func = c_interp_func_qd
- else:
- func = c_interp_func
+ if tp in ('qd', 'agg'):
+ func = c_interp_func_qd if tp == 'qd' else c_interp_func_agg
+ sz = func(ts.data.size,
+ output_sz,
+ ts.times.ctypes.data_as(uint64_p),
+ ts.data.ctypes.data_as(uint64_p),
+ coef,
+ result.ctypes.data_as(uint64_p))
- sz = func(ts.data.size,
- output_sz,
- ts.times.ctypes.data_as(uint64_p),
- ts.data.ctypes.data_as(uint64_p),
- coef,
- result.ctypes.data_as(uint64_p))
-
- if qd:
result = result[:sz]
output_sz = sz
- else:
- assert sz is None
- rtimes = int(ts.times[0] // coef) + numpy.arange(output_sz, dtype=ts.times.dtype)
+ rtimes = int(ts.times[0] // coef) + numpy.arange(output_sz, dtype=ts.times.dtype)
+ else:
+ assert tp == 'fio'
+ ridx = numpy.zeros(output_sz, dtype=ts.times.dtype)
+ no_data = (output_sz + 1)
+ sz_or_err = c_interp_func_fio(ts.times.size,
+ output_sz,
+ ts.times.ctypes.data_as(uint64_p),
+ coef,
+ ridx.ctypes.data_as(uint64_p),
+ no_data)
+ # A non-positive return value is treated as an error; its negation is reported as the offending input index.
+ if sz_or_err <= 0:
+ raise ValueError("Error in input array at index {}. {}".format(-sz_or_err, ts.source))
+ # On success the return value is used as the number of filled entries in ridx.
+ rtimes = int(ts.times[0] // coef) + numpy.arange(sz_or_err, dtype=ts.times.dtype)
+ # Slots holding the no_data sentinel get an empty value; every other slot indexes into ts.data.
+ empty = numpy.zeros(len(ts.histo_bins), dtype=ts.data.dtype) if ts.source.metric == 'lat' else 0
+ res = []
+ for idx in ridx[:sz_or_err]:
+ if idx == no_data:
+ res.append(empty)
+ else:
+ res.append(ts.data[idx])
+ result = numpy.array(res, dtype=ts.data.dtype)
+
res_ts = TimeSeries(ts.name, None, result,
times=rtimes,
units=ts.units,
@@ -405,26 +450,38 @@
qd_metrics = {'io_queue'}
+summ_sensors_cache = {} # type: Dict[Tuple[Tuple[str, ...], str, str, Tuple[int, int], int], Optional[TimeSeries]]
def summ_sensors(rstorage: ResultStorage,
roles: List[str],
sensor: str,
metric: str,
- time_range: Tuple[int, int]) -> Optional[TimeSeries]:
+ time_range: Tuple[int, int],
+ nc: bool = False) -> Optional[TimeSeries]:
+ """Sum `metric` of `sensor` over all nodes matching `roles` within `time_range`; memoized unless `nc` is set."""
+ key = (tuple(roles), sensor, metric, time_range, id(rstorage))  # key on the storage instance, not the class
+ if not nc and key in summ_sensors_cache:
+ return None if summ_sensors_cache[key] is None else summ_sensors_cache[key].copy()
res = None # type: Optional[TimeSeries]
for node in find_nodes_by_roles(rstorage, roles):
for _, ds in rstorage.iter_sensors(node_id=node.node_id, sensor=sensor, metric=metric):
data = rstorage.load_sensor(ds)
- data = c_interpolate_ts_on_seconds_border(data, qd=metric in qd_metrics)
+ data = c_interpolate_ts_on_seconds_border(data, tp='qd' if metric in qd_metrics else 'agg')  # 2nd positional is `nc`
data = get_ts_for_time_range(data, time_range)
if res is None:
res = data
res.data = res.data.copy()
else:
res.data += data.data
- return res
+ # The aggregate is cached even when it is None (no sensor matched); callers always get a copy of a cached series.
+ if not nc:
+ summ_sensors_cache[key] = res
+ if len(summ_sensors_cache) > 1024:
+ logger.warning("summ_sensors_cache cache too large %s > 1024", len(summ_sensors_cache))
+
+ return res if res is None else res.copy()
def find_sensors_to_2d(rstorage: ResultStorage,
@@ -439,7 +496,7 @@
for dev in devs:
for _, ds in rstorage.iter_sensors(node_id=node.node_id, sensor=sensor, dev=dev, metric=metric):
data = rstorage.load_sensor(ds)
- data = c_interpolate_ts_on_seconds_border(data, qd=metric in qd_metrics)
+ data = c_interpolate_ts_on_seconds_border(data, tp='qd' if metric in qd_metrics else 'agg')  # 2nd positional is `nc`
data = get_ts_for_time_range(data, time_range)
res.append(data.data)
res2d = numpy.concatenate(res)