Fix fio runner: switch load profiles from numjobs to iodepth (QD) and rework the fio RPC plugin
diff --git a/wally/suits/io/ceph.cfg b/wally/suits/io/ceph.cfg
index 9287030..a44c749 100644
--- a/wally/suits/io/ceph.cfg
+++ b/wally/suits/io/ceph.cfg
@@ -1,9 +1,10 @@
[global]
-include defaults.cfg
+include defaults_qd.cfg
-NUMJOBS_R={% 1, 5, 10, 15, 25, 40, 80, 120 %}
-NUMJOBS_W={% 1, 5, 10, 15, 25, 40 %}
-NUMJOBS_SEQ_OPS={% 1, 3, 10 %}
+QD_R={% 1, 5, 10, 15, 25, 40, 80, 120 %}
+QD_W={% 1, 5, 10, 15, 25, 40 %}
+QD_SEQ_R={% 1, 3, 10 %}
+QD_SEQ_W={% 1, 2, 4 %}
ramp_time=30
runtime=180
@@ -14,8 +15,7 @@
[ceph_{TEST_SUMM}]
blocksize=4k
rw=randwrite
-sync=1
-numjobs={NUMJOBS_W}
+iodepth={QD_W}
# ---------------------------------------------------------------------
-# check different thread count, direct read mode. (latency, iops) = func(th_count)
+# check different queue depths, direct read mode. (latency, iops) = func(iodepth)
@@ -25,24 +25,34 @@
blocksize=4k
rw=randread
direct=1
-numjobs={NUMJOBS_R}
+iodepth={QD_R}
# ---------------------------------------------------------------------
-# direct write
+# sync write
# ---------------------------------------------------------------------
[ceph_{TEST_SUMM}]
blocksize=4k
rw=randwrite
direct=1
+sync=1
numjobs=1
# ---------------------------------------------------------------------
-# this is essentially sequential write/read operations
+# these are essentially sequential write operations
# we can't use sequential with numjobs > 1 due to caching and block merging
# ---------------------------------------------------------------------
[ceph_{TEST_SUMM}]
blocksize=16m
-rw={% randread, randwrite %}
+rw=randwrite
direct=1
-numjobs={NUMJOBS_SEQ_OPS}
+iodepth={QD_SEQ_W}
+# ---------------------------------------------------------------------
+# these are essentially sequential read operations
+# we can't use sequential with numjobs > 1 due to caching and block merging
+# ---------------------------------------------------------------------
+[ceph_{TEST_SUMM}]
+blocksize=16m
+rw=randread
+direct=1
+iodepth={QD_SEQ_R}
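
For reference, the {% ... %} lists are cycled by the task parser: every section that references one of them is expanded into a separate fio job per value, so the 4k randread section above becomes eight jobs with iodepth 1 through 120. A minimal sketch of that expansion, illustrative only (the real logic lives in process_cycles in fio_task_parser.py):

    from itertools import product

    def expand_cycles(section_vals, cycles):
        # cycles, e.g. {"iodepth": [1, 5, 10, 15, 25, 40, 80, 120]},
        # holds the values taken from the {% ... %} template lists
        for combo in product(*cycles.values()):
            vals = dict(section_vals)  # one expanded job per combination
            vals.update(zip(cycles.keys(), combo))
            yield vals
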
diff --git a/wally/suits/io/defaults_qd.cfg b/wally/suits/io/defaults_qd.cfg
index 873e6b7..0418e8a 100644
--- a/wally/suits/io/defaults_qd.cfg
+++ b/wally/suits/io/defaults_qd.cfg
@@ -1,5 +1,6 @@
buffered=0
direct=1
+sync=0
ioengine=libaio
group_reporting=1
@@ -9,16 +10,17 @@
thread=1
time_based=1
wait_for_previous=1
+per_job_logs=0
# this is critical for correct results in multi-node runs
randrepeat=0
filename={FILENAME}
-size={TEST_FILE_SIZE}
-iodepth={QD}
+size={FILESIZE}
write_iops_log=fio_iops_log
+write_bw_log=fio_bw_log
log_avg_msec=1000
-write_hist_log=fio_log_h
+write_hist_log=fio_lat_hist_log
log_hist_msec=1000
log_unix_epoch=1
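
With log_avg_msec=1000 fio writes one averaged sample per second into each of these logs, as CSV lines of the form time_ms, value, direction, blocksize. A minimal reader for such a log (a sketch assuming the standard fio log layout; only the first two columns are needed here):

    def read_fio_log(path):
        # each line: time_ms, value, data_direction, block_size
        samples = []
        with open(path) as fd:
            for ln in fd:
                time_ms, value = ln.split(',')[:2]
                samples.append((float(time_ms) / 1000.0, float(value)))
        return samples
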
diff --git a/wally/suits/io/fio.py b/wally/suits/io/fio.py
index 1b5f38e..e055d98 100644
--- a/wally/suits/io/fio.py
+++ b/wally/suits/io/fio.py
@@ -1,14 +1,14 @@
import os.path
import logging
-from typing import Dict, List, Union, cast
+from typing import cast
import wally
-from ...utils import ssize2b, StopTestError, get_os
+from ...utils import StopTestError, get_os, ssize2b
from ...node_interfaces import IRPCNode
from ..itest import ThreadedTest, IterationConfig, RunTestRes
-from .fio_task_parser import execution_time, fio_cfg_compile, FioJobSection, FioParams
-
+from .fio_task_parser import execution_time, fio_cfg_compile, FioJobSection, FioParams, get_log_files
+from . import rpc_plugin
logger = logging.getLogger("wally")
@@ -23,82 +23,97 @@
get = self.config.params.get
+ self.remote_task_file = self.join_remote("task.fio")
+ self.remote_output_file = self.join_remote("fio_result.json")
+ self.use_system_fio = get('use_system_fio', False) # type: bool
+ self.use_sudo = get("use_sudo", True) # type: bool
+ self.force_prefill = get('force_prefill', False) # type: bool
+
self.load_profile_name = self.config.params['load'] # type: str
self.name = "io." + self.load_profile_name
if os.path.isfile(self.load_profile_name):
- self.load_profile_path = os.path.join(self.configs_dir, self.load_profile_name+ '.cfg') # type: str
+ self.load_profile_path = self.load_profile_name # type: str
else:
- self.load_profile_path = self.load_profile_name
+            self.load_profile_path = os.path.join(self.configs_dir, self.load_profile_name + '.cfg')
self.load_profile = open(self.load_profile_path, 'rt').read() # type: str
- self.use_system_fio = get('use_system_fio', False) # type: bool
-
if self.use_system_fio:
self.fio_path = "fio" # type: str
else:
self.fio_path = os.path.join(self.config.remote_dir, "fio")
- self.force_prefill = get('force_prefill', False) # type: bool
+ self.load_params = self.config.params['params']
+ self.file_name = self.load_params['FILENAME']
- if 'FILESIZE' not in self.config.params:
- raise NotImplementedError("File size detection is not implemented")
+ if 'FILESIZE' not in self.load_params:
+ logger.debug("Getting test file sizes on all nodes")
+ try:
+ sizes = {node.conn.fs.file_stat(self.file_name)['size']
+ for node in self.config.nodes}
+ except Exception:
+                logger.exception("FILESIZE is not set in the config file and automatic detection failed. " +
+                                 "Set FILESIZE or fix the error and rerun the test")
+ raise StopTestError()
- # self.max_latency = get("max_lat") # type: Optional[int]
- # self.min_bw_per_thread = get("min_bw") # type: Optional[int]
+ if len(sizes) != 1:
+ logger.error("IO target file %r has different sizes on test nodes - %r",
+ self.file_name, sizes)
+ raise StopTestError()
- self.use_sudo = get("use_sudo", True) # type: bool
+ self.file_size = list(sizes)[0]
+ logger.info("Detected test file size is %s", self.file_size)
+ self.load_params['FILESIZE'] = self.file_size
+ else:
+ self.file_size = ssize2b(self.load_params['FILESIZE'])
- self.fio_configs = list(fio_cfg_compile(self.load_profile,
- self.load_profile_path,
- cast(FioParams, self.config.params)))
+ self.fio_configs = list(fio_cfg_compile(self.load_profile, self.load_profile_path,
+ cast(FioParams, self.load_params)))
if len(self.fio_configs) == 0:
- logger.exception("Empty fio config provided")
- raise StopTestError("Empty fio config provided")
+ logger.error("Empty fio config provided")
+ raise StopTestError()
self.iterations_configs = self.fio_configs # type: ignore
- self.files_sizes = self.get_file_sizes()
-
self.exec_folder = self.config.remote_dir
- self.fio_path = "" if self.use_system_fio else self.exec_folder
-
- def get_file_sizes(self) -> Dict[str, int]:
- files_sizes = {} # type: Dict[str, int]
-
- for section in self.fio_configs:
- sz = ssize2b(section.vals['size'])
- msz = sz // (1024 ** 2) + (1 if sz % (1024 ** 2) != 0 else 0)
- fname = section.vals['filename'] # type: str
-
- # if already has other test with the same file name
- # take largest size
- files_sizes[fname] = max(files_sizes.get(fname, 0), msz)
-
- return files_sizes
def config_node(self, node: IRPCNode) -> None:
+ plugin_code = open(rpc_plugin.__file__.rsplit(".", 1)[0] + ".py", "rb").read()
+ node.upload_plugin(code=plugin_code, name="fio")
+
try:
- node.conn.rmdir(self.config.remote_dir, recursive=True, ignore_missing=True)
- node.conn.mkdir(self.config.remote_dir)
- except Exception as exc:
- msg = "Failed to create folder {} on remote {}.".format(self.config.remote_dir, node)
+ node.conn.fs.rmtree(self.config.remote_dir)
+ except Exception:
+ pass
+
+ try:
+ node.conn.fs.makedirs(self.config.remote_dir)
+ except Exception:
+ msg = "Failed to recreate folder {} on remote {}.".format(self.config.remote_dir, node)
logger.exception(msg)
- raise StopTestError(msg) from exc
+ raise StopTestError()
self.install_utils(node)
- logger.info("Prefilling test files with random data")
- fill_bw = node.conn.prefill_test_files(self.files_sizes, force=self.force_prefill, fio_path=self.fio_path)
+
+ mb = int(self.file_size / 1024 ** 2)
+ logger.info("Filling test file %s with %sMiB of random data", self.file_name, mb)
+ fill_bw = node.conn.fio.fill_file(self.file_name, mb, force=self.force_prefill, fio_path=self.fio_path)
if fill_bw is not None:
- logger.info("Initial fio fill bw is {} MiBps for {}".format(fill_bw, node.info.node_id()))
+ logger.info("Initial fio fill bw is {} MiBps for {}".format(fill_bw, node))
+
+ fio_config = "\n".join(map(str, self.iterations_configs))
+ node.put_to_file(self.remote_task_file, fio_config.encode("utf8"))
def install_utils(self, node: IRPCNode) -> None:
+ os_info = get_os(node)
if self.use_system_fio:
- node.conn.install('fio', binary='fio')
-
- if not self.use_system_fio:
- os_info = get_os(node)
+ if os_info.distro != 'ubuntu':
+                logger.error("Only Ubuntu is supported on test VMs")
+ raise StopTestError()
+ node.conn.fio.install('fio', binary='fio')
+ else:
+ node.conn.fio.install('bzip2', binary='bzip2')
fio_dir = os.path.dirname(os.path.dirname(wally.__file__)) # type: str
fio_dir = os.path.join(os.getcwd(), fio_dir)
fio_dir = os.path.join(fio_dir, 'fio_binaries')
@@ -106,23 +121,32 @@
fio_path = os.path.join(fio_dir, fname) # type: str
if not os.path.exists(fio_path):
- raise RuntimeError("No prebuild fio binary available for {0}".format(os_info))
+            logger.error("No prebuilt fio binary available for {0}".format(os_info))
+ raise StopTestError()
bz_dest = self.join_remote('fio.bz2') # type: str
node.copy_file(fio_path, bz_dest)
- node.run("bzip2 --decompress {}" + bz_dest)
- node.run("chmod a+x " + self.join_remote("fio"))
+ node.run("bzip2 --decompress {} ; chmod a+x {}".format(bz_dest, self.join_remote("fio")))
def get_expected_runtime(self, iteration_info: IterationConfig) -> int:
return execution_time(cast(FioJobSection, iteration_info))
def do_test(self, node: IRPCNode, iter_config: IterationConfig) -> RunTestRes:
exec_time = execution_time(cast(FioJobSection, iter_config))
- raw_res = node.conn.fio.run_fio(self.fio_path,
- self.exec_folder,
- str(cast(FioJobSection, iter_config)),
- exec_time + max(300, exec_time))
+ fio_cmd_templ = "cd {exec_folder}; " + \
+ "{fio_path} --output-format=json --output={out_file} --alloc-size=262144 {job_file}"
+
+    iops_log, bw_log, lat_hist_log = get_log_files(cast(FioJobSection, iter_config))
+
+ cmd = fio_cmd_templ.format(exec_folder=self.exec_folder,
+ fio_path=self.fio_path,
+ out_file=self.remote_output_file,
+ job_file=self.remote_task_file)
+ raw_res = node.run(cmd, timeout=exec_time + max(300, exec_time))
+
+ return
+
# TODO(koder): fix next error
- raise NotImplementedError("Need to extract time from test result")
- return raw_res, (0, 0)
+ # raise NotImplementedError("Need to extract time from test result")
+ # return raw_res, (0, 0)
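
The commented-out tail above is the remaining work: extracting timings and metrics from remote_output_file, which now holds fio's JSON report. Reading it would look roughly like this (a sketch assuming fio's standard --output-format=json layout; with unified_rw_reporting=1 read and write stats land in a single 'mixed' section):

    import json

    def parse_fio_report(raw):
        data = json.loads(raw)
        for job in data["jobs"]:
            # fall back to 'read' for reports produced without unified reporting
            stats = job.get("mixed", job.get("read", {}))
            yield job["jobname"], stats.get("iops"), stats.get("bw")
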
diff --git a/wally/suits/io/fio_task_parser.py b/wally/suits/io/fio_task_parser.py
index 1bdbb15..aaf4b36 100644
--- a/wally/suits/io/fio_task_parser.py
+++ b/wally/suits/io/fio_task_parser.py
@@ -291,7 +291,7 @@
MAGIC_OFFSET = 0.1885
-def finall_process(sec: FioJobSection, counter: List[int] = [0]) -> FioJobSection:
+def final_process(sec: FioJobSection, counter: List[int] = [0]) -> FioJobSection:
sec = sec.copy()
sec.vals['unified_rw_reporting'] = '1'
@@ -362,7 +362,7 @@
return TestSumm(rw,
sync_mode,
vals['blocksize'],
- vals['iodepth'],
+ vals.get('iodepth', '1'),
vm_count)
@@ -398,11 +398,15 @@
yield res
+def get_log_files(sec: FioJobSection) -> Tuple[Optional[str], Optional[str], Optional[str]]:
+ return sec.vals.get('write_iops_log'), sec.vals.get('write_bw_log'), sec.vals.get('write_hist_log')
+
+
def fio_cfg_compile(source: str, fname: str, test_params: FioParams) -> Iterator[FioJobSection]:
it = parse_all_in_1(source, fname)
it = (apply_params(sec, test_params) for sec in it)
it = flatmap(process_cycles, it)
- return map(finall_process, it)
+ return map(final_process, it)
def parse_args(argv):
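
get_log_files simply mirrors the three write_*_log options set in defaults_qd.cfg and returns None for any log a job does not enable, so callers can collect logs selectively. A usage sketch (the Sec stub is hypothetical, just to show the contract):

    from wally.suits.io.fio_task_parser import get_log_files

    class Sec:  # stand-in for a FioJobSection
        vals = {"write_iops_log": "fio_iops_log", "write_bw_log": "fio_bw_log"}

    iops_log, bw_log, hist_log = get_log_files(Sec())
    assert hist_log is None  # this job writes no latency histogram log
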
diff --git a/wally/suits/io/rpc_plugin.py b/wally/suits/io/rpc_plugin.py
index 8e2e09f..306af28 100644
--- a/wally/suits/io/rpc_plugin.py
+++ b/wally/suits/io/rpc_plugin.py
@@ -2,29 +2,19 @@
import time
import stat
import random
+import logging
import subprocess
-def rpc_run_fio(cfg):
- fio_cmd_templ = "cd {exec_folder}; {fio_path}fio --output-format=json " + \
- "--output={out_file} --alloc-size=262144 {job_file}"
+mod_name = "fio"
+__version__ = (0, 1)
- result = {
- "name": [float],
- "lat_name": [[float]]
- }
- return result
- # fnames_before = node.run("ls -1 " + exec_folder, nolog=True)
- #
- # timeout = int(exec_time + max(300, exec_time))
- # soft_end_time = time.time() + exec_time
- # logger.error("Fio timeouted on node {}. Killing it".format(node))
- # end = time.time()
- # fnames_after = node.run("ls -1 " + exec_folder, nolog=True)
- #
+logger = logging.getLogger("agent.fio")
+SensorsMap = {}
-def rpc_check_file_prefilled(path, used_size_mb):
+
+def check_file_prefilled(path, used_size_mb):
used_size = used_size_mb * 1024 ** 2
blocks_to_check = 16
@@ -48,42 +38,24 @@
return False
-def rpc_prefill_test_files(files, force=False, fio_path='fio'):
- cmd_templ = "{0} --name=xxx --filename={1} --direct=1" + \
- " --bs=4m --size={2}m --rw=write"
+def rpc_fill_file(fname, size, force=False, fio_path='fio'):
+ if not force:
+ if not check_file_prefilled(fname, size):
+ return
- ssize = 0
- ddtime = 0.0
+    assert size % 4 == 0, "File size must be a multiple of 4M"
- for fname, curr_sz in files.items():
- if not force:
- if not rpc_check_file_prefilled(fname, curr_sz):
- continue
+ cmd_templ = "{} --name=xxx --filename={} --direct=1 --bs=4m --size={}m --rw=write"
- cmd = cmd_templ.format(fio_path, fname, curr_sz)
- ssize += curr_sz
+ run_time = time.time()
+ subprocess.check_output(cmd_templ.format(fio_path, fname, size), shell=True)
+ run_time = time.time() - run_time
- stime = time.time()
- subprocess.check_call(cmd)
- ddtime += time.time() - stime
-
- if ddtime > 1.0:
- return int(ssize / ddtime)
-
- return None
+ return None if run_time < 1.0 else int(size / run_time)
-def load_fio_log_file(fname):
- with open(fname) as fd:
- it = [ln.split(',')[:2] for ln in fd]
-
- return [(float(off) / 1000, # convert us to ms
- float(val.strip()) + 0.5) # add 0.5 to compemsate average value
- # as fio trimm all values in log to integer
- for off, val in it]
-
-
-
-
-
-
+def rpc_install(name, binary):
+ try:
+ subprocess.check_output("which {}".format(binary), shell=True)
+    except subprocess.CalledProcessError:
+ subprocess.check_output("apt-get install -y {}".format(name), shell=True)
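
rpc_fill_file prefills the target with a single sequential fio write and reports the achieved bandwidth; it returns None both when the fill is skipped and when the run is shorter than one second, since the rate would be noise. A usage sketch (the target path is hypothetical):

    bw = rpc_fill_file("/tmp/wally_test_file", 4096)  # size in MiB, must be a multiple of 4
    if bw is None:
        print("fill skipped or too fast to measure")
    else:
        print("initial fill bandwidth: %d MiB/s" % bw)
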
diff --git a/wally/suits/io/rrd.cfg b/wally/suits/io/rrd.cfg
index 86de738..1075aea 100644
--- a/wally/suits/io/rrd.cfg
+++ b/wally/suits/io/rrd.cfg
@@ -1,6 +1,5 @@
[global]
-include defaults.cfg
-NUMJOBS=8
+include defaults_qd.cfg
ramp_time=5
runtime=5
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index 8636596..f328e13 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -12,8 +12,6 @@
from ..storage import Storage
from ..result_classes import RawTestResults
-import agent
-
logger = logging.getLogger("wally")
@@ -70,7 +68,7 @@
return os.path.join(self.config.remote_dir, path)
@abc.abstractmethod
- def run(self, storage: Storage) -> None:
+ def run(self) -> None:
pass
@abc.abstractmethod
@@ -98,9 +96,15 @@
pass
def get_not_done_stages(self, storage: Storage) -> Dict[int, IterationConfig]:
- start_run_id = max(int(name) for _, name in storage.list('result')) + 1
+ done_stages = list(storage.list('result'))
+ if len(done_stages) == 0:
+ start_run_id = 0
+ else:
+ start_run_id = max(int(name) for _, name in done_stages) + 1
+
not_in_storage = {} # type: Dict[int, IterationConfig]
- for run_id, iteration_config in enumerate(self.iterations_configs, start_run_id):
+
+ for run_id, iteration_config in enumerate(self.iterations_configs[start_run_id:], start_run_id):
info_path = "result/{}/info".format(run_id)
if info_path in storage:
info = cast(Dict[str, Any], storage[info_path]) # type: Dict[str, Any]
@@ -131,8 +135,8 @@
not_in_storage[run_id] = iteration_config
return not_in_storage
- def run(self, storage: Storage) -> None:
- not_in_storage = self.get_not_done_stages(storage)
+ def run(self) -> None:
+ not_in_storage = self.get_not_done_stages(self.config.storage)
if not not_in_storage:
logger.info("All test iteration in storage already. Skip test")
@@ -171,9 +175,6 @@
if self.max_retry - 1 == idx:
raise StopTestError("Fio failed") from exc
logger.exception("During fio run")
- else:
- if all(results):
- break
logger.info("Sleeping %ss and retrying", self.retry_time)
time.sleep(self.retry_time)
@@ -181,7 +182,7 @@
start_times = [] # type: List[int]
stop_times = [] # type: List[int]
- mstorage = storage.sub_storage("result", str(run_id), "measurement")
+ mstorage = self.config.storage.sub_storage("result", str(run_id), "measurement")
for (result, (t_start, t_stop)), node in zip(results, self.config.nodes):
for metrics_name, data in result.items():
mstorage[node.info.node_id(), metrics_name] = data # type: ignore
@@ -214,7 +215,7 @@
'end_time': max_stop_time
}
- storage["result", str(run_id), "info"] = test_config # type: ignore
+ self.config.storage["result", str(run_id), "info"] = test_config # type: ignore
@abc.abstractmethod
def config_node(self, node: IRPCNode) -> None:
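
The get_not_done_stages change above is what makes interrupted runs resumable: run ids already stored under 'result' are skipped and enumeration restarts at the first missing id. A worked example of the slicing (stand-in strings instead of real IterationConfig objects, and without the per-id info check):

    configs = ["qd1", "qd5", "qd10", "qd15", "qd25"]  # stand-ins for iterations_configs
    done_ids = [0, 1]                                 # run ids already present under 'result'
    start = max(done_ids) + 1 if done_ids else 0
    todo = {run_id: cfg for run_id, cfg in enumerate(configs[start:], start)}
    assert todo == {2: "qd10", 3: "qd15", 4: "qd25"}
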