blob: 9dffb4961dcb5666887336650f30a9c37be1a3be [file] [log] [blame]
import copy
from collections import OrderedDict
from typing import Optional, Iterator, Union, Dict, Tuple, Any, cast
from cephlib.units import ssize2b, b2ssize
from ..job import JobConfig, JobParams, Var
def is_fio_opt_true(vl: Union[str, int]) -> bool:
    """Tell whether a fio option value means "enabled".

    The value is converted to a string and lower-cased, then compared with
    the accepted spellings of a true flag. No whitespace stripping is done.
    """
    truthy_spellings = ('1', 'true', 't', 'yes', 'y')
    normalized = str(vl).lower()
    return normalized in truthy_spellings
class FioJobParams(JobParams):
    """Parameters of a fio job which significantly affect fio results.

    Holds things like block size or operation type, but not file name or
    file size. Can be used as key in dictionary.

    Keys read by this class:
        oper - operation type - read/write/randread/...
        oper_short - short operation code, used in ``summary``
        sync_mode - one-letter sync mode code, see ``sync2long``
        bsize - block size in KiB
        qd - IO queue depth
        thcount - thread count
        write_perc - write percentage for mixed (read+write) loads
    """

    # one-letter sync mode code -> human readable description
    sync2long = {'x': "sync direct",
                 's': "sync",
                 'd': "direct",
                 'a': "buffered"}

    @property
    def sync_mode_long(self) -> str:
        """Human readable name of the job's sync mode."""
        return self.sync2long[self['sync_mode']]

    @property
    def summary(self) -> str:
        """Test short summary, used mostly for file names and short image description"""
        res = f"{self['oper_short']}{self['sync_mode']}{self['bsize']}"
        if self['qd'] is not None:
            res += "_qd" + str(self['qd'])
        # a single thread is the implicit default and is left out
        if self['thcount'] not in (1, None):
            res += "th" + str(self['thcount'])
        if self['write_perc'] is not None:
            res += "wr" + str(self['write_perc'])
        return res

    @property
    def long_summary(self) -> str:
        """Readable long summary for management and deployment engineers"""
        # bsize is stored in KiB, b2ssize is given the size in bytes
        res = f"{self['oper']}, {self.sync_mode_long}, block size {b2ssize(self['bsize'] * 1024)}B"
        if self['qd'] is not None:
            res += ", QD = " + str(self['qd'])
        if self['thcount'] not in (1, None):
            res += f", threads={self['thcount']}"
        if self['write_perc'] is not None:
            res += f", write_perc={self['write_perc']}%"
        return res

    def copy(self, **kwargs: Any) -> 'FioJobParams':
        """Return a new object of the same class, with parameters overridden by ``kwargs``.

        The current object is left unchanged: its parameters are copied
        before the overrides are applied.
        """
        np = self.params.copy()
        np.update(kwargs)
        return self.__class__(**np)

    @property
    def char_tpl(self) -> Tuple[Union[str, int], ...]:
        """Tuple of (oper, bsize, sync_mode, thcount, qd, write_perc).

        Numeric fields which are None are replaced with a large negative
        number - presumably to keep tuples of different jobs comparable.
        """
        def mint(x: Any) -> int:
            return -10000000000 if x is None else int(x)

        return self['oper'], mint(self['bsize']), self['sync_mode'], \
            mint(self['thcount']), mint(self['qd']), mint(self['write_perc'])
