blob: 367245870d06e2d4921347e026474de59927053b [file] [log] [blame]
import os
import logging
from typing import cast, Iterator, Tuple, Type, Dict, Optional, Any, List
import numpy
from .suits.job import JobConfig
from .result_classes import SuiteConfig, TimeSeries, DataSource, StatProps, IResultStorage
from .storage import Storage
from .utils import StopTestError
from .suits.all_suits import all_suits
logger = logging.getLogger('wally')
class DB_re:
node_id = r'\d+.\d+.\d+.\d+:\d+'
job_id = r'[-a-zA-Z0-9_]+_\d+'
suite_id = r'[a-z_]+_\d+'
sensor = r'[-a-z_]+'
dev = r'[-a-zA-Z0-9_]+'
tag = r'[a-z_.]+'
metric = r'[a-z_.]+'
class DB_paths:
suite_cfg_r = r'results/{suite_id}\.info\.yml'
job_root = r'results/{suite_id}.{job_id}/'
job_cfg_r = job_root + r'info\.yml'
# time series, data from load tool, sensor is a tool name
ts_r = job_root + r'{node_id}\.{sensor}\.{metric}.{tag}'
# statistica data for ts
stat_r = job_root + r'{node_id}\.{sensor}\.{metric}\.stat.yaml'
# sensor data
sensor_data_r = r'sensors/{node_id}_{sensor}\.{dev}\.{metric}\.csv'
sensor_time_r = r'sensors/{node_id}_collected_at\.csv'
report_root = 'report/'
plot_r = r'{suite_id}\.{job_id}/{node_id}\.{sensor}\.{dev}\.{metric}\.{tag}'
job_cfg = job_cfg_r.replace("\\.", '.')
suite_cfg = suite_cfg_r.replace("\\.", '.')
ts = ts_r.replace("\\.", '.')
stat = stat_r.replace("\\.", '.')
sensor_data = sensor_data_r.replace("\\.", '.')
sensor_time = sensor_time_r.replace("\\.", '.')
plot = plot_r.replace("\\.", '.')
DB_rr = {name: r"(?P<{}>{})".format(name, rr)
for name, rr in DB_re.__dict__.items()
if not name.startswith("__")}
def fill_path(path: str, **params) -> str:
for name, val in params.items():
if val is not None:
path = path.replace("{" + name + "}", val)
return path
class ResultStorage(IResultStorage):
# TODO: check that all path components match required patterns
ts_header_size = 64
ts_header_format = "!IIIcc"
ts_arr_tag = 'csv'
csv_file_encoding = 'ascii'
def __init__(self, storage: Storage) -> None:
self.storage = storage
self.cache = {} # type: Dict[str, Tuple[int, int, Any, List[str]]]
def sync(self) -> None:
self.storage.sync()
# ----------------- SERIALIZATION / DESERIALIZATION -------------------------------------------------------------
def load_array(self, path: str, skip_shape: bool = False) -> Tuple[numpy.array, Tuple[str, ...]]:
with self.storage.get_fd(path, "rb") as fd:
stats = os.fstat(fd.fileno())
if path in self.cache:
size, atime, obj, header = self.cache[path]
if size == stats.st_size and atime == stats.st_atime_ns:
return obj, header
header = fd.readline().decode(self.csv_file_encoding).strip().split(",")
print("header =", header)
if skip_shape:
header = header[1:]
dt = fd.read().decode("utf-8").strip()
print(dt.split("\n")[0])
arr = numpy.fromstring(dt.replace("\n", ','), sep=',', dtype=header[0])
if len(dt) != 0:
lines = dt.count("\n") + 1
columns = dt.split("\n", 1)[0].count(",") + 1
assert lines * columns == len(arr)
if columns == 1:
arr.shape = (lines,)
else:
arr.shape = (lines, columns)
self.cache[path] = (stats.st_size, stats.st_atime_ns, arr, header[1:])
return arr, header[1:]
def put_array(self, path:str, data: numpy.array, header: List[str], append_on_exists: bool = False) -> None:
header = [data.dtype.name] + header
exists = append_on_exists and path in self.storage
if len(data.shape) == 1:
# make array vertical to simplify reading
vw = data.view().reshape((data.shape[0], 1))
else:
vw = data
with self.storage.get_fd(path, "cb" if exists else "wb") as fd:
if exists:
curr_header = fd.readline().decode(self.csv_file_encoding).rstrip().split(",")
assert header == curr_header, \
"Path {!r}. Expected header ({!r}) and current header ({!r}) don't match"\
.format(path, header, curr_header)
fd.seek(0, os.SEEK_END)
else:
fd.write((",".join(header) + "\n").encode(self.csv_file_encoding))
numpy.savetxt(fd, vw, delimiter=',', newline="\n", fmt="%lu")
def load_ts(self, ds: DataSource, path: str) -> TimeSeries:
arr, header = self.load_array(path, skip_shape=True)
units, time_units = header
data = arr[:,1:]
if data.shape[1] == 1:
data = data.reshape((-1,))
return TimeSeries("{}.{}".format(ds.dev, ds.sensor),
raw=None,
data=data,
times=arr[:,0],
source=ds,
units=units,
time_units=time_units)
def load_sensor(self, ds: DataSource) -> TimeSeries:
collected_at, collect_header = self.load_array(DB_paths.sensor_time.format(**ds.__dict__))
assert collect_header == [ds.node_id, 'collected_at', 'us'], repr(collect_header)
data, data_header = self.load_array(DB_paths.sensor_data.format(**ds.__dict__))
data_units = data_header[2]
assert data_header == [ds.node_id, ds.metric_fqdn, data_units]
assert len(data.shape) == 1
assert len(collected_at.shape) == 1
return TimeSeries(ds.metric_fqdn,
raw=None,
data=data,
times=collected_at,
source=ds,
units=data_units,
time_units='us')
# ------------- CHECK DATA IN STORAGE ----------------------------------------------------------------------------
def check_plot_file(self, source: DataSource) -> Optional[str]:
path = DB_paths.plot.format(**source.__dict__)
fpath = self.storage.resolve_raw(DB_paths.report_root + path)
return path if os.path.exists(fpath) else None
# ------------- PUT DATA INTO STORAGE --------------------------------------------------------------------------
def put_or_check_suite(self, suite: SuiteConfig) -> None:
path = DB_paths.suite_cfg.format(suite_id=suite.storage_id)
if path in self.storage:
db_cfg = self.storage.get(path)
if db_cfg != suite:
logger.error("Current suite %s config is not equal to found in storage at %s", suite.test_type, path)
raise StopTestError()
self.storage.put(suite, path)
def put_job(self, suite: SuiteConfig, job: JobConfig) -> None:
path = DB_paths.job_cfg.format(suite_id=suite.storage_id, job_id=job.storage_id)
self.storage.put(job, path)
def put_ts(self, ts: TimeSeries) -> None:
assert ts.data.dtype == ts.times.dtype
assert ts.data.dtype.kind == 'u'
assert ts.source.tag == self.ts_arr_tag
csv_path = DB_paths.ts.format(**ts.source.__dict__)
header = [ts.data.dtype.name, ts.units, ts.time_units]
tv = ts.times.view().reshape((-1, 1))
if len(ts.data.shape) == 1:
dv = ts.data.view().reshape((ts.times.shape[0], -1))
else:
dv = ts.data
result = numpy.concatenate((tv, dv), axis=1)
self.put_array(csv_path, result, header)
if ts.raw:
raw_path = DB_paths.ts.format(**ts.source(tag=ts.raw_tag).__dict__)
self.storage.put_raw(ts.raw, raw_path)
def put_extra(self, data: bytes, source: DataSource) -> None:
self.storage.put(data, DB_paths.ts.format(**source.__dict__))
def put_stat(self, data: StatProps, source: DataSource) -> None:
self.storage.put(data, DB_paths.stat.format(**source.__dict__))
# return path to file to be inserted into report
def put_plot_file(self, data: bytes, source: DataSource) -> str:
path = DB_paths.plot.format(**source.__dict__)
self.storage.put_raw(data, DB_paths.report_root + path)
return path
def put_report(self, report: str, name: str) -> str:
return self.storage.put_raw(report.encode(self.csv_file_encoding), DB_paths.report_root + name)
def append_sensor(self, data: numpy.array, ds: DataSource, units: str) -> None:
if ds.metric == 'collected_at':
path = DB_paths.sensor_time
metrics_fqn = 'collected_at'
else:
path = DB_paths.sensor_data
metrics_fqn = ds.metric_fqdn
self.put_array(path.format(**ds.__dict__), data, [ds.node_id, metrics_fqn, units], append_on_exists=True)
# ------------- GET DATA FROM STORAGE --------------------------------------------------------------------------
def get_stat(self, stat_cls: Type[StatProps], source: DataSource) -> StatProps:
return self.storage.load(stat_cls, DB_paths.stat.format(**source.__dict__))
# ------------- ITER OVER STORAGE ------------------------------------------------------------------------------
def iter_paths(self, path_glob) -> Iterator[Tuple[bool, str, Dict[str, str]]]:
path = path_glob.format(**DB_rr).split("/")
yield from self.storage._iter_paths("", path, {})
def iter_suite(self, suite_type: str = None) -> Iterator[SuiteConfig]:
for is_file, suite_info_path, groups in self.iter_paths(DB_paths.suite_cfg_r):
assert is_file
suite = self.storage.load(SuiteConfig, suite_info_path)
# suite = cast(SuiteConfig, self.storage.load(SuiteConfig, suite_info_path))
assert suite.storage_id == groups['suite_id']
if not suite_type or suite.test_type == suite_type:
yield suite
def iter_job(self, suite: SuiteConfig) -> Iterator[JobConfig]:
job_glob = fill_path(DB_paths.job_cfg_r, suite_id=suite.storage_id)
job_config_cls = all_suits[suite.test_type].job_config_cls
for is_file, path, groups in self.iter_paths(job_glob):
assert is_file
job = cast(JobConfig, self.storage.load(job_config_cls, path))
assert job.storage_id == groups['job_id']
yield job
# iterate over test tool data
def iter_ts(self, suite: SuiteConfig, job: JobConfig, **filters) -> Iterator[TimeSeries]:
filters.update(suite_id=suite.storage_id, job_id=job.storage_id)
ts_glob = fill_path(DB_paths.ts_r, **filters)
for is_file, path, groups in self.iter_paths(ts_glob):
assert is_file
groups = groups.copy()
groups.update(filters)
ds = DataSource(suite_id=suite.storage_id,
job_id=job.storage_id,
node_id=groups["node_id"],
sensor=groups["sensor"],
dev=None,
metric=groups["metric"],
tag=groups["tag"])
yield self.load_ts(ds, path)
def iter_sensors(self, node_id: str = None, sensor: str = None, dev: str = None, metric: str = None) -> \
Iterator[Tuple[str, Dict[str, str]]]:
path = fill_path(DB_paths.sensor_data_r, node_id=node_id, sensor=sensor, dev=dev, metric=metric)
for is_file, path, groups in self.iter_paths(path):
assert is_file
yield path, groups