fix multythreaded bugs in disk_test_agent, add linearity data and script
diff --git a/tests/disk_test_agent.py b/tests/disk_test_agent.py
index 4f23f90..d626889 100644
--- a/tests/disk_test_agent.py
+++ b/tests/disk_test_agent.py
@@ -1,3 +1,4 @@
+import re
import sys
import time
import json
@@ -65,18 +66,29 @@
else:
processed_vals[val_name] = val.format(**params)
+ group_report_err_msg = "Group reporting should be set if numjobs != 1"
+
if iterable_values == []:
params['UNIQ'] = 'UN{0}'.format(counter[0])
counter[0] += 1
params['TEST_SUMM'] = get_test_summary(processed_vals)
+
+ if processed_vals.get('numjobs', '1') != '1':
+ assert 'group_reporting' in processed_vals, group_report_err_msg
+
for i in range(repeat):
- yield name.format(**params), processed_vals
+ yield name.format(**params), processed_vals.copy()
else:
for it_vals in itertools.product(*iterable_values):
processed_vals.update(dict(zip(iterable_names, it_vals)))
params['UNIQ'] = 'UN{0}'.format(counter[0])
counter[0] += 1
params['TEST_SUMM'] = get_test_summary(processed_vals)
+
+ if processed_vals.get('numjobs', '1') != '1':
+ assert 'group_reporting' in processed_vals,\
+ group_report_err_msg
+
for i in range(repeat):
yield name.format(**params), processed_vals.copy()
@@ -372,7 +384,7 @@
j_res["direct_io"] = jconfig.get("direct", "0") == "1"
j_res["sync"] = jconfig.get("sync", "0") == "1"
j_res["concurence"] = int(jconfig.get("numjobs", 1))
- j_res["size"] = jconfig["size"]
+ j_res["blocksize"] = jconfig["blocksize"]
j_res["jobname"] = job_output["jobname"]
j_res["timings"] = (jconfig.get("runtime"),
jconfig.get("ramp_time"))
@@ -385,7 +397,7 @@
assert j_res["sync"] == (jconfig.get("sync", "0") == "1")
assert j_res["concurence"] == int(jconfig.get("numjobs", 1))
- assert j_res["size"] == jconfig["size"]
+ assert j_res["blocksize"] == jconfig["blocksize"]
assert j_res["jobname"] == job_output["jobname"]
assert j_res["timings"] == (jconfig.get("runtime"),
jconfig.get("ramp_time"))
@@ -414,7 +426,7 @@
whole_conf = whole_conf[skip_tests:]
res = {}
curr_test_num = skip_tests
- execited_tests = 0
+ executed_tests = 0
try:
for bconf in next_test_portion(whole_conf, runcycle):
@@ -426,9 +438,9 @@
res_cfg_it = enumerate(res_cfg_it, curr_test_num)
for curr_test_num, (job_output, (jname, jconfig)) in res_cfg_it:
- execited_tests += 1
+ executed_tests += 1
if raw_results_func is not None:
- raw_results_func(curr_test_num,
+ raw_results_func(executed_tests,
[job_output, jname, jconfig])
assert jname == job_output["jobname"], \
@@ -445,7 +457,7 @@
except Exception:
traceback.print_exc()
- return res, execited_tests
+ return res, executed_tests
def run_benchmark(binary_tp, *argv, **kwargs):
@@ -454,6 +466,22 @@
raise ValueError("Unknown behcnmark {0}".format(binary_tp))
+def parse_output(out_err):
+ start_patt = r"(?ims)=+\s+RESULTS\(format=json\)\s+=+"
+ end_patt = r"(?ims)=+\s+END OF RESULTS\s+=+"
+
+ for block in re.split(start_patt, out_err)[1:]:
+ data, garbage = re.split(end_patt, block)
+ yield json.loads(data.strip())
+
+ start_patt = r"(?ims)=+\s+RESULTS\(format=eval\)\s+=+"
+ end_patt = r"(?ims)=+\s+END OF RESULTS\s+=+"
+
+ for block in re.split(start_patt, out_err)[1:]:
+ data, garbage = re.split(end_patt, block)
+ yield eval(data.strip())
+
+
def parse_args(argv):
parser = argparse.ArgumentParser(
description="Run fio' and return result")
diff --git a/tests/io_scenario_check_th_count.cfg b/tests/io_scenario_check_th_count.cfg
index bc30c1c..478439e 100644
--- a/tests/io_scenario_check_th_count.cfg
+++ b/tests/io_scenario_check_th_count.cfg
@@ -1,6 +1,5 @@
[defaults]
NUM_ROUNDS=7
-
ramp_time=5
buffered=0
wait_for_previous
@@ -9,13 +8,20 @@
size=10Gb
time_based
runtime=30
+group_reporting
# ---------------------------------------------------------------------
# check different thread count. (latency, bw) = func(th_count)
# ---------------------------------------------------------------------
[concurrence_test_{TEST_SUMM} * {NUM_ROUNDS}]
blocksize={% 4k, 1m %}
-rw=randwrite
+rw={% randwrite, randread %}
direct=1
numjobs={% 1, 2, 5, 10, 15, 20, 25, 30, 35, 40 %}
+[concurrence_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize={% 4k, 1m %}
+rw=randwrite
+direct=0
+sync=1
+numjobs={% 1, 2, 5, 10, 15, 20, 25, 30, 35, 40 %}
diff --git a/tests/itest.py b/tests/itest.py
index 61c4124..b073879 100644
--- a/tests/itest.py
+++ b/tests/itest.py
@@ -3,13 +3,10 @@
import json
import os.path
import logging
-from StringIO import StringIO
-from ConfigParser import RawConfigParser
-from tests import disk_test_agent
-from ssh_utils import copy_paths
-from utils import run_over_ssh, ssize_to_b
-
+from disk_perf_test_tool.tests import disk_test_agent
+from disk_perf_test_tool.ssh_utils import copy_paths
+from disk_perf_test_tool.utils import run_over_ssh, ssize_to_b
logger = logging.getLogger("io-perf-tool")
@@ -144,11 +141,8 @@
def on_result(self, code, out_err, cmd):
if 0 == code:
try:
- start_patt = r"(?ims)=+\s+RESULTS\(format=json\)\s+=+"
- end_patt = r"(?ims)=+\s+END OF RESULTS\s+=+"
- for block in re.split(start_patt, out_err)[1:]:
- data, garbage = re.split(end_patt, block)
- self.on_result_cb(json.loads(data.strip()))
+ for data in disk_test_agent.parse_output(out_err):
+ self.on_result_cb(data)
except Exception as exc:
msg_templ = "Error during postprocessing results: {0!r}"
raise RuntimeError(msg_templ.format(exc.message))