a lot of changes
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index 36d3fcf..09e93f0 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -1,35 +1,47 @@
import abc
-import time
-import socket
-import random
import os.path
-import logging
-import datetime
-
-from paramiko import SSHException, SFTPError
-import texttable
-
-from wally.utils import (ssize2b, open_for_append_or_create,
- sec_to_str, StopTestError)
-
-from wally.ssh_utils import (copy_paths, run_over_ssh,
- save_to_remote,
- # delete_file,
- connect, read_from_remote, Local,
- exists)
-
-from . import postgres
-from . import mysql
-from .io import agent as io_agent
-from .io import formatter as io_formatter
-from .io.results_loader import parse_output
-logger = logging.getLogger("wally")
+from wally.ssh_utils import run_over_ssh, copy_paths
+
+
+class TestResults(object):
+ def __init__(self, config, params, results,
+ raw_result, run_interval, vm_count):
+ self.config = config
+ self.params = params
+ self.results = results
+ self.raw_result = raw_result
+ self.run_interval = run_interval
+ self.vm_count = vm_count
+
+ def __str__(self):
+ res = "{0}({1}):\n results:\n".format(
+ self.__class__.__name__,
+ self.summary())
+
+ for name, val in self.results.items():
+ res += " {0}={1}\n".format(name, val)
+
+ res += " params:\n"
+
+ for name, val in self.params.items():
+ res += " {0}={1}\n".format(name, val)
+
+ return res
+
+ @abc.abstractmethod
+ def summary(self):
+ pass
+
+ @abc.abstractmethod
+ def get_yamable(self):
+ pass
class IPerfTest(object):
def __init__(self, options, is_primary, on_result_cb, test_uuid, node,
+ total_nodes_count,
log_directory=None,
coordination_queue=None,
remote_dir="/tmp/wally"):
@@ -42,6 +54,7 @@
self.remote_dir = remote_dir
self.is_primary = is_primary
self.stop_requested = False
+ self.total_nodes_count = total_nodes_count
def request_stop(self):
self.stop_requested = True
@@ -59,6 +72,11 @@
def cleanup(self):
pass
+ @classmethod
+ @abc.abstractmethod
+ def load(cls, data):
+ pass
+
@abc.abstractmethod
def run(self, barrier):
pass
@@ -118,470 +136,3 @@
def merge_results(self, results):
tpcm = sum([val[1] for val in results])
return {"res": {"TpmC": tpcm}}
-
-
-class PgBenchTest(TwoScriptTest):
- root = os.path.dirname(postgres.__file__)
- pre_run_script = os.path.join(root, "prepare.sh")
- run_script = os.path.join(root, "run.sh")
-
- @classmethod
- def format_for_console(cls, data):
- tab = texttable.Texttable(max_width=120)
- tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
- tab.header(["TpmC"])
- tab.add_row([data['res']['TpmC']])
- return tab.draw()
-
-
-class MysqlTest(TwoScriptTest):
- root = os.path.dirname(mysql.__file__)
- pre_run_script = os.path.join(root, "prepare.sh")
- run_script = os.path.join(root, "run.sh")
-
- @classmethod
- def format_for_console(cls, data):
- tab = texttable.Texttable(max_width=120)
- tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
- tab.header(["TpmC"])
- tab.add_row([data['res']['TpmC']])
- return tab.draw()
-
-
-class IOPerfTest(IPerfTest):
- tcp_conn_timeout = 30
- max_pig_timeout = 5
- soft_runcycle = 5 * 60
-
- def __init__(self, *dt, **mp):
- IPerfTest.__init__(self, *dt, **mp)
- self.config_fname = self.options['cfg']
-
- if '/' not in self.config_fname and '.' not in self.config_fname:
- cfgs_dir = os.path.dirname(io_agent.__file__)
- self.config_fname = os.path.join(cfgs_dir,
- self.config_fname + '.cfg')
-
- self.alive_check_interval = self.options.get('alive_check_interval')
-
- self.config_params = {}
- for name, val in self.options.get('params', {}).items():
- if isinstance(val, (list, tuple)):
- val = "{%" + ','.join(map(str, val)) + "%}"
- self.config_params[name] = val
-
- self.config_params['VM_COUNT'] = self.options['testnodes_count']
- self.tool = self.options.get('tool', 'fio')
- self.raw_cfg = open(self.config_fname).read()
- self.configs = list(io_agent.parse_all_in_1(self.raw_cfg,
- self.config_params))
-
- cmd_log = os.path.join(self.log_directory, "task_compiled.cfg")
- raw_res = os.path.join(self.log_directory, "raw_results.txt")
-
- self.io_py_remote = self.join_remote("agent.py")
- self.log_fl = self.join_remote("log.txt")
- self.pid_file = self.join_remote("pid")
- self.task_file = self.join_remote("task.cfg")
- self.use_sudo = self.options.get("use_sudo", True)
- self.test_logging = self.options.get("test_logging", False)
-
- fio_command_file = open_for_append_or_create(cmd_log)
-
- if self.test_logging:
- soft_runcycle = self.soft_runcycle
- else:
- soft_runcycle = None
-
- self.fio_configs = io_agent.parse_and_slice_all_in_1(
- self.raw_cfg,
- self.config_params,
- soft_runcycle=soft_runcycle,
- split_on_names=self.test_logging)
-
- self.fio_configs = list(self.fio_configs)
- splitter = "\n\n" + "-" * 60 + "\n\n"
-
- cfg = splitter.join(
- map(io_agent.fio_config_to_str,
- self.fio_configs))
-
- fio_command_file.write(cfg)
- self.fio_raw_results_file = open_for_append_or_create(raw_res)
-
- def __str__(self):
- return "{0}({1})".format(self.__class__.__name__,
- self.node.get_conn_id())
-
- def cleanup(self):
- # delete_file(conn, self.io_py_remote)
- # Need to remove tempo files, used for testing
- pass
-
- def prefill_test_files(self):
- files = {}
-
- for section in self.configs:
- sz = ssize2b(section.vals['size'])
- msz = sz / (1024 ** 2)
-
- if sz % (1024 ** 2) != 0:
- msz += 1
-
- fname = section.vals['filename']
-
- # if already has other test with the same file name
- # take largest size
- files[fname] = max(files.get(fname, 0), msz)
-
- cmd_templ = "dd oflag=direct " + \
- "if=/dev/zero of={0} bs={1} count={2}"
-
- # cmd_templ = "fio --rw=write --bs={1} --direct=1 --size={2} "
-
- if self.use_sudo:
- cmd_templ = "sudo " + cmd_templ
-
- ssize = 0
- stime = time.time()
-
- for fname, curr_sz in files.items():
- cmd = cmd_templ.format(fname, 1024 ** 2, curr_sz)
- ssize += curr_sz
- self.run_over_ssh(cmd, timeout=curr_sz)
-
- ddtime = time.time() - stime
- if ddtime > 1E-3:
- fill_bw = int(ssize / ddtime)
- mess = "Initiall dd fill bw is {0} MiBps for this vm"
- logger.info(mess.format(fill_bw))
- self.coordinate(('init_bw', fill_bw))
-
- def install_utils(self, max_retry=3, timeout=5):
- need_install = []
- for bin_name, package in (('fio', 'fio'), ('screen', 'screen')):
- try:
- self.run_over_ssh('which ' + bin_name, nolog=True)
- except OSError:
- need_install.append(package)
-
- if len(need_install) == 0:
- return
-
- cmd = "sudo apt-get -y install " + " ".join(need_install)
-
- for i in range(max_retry):
- try:
- self.run_over_ssh(cmd)
- break
- except OSError as err:
- time.sleep(timeout)
- else:
- raise OSError("Can't install - " + str(err))
-
- def pre_run(self):
- try:
- cmd = 'mkdir -p "{0}"'.format(self.remote_dir)
- if self.use_sudo:
- cmd = "sudo " + cmd
- cmd += " ; sudo chown {0} {1}".format(self.node.get_user(),
- self.remote_dir)
-
- self.run_over_ssh(cmd)
- except Exception as exc:
- msg = "Failed to create folder {0} on remote {1}. Error: {2!s}"
- msg = msg.format(self.remote_dir, self.node.get_conn_id(), exc)
- logger.exception(msg)
- raise StopTestError(msg, exc)
-
- self.install_utils()
-
- local_fname = os.path.splitext(io_agent.__file__)[0] + ".py"
- files_to_copy = {local_fname: self.io_py_remote}
- copy_paths(self.node.connection, files_to_copy)
-
- if self.options.get('prefill_files', True):
- self.prefill_test_files()
- elif self.is_primary:
- logger.warning("Prefilling of test files is disabled")
-
- def check_process_is_running(self, sftp, pid):
- try:
- sftp.stat("/proc/{0}".format(pid))
- return True
- except (OSError, IOError, NameError):
- return False
-
- def kill_remote_process(self, conn, pid, soft=True):
- try:
- if soft:
- cmd = "kill {0}"
- else:
- cmd = "kill -9 {0}"
-
- if self.use_sudo:
- cmd = "sudo " + cmd
-
- self.run_over_ssh(cmd.format(pid))
- return True
- except OSError:
- return False
-
- def get_test_status(self, res_file=None):
- found_res_file = False
- is_connected = None
- is_running = None
- pid = None
- err = None
-
- try:
- # conn = connect(self.node.conn_url,
- # conn_timeout=self.tcp_conn_timeout)
- # with conn:
- conn = self.node.connection
- with conn.open_sftp() as sftp:
- try:
- pid = read_from_remote(sftp, self.pid_file)
- is_running = True
- except (NameError, IOError, OSError) as exc:
- pid = None
- is_running = False
-
- if is_running:
- if not self.check_process_is_running(sftp, pid):
- try:
- sftp.remove(self.pid_file)
- except (IOError, NameError, OSError):
- pass
- is_running = False
-
- if res_file is not None:
- found_res_file = exists(sftp, res_file)
-
- is_connected = True
-
- except (socket.error, SSHException, EOFError, SFTPError) as exc:
- err = str(exc)
- is_connected = False
-
- return found_res_file, is_connected, is_running, pid, err
-
- def wait_till_finished(self, soft_timeout, timeout, res_fname=None):
- conn_id = self.node.get_conn_id()
- end_of_wait_time = timeout + time.time()
- soft_end_of_wait_time = soft_timeout + time.time()
-
- time_till_check = random.randint(5, 10)
- pid = None
- is_running = False
- pid_get_timeout = self.max_pig_timeout + time.time()
- curr_connected = True
-
- while end_of_wait_time > time.time():
- time.sleep(time_till_check)
-
- found_res_file, is_connected, is_running, npid, err = \
- self.get_test_status(res_fname)
-
- if found_res_file and not is_running:
- return
-
- if is_connected and not is_running:
- if pid is None:
- if time.time() > pid_get_timeout:
- msg = ("On node {0} pid file doesn't " +
- "appears in time")
- logger.error(msg.format(conn_id))
- raise StopTestError("Start timeout")
- else:
- # execution finished
- break
-
- if npid is not None:
- pid = npid
-
- if is_connected and pid is not None and is_running:
- if time.time() < soft_end_of_wait_time:
- time.sleep(soft_end_of_wait_time - time.time())
-
- if is_connected and not curr_connected:
- msg = "Connection with {0} is restored"
- logger.debug(msg.format(conn_id))
- elif not is_connected and curr_connected:
- msg = "Lost connection with " + conn_id + ". Error: " + err
- logger.debug(msg)
-
- curr_connected = is_connected
-
- def run(self, barrier):
- try:
- if len(self.fio_configs) > 1 and self.is_primary:
-
- exec_time = 0
- for test in self.fio_configs:
- exec_time += io_agent.calculate_execution_time(test)
-
- # +5% - is a rough estimation for additional operations
- # like sftp, etc
- exec_time = int(exec_time * 1.05)
-
- exec_time_s = sec_to_str(exec_time)
- now_dt = datetime.datetime.now()
- end_dt = now_dt + datetime.timedelta(0, exec_time)
- msg = "Entire test should takes aroud: {0} and finished at {1}"
- logger.info(msg.format(exec_time_s,
- end_dt.strftime("%H:%M:%S")))
-
- for pos, fio_cfg_slice in enumerate(self.fio_configs):
- names = [i.name for i in fio_cfg_slice]
- msgs = []
- already_processed = set()
- for name in names:
- if name not in already_processed:
- already_processed.add(name)
-
- if 1 == names.count(name):
- msgs.append(name)
- else:
- frmt = "{0} * {1}"
- msgs.append(frmt.format(name,
- names.count(name)))
-
- if self.is_primary:
- logger.info("Will run tests: " + ", ".join(msgs))
-
- nolog = (pos != 0) or not self.is_primary
- out_err = self.do_run(barrier, fio_cfg_slice, nolog=nolog)
-
- try:
- for data in parse_output(out_err):
- self.on_result_cb(data)
- except (OSError, StopTestError):
- raise
- except Exception as exc:
- msg_templ = "Error during postprocessing results: {0!s}"
- raise RuntimeError(msg_templ.format(exc))
-
- finally:
- barrier.exit()
-
- def do_run(self, barrier, cfg, nolog=False):
- conn_id = self.node.get_conn_id()
-
- cmd_templ = "screen -S {screen_name} -d -m " + \
- "env python2 {0} -p {pid_file} -o {results_file} " + \
- "--type {1} {2} --json {3}"
-
- if self.options.get("use_sudo", True):
- cmd_templ = "sudo " + cmd_templ
-
- params = []
- for k, v in self.config_params.items():
- if isinstance(v, basestring) and v.startswith("{%"):
- continue
- params.append("{0}={1}".format(k, v))
-
- if [] != params:
- params = "--params " + " ".join(params)
-
- with self.node.connection.open_sftp() as sftp:
- save_to_remote(sftp, self.task_file,
- io_agent.fio_config_to_str(cfg))
-
- screen_name = self.test_uuid
- cmd = cmd_templ.format(self.io_py_remote,
- self.tool,
- params,
- self.task_file,
- pid_file=self.pid_file,
- results_file=self.log_fl,
- screen_name=screen_name)
-
- exec_time = io_agent.calculate_execution_time(cfg)
- exec_time_str = sec_to_str(exec_time)
-
- timeout = int(exec_time + max(300, exec_time))
- soft_tout = exec_time
- barrier.wait()
- self.run_over_ssh(cmd, nolog=nolog)
- if self.is_primary:
- templ = "Test should takes about {0}." + \
- " Should finish at {1}," + \
- " will wait at most till {2}"
- now_dt = datetime.datetime.now()
- end_dt = now_dt + datetime.timedelta(0, exec_time)
- wait_till = now_dt + datetime.timedelta(0, timeout)
-
- logger.info(templ.format(exec_time_str,
- end_dt.strftime("%H:%M:%S"),
- wait_till.strftime("%H:%M:%S")))
-
- if not nolog:
- msg = "Tests started in screen {1} on each testnode"
- logger.debug(msg.format(conn_id, screen_name))
-
- # TODO: add monitoring socket
- # if not isinstance(self.node.connection, Local):
- # self.node.connection.close()
-
- self.wait_till_finished(soft_tout, timeout, self.log_fl)
- if not nolog:
- logger.debug("Test on node {0} is finished".format(conn_id))
-
- # if self.node.connection is not Local:
- # conn_timeout = self.tcp_conn_timeout * 3
- # self.node.connection = connect(self.node.conn_url,
- # conn_timeout=conn_timeout)
-
- with self.node.connection.open_sftp() as sftp:
- return read_from_remote(sftp, self.log_fl)
-
- @classmethod
- def merge_results(cls, results):
- merged = results[0]
- for block in results[1:]:
- assert block["__meta__"] == merged["__meta__"]
- merged['res'].extend(block['res'])
- return merged
-
- # @classmethod
- # def merge_results(cls, results):
- # if len(results) == 0:
- # return None
-
- # merged_result = results[0]
- # merged_data = merged_result['res']
- # mergable_fields = ['bw', 'clat', 'iops', 'lat', 'slat']
-
- # for res in results[1:]:
- # mm = merged_result['__meta__']
- # assert mm['raw_cfg'] == res['__meta__']['raw_cfg']
- # assert mm['params'] == res['__meta__']['params']
- # mm['timings'].extend(res['__meta__']['timings'])
-
- # data = res['res']
- # for testname, test_data in data.items():
- # if testname not in merged_data:
- # merged_data[testname] = test_data
- # continue
-
- # res_test_data = merged_data[testname]
-
- # diff = set(test_data.keys()).symmetric_difference(
- # res_test_data.keys())
-
- # msg = "Difference: {0}".format(",".join(diff))
- # assert len(diff) == 0, msg
-
- # for k, v in test_data.items():
- # if k in mergable_fields:
- # res_test_data[k].extend(v)
- # else:
- # msg = "{0!r} != {1!r}".format(res_test_data[k], v)
- # assert res_test_data[k] == v, msg
-
- # return merged_result
-
- @classmethod
- def format_for_console(cls, data, dinfo):
- return io_formatter.format_results_for_console(dinfo)