blob: 62e74f07c472dc41d3f8a3e3389673ca7cea66b4 [file] [log] [blame]
import abc
import array
from typing import Dict, List, Any, Optional, Tuple, cast, Type, Iterator
from collections import OrderedDict
import numpy
from scipy.stats.mstats_basic import NormaltestResult
from .suits.job import JobConfig
from .node_interfaces import IRPCNode
from .common_types import Storable, IStorable
from .utils import round_digits, Number
class SuiteConfig(Storable):
"""
Test suite input configuration.
test_type - test type name
params - parameters from yaml file for this test
run_uuid - UUID to be used to create file names & Co
nodes - nodes to run tests on
remote_dir - directory on nodes to be used for local files
"""
__ignore_fields__ = ['nodes', 'run_uuid', 'remote_dir']
def __init__(self,
test_type: str,
params: Dict[str, Any],
run_uuid: str,
nodes: List[IRPCNode],
remote_dir: str,
idx: int) -> None:
self.test_type = test_type
self.params = params
self.run_uuid = run_uuid
self.nodes = nodes
self.nodes_ids = [node.node_id for node in nodes]
self.remote_dir = remote_dir
self.storage_id = "{}_{}".format(self.test_type, idx)
def __eq__(self, o: object) -> bool:
if type(o) is not self.__class__:
return False
other = cast(SuiteConfig, o)
return (self.test_type == other.test_type and
self.params == other.params and
set(self.nodes_ids) == set(other.nodes_ids))
class DataSource:
def __init__(self,
suite_id: str = None,
job_id: str = None,
node_id: str = None,
dev: str = None,
sensor: str = None,
tag: str = None) -> None:
self.suite_id = suite_id
self.job_id = job_id
self.node_id = node_id
self.dev = dev
self.sensor = sensor
self.tag = tag
def __call__(self, **kwargs) -> 'DataSource':
dct = self.__dict__.copy()
dct.update(kwargs)
return self.__class__(**dct)
def __str__(self) -> str:
return "{0.suite_id}.{0.job_id}/{0.node_id}/{0.dev}.{0.sensor}.{0.tag}".format(self)
def __repr__(self) -> str:
return str(self)
class TimeSeries:
"""Data series from sensor - either system sensor or from load generator tool (e.g. fio)"""
def __init__(self,
name: str,
raw: Optional[bytes],
data: numpy.array,
times: numpy.array,
units: str,
time_units: str = 'us',
second_axis_size: int = 1,
source: DataSource = None) -> None:
# Sensor name. Typically DEV_NAME.METRIC
self.name = name
# units for data
self.units = units
# units for time
self.time_units = time_units
# Time series times and values. Time in ms from Unix epoch.
self.times = times
self.data = data
# Not equal to 1 in case of 2d sensors, like latency, when each measurement is a histogram.
self.second_axis_size = second_axis_size
# Raw sensor data (is provided). Like log file for fio iops/bw/lat.
self.raw = raw
self.source = source
def __str__(self) -> str:
res = "TS({}):\n".format(self.name)
res += " source={}\n".format(self.source)
res += " times_size={}\n".format(len(self.times))
res += " data_size={}\n".format(len(self.data))
res += " data_shape={}x{}\n".format(len(self.data) // self.second_axis_size, self.second_axis_size)
return res
def __repr__(self) -> str:
return str(self)
# (node_name, source_dev, metric_name) => metric_results
JobMetrics = Dict[Tuple[str, str, str], TimeSeries]
class StatProps(Storable):
"Statistic properties for timeseries with unknown data distribution"
__ignore_fields__ = ['data']
def __init__(self, data: numpy.array) -> None:
self.perc_99 = None # type: float
self.perc_95 = None # type: float
self.perc_90 = None # type: float
self.perc_50 = None # type: float
self.min = None # type: Number
self.max = None # type: Number
# bin_center: bin_count
self.bins_populations = None # type: numpy.array
self.bins_mids = None # type: numpy.array
self.data = data
def __str__(self) -> str:
res = ["{}(size = {}):".format(self.__class__.__name__, len(self.data))]
for name in ["perc_50", "perc_90", "perc_95", "perc_99"]:
res.append(" {} = {}".format(name, round_digits(getattr(self, name))))
res.append(" range {} {}".format(round_digits(self.min), round_digits(self.max)))
return "\n".join(res)
def __repr__(self) -> str:
return str(self)
def raw(self) -> Dict[str, Any]:
data = super().raw()
data['bins_mids'] = list(data['bins_mids'])
data['bins_populations'] = list(data['bins_populations'])
return data
@classmethod
def fromraw(cls, data: Dict[str, Any]) -> 'StatProps':
data['bins_mids'] = numpy.array(data['bins_mids'])
data['bins_populations'] = numpy.array(data['bins_populations'])
return cast(StatProps, super().fromraw(data))
class HistoStatProps(StatProps):
"""Statistic properties for 2D timeseries with unknown data distribution and histogram as input value.
Used for latency"""
def __init__(self, data: numpy.array, second_axis_size: int) -> None:
self.second_axis_size = second_axis_size
StatProps.__init__(self, data)
class NormStatProps(StatProps):
"Statistic properties for timeseries with normal data distribution. Used for iops/bw"
def __init__(self, data: numpy.array) -> None:
StatProps.__init__(self, data)
self.average = None # type: float
self.deviation = None # type: float
self.confidence = None # type: float
self.confidence_level = None # type: float
self.normtest = None # type: NormaltestResult
self.skew = None # type: float
self.kurt = None # type: float
def __str__(self) -> str:
res = ["NormStatProps(size = {}):".format(len(self.data)),
" distr = {} ~ {}".format(round_digits(self.average), round_digits(self.deviation)),
" confidence({0.confidence_level}) = {1}".format(self, round_digits(self.confidence)),
" perc_50 = {}".format(round_digits(self.perc_50)),
" perc_90 = {}".format(round_digits(self.perc_90)),
" perc_95 = {}".format(round_digits(self.perc_95)),
" perc_99 = {}".format(round_digits(self.perc_99)),
" range {} {}".format(round_digits(self.min), round_digits(self.max)),
" normtest = {0.normtest}".format(self),
" skew ~ kurt = {0.skew} ~ {0.kurt}".format(self)]
return "\n".join(res)
def raw(self) -> Dict[str, Any]:
data = super().raw()
data['normtest'] = (data['nortest'].statistic, data['nortest'].pvalue)
return data
@classmethod
def fromraw(cls, data: Dict[str, Any]) -> 'NormStatProps':
data['normtest'] = NormaltestResult(*data['normtest'])
return cast(NormStatProps, super().fromraw(data))
JobStatMetrics = Dict[Tuple[str, str, str], StatProps]
class JobResult:
"""Contains done test job information"""
def __init__(self,
info: JobConfig,
begin_time: int,
end_time: int,
raw: JobMetrics) -> None:
self.info = info
self.run_interval = (begin_time, end_time)
self.raw = raw # type: JobMetrics
self.processed = None # type: JobStatMetrics
class IResultStorage(metaclass=abc.ABCMeta):
@abc.abstractmethod
def sync(self) -> None:
pass
@abc.abstractmethod
def put_or_check_suite(self, suite: SuiteConfig) -> None:
pass
@abc.abstractmethod
def put_job(self, suite: SuiteConfig, job: JobConfig) -> None:
pass
@abc.abstractmethod
def put_ts(self, ts: TimeSeries) -> None:
pass
@abc.abstractmethod
def put_extra(self, data: bytes, source: DataSource) -> None:
pass
@abc.abstractmethod
def put_stat(self, data: StatProps, source: DataSource) -> None:
pass
@abc.abstractmethod
def get_stat(self, stat_cls: Type[StatProps], source: DataSource) -> StatProps:
pass
@abc.abstractmethod
def iter_suite(self, suite_type: str = None) -> Iterator[SuiteConfig]:
pass
@abc.abstractmethod
def iter_job(self, suite: SuiteConfig) -> Iterator[JobConfig]:
pass
@abc.abstractmethod
def iter_ts(self, suite: SuiteConfig, job: JobConfig) -> Iterator[TimeSeries]:
pass
# return path to file to be inserted into report
@abc.abstractmethod
def put_plot_file(self, data: bytes, source: DataSource) -> str:
pass