class FioJobConfig(JobConfig):
    """Fio job configuration.

    Properties of this class are derived from ``self.vals`` - the mapping of
    fio option names to values (not created here, presumably provided by
    JobConfig). A value may be a ``Var`` - a placeholder which is not filled yet.
    """

    # (sync, direct) flags -> one-letter sync mode code
    ds2mode = {(True, True): 'x',
               (True, False): 's',
               (False, True): 'd',
               (False, False): 'a'}

    # fio 'rw' option value -> two-letter operation code
    op_type2short = {"randread": "rr",
                     "randwrite": "rw",
                     "read": "sr",
                     "write": "sw",
                     "randrw": "rx"}

    def __init__(self, name: str, idx: int) -> None:
        JobConfig.__init__(self, idx)
        self.name = name
        # lazily filled caches, see sync_mode and params
        self._sync_mode: Optional[str] = None
        self._params: Optional[Dict[str, Any]] = None

    # ------------- BASIC PROPERTIES -----------------------------------------------------------------------------------

    @property
    def write_perc(self) -> Optional[int]:
        """Write percentage from 'rwmixwrite', else 100 - 'rwmixread', else None."""
        try:
            return int(self.vals["rwmixwrite"])  # type: ignore
        except (KeyError, TypeError):
            # option is missing or can't be converted - try the read share
            try:
                return 100 - int(self.vals["rwmixread"])  # type: ignore
            except (KeyError, TypeError):
                return None

    @property
    def qd(self) -> int:
        """IO queue depth ('iodepth' option), 1 if not set."""
        return int(self.vals.get('iodepth', '1'))  # type: ignore

    @property
    def bsize(self) -> int:
        """Block size in KiB ('blocksize' option), must be a whole number of KiB."""
        bsize = ssize2b(self.vals['blocksize'])
        assert bsize % 1024 == 0, f"blocksize {bsize} is not a multiple of 1024"
        return bsize // 1024

    @property
    def oper(self) -> str:
        """Operation type - the 'rw' option without the part after ':' (if any)."""
        vl = self.vals['rw']
        return vl if ':' not in vl else vl.split(":")[0]  # type: ignore

    @property
    def op_type_short(self) -> str:
        """Two-letter operation code, see op_type2short."""
        return self.op_type2short[self.oper]

    @property
    def thcount(self) -> int:
        """Thread count ('numjobs' option), 1 if not set."""
        return int(self.vals.get('numjobs', 1))  # type: ignore

    @property
    def sync_mode(self) -> str:
        """One-letter sync mode code, see ds2mode. Calculated once and cached."""
        if self._sync_mode is None:
            direct = is_fio_opt_true(self.vals.get('direct', '0'))  # type: ignore
            # 'buffered' is the opposite of 'direct' and is true by default in fio,
            # so a missing 'buffered' option must not turn direct mode on
            buffered = is_fio_opt_true(self.vals.get('buffered', '1'))  # type: ignore
            direct = direct or not buffered
            sync = is_fio_opt_true(self.vals.get('sync', '0'))  # type: ignore
            self._sync_mode = self.ds2mode[(sync, direct)]
        return cast(str, self._sync_mode)

    # ----------- COMPLEX PROPERTIES -----------------------------------------------------------------------------------

    @property
    def params(self) -> JobParams:
        """Job parameters as FioJobParams.

        The parameters dict is calculated once and cached, a new
        FioJobParams object is created from it on every access.
        """
        if self._params is None:
            self._params = dict(oper=self.oper,
                                oper_short=self.op_type_short,
                                sync_mode=self.sync_mode,
                                bsize=self.bsize,
                                qd=self.qd,
                                thcount=self.thcount,
                                write_perc=self.write_perc)
        return cast(JobParams, FioJobParams(**cast(Dict[str, Any], self._params)))

    # ------------------------------------------------------------------------------------------------------------------

    def __eq__(self, o: object) -> bool:
        """Configs are equal if their option values are equal, name and idx are not compared."""
        if not isinstance(o, FioJobConfig):
            return False
        # compare as plain dicts, so the order of options does not matter
        return dict(self.vals) == dict(o.vals)

    def copy(self) -> 'FioJobConfig':
        """Return a deep copy of this config."""
        return copy.deepcopy(self)

    def required_vars(self) -> Iterator[Tuple[str, Var]]:
        """Yield (option name, Var) for all options which values are not filled yet."""
        for name, val in self.vals.items():
            if isinstance(val, Var):
                yield name, val

    def is_free(self) -> bool:
        """True if no option value is a Var."""
        # required_vars yields tuples only, so None can be used as 'empty' marker
        return next(self.required_vars(), None) is None

    def __str__(self) -> str:
        """Render the config as a fio job section, Var values are shown as {var_name}."""
        lines = ["[{0}]\n".format(self.summary)]
        for name, val in self.vals.items():
            # options starting with '_' or written in upper case are not printed
            if name.startswith('_') or name == name.upper():
                continue
            if isinstance(val, Var):
                lines.append("{0}={{{1}}}\n".format(name, val.name))
            else:
                lines.append("{0}={1}\n".format(name, val))
        return "".join(lines)

    def __repr__(self) -> str:
        return str(self)

    def raw(self) -> Dict[str, Any]:
        """Return the base class raw() dict with 'vals' stored as a list of [name, value] pairs."""
        res = super().raw()
        res['vals'] = [list(item) for item in self.vals.items()]
        return res

    @classmethod
    def fromraw(cls, data: Dict[str, Any]) -> 'FioJobConfig':
        """Build a config from a dict made by raw(), the caches are reset."""
        # work on a shallow copy, so the dict of the caller is not modified
        data = dict(data)
        data['vals'] = OrderedDict(data['vals'])
        data['_sync_mode'] = None
        data['_params'] = None
        return cast(FioJobConfig, super().fromraw(data))