fixes, fixes, fixes
diff --git a/wally/run_test.py b/wally/run_test.py
index 0b20657..07ac6e3 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -118,7 +118,7 @@
test_cls = tool_type_mapper[name]
rem_folder = cfg['default_test_local_folder'].format(name=name)
- for node in test_nodes:
+ for idx, node in enumerate(test_nodes):
msg = "Starting {0} test on {1} node"
logger.debug(msg.format(name, node.conn_url))
@@ -130,7 +130,11 @@
if not os.path.exists(dr):
os.makedirs(dr)
- test = test_cls(params, res_q.put, cfg['run_uuid'], node,
+ test = test_cls(options=params,
+ is_primary=(idx == 0),
+ on_result_cb=res_q.put,
+ test_uuid=cfg['run_uuid'],
+ node=node,
remote_dir=rem_folder,
log_directory=dr,
coordination_queue=coord_q)
diff --git a/wally/sensors/deploy_sensors.py b/wally/sensors/deploy_sensors.py
index 73e7902..6fff833 100644
--- a/wally/sensors/deploy_sensors.py
+++ b/wally/sensors/deploy_sensors.py
@@ -62,15 +62,18 @@
def stop_and_remove_sensor(conn, url, remote_path):
- cmd = 'env PYTHONPATH="{0}" python -m sensors.main -d stop'
+ try:
+ cmd = 'env PYTHONPATH="{0}" python -m sensors.main -d stop'
+ cmd = cmd.format(remote_path)
+ run_over_ssh(conn, cmd, node=url)
+ # some magic
+ time.sleep(0.3)
- run_over_ssh(conn, cmd.format(remote_path), node=url)
-
- # some magic
- time.sleep(0.3)
-
- # logger.warning("Sensors don't removed")
- run_over_ssh(conn, "rm -rf {0}".format(remote_path), node=url)
+ # logger.warning("Sensors don't removed")
+ run_over_ssh(conn, "rm -rf {0}".format(remote_path), node=url)
+ except Exception as exc:
+ msg = "Failed to remove sensors from node {0}: {1!s}"
+ logger.error(msg.format(url, exc))
def stop_and_remove_sensors(configs, remote_path='/tmp/sensors'):
diff --git a/wally/sensors_utils.py b/wally/sensors_utils.py
index 1bd2164..51a4dcb 100644
--- a/wally/sensors_utils.py
+++ b/wally/sensors_utils.py
@@ -119,7 +119,10 @@
if undeploy:
def remove_sensors_stage(cfg, ctx):
+ _, sensors_configs = get_sensors_config_for_nodes(cfg['sensors'],
+ nodes)
stop_and_remove_sensors(sensors_configs)
+
ctx.clear_calls_stack.append(remove_sensors_stage)
logger.info("Deploing new sensors on {0} node(s)".format(len(nodes)))
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index ed4e8c8..294909b 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -11,7 +11,7 @@
from wally.utils import ssize_to_b, open_for_append_or_create, sec_to_str
from wally.ssh_utils import (copy_paths, run_over_ssh,
- save_to_remote, ssh_mkdir,
+ save_to_remote,
# delete_file,
connect, read_from_remote, Local)
@@ -25,16 +25,18 @@
class IPerfTest(object):
- def __init__(self, on_result_cb, test_uuid, node,
+ def __init__(self, options, is_primary, on_result_cb, test_uuid, node,
log_directory=None,
coordination_queue=None,
remote_dir="/tmp/wally"):
+ self.options = options
self.on_result_cb = on_result_cb
self.log_directory = log_directory
self.node = node
self.test_uuid = test_uuid
self.coordination_queue = coordination_queue
self.remote_dir = remote_dir
+ self.is_primary = is_primary
def join_remote(self, path):
return os.path.join(self.remote_dir, path)
@@ -68,13 +70,12 @@
class TwoScriptTest(IPerfTest):
- def __init__(self, opts, *dt, **mp):
+ def __init__(self, *dt, **mp):
IPerfTest.__init__(self, *dt, **mp)
- self.opts = opts
- if 'run_script' in self.opts:
- self.run_script = self.opts['run_script']
- self.prepare_script = self.opts['prepare_script']
+ if 'run_script' in self.options:
+ self.run_script = self.options['run_script']
+ self.prepare_script = self.options['prepare_script']
def get_remote_for_script(self, script):
return os.path.join(self.tmp_dir, script.rpartition('/')[2])
@@ -93,7 +94,7 @@
def run(self, barrier):
remote_script = self.copy_script(self.node.connection, self.run_script)
cmd_opts = ' '.join(["%s %s" % (key, val) for key, val
- in self.opts.items()])
+ in self.options.items()])
cmd = remote_script + ' ' + cmd_opts
out_err = self.run_over_ssh(cmd)
self.on_result(out_err, cmd)
@@ -120,13 +121,12 @@
class IOPerfTest(IPerfTest):
- def __init__(self, test_options, *dt, **mp):
+ def __init__(self, *dt, **mp):
IPerfTest.__init__(self, *dt, **mp)
- self.options = test_options
- self.config_fname = test_options['cfg']
- self.alive_check_interval = test_options.get('alive_check_interval')
- self.config_params = test_options.get('params', {})
- self.tool = test_options.get('tool', 'fio')
+ self.config_fname = self.options['cfg']
+ self.alive_check_interval = self.options.get('alive_check_interval')
+ self.config_params = self.options.get('params', {})
+ self.tool = self.options.get('tool', 'fio')
self.raw_cfg = open(self.config_fname).read()
self.configs = list(io_agent.parse_all_in_1(self.raw_cfg,
self.config_params))
@@ -186,6 +186,7 @@
fill_bw = int(ssize / ddtime)
mess = "Initiall dd fill bw is {0} MiBps for this vm"
logger.info(mess.format(fill_bw))
+ self.coordinate(('init_bw', fill_bw))
def install_utils(self, max_retry=3, timeout=5):
need_install = []
@@ -232,10 +233,12 @@
if self.options.get('prefill_files', True):
self.prefill_test_files()
- else:
+ elif self.is_primary:
logger.warning("Prefilling of test files is disabled")
def run(self, barrier):
+ conn_id = self.node.get_conn_id()
+
cmd_templ = "screen -S {screen_name} -d -m " + \
"env python2 {0} -p {pid_file} -o {results_file} " + \
"--type {1} {2} --json {3}"
@@ -261,14 +264,16 @@
pid_file=self.pid_file,
results_file=self.log_fl,
screen_name=screen_name)
- logger.debug("Waiting on barrier")
-
+ msg = "Thread for node {0} is waiting on barrier"
+ logger.debug(msg.format(conn_id))
exec_time = io_agent.calculate_execution_time(self.configs)
exec_time_str = sec_to_str(exec_time)
try:
timeout = int(exec_time * 2 + 300)
- if barrier.wait():
+ barrier.wait()
+
+ if self.is_primary:
templ = "Test should takes about {0}." + \
" Should finish at {1}," + \
" will wait at most till {2}"
@@ -281,12 +286,13 @@
wait_till.strftime("%H:%M:%S")))
self.run_over_ssh(cmd)
- logger.debug("Test started in screen {0}".format(screen_name))
+ msg = "Test on node {0} started in screen {1}"
+ logger.debug(msg.format(conn_id, screen_name))
end_of_wait_time = timeout + time.time()
# time_till_check = random.randint(30, 90)
- time_till_check = 1
+ time_till_check = 5
pid = None
no_pid_file = True
@@ -298,8 +304,6 @@
if self.node.connection is not Local:
self.node.connection.close()
- conn_id = self.node.get_conn_id()
-
while end_of_wait_time > time.time():
conn = None
time.sleep(time_till_check)