import numpy
import pytest

from wally.statistic import rebin_histogram
from wally.result_classes import DataSource, TimeSeries
from wally.data_selectors import c_interpolate_ts_on_seconds_border
from wally.utils import unit_conversion_coef

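# Unit tests for wally's unit conversion, histogram rebinning and time-series
# interpolation helpers. The interpolation tests build synthetic TimeSeries
# objects with millisecond timestamps and check how
# c_interpolate_ts_on_seconds_border() resamples them onto one-second borders.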


def array_eq(x: numpy.ndarray, y: numpy.ndarray, max_diff: float = 1E-3) -> bool:
    return numpy.abs(x - y).max() <= max_diff


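# test_conversion_coef: unit_conversion_coef() is checked against a table of
# (source unit, target unit, expected coefficient) entries built around a dummy
# base unit 'x', covering decimal (m/K/G/u) and binary (Ki/Mi) prefixes; the
# reverse conversion must be the multiplicative inverse of the forward one.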
def test_conversion_coef():
    units = [
        ('x', 'mx', 1000),
        ('Gx', 'Kx', 1000 ** 2),
        ('Gx', 'x', 1000 ** 3),
        ('x', 'Kix', 1.0 / 1024),
        ('x', 'Mix', 1.0 / 1024 ** 2),
        ('mx', 'Mix', 0.001 / 1024 ** 2),
        ('Mix', 'Kix', 1024),
        ('Kix', 'ux', 1024 * 1000 ** 2),
    ]

    for unit1, unit2, coef in units:
        cc = float(unit_conversion_coef(unit1, unit2))
        assert abs(cc / coef - 1) < 1E-5, "{} => {} == {}".format(unit1, unit2, cc)
        rcc = float(unit_conversion_coef(unit2, unit1))
        assert abs(rcc * cc - 1) < 1E-5, "{} => {} == {}".format(unit1, unit2, rcc)


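# test_rebin_histo: a flat 100-bin histogram (one count per bin) is rebinned
# into 10 bins, so every new bin must hold exactly 10 counts and the new edges
# must be spread evenly over the original range. The second call additionally
# folds everything left of bin 20 and right of bin 50 into the outermost bins,
# which matches the expected counts [30, 10, 60].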
def test_rebin_histo():
    curr_histo = numpy.empty((100,), dtype=int)
    curr_histo[:] = 1
    edges = numpy.arange(100)

    new_histo, new_edges = rebin_histogram(curr_histo, edges, 10)

    assert new_edges.shape == (10,)
    assert new_histo.shape == (10,)
    assert new_edges.dtype.name.startswith('float')
    assert new_histo.dtype.name.startswith('int')
    assert array_eq(new_edges, numpy.arange(10) * 9.9)
    assert new_histo.sum() == curr_histo.sum()
    assert list(new_histo) == [10] * 10

    new_histo, new_edges = rebin_histogram(curr_histo, edges, 3,
                                           left_tail_idx=20,
                                           right_tail_idx=50)

    assert new_edges.shape == (3,)
    assert new_histo.shape == (3,)
    assert array_eq(new_edges, numpy.array([20, 30, 40]))
    assert new_histo.sum() == curr_histo.sum()
    assert list(new_histo) == [30, 10, 60]


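# Fixed identifiers and units shared by the interpolation tests below; data
# values use the abstract unit 'x' and source timestamps are in milliseconds.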
SUITE_ID = "suite1"
JOB_ID = "job1"
NODE_ID = "node1"
SENSOR = "sensor"
DEV = "dev"
METRIC = "metric"
TAG = "csv"
DATA_UNITS = "x"
TIME_UNITS = "ms"
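

# test_interpolate: random samples spaced ~1 second apart with millisecond
# jitter are resampled onto second borders; the test checks units and dtypes,
# then verifies that block sums of the source data stay within coarse
# lower/upper bounds built from windows located via the interpolated time axis.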
def test_interpolate():
    ds = DataSource(node_id=NODE_ID, sensor=SENSOR, dev=DEV, metric=METRIC)
    samples = 200
    ms_coef = 1000
    s_offset = 377 * ms_coef
    ms_offset = 300 + s_offset
    borders = 10
    block_size = 20

    for i in range(16):
        source_times = numpy.random.randint(100, size=samples, dtype='uint64') + \
                       ms_coef * numpy.arange(samples, dtype='uint64') + s_offset + ms_offset
        source_values = numpy.random.randint(30, 60, size=samples, dtype='uint64')

        ts = TimeSeries("test", raw=None, data=source_values, times=source_times, units=DATA_UNITS,
                        source=ds, time_units=TIME_UNITS)
        ts2 = c_interpolate_ts_on_seconds_border(ts, nc=True)

        assert ts.time_units == 'ms'
        assert ts2.time_units == 's'
        assert ts2.times.dtype == ts.times.dtype
        assert ts2.data.dtype == ts.data.dtype

        for begin_idx in numpy.random.randint(borders, samples - borders, size=20):
            begin_idx = int(begin_idx)
            end_idx = min(begin_idx + block_size, ts.times.size - 1)
            first_cell_begin_time = ts.times[begin_idx - 1]
            last_cell_end_time = ts.times[end_idx]
            ts_sum = ts.data[begin_idx:end_idx].sum()

            ts2_begin_idx = numpy.searchsorted(ts2.times, first_cell_begin_time // ms_coef)
            ts2_end_idx = numpy.searchsorted(ts2.times, last_cell_end_time // ms_coef) + 1
            ts2_max = ts.data[ts2_begin_idx: ts2_end_idx].sum()
            ts2_min = ts.data[ts2_begin_idx + 1: ts2_end_idx - 1].sum()

            assert ts2_min <= ts_sum <= ts2_max, "NOT {} <= {} <= {}".format(ts2_min, ts_sum, ts2_max)


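# test_interpolate2: with samples already spaced exactly one second apart (only
# a constant sub-second offset), interpolation must leave the values unchanged.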
def test_interpolate2():
    ds = DataSource(node_id=NODE_ID, sensor=SENSOR, dev=DEV, metric=METRIC)
    samples = 5
    ms_coef = 1000

    source_times = numpy.arange(samples, dtype='uint64') * ms_coef + ms_coef + 347
    source_values = numpy.random.randint(10, 1000, size=samples, dtype='uint64')

    ts = TimeSeries("test", raw=None, data=source_values, times=source_times, units=DATA_UNITS,
                    source=ds, time_units=TIME_UNITS)
    ts2 = c_interpolate_ts_on_seconds_border(ts, nc=True)

    assert ts.time_units == 'ms'
    assert ts2.time_units == 's'
    assert ts2.times.dtype == ts.times.dtype
    assert ts2.data.dtype == ts.data.dtype
    assert (ts2.data == ts.data).all()


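# test_interpolate_qd: with tp='qd' (presumably queue-depth-style sensor data)
# the interpolation is expected to sample the source series rather than
# redistribute it: for every output second the test locates the corresponding
# source sample via searchsorted and requires the interpolated value to match.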
def test_interpolate_qd():
    ds = DataSource(node_id=NODE_ID, sensor=SENSOR, dev=DEV, metric=METRIC)
    samples = 200
    ms_coef = 1000
    s_offset = 377 * ms_coef
    ms_offset = 300 + s_offset

    for i in range(16):
        source_times = numpy.random.randint(100, size=samples, dtype='uint64') + \
                       ms_coef * numpy.arange(samples, dtype='uint64') + s_offset + ms_offset
        source_values = numpy.random.randint(30, 60, size=samples, dtype='uint64')

        ts = TimeSeries("test", raw=None, data=source_values, times=source_times, units=DATA_UNITS,
                        source=ds, time_units=TIME_UNITS)
        ts2 = c_interpolate_ts_on_seconds_border(ts, nc=True, tp='qd')

        assert ts.time_units == 'ms'
        assert ts2.time_units == 's'
        assert ts2.times.dtype == ts.times.dtype
        assert ts2.data.dtype == ts.data.dtype
        assert ts2.data.size == ts2.times.size

        coef = unit_conversion_coef(ts2.time_units, ts.time_units)
        assert isinstance(coef, int)

        dtime = (ts2.times[1] - ts2.times[0]) * coef // 2
        idxs = numpy.searchsorted(ts.times, ts2.times * coef - dtime)
        assert (ts2.data == ts.data[idxs]).all()


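# test_interpolate_fio: tp='fio' handles fio-style series whose timestamps sit
# exactly on second borders but where some seconds are missing; the output must
# cover the full time range, copy the existing values and fill the gap with
# zeros.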
def test_interpolate_fio():
    ds = DataSource(node_id=NODE_ID, sensor=SENSOR, dev=DEV, metric=METRIC)
    ms_coef = 1000
    s_offset = 377 * ms_coef
    gap_start = 5
    gap_size = 5
    full_size = 15

    times = list(range(gap_start)) + list(range(gap_start + gap_size, full_size))
    src_times = numpy.array(times, dtype='uint64') * ms_coef + s_offset
    src_values = numpy.random.randint(10, 100, size=len(src_times), dtype='uint64')

    ts = TimeSeries("test", raw=None, data=src_values, times=src_times, units=DATA_UNITS,
                    source=ds, time_units=TIME_UNITS)
    ts2 = c_interpolate_ts_on_seconds_border(ts, nc=True, tp='fio')

    assert ts.time_units == 'ms'
    assert ts2.time_units == 's'
    assert ts2.times.dtype == ts.times.dtype
    assert ts2.data.dtype == ts.data.dtype
    assert ts2.times[0] == ts.times[0] // ms_coef
    assert ts2.times[-1] == ts.times[-1] // ms_coef
    assert ts2.data.size == ts2.times.size

    expected_times = numpy.arange(ts.times[0] // ms_coef, ts.times[-1] // ms_coef + 1, dtype='uint64')
    assert ts2.times.size == expected_times.size
    assert (ts2.times == expected_times).all()
    assert (ts2.data[:gap_start] == ts.data[:gap_start]).all()
    assert (ts2.data[gap_start:gap_start + gap_size] == 0).all()
    assert (ts2.data[gap_start + gap_size:] == ts.data[gap_start:]).all()


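# test_interpolate_fio_negative: a timestamp that does not fall on a second
# border (the 4.5s sample) must make the 'fio' interpolation raise ValueError.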
def test_interpolate_fio_negative():
    ds = DataSource(node_id=NODE_ID, sensor=SENSOR, dev=DEV, metric=METRIC)
    ms_coef = 1000
    s_offset = 377 * ms_coef

    src_times = (numpy.array([1, 2, 3, 4.5, 5, 6, 7]) * ms_coef + s_offset).astype('uint64')
    src_values = numpy.random.randint(10, 100, size=len(src_times), dtype='uint64')

    ts = TimeSeries("test", raw=None, data=src_values, times=src_times, units=DATA_UNITS,
                    source=ds, time_units=TIME_UNITS)

    with pytest.raises(ValueError):
        c_interpolate_ts_on_seconds_border(ts, nc=True, tp='fio')