A lot of fixes, improve visualization speed, add C++ code
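
The hlstorage changes below rework the on-disk array format: each stored CSV
file now begins with a single header line of the form

    <data dtype>,<has_header2: true|false>,<header2 dtype>,<ext header fields...>

optionally followed by a second header line (header2, e.g. histogram bin edges)
and then the data rows. A minimal round-trip sketch, assuming the patched class
is wally's ResultStorage and that `storage` is an already-initialized Storage
instance (paths and values are illustrative only):

    import numpy

    rstorage = ResultStorage(storage)
    bins = numpy.array([1, 2, 4, 8], dtype='uint64')   # histo bin edges -> header2
    data = numpy.zeros((10, 5), dtype='uint64')        # e.g. time column + 4 histo columns
    rstorage.put_array("ts/example.csv", data, ["KiBps", "ms"], header2=bins)
    (units, time_units), bins2, arr = rstorage.load_array("ts/example.csv")
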
diff --git a/wally/hlstorage.py b/wally/hlstorage.py
index 411b515..733eab0 100644
--- a/wally/hlstorage.py
+++ b/wally/hlstorage.py
@@ -1,12 +1,12 @@
import os
import pprint
import logging
-from typing import cast, Iterator, Tuple, Type, Dict, Optional, Any, List
+from typing import cast, Iterator, Tuple, Type, Dict, Optional, List
import numpy
from .suits.job import JobConfig
-from .result_classes import SuiteConfig, TimeSeries, DataSource, StatProps, IResultStorage
+from .result_classes import SuiteConfig, TimeSeries, DataSource, StatProps, IResultStorage, ArrayData
from .storage import Storage
from .utils import StopTestError
from .suits.all_suits import all_suits
@@ -75,87 +75,140 @@
def __init__(self, storage: Storage) -> None:
self.storage = storage
- self.cache = {} # type: Dict[str, Tuple[int, int, Any, List[str]]]
+ self.cache = {} # type: Dict[str, Tuple[int, int, ArrayData]]
def sync(self) -> None:
self.storage.sync()
# ----------------- SERIALIZATION / DESERIALIZATION -------------------------------------------------------------
- def load_array(self, path: str, skip_shape: bool = False) -> Tuple[numpy.array, Tuple[str, ...]]:
+ def read_headers(self, fd) -> Tuple[str, List[str], List[str], Optional[numpy.ndarray]]:
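+        # First header line layout (values illustrative):
+        #   <data dtype>,<has_header2: true|false>,<header2 dtype>,<ext header fields...>
+        # e.g. "uint64,true,uint64,KiBps,ms"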
+ header = fd.readline().decode(self.csv_file_encoding).rstrip().split(",")
+ dtype, has_header2, header2_dtype, *ext_header = header
+
+ if has_header2 == 'true':
+ ln = fd.readline().decode(self.csv_file_encoding).strip()
+ header2 = numpy.fromstring(ln, sep=',', dtype=header2_dtype)
+ else:
+            assert has_header2 == 'false', \
+                "has_header2 in file {} must be 'true' or 'false', not {!r}".format(fd.name, has_header2)
+ header2 = None
+ return dtype, ext_header, header, header2
+
+ def load_array(self, path: str) -> ArrayData:
+ """
+        Load an array from a file; should not be called directly
+ :param path: file path
+ :return: ArrayData
+ """
with self.storage.get_fd(path, "rb") as fd:
+ fd.seek(0, os.SEEK_SET)
+
stats = os.fstat(fd.fileno())
if path in self.cache:
- size, atime, obj, header = self.cache[path]
+ size, atime, arr_info = self.cache[path]
if size == stats.st_size and atime == stats.st_atime_ns:
- return obj, header
+ return arr_info
- header = fd.readline().decode(self.csv_file_encoding).strip().split(",")
+ data_dtype, header, _, header2 = self.read_headers(fd)
+            assert data_dtype == 'uint64', "Unexpected data dtype {!r} in {!r}".format(data_dtype, path)
+ dt = fd.read().decode(self.csv_file_encoding).strip()
- if skip_shape:
- header = header[1:]
- dt = fd.read().decode("utf-8").strip()
+ if len(dt) != 0:
+ arr = numpy.fromstring(dt.replace("\n", ','), sep=',', dtype=data_dtype)
+ lines = dt.count("\n") + 1
+ assert len(set(ln.count(',') for ln in dt.split("\n"))) == 1, \
+ "Data lines in {!r} have different element count".format(path)
+ arr.shape = [lines] if lines == arr.size else [lines, -1]
+ else:
+ arr = None
- arr = numpy.fromstring(dt.replace("\n", ','), sep=',', dtype=header[0])
- if len(dt) != 0:
- lines = dt.count("\n") + 1
- columns = dt.split("\n", 1)[0].count(",") + 1
- assert lines * columns == len(arr)
- if columns == 1:
- arr.shape = (lines,)
- else:
- arr.shape = (lines, columns)
+ arr_data = ArrayData(header, header2, arr)
+ self.cache[path] = (stats.st_size, stats.st_atime_ns, arr_data)
+ return arr_data
- self.cache[path] = (stats.st_size, stats.st_atime_ns, arr, header[1:])
- return arr, header[1:]
+ def put_array(self, path: str, data: numpy.array, header: List[str], header2: numpy.ndarray = None,
+ append_on_exists: bool = False) -> None:
- def put_array(self, path:str, data: numpy.array, header: List[str], append_on_exists: bool = False) -> None:
- header = [data.dtype.name] + header
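+        # first line = data dtype, has_header2 flag, header2 dtype, then the caller's header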
+ header = [data.dtype.name] + \
+ (['false', ''] if header2 is None else ['true', header2.dtype.name]) + \
+ header
exists = append_on_exists and path in self.storage
- if len(data.shape) == 1:
- # make array vertical to simplify reading
- vw = data.view().reshape((data.shape[0], 1))
- else:
- vw = data
+        # make 1D arrays vertical to simplify reading
+        vw = data.view().reshape((data.shape[0], 1)) if len(data.shape) == 1 else data
+ mode = "cb" if not exists else "rb+"
- with self.storage.get_fd(path, "cb" if not exists else "rb+") as fd:
+ with self.storage.get_fd(path, mode) as fd:
if exists:
- curr_header = fd.readline().decode(self.csv_file_encoding).rstrip().split(",")
- assert header == curr_header, \
- "Path {!r}. Expected header ({!r}) and current header ({!r}) don't match"\
- .format(path, header, curr_header)
+ data_dtype, _, full_header, curr_header2 = self.read_headers(fd)
+
+                assert data_dtype == data.dtype.name, \
+                    "Path {!r}. Passed data type ({!r}) and current data type ({!r}) don't match"\
+                    .format(path, data.dtype.name, data_dtype)
+
+                assert header == full_header, \
+                    "Path {!r}. Passed header ({!r}) and current header ({!r}) don't match"\
+                    .format(path, header, full_header)
+
+                assert ((header2 is None and curr_header2 is None) or
+                        numpy.array_equal(header2, curr_header2)), \
+                    "Path {!r}. Passed header2 ({!r}) and current header2 ({!r}) don't match"\
+                    .format(path, header2, curr_header2)
+
fd.seek(0, os.SEEK_END)
else:
fd.write((",".join(header) + "\n").encode(self.csv_file_encoding))
+ if header2 is not None:
+ fd.write((",".join(map(str, header2)) + "\n").encode(self.csv_file_encoding))
numpy.savetxt(fd, vw, delimiter=',', newline="\n", fmt="%lu")
def load_ts(self, ds: DataSource, path: str) -> TimeSeries:
- arr, header = self.load_array(path, skip_shape=True)
- units, time_units = header
+ """
+        Load a time series generated by fio or another tool. Should not be called
+        directly, use iter_ts instead.
+ :param ds: data source path
+ :param path: path in data storage
+ :return: TimeSeries
+ """
+ (units, time_units), header2, data = self.load_array(path)
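+        # first column holds timestamps, the remaining columns hold the data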
+ times = data[:,0]
+ ts_data = data[:,1:]
- data = arr[:,1:]
- if data.shape[1] == 1:
- data = data.reshape((-1,))
+ if ts_data.shape[1] == 1:
+ ts_data.shape = (ts_data.shape[0],)
return TimeSeries("{}.{}".format(ds.dev, ds.sensor),
raw=None,
- data=data,
- times=arr[:,0],
+ data=ts_data,
+ times=times,
source=ds,
units=units,
- time_units=time_units)
+ time_units=time_units,
+ histo_bins=header2)
def load_sensor(self, ds: DataSource) -> TimeSeries:
- collected_at, collect_header = self.load_array(DB_paths.sensor_time.format(**ds.__dict__))
- assert collect_header == [ds.node_id, 'collected_at', 'us'], repr(collect_header)
- data, data_header = self.load_array(DB_paths.sensor_data.format(**ds.__dict__))
+        # sensor data has no shape
+ path = DB_paths.sensor_time.format(**ds.__dict__)
+ collect_header, must_be_none, collected_at = self.load_array(path)
+
+        # every second timestamp is a 'collection end' time; keep only the collection start times
+        collected_at = collected_at[::2]
+
+ # there must be no histogram for collected_at
+ assert must_be_none is None, "Extra header2 {!r} in collect_at file at {!r}".format(must_be_none, path)
+        assert collect_header == [ds.node_id, 'collected_at', 'us'],\
+            "Unexpected collected_at header {!r} at {!r}".format(collect_header, path)
+ assert len(collected_at.shape) == 1, "Collected_at must be 1D at {!r}".format(path)
+
+ data_path = DB_paths.sensor_data.format(**ds.__dict__)
+ data_header, must_be_none, data = self.load_array(data_path)
+
+ # there must be no histogram for any sensors
+ assert must_be_none is None, "Extra header2 {!r} in sensor data file {!r}".format(must_be_none, data_path)
data_units = data_header[2]
- assert data_header == [ds.node_id, ds.metric_fqdn, data_units]
-
- assert len(data.shape) == 1
- assert len(collected_at.shape) == 1
+ assert data_header == [ds.node_id, ds.metric_fqdn, data_units], \
+ "Unexpected data header {!r} at {!r}".format(data_header, data_path)
+ assert len(data.shape) == 1, "Sensor data must be 1D at {!r}".format(data_path)
return TimeSeries(ds.metric_fqdn,
raw=None,
@@ -190,12 +243,12 @@
self.storage.put(job, path)
def put_ts(self, ts: TimeSeries) -> None:
- assert ts.data.dtype == ts.times.dtype
- assert ts.data.dtype.kind == 'u'
- assert ts.source.tag == self.ts_arr_tag
-
+ assert ts.data.dtype == ts.times.dtype, "Data type {!r} != time type {!r}".format(ts.data.dtype, ts.times.dtype)
+ assert ts.data.dtype.kind == 'u', "Only unsigned ints are accepted"
+        assert ts.source.tag == self.ts_arr_tag, \
+            "Incorrect source tag {!r}, must be {!r}".format(ts.source.tag, self.ts_arr_tag)
csv_path = DB_paths.ts.format(**ts.source.__dict__)
- header = [ts.data.dtype.name, ts.units, ts.time_units]
+ header = [ts.units, ts.time_units]
tv = ts.times.view().reshape((-1, 1))
if len(ts.data.shape) == 1:
@@ -204,7 +257,10 @@
dv = ts.data
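+        # one row per sample: the timestamp column first, then the data column(s)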
result = numpy.concatenate((tv, dv), axis=1)
- self.put_array(csv_path, result, header)
+        # header2 defaults to None, so a single call covers both the histogram and the plain case
+        self.put_array(csv_path, result, header, header2=ts.histo_bins)
if ts.raw:
raw_path = DB_paths.ts.format(**ts.source(tag=ts.raw_tag).__dict__)
@@ -225,14 +281,20 @@
def put_report(self, report: str, name: str) -> str:
return self.storage.put_raw(report.encode(self.csv_file_encoding), DB_paths.report_root + name)
- def append_sensor(self, data: numpy.array, ds: DataSource, units: str) -> None:
+ def append_sensor(self, data: numpy.array, ds: DataSource, units: str, histo_bins: numpy.ndarray = None) -> None:
if ds.metric == 'collected_at':
path = DB_paths.sensor_time
metrics_fqn = 'collected_at'
else:
path = DB_paths.sensor_data
metrics_fqn = ds.metric_fqdn
- self.put_array(path.format(**ds.__dict__), data, [ds.node_id, metrics_fqn, units], append_on_exists=True)
+
+ if ds.metric == 'lat':
+            assert len(data.shape) == 2, "Latency data must be a 2D histogram array"
+            assert histo_bins is not None, "Latency data must have histo bins"
+
+ path = path.format(**ds.__dict__)
+ self.put_array(path, data, [ds.node_id, metrics_fqn, units], header2=histo_bins, append_on_exists=True)
# ------------- GET DATA FROM STORAGE --------------------------------------------------------------------------
@@ -284,11 +346,12 @@
yield self.load_ts(ds, path)
def iter_sensors(self, node_id: str = None, sensor: str = None, dev: str = None, metric: str = None) -> \
- Iterator[Tuple[str, Dict[str, str]]]:
-
- path = fill_path(DB_paths.sensor_data_r, node_id=node_id, sensor=sensor, dev=dev, metric=metric)
+ Iterator[Tuple[str, DataSource]]:
+ vls = dict(node_id=node_id, sensor=sensor, dev=dev, metric=metric)
+ path = fill_path(DB_paths.sensor_data_r, **vls)
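+        # rebuild a full DataSource from the fixed fields plus the groups parsed from each matched path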
for is_file, path, groups in self.iter_paths(path):
- assert is_file
- yield path, groups
+ cvls = vls.copy()
+ cvls.update(groups)
+ yield path, DataSource(**cvls)