improve bg. process monitoring
diff --git a/wally/run_test.py b/wally/run_test.py
index e0e37bc..891b9d3 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -84,11 +84,19 @@
test.pre_run()
logger.debug("Run test for {0}".format(node.get_conn_id()))
test.run(barrier)
+ except utils.StopTestError as exc:
+ pass
except Exception as exc:
- logger.exception("In test {0} for node {1}".format(test, node))
+ msg = "In test {0} for node {1}"
+ msg = msg.format(test, node.get_conn_id())
+ logger.exception(msg)
+ exc = utils.StopTestError(msg, exc)
try:
test.cleanup()
+ except utils.StopTestError as exc1:
+ if exc is None:
+ exc = exc1
except:
msg = "Duringf cleanup - in test {0} for node {1}"
logger.exception(msg.format(test, node))
@@ -154,6 +162,9 @@
while not res_q.empty():
val = res_q.get()
+ if isinstance(val, utils.StopTestError):
+ raise val
+
if isinstance(val, Exception):
msg = "Exception during test execution: {0!s}"
raise ValueError(msg.format(val))
@@ -554,14 +565,24 @@
try:
logger.info("Start {0.__name__} stage".format(stage))
stage(cfg_dict, ctx)
+ except utils.StopTestError as exc:
+ msg = "During {0.__name__} stage: {1}".format(stage, exc)
+ logger.error(msg)
except Exception as exc:
logger.exception("During {0.__name__} stage".format(stage))
- if exc is not None:
- raise exc, cls, tb
+ # if exc is not None:
+ # raise exc, cls, tb
- for report_stage in report_stages:
- report_stage(cfg_dict, ctx)
+ if exc is None:
+ for report_stage in report_stages:
+ report_stage(cfg_dict, ctx)
logger.info("All info stored in {0} folder".format(cfg_dict['var_dir']))
- return 0
+
+ if exc is None:
+ logger.info("Tests finished successfully")
+ return 0
+ else:
+ logger.error("Tests are failed. See detailed error above")
+ return 1
diff --git a/wally/ssh_utils.py b/wally/ssh_utils.py
index c6b1f70..efae92f 100644
--- a/wally/ssh_utils.py
+++ b/wally/ssh_utils.py
@@ -53,6 +53,10 @@
def open(cls, *args, **kwarhgs):
return open(*args, **kwarhgs)
+ @classmethod
+ def stat(cls, path):
+ return os.stat(path)
+
def __enter__(self):
return self
diff --git a/wally/suits/io/results_loader.py b/wally/suits/io/results_loader.py
index 3c8d9c5..c67dbe8 100644
--- a/wally/suits/io/results_loader.py
+++ b/wally/suits/io/results_loader.py
@@ -16,7 +16,7 @@
data = {}
for tp, pre_result in test_output:
- if tp != 'io':
+ if tp != 'io' or pre_result is None:
pass
for name, results in pre_result['res'].items():
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index ad5e8a5..1e948fb 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -8,7 +8,8 @@
from paramiko import SSHException
-from wally.utils import ssize_to_b, open_for_append_or_create, sec_to_str
+from wally.utils import (ssize_to_b, open_for_append_or_create,
+ sec_to_str, StopTestError)
from wally.ssh_utils import (copy_paths, run_over_ssh,
save_to_remote,
@@ -37,13 +38,17 @@
self.coordination_queue = coordination_queue
self.remote_dir = remote_dir
self.is_primary = is_primary
+ self.stop_requested = False
+
+ def request_stop(self):
+ self.stop_requested = True
def join_remote(self, path):
return os.path.join(self.remote_dir, path)
def coordinate(self, data):
if self.coordination_queue is not None:
- self.coordination_queue.put(data)
+ self.coordination_queue.put((self.node.get_conn_id(), data))
def pre_run(self):
pass
@@ -140,6 +145,7 @@
self.log_fl = self.join_remote("log.txt")
self.pid_file = self.join_remote("pid")
self.task_file = self.join_remote("task.cfg")
+ self.use_sudo = self.options.get("use_sudo", True)
fio_command_file = open_for_append_or_create(cmd_log)
@@ -148,6 +154,10 @@
fio_command_file.write(splitter.join(cfg_s_it))
self.fio_raw_results_file = open_for_append_or_create(raw_res)
+ def __str__(self):
+ return "{0}({1})".format(self.__class__.__name__,
+ self.node.get_conn_id())
+
def cleanup(self):
# delete_file(conn, self.io_py_remote)
# Need to remove tempo files, used for testing
@@ -172,7 +182,7 @@
cmd_templ = "dd oflag=direct " + \
"if=/dev/zero of={0} bs={1} count={2}"
- if self.options.get("use_sudo", True):
+ if self.use_sudo:
cmd_templ = "sudo " + cmd_templ
ssize = 0
@@ -215,7 +225,7 @@
def pre_run(self):
try:
cmd = 'mkdir -p "{0}"'.format(self.remote_dir)
- if self.options.get("use_sudo", True):
+ if self.use_sudo:
cmd = "sudo " + cmd
cmd += " ; sudo chown {0} {1}".format(self.node.get_user(),
self.remote_dir)
@@ -225,7 +235,7 @@
msg = "Failed to create folder {0} on remote {1}. Error: {2!s}"
msg = msg.format(self.remote_dir, self.node.get_conn_id(), exc)
logger.error(msg)
- raise
+ raise StopTestError(msg, exc)
self.install_utils()
@@ -238,9 +248,31 @@
elif self.is_primary:
logger.warning("Prefilling of test files is disabled")
- def get_test_status(self):
+ def check_process_is_running(self, sftp, pid):
+ try:
+ sftp.stat("/proc/{0}".format(pid))
+ return True
+ except OSError:
+ return False
+
+ def kill_remote_process(self, conn, pid, soft=True):
+ try:
+ if soft:
+ cmd = "kill {0}"
+ else:
+ cmd = "kill -9 {0}"
+
+ if self.use_sudo:
+ cmd = "sudo " + cmd
+
+ self.run_over_ssh(cmd.format(pid))
+ return True
+ except OSError:
+ return False
+
+ def get_test_status(self, die_timeout=3):
is_connected = None
- has_pid_file = None
+ is_running = None
pid = None
err = None
@@ -251,10 +283,15 @@
with conn.open_sftp() as sftp:
try:
pid = read_from_remote(sftp, self.pid_file)
- has_pid_file = True
+ is_running = True
except (NameError, IOError) as exc:
pid = None
- has_pid_file = False
+ is_running = False
+
+ if is_running:
+ if not self.check_process_is_running(sftp, pid):
+ sftp.remove(self.pid_file)
+ is_running = False
is_connected = True
@@ -262,7 +299,7 @@
err = str(exc)
is_connected = False
- return is_connected, has_pid_file, pid, err
+ return is_connected, is_running, pid, err
def wait_till_finished(self, timeout):
conn_id = self.node.get_conn_id()
@@ -271,21 +308,21 @@
# time_till_check = random.randint(30, 90)
time_till_check = 5
pid = None
- has_pid_file = False
+ is_running = False
pid_get_timeout = self.max_pig_timeout + time.time()
curr_connected = True
while end_of_wait_time > time.time():
time.sleep(time_till_check)
- is_connected, has_pid_file, pid, err = self.get_test_status()
+ is_connected, is_running, pid, err = self.get_test_status()
- if not has_pid_file:
+ if not is_running:
if pid is None and time.time() > pid_get_timeout:
msg = ("On node {0} pid file doesn't " +
"appears in time")
logger.error(msg.format(conn_id))
- raise RuntimeError("Start timeout")
+ raise StopTestError("Start timeout")
else:
# execution finished
break
@@ -366,7 +403,6 @@
conn_timeout=conn_timeout)
with self.node.connection.open_sftp() as sftp:
- # try to reboot and then connect
out_err = read_from_remote(sftp, self.log_fl)
finally:
@@ -378,7 +414,7 @@
try:
for data in parse_output(out_err):
self.on_result_cb(data)
- except OSError:
+ except (OSError, StopTestError):
raise
except Exception as exc:
msg_templ = "Error during postprocessing results: {0!s}"
diff --git a/wally/utils.py b/wally/utils.py
index ab825a6..dcc03cb 100644
--- a/wally/utils.py
+++ b/wally/utils.py
@@ -26,6 +26,12 @@
pass
+class StopTestError(RuntimeError):
+ def __init__(self, reason, orig_exc=None):
+ RuntimeError.__init__(self, reason)
+ self.orig_exc = orig_exc
+
+
class Barrier(object):
def __init__(self, count):
self.count = count