fixes, fixes, fixes
diff --git a/wally/run_test.py b/wally/run_test.py
index 07ac6e3..e0e37bc 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -242,6 +242,7 @@
creds = None
if creds_type == 'clouds':
+ logger.info("Using OS credentials from 'cloud' section")
if 'openstack' in cfg['clouds']:
os_cfg = cfg['clouds']['openstack']
@@ -255,8 +256,10 @@
creds = ctx.fuel_openstack_creds
elif creds_type == 'ENV':
+ logger.info("Using OS credentials from shell environment")
user, passwd, tenant, auth_url = start_vms.ostack_get_creds()
elif os.path.isfile(creds_type):
+ logger.info("Using OS credentials from " + creds_type)
fc = open(creds_type).read()
echo = 'echo "$OS_TENANT_NAME:$OS_USERNAME:$OS_PASSWORD@$OS_AUTH_URL"'
@@ -295,6 +298,8 @@
'tenant': tenant,
'auth_url': auth_url}
+ msg = "OS_CREDS: user={name} tenant={tenant} auth_url={auth_url}"
+ logger.debug(msg.format(**creds))
return creds
diff --git a/wally/ssh_utils.py b/wally/ssh_utils.py
index 81f348c..c6b1f70 100644
--- a/wally/ssh_utils.py
+++ b/wally/ssh_utils.py
@@ -64,7 +64,9 @@
if creds == 'local':
return Local
- tcp_timeout = 30
+ tcp_timeout = 15
+ banner_timeout = 30
+
ssh = paramiko.SSHClient()
ssh.load_host_keys('/dev/null')
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
@@ -74,36 +76,40 @@
while True:
try:
+ tleft = etime - time.time()
+ c_tcp_timeout = min(tcp_timeout, tleft)
+ c_banner_timeout = min(banner_timeout, tleft)
+
if creds.passwd is not None:
ssh.connect(creds.host,
- timeout=tcp_timeout,
+ timeout=c_tcp_timeout,
username=creds.user,
password=creds.passwd,
port=creds.port,
allow_agent=False,
- look_for_keys=False)
- return ssh
-
- if creds.key_file is not None:
+ look_for_keys=False,
+ banner_timeout=c_banner_timeout)
+ elif creds.key_file is not None:
ssh.connect(creds.host,
username=creds.user,
- timeout=tcp_timeout,
+ timeout=c_tcp_timeout,
key_filename=creds.key_file,
look_for_keys=False,
- port=creds.port)
- return ssh
-
- key_file = os.path.expanduser('~/.ssh/id_rsa')
- ssh.connect(creds.host,
- username=creds.user,
- timeout=tcp_timeout,
- key_filename=key_file,
- look_for_keys=False,
- port=creds.port)
+ port=creds.port,
+ banner_timeout=c_banner_timeout)
+ else:
+ key_file = os.path.expanduser('~/.ssh/id_rsa')
+ ssh.connect(creds.host,
+ username=creds.user,
+ timeout=c_tcp_timeout,
+ key_filename=key_file,
+ look_for_keys=False,
+ port=creds.port,
+ banner_timeout=c_banner_timeout)
return ssh
except paramiko.PasswordRequiredException:
raise
- except socket.error:
+ except (socket.error, paramiko.SSHException):
if time.time() > etime:
raise
time.sleep(1)
@@ -323,7 +329,6 @@
stderr=subprocess.STDOUT)
stdoutdata, _ = proc.communicate(input=stdin_data)
-
if proc.returncode != 0:
templ = "SSH:{0} Cmd {1!r} failed with code {2}. Output: {3}"
raise OSError(templ.format(node, cmd, proc.returncode, stdoutdata))
diff --git a/wally/suits/io/agent.py b/wally/suits/io/agent.py
index 5fb835e..336b176 100644
--- a/wally/suits/io/agent.py
+++ b/wally/suits/io/agent.py
@@ -428,52 +428,41 @@
def run_fio(sliced_it, raw_results_func=None):
sliced_list = list(sliced_it)
- ok = True
- try:
- curr_test_num = 0
- executed_tests = 0
- result = {}
+ curr_test_num = 0
+ executed_tests = 0
+ result = {}
- for i, test_slice in enumerate(sliced_list):
- res_cfg_it = do_run_fio(test_slice)
- res_cfg_it = enumerate(res_cfg_it, curr_test_num)
+ for i, test_slice in enumerate(sliced_list):
+ res_cfg_it = do_run_fio(test_slice)
+ res_cfg_it = enumerate(res_cfg_it, curr_test_num)
- for curr_test_num, (job_output, section) in res_cfg_it:
- executed_tests += 1
+ for curr_test_num, (job_output, section) in res_cfg_it:
+ executed_tests += 1
- if raw_results_func is not None:
- raw_results_func(executed_tests,
- [job_output, section])
+ if raw_results_func is not None:
+ raw_results_func(executed_tests,
+ [job_output, section])
- msg = "{0} != {1}".format(section.name, job_output["jobname"])
- assert section.name == job_output["jobname"], msg
+ msg = "{0} != {1}".format(section.name, job_output["jobname"])
+ assert section.name == job_output["jobname"], msg
- if section.name.startswith('_'):
- continue
+ if section.name.startswith('_'):
+ continue
- add_job_results(section, job_output, result)
+ add_job_results(section, job_output, result)
- curr_test_num += 1
- msg_template = "Done {0} tests from {1}. ETA: {2}"
+ curr_test_num += 1
+ msg_template = "Done {0} tests from {1}. ETA: {2}"
- rest = sliced_list[i:]
- time_eta = sum(map(calculate_execution_time, rest))
- test_left = sum(map(len, rest))
- print msg_template.format(curr_test_num,
- test_left,
- sec_to_str(time_eta))
+ rest = sliced_list[i:]
+ time_eta = sum(map(calculate_execution_time, rest))
+ test_left = sum(map(len, rest))
+ print msg_template.format(curr_test_num,
+ test_left,
+ sec_to_str(time_eta))
- except (SystemExit, KeyboardInterrupt):
- raise
-
- except Exception:
- print "=========== ERROR ============="
- traceback.print_exc()
- print "======== END OF ERROR ========="
- ok = False
-
- return result, executed_tests, ok
+ return result, executed_tests
def run_benchmark(binary_tp, *argv, **kwargs):
@@ -603,8 +592,8 @@
rrfunc = raw_res_func if argv_obj.show_raw_results else None
stime = time.time()
- job_res, num_tests, ok = run_benchmark(argv_obj.type,
- sliced_it, rrfunc)
+ job_res, num_tests = run_benchmark(argv_obj.type,
+ sliced_it, rrfunc)
etime = time.time()
res = {'__meta__': {'raw_cfg': job_cfg, 'params': params},
@@ -622,8 +611,21 @@
out_fd.write(pprint.pformat(res) + "\n")
out_fd.write("\n========= END OF RESULTS =========\n")
- return 0 if ok else 1
+ return 0
+ except:
+ out_fd.write("============ ERROR =============\n")
+ out_fd.write(traceback.format_exc() + "\n")
+ out_fd.write("============ END OF ERROR =============\n")
+ return 1
finally:
+ try:
+ if out_fd is not sys.stdout:
+ out_fd.flush()
+ os.fsync(out_fd)
+ out_fd.close()
+ except Exception:
+ traceback.print_exc()
+
if argv_obj.pid_file is not None:
if os.path.exists(argv_obj.pid_file):
os.unlink(argv_obj.pid_file)
diff --git a/wally/suits/io/results_loader.py b/wally/suits/io/results_loader.py
index 9005450..3c8d9c5 100644
--- a/wally/suits/io/results_loader.py
+++ b/wally/suits/io/results_loader.py
@@ -29,6 +29,14 @@
def parse_output(out_err):
+ err_start_patt = r"(?ims)=+\s+ERROR\s+=+"
+ err_end_patt = r"(?ims)=+\s+END OF ERROR\s+=+"
+
+ for block in re.split(err_start_patt, out_err)[1:]:
+ tb, garbage = re.split(err_end_patt, block)
+ msg = "Test fails with error:\n" + tb.strip() + "\n"
+ raise OSError(msg)
+
start_patt = r"(?ims)=+\s+RESULTS\(format=json\)\s+=+"
end_patt = r"(?ims)=+\s+END OF RESULTS\s+=+"
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index 294909b..ad5e8a5 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -120,6 +120,8 @@
class IOPerfTest(IPerfTest):
+ tcp_conn_timeout = 30
+ max_pig_timeout = 30
def __init__(self, *dt, **mp):
IPerfTest.__init__(self, *dt, **mp)
@@ -236,6 +238,67 @@
elif self.is_primary:
logger.warning("Prefilling of test files is disabled")
+ def get_test_status(self):
+ is_connected = None
+ has_pid_file = None
+ pid = None
+ err = None
+
+ try:
+ conn = connect(self.node.conn_url,
+ conn_timeout=self.tcp_conn_timeout)
+ with conn:
+ with conn.open_sftp() as sftp:
+ try:
+ pid = read_from_remote(sftp, self.pid_file)
+ has_pid_file = True
+ except (NameError, IOError) as exc:
+ pid = None
+ has_pid_file = False
+
+ is_connected = True
+
+ except (socket.error, SSHException, EOFError) as exc:
+ err = str(exc)
+ is_connected = False
+
+ return is_connected, has_pid_file, pid, err
+
+ def wait_till_finished(self, timeout):
+ conn_id = self.node.get_conn_id()
+ end_of_wait_time = timeout + time.time()
+
+ # time_till_check = random.randint(30, 90)
+ time_till_check = 5
+ pid = None
+ has_pid_file = False
+ pid_get_timeout = self.max_pig_timeout + time.time()
+ curr_connected = True
+
+ while end_of_wait_time > time.time():
+ time.sleep(time_till_check)
+
+ is_connected, has_pid_file, pid, err = self.get_test_status()
+
+ if not has_pid_file:
+ if pid is None and time.time() > pid_get_timeout:
+ msg = ("On node {0} pid file doesn't " +
+ "appears in time")
+ logger.error(msg.format(conn_id))
+ raise RuntimeError("Start timeout")
+ else:
+ # execution finished
+ break
+
+ if is_connected and not curr_connected:
+ msg = "Connection with {0} is restored"
+ logger.debug(msg.format(conn_id))
+ elif not is_connected and curr_connected:
+ msg = "Lost connection with " + conn_id + ". Error: " + err
+ logger.debug(msg)
+
+ curr_connected = is_connected
+
def run(self, barrier):
conn_id = self.node.get_conn_id()
@@ -286,80 +349,26 @@
wait_till.strftime("%H:%M:%S")))
self.run_over_ssh(cmd)
+
msg = "Test on node {0} started in screen {1}"
logger.debug(msg.format(conn_id, screen_name))
- end_of_wait_time = timeout + time.time()
-
- # time_till_check = random.randint(30, 90)
- time_till_check = 5
-
- pid = None
- no_pid_file = True
- tcp_conn_timeout = 30
- pid_get_timeout = 30 + time.time()
- connection_ok = True
-
# TODO: add monitoring socket
if self.node.connection is not Local:
self.node.connection.close()
- while end_of_wait_time > time.time():
- conn = None
- time.sleep(time_till_check)
-
- try:
- if self.node.connection is not Local:
- conn = connect(self.node.conn_url,
- conn_timeout=tcp_conn_timeout)
- else:
- conn = self.node.connection
-
- try:
- with conn.open_sftp() as sftp:
- try:
- pid = read_from_remote(sftp, self.pid_file)
- no_pid_file = False
- except (NameError, IOError):
- no_pid_file = True
- finally:
- if conn is not Local:
- conn.close()
- conn = None
-
- if no_pid_file:
- if pid is None:
- if time.time() > pid_get_timeout:
- msg = ("On node {0} pid file doesn't " +
- "appears in time")
- logger.error(msg.format(conn_id))
- raise RuntimeError("Start timeout")
- else:
- # execution finished
- break
- if not connection_ok:
- msg = "Connection with {0} is restored"
- logger.debug(msg.format(conn_id))
- connection_ok = True
-
- except (socket.error, SSHException, EOFError) as exc:
- if connection_ok:
- connection_ok = False
- msg = "Lost connection with " + conn_id
- msg += ". Error: " + str(exc)
- logger.debug(msg)
-
+ self.wait_till_finished(timeout)
logger.debug("Done")
if self.node.connection is not Local:
- timeout = tcp_conn_timeout * 3
+ conn_timeout = self.tcp_conn_timeout * 3
self.node.connection = connect(self.node.conn_url,
- conn_timeout=timeout)
+ conn_timeout=conn_timeout)
with self.node.connection.open_sftp() as sftp:
# try to reboot and then connect
- out_err = read_from_remote(sftp,
- self.log_fl)
+ out_err = read_from_remote(sftp, self.log_fl)
+
finally:
barrier.exit()
@@ -369,6 +378,8 @@
try:
for data in parse_output(out_err):
self.on_result_cb(data)
+ except OSError:
+ raise
except Exception as exc:
msg_templ = "Error during postprocessing results: {0!s}"
raise RuntimeError(msg_templ.format(exc))