Fix bugs in C code, update interpolation (add a `qd` variant of the C interpolation routine), etc.
diff --git a/wally/data_selectors.py b/wally/data_selectors.py
index 53b822b..66a6ee5 100644
--- a/wally/data_selectors.py
+++ b/wally/data_selectors.py
@@ -116,6 +116,7 @@
def interpolate_ts_on_seconds_border(ts: TimeSeries, nc: bool = False) -> TimeSeries:
"Interpolate time series to values on seconds borders"
+ logging.warning("This implementation of interpolate_ts_on_seconds_border is deprecated and should be updated")
if not nc and ts.source.tpl in interpolated_cache:
return interpolated_cache[ts.source.tpl]
@@ -193,14 +194,20 @@
c_interp_func = None
-cdll = None
+c_interp_func_qd = None
-def c_interpolate_ts_on_seconds_border(ts: TimeSeries, nc: bool = False) -> TimeSeries:
+def c_interpolate_ts_on_seconds_border(ts: TimeSeries, nc: bool = False, qd: bool = False) -> TimeSeries:
"Interpolate time series to values on seconds borders"
+ key = (ts.source.tpl, qd)
+ if not nc and key in interpolated_cache:
+ return interpolated_cache[key].copy()
- if not nc and ts.source.tpl in interpolated_cache:
- return interpolated_cache[ts.source.tpl]
+ # both data and times must be 1d compact arrays
+ assert len(ts.data.strides) == 1, "ts.data.strides must be 1D, not " + repr(ts.data.strides)
+ assert ts.data.dtype.itemsize == ts.data.strides[0], "ts.data array must be compact"
+ assert len(ts.times.strides) == 1, "ts.times.strides must be 1D, not " + repr(ts.times.strides)
+ assert ts.times.dtype.itemsize == ts.times.strides[0], "ts.times array must be compact"
assert len(ts.times) == len(ts.data), "Time(={}) and data(={}) sizes doesn't equal for {!s}"\
.format(len(ts.times), len(ts.data), ts.source)
@@ -214,15 +221,17 @@
assert rcoef >= 1 and isinstance(rcoef, int), "Incorrect conversion coef {!r}".format(rcoef)
coef = int(rcoef) # make typechecker happy
- global cdll
global c_interp_func
+ global c_interp_func_qd
+
uint64_p = ctypes.POINTER(ctypes.c_uint64)
if c_interp_func is None:
dirname = os.path.dirname(os.path.dirname(wally.__file__))
path = os.path.join(dirname, 'clib', 'libwally.so')
cdll = ctypes.CDLL(path)
- c_interp_func = cdll.interpolate_ts_on_seconds_border_v2
+
+ c_interp_func = cdll.interpolate_ts_on_seconds_border
c_interp_func.argtypes = [
ctypes.c_uint, # input_size
ctypes.c_uint, # output_size
@@ -233,6 +242,17 @@
]
c_interp_func.restype = None
+ c_interp_func_qd = cdll.interpolate_ts_on_seconds_border_qd
+ c_interp_func_qd.argtypes = [
+ ctypes.c_uint, # input_size
+ ctypes.c_uint, # output_size
+ uint64_p, # times
+ uint64_p, # values
+ ctypes.c_uint, # time_scale_coef
+ uint64_p, # output
+ ]
+ c_interp_func_qd.restype = ctypes.c_uint
+
assert ts.data.dtype.name == 'uint64', "Data dtype for {}=={} != uint64".format(ts.source, ts.data.dtype.name)
assert ts.times.dtype.name == 'uint64', "Time dtype for {}=={} != uint64".format(ts.source, ts.times.dtype.name)
@@ -240,22 +260,35 @@
# print("output_sz =", output_sz, "coef =", coef)
result = numpy.zeros(output_sz, dtype=ts.data.dtype.name)
- c_interp_func(ts.data.size,
- output_sz,
- ts.times.ctypes.data_as(uint64_p),
- ts.data.ctypes.data_as(uint64_p),
- coef,
- result.ctypes.data_as(uint64_p))
+ if qd:
+ func = c_interp_func_qd
+ else:
+ func = c_interp_func
+ sz = func(ts.data.size,
+ output_sz,
+ ts.times.ctypes.data_as(uint64_p),
+ ts.data.ctypes.data_as(uint64_p),
+ coef,
+ result.ctypes.data_as(uint64_p))
+
+ if qd:
+ result = result[:sz]
+ output_sz = sz
+ else:
+ assert sz is None
+
+ rtimes = int(ts.times[0] // coef) + numpy.arange(output_sz, dtype=ts.times.dtype)
res_ts = TimeSeries(ts.name, None, result,
- times=int(ts.times[0] // coef) + numpy.arange(output_sz, dtype=ts.times.dtype),
+ times=rtimes,
units=ts.units,
time_units='s',
source=ts.source(),
histo_bins=ts.histo_bins)
if not nc:
- interpolated_cache[ts.source.tpl] = res_ts
+ interpolated_cache[key] = res_ts.copy()  # store under the same (tpl, qd) key used by the cache lookup at the top of this function
+
return res_ts
@@ -371,6 +404,9 @@
histo_bins=fixed_bins_edges)
+qd_metrics = {'io_queue'}
+
+
def summ_sensors(rstorage: ResultStorage,
roles: List[str],
sensor: str,
@@ -381,7 +417,7 @@
for node in find_nodes_by_roles(rstorage, roles):
for _, ds in rstorage.iter_sensors(node_id=node.node_id, sensor=sensor, metric=metric):
data = rstorage.load_sensor(ds)
- data = c_interpolate_ts_on_seconds_border(data)
+ data = c_interpolate_ts_on_seconds_border(data, qd=metric in qd_metrics)
data = get_ts_for_time_range(data, time_range)
if res is None:
res = data
@@ -403,7 +439,7 @@
for dev in devs:
for _, ds in rstorage.iter_sensors(node_id=node.node_id, sensor=sensor, dev=dev, metric=metric):
data = rstorage.load_sensor(ds)
- data = c_interpolate_ts_on_seconds_border(data)
+ data = c_interpolate_ts_on_seconds_border(data, qd=metric in qd_metrics)
data = get_ts_for_time_range(data, time_range)
res.append(data.data)
res2d = numpy.concatenate(res)