remove fuel support, many bugfixes, add sudo support for some cmd, add default ssh user
diff --git a/wally/suits/io/defaults_qd.cfg b/wally/suits/io/defaults_qd.cfg
index c3dee19..ee10055 100644
--- a/wally/suits/io/defaults_qd.cfg
+++ b/wally/suits/io/defaults_qd.cfg
@@ -21,5 +21,6 @@
write_bw_log=fio_bw_log
log_avg_msec=1000
write_hist_log=fio_lat_hist_log
+log_hist_coarseness=0
log_hist_msec=1000
log_unix_epoch=1
diff --git a/wally/suits/io/fio.py b/wally/suits/io/fio.py
index 374e5a4..693a547 100644
--- a/wally/suits/io/fio.py
+++ b/wally/suits/io/fio.py
@@ -1,6 +1,6 @@
import os.path
import logging
-from typing import cast, Any, List, Union, Tuple, Optional
+from typing import cast, Any, List, Union
import numpy
@@ -14,7 +14,7 @@
from ..job import JobConfig
from .fio_task_parser import execution_time, fio_cfg_compile, FioJobConfig, FioParams, get_log_files
from . import rpc_plugin
-from .fio_hist import get_lat_vals, expected_lat_bins
+from .fio_hist import get_lat_vals
logger = logging.getLogger("wally")
@@ -209,7 +209,7 @@
raise StopTestError()
# TODO: fix units, need to get array type from stream
-
+ open("/tmp/tt", 'wb').write(raw_result)
parsed = [] # type: List[Union[List[int], int]]
times = []
@@ -223,11 +223,11 @@
if name == 'lat':
vals = [int(i.strip()) for i in rest]
- if len(vals) != expected_lat_bins:
- msg = "Expect {} bins in latency histogram, but found {} at time {}" \
- .format(expected_lat_bins, len(vals), time_ms_s)
- logger.error(msg)
- raise StopTestError(msg)
+ # if len(vals) != expected_lat_bins:
+ # msg = f"Expect {expected_lat_bins} bins in latency histogram, " + \
+ # f"but found {len(vals)} at time {time_ms_s}"
+ # logger.error(msg)
+ # raise StopTestError(msg)
parsed.append(vals)
else:
@@ -238,7 +238,7 @@
assert not self.suite.keep_raw_files, "keep_raw_files is not supported"
- histo_bins = None if name != 'lat' else numpy.array(get_lat_vals())
+ histo_bins = None if name != 'lat' else numpy.array(get_lat_vals(len(parsed[0])))
ts = TimeSeries(data=numpy.array(parsed, dtype='uint64'),
units=units,
times=numpy.array(times, dtype='uint64'),
diff --git a/wally/suits/io/fio_hist.py b/wally/suits/io/fio_hist.py
index a2ded70..fc32d0d 100644
--- a/wally/suits/io/fio_hist.py
+++ b/wally/suits/io/fio_hist.py
@@ -1,9 +1,6 @@
from typing import List
-expected_lat_bins = 1216
-
-
#---------------------------- FIO HIST LOG PARSE CODE -----------------------------------------------------------------
# Copy-paste from fio/tools/hist/fiologparser_hist.py.
@@ -52,6 +49,12 @@
return lower + (upper - lower) * edge
-def get_lat_vals(columns: int = expected_lat_bins, coarseness: int = 0) -> List[float]:
- return [plat_idx_to_val_coarse(val, coarseness) for val in range(columns)]
+def get_lat_vals(columns: int, coarseness: int = 0) -> List[float]:
+ # convert ns to ms
+ if columns == 1216:
+ coef = 1
+ elif columns == 1856:
+ coef = 1000
+
+ return [plat_idx_to_val_coarse(val, coarseness) / coef for val in range(columns)]
diff --git a/wally/suits/io/fio_job.py b/wally/suits/io/fio_job.py
index 6676895..9dffb49 100644
--- a/wally/suits/io/fio_job.py
+++ b/wally/suits/io/fio_job.py
@@ -1,13 +1,10 @@
import copy
from collections import OrderedDict
-from typing import Optional, Iterator, Union, Dict, Tuple, NamedTuple, Any, cast
+from typing import Optional, Iterator, Union, Dict, Tuple, Any, cast
from cephlib.units import ssize2b, b2ssize
-from ..job import JobConfig, JobParams
-
-
-Var = NamedTuple('Var', [('name', str)])
+from ..job import JobConfig, JobParams, Var
def is_fio_opt_true(vl: Union[str, int]) -> bool:
@@ -40,7 +37,7 @@
@property
def summary(self) -> str:
"""Test short summary, used mostly for file names and short image description"""
- res = "{0[oper_short]}{0[sync_mode]}{0[bsize]}".format(self)
+ res = f"{self['oper_short']}{self['sync_mode']}{self['bsize']}"
if self['qd'] is not None:
res += "_qd" + str(self['qd'])
if self['thcount'] not in (1, None):
@@ -52,13 +49,13 @@
@property
def long_summary(self) -> str:
"""Readable long summary for management and deployment engineers"""
- res = "{0[oper]}, {0.sync_mode_long}, block size {1}B".format(self, b2ssize(self['bsize'] * 1024))
+ res = f"{self['oper']}, {self.sync_mode_long}, block size {b2ssize(self['bsize'] * 1024)}B"
if self['qd'] is not None:
res += ", QD = " + str(self['qd'])
if self['thcount'] not in (1, None):
- res += ", threads={0[thcount]}".format(self)
+ res += f", threads={self['thcount']}"
if self['write_perc'] is not None:
- res += ", write_perc={0[write_perc]}%".format(self)
+ res += f", fwrite_perc={self['write_perc']}%"
return res
def copy(self, **kwargs: Dict[str, Any]) -> 'FioJobParams':
@@ -89,24 +86,24 @@
def __init__(self, name: str, idx: int) -> None:
JobConfig.__init__(self, idx)
self.name = name
- self._sync_mode = None # type: Optional[str]
- self._params = None # type: Optional[Dict[str, Any]]
+ self._sync_mode: Optional[str] = None
+ self._params: Optional[Dict[str, Any]] = None
# ------------- BASIC PROPERTIES -----------------------------------------------------------------------------------
@property
def write_perc(self) -> Optional[int]:
try:
- return int(self.vals["rwmixwrite"])
+ return int(self.vals["rwmixwrite"]) # type: ignore
except (KeyError, TypeError):
try:
- return 100 - int(self.vals["rwmixread"])
+ return 100 - int(self.vals["rwmixread"]) # type: ignore
except (KeyError, TypeError):
return None
@property
def qd(self) -> int:
- return int(self.vals.get('iodepth', '1'))
+ return int(self.vals.get('iodepth', '1')) # type: ignore
@property
def bsize(self) -> int:
@@ -117,7 +114,7 @@
@property
def oper(self) -> str:
vl = self.vals['rw']
- return vl if ':' not in vl else vl.split(":")[0]
+ return vl if ':' not in vl else vl.split(":")[0] # type: ignore
@property
def op_type_short(self) -> str:
@@ -125,14 +122,14 @@
@property
def thcount(self) -> int:
- return int(self.vals.get('numjobs', 1))
+ return int(self.vals.get('numjobs', 1)) # type: ignore
@property
def sync_mode(self) -> str:
if self._sync_mode is None:
- direct = is_fio_opt_true(self.vals.get('direct', '0')) or \
- not is_fio_opt_true(self.vals.get('buffered', '0'))
- sync = is_fio_opt_true(self.vals.get('sync', '0'))
+ direct = is_fio_opt_true(self.vals.get('direct', '0')) # type: ignore
+ direct = direct or not is_fio_opt_true(self.vals.get('buffered', '0')) # type: ignore
+ sync = is_fio_opt_true(self.vals.get('sync', '0')) # type: ignore
self._sync_mode = self.ds2mode[(sync, direct)]
return cast(str, self._sync_mode)
diff --git a/wally/suits/io/fio_task_parser.py b/wally/suits/io/fio_task_parser.py
index 5b91885..a9e13dc 100644
--- a/wally/suits/io/fio_task_parser.py
+++ b/wally/suits/io/fio_task_parser.py
@@ -1,9 +1,8 @@
#!/usr/bin/env python3
import re
-import os
import sys
-import os.path
+import pathlib
import argparse
import itertools
from typing import Optional, Iterator, Union, Dict, Iterable, List, Tuple, NamedTuple, Any
@@ -104,34 +103,30 @@
def fio_config_parse(lexer_iter: Iterable[CfgLine]) -> Iterator[FioJobConfig]:
in_globals = False
curr_section = None
- glob_vals = OrderedDict() # type: Dict[str, Any]
+ glob_vals: Dict[str, Any] = OrderedDict()
sections_count = 0
- lexed_lines = list(lexer_iter) # type: List[CfgLine]
+ lexed_lines: List[CfgLine] = list(lexer_iter)
one_more = True
- includes = {}
+ includes: Dict[str, Tuple[str, int]] = {}
while one_more:
- new_lines = [] # type: List[CfgLine]
+ new_lines: List[CfgLine] = []
one_more = False
for line in lexed_lines:
fname, lineno, oline, tp, name, val = line
if INCLUDE == tp:
- if not os.path.exists(fname):
- dirname = '.'
- else:
- dirname = os.path.dirname(fname)
-
- new_fname = os.path.join(dirname, name)
- includes[new_fname] = (fname, lineno)
+ fobj = pathlib.Path(fname)
+ new_fname = (fobj.parent / name) if fobj.exists() else pathlib.Path(name)
+ includes[str(new_fname)] = (fname, lineno)
try:
- cont = open(new_fname).read()
+ cont = new_fname.open().read()
except IOError as err:
- raise ParseError("Error while including file {}: {}".format(new_fname, err), fname, lineno, oline)
+ raise ParseError(f"Error while including file {new_fname}: {err}", fname, lineno, oline)
- new_lines.extend(fio_config_lexer(cont, new_fname))
+ new_lines.extend(fio_config_lexer(cont, str(new_fname)))
one_more = True
else:
new_lines.append(line)
@@ -161,7 +156,7 @@
if in_globals:
glob_vals[name] = val
elif name == name.upper():
- raise ParseError("Param {!r} not in [global] section".format(name), fname, lineno, oline)
+ raise ParseError(f"Param {name!r} not in [global] section", fname, lineno, oline)
elif curr_section is None:
raise ParseError("Data outside section", fname, lineno, oline)
else:
@@ -172,7 +167,7 @@
def process_cycles(sec: FioJobConfig) -> Iterator[FioJobConfig]:
- cycles = OrderedDict() # type: Dict[str, Any]
+ cycles: Dict[str, Any] = OrderedDict()
for name, val in sec.vals.items():
if isinstance(val, list) and name.upper() != name:
@@ -203,12 +198,12 @@
yield new_sec
-FioParamsVal = Union[str, Var]
+FioParamsVal = Union[str, Var, int]
FioParams = Dict[str, FioParamsVal]
def apply_params(sec: FioJobConfig, params: FioParams) -> FioJobConfig:
- processed_vals = OrderedDict() # type: Dict[str, Any]
+ processed_vals: Dict[str, Any] = OrderedDict()
processed_vals.update(params)
for name, val in sec.vals.items():
@@ -251,8 +246,7 @@
sec.vals['unified_rw_reporting'] = '1'
if isinstance(sec.vals['size'], Var):
- raise ValueError("Variable {0} isn't provided".format(
- sec.vals['size'].name))
+ raise ValueError(f"Variable {sec.vals['size'].name} isn't provided")
sz = ssize2b(sec.vals['size'])
offset = sz * ((MAGIC_OFFSET * counter[0]) % 1.0)
@@ -266,7 +260,7 @@
for vl in sec.vals.values():
if isinstance(vl, Var):
- raise ValueError("Variable {0} isn't provided".format(vl.name))
+ raise ValueError(f"Variable {vl.name} isn't provided")
params = sec.vals.copy()
params['UNIQ'] = 'UN{0}'.format(counter[0])
@@ -282,13 +276,11 @@
return sec.vals.get('ramp_time', 0) + sec.vals.get('runtime', 0)
-def parse_all_in_1(source:str, fname: str = None) -> Iterator[FioJobConfig]:
+def parse_all_in_1(source:str, fname: str) -> Iterator[FioJobConfig]:
return fio_config_parse(fio_config_lexer(source, fname))
def get_log_files(sec: FioJobConfig, iops: bool = False) -> Iterator[Tuple[str, str, str]]:
- res = [] # type: List[Tuple[str, str, str]]
-
keys = [('write_bw_log', 'bw', 'KiBps'),
('write_hist_log', 'lat', 'us')]
if iops:
@@ -304,8 +296,8 @@
test_params = test_params.copy()
if 'RAMPTIME' not in test_params and 'RUNTIME' in test_params:
- ramp = int(int(test_params['RUNTIME']) * 0.05)
- test_params['RAMPTIME'] = min(30, max(5, ramp))
+ ramp = int(int(test_params['RUNTIME']) * 0.05) # type: ignore
+ test_params['RAMPTIME'] = min(30, max(5, ramp)) # type: ignore
it = parse_all_in_1(source, fname)
it = (apply_params(sec, test_params) for sec in it)
diff --git a/wally/suits/io/rrd_raw.cfg b/wally/suits/io/rrd_raw.cfg
index 2b0fc74..e7ca82d 100644
--- a/wally/suits/io/rrd_raw.cfg
+++ b/wally/suits/io/rrd_raw.cfg
@@ -1,6 +1,6 @@
[test]
-blocksize=4k
-rw=randread
+blocksize=4m
+rw=write
iodepth=1
ramp_time=0
runtime=120
@@ -17,5 +17,8 @@
wait_for_previous=1
per_job_logs=0
randrepeat=0
-filename=/dev/sdb
-size=100G
\ No newline at end of file
+filename=/media/data/test.db
+size=50G
+;verify_pattern=0x00
+buffer_compress_percentage=99
+write_bw_log=/tmp/bw.non-compress.log