blob: dcc3f6ecafecd95de7a5e862e9305dfec83aa54d [file] [log] [blame]
kdanylov aka kodercdfcdaf2017-04-29 10:03:39 +03001import shutil
2import tempfile
3import contextlib
4from typing import Tuple, Union, Dict, Any
5
6import numpy
7from oktest import ok
8
9
10from wally.result_classes import DataSource, TimeSeries, SuiteConfig
11from wally.suits.job import JobConfig, JobParams
12from wally.storage import make_storage
13from wally.hlstorage import ResultStorage
14
15
@contextlib.contextmanager
def in_temp_dir():
    """Provide a scratch directory for the duration of a ``with`` block.

    Yields:
        str: path of a freshly created, empty temporary directory.

    The directory and everything written into it is removed when the
    ``with`` block is left, whether it completes normally or raises.
    """
    # TemporaryDirectory is the standard-library form of the
    # mkdtemp() / try / finally rmtree() pattern, and its cleanup does not
    # fail if the directory has already been removed by the caller.
    with tempfile.TemporaryDirectory() as dname:
        yield dname
23
24
# Storage identifiers returned by the stub suite/job configs and reused when
# building DataSource objects in the tests.
SUITE_ID = "suite1"
JOB_ID = "job1"

# Remaining DataSource fields used by the tests.
NODE_ID = "node1"
SENSOR = "sensor"
DEV = "dev"
METRIC = "metric"
TAG = "csv"

# Unit labels passed along with the data values and with the timestamps.
DATA_UNITS = "x"
TIME_UNITS = "us"
34
35
class TestJobParams(JobParams):
    """Stub ``JobParams`` returning fixed values, used as a fixture by the tests."""

    # Not a test case: opt out of collection by runners that pick up classes
    # named ``Test*`` (pytest warns about such classes when they define
    # ``__init__``).
    __test__ = False

    def __init__(self) -> None:
        # The stub takes no parameters of its own.
        super().__init__()

    @property
    def summary(self) -> str:
        """Fixed short description of the job."""
        return "UT_Job_CFG"

    @property
    def long_summary(self) -> str:
        """Fixed long description of the job."""
        return "UT_Job_Config"

    def copy(self, **updated) -> 'JobParams':
        """Return a fresh instance of the same class.

        ``updated`` is accepted for interface compatibility and ignored,
        since the stub carries no parameters that could be overridden.
        """
        return self.__class__()

    @property
    def char_tpl(self) -> Tuple[Union[str, int, float, bool], ...]:
        """Fixed characteristic tuple, identical for every instance."""
        return (1, 2, 3)
54
55
class TestJobConfig(JobConfig):
    """Stub ``JobConfig`` with a fixed storage id and empty serialized form."""

    # Not a test case: opt out of collection by runners that pick up classes
    # named ``Test*``.
    __test__ = False

    @property
    def storage_id(self) -> str:
        """Identifier under which the job is stored (always ``JOB_ID``)."""
        return JOB_ID

    @property
    def params(self) -> JobParams:
        """Job parameters; a new ``TestJobParams`` is built on every access."""
        return TestJobParams()

    def raw(self) -> Dict[str, Any]:
        """Serialized form of the config; the stub has nothing to persist."""
        return {}

    @classmethod
    def fromraw(cls, data: Dict[str, Any]) -> 'TestJobConfig':
        """Rebuild a config from ``raw()`` output; ``data`` is ignored."""
        # NOTE(review): the tests construct this class as ``TestJobConfig(1)``;
        # if the base constructor requires that argument, ``cls()`` will raise
        # TypeError -- confirm against JobConfig.__init__.
        return cls()
71
72
class TestSuiteConfig(SuiteConfig):
    """Stub ``SuiteConfig`` built from fixed arguments, stored under ``SUITE_ID``."""

    # Not a test case: opt out of collection by runners that pick up classes
    # named ``Test*`` (pytest warns about such classes when they define
    # ``__init__``).
    __test__ = False

    def __init__(self) -> None:
        # Fixed placeholder arguments; "UT" is the value test_result_ts later
        # passes to iter_suite() (presumably the suite type -- confirm against
        # SuiteConfig.__init__).
        super().__init__("UT", {}, "run_uuid", [], "/tmp", 0, False)
        # Overwrite the storage id so it matches the DataSource used in tests.
        self.storage_id = SUITE_ID
77
78
79
def test_sensor_ts():
    """Append sensor data in two batches and read it back from a reopened storage.

    The same five values are appended twice under the sensor data source, and
    two consecutive batches of timestamps are appended under the
    ``collected_at`` data source of the same node.  After reopening the
    storage, ``load_sensor`` must return both batches concatenated, with the
    matching timestamps.
    """
    with in_temp_dir() as root:
        values = numpy.arange(5)
        first_stamps = numpy.arange(5) + 100
        second_stamps = first_stamps + 5

        data_src = DataSource(node_id=NODE_ID, sensor=SENSOR, dev=DEV, metric=METRIC)
        time_src = DataSource(node_id=NODE_ID, metric='collected_at')

        # Write phase: a brand-new storage rooted at the temporary directory.
        with make_storage(root, existing=False) as storage:
            writer = ResultStorage(storage)

            for _ in range(2):
                writer.append_sensor(values, data_src, units=DATA_UNITS, histo_bins=None)

            for stamps in (first_stamps, second_stamps):
                writer.append_sensor(stamps, time_src, units=TIME_UNITS, histo_bins=None)

        expected_data = numpy.array([0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
        expected_times = numpy.arange(10) + 100

        # Read phase: reopen the same directory as an existing storage.
        with make_storage(root, existing=True) as storage2:
            reader = ResultStorage(storage2)
            loaded = reader.load_sensor(data_src)
            assert (loaded.data == expected_data).all()
            assert (loaded.times == expected_times).all()
102
103
def test_result_ts():
    """Store a suite, a job and a time series, then list suites after reopening.

    Only the suite listing is verified: ``iter_suite`` must yield exactly one
    suite both when called with ``'UT'`` and when called without arguments.
    """
    with in_temp_dir() as root:
        suite = TestSuiteConfig()
        job = TestJobConfig(1)

        values = numpy.arange(5, dtype=numpy.uint32)
        stamps = numpy.arange(5, dtype=numpy.uint32) + 100
        src = DataSource(
            suite_id=SUITE_ID,
            job_id=JOB_ID,
            node_id=NODE_ID,
            sensor=SENSOR,
            dev=DEV,
            metric=METRIC,
            tag=TAG,
        )
        series = TimeSeries("xxxx", None, values, stamps, DATA_UNITS, src, TIME_UNITS)

        # Write phase: a brand-new storage rooted at the temporary directory.
        with make_storage(root, existing=False) as storage:
            writer = ResultStorage(storage)
            writer.put_or_check_suite(suite)
            writer.put_job(suite, job)
            writer.put_ts(series)

        # Read phase: reopen the same directory as an existing storage.
        with make_storage(root, existing=True) as storage2:
            reader = ResultStorage(storage2)
            by_type = list(reader.iter_suite('UT'))
            unfiltered = list(reader.iter_suite())
            # oktest idiom: the comparison itself performs the assertion.
            ok(len(by_type)) == 1
            ok(len(unfiltered)) == 1
128