remove fuel support, fix many bugs, add sudo support for some commands, add a default ssh user
diff --git a/wally/suits/io/defaults_qd.cfg b/wally/suits/io/defaults_qd.cfg
index c3dee19..ee10055 100644
--- a/wally/suits/io/defaults_qd.cfg
+++ b/wally/suits/io/defaults_qd.cfg
@@ -21,5 +21,6 @@
write_bw_log=fio_bw_log
log_avg_msec=1000
write_hist_log=fio_lat_hist_log
+log_hist_coarseness=0
log_hist_msec=1000
log_unix_epoch=1
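With log_unix_epoch=1 and log_hist_msec=1000 fio emits one epoch-stamped latency
histogram sample per second, and log_hist_coarseness=0 keeps full bin resolution.
A minimal parsing sketch, assuming the common "time_ms, dir, bs, bin0, bin1, ..."
column layout from fio's fiologparser_hist (the exact layout varies between fio
versions):

def parse_hist_line(line: str):
    # First three columns: timestamp (ms), data direction, block size;
    # the remaining columns are per-bin sample counts.
    time_ms, _direction, _block_size, *bins = [int(f.strip()) for f in line.split(",")]
    return time_ms, bins

t, bins = parse_hist_line("1000, 0, 4096, " + ", ".join(["0"] * 1216))
assert t == 1000 and len(bins) == 1216
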
diff --git a/wally/suits/io/fio.py b/wally/suits/io/fio.py
index 374e5a4..693a547 100644
--- a/wally/suits/io/fio.py
+++ b/wally/suits/io/fio.py
@@ -1,6 +1,6 @@
import os.path
import logging
-from typing import cast, Any, List, Union, Tuple, Optional
+from typing import cast, Any, List, Union
import numpy
@@ -14,7 +14,7 @@
from ..job import JobConfig
from .fio_task_parser import execution_time, fio_cfg_compile, FioJobConfig, FioParams, get_log_files
from . import rpc_plugin
-from .fio_hist import get_lat_vals, expected_lat_bins
+from .fio_hist import get_lat_vals
logger = logging.getLogger("wally")
@@ -209,7 +209,7 @@
raise StopTestError()
# TODO: fix units, need to get array type from stream
-
+ open("/tmp/tt", 'wb').write(raw_result)
parsed = [] # type: List[Union[List[int], int]]
times = []
@@ -223,11 +223,11 @@
if name == 'lat':
vals = [int(i.strip()) for i in rest]
- if len(vals) != expected_lat_bins:
- msg = "Expect {} bins in latency histogram, but found {} at time {}" \
- .format(expected_lat_bins, len(vals), time_ms_s)
- logger.error(msg)
- raise StopTestError(msg)
+ # fio's histogram bin count varies with version (1216 us-based bins
+ # vs 1856 ns-based bins), so the hard-coded expected_lat_bins check
+ # is dropped; get_lat_vals() infers the unit from the column count
parsed.append(vals)
else:
@@ -238,7 +238,7 @@
assert not self.suite.keep_raw_files, "keep_raw_files is not supported"
- histo_bins = None if name != 'lat' else numpy.array(get_lat_vals())
+ histo_bins = None if name != 'lat' else numpy.array(get_lat_vals(len(parsed[0])))
ts = TimeSeries(data=numpy.array(parsed, dtype='uint64'),
units=units,
times=numpy.array(times, dtype='uint64'),
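A note on the get_lat_vals(len(parsed[0])) call above: each parsed histogram
sample is one row of per-bin counts, so the width of the first row equals the
number of bins this fio version writes (1216 or 1856). A small sketch with
fabricated data:

import numpy

parsed = [[0] * 1856, [1] * 1856]     # two fake samples in the 1856-bin format
data = numpy.array(parsed, dtype='uint64')
assert data.shape[1] == 1856          # 1856 columns -> nanosecond-based log
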
diff --git a/wally/suits/io/fio_hist.py b/wally/suits/io/fio_hist.py
index a2ded70..fc32d0d 100644
--- a/wally/suits/io/fio_hist.py
+++ b/wally/suits/io/fio_hist.py
@@ -1,9 +1,6 @@
from typing import List
-expected_lat_bins = 1216
-
-
#---------------------------- FIO HIST LOG PARSE CODE -----------------------------------------------------------------
# Copy-paste from fio/tools/hist/fiologparser_hist.py.
@@ -52,6 +49,12 @@
return lower + (upper - lower) * edge
-def get_lat_vals(columns: int = expected_lat_bins, coarseness: int = 0) -> List[float]:
- return [plat_idx_to_val_coarse(val, coarseness) for val in range(columns)]
+def get_lat_vals(columns: int, coarseness: int = 0) -> List[float]:
+ # 1216 columns mean a us-based histogram, 1856 mean an ns-based one;
+ # scale ns values down by 1000 so both formats yield microseconds
+ if columns == 1216:
+ coef = 1
+ elif columns == 1856:
+ coef = 1000
+ else:
+ raise ValueError(f"Unexpected latency histogram column count: {columns}")
+
+ return [plat_idx_to_val_coarse(val, coarseness) / coef for val in range(columns)]
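With bin values normalized to one unit, a percentile can be estimated from any
histogram row. A sketch, assuming get_lat_vals() returns monotonically
increasing values for the matching column count (hist_percentile is
illustrative, not part of this commit):

import numpy

def hist_percentile(counts: numpy.ndarray, lat_vals: numpy.ndarray, p: float) -> float:
    # Walk the cumulative distribution and return the latency value of the
    # first bin at or past the requested percentile.
    cdf = numpy.cumsum(counts) / counts.sum()
    return float(lat_vals[int(numpy.searchsorted(cdf, p / 100.0))])
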
diff --git a/wally/suits/io/fio_job.py b/wally/suits/io/fio_job.py
index 6676895..9dffb49 100644
--- a/wally/suits/io/fio_job.py
+++ b/wally/suits/io/fio_job.py
@@ -1,13 +1,10 @@
import copy
from collections import OrderedDict
-from typing import Optional, Iterator, Union, Dict, Tuple, NamedTuple, Any, cast
+from typing import Optional, Iterator, Union, Dict, Tuple, Any, cast
from cephlib.units import ssize2b, b2ssize
-from ..job import JobConfig, JobParams
-
-
-Var = NamedTuple('Var', [('name', str)])
+from ..job import JobConfig, JobParams, Var
def is_fio_opt_true(vl: Union[str, int]) -> bool:
@@ -40,7 +37,7 @@
@property
def summary(self) -> str:
"""Test short summary, used mostly for file names and short image description"""
- res = "{0[oper_short]}{0[sync_mode]}{0[bsize]}".format(self)
+ res = f"{self['oper_short']}{self['sync_mode']}{self['bsize']}"
if self['qd'] is not None:
res += "_qd" + str(self['qd'])
if self['thcount'] not in (1, None):
@@ -52,13 +49,13 @@
@property
def long_summary(self) -> str:
"""Readable long summary for management and deployment engineers"""
- res = "{0[oper]}, {0.sync_mode_long}, block size {1}B".format(self, b2ssize(self['bsize'] * 1024))
+ res = f"{self['oper']}, {self.sync_mode_long}, block size {b2ssize(self['bsize'] * 1024)}B"
if self['qd'] is not None:
res += ", QD = " + str(self['qd'])
if self['thcount'] not in (1, None):
- res += ", threads={0[thcount]}".format(self)
+ res += f", threads={self['thcount']}"
if self['write_perc'] is not None:
- res += ", write_perc={0[write_perc]}%".format(self)
+ res += f", fwrite_perc={self['write_perc']}%"
return res
def copy(self, **kwargs: Dict[str, Any]) -> 'FioJobParams':
@@ -89,24 +86,24 @@
def __init__(self, name: str, idx: int) -> None:
JobConfig.__init__(self, idx)
self.name = name
- self._sync_mode = None # type: Optional[str]
- self._params = None # type: Optional[Dict[str, Any]]
+ self._sync_mode: Optional[str] = None
+ self._params: Optional[Dict[str, Any]] = None
# ------------- BASIC PROPERTIES -----------------------------------------------------------------------------------
@property
def write_perc(self) -> Optional[int]:
try:
- return int(self.vals["rwmixwrite"])
+ return int(self.vals["rwmixwrite"]) # type: ignore
except (KeyError, TypeError):
try:
- return 100 - int(self.vals["rwmixread"])
+ return 100 - int(self.vals["rwmixread"]) # type: ignore
except (KeyError, TypeError):
return None
@property
def qd(self) -> int:
- return int(self.vals.get('iodepth', '1'))
+ return int(self.vals.get('iodepth', '1')) # type: ignore
@property
def bsize(self) -> int:
@@ -117,7 +114,7 @@
@property
def oper(self) -> str:
vl = self.vals['rw']
- return vl if ':' not in vl else vl.split(":")[0]
+ return vl if ':' not in vl else vl.split(":")[0] # type: ignore
@property
def op_type_short(self) -> str:
@@ -125,14 +122,14 @@
@property
def thcount(self) -> int:
- return int(self.vals.get('numjobs', 1))
+ return int(self.vals.get('numjobs', 1)) # type: ignore
@property
def sync_mode(self) -> str:
if self._sync_mode is None:
- direct = is_fio_opt_true(self.vals.get('direct', '0')) or \
- not is_fio_opt_true(self.vals.get('buffered', '0'))
- sync = is_fio_opt_true(self.vals.get('sync', '0'))
+ direct = is_fio_opt_true(self.vals.get('direct', '0')) # type: ignore
+ direct = direct or not is_fio_opt_true(self.vals.get('buffered', '0')) # type: ignore
+ sync = is_fio_opt_true(self.vals.get('sync', '0')) # type: ignore
self._sync_mode = self.ds2mode[(sync, direct)]
return cast(str, self._sync_mode)
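The f-string rewrites in this file are behavior-preserving: str.format's
"{0[key]}" item access and an f-string's explicit self['key'] both end up in
__getitem__. A self-contained check (Demo is illustrative):

class Demo:
    def __getitem__(self, name: str) -> str:
        return name.upper()

d = Demo()
# "{0[oper]}".format(d) passes the string "oper" to d.__getitem__,
# exactly like the f-string's explicit d['oper'] lookup.
assert "{0[oper]}".format(d) == f"{d['oper']}" == "OPER"
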
diff --git a/wally/suits/io/fio_task_parser.py b/wally/suits/io/fio_task_parser.py
index 5b91885..a9e13dc 100644
--- a/wally/suits/io/fio_task_parser.py
+++ b/wally/suits/io/fio_task_parser.py
@@ -1,9 +1,8 @@
#!/usr/bin/env python3
import re
-import os
import sys
-import os.path
+import pathlib
import argparse
import itertools
from typing import Optional, Iterator, Union, Dict, Iterable, List, Tuple, NamedTuple, Any
@@ -104,34 +103,30 @@
def fio_config_parse(lexer_iter: Iterable[CfgLine]) -> Iterator[FioJobConfig]:
in_globals = False
curr_section = None
- glob_vals = OrderedDict() # type: Dict[str, Any]
+ glob_vals: Dict[str, Any] = OrderedDict()
sections_count = 0
- lexed_lines = list(lexer_iter) # type: List[CfgLine]
+ lexed_lines: List[CfgLine] = list(lexer_iter)
one_more = True
- includes = {}
+ includes: Dict[str, Tuple[str, int]] = {}
while one_more:
- new_lines = [] # type: List[CfgLine]
+ new_lines: List[CfgLine] = []
one_more = False
for line in lexed_lines:
fname, lineno, oline, tp, name, val = line
if INCLUDE == tp:
- if not os.path.exists(fname):
- dirname = '.'
- else:
- dirname = os.path.dirname(fname)
-
- new_fname = os.path.join(dirname, name)
- includes[new_fname] = (fname, lineno)
+ fobj = pathlib.Path(fname)
+ new_fname = (fobj.parent / name) if fobj.exists() else pathlib.Path(name)
+ includes[str(new_fname)] = (fname, lineno)
try:
- cont = open(new_fname).read()
+ cont = new_fname.open().read()
except IOError as err:
- raise ParseError("Error while including file {}: {}".format(new_fname, err), fname, lineno, oline)
+ raise ParseError(f"Error while including file {new_fname}: {err}", fname, lineno, oline)
- new_lines.extend(fio_config_lexer(cont, new_fname))
+ new_lines.extend(fio_config_lexer(cont, str(new_fname)))
one_more = True
else:
new_lines.append(line)
@@ -161,7 +156,7 @@
if in_globals:
glob_vals[name] = val
elif name == name.upper():
- raise ParseError("Param {!r} not in [global] section".format(name), fname, lineno, oline)
+ raise ParseError(f"Param {name!r} not in [global] section", fname, lineno, oline)
elif curr_section is None:
raise ParseError("Data outside section", fname, lineno, oline)
else:
@@ -172,7 +167,7 @@
def process_cycles(sec: FioJobConfig) -> Iterator[FioJobConfig]:
- cycles = OrderedDict() # type: Dict[str, Any]
+ cycles: Dict[str, Any] = OrderedDict()
for name, val in sec.vals.items():
if isinstance(val, list) and name.upper() != name:
@@ -203,12 +198,12 @@
yield new_sec
-FioParamsVal = Union[str, Var]
+FioParamsVal = Union[str, Var, int]
FioParams = Dict[str, FioParamsVal]
def apply_params(sec: FioJobConfig, params: FioParams) -> FioJobConfig:
- processed_vals = OrderedDict() # type: Dict[str, Any]
+ processed_vals: Dict[str, Any] = OrderedDict()
processed_vals.update(params)
for name, val in sec.vals.items():
@@ -251,8 +246,7 @@
sec.vals['unified_rw_reporting'] = '1'
if isinstance(sec.vals['size'], Var):
- raise ValueError("Variable {0} isn't provided".format(
- sec.vals['size'].name))
+ raise ValueError(f"Variable {sec.vals['size'].name} isn't provided")
sz = ssize2b(sec.vals['size'])
offset = sz * ((MAGIC_OFFSET * counter[0]) % 1.0)
@@ -266,7 +260,7 @@
for vl in sec.vals.values():
if isinstance(vl, Var):
- raise ValueError("Variable {0} isn't provided".format(vl.name))
+ raise ValueError(f"Variable {vl.name} isn't provided")
params = sec.vals.copy()
params['UNIQ'] = 'UN{0}'.format(counter[0])
@@ -282,13 +276,11 @@
return sec.vals.get('ramp_time', 0) + sec.vals.get('runtime', 0)
-def parse_all_in_1(source:str, fname: str = None) -> Iterator[FioJobConfig]:
+def parse_all_in_1(source: str, fname: str) -> Iterator[FioJobConfig]:
return fio_config_parse(fio_config_lexer(source, fname))
def get_log_files(sec: FioJobConfig, iops: bool = False) -> Iterator[Tuple[str, str, str]]:
- res = [] # type: List[Tuple[str, str, str]]
-
keys = [('write_bw_log', 'bw', 'KiBps'),
('write_hist_log', 'lat', 'us')]
if iops:
@@ -304,8 +296,8 @@
test_params = test_params.copy()
if 'RAMPTIME' not in test_params and 'RUNTIME' in test_params:
- ramp = int(int(test_params['RUNTIME']) * 0.05)
- test_params['RAMPTIME'] = min(30, max(5, ramp))
+ ramp = int(int(test_params['RUNTIME']) * 0.05) # type: ignore
+ test_params['RAMPTIME'] = min(30, max(5, ramp)) # type: ignore
it = parse_all_in_1(source, fname)
it = (apply_params(sec, test_params) for sec in it)
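The pathlib rewrite keeps the old include-resolution rule: an "include"
directive is resolved relative to the including file when that file exists on
disk, otherwise relative to the current directory. Distilled into a standalone
sketch (resolve_include is an illustrative name):

import pathlib

def resolve_include(including_file: str, name: str) -> pathlib.Path:
    fobj = pathlib.Path(including_file)
    # Path('.') / name equals Path(name), so the fallback branch matches
    # the old os.path behavior with dirname = '.'.
    return (fobj.parent / name) if fobj.exists() else pathlib.Path(name)

# resolve_include("suites/main.cfg", "defaults.cfg")
# -> Path("suites/defaults.cfg") when suites/main.cfg exists
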
diff --git a/wally/suits/io/rrd_raw.cfg b/wally/suits/io/rrd_raw.cfg
index 2b0fc74..e7ca82d 100644
--- a/wally/suits/io/rrd_raw.cfg
+++ b/wally/suits/io/rrd_raw.cfg
@@ -1,6 +1,6 @@
[test]
-blocksize=4k
-rw=randread
+blocksize=4m
+rw=write
iodepth=1
ramp_time=0
runtime=120
@@ -17,5 +17,8 @@
wait_for_previous=1
per_job_logs=0
randrepeat=0
-filename=/dev/sdb
-size=100G
\ No newline at end of file
+filename=/media/data/test.db
+size=50G
+;verify_pattern=0x00
+buffer_compress_percentage=99
+write_bw_log=/tmp/bw.non-compress.log
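buffer_compress_percentage=99 asks fio to fill write buffers with data that is
roughly 99% compressible, which changes results on compressing devices and
filesystems. A rough zlib illustration of such a buffer (the exact fio pattern
differs; this only shows the order of magnitude):

import os
import zlib

buf = bytes(1014 * 1024) + os.urandom(10 * 1024)   # ~99% zeros, ~1% random
ratio = len(zlib.compress(buf)) / len(buf)
assert ratio < 0.05                                # compresses to a tiny fraction
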
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index 26fd252..111e6bb 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -26,11 +26,12 @@
retry_time = 30
job_config_cls = None # type: type
- def __init__(self, storage: IWallyStorage, suite: SuiteConfig, on_idle: Callable[[], None] = None) -> None:
+ def __init__(self, storage: IWallyStorage, suite: SuiteConfig,
+ on_tests_boundary: Optional[Callable[[bool], None]] = None) -> None:
self.suite = suite
self.stop_requested = False
self.sorted_nodes_ids = sorted(node.node_id for node in self.suite.nodes)
- self.on_idle = on_idle
+ self.on_tests_boundary = on_tests_boundary
self.storage = storage
def request_stop(self) -> None:
@@ -55,11 +56,11 @@
# used_max_diff = max((min_run_time * max_rel_time_diff), max_time_diff)
max_time_diff = 5
max_rel_time_diff = 0.05
- load_profile_name = None # type: str
+ load_profile_name: str = None # type: ignore
def __init__(self, *args, **kwargs) -> None:
PerfTest.__init__(self, *args, **kwargs)
- self.job_configs = None # type: List[JobConfig]
+ self.job_configs: List[JobConfig] = None # type: ignore
@abc.abstractmethod
def get_expected_runtime(self, iter_cfg: JobConfig) -> Optional[int]:
@@ -107,13 +108,12 @@
if None not in run_times:
# +10s is a rough estimate of additional operations per iteration
- expected_run_time = int(sum(run_times) + 10 * len(not_in_storage))
-
+ expected_run_time: int = int(sum(run_times) + 10 * len(not_in_storage)) # type: ignore
exec_time_s, end_dt_s = get_time_interval_printable_info(expected_run_time)
logger.info("Entire test should takes around %s and finish at %s", exec_time_s, end_dt_s)
for job in not_in_storage:
- results = [] # type: List[TimeSeries]
+ results: List[TimeSeries] = []
for retry_idx in range(self.max_retry):
logger.info("Preparing job %s", job.params.summary)
@@ -121,8 +121,14 @@
wait([pool.submit(self.prepare_iteration, node, job) for node in self.suite.nodes])
expected_job_time = self.get_expected_runtime(job)
- exec_time_s, end_dt_s = get_time_interval_printable_info(expected_job_time)
- logger.info("Job should takes around %s and finish at %s", exec_time_s, end_dt_s)
+ if expected_job_time is None:
+ logger.info("Job execution time is unknown")
+ else:
+ exec_time_s, end_dt_s = get_time_interval_printable_info(expected_job_time)
+ logger.info("Job should takes around %s and finish at %s", exec_time_s, end_dt_s)
+
+ if self.on_tests_boundary is not None:
+ self.on_tests_boundary(True)
jfutures = [pool.submit(self.run_iteration, node, job) for node in self.suite.nodes]
failed = False
@@ -132,6 +138,9 @@
except EnvironmentError:
failed = True
+ if self.on_tests_boundary is not None:
+ self.on_tests_boundary(False)
+
if not failed:
break
@@ -145,8 +154,8 @@
results = []
# per node jobs start and stop times
- start_times = [] # type: List[int]
- stop_times = [] # type: List[int]
+ start_times: List[int] = []
+ stop_times: List[int] = []
for ts in results:
self.storage.put_ts(ts)
@@ -180,8 +189,6 @@
self.storage.put_job(self.suite, job)
self.storage.sync()
- if self.on_idle is not None:
- self.on_idle()
@abc.abstractmethod
def config_node(self, node: IRPCNode) -> None:
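Unlike the old parameterless on_idle hook, the renamed callback carries
direction: it is invoked with True just before a job's iterations start and
with False once they finish. A hypothetical consumer (the sensor-collector
names below are assumptions, not wally API):

from typing import Callable

def make_boundary_handler(sensors) -> Callable[[bool], None]:
    def on_tests_boundary(job_starting: bool) -> None:
        if job_starting:
            sensors.mark_job_start()   # hypothetical method
        else:
            sensors.mark_job_end()     # hypothetical method
    return on_tests_boundary
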
diff --git a/wally/suits/job.py b/wally/suits/job.py
index 8ef1093..d336807 100644
--- a/wally/suits/job.py
+++ b/wally/suits/job.py
@@ -1,10 +1,13 @@
import abc
-from typing import Dict, Any, Tuple, cast, Union
+from typing import Dict, Any, Tuple, cast, Union, NamedTuple
from collections import OrderedDict
from cephlib.istorage import Storable
+Var = NamedTuple('Var', [('name', str)])
+
+
class JobParams(metaclass=abc.ABCMeta):
"""Class contains all job parameters, which significantly affects job results.
Like block size or operation type, but not file name or file size.
@@ -41,12 +44,12 @@
def __eq__(self, o: object) -> bool:
if not isinstance(o, self.__class__):
- raise TypeError("Can't compare {!r} to {!r}".format(self.__class__.__qualname__, type(o).__qualname__))
+ raise TypeError(f"Can't compare {self.__class__.__qualname__!r} to {type(o).__qualname__!r}")
return sorted(self.params.items()) == sorted(cast(JobParams, o).params.items())
def __lt__(self, o: object) -> bool:
if not isinstance(o, self.__class__):
- raise TypeError("Can't compare {!r} to {!r}".format(self.__class__.__qualname__, type(o).__qualname__))
+ raise TypeError(f"Can't compare {self.__class__.__qualname__!r} to {type(o).__qualname__!r}")
return self.char_tpl < cast(JobParams, o).char_tpl
@property
@@ -63,11 +66,10 @@
self.idx = idx
# time interval, in seconds, when test was running on all nodes
- self.reliable_info_range = None # type: Tuple[int, int]
-
+ self.reliable_info_range: Tuple[int, int] = None # type: ignore
# all job parameters, both from suite file and config file
- self.vals = OrderedDict() # type: Dict[str, Any]
+ self.vals: Dict[str, Any] = OrderedDict()
@property
def reliable_info_range_s(self) -> Tuple[int, int]:
@@ -76,7 +78,7 @@
@property
def storage_id(self) -> str:
"""unique string, used as key in storage"""
- return "{}_{}".format(self.summary, self.idx)
+ return f"{self.summary}_{self.idx}"
@property
@abc.abstractmethod
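
Var, now exported from wally.suits.job, is the placeholder for template
variables that apply_params() must substitute; any Var left unresolved
afterwards triggers the "isn't provided" ValueError in fio_task_parser.py.
A quick sketch of its shape:

v = Var("RUNTIME")
assert v.name == "RUNTIME"
assert isinstance(v, tuple)   # NamedTuple instances are plain tuples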