kdanylov aka koder | b083333 | 2017-05-13 20:39:17 +0300 | [diff] [blame] | 1 | import os |
kdanylov aka koder | 026e5f2 | 2017-05-15 01:04:39 +0300 | [diff] [blame] | 2 | import json |
kdanylov aka koder | b083333 | 2017-05-13 20:39:17 +0300 | [diff] [blame] | 3 | import pprint |
| 4 | import logging |
kdanylov aka koder | 026e5f2 | 2017-05-15 01:04:39 +0300 | [diff] [blame] | 5 | from typing import cast, Iterator, Tuple, Type, Optional, Any, Union, List |
kdanylov aka koder | b083333 | 2017-05-13 20:39:17 +0300 | [diff] [blame] | 6 | |
| 7 | import numpy |
| 8 | |
| 9 | from cephlib.wally_storage import WallyDB |
kdanylov aka koder | 84de1e4 | 2017-05-22 14:00:07 +0300 | [diff] [blame] | 10 | from cephlib.sensor_storage import SensorStorage |
kdanylov aka koder | b083333 | 2017-05-13 20:39:17 +0300 | [diff] [blame] | 11 | from cephlib.statistic import StatProps |
| 12 | from cephlib.numeric_types import DataSource, TimeSeries |
kdanylov aka koder | 026e5f2 | 2017-05-15 01:04:39 +0300 | [diff] [blame] | 13 | from cephlib.node import NodeInfo |
kdanylov aka koder | b083333 | 2017-05-13 20:39:17 +0300 | [diff] [blame] | 14 | |
| 15 | from .suits.job import JobConfig |
kdanylov aka koder | 026e5f2 | 2017-05-15 01:04:39 +0300 | [diff] [blame] | 16 | from .result_classes import SuiteConfig, IWallyStorage |
kdanylov aka koder | b083333 | 2017-05-13 20:39:17 +0300 | [diff] [blame] | 17 | from .utils import StopTestError |
| 18 | from .suits.all_suits import all_suits |
| 19 | |
| 20 | from cephlib.storage import Storage |
| 21 | |
| 22 | logger = logging.getLogger('wally') |
| 23 | |
| 24 | |
| 25 | def fill_path(path: str, **params) -> str: |
| 26 | for name, val in params.items(): |
| 27 | if val is not None: |
| 28 | path = path.replace("{" + name + "}", val) |
| 29 | return path |
| 30 | |
| 31 | |
kdanylov aka koder | 84de1e4 | 2017-05-22 14:00:07 +0300 | [diff] [blame] | 32 | class WallyStorage(IWallyStorage, SensorStorage): |
kdanylov aka koder | b083333 | 2017-05-13 20:39:17 +0300 | [diff] [blame] | 33 | def __init__(self, storage: Storage) -> None: |
kdanylov aka koder | 84de1e4 | 2017-05-22 14:00:07 +0300 | [diff] [blame] | 34 | SensorStorage.__init__(self, storage, WallyDB) |
| 35 | |
| 36 | def flush(self) -> None: |
| 37 | self.storage.flush() |
kdanylov aka koder | b083333 | 2017-05-13 20:39:17 +0300 | [diff] [blame] | 38 | |
| 39 | # ------------- CHECK DATA IN STORAGE ---------------------------------------------------------------------------- |
| 40 | def check_plot_file(self, source: DataSource) -> Optional[str]: |
| 41 | path = self.db_paths.plot.format(**source.__dict__) |
| 42 | fpath = self.storage.get_fname(self.db_paths.report_root + path) |
| 43 | return path if os.path.exists(fpath) else None |
| 44 | |
| 45 | # ------------- PUT DATA INTO STORAGE -------------------------------------------------------------------------- |
| 46 | def put_or_check_suite(self, suite: SuiteConfig) -> None: |
| 47 | path = self.db_paths.suite_cfg.format(suite_id=suite.storage_id) |
| 48 | if path in self.storage: |
| 49 | db_cfg = self.storage.load(SuiteConfig, path) |
| 50 | if db_cfg != suite: |
| 51 | logger.error("Current suite %s config is not equal to found in storage at %s", suite.test_type, path) |
| 52 | logger.debug("Current: \n%s\nStorage:\n%s", pprint.pformat(db_cfg), pprint.pformat(suite)) |
| 53 | raise StopTestError() |
| 54 | else: |
| 55 | self.storage.put(suite, path) |
| 56 | |
| 57 | def put_job(self, suite: SuiteConfig, job: JobConfig) -> None: |
| 58 | path = self.db_paths.job_cfg.format(suite_id=suite.storage_id, job_id=job.storage_id) |
| 59 | self.storage.put(job, path) |
| 60 | |
| 61 | def put_extra(self, data: bytes, source: DataSource) -> None: |
| 62 | self.storage.put_raw(data, self.db_paths.ts.format(**source.__dict__)) |
| 63 | |
| 64 | def put_stat(self, data: StatProps, source: DataSource) -> None: |
| 65 | self.storage.put(data, self.db_paths.stat.format(**source.__dict__)) |
| 66 | |
| 67 | # return path to file to be inserted into report |
| 68 | def put_plot_file(self, data: bytes, source: DataSource) -> str: |
| 69 | path = self.db_paths.plot.format(**source.__dict__) |
| 70 | self.storage.put_raw(data, self.db_paths.report_root + path) |
| 71 | return path |
| 72 | |
| 73 | def put_report(self, report: str, name: str) -> str: |
| 74 | return self.storage.put_raw(report.encode(self.csv_file_encoding), self.db_paths.report_root + name) |
| 75 | |
| 76 | def put_txt_report(self, suite: SuiteConfig, report: str) -> None: |
| 77 | path = self.db_paths.txt_report.format(suite_id=suite.storage_id) |
| 78 | self.storage.put_raw(report.encode('utf8'), path) |
| 79 | |
| 80 | def put_job_info(self, suite: SuiteConfig, job: JobConfig, key: str, data: Any) -> None: |
kdanylov aka koder | 84de1e4 | 2017-05-22 14:00:07 +0300 | [diff] [blame] | 81 | path = self.db_paths.job_extra.format(suite_id=suite.storage_id, job_id=job.storage_id, name=key) |
kdanylov aka koder | 026e5f2 | 2017-05-15 01:04:39 +0300 | [diff] [blame] | 82 | if isinstance(data, bytes): |
| 83 | self.storage.put_raw(data, path) |
| 84 | else: |
| 85 | self.storage.put(data, path) |
kdanylov aka koder | b083333 | 2017-05-13 20:39:17 +0300 | [diff] [blame] | 86 | |
| 87 | # ------------- GET DATA FROM STORAGE -------------------------------------------------------------------------- |
| 88 | |
| 89 | def get_stat(self, stat_cls: Type[StatProps], source: DataSource) -> StatProps: |
| 90 | return self.storage.load(stat_cls, self.db_paths.stat.format(**source.__dict__)) |
| 91 | |
kdanylov aka koder | b083333 | 2017-05-13 20:39:17 +0300 | [diff] [blame] | 92 | def get_txt_report(self, suite: SuiteConfig) -> Optional[str]: |
| 93 | path = self.db_paths.txt_report.format(suite_id=suite.storage_id) |
| 94 | if path in self.storage: |
| 95 | return self.storage.get_raw(path).decode('utf8') |
kdanylov aka koder | 026e5f2 | 2017-05-15 01:04:39 +0300 | [diff] [blame] | 96 | return None |
kdanylov aka koder | b083333 | 2017-05-13 20:39:17 +0300 | [diff] [blame] | 97 | |
| 98 | def get_job_info(self, suite: SuiteConfig, job: JobConfig, key: str) -> Any: |
kdanylov aka koder | 84de1e4 | 2017-05-22 14:00:07 +0300 | [diff] [blame] | 99 | path = self.db_paths.job_extra.format(suite_id=suite.storage_id, job_id=job.storage_id, name=key) |
kdanylov aka koder | b083333 | 2017-05-13 20:39:17 +0300 | [diff] [blame] | 100 | return self.storage.get(path, None) |
| 101 | # ------------- ITER OVER STORAGE ------------------------------------------------------------------------------ |
| 102 | |
| 103 | def iter_suite(self, suite_type: str = None) -> Iterator[SuiteConfig]: |
| 104 | for is_file, suite_info_path, groups in self.iter_paths(self.db_paths.suite_cfg_r): |
| 105 | assert is_file |
| 106 | suite = self.storage.load(SuiteConfig, suite_info_path) |
kdanylov aka koder | b083333 | 2017-05-13 20:39:17 +0300 | [diff] [blame] | 107 | assert suite.storage_id == groups['suite_id'] |
| 108 | if not suite_type or suite.test_type == suite_type: |
| 109 | yield suite |
| 110 | |
| 111 | def iter_job(self, suite: SuiteConfig) -> Iterator[JobConfig]: |
| 112 | job_glob = fill_path(self.db_paths.job_cfg_r, suite_id=suite.storage_id) |
| 113 | job_config_cls = all_suits[suite.test_type].job_config_cls |
| 114 | for is_file, path, groups in self.iter_paths(job_glob): |
| 115 | assert is_file |
| 116 | job = cast(JobConfig, self.storage.load(job_config_cls, path)) |
| 117 | assert job.storage_id == groups['job_id'] |
| 118 | yield job |
| 119 | |
kdanylov aka koder | 026e5f2 | 2017-05-15 01:04:39 +0300 | [diff] [blame] | 120 | def load_nodes(self) -> List[NodeInfo]: |
kdanylov aka koder | 84de1e4 | 2017-05-22 14:00:07 +0300 | [diff] [blame] | 121 | try: |
| 122 | return self.storage.other_caches['wally']['nodes'] |
| 123 | except KeyError: |
| 124 | nodes = self.storage.load_list(NodeInfo, WallyDB.all_nodes) |
kdanylov aka koder | 026e5f2 | 2017-05-15 01:04:39 +0300 | [diff] [blame] | 125 | if WallyDB.nodes_params in self.storage: |
| 126 | params = json.loads(self.storage.get_raw(WallyDB.nodes_params).decode('utf8')) |
kdanylov aka koder | 84de1e4 | 2017-05-22 14:00:07 +0300 | [diff] [blame] | 127 | for node in nodes: |
kdanylov aka koder | 026e5f2 | 2017-05-15 01:04:39 +0300 | [diff] [blame] | 128 | node.params = params.get(node.node_id, {}) |
kdanylov aka koder | 84de1e4 | 2017-05-22 14:00:07 +0300 | [diff] [blame] | 129 | self.storage.other_caches['wally']['nodes'] = nodes |
| 130 | return nodes |
kdanylov aka koder | 026e5f2 | 2017-05-15 01:04:39 +0300 | [diff] [blame] | 131 | |
kdanylov aka koder | b083333 | 2017-05-13 20:39:17 +0300 | [diff] [blame] | 132 | # ----------------- TS ------------------------------------------------------------------------------------------ |
| 133 | def get_ts(self, ds: DataSource) -> TimeSeries: |
| 134 | path = self.db_paths.ts.format_map(ds.__dict__) |
| 135 | (units, time_units), header2, content = self.storage.get_array(path) |
kdanylov aka koder | b083333 | 2017-05-13 20:39:17 +0300 | [diff] [blame] | 136 | times = content[:,0].copy() |
| 137 | data = content[:,1:] |
| 138 | |
| 139 | if data.shape[1] == 1: |
| 140 | data.shape = (data.shape[0],) |
| 141 | |
| 142 | return TimeSeries(data=data, times=times, source=ds, units=units, time_units=time_units, histo_bins=header2) |
| 143 | |
| 144 | def put_ts(self, ts: TimeSeries) -> None: |
| 145 | assert ts.data.dtype == ts.times.dtype, "Data type {!r} != time type {!r}".format(ts.data.dtype, ts.times.dtype) |
| 146 | assert ts.data.dtype.kind == 'u', "Only unsigned ints are accepted" |
| 147 | assert ts.source.tag == self.ts_arr_tag, \ |
| 148 | "Incorrect source tag == {!r}, must be {!r}".format(ts.source.tag, self.ts_arr_tag) |
| 149 | |
| 150 | if ts.source.metric == 'lat': |
| 151 | assert len(ts.data.shape) == 2, "Latency should be 2d array" |
| 152 | assert ts.histo_bins is not None, "Latency should have histo_bins field not empty" |
| 153 | |
| 154 | csv_path = self.db_paths.ts.format_map(ts.source.__dict__) |
| 155 | header = [ts.units, ts.time_units] |
| 156 | |
| 157 | tv = ts.times.view().reshape((-1, 1)) |
| 158 | |
| 159 | if len(ts.data.shape) == 1: |
| 160 | dv = ts.data.view().reshape((ts.times.shape[0], -1)) |
| 161 | else: |
| 162 | dv = ts.data |
| 163 | |
| 164 | result = numpy.concatenate((tv, dv), axis=1) |
| 165 | self.storage.put_array(csv_path, result, header, header2=ts.histo_bins, append_on_exists=False) |
| 166 | |
kdanylov aka koder | 84de1e4 | 2017-05-22 14:00:07 +0300 | [diff] [blame] | 167 | def iter_ts(self, **ds_parts: str) -> Iterator[DataSource]: |
| 168 | return self.iter_objs(self.db_paths.ts_r, **ds_parts) |