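"""Result storage layout for wally.

Maps suite/job configs, load-tool time series, sensor data, stats and plots
to paths inside a Storage instance, and (de)serializes numpy arrays as CSV
files with a one-line self-describing header.
"""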
import os
import pprint
import logging
from typing import cast, Iterator, Tuple, Type, Dict, Optional, Any, List

import numpy

from .suits.job import JobConfig
from .result_classes import SuiteConfig, TimeSeries, DataSource, StatProps, IResultStorage
from .storage import Storage
from .utils import StopTestError
from .suits.all_suits import all_suits


logger = logging.getLogger('wally')

17
18class DB_re:
19 node_id = r'\d+.\d+.\d+.\d+:\d+'
koder aka kdanilova732a602017-02-01 20:29:56 +020020 job_id = r'[-a-zA-Z0-9_]+_\d+'
21 suite_id = r'[a-z_]+_\d+'
22 sensor = r'[-a-z_]+'
koder aka kdanilov108ac362017-01-19 20:17:16 +020023 dev = r'[-a-zA-Z0-9_]+'
koder aka kdanilov108ac362017-01-19 20:17:16 +020024 tag = r'[a-z_.]+'
koder aka kdanilova732a602017-02-01 20:29:56 +020025 metric = r'[a-z_.]+'
koder aka kdanilov108ac362017-01-19 20:17:16 +020026

class DB_paths:
    suite_cfg_r = r'results/{suite_id}\.info\.yml'

    job_root = r'results/{suite_id}\.{job_id}/'
    job_cfg_r = job_root + r'info\.yml'

    # time series data from the load tool; 'sensor' here is the tool name
    ts_r = job_root + r'{node_id}\.{sensor}\.{metric}\.{tag}'

    # statistical data for ts
    stat_r = job_root + r'{node_id}\.{sensor}\.{metric}\.stat\.yaml'

    # sensor data
    sensor_data_r = r'sensors/{node_id}_{sensor}\.{dev}\.{metric}\.csv'
    sensor_time_r = r'sensors/{node_id}_collected_at\.csv'

    report_root = 'report/'
    plot_r = r'{suite_id}\.{job_id}/{node_id}\.{sensor}\.{dev}\.{metric}\.{tag}'

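    # Plain str.format twins of the escaped patterns above: the '*_r'
    # variants are used for matching existing paths, these for building them.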
    job_cfg = job_cfg_r.replace("\\.", '.')
    suite_cfg = suite_cfg_r.replace("\\.", '.')
    ts = ts_r.replace("\\.", '.')
    stat = stat_r.replace("\\.", '.')
    sensor_data = sensor_data_r.replace("\\.", '.')
    sensor_time = sensor_time_r.replace("\\.", '.')
    plot = plot_r.replace("\\.", '.')


DB_rr = {name: r"(?P<{}>{})".format(name, rr)
         for name, rr in DB_re.__dict__.items()
         if not name.startswith("__")}
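# Each DB_re fragment wrapped into a named regex group, e.g.
# DB_rr['suite_id'] == r'(?P<suite_id>[a-z_]+_\d+)', so a '*_r' template
# formatted with DB_rr becomes a full regex with named captures.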


def fill_path(path: str, **params: Optional[str]) -> str:
    for name, val in params.items():
        if val is not None:
            path = path.replace("{" + name + "}", val)
    return path
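# Fields left as None stay as '{...}' placeholders for later regex expansion
# via DB_rr. Illustrative example:
#   fill_path(DB_paths.job_cfg_r, suite_id='fio_0')
#   -> r'results/fio_0\.{job_id}/info\.yml'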


class ResultStorage(IResultStorage):
    # TODO: check that all path components match required patterns

    ts_header_size = 64
    ts_header_format = "!IIIcc"
    ts_arr_tag = 'csv'
    csv_file_encoding = 'ascii'

    def __init__(self, storage: Storage) -> None:
        self.storage = storage
        self.cache = {}  # type: Dict[str, Tuple[int, int, Any, List[str]]]

    def sync(self) -> None:
        self.storage.sync()

    # ----------------- SERIALIZATION / DESERIALIZATION -------------------------------------------------------------
    def load_array(self, path: str, skip_shape: bool = False) -> Tuple[numpy.ndarray, List[str]]:
        with self.storage.get_fd(path, "rb") as fd:
            stats = os.fstat(fd.fileno())
            # reuse the cached array if the file looks unchanged (same size and atime)
            if path in self.cache:
                size, atime, obj, header = self.cache[path]
                if size == stats.st_size and atime == stats.st_atime_ns:
                    return obj, header

            header = fd.readline().decode(self.csv_file_encoding).strip().split(",")

            if skip_shape:
                header = header[1:]
            dt = fd.read().decode("utf-8").strip()

        # the first remaining header field is the numpy dtype of the data
        arr = numpy.fromstring(dt.replace("\n", ','), sep=',', dtype=header[0])
        if len(dt) != 0:
            lines = dt.count("\n") + 1
            columns = dt.split("\n", 1)[0].count(",") + 1
            assert lines * columns == len(arr)
            if columns == 1:
                arr.shape = (lines,)
            else:
                arr.shape = (lines, columns)

        self.cache[path] = (stats.st_size, stats.st_atime_ns, arr, header[1:])
        return arr, header[1:]

    def put_array(self, path: str, data: numpy.ndarray, header: List[str], append_on_exists: bool = False) -> None:
        header = [data.dtype.name] + header

        exists = append_on_exists and path in self.storage
        if len(data.shape) == 1:
            # make the array vertical to simplify reading
            vw = data.view().reshape((data.shape[0], 1))
        else:
            vw = data

        with self.storage.get_fd(path, "cb" if not exists else "rb+") as fd:
            if exists:
                curr_header = fd.readline().decode(self.csv_file_encoding).rstrip().split(",")
                assert header == curr_header, \
                    "Path {!r}. Expected header ({!r}) and current header ({!r}) don't match"\
                    .format(path, header, curr_header)
                fd.seek(0, os.SEEK_END)
            else:
                fd.write((",".join(header) + "\n").encode(self.csv_file_encoding))

            numpy.savetxt(fd, vw, delimiter=',', newline="\n", fmt="%lu")
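    # Resulting file layout (values purely illustrative) for a 1-D uint64
    # sensor array passed with header [node_id, metric_fqdn, units]:
    #   uint64,1.2.3.4:22,block-io.sda.io_time,ms
    #   12
    #   15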

    def load_ts(self, ds: DataSource, path: str) -> TimeSeries:
        arr, header = self.load_array(path, skip_shape=True)
        units, time_units = header

        data = arr[:, 1:]
        if data.shape[1] == 1:
            data = data.reshape((-1,))

        return TimeSeries("{}.{}".format(ds.dev, ds.sensor),
                          raw=None,
                          data=data,
                          times=arr[:, 0],
                          source=ds,
                          units=units,
                          time_units=time_units)

    def load_sensor(self, ds: DataSource) -> TimeSeries:
        collected_at, collect_header = self.load_array(DB_paths.sensor_time.format(**ds.__dict__))
        assert collect_header == [ds.node_id, 'collected_at', 'us'], repr(collect_header)
        data, data_header = self.load_array(DB_paths.sensor_data.format(**ds.__dict__))

        data_units = data_header[2]
        assert data_header == [ds.node_id, ds.metric_fqdn, data_units]

        assert len(data.shape) == 1
        assert len(collected_at.shape) == 1

        return TimeSeries(ds.metric_fqdn,
                          raw=None,
                          data=data,
                          times=collected_at,
                          source=ds,
                          units=data_units,
                          time_units='us')

    # ------------- CHECK DATA IN STORAGE ----------------------------------------------------------------------------

    def check_plot_file(self, source: DataSource) -> Optional[str]:
        path = DB_paths.plot.format(**source.__dict__)
        fpath = self.storage.resolve_raw(DB_paths.report_root + path)
        return path if os.path.exists(fpath) else None

    # ------------- PUT DATA INTO STORAGE --------------------------------------------------------------------------

    def put_or_check_suite(self, suite: SuiteConfig) -> None:
        path = DB_paths.suite_cfg.format(suite_id=suite.storage_id)
        if path in self.storage:
            db_cfg = self.storage.load(SuiteConfig, path)
            if db_cfg != suite:
                logger.error("Current suite %s config doesn't match the one stored at %s", suite.test_type, path)
                logger.debug("Current:\n%s\nStorage:\n%s", pprint.pformat(suite), pprint.pformat(db_cfg))
                raise StopTestError()
        else:
            self.storage.put(suite, path)

    def put_job(self, suite: SuiteConfig, job: JobConfig) -> None:
        path = DB_paths.job_cfg.format(suite_id=suite.storage_id, job_id=job.storage_id)
        self.storage.put(job, path)

    def put_ts(self, ts: TimeSeries) -> None:
        # the data is written via savetxt with the "%lu" format, so both
        # arrays must share one unsigned integer dtype
        assert ts.data.dtype == ts.times.dtype
        assert ts.data.dtype.kind == 'u'
        assert ts.source.tag == self.ts_arr_tag

        csv_path = DB_paths.ts.format(**ts.source.__dict__)
        header = [ts.data.dtype.name, ts.units, ts.time_units]

        tv = ts.times.view().reshape((-1, 1))
        if len(ts.data.shape) == 1:
            dv = ts.data.view().reshape((ts.times.shape[0], -1))
        else:
            dv = ts.data

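        # column 0 holds the timestamps, data columns follow -- the layout
        # load_ts() expects when reading the file back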
        result = numpy.concatenate((tv, dv), axis=1)
        self.put_array(csv_path, result, header)

        if ts.raw:
            raw_path = DB_paths.ts.format(**ts.source(tag=ts.raw_tag).__dict__)
            self.storage.put_raw(ts.raw, raw_path)

    def put_extra(self, data: bytes, source: DataSource) -> None:
        self.storage.put_raw(data, DB_paths.ts.format(**source.__dict__))

    def put_stat(self, data: StatProps, source: DataSource) -> None:
        self.storage.put(data, DB_paths.stat.format(**source.__dict__))

    # returns the path to the stored file, to be referenced from the report
    def put_plot_file(self, data: bytes, source: DataSource) -> str:
        path = DB_paths.plot.format(**source.__dict__)
        self.storage.put_raw(data, DB_paths.report_root + path)
        return path

    def put_report(self, report: str, name: str) -> str:
        return self.storage.put_raw(report.encode(self.csv_file_encoding), DB_paths.report_root + name)

    def append_sensor(self, data: numpy.ndarray, ds: DataSource, units: str) -> None:
        if ds.metric == 'collected_at':
            path = DB_paths.sensor_time
            metrics_fqn = 'collected_at'
        else:
            path = DB_paths.sensor_data
            metrics_fqn = ds.metric_fqdn
        self.put_array(path.format(**ds.__dict__), data, [ds.node_id, metrics_fqn, units], append_on_exists=True)

    # ------------- GET DATA FROM STORAGE --------------------------------------------------------------------------

    def get_stat(self, stat_cls: Type[StatProps], source: DataSource) -> StatProps:
        return self.storage.load(stat_cls, DB_paths.stat.format(**source.__dict__))

    # ------------- ITER OVER STORAGE ------------------------------------------------------------------------------

    def iter_paths(self, path_glob: str) -> Iterator[Tuple[bool, str, Dict[str, str]]]:
        path = path_glob.format(**DB_rr).split("/")
        yield from self.storage._iter_paths("", path, {})
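    # yields (is_file, path, matched_groups) tuples; e.g. for
    # DB_paths.suite_cfg_r roughly (values illustrative):
    #   (True, 'results/fio_0.info.yml', {'suite_id': 'fio_0'})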

    def iter_suite(self, suite_type: Optional[str] = None) -> Iterator[SuiteConfig]:
        for is_file, suite_info_path, groups in self.iter_paths(DB_paths.suite_cfg_r):
            assert is_file
            suite = self.storage.load(SuiteConfig, suite_info_path)
            assert suite.storage_id == groups['suite_id']
            if not suite_type or suite.test_type == suite_type:
                yield suite

    def iter_job(self, suite: SuiteConfig) -> Iterator[JobConfig]:
        job_glob = fill_path(DB_paths.job_cfg_r, suite_id=suite.storage_id)
        job_config_cls = all_suits[suite.test_type].job_config_cls
        for is_file, path, groups in self.iter_paths(job_glob):
            assert is_file
            job = cast(JobConfig, self.storage.load(job_config_cls, path))
            assert job.storage_id == groups['job_id']
            yield job

    # iterate over test tool data
    def iter_ts(self, suite: SuiteConfig, job: JobConfig, **filters) -> Iterator[TimeSeries]:
        filters.update(suite_id=suite.storage_id, job_id=job.storage_id)
        ts_glob = fill_path(DB_paths.ts_r, **filters)
        for is_file, path, groups in self.iter_paths(ts_glob):
            tag = groups["tag"]
            if tag != 'csv':
                continue
            assert is_file
            groups = groups.copy()
            groups.update(filters)
            ds = DataSource(suite_id=suite.storage_id,
                            job_id=job.storage_id,
                            node_id=groups["node_id"],
                            sensor=groups["sensor"],
                            dev=None,
                            metric=groups["metric"],
                            tag=tag)
            yield self.load_ts(ds, path)

    def iter_sensors(self, node_id: Optional[str] = None, sensor: Optional[str] = None, dev: Optional[str] = None,
                     metric: Optional[str] = None) -> Iterator[Tuple[str, Dict[str, str]]]:

        path = fill_path(DB_paths.sensor_data_r, node_id=node_id, sensor=sensor, dev=dev, metric=metric)
        for is_file, path, groups in self.iter_paths(path):
            assert is_file
            yield path, groups
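

# Illustrative usage sketch (storage/suite objects assumed to come from the
# test runner; suite type name hypothetical):
#
#   rstorage = ResultStorage(storage)
#   for suite in rstorage.iter_suite(suite_type='fio'):
#       for job in rstorage.iter_job(suite):
#           for ts in rstorage.iter_ts(suite, job):
#               ...  # process the TimeSeries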