continue refactoring for report
diff --git a/scripts/Dockerfile b/scripts/Dockerfile
index 25ee360..e74d736 100644
--- a/scripts/Dockerfile
+++ b/scripts/Dockerfile
@@ -1,5 +1,5 @@
FROM ubuntu:14.04
-MAINTAINER Petr Lomakin <plomakin@mirantis.com>
+MAINTAINER Kostiantyn Danylov <koder.mail@gmail.com>
RUN apt-get update
@@ -13,6 +13,6 @@
RUN git clone https://github.com/Mirantis/disk_perf_test_tool.git \
/opt/disk_perf_tool
-RUN cd /opt/disk_perf_tool; bash install.sh --full
+RUN cd /opt/disk_perf_tool; bash scripts/install.sh --full
RUN ["/bin/bash"]
diff --git a/v2_plans.md b/v2_plans.md
index e6b2116..a85e5de 100644
--- a/v2_plans.md
+++ b/v2_plans.md
@@ -2,6 +2,7 @@
* With current code impossible to do vm count scan test
* TODO next
+ * Add settings to keep raw log files on disk (not fio output)
* Job description should have tuple of parameters, characterized load and abbreviated/readable description
* TS should have units, UI modules should use function to calculate coefficient for show values
* Get done iops amount from fio?
diff --git a/wally/common_types.py b/wally/common_types.py
index a4805c3..9464b57 100644
--- a/wally/common_types.py
+++ b/wally/common_types.py
@@ -25,12 +25,19 @@
class Storable(IStorable):
"""Default implementation"""
+ __ignore_fields__ = []
+
def raw(self) -> Dict[str, Any]:
- return {name: val for name, val in self.__dict__.items() if not name.startswith("_")}
+ return {name: val
+ for name, val in self.__dict__.items()
+ if not name.startswith("_") and name not in self.__ignore_fields__}
@classmethod
def fromraw(cls, data: Dict[str, Any]) -> 'IStorable':
obj = cls.__new__(cls)
+ if cls.__ignore_fields__:
+ data = data.copy()
+ data.update(dict.fromkeys(cls.__ignore_fields__))
obj.__dict__.update(data)
return obj
diff --git a/wally/result_classes.py b/wally/result_classes.py
index 1a148ed..62e74f0 100644
--- a/wally/result_classes.py
+++ b/wally/result_classes.py
@@ -8,35 +8,13 @@
from scipy.stats.mstats_basic import NormaltestResult
+from .suits.job import JobConfig
from .node_interfaces import IRPCNode
-from .common_types import IStorable, Storable
+from .common_types import Storable, IStorable
from .utils import round_digits, Number
-class TestJobConfig(Storable, metaclass=abc.ABCMeta):
- def __init__(self, idx: int) -> None:
- self.idx = idx
- self.reliable_info_time_range = None # type: Tuple[int, int]
- self.vals = OrderedDict() # type: Dict[str, Any]
-
- @property
- def storage_id(self) -> str:
- return "{}_{}".format(self.summary, self.idx)
-
- @abc.abstractproperty
- def characterized_tuple(self) -> Tuple:
- pass
-
- @abc.abstractproperty
- def summary(self, *excluded_fields) -> str:
- pass
-
- @abc.abstractproperty
- def long_summary(self, *excluded_fields) -> str:
- pass
-
-
-class TestSuiteConfig(IStorable):
+class SuiteConfig(Storable):
"""
Test suite input configuration.
@@ -46,6 +24,8 @@
nodes - nodes to run tests on
remote_dir - directory on nodes to be used for local files
"""
+ __ignore_fields__ = ['nodes', 'run_uuid', 'remote_dir']
+
def __init__(self,
test_type: str,
params: Dict[str, Any],
@@ -65,29 +45,12 @@
if type(o) is not self.__class__:
return False
- other = cast(TestSuiteConfig, o)
+ other = cast(SuiteConfig, o)
return (self.test_type == other.test_type and
self.params == other.params and
set(self.nodes_ids) == set(other.nodes_ids))
- def raw(self) -> Dict[str, Any]:
- res = self.__dict__.copy()
- del res['nodes']
- del res['run_uuid']
- del res['remote_dir']
- return res
-
- @classmethod
- def fromraw(cls, data: Dict[str, Any]) -> 'IStorable':
- obj = cls.__new__(cls)
- data = data.copy()
- data['nodes'] = None
- data['run_uuid'] = None
- data['remote_dir'] = None
- obj.__dict__.update(data)
- return obj
-
class DataSource:
def __init__(self,
@@ -124,12 +87,20 @@
raw: Optional[bytes],
data: numpy.array,
times: numpy.array,
+ units: str,
+ time_units: str = 'us',
second_axis_size: int = 1,
source: DataSource = None) -> None:
# Sensor name. Typically DEV_NAME.METRIC
self.name = name
+ # units for data
+ self.units = units
+
+ # units for time
+ self.time_units = time_units
+
# Time series times and values. Time in ms from Unix epoch.
self.times = times
self.data = data
@@ -158,8 +129,11 @@
JobMetrics = Dict[Tuple[str, str, str], TimeSeries]
-class StatProps(IStorable):
+class StatProps(Storable):
"Statistic properties for timeseries with unknown data distribution"
+
+ __ignore_fields__ = ['data']
+
def __init__(self, data: numpy.array) -> None:
self.perc_99 = None # type: float
self.perc_95 = None # type: float
@@ -185,20 +159,16 @@
return str(self)
def raw(self) -> Dict[str, Any]:
- data = self.__dict__.copy()
- del data['data']
- data['bins_mids'] = list(self.bins_mids)
- data['bins_populations'] = list(self.bins_populations)
+ data = super().raw()
+ data['bins_mids'] = list(data['bins_mids'])
+ data['bins_populations'] = list(data['bins_populations'])
return data
@classmethod
def fromraw(cls, data: Dict[str, Any]) -> 'StatProps':
data['bins_mids'] = numpy.array(data['bins_mids'])
data['bins_populations'] = numpy.array(data['bins_populations'])
- data['data'] = None
- res = cls.__new__(cls)
- res.__dict__.update(data)
- return res
+ return cast(StatProps, super().fromraw(data))
class HistoStatProps(StatProps):
@@ -236,29 +206,24 @@
return "\n".join(res)
def raw(self) -> Dict[str, Any]:
- data = self.__dict__.copy()
+ data = super().raw()
data['normtest'] = (data['nortest'].statistic, data['nortest'].pvalue)
- del data['data']
- data['bins_mids'] = list(self.bins_mids)
- data['bins_populations'] = list(self.bins_populations)
return data
@classmethod
def fromraw(cls, data: Dict[str, Any]) -> 'NormStatProps':
data['normtest'] = NormaltestResult(*data['normtest'])
- obj = StatProps.fromraw(data)
- obj.__class__ = cls
- return cast('NormStatProps', obj)
+ return cast(NormStatProps, super().fromraw(data))
JobStatMetrics = Dict[Tuple[str, str, str], StatProps]
-class TestJobResult:
+class JobResult:
"""Contains done test job information"""
def __init__(self,
- info: TestJobConfig,
+ info: JobConfig,
begin_time: int,
end_time: int,
raw: JobMetrics) -> None:
@@ -275,11 +240,11 @@
pass
@abc.abstractmethod
- def put_or_check_suite(self, suite: TestSuiteConfig) -> None:
+ def put_or_check_suite(self, suite: SuiteConfig) -> None:
pass
@abc.abstractmethod
- def put_job(self, suite: TestSuiteConfig, job: TestJobConfig) -> None:
+ def put_job(self, suite: SuiteConfig, job: JobConfig) -> None:
pass
@abc.abstractmethod
@@ -299,15 +264,15 @@
pass
@abc.abstractmethod
- def iter_suite(self, suite_type: str = None) -> Iterator[TestSuiteConfig]:
+ def iter_suite(self, suite_type: str = None) -> Iterator[SuiteConfig]:
pass
@abc.abstractmethod
- def iter_job(self, suite: TestSuiteConfig) -> Iterator[TestJobConfig]:
+ def iter_job(self, suite: SuiteConfig) -> Iterator[JobConfig]:
pass
@abc.abstractmethod
- def iter_ts(self, suite: TestSuiteConfig, job: TestJobConfig) -> Iterator[TimeSeries]:
+ def iter_ts(self, suite: SuiteConfig, job: JobConfig) -> Iterator[TimeSeries]:
pass
# return path to file to be inserted into report
diff --git a/wally/suits/io/fio.py b/wally/suits/io/fio.py
index 33e8343..bf2e6b3 100644
--- a/wally/suits/io/fio.py
+++ b/wally/suits/io/fio.py
@@ -9,7 +9,8 @@
from ...node_interfaces import IRPCNode
from ...node_utils import get_os
from ..itest import ThreadedTest
-from ...result_classes import TimeSeries, DataSource, TestJobConfig
+from ...result_classes import TimeSeries, DataSource
+from ..job import JobConfig
from .fio_task_parser import execution_time, fio_cfg_compile, FioJobConfig, FioParams, get_log_files
from . import rpc_plugin
from .fio_hist import expected_lat_bins
@@ -140,14 +141,14 @@
node.copy_file(fio_path, bz_dest, compress=False)
node.run("bzip2 --decompress {} ; chmod a+x {}".format(bz_dest, self.join_remote("fio")))
- def get_expected_runtime(self, job_config: TestJobConfig) -> int:
+ def get_expected_runtime(self, job_config: JobConfig) -> int:
return execution_time(cast(FioJobConfig, job_config))
- def prepare_iteration(self, node: IRPCNode, job: TestJobConfig) -> None:
+ def prepare_iteration(self, node: IRPCNode, job: JobConfig) -> None:
node.put_to_file(self.remote_task_file, str(job).encode("utf8"))
# TODO: get a link to substorage as a parameter
- def run_iteration(self, node: IRPCNode, job: TestJobConfig) -> List[TimeSeries]:
+ def run_iteration(self, node: IRPCNode, job: JobConfig) -> List[TimeSeries]:
exec_time = execution_time(cast(FioJobConfig, job))
diff --git a/wally/suits/io/fio_job.py b/wally/suits/io/fio_job.py
new file mode 100644
index 0000000..0f55e91
--- /dev/null
+++ b/wally/suits/io/fio_job.py
@@ -0,0 +1,183 @@
+import abc
+import copy
+from collections import OrderedDict
+from typing import Optional, Iterator, Union, Dict, Tuple, NamedTuple, Any, cast
+
+
+from ...utils import ssize2b, b2ssize
+from ..job import JobConfig, JobParams
+
+
+Var = NamedTuple('Var', [('name', str)])
+
+
+def is_fio_opt_true(vl: Union[str, int]) -> bool:
+ return str(vl).lower() in ['1', 'true', 't', 'yes', 'y']
+
+
+class FioJobParams(JobParams):
+ """Class contains all parameters, which significantly affects fio results.
+
+ oper - operation type - read/write/randread/...
+ sync_mode - direct/sync/async/direct+sync
+ bsize - block size in KiB
+ qd - IO queue depth,
+ thcount - thread count,
+ write_perc - write perc for mixed(read+write) loads
+
+ Like block size or operation type, but not file name or file size.
+ Can be used as key in dictionary.
+ """
+
+ sync2long = {'x': "sync direct",
+ 's': "sync",
+ 'd': "direct",
+ 'a': "buffered"}
+
+ @property
+ def sync_mode_long(self) -> str:
+ return self.sync2long[self['sync_mode']]
+
+ @property
+ def summary(self) -> str:
+ """Test short summary, used mostly for file names and short image description"""
+ res = "{0[oper]}{0[sync_mode]}{0[bsize]}".format(self)
+ if self['qd'] is not None:
+ res += "_qd" + str(self['qd'])
+ if self['thcount'] not in (1, None):
+ res += "th" + str(self['thcount'])
+ if self['write_perc'] is not None:
+ res += "wr" + str(self['write_perc'])
+ return res
+
+ @property
+ def long_summary(self) -> str:
+ """Readable long summary for management and deployment engineers"""
+ res = "{0[sync_mode_long]} {0[oper]} {1}".format(self, b2ssize(self['bsize'] * 1024))
+ if self['qd'] is not None:
+ res += " QD = " + str(self['qd'])
+ if self['thcount'] not in (1, None):
+ res += " threads={0[thcount]}".format(self)
+ if self['write_perc'] is not None:
+ res += " write_perc={0[write_perc]}%".format(self)
+ return res
+
+
+class FioJobConfig(JobConfig):
+ """Fio job configuration"""
+ ds2mode = {(True, True): 'x',
+ (True, False): 's',
+ (False, True): 'd',
+ (False, False): 'a'}
+
+ op_type2short = {"randread": "rr",
+ "randwrite": "rw",
+ "read": "sr",
+ "write": "sw",
+ "randrw": "rx"}
+
+ def __init__(self, name: str, idx: int) -> None:
+ JobConfig.__init__(self, idx)
+ self.name = name
+ self._sync_mode = None # type: Optional[str]
+ self._params = None # type: Optional[Dict[str, Any]]
+
+ # ------------- BASIC PROPERTIES -----------------------------------------------------------------------------------
+
+ @property
+ def write_perc(self) -> Optional[int]:
+ try:
+ return int(self.vals["rwmixwrite"])
+ except (KeyError, TypeError):
+ try:
+ return 100 - int(self.vals["rwmixread"])
+ except (KeyError, TypeError):
+ return None
+
+ @property
+ def qd(self) -> int:
+ return int(self.vals['iodepth'])
+
+ @property
+ def bsize(self) -> int:
+ bsize = ssize2b(self.vals['blocksize'])
+ assert bsize % 1024 == 0
+ return bsize // 1024
+
+ @property
+ def oper(self) -> str:
+ return self.vals['rw']
+
+ @property
+ def op_type_short(self) -> str:
+ return self.op_type2short[self.vals['rw']]
+
+ @property
+ def thcount(self) -> int:
+ return int(self.vals.get('numjobs', 1))
+
+ @property
+ def sync_mode(self) -> str:
+ if self._sync_mode is None:
+ direct = is_fio_opt_true(self.vals.get('direct', '0')) or \
+ not is_fio_opt_true(self.vals.get('buffered', '0'))
+ sync = is_fio_opt_true(self.vals.get('sync', '0'))
+ self._sync_mode = self.ds2mode[(sync, direct)]
+ return cast(str, self._sync_mode)
+
+ # ----------- COMPLEX PROPERTIES -----------------------------------------------------------------------------------
+
+ @property
+ def params(self) -> JobParams:
+ if self._params is None:
+ self._params = dict(oper=self.oper,
+ sync_mode=self.sync_mode,
+ bsize=self.bsize,
+ qd=self.qd,
+ thcount=self.thcount,
+ write_perc=self.write_perc)
+ return cast(JobParams, FioJobParams(**cast(Dict[str, Any], self._params)))
+
+ # ------------------------------------------------------------------------------------------------------------------
+
+ def __eq__(self, o: object) -> bool:
+ if not isinstance(o, FioJobConfig):
+ return False
+ return self.vals == cast(FioJobConfig, o).vals
+
+ def copy(self) -> 'FioJobConfig':
+ return copy.deepcopy(self)
+
+ def required_vars(self) -> Iterator[Tuple[str, Var]]:
+ for name, val in self.vals.items():
+ if isinstance(val, Var):
+ yield name, val
+
+ def is_free(self) -> bool:
+ return len(list(self.required_vars())) == 0
+
+ def __str__(self) -> str:
+ res = "[{0}]\n".format(self.params.summary)
+
+ for name, val in self.vals.items():
+ if name.startswith('_') or name == name.upper():
+ continue
+ if isinstance(val, Var):
+ res += "{0}={{{1}}}\n".format(name, val.name)
+ else:
+ res += "{0}={1}\n".format(name, val)
+
+ return res
+
+ def __repr__(self) -> str:
+ return str(self)
+
+ def raw(self) -> Dict[str, Any]:
+ res = super().raw()
+ res['vals'] = list(map(list, self.vals.items()))
+ return res
+
+ @classmethod
+ def fromraw(cls, data: Dict[str, Any]) -> 'FioJobConfig':
+ data['vals'] = OrderedDict(data['vals'])
+ return cast(FioJobConfig, super().fromraw(data))
diff --git a/wally/suits/io/fio_task_parser.py b/wally/suits/io/fio_task_parser.py
index 03702ae..c1b4bc3 100644
--- a/wally/suits/io/fio_task_parser.py
+++ b/wally/suits/io/fio_task_parser.py
@@ -3,24 +3,21 @@
import re
import os
import sys
-import copy
import os.path
import argparse
import itertools
-from typing import Optional, Iterator, Union, Dict, Iterable, List, TypeVar, Callable, Tuple, NamedTuple, Any, cast
+from typing import Optional, Iterator, Union, Dict, Iterable, List, Tuple, NamedTuple, Any
from collections import OrderedDict
-from ...result_classes import TestJobConfig
-from ...utils import sec_to_str, ssize2b, b2ssize, flatmap
-
+from ...utils import sec_to_str, ssize2b, flatmap
+from .fio_job import Var, FioJobConfig
SECTION = 0
SETTING = 1
INCLUDE = 2
-Var = NamedTuple('Var', [('name', str)])
CfgLine = NamedTuple('CfgLine',
[('fname', str),
('lineno', int),
@@ -28,203 +25,6 @@
('tp', int),
('name', str),
('val', Any)])
-FioTestSumm = NamedTuple("FioTestSumm",
- [("oper", str),
- ("sync_mode", str),
- ("bsize", int),
- ("qd", int),
- ("thcount", int),
- ("write_perc", Optional[int])])
-
-
-def is_fio_opt_true(vl: Union[str, int]) -> bool:
- return str(vl).lower() in ['1', 'true', 't', 'yes', 'y']
-
-
-class FioJobConfig(TestJobConfig):
-
- ds2mode = {(True, True): 'x',
- (True, False): 's',
- (False, True): 'd',
- (False, False): 'a'}
-
- sync2long = {'x': "sync direct",
- 's': "sync",
- 'd': "direct",
- 'a': "buffered"}
-
- op_type2short = {"randread": "rr",
- "randwrite": "rw",
- "read": "sr",
- "write": "sw",
- "randrw": "rx"}
-
- def __init__(self, name: str, idx: int) -> None:
- TestJobConfig.__init__(self, idx)
- self.name = name
- self._sync_mode = None # type: Optional[str]
- self._ctuple = None # type: Optional[FioTestSumm]
- self._ctuple_no_qd = None # type: Optional[FioTestSumm]
-
- # ------------- BASIC PROPERTIES -----------------------------------------------------------------------------------
-
- @property
- def write_perc(self) -> Optional[int]:
- try:
- return int(self.vals["rwmixwrite"])
- except (KeyError, TypeError):
- try:
- return 100 - int(self.vals["rwmixread"])
- except (KeyError, TypeError):
- return None
-
- @property
- def qd(self) -> int:
- return int(self.vals['iodepth'])
-
- @property
- def bsize(self) -> int:
- return ssize2b(self.vals['blocksize']) // 1024
-
- @property
- def oper(self) -> str:
- return self.vals['rw']
-
- @property
- def op_type_short(self) -> str:
- return self.op_type2short[self.vals['rw']]
-
- @property
- def thcount(self) -> int:
- return int(self.vals.get('numjobs', 1))
-
- @property
- def sync_mode(self) -> str:
- if self._sync_mode is None:
- direct = is_fio_opt_true(self.vals.get('direct', '0')) or \
- not is_fio_opt_true(self.vals.get('buffered', '0'))
- sync = is_fio_opt_true(self.vals.get('sync', '0'))
- self._sync_mode = self.ds2mode[(sync, direct)]
- return cast(str, self._sync_mode)
-
- @property
- def sync_mode_long(self) -> str:
- return self.sync2long[self.sync_mode]
-
- # ----------- COMPLEX PROPERTIES -----------------------------------------------------------------------------------
-
- @property
- def characterized_tuple(self) -> Tuple:
- if self._ctuple is None:
- self._ctuple = FioTestSumm(oper=self.oper,
- sync_mode=self.sync_mode,
- bsize=self.bsize,
- qd=self.qd,
- thcount=self.thcount,
- write_perc=self.write_perc)
-
- return cast(Tuple, self._ctuple)
-
- @property
- def characterized_tuple_no_qd(self) -> FioTestSumm:
- if self._ctuple_no_qd is None:
- self._ctuple_no_qd = FioTestSumm(oper=self.oper,
- sync_mode=self.sync_mode,
- bsize=self.bsize,
- qd=None,
- thcount=self.thcount,
- write_perc=self.write_perc)
-
- return cast(FioTestSumm, self._ctuple_no_qd)
-
- @property
- def long_summary(self) -> str:
- res = "{0.sync_mode_long} {0.oper} {1} QD={0.qd}".format(self, b2ssize(self.bsize * 1024))
- if self.thcount != 1:
- res += " threads={}".format(self.thcount)
- if self.write_perc is not None:
- res += " write_perc={}%".format(self.write_perc)
- return res
-
- @property
- def long_summary_no_qd(self) -> str:
- res = "{0.sync_mode_long} {0.oper} {1}".format(self, b2ssize(self.bsize * 1024))
- if self.thcount != 1:
- res += " threads={}".format(self.thcount)
- if self.write_perc is not None:
- res += " write_perc={}%".format(self.write_perc)
- return res
-
- @property
- def summary(self) -> str:
- tpl = cast(FioTestSumm, self.characterized_tuple)
- res = "{0.oper}{0.sync_mode}{0.bsize}_qd{0.qd}".format(tpl)
-
- if tpl.thcount != 1:
- res += "th" + str(tpl.thcount)
- if tpl.write_perc != 1:
- res += "wr" + str(tpl.write_perc)
-
- return res
-
- @property
- def summary_no_qd(self) -> str:
- tpl = cast(FioTestSumm, self.characterized_tuple)
- res = "{0.oper}{0.sync_mode}{0.bsize}".format(tpl)
-
- if tpl.thcount != 1:
- res += "th" + str(tpl.thcount)
- if tpl.write_perc != 1:
- res += "wr" + str(tpl.write_perc)
-
- return res
- # ------------------------------------------------------------------------------------------------------------------
-
- def __eq__(self, o: object) -> bool:
- if not isinstance(o, FioJobConfig):
- return False
- return self.vals == cast(FioJobConfig, o).vals
-
- def copy(self) -> 'FioJobConfig':
- return copy.deepcopy(self)
-
- def required_vars(self) -> Iterator[Tuple[str, Var]]:
- for name, val in self.vals.items():
- if isinstance(val, Var):
- yield name, val
-
- def is_free(self) -> bool:
- return len(list(self.required_vars())) == 0
-
- def __str__(self) -> str:
- res = "[{0}]\n".format(self.summary)
-
- for name, val in self.vals.items():
- if name.startswith('_') or name == name.upper():
- continue
- if isinstance(val, Var):
- res += "{0}={{{1}}}\n".format(name, val.name)
- else:
- res += "{0}={1}\n".format(name, val)
-
- return res
-
- def __repr__(self) -> str:
- return str(self)
-
- def raw(self) -> Dict[str, Any]:
- res = self.__dict__.copy()
- del res['_sync_mode']
- res['vals'] = [[key, val] for key, val in self.vals.items()]
- return res
-
- @classmethod
- def fromraw(cls, data: Dict[str, Any]) -> 'FioJobConfig':
- obj = cls.__new__(cls)
- data['vals'] = OrderedDict(data['vals'])
- data['_sync_mode'] = None
- obj.__dict__.update(data)
- return obj
class ParseError(ValueError):
diff --git a/wally/suits/job.py b/wally/suits/job.py
new file mode 100644
index 0000000..ce32e0e
--- /dev/null
+++ b/wally/suits/job.py
@@ -0,0 +1,61 @@
+import abc
+from typing import Dict, Any, Tuple
+from collections import OrderedDict
+
+from ..common_types import Storable
+
+
+class JobParams(metaclass=abc.ABCMeta):
+ """Class contains all job parameters, which significantly affects job results.
+ Like block size or operation type, but not file name or file size.
+ Can be used as key in dictionary
+ """
+
+ def __init__(self, **params: Dict[str, Any]) -> None:
+ self.params = params
+
+ @abc.abstractproperty
+ def summary(self) -> str:
+ """Test short summary, used mostly for file names and short image description"""
+ pass
+
+ @abc.abstractproperty
+ def long_summary(self) -> str:
+ """Readable long summary for management and deployment engineers"""
+ pass
+
+ def __getitem__(self, name: str) -> Any:
+ return self.params[name]
+
+ def __setitem__(self, name: str, val: Any) -> None:
+ self.params[name] = val
+
+ def __hash__(self) -> int:
+ return hash(tuple(sorted(self.params.items())))
+
+ def __eq__(self, o: 'JobParams') -> bool:
+ return sorted(self.params.items()) == sorted(o.params.items())
+
+
+class JobConfig(Storable, metaclass=abc.ABCMeta):
+ """Job config class"""
+
+ def __init__(self, idx: int) -> None:
+ # job id, used in storage to distinct jobs with same summary
+ self.idx = idx
+
+ # time interval, in seconds, when test was running on all nodes
+ self.reliable_info_time_range = None # type: Tuple[int, int]
+
+ # all job parameters, both from suite file and config file
+ self.vals = OrderedDict() # type: Dict[str, Any]
+
+ @property
+ def storage_id(self) -> str:
+ """unique string, used as key in storage"""
+ return "{}_{}".format(self.params.summary, self.idx)
+
+ @abc.abstractproperty
+ def params(self) -> JobParams:
+ """Should return a copy"""
+ pass
diff --git a/wally/utils.py b/wally/utils.py
index 78235a8..4551952 100644
--- a/wally/utils.py
+++ b/wally/utils.py
@@ -11,6 +11,8 @@
import threading
import contextlib
import subprocess
+from fractions import Fraction
+
from typing import Any, Tuple, Union, List, Iterator, Iterable, Optional, IO, cast, TypeVar, Callable
@@ -404,3 +406,35 @@
yield res
+_coefs = {
+ 'n': Fraction(1, 1000**3),
+ 'u': Fraction(1, 1000**2),
+ 'm': Fraction(1, 1000),
+ 'K': 1000,
+ 'M': 1000 ** 2,
+ 'G': 1000 ** 3,
+ 'Ki': 1024,
+ 'Mi': 1024 ** 2,
+ 'Gi': 1024 ** 3,
+}
+
+
+def split_unit(units: str) -> Tuple[Union[Fraction, int], str]:
+ if len(units) > 2 and units[:2] in _coefs:
+ return _coefs[units[:2]], units[2:]
+ if len(units) > 1 and units[0] in _coefs:
+ return _coefs[units[0]], units[1:]
+ else:
+ return Fraction(1), units
+
+
+def unit_conversion_coef(from_unit: str, to_unit: str) -> Union[Fraction, int]:
+ f1, u1 = split_unit(from_unit)
+ f2, u2 = split_unit(to_unit)
+
+ assert u1 == u2, "Can't convert {!r} to {!r}".format(from_unit, to_unit)
+
+ if isinstance(int, f1) and isinstance(int, f2) and f1 % f2 != 0:
+ return Fraction(f1, f2)
+
+ return f1 // f2