pre-release updates, bug fixes
diff --git a/tests/itest.py b/tests/itest.py
index 53c4af3..a048703 100644
--- a/tests/itest.py
+++ b/tests/itest.py
@@ -1,10 +1,10 @@
-import re
import abc
-import json
+import time
import os.path
import logging
from disk_perf_test_tool.tests import disk_test_agent
+from disk_perf_test_tool.tests.disk_test_agent import parse_fio_config_full
from disk_perf_test_tool.tests.io_results_loader import parse_output
from disk_perf_test_tool.ssh_utils import copy_paths
from disk_perf_test_tool.utils import run_over_ssh, ssize_to_b
@@ -51,17 +51,15 @@
def pre_run(self, conn):
remote_script = self.copy_script(conn, self.pre_run_script)
cmd = remote_script
- code, out_err = run_over_ssh(conn, cmd)
- if code != 0:
- raise Exception("Pre run failed. %s" % out_err)
+ run_over_ssh(conn, cmd)
def run(self, conn, barrier):
remote_script = self.copy_script(conn, self.run_script)
cmd_opts = ' '.join(["%s %s" % (key, val) for key, val
in self.opts.items()])
cmd = remote_script + ' ' + cmd_opts
- code, out_err = run_over_ssh(conn, cmd)
- self.on_result(code, out_err, cmd)
+ out_err = run_over_ssh(conn, cmd)
+ self.on_result(out_err, cmd)
def parse_results(self, out):
for line in out.split("\n"):
@@ -69,16 +67,12 @@
if key and value:
self.on_result_cb((key, float(value)))
- def on_result(self, code, out_err, cmd):
- if 0 == code:
- try:
- self.parse_results(out_err)
- except Exception as exc:
- msg_templ = "Error during postprocessing results: {0!r}"
- raise RuntimeError(msg_templ.format(exc.message))
- else:
- templ = "Command {0!r} failed with code {1}. Error output is:\n{2}"
- logger.error(templ.format(cmd, code, out_err))
+ def on_result(self, out_err, cmd):
+ try:
+ self.parse_results(out_err)
+ except Exception as exc:
+ msg_templ = "Error during postprocessing results: {0!r}. {1}"
+ raise RuntimeError(msg_templ.format(exc.message, out_err))
class PgBenchTest(TwoScriptTest):
@@ -102,14 +96,19 @@
self.config_params = test_options.get('params', {})
self.tool = test_options.get('tool', 'fio')
self.raw_cfg = open(self.config_fname).read()
-
- parse_func = disk_test_agent.parse_fio_config_full
- self.configs = parse_func(self.raw_cfg, self.config_params)
+ self.configs = parse_fio_config_full(self.raw_cfg, self.config_params)
def pre_run(self, conn):
# TODO: install fio, if not installed
- run_over_ssh(conn, "apt-get -y install fio")
+ cmd = "sudo apt-get -y install fio"
+
+ for i in range(3):
+ try:
+ run_over_ssh(conn, cmd)
+ break
+ except OSError:
+ time.sleep(3)
local_fname = disk_test_agent.__file__.rsplit('.')[0] + ".py"
self.files_to_copy = {local_fname: self.io_py_remote}
@@ -123,30 +122,64 @@
msz += 1
cmd = cmd_templ.format(params['filename'], 1024 ** 2, msz)
- code, out_err = run_over_ssh(conn, cmd)
-
- if code != 0:
- raise RuntimeError("Preparation failed " + out_err)
+ run_over_ssh(conn, cmd)
def run(self, conn, barrier):
- cmd_templ = "env python2 {0} --type {1} --json -"
- cmd = cmd_templ.format(self.io_py_remote, self.tool)
- logger.debug("Run {0}".format(cmd))
+ cmd_templ = "env python2 {0} --type {1} {2} --json -"
+
+ params = " ".join("{0}={1}".format(k, v)
+ for k, v in self.config_params.items())
+
+ if "" != params:
+ params = "--params " + params
+
+ cmd = cmd_templ.format(self.io_py_remote, self.tool, params)
+ logger.debug("Waiting on barrier")
try:
barrier.wait()
- code, out_err = run_over_ssh(conn, cmd, stdin_data=self.raw_cfg)
- self.on_result(code, out_err, cmd)
+ logger.debug("Run {0}".format(cmd))
+ out_err = run_over_ssh(conn, cmd, stdin_data=self.raw_cfg)
+ self.on_result(out_err, cmd)
finally:
barrier.exit()
- def on_result(self, code, out_err, cmd):
- if 0 == code:
- try:
- for data in parse_output(out_err):
- self.on_result_cb(data)
- except Exception as exc:
- msg_templ = "Error during postprocessing results: {0!r}"
- raise RuntimeError(msg_templ.format(exc.message))
- else:
- templ = "Command {0!r} failed with code {1}. Output is:\n{2}"
- logger.error(templ.format(cmd, code, out_err))
+ def on_result(self, out_err, cmd):
+ try:
+ for data in parse_output(out_err):
+ self.on_result_cb(data)
+ except Exception as exc:
+ msg_templ = "Error during postprocessing results: {0!r}"
+ raise RuntimeError(msg_templ.format(exc.message))
+
+ def merge_results(self, results):
+ merged_result = results[0]
+ merged_data = merged_result['res']
+ expected_keys = set(merged_data.keys())
+ mergable_fields = ['bw_mean', 'clat', 'iops', 'lat', 'slat']
+
+ for res in results[1:]:
+ assert res['__meta__'] == merged_result['__meta__']
+
+ data = res['res']
+ diff = set(data.keys()).symmetric_difference(expected_keys)
+
+ msg = "Difference: {0}".format(",".join(diff))
+ assert len(diff) == 0, msg
+
+ for testname, test_data in data.items():
+ res_test_data = merged_data[testname]
+
+ diff = set(test_data.keys()).symmetric_difference(
+ res_test_data.keys())
+
+ msg = "Difference: {0}".format(",".join(diff))
+ assert len(diff) == 0, msg
+
+ for k, v in test_data.items():
+ if k in mergable_fields:
+ res_test_data[k].extend(v)
+ else:
+ msg = "{0!r} != {1!r}".format(res_test_data[k], v)
+ assert res_test_data[k] == v, msg
+
+ return merged_result