import re
import os
import array
import struct
import logging
from typing import cast, Iterator, Tuple, Type, Dict, Set, List, Optional

import numpy

from .result_classes import (TestSuiteConfig, TestJobConfig, TimeSeries, DataSource,
                             StatProps, IResultStorage)
from .storage import Storage
from .utils import StopTestError
from .suits.all_suits import all_suits


logger = logging.getLogger('wally')


class DB_re:
    node_id = r'\d+\.\d+\.\d+\.\d+:\d+'
    job_id = r'[-a-zA-Z0-9]+_\d+'
    sensor = r'[a-z_]+'
    dev = r'[-a-zA-Z0-9_]+'
    suite_id = r'[a-z]+_\d+'
    tag = r'[a-z_.]+'


class DB_paths:
    suite_cfg_r = r'results/{suite_id}_info\.yml'
    suite_cfg = suite_cfg_r.replace("\\.", '.')

    job_cfg_r = r'results/{suite_id}\.{job_id}/info\.yml'
    job_cfg = job_cfg_r.replace("\\.", '.')

    job_extra_r = r'results/{suite_id}\.{job_id}/{node_id}/{dev}\.{sensor}\.{tag}'
    job_extra = job_extra_r.replace("\\.", '.')

    ts_r = r'results/{suite_id}\.{job_id}/{node_id}/{dev}\.{sensor}\.{tag}'
    ts = ts_r.replace("\\.", '.')

    stat_r = r'results/{suite_id}\.{job_id}/{node_id}/{dev}\.{sensor}\.{tag}'
    stat = stat_r.replace("\\.", '.')

    plot_r = r'report/{suite_id}\.{job_id}/{node_id}/{dev}\.{sensor}\.{tag}'
    plot = plot_r.replace("\\.", '.')

    report = r'report/'


DB_rr = {name: r"(?P<{}>{})".format(name, rr) for name, rr in DB_re.__dict__.items() if not name.startswith("__")}
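# Illustrative note (added): substituting DB_rr into one of the *_r templates
# above produces a regex with named capture groups, e.g.
#   DB_paths.suite_cfg_r.format(**DB_rr)
#   -> r'results/(?P<suite_id>[a-z]+_\d+)_info\.yml'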


class ResultStorage(IResultStorage):
    # TODO: check that all path components match required patterns

    ts_header_format = "!IIIcc"
    ts_arr_tag = 'bin'
    ts_raw_tag = 'txt'
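    # Note (added): ts_header_format packs, in network byte order, three
    # unsigned 32-bit ints (second_axis_size, len(data), len(times)) followed
    # by the two single-byte array typecodes; struct.calcsize of it is 14.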

    def __init__(self, storage: Storage) -> None:
        self.storage = storage

    def sync(self) -> None:
        self.storage.sync()

    def put_or_check_suite(self, suite: TestSuiteConfig) -> None:
        path = DB_paths.suite_cfg.format(suite_id=suite.storage_id)
        if path in self.storage:
            db_cfg = self.storage.get(path)
            if db_cfg != suite:
                logger.error("Current suite %s config is not equal to the one found in storage at %s",
                             suite.test_type, path)
                raise StopTestError()

        self.storage.put(suite, path)

    def put_job(self, suite: TestSuiteConfig, job: TestJobConfig) -> None:
        path = DB_paths.job_cfg.format(suite_id=suite.storage_id, job_id=job.storage_id)
        self.storage.put(job, path)

    def put_ts(self, ts: TimeSeries) -> None:
        data = cast(List[int], ts.data)
        times = cast(List[int], ts.times)

        if len(data) % ts.second_axis_size != 0:
            logger.error("Time series data size (%s) is not proportional to second_axis_size (%s).",
                         len(data), ts.second_axis_size)
            raise StopTestError()

        if len(data) // ts.second_axis_size != len(times):
            logger.error("Unbalanced data and time array sizes. %s", ts)
            raise StopTestError()

        bin_path = DB_paths.ts.format(**ts.source(tag=self.ts_arr_tag).__dict__)

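        # On-disk layout (added note): the fixed-size header is written first,
        # followed by the raw bytes of the data array, then the raw bytes of
        # the times array.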
        with self.storage.get_fd(bin_path, "cb") as fd:
            header = struct.pack(self.ts_header_format,
                                 ts.second_axis_size,
                                 len(data),
                                 len(times),
                                 ts.data.typecode.encode("ascii"),
                                 ts.times.typecode.encode("ascii"))
            fd.write(header)
            ts.data.tofile(fd)  # type: ignore
            ts.times.tofile(fd)  # type: ignore

        if ts.raw:
            raw_path = DB_paths.ts.format(**ts.source(tag=self.ts_raw_tag).__dict__)
            self.storage.put_raw(ts.raw, raw_path)

    def put_extra(self, data: bytes, source: DataSource) -> None:
        path = DB_paths.job_extra.format(**source.__dict__)
        self.storage.put_raw(data, path)

    def put_stat(self, data: StatProps, source: DataSource) -> None:
        path = DB_paths.stat.format(**source.__dict__)
        self.storage.put(data, path)

    def get_stat(self, stat_cls: Type[StatProps], source: DataSource) -> StatProps:
        path = DB_paths.stat.format(**source.__dict__)
        return self.storage.load(stat_cls, path)

    def iter_paths(self, path_glob: str) -> Iterator[Tuple[bool, str, Dict[str, str]]]:
        path = path_glob.format(**DB_rr).split("/")
        yield from self.storage._iter_paths("", path, {})
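        # Illustrative example (added): for DB_paths.suite_cfg_r this yields
        # tuples like (True, 'results/fio_0_info.yml', {'suite_id': 'fio_0'}),
        # where 'fio_0' is a hypothetical suite storage id.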

    def iter_suite(self, suite_type: Optional[str] = None) -> Iterator[TestSuiteConfig]:
        for is_file, suite_info_path, groups in self.iter_paths(DB_paths.suite_cfg_r):
            assert is_file
            suite = cast(TestSuiteConfig, self.storage.load(TestSuiteConfig, suite_info_path))
            assert suite.storage_id == groups['suite_id']
            if not suite_type or suite.test_type == suite_type:
                yield suite

    def iter_job(self, suite: TestSuiteConfig) -> Iterator[TestJobConfig]:
        job_glob = DB_paths.job_cfg_r.replace('{suite_id}', suite.storage_id)
        job_config_cls = all_suits[suite.test_type].job_config_cls

        for is_file, path, groups in self.iter_paths(job_glob):
            assert is_file
            job = cast(TestJobConfig, self.storage.load(job_config_cls, path))
            assert job.storage_id == groups['job_id']
            yield job

    def iter_datasource(self, suite: TestSuiteConfig, job: TestJobConfig) -> Iterator[Tuple[DataSource, Dict[str, str]]]:
        ts_glob = DB_paths.ts_r.replace('{suite_id}', suite.storage_id).replace('{job_id}', job.storage_id)
        ts_found = {}  # type: Dict[Tuple[str, str, str], Dict[str, str]]

        for is_file, path, groups in self.iter_paths(ts_glob):
            assert is_file
            key = (groups['node_id'], groups['dev'], groups['sensor'])
            ts_found.setdefault(key, {})[groups['tag']] = path

        for (node_id, dev, sensor), tag2path in ts_found.items():
            if self.ts_arr_tag in tag2path:
                yield DataSource(suite_id=suite.storage_id,
                                 job_id=job.storage_id,
                                 node_id=node_id,
                                 dev=dev, sensor=sensor, tag=None), tag2path

    def load_ts(self, ds: DataSource, path: str) -> TimeSeries:
        with self.storage.get_fd(path, "rb") as fd:
            header = fd.read(struct.calcsize(self.ts_header_format))
            second_axis_size, data_sz, time_sz, data_typecode, time_typecode = \
                struct.unpack(self.ts_header_format, header)

            data = array.array(data_typecode.decode("ascii"))
            times = array.array(time_typecode.decode("ascii"))

            data.fromfile(fd, data_sz)  # type: ignore
            times.fromfile(fd, time_sz)  # type: ignore

            return TimeSeries("{}.{}".format(ds.dev, ds.sensor),
                              raw=None,
                              data=numpy.array(data, dtype=numpy.dtype('float32')),
                              times=numpy.array(times),
                              second_axis_size=second_axis_size,
                              source=ds)
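        # Note (added): values are converted to float32 on load, so a
        # put_ts/load_ts round trip can lose precision for integer data that
        # does not fit exactly in a 32-bit float.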

    def iter_ts(self, suite: TestSuiteConfig, job: TestJobConfig, **filters) -> Iterator[TimeSeries]:
        for ds, tag2path in self.iter_datasource(suite, job):
            for name, val in filters.items():
                if val != getattr(ds, name):
                    break
            else:
                ts = self.load_ts(ds, tag2path[self.ts_arr_tag])
                if self.ts_raw_tag in tag2path:
                    ts.raw = self.storage.get_raw(tag2path[self.ts_raw_tag])

                yield ts
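        # Illustrative usage (added): iter_ts(suite, job, sensor='io') yields
        # only time series whose DataSource has sensor == 'io'; the filter
        # value here is hypothetical.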

    # Returns the storage path of the plot file, to be inserted into the report.
    def put_plot_file(self, data: bytes, source: DataSource) -> str:
        path = DB_paths.plot.format(**source.__dict__)
        return cast(str, self.storage.put_raw(data, path))

    def check_plot_file(self, source: DataSource) -> Optional[str]:
        path = DB_paths.plot.format(**source.__dict__)
        fpath = self.storage.resolve_raw(path)
        if os.path.exists(fpath):
            return fpath
        return None

    def put_report(self, report: str, name: str) -> str:
        return self.storage.put_raw(report.encode("utf8"), DB_paths.report + name)
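
# Minimal usage sketch (illustrative, added; assumes `storage`, `suite` and
# `job` objects are created by the surrounding wally code):
#
#     rstorage = ResultStorage(storage)
#     rstorage.put_or_check_suite(suite)
#     rstorage.put_job(suite, job)
#     for ts in rstorage.iter_ts(suite, job, sensor='io'):
#         ...  # process the loaded TimeSeries
#     rstorage.sync()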