koder aka kdanilov | 108ac36 | 2017-01-19 20:17:16 +0200 | [diff] [blame] | 1 | import re |
| 2 | import os |
| 3 | import array |
| 4 | import struct |
| 5 | import logging |
| 6 | from typing import cast, Iterator, Tuple, Type, Dict, Set, List, Optional |
| 7 | |
| 8 | import numpy |
| 9 | |
| 10 | from .result_classes import (TestSuiteConfig, TestJobConfig, TimeSeries, DataSource, |
| 11 | StatProps, IResultStorage) |
| 12 | from .storage import Storage |
| 13 | from .utils import StopTestError |
| 14 | from .suits.all_suits import all_suits |
| 15 | |
| 16 | |
| 17 | logger = logging.getLogger('wally') |
| 18 | |
| 19 | |
class DB_re:
    """Regex fragments for the individual components of storage paths.

    Used by DB_rr (below) to build named capture groups that are substituted
    into the DB_paths.*_r templates when globbing the storage tree.
    """
    # IPv4 address + port.  Dots are escaped so '.' matches only a literal
    # dot (the previous unescaped form matched ANY character, e.g. "1a2b3c4:80").
    node_id = r'\d+\.\d+\.\d+\.\d+:\d+'
    job_id = r'[-a-zA-Z0-9]+_\d+'
    sensor = r'[a-z_]+'
    dev = r'[-a-zA-Z0-9_]+'
    suite_id = r'[a-z]+_\d+'
    tag = r'[a-z_.]+'
| 27 | |
| 28 | |
class DB_paths:
    """Path templates for all objects kept in the result storage.

    Each ``*_r`` attribute is the regex-safe form of a template (literal dots
    escaped as ``\\.``) used for globbing/matching existing storage paths;
    the paired attribute without the ``_r`` suffix is the same template with
    the escaping stripped, suitable for ``str.format()`` when writing.
    """
    # per-suite config file
    suite_cfg_r = r'results/{suite_id}_info\.yml'
    suite_cfg = suite_cfg_r.replace("\\.", '.')

    # per-job config file
    job_cfg_r = r'results/{suite_id}\.{job_id}/info\.yml'
    job_cfg = job_cfg_r.replace("\\.", '.')

    # NOTE(review): job_extra_r, ts_r and stat_r are currently identical
    # templates — extra data, time series and stats are distinguished only
    # by the {tag} component.
    job_extra_r = r'results/{suite_id}\.{job_id}/{node_id}/{dev}\.{sensor}\.{tag}'
    job_extra = job_extra_r.replace("\\.", '.')

    # time-series data (binary with tag 'bin', raw text with tag 'txt')
    ts_r = r'results/{suite_id}\.{job_id}/{node_id}/{dev}\.{sensor}\.{tag}'
    ts = ts_r.replace("\\.", '.')

    # serialized StatProps objects
    stat_r = r'results/{suite_id}\.{job_id}/{node_id}/{dev}\.{sensor}\.{tag}'
    stat = stat_r.replace("\\.", '.')

    # generated plot images referenced from the report
    plot_r = r'report/{suite_id}\.{job_id}/{node_id}/{dev}\.{sensor}\.{tag}'
    plot = plot_r.replace("\\.", '.')

    # root for rendered report files
    report = r'report/'
