Improve background process monitoring
diff --git a/wally/suits/io/results_loader.py b/wally/suits/io/results_loader.py
index 3c8d9c5..c67dbe8 100644
--- a/wally/suits/io/results_loader.py
+++ b/wally/suits/io/results_loader.py
@@ -16,7 +16,7 @@
data = {}
for tp, pre_result in test_output:
- if tp != 'io':
+ if tp != 'io' or pre_result is None:
pass
for name, results in pre_result['res'].items():
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index ad5e8a5..1e948fb 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -8,7 +8,8 @@
from paramiko import SSHException
-from wally.utils import ssize_to_b, open_for_append_or_create, sec_to_str
+from wally.utils import (ssize_to_b, open_for_append_or_create,
+ sec_to_str, StopTestError)
from wally.ssh_utils import (copy_paths, run_over_ssh,
save_to_remote,
@@ -37,13 +38,17 @@
self.coordination_queue = coordination_queue
self.remote_dir = remote_dir
self.is_primary = is_primary
+ self.stop_requested = False
+
+ def request_stop(self):
+ """Record a stop request by setting ``self.stop_requested`` to True."""
+ self.stop_requested = True
def join_remote(self, path):
return os.path.join(self.remote_dir, path)
def coordinate(self, data):
if self.coordination_queue is not None:
- self.coordination_queue.put(data)
+ self.coordination_queue.put((self.node.get_conn_id(), data))
def pre_run(self):
pass
@@ -140,6 +145,7 @@
self.log_fl = self.join_remote("log.txt")
self.pid_file = self.join_remote("pid")
self.task_file = self.join_remote("task.cfg")
+ self.use_sudo = self.options.get("use_sudo", True)
fio_command_file = open_for_append_or_create(cmd_log)
@@ -148,6 +154,10 @@
fio_command_file.write(splitter.join(cfg_s_it))
self.fio_raw_results_file = open_for_append_or_create(raw_res)
+    def __str__(self):
+        """Render the test as ``ClassName(conn_id)``.
+
+        The class name comes from the instance's own class and the
+        connection id from ``self.node.get_conn_id()``.
+        """
+        cls_name = self.__class__.__name__
+        conn_id = self.node.get_conn_id()
+        return "{0}({1})".format(cls_name, conn_id)
+
def cleanup(self):
# delete_file(conn, self.io_py_remote)
# Need to remove tempo files, used for testing
@@ -172,7 +182,7 @@
cmd_templ = "dd oflag=direct " + \
"if=/dev/zero of={0} bs={1} count={2}"
- if self.options.get("use_sudo", True):
+ if self.use_sudo:
cmd_templ = "sudo " + cmd_templ
ssize = 0
@@ -215,7 +225,7 @@
def pre_run(self):
try:
cmd = 'mkdir -p "{0}"'.format(self.remote_dir)
- if self.options.get("use_sudo", True):
+ if self.use_sudo:
cmd = "sudo " + cmd
cmd += " ; sudo chown {0} {1}".format(self.node.get_user(),
self.remote_dir)
@@ -225,7 +235,7 @@
msg = "Failed to create folder {0} on remote {1}. Error: {2!s}"
msg = msg.format(self.remote_dir, self.node.get_conn_id(), exc)
logger.error(msg)
- raise
+ raise StopTestError(msg, exc)
self.install_utils()
@@ -238,9 +248,31 @@
elif self.is_primary:
logger.warning("Prefilling of test files is disabled")
- def get_test_status(self):
+    def check_process_is_running(self, sftp, pid):
+        """Report whether a process with the given pid exists on the remote.
+
+        The check is a stat of ``/proc/<pid>`` issued through ``sftp``.
+
+        sftp - object exposing ``stat(path)``; presumably the paramiko SFTP
+               client returned by ``conn.open_sftp()`` (confirm with caller)
+        pid  - process id, substituted verbatim into the /proc path
+
+        Returns True when the stat succeeds and False when it fails with
+        IOError or OSError.
+        """
+        # NOTE(review): pid is used as-is; if it is read straight from the
+        # pid file, confirm it carries no trailing newline.
+        try:
+            sftp.stat("/proc/{0}".format(pid))
+        except (IOError, OSError):
+            # SFTP failures surface as IOError, which is not an OSError
+            # subclass on Python 2, so both have to be listed here.
+            return False
+        return True
+
+    def kill_remote_process(self, conn, pid, soft=True):
+        """Send a kill command for ``pid`` through ``self.run_over_ssh``.
+
+        conn - accepted for the caller's convenience; not used in this body
+        pid  - process id placed into the kill command
+        soft - True sends plain ``kill``, False sends ``kill -9``
+
+        The command is prefixed with ``sudo`` when ``self.use_sudo`` is set.
+        Returns True when the command call completes and False when it
+        raises OSError.
+        """
+        kill_templ = "kill {0}" if soft else "kill -9 {0}"
+        if self.use_sudo:
+            kill_templ = "sudo " + kill_templ
+
+        try:
+            self.run_over_ssh(kill_templ.format(pid))
+        except OSError:
+            return False
+        return True
+
+ def get_test_status(self, die_timeout=3):
is_connected = None
- has_pid_file = None
+ is_running = None
pid = None
err = None
@@ -251,10 +283,15 @@
with conn.open_sftp() as sftp:
try:
pid = read_from_remote(sftp, self.pid_file)
- has_pid_file = True
+ is_running = True
except (NameError, IOError) as exc:
pid = None
- has_pid_file = False
+ is_running = False
+
+ if is_running:
+ if not self.check_process_is_running(sftp, pid):
+ sftp.remove(self.pid_file)
+ is_running = False
is_connected = True
@@ -262,7 +299,7 @@
err = str(exc)
is_connected = False
- return is_connected, has_pid_file, pid, err
+ return is_connected, is_running, pid, err
def wait_till_finished(self, timeout):
conn_id = self.node.get_conn_id()
@@ -271,21 +308,21 @@
# time_till_check = random.randint(30, 90)
time_till_check = 5
pid = None
- has_pid_file = False
+ is_running = False
pid_get_timeout = self.max_pig_timeout + time.time()
curr_connected = True
while end_of_wait_time > time.time():
time.sleep(time_till_check)
- is_connected, has_pid_file, pid, err = self.get_test_status()
+ is_connected, is_running, pid, err = self.get_test_status()
- if not has_pid_file:
+ if not is_running:
if pid is None and time.time() > pid_get_timeout:
msg = ("On node {0} pid file doesn't " +
"appears in time")
logger.error(msg.format(conn_id))
- raise RuntimeError("Start timeout")
+ raise StopTestError("Start timeout")
else:
# execution finished
break
@@ -366,7 +403,6 @@
conn_timeout=conn_timeout)
with self.node.connection.open_sftp() as sftp:
- # try to reboot and then connect
out_err = read_from_remote(sftp, self.log_fl)
finally:
@@ -378,7 +414,7 @@
try:
for data in parse_output(out_err):
self.on_result_cb(data)
- except OSError:
+ except (OSError, StopTestError):
raise
except Exception as exc:
msg_templ = "Error during postprocessing results: {0!s}"