2.0 refactoring:
* Add type annotations to most functions
* Remove the old fio run code, move it to RPC/pluggable (see the sketch below)
* Remove most of the sensors code, it will move to RPC later
* Other refactoring
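
In short: the per-node run loop no longer renders a bash wrapper and drives it over SSH; each node executes fio through its RPC channel. A minimal sketch of the new call path, assuming only the hooks this diff introduces (node.rpc.fio.run_fio and parse_fio_result); the wrapper function itself and the bare node object are illustrative:

    # Illustrative sketch, not part of the patch.
    from wally.suits.io.fio_task_parser import execution_time
    from wally.suits.io.rpc_plugin import parse_fio_result

    def run_one_config(node, fio_cfg, fio_path, exec_folder, use_sudo):
        exec_time = execution_time(fio_cfg)
        timeout = exec_time + max(300, exec_time)  # same cap do_run uses
        # the node-side plugin runs fio and ships the raw result back
        raw = node.rpc.fio.run_fio(use_sudo, fio_path, exec_folder,
                                   str(fio_cfg), timeout)
        return parse_fio_result(raw)  # the old do_run returned (begin, end)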
diff --git a/wally/suits/io/fio.py b/wally/suits/io/fio.py
index 7b77461..50bb1fd 100644
--- a/wally/suits/io/fio.py
+++ b/wally/suits/io/fio.py
@@ -3,51 +3,45 @@
import json
import stat
import random
-import shutil
+import hashlib
import os.path
import logging
import datetime
import functools
-import subprocess
import collections
+from typing import Dict, List, Callable, Any, Tuple, Optional
import yaml
-import paramiko
import texttable
from paramiko.ssh_exception import SSHException
from concurrent.futures import ThreadPoolExecutor, wait
import wally
-from wally.pretty_yaml import dumps
-from wally.statistic import round_3_digit, data_property, average
-from wally.utils import ssize2b, sec_to_str, StopTestError, Barrier, get_os
-from wally.ssh_utils import (save_to_remote, read_from_remote, BGSSHTask, reconnect)
+
+from ...pretty_yaml import dumps
+from ...statistic import round_3_digit, data_property, average
+from ...utils import ssize2b, sec_to_str, StopTestError, Barrier, get_os
+from ...inode import INode
+
+from ..itest import (TimeSeriesValue, PerfTest, TestResults, TestConfig)
from .fio_task_parser import (execution_time, fio_cfg_compile,
get_test_summary, get_test_summary_tuple,
get_test_sync_mode, FioJobSection)
+from .rpc_plugin import parse_fio_result
-from ..itest import (TimeSeriesValue, PerfTest, TestResults,
- run_on_node, TestConfig, MeasurementMatrix)
logger = logging.getLogger("wally")
-# Results folder structure
-# results/
-# {loadtype}_{num}/
-# config.yaml
-# ......
-
-
-class NoData(object):
+class NoData:
pass
-def cached_prop(func):
+def cached_prop(func: Callable[..., Any]) -> property:
@property
@functools.wraps(func)
- def closure(self):
+ def closure(self) -> Any:
val = getattr(self, "_" + func.__name__)
if val is NoData:
val = func(self)
@@ -56,7 +50,7 @@
return closure
-def load_fio_log_file(fname):
+def load_fio_log_file(fname: str) -> TimeSeriesValue:
with open(fname) as fd:
it = [ln.split(',')[:2] for ln in fd]
@@ -72,7 +66,7 @@
WRITE_IOPS_DISCSTAT_POS = 7
-def load_sys_log_file(ftype, fname):
+def load_sys_log_file(ftype: str, fname: str) -> TimeSeriesValue:
assert ftype == 'iops'
pval = None
with open(fname) as fd:
@@ -89,7 +83,7 @@
return TimeSeriesValue(vals)
-def load_test_results(folder, run_num):
+def load_test_results(folder: str, run_num: int) -> 'FioRunResult':
res = {}
params = None
@@ -97,7 +91,7 @@
params = yaml.load(open(fn).read())
conn_ids_set = set()
- rr = r"{0}_(?P<conn_id>.*?)_(?P<type>[^_.]*)\.\d+\.log$".format(run_num)
+ rr = r"{}_(?P<conn_id>.*?)_(?P<type>[^_.]*)\.\d+\.log$".format(run_num)
for fname in os.listdir(folder):
rm = re.match(rr, fname)
if rm is None:
@@ -115,7 +109,7 @@
conn_ids_set.add(conn_id)
- rr = r"{0}_(?P<conn_id>.*?)_(?P<type>[^_.]*)\.sys\.log$".format(run_num)
+ rr = r"{}_(?P<conn_id>.*?)_(?P<type>[^_.]*)\.sys\.log$".format(run_num)
for fname in os.listdir(folder):
rm = re.match(rr, fname)
if rm is None:
@@ -159,8 +153,8 @@
return FioRunResult(config, fio_task, mm_res, raw_res, params['intervals'], run_num)
-class Attrmapper(object):
- def __init__(self, dct):
+class Attrmapper:
+ def __init__(self, dct: Dict[str, Any]):
self.__dct = dct
def __getattr__(self, name):
@@ -170,8 +164,8 @@
raise AttributeError(name)
-class DiskPerfInfo(object):
- def __init__(self, name, summary, params, testnodes_count):
+class DiskPerfInfo:
+ def __init__(self, name: str, summary: str, params: Dict[str, Any], testnodes_count: int):
self.name = name
self.bw = None
self.iops = None
@@ -193,7 +187,7 @@
self.concurence = self.params['vals'].get('numjobs', 1)
-def get_lat_perc_50_95(lat_mks):
+def get_lat_perc_50_95(lat_mks: List[float]) -> Tuple[float, float]:
curr_perc = 0
perc_50 = None
perc_95 = None
@@ -224,8 +218,8 @@
return perc_50 / 1000., perc_95 / 1000.
-class IOTestResults(object):
- def __init__(self, suite_name, fio_results, log_directory):
+class IOTestResults:
+ def __init__(self, suite_name: str, fio_results: 'FioRunResult', log_directory: str):
self.suite_name = suite_name
self.fio_results = fio_results
self.log_directory = log_directory
@@ -236,7 +230,7 @@
def __len__(self):
return len(self.fio_results)
- def get_yamable(self):
+ def get_yamable(self) -> Dict[str, List[Any]]:
items = [(fio_res.summary(), fio_res.idx) for fio_res in self]
return {self.suite_name: [self.log_directory] + items}
@@ -424,6 +418,10 @@
soft_runcycle = 5 * 60
retry_time = 30
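+ # md5 of a single 1 KiB zero block; used by check_prefill_required to
+ # detect blocks that were never written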
+ zero_md5_hash = hashlib.md5()
+ zero_md5_hash.update(b"\x00" * 1024)
+ zero_md5 = zero_md5_hash.hexdigest()
+
def __init__(self, config):
PerfTest.__init__(self, config)
@@ -464,7 +462,7 @@
self.fio_configs = None
@classmethod
- def load(cls, suite_name, folder):
+ def load(cls, suite_name: str, folder: str) -> IOTestResults:
res = []
for fname in os.listdir(folder):
if re.match("\d+_params.yaml$", fname):
@@ -478,11 +476,9 @@
pass
# size is megabytes
- def check_prefill_required(self, rossh, fname, size, num_blocks=16):
+ def check_prefill_required(self, node: INode, fname: str, size: int, num_blocks: int = 16) -> bool:
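+ """Return True if fname must be prefilled: it is smaller than the
+ requested size or sampled blocks read back as all zeroes"""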
try:
- with rossh.connection.open_sftp() as sftp:
- fstats = sftp.stat(fname)
-
+ fstats = node.stat_file(fname)
if stat.S_ISREG(fstats.st_mode) and fstats.st_size < size * 1024 ** 2:
return True
except EnvironmentError:
@@ -498,14 +494,13 @@
if self.use_sudo:
cmd = "sudo " + cmd
- zero_md5 = '0f343b0931126a20f133d67c2b018a3b'
bsize = size * (1024 ** 2)
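+ # sample num_blocks random 1 KiB blocks, plus the first and the last
+ # one, and hash each remotely to see whether the file holds real data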
offsets = [random.randrange(bsize - 1024) for _ in range(num_blocks)]
offsets.append(bsize - 1024)
offsets.append(0)
for offset in offsets:
- data = rossh(cmd.format(fname, offset), nolog=True)
+ data = node.run(cmd.format(fname, offset), nolog=True)
md = ""
for line in data.split("\n"):
@@ -517,12 +512,12 @@
logger.error("File data check is failed - " + data)
return True
- if zero_md5 == md:
+ if self.zero_md5 == md:
return True
return False
- def prefill_test_files(self, rossh, files, force=False):
+ def prefill_test_files(self, node: INode, files: Dict[str, int], force: bool = False) -> None:
if self.use_system_fio:
cmd_templ = "fio "
else:
@@ -542,7 +537,7 @@
ddtime = 0
for fname, curr_sz in files.items():
if not force:
- if not self.check_prefill_required(rossh, fname, curr_sz):
+ if not self.check_prefill_required(node, fname, curr_sz):
logger.debug("prefill is skipped")
continue
@@ -551,7 +546,7 @@
ssize += curr_sz
stime = time.time()
- rossh(cmd, timeout=curr_sz)
+ node.run(cmd, timeout=curr_sz)
ddtime += time.time() - stime
if ddtime > 1.0:
@@ -559,10 +554,10 @@
mess = "Initiall fio fill bw is {0} MiBps for this vm"
logger.info(mess.format(fill_bw))
- def install_utils(self, node, rossh, max_retry=3, timeout=5):
+ def install_utils(self, node: INode) -> None:
need_install = []
packs = [('screen', 'screen')]
- os_info = get_os(rossh)
+ os_info = get_os(node)
if self.use_system_fio:
packs.append(('fio', 'fio'))
@@ -575,7 +570,7 @@
continue
try:
- rossh('which ' + bin_name, nolog=True)
+ node.run('which ' + bin_name, nolog=True)
except OSError:
need_install.append(package)
@@ -585,14 +580,10 @@
else:
cmd = "sudo apt-get -y install " + " ".join(need_install)
- for _ in range(max_retry):
- try:
- rossh(cmd)
- break
- except OSError as err:
- time.sleep(timeout)
- else:
- raise OSError("Can't install - " + str(err))
+ try:
+ node.run(cmd)
+ except OSError as err:
+ raise OSError("Can't install - {}".format(" ".join(need_install))) from err
if not self.use_system_fio:
fio_dir = os.path.dirname(os.path.dirname(wally.__file__))
@@ -602,19 +593,16 @@
fio_path = os.path.join(fio_dir, fname)
if not os.path.exists(fio_path):
- raise RuntimeError("No prebuild fio available for {0}".format(os_info))
+ raise RuntimeError("No prebuild fio binary available for {0}".format(os_info))
bz_dest = self.join_remote('fio.bz2')
- with node.connection.open_sftp() as sftp:
- sftp.put(fio_path, bz_dest)
+ node.copy_file(fio_path, bz_dest)
+ node.run("bzip2 --decompress {}" + bz_dest, nolog=True)
+ node.run("chmod a+x " + self.join_remote("fio"), nolog=True)
- rossh("bzip2 --decompress " + bz_dest, nolog=True)
- rossh("chmod a+x " + self.join_remote("fio"), nolog=True)
-
- def pre_run(self):
+ def pre_run(self) -> None:
if 'FILESIZE' not in self.config_params:
- # need to detect file size
- pass
+ raise NotImplementedError("File size detection is not implemented")
self.fio_configs = fio_cfg_compile(self.raw_cfg,
self.config_fname,
@@ -641,36 +629,28 @@
force=self.force_prefill)
list(pool.map(fc, self.config.nodes))
- def pre_run_th(self, node, files, force):
+ def pre_run_th(self, node: INode, files: Dict[str, int], force_prefill: bool = False) -> None:
try:
- # fill files with pseudo-random data
- rossh = run_on_node(node)
- rossh.connection = node.connection
+ cmd = 'mkdir -p "{0}"'.format(self.config.remote_dir)
+ if self.use_sudo:
+ cmd = "sudo " + cmd
+ cmd += " ; sudo chown {0} {1}".format(node.get_user(),
+ self.config.remote_dir)
+ node.run(cmd, nolog=True)
- try:
- cmd = 'mkdir -p "{0}"'.format(self.config.remote_dir)
- if self.use_sudo:
- cmd = "sudo " + cmd
- cmd += " ; sudo chown {0} {1}".format(node.get_user(),
- self.config.remote_dir)
- rossh(cmd, nolog=True)
+ assert self.config.remote_dir != "" and self.config.remote_dir != "/"
+ node.run("rm -rf {}/*".format(self.config.remote_dir), nolog=True)
- assert self.config.remote_dir != "" and self.config.remote_dir != "/"
- rossh("rm -rf {0}/*".format(self.config.remote_dir), nolog=True)
+ except Exception as exc:
+ msg = "Failed to create folder {} on remote {}."
+ msg = msg.format(self.config.remote_dir, node, exc)
+ logger.exception(msg)
+ raise StopTestError(msg) from exc
- except Exception as exc:
- msg = "Failed to create folder {0} on remote {1}. Error: {2!s}"
- msg = msg.format(self.config.remote_dir, node.get_conn_id(), exc)
- logger.exception(msg)
- raise StopTestError(msg, exc)
+ self.install_utils(node)
+ self.prefill_test_files(node, files, force_prefill)
- self.install_utils(node, rossh)
- self.prefill_test_files(rossh, files, force)
- except:
- logger.exception("XXXX")
- raise
-
- def show_test_execution_time(self):
+ def show_expected_execution_time(self) -> None:
if len(self.fio_configs) > 1:
# +10% - is a rough estimation for additional operations
# like sftp, etc
@@ -682,16 +662,17 @@
logger.info(msg.format(exec_time_s,
end_dt.strftime("%H:%M:%S")))
- def run(self):
+ def run(self) -> IOTestResults:
logger.debug("Run preparation")
self.pre_run()
- self.show_test_execution_time()
+ self.show_expected_execution_time()
+ num_nodes = len(self.config.nodes)
tname = os.path.basename(self.config_fname)
if tname.endswith('.cfg'):
tname = tname[:-4]
- barrier = Barrier(len(self.config.nodes))
+ barrier = Barrier(num_nodes)
results = []
# set of Operation_Mode_BlockSize str's
@@ -699,17 +680,14 @@
# they already too slow with previous thread count
lat_bw_limit_reached = set()
- with ThreadPoolExecutor(len(self.config.nodes)) as pool:
+ with ThreadPoolExecutor(num_nodes) as pool:
for pos, fio_cfg in enumerate(self.fio_configs):
- test_descr = get_test_summary(fio_cfg.vals).split("th")[0]
+ test_descr = get_test_summary(fio_cfg.vals, noqd=True)
if test_descr in lat_bw_limit_reached:
continue
- else:
- logger.info("Will run {0} test".format(fio_cfg.name))
- templ = "Test should takes about {0}." + \
- " Should finish at {1}," + \
- " will wait at most till {2}"
+ logger.info("Will run {} test".format(fio_cfg.name))
+ templ = "Test should takes about {}. Should finish at {}, will wait at most till {}"
exec_time = execution_time(fio_cfg)
exec_time_str = sec_to_str(exec_time)
timeout = int(exec_time + max(300, exec_time))
@@ -722,43 +700,37 @@
end_dt.strftime("%H:%M:%S"),
wait_till.strftime("%H:%M:%S")))
- func = functools.partial(self.do_run,
- barrier=barrier,
- fio_cfg=fio_cfg,
- pos=pos)
+ run_test_func = functools.partial(self.do_run,
+ barrier=barrier,
+ fio_cfg=fio_cfg,
+ pos=pos)
max_retr = 3
for idx in range(max_retr):
+ if idx != 0:
+ logger.info("Sleeping %ss and retrying", self.retry_time)
+ time.sleep(self.retry_time)
+
try:
- intervals = list(pool.map(func, self.config.nodes))
+ intervals = list(pool.map(run_test_func, self.config.nodes))
if None not in intervals:
break
except (EnvironmentError, SSHException) as exc:
+ if idx == max_retr - 1:
+ raise StopTestError("Fio failed") from exc
logger.exception("During fio run")
- if idx == max_retr - 1:
- raise StopTestError("Fio failed", exc)
- logger.info("Reconnectiong, sleeping %ss and retrying", self.retry_time)
-
- wait([pool.submit(node.connection.close)
- for node in self.config.nodes])
-
- time.sleep(self.retry_time)
-
- wait([pool.submit(reconnect, node.connection, node.conn_url)
- for node in self.config.nodes])
-
- fname = "{0}_task.fio".format(pos)
+ fname = "{}_task.fio".format(pos)
with open(os.path.join(self.config.log_directory, fname), "w") as fd:
fd.write(str(fio_cfg))
- params = {'vm_count': len(self.config.nodes)}
+ params = {'vm_count': num_nodes}
params['name'] = fio_cfg.name
params['vals'] = dict(fio_cfg.vals.items())
params['intervals'] = intervals
params['nodes'] = [node.get_conn_id() for node in self.config.nodes]
- fname = "{0}_params.yaml".format(pos)
+ fname = "{}_params.yaml".format(pos)
with open(os.path.join(self.config.log_directory, fname), "w") as fd:
fd.write(dumps(params))
@@ -770,7 +742,7 @@
# conver us to ms
if self.max_latency < lat_50:
- logger.info(("Will skip all subsequent tests of {0} " +
+ logger.info(("Will skip all subsequent tests of {} " +
"due to lat/bw limits").format(fio_cfg.name))
lat_bw_limit_reached.add(test_descr)
@@ -782,49 +754,7 @@
return IOTestResults(self.config.params['cfg'],
results, self.config.log_directory)
- def do_run(self, node, barrier, fio_cfg, pos, nolog=False):
- if self.use_sudo:
- sudo = "sudo "
- else:
- sudo = ""
-
- bash_file = """
-#!/bin/bash
-
-function get_dev() {{
- if [ -b "$1" ] ; then
- echo $1
- else
- echo $(df "$1" | tail -1 | awk '{{print $1}}')
- fi
-}}
-
-function log_io_activiti(){{
- local dest="$1"
- local dev=$(get_dev "$2")
- local sleep_time="$3"
- dev=$(basename "$dev")
-
- echo $dev
-
- for (( ; ; )) ; do
- grep -E "\\b$dev\\b" /proc/diskstats >> "$dest"
- sleep $sleep_time
- done
-}}
-
-sync
-cd {exec_folder}
-
-log_io_activiti {io_log_file} {test_file} 1 &
-local pid="$!"
-
-{fio_path}fio --output-format=json --output={out_file} --alloc-size=262144 {job_file} >{err_out_file} 2>&1
-echo $? >{res_code_file}
-kill -9 $pid
-
-"""
-
+ def do_run(self, node: INode, barrier: Barrier, fio_cfg, pos: int, nolog: bool = False):
exec_folder = self.config.remote_dir
if self.use_system_fio:
@@ -835,157 +765,20 @@
else:
fio_path = exec_folder
- bash_file = bash_file.format(out_file=self.results_file,
- job_file=self.task_file,
- err_out_file=self.err_out_file,
- res_code_file=self.exit_code_file,
- exec_folder=exec_folder,
- fio_path=fio_path,
- test_file=self.config_params['FILENAME'],
- io_log_file=self.io_log_file).strip()
-
- with node.connection.open_sftp() as sftp:
- save_to_remote(sftp, self.task_file, str(fio_cfg))
- save_to_remote(sftp, self.sh_file, bash_file)
-
exec_time = execution_time(fio_cfg)
-
- timeout = int(exec_time + max(300, exec_time))
- soft_tout = exec_time
-
- begin = time.time()
-
- fnames_before = run_on_node(node)("ls -1 " + exec_folder, nolog=True)
-
barrier.wait()
-
- task = BGSSHTask(node, self.use_sudo)
- task.start(sudo + "bash " + self.sh_file)
-
- while True:
- try:
- task.wait(soft_tout, timeout)
- break
- except paramiko.SSHException:
- pass
-
- try:
- node.connection.close()
- except:
- pass
-
- reconnect(node.connection, node.conn_url)
-
- end = time.time()
- rossh = run_on_node(node)
- fnames_after = rossh("ls -1 " + exec_folder, nolog=True)
-
- conn_id = node.get_conn_id().replace(":", "_")
- if not nolog:
- logger.debug("Test on node {0} is finished".format(conn_id))
-
- log_files_pref = []
- if 'write_lat_log' in fio_cfg.vals:
- fname = fio_cfg.vals['write_lat_log']
- log_files_pref.append(fname + '_clat')
- log_files_pref.append(fname + '_lat')
- log_files_pref.append(fname + '_slat')
-
- if 'write_iops_log' in fio_cfg.vals:
- fname = fio_cfg.vals['write_iops_log']
- log_files_pref.append(fname + '_iops')
-
- if 'write_bw_log' in fio_cfg.vals:
- fname = fio_cfg.vals['write_bw_log']
- log_files_pref.append(fname + '_bw')
-
- files = collections.defaultdict(lambda: [])
- all_files = [os.path.basename(self.results_file)]
- new_files = set(fnames_after.split()) - set(fnames_before.split())
-
- for fname in new_files:
- if fname.endswith('.log') and fname.split('.')[0] in log_files_pref:
- name, _ = os.path.splitext(fname)
- if fname.count('.') == 1:
- tp = name.split("_")[-1]
- cnt = 0
- else:
- tp_cnt = name.split("_")[-1]
- tp, cnt = tp_cnt.split('.')
- files[tp].append((int(cnt), fname))
- all_files.append(fname)
- elif fname == os.path.basename(self.io_log_file):
- files['iops'].append(('sys', fname))
- all_files.append(fname)
-
- arch_name = self.join_remote('wally_result.tar.gz')
- tmp_dir = os.path.join(self.config.log_directory, 'tmp_' + conn_id)
-
- if os.path.exists(tmp_dir):
- shutil.rmtree(tmp_dir)
-
- os.mkdir(tmp_dir)
- loc_arch_name = os.path.join(tmp_dir, 'wally_result.{0}.tar.gz'.format(conn_id))
- file_full_names = " ".join(all_files)
-
- try:
- os.unlink(loc_arch_name)
- except:
- pass
-
- with node.connection.open_sftp() as sftp:
- try:
- exit_code = read_from_remote(sftp, self.exit_code_file)
- except IOError:
- logger.error("No exit code file found on %s. Looks like process failed to start",
- conn_id)
- return None
-
- err_out = read_from_remote(sftp, self.err_out_file)
- exit_code = exit_code.strip()
-
- if exit_code != '0':
- msg = "fio exit with code {0}: {1}".format(exit_code, err_out)
- logger.critical(msg.strip())
- raise StopTestError("fio failed")
-
- rossh("rm -f {0}".format(arch_name), nolog=True)
- pack_files_cmd = "cd {0} ; tar zcvf {1} {2}".format(exec_folder, arch_name, file_full_names)
- rossh(pack_files_cmd, nolog=True)
- sftp.get(arch_name, loc_arch_name)
-
- unpack_files_cmd = "cd {0} ; tar xvzf {1} >/dev/null".format(tmp_dir, loc_arch_name)
- subprocess.check_call(unpack_files_cmd, shell=True)
- os.unlink(loc_arch_name)
-
- for ftype, fls in files.items():
- for idx, fname in fls:
- cname = os.path.join(tmp_dir, fname)
- loc_fname = "{0}_{1}_{2}.{3}.log".format(pos, conn_id, ftype, idx)
- loc_path = os.path.join(self.config.log_directory, loc_fname)
- os.rename(cname, loc_path)
-
- cname = os.path.join(tmp_dir,
- os.path.basename(self.results_file))
- loc_fname = "{0}_{1}_rawres.json".format(pos, conn_id)
- loc_path = os.path.join(self.config.log_directory, loc_fname)
- os.rename(cname, loc_path)
- os.rmdir(tmp_dir)
-
- remove_remote_res_files_cmd = "cd {0} ; rm -f {1} {2}".format(exec_folder,
- arch_name,
- file_full_names)
- rossh(remove_remote_res_files_cmd, nolog=True)
- return begin, end
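+ # upload/run/collect now happens node-side in the RPC fio plugin;
+ # parse_fio_result converts its raw output for the caller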
+ run_data = node.rpc.fio.run_fio(self.use_sudo,
+ fio_path,
+ exec_folder,
+ str(fio_cfg),
+ exec_time + max(300, exec_time))
+ return parse_fio_result(run_data)
@classmethod
- def prepare_data(cls, results):
- """
- create a table with io performance report
- for console
- """
+ def prepare_data(cls, results) -> List[Dict[str, Any]]:
+ """create a table with io performance report for console"""
- def key_func(data):
+ def key_func(data) -> Tuple[str, str, str, str, int]:
tpl = data.summary_tpl()
return (data.name,
tpl.oper,
@@ -1067,11 +860,8 @@
fiels_and_header_dct = dict((item.attr, item) for item in fiels_and_header)
@classmethod
- def format_for_console(cls, results):
- """
- create a table with io performance report
- for console
- """
+ def format_for_console(cls, results) -> str:
+ """create a table with io performance report for console"""
tab = texttable.Texttable(max_width=120)
tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
@@ -1090,11 +880,8 @@
return tab.draw()
@classmethod
- def format_diff_for_console(cls, list_of_results):
- """
- create a table with io performance report
- for console
- """
+ def format_diff_for_console(cls, list_of_results: List[Any]) -> str:
+ """create a table with io performance report for console"""
tab = texttable.Texttable(max_width=200)
tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)