Move even more code to cephlib; rename ResultStorage to WallyStorage
diff --git a/wally/result_storage.py b/wally/result_storage.py
index 282bdb5..70c46d7 100644
--- a/wally/result_storage.py
+++ b/wally/result_storage.py
@@ -1,17 +1,19 @@
import os
+import json
import pprint
import logging
-from typing import cast, Iterator, Tuple, Type, Optional, Any
+from typing import cast, Iterator, Tuple, Type, Optional, Any, Union, List
import numpy
from cephlib.wally_storage import WallyDB
-from cephlib.hlstorage import SensorStorageBase
+from cephlib.sensor_storage import SensorStorageBase
from cephlib.statistic import StatProps
from cephlib.numeric_types import DataSource, TimeSeries
+from cephlib.node import NodeInfo
from .suits.job import JobConfig
-from .result_classes import SuiteConfig, IResultStorage
+from .result_classes import SuiteConfig, IWallyStorage
from .utils import StopTestError
from .suits.all_suits import all_suits
@@ -27,9 +29,10 @@
return path
-class ResultStorage(IResultStorage, SensorStorageBase):
+class WallyStorage(IWallyStorage, SensorStorageBase):
def __init__(self, storage: Storage) -> None:
SensorStorageBase.__init__(self, storage, WallyDB)
+ self.cached_nodes = None  # type: Optional[List[NodeInfo]]  # filled on first load_nodes() call
# ------------- CHECK DATA IN STORAGE ----------------------------------------------------------------------------
def check_plot_file(self, source: DataSource) -> Optional[str]:
@@ -74,18 +77,21 @@
def put_job_info(self, suite: SuiteConfig, job: JobConfig, key: str, data: Any) -> None:
path = self.db_paths.job_extra.format(suite_id=suite.storage_id, job_id=job.storage_id, tag=key)
- self.storage.put(data, path)
+ if isinstance(data, bytes):  # bytes payloads are routed to put_raw() instead of put()
+ self.storage.put_raw(data, path)  # presumably stored verbatim, without serialization - TODO confirm in cephlib
+ else:
+ self.storage.put(data, path)  # every non-bytes value keeps the previous put() path
# ------------- GET DATA FROM STORAGE --------------------------------------------------------------------------
def get_stat(self, stat_cls: Type[StatProps], source: DataSource) -> StatProps:
return self.storage.load(stat_cls, self.db_paths.stat.format(**source.__dict__))
-
def get_txt_report(self, suite: SuiteConfig) -> Optional[str]:
path = self.db_paths.txt_report.format(suite_id=suite.storage_id)
if path in self.storage:
return self.storage.get_raw(path).decode('utf8')
+ return None  # explicit Optional[str] result when `path` is not present in the storage
def get_job_info(self, suite: SuiteConfig, job: JobConfig, key: str) -> Any:
path = self.db_paths.job_extra.format(suite_id=suite.storage_id, job_id=job.storage_id, tag=key)
@@ -110,11 +116,19 @@
assert job.storage_id == groups['job_id']
yield job
+    def load_nodes(self) -> List[NodeInfo]:
+        """Return the NodeInfo list from storage, loading it on first use.
+
+        The loaded list is kept in ``self.cached_nodes`` and returned as-is on
+        later calls. If the storage contains ``WallyDB.nodes_params``, that entry
+        is decoded as UTF-8 JSON and every node's ``params`` is set to the value
+        found under its ``node_id`` (an empty dict when the id is absent).
+        """
+        # Test against None, not truthiness: an empty node list is a valid cached
+        # result and must not trigger a reload from storage on every call.
+        if self.cached_nodes is None:
+            self.cached_nodes = self.storage.load_list(NodeInfo, WallyDB.all_nodes)
+            if WallyDB.nodes_params in self.storage:
+                params = json.loads(self.storage.get_raw(WallyDB.nodes_params).decode('utf8'))
+                for node in self.cached_nodes:
+                    node.params = params.get(node.node_id, {})
+        return self.cached_nodes
+
# ----------------- TS ------------------------------------------------------------------------------------------
def get_ts(self, ds: DataSource) -> TimeSeries:
path = self.db_paths.ts.format_map(ds.__dict__)
(units, time_units), header2, content = self.storage.get_array(path)
-
times = content[:,0].copy()
data = content[:,1:]