2.0 is on the way
diff --git a/wally/suits/io/fio.py b/wally/suits/io/fio.py
index 2360a55..0f4ebde 100644
--- a/wally/suits/io/fio.py
+++ b/wally/suits/io/fio.py
@@ -1,970 +1,128 @@
-import re
-import time
-import json
-import stat
-import random
-import hashlib
import os.path
import logging
-import datetime
-import functools
-import collections
-from typing import Dict, List, Callable, Any, Tuple, Optional
-
-import yaml
-import texttable
-from paramiko.ssh_exception import SSHException
-from concurrent.futures import ThreadPoolExecutor, wait
+from typing import Dict, List, Union, cast
import wally
-from ...pretty_yaml import dumps
-from ...statistic import round_3_digit, data_property, average
-from ...utils import ssize2b, sec_to_str, StopTestError, Barrier, get_os
-from ...inode import INode
-
-from ..itest import (TimeSeriesValue, PerfTest, TestResults, TestConfig)
-
-from .fio_task_parser import (execution_time, fio_cfg_compile,
- get_test_summary, get_test_summary_tuple,
- get_test_sync_mode, FioJobSection)
-from .rpc_plugin import parse_fio_result
+from ...utils import ssize2b, StopTestError, get_os
+from ...node_interfaces import IRPCNode
+from ..itest import ThreadedTest, IterationConfig, RunTestRes
+from .fio_task_parser import execution_time, fio_cfg_compile, FioJobSection, FioParams
logger = logging.getLogger("wally")
-class NoData:
- pass
-
-
-def cached_prop(func: Callable[..., Any]) -> Callable[..., Any]:
- @property
- @functools.wraps(func)
- def closure(self) -> Any:
- val = getattr(self, "_" + func.__name__)
- if val is NoData:
- val = func(self)
- setattr(self, "_" + func.__name__, val)
- return val
- return closure
-
-
-def load_fio_log_file(fname: str) -> TimeSeriesValue:
- with open(fname) as fd:
- it = [ln.split(',')[:2] for ln in fd]
-
- vals = [(float(off) / 1000, # convert us to ms
- float(val.strip()) + 0.5) # add 0.5 to compemsate average value
- # as fio trimm all values in log to integer
- for off, val in it]
-
- return TimeSeriesValue(vals)
-
-
-READ_IOPS_DISCSTAT_POS = 3
-WRITE_IOPS_DISCSTAT_POS = 7
-
-
-def load_sys_log_file(ftype: str, fname: str) -> TimeSeriesValue:
- assert ftype == 'iops'
- pval = None
- with open(fname) as fd:
- iops = []
- for ln in fd:
- params = ln.split()
- cval = int(params[WRITE_IOPS_DISCSTAT_POS]) + \
- int(params[READ_IOPS_DISCSTAT_POS])
- if pval is not None:
- iops.append(cval - pval)
- pval = cval
-
- vals = [(idx * 1000, val) for idx, val in enumerate(iops)]
- return TimeSeriesValue(vals)
-
-
-def load_test_results(folder: str, run_num: int) -> 'FioRunResult':
- res = {}
- params = None
-
- fn = os.path.join(folder, str(run_num) + '_params.yaml')
- params = yaml.load(open(fn).read())
-
- conn_ids_set = set()
- rr = r"{}_(?P<conn_id>.*?)_(?P<type>[^_.]*)\.\d+\.log$".format(run_num)
- for fname in os.listdir(folder):
- rm = re.match(rr, fname)
- if rm is None:
- continue
-
- conn_id_s = rm.group('conn_id')
- conn_id = conn_id_s.replace('_', ':')
- ftype = rm.group('type')
-
- if ftype not in ('iops', 'bw', 'lat'):
- continue
-
- ts = load_fio_log_file(os.path.join(folder, fname))
- res.setdefault(ftype, {}).setdefault(conn_id, []).append(ts)
-
- conn_ids_set.add(conn_id)
-
- rr = r"{}_(?P<conn_id>.*?)_(?P<type>[^_.]*)\.sys\.log$".format(run_num)
- for fname in os.listdir(folder):
- rm = re.match(rr, fname)
- if rm is None:
- continue
-
- conn_id_s = rm.group('conn_id')
- conn_id = conn_id_s.replace('_', ':')
- ftype = rm.group('type')
-
- if ftype not in ('iops', 'bw', 'lat'):
- continue
-
- ts = load_sys_log_file(ftype, os.path.join(folder, fname))
- res.setdefault(ftype + ":sys", {}).setdefault(conn_id, []).append(ts)
-
- conn_ids_set.add(conn_id)
-
- mm_res = {}
-
- if len(res) == 0:
- raise ValueError("No data was found")
-
- for key, data in res.items():
- conn_ids = sorted(conn_ids_set)
- awail_ids = [conn_id for conn_id in conn_ids if conn_id in data]
- matr = [data[conn_id] for conn_id in awail_ids]
- mm_res[key] = MeasurementMatrix(matr, awail_ids)
-
- raw_res = {}
- for conn_id in conn_ids:
- fn = os.path.join(folder, "{0}_{1}_rawres.json".format(run_num, conn_id_s))
-
- # remove message hack
- fc = "{" + open(fn).read().split('{', 1)[1]
- raw_res[conn_id] = json.loads(fc)
-
- fio_task = FioJobSection(params['name'])
- fio_task.vals.update(params['vals'])
-
- config = TestConfig('io', params, None, params['nodes'], folder, None)
- return FioRunResult(config, fio_task, mm_res, raw_res, params['intervals'], run_num)
-
-
-class Attrmapper:
- def __init__(self, dct: Dict[str, Any]):
- self.__dct = dct
-
- def __getattr__(self, name):
- try:
- return self.__dct[name]
- except KeyError:
- raise AttributeError(name)
-
-
-class DiskPerfInfo:
- def __init__(self, name: str, summary: str, params: Dict[str, Any], testnodes_count: int):
- self.name = name
- self.bw = None
- self.iops = None
- self.lat = None
- self.lat_50 = None
- self.lat_95 = None
- self.lat_avg = None
-
- self.raw_bw = []
- self.raw_iops = []
- self.raw_lat = []
-
- self.params = params
- self.testnodes_count = testnodes_count
- self.summary = summary
- self.p = Attrmapper(self.params['vals'])
-
- self.sync_mode = get_test_sync_mode(self.params['vals'])
- self.concurence = self.params['vals'].get('numjobs', 1)
-
-
-def get_lat_perc_50_95(lat_mks: List[float]) -> Tuple[float, float]:
- curr_perc = 0
- perc_50 = None
- perc_95 = None
- pkey = None
- for key, val in sorted(lat_mks.items()):
- if curr_perc + val >= 50 and perc_50 is None:
- if pkey is None or val < 1.:
- perc_50 = key
- else:
- perc_50 = (50. - curr_perc) / val * (key - pkey) + pkey
-
- if curr_perc + val >= 95:
- if pkey is None or val < 1.:
- perc_95 = key
- else:
- perc_95 = (95. - curr_perc) / val * (key - pkey) + pkey
- break
-
- pkey = key
- curr_perc += val
-
- # for k, v in sorted(lat_mks.items()):
- # if k / 1000 > 0:
- # print "{0:>4}".format(k / 1000), v
-
- # print perc_50 / 1000., perc_95 / 1000.
- # exit(1)
- return perc_50 / 1000., perc_95 / 1000.
-
-
-class IOTestResults:
- def __init__(self, suite_name: str, fio_results: 'FioRunResult', log_directory: str):
- self.suite_name = suite_name
- self.fio_results = fio_results
- self.log_directory = log_directory
-
- def __iter__(self):
- return iter(self.fio_results)
-
- def __len__(self):
- return len(self.fio_results)
-
- def get_yamable(self) -> Dict[str, List[str]]:
- items = [(fio_res.summary(), fio_res.idx) for fio_res in self]
- return {self.suite_name: [self.log_directory] + items}
-
-
-class FioRunResult(TestResults):
- """
- Fio run results
- config: TestConfig
- fio_task: FioJobSection
- ts_results: {str: MeasurementMatrix[TimeSeriesValue]}
- raw_result: ????
- run_interval:(float, float) - test tun time, used for sensors
- """
- def __init__(self, config, fio_task, ts_results, raw_result, run_interval, idx):
-
- self.name = fio_task.name.rsplit("_", 1)[0]
- self.fio_task = fio_task
- self.idx = idx
-
- self.bw = ts_results['bw']
- self.lat = ts_results['lat']
- self.iops = ts_results['iops']
-
- if 'iops:sys' in ts_results:
- self.iops_sys = ts_results['iops:sys']
- else:
- self.iops_sys = None
-
- res = {"bw": self.bw,
- "lat": self.lat,
- "iops": self.iops,
- "iops:sys": self.iops_sys}
-
- self.sensors_data = None
- self._pinfo = None
- TestResults.__init__(self, config, res, raw_result, run_interval)
-
- def get_params_from_fio_report(self):
- nodes = self.bw.connections_ids
-
- iops = [self.raw_result[node]['jobs'][0]['mixed']['iops'] for node in nodes]
- total_ios = [self.raw_result[node]['jobs'][0]['mixed']['total_ios'] for node in nodes]
- runtime = [self.raw_result[node]['jobs'][0]['mixed']['runtime'] / 1000 for node in nodes]
- flt_iops = [float(ios) / rtime for ios, rtime in zip(total_ios, runtime)]
-
- bw = [self.raw_result[node]['jobs'][0]['mixed']['bw'] for node in nodes]
- total_bytes = [self.raw_result[node]['jobs'][0]['mixed']['io_bytes'] for node in nodes]
- flt_bw = [float(tbytes) / rtime for tbytes, rtime in zip(total_bytes, runtime)]
-
- return {'iops': iops,
- 'flt_iops': flt_iops,
- 'bw': bw,
- 'flt_bw': flt_bw}
-
- def summary(self):
- return get_test_summary(self.fio_task, len(self.config.nodes))
-
- def summary_tpl(self):
- return get_test_summary_tuple(self.fio_task, len(self.config.nodes))
-
- def get_lat_perc_50_95_multy(self):
- lat_mks = collections.defaultdict(lambda: 0)
- num_res = 0
-
- for result in self.raw_result.values():
- num_res += len(result['jobs'])
- for job_info in result['jobs']:
- for k, v in job_info['latency_ms'].items():
- if isinstance(k, basestring) and k.startswith('>='):
- lat_mks[int(k[2:]) * 1000] += v
- else:
- lat_mks[int(k) * 1000] += v
-
- for k, v in job_info['latency_us'].items():
- lat_mks[int(k)] += v
-
- for k, v in lat_mks.items():
- lat_mks[k] = float(v) / num_res
- return get_lat_perc_50_95(lat_mks)
-
- def disk_perf_info(self, avg_interval=2.0):
-
- if self._pinfo is not None:
- return self._pinfo
-
- testnodes_count = len(self.config.nodes)
-
- pinfo = DiskPerfInfo(self.name,
- self.summary(),
- self.params,
- testnodes_count)
-
- def prepare(data, drop=1):
- if data is None:
- return data
-
- res = []
- for ts_data in data:
- if ts_data.average_interval() < avg_interval:
- ts_data = ts_data.derived(avg_interval)
-
- # drop last value on bounds
- # as they may contains ranges without activities
- assert len(ts_data.values) >= drop + 1, str(drop) + " " + str(ts_data.values)
-
- if drop > 0:
- res.append(ts_data.values[:-drop])
- else:
- res.append(ts_data.values)
-
- return res
-
- def agg_data(matr):
- arr = sum(matr, [])
- min_len = min(map(len, arr))
- res = []
- for idx in range(min_len):
- res.append(sum(dt[idx] for dt in arr))
- return res
-
- pinfo.raw_lat = map(prepare, self.lat.per_vm())
- num_th = sum(map(len, pinfo.raw_lat))
- lat_avg = [val / num_th for val in agg_data(pinfo.raw_lat)]
- pinfo.lat_avg = data_property(lat_avg).average / 1000 # us to ms
-
- pinfo.lat_50, pinfo.lat_95 = self.get_lat_perc_50_95_multy()
- pinfo.lat = pinfo.lat_50
-
- pinfo.raw_bw = map(prepare, self.bw.per_vm())
- pinfo.raw_iops = map(prepare, self.iops.per_vm())
-
- if self.iops_sys is not None:
- pinfo.raw_iops_sys = map(prepare, self.iops_sys.per_vm())
- pinfo.iops_sys = data_property(agg_data(pinfo.raw_iops_sys))
- else:
- pinfo.raw_iops_sys = None
- pinfo.iops_sys = None
-
- fparams = self.get_params_from_fio_report()
- fio_report_bw = sum(fparams['flt_bw'])
- fio_report_iops = sum(fparams['flt_iops'])
-
- agg_bw = agg_data(pinfo.raw_bw)
- agg_iops = agg_data(pinfo.raw_iops)
-
- log_bw_avg = average(agg_bw)
- log_iops_avg = average(agg_iops)
-
- # update values to match average from fio report
- coef_iops = fio_report_iops / float(log_iops_avg)
- coef_bw = fio_report_bw / float(log_bw_avg)
-
- bw_log = data_property([val * coef_bw for val in agg_bw])
- iops_log = data_property([val * coef_iops for val in agg_iops])
-
- bw_report = data_property([fio_report_bw])
- iops_report = data_property([fio_report_iops])
-
- # When IOPS/BW per thread is too low
- # data from logs is rounded to match
- iops_per_th = sum(sum(pinfo.raw_iops, []), [])
- if average(iops_per_th) > 10:
- pinfo.iops = iops_log
- pinfo.iops2 = iops_report
- else:
- pinfo.iops = iops_report
- pinfo.iops2 = iops_log
-
- bw_per_th = sum(sum(pinfo.raw_bw, []), [])
- if average(bw_per_th) > 10:
- pinfo.bw = bw_log
- pinfo.bw2 = bw_report
- else:
- pinfo.bw = bw_report
- pinfo.bw2 = bw_log
-
- self._pinfo = pinfo
-
- return pinfo
-
-
-class IOPerfTest(PerfTest):
- tcp_conn_timeout = 30
- max_pig_timeout = 5
+class IOPerfTest(ThreadedTest):
soft_runcycle = 5 * 60
retry_time = 30
+ configs_dir = os.path.dirname(__file__) # type: str
- zero_md5_hash = hashlib.md5()
- zero_md5_hash.update(b"\x00" * 1024)
- zero_md5 = zero_md5_hash.hexdigest()
-
- def __init__(self, config):
- PerfTest.__init__(self, config)
+ def __init__(self, *args, **kwargs) -> None:
+ super().__init__(*args, **kwargs)
get = self.config.params.get
- do_get = self.config.params.__getitem__
- self.config_fname = do_get('cfg')
+ self.load_profile_name = self.config.params['load'] # type: str
+ self.name = "io." + self.load_profile_name
- if '/' not in self.config_fname and '.' not in self.config_fname:
- cfgs_dir = os.path.dirname(__file__)
- self.config_fname = os.path.join(cfgs_dir,
- self.config_fname + '.cfg')
-
- self.alive_check_interval = get('alive_check_interval')
- self.use_system_fio = get('use_system_fio', False)
-
- if get('prefill_files') is not None:
- logger.warning("prefill_files option is depricated. Use force_prefill instead")
-
- self.force_prefill = get('force_prefill', False)
- self.config_params = get('params', {}).copy()
-
- self.io_py_remote = self.join_remote("agent.py")
- self.results_file = self.join_remote("results.json")
- self.pid_file = self.join_remote("pid")
- self.task_file = self.join_remote("task.cfg")
- self.sh_file = self.join_remote("cmd.sh")
- self.err_out_file = self.join_remote("fio_err_out")
- self.io_log_file = self.join_remote("io_log.txt")
- self.exit_code_file = self.join_remote("exit_code")
-
- self.max_latency = get("max_lat", None)
- self.min_bw_per_thread = get("min_bw", None)
-
- self.use_sudo = get("use_sudo", True)
-
- self.raw_cfg = open(self.config_fname).read()
- self.fio_configs = None
-
- @classmethod
- def load(cls, suite_name: str, folder: str) -> IOTestResults:
- res = []
- for fname in os.listdir(folder):
- if re.match("\d+_params.yaml$", fname):
- num = int(fname.split('_')[0])
- res.append(load_test_results(folder, num))
- return IOTestResults(suite_name, res, folder)
-
- def cleanup(self):
- # delete_file(conn, self.io_py_remote)
- # Need to remove tempo files, used for testing
- pass
-
- # size is megabytes
- def check_prefill_required(self, node: INode, fname: str, size: int, num_blocks: Optional[int]=16) -> bool:
- try:
- fstats = node.stat_file(fname)
- if stat.S_ISREG(fstats.st_mode) and fstats.st_size < size * 1024 ** 2:
- return True
- except EnvironmentError:
- return True
-
- cmd = 'python -c "' + \
- "import sys;" + \
- "fd = open('{0}', 'rb');" + \
- "fd.seek({1});" + \
- "data = fd.read(1024); " + \
- "sys.stdout.write(data + ' ' * ( 1024 - len(data)))\" | md5sum"
-
- if self.use_sudo:
- cmd = "sudo " + cmd
-
- bsize = size * (1024 ** 2)
- offsets = [random.randrange(bsize - 1024) for _ in range(num_blocks)]
- offsets.append(bsize - 1024)
- offsets.append(0)
-
- for offset in offsets:
- data = node.run(cmd.format(fname, offset), nolog=True)
-
- md = ""
- for line in data.split("\n"):
- if "unable to resolve" not in line:
- md = line.split()[0].strip()
- break
-
- if len(md) != 32:
- logger.error("File data check is failed - " + data)
- return True
-
- if self.zero_md5 == md:
- return True
-
- return False
-
- def prefill_test_files(self, node: INode, files: List[str], force:bool=False) -> None:
- if self.use_system_fio:
- cmd_templ = "fio "
+        if os.path.isfile(self.load_profile_name):
+            self.load_profile_path = self.load_profile_name  # type: str
else:
- cmd_templ = "{0}/fio ".format(self.config.remote_dir)
+            self.load_profile_path = os.path.join(self.configs_dir, self.load_profile_name + '.cfg')
- if self.use_sudo:
- cmd_templ = "sudo " + cmd_templ
+ self.load_profile = open(self.load_profile_path, 'rt').read() # type: str
- cmd_templ += "--name=xxx --filename={0} --direct=1" + \
- " --bs=4m --size={1}m --rw=write"
-
- ssize = 0
-
- if force:
- logger.info("File prefilling is forced")
-
- ddtime = 0
- for fname, curr_sz in files.items():
- if not force:
- if not self.check_prefill_required(node, fname, curr_sz):
- logger.debug("prefill is skipped")
- continue
-
- logger.info("Prefilling file {0}".format(fname))
- cmd = cmd_templ.format(fname, curr_sz)
- ssize += curr_sz
-
- stime = time.time()
- node.run(cmd, timeout=curr_sz)
- ddtime += time.time() - stime
-
- if ddtime > 1.0:
- fill_bw = int(ssize / ddtime)
- mess = "Initiall fio fill bw is {0} MiBps for this vm"
- logger.info(mess.format(fill_bw))
-
- def install_utils(self, node: INode) -> None:
- need_install = []
- packs = [('screen', 'screen')]
- os_info = get_os(node)
+ self.use_system_fio = get('use_system_fio', False) # type: bool
if self.use_system_fio:
- packs.append(('fio', 'fio'))
+ self.fio_path = "fio" # type: str
else:
- packs.append(('bzip2', 'bzip2'))
+ self.fio_path = os.path.join(self.config.remote_dir, "fio")
- for bin_name, package in packs:
- if bin_name is None:
- need_install.append(package)
- continue
+ self.force_prefill = get('force_prefill', False) # type: bool
- try:
- node.run('which ' + bin_name, nolog=True)
- except OSError:
- need_install.append(package)
-
- if len(need_install) != 0:
- if 'redhat' == os_info.distro:
- cmd = "sudo yum -y install " + " ".join(need_install)
- else:
- cmd = "sudo apt-get -y install " + " ".join(need_install)
-
- try:
- node.run(cmd)
- except OSError as err:
- raise OSError("Can't install - {}".format(" ".join(need_install))) from err
-
- if not self.use_system_fio:
- fio_dir = os.path.dirname(os.path.dirname(wally.__file__))
- fio_dir = os.path.join(os.getcwd(), fio_dir)
- fio_dir = os.path.join(fio_dir, 'fio_binaries')
- fname = 'fio_{0.release}_{0.arch}.bz2'.format(os_info)
- fio_path = os.path.join(fio_dir, fname)
-
- if not os.path.exists(fio_path):
- raise RuntimeError("No prebuild fio binary available for {0}".format(os_info))
-
- bz_dest = self.join_remote('fio.bz2')
- node.copy_file(fio_path, bz_dest)
- node.run("bzip2 --decompress {}" + bz_dest, nolog=True)
- node.run("chmod a+x " + self.join_remote("fio"), nolog=True)
-
- def pre_run(self) -> None:
- if 'FILESIZE' not in self.config_params:
+ if 'FILESIZE' not in self.config.params:
raise NotImplementedError("File size detection is not implemented")
- self.fio_configs = fio_cfg_compile(self.raw_cfg,
- self.config_fname,
- self.config_params)
- self.fio_configs = list(self.fio_configs)
+ # self.max_latency = get("max_lat") # type: Optional[int]
+ # self.min_bw_per_thread = get("min_bw") # type: Optional[int]
- files = {}
+ self.use_sudo = get("use_sudo", True) # type: bool
+
+ self.fio_configs = list(fio_cfg_compile(self.load_profile,
+ self.load_profile_path,
+ cast(FioParams, self.config.params)))
+
+ if len(self.fio_configs) == 0:
+ logger.exception("Empty fio config provided")
+ raise StopTestError("Empty fio config provided")
+
+ self.iterations_configs = self.fio_configs # type: ignore
+ self.files_sizes = self.get_file_sizes()
+
+ self.exec_folder = self.config.remote_dir
+ self.fio_path = "" if self.use_system_fio else self.exec_folder
+
+ def get_file_sizes(self) -> Dict[str, int]:
+ files_sizes = {} # type: Dict[str, int]
+
for section in self.fio_configs:
sz = ssize2b(section.vals['size'])
- msz = sz / (1024 ** 2)
-
- if sz % (1024 ** 2) != 0:
- msz += 1
-
- fname = section.vals['filename']
+ msz = sz // (1024 ** 2) + (1 if sz % (1024 ** 2) != 0 else 0)
+ fname = section.vals['filename'] # type: str
# if already has other test with the same file name
# take largest size
- files[fname] = max(files.get(fname, 0), msz)
+ files_sizes[fname] = max(files_sizes.get(fname, 0), msz)
- with ThreadPoolExecutor(len(self.config.nodes)) as pool:
- fc = functools.partial(self.pre_run_th,
- files=files,
- force=self.force_prefill)
- list(pool.map(fc, self.config.nodes))
+ return files_sizes
- def pre_run_th(self, node: INode, files: List[str], force_prefil: Optional[bool]=False) -> None:
+ def config_node(self, node: IRPCNode) -> None:
try:
- cmd = 'mkdir -p "{0}"'.format(self.config.remote_dir)
- if self.use_sudo:
- cmd = "sudo " + cmd
- cmd += " ; sudo chown {0} {1}".format(node.get_user(),
- self.config.remote_dir)
- node.run(cmd, nolog=True)
-
- assert self.config.remote_dir != "" and self.config.remote_dir != "/"
- node.run("rm -rf {}/*".format(self.config.remote_dir), nolog=True)
-
+ node.conn.rmdir(self.config.remote_dir, recursive=True, ignore_missing=True)
+ node.conn.mkdir(self.config.remote_dir)
except Exception as exc:
- msg = "Failed to create folder {} on remote {}."
- msg = msg.format(self.config.remote_dir, node, exc)
+ msg = "Failed to create folder {} on remote {}.".format(self.config.remote_dir, node, exc)
logger.exception(msg)
raise StopTestError(msg) from exc
self.install_utils(node)
- self.prefill_test_files(node, files, force_prefil)
+ logger.info("Prefilling test files with random data")
+ fill_bw = node.conn.prefill_test_files(self.files_sizes, force=self.force_prefill, fio_path=self.fio_path)
+ if fill_bw is not None:
+ logger.info("Initial fio fill bw is {} MiBps for {}".format(fill_bw, node.info.node_id()))
- def show_expected_execution_time(self) -> None:
- if len(self.fio_configs) > 1:
- # +10% - is a rough estimation for additional operations
- # like sftp, etc
- exec_time = int(sum(map(execution_time, self.fio_configs)) * 1.1)
- exec_time_s = sec_to_str(exec_time)
- now_dt = datetime.datetime.now()
- end_dt = now_dt + datetime.timedelta(0, exec_time)
- msg = "Entire test should takes aroud: {0} and finished at {1}"
- logger.info(msg.format(exec_time_s,
- end_dt.strftime("%H:%M:%S")))
-
- def run(self) -> IOTestResults:
- logger.debug("Run preparation")
- self.pre_run()
- self.show_expected_execution_time()
- num_nodes = len(self.config.nodes)
-
- tname = os.path.basename(self.config_fname)
- if tname.endswith('.cfg'):
- tname = tname[:-4]
-
- barrier = Barrier(num_nodes)
- results = []
-
- # set of Operation_Mode_BlockSize str's
- # which should not be tested anymore, as
- # they already too slow with previous thread count
- lat_bw_limit_reached = set()
-
- with ThreadPoolExecutor(num_nodes) as pool:
- for pos, fio_cfg in enumerate(self.fio_configs):
- test_descr = get_test_summary(fio_cfg.vals, noqd=True)
- if test_descr in lat_bw_limit_reached:
- continue
-
- logger.info("Will run {} test".format(fio_cfg.name))
- templ = "Test should takes about {}. Should finish at {}, will wait at most till {}"
- exec_time = execution_time(fio_cfg)
- exec_time_str = sec_to_str(exec_time)
- timeout = int(exec_time + max(300, exec_time))
-
- now_dt = datetime.datetime.now()
- end_dt = now_dt + datetime.timedelta(0, exec_time)
- wait_till = now_dt + datetime.timedelta(0, timeout)
-
- logger.info(templ.format(exec_time_str,
- end_dt.strftime("%H:%M:%S"),
- wait_till.strftime("%H:%M:%S")))
-
- run_test_func = functools.partial(self.do_run,
- barrier=barrier,
- fio_cfg=fio_cfg,
- pos=pos)
-
- max_retr = 3
- for idx in range(max_retr):
- if 0 != idx:
- logger.info("Sleeping %ss and retrying", self.retry_time)
- time.sleep(self.retry_time)
-
- try:
- intervals = list(pool.map(run_test_func, self.config.nodes))
- if None not in intervals:
- break
- except (EnvironmentError, SSHException) as exc:
- if max_retr - 1 == idx:
- raise StopTestError("Fio failed") from exc
- logger.exception("During fio run")
-
- fname = "{}_task.fio".format(pos)
- with open(os.path.join(self.config.log_directory, fname), "w") as fd:
- fd.write(str(fio_cfg))
-
- params = {'vm_count': num_nodes}
- params['name'] = fio_cfg.name
- params['vals'] = dict(fio_cfg.vals.items())
- params['intervals'] = intervals
- params['nodes'] = [node.get_conn_id() for node in self.config.nodes]
-
- fname = "{}_params.yaml".format(pos)
- with open(os.path.join(self.config.log_directory, fname), "w") as fd:
- fd.write(dumps(params))
-
- res = load_test_results(self.config.log_directory, pos)
- results.append(res)
-
- if self.max_latency is not None:
- lat_50, _ = res.get_lat_perc_50_95_multy()
-
- # conver us to ms
- if self.max_latency < lat_50:
- logger.info(("Will skip all subsequent tests of {} " +
- "due to lat/bw limits").format(fio_cfg.name))
- lat_bw_limit_reached.add(test_descr)
-
- test_res = res.get_params_from_fio_report()
- if self.min_bw_per_thread is not None:
- if self.min_bw_per_thread > average(test_res['bw']):
- lat_bw_limit_reached.add(test_descr)
-
- return IOTestResults(self.config.params['cfg'],
- results, self.config.log_directory)
-
- def do_run(self, node: INode, barrier: Barrier, fio_cfg, pos: int, nolog: bool=False):
- exec_folder = self.config.remote_dir
-
+ def install_utils(self, node: IRPCNode) -> None:
if self.use_system_fio:
- fio_path = ""
- else:
- if not exec_folder.endswith("/"):
- fio_path = exec_folder + "/"
- else:
- fio_path = exec_folder
+ node.conn.install('fio', binary='fio')
- exec_time = execution_time(fio_cfg)
- barrier.wait()
- run_data = node.rpc.fio.run_fio(self.use_sudo,
- fio_path,
- exec_folder,
- str(fio_cfg),
+ if not self.use_system_fio:
+ os_info = get_os(node)
+ fio_dir = os.path.dirname(os.path.dirname(wally.__file__)) # type: str
+ fio_dir = os.path.join(os.getcwd(), fio_dir)
+ fio_dir = os.path.join(fio_dir, 'fio_binaries')
+ fname = 'fio_{0.release}_{0.arch}.bz2'.format(os_info)
+ fio_path = os.path.join(fio_dir, fname) # type: str
+
+ if not os.path.exists(fio_path):
+ raise RuntimeError("No prebuild fio binary available for {0}".format(os_info))
+
+ bz_dest = self.join_remote('fio.bz2') # type: str
+ node.copy_file(fio_path, bz_dest)
+ node.run("bzip2 --decompress {}" + bz_dest)
+ node.run("chmod a+x " + self.join_remote("fio"))
+
+ def get_expected_runtime(self, iteration_info: IterationConfig) -> int:
+ return execution_time(cast(FioJobSection, iteration_info))
+
+ def do_test(self, node: IRPCNode, iter_config: IterationConfig) -> RunTestRes:
+ exec_time = execution_time(cast(FioJobSection, iter_config))
+ raw_res = node.conn.fio.run_fio(self.fio_path,
+ self.exec_folder,
+ str(cast(FioJobSection, iter_config)),
exec_time + max(300, exec_time))
- return parse_fio_result(run_data)
+ # TODO(koder): fix next error
+ raise NotImplementedError("Need to extract time from test result")
+ return raw_res, (0, 0)
- @classmethod
- def prepare_data(cls, results) -> List[Dict[str, Any]]:
- """create a table with io performance report for console"""
-
- def key_func(data: FioRunResult) -> Tuple[str, str, str, str, int]:
- tpl = data.summary_tpl()
- return (data.name,
- tpl.oper,
- tpl.mode,
- ssize2b(tpl.bsize),
- int(tpl.th_count) * int(tpl.vm_count))
- res = []
-
- for item in sorted(results, key=key_func):
- test_dinfo = item.disk_perf_info()
- testnodes_count = len(item.config.nodes)
-
- iops, _ = test_dinfo.iops.rounded_average_conf()
-
- if test_dinfo.iops_sys is not None:
- iops_sys, iops_sys_conf = test_dinfo.iops_sys.rounded_average_conf()
- _, iops_sys_dev = test_dinfo.iops_sys.rounded_average_dev()
- iops_sys_per_vm = round_3_digit(iops_sys / testnodes_count)
- iops_sys = round_3_digit(iops_sys)
- else:
- iops_sys = None
- iops_sys_per_vm = None
- iops_sys_dev = None
- iops_sys_conf = None
-
- bw, bw_conf = test_dinfo.bw.rounded_average_conf()
- _, bw_dev = test_dinfo.bw.rounded_average_dev()
- conf_perc = int(round(bw_conf * 100 / bw))
- dev_perc = int(round(bw_dev * 100 / bw))
-
- lat_50 = round_3_digit(int(test_dinfo.lat_50))
- lat_95 = round_3_digit(int(test_dinfo.lat_95))
- lat_avg = round_3_digit(int(test_dinfo.lat_avg))
-
- iops_per_vm = round_3_digit(iops / testnodes_count)
- bw_per_vm = round_3_digit(bw / testnodes_count)
-
- iops = round_3_digit(iops)
- bw = round_3_digit(bw)
-
- summ = "{0.oper}{0.mode} {0.bsize:>4} {0.th_count:>3}th {0.vm_count:>2}vm".format(item.summary_tpl())
-
- res.append({"name": key_func(item)[0],
- "key": key_func(item)[:4],
- "summ": summ,
- "iops": int(iops),
- "bw": int(bw),
- "conf": str(conf_perc),
- "dev": str(dev_perc),
- "iops_per_vm": int(iops_per_vm),
- "bw_per_vm": int(bw_per_vm),
- "lat_50": lat_50,
- "lat_95": lat_95,
- "lat_avg": lat_avg,
-
- "iops_sys": iops_sys,
- "iops_sys_per_vm": iops_sys_per_vm,
- "sys_conf": iops_sys_conf,
- "sys_dev": iops_sys_dev})
-
- return res
-
- Field = collections.namedtuple("Field", ("header", "attr", "allign", "size"))
- fiels_and_header = [
- Field("Name", "name", "l", 7),
- Field("Description", "summ", "l", 19),
- Field("IOPS\ncum", "iops", "r", 3),
- # Field("IOPS_sys\ncum", "iops_sys", "r", 3),
- Field("KiBps\ncum", "bw", "r", 6),
- Field("Cnf %\n95%", "conf", "r", 3),
- Field("Dev%", "dev", "r", 3),
- Field("iops\n/vm", "iops_per_vm", "r", 3),
- Field("KiBps\n/vm", "bw_per_vm", "r", 6),
- Field("lat ms\nmedian", "lat_50", "r", 3),
- Field("lat ms\n95%", "lat_95", "r", 3),
- Field("lat\navg", "lat_avg", "r", 3),
- ]
-
- fiels_and_header_dct = dict((item.attr, item) for item in fiels_and_header)
-
- @classmethod
- def format_for_console(cls, results) -> str:
- """create a table with io performance report for console"""
-
- tab = texttable.Texttable(max_width=120)
- tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
- tab.set_cols_align([f.allign for f in cls.fiels_and_header])
- sep = ["-" * f.size for f in cls.fiels_and_header]
- tab.header([f.header for f in cls.fiels_and_header])
- prev_k = None
- for item in cls.prepare_data(results):
- if prev_k is not None:
- if prev_k != item["key"]:
- tab.add_row(sep)
-
- prev_k = item["key"]
- tab.add_row([item[f.attr] for f in cls.fiels_and_header])
-
- return tab.draw()
-
- @classmethod
- def format_diff_for_console(cls, list_of_results: List[Any]) -> str:
- """create a table with io performance report for console"""
-
- tab = texttable.Texttable(max_width=200)
- tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
-
- header = [
- cls.fiels_and_header_dct["name"].header,
- cls.fiels_and_header_dct["summ"].header,
- ]
- allign = ["l", "l"]
-
- header.append("IOPS ~ Cnf% ~ Dev%")
- allign.extend(["r"] * len(list_of_results))
- header.extend(
- "IOPS_{0} %".format(i + 2) for i in range(len(list_of_results[1:]))
- )
-
- header.append("BW")
- allign.extend(["r"] * len(list_of_results))
- header.extend(
- "BW_{0} %".format(i + 2) for i in range(len(list_of_results[1:]))
- )
-
- header.append("LAT")
- allign.extend(["r"] * len(list_of_results))
- header.extend(
- "LAT_{0}".format(i + 2) for i in range(len(list_of_results[1:]))
- )
-
- tab.header(header)
- sep = ["-" * 3] * len(header)
- processed_results = map(cls.prepare_data, list_of_results)
-
- key2results = []
- for res in processed_results:
- key2results.append(dict(
- ((item["name"], item["summ"]), item) for item in res
- ))
-
- prev_k = None
- iops_frmt = "{0[iops]} ~ {0[conf]:>2} ~ {0[dev]:>2}"
- for item in processed_results[0]:
- if prev_k is not None:
- if prev_k != item["key"]:
- tab.add_row(sep)
-
- prev_k = item["key"]
-
- key = (item['name'], item['summ'])
- line = list(key)
- base = key2results[0][key]
-
- line.append(iops_frmt.format(base))
-
- for test_results in key2results[1:]:
- val = test_results.get(key)
- if val is None:
- line.append("-")
- elif base['iops'] == 0:
- line.append("Nan")
- else:
- prc_val = {'dev': val['dev'], 'conf': val['conf']}
- prc_val['iops'] = int(100 * val['iops'] / base['iops'])
- line.append(iops_frmt.format(prc_val))
-
- line.append(base['bw'])
-
- for test_results in key2results[1:]:
- val = test_results.get(key)
- if val is None:
- line.append("-")
- elif base['bw'] == 0:
- line.append("Nan")
- else:
- line.append(int(100 * val['bw'] / base['bw']))
-
- for test_results in key2results:
- val = test_results.get(key)
- if val is None:
- line.append("-")
- else:
- line.append("{0[lat_50]} - {0[lat_95]}".format(val))
-
- tab.add_row(line)
-
- tab.set_cols_align(allign)
- return tab.draw()
diff --git a/wally/suits/io/fio_task_parser.py b/wally/suits/io/fio_task_parser.py
index 1b6ba21..1bdbb15 100644
--- a/wally/suits/io/fio_task_parser.py
+++ b/wally/suits/io/fio_task_parser.py
@@ -7,10 +7,11 @@
import os.path
import argparse
import itertools
-from typing import Optional, Iterator, Union, Dict, Iterable, List, TypeVar, Callable, Tuple
-from collections import OrderedDict, namedtuple
+from typing import Optional, Iterator, Union, Dict, Iterable, List, TypeVar, Callable, Tuple, NamedTuple, Any
+from collections import OrderedDict
+from ..itest import IterationConfig
from ...utils import sec_to_str, ssize2b
@@ -19,15 +20,27 @@
INCLUDE = 2
-Var = namedtuple('Var', ('name',))
-CfgLine = namedtuple('CfgLine', ('fname', 'lineno', 'oline',
- 'tp', 'name', 'val'))
+Var = NamedTuple('Var', [('name', str)])
+CfgLine = NamedTuple('CfgLine',
+ [('fname', str),
+ ('lineno', int),
+ ('oline', str),
+ ('tp', int),
+ ('name', str),
+ ('val', Any)])
+
+TestSumm = NamedTuple("TestSumm",
+ [("oper", str),
+ ("mode", str),
+ ("bsize", int),
+ ("iodepth", int),
+ ("vm_count", int)])
-class FioJobSection:
- def __init__(self, name: str):
+class FioJobSection(IterationConfig):
+ def __init__(self, name: str) -> None:
self.name = name
- self.vals = OrderedDict()
+ self.vals = OrderedDict() # type: Dict[str, Any]
def copy(self) -> 'FioJobSection':
return copy.deepcopy(self)
@@ -40,7 +53,7 @@
def is_free(self) -> bool:
return len(list(self.required_vars())) == 0
- def __str__(self):
+ def __str__(self) -> str:
res = "[{0}]\n".format(self.name)
for name, val in self.vals.items():
@@ -55,13 +68,13 @@
class ParseError(ValueError):
- def __init__(self, msg: str, fname: str, lineno: int, line_cont:Optional[str] =""):
+ def __init__(self, msg: str, fname: str, lineno: int, line_cont:Optional[str] = "") -> None:
ValueError.__init__(self, msg)
self.file_name = fname
self.lineno = lineno
self.line_cont = line_cont
- def __str__(self):
+ def __str__(self) -> str:
msg = "In {0}:{1} ({2}) : {3}"
return msg.format(self.file_name,
self.lineno,
@@ -70,10 +83,10 @@
def is_name(name: str) -> bool:
- return re.match("[a-zA-Z_][a-zA-Z_0-9]*", name)
+ return re.match("[a-zA-Z_][a-zA-Z_0-9]*", name) is not None
-def parse_value(val: str) -> Union[int, str, Dict, Var]:
+def parse_value(val: str) -> Union[int, str, float, List, Var]:
try:
return int(val)
except ValueError:
@@ -88,12 +101,13 @@
assert val.endswith("%}")
content = val[2:-2]
vals = list(i.strip() for i in content.split(','))
- return map(parse_value, vals)
+ return list(map(parse_value, vals))
if val.startswith('{'):
assert val.endswith("}")
assert is_name(val[1:-1])
return Var(val[1:-1])
+
return val
@@ -133,15 +147,15 @@
def fio_config_parse(lexer_iter: Iterable[CfgLine]) -> Iterator[FioJobSection]:
in_globals = False
curr_section = None
- glob_vals = OrderedDict()
+ glob_vals = OrderedDict() # type: Dict[str, Any]
sections_count = 0
- lexed_lines = list(lexer_iter)
+ lexed_lines = list(lexer_iter) # type: List[CfgLine]
one_more = True
includes = {}
while one_more:
- new_lines = []
+ new_lines = [] # type: List[CfgLine]
one_more = False
for line in lexed_lines:
fname, lineno, oline, tp, name, val = line
@@ -205,7 +219,7 @@
def process_cycles(sec: FioJobSection) -> Iterator[FioJobSection]:
- cycles = OrderedDict()
+ cycles = OrderedDict() # type: Dict[str, Any]
for name, val in sec.vals.items():
if isinstance(val, list) and name.upper() != name:
@@ -214,11 +228,11 @@
if len(cycles) == 0:
yield sec
else:
- # qd should changes faster
- numjobs = cycles.pop('qd', None)
- items = cycles.items()
+            # iodepth should change faster
+ numjobs = cycles.pop('iodepth', None)
+ items = list(cycles.items())
- if len(items) > 0:
+ if items:
keys, vals = zip(*items)
keys = list(keys)
vals = list(vals)
@@ -228,7 +242,7 @@
if numjobs is not None:
vals.append(numjobs)
- keys.append('qd')
+ keys.append('iodepth')
for combination in itertools.product(*vals):
new_sec = sec.copy()
@@ -236,12 +250,12 @@
yield new_sec
-FIO_PARAM_VAL = Union[str, Var]
-FIO_PARAMS = Dict[str, FIO_PARAM_VAL]
+FioParamsVal = Union[str, Var]
+FioParams = Dict[str, FioParamsVal]
-def apply_params(sec: FioJobSection, params: FIO_PARAMS) -> FioJobSection:
- processed_vals = OrderedDict()
+def apply_params(sec: FioJobSection, params: FioParams) -> FioJobSection:
+ processed_vals = OrderedDict() # type: Dict[str, Any]
processed_vals.update(params)
for name, val in sec.vals.items():
if name in params:
@@ -329,9 +343,6 @@
return 'a'
-TestSumm = namedtuple("TestSumm", ("oper", "mode", "bsize", "iodepth", "vm_count"))
-
-
def get_test_summary_tuple(sec: FioJobSection, vm_count: int = None) -> TestSumm:
if isinstance(sec, dict):
vals = sec
@@ -355,12 +366,12 @@
vm_count)
-def get_test_summary(sec: FioJobSection, vm_count: int = None, noqd: bool = False) -> str:
+def get_test_summary(sec: FioJobSection, vm_count: int = None, noiodepth: bool = False) -> str:
tpl = get_test_summary_tuple(sec, vm_count)
res = "{0.oper}{0.mode}{0.bsize}".format(tpl)
- if not noqd:
- res += "qd{}".format(tpl.qd)
+ if not noiodepth:
+ res += "qd{}".format(tpl.iodepth)
if tpl.vm_count is not None:
res += "vm{}".format(tpl.vm_count)
@@ -387,7 +398,7 @@
yield res
-def fio_cfg_compile(source: str, fname: str, test_params: FIO_PARAMS) -> Iterator[FioJobSection]:
+def fio_cfg_compile(source: str, fname: str, test_params: FioParams) -> Iterator[FioJobSection]:
it = parse_all_in_1(source, fname)
it = (apply_params(sec, test_params) for sec in it)
it = flatmap(process_cycles, it)
diff --git a/wally/suits/io/rpc_plugin.py b/wally/suits/io/rpc_plugin.py
index ca3f0f3..8e2e09f 100644
--- a/wally/suits/io/rpc_plugin.py
+++ b/wally/suits/io/rpc_plugin.py
@@ -1,7 +1,20 @@
+import os
+import time
+import stat
+import random
+import subprocess
+
+
def rpc_run_fio(cfg):
fio_cmd_templ = "cd {exec_folder}; {fio_path}fio --output-format=json " + \
"--output={out_file} --alloc-size=262144 {job_file}"
+ result = {
+ "name": [float],
+ "lat_name": [[float]]
+ }
+
+ return result
# fnames_before = node.run("ls -1 " + exec_folder, nolog=True)
#
# timeout = int(exec_time + max(300, exec_time))
@@ -11,5 +24,66 @@
# fnames_after = node.run("ls -1 " + exec_folder, nolog=True)
#
-def parse_fio_result(data):
- pass
+def rpc_check_file_prefilled(path, used_size_mb):
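+    # Returns True when the file still needs to be prefilled: it is missing, smaller than
+    # the requested size, or a randomly sampled 1 KiB block is all zeroes.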
+ used_size = used_size_mb * 1024 ** 2
+ blocks_to_check = 16
+
+ try:
+ fstats = os.stat(path)
+ if stat.S_ISREG(fstats.st_mode) and fstats.st_size < used_size:
+ return True
+ except EnvironmentError:
+ return True
+
+ offsets = [random.randrange(used_size - 1024) for _ in range(blocks_to_check)]
+ offsets.append(used_size - 1024)
+ offsets.append(0)
+
+ with open(path, 'rb') as fd:
+ for offset in offsets:
+ fd.seek(offset)
+ if b"\x00" * 1024 == fd.read(1024):
+ return True
+
+ return False
+
+
+def rpc_prefill_test_files(files, force=False, fio_path='fio'):
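+    # Fills every file via a sequential fio write job and returns the average fill
+    # bandwidth in MiBps, or None when the whole fill took less than one second.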
+ cmd_templ = "{0} --name=xxx --filename={1} --direct=1" + \
+ " --bs=4m --size={2}m --rw=write"
+
+ ssize = 0
+ ddtime = 0.0
+
+ for fname, curr_sz in files.items():
+ if not force:
+ if not rpc_check_file_prefilled(fname, curr_sz):
+ continue
+
+ cmd = cmd_templ.format(fio_path, fname, curr_sz)
+ ssize += curr_sz
+
+ stime = time.time()
+        subprocess.check_call(cmd, shell=True)
+ ddtime += time.time() - stime
+
+ if ddtime > 1.0:
+ return int(ssize / ddtime)
+
+ return None
+
+
+def load_fio_log_file(fname):
+ with open(fname) as fd:
+ it = [ln.split(',')[:2] for ln in fd]
+
+ return [(float(off) / 1000, # convert us to ms
+             float(val.strip()) + 0.5)  # add 0.5 to compensate for the average value,
+             # as fio truncates all values in the log to integers
+ for off, val in it]
+
diff --git a/wally/suits/io/rpc_plugin.pyi b/wally/suits/io/rpc_plugin.pyi
new file mode 100644
index 0000000..1155007
--- /dev/null
+++ b/wally/suits/io/rpc_plugin.pyi
@@ -0,0 +1,8 @@
+from typing import Any, Optional, Dict, List, Tuple
+
+def rpc_run_fio(cfg: Dict[str, str]) -> Any: ...
+def rpc_check_file_prefilled(path: str, used_size_mb: int) -> bool: ...
+def rpc_prefill_test_files(files: Dict[str, int], force: bool = False, fio_path: str = 'fio') -> Optional[int]: ...
+
+
+def load_fio_log_file(fname: str) -> List[Tuple[float, float]]: ...
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index 7004b8e..6d1eeee 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -2,38 +2,43 @@
import time
import logging
import os.path
-import functools
-from typing import Dict, Any, List, Tuple
+import datetime
+from typing import Dict, Any, List, Optional, Tuple, cast
from concurrent.futures import ThreadPoolExecutor
-from ..utils import Barrier, StopTestError
-from ..statistic import data_property
-from ..inode import INode
+from ..utils import Barrier, StopTestError, sec_to_str
+from ..node_interfaces import IRPCNode
from ..storage import Storage
+from ..result_classes import RawTestResults
+
+import agent
logger = logging.getLogger("wally")
-class TestConfig:
+__doc__ = "Contains base classes for performance tests"
+
+
+class TestInputConfig:
"""
this class describe test input configuration
- test_type:str - test type name
- params:{str:Any} - parameters from yaml file for this test
- test_uuid:str - UUID to be used to create filenames and Co
- log_directory:str - local directory to store results
- nodes:[Node] - node to run tests on
- remote_dir:str - directory on nodes to be used for local files
+ test_type - test type name
+ params - parameters from yaml file for this test
+ test_uuid - UUID to be used to create file names & Co
+ log_directory - local directory to store results
+ nodes - nodes to run tests on
+ remote_dir - directory on nodes to be used for local files
"""
def __init__(self,
test_type: str,
params: Dict[str, Any],
run_uuid: str,
- nodes: List[INode],
+ nodes: List[IRPCNode],
storage: Storage,
- remote_dir: str):
+ remote_dir: str) -> None:
self.test_type = test_type
self.params = params
self.run_uuid = run_uuid
@@ -42,150 +47,21 @@
self.remote_dir = remote_dir
-class TestResults:
- """
- this class describe test results
-
- config:TestConfig - test config object
- params:dict - parameters from yaml file for this test
- results:{str:MeasurementMesh} - test results object
- raw_result:Any - opaque object to store raw results
- run_interval:(float, float) - test tun time, used for sensors
- """
- def __init__(self,
- config: TestConfig,
- results: Dict[str, Any],
- raw_result: Any,
- run_interval: Tuple[float, float]):
- self.config = config
- self.params = config.params
- self.results = results
- self.raw_result = raw_result
- self.run_interval = run_interval
-
- def __str__(self):
- res = "{0}({1}):\n results:\n".format(
- self.__class__.__name__,
- self.summary())
-
- for name, val in self.results.items():
- res += " {0}={1}\n".format(name, val)
-
- res += " params:\n"
-
- for name, val in self.params.items():
- res += " {0}={1}\n".format(name, val)
-
- return res
-
- @abc.abstractmethod
- def summary(self):
- pass
-
- @abc.abstractmethod
- def get_yamable(self):
- pass
-
-
-# class MeasurementMatrix:
-# """
-# data:[[MeasurementResult]] - VM_COUNT x TH_COUNT matrix of MeasurementResult
-# """
-# def __init__(self, data, connections_ids):
-# self.data = data
-# self.connections_ids = connections_ids
-#
-# def per_vm(self):
-# return self.data
-#
-# def per_th(self):
-# return sum(self.data, [])
-
-
-class MeasurementResults:
- def stat(self):
- return data_property(self.data)
-
- def __str__(self):
- return 'TS([' + ", ".join(map(str, self.data)) + '])'
-
-
-class SimpleVals(MeasurementResults):
- """
- data:[float] - list of values
- """
- def __init__(self, data):
- self.data = data
-
-
-class TimeSeriesValue(MeasurementResults):
- """
- data:[(float, float, float)] - list of (start_time, lenght, average_value_for_interval)
- odata: original values
- """
- def __init__(self, data: List[Tuple[float, float, float]]):
- assert len(data) > 0
- self.odata = data[:]
- self.data = []
-
- cstart = 0
- for nstart, nval in data:
- self.data.append((cstart, nstart - cstart, nval))
- cstart = nstart
-
- @property
- def values(self) -> List[float]:
- return [val[2] for val in self.data]
-
- def average_interval(self) -> float:
- return float(sum([val[1] for val in self.data])) / len(self.data)
-
- def skip(self, seconds) -> 'TimeSeriesValue':
- nres = []
- for start, ln, val in self.data:
- nstart = start + ln - seconds
- if nstart > 0:
- nres.append([nstart, val])
- return self.__class__(nres)
-
- def derived(self, tdelta) -> 'TimeSeriesValue':
- end = self.data[-1][0] + self.data[-1][1]
- tdelta = float(tdelta)
-
- ln = end / tdelta
-
- if ln - int(ln) > 0:
- ln += 1
-
- res = [[tdelta * i, 0.0] for i in range(int(ln))]
-
- for start, lenght, val in self.data:
- start_idx = int(start / tdelta)
- end_idx = int((start + lenght) / tdelta)
-
- for idx in range(start_idx, end_idx + 1):
- rstart = tdelta * idx
- rend = tdelta * (idx + 1)
-
- intersection_ln = min(rend, start + lenght) - max(start, rstart)
- if intersection_ln > 0:
- try:
- res[idx][1] += val * intersection_ln / tdelta
- except IndexError:
- raise
-
- return self.__class__(res)
+class IterationConfig:
+ name = None # type: str
class PerfTest:
- """
- Very base class for tests
- config:TestConfig - test configuration
- stop_requested:bool - stop for test requested
- """
- def __init__(self, config):
+ """Base class for all tests"""
+ name = None # type: str
+ max_retry = 3
+ retry_time = 30
+
+ def __init__(self, config: TestInputConfig) -> None:
self.config = config
self.stop_requested = False
+ self.nodes = self.config.nodes # type: List[IRPCNode]
+ self.sorted_nodes_ids = sorted(node.info.node_id() for node in self.nodes)
def request_stop(self) -> None:
self.stop_requested = True
@@ -193,13 +69,8 @@
def join_remote(self, path: str) -> str:
return os.path.join(self.config.remote_dir, path)
- @classmethod
@abc.abstractmethod
- def load(cls, path: str):
- pass
-
- @abc.abstractmethod
- def run(self):
+ def run(self, storage: Storage) -> None:
pass
@abc.abstractmethod
@@ -207,69 +78,182 @@
pass
-class ThreadedTest(PerfTest):
- """
- Base class for tests, which spawn separated thread for each node
- """
+RunTestRes = Tuple[RawTestResults, Tuple[int, int]]
- def run(self) -> List[TestResults]:
- barrier = Barrier(len(self.config.nodes))
- th_test_func = functools.partial(self.th_test_func, barrier)
- with ThreadPoolExecutor(len(self.config.nodes)) as pool:
- return list(pool.map(th_test_func, self.config.nodes))
+class ThreadedTest(PerfTest, metaclass=abc.ABCMeta):
+ """Base class for tests, which spawn separated thread for each node"""
+
+ # max allowed time difference between starts and stops of run of the same test on different test nodes
+ # used_max_diff = max((min_run_time * max_rel_time_diff), max_time_diff)
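+    # e.g. with the defaults below, a 600s iteration tolerates max(600 * 0.05, 5) == 30s of spread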
+ max_time_diff = 5
+ max_rel_time_diff = 0.05
+
+ def __init__(self, config: TestInputConfig) -> None:
+ PerfTest.__init__(self, config)
+ self.iterations_configs = [None] # type: List[Optional[IterationConfig]]
@abc.abstractmethod
- def do_test(self, node: INode) -> TestResults:
+ def get_expected_runtime(self, iter_cfg: IterationConfig) -> Optional[int]:
pass
- def th_test_func(self, barrier: Barrier, node: INode) -> TestResults:
- test_name = self.__class__.__name__
- logger.debug("Starting {} test on {}".format(test_name , node))
- logger.debug("Run test preparation on {}".format(node))
- self.pre_run(node)
+ def get_not_done_stages(self, storage: Storage) -> Dict[int, IterationConfig]:
+        start_run_id = max((int(name) for _, name in storage.list('result')), default=-1) + 1
+ not_in_storage = {} # type: Dict[int, IterationConfig]
+ for run_id, iteration_config in enumerate(self.iterations_configs, start_run_id):
+ info_path = "result/{}/info".format(run_id)
+ if info_path in storage:
+ info = cast(Dict[str, Any], storage[info_path]) # type: Dict[str, Any]
- # wait till all thread became ready
- barrier.wait()
+ assert isinstance(info, dict), \
+ "Broken storage at path {}. Expect test info dict, obtain {!r}".format(info_path, info)
- logger.debug("Run test on {}".format(node))
- try:
- return self.do_test(node)
- except Exception as exc:
- msg = "In test {} for {}".format(test_name, node)
- logger.exception(msg)
- raise StopTestError(msg) from exc
+ info = info.copy()
+ del info['begin_time']
+ del info['end_time']
- def pre_run(self, node: INode) -> None:
+ iter_name = "Unnamed" if iteration_config is None else iteration_config.name
+ expected_config = {
+ 'name': self.name,
+ 'iteration_name': iter_name,
+ 'iteration_config': iteration_config,
+ 'params': self.config.params,
+ 'nodes': self.sorted_nodes_ids
+ }
+
+ assert info == expected_config, \
+ "Test info at path {} is not equal to expected config." + \
+ "Maybe configuration was changed before test was restarted. " + \
+ "Current cfg is {!r}, expected cfg is {!r}".format(info_path, info, expected_config)
+
+ logger.info("Test iteration {} found in storage and will be skipped".format(iter_name))
+ else:
+ not_in_storage[run_id] = iteration_config
+ return not_in_storage
+
+ def run(self, storage: Storage) -> None:
+ not_in_storage = self.get_not_done_stages(storage)
+
+ if not not_in_storage:
+ logger.info("All test iteration in storage already. Skip test")
+ return
+
+ logger.debug("Run test {} on nodes {}.".format(self.name, ",".join(self.sorted_nodes_ids)))
+
+ barrier = Barrier(len(self.nodes))
+
+ logger.debug("Run preparation")
+
+ with ThreadPoolExecutor(len(self.nodes)) as pool:
+ list(pool.map(self.config_node, self.nodes))
+
+ # +5% - is a rough estimation for additional operations
+ run_times = [self.get_expected_runtime(iteration_config) for iteration_config in not_in_storage.values()]
+ if None not in run_times:
+ expected_run_time = int(sum(run_times) * 1.05)
+ exec_time_s = sec_to_str(expected_run_time)
+ now_dt = datetime.datetime.now()
+ end_dt = now_dt + datetime.timedelta(0, expected_run_time)
+ logger.info("Entire test should takes aroud: {} and finished at {:%H:%M:%S}"
+ .format(exec_time_s, end_dt))
+
+ for run_id, iteration_config in sorted(not_in_storage.items()):
+ iter_name = "Unnamed" if iteration_config is None else iteration_config.name
+ logger.info("Run test iteration {} ".format(iter_name))
+
+ results = [] # type: List[RunTestRes]
+ for idx in range(self.max_retry):
+ barrier.wait()
+ try:
+ futures = [pool.submit(self.do_test, node, iteration_config) for node in self.nodes]
+ results = [fut.result() for fut in futures]
+ except (EnvironmentError, agent.RPCError) as exc:
+ if self.max_retry - 1 == idx:
+ raise StopTestError("Fio failed") from exc
+ logger.exception("During fio run")
+ else:
+ if all(results):
+ break
+
+ logger.info("Sleeping %ss and retrying", self.retry_time)
+ time.sleep(self.retry_time)
+
+ start_times = [] # type: List[int]
+ stop_times = [] # type: List[int]
+
+ for (result, (t_start, t_stop)), node in zip(results, self.config.nodes):
+ for metrics_name, data in result.items():
+ path = "result/{}/measurement/{}/{}".format(run_id, node.info.node_id(), metrics_name)
+ storage[path] = data # type: ignore
+ start_times.append(t_start)
+ stop_times.append(t_stop)
+
+ min_start_time = min(start_times)
+ max_start_time = max(start_times)
+ min_stop_time = min(stop_times)
+ max_stop_time = max(stop_times)
+
+ max_allowed_time_diff = int((min_stop_time - max_start_time) * self.max_rel_time_diff)
+ max_allowed_time_diff = max(max_allowed_time_diff, self.max_time_diff)
+
+            if max_start_time - min_start_time > max_allowed_time_diff:
+                logger.warning("Too large difference in {}:{} start time - {}. Max recommended difference is {}"
+                               .format(self.name, iter_name, max_start_time - min_start_time, max_allowed_time_diff))
+
+            if max_stop_time - min_stop_time > max_allowed_time_diff:
+                logger.warning("Too large difference in {}:{} stop time - {}. Max recommended difference is {}"
+                               .format(self.name, iter_name, max_stop_time - min_stop_time, max_allowed_time_diff))
+
+ test_config = {
+ 'name': self.name,
+ 'iteration_name': iter_name,
+ 'iteration_config': iteration_config,
+ 'params': self.config.params,
+ 'nodes': self.sorted_nodes_ids,
+ 'begin_time': min_start_time,
+ 'end_time': max_stop_time
+ }
+
+ storage["result/{}/info".format(run_id)] = test_config # type: ignore
+
+ @abc.abstractmethod
+ def config_node(self, node: IRPCNode) -> None:
+ pass
+
+ @abc.abstractmethod
+ def do_test(self, node: IRPCNode, iter_config: IterationConfig) -> RunTestRes:
pass
-class TwoScriptTest(ThreadedTest):
- def __init__(self, *dt, **mp):
+class TwoScriptTest(ThreadedTest, metaclass=abc.ABCMeta):
+ def __init__(self, *dt, **mp) -> None:
ThreadedTest.__init__(self, *dt, **mp)
- self.remote_dir = '/tmp'
self.prerun_script = self.config.params['prerun_script']
self.run_script = self.config.params['run_script']
-
self.prerun_tout = self.config.params.get('prerun_tout', 3600)
self.run_tout = self.config.params.get('run_tout', 3600)
+ self.iterations_configs = [None]
- def get_remote_for_script(self, script: str) -> str:
- return os.path.join(self.remote_dir, os.path.basename(script))
+ def get_expected_runtime(self, iter_cfg: IterationConfig) -> Optional[int]:
+ return None
- def pre_run(self, node: INode) -> None:
- copy_paths(node.connection,
- {self.run_script: self.get_remote_for_script(self.run_script),
- self.prerun_script: self.get_remote_for_script(self.prerun_script)})
+ def config_node(self, node: IRPCNode) -> None:
+        node.copy_file(self.run_script, self.join_remote(os.path.basename(self.run_script)))
+        node.copy_file(self.prerun_script, self.join_remote(os.path.basename(self.prerun_script)))
- cmd = self.get_remote_for_script(self.prerun_script)
+        cmd = self.join_remote(os.path.basename(self.prerun_script))
cmd += ' ' + self.config.params.get('prerun_opts', '')
node.run(cmd, timeout=self.prerun_tout)
- def do_test(self, node: INode) -> TestResults:
- cmd = self.get_remote_for_script(self.run_script)
+ def do_test(self, node: IRPCNode, iter_config: IterationConfig) -> RunTestRes:
+        cmd = self.join_remote(os.path.basename(self.run_script))
cmd += ' ' + self.config.params.get('run_opts', '')
t1 = time.time()
- res = node.run(cmd, timeout=self.run_tout)
+ res = self.parse_results(node.run(cmd, timeout=self.run_tout))
t2 = time.time()
- return TestResults(self.config, None, res, (t1, t2))
+ return res, (int(t1), int(t2))
+
+ @abc.abstractmethod
+ def parse_results(self, data: str) -> RawTestResults:
+ pass
+