fix bugs in c code, update interpolation, etc
diff --git a/tests/test_math.py b/tests/test_math.py
index d9c9688..9298792 100644
--- a/tests/test_math.py
+++ b/tests/test_math.py
@@ -1,13 +1,33 @@
import numpy
from wally.statistic import rebin_histogram
from wally.result_classes import DataSource, TimeSeries
-from wally.data_selectors import interpolate_ts_on_seconds_border, c_interpolate_ts_on_seconds_border
+from wally.data_selectors import c_interpolate_ts_on_seconds_border
+from wally.utils import unit_conversion_coef
def array_eq(x: numpy.array, y: numpy.array, max_diff: float = 1E-3) -> bool:
return numpy.abs(x - y).max() <= max_diff
+def test_conversion_coef():
+ units = [
+ ('x', 'mx', 1000),
+ ('Gx', 'Kx', 1000 ** 2),
+ ('Gx', 'x', 1000 ** 3),
+ ('x', 'Kix', 1.0 / 1024),
+ ('x', 'Mix', 1.0 / 1024 ** 2),
+ ('mx', 'Mix', 0.001 / 1024 ** 2),
+ ('Mix', 'Kix', 1024),
+ ('Kix', 'ux', 1024 * 1000 ** 2),
+ ]
+
+ for unit1, unit2, coef in units:
+ cc = float(unit_conversion_coef(unit1, unit2))
+ assert abs(cc / coef - 1) < 1E-5, "{} => {} == {}".format(unit1, unit2, cc)
+ rcc = float(unit_conversion_coef(unit2, unit1))
+ assert abs(rcc * cc - 1) < 1E-5, "{} => {} == {}".format(unit1, unit2, rcc)
+
+
def test_rebin_histo():
curr_histo = numpy.empty((100,), dtype=int)
curr_histo[:] = 1
@@ -51,6 +71,8 @@
ms_coef = 1000
s_offset = 377 * ms_coef
ms_offset = 300 + s_offset
+ borders = 10
+ block_size = 20
for i in range(16):
source_times = numpy.random.randint(100, size=samples, dtype='uint64') + \
@@ -60,24 +82,13 @@
ts = TimeSeries("test", raw=None, data=source_values, times=source_times, units=DATA_UNITS,
source=ds, time_units=TIME_UNITS)
- # ts2 = interpolate_ts_on_seconds_border(ts)
ts2 = c_interpolate_ts_on_seconds_border(ts, nc=True)
- # print()
- # print(ts.times)
- # print(ts.data, ts.data.sum())
- # print(ts2.times)
- # print(ts2.data, ts2.data.sum())
-
assert ts.time_units == 'ms'
assert ts2.time_units == 's'
assert ts2.times.dtype == ts.times.dtype
assert ts2.data.dtype == ts.data.dtype
- assert ts.data.sum() == ts2.data.sum()
-
- borders = 5
- block_size = samples // 10
for begin_idx in numpy.random.randint(borders, samples - borders, size=20):
begin_idx = int(begin_idx)
end_idx = min(begin_idx + block_size, ts.times.size - 1)
@@ -92,3 +103,36 @@
ts2_min = ts.data[ts2_begin_idx + 1: ts2_end_idx - 1].sum()
assert ts2_min <= ts_sum <= ts2_max, "NOT {} <= {} <= {}".format(ts2_min, ts_sum, ts2_max)
+
+
+def test_interpolate_qd():
+ ds = DataSource(node_id=NODE_ID, sensor=SENSOR, dev=DEV, metric=METRIC)
+ samples = 200
+ ms_coef = 1000
+ s_offset = 377 * ms_coef
+ ms_offset = 300 + s_offset
+
+ for i in range(16):
+ source_times = numpy.random.randint(100, size=samples, dtype='uint64') + \
+ ms_coef * numpy.arange(samples, dtype='uint64') + s_offset + ms_offset
+ source_values = numpy.random.randint(30, 60, size=samples, dtype='uint64')
+
+ ts = TimeSeries("test", raw=None, data=source_values, times=source_times, units=DATA_UNITS,
+ source=ds, time_units=TIME_UNITS)
+
+ ts2 = c_interpolate_ts_on_seconds_border(ts, nc=True, qd=True)
+
+ assert ts.time_units == 'ms'
+ assert ts2.time_units == 's'
+ assert ts2.times.dtype == ts.times.dtype
+ assert ts2.data.dtype == ts.data.dtype
+ assert ts2.data.size == ts2.times.size
+ assert abs(ts2.data.size - ts.data.size) <= 1
+
+ coef = unit_conversion_coef(ts2.time_units, ts.time_units)
+ assert isinstance(coef, int)
+
+ dtime = (ts2.times[1] - ts2.times[0]) * coef // 2
+
+ idxs = numpy.searchsorted(ts.times, ts2.times * coef - dtime)
+ assert (ts2.data == ts.data[idxs]).all()