continue refactoring for report
diff --git a/wally/suits/io/fio.py b/wally/suits/io/fio.py
index 33e8343..bf2e6b3 100644
--- a/wally/suits/io/fio.py
+++ b/wally/suits/io/fio.py
@@ -9,7 +9,8 @@
from ...node_interfaces import IRPCNode
from ...node_utils import get_os
from ..itest import ThreadedTest
-from ...result_classes import TimeSeries, DataSource, TestJobConfig
+from ...result_classes import TimeSeries, DataSource
+from ..job import JobConfig
from .fio_task_parser import execution_time, fio_cfg_compile, FioJobConfig, FioParams, get_log_files
from . import rpc_plugin
from .fio_hist import expected_lat_bins
@@ -140,14 +141,14 @@
node.copy_file(fio_path, bz_dest, compress=False)
node.run("bzip2 --decompress {} ; chmod a+x {}".format(bz_dest, self.join_remote("fio")))
- def get_expected_runtime(self, job_config: TestJobConfig) -> int:
+ def get_expected_runtime(self, job_config: JobConfig) -> int:
return execution_time(cast(FioJobConfig, job_config))
- def prepare_iteration(self, node: IRPCNode, job: TestJobConfig) -> None:
+ def prepare_iteration(self, node: IRPCNode, job: JobConfig) -> None:
node.put_to_file(self.remote_task_file, str(job).encode("utf8"))
# TODO: get a link to substorage as a parameter
- def run_iteration(self, node: IRPCNode, job: TestJobConfig) -> List[TimeSeries]:
+ def run_iteration(self, node: IRPCNode, job: JobConfig) -> List[TimeSeries]:
exec_time = execution_time(cast(FioJobConfig, job))
diff --git a/wally/suits/io/fio_job.py b/wally/suits/io/fio_job.py
new file mode 100644
index 0000000..0f55e91
--- /dev/null
+++ b/wally/suits/io/fio_job.py
@@ -0,0 +1,219 @@
+import abc
+import copy
+from collections import OrderedDict
+from typing import Optional, Iterator, Union, Dict, Tuple, NamedTuple, Any, cast
+
+
+from ...utils import ssize2b, b2ssize
+from ..job import JobConfig, JobParams
+
+
+# Marker for an option value that is still an unresolved variable; FioJobConfig.__str__ renders it as {name}
+Var = NamedTuple('Var', [('name', str)])
+
+
+def is_fio_opt_true(vl: Union[str, int]) -> bool:
+    """Return True if `vl` spells a true value: 1/true/t/yes/y, compared case-insensitively"""
+    return str(vl).lower() in ('1', 'true', 't', 'yes', 'y')
+
+
+class FioJobParams(JobParams):
+    """Parameters of a fio job that significantly affect its results.
+
+    oper - operation type - read/write/randread/...
+    sync_mode - one-letter code: x (sync direct), s (sync), d (direct), a (buffered)
+    bsize - block size in KiB
+    qd - IO queue depth,
+    thcount - thread count,
+    write_perc - write perc for mixed(read+write) loads
+
+    Like block size or operation type, but not file name or file size.
+    Can be used as key in dictionary.
+    """
+
+    # one-letter sync_mode code -> human readable name
+    sync2long = {'x': "sync direct",
+                 's': "sync",
+                 'd': "direct",
+                 'a': "buffered"}
+
+    @property
+    def sync_mode_long(self) -> str:
+        """Human readable name of the 'sync_mode' parameter"""
+        return self.sync2long[self['sync_mode']]
+
+    @property
+    def summary(self) -> str:
+        """Test short summary, used mostly for file names and short image description"""
+        res = "{0[oper]}{0[sync_mode]}{0[bsize]}".format(self)
+        if self['qd'] is not None:
+            res += "_qd" + str(self['qd'])
+        if self['thcount'] not in (1, None):
+            res += "th" + str(self['thcount'])
+        if self['write_perc'] is not None:
+            res += "wr" + str(self['write_perc'])
+        return res
+
+    @property
+    def long_summary(self) -> str:
+        """Readable long summary for management and deployment engineers"""
+        # sync_mode_long is a property, not a stored parameter, so it needs attribute
+        # lookup ({0.name}); item lookup ({0[name]}) would search the stored parameters
+        res = "{0.sync_mode_long} {0[oper]} {1}".format(self, b2ssize(self['bsize'] * 1024))
+        if self['qd'] is not None:
+            res += " QD = " + str(self['qd'])
+        if self['thcount'] not in (1, None):
+            res += " threads={0[thcount]}".format(self)
+        if self['write_perc'] is not None:
+            res += " write_perc={0[write_perc]}%".format(self)
+        return res
+
+
+class FioJobConfig(JobConfig):
+    """Fio job configuration.
+
+    Option values are read from self.vals, which is not created in this class
+    (presumably set up by JobConfig - TODO confirm).
+    """
+
+    # (sync, direct) flags -> one-letter sync mode code
+    ds2mode = {(True, True): 'x',
+               (True, False): 's',
+               (False, True): 'd',
+               (False, False): 'a'}
+
+    # 'rw' option value -> two-letter short name
+    op_type2short = {"randread": "rr",
+                     "randwrite": "rw",
+                     "read": "sr",
+                     "write": "sw",
+                     "randrw": "rx"}
+
+    def __init__(self, name: str, idx: int) -> None:
+        JobConfig.__init__(self, idx)
+        self.name = name
+        self._sync_mode = None  # type: Optional[str]
+        self._params = None  # type: Optional[Dict[str, Any]]
+
+    # ------------- BASIC PROPERTIES -----------------------------------------------------------------------------------
+
+    @property
+    def write_perc(self) -> Optional[int]:
+        """Write percentage from 'rwmixwrite', else 100 - 'rwmixread', else None"""
+        try:
+            return int(self.vals["rwmixwrite"])
+        except (KeyError, TypeError):
+            try:
+                return 100 - int(self.vals["rwmixread"])
+            except (KeyError, TypeError):
+                return None
+
+    @property
+    def qd(self) -> int:
+        """IO queue depth ('iodepth' option)"""
+        return int(self.vals['iodepth'])
+
+    @property
+    def bsize(self) -> int:
+        """Block size in KiB ('blocksize' option)"""
+        bsize = ssize2b(self.vals['blocksize'])
+        # the result is in whole KiB, so a size that is not a multiple of 1024 is rejected
+        assert bsize % 1024 == 0
+        return bsize // 1024
+
+    @property
+    def oper(self) -> str:
+        """Operation type ('rw' option)"""
+        return self.vals['rw']
+
+    @property
+    def op_type_short(self) -> str:
+        """Two-letter short name of the operation type, see op_type2short"""
+        return self.op_type2short[self.vals['rw']]
+
+    @property
+    def thcount(self) -> int:
+        """Thread count ('numjobs' option, 1 if not set)"""
+        return int(self.vals.get('numjobs', 1))
+
+    @property
+    def sync_mode(self) -> str:
+        """One-letter sync mode code (see ds2mode), computed on first access and cached"""
+        if self._sync_mode is None:
+            # NOTE(review): 'buffered' defaults to '0' here, so a job with neither 'direct'
+            # nor 'buffered' set is treated as direct - confirm this is intended
+            direct = is_fio_opt_true(self.vals.get('direct', '0')) or \
+                not is_fio_opt_true(self.vals.get('buffered', '0'))
+            sync = is_fio_opt_true(self.vals.get('sync', '0'))
+            self._sync_mode = self.ds2mode[(sync, direct)]
+        return cast(str, self._sync_mode)
+
+    # ----------- COMPLEX PROPERTIES -----------------------------------------------------------------------------------
+
+    @property
+    def params(self) -> JobParams:
+        """FioJobParams built from the basic properties; the values are computed on first access and cached"""
+        if self._params is None:
+            self._params = dict(oper=self.oper,
+                                sync_mode=self.sync_mode,
+                                bsize=self.bsize,
+                                qd=self.qd,
+                                thcount=self.thcount,
+                                write_perc=self.write_perc)
+        return cast(JobParams, FioJobParams(**cast(Dict[str, Any], self._params)))
+
+    # ------------------------------------------------------------------------------------------------------------------
+
+    def __eq__(self, o: object) -> bool:
+        """Two fio job configs are equal when their option values are equal"""
+        # NOTE(review): no __hash__ is defined next to __eq__, so Python makes instances unhashable
+        if not isinstance(o, FioJobConfig):
+            return False
+        return self.vals == cast(FioJobConfig, o).vals
+
+    def copy(self) -> 'FioJobConfig':
+        """Return a deep copy of this config"""
+        return copy.deepcopy(self)
+
+    def required_vars(self) -> Iterator[Tuple[str, Var]]:
+        """Yield (option name, Var) for every option whose value is still a Var"""
+        for name, val in self.vals.items():
+            if isinstance(val, Var):
+                yield name, val
+
+    def is_free(self) -> bool:
+        """Return True if no option value is a Var"""
+        # stop at the first Var instead of collecting all of them
+        return next(self.required_vars(), None) is None
+
+    def __str__(self) -> str:
+        """Render the job as text: a [summary] header line followed by name=value lines"""
+        lines = ["[{0}]\n".format(self.params.summary)]
+
+        for name, val in self.vals.items():
+            # names starting with '_' or equal to their upper-cased form are not emitted
+            if name.startswith('_') or name == name.upper():
+                continue
+            if isinstance(val, Var):
+                lines.append("{0}={{{1}}}\n".format(name, val.name))
+            else:
+                lines.append("{0}={1}\n".format(name, val))
+
+        return "".join(lines)
+
+    def __repr__(self) -> str:
+        return str(self)
+
+    def raw(self) -> Dict[str, Any]:
+        """Extend the parent's raw() dict with 'vals' stored as a list of [name, value] pairs"""
+        res = super().raw()
+        res['vals'] = list(map(list, self.vals.items()))
+        return res
+
+    @classmethod
+    def fromraw(cls, data: Dict[str, Any]) -> 'FioJobConfig':
+        """Rebuild 'vals' as an OrderedDict from its list of pairs and delegate to the parent's fromraw()"""
+        # work on a shallow copy so the caller's dict is left untouched
+        data = dict(data)
+        data['vals'] = OrderedDict(data['vals'])
+        return cast(FioJobConfig, super().fromraw(data))
diff --git a/wally/suits/io/fio_task_parser.py b/wally/suits/io/fio_task_parser.py
index 03702ae..c1b4bc3 100644
--- a/wally/suits/io/fio_task_parser.py
+++ b/wally/suits/io/fio_task_parser.py
@@ -3,24 +3,21 @@
import re
import os
import sys
-import copy
import os.path
import argparse
import itertools
-from typing import Optional, Iterator, Union, Dict, Iterable, List, TypeVar, Callable, Tuple, NamedTuple, Any, cast
+from typing import Optional, Iterator, Union, Dict, Iterable, List, Tuple, NamedTuple, Any
from collections import OrderedDict
-from ...result_classes import TestJobConfig
-from ...utils import sec_to_str, ssize2b, b2ssize, flatmap
-
+from ...utils import sec_to_str, ssize2b, flatmap
+from .fio_job import Var, FioJobConfig
SECTION = 0
SETTING = 1
INCLUDE = 2
-Var = NamedTuple('Var', [('name', str)])
CfgLine = NamedTuple('CfgLine',
[('fname', str),
('lineno', int),
@@ -28,203 +25,6 @@
('tp', int),
('name', str),
('val', Any)])
-FioTestSumm = NamedTuple("FioTestSumm",
- [("oper", str),
- ("sync_mode", str),
- ("bsize", int),
- ("qd", int),
- ("thcount", int),
- ("write_perc", Optional[int])])
-
-
-def is_fio_opt_true(vl: Union[str, int]) -> bool:
- return str(vl).lower() in ['1', 'true', 't', 'yes', 'y']
-
-
-class FioJobConfig(TestJobConfig):
-
- ds2mode = {(True, True): 'x',
- (True, False): 's',
- (False, True): 'd',
- (False, False): 'a'}
-
- sync2long = {'x': "sync direct",
- 's': "sync",
- 'd': "direct",
- 'a': "buffered"}
-
- op_type2short = {"randread": "rr",
- "randwrite": "rw",
- "read": "sr",
- "write": "sw",
- "randrw": "rx"}
-
- def __init__(self, name: str, idx: int) -> None:
- TestJobConfig.__init__(self, idx)
- self.name = name
- self._sync_mode = None # type: Optional[str]
- self._ctuple = None # type: Optional[FioTestSumm]
- self._ctuple_no_qd = None # type: Optional[FioTestSumm]
-
- # ------------- BASIC PROPERTIES -----------------------------------------------------------------------------------
-
- @property
- def write_perc(self) -> Optional[int]:
- try:
- return int(self.vals["rwmixwrite"])
- except (KeyError, TypeError):
- try:
- return 100 - int(self.vals["rwmixread"])
- except (KeyError, TypeError):
- return None
-
- @property
- def qd(self) -> int:
- return int(self.vals['iodepth'])
-
- @property
- def bsize(self) -> int:
- return ssize2b(self.vals['blocksize']) // 1024
-
- @property
- def oper(self) -> str:
- return self.vals['rw']
-
- @property
- def op_type_short(self) -> str:
- return self.op_type2short[self.vals['rw']]
-
- @property
- def thcount(self) -> int:
- return int(self.vals.get('numjobs', 1))
-
- @property
- def sync_mode(self) -> str:
- if self._sync_mode is None:
- direct = is_fio_opt_true(self.vals.get('direct', '0')) or \
- not is_fio_opt_true(self.vals.get('buffered', '0'))
- sync = is_fio_opt_true(self.vals.get('sync', '0'))
- self._sync_mode = self.ds2mode[(sync, direct)]
- return cast(str, self._sync_mode)
-
- @property
- def sync_mode_long(self) -> str:
- return self.sync2long[self.sync_mode]
-
- # ----------- COMPLEX PROPERTIES -----------------------------------------------------------------------------------
-
- @property
- def characterized_tuple(self) -> Tuple:
- if self._ctuple is None:
- self._ctuple = FioTestSumm(oper=self.oper,
- sync_mode=self.sync_mode,
- bsize=self.bsize,
- qd=self.qd,
- thcount=self.thcount,
- write_perc=self.write_perc)
-
- return cast(Tuple, self._ctuple)
-
- @property
- def characterized_tuple_no_qd(self) -> FioTestSumm:
- if self._ctuple_no_qd is None:
- self._ctuple_no_qd = FioTestSumm(oper=self.oper,
- sync_mode=self.sync_mode,
- bsize=self.bsize,
- qd=None,
- thcount=self.thcount,
- write_perc=self.write_perc)
-
- return cast(FioTestSumm, self._ctuple_no_qd)
-
- @property
- def long_summary(self) -> str:
- res = "{0.sync_mode_long} {0.oper} {1} QD={0.qd}".format(self, b2ssize(self.bsize * 1024))
- if self.thcount != 1:
- res += " threads={}".format(self.thcount)
- if self.write_perc is not None:
- res += " write_perc={}%".format(self.write_perc)
- return res
-
- @property
- def long_summary_no_qd(self) -> str:
- res = "{0.sync_mode_long} {0.oper} {1}".format(self, b2ssize(self.bsize * 1024))
- if self.thcount != 1:
- res += " threads={}".format(self.thcount)
- if self.write_perc is not None:
- res += " write_perc={}%".format(self.write_perc)
- return res
-
- @property
- def summary(self) -> str:
- tpl = cast(FioTestSumm, self.characterized_tuple)
- res = "{0.oper}{0.sync_mode}{0.bsize}_qd{0.qd}".format(tpl)
-
- if tpl.thcount != 1:
- res += "th" + str(tpl.thcount)
- if tpl.write_perc != 1:
- res += "wr" + str(tpl.write_perc)
-
- return res
-
- @property
- def summary_no_qd(self) -> str:
- tpl = cast(FioTestSumm, self.characterized_tuple)
- res = "{0.oper}{0.sync_mode}{0.bsize}".format(tpl)
-
- if tpl.thcount != 1:
- res += "th" + str(tpl.thcount)
- if tpl.write_perc != 1:
- res += "wr" + str(tpl.write_perc)
-
- return res
- # ------------------------------------------------------------------------------------------------------------------
-
- def __eq__(self, o: object) -> bool:
- if not isinstance(o, FioJobConfig):
- return False
- return self.vals == cast(FioJobConfig, o).vals
-
- def copy(self) -> 'FioJobConfig':
- return copy.deepcopy(self)
-
- def required_vars(self) -> Iterator[Tuple[str, Var]]:
- for name, val in self.vals.items():
- if isinstance(val, Var):
- yield name, val
-
- def is_free(self) -> bool:
- return len(list(self.required_vars())) == 0
-
- def __str__(self) -> str:
- res = "[{0}]\n".format(self.summary)
-
- for name, val in self.vals.items():
- if name.startswith('_') or name == name.upper():
- continue
- if isinstance(val, Var):
- res += "{0}={{{1}}}\n".format(name, val.name)
- else:
- res += "{0}={1}\n".format(name, val)
-
- return res
-
- def __repr__(self) -> str:
- return str(self)
-
- def raw(self) -> Dict[str, Any]:
- res = self.__dict__.copy()
- del res['_sync_mode']
- res['vals'] = [[key, val] for key, val in self.vals.items()]
- return res
-
- @classmethod
- def fromraw(cls, data: Dict[str, Any]) -> 'FioJobConfig':
- obj = cls.__new__(cls)
- data['vals'] = OrderedDict(data['vals'])
- data['_sync_mode'] = None
- obj.__dict__.update(data)
- return obj
class ParseError(ValueError):