blob: de2f86d2162b468d7af8d51b6cfe83a4fa273749 [file] [log] [blame]
kdanylov aka koderb0833332017-05-13 20:39:17 +03001import os
kdanylov aka koder026e5f22017-05-15 01:04:39 +03002import json
kdanylov aka koderb0833332017-05-13 20:39:17 +03003import pprint
4import logging
kdanylov aka koder026e5f22017-05-15 01:04:39 +03005from typing import cast, Iterator, Tuple, Type, Optional, Any, Union, List
kdanylov aka koderb0833332017-05-13 20:39:17 +03006
7import numpy
8
9from cephlib.wally_storage import WallyDB
kdanylov aka koder84de1e42017-05-22 14:00:07 +030010from cephlib.sensor_storage import SensorStorage
kdanylov aka koderb0833332017-05-13 20:39:17 +030011from cephlib.statistic import StatProps
12from cephlib.numeric_types import DataSource, TimeSeries
kdanylov aka koder026e5f22017-05-15 01:04:39 +030013from cephlib.node import NodeInfo
kdanylov aka koderb0833332017-05-13 20:39:17 +030014
15from .suits.job import JobConfig
kdanylov aka koder026e5f22017-05-15 01:04:39 +030016from .result_classes import SuiteConfig, IWallyStorage
kdanylov aka koderb0833332017-05-13 20:39:17 +030017from .utils import StopTestError
18from .suits.all_suits import all_suits
19
20from cephlib.storage import Storage
21
logger = logging.getLogger('wally')  # module-level logger shared by this file
23
24
def fill_path(path: str, **params) -> str:
    """Substitute ``{name}`` placeholders in *path* with the given values.

    Parameters whose value is ``None`` are skipped, leaving their
    ``{name}`` placeholder in place (used to build partially-filled
    glob patterns, e.g. in ``iter_job``). Non-string values are coerced
    with ``str()`` so integer ids can be passed directly —
    ``str.replace`` would otherwise raise ``TypeError``.
    """
    for name, val in params.items():
        if val is not None:
            path = path.replace("{" + name + "}", str(val))
    return path
