import os
import logging
from typing import cast, Iterator, Tuple, Type, Dict, Optional

import numpy

from .suits.job import JobConfig
from .result_classes import SuiteConfig, TimeSeries, DataSource, StatProps, IResultStorage
from .storage import Storage, csv_file_encoding
from .utils import StopTestError, str2shape, shape2str
from .suits.all_suits import all_suits


logger = logging.getLogger('wally')

class DB_re:
    node_id = r'\d+\.\d+\.\d+\.\d+:\d+'
    job_id = r'[-a-zA-Z0-9_]+_\d+'
    suite_id = r'[a-z_]+_\d+'
    sensor = r'[-a-z_]+'
    dev = r'[-a-zA-Z0-9_]+'
    tag = r'[a-z_.]+'
    metric = r'[a-z_.]+'


class DB_paths:
    suite_cfg_r = r'results/{suite_id}\.info\.yml'

    job_root = r'results/{suite_id}\.{job_id}/'
    job_cfg_r = job_root + r'info\.yml'

    # time series, data from the load tool, sensor is a tool name
    ts_r = job_root + r'{node_id}\.{sensor}\.{metric}\.{tag}'

    # statistical data for ts
    stat_r = job_root + r'{node_id}\.{sensor}\.{metric}\.stat\.yaml'

    # sensor data
    sensor_data_r = r'sensors/{node_id}_{sensor}\.{dev}\.{metric}\.csv'
    sensor_time_r = r'sensors/{node_id}_collected_at\.csv'

    report_root = 'report/'
    plot_r = r'report/{suite_id}\.{job_id}/{node_id}\.{sensor}\.{dev}\.{metric}\.{tag}'

    job_cfg = job_cfg_r.replace("\\.", '.')
    suite_cfg = suite_cfg_r.replace("\\.", '.')
    ts = ts_r.replace("\\.", '.')
    stat = stat_r.replace("\\.", '.')
    sensor_data = sensor_data_r.replace("\\.", '.')
    sensor_time = sensor_time_r.replace("\\.", '.')
    plot = plot_r.replace("\\.", '.')

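# Illustrative example (names are made up): formatting DB_paths.ts with a
# concrete DataSource gives a storage path such as
#
#   DB_paths.ts.format(suite_id='fio_0', job_id='rand_read_4k_0',
#                      node_id='127.0.0.1:22', sensor='fio',
#                      metric='bw', tag='csv')
#   # -> 'results/fio_0.rand_read_4k_0/127.0.0.1:22.fio.bw.csv'
#
# while the matching *_r template is what iter_paths() expands into a regex.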

DB_rr = {name: r"(?P<{}>{})".format(name, rr)
         for name, rr in DB_re.__dict__.items()
         if not name.startswith("__")}
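# For example, DB_rr['suite_id'] == r'(?P<suite_id>[a-z_]+_\d+)', so formatting
# a *_r template with **DB_rr turns every placeholder into a named capturing
# group that iter_paths() can match against stored paths.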


def fill_path(path: str, **params) -> str:
    for name, val in params.items():
        if val is not None:
            path = path.replace("{" + name + "}", val)
    return path
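# Illustrative usage (values are made up): only the placeholders passed as
# non-None keyword arguments are substituted, the rest stay in the template
# for later regex expansion:
#
#   fill_path('results/{suite_id}.{job_id}/', suite_id='fio_0')
#   # -> 'results/fio_0.{job_id}/'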


class ResultStorage(IResultStorage):
    # TODO: check that all path components match required patterns

    ts_header_size = 64
    ts_header_format = "!IIIcc"
    ts_arr_tag = 'csv'

    def __init__(self, storage: Storage) -> None:
        self.storage = storage

    def sync(self) -> None:
        self.storage.sync()

    # ----------------- SERIALIZATION / DESERIALIZATION -------------------------------------------------------------

    def load_ts(self, ds: DataSource, path: str) -> TimeSeries:
        with self.storage.get_fd(path, "rb") as fd:
            header = fd.readline().decode(csv_file_encoding).strip().split(",")
            shape, dtype, units, time_units = header
            arr = numpy.loadtxt(fd, delimiter=',', dtype=dtype)

        return TimeSeries("{}.{}".format(ds.dev, ds.sensor),
                          raw=None,
                          data=arr[:, 1:].reshape(str2shape(shape)),
                          times=arr[:, 0],
                          source=ds,
                          units=units,
                          time_units=time_units)

    def load_sensor(self, ds: DataSource) -> TimeSeries:
        collect_header, collected_at = self.storage.get_array(DB_paths.sensor_time.format(**ds.__dict__))
        assert collect_header == [ds.node_id, 'collected_at', 'us'], repr(collect_header)

        data_header, data = self.storage.get_array(DB_paths.sensor_data.format(**ds.__dict__))

        data_units = data_header[2]
        assert data_header == [ds.node_id, ds.metric_fqdn, data_units]

        return TimeSeries(ds.metric_fqdn,
                          raw=None,
                          data=data,
                          times=collected_at,
                          source=ds,
                          units=data_units,
                          time_units='us')

    # ------------- CHECK DATA IN STORAGE ----------------------------------------------------------------------------

    def check_plot_file(self, source: DataSource) -> Optional[str]:
        path = DB_paths.plot.format(**source.__dict__)
        fpath = self.storage.resolve_raw(path)
        return path if os.path.exists(fpath) else None

    # ------------- PUT DATA INTO STORAGE --------------------------------------------------------------------------

    def put_or_check_suite(self, suite: SuiteConfig) -> None:
        path = DB_paths.suite_cfg.format(suite_id=suite.storage_id)
        if path in self.storage:
            db_cfg = self.storage.get(path)
            if db_cfg != suite:
                logger.error("Current suite %s config is not equal to the one found in storage at %s",
                             suite.test_type, path)
                raise StopTestError()

        self.storage.put(suite, path)

    def put_job(self, suite: SuiteConfig, job: JobConfig) -> None:
        path = DB_paths.job_cfg.format(suite_id=suite.storage_id, job_id=job.storage_id)
        self.storage.put(job, path)

    def put_ts(self, ts: TimeSeries) -> None:
        assert ts.data.dtype == ts.times.dtype
        assert ts.data.dtype.kind == 'u'
        assert ts.source.tag == self.ts_arr_tag

        csv_path = DB_paths.ts.format(**ts.source.__dict__)
        header = [shape2str(ts.data.shape),
                  ts.data.dtype.name,
                  ts.units,
                  ts.time_units]

        with self.storage.get_fd(csv_path, "cb") as fd:
            tv = ts.times.view().reshape((-1, 1))

            if len(ts.data.shape) == 1:
                dv = ts.data.view().reshape((ts.times.shape[0], -1))
            else:
                dv = ts.data

            result = numpy.concatenate((tv, dv), axis=1)
            fd.write((",".join(map(str, header)) + "\n").encode(csv_file_encoding))
            numpy.savetxt(fd, result, delimiter=',', newline="\n", fmt="%lu")
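        # On-disk layout produced above (a sketch for 1-D data; shape, dtype
        # and units are illustrative, the real values come from the TimeSeries):
        #
        #   1000,uint64,KiBps,ms     <- shape, dtype, units, time_units
        #   1485900000,120           <- timestamp, data value(s)
        #   1485900001,125
        #
        # load_ts() parses exactly this header + body back into a TimeSeries.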

        if ts.raw:
            raw_path = DB_paths.ts.format(**ts.source(tag=ts.raw_tag).__dict__)
            self.storage.put_raw(ts.raw, raw_path)

    def put_extra(self, data: bytes, source: DataSource) -> None:
        self.storage.put(data, DB_paths.ts.format(**source.__dict__))

    def put_stat(self, data: StatProps, source: DataSource) -> None:
        self.storage.put(data, DB_paths.stat.format(**source.__dict__))

    # returns the path to the file to be inserted into the report
    def put_plot_file(self, data: bytes, source: DataSource) -> str:
        path = DB_paths.plot.format(**source.__dict__)
        return cast(str, self.storage.put_raw(data, path))

    def put_report(self, report: str, name: str) -> str:
        return self.storage.put_raw(report.encode("utf8"), DB_paths.report_root + name)

    def append_sensor(self, data: numpy.ndarray, ds: DataSource, units: str) -> None:
        if ds.metric == 'collected_at':
            path = DB_paths.sensor_time
            metrics_fqn = 'collected_at'
        else:
            path = DB_paths.sensor_data
            metrics_fqn = ds.metric_fqdn
        self.storage.append([ds.node_id, metrics_fqn, units], data, path.format(**ds.__dict__))
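    # Illustrative call (node, sensor and device names are made up):
    #
    #   rstorage.append_sensor(numpy.array([1, 2, 3], dtype='uint64'),
    #                          ds, units='ms')
    #
    # with ds.node_id == '1.2.3.4:22', ds.sensor == 'block-io', ds.dev == 'sda'
    # and ds.metric == 'io_time' this appends one header + data chunk to
    # sensors/1.2.3.4:22_block-io.sda.io_time.csv.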

    # ------------- GET DATA FROM STORAGE --------------------------------------------------------------------------

    def get_stat(self, stat_cls: Type[StatProps], source: DataSource) -> StatProps:
        return self.storage.load(stat_cls, DB_paths.stat.format(**source.__dict__))

    # ------------- ITER OVER STORAGE ------------------------------------------------------------------------------

    def iter_paths(self, path_glob: str) -> Iterator[Tuple[bool, str, Dict[str, str]]]:
        path = path_glob.format(**DB_rr).split("/")
        yield from self.storage._iter_paths("", path, {})
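    # Illustrative expansion (the concrete path is made up): DB_paths.suite_cfg_r
    # formatted with **DB_rr becomes
    #
    #   r'results/(?P<suite_id>[a-z_]+_\d+)\.info\.yml'
    #
    # and a match yields (True, 'results/fio_0.info.yml', {'suite_id': 'fio_0'}).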

    def iter_suite(self, suite_type: Optional[str] = None) -> Iterator[SuiteConfig]:
        for is_file, suite_info_path, groups in self.iter_paths(DB_paths.suite_cfg_r):
            assert is_file
            suite = self.storage.load(SuiteConfig, suite_info_path)
            assert suite.storage_id == groups['suite_id']
            if not suite_type or suite.test_type == suite_type:
                yield suite

    def iter_job(self, suite: SuiteConfig) -> Iterator[JobConfig]:
        job_glob = fill_path(DB_paths.job_cfg_r, suite_id=suite.storage_id)
        job_config_cls = all_suits[suite.test_type].job_config_cls
        for is_file, path, groups in self.iter_paths(job_glob):
            assert is_file
            job = cast(JobConfig, self.storage.load(job_config_cls, path))
            assert job.storage_id == groups['job_id']
            yield job

    # iterate over test tool data
    def iter_ts(self, suite: SuiteConfig, job: JobConfig, **filters) -> Iterator[TimeSeries]:
        filters.update(suite_id=suite.storage_id, job_id=job.storage_id)
        ts_glob = fill_path(DB_paths.ts_r, **filters)

        for is_file, path, groups in self.iter_paths(ts_glob):
            assert is_file
            groups = groups.copy()
            groups.update(filters)
            ds = DataSource(suite_id=suite.storage_id,
                            job_id=job.storage_id,
                            node_id=groups["node_id"],
                            sensor=groups["sensor"],
                            dev=None,
                            metric=groups["metric"],
                            tag=groups["tag"])
            yield self.load_ts(ds, path)

    def iter_sensors(self, node_id: Optional[str] = None, sensor: Optional[str] = None,
                     dev: Optional[str] = None, metric: Optional[str] = None) \
            -> Iterator[Tuple[str, Dict[str, str]]]:
        path = fill_path(DB_paths.sensor_data_r, node_id=node_id, sensor=sensor, dev=dev, metric=metric)
        for is_file, path, groups in self.iter_paths(path):
            assert is_file
            yield path, groups


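# Typical read-side traversal (a sketch, assuming a populated storage and a
# 'fio' suite type):
#
#   rstorage = ResultStorage(storage)
#   for suite in rstorage.iter_suite('fio'):
#       for job in rstorage.iter_job(suite):
#           for ts in rstorage.iter_ts(suite, job):
#               ...  # each ts is a TimeSeries loaded via load_ts()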