improve paramiko integration, replace .mesage for exceptions with str(exc)
diff --git a/wally/config.py b/wally/config.py
index d8b7085..7c5d377 100644
--- a/wally/config.py
+++ b/wally/config.py
@@ -127,6 +127,14 @@
fh.setLevel(logging.DEBUG)
logger.addHandler(fh)
logger_api.addHandler(fh)
+ else:
+ fh = None
logger_api.addHandler(sh)
logger_api.setLevel(logging.WARNING)
+
+ logger = logging.getLogger('paramiko')
+ logger.setLevel(logging.WARNING)
+ logger.addHandler(sh)
+ if fh is not None:
+ logger.addHandler(fh)
diff --git a/wally/report.py b/wally/report.py
index 85ac388..461c2a0 100644
--- a/wally/report.py
+++ b/wally/report.py
@@ -169,6 +169,6 @@
di = get_disk_info(processed_results)
render_html(path, di, lab_info)
except Exception as exc:
- logger.error("Failed to generate html report:" + exc.message)
+ logger.error("Failed to generate html report:" + str(exc))
else:
logger.info("Html report were stored in " + path)
diff --git a/wally/run_test.py b/wally/run_test.py
index 01bb8f6..b0a3bf1 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -5,7 +5,6 @@
import Queue
import pprint
import logging
-import StringIO
import argparse
import functools
import threading
@@ -65,8 +64,8 @@
raise ValueError("Unknown url type {0}".format(node.conn_url))
except Exception as exc:
# logger.exception("During connect to " + node.get_conn_id())
- msg = "During connect to {0}: {1}".format(node.get_conn_id(),
- exc.message)
+ msg = "During connect to {0}: {1!s}".format(node.get_conn_id(),
+ exc)
logger.error(msg)
node.connection = None
@@ -115,6 +114,8 @@
test_number_per_type[name] = test_num + 1
threads = []
barrier = utils.Barrier(len(test_nodes))
+ coord_q = Queue.Queue()
+ test_cls = tool_type_mapper[name]
for node in test_nodes:
msg = "Starting {0} test on {1} node"
@@ -128,21 +129,28 @@
if not os.path.exists(dr):
os.makedirs(dr)
- test = tool_type_mapper[name](params, res_q.put, test_uuid, node,
- log_directory=dr)
+ test = test_cls(params, res_q.put, test_uuid, node,
+ log_directory=dr,
+ coordination_queue=coord_q)
th = threading.Thread(None, test_thread, None,
(test, node, barrier, res_q))
threads.append(th)
th.daemon = True
th.start()
+ th = threading.Thread(None, test_cls.coordination_th, None,
+ (coord_q, barrier, len(threads)))
+ threads.append(th)
+ th.daemon = True
+ th.start()
+
def gather_results(res_q, results):
while not res_q.empty():
val = res_q.get()
if isinstance(val, Exception):
- msg = "Exception during test execution: {0}"
- raise ValueError(msg.format(val.message))
+ msg = "Exception during test execution: {0!s}"
+ raise ValueError(msg.format(val))
results.append(val)
@@ -528,10 +536,7 @@
logger.info("Start {0.__name__} stage".format(stage))
stage(cfg_dict, ctx)
except Exception as exc:
- emsg = exc.message
- if emsg == "":
- emsg = str(exc)
- msg = "Exception during {0.__name__}: {1}".format(stage, emsg)
+ msg = "Exception during {0.__name__}: {1!s}".format(stage, exc)
logger.error(msg)
finally:
exc, cls, tb = sys.exc_info()
diff --git a/wally/suits/io/agent.py b/wally/suits/io/agent.py
index 25910b4..5fb835e 100644
--- a/wally/suits/io/agent.py
+++ b/wally/suits/io/agent.py
@@ -61,8 +61,8 @@
else:
yield lineno, SETTING, line, None
except Exception as exc:
- pref = "During parsing line number {0}\n".format(lineno)
- raise ValueError(pref + exc.message)
+ pref = "During parsing line number {0}\n{1!s}".format(lineno, exc)
+ raise ValueError(pref)
def fio_config_parse(lexer_iter, format_params):
@@ -378,9 +378,9 @@
raise ValueError(msg.format(raw_out))
except Exception as exc:
- msg = "Can't parse fio output: {0!r}\nError: {1}"
+ msg = "Can't parse fio output: {0!r}\nError: {1!s}"
raw_out = raw_out[:100]
- raise ValueError(msg.format(raw_out, exc.message))
+ raise ValueError(msg.format(raw_out, exc))
return zip(parsed_out, config_slice)
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index 116e116..57fd3e7 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -25,11 +25,17 @@
class IPerfTest(object):
- def __init__(self, on_result_cb, test_uuid, node, log_directory=None):
+ def __init__(self, on_result_cb, test_uuid, node,
+ log_directory=None, coordination_queue=None):
self.on_result_cb = on_result_cb
self.log_directory = log_directory
self.node = node
self.test_uuid = test_uuid
+ self.coordination_queue = coordination_queue
+
+ def coordinate(self, data):
+ if self.coordination_queue is not None:
+ self.coordination_queue.put(data)
def pre_run(self):
pass
@@ -50,6 +56,10 @@
return run_over_ssh(self.node.connection, cmd,
node=self.node.get_conn_id(), **kwargs)
+ @classmethod
+ def coordination_th(cls, coord_q, barrier, num_threads):
+ pass
+
class TwoScriptTest(IPerfTest):
remote_tmp_dir = '/tmp'
@@ -94,8 +104,8 @@
try:
self.parse_results(out_err)
except Exception as exc:
- msg_templ = "Error during postprocessing results: {0!r}. {1}"
- raise RuntimeError(msg_templ.format(exc.message, out_err))
+ msg_templ = "Error during postprocessing results: {0!s}. {1}"
+ raise RuntimeError(msg_templ.format(exc, out_err))
class PgBenchTest(TwoScriptTest):
@@ -158,7 +168,7 @@
except OSError as err:
time.sleep(3)
else:
- raise OSError("Can't install fio - " + err.message)
+ raise OSError("Can't install fio - " + str(err))
local_fname = os.path.splitext(io_agent.__file__)[0] + ".py"
@@ -284,15 +294,15 @@
try:
with conn.open_sftp() as sftp:
- pid = read_from_remote(sftp, self.pid_file)
- no_pid_file = False
- except (NameError, IOError):
- no_pid_file = True
-
- sftp.close()
-
- if conn is not Local:
- conn.close()
+ try:
+ pid = read_from_remote(sftp, self.pid_file)
+ no_pid_file = False
+ except (NameError, IOError):
+ no_pid_file = True
+ finally:
+ if conn is not Local:
+ conn.close()
+ conn = None
if no_pid_file:
if pid is None:
@@ -309,11 +319,11 @@
logger.debug(msg.format(conn_id))
connection_ok = True
- except (socket.error, SSHException) as exc:
+ except (socket.error, SSHException, EOFError) as exc:
if connection_ok:
connection_ok = False
msg = "Lost connection with " + conn_id
- msg += ". Error: " + exc.message
+ msg += ". Error: " + str(exc)
logger.debug(msg)
logger.debug("Done")
@@ -325,7 +335,7 @@
with self.node.connection.open_sftp() as sftp:
# try to reboot and then connect
- out_err = read_from_remote(,
+ out_err = read_from_remote(sftp,
self.log_fl)
finally:
barrier.exit()
@@ -337,8 +347,8 @@
for data in parse_output(out_err):
self.on_result_cb(data)
except Exception as exc:
- msg_templ = "Error during postprocessing results: {0!r}"
- raise RuntimeError(msg_templ.format(exc.message))
+ msg_templ = "Error during postprocessing results: {0!s}"
+ raise RuntimeError(msg_templ.format(exc))
def merge_results(self, results):
if len(results) == 0:
diff --git a/wally/utils.py b/wally/utils.py
index 71fbe57..e3fda71 100644
--- a/wally/utils.py
+++ b/wally/utils.py
@@ -62,8 +62,8 @@
yield
except Exception as exc:
if isinstance(exc, types) and not isinstance(exc, StopIteration):
- templ = "Error during {0} stage: {1}"
- logger.debug(templ.format(action, exc.message))
+ templ = "Error during {0} stage: {1!s}"
+ logger.debug(templ.format(action, exc))
raise