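a lot of changes

Summary of the changes to wally/suits/itest.py:

* import `exists` from wally.ssh_utils.
* config params: list/tuple values from options['params'] are rendered as
  "{%a,b,...%}" strings, and VM_COUNT is set from
  options['testnodes_count']; values starting with "{%" are skipped when
  the "--params" command-line argument is built.
* get_test_status(): takes an optional `res_file` instead of
  `die_timeout`, reuses self.node.connection instead of opening a new
  connection, and now returns
  (found_res_file, is_connected, is_running, pid, err).
* wait_till_finished(): accepts an optional `res_fname` and returns as
  soon as the result file is found and the test is no longer running.
* the node connection is no longer closed before waiting for the test and
  reopened afterwards (the old code is left commented out).
* merge_results(): replaced by a version that asserts all blocks share the
  same "__meta__" and concatenates their 'res' lists (the old
  implementation is left commented out).
* 'raw_cfg' is no longer stored in data['__meta__'] before on_result_cb()
  is called.
* a failure to create the remote folder is logged with logger.exception()
  instead of logger.error().
* format_for_console() passes only `dinfo` to
  io_formatter.format_results_for_console().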
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index dd52f33..36d3fcf 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -15,7 +15,8 @@
from wally.ssh_utils import (copy_paths, run_over_ssh,
save_to_remote,
# delete_file,
- connect, read_from_remote, Local)
+ connect, read_from_remote, Local,
+ exists)
from . import postgres
from . import mysql
@@ -124,7 +125,6 @@
pre_run_script = os.path.join(root, "prepare.sh")
run_script = os.path.join(root, "run.sh")
-
@classmethod
def format_for_console(cls, data):
tab = texttable.Texttable(max_width=120)
@@ -163,7 +163,14 @@
self.config_fname + '.cfg')
self.alive_check_interval = self.options.get('alive_check_interval')
- self.config_params = self.options.get('params', {})
+
+ self.config_params = {}
+ for name, val in self.options.get('params', {}).items():
+ if isinstance(val, (list, tuple)):
+ val = "{%" + ','.join(map(str, val)) + "%}"
+ self.config_params[name] = val
+
+ self.config_params['VM_COUNT'] = self.options['testnodes_count']
self.tool = self.options.get('tool', 'fio')
self.raw_cfg = open(self.config_fname).read()
self.configs = list(io_agent.parse_all_in_1(self.raw_cfg,
@@ -230,6 +237,8 @@
cmd_templ = "dd oflag=direct " + \
"if=/dev/zero of={0} bs={1} count={2}"
+ # cmd_templ = "fio --rw=write --bs={1} --direct=1 --size={2} "
+
if self.use_sudo:
cmd_templ = "sudo " + cmd_templ
@@ -282,7 +291,7 @@
except Exception as exc:
msg = "Failed to create folder {0} on remote {1}. Error: {2!s}"
msg = msg.format(self.remote_dir, self.node.get_conn_id(), exc)
- logger.error(msg)
+ logger.exception(msg)
raise StopTestError(msg, exc)
self.install_utils()
@@ -318,31 +327,36 @@
except OSError:
return False
- def get_test_status(self, die_timeout=3):
+ def get_test_status(self, res_file=None):
+ found_res_file = False
is_connected = None
is_running = None
pid = None
err = None
try:
- conn = connect(self.node.conn_url,
- conn_timeout=self.tcp_conn_timeout)
- with conn:
- with conn.open_sftp() as sftp:
- try:
- pid = read_from_remote(sftp, self.pid_file)
- is_running = True
- except (NameError, IOError, OSError) as exc:
- pid = None
+ # conn = connect(self.node.conn_url,
+ # conn_timeout=self.tcp_conn_timeout)
+ # with conn:
+ conn = self.node.connection
+ with conn.open_sftp() as sftp:
+ try:
+ pid = read_from_remote(sftp, self.pid_file)
+ is_running = True
+ except (NameError, IOError, OSError) as exc:
+ pid = None
+ is_running = False
+
+ if is_running:
+ if not self.check_process_is_running(sftp, pid):
+ try:
+ sftp.remove(self.pid_file)
+ except (IOError, NameError, OSError):
+ pass
is_running = False
- if is_running:
- if not self.check_process_is_running(sftp, pid):
- try:
- sftp.remove(self.pid_file)
- except (IOError, NameError, OSError):
- pass
- is_running = False
+ if res_file is not None:
+ found_res_file = exists(sftp, res_file)
is_connected = True
@@ -350,9 +364,9 @@
err = str(exc)
is_connected = False
- return is_connected, is_running, pid, err
+ return found_res_file, is_connected, is_running, pid, err
- def wait_till_finished(self, soft_timeout, timeout):
+ def wait_till_finished(self, soft_timeout, timeout, res_fname=None):
conn_id = self.node.get_conn_id()
end_of_wait_time = timeout + time.time()
soft_end_of_wait_time = soft_timeout + time.time()
@@ -366,7 +380,11 @@
while end_of_wait_time > time.time():
time.sleep(time_till_check)
- is_connected, is_running, npid, err = self.get_test_status()
+ found_res_file, is_connected, is_running, npid, err = \
+ self.get_test_status(res_fname)
+
+ if found_res_file and not is_running:
+ return
if is_connected and not is_running:
if pid is None:
@@ -437,7 +455,6 @@
try:
for data in parse_output(out_err):
- data['__meta__']['raw_cfg'] = self.raw_cfg
self.on_result_cb(data)
except (OSError, StopTestError):
raise
@@ -458,11 +475,14 @@
if self.options.get("use_sudo", True):
cmd_templ = "sudo " + cmd_templ
- params = " ".join("{0}={1}".format(k, v)
- for k, v in self.config_params.items())
+ params = []
+ for k, v in self.config_params.items():
+ if isinstance(v, basestring) and v.startswith("{%"):
+ continue
+ params.append("{0}={1}".format(k, v))
- if "" != params:
- params = "--params " + params
+ if [] != params:
+ params = "--params " + " ".join(params)
with self.node.connection.open_sftp() as sftp:
save_to_remote(sftp, self.task_file,
@@ -501,59 +521,67 @@
logger.debug(msg.format(conn_id, screen_name))
# TODO: add monitoring socket
- if self.node.connection is not Local:
- self.node.connection.close()
+ # if not isinstance(self.node.connection, Local):
+ # self.node.connection.close()
- self.wait_till_finished(soft_tout, timeout)
+ self.wait_till_finished(soft_tout, timeout, self.log_fl)
if not nolog:
logger.debug("Test on node {0} is finished".format(conn_id))
- if self.node.connection is not Local:
- conn_timeout = self.tcp_conn_timeout * 3
- self.node.connection = connect(self.node.conn_url,
- conn_timeout=conn_timeout)
+ # if self.node.connection is not Local:
+ # conn_timeout = self.tcp_conn_timeout * 3
+ # self.node.connection = connect(self.node.conn_url,
+ # conn_timeout=conn_timeout)
with self.node.connection.open_sftp() as sftp:
return read_from_remote(sftp, self.log_fl)
@classmethod
def merge_results(cls, results):
- if len(results) == 0:
- return None
+ merged = results[0]
+ for block in results[1:]:
+ assert block["__meta__"] == merged["__meta__"]
+ merged['res'].extend(block['res'])
+ return merged
- merged_result = results[0]
- merged_data = merged_result['res']
- mergable_fields = ['bw', 'clat', 'iops', 'lat', 'slat']
+ # @classmethod
+ # def merge_results(cls, results):
+ # if len(results) == 0:
+ # return None
- for res in results[1:]:
- mm = merged_result['__meta__']
- assert mm['raw_cfg'] == res['__meta__']['raw_cfg']
- assert mm['params'] == res['__meta__']['params']
- mm['timings'].extend(res['__meta__']['timings'])
+ # merged_result = results[0]
+ # merged_data = merged_result['res']
+ # mergable_fields = ['bw', 'clat', 'iops', 'lat', 'slat']
- data = res['res']
- for testname, test_data in data.items():
- if testname not in merged_data:
- merged_data[testname] = test_data
- continue
+ # for res in results[1:]:
+ # mm = merged_result['__meta__']
+ # assert mm['raw_cfg'] == res['__meta__']['raw_cfg']
+ # assert mm['params'] == res['__meta__']['params']
+ # mm['timings'].extend(res['__meta__']['timings'])
- res_test_data = merged_data[testname]
+ # data = res['res']
+ # for testname, test_data in data.items():
+ # if testname not in merged_data:
+ # merged_data[testname] = test_data
+ # continue
- diff = set(test_data.keys()).symmetric_difference(
- res_test_data.keys())
+ # res_test_data = merged_data[testname]
- msg = "Difference: {0}".format(",".join(diff))
- assert len(diff) == 0, msg
+ # diff = set(test_data.keys()).symmetric_difference(
+ # res_test_data.keys())
- for k, v in test_data.items():
- if k in mergable_fields:
- res_test_data[k].extend(v)
- else:
- msg = "{0!r} != {1!r}".format(res_test_data[k], v)
- assert res_test_data[k] == v, msg
+ # msg = "Difference: {0}".format(",".join(diff))
+ # assert len(diff) == 0, msg
- return merged_result
+ # for k, v in test_data.items():
+ # if k in mergable_fields:
+ # res_test_data[k].extend(v)
+ # else:
+ # msg = "{0!r} != {1!r}".format(res_test_data[k], v)
+ # assert res_test_data[k] == v, msg
+
+ # return merged_result
@classmethod
def format_for_console(cls, data, dinfo):
- return io_formatter.format_results_for_console(data, dinfo)
+ return io_formatter.format_results_for_console(dinfo)