A lot of changes: replace the remote fio agent with fio_task_parser, run fio directly over SSH from the IO suite, and move shared fio options into defaults.cfg
diff --git a/wally/suits/io/__init__.py b/wally/suits/io/__init__.py
index e69de29..4828850 100644
--- a/wally/suits/io/__init__.py
+++ b/wally/suits/io/__init__.py
@@ -0,0 +1,330 @@
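+# fio-based IO test suite: job configs are compiled by fio_task_parser and
+# executed with fio directly on the remote nodes over SSH.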
+import time
+import json
+import os.path
+import logging
+import datetime
+
+from wally.utils import (ssize2b, open_for_append_or_create,
+ sec_to_str, StopTestError)
+
+from wally.ssh_utils import save_to_remote, read_from_remote, BGSSHTask
+
+from ..itest import IPerfTest, TestResults
+from .formatter import format_results_for_console
+from .fio_task_parser import (execution_time, fio_cfg_compile,
+ get_test_summary, FioJobSection)
+
+
+logger = logging.getLogger("wally")
+
+
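+# Results of a single fio job section: config, params, per-job series, raw fio
+# output, run interval and VM count; (de)serialized by get_yamable()/from_yaml().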
+class IOTestResults(TestResults):
+ def summary(self):
+ return get_test_summary(self.config) + "vm" + str(self.vm_count)
+
+ def get_yamable(self):
+ return {
+ 'type': "fio_test",
+ 'params': self.params,
+ 'config': (self.config.name, self.config.vals),
+ 'results': self.results,
+ 'raw_result': self.raw_result,
+ 'run_interval': self.run_interval,
+ 'vm_count': self.vm_count
+ }
+
+ @classmethod
+ def from_yaml(cls, data):
+ name, vals = data['config']
+ sec = FioJobSection(name)
+ sec.vals = vals
+
+ return cls(sec, data['params'], data['results'],
+ data['raw_result'], data['run_interval'],
+ data['vm_count'])
+
+
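+# Split the measured wall-clock interval between the sections of a slice,
+# proportionally to each section's configured execution time.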
+def get_slice_parts_offset(test_slice, real_interval):
+    calc_exec_time = sum(map(execution_time, test_slice))
+    coef = (real_interval[1] - real_interval[0]) / calc_exec_time
+    curr_offset = real_interval[0]
+    for section in test_slice:
+        slen = execution_time(section) * coef
+        yield (curr_offset, curr_offset + slen)
+        curr_offset += slen
+
+
+class IOPerfTest(IPerfTest):
+ tcp_conn_timeout = 30
+ max_pig_timeout = 5
+ soft_runcycle = 5 * 60
+
+ def __init__(self, *dt, **mp):
+ IPerfTest.__init__(self, *dt, **mp)
+ self.config_fname = self.options['cfg']
+
+ if '/' not in self.config_fname and '.' not in self.config_fname:
+ cfgs_dir = os.path.dirname(__file__)
+ self.config_fname = os.path.join(cfgs_dir,
+ self.config_fname + '.cfg')
+
+ self.alive_check_interval = self.options.get('alive_check_interval')
+
+ self.config_params = self.options.get('params', {}).copy()
+ self.tool = self.options.get('tool', 'fio')
+
+ raw_res = os.path.join(self.log_directory, "raw_results.txt")
+ self.fio_raw_results_file = open_for_append_or_create(raw_res)
+
+ self.io_py_remote = self.join_remote("agent.py")
+ self.results_file = self.join_remote("results.json")
+ self.pid_file = self.join_remote("pid")
+ self.task_file = self.join_remote("task.cfg")
+ self.use_sudo = self.options.get("use_sudo", True)
+ self.test_logging = self.options.get("test_logging", False)
+ self.raw_cfg = open(self.config_fname).read()
+ self.fio_configs = fio_cfg_compile(self.raw_cfg,
+ self.config_fname,
+ self.config_params,
+ split_on_names=self.test_logging)
+ self.fio_configs = list(self.fio_configs)
+
+ cmd_log = os.path.join(self.log_directory, "task_compiled.cfg")
+ fio_command_file = open_for_append_or_create(cmd_log)
+ splitter = "\n\n" + "-" * 60 + "\n\n"
+ fio_command_file.write(splitter.join(map(str, self.fio_configs)))
+
+ def __str__(self):
+ return "{0}({1})".format(self.__class__.__name__,
+ self.node.get_conn_id())
+
+ @classmethod
+ def load(cls, data):
+ return IOTestResults.from_yaml(data)
+
+ def cleanup(self):
+        # delete_file(conn, self.io_py_remote)
+        # TODO: remove temporary files used for testing
+ pass
+
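+    # Create and fill every file referenced by the compiled sections using dd,
+    # then report the observed fill bandwidth to the coordinator.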
+ def prefill_test_files(self):
+ files = {}
+ for cfg_slice in self.fio_configs:
+ for section in cfg_slice:
+ sz = ssize2b(section.vals['size'])
+ msz = sz / (1024 ** 2)
+
+ if sz % (1024 ** 2) != 0:
+ msz += 1
+
+ fname = section.vals['filename']
+
+                # if another test already uses the same file name,
+                # take the largest requested size
+ files[fname] = max(files.get(fname, 0), msz)
+
+ cmd_templ = "dd oflag=direct " + \
+ "if=/dev/zero of={0} bs={1} count={2}"
+
+ if self.use_sudo:
+ cmd_templ = "sudo " + cmd_templ
+
+ ssize = 0
+ stime = time.time()
+
+ for fname, curr_sz in files.items():
+ cmd = cmd_templ.format(fname, 1024 ** 2, curr_sz)
+ ssize += curr_sz
+ self.run_over_ssh(cmd, timeout=curr_sz)
+
+ ddtime = time.time() - stime
+ if ddtime > 1E-3:
+ fill_bw = int(ssize / ddtime)
+            mess = "Initial dd fill bw is {0} MiBps for this vm"
+ logger.info(mess.format(fill_bw))
+ self.coordinate(('init_bw', fill_bw))
+
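+    # Make sure fio and screen are available on the node, installing missing
+    # packages with apt-get and retrying up to max_retry times.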
+ def install_utils(self, max_retry=3, timeout=5):
+ need_install = []
+ for bin_name, package in (('fio', 'fio'), ('screen', 'screen')):
+ try:
+ self.run_over_ssh('which ' + bin_name, nolog=True)
+ except OSError:
+ need_install.append(package)
+
+ if len(need_install) == 0:
+ return
+
+ cmd = "sudo apt-get -y install " + " ".join(need_install)
+
+ for i in range(max_retry):
+ try:
+ self.run_over_ssh(cmd)
+ break
+ except OSError as err:
+ time.sleep(timeout)
+ else:
+ raise OSError("Can't install - " + str(err))
+
+ def pre_run(self):
+ try:
+ cmd = 'mkdir -p "{0}"'.format(self.remote_dir)
+ if self.use_sudo:
+ cmd = "sudo " + cmd
+ cmd += " ; sudo chown {0} {1}".format(self.node.get_user(),
+ self.remote_dir)
+
+ self.run_over_ssh(cmd)
+ except Exception as exc:
+ msg = "Failed to create folder {0} on remote {1}. Error: {2!s}"
+ msg = msg.format(self.remote_dir, self.node.get_conn_id(), exc)
+ logger.exception(msg)
+ raise StopTestError(msg, exc)
+
+ self.install_utils()
+
+ if self.options.get('prefill_files', True):
+ self.prefill_test_files()
+ elif self.is_primary:
+ logger.warning("Prefilling of test files is disabled")
+
+ def run(self, barrier):
+ try:
+ if len(self.fio_configs) > 1 and self.is_primary:
+
+ exec_time = 0
+ for test_slice in self.fio_configs:
+ exec_time += sum(map(execution_time, test_slice))
+
+                # +10% is a rough estimate of the overhead of auxiliary
+                # operations (sftp uploads, etc.)
+ exec_time = int(exec_time * 1.1)
+
+ exec_time_s = sec_to_str(exec_time)
+ now_dt = datetime.datetime.now()
+ end_dt = now_dt + datetime.timedelta(0, exec_time)
+                msg = "Entire test should take around {0} and finish at {1}"
+ logger.info(msg.format(exec_time_s,
+ end_dt.strftime("%H:%M:%S")))
+
+ for pos, fio_cfg_slice in enumerate(self.fio_configs):
+ fio_cfg_slice = list(fio_cfg_slice)
+ names = [i.name for i in fio_cfg_slice]
+ msgs = []
+ already_processed = set()
+ for name in names:
+ if name not in already_processed:
+ already_processed.add(name)
+
+ if 1 == names.count(name):
+ msgs.append(name)
+ else:
+ frmt = "{0} * {1}"
+ msgs.append(frmt.format(name,
+ names.count(name)))
+
+ if self.is_primary:
+ logger.info("Will run tests: " + ", ".join(msgs))
+
+ nolog = (pos != 0) or not self.is_primary
+ out_err, interval = self.do_run(barrier, fio_cfg_slice,
+ nolog=nolog)
+
+ try:
+ full_raw_res = json.loads(out_err)
+
+ res = {"bw": [], "iops": [], "lat": [],
+ "clat": [], "slat": []}
+
+ for raw_result in full_raw_res['jobs']:
+ load_data = raw_result['mixed']
+
+ res["bw"].append(load_data["bw"])
+ res["iops"].append(load_data["iops"])
+ res["lat"].append(load_data["lat"]["mean"])
+ res["clat"].append(load_data["clat"]["mean"])
+ res["slat"].append(load_data["slat"]["mean"])
+
+ first = fio_cfg_slice[0]
+ p1 = first.vals.copy()
+ p1.pop('ramp_time', 0)
+
+ for nxt in fio_cfg_slice[1:]:
+ assert nxt.name == first.name
+ p2 = nxt.vals
+ p2.pop('_ramp_time', 0)
+
+ assert p1 == p2
+
+ tres = IOTestResults(first,
+ self.config_params, res,
+ full_raw_res, interval,
+ vm_count=self.total_nodes_count)
+ self.on_result_cb(tres)
+ except (OSError, StopTestError):
+ raise
+ except Exception as exc:
+ msg_templ = "Error during postprocessing results: {0!s}"
+ raise RuntimeError(msg_templ.format(exc))
+
+ finally:
+ barrier.exit()
+
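+    # Upload the compiled task file over sftp, start fio in the background via
+    # BGSSHTask, wait for it and return (raw json output, (start, end) times).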
+ def do_run(self, barrier, cfg_slice, nolog=False):
+ # return open("/tmp/lit-sunshine/io/results.json").read(), (1, 2)
+ conn_id = self.node.get_conn_id()
+
+ cmd_templ = "fio --output-format=json --output={1} " + \
+ "--alloc-size=262144 {0}"
+
+ if self.options.get("use_sudo", True):
+ cmd_templ = "sudo " + cmd_templ
+
+ task_fc = "\n\n".join(map(str, cfg_slice))
+ with self.node.connection.open_sftp() as sftp:
+ save_to_remote(sftp, self.task_file, task_fc)
+
+ cmd = cmd_templ.format(self.task_file, self.results_file)
+
+ exec_time = sum(map(execution_time, cfg_slice))
+ exec_time_str = sec_to_str(exec_time)
+
+ timeout = int(exec_time + max(300, exec_time))
+ soft_tout = exec_time
+ barrier.wait()
+
+ if self.is_primary:
+            templ = "Test should take about {0}." + \
+                    " Should finish at {1}," + \
+                    " will wait at most till {2}"
+ now_dt = datetime.datetime.now()
+ end_dt = now_dt + datetime.timedelta(0, exec_time)
+ wait_till = now_dt + datetime.timedelta(0, timeout)
+
+ logger.info(templ.format(exec_time_str,
+ end_dt.strftime("%H:%M:%S"),
+ wait_till.strftime("%H:%M:%S")))
+
+ task = BGSSHTask(self.node, self.options.get("use_sudo", True))
+ begin = time.time()
+ task.start(cmd)
+ task.wait(soft_tout, timeout)
+ end = time.time()
+
+ if not nolog:
+ logger.debug("Test on node {0} is finished".format(conn_id))
+
+ with self.node.connection.open_sftp() as sftp:
+ return read_from_remote(sftp, self.results_file), (begin, end)
+
+ @classmethod
+ def merge_results(cls, results):
+ merged = results[0]
+ for block in results[1:]:
+ assert block["__meta__"] == merged["__meta__"]
+ merged['res'].extend(block['res'])
+ return merged
+
+ @classmethod
+ def format_for_console(cls, data, dinfo):
+ return format_results_for_console(dinfo)
diff --git a/wally/suits/io/agent.py b/wally/suits/io/agent.py
deleted file mode 100644
index 3c3e436..0000000
--- a/wally/suits/io/agent.py
+++ /dev/null
@@ -1,672 +0,0 @@
-import os
-import sys
-import time
-import json
-import copy
-import select
-import pprint
-import os.path
-import argparse
-import traceback
-import subprocess
-import itertools
-from collections import OrderedDict
-
-
-SECTION = 0
-SETTING = 1
-
-
-class FioJobSection(object):
- def __init__(self, name):
- self.name = name
- self.vals = OrderedDict()
- self.format_params = {}
-
- def copy(self):
- return copy.deepcopy(self)
-
-
-def to_bytes(sz):
- sz = sz.lower()
- try:
- return int(sz)
- except ValueError:
- if sz[-1] == 'm':
- return (1024 ** 2) * int(sz[:-1])
- if sz[-1] == 'k':
- return 1024 * int(sz[:-1])
- if sz[-1] == 'g':
- return (1024 ** 3) * int(sz[:-1])
- raise
-
-
-def fio_config_lexer(fio_cfg):
- for lineno, line in enumerate(fio_cfg.split("\n")):
- try:
- line = line.strip()
-
- if line.startswith("#") or line.startswith(";"):
- continue
-
- if line == "":
- continue
-
- if line.startswith('['):
- assert line.endswith(']'), "name should ends with ]"
- yield lineno, SECTION, line[1:-1], None
- elif '=' in line:
- opt_name, opt_val = line.split('=', 1)
- yield lineno, SETTING, opt_name.strip(), opt_val.strip()
- else:
- yield lineno, SETTING, line, '1'
- except Exception as exc:
- pref = "During parsing line number {0}\n{1!s}".format(lineno, exc)
- raise ValueError(pref)
-
-
-def fio_config_parse(lexer_iter, format_params):
- orig_format_params_keys = set(format_params)
- format_params = format_params.copy()
- in_defaults = False
- curr_section = None
- defaults = OrderedDict()
-
- for lineno, tp, name, val in lexer_iter:
- if tp == SECTION:
- if curr_section is not None:
- yield curr_section
-
- if name == 'defaults':
- in_defaults = True
- curr_section = None
- else:
- in_defaults = False
- curr_section = FioJobSection(name)
- curr_section.format_params = format_params.copy()
- curr_section.vals = defaults.copy()
- else:
- assert tp == SETTING
- if name == name.upper():
- msg = "Param not in default section in line " + str(lineno)
- assert in_defaults, msg
- if name not in orig_format_params_keys:
- # don't make parse_value for PARAMS
- # they would be parsed later
- # or this would breakes arrays
- format_params[name] = val
- elif in_defaults:
- defaults[name] = parse_value(val)
- else:
- msg = "data outside section, line " + str(lineno)
- assert curr_section is not None, msg
- curr_section.vals[name] = parse_value(val)
-
- if curr_section is not None:
- yield curr_section
-
-
-def parse_value(val):
- try:
- return int(val)
- except ValueError:
- pass
-
- try:
- return float(val)
- except ValueError:
- pass
-
- if val.startswith('{%'):
- assert val.endswith("%}")
- content = val[2:-2]
- vals = list(i.strip() for i in content.split(','))
- return map(parse_value, vals)
- return val
-
-
-def process_repeats(sec_iter):
-
- for sec in sec_iter:
- if '*' in sec.name:
- msg = "Only one '*' allowed in section name"
- assert sec.name.count('*') == 1, msg
-
- name, count = sec.name.split("*")
- sec.name = name.strip()
- count = count.strip()
-
- try:
- count = int(count.strip().format(**sec.format_params))
- except KeyError:
- raise ValueError("No parameter {0} given".format(count[1:-1]))
- except ValueError:
- msg = "Parameter {0} nas non-int value {1!r}"
- raise ValueError(msg.format(count[1:-1],
- count.format(**sec.format_params)))
-
- yield sec.copy()
-
- if 'ramp_time' in sec.vals:
- sec = sec.copy()
- sec.vals['_ramp_time'] = sec.vals.pop('ramp_time')
-
- for _ in range(count - 1):
- yield sec.copy()
- else:
- yield sec
-
-
-def process_cycles(sec_iter):
- # insert parametrized cycles
- sec_iter = try_format_params_into_section(sec_iter)
-
- for sec in sec_iter:
-
- cycles_var_names = []
- cycles_var_values = []
-
- for name, val in sec.vals.items():
- if isinstance(val, (list, tuple)):
- cycles_var_names.append(name)
- cycles_var_values.append(val)
-
- if len(cycles_var_names) == 0:
- yield sec
- else:
- for combination in itertools.product(*cycles_var_values):
- new_sec = sec.copy()
- new_sec.vals.update(zip(cycles_var_names, combination))
- yield new_sec
-
-
-def try_format_params_into_section(sec_iter):
- for sec in sec_iter:
- params = sec.format_params
- for name, val in sec.vals.items():
- if isinstance(val, basestring):
- try:
- sec.vals[name] = parse_value(val.format(**params))
- except:
- pass
-
- yield sec
-
-
-def format_params_into_section_finall(sec_iter, counter=[0]):
- group_report_err_msg = "Group reporting should be set if numjobs != 1"
-
- for sec in sec_iter:
-
- num_jobs = int(sec.vals.get('numjobs', '1'))
- if num_jobs != 1:
- assert 'group_reporting' in sec.vals, group_report_err_msg
-
- assert sec.vals.get('unified_rw_reporting', '1') in (1, '1')
- sec.vals['unified_rw_reporting'] = '1'
-
- params = sec.format_params.copy()
-
- fsize = to_bytes(sec.vals['size'])
- params['PER_TH_OFFSET'] = fsize // num_jobs
-
- for name, val in sec.vals.items():
- if isinstance(val, basestring):
- sec.vals[name] = parse_value(val.format(**params))
- else:
- assert isinstance(val, (int, float))
-
- params['UNIQ'] = 'UN{0}'.format(counter[0])
- params['COUNTER'] = str(counter[0])
- counter[0] += 1
- params['TEST_SUMM'] = get_test_summary(sec.vals,
- params.get('VM_COUNT', 1))
- params.update(sec.vals)
- sec.name = sec.name.format(**params)
-
- yield sec
-
-
-def fio_config_to_str(sec_iter):
- res = ""
-
- for pos, sec in enumerate(sec_iter):
- if pos != 0:
- res += "\n"
-
- res += "[{0}]\n".format(sec.name)
-
- for name, val in sec.vals.items():
- if name.startswith('_'):
- continue
- res += "{0}={1}\n".format(name, val)
-
- return res
-
-
-def get_test_sync_mode(config):
- try:
- return config['sync_mode']
- except KeyError:
- pass
-
- is_sync = str(config.get("sync", "0")) == "1"
- is_direct = str(config.get("direct", "0")) == "1"
-
- if is_sync and is_direct:
- return 'x'
- elif is_sync:
- return 's'
- elif is_direct:
- return 'd'
- else:
- return 'a'
-
-
-def get_test_summary(params, testnodes_count):
- rw = {"randread": "rr",
- "randwrite": "rw",
- "read": "sr",
- "write": "sw"}[params["rw"]]
-
- sync_mode = get_test_sync_mode(params)
- th_count = params.get('numjobs')
-
- if th_count is None:
- th_count = params.get('concurence', 1)
-
- return "{0}{1}{2}th{3}vm{4}".format(rw,
- sync_mode,
- params['blocksize'],
- th_count,
- testnodes_count)
-
-
-def calculate_execution_time(sec_iter):
- time = 0
- for sec in sec_iter:
- time += sec.vals.get('ramp_time', 0)
- time += sec.vals.get('runtime', 0)
- return time
-
-
-def slice_config(sec_iter, runcycle=None, max_jobs=1000,
- soft_runcycle=None, split_on_names=False):
- jcount = 0
- runtime = 0
- curr_slice = []
- prev_name = None
-
- for pos, sec in enumerate(sec_iter):
-
- if prev_name is not None:
- split_here = False
-
- if soft_runcycle is not None and prev_name != sec.name:
- split_here = (runtime > soft_runcycle)
-
- if split_on_names and prev_name != sec.name:
- split_here = True
-
- if split_here:
- yield curr_slice
- curr_slice = []
- runtime = 0
- jcount = 0
-
- prev_name = sec.name
-
- jc = sec.vals.get('numjobs', 1)
- msg = "numjobs should be integer, not {0!r}".format(jc)
- assert isinstance(jc, int), msg
-
- curr_task_time = calculate_execution_time([sec])
-
- if jc > max_jobs:
- err_templ = "Can't process job {0!r} - too large numjobs"
- raise ValueError(err_templ.format(sec.name))
-
- if runcycle is not None and len(curr_slice) != 0:
- rc_ok = curr_task_time + runtime <= runcycle
- else:
- rc_ok = True
-
- if jc + jcount <= max_jobs and rc_ok:
- runtime += curr_task_time
- jcount += jc
- curr_slice.append(sec)
- continue
-
- assert len(curr_slice) != 0
- yield curr_slice
-
- if '_ramp_time' in sec.vals:
- sec.vals['ramp_time'] = sec.vals.pop('_ramp_time')
- curr_task_time = calculate_execution_time([sec])
-
- runtime = curr_task_time
- jcount = jc
- curr_slice = [sec]
- prev_name = None
-
- if curr_slice != []:
- yield curr_slice
-
-
-def parse_all_in_1(source, test_params):
- lexer_it = fio_config_lexer(source)
- sec_it = fio_config_parse(lexer_it, test_params)
- sec_it = process_cycles(sec_it)
- sec_it = process_repeats(sec_it)
- return format_params_into_section_finall(sec_it)
-
-
-def parse_and_slice_all_in_1(source, test_params, **slice_params):
- sec_it = parse_all_in_1(source, test_params)
- return slice_config(sec_it, **slice_params)
-
-
-def compile_all_in_1(source, test_params, **slice_params):
- slices_it = parse_and_slice_all_in_1(source, test_params, **slice_params)
- for slices in slices_it:
- yield fio_config_to_str(slices)
-
-
-def do_run_fio(config_slice):
- benchmark_config = fio_config_to_str(config_slice)
- cmd = ["fio", "--output-format=json", "--alloc-size=262144", "-"]
- p = subprocess.Popen(cmd,
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
-
- start_time = time.time()
- # set timeout
- raw_out, raw_err = p.communicate(benchmark_config)
- end_time = time.time()
-
- if 0 != p.returncode:
- msg = "Fio failed with code: {0}\nOutput={1}"
- raise OSError(msg.format(p.returncode, raw_err))
-
- # HACK
- raw_out = "{" + raw_out.split('{', 1)[1]
-
- try:
- parsed_out = json.loads(raw_out)["jobs"]
- except KeyError:
- msg = "Can't parse fio output {0!r}: no 'jobs' found"
- raw_out = raw_out[:100]
- raise ValueError(msg.format(raw_out))
-
- except Exception as exc:
- msg = "Can't parse fio output: {0!r}\nError: {1!s}"
- raw_out = raw_out[:100]
- raise ValueError(msg.format(raw_out, exc))
-
- return zip(parsed_out, config_slice), (start_time, end_time)
-
-
-class FioResult(object):
- def __init__(self, name, params, run_interval, results):
- self.params = params.copy()
- self.name = name
- self.run_interval = run_interval
- self.results = results
-
- def json_obj(self):
- return self.__dict__
-
-
-def make_job_results(section, job_output, slice_timings):
- # merge by section.merge_id
-
- raw_result = job_output['mixed']
-
- res = {
- "bw": raw_result["bw"],
- "iops": raw_result["iops"],
- "lat": raw_result["lat"]["mean"],
- "clat": raw_result["clat"]["mean"],
- "slat": raw_result["slat"]["mean"]
- }
-
- vls = section.vals.copy()
-
- vls['sync_mode'] = get_test_sync_mode(vls)
- vls['concurence'] = vls.get('numjobs', 1)
-
- return FioResult(section.name, vls, slice_timings, res)
-
-
-def get_slice_parts_offset(test_slice, real_inteval):
- calc_exec_time = calculate_execution_time(test_slice)
- coef = (real_inteval[1] - real_inteval[0]) / calc_exec_time
- curr_offset = real_inteval[0]
- for section in test_slice:
- slen = calculate_execution_time([section]) * coef
- yield (curr_offset, curr_offset + slen)
- curr_offset += slen
-
-
-def run_fio(sliced_it, raw_results_func=None):
- sliced_list = list(sliced_it)
-
- curr_test_num = 0
- executed_tests = 0
- result = []
-
- for i, test_slice in enumerate(sliced_list):
- test_slice = list(test_slice)
-
- res_cfg_it, slice_timings = do_run_fio(test_slice)
- sec_intervals = get_slice_parts_offset(test_slice,
- slice_timings)
- res_cfg_it = enumerate(zip(res_cfg_it, sec_intervals),
- curr_test_num)
-
- section_names = []
- for curr_test_num, ((job_output, section), interval) in res_cfg_it:
- executed_tests += 1
- section_names.append(section.name)
-
- if raw_results_func is not None:
- raw_results_func(executed_tests,
- [job_output, section])
-
- msg = "{0} != {1}".format(section.name, job_output["jobname"])
- assert section.name == job_output["jobname"], msg
-
- result.append(make_job_results(section, job_output, interval))
-
- curr_test_num += 1
- msg_template = "Done {0} tests from {1}. ETA: {2}"
-
- rest = sliced_list[i:]
- time_eta = sum(map(calculate_execution_time, rest))
- test_left = sum(map(len, rest))
- print msg_template.format(curr_test_num,
- test_left,
- sec_to_str(time_eta))
-
- return result
-
-
-def run_benchmark(binary_tp, *argv, **kwargs):
- if 'fio' == binary_tp:
- return run_fio(*argv, **kwargs)
- raise ValueError("Unknown behcnmark {0}".format(binary_tp))
-
-
-def read_config(fd, timeout=10):
- job_cfg = ""
- etime = time.time() + timeout
- while True:
- wtime = etime - time.time()
- if wtime <= 0:
- raise IOError("No config provided")
-
- r, w, x = select.select([fd], [], [], wtime)
- if len(r) == 0:
- raise IOError("No config provided")
-
- char = fd.read(1)
- if '' == char:
- return job_cfg
-
- job_cfg += char
-
-
-def sec_to_str(seconds):
- h = seconds // 3600
- m = (seconds % 3600) // 60
- s = seconds % 60
- return "{0}:{1:02d}:{2:02d}".format(h, m, s)
-
-
-def parse_args(argv):
- parser = argparse.ArgumentParser(
- description="Run fio' and return result")
- parser.add_argument("--type", metavar="BINARY_TYPE",
- choices=['fio'], default='fio',
- help=argparse.SUPPRESS)
- parser.add_argument("--start-at", metavar="START_AT_UTC", type=int,
- help="Start execution at START_AT_UTC")
- parser.add_argument("--json", action="store_true", default=False,
- help="Json output format")
- parser.add_argument("-o", "--output", default='-', metavar="FILE_PATH",
- help="Store results to FILE_PATH")
- parser.add_argument("--estimate", action="store_true", default=False,
- help="Only estimate task execution time")
- parser.add_argument("--compile", action="store_true", default=False,
- help="Compile config file to fio config")
- parser.add_argument("--num-tests", action="store_true", default=False,
- help="Show total number of tests")
- parser.add_argument("--runcycle", type=int, default=None,
- metavar="MAX_CYCLE_SECONDS",
- help="Max cycle length in seconds")
- parser.add_argument("--show-raw-results", action='store_true',
- default=False, help="Output raw input and results")
- parser.add_argument("--params", nargs="*", metavar="PARAM=VAL",
- default=[],
- help="Provide set of pairs PARAM=VAL to" +
- "format into job description")
- parser.add_argument("-p", "--pid-file", metavar="FILE_TO_STORE_PID",
- default=None, help="Store pid to FILE_TO_STORE_PID " +
- "and remove this file on exit")
- parser.add_argument("jobfile")
- return parser.parse_args(argv)
-
-
-def main(argv):
- argv_obj = parse_args(argv)
-
- if argv_obj.jobfile == '-':
- job_cfg = read_config(sys.stdin)
- else:
- job_cfg = open(argv_obj.jobfile).read()
-
- if argv_obj.output == '-':
- out_fd = sys.stdout
- else:
- out_fd = open(argv_obj.output, "w")
-
- if argv_obj.pid_file is not None:
- with open(argv_obj.pid_file, "w") as fd:
- fd.write(str(os.getpid()))
-
- try:
- params = {}
- for param_val in argv_obj.params:
- assert '=' in param_val
- name, val = param_val.split("=", 1)
- params[name] = val
-
- slice_params = {
- 'runcycle': argv_obj.runcycle,
- }
-
- sliced_it = parse_and_slice_all_in_1(job_cfg, params, **slice_params)
-
- if argv_obj.estimate:
- it = map(calculate_execution_time, sliced_it)
- print sec_to_str(sum(it))
- return 0
-
- if argv_obj.num_tests or argv_obj.compile:
- if argv_obj.compile:
- for test_slice in sliced_it:
- out_fd.write(fio_config_to_str(test_slice))
- out_fd.write("\n#" + "-" * 70 + "\n\n")
-
- if argv_obj.num_tests:
- print len(list(sliced_it))
-
- return 0
-
- if argv_obj.start_at is not None:
- ctime = time.time()
- if argv_obj.start_at >= ctime:
- time.sleep(ctime - argv_obj.start_at)
-
- def raw_res_func(test_num, data):
- pref = "========= RAW_RESULTS({0}) =========\n".format(test_num)
- out_fd.write(pref)
- out_fd.write(json.dumps(data))
- out_fd.write("\n========= END OF RAW_RESULTS =========\n")
- out_fd.flush()
-
- rrfunc = raw_res_func if argv_obj.show_raw_results else None
-
- job_res = run_benchmark(argv_obj.type,
- sliced_it, rrfunc)
-
- res = {'__meta__': {'params': params,
- 'testnodes_count': int(params.get('VM_COUNT', 1))},
- 'res': [j.json_obj() for j in job_res]}
-
- oformat = 'json' if argv_obj.json else 'eval'
-
- msg = "========= RESULTS(format={0}) =========\n"
- out_fd.write(msg.format(oformat))
- if argv_obj.json:
- out_fd.write(json.dumps(res))
- else:
- out_fd.write(pprint.pformat(res) + "\n")
- out_fd.write("\n========= END OF RESULTS =========\n")
-
- return 0
- except:
- out_fd.write("============ ERROR =============\n")
- out_fd.write(traceback.format_exc() + "\n")
- out_fd.write("============ END OF ERROR =============\n")
- return 1
- finally:
- try:
- if out_fd is not sys.stdout:
- out_fd.flush()
- os.fsync(out_fd)
- out_fd.close()
- except Exception:
- traceback.print_exc()
-
- if argv_obj.pid_file is not None:
- if os.path.exists(argv_obj.pid_file):
- os.unlink(argv_obj.pid_file)
-
-
-def fake_main(x):
- import yaml
- time.sleep(60)
- out_fd = sys.stdout
- fname = "/tmp/perf_tests/metempirical_alisha/raw_results.yaml"
- res = yaml.load(open(fname).read())[0][1]
- out_fd.write("========= RESULTS(format=json) =========\n")
- out_fd.write(json.dumps(res))
- out_fd.write("\n========= END OF RESULTS =========\n")
- return 0
-
-
-if __name__ == '__main__':
- # exit(fake_main(sys.argv[1:]))
- exit(main(sys.argv[1:]))
diff --git a/wally/suits/io/ceph.cfg b/wally/suits/io/ceph.cfg
index f38b37c..26aa65f 100644
--- a/wally/suits/io/ceph.cfg
+++ b/wally/suits/io/ceph.cfg
@@ -1,28 +1,19 @@
-[defaults]
-wait_for_previous=1
-group_reporting=1
-time_based=1
-buffered=0
-iodepth=1
-softrandommap=1
-filename={FILENAME}
-NUM_ROUNDS=7
-thread=1
-
-# this is critical for correct results in multy-node run
-randrepeat=0
+[global]
+include defaults.cfg
NUMJOBS={% 1, 5, 10, 15, 40 %}
NUMJOBS_SHORT={% 1, 2, 3, 10 %}
+TEST_FILE_SIZE=100G
-size=100G
+size={TEST_FILE_SIZE}
ramp_time=15
runtime=60
+NUM_ROUNDS=7
# ---------------------------------------------------------------------
# check different thread count, sync mode. (latency, iops) = func(th_count)
# ---------------------------------------------------------------------
-[ceph_test_{TEST_SUMM} * {NUM_ROUNDS}]
+[ceph_test_{TEST_SUMM}]
blocksize=4k
rw=randwrite
sync=1
@@ -31,7 +22,7 @@
# ---------------------------------------------------------------------
# direct write
# ---------------------------------------------------------------------
-[ceph_test_{TEST_SUMM} * {NUM_ROUNDS}]
+[ceph_test_{TEST_SUMM}]
blocksize=4k
rw=randwrite
direct=1
@@ -41,7 +32,7 @@
# check different thread count, direct read mode. (latency, iops) = func(th_count)
# also check iops for randread
# ---------------------------------------------------------------------
-[ceph_test_{TEST_SUMM} * {NUM_ROUNDS}]
+[ceph_test_{TEST_SUMM}]
blocksize=4k
rw=randread
direct=1
@@ -51,7 +42,7 @@
# this is essentially sequential write/read operations
# we can't use sequential with numjobs > 1 due to caching and block merging
# ---------------------------------------------------------------------
-[ceph_test_{TEST_SUMM} * {NUM_ROUNDS}]
+[ceph_test_{TEST_SUMM}]
blocksize=16m
rw={% randread, randwrite %}
direct=1
diff --git a/wally/suits/io/check_distribution.cfg b/wally/suits/io/check_distribution.cfg
index e7cafd9..4746f37 100644
--- a/wally/suits/io/check_distribution.cfg
+++ b/wally/suits/io/check_distribution.cfg
@@ -1,19 +1,13 @@
-[defaults]
+[global]
+include defaults.cfg
NUM_ROUNDS=301
-# this is critical for correct results in multy-node run
-randrepeat=0
-
-[distrubution_test_{TEST_SUMM} * {NUM_ROUNDS}]
+[distrubution_test_{TEST_SUMM}]
blocksize=4k
rw=randwrite
direct=1
+
ramp_time=5
-buffered=0
-wait_for_previous
-filename={FILENAME}
-iodepth=1
-size=10G
-time_based
runtime=30
-group_reporting
+
+size=10G
diff --git a/wally/suits/io/check_linearity.cfg b/wally/suits/io/check_linearity.cfg
index 670e8b3..f7c37fb 100644
--- a/wally/suits/io/check_linearity.cfg
+++ b/wally/suits/io/check_linearity.cfg
@@ -1,33 +1,26 @@
-[defaults]
+[global]
+
+include defaults.cfg
NUM_ROUNDS=7
-# this is critical for correct results in multy-node run
-randrepeat=0
-
-ramp_time=5
-buffered=0
-wait_for_previous
-filename={FILENAME}
-iodepth=1
-size=10G
-time_based
+size={TEST_FILE_SIZE}
ramp_time=5
runtime=30
# ---------------------------------------------------------------------
# check read and write linearity. oper_time = func(size)
# ---------------------------------------------------------------------
-[linearity_test_{TEST_SUMM} * {NUM_ROUNDS}]
-blocksize={% 512, 1k, 4k, 8k, 16k, 32k, 64k, 128k, 256k, 512k, 1m, 2m, 4m %}
-rw={% randwrite, randread %}
-direct=1
+# [linearity_test_{TEST_SUMM}]
+# blocksize={BLOCK_SIZES}
+# rw={% randwrite, randread %}
+# direct=1
# ---------------------------------------------------------------------
# check sync write linearity. oper_time = func(size)
# check sync BW as well
# ---------------------------------------------------------------------
-[linearity_test_{TEST_SUMM} * {NUM_ROUNDS}]
-blocksize={% 512, 1k, 4k, 8k, 16k, 32k, 64k, 128k, 256k, 512k, 1m, 2m, 4m %}
+[linearity_test_{TEST_SUMM}]
+blocksize={BLOCK_SIZES}
rw=randwrite
sync=1
diff --git a/wally/suits/io/defaults.cfg b/wally/suits/io/defaults.cfg
new file mode 100644
index 0000000..51a8145
--- /dev/null
+++ b/wally/suits/io/defaults.cfg
@@ -0,0 +1,14 @@
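+# Common fio options shared by all wally IO suites; suite configs pull this
+# file in via "include defaults.cfg" in their [global] section.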
+buffered=0
+group_reporting=1
+iodepth=1
+softrandommap=1
+thread=1
+time_based=1
+wait_for_previous=1
+
+# this is critical for correct results in multi-node runs
+randrepeat=0
+
+filename={FILENAME}
+
+
diff --git a/wally/suits/io/fio_task_parser.py b/wally/suits/io/fio_task_parser.py
new file mode 100644
index 0000000..52c4bb3
--- /dev/null
+++ b/wally/suits/io/fio_task_parser.py
@@ -0,0 +1,458 @@
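+# Compiler for wally fio job files: lexes the config, resolves includes,
+# applies parameters, expands {% ... %} cycles and NUM_ROUNDS repeats, and
+# slices the resulting jobs into runnable chunks.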
+import os
+import sys
+import copy
+import os.path
+import argparse
+import itertools
+from collections import OrderedDict, namedtuple
+
+
+from wally.utils import sec_to_str
+
+
+SECTION = 0
+SETTING = 1
+INCLUDE = 2
+
+
+Var = namedtuple('Var', ('name',))
+CfgLine = namedtuple('CfgLine', ('fname', 'lineno', 'oline',
+ 'tp', 'name', 'val'))
+
+
+class FioJobSection(object):
+ def __init__(self, name):
+ self.name = name
+ self.vals = OrderedDict()
+
+ def copy(self):
+ return copy.deepcopy(self)
+
+ def required_vars(self):
+ for name, val in self.vals.items():
+ if isinstance(val, Var):
+ yield name, val
+
+ def is_free(self):
+ return len(list(self.required_vars())) == 0
+
+ def __str__(self):
+ res = "[{0}]\n".format(self.name)
+
+ for name, val in self.vals.items():
+ if name.startswith('_') or name == name.upper():
+ continue
+ if isinstance(val, Var):
+ res += "{0}={{{1}}}\n".format(name, val.name)
+ else:
+ res += "{0}={1}\n".format(name, val)
+
+ return res
+
+
+def to_bytes(sz):
+ sz = sz.lower()
+ try:
+ return int(sz)
+ except ValueError:
+ if sz[-1] == 'm':
+ return (1024 ** 2) * int(sz[:-1])
+ if sz[-1] == 'k':
+ return 1024 * int(sz[:-1])
+ if sz[-1] == 'g':
+ return (1024 ** 3) * int(sz[:-1])
+ raise
+
+
+class ParseError(ValueError):
+ def __init__(self, msg, fname, lineno, line_cont=""):
+ ValueError.__init__(self, msg)
+ self.file_name = fname
+ self.lineno = lineno
+ self.line_cont = line_cont
+
+ def __str__(self):
+ msg = "In {0}:{1} ({2}) : {3}"
+ return msg.format(self.file_name,
+ self.lineno,
+ self.line_cont,
+ super(ParseError, self).__str__())
+
+
+def is_name(name):
+ if len(name) == 0:
+ return False
+
+ if name[0] != '_' and not name[0].isalpha():
+ return False
+
+    for ch in name[1:]:
+        if ch != '_' and not ch.isalnum():
+            return False
+
+ return True
+
+
+def parse_value(val):
+ try:
+ return int(val)
+ except ValueError:
+ pass
+
+ try:
+ return float(val)
+ except ValueError:
+ pass
+
+ if val.startswith('{%'):
+ assert val.endswith("%}")
+ content = val[2:-2]
+ vals = list(i.strip() for i in content.split(','))
+ return map(parse_value, vals)
+
+ if val.startswith('{'):
+ assert val.endswith("}")
+ assert is_name(val[1:-1])
+ return Var(val[1:-1])
+ return val
+
+
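+# Tokenize a fio config into CfgLine tuples: [section] headers, name=value
+# settings and 'include <file>' directives; comments and blank lines are skipped.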
+def fio_config_lexer(fio_cfg, fname):
+ for lineno, oline in enumerate(fio_cfg.split("\n")):
+ try:
+ line = oline.strip()
+
+ if line.startswith("#") or line.startswith(";"):
+ continue
+
+ if line == "":
+ continue
+
+ if '#' in line:
+ raise ParseError("# isn't allowed inside line",
+ fname, lineno, oline)
+
+ if line.startswith('['):
+ yield CfgLine(fname, lineno, oline, SECTION,
+ line[1:-1].strip(), None)
+ elif '=' in line:
+ opt_name, opt_val = line.split('=', 1)
+ yield CfgLine(fname, lineno, oline, SETTING,
+ opt_name.strip(),
+ parse_value(opt_val.strip()))
+ elif line.startswith("include "):
+ yield CfgLine(fname, lineno, oline, INCLUDE,
+ line.split(" ", 1)[1], None)
+ else:
+ yield CfgLine(fname, lineno, oline, SETTING, line, '1')
+
+ except Exception as exc:
+ raise ParseError(str(exc), fname, lineno, oline)
+
+
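+# Resolve include directives (repeating until none remain), then yield
+# FioJobSection objects seeded with the accumulated [global] values.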
+def fio_config_parse(lexer_iter):
+ in_globals = False
+ curr_section = None
+ glob_vals = OrderedDict()
+ sections_count = 0
+
+ lexed_lines = list(lexer_iter)
+ one_more = True
+ includes = {}
+
+ while one_more:
+ new_lines = []
+ one_more = False
+ for line in lexed_lines:
+ fname, lineno, oline, tp, name, val = line
+
+ if INCLUDE == tp:
+ if not os.path.exists(fname):
+ dirname = '.'
+ else:
+ dirname = os.path.dirname(fname)
+
+ new_fname = os.path.join(dirname, name)
+ includes[new_fname] = (fname, lineno)
+
+ try:
+ cont = open(new_fname).read()
+ except IOError as err:
+ msg = "Error while including file {0}: {1}"
+ raise ParseError(msg.format(new_fname, err),
+ fname, lineno, oline)
+
+ new_lines.extend(fio_config_lexer(cont, new_fname))
+ one_more = True
+ else:
+ new_lines.append(line)
+
+ lexed_lines = new_lines
+
+ for fname, lineno, oline, tp, name, val in lexed_lines:
+ if tp == SECTION:
+ if curr_section is not None:
+ yield curr_section
+ curr_section = None
+
+ if name == 'global':
+ if sections_count != 0:
+                    raise ParseError("there must be exactly one [global]" +
+                                     " section and it must come first",
+                                     fname, lineno, oline)
+ in_globals = True
+ else:
+ in_globals = False
+ curr_section = FioJobSection(name)
+ curr_section.vals = glob_vals.copy()
+ sections_count += 1
+ else:
+ assert tp == SETTING
+ if in_globals:
+ glob_vals[name] = val
+ elif name == name.upper():
+ raise ParseError("Param '" + name +
+ "' not in [global] section",
+ fname, lineno, oline)
+ elif curr_section is None:
+ raise ParseError("Data outside section",
+ fname, lineno, oline)
+ else:
+ curr_section.vals[name] = val
+
+ if curr_section is not None:
+ yield curr_section
+
+
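+# Yield NUM_ROUNDS copies of the section; after the first copy ramp_time is
+# renamed to _ramp_time so repeats skip the ramp (slice_config restores it
+# when a new slice starts).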
+def process_repeats(sec):
+ sec = sec.copy()
+ count = sec.vals.pop('NUM_ROUNDS', 1)
+ assert isinstance(count, (int, long))
+
+ for _ in range(count):
+ yield sec.copy()
+
+ if 'ramp_time' in sec.vals:
+ sec.vals['_ramp_time'] = sec.vals.pop('ramp_time')
+
+
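+# Expand list-valued options (from {% a, b, ... %}) into the cartesian product
+# of sections; ALL-CAPS template parameters are not expanded here.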
+def process_cycles(sec):
+ cycles = OrderedDict()
+
+ for name, val in sec.vals.items():
+ if isinstance(val, list) and name.upper() != name:
+ cycles[name] = val
+
+ if len(cycles) == 0:
+ yield sec
+ else:
+ for combination in itertools.product(*cycles.values()):
+ new_sec = sec.copy()
+ new_sec.vals.update(zip(cycles.keys(), combination))
+ yield new_sec
+
+
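+# Merge externally supplied parameters into the section and substitute them
+# for {NAME} placeholders (Var values).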
+def apply_params(sec, params):
+ processed_vals = OrderedDict()
+ processed_vals.update(params)
+ for name, val in sec.vals.items():
+ if name in params:
+ continue
+
+ if isinstance(val, Var):
+ if val.name in params:
+ val = params[val.name]
+ elif val.name in processed_vals:
+ val = processed_vals[val.name]
+ processed_vals[name] = val
+ sec = sec.copy()
+ sec.vals = processed_vals
+ return sec
+
+
+def final_process(sec, counter=[0]):
+    sec = sec.copy()
+
+    if int(sec.vals.get('numjobs', 1)) != 1:
+        msg = "Group reporting should be set if numjobs != 1"
+        assert 'group_reporting' in sec.vals, msg
+
+    sec.vals['unified_rw_reporting'] = '1'
+
+ params = sec.vals.copy()
+ params['UNIQ'] = 'UN{0}'.format(counter[0])
+ params['COUNTER'] = str(counter[0])
+ params['TEST_SUMM'] = get_test_summary(sec)
+ sec.name = sec.name.format(**params)
+ counter[0] += 1
+
+ return sec
+
+
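+# Encode the sync/direct flags as a single letter: x = sync+direct, s = sync,
+# d = direct, a = neither (buffered async).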
+def get_test_sync_mode(sec):
+ is_sync = str(sec.vals.get("sync", "0")) == "1"
+ is_direct = str(sec.vals.get("direct", "0")) == "1"
+
+ if is_sync and is_direct:
+ return 'x'
+ elif is_sync:
+ return 's'
+ elif is_direct:
+ return 'd'
+ else:
+ return 'a'
+
+
+def get_test_summary(sec):
+ rw = {"randread": "rr",
+ "randwrite": "rw",
+ "read": "sr",
+ "write": "sw"}[sec.vals["rw"]]
+
+ sync_mode = get_test_sync_mode(sec)
+ th_count = sec.vals.get('numjobs')
+
+ if th_count is None:
+ th_count = sec.vals.get('concurence', 1)
+
+ return "{0}{1}{2}th{3}".format(rw,
+ sync_mode,
+ sec.vals['blocksize'],
+ th_count)
+
+
+def execution_time(sec):
+ return sec.vals.get('ramp_time', 0) + sec.vals.get('runtime', 0)
+
+
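+# Group sections into slices bounded by runcycle seconds and max_jobs total
+# numjobs; with split_on_names a new slice starts whenever the name changes.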
+def slice_config(sec_iter, runcycle=None, max_jobs=1000, split_on_names=False):
+ jcount = 0
+ runtime = 0
+ curr_slice = []
+ prev_name = None
+
+ for pos, sec in enumerate(sec_iter):
+
+ if prev_name is not None:
+ split_here = False
+
+ if split_on_names and prev_name != sec.name:
+ split_here = True
+
+ if split_here:
+ yield curr_slice
+ curr_slice = []
+ runtime = 0
+ jcount = 0
+
+ prev_name = sec.name
+
+ jc = sec.vals.get('numjobs', 1)
+ msg = "numjobs should be integer, not {0!r}".format(jc)
+ assert isinstance(jc, int), msg
+
+ curr_task_time = execution_time(sec)
+
+ if jc > max_jobs:
+ err_templ = "Can't process job {0!r} - too large numjobs"
+ raise ValueError(err_templ.format(sec.name))
+
+ if runcycle is not None and len(curr_slice) != 0:
+ rc_ok = curr_task_time + runtime <= runcycle
+ else:
+ rc_ok = True
+
+ if jc + jcount <= max_jobs and rc_ok:
+ runtime += curr_task_time
+ jcount += jc
+ curr_slice.append(sec)
+ continue
+
+ assert len(curr_slice) != 0
+ yield curr_slice
+
+ if '_ramp_time' in sec.vals:
+ sec.vals['ramp_time'] = sec.vals.pop('_ramp_time')
+ curr_task_time = execution_time(sec)
+
+ runtime = curr_task_time
+ jcount = jc
+ curr_slice = [sec]
+ prev_name = None
+
+ if curr_slice != []:
+ yield curr_slice
+
+
+def parse_all_in_1(source, fname=None):
+ return fio_config_parse(fio_config_lexer(source, fname))
+
+
+def flatmap(func, inp_iter):
+ for val in inp_iter:
+ for res in func(val):
+ yield res
+
+
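+# Full pipeline: parse (with includes) -> apply params -> expand cycles ->
+# expand repeats -> finalize names -> slice into runnable chunks.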
+def fio_cfg_compile(source, fname, test_params, **slice_params):
+ it = parse_all_in_1(source, fname)
+ it = (apply_params(sec, test_params) for sec in it)
+ it = flatmap(process_cycles, it)
+ it = flatmap(process_repeats, it)
+    it = itertools.imap(final_process, it)
+ return slice_config(it, **slice_params)
+
+
+def parse_args(argv):
+    parser = argparse.ArgumentParser(
+        description="Parse, compile and estimate wally fio job configs")
+ parser.add_argument("--runcycle", type=int, default=None,
+ metavar="MAX_CYCLE_SECONDS",
+ help="Max cycle length in seconds")
+ parser.add_argument("-p", "--params", nargs="*", metavar="PARAM=VAL",
+ default=[],
+                        help="Provide set of pairs PARAM=VAL to" +
+                             " format into job description")
+ parser.add_argument("action", choices=['estimate', 'compile', 'num_tests'])
+ parser.add_argument("jobfile")
+ return parser.parse_args(argv)
+
+
+def main(argv):
+ argv_obj = parse_args(argv)
+
+ if argv_obj.jobfile == '-':
+ job_cfg = sys.stdin.read()
+ else:
+ job_cfg = open(argv_obj.jobfile).read()
+
+ params = {}
+ for param_val in argv_obj.params:
+ assert '=' in param_val
+ name, val = param_val.split("=", 1)
+ params[name] = parse_value(val)
+
+ slice_params = {
+ 'runcycle': argv_obj.runcycle,
+ }
+
+ sliced_it = fio_cfg_compile(job_cfg, argv_obj.jobfile,
+ params, **slice_params)
+
+ if argv_obj.action == 'estimate':
+ sum_time = 0
+ for cfg_slice in sliced_it:
+ sum_time += sum(map(execution_time, cfg_slice))
+ print sec_to_str(sum_time)
+ elif argv_obj.action == 'num_tests':
+ print sum(map(len, map(list, sliced_it)))
+ elif argv_obj.action == 'compile':
+ splitter = "\n#" + "-" * 70 + "\n\n"
+ for cfg_slice in sliced_it:
+ print splitter.join(map(str, cfg_slice))
+
+ return 0
+
+
+if __name__ == '__main__':
+ exit(main(sys.argv[1:]))
diff --git a/wally/suits/io/formatter.py b/wally/suits/io/formatter.py
index 22c090f..84b0a13 100644
--- a/wally/suits/io/formatter.py
+++ b/wally/suits/io/formatter.py
@@ -2,15 +2,21 @@
from wally.utils import ssize2b
from wally.statistic import round_3_digit
-from wally.suits.io.agent import get_test_summary
+from .fio_task_parser import get_test_summary, get_test_sync_mode
def key_func(data):
- p = data.params
+ p = data.params.vals
+
+ th_count = data.params.vals.get('numjobs')
+
+ if th_count is None:
+ th_count = data.params.vals.get('concurence', 1)
+
return (p['rw'],
- p['sync_mode'],
+ get_test_sync_mode(data.params),
ssize2b(p['blocksize']),
- int(p['concurence']) * data.testnodes_count,
+ int(th_count) * data.testnodes_count,
data.name)
@@ -41,8 +47,7 @@
prev_k = curr_k
- descr = get_test_summary(data.params, data.testnodes_count)
- test_dinfo = dinfo[data.name]
+ test_dinfo = dinfo[(data.name, data.summary)]
iops, _ = test_dinfo.iops.rounded_average_conf()
@@ -61,7 +66,7 @@
bw = round_3_digit(bw)
params = (data.name.rsplit('_', 1)[0],
- descr, int(iops), int(bw), str(conf_perc),
+ data.summary, int(iops), int(bw), str(conf_perc),
str(dev_perc),
int(iops_per_vm), int(bw_per_vm), lat)
tab.add_row(params)
diff --git a/wally/suits/io/hdd.cfg b/wally/suits/io/hdd.cfg
index 17d0509..21166e5 100644
--- a/wally/suits/io/hdd.cfg
+++ b/wally/suits/io/hdd.cfg
@@ -1,16 +1,11 @@
-[defaults]
-wait_for_previous
-group_reporting
-time_based
-buffered=0
-iodepth=1
-softrandommap=1
-filename={FILENAME}
-NUM_ROUNDS=7
-NUMJOBS={% 1, 5, 10, 15, 20, 30, 40, 80 %}
+[global]
+include defaults.cfg
-# this is critical for correct results in multy-node run
-randrepeat=0
+NUM_ROUNDS=3
+
+# NUMJOBS={% 1, 5, 10, 15, 20, 30, 40, 80 %}
+
+NUMJOBS={% 1, 5, 10 %}
size=10G
ramp_time=5
@@ -19,7 +14,7 @@
# ---------------------------------------------------------------------
# check different thread count, sync mode. (latency, iops) = func(th_count)
# ---------------------------------------------------------------------
-[hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+[hdd_test_{TEST_SUMM}]
blocksize=4k
rw=randwrite
sync=1
@@ -29,7 +24,7 @@
# check different thread count, direct read mode. (latency, iops) = func(th_count)
# also check iops for randread
# ---------------------------------------------------------------------
-[hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+[hdd_test_{TEST_SUMM}]
blocksize=4k
rw=randread
direct=1
@@ -38,7 +33,7 @@
# ---------------------------------------------------------------------
# check IOPS randwrite.
# ---------------------------------------------------------------------
-[hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+[hdd_test_{TEST_SUMM}]
blocksize=4k
rw=randwrite
direct=1
@@ -47,7 +42,7 @@
# No reason for th count > 1 in case of sequantial operations
# They became random
# ---------------------------------------------------------------------
-[hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+[hdd_test_{TEST_SUMM}]
blocksize=1m
rw={% read, write %}
direct=1
diff --git a/wally/suits/io/lat_vs_iops.cfg b/wally/suits/io/lat_vs_iops.cfg
index a587a96..dbafcbb 100644
--- a/wally/suits/io/lat_vs_iops.cfg
+++ b/wally/suits/io/lat_vs_iops.cfg
@@ -1,29 +1,40 @@
-[defaults]
-wait_for_previous=1
-filename={FILENAME}
+[global]
+include defaults.cfg
-# this is critical for correct results in multy-node run
-randrepeat=0
-
-iodepth=1
-size=100G
-group_reporting=1
-
-IOPS_LIMIT={% 100, 500 %}
+TEST_FILE_SIZE=100G
+size={TEST_FILE_SIZE}
ramp_time=5
runtime=30
-time_based=1
-buffered=0
-NUMJOBS=1
+blocksize=4k
+rw=randwrite
+sync=1
# ---------------------------------------------------------------------
# latency as function from IOPS
# ---------------------------------------------------------------------
-[lat_vs_iops{rate_iops}_{TEST_SUMM} * {NUM_ROUNDS}]
-blocksize=4k
-rw=randread
-direct=1
-numjobs={NUMJOBS}
-rate_iops={IOPS_LIMIT}
+[lat_vs_iops{rate_iops}_{TEST_SUMM}]
+numjobs=1
+rate_iops={% 20, 40, 60, 80, 100, 120, 160, 200, 250, 300 %}
+
+# ---------------------------------------------------------------------
+# latency as a function of IOPS
+# ---------------------------------------------------------------------
+[lat_vs_iops{rate_iops}_{TEST_SUMM}]
+numjobs=3
+rate_iops={% 10, 20, 40, 60, 80, 100, 120, 160 %}
+
+# ---------------------------------------------------------------------
+# latency as a function of IOPS
+# ---------------------------------------------------------------------
+[lat_vs_iops{rate_iops}_{TEST_SUMM}]
+numjobs=7
+rate_iops={% 5, 10, 20, 40, 50, 60, 70 %}
+
+# ---------------------------------------------------------------------
+# latency as a function of IOPS
+# ---------------------------------------------------------------------
+[lat_vs_iops{rate_iops}_{TEST_SUMM}]
+numjobs=10
+rate_iops={% 5, 10, 20, 40, 50 %}
diff --git a/wally/suits/io/results_loader.py b/wally/suits/io/results_loader.py
deleted file mode 100644
index 988fe0e..0000000
--- a/wally/suits/io/results_loader.py
+++ /dev/null
@@ -1,56 +0,0 @@
-import re
-import json
-
-
-def parse_output(out_err):
- err_start_patt = r"(?ims)=+\s+ERROR\s+=+"
- err_end_patt = r"(?ims)=+\s+END OF ERROR\s+=+"
-
- for block in re.split(err_start_patt, out_err)[1:]:
- tb, garbage = re.split(err_end_patt, block)
- msg = "Test fails with error:\n" + tb.strip() + "\n"
- raise OSError(msg)
-
- start_patt = r"(?ims)=+\s+RESULTS\(format=json\)\s+=+"
- end_patt = r"(?ims)=+\s+END OF RESULTS\s+=+"
-
- for block in re.split(start_patt, out_err)[1:]:
- data, garbage = re.split(end_patt, block)
- yield json.loads(data.strip())
-
- start_patt = r"(?ims)=+\s+RESULTS\(format=eval\)\s+=+"
- end_patt = r"(?ims)=+\s+END OF RESULTS\s+=+"
-
- for block in re.split(start_patt, out_err)[1:]:
- data, garbage = re.split(end_patt, block)
- yield eval(data.strip())
-
-
-def filter_data(name_prefix, fields_to_select, **filters):
- def closure(data):
- for result in data:
- if name_prefix is not None:
- if not result['jobname'].startswith(name_prefix):
- continue
-
- for k, v in filters.items():
- if result.get(k) != v:
- break
- else:
- yield map(result.get, fields_to_select)
- return closure
-
-
-def filter_processed_data(name_prefix, fields_to_select, **filters):
- def closure(data):
- for name, result in data.items():
- if name_prefix is not None:
- if not name.startswith(name_prefix):
- continue
-
- for k, v in filters.items():
- if result.raw.get(k) != v:
- break
- else:
- yield map(result.raw.get, fields_to_select)
- return closure
diff --git a/wally/suits/io/verify.cfg b/wally/suits/io/verify.cfg
index 58b8450..9ebfad1 100644
--- a/wally/suits/io/verify.cfg
+++ b/wally/suits/io/verify.cfg
@@ -1,28 +1,18 @@
-[defaults]
-wait_for_previous
-group_reporting
-time_based
-buffered=0
-iodepth=1
-softrandommap=1
-filename={FILENAME}
-NUM_ROUNDS=1
-
-# this is critical for correct results in multy-node run
-randrepeat=0
-
+[global]
+include defaults.cfg
size=50G
ramp_time=5
runtime=60
+NUM_ROUNDS=2
# ---------------------------------------------------------------------
-[verify_{TEST_SUMM} * {NUM_ROUNDS}]
-blocksize=4k
-rw=randwrite
-direct=1
+#[verify_{TEST_SUMM}]
+#blocksize=4k
+#rw=randwrite
+#direct=1
# ---------------------------------------------------------------------
-[verify_{TEST_SUMM} * {NUM_ROUNDS}]
+[verify_{TEST_SUMM}]
blocksize=4k
rw=randread
direct=1