| 49 | |
| 50 | |
| 51 | DB_rr = {name: r"(?P<{}>{})".format(name, rr) for name, rr in DB_re.__dict__.items() if not name.startswith("__")} |
| 52 | |
| 53 | |
class ResultStorage(IResultStorage):
    """Persists and retrieves suite/job configs, time series, stats and plots
    in a :class:`Storage` backend, at paths built from the DB_paths templates.
    """
    # TODO: check that all path components match required patterns

    # Binary time-series header layout (network byte order):
    # second_axis_size, len(data), len(times) as uint32, then the array
    # typecodes of data and times as single ascii chars.
    ts_header_format = "!IIIcc"
    ts_arr_tag = 'bin'  # tag for the packed binary time-series file
    ts_raw_tag = 'txt'  # tag for the raw (unparsed) sensor output

    def __init__(self, storage: Storage) -> None:
        self.storage = storage

    def sync(self) -> None:
        """Flush pending writes in the underlying storage."""
        self.storage.sync()

    def put_or_check_suite(self, suite: TestSuiteConfig) -> None:
        """Store *suite* config; if one already exists at the same path it
        must be equal, otherwise the run is aborted via StopTestError."""
        path = DB_paths.suite_cfg.format(suite_id=suite.storage_id)
        if path in self.storage:
            db_cfg = self.storage.get(path)
            if db_cfg != suite:
                logger.error("Current suite %s config is not equal to found in storage at %s",
                             suite.test_type, path)
                raise StopTestError()

        self.storage.put(suite, path)

    def put_job(self, suite: TestSuiteConfig, job: TestJobConfig) -> None:
        """Store *job* config under its suite."""
        path = DB_paths.job_cfg.format(suite_id=suite.storage_id, job_id=job.storage_id)
        self.storage.put(job, path)

    def put_ts(self, ts: TimeSeries) -> None:
        """Serialize *ts* as header + packed data + packed times under the
        'bin' tag; the raw payload, if present, goes under the 'txt' tag.

        Raises StopTestError when data/times sizes are inconsistent with
        second_axis_size.
        """
        data = cast(List[int], ts.data)
        times = cast(List[int], ts.times)

        if len(data) % ts.second_axis_size != 0:
            logger.error("Time series data size(%s) is not proportional to second_axis_size(%s).",
                         len(data), ts.second_axis_size)
            raise StopTestError()

        if len(data) // ts.second_axis_size != len(times):
            logger.error("Unbalanced data and time array sizes. %s", ts)
            raise StopTestError()

        bin_path = DB_paths.ts.format(**ts.source(tag=self.ts_arr_tag).__dict__)

        with self.storage.get_fd(bin_path, "cb") as fd:
            header = struct.pack(self.ts_header_format,
                                 ts.second_axis_size,
                                 len(data),
                                 len(times),
                                 ts.data.typecode.encode("ascii"),
                                 ts.times.typecode.encode("ascii"))
            fd.write(header)
            ts.data.tofile(fd)  # type: ignore
            ts.times.tofile(fd)  # type: ignore

        if ts.raw:
            raw_path = DB_paths.ts.format(**ts.source(tag=self.ts_raw_tag).__dict__)
            self.storage.put_raw(ts.raw, raw_path)

    def put_extra(self, data: bytes, source: DataSource) -> None:
        """Store arbitrary extra bytes for *source*.

        BUG FIX: previously formatted DB_paths.job_cfg, which resolves to the
        job's info.yml and would overwrite the job config; job_extra is the
        template that actually consumes all DataSource fields.
        """
        path = DB_paths.job_extra.format(**source.__dict__)
        self.storage.put_raw(data, path)

    def put_stat(self, data: StatProps, source: DataSource) -> None:
        """Store serialized statistics for *source*."""
        path = DB_paths.stat.format(**source.__dict__)
        self.storage.put(data, path)

    def get_stat(self, stat_cls: Type[StatProps], source: DataSource) -> StatProps:
        """Load statistics of type *stat_cls* for *source*."""
        path = DB_paths.stat.format(**source.__dict__)
        return self.storage.load(stat_cls, path)

    def iter_paths(self, path_glob: str) -> Iterator[Tuple[bool, str, Dict[str, str]]]:
        """Yield (is_file, path, captured_groups) for every storage path
        matching *path_glob* after substituting the DB_rr capture groups."""
        path = path_glob.format(**DB_rr).split("/")
        yield from self.storage._iter_paths("", path, {})

    def iter_suite(self, suite_type: Optional[str] = None) -> Iterator[TestSuiteConfig]:
        """Yield stored suite configs, optionally filtered by test type."""
        for is_file, suite_info_path, groups in self.iter_paths(DB_paths.suite_cfg_r):
            assert is_file
            suite = cast(TestSuiteConfig, self.storage.load(TestSuiteConfig, suite_info_path))
            assert suite.storage_id == groups['suite_id']
            if not suite_type or suite.test_type == suite_type:
                yield suite

    def iter_job(self, suite: TestSuiteConfig) -> Iterator[TestJobConfig]:
        """Yield all job configs stored under *suite*, loaded with the
        suite-type-specific job config class."""
        job_glob = DB_paths.job_cfg_r.replace('{suite_id}', suite.storage_id)
        job_config_cls = all_suits[suite.test_type].job_config_cls

        for is_file, path, groups in self.iter_paths(job_glob):
            assert is_file
            job = cast(TestJobConfig, self.storage.load(job_config_cls, path))
            assert job.storage_id == groups['job_id']
            yield job

    def iter_datasource(self, suite: TestSuiteConfig,
                        job: TestJobConfig) -> Iterator[Tuple[DataSource, Dict[str, str]]]:
        """Yield (DataSource, tag->path) for every (node, dev, sensor) that
        has a binary time-series stored for *suite*/*job*."""
        ts_glob = DB_paths.ts_r.replace('{suite_id}', suite.storage_id).replace('{job_id}', job.storage_id)
        ts_found = {}  # type: Dict[Tuple[str, str, str], Dict[str, str]]

        for is_file, path, groups in self.iter_paths(ts_glob):
            assert is_file
            key = (groups['node_id'], groups['dev'], groups['sensor'])
            ts_found.setdefault(key, {})[groups['tag']] = path

        for (node_id, dev, sensor), tag2path in ts_found.items():
            # only yield sources that actually have the packed binary data
            if self.ts_arr_tag in tag2path:
                yield DataSource(suite_id=suite.storage_id,
                                 job_id=job.storage_id,
                                 node_id=node_id,
                                 dev=dev, sensor=sensor, tag=None), tag2path

    def load_ts(self, ds: DataSource, path: str) -> TimeSeries:
        """Deserialize a TimeSeries written by put_ts from *path*.

        The raw payload is not loaded here (raw=None); see iter_ts.
        """
        with self.storage.get_fd(path, "rb") as fd:
            header = fd.read(struct.calcsize(self.ts_header_format))
            second_axis_size, data_sz, time_sz, data_typecode, time_typecode = \
                struct.unpack(self.ts_header_format, header)

            data = array.array(data_typecode.decode("ascii"))
            times = array.array(time_typecode.decode("ascii"))

            data.fromfile(fd, data_sz)  # type: ignore
            times.fromfile(fd, time_sz)  # type: ignore

        return TimeSeries("{}.{}".format(ds.dev, ds.sensor),
                          raw=None,
                          data=numpy.array(data, dtype=numpy.dtype('float32')),
                          times=numpy.array(times),
                          second_axis_size=second_axis_size,
                          source=ds)

    def iter_ts(self, suite: TestSuiteConfig, job: TestJobConfig, **filters) -> Iterator[TimeSeries]:
        """Yield time series for *suite*/*job* whose DataSource attributes
        match every keyword filter; attaches raw payload when present."""
        for ds, tag2path in self.iter_datasource(suite, job):
            for name, val in filters.items():
                if val != getattr(ds, name):
                    break
            else:
                ts = self.load_ts(ds, tag2path[self.ts_arr_tag])
                if self.ts_raw_tag in tag2path:
                    ts.raw = self.storage.get_raw(tag2path[self.ts_raw_tag])

                yield ts

    # return path to file to be inserted into report
    def put_plot_file(self, data: bytes, source: DataSource) -> str:
        path = DB_paths.plot.format(**source.__dict__)
        return cast(str, self.storage.put_raw(data, path))

    def check_plot_file(self, source: DataSource) -> Optional[str]:
        """Return the filesystem path of an already-generated plot for
        *source*, or None if it does not exist yet."""
        path = DB_paths.plot.format(**source.__dict__)
        fpath = self.storage.resolve_raw(path)
        if os.path.exists(fpath):
            return fpath
        return None

    def put_report(self, report: str, name: str) -> str:
        """Store rendered report text under report/<name>; returns its path."""
        return self.storage.put_raw(report.encode("utf8"), DB_paths.report + name)