| import abc |
| import json |
| import os.path |
| import logging |
| from StringIO import StringIO |
| from ConfigParser import RawConfigParser |
| |
| from tests import io |
| from ssh_utils import copy_paths |
| from utils import run_over_ssh, ssize_to_b |
| |
| |
| logger = logging.getLogger("io-perf-tool") |
| |
| |
| class IPerfTest(object): |
| def __init__(self, on_result_cb): |
| self.on_result_cb = on_result_cb |
| |
| def pre_run(self, conn): |
| pass |
| |
| @abc.abstractmethod |
| def run(self, conn, barrier): |
| pass |
| |
| |
| class TwoScriptTest(IPerfTest): |
| def __init__(self, opts, testtool, on_result_cb, keep_tmp_files): |
| super(TwoScriptTest, self).__init__(on_result_cb) |
| self.opts = opts |
| self.pre_run_script = None |
| self.run_script = None |
| self.tmp_dir = "/tmp/" |
| self.set_run_script() |
| self.set_pre_run_script() |
| |
| def set_run_script(self): |
| self.pre_run_script = self.opts.pre_run_script |
| |
| def set_pre_run_script(self): |
| self.run_script = self.opts.run_script |
| |
| def get_remote_for_script(self, script): |
| return os.path.join(self.tmp_dir, script.rpartition('/')[2]) |
| |
| def copy_script(self, conn, src): |
| remote_path = self.get_remote_for_script(src) |
| copy_paths(conn, {src: remote_path}) |
| return remote_path |
| |
| def pre_run(self, conn): |
| remote_script = self.copy_script(conn, self.pre_run_script) |
| cmd = remote_script |
| code, out_err = run_over_ssh(conn, cmd) |
| if code != 0: |
| raise Exception("Pre run failed. %s" % out_err) |
| |
| def run(self, conn, barrier): |
| remote_script = self.copy_script(conn, self.run_script) |
| cmd = remote_script + ' ' + ' '.join(self.opts) |
| code, out_err = run_over_ssh(conn, cmd) |
| self.on_result(code, out_err, cmd) |
| |
| def parse_results(self, out): |
| for line in out.split("\n"): |
| key, separator, value = line.partition(":") |
| if key and value: |
| self.on_result_cb((key, float(value))) |
| |
| def on_result(self, code, out_err, cmd): |
| if 0 == code: |
| try: |
| self.parse_results(out_err) |
| except Exception as exc: |
| msg_templ = "Error during postprocessing results: {0!r}" |
| raise RuntimeError(msg_templ.format(exc.message)) |
| else: |
| templ = "Command {0!r} failed with code {1}. Error output is:\n{2}" |
| logger.error(templ.format(cmd, code, out_err)) |
| |
| |
| class PgBenchTest(TwoScriptTest): |
| |
| def set_run_script(self): |
| self.pre_run_script = "hl_tests/postgres/prepare.sh" |
| |
| def set_pre_run_script(self): |
| self.run_script = "hl_tests/postgres/run.sh" |
| |
| |
| class IOPerfTest(IPerfTest): |
| io_py_remote = "/tmp/io.py" |
| |
| def __init__(self, |
| test_options, |
| on_result_cb): |
| IPerfTest.__init__(self, on_result_cb) |
| self.options = test_options |
| self.config_fname = test_options['config_file'] |
| self.tool = test_options['tool'] |
| self.configs = [] |
| |
| cp = RawConfigParser() |
| cp.readfp(open(self.config_fname)) |
| |
| for secname in cp.sections(): |
| params = dict(cp.items(secname)) |
| self.configs.append((secname, params)) |
| |
| def pre_run(self, conn): |
| local_fname = io.__file__.rsplit('.')[0] + ".py" |
| self.files_to_copy = {local_fname: self.io_py_remote} |
| copy_paths(conn, self.files_to_copy) |
| |
| cmd_templ = "dd if=/dev/zero of={0} bs={1} count={2}" |
| for secname, params in self.configs: |
| sz = ssize_to_b(params['size']) |
| msz = msz = sz / (1024 ** 2) |
| if sz % (1024 ** 2) != 0: |
| msz += 1 |
| |
| cmd = cmd_templ.format(params['filename'], 1024 ** 2, msz) |
| code, out_err = run_over_ssh(conn, cmd) |
| |
| if code != 0: |
| raise RuntimeError("Preparation failed " + out_err) |
| |
| def run(self, conn, barrier): |
| cmd_templ = "env python2 {0} --type {1} --json -" |
| cmd = cmd_templ.format(self.io_py_remote, self.tool) |
| try: |
| for secname, _params in self.configs: |
| params = _params.copy() |
| count = params.pop('count', 1) |
| |
| config = RawConfigParser() |
| config.add_section(secname) |
| |
| for k, v in params.items(): |
| config.set(secname, k, v) |
| |
| cfg = StringIO() |
| config.write(cfg) |
| |
| # FIX python config parser-fio incompatibility |
| # remove spaces around '=' |
| new_cfg = [] |
| config_data = cfg.getvalue() |
| for line in config_data.split("\n"): |
| if '=' in line: |
| name, val = line.split('=', 1) |
| name = name.strip() |
| val = val.strip() |
| line = "{0}={1}".format(name, val) |
| new_cfg.append(line) |
| |
| for _ in range(count): |
| barrier.wait() |
| code, out_err = run_over_ssh(conn, cmd, |
| stdin_data="\n".join(new_cfg)) |
| self.on_result(code, out_err, cmd) |
| finally: |
| barrier.exit() |
| |
| def on_result(self, code, out_err, cmd): |
| if 0 == code: |
| try: |
| for line in out_err.split("\n"): |
| if line.strip() != "": |
| self.on_result_cb(json.loads(line)) |
| except Exception as exc: |
| msg_templ = "Error during postprocessing results: {0!r}" |
| raise RuntimeError(msg_templ.format(exc.message)) |
| else: |
| templ = "Command {0!r} failed with code {1}. Output is:\n{2}" |
| logger.error(templ.format(cmd, code, out_err)) |