Run tests on nodes in offline mode
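
Run the fio agent on the test node in a detached screen session instead of
keeping an SSH connection open for the whole run. Each test gets its own
/tmp/test/<test_uuid>/io/ directory for the agent script, compiled job file,
pid file and results log. The job config is uploaded to the node instead of
being piped over stdin; the controller closes its connection while the test
runs, periodically reconnects to poll the pid file, and reads the results
file once the pid file disappears.
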
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index 0f8afd4..5166fdd 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -1,12 +1,17 @@
import abc
import time
+import random
import os.path
import logging
import datetime
-from wally.ssh_utils import copy_paths, run_over_ssh, delete_file
from wally.utils import ssize_to_b, open_for_append_or_create, sec_to_str
+from wally.ssh_utils import (copy_paths, run_over_ssh,
+ save_to_remote, ssh_mkdir,
+ # delete_file,
+ connect, read_from_remote, Local)
+
from . import postgres
from .io import agent as io_agent
from .io import formatter as io_formatter
@@ -17,19 +22,20 @@
class IPerfTest(object):
- def __init__(self, on_result_cb, log_directory=None, node=None):
+ def __init__(self, on_result_cb, test_uuid, node, log_directory=None):
self.on_result_cb = on_result_cb
self.log_directory = log_directory
self.node = node
+ self.test_uuid = test_uuid
- def pre_run(self, conn):
+ def pre_run(self):
pass
- def cleanup(self, conn):
+ def cleanup(self):
pass
@abc.abstractmethod
- def run(self, conn, barrier):
+ def run(self, barrier):
pass
@classmethod
@@ -37,12 +43,16 @@
msg = "{0}.format_for_console".format(cls.__name__)
raise NotImplementedError(msg)
+ def run_over_ssh(self, cmd, **kwargs):
+ return run_over_ssh(self.node.connection, cmd,
+ node=self.node.get_conn_id(), **kwargs)
+
class TwoScriptTest(IPerfTest):
remote_tmp_dir = '/tmp'
- def __init__(self, opts, on_result_cb, log_directory=None, node=None):
- IPerfTest.__init__(self, on_result_cb, log_directory, node=node)
+    def __init__(self, opts, *args, **kwargs):
+        IPerfTest.__init__(self, *args, **kwargs)
self.opts = opts
if 'run_script' in self.opts:
@@ -52,22 +62,23 @@
def get_remote_for_script(self, script):
return os.path.join(self.tmp_dir, script.rpartition('/')[2])
- def copy_script(self, conn, src):
+ def copy_script(self, src):
remote_path = self.get_remote_for_script(src)
- copy_paths(conn, {src: remote_path})
+ copy_paths(self.node.connection, {src: remote_path})
return remote_path
- def pre_run(self, conn):
- remote_script = self.copy_script(conn, self.pre_run_script)
+ def pre_run(self):
+        remote_script = self.copy_script(self.pre_run_script)
cmd = remote_script
- run_over_ssh(conn, cmd, node=self.node)
+ self.run_over_ssh(cmd)
- def run(self, conn, barrier):
- remote_script = self.copy_script(conn, self.run_script)
+ def run(self, barrier):
+        remote_script = self.copy_script(self.run_script)
cmd_opts = ' '.join(["%s %s" % (key, val) for key, val
in self.opts.items()])
cmd = remote_script + ' ' + cmd_opts
- out_err = run_over_ssh(conn, cmd, node=self.node)
+ out_err = self.run_over_ssh(cmd)
self.on_result(out_err, cmd)
def parse_results(self, out):
@@ -91,11 +102,13 @@
class IOPerfTest(IPerfTest):
- io_py_remote = "/tmp/disk_test_agent.py"
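+    # per-test remote paths; "{0}" is filled with test_uuid in __init__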
+ io_py_remote_templ = "/tmp/test/{0}/io/disk_test_agent.py"
+ log_fl_templ = "/tmp/test/{0}/io/disk_test_agent_log.txt"
+ pid_file_templ = "/tmp/test/{0}/io/disk_test_agent_pid_file"
+ task_file_templ = "/tmp/test/{0}/io/io_task.cfg"
- def __init__(self, test_options, on_result_cb,
- log_directory=None, node=None):
- IPerfTest.__init__(self, on_result_cb, log_directory, node=node)
+    def __init__(self, test_options, *args, **kwargs):
+        IPerfTest.__init__(self, *args, **kwargs)
self.options = test_options
self.config_fname = test_options['cfg']
self.alive_check_interval = test_options.get('alive_check_interval')
@@ -108,6 +121,11 @@
cmd_log = os.path.join(self.log_directory, "task_compiled.cfg")
raw_res = os.path.join(self.log_directory, "raw_results.txt")
+ self.io_py_remote = self.io_py_remote_templ.format(self.test_uuid)
+ self.log_fl = self.log_fl_templ.format(self.test_uuid)
+ self.pid_file = self.pid_file_templ.format(self.test_uuid)
+ self.task_file = self.task_file_templ.format(self.test_uuid)
+
fio_command_file = open_for_append_or_create(cmd_log)
cfg_s_it = io_agent.compile_all_in_1(self.raw_cfg, self.config_params)
@@ -115,29 +133,35 @@
fio_command_file.write(splitter.join(cfg_s_it))
self.fio_raw_results_file = open_for_append_or_create(raw_res)
- def cleanup(self, conn):
- delete_file(conn, self.io_py_remote)
+ def cleanup(self):
+ # delete_file(conn, self.io_py_remote)
# Need to remove tempo files, used for testing
+ pass
- def pre_run(self, conn):
+ def pre_run(self):
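+        # create the per-test directory on the node before copying the
+        # agent and writing the task/pid/results files into it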
+ ssh_mkdir(self.node.connection.open_sftp(),
+ os.path.dirname(self.io_py_remote),
+ intermediate=True)
try:
- run_over_ssh(conn, 'which fio', node=self.node)
+ self.run_over_ssh('which fio')
except OSError:
# TODO: install fio, if not installed
- cmd = "sudo apt-get -y install fio"
-
for i in range(3):
try:
- run_over_ssh(conn, cmd, node=self.node)
+ self.run_over_ssh("sudo apt-get -y install fio")
break
except OSError as err:
time.sleep(3)
else:
raise OSError("Can't install fio - " + err.message)
- local_fname = io_agent.__file__.rsplit('.')[0] + ".py"
- self.files_to_copy = {local_fname: self.io_py_remote}
- copy_paths(conn, self.files_to_copy)
+ local_fname = os.path.splitext(io_agent.__file__)[0] + ".py"
+
+ self.files_to_copy = {
+ local_fname: self.io_py_remote,
+ }
+
+ copy_paths(self.node.connection, self.files_to_copy)
if self.options.get('prefill_files', True):
files = {}
@@ -155,17 +179,19 @@
# take largest size
files[fname] = max(files.get(fname, 0), msz)
- # logger.warning("dd run DISABLED")
- # cmd_templ = "dd if=/dev/zero of={0} bs={1} count={2}"
+ cmd_templ = "dd oflag=direct " + \
+ "if=/dev/zero of={0} bs={1} count={2}"
- cmd_templ = "sudo dd if=/dev/zero of={0} bs={1} count={2}"
+ if self.options.get("use_sudo", True):
+ cmd_templ = "sudo " + cmd_templ
+
ssize = 0
stime = time.time()
for fname, curr_sz in files.items():
cmd = cmd_templ.format(fname, 1024 ** 2, curr_sz)
ssize += curr_sz
- run_over_ssh(conn, cmd, timeout=curr_sz, node=self.node)
+ self.run_over_ssh(cmd, timeout=curr_sz)
ddtime = time.time() - stime
if ddtime > 1E-3:
@@ -175,9 +201,13 @@
else:
logger.warning("Test files prefill disabled")
- def run(self, conn, barrier):
- cmd_templ = "sudo env python2 {0} --type {1} {2} --json -"
- # cmd_templ = "env python2 {0} --type {1} {2} --json -"
+ def run(self, barrier):
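+        # launch the agent in a detached screen session, so it keeps running
+        # even if the SSH connection to the node is lost; its pid and output
+        # go to the per-test files that are polled below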
+ cmd_templ = "screen -S {screen_name} -d -m " + \
+ "env python2 {0} -p {pid_file} -o {results_file} " + \
+ "--type {1} {2} --json {3}"
+
+ if self.options.get("use_sudo", True):
+ cmd_templ = "sudo " + cmd_templ
params = " ".join("{0}={1}".format(k, v)
for k, v in self.config_params.items())
@@ -185,14 +215,24 @@
if "" != params:
params = "--params " + params
- cmd = cmd_templ.format(self.io_py_remote, self.tool, params)
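+        # the job config is now uploaded to the node as a file and passed
+        # via "--json <path>" instead of being piped over stdin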
+ save_to_remote(self.node.connection.open_sftp(),
+ self.task_file, self.raw_cfg)
+
+ screen_name = self.test_uuid
+ cmd = cmd_templ.format(self.io_py_remote,
+ self.tool,
+ params,
+ self.task_file,
+ pid_file=self.pid_file,
+ results_file=self.log_fl,
+ screen_name=screen_name)
logger.debug("Waiting on barrier")
exec_time = io_agent.calculate_execution_time(self.configs)
exec_time_str = sec_to_str(exec_time)
try:
- timeout = int(exec_time * 1.2 + 300)
+ timeout = int(exec_time * 2 + 300)
if barrier.wait():
templ = "Test should takes about {0}." + \
" Should finish at {1}," + \
@@ -205,11 +245,67 @@
end_dt.strftime("%H:%M:%S"),
wait_till.strftime("%H:%M:%S")))
- out_err = run_over_ssh(conn, cmd,
- stdin_data=self.raw_cfg,
- timeout=timeout,
- node=self.node)
- logger.info("Done")
+ self.run_over_ssh(cmd)
+ logger.debug("Test started in screen {0}".format(screen_name))
+
+ end_of_wait_time = timeout + time.time()
+
+ # time_till_check = random.randint(30, 90)
+ time_till_check = 1
+
+ pid = None
+ no_pid_file = True
+ tcp_conn_timeout = 30
+ pid_get_timeout = 30 + time.time()
+
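+            # poll the node: reconnect periodically and check the pid file;
+            # the run is treated as finished once the pid file disappears
+            # after having been seen at least once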
+            # TODO: add monitoring socket
+            if self.node.connection is not Local:
+                self.node.connection.close()
+
+            while end_of_wait_time > time.time():
+                conn = None
+                time.sleep(time_till_check)
+
+                try:
+                    if self.node.connection is not Local:
+                        conn = connect(self.node.conn_url,
+                                       conn_timeout=tcp_conn_timeout)
+                    else:
+                        conn = self.node.connection
+                except Exception:
+                    logger.exception("During connect")
+                    continue
+
+                try:
+                    pid = read_from_remote(conn.open_sftp(), self.pid_file)
+                    no_pid_file = False
+                except (NameError, IOError):
+                    no_pid_file = True
+
+                if conn is not Local:
+                    conn.close()
+
+                if no_pid_file:
+                    if pid is None:
+                        if time.time() > pid_get_timeout:
+                            msg = "On node {0} pid file doesn't " + \
+                                  "appear in time"
+                            logger.error(msg.format(self.node.get_conn_id()))
+                            raise RuntimeError("Start timeout")
+                    else:
+                        # execution finished
+                        break
+
+            logger.debug("Done")
+
+            if self.node.connection is not Local:
+                timeout = tcp_conn_timeout * 3
+                self.node.connection = connect(self.node.conn_url,
+                                               conn_timeout=timeout)
+
+            # try to reboot and then connect
+            out_err = read_from_remote(self.node.connection.open_sftp(),
+                                       self.log_fl)
finally:
barrier.exit()