run tests on nodes in offline mode

Run the io agent in a detached screen session instead of keeping an SSH
connection open for the whole test run: the agent now stores its pid in a
remote pid file and writes results to a remote log file; the runner closes
the connection, periodically reconnects to check whether the pid file is
still there, and once it disappears reconnects and reads the results back.
diff --git a/wally/run_test.py b/wally/run_test.py
index d578349..d4b33bf 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -2,7 +2,6 @@
import os
import sys
-import time
import Queue
import pprint
import logging
@@ -78,23 +77,26 @@
def test_thread(test, node, barrier, res_q):
+ exc = None
try:
logger.debug("Run preparation for {0}".format(node.get_conn_id()))
- test.pre_run(node.connection)
+ test.pre_run()
logger.debug("Run test for {0}".format(node.get_conn_id()))
- test.run(node.connection, barrier)
+ test.run(barrier)
except Exception as exc:
logger.exception("In test {0} for node {1}".format(test, node))
- res_q.put(exc)
try:
- test.cleanup(node.connection)
+ test.cleanup()
except:
msg = "Duringf cleanup - in test {0} for node {1}"
logger.exception(msg.format(test, node))
+ if exc is not None:
+ res_q.put(exc)
-def run_tests(test_block, nodes):
+
+def run_tests(test_block, nodes, test_uuid):
tool_type_mapper = {
"io": IOPerfTest,
"pgbench": PgBenchTest,
@@ -124,8 +126,8 @@
if not os.path.exists(dr):
os.makedirs(dr)
- test = tool_type_mapper[name](params, res_q.put, dr,
- node=node.get_conn_id())
+ test = tool_type_mapper[name](params, res_q.put, test_uuid, node,
+ log_directory=dr)
th = threading.Thread(None, test_thread, None,
(test, node, barrier, res_q))
threads.append(th)
@@ -313,10 +315,13 @@
if not cfg['no_tests']:
for test_group in config.get('tests', []):
- ctx.results.extend(run_tests(test_group, ctx.nodes))
+ test_res = run_tests(test_group, ctx.nodes,
+ cfg['run_uuid'])
+ ctx.results.extend(test_res)
else:
if not cfg['no_tests']:
- ctx.results.extend(run_tests(group, ctx.nodes))
+ test_res = run_tests(group, ctx.nodes, cfg['run_uuid'])
+ ctx.results.extend(test_res)
def shut_down_vms_stage(cfg, ctx):
diff --git a/wally/sensors/daemonize.py b/wally/sensors/daemonize.py
index a4fa157..bc4ab81 100644
--- a/wally/sensors/daemonize.py
+++ b/wally/sensors/daemonize.py
@@ -1,15 +1,14 @@
# #!/usr/bin/python
-import fcntl
import os
import pwd
import grp
import sys
+import fcntl
import signal
-import resource
-import logging
import atexit
-from logging import handlers
+import logging
+import resource
+import logging.handlers
class Daemonize(object):
@@ -155,7 +154,7 @@
# actually have such capabilities
# on the machine we are running this.
if os.path.isfile(syslog_address):
- syslog = handlers.SysLogHandler(syslog_address)
+ syslog = logging.handlers.SysLogHandler(syslog_address)
if self.verbose:
syslog.setLevel(logging.DEBUG)
else:
diff --git a/wally/ssh_utils.py b/wally/ssh_utils.py
index f4de3b6..d3d6547 100644
--- a/wally/ssh_utils.py
+++ b/wally/ssh_utils.py
@@ -28,6 +28,9 @@
@classmethod
def put(cls, localfile, remfile):
+ dirname = os.path.dirname(remfile)
+ if dirname and not os.path.exists(dirname):
+ os.makedirs(dirname)
shutil.copyfile(localfile, remfile)
@classmethod
@@ -105,6 +108,16 @@
time.sleep(1)
+def save_to_remote(sftp, path, content):
+ with sftp.open(path, "wb") as fd:
+ fd.write(content)
+
+
+def read_from_remote(sftp, path):
+ with sftp.open(path, "rb") as fd:
+ return fd.read()
+
+
def normalize_dirpath(dirpath):
while dirpath.endswith("/"):
dirpath = dirpath[:-1]
@@ -119,7 +132,7 @@
if intermediate:
try:
sftp.mkdir(remotepath, mode=mode)
- except IOError:
+ except (IOError, OSError):
upper_dir = remotepath.rsplit("/", 1)[0]
if upper_dir == '' or upper_dir == '/':
@@ -265,6 +278,9 @@
# ip_host:port:path_to_key_file
# ip_host::path_to_key_file
+ if uri.startswith("ssh://"):
+ uri = uri[len("ssh://"):]
+
res = ConnCreds()
res.port = "22"
res.key_file = None
diff --git a/wally/suits/io/agent.py b/wally/suits/io/agent.py
index b11f20e..25910b4 100644
--- a/wally/suits/io/agent.py
+++ b/wally/suits/io/agent.py
@@ -1,9 +1,11 @@
+import os
import sys
import time
import json
import copy
import select
import pprint
+import os.path
import argparse
import traceback
import subprocess
@@ -516,7 +518,7 @@
help="Start execution at START_AT_UTC")
parser.add_argument("--json", action="store_true", default=False,
help="Json output format")
- parser.add_argument("--output", default='-', metavar="FILE_PATH",
+ parser.add_argument("-o", "--output", default='-', metavar="FILE_PATH",
help="Store results to FILE_PATH")
parser.add_argument("--estimate", action="store_true", default=False,
help="Only estimate task execution time")
@@ -533,6 +535,9 @@
default=[],
help="Provide set of pairs PARAM=VAL to" +
"format into job description")
+ parser.add_argument("-p", "--pid-file", metavar="FILE_TO_STORE_PID",
+ default=None, help="Store pid to FILE_TO_STORE_PID " +
+ "and remove this file on exit")
parser.add_argument("jobfile")
return parser.parse_args(argv)
@@ -550,65 +555,78 @@
else:
out_fd = open(argv_obj.output, "w")
- params = {}
- for param_val in argv_obj.params:
- assert '=' in param_val
- name, val = param_val.split("=", 1)
- params[name] = val
+ if argv_obj.pid_file is not None:
+ with open(argv_obj.pid_file, "w") as fd:
+ fd.write(str(os.getpid()))
- slice_params = {
- 'runcycle': argv_obj.runcycle,
- }
+ try:
+ params = {}
+ for param_val in argv_obj.params:
+ assert '=' in param_val
+ name, val = param_val.split("=", 1)
+ params[name] = val
- sliced_it = parse_and_slice_all_in_1(job_cfg, params, **slice_params)
+ slice_params = {
+ 'runcycle': argv_obj.runcycle,
+ }
- if argv_obj.estimate:
- it = map(calculate_execution_time, sliced_it)
- print sec_to_str(sum(it))
- return 0
+ sliced_it = parse_and_slice_all_in_1(job_cfg, params, **slice_params)
- if argv_obj.num_tests or argv_obj.compile:
- if argv_obj.compile:
- for test_slice in sliced_it:
- out_fd.write(fio_config_to_str(test_slice))
- out_fd.write("\n#" + "-" * 70 + "\n\n")
+ if argv_obj.estimate:
+ it = map(calculate_execution_time, sliced_it)
+ print sec_to_str(sum(it))
+ return 0
- if argv_obj.num_tests:
- print len(list(sliced_it))
+ if argv_obj.num_tests or argv_obj.compile:
+ if argv_obj.compile:
+ for test_slice in sliced_it:
+ out_fd.write(fio_config_to_str(test_slice))
+ out_fd.write("\n#" + "-" * 70 + "\n\n")
- return 0
+ if argv_obj.num_tests:
+ print len(list(sliced_it))
- if argv_obj.start_at is not None:
- ctime = time.time()
- if argv_obj.start_at >= ctime:
- time.sleep(ctime - argv_obj.start_at)
+ return 0
- def raw_res_func(test_num, data):
- pref = "========= RAW_RESULTS({0}) =========\n".format(test_num)
- out_fd.write(pref)
- out_fd.write(json.dumps(data))
- out_fd.write("\n========= END OF RAW_RESULTS =========\n")
- out_fd.flush()
+ if argv_obj.start_at is not None:
+ ctime = time.time()
+ if argv_obj.start_at >= ctime:
+ time.sleep(argv_obj.start_at - ctime)
- rrfunc = raw_res_func if argv_obj.show_raw_results else None
+ def raw_res_func(test_num, data):
+ pref = "========= RAW_RESULTS({0}) =========\n".format(test_num)
+ out_fd.write(pref)
+ out_fd.write(json.dumps(data))
+ out_fd.write("\n========= END OF RAW_RESULTS =========\n")
+ out_fd.flush()
- stime = time.time()
- job_res, num_tests, ok = run_benchmark(argv_obj.type, sliced_it, rrfunc)
- etime = time.time()
+ rrfunc = raw_res_func if argv_obj.show_raw_results else None
- res = {'__meta__': {'raw_cfg': job_cfg, 'params': params}, 'res': job_res}
+ stime = time.time()
+ job_res, num_tests, ok = run_benchmark(argv_obj.type,
+ sliced_it, rrfunc)
+ etime = time.time()
- oformat = 'json' if argv_obj.json else 'eval'
- out_fd.write("\nRun {0} tests in {1} seconds\n".format(num_tests,
- int(etime - stime)))
- out_fd.write("========= RESULTS(format={0}) =========\n".format(oformat))
- if argv_obj.json:
- out_fd.write(json.dumps(res))
- else:
- out_fd.write(pprint.pformat(res) + "\n")
- out_fd.write("\n========= END OF RESULTS =========\n")
+ res = {'__meta__': {'raw_cfg': job_cfg, 'params': params},
+ 'res': job_res}
- return 0 if ok else 1
+ oformat = 'json' if argv_obj.json else 'eval'
+ msg = "\nRun {0} tests in {1} seconds\n"
+ out_fd.write(msg.format(num_tests, int(etime - stime)))
+
+ msg = "========= RESULTS(format={0}) =========\n"
+ out_fd.write(msg.format(oformat))
+ if argv_obj.json:
+ out_fd.write(json.dumps(res))
+ else:
+ out_fd.write(pprint.pformat(res) + "\n")
+ out_fd.write("\n========= END OF RESULTS =========\n")
+
+ return 0 if ok else 1
+ finally:
+ if argv_obj.pid_file is not None:
+ if os.path.exists(argv_obj.pid_file):
+ os.unlink(argv_obj.pid_file)
def fake_main(x):
diff --git a/wally/suits/io/ceph.cfg b/wally/suits/io/ceph.cfg
index cc8e411..1352b1e 100644
--- a/wally/suits/io/ceph.cfg
+++ b/wally/suits/io/ceph.cfg
@@ -9,7 +9,7 @@
NUM_ROUNDS=7
NUMJOBS={% 1, 5, 10, 15, 20, 30, 40, 80, 120 %}
-size=5G
+size=30G
ramp_time=30
runtime=60
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index 0f8afd4..5166fdd 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -1,12 +1,17 @@
import abc
import time
+import random
import os.path
import logging
import datetime
-from wally.ssh_utils import copy_paths, run_over_ssh, delete_file
from wally.utils import ssize_to_b, open_for_append_or_create, sec_to_str
+from wally.ssh_utils import (copy_paths, run_over_ssh,
+ save_to_remote, ssh_mkdir,
+ connect, read_from_remote, Local)
+
from . import postgres
from .io import agent as io_agent
from .io import formatter as io_formatter
@@ -17,19 +22,20 @@
class IPerfTest(object):
- def __init__(self, on_result_cb, log_directory=None, node=None):
+ def __init__(self, on_result_cb, test_uuid, node, log_directory=None):
self.on_result_cb = on_result_cb
self.log_directory = log_directory
self.node = node
+ self.test_uuid = test_uuid
- def pre_run(self, conn):
+ def pre_run(self):
pass
- def cleanup(self, conn):
+ def cleanup(self):
pass
@abc.abstractmethod
- def run(self, conn, barrier):
+ def run(self, barrier):
pass
@classmethod
@@ -37,12 +43,16 @@
msg = "{0}.format_for_console".format(cls.__name__)
raise NotImplementedError(msg)
+ def run_over_ssh(self, cmd, **kwargs):
+ return run_over_ssh(self.node.connection, cmd,
+ node=self.node.get_conn_id(), **kwargs)
+
class TwoScriptTest(IPerfTest):
remote_tmp_dir = '/tmp'
- def __init__(self, opts, on_result_cb, log_directory=None, node=None):
- IPerfTest.__init__(self, on_result_cb, log_directory, node=node)
+ def __init__(self, opts, *dt, **mp):
+ IPerfTest.__init__(self, *dt, **mp)
self.opts = opts
if 'run_script' in self.opts:
@@ -52,22 +62,23 @@
def get_remote_for_script(self, script):
return os.path.join(self.tmp_dir, script.rpartition('/')[2])
- def copy_script(self, conn, src):
+ def copy_script(self, src):
remote_path = self.get_remote_for_script(src)
- copy_paths(conn, {src: remote_path})
+ copy_paths(self.node.connection, {src: remote_path})
return remote_path
- def pre_run(self, conn):
- remote_script = self.copy_script(conn, self.pre_run_script)
+ def pre_run(self):
+ remote_script = self.copy_script(self.pre_run_script)
cmd = remote_script
- run_over_ssh(conn, cmd, node=self.node)
+ self.run_over_ssh(cmd)
- def run(self, conn, barrier):
- remote_script = self.copy_script(conn, self.run_script)
+ def run(self, barrier):
+ remote_script = self.copy_script(self.run_script)
cmd_opts = ' '.join(["%s %s" % (key, val) for key, val
in self.opts.items()])
cmd = remote_script + ' ' + cmd_opts
- out_err = run_over_ssh(conn, cmd, node=self.node)
+ out_err = self.run_over_ssh(cmd)
self.on_result(out_err, cmd)
def parse_results(self, out):
@@ -91,11 +102,13 @@
class IOPerfTest(IPerfTest):
- io_py_remote = "/tmp/disk_test_agent.py"
+ io_py_remote_templ = "/tmp/test/{0}/io/disk_test_agent.py"
+ log_fl_templ = "/tmp/test/{0}/io/disk_test_agent_log.txt"
+ pid_file_templ = "/tmp/test/{0}/io/disk_test_agent_pid_file"
+ task_file_templ = "/tmp/test/{0}/io/io_task.cfg"
- def __init__(self, test_options, on_result_cb,
- log_directory=None, node=None):
- IPerfTest.__init__(self, on_result_cb, log_directory, node=node)
+ def __init__(self, test_options, *dt, **mp):
+ IPerfTest.__init__(self, *dt, **mp)
self.options = test_options
self.config_fname = test_options['cfg']
self.alive_check_interval = test_options.get('alive_check_interval')
@@ -108,6 +121,11 @@
cmd_log = os.path.join(self.log_directory, "task_compiled.cfg")
raw_res = os.path.join(self.log_directory, "raw_results.txt")
+ self.io_py_remote = self.io_py_remote_templ.format(self.test_uuid)
+ self.log_fl = self.log_fl_templ.format(self.test_uuid)
+ self.pid_file = self.pid_file_templ.format(self.test_uuid)
+ self.task_file = self.task_file_templ.format(self.test_uuid)
+
fio_command_file = open_for_append_or_create(cmd_log)
cfg_s_it = io_agent.compile_all_in_1(self.raw_cfg, self.config_params)
@@ -115,29 +133,35 @@
fio_command_file.write(splitter.join(cfg_s_it))
self.fio_raw_results_file = open_for_append_or_create(raw_res)
- def cleanup(self, conn):
- delete_file(conn, self.io_py_remote)
+ def cleanup(self):
+ # delete_file(self.node.connection, self.io_py_remote)
# Need to remove temp files used for testing
+ pass
- def pre_run(self, conn):
+ def pre_run(self):
+ ssh_mkdir(self.node.connection.open_sftp(),
+ os.path.dirname(self.io_py_remote),
+ intermediate=True)
try:
- run_over_ssh(conn, 'which fio', node=self.node)
+ self.run_over_ssh('which fio')
except OSError:
# TODO: install fio, if not installed
- cmd = "sudo apt-get -y install fio"
-
for i in range(3):
try:
- run_over_ssh(conn, cmd, node=self.node)
+ self.run_over_ssh("sudo apt-get -y install fio")
break
except OSError as err:
time.sleep(3)
else:
raise OSError("Can't install fio - " + err.message)
- local_fname = io_agent.__file__.rsplit('.')[0] + ".py"
- self.files_to_copy = {local_fname: self.io_py_remote}
- copy_paths(conn, self.files_to_copy)
+ local_fname = os.path.splitext(io_agent.__file__)[0] + ".py"
+
+ self.files_to_copy = {
+ local_fname: self.io_py_remote,
+ }
+
+ copy_paths(self.node.connection, self.files_to_copy)
if self.options.get('prefill_files', True):
files = {}
@@ -155,17 +179,19 @@
# take largest size
files[fname] = max(files.get(fname, 0), msz)
- # logger.warning("dd run DISABLED")
- # cmd_templ = "dd if=/dev/zero of={0} bs={1} count={2}"
+ cmd_templ = "dd oflag=direct " + \
+ "if=/dev/zero of={0} bs={1} count={2}"
- cmd_templ = "sudo dd if=/dev/zero of={0} bs={1} count={2}"
+ if self.options.get("use_sudo", True):
+ cmd_templ = "sudo " + cmd_templ
+
ssize = 0
stime = time.time()
for fname, curr_sz in files.items():
cmd = cmd_templ.format(fname, 1024 ** 2, curr_sz)
ssize += curr_sz
- run_over_ssh(conn, cmd, timeout=curr_sz, node=self.node)
+ self.run_over_ssh(cmd, timeout=curr_sz)
ddtime = time.time() - stime
if ddtime > 1E-3:
@@ -175,9 +201,13 @@
else:
logger.warning("Test files prefill disabled")
- def run(self, conn, barrier):
- cmd_templ = "sudo env python2 {0} --type {1} {2} --json -"
- # cmd_templ = "env python2 {0} --type {1} {2} --json -"
+ def run(self, barrier):
+ cmd_templ = "screen -S {screen_name} -d -m " + \
+ "env python2 {0} -p {pid_file} -o {results_file} " + \
+ "--type {1} {2} --json {3}"
+
+ if self.options.get("use_sudo", True):
+ cmd_templ = "sudo " + cmd_templ
params = " ".join("{0}={1}".format(k, v)
for k, v in self.config_params.items())
@@ -185,14 +215,24 @@
if "" != params:
params = "--params " + params
- cmd = cmd_templ.format(self.io_py_remote, self.tool, params)
+ save_to_remote(self.node.connection.open_sftp(),
+ self.task_file, self.raw_cfg)
+
+ screen_name = self.test_uuid
+ cmd = cmd_templ.format(self.io_py_remote,
+ self.tool,
+ params,
+ self.task_file,
+ pid_file=self.pid_file,
+ results_file=self.log_fl,
+ screen_name=screen_name)
logger.debug("Waiting on barrier")
exec_time = io_agent.calculate_execution_time(self.configs)
exec_time_str = sec_to_str(exec_time)
try:
- timeout = int(exec_time * 1.2 + 300)
+ timeout = int(exec_time * 2 + 300)
if barrier.wait():
templ = "Test should takes about {0}." + \
" Should finish at {1}," + \
@@ -205,11 +245,67 @@
end_dt.strftime("%H:%M:%S"),
wait_till.strftime("%H:%M:%S")))
- out_err = run_over_ssh(conn, cmd,
- stdin_data=self.raw_cfg,
- timeout=timeout,
- node=self.node)
- logger.info("Done")
+ self.run_over_ssh(cmd)
+ logger.debug("Test started in screen {0}".format(screen_name))
+
+ end_of_wait_time = timeout + time.time()
+
+ time_till_check = random.randint(30, 90)
+
+ pid = None
+ no_pid_file = True
+ tcp_conn_timeout = 30
+ pid_get_timeout = 30 + time.time()
+
+ # TODO: add monitoring socket
+ if self.node.connection is not Local:
+ self.node.connection.close()
+
+ while end_of_wait_time > time.time():
+ conn = None
+ time.sleep(time_till_check)
+
+ try:
+ if self.node.connection is not Local:
+ conn = connect(self.node.conn_url,
+ conn_timeout=tcp_conn_timeout)
+ else:
+ conn = self.node.connection
+ except Exception:
+ logger.exception("During connect")
+ continue
+
+ try:
+ pid = read_from_remote(conn.open_sftp(), self.pid_file)
+ no_pid_file = False
+ except IOError:
+ no_pid_file = True
+
+ if conn is not Local:
+ conn.close()
+
+ if no_pid_file:
+ if pid is None:
+ if time.time() > pid_get_timeout:
+ msg = "On node {0} pid file doesn't " + \
+ "appears in time"
+ logging.error(msg.format(self.node.get_conn_id()))
+ raise RuntimeError("Start timeout")
+ else:
+ # execution finished
+ break
+
+ logger.debug("Done")
+
+ if self.node.connection is not Local:
+ timeout = tcp_conn_timeout * 3
+ self.node.connection = connect(self.node.conn_url,
+ conn_timeout=timeout)
+
+ # try to reboot and then connect
+ out_err = read_from_remote(self.node.connection.open_sftp(),
+ self.log_fl)
finally:
barrier.exit()