2.0 refactoring:
* Add type annotations to most functions
* Remove the old fio run code; move it to RPC/pluggable
* Remove most of the sensors code; it will be moved to RPC as well
* Other refactoring
diff --git a/wally/suits/io/defaults.cfg b/wally/suits/io/defaults.cfg
new file mode 100644
index 0000000..8c8644b
--- /dev/null
+++ b/wally/suits/io/defaults.cfg
@@ -0,0 +1,24 @@
+buffered=0
+group_reporting=1
+iodepth=1
+unified_rw_reporting=1
+
+norandommap=1
+
+thread=1
+time_based=1
+wait_for_previous=1
+
+# this is critical for correct results in multi-node runs
+randrepeat=0
+
+filename={FILENAME}
+
+size={TEST_FILE_SIZE}
+
+write_lat_log=fio_log
+write_iops_log=fio_log
+write_bw_log=fio_log
+log_avg_msec=500
+
+
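Note: job files pull these defaults in through fio's "include" directive (see
rrd.cfg at the end of this diff), and the {FILENAME}/{TEST_FILE_SIZE}
placeholders are filled from the test parameters by the task parser. A minimal
sketch of that flow, assuming apply_params() resolves the uppercase
placeholders from the params dict (values below are illustrative):

    from wally.suits.io.fio_task_parser import fio_cfg_compile

    source = open("wally/suits/io/rrd.cfg").read()
    params = {"FILENAME": "/dev/sdb", "TEST_FILE_SIZE": "10G"}
    for section in fio_cfg_compile(source, "rrd.cfg", params):
        print(section)  # one fully resolved FioJobSection per cycle combination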
diff --git a/wally/suits/io/fio.py b/wally/suits/io/fio.py
index 7b77461..50bb1fd 100644
--- a/wally/suits/io/fio.py
+++ b/wally/suits/io/fio.py
@@ -3,51 +3,45 @@
import json
import stat
import random
-import shutil
+import hashlib
import os.path
import logging
import datetime
import functools
-import subprocess
import collections
+from typing import Dict, List, Callable, Any, Tuple, Optional
import yaml
-import paramiko
import texttable
from paramiko.ssh_exception import SSHException
from concurrent.futures import ThreadPoolExecutor, wait
import wally
-from wally.pretty_yaml import dumps
-from wally.statistic import round_3_digit, data_property, average
-from wally.utils import ssize2b, sec_to_str, StopTestError, Barrier, get_os
-from wally.ssh_utils import (save_to_remote, read_from_remote, BGSSHTask, reconnect)
+
+from ...pretty_yaml import dumps
+from ...statistic import round_3_digit, data_property, average
+from ...utils import ssize2b, sec_to_str, StopTestError, Barrier, get_os
+from ...inode import INode
+
+from ..itest import (TimeSeriesValue, PerfTest, TestResults, TestConfig)
from .fio_task_parser import (execution_time, fio_cfg_compile,
get_test_summary, get_test_summary_tuple,
get_test_sync_mode, FioJobSection)
+from .rpc_plugin import parse_fio_result
-from ..itest import (TimeSeriesValue, PerfTest, TestResults,
- run_on_node, TestConfig, MeasurementMatrix)
logger = logging.getLogger("wally")
-# Results folder structure
-# results/
-# {loadtype}_{num}/
-# config.yaml
-# ......
-
-
-class NoData(object):
+class NoData:
pass
-def cached_prop(func):
+def cached_prop(func: Callable[..., Any]) -> Callable[..., Any]:
@property
@functools.wraps(func)
- def closure(self):
+ def closure(self) -> Any:
val = getattr(self, "_" + func.__name__)
if val is NoData:
val = func(self)
@@ -56,7 +50,7 @@
return closure
-def load_fio_log_file(fname):
+def load_fio_log_file(fname: str) -> TimeSeriesValue:
with open(fname) as fd:
it = [ln.split(',')[:2] for ln in fd]
@@ -72,7 +66,7 @@
WRITE_IOPS_DISCSTAT_POS = 7
-def load_sys_log_file(ftype, fname):
+def load_sys_log_file(ftype: str, fname: str) -> TimeSeriesValue:
assert ftype == 'iops'
pval = None
with open(fname) as fd:
@@ -89,7 +83,7 @@
return TimeSeriesValue(vals)
-def load_test_results(folder, run_num):
+def load_test_results(folder: str, run_num: int) -> 'FioRunResult':
res = {}
params = None
@@ -97,7 +91,7 @@
params = yaml.load(open(fn).read())
conn_ids_set = set()
- rr = r"{0}_(?P<conn_id>.*?)_(?P<type>[^_.]*)\.\d+\.log$".format(run_num)
+ rr = r"{}_(?P<conn_id>.*?)_(?P<type>[^_.]*)\.\d+\.log$".format(run_num)
for fname in os.listdir(folder):
rm = re.match(rr, fname)
if rm is None:
@@ -115,7 +109,7 @@
conn_ids_set.add(conn_id)
- rr = r"{0}_(?P<conn_id>.*?)_(?P<type>[^_.]*)\.sys\.log$".format(run_num)
+ rr = r"{}_(?P<conn_id>.*?)_(?P<type>[^_.]*)\.sys\.log$".format(run_num)
for fname in os.listdir(folder):
rm = re.match(rr, fname)
if rm is None:
@@ -159,8 +153,8 @@
return FioRunResult(config, fio_task, mm_res, raw_res, params['intervals'], run_num)
-class Attrmapper(object):
- def __init__(self, dct):
+class Attrmapper:
+ def __init__(self, dct: Dict[str, Any]):
self.__dct = dct
def __getattr__(self, name):
@@ -170,8 +164,8 @@
raise AttributeError(name)
-class DiskPerfInfo(object):
- def __init__(self, name, summary, params, testnodes_count):
+class DiskPerfInfo:
+ def __init__(self, name: str, summary: str, params: Dict[str, Any], testnodes_count: int):
self.name = name
self.bw = None
self.iops = None
@@ -193,7 +187,7 @@
self.concurence = self.params['vals'].get('numjobs', 1)
-def get_lat_perc_50_95(lat_mks):
+def get_lat_perc_50_95(lat_mks: List[float]) -> Tuple[float, float]:
curr_perc = 0
perc_50 = None
perc_95 = None
@@ -224,8 +218,8 @@
return perc_50 / 1000., perc_95 / 1000.
-class IOTestResults(object):
- def __init__(self, suite_name, fio_results, log_directory):
+class IOTestResults:
+ def __init__(self, suite_name: str, fio_results: 'FioRunResult', log_directory: str):
self.suite_name = suite_name
self.fio_results = fio_results
self.log_directory = log_directory
@@ -236,7 +230,7 @@
def __len__(self):
return len(self.fio_results)
- def get_yamable(self):
+    def get_yamable(self) -> Dict[str, List[Any]]:
items = [(fio_res.summary(), fio_res.idx) for fio_res in self]
return {self.suite_name: [self.log_directory] + items}
@@ -424,6 +418,10 @@
soft_runcycle = 5 * 60
retry_time = 30
+ zero_md5_hash = hashlib.md5()
+ zero_md5_hash.update(b"\x00" * 1024)
+ zero_md5 = zero_md5_hash.hexdigest()
+
def __init__(self, config):
PerfTest.__init__(self, config)
@@ -464,7 +462,7 @@
self.fio_configs = None
@classmethod
- def load(cls, suite_name, folder):
+ def load(cls, suite_name: str, folder: str) -> IOTestResults:
res = []
for fname in os.listdir(folder):
if re.match("\d+_params.yaml$", fname):
@@ -478,11 +476,9 @@
pass
    # size is in megabytes
- def check_prefill_required(self, rossh, fname, size, num_blocks=16):
+    def check_prefill_required(self, node: INode, fname: str, size: int, num_blocks: int = 16) -> bool:
try:
- with rossh.connection.open_sftp() as sftp:
- fstats = sftp.stat(fname)
-
+ fstats = node.stat_file(fname)
if stat.S_ISREG(fstats.st_mode) and fstats.st_size < size * 1024 ** 2:
return True
except EnvironmentError:
@@ -498,14 +494,13 @@
if self.use_sudo:
cmd = "sudo " + cmd
- zero_md5 = '0f343b0931126a20f133d67c2b018a3b'
bsize = size * (1024 ** 2)
offsets = [random.randrange(bsize - 1024) for _ in range(num_blocks)]
offsets.append(bsize - 1024)
offsets.append(0)
for offset in offsets:
- data = rossh(cmd.format(fname, offset), nolog=True)
+ data = node.run(cmd.format(fname, offset), nolog=True)
md = ""
for line in data.split("\n"):
@@ -517,12 +512,12 @@
logger.error("File data check is failed - " + data)
return True
- if zero_md5 == md:
+ if self.zero_md5 == md:
return True
return False
- def prefill_test_files(self, rossh, files, force=False):
+    def prefill_test_files(self, node: INode, files: Dict[str, int], force: bool = False) -> None:
if self.use_system_fio:
cmd_templ = "fio "
else:
@@ -542,7 +537,7 @@
ddtime = 0
for fname, curr_sz in files.items():
if not force:
- if not self.check_prefill_required(rossh, fname, curr_sz):
+ if not self.check_prefill_required(node, fname, curr_sz):
logger.debug("prefill is skipped")
continue
@@ -551,7 +546,7 @@
ssize += curr_sz
stime = time.time()
- rossh(cmd, timeout=curr_sz)
+ node.run(cmd, timeout=curr_sz)
ddtime += time.time() - stime
if ddtime > 1.0:
@@ -559,10 +554,10 @@
mess = "Initiall fio fill bw is {0} MiBps for this vm"
logger.info(mess.format(fill_bw))
- def install_utils(self, node, rossh, max_retry=3, timeout=5):
+ def install_utils(self, node: INode) -> None:
need_install = []
packs = [('screen', 'screen')]
- os_info = get_os(rossh)
+ os_info = get_os(node)
if self.use_system_fio:
packs.append(('fio', 'fio'))
@@ -575,7 +570,7 @@
continue
try:
- rossh('which ' + bin_name, nolog=True)
+ node.run('which ' + bin_name, nolog=True)
except OSError:
need_install.append(package)
@@ -585,14 +580,10 @@
else:
cmd = "sudo apt-get -y install " + " ".join(need_install)
- for _ in range(max_retry):
- try:
- rossh(cmd)
- break
- except OSError as err:
- time.sleep(timeout)
- else:
- raise OSError("Can't install - " + str(err))
+ try:
+ node.run(cmd)
+ except OSError as err:
+ raise OSError("Can't install - {}".format(" ".join(need_install))) from err
if not self.use_system_fio:
fio_dir = os.path.dirname(os.path.dirname(wally.__file__))
@@ -602,19 +593,16 @@
fio_path = os.path.join(fio_dir, fname)
if not os.path.exists(fio_path):
- raise RuntimeError("No prebuild fio available for {0}".format(os_info))
+ raise RuntimeError("No prebuild fio binary available for {0}".format(os_info))
bz_dest = self.join_remote('fio.bz2')
- with node.connection.open_sftp() as sftp:
- sftp.put(fio_path, bz_dest)
+ node.copy_file(fio_path, bz_dest)
+ node.run("bzip2 --decompress {}" + bz_dest, nolog=True)
+ node.run("chmod a+x " + self.join_remote("fio"), nolog=True)
- rossh("bzip2 --decompress " + bz_dest, nolog=True)
- rossh("chmod a+x " + self.join_remote("fio"), nolog=True)
-
- def pre_run(self):
+ def pre_run(self) -> None:
if 'FILESIZE' not in self.config_params:
- # need to detect file size
- pass
+ raise NotImplementedError("File size detection is not implemented")
self.fio_configs = fio_cfg_compile(self.raw_cfg,
self.config_fname,
@@ -641,36 +629,28 @@
force=self.force_prefill)
list(pool.map(fc, self.config.nodes))
- def pre_run_th(self, node, files, force):
+    def pre_run_th(self, node: INode, files: Dict[str, int], force: bool = False) -> None:
try:
- # fill files with pseudo-random data
- rossh = run_on_node(node)
- rossh.connection = node.connection
+ cmd = 'mkdir -p "{0}"'.format(self.config.remote_dir)
+ if self.use_sudo:
+ cmd = "sudo " + cmd
+ cmd += " ; sudo chown {0} {1}".format(node.get_user(),
+ self.config.remote_dir)
+ node.run(cmd, nolog=True)
- try:
- cmd = 'mkdir -p "{0}"'.format(self.config.remote_dir)
- if self.use_sudo:
- cmd = "sudo " + cmd
- cmd += " ; sudo chown {0} {1}".format(node.get_user(),
- self.config.remote_dir)
- rossh(cmd, nolog=True)
+ assert self.config.remote_dir != "" and self.config.remote_dir != "/"
+ node.run("rm -rf {}/*".format(self.config.remote_dir), nolog=True)
- assert self.config.remote_dir != "" and self.config.remote_dir != "/"
- rossh("rm -rf {0}/*".format(self.config.remote_dir), nolog=True)
+ except Exception as exc:
+ msg = "Failed to create folder {} on remote {}."
+ msg = msg.format(self.config.remote_dir, node, exc)
+ logger.exception(msg)
+ raise StopTestError(msg) from exc
- except Exception as exc:
- msg = "Failed to create folder {0} on remote {1}. Error: {2!s}"
- msg = msg.format(self.config.remote_dir, node.get_conn_id(), exc)
- logger.exception(msg)
- raise StopTestError(msg, exc)
+ self.install_utils(node)
+        self.prefill_test_files(node, files, force)
- self.install_utils(node, rossh)
- self.prefill_test_files(rossh, files, force)
- except:
- logger.exception("XXXX")
- raise
-
- def show_test_execution_time(self):
+ def show_expected_execution_time(self) -> None:
if len(self.fio_configs) > 1:
# +10% - is a rough estimation for additional operations
# like sftp, etc
@@ -682,16 +662,17 @@
logger.info(msg.format(exec_time_s,
end_dt.strftime("%H:%M:%S")))
- def run(self):
+ def run(self) -> IOTestResults:
logger.debug("Run preparation")
self.pre_run()
- self.show_test_execution_time()
+ self.show_expected_execution_time()
+ num_nodes = len(self.config.nodes)
tname = os.path.basename(self.config_fname)
if tname.endswith('.cfg'):
tname = tname[:-4]
- barrier = Barrier(len(self.config.nodes))
+ barrier = Barrier(num_nodes)
results = []
# set of Operation_Mode_BlockSize str's
@@ -699,17 +680,14 @@
# they already too slow with previous thread count
lat_bw_limit_reached = set()
- with ThreadPoolExecutor(len(self.config.nodes)) as pool:
+ with ThreadPoolExecutor(num_nodes) as pool:
for pos, fio_cfg in enumerate(self.fio_configs):
- test_descr = get_test_summary(fio_cfg.vals).split("th")[0]
+ test_descr = get_test_summary(fio_cfg.vals, noqd=True)
if test_descr in lat_bw_limit_reached:
continue
- else:
- logger.info("Will run {0} test".format(fio_cfg.name))
- templ = "Test should takes about {0}." + \
- " Should finish at {1}," + \
- " will wait at most till {2}"
+ logger.info("Will run {} test".format(fio_cfg.name))
+ templ = "Test should takes about {}. Should finish at {}, will wait at most till {}"
exec_time = execution_time(fio_cfg)
exec_time_str = sec_to_str(exec_time)
timeout = int(exec_time + max(300, exec_time))
@@ -722,43 +700,37 @@
end_dt.strftime("%H:%M:%S"),
wait_till.strftime("%H:%M:%S")))
- func = functools.partial(self.do_run,
- barrier=barrier,
- fio_cfg=fio_cfg,
- pos=pos)
+ run_test_func = functools.partial(self.do_run,
+ barrier=barrier,
+ fio_cfg=fio_cfg,
+ pos=pos)
max_retr = 3
for idx in range(max_retr):
+                    if idx != 0:
+ logger.info("Sleeping %ss and retrying", self.retry_time)
+ time.sleep(self.retry_time)
+
try:
- intervals = list(pool.map(func, self.config.nodes))
+ intervals = list(pool.map(run_test_func, self.config.nodes))
if None not in intervals:
break
except (EnvironmentError, SSHException) as exc:
+                        if idx == max_retr - 1:
+ raise StopTestError("Fio failed") from exc
logger.exception("During fio run")
- if idx == max_retr - 1:
- raise StopTestError("Fio failed", exc)
- logger.info("Reconnectiong, sleeping %ss and retrying", self.retry_time)
-
- wait([pool.submit(node.connection.close)
- for node in self.config.nodes])
-
- time.sleep(self.retry_time)
-
- wait([pool.submit(reconnect, node.connection, node.conn_url)
- for node in self.config.nodes])
-
- fname = "{0}_task.fio".format(pos)
+ fname = "{}_task.fio".format(pos)
with open(os.path.join(self.config.log_directory, fname), "w") as fd:
fd.write(str(fio_cfg))
- params = {'vm_count': len(self.config.nodes)}
+ params = {'vm_count': num_nodes}
params['name'] = fio_cfg.name
params['vals'] = dict(fio_cfg.vals.items())
params['intervals'] = intervals
params['nodes'] = [node.get_conn_id() for node in self.config.nodes]
- fname = "{0}_params.yaml".format(pos)
+ fname = "{}_params.yaml".format(pos)
with open(os.path.join(self.config.log_directory, fname), "w") as fd:
fd.write(dumps(params))
@@ -770,7 +742,7 @@
            # convert us to ms
if self.max_latency < lat_50:
- logger.info(("Will skip all subsequent tests of {0} " +
+ logger.info(("Will skip all subsequent tests of {} " +
"due to lat/bw limits").format(fio_cfg.name))
lat_bw_limit_reached.add(test_descr)
@@ -782,49 +754,7 @@
return IOTestResults(self.config.params['cfg'],
results, self.config.log_directory)
- def do_run(self, node, barrier, fio_cfg, pos, nolog=False):
- if self.use_sudo:
- sudo = "sudo "
- else:
- sudo = ""
-
- bash_file = """
-#!/bin/bash
-
-function get_dev() {{
- if [ -b "$1" ] ; then
- echo $1
- else
- echo $(df "$1" | tail -1 | awk '{{print $1}}')
- fi
-}}
-
-function log_io_activiti(){{
- local dest="$1"
- local dev=$(get_dev "$2")
- local sleep_time="$3"
- dev=$(basename "$dev")
-
- echo $dev
-
- for (( ; ; )) ; do
- grep -E "\\b$dev\\b" /proc/diskstats >> "$dest"
- sleep $sleep_time
- done
-}}
-
-sync
-cd {exec_folder}
-
-log_io_activiti {io_log_file} {test_file} 1 &
-local pid="$!"
-
-{fio_path}fio --output-format=json --output={out_file} --alloc-size=262144 {job_file} >{err_out_file} 2>&1
-echo $? >{res_code_file}
-kill -9 $pid
-
-"""
-
+    def do_run(self, node: INode, barrier: Barrier, fio_cfg, pos: int, nolog: bool = False):
exec_folder = self.config.remote_dir
if self.use_system_fio:
@@ -835,157 +765,20 @@
else:
fio_path = exec_folder
- bash_file = bash_file.format(out_file=self.results_file,
- job_file=self.task_file,
- err_out_file=self.err_out_file,
- res_code_file=self.exit_code_file,
- exec_folder=exec_folder,
- fio_path=fio_path,
- test_file=self.config_params['FILENAME'],
- io_log_file=self.io_log_file).strip()
-
- with node.connection.open_sftp() as sftp:
- save_to_remote(sftp, self.task_file, str(fio_cfg))
- save_to_remote(sftp, self.sh_file, bash_file)
-
exec_time = execution_time(fio_cfg)
-
- timeout = int(exec_time + max(300, exec_time))
- soft_tout = exec_time
-
- begin = time.time()
-
- fnames_before = run_on_node(node)("ls -1 " + exec_folder, nolog=True)
-
barrier.wait()
-
- task = BGSSHTask(node, self.use_sudo)
- task.start(sudo + "bash " + self.sh_file)
-
- while True:
- try:
- task.wait(soft_tout, timeout)
- break
- except paramiko.SSHException:
- pass
-
- try:
- node.connection.close()
- except:
- pass
-
- reconnect(node.connection, node.conn_url)
-
- end = time.time()
- rossh = run_on_node(node)
- fnames_after = rossh("ls -1 " + exec_folder, nolog=True)
-
- conn_id = node.get_conn_id().replace(":", "_")
- if not nolog:
- logger.debug("Test on node {0} is finished".format(conn_id))
-
- log_files_pref = []
- if 'write_lat_log' in fio_cfg.vals:
- fname = fio_cfg.vals['write_lat_log']
- log_files_pref.append(fname + '_clat')
- log_files_pref.append(fname + '_lat')
- log_files_pref.append(fname + '_slat')
-
- if 'write_iops_log' in fio_cfg.vals:
- fname = fio_cfg.vals['write_iops_log']
- log_files_pref.append(fname + '_iops')
-
- if 'write_bw_log' in fio_cfg.vals:
- fname = fio_cfg.vals['write_bw_log']
- log_files_pref.append(fname + '_bw')
-
- files = collections.defaultdict(lambda: [])
- all_files = [os.path.basename(self.results_file)]
- new_files = set(fnames_after.split()) - set(fnames_before.split())
-
- for fname in new_files:
- if fname.endswith('.log') and fname.split('.')[0] in log_files_pref:
- name, _ = os.path.splitext(fname)
- if fname.count('.') == 1:
- tp = name.split("_")[-1]
- cnt = 0
- else:
- tp_cnt = name.split("_")[-1]
- tp, cnt = tp_cnt.split('.')
- files[tp].append((int(cnt), fname))
- all_files.append(fname)
- elif fname == os.path.basename(self.io_log_file):
- files['iops'].append(('sys', fname))
- all_files.append(fname)
-
- arch_name = self.join_remote('wally_result.tar.gz')
- tmp_dir = os.path.join(self.config.log_directory, 'tmp_' + conn_id)
-
- if os.path.exists(tmp_dir):
- shutil.rmtree(tmp_dir)
-
- os.mkdir(tmp_dir)
- loc_arch_name = os.path.join(tmp_dir, 'wally_result.{0}.tar.gz'.format(conn_id))
- file_full_names = " ".join(all_files)
-
- try:
- os.unlink(loc_arch_name)
- except:
- pass
-
- with node.connection.open_sftp() as sftp:
- try:
- exit_code = read_from_remote(sftp, self.exit_code_file)
- except IOError:
- logger.error("No exit code file found on %s. Looks like process failed to start",
- conn_id)
- return None
-
- err_out = read_from_remote(sftp, self.err_out_file)
- exit_code = exit_code.strip()
-
- if exit_code != '0':
- msg = "fio exit with code {0}: {1}".format(exit_code, err_out)
- logger.critical(msg.strip())
- raise StopTestError("fio failed")
-
- rossh("rm -f {0}".format(arch_name), nolog=True)
- pack_files_cmd = "cd {0} ; tar zcvf {1} {2}".format(exec_folder, arch_name, file_full_names)
- rossh(pack_files_cmd, nolog=True)
- sftp.get(arch_name, loc_arch_name)
-
- unpack_files_cmd = "cd {0} ; tar xvzf {1} >/dev/null".format(tmp_dir, loc_arch_name)
- subprocess.check_call(unpack_files_cmd, shell=True)
- os.unlink(loc_arch_name)
-
- for ftype, fls in files.items():
- for idx, fname in fls:
- cname = os.path.join(tmp_dir, fname)
- loc_fname = "{0}_{1}_{2}.{3}.log".format(pos, conn_id, ftype, idx)
- loc_path = os.path.join(self.config.log_directory, loc_fname)
- os.rename(cname, loc_path)
-
- cname = os.path.join(tmp_dir,
- os.path.basename(self.results_file))
- loc_fname = "{0}_{1}_rawres.json".format(pos, conn_id)
- loc_path = os.path.join(self.config.log_directory, loc_fname)
- os.rename(cname, loc_path)
- os.rmdir(tmp_dir)
-
- remove_remote_res_files_cmd = "cd {0} ; rm -f {1} {2}".format(exec_folder,
- arch_name,
- file_full_names)
- rossh(remove_remote_res_files_cmd, nolog=True)
- return begin, end
+ run_data = node.rpc.fio.run_fio(self.use_sudo,
+ fio_path,
+ exec_folder,
+ str(fio_cfg),
+ exec_time + max(300, exec_time))
+ return parse_fio_result(run_data)
@classmethod
- def prepare_data(cls, results):
- """
- create a table with io performance report
- for console
- """
+ def prepare_data(cls, results) -> List[Dict[str, Any]]:
+ """create a table with io performance report for console"""
- def key_func(data):
+        def key_func(data) -> Tuple[str, str, str, str, int]:
tpl = data.summary_tpl()
return (data.name,
tpl.oper,
@@ -1067,11 +860,8 @@
fiels_and_header_dct = dict((item.attr, item) for item in fiels_and_header)
@classmethod
- def format_for_console(cls, results):
- """
- create a table with io performance report
- for console
- """
+ def format_for_console(cls, results) -> str:
+ """create a table with io performance report for console"""
tab = texttable.Texttable(max_width=120)
tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
@@ -1090,11 +880,8 @@
return tab.draw()
@classmethod
- def format_diff_for_console(cls, list_of_results):
- """
- create a table with io performance report
- for console
- """
+ def format_diff_for_console(cls, list_of_results: List[Any]) -> str:
+ """create a table with io performance report for console"""
tab = texttable.Texttable(max_width=200)
tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
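Note on cached_prop above: it memoizes a property behind a "_<name>" backing
attribute that must be pre-initialized to the NoData sentinel; the part of the
closure outside this hunk presumably stores the computed value back. A minimal
usage sketch (the Stats class and its field are hypothetical):

    from wally.suits.io.fio import NoData, cached_prop

    class Stats:
        def __init__(self, samples):
            self.samples = samples
            self._average = NoData  # sentinel checked by cached_prop

        @cached_prop
        def average(self):
            # computed on first access only
            return sum(self.samples) / len(self.samples)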
diff --git a/wally/suits/io/fio_task_parser.py b/wally/suits/io/fio_task_parser.py
index 0f788ed..233f6e2 100644
--- a/wally/suits/io/fio_task_parser.py
+++ b/wally/suits/io/fio_task_parser.py
@@ -1,13 +1,17 @@
+#!/usr/bin/env python3
+
+import re
import os
import sys
import copy
import os.path
import argparse
import itertools
+from typing import Optional, Generator, Union, Dict, Iterable, Iterator, Any, List, TypeVar, Callable, Tuple
from collections import OrderedDict, namedtuple
-from wally.utils import sec_to_str, ssize2b
+from ...utils import sec_to_str, ssize2b
SECTION = 0
@@ -20,20 +24,20 @@
'tp', 'name', 'val'))
-class FioJobSection(object):
- def __init__(self, name):
+class FioJobSection:
+ def __init__(self, name: str):
self.name = name
self.vals = OrderedDict()
- def copy(self):
+ def copy(self) -> 'FioJobSection':
return copy.deepcopy(self)
- def required_vars(self):
+    def required_vars(self) -> Generator[Tuple[str, Var], None, None]:
for name, val in self.vals.items():
if isinstance(val, Var):
yield name, val
- def is_free(self):
+ def is_free(self) -> bool:
return len(list(self.required_vars())) == 0
def __str__(self):
@@ -51,7 +55,7 @@
class ParseError(ValueError):
- def __init__(self, msg, fname, lineno, line_cont=""):
+    def __init__(self, msg: str, fname: str, lineno: int, line_cont: str = ""):
ValueError.__init__(self, msg)
self.file_name = fname
self.lineno = lineno
@@ -65,21 +69,11 @@
super(ParseError, self).__str__())
-def is_name(name):
- if len(name) == 0:
- return False
-
- if name[0] != '_' and not name[0].isalpha():
- return False
-
- for ch in name[1:]:
- if name[0] != '_' and not name[0].isalnum():
- return False
-
- return True
+def is_name(name: str) -> bool:
+    return re.fullmatch(r"[a-zA-Z_][a-zA-Z_0-9]*", name) is not None
-def parse_value(val):
+def parse_value(val: str) -> Union[int, str, Dict, Var]:
try:
return int(val)
except ValueError:
@@ -103,7 +97,7 @@
return val
-def fio_config_lexer(fio_cfg, fname):
+def fio_config_lexer(fio_cfg: str, fname: str) -> Generator[CfgLine, None, None]:
for lineno, oline in enumerate(fio_cfg.split("\n")):
try:
line = oline.strip()
@@ -136,7 +130,7 @@
raise ParseError(str(exc), fname, lineno, oline)
-def fio_config_parse(lexer_iter):
+def fio_config_parse(lexer_iter: Iterable[CfgLine]) -> Generator[FioJobSection, None, None]:
in_globals = False
curr_section = None
glob_vals = OrderedDict()
@@ -210,19 +204,7 @@
yield curr_section
-def process_repeats(sec):
- sec = sec.copy()
- count = sec.vals.pop('NUM_ROUNDS', 1)
- assert isinstance(count, (int, long))
-
- for _ in range(count):
- yield sec.copy()
-
- if 'ramp_time' in sec.vals:
- sec.vals['_ramp_time'] = sec.vals.pop('ramp_time')
-
-
-def process_cycles(sec):
+def process_cycles(sec: FioJobSection) -> Generator[FioJobSection, None, None]:
cycles = OrderedDict()
for name, val in sec.vals.items():
@@ -232,8 +214,8 @@
if len(cycles) == 0:
yield sec
else:
- # thread should changes faster
- numjobs = cycles.pop('numjobs', None)
+        # qd should change faster
+ numjobs = cycles.pop('qd', None)
items = cycles.items()
if len(items) > 0:
@@ -246,7 +228,7 @@
if numjobs is not None:
vals.append(numjobs)
- keys.append('numjobs')
+ keys.append('qd')
for combination in itertools.product(*vals):
new_sec = sec.copy()
@@ -254,7 +236,11 @@
yield new_sec
-def apply_params(sec, params):
+FIO_PARAM_VAL = Union[str, Var]
+FIO_PARAMS = Dict[str, FIO_PARAM_VAL]
+
+
+def apply_params(sec: FioJobSection, params: FIO_PARAMS) -> FioJobSection:
processed_vals = OrderedDict()
processed_vals.update(params)
for name, val in sec.vals.items():
@@ -273,10 +259,7 @@
return sec
-MAGIC_OFFSET = 0.1885
-
-
-def abbv_name_to_full(name):
+def abbv_name_to_full(name: str) -> str:
assert len(name) == 3
smode = {
@@ -291,12 +274,11 @@
off_mode[name[0]] + " " + oper[name[1]]
-def finall_process(sec, counter=[0]):
- sec = sec.copy()
+MAGIC_OFFSET = 0.1885
- if sec.vals.get('numjobs', '1') != 1:
- msg = "Group reporting should be set if numjobs != 1"
- assert 'group_reporting' in sec.vals, msg
+
+def finall_process(sec: FioJobSection, counter: List[int] = [0]) -> FioJobSection:
+ sec = sec.copy()
sec.vals['unified_rw_reporting'] = '1'
@@ -328,7 +310,7 @@
return sec
-def get_test_sync_mode(sec):
+def get_test_sync_mode(sec: Union[FioJobSection, Dict[str, Any]]) -> str:
if isinstance(sec, dict):
vals = sec
else:
@@ -347,10 +329,10 @@
return 'a'
-TestSumm = namedtuple("TestSumm", ("oper", "mode", "bsize", "th_count", "vm_count"))
+TestSumm = namedtuple("TestSumm", ("oper", "mode", "bsize", "iodepth", "vm_count"))
-def get_test_summary_tuple(sec, vm_count=None):
+def get_test_summary_tuple(sec: Union[FioJobSection, Dict[str, Any]], vm_count: Optional[int] = None) -> TestSumm:
if isinstance(sec, dict):
vals = sec
else:
@@ -365,48 +347,51 @@
"readwrite": "sm"}[vals["rw"]]
sync_mode = get_test_sync_mode(sec)
- th_count = vals.get('numjobs')
-
- if th_count is None:
- th_count = vals.get('concurence', 1)
return TestSumm(rw,
sync_mode,
vals['blocksize'],
- th_count,
+ vals['iodepth'],
vm_count)
-def get_test_summary(sec, vm_count=None):
+def get_test_summary(sec: Union[FioJobSection, Dict[str, Any]], vm_count: Optional[int] = None, noqd: bool = False) -> str:
tpl = get_test_summary_tuple(sec, vm_count)
- res = "{0.oper}{0.mode}{0.bsize}th{0.th_count}".format(tpl)
+
+ res = "{0.oper}{0.mode}{0.bsize}".format(tpl)
+ if not noqd:
+ res += "qd{}".format(tpl.qd)
if tpl.vm_count is not None:
- res += "vm" + str(tpl.vm_count)
+ res += "vm{}".format(tpl.vm_count)
return res
-def execution_time(sec):
+def execution_time(sec: FioJobSection) -> int:
return sec.vals.get('ramp_time', 0) + sec.vals.get('runtime', 0)
-def parse_all_in_1(source, fname=None):
+def parse_all_in_1(source: str, fname: Optional[str] = None) -> Generator[FioJobSection, None, None]:
return fio_config_parse(fio_config_lexer(source, fname))
-def flatmap(func, inp_iter):
+FM_FUNC_INPUT = TypeVar("FM_FUNC_INPUT")
+FM_FUNC_RES = TypeVar("FM_FUNC_RES")
+
+
+def flatmap(func: Callable[[FM_FUNC_INPUT], Iterable[FM_FUNC_RES]],
+            inp_iter: Iterable[FM_FUNC_INPUT]) -> Generator[FM_FUNC_RES, None, None]:
for val in inp_iter:
for res in func(val):
yield res
-def fio_cfg_compile(source, fname, test_params):
+def fio_cfg_compile(source: str, fname: str, test_params: FIO_PARAMS) -> Iterator[FioJobSection]:
it = parse_all_in_1(source, fname)
it = (apply_params(sec, test_params) for sec in it)
it = flatmap(process_cycles, it)
- it = flatmap(process_repeats, it)
- return itertools.imap(finall_process, it)
+ return map(finall_process, it)
def parse_args(argv):
@@ -438,12 +423,12 @@
sec_it = fio_cfg_compile(job_cfg, argv_obj.jobfile, params)
if argv_obj.action == 'estimate':
- print sec_to_str(sum(map(execution_time, sec_it)))
+ print(sec_to_str(sum(map(execution_time, sec_it))))
elif argv_obj.action == 'num_tests':
- print sum(map(len, map(list, sec_it)))
+ print(sum(map(len, map(list, sec_it))))
elif argv_obj.action == 'compile':
splitter = "\n#" + "-" * 70 + "\n\n"
- print splitter.join(map(str, sec_it))
+ print(splitter.join(map(str, sec_it)))
return 0
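Note on process_cycles above: every "{% ... %}" option becomes a cycle, and qd
is popped separately so it varies fastest across the produced sections. A
sketch with illustrative values, assuming the lexer accepts this minimal job:

    from wally.suits.io.fio_task_parser import parse_all_in_1, process_cycles, flatmap

    cfg = "[demo]\n" \
          "blocksize={% 4k, 64k %}\n" \
          "qd={% 1, 16 %}\n" \
          "rw=randread\n"

    # expected: four concrete sections, qd cycling fastest:
    # (4k, 1), (4k, 16), (64k, 1), (64k, 16)
    for sec in flatmap(process_cycles, parse_all_in_1(cfg, "demo.cfg")):
        print(dict(sec.vals))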
diff --git a/wally/suits/io/rpc_plugin.py b/wally/suits/io/rpc_plugin.py
new file mode 100644
index 0000000..ca3f0f3
--- /dev/null
+++ b/wally/suits/io/rpc_plugin.py
@@ -0,0 +1,15 @@
+def rpc_run_fio(cfg):
+    fio_cmd_templ = "cd {exec_folder}; {fio_path}fio --output-format=json " + \
+                    "--output={out_file} --alloc-size=262144 {job_file}"
+
+    # TODO: port the remaining do_run() logic from the old fio.py here:
+    #   fnames_before = node.run("ls -1 " + exec_folder, nolog=True)
+    #   timeout = int(exec_time + max(300, exec_time))
+    #   soft_end_time = time.time() + exec_time
+    #   logger.error("Fio timeouted on node {}. Killing it".format(node))
+    #   end = time.time()
+    #   fnames_after = node.run("ls -1 " + exec_folder, nolog=True)
+    raise NotImplementedError("fio RPC runner is not ported from fio.py yet")
+
+def parse_fio_result(data):
+    pass
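The caller in fio.py now expects node.rpc.fio.run_fio(use_sudo, fio_path,
exec_folder, job_text, timeout). A hedged sketch of what the node-side runner
could eventually look like, built around fio_cmd_templ above (the function
name, file names and return shape are assumptions, not part of this commit):

    import os
    import subprocess

    def rpc_run_fio_sketch(use_sudo, fio_path, exec_folder, job_text, timeout):
        job_file = os.path.join(exec_folder, "task.fio")
        out_file = os.path.join(exec_folder, "result.json")
        with open(job_file, "w") as fd:
            fd.write(job_text)

        cmd = "cd {0}; {1}fio --output-format=json --output={2} " \
              "--alloc-size=262144 {3}".format(exec_folder, fio_path, out_file, job_file)
        if use_sudo:
            cmd = "sudo " + cmd

        # the timeout mirrors int(exec_time + max(300, exec_time)) computed by the caller
        subprocess.check_call(cmd, shell=True, timeout=timeout)
        with open(out_file) as fd:
            return fd.read()  # raw JSON for parse_fio_result() to digest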
diff --git a/wally/suits/io/rrd.cfg b/wally/suits/io/rrd.cfg
index f42dff6..86de738 100644
--- a/wally/suits/io/rrd.cfg
+++ b/wally/suits/io/rrd.cfg
@@ -1,6 +1,6 @@
[global]
include defaults.cfg
-QD={% 1, 5 %}
+NUMJOBS=8
ramp_time=5
runtime=5
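With the move from thread count to queue depth, test summary strings now carry
a "qd" component instead of "th". A rough sketch using get_test_summary() with
illustrative values (the exact operation/sync-mode letters depend on mappings
partly outside this diff):

    from wally.suits.io.fio_task_parser import get_test_summary

    vals = {"rw": "randread", "blocksize": "4k", "iodepth": 16,
            "sync": "0", "direct": "1"}
    print(get_test_summary(vals, vm_count=2))
    # something like "rrd4kqd16vm2" where the old naming would end in "th16vm2"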