blob: 0f4ebdece4e6f4f11574622abec303f089a8dc5d [file] [log] [blame]
import os.path
import logging
from typing import Dict, List, Union, cast
import wally
from ...utils import ssize2b, StopTestError, get_os
from ...node_interfaces import IRPCNode
from ..itest import ThreadedTest, IterationConfig, RunTestRes
from .fio_task_parser import execution_time, fio_cfg_compile, FioJobSection, FioParams
logger = logging.getLogger("wally")
class IOPerfTest(ThreadedTest):
soft_runcycle = 5 * 60
retry_time = 30
configs_dir = os.path.dirname(__file__) # type: str
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
get = self.config.params.get
self.load_profile_name = self.config.params['load'] # type: str
self.name = "io." + self.load_profile_name
if os.path.isfile(self.load_profile_name):
self.load_profile_path = os.path.join(self.configs_dir, self.load_profile_name+ '.cfg') # type: str
else:
self.load_profile_path = self.load_profile_name
self.load_profile = open(self.load_profile_path, 'rt').read() # type: str
self.use_system_fio = get('use_system_fio', False) # type: bool
if self.use_system_fio:
self.fio_path = "fio" # type: str
else:
self.fio_path = os.path.join(self.config.remote_dir, "fio")
self.force_prefill = get('force_prefill', False) # type: bool
if 'FILESIZE' not in self.config.params:
raise NotImplementedError("File size detection is not implemented")
# self.max_latency = get("max_lat") # type: Optional[int]
# self.min_bw_per_thread = get("min_bw") # type: Optional[int]
self.use_sudo = get("use_sudo", True) # type: bool
self.fio_configs = list(fio_cfg_compile(self.load_profile,
self.load_profile_path,
cast(FioParams, self.config.params)))
if len(self.fio_configs) == 0:
logger.exception("Empty fio config provided")
raise StopTestError("Empty fio config provided")
self.iterations_configs = self.fio_configs # type: ignore
self.files_sizes = self.get_file_sizes()
self.exec_folder = self.config.remote_dir
self.fio_path = "" if self.use_system_fio else self.exec_folder
def get_file_sizes(self) -> Dict[str, int]:
files_sizes = {} # type: Dict[str, int]
for section in self.fio_configs:
sz = ssize2b(section.vals['size'])
msz = sz // (1024 ** 2) + (1 if sz % (1024 ** 2) != 0 else 0)
fname = section.vals['filename'] # type: str
# if already has other test with the same file name
# take largest size
files_sizes[fname] = max(files_sizes.get(fname, 0), msz)
return files_sizes
def config_node(self, node: IRPCNode) -> None:
try:
node.conn.rmdir(self.config.remote_dir, recursive=True, ignore_missing=True)
node.conn.mkdir(self.config.remote_dir)
except Exception as exc:
msg = "Failed to create folder {} on remote {}.".format(self.config.remote_dir, node, exc)
logger.exception(msg)
raise StopTestError(msg) from exc
self.install_utils(node)
logger.info("Prefilling test files with random data")
fill_bw = node.conn.prefill_test_files(self.files_sizes, force=self.force_prefill, fio_path=self.fio_path)
if fill_bw is not None:
logger.info("Initial fio fill bw is {} MiBps for {}".format(fill_bw, node.info.node_id()))
def install_utils(self, node: IRPCNode) -> None:
if self.use_system_fio:
node.conn.install('fio', binary='fio')
if not self.use_system_fio:
os_info = get_os(node)
fio_dir = os.path.dirname(os.path.dirname(wally.__file__)) # type: str
fio_dir = os.path.join(os.getcwd(), fio_dir)
fio_dir = os.path.join(fio_dir, 'fio_binaries')
fname = 'fio_{0.release}_{0.arch}.bz2'.format(os_info)
fio_path = os.path.join(fio_dir, fname) # type: str
if not os.path.exists(fio_path):
raise RuntimeError("No prebuild fio binary available for {0}".format(os_info))
bz_dest = self.join_remote('fio.bz2') # type: str
node.copy_file(fio_path, bz_dest)
node.run("bzip2 --decompress {}" + bz_dest)
node.run("chmod a+x " + self.join_remote("fio"))
def get_expected_runtime(self, iteration_info: IterationConfig) -> int:
return execution_time(cast(FioJobSection, iteration_info))
def do_test(self, node: IRPCNode, iter_config: IterationConfig) -> RunTestRes:
exec_time = execution_time(cast(FioJobSection, iter_config))
raw_res = node.conn.fio.run_fio(self.fio_path,
self.exec_folder,
str(cast(FioJobSection, iter_config)),
exec_time + max(300, exec_time))
# TODO(koder): fix next error
raise NotImplementedError("Need to extract time from test result")
return raw_res, (0, 0)