large commit. refactoring, fio code totally reworker, huge improvenent in test time and results, etc
diff --git a/tests/ b/tests/
new file mode 100644
index 0000000..7537438
--- /dev/null
+++ b/tests/
@@ -0,0 +1,457 @@
+import sys
+import time
+import json
+import select
+import pprint
+import argparse
+import traceback
+import subprocess
+import itertools
+from collections import OrderedDict
+def get_test_summary(params):
+ rw = {"randread": "rr",
+ "randwrite": "rw",
+ "read": "sr",
+ "write": "sw"}[params["rw"]]
+ if params.get("direct") == '1':
+ sync_mode = 'd'
+ elif params.get("sync") == '1':
+ sync_mode = 's'
+ else:
+ sync_mode = 'a'
+ th_count = int(params.get('numjobs', '1'))
+ return "{0}{1}{2}th{3}".format(rw, sync_mode,
+ params['blocksize'], th_count)
+counter = [0]
+def process_section(name, vals, defaults, format_params):
+ vals = vals.copy()
+ params = format_params.copy()
+ if '*' in name:
+ name, repeat = name.split('*')
+ name = name.strip()
+ repeat = int(repeat.format(**params))
+ else:
+ repeat = 1
+ # this code can be optimized
+ for i in range(repeat):
+ iterable_names = []
+ iterable_values = []
+ processed_vals = {}
+ for val_name, val in vals.items():
+ if val is None:
+ processed_vals[val_name] = val
+ # remove hardcode
+ elif val.startswith('{%'):
+ assert val.endswith("%}")
+ content = val[2:-2].format(**params)
+ iterable_names.append(val_name)
+ iterable_values.append(i.strip() for i in content.split(','))
+ else:
+ processed_vals[val_name] = val.format(**params)
+ if iterable_values == []:
+ params['UNIQ'] = 'UN{0}'.format(counter[0])
+ counter[0] += 1
+ params['TEST_SUMM'] = get_test_summary(processed_vals)
+ yield name.format(**params), processed_vals
+ else:
+ for it_vals in itertools.product(*iterable_values):
+ processed_vals.update(dict(zip(iterable_names, it_vals)))
+ params['UNIQ'] = 'UN{0}'.format(counter[0])
+ counter[0] += 1
+ params['TEST_SUMM'] = get_test_summary(processed_vals)
+ yield name.format(**params), processed_vals
+def calculate_execution_time(combinations):
+ time = 0
+ for _, params in combinations:
+ time += int(params.get('ramp_time', 0))
+ time += int(params.get('runtime', 0))
+ return time
+def parse_fio_config_full(fio_cfg, params=None):
+ defaults = {}
+ format_params = {}
+ if params is None:
+ ext_params = {}
+ else:
+ ext_params = params.copy()
+ curr_section = None
+ curr_section_name = None
+ for tp, name, val in parse_fio_config_iter(fio_cfg):
+ if tp == SECTION:
+ non_def = curr_section_name != 'defaults'
+ if curr_section_name is not None and non_def:
+ format_params.update(ext_params)
+ for sec in process_section(curr_section_name,
+ curr_section,
+ defaults,
+ format_params):
+ yield sec
+ if name == 'defaults':
+ curr_section = defaults
+ else:
+ curr_section = OrderedDict()
+ curr_section.update(defaults)
+ curr_section_name = name
+ else:
+ assert tp == SETTING
+ assert curr_section_name is not None, "no section name"
+ if name == name.upper():
+ assert curr_section_name == 'defaults'
+ format_params[name] = val
+ else:
+ curr_section[name] = val
+ if curr_section_name is not None and curr_section_name != 'defaults':
+ format_params.update(ext_params)
+ for sec in process_section(curr_section_name,
+ curr_section,
+ defaults,
+ format_params):
+ yield sec
+def parse_fio_config_iter(fio_cfg):
+ for lineno, line in enumerate(fio_cfg.split("\n")):
+ try:
+ line = line.strip()
+ if line.startswith("#") or line.startswith(";"):
+ continue
+ if line == "":
+ continue
+ if line.startswith('['):
+ assert line.endswith(']'), "name should ends with ]"
+ yield SECTION, line[1:-1], None
+ elif '=' in line:
+ opt_name, opt_val = line.split('=', 1)
+ yield SETTING, opt_name.strip(), opt_val.strip()
+ else:
+ yield SETTING, line, None
+ except Exception as exc:
+ pref = "During parsing line number {0}\n".format(lineno)
+ raise ValueError(pref + exc.message)
+def format_fio_config(fio_cfg):
+ res = ""
+ for pos, (name, section) in enumerate(fio_cfg):
+ if pos != 0:
+ res += "\n"
+ res += "[{0}]\n".format(name)
+ for opt_name, opt_val in section.items():
+ if opt_val is None:
+ res += opt_name + "\n"
+ else:
+ res += "{0}={1}\n".format(opt_name, opt_val)
+ return res
+def do_run_fio(bconf):
+ benchmark_config = format_fio_config(bconf)
+ cmd = ["fio", "--output-format=json", "-"]
+ p = subprocess.Popen(cmd, stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT)
+ # set timeout
+ raw_out, _ = p.communicate(benchmark_config)
+ try:
+ parsed_out = json.loads(raw_out)["jobs"]
+ except Exception:
+ msg = "Can't parse fio output: {0!r}\nError: {1}"
+ raise ValueError(msg.format(raw_out, traceback.format_exc()))
+ return zip(parsed_out, bconf)
+# limited by fio
+MAX_JOBS = 1000
+def next_test_portion(whole_conf, runcycle):
+ jcount = 0
+ runtime = 0
+ bconf = []
+ for pos, (name, sec) in enumerate(whole_conf):
+ jc = int(sec.get('numjobs', '1'))
+ if runcycle is not None:
+ curr_task_time = calculate_execution_time([(name, sec)])
+ else:
+ curr_task_time = 0
+ if jc > MAX_JOBS:
+ err_templ = "Can't process job {0!r} - too large numjobs"
+ raise ValueError(err_templ.format(name))
+ if runcycle is not None and len(bconf) != 0:
+ rc_ok = curr_task_time + runtime <= runcycle
+ else:
+ rc_ok = True
+ if jc + jcount <= MAX_JOBS and rc_ok:
+ runtime += curr_task_time
+ jcount += jc
+ bconf.append((name, sec))
+ continue
+ assert len(bconf) != 0
+ yield bconf
+ runtime = curr_task_time
+ jcount = jc
+ bconf = [(name, sec)]
+ if bconf != []:
+ yield bconf
+def add_job_results(jname, job_output, jconfig, res):
+ if job_output['write']['iops'] != 0:
+ raw_result = job_output['write']
+ else:
+ raw_result = job_output['read']
+ if jname not in res:
+ j_res = {}
+ j_res["action"] = jconfig["rw"]
+ j_res["direct_io"] = jconfig.get("direct", "0") == "1"
+ j_res["sync"] = jconfig.get("sync", "0") == "1"
+ j_res["concurence"] = int(jconfig.get("numjobs", 1))
+ j_res["size"] = jconfig["size"]
+ j_res["jobname"] = job_output["jobname"]
+ j_res["timings"] = (jconfig.get("runtime"),
+ jconfig.get("ramp_time"))
+ else:
+ j_res = res[jname]
+ assert j_res["action"] == jconfig["rw"]
+ assert j_res["direct_io"] == \
+ (jconfig.get("direct", "0") == "1")
+ assert j_res["sync"] == (jconfig.get("sync", "0") == "1")
+ assert j_res["concurence"] == int(jconfig.get("numjobs", 1))
+ assert j_res["size"] == jconfig["size"]
+ assert j_res["jobname"] == job_output["jobname"]
+ assert j_res["timings"] == (jconfig.get("runtime"),
+ jconfig.get("ramp_time"))
+ def j_app(name, x):
+ j_res.setdefault(name, []).append(x)
+ # 'bw_dev bw_mean bw_max bw_min'.split()
+ j_app("bw_mean", raw_result["bw_mean"])
+ j_app("iops", raw_result["iops"])
+ j_app("lat", raw_result["lat"]["mean"])
+ j_app("clat", raw_result["clat"]["mean"])
+ j_app("slat", raw_result["slat"]["mean"])
+ res[jname] = j_res
+def run_fio(benchmark_config,
+ params,
+ runcycle=None,
+ raw_results_func=None,
+ skip_tests=0):
+ whole_conf = list(parse_fio_config_full(benchmark_config, params))
+ whole_conf = whole_conf[skip_tests:]
+ res = {}
+ curr_test_num = skip_tests
+ execited_tests = 0
+ try:
+ for bconf in next_test_portion(whole_conf, runcycle):
+ res_cfg_it = do_run_fio(bconf)
+ res_cfg_it = enumerate(res_cfg_it, curr_test_num)
+ for curr_test_num, (job_output, (jname, jconfig)) in res_cfg_it:
+ execited_tests += 1
+ if raw_results_func is not None:
+ raw_results_func(curr_test_num,
+ [job_output, jname, jconfig])
+ assert jname == job_output["jobname"]
+ if jname.startswith('_'):
+ continue
+ add_job_results(jname, job_output, jconfig, res)
+ except (SystemExit, KeyboardInterrupt):
+ pass
+ except Exception:
+ traceback.print_exc()
+ return res, execited_tests
+def run_benchmark(binary_tp, *argv, **kwargs):
+ if 'fio' == binary_tp:
+ return run_fio(*argv, **kwargs)
+ raise ValueError("Unknown behcnmark {0}".format(binary_tp))
+def parse_args(argv):
+ parser = argparse.ArgumentParser(
+ description="Run fio' and return result")
+ parser.add_argument("--type", metavar="BINARY_TYPE",
+ choices=['fio'], default='fio',
+ help=argparse.SUPPRESS)
+ parser.add_argument("--start-at", metavar="START_AT_UTC", type=int,
+ help="Start execution at START_AT_UTC")
+ parser.add_argument("--json", action="store_true", default=False,
+ help="Json output format")
+ parser.add_argument("--output", default='-', metavar="FILE_PATH",
+ help="Store results to FILE_PATH")
+ parser.add_argument("--estimate", action="store_true", default=False,
+ help="Only estimate task execution time")
+ parser.add_argument("--compile", action="store_true", default=False,
+ help="Compile config file to fio config")
+ parser.add_argument("--num-tests", action="store_true", default=False,
+ help="Show total number of tests")
+ parser.add_argument("--runcycle", type=int, default=None,
+ metavar="MAX_CYCLE_SECONDS",
+ help="Max cycle length in seconds")
+ parser.add_argument("--show-raw-results", action='store_true',
+ default=False, help="Output raw input and results")
+ parser.add_argument("--skip-tests", type=int, default=0, metavar="NUM",
+ help="Skip NUM tests")
+ parser.add_argument("--params", nargs="*", metavar="PARAM=VAL",
+ default=[],
+ help="Provide set of pairs PARAM=VAL to" +
+ "format into job description")
+ parser.add_argument("jobfile")
+ return parser.parse_args(argv)
+def read_config(fd, timeout=10):
+ job_cfg = ""
+ etime = time.time() + timeout
+ while True:
+ wtime = etime - time.time()
+ if wtime <= 0:
+ raise IOError("No config provided")
+ r, w, x =[fd], [], [], wtime)
+ if len(r) == 0:
+ raise IOError("No config provided")
+ char =
+ if '' == char:
+ return job_cfg
+ job_cfg += char
+def main(argv):
+ argv_obj = parse_args(argv)
+ if argv_obj.jobfile == '-':
+ job_cfg = read_config(sys.stdin)
+ else:
+ job_cfg = open(argv_obj.jobfile).read()
+ if argv_obj.output == '-':
+ out_fd = sys.stdout
+ else:
+ out_fd = open(argv_obj.output, "w")
+ params = {}
+ for param_val in argv_obj.params:
+ assert '=' in param_val
+ name, val = param_val.split("=", 1)
+ params[name] = val
+ if argv_obj.num_tests or argv_obj.compile or argv_obj.estimate:
+ bconf = list(parse_fio_config_full(job_cfg, params))
+ bconf = bconf[argv_obj.skip_tests:]
+ if argv_obj.compile:
+ out_fd.write(format_fio_config(bconf))
+ out_fd.write("\n")
+ if argv_obj.num_tests:
+ print len(bconf)
+ if argv_obj.estimate:
+ seconds = calculate_execution_time(bconf)
+ h = seconds // 3600
+ m = (seconds % 3600) // 60
+ s = seconds % 60
+ print "{0}:{1}:{2}".format(h, m, s)
+ return 0
+ if argv_obj.start_at is not None:
+ ctime = time.time()
+ if argv_obj.start_at >= ctime:
+ time.sleep(ctime - argv_obj.start_at)
+ def raw_res_func(test_num, data):
+ pref = "========= RAW_RESULTS({0}) =========\n".format(test_num)
+ out_fd.write(pref)
+ out_fd.write(json.dumps(data))
+ out_fd.write("\n========= END OF RAW_RESULTS =========\n")
+ out_fd.flush()
+ rrfunc = raw_res_func if argv_obj.show_raw_results else None
+ stime = time.time()
+ job_res, num_tests = run_benchmark(argv_obj.type,
+ job_cfg,
+ params,
+ argv_obj.runcycle,
+ rrfunc,
+ argv_obj.skip_tests)
+ etime = time.time()
+ res = {'__meta__': {'raw_cfg': job_cfg}, 'res': job_res}
+ oformat = 'json' if argv_obj.json else 'eval'
+ out_fd.write("\nRun {} tests in {} seconds\n".format(num_tests,
+ int(etime - stime)))
+ out_fd.write("========= RESULTS(format={0}) =========\n".format(oformat))
+ if argv_obj.json:
+ out_fd.write(json.dumps(res))
+ else:
+ out_fd.write(pprint.pformat(res) + "\n")
+ out_fd.write("\n========= END OF RESULTS =========\n".format(oformat))
+ return 0
+if __name__ == '__main__':
+ exit(main(sys.argv[1:]))
diff --git a/tests/fio_configs/1.cfg b/tests/fio_configs/1.cfg
new file mode 100644
index 0000000..d5240cd
--- /dev/null
+++ b/tests/fio_configs/1.cfg
@@ -0,0 +1,100 @@
+[writetest_10 * 55]
+[writetest_20 * 55]
+[writetest_30 * 55]
+[writetest_120 * 55]
+[writetest_30_5 * 55]
+[writetest_30_10 * 55]
+[writetest_30_15 * 55]
diff --git a/tests/fio_configs/2.cfg b/tests/fio_configs/2.cfg
new file mode 100644
index 0000000..050e477
--- /dev/null
+++ b/tests/fio_configs/2.cfg
@@ -0,0 +1,13 @@
+[writetest_10_20 * 3]
diff --git a/tests/ b/tests/
deleted file mode 100644
index 2405f53..0000000
--- a/tests/
+++ /dev/null
@@ -1,97 +0,0 @@
-import sys
-import time
-import json
-import select
-import pprint
-import argparse
-import subprocess
-from StringIO import StringIO
-from ConfigParser import RawConfigParser
-def run_fio(benchmark_config):
- cmd = ["fio", "--output-format=json", "-"]
- p = subprocess.Popen(cmd, stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT)
- raw_out, _ = p.communicate(benchmark_config)
- job_output = json.loads(raw_out)["jobs"][0]
- if job_output['write']['iops'] != 0:
- raw_result = job_output['write']
- else:
- raw_result = job_output['read']
- res = {}
- # 'bw_dev bw_mean bw_max bw_min'.split()
- for field in ["bw_mean", "iops"]:
- res[field] = raw_result[field]
- res["lat"] = raw_result["lat"]["mean"]
- res["clat"] = raw_result["clat"]["mean"]
- res["slat"] = raw_result["slat"]["mean"]
- res["util"] = json.loads(raw_out)["disk_util"][0]
- res["util"] = dict((str(k), v) for k, v in res["util"].items())
- return res
-def run_benchmark(binary_tp, *argv, **kwargs):
- if 'fio' == binary_tp:
- return run_fio(*argv, **kwargs)
- raise ValueError("Unknown behcnmark {0}".format(binary_tp))
-def parse_args(argv):
- parser = argparse.ArgumentParser(
- description="Run fio' and return result")
- parser.add_argument(
- "--type", metavar="BINARY_TYPE",
- choices=['fio'], required=True)
- parser.add_argument("--start-at", metavar="START_TIME", type=int)
- parser.add_argument("--json", action="store_true", default=False)
- parser.add_argument("jobfile")
- return parser.parse_args(argv)
-def main(argv):
- argv_obj = parse_args(argv)
- if argv_obj.jobfile == '-':
- job_cfg = ""
- dtime = 10
- while True:
- r, w, x =[sys.stdin], [], [], dtime)
- if len(r) == 0:
- raise IOError("No config provided")
- char =
- if '' == char:
- break
- job_cfg += char
- dtime = 1
- else:
- job_cfg = open(argv_obj.jobfile).read()
- rcp = RawConfigParser()
- rcp.readfp(StringIO(job_cfg))
- assert len(rcp.sections()) == 1
- if argv_obj.start_at is not None:
- ctime = time.time()
- if argv_obj.start_at >= ctime:
- time.sleep(ctime - argv_obj.start_at)
- res = run_benchmark(argv_obj.type, job_cfg)
- res['__meta__'] = dict(rcp.items(rcp.sections()[0]))
- res['__meta__']['raw'] = job_cfg
- if argv_obj.json:
- sys.stdout.write(json.dumps(res))
- else:
- sys.stdout.write(pprint.pformat(res))
- sys.stdout.write("\n")
- return 0
-if __name__ == '__main__':
- exit(main(sys.argv[1:]))
diff --git a/tests/io_scenario_check_assumptions.cfg b/tests/io_scenario_check_assumptions.cfg
new file mode 100644
index 0000000..25d99bc
--- /dev/null
+++ b/tests/io_scenario_check_assumptions.cfg
@@ -0,0 +1,59 @@
+# ---------------------------------------------------------------------
+# check test time, no warmup time. iops = func(rune_time)
+# ---------------------------------------------------------------------
+[runtime_test_wo_wu_{TEST_SUMM}_{UNIQ} * {NUM_ROUNDS}]
+blocksize={% 4k, 1m %}
+runtime={% 10, 15, 20, 30, 60, 120 %}
+# ---------------------------------------------------------------------
+# check test time, with warmup time. iops = func(rune_time)
+# ---------------------------------------------------------------------
+[runtime_test_w_wu_{TEST_SUMM}_{UNIQ} * {NUM_ROUNDS}]
+blocksize={% 4k, 1m %}
+ramp_time={% 5, 10, 15 %}
+runtime={% 15, 30 %}
+# ---------------------------------------------------------------------
+# check read and write linearity. oper_time = func(size)
+# ---------------------------------------------------------------------
+[linearity_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize={% 512, 1k, 4k, 8k, 16k, 32k, 64k, 128k, 256k, 512k, 1m, 2m, 4m %}
+rw={% randwrite, randread %}
+# ---------------------------------------------------------------------
+# check sync write linearity. oper_time = func(size)
+# check sync BW as well
+# ---------------------------------------------------------------------
+[linearity_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize={% 512, 1k, 4k, 8k, 16k, 32k, 64k, 128k, 256k, 512k, 1m, 2m, 4m %}
+# ---------------------------------------------------------------------
+# check different thread count. (latency, bw) = func(th_count)
+# ---------------------------------------------------------------------
+[concurrence_test_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize={% 4k, 1m %}
+numjobs={% 1, 2, 5, 10, 15, 20, 25, 30, 35, 40 %}
diff --git a/tests/io_task.cfg b/tests/io_task.cfg
new file mode 100644
index 0000000..cc3d07d
--- /dev/null
+++ b/tests/io_task.cfg
@@ -0,0 +1,10 @@
diff --git a/tests/io_task_test.cfg b/tests/io_task_test.cfg
new file mode 100644
index 0000000..a319fa0
--- /dev/null
+++ b/tests/io_task_test.cfg
@@ -0,0 +1,24 @@
+# [__warmup]
+# blocksize=4k
+# filename=/tmp/xxx.bin
+# rw=randwrite
+# direct=1
+# buffered=0
+# iodepth=1
+# size=1Gb
+# runtime=5
+# time_based
+[writetest * 3]
diff --git a/tests/ b/tests/
new file mode 100644
index 0000000..e7fd3eb
--- /dev/null
+++ b/tests/
@@ -0,0 +1,155 @@
+import re
+import abc
+import json
+import os.path
+import logging
+from StringIO import StringIO
+from ConfigParser import RawConfigParser
+from tests import disk_test_agent
+from ssh_utils import copy_paths
+from utils import run_over_ssh, ssize_to_b
+logger = logging.getLogger("io-perf-tool")
+class IPerfTest(object):
+ def __init__(self, on_result_cb):
+ self.on_result_cb = on_result_cb
+ def pre_run(self, conn):
+ pass
+ @abc.abstractmethod
+ def run(self, conn, barrier):
+ pass
+class TwoScriptTest(IPerfTest):
+ def __init__(self, opts, testtool, on_result_cb, keep_tmp_files):
+ super(TwoScriptTest, self).__init__(on_result_cb)
+ self.opts = opts
+ self.pre_run_script = None
+ self.run_script = None
+ self.tmp_dir = "/tmp/"
+ self.set_run_script()
+ self.set_pre_run_script()
+ def set_run_script(self):
+ self.pre_run_script = self.opts.pre_run_script
+ def set_pre_run_script(self):
+ self.run_script = self.opts.run_script
+ def get_remote_for_script(self, script):
+ return os.path.join(self.tmp_dir, script.rpartition('/')[2])
+ def copy_script(self, conn, src):
+ remote_path = self.get_remote_for_script(src)
+ copy_paths(conn, {src: remote_path})
+ return remote_path
+ def pre_run(self, conn):
+ remote_script = self.copy_script(conn, self.pre_run_script)
+ cmd = remote_script
+ code, out_err = run_over_ssh(conn, cmd)
+ if code != 0:
+ raise Exception("Pre run failed. %s" % out_err)
+ def run(self, conn, barrier):
+ remote_script = self.copy_script(conn, self.run_script)
+ cmd = remote_script + ' ' + ' '.join(self.opts)
+ code, out_err = run_over_ssh(conn, cmd)
+ self.on_result(code, out_err, cmd)
+ def parse_results(self, out):
+ for line in out.split("\n"):
+ key, separator, value = line.partition(":")
+ if key and value:
+ self.on_result_cb((key, float(value)))
+ def on_result(self, code, out_err, cmd):
+ if 0 == code:
+ try:
+ self.parse_results(out_err)
+ except Exception as exc:
+ msg_templ = "Error during postprocessing results: {0!r}"
+ raise RuntimeError(msg_templ.format(exc.message))
+ else:
+ templ = "Command {0!r} failed with code {1}. Error output is:\n{2}"
+ logger.error(templ.format(cmd, code, out_err))
+class PgBenchTest(TwoScriptTest):
+ def set_run_script(self):
+ self.pre_run_script = "hl_tests/postgres/"
+ def set_pre_run_script(self):
+ self.run_script = "hl_tests/postgres/"
+class IOPerfTest(IPerfTest):
+ io_py_remote = "/tmp/"
+ def __init__(self,
+ test_options,
+ on_result_cb):
+ IPerfTest.__init__(self, on_result_cb)
+ self.options = test_options
+ self.config_fname = test_options['cfg']
+ self.config_params = test_options.get('params', {})
+ self.tool = test_options.get('tool', 'fio')
+ self.raw_cfg = open(self.config_fname).read()
+ parse_func = disk_test_agent.parse_fio_config_full
+ self.configs = parse_func(self.raw_cfg, self.config_params)
+ def pre_run(self, conn):
+ # TODO: install fio, if not installed
+ run_over_ssh(conn, "apt-get -y install fio")
+ local_fname = disk_test_agent.__file__.rsplit('.')[0] + ".py"
+ self.files_to_copy = {local_fname: self.io_py_remote}
+ copy_paths(conn, self.files_to_copy)
+ cmd_templ = "dd if=/dev/zero of={0} bs={1} count={2}"
+ for secname, params in self.configs:
+ sz = ssize_to_b(params['size'])
+ msz = msz = sz / (1024 ** 2)
+ if sz % (1024 ** 2) != 0:
+ msz += 1
+ cmd = cmd_templ.format(params['filename'], 1024 ** 2, msz)
+ code, out_err = run_over_ssh(conn, cmd)
+ if code != 0:
+ raise RuntimeError("Preparation failed " + out_err)
+ def run(self, conn, barrier):
+ cmd_templ = "env python2 {0} --type {1} --json -"
+ cmd = cmd_templ.format(self.io_py_remote, self.tool)
+ logger.debug("Run {0}".format(cmd))
+ try:
+ barrier.wait()
+ code, out_err = run_over_ssh(conn, cmd, stdin_data=self.raw_cfg)
+ self.on_result(code, out_err, cmd)
+ finally:
+ barrier.exit()
+ def on_result(self, code, out_err, cmd):
+ if 0 == code:
+ try:
+ start_patt = r"(?ims)=+\s+RESULTS\(format=json\)\s+=+"
+ end_patt = r"(?ims)=+\s+END OF RESULTS\s+=+"
+ for block in re.split(start_patt, out_err)[1:]:
+ data, garbage = re.split(end_patt, block)
+ self.on_result_cb(json.loads(data.strip()))
+ except Exception as exc:
+ msg_templ = "Error during postprocessing results: {0!r}"
+ raise RuntimeError(msg_templ.format(exc.message))
+ else:
+ templ = "Command {0!r} failed with code {1}. Output is:\n{2}"
+ logger.error(templ.format(cmd, code, out_err))