add local sensor datastore, make IO tests granular
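
Slice long fio configs into chunks of at most soft_runcycle seconds
(cutting at job-name boundaries) and run them one by one, collecting
and post-processing the results of each chunk separately.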
diff --git a/wally/suits/io/agent.py b/wally/suits/io/agent.py
index 336b176..0a84ed1 100644
--- a/wally/suits/io/agent.py
+++ b/wally/suits/io/agent.py
@@ -290,12 +290,22 @@
return time
-def slice_config(sec_iter, runcycle=None, max_jobs=1000):
+def slice_config(sec_iter, runcycle=None, max_jobs=1000,
+ soft_runcycle=None):
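+ # soft_runcycle: when set, additionally cut a new slice at the first
+ # job-name change after the accumulated runtime exceeds this many seconds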
jcount = 0
runtime = 0
curr_slice = []
+ prev_name = None
for pos, sec in enumerate(sec_iter):
+ if soft_runcycle is not None and prev_name != sec.name:
+ if runtime > soft_runcycle:
+ yield curr_slice
+ curr_slice = []
+ runtime = 0
+ jcount = 0
+
+ prev_name = sec.name
jc = sec.vals.get('numjobs', 1)
msg = "numjobs should be integer, not {0!r}".format(jc)
@@ -328,6 +338,7 @@
runtime = curr_task_time
jcount = jc
curr_slice = [sec]
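+ # a hard runcycle/max_jobs cut was just made; reset the name tracker
+ # so the soft boundary check starts over for the fresh slice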
+ prev_name = None
if curr_slice != []:
yield curr_slice
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index 1e948fb..70fdb66 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -126,7 +126,8 @@
class IOPerfTest(IPerfTest):
tcp_conn_timeout = 30
- max_pig_timeout = 30
+ max_pig_timeout = 5
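+ # soft limit, in seconds, on the total runtime of a single config slice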
+ soft_runcycle = 5 * 60
def __init__(self, *dt, **mp):
IPerfTest.__init__(self, *dt, **mp)
@@ -146,12 +147,28 @@
self.pid_file = self.join_remote("pid")
self.task_file = self.join_remote("task.cfg")
self.use_sudo = self.options.get("use_sudo", True)
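+ # granular mode: slice the config and report results slice by slice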
+ self.test_logging = self.options.get("test_logging", False)
fio_command_file = open_for_append_or_create(cmd_log)
- cfg_s_it = io_agent.compile_all_in_1(self.raw_cfg, self.config_params)
+ if self.test_logging:
+ soft_runcycle = self.soft_runcycle
+ else:
+ soft_runcycle = None
+
+ self.fio_configs = io_agent.parse_and_slice_all_in_1(
+ self.raw_cfg,
+ self.config_params,
+ soft_runcycle=soft_runcycle)
+
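+ # materialize the generator: the slices are reused in run() as well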
+ self.fio_configs = list(self.fio_configs)
splitter = "\n\n" + "-" * 60 + "\n\n"
- fio_command_file.write(splitter.join(cfg_s_it))
+
+ cfg = splitter.join(
+ map(io_agent.fio_config_to_str,
+ self.fio_configs))
+
+ fio_command_file.write(cfg)
self.fio_raw_results_file = open_for_append_or_create(raw_res)
def __str__(self):
@@ -315,18 +332,22 @@
while end_of_wait_time > time.time():
time.sleep(time_till_check)
- is_connected, is_running, pid, err = self.get_test_status()
+ is_connected, is_running, npid, err = self.get_test_status()
- if not is_running:
- if pid is None and time.time() > pid_get_timeout:
- msg = ("On node {0} pid file doesn't " +
- "appears in time")
- logger.error(msg.format(conn_id))
- raise StopTestError("Start timeout")
+ if is_connected and not is_running:
+ if pid is None:
+ if time.time() > pid_get_timeout:
+ msg = ("On node {0} pid file doesn't " +
+ "appears in time")
+ logger.error(msg.format(conn_id))
+ raise StopTestError("Start timeout")
else:
# execution finished
break
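+ # remember the last reported pid; get_test_status may return None
+ # for it while the connection is down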
+ if npid is not None:
+ pid = npid
+
if is_connected and not curr_connected:
msg = "Connection with {0} is restored"
logger.debug(msg.format(conn_id))
@@ -337,6 +358,49 @@
curr_connected = is_connected
def run(self, barrier):
+ try:
+ if len(self.fio_configs) > 1:
+
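+ # estimate total runtime as the sum of the per-slice estimates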
+ exec_time = 0
+ for test in self.fio_configs:
+ exec_time += io_agent.calculate_execution_time(test)
+
+ exec_time_s = sec_to_str(exec_time)
+ logger.info("Entire test should takes aroud: " + exec_time_s)
+
+ for pos, fio_cfg_slice in enumerate(self.fio_configs):
+ names = [i.name for i in fio_cfg_slice]
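+ # collapse duplicated job names into a single "name * count" entry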
+ msgs = []
+ already_processed = set()
+ for name in names:
+ if name not in already_processed:
+ already_processed.add(name)
+
+ if 1 == names.count(name):
+ msgs.append(name)
+ else:
+ frmt = "{0} * {1}"
+ msgs.append(frmt.format(name,
+ names.count(name)))
+
+ logger.info("Will run tests: " + ", ".join(msgs))
+
+ out_err = self.do_run(barrier, fio_cfg_slice, nolog=(pos != 0))
+
+ try:
+ for data in parse_output(out_err):
+ data['__meta__']['raw_cfg'] = self.raw_cfg
+ self.on_result_cb(data)
+ except (OSError, StopTestError):
+ raise
+ except Exception as exc:
+ msg_templ = "Error during postprocessing results: {0!s}"
+ raise RuntimeError(msg_templ.format(exc))
+
+ finally:
+ barrier.exit()
+
+ def do_run(self, barrier, cfg, nolog=False):
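+ # run a single config slice on the node and return its raw output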
conn_id = self.node.get_conn_id()
cmd_templ = "screen -S {screen_name} -d -m " + \
@@ -353,8 +417,8 @@
params = "--params " + params
with self.node.connection.open_sftp() as sftp:
- save_to_remote(sftp,
- self.task_file, self.raw_cfg)
+ save_to_remote(sftp, self.task_file,
+ io_agent.fio_config_to_str(cfg))
screen_name = self.test_uuid
cmd = cmd_templ.format(self.io_py_remote,
@@ -364,61 +428,46 @@
pid_file=self.pid_file,
results_file=self.log_fl,
screen_name=screen_name)
- msg = "Thread for node {0} is waiting on barrier"
- logger.debug(msg.format(conn_id))
- exec_time = io_agent.calculate_execution_time(self.configs)
+
+ exec_time = io_agent.calculate_execution_time(cfg)
exec_time_str = sec_to_str(exec_time)
- try:
- timeout = int(exec_time * 2 + 300)
- barrier.wait()
+ timeout = int(exec_time * 2 + 300)
+ barrier.wait()
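+ # log the ssh command only for the first slice on the primary node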
+ ssh_nolog = nolog or (not self.is_primary)
+ self.run_over_ssh(cmd, nolog=ssh_nolog)
- if self.is_primary:
- templ = "Test should takes about {0}." + \
- " Should finish at {1}," + \
- " will wait at most till {2}"
- now_dt = datetime.datetime.now()
- end_dt = now_dt + datetime.timedelta(0, exec_time)
- wait_till = now_dt + datetime.timedelta(0, timeout)
+ if self.is_primary:
+ templ = "Test should takes about {0}." + \
+ " Should finish at {1}," + \
+ " will wait at most till {2}"
+ now_dt = datetime.datetime.now()
+ end_dt = now_dt + datetime.timedelta(0, exec_time)
+ wait_till = now_dt + datetime.timedelta(0, timeout)
- logger.info(templ.format(exec_time_str,
- end_dt.strftime("%H:%M:%S"),
- wait_till.strftime("%H:%M:%S")))
+ logger.info(templ.format(exec_time_str,
+ end_dt.strftime("%H:%M:%S"),
+ wait_till.strftime("%H:%M:%S")))
- self.run_over_ssh(cmd)
+ if not nolog:
+ msg = "Tests started in screen {1} on each testnode"
+ logger.debug(msg.format(conn_id, screen_name))
- msg = "Test on node {0} started in screen {1}"
- logger.debug(msg.format(conn_id, screen_name))
+ # TODO: add monitoring socket
+ if self.node.connection is not Local:
+ self.node.connection.close()
- # TODO: add monitoring socket
- if self.node.connection is not Local:
- self.node.connection.close()
+ self.wait_till_finished(timeout)
+ if not nolog:
+ logger.debug("Test on node {0} is finished".format(conn_id))
- self.wait_till_finished(timeout)
- logger.debug("Done")
+ if self.node.connection is not Local:
+ conn_timeout = self.tcp_conn_timeout * 3
+ self.node.connection = connect(self.node.conn_url,
+ conn_timeout=conn_timeout)
- if self.node.connection is not Local:
- conn_timeout = self.tcp_conn_timeout * 3
- self.node.connection = connect(self.node.conn_url,
- conn_timeout=conn_timeout)
-
- with self.node.connection.open_sftp() as sftp:
- out_err = read_from_remote(sftp, self.log_fl)
-
- finally:
- barrier.exit()
-
- self.on_result(out_err, cmd)
-
- def on_result(self, out_err, cmd):
- try:
- for data in parse_output(out_err):
- self.on_result_cb(data)
- except (OSError, StopTestError):
- raise
- except Exception as exc:
- msg_templ = "Error during postprocessing results: {0!s}"
- raise RuntimeError(msg_templ.format(exc))
+ with self.node.connection.open_sftp() as sftp:
+ return read_from_remote(sftp, self.log_fl)
def merge_results(self, results):
if len(results) == 0:
@@ -426,19 +475,17 @@
merged_result = results[0]
merged_data = merged_result['res']
- expected_keys = set(merged_data.keys())
mergable_fields = ['bw', 'clat', 'iops', 'lat', 'slat']
for res in results[1:]:
assert res['__meta__'] == merged_result['__meta__']
-
data = res['res']
- diff = set(data.keys()).symmetric_difference(expected_keys)
-
- msg = "Difference: {0}".format(",".join(diff))
- assert len(diff) == 0, msg
for testname, test_data in data.items():
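+ # config slices may cover disjoint test sets; adopt unseen tests as-is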
+ if testname not in merged_data:
+ merged_data[testname] = test_data
+ continue
+
res_test_data = merged_data[testname]
diff = set(test_data.keys()).symmetric_difference(