more improvements and fixes and new bugs
diff --git a/wally/suits/io/agent.py b/wally/suits/io/agent.py
index 7589346..d15d18e 100644
--- a/wally/suits/io/agent.py
+++ b/wally/suits/io/agent.py
@@ -55,6 +55,40 @@
counter = [0]
+def extract_iterables(vals):
+ iterable_names = []
+ iterable_values = []
+ rest = {}
+
+ for val_name, val in vals.items():
+ if val is None or not val.startswith('{%'):
+ rest[val_name] = val
+ else:
+ assert val.endswith("%}")
+ content = val[2:-2]
+ iterable_names.append(val_name)
+ iterable_values.append(list(i.strip() for i in content.split(',')))
+
+ return iterable_names, iterable_values, rest
+
+
+def format_params_into_section(sec, params, final=True):
+ processed_vals = {}
+
+ for val_name, val in sec.items():
+ if val is None:
+ processed_vals[val_name] = val
+ else:
+ try:
+ processed_vals[val_name] = val.format(**params)
+ except KeyError:
+ if final:
+ raise
+ processed_vals[val_name] = val
+
+ return processed_vals
+
+
def process_section(name, vals, defaults, format_params):
vals = vals.copy()
params = format_params.copy()
@@ -67,30 +101,25 @@
repeat = 1
# this code can be optimized
- iterable_names = []
- iterable_values = []
- processed_vals = {}
-
- for val_name, val in vals.items():
- if val is None:
- processed_vals[val_name] = val
- # remove hardcode
- elif val.startswith('{%'):
- assert val.endswith("%}")
- content = val[2:-2].format(**params)
- iterable_names.append(val_name)
- iterable_values.append(list(i.strip() for i in content.split(',')))
- else:
- processed_vals[val_name] = val.format(**params)
+ iterable_names, iterable_values, processed_vals = extract_iterables(vals)
group_report_err_msg = "Group reporting should be set if numjobs != 1"
if iterable_values == []:
+ processed_vals = format_params_into_section(processed_vals, params,
+ final=False)
params['UNIQ'] = 'UN{0}'.format(counter[0])
counter[0] += 1
params['TEST_SUMM'] = get_test_summary(processed_vals)
- if processed_vals.get('numjobs', '1') != '1':
+ num_jobs = int(processed_vals.get('numjobs', '1'))
+ fsize = to_bytes(processed_vals['size'])
+ params['PER_TH_OFFSET'] = fsize // num_jobs
+
+ processed_vals = format_params_into_section(processed_vals, params,
+ final=True)
+
+ if num_jobs != 1:
assert 'group_reporting' in processed_vals, group_report_err_msg
ramp_time = processed_vals.get('ramp_time')
@@ -104,11 +133,21 @@
processed_vals['ramp_time'] = ramp_time
else:
for it_vals in itertools.product(*iterable_values):
+ processed_vals = format_params_into_section(processed_vals, params,
+ final=False)
+
processed_vals.update(dict(zip(iterable_names, it_vals)))
params['UNIQ'] = 'UN{0}'.format(counter[0])
counter[0] += 1
params['TEST_SUMM'] = get_test_summary(processed_vals)
+ num_jobs = int(processed_vals.get('numjobs', '1'))
+ fsize = to_bytes(processed_vals['size'])
+ params['PER_TH_OFFSET'] = fsize // num_jobs
+
+ processed_vals = format_params_into_section(processed_vals, params,
+ final=True)
+
if processed_vals.get('numjobs', '1') != '1':
assert 'group_reporting' in processed_vals,\
group_report_err_msg
@@ -130,6 +169,7 @@
time = 0
for _, params in combinations:
time += int(params.get('ramp_time', 0))
+ time += int(params.get('_ramp_time', 0))
time += int(params.get('runtime', 0))
return time
@@ -236,6 +276,8 @@
return (1024 ** 2) * int(sz[:-1])
if sz[-1] == 'k':
return 1024 * int(sz[:-1])
+ if sz[-1] == 'g':
+ return (1024 ** 3) * int(sz[:-1])
raise
@@ -348,12 +390,17 @@
def do_run_fio(bconf):
benchmark_config = format_fio_config(bconf)
cmd = ["fio", "--output-format=json", "--alloc-size=262144", "-"]
- p = subprocess.Popen(cmd, stdin=subprocess.PIPE,
+ p = subprocess.Popen(cmd,
+ stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT)
+ stderr=subprocess.PIPE)
# set timeout
- raw_out, _ = p.communicate(benchmark_config)
+ raw_out, raw_err = p.communicate(benchmark_config)
+
+ if 0 != p.returncode:
+ msg = "Fio failed with code: {0}\nOutput={1}"
+ raise OSError(msg.format(p.returncode, raw_err))
try:
parsed_out = json.loads(raw_out)["jobs"]
@@ -373,7 +420,14 @@
MAX_JOBS = 1000
-def next_test_portion(whole_conf, runcycle):
+def next_test_portion(whole_conf, runcycle, cluster=False):
+ if cluster:
+ for name, sec in whole_conf:
+ if '_ramp_time' in sec:
+ sec['ramp_time'] = sec.pop('_ramp_time')
+ yield [(name, sec)]
+ return
+
jcount = 0
runtime = 0
bconf = []
@@ -458,12 +512,14 @@
res[jname] = j_res
-def compile(benchmark_config, params, runcycle=None):
+def compile(benchmark_config, params, skip=0, runcycle=None, cluster=False):
whole_conf = list(parse_fio_config_full(benchmark_config, params))
+ whole_conf = whole_conf[skip:]
res = ""
- for bconf in next_test_portion(whole_conf, runcycle):
+ for bconf in next_test_portion(whole_conf, runcycle, cluster=cluster):
res += format_fio_config(bconf)
+ res += "\n#" + "-" * 50 + "\n\n"
return res
@@ -473,7 +529,8 @@
runcycle=None,
raw_results_func=None,
skip_tests=0,
- fake_fio=False):
+ fake_fio=False,
+ cluster=False):
whole_conf = list(parse_fio_config_full(benchmark_config, params))
whole_conf = whole_conf[skip_tests:]
@@ -482,7 +539,7 @@
executed_tests = 0
ok = True
try:
- for bconf in next_test_portion(whole_conf, runcycle):
+ for bconf in next_test_portion(whole_conf, runcycle, cluster=cluster):
if fake_fio:
res_cfg_it = do_run_fio_fake(bconf)
@@ -504,7 +561,7 @@
continue
add_job_results(jname, job_output, jconfig, res)
-
+ curr_test_num += 1
msg_template = "Done {0} tests from {1}. ETA: {2}"
exec_time = estimate_cfg(benchmark_config, params, curr_test_num)
@@ -588,6 +645,8 @@
help="Skip NUM tests")
parser.add_argument("--faked-fio", action='store_true',
default=False, help="Emulate fio with 0 test time")
+ parser.add_argument("--cluster", action='store_true',
+ default=False, help="Apply cluster-test settings")
parser.add_argument("--params", nargs="*", metavar="PARAM=VAL",
default=[],
help="Provide set of pairs PARAM=VAL to" +
@@ -620,14 +679,16 @@
return 0
if argv_obj.num_tests or argv_obj.compile:
- bconf = list(parse_fio_config_full(job_cfg, params))
- bconf = bconf[argv_obj.skip_tests:]
-
if argv_obj.compile:
- out_fd.write(format_fio_config(bconf))
+ data = compile(job_cfg, params, argv_obj.skip_tests,
+ cluster=argv_obj.cluster)
+ out_fd.write(data)
out_fd.write("\n")
if argv_obj.num_tests:
+ bconf = list(parse_fio_config_full(job_cfg, params,
+ argv_obj.cluster))
+ bconf = bconf[argv_obj.skip_tests:]
print len(bconf)
return 0
@@ -653,7 +714,8 @@
argv_obj.runcycle,
rrfunc,
argv_obj.skip_tests,
- argv_obj.faked_fio)
+ argv_obj.faked_fio,
+ cluster=argv_obj.cluster)
etime = time.time()
res = {'__meta__': {'raw_cfg': job_cfg, 'params': params}, 'res': job_res}
diff --git a/wally/suits/io/formatter.py b/wally/suits/io/formatter.py
index 529b78a..a1da1c3 100644
--- a/wally/suits/io/formatter.py
+++ b/wally/suits/io/formatter.py
@@ -1,8 +1,8 @@
import texttable
from wally.utils import ssize_to_b
-from wally.statistic import med_dev
from wally.suits.io.agent import get_test_summary
+from wally.statistic import med_dev, round_deviation, round_3_digit
def key_func(k_data):
@@ -37,14 +37,24 @@
descr = get_test_summary(data)
- iops, _ = med_dev(data['iops'])
- bw, bwdev = med_dev(data['bw'])
+ iops, _ = round_deviation(med_dev(data['iops']))
+ bw, bwdev = round_deviation(med_dev(data['bw']))
# 3 * sigma
- dev_perc = int((bwdev * 300) / bw)
+ if 0 == bw:
+ assert 0 == bwdev
+ dev_perc = 0
+ else:
+ dev_perc = int((bwdev * 300) / bw)
- params = (descr, int(iops), int(bw), dev_perc,
- int(med_dev(data['lat'])[0]) // 1000)
+ med_lat, _ = round_deviation(med_dev(data['lat']))
+ med_lat = int(med_lat) // 1000
+
+ iops = round_3_digit(iops)
+ bw = round_3_digit(bw)
+ med_lat = round_3_digit(med_lat)
+
+ params = (descr, int(iops), int(bw), dev_perc, med_lat)
tab.add_row(params)
header = ["Description", "IOPS", "BW KiBps", "Dev * 3 %", "clat ms"]
diff --git a/wally/suits/io/io_scenario_ceph.cfg b/wally/suits/io/io_scenario_ceph.cfg
new file mode 100644
index 0000000..5e793f2
--- /dev/null
+++ b/wally/suits/io/io_scenario_ceph.cfg
@@ -0,0 +1,62 @@
+[defaults]
+wait_for_previous
+group_reporting
+time_based
+buffered=0
+iodepth=1
+softrandommap=1
+filename={FILENAME}
+NUM_ROUNDS=7
+
+size=5G
+ramp_time=20
+runtime=20
+
+# ---------------------------------------------------------------------
+# check different thread count, direct read/write mode. (bw, iops) = func(th_count)
+# also check BW for seq read/write.
+# ---------------------------------------------------------------------
+[ceph_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=1m
+rw=read
+direct=1
+offset_increment={PER_TH_OFFSET}
+numjobs={% 20, 120 %}
+
+# # ---------------------------------------------------------------------
+# # check different thread count, sync mode. (latency, iops) = func(th_count)
+# # ---------------------------------------------------------------------
+# [hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+# blocksize=4k
+# rw=randwrite
+# sync=1
+# numjobs={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
+#
+# # ---------------------------------------------------------------------
+# # check different thread count, direct read mode. (latency, iops) = func(th_count)
+# # also check iops for randread
+# # ---------------------------------------------------------------------
+# [hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+# blocksize=4k
+# rw=randread
+# direct=1
+# numjobs={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
+#
+# # ---------------------------------------------------------------------
+# # check different thread count, direct read/write mode. (bw, iops) = func(th_count)
+# # also check BW for seq read/write.
+# # ---------------------------------------------------------------------
+# [hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+# blocksize=1m
+# rw={% read, write %}
+# direct=1
+# offset_increment=1073741824 # 1G
+# numjobs={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
+#
+# # ---------------------------------------------------------------------
+# # check IOPS randwrite.
+# # ---------------------------------------------------------------------
+# [hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+# blocksize=4k
+# rw=randwrite
+# direct=1
diff --git a/wally/suits/io/io_scenario_hdd.cfg b/wally/suits/io/io_scenario_hdd.cfg
index 5e24009..46191f2 100644
--- a/wally/suits/io/io_scenario_hdd.cfg
+++ b/wally/suits/io/io_scenario_hdd.cfg
@@ -8,8 +8,8 @@
filename={FILENAME}
NUM_ROUNDS=7
-ramp_time=5
size=10Gb
+ramp_time=5
runtime=30
# ---------------------------------------------------------------------
@@ -39,6 +39,7 @@
blocksize=1m
rw={% read, write %}
direct=1
+offset_increment=1073741824 # 1G
numjobs={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
# ---------------------------------------------------------------------
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index 1e93247..c5615bb 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -137,29 +137,45 @@
self.files_to_copy = {local_fname: self.io_py_remote}
copy_paths(conn, self.files_to_copy)
- files = {}
+ if self.options.get('prefill_files', True):
+ files = {}
- for secname, params in self.configs:
- sz = ssize_to_b(params['size'])
- msz = msz = sz / (1024 ** 2)
- if sz % (1024 ** 2) != 0:
- msz += 1
+ for secname, params in self.configs:
+ sz = ssize_to_b(params['size'])
+ msz = sz / (1024 ** 2)
+ if sz % (1024 ** 2) != 0:
+ msz += 1
- fname = params['filename']
- files[fname] = max(files.get(fname, 0), msz)
+ fname = params['filename']
- # logger.warning("dd run DISABLED")
- # cmd_templ = "dd if=/dev/zero of={0} bs={1} count={2}"
+            # if another test already uses the same file name,
+            # keep the largest requested size
+ files[fname] = max(files.get(fname, 0), msz)
- cmd_templ = "sudo dd if=/dev/zero of={0} bs={1} count={2}"
- for fname, sz in files.items():
- cmd = cmd_templ.format(fname, 1024 ** 2, msz)
- run_over_ssh(conn, cmd, timeout=msz, node=self.node)
+ # logger.warning("dd run DISABLED")
+ # cmd_templ = "dd if=/dev/zero of={0} bs={1} count={2}"
+
+ cmd_templ = "sudo dd if=/dev/zero of={0} bs={1} count={2}"
+ ssize = 0
+ stime = time.time()
+
+ for fname, curr_sz in files.items():
+ cmd = cmd_templ.format(fname, 1024 ** 2, curr_sz)
+ ssize += curr_sz
+ run_over_ssh(conn, cmd, timeout=curr_sz, node=self.node)
+
+ ddtime = time.time() - stime
+ if ddtime > 1E-3:
+ fill_bw = int(ssize / ddtime)
+                mess = "Initial dd fill bw is {0} MiBps for this vm"
+ logger.info(mess.format(fill_bw))
+ else:
+ logger.warning("Test files prefill disabled")
def run(self, conn, barrier):
# logger.warning("No tests runned")
# return
- cmd_templ = "sudo env python2 {0} --type {1} {2} --json -"
+ cmd_templ = "sudo env python2 {0} {3} --type {1} {2} --json -"
# cmd_templ = "env python2 {0} --type {1} {2} --json -"
params = " ".join("{0}={1}".format(k, v)
@@ -168,16 +184,24 @@
if "" != params:
params = "--params " + params
- cmd = cmd_templ.format(self.io_py_remote, self.tool, params)
+ if self.options.get('cluster', False):
+ logger.info("Cluster mode is used")
+ cluster_opt = "--cluster"
+ else:
+ logger.info("Non-cluster mode is used")
+ cluster_opt = ""
+
+ cmd = cmd_templ.format(self.io_py_remote, self.tool, params,
+ cluster_opt)
logger.debug("Waiting on barrier")
exec_time = io_agent.estimate_cfg(self.raw_cfg, self.config_params)
exec_time_str = sec_to_str(exec_time)
try:
+ timeout = int(exec_time * 1.2 + 300)
if barrier.wait():
templ = "Test should takes about {0}. Will wait at most {1}"
- timeout = int(exec_time * 1.1 + 300)
logger.info(templ.format(exec_time_str, sec_to_str(timeout)))
out_err = run_over_ssh(conn, cmd,