30
31
class WallyStorage(IWallyStorage, SensorStorage):
    """Wally result storage layered over cephlib's ``SensorStorage``.

    Persists and retrieves suite/job configs, statistics, plots, text
    reports and time series through the wrapped ``Storage`` object,
    using the path templates from ``WallyDB`` (exposed as
    ``self.db_paths`` — presumably set up by ``SensorStorage.__init__``).
    """

    def __init__(self, storage: Storage) -> None:
        # WallyDB supplies the path-template namespace used as self.db_paths below
        SensorStorage.__init__(self, storage, WallyDB)

    def flush(self) -> None:
        """Flush any buffered writes of the underlying storage."""
        self.storage.flush()

    # ------------- CHECK DATA IN STORAGE ----------------------------------------------------------------------------
    def check_plot_file(self, source: DataSource) -> Optional[str]:
        """Return the report-relative plot path for *source* if the plot
        file already exists on disk, ``None`` otherwise."""
        path = self.db_paths.plot.format(**source.__dict__)
        fpath = self.storage.get_fname(self.db_paths.report_root + path)
        return path if os.path.exists(fpath) else None

    # ------------- PUT DATA INTO STORAGE --------------------------------------------------------------------------
    def put_or_check_suite(self, suite: SuiteConfig) -> None:
        """Store *suite* config, or verify it equals the config already
        stored under the same ``storage_id``.

        Raises:
            StopTestError: a different config is already stored at that path.
        """
        path = self.db_paths.suite_cfg.format(suite_id=suite.storage_id)
        if path in self.storage:
            db_cfg = self.storage.load(SuiteConfig, path)
            if db_cfg != suite:
                logger.error("Current suite %s config is not equal to found in storage at %s", suite.test_type, path)
                # BUGFIX: arguments were swapped — *suite* is the current config,
                # *db_cfg* is the one loaded from storage
                logger.debug("Current: \n%s\nStorage:\n%s", pprint.pformat(suite), pprint.pformat(db_cfg))
                raise StopTestError()
        else:
            self.storage.put(suite, path)

    def put_job(self, suite: SuiteConfig, job: JobConfig) -> None:
        """Store *job* config under its suite's directory."""
        path = self.db_paths.job_cfg.format(suite_id=suite.storage_id, job_id=job.storage_id)
        self.storage.put(job, path)

    def put_extra(self, data: bytes, source: DataSource) -> None:
        """Store raw extra data at the time-series path derived from *source*."""
        self.storage.put_raw(data, self.db_paths.ts.format(**source.__dict__))

    def put_stat(self, data: StatProps, source: DataSource) -> None:
        """Store statistics object at the stat path derived from *source*."""
        self.storage.put(data, self.db_paths.stat.format(**source.__dict__))

    # return path to file to be inserted into report
    def put_plot_file(self, data: bytes, source: DataSource) -> str:
        """Store plot bytes under the report root; return the report-relative path."""
        path = self.db_paths.plot.format(**source.__dict__)
        self.storage.put_raw(data, self.db_paths.report_root + path)
        return path

    def put_report(self, report: str, name: str) -> str:
        """Store a report file under the report root; return its storage path."""
        # NOTE(review): reports are encoded with csv_file_encoding while txt
        # reports below use 'utf8' — confirm this asymmetry is intended
        return self.storage.put_raw(report.encode(self.csv_file_encoding), self.db_paths.report_root + name)

    def put_txt_report(self, suite: SuiteConfig, report: str) -> None:
        """Store the plain-text report for *suite* (utf8-encoded)."""
        path = self.db_paths.txt_report.format(suite_id=suite.storage_id)
        self.storage.put_raw(report.encode('utf8'), path)

    def put_job_info(self, suite: SuiteConfig, job: JobConfig, key: str, data: Any) -> None:
        """Store per-job extra data under *key*; bytes go in raw, anything
        else via the storage serializer."""
        path = self.db_paths.job_extra.format(suite_id=suite.storage_id, job_id=job.storage_id, name=key)
        if isinstance(data, bytes):
            self.storage.put_raw(data, path)
        else:
            self.storage.put(data, path)

    # ------------- GET DATA FROM STORAGE --------------------------------------------------------------------------

    def get_stat(self, stat_cls: Type[StatProps], source: DataSource) -> StatProps:
        """Load a statistics object of type *stat_cls* for *source*."""
        return self.storage.load(stat_cls, self.db_paths.stat.format(**source.__dict__))

    def get_txt_report(self, suite: SuiteConfig) -> Optional[str]:
        """Return the stored plain-text report for *suite*, or ``None`` if absent."""
        path = self.db_paths.txt_report.format(suite_id=suite.storage_id)
        if path in self.storage:
            return self.storage.get_raw(path).decode('utf8')
        return None

    def get_job_info(self, suite: SuiteConfig, job: JobConfig, key: str) -> Any:
        """Return per-job extra data stored under *key*, or ``None`` if absent."""
        path = self.db_paths.job_extra.format(suite_id=suite.storage_id, job_id=job.storage_id, name=key)
        return self.storage.get(path, None)
    # ------------- ITER OVER STORAGE ------------------------------------------------------------------------------

    def iter_suite(self, suite_type: Optional[str] = None) -> Iterator[SuiteConfig]:
        """Yield stored suite configs, optionally filtered by test type.

        Annotation fixed: default is None, so the parameter is Optional[str].
        """
        for is_file, suite_info_path, groups in self.iter_paths(self.db_paths.suite_cfg_r):
            assert is_file
            suite = self.storage.load(SuiteConfig, suite_info_path)
            assert suite.storage_id == groups['suite_id']
            if not suite_type or suite.test_type == suite_type:
                yield suite

    def iter_job(self, suite: SuiteConfig) -> Iterator[JobConfig]:
        """Yield job configs of *suite*, loaded as the suite-specific job class."""
        job_glob = fill_path(self.db_paths.job_cfg_r, suite_id=suite.storage_id)
        job_config_cls = all_suits[suite.test_type].job_config_cls
        for is_file, path, groups in self.iter_paths(job_glob):
            assert is_file
            job = cast(JobConfig, self.storage.load(job_config_cls, path))
            assert job.storage_id == groups['job_id']
            yield job

    def load_nodes(self) -> List[NodeInfo]:
        """Load (and memoize in storage.other_caches) the list of test nodes.

        When a nodes_params JSON blob is stored, each node's ``params`` is
        filled from it, keyed by node_id.
        """
        try:
            return self.storage.other_caches['wally']['nodes']
        except KeyError:
            nodes = self.storage.load_list(NodeInfo, WallyDB.all_nodes)
            if WallyDB.nodes_params in self.storage:
                params = json.loads(self.storage.get_raw(WallyDB.nodes_params).decode('utf8'))
                for node in nodes:
                    node.params = params.get(node.node_id, {})
            self.storage.other_caches['wally']['nodes'] = nodes
            return nodes

    # ----------------- TS ------------------------------------------------------------------------------------------
    def get_ts(self, ds: DataSource) -> TimeSeries:
        """Load the time series for *ds*.

        The stored array's first column is timestamps (copied), the
        remaining columns are data; single-column data is flattened to 1d.
        """
        path = self.db_paths.ts.format_map(ds.__dict__)
        (units, time_units), header2, content = self.storage.get_array(path)
        # times must be copied — the data view below shares content's buffer
        times = content[:,0].copy()
        data = content[:,1:]

        if data.shape[1] == 1:
            data.shape = (data.shape[0],)

        return TimeSeries(data=data, times=times, source=ds, units=units, time_units=time_units, histo_bins=header2)

    def put_ts(self, ts: TimeSeries) -> None:
        """Store *ts* as a single array: times as column 0, data after it.

        Requires matching unsigned-int dtypes for data and times, and a
        2d data array with histo_bins for latency metrics.
        """
        assert ts.data.dtype == ts.times.dtype, "Data type {!r} != time type {!r}".format(ts.data.dtype, ts.times.dtype)
        assert ts.data.dtype.kind == 'u', "Only unsigned ints are accepted"
        assert ts.source.tag == self.ts_arr_tag, \
            "Incorrect source tag == {!r}, must be {!r}".format(ts.source.tag, self.ts_arr_tag)

        if ts.source.metric == 'lat':
            assert len(ts.data.shape) == 2, "Latency should be 2d array"
            assert ts.histo_bins is not None, "Latency should have histo_bins field not empty"

        csv_path = self.db_paths.ts.format_map(ts.source.__dict__)
        header = [ts.units, ts.time_units]

        # reshape times to a column so it can be glued in front of the data
        tv = ts.times.view().reshape((-1, 1))

        if len(ts.data.shape) == 1:
            dv = ts.data.view().reshape((ts.times.shape[0], -1))
        else:
            dv = ts.data

        result = numpy.concatenate((tv, dv), axis=1)
        self.storage.put_array(csv_path, result, header, header2=ts.histo_bins, append_on_exists=False)

    def iter_ts(self, **ds_parts: str) -> Iterator[DataSource]:
        """Iterate DataSource objects matching the given path-template parts."""
        return self.iter_objs(self.db_paths.ts_r, **ds_parts)