import os
import logging
from typing import cast, Iterator, Tuple, Type, Dict, Optional

import numpy

from .suits.job import JobConfig
from .result_classes import SuiteConfig, TimeSeries, DataSource, StatProps, IResultStorage
from .storage import Storage, csv_file_encoding
from .utils import StopTestError, str2shape, shape2str
from .suits.all_suits import all_suits


logger = logging.getLogger('wally')


class DB_re:
    node_id = r'\d+\.\d+\.\d+\.\d+:\d+'
    job_id = r'[-a-zA-Z0-9_]+_\d+'
    suite_id = r'[a-z_]+_\d+'
    sensor = r'[-a-z_]+'
    dev = r'[-a-zA-Z0-9_]+'
    tag = r'[a-z_.]+'
    metric = r'[a-z_.]+'


class DB_paths:
    suite_cfg_r = r'results/{suite_id}\.info\.yml'

    job_root = r'results/{suite_id}\.{job_id}/'
    job_cfg_r = job_root + r'info\.yml'

    # time series data produced by the load tool; 'sensor' here is the tool name
    ts_r = job_root + r'{node_id}\.{sensor}\.{metric}\.{tag}'

    # statistical data for a time series
    stat_r = job_root + r'{node_id}\.{sensor}\.{metric}\.stat\.yaml'

    # sensor data
    sensor_data_r = r'sensors/{node_id}_{sensor}\.{dev}\.{metric}\.csv'
    sensor_time_r = r'sensors/{node_id}_collected_at\.csv'

    report_root = 'report/'
    plot_r = r'report/{suite_id}\.{job_id}/{node_id}\.{sensor}\.{dev}\.{metric}\.{tag}'

    job_cfg = job_cfg_r.replace("\\.", '.')
    suite_cfg = suite_cfg_r.replace("\\.", '.')
    ts = ts_r.replace("\\.", '.')
    stat = stat_r.replace("\\.", '.')
    sensor_data = sensor_data_r.replace("\\.", '.')
    sensor_time = sensor_time_r.replace("\\.", '.')
    plot = plot_r.replace("\\.", '.')
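
    # The *_r attributes above are regex-ready templates (literal dots escaped,
    # to be combined with DB_rr in iter_paths); the plain variants are the same
    # templates with the escaping stripped, for building concrete storage paths
    # via str.format -- e.g. (illustrative):
    #   DB_paths.suite_cfg.format(suite_id='fio_0') -> 'results/fio_0.info.yml'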


DB_rr = {name: r"(?P<{}>{})".format(name, rr)
         for name, rr in DB_re.__dict__.items()
         if not name.startswith("__")}
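
# Each entry wraps the corresponding DB_re pattern in a named capture group, e.g.
#   DB_rr['suite_id'] == r'(?P<suite_id>[a-z_]+_\d+)'
# so any *_r template expanded with .format(**DB_rr) becomes a full regex whose
# matches report the path components by name (see iter_paths below).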


def fill_path(path: str, **params) -> str:
    for name, val in params.items():
        if val is not None:
            path = path.replace("{" + name + "}", val)
    return path
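
# Illustrative example: None-valued parameters are deliberately left in place,
# so the result can still serve as a glob/regex template for the rest:
#   fill_path(DB_paths.sensor_data_r, node_id='1.2.3.4:22', dev='sda')
#   -> r'sensors/1.2.3.4:22_{sensor}\.sda\.{metric}\.csv'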


class ResultStorage(IResultStorage):
    # TODO: check that all path components match required patterns

    ts_header_size = 64
    ts_header_format = "!IIIcc"
    ts_arr_tag = 'csv'
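
    # NOTE: ts_header_size/ts_header_format appear to be leftovers of an earlier
    # binary layout; the serialization below uses a plain-text CSV header instead.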

    def __init__(self, storage: Storage) -> None:
        self.storage = storage

    def sync(self) -> None:
        self.storage.sync()

    # ----------------- SERIALIZATION / DESERIALIZATION -------------------------------------------------------------

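    # On-disk time-series layout, shared by load_ts/put_ts below (sketch):
    #   line 0:  <shape>,<dtype>,<units>,<time_units>  -- shape encoded by shape2str,
    #            which must stay comma-free, as the header line is comma-separated
    #   line 1+: <time>,<value>[,<value>...]           -- one row per time point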
    def load_ts(self, ds: DataSource, path: str) -> TimeSeries:
        with self.storage.get_fd(path, "rb") as fd:
            header = fd.readline().decode(csv_file_encoding).strip().split(",")
            shape, dtype, units, time_units = header
            arr = numpy.loadtxt(fd, delimiter=',', dtype=dtype)

        return TimeSeries("{}.{}".format(ds.dev, ds.sensor),
                          raw=None,
                          # strip the time column and restore the original shape
                          data=arr[:, 1:].reshape(str2shape(shape)),
                          times=arr[:, 0],
                          source=ds,
                          units=units,
                          time_units=time_units)

    def load_sensor(self, ds: DataSource) -> TimeSeries:
        collect_header, collected_at = self.storage.get_array(DB_paths.sensor_time.format(**ds.__dict__))
        assert collect_header == [ds.node_id, 'collected_at', 'us'], repr(collect_header)

        data_header, data = self.storage.get_array(DB_paths.sensor_data.format(**ds.__dict__))

        data_units = data_header[2]
        assert data_header == [ds.node_id, ds.metric_fqdn, data_units]

        return TimeSeries(ds.metric_fqdn,
                          raw=None,
                          data=data,
                          times=collected_at,
                          source=ds,
                          units=data_units,
                          time_units='us')

    # ------------- CHECK DATA IN STORAGE ----------------------------------------------------------------------------

    def check_plot_file(self, source: DataSource) -> Optional[str]:
        path = DB_paths.plot.format(**source.__dict__)
        fpath = self.storage.resolve_raw(path)
        return path if os.path.exists(fpath) else None

    # ------------- PUT DATA INTO STORAGE --------------------------------------------------------------------------

    def put_or_check_suite(self, suite: SuiteConfig) -> None:
        # a suite config is written once; on re-run it must match the stored copy
        path = DB_paths.suite_cfg.format(suite_id=suite.storage_id)
        if path in self.storage:
            db_cfg = self.storage.get(path)
            if db_cfg != suite:
                logger.error("Current suite %s config does not match the one stored at %s",
                             suite.test_type, path)
                raise StopTestError()

        self.storage.put(suite, path)

    def put_job(self, suite: SuiteConfig, job: JobConfig) -> None:
        path = DB_paths.job_cfg.format(suite_id=suite.storage_id, job_id=job.storage_id)
        self.storage.put(job, path)

    def put_ts(self, ts: TimeSeries) -> None:
        assert ts.data.dtype == ts.times.dtype
        assert ts.data.dtype.kind == 'u'
        assert ts.source.tag == self.ts_arr_tag

        csv_path = DB_paths.ts.format(**ts.source.__dict__)
        header = [shape2str(ts.data.shape),
                  ts.data.dtype.name,
                  ts.units,
                  ts.time_units]

        with self.storage.get_fd(csv_path, "cb") as fd:
            # times become column 0; data values fill the remaining columns
            tv = ts.times.view().reshape((-1, 1))

            if len(ts.data.shape) == 1:
                dv = ts.data.view().reshape((ts.times.shape[0], -1))
            else:
                dv = ts.data

            result = numpy.concatenate((tv, dv), axis=1)
            fd.write((",".join(map(str, header)) + "\n").encode(csv_file_encoding))
            numpy.savetxt(fd, result, delimiter=',', newline="\n", fmt="%lu")

        if ts.raw:
            # raw tool output is stored next to the parsed series, under its own tag
            raw_path = DB_paths.ts.format(**ts.source(tag=ts.raw_tag).__dict__)
            self.storage.put_raw(ts.raw, raw_path)

    def put_extra(self, data: bytes, source: DataSource) -> None:
        self.storage.put(data, DB_paths.ts.format(**source.__dict__))

    def put_stat(self, data: StatProps, source: DataSource) -> None:
        self.storage.put(data, DB_paths.stat.format(**source.__dict__))

    # returns the path of the saved file, to be referenced from the report
    def put_plot_file(self, data: bytes, source: DataSource) -> str:
        path = DB_paths.plot.format(**source.__dict__)
        return cast(str, self.storage.put_raw(data, path))

    def put_report(self, report: str, name: str) -> str:
        return self.storage.put_raw(report.encode("utf8"), DB_paths.report_root + name)

    def append_sensor(self, data: numpy.ndarray, ds: DataSource, units: str) -> None:
        if ds.metric == 'collected_at':
            path = DB_paths.sensor_time
            metric_fqdn = 'collected_at'
        else:
            path = DB_paths.sensor_data
            metric_fqdn = ds.metric_fqdn
        self.storage.append([ds.node_id, metric_fqdn, units], data, path.format(**ds.__dict__))

    # ------------- GET DATA FROM STORAGE --------------------------------------------------------------------------

    def get_stat(self, stat_cls: Type[StatProps], source: DataSource) -> StatProps:
        return self.storage.load(stat_cls, DB_paths.stat.format(**source.__dict__))

    # ------------- ITER OVER STORAGE ------------------------------------------------------------------------------

    def iter_paths(self, path_glob: str) -> Iterator[Tuple[bool, str, Dict[str, str]]]:
        path = path_glob.format(**DB_rr).split("/")
        yield from self.storage._iter_paths("", path, {})
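
    # e.g. DB_paths.suite_cfg_r.format(**DB_rr) expands to
    #   r'results/(?P<suite_id>[a-z_]+_\d+)\.info\.yml'
    # and Storage._iter_paths is expected to match it component-by-component
    # against stored keys, reporting the captured groups for each hit.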

    def iter_suite(self, suite_type: Optional[str] = None) -> Iterator[SuiteConfig]:
        for is_file, suite_info_path, groups in self.iter_paths(DB_paths.suite_cfg_r):
            assert is_file
            suite = cast(SuiteConfig, self.storage.load(SuiteConfig, suite_info_path))
            assert suite.storage_id == groups['suite_id']
            if not suite_type or suite.test_type == suite_type:
                yield suite

    def iter_job(self, suite: SuiteConfig) -> Iterator[JobConfig]:
        job_glob = fill_path(DB_paths.job_cfg_r, suite_id=suite.storage_id)
        job_config_cls = all_suits[suite.test_type].job_config_cls
        for is_file, path, groups in self.iter_paths(job_glob):
            assert is_file
            job = cast(JobConfig, self.storage.load(job_config_cls, path))
            assert job.storage_id == groups['job_id']
            yield job

    # iterate over the time series produced by the load tool
    def iter_ts(self, suite: SuiteConfig, job: JobConfig, **filters) -> Iterator[TimeSeries]:
        filters.update(suite_id=suite.storage_id, job_id=job.storage_id)
        ts_glob = fill_path(DB_paths.ts_r, **filters)

        for is_file, path, groups in self.iter_paths(ts_glob):
            assert is_file
            # components fixed by the filters were substituted into the glob and
            # so produce no regex groups -- merge them back in
            groups = groups.copy()
            groups.update(filters)
            ds = DataSource(suite_id=suite.storage_id,
                            job_id=job.storage_id,
                            node_id=groups["node_id"],
                            sensor=groups["sensor"],
                            dev=None,  # ts paths carry no device component
                            metric=groups["metric"],
                            tag=groups["tag"])
            yield self.load_ts(ds, path)

    def iter_sensors(self, node_id: Optional[str] = None, sensor: Optional[str] = None, dev: Optional[str] = None,
                     metric: Optional[str] = None) -> Iterator[Tuple[str, Dict[str, str]]]:
        path_glob = fill_path(DB_paths.sensor_data_r, node_id=node_id, sensor=sensor, dev=dev, metric=metric)
        for is_file, path, groups in self.iter_paths(path_glob):
            assert is_file
            yield path, groups
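

# Minimal usage sketch (illustrative; 'fio' suite type and 'bw' metric are just
# examples, not the only valid values):
#   rstorage = ResultStorage(storage)
#   for suite in rstorage.iter_suite('fio'):
#       for job in rstorage.iter_job(suite):
#           for ts in rstorage.iter_ts(suite, job, metric='bw'):
#               process(ts)  # hypothetical consumer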