add local sensor datastore, make IO tests granular
diff --git a/wally/config.py b/wally/config.py
index 83ba0ca..08a70d7 100644
--- a/wally/config.py
+++ b/wally/config.py
@@ -11,6 +11,7 @@
def pet_generate(x, y):
return str(uuid.uuid4())
+from pretty_yaml import dumps
cfg_dict = {}
@@ -45,12 +46,21 @@
mkdirs_if_unxists(cfg_dict['var_dir'])
in_var_dir = functools.partial(os.path.join, cfg_dict['var_dir'])
+ run_params_file = in_var_dir('run_params.yaml')
+
+ if explicit_folder is not None:
+ with open(run_params_file) as fd:
+ cfg_dict['run_uuid'] = yaml.load(fd)['run_uuid']
+ run_uuid = cfg_dict['run_uuid']
+ else:
+ with open(run_params_file, 'w') as fd:
+ fd.write(dumps({'run_uuid': cfg_dict['run_uuid']}))
cfg_dict['charts_img_path'] = in_var_dir('charts')
mkdirs_if_unxists(cfg_dict['charts_img_path'])
cfg_dict['vm_ids_fname'] = in_var_dir('os_vm_ids')
- cfg_dict['html_report_file'] = in_var_dir('report.html')
+ cfg_dict['html_report_file'] = in_var_dir('{0}_report.html')
cfg_dict['text_report_file'] = in_var_dir('report.txt')
cfg_dict['log_file'] = in_var_dir('log.txt')
cfg_dict['sensor_storage'] = in_var_dir('sensor_storage.txt')
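
Note on the run_params.yaml round-trip above: when wally is pointed at an existing results folder (explicit_folder), it reads the run_uuid recorded by the original run instead of generating a fresh one, so reports and artifacts stay correlated across re-runs. A minimal sketch of the same pattern, assuming pretty_yaml.dumps emits plain YAML and keying off file existence rather than the explicit_folder flag (load_or_create_run_uuid is an illustrative name):

    import os
    import yaml
    from pretty_yaml import dumps

    def load_or_create_run_uuid(run_params_file, new_uuid):
        # reuse the uuid recorded by a previous run, if the file exists
        if os.path.exists(run_params_file):
            with open(run_params_file) as fd:
                return yaml.load(fd)['run_uuid']
        # first run in this folder: persist the freshly generated uuid
        with open(run_params_file, 'w') as fd:
            fd.write(dumps({'run_uuid': new_uuid}))
        return new_uuid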
diff --git a/wally/report.py b/wally/report.py
index 5198d13..aa7a4b0 100644
--- a/wally/report.py
+++ b/wally/report.py
@@ -43,7 +43,7 @@
lines=[
(latv, "msec", "rr", "lat"),
(iops_or_bw_per_vm, None, None,
- "IOPS per vm")
+ "IOPS per thread")
])
return str(ch)
@@ -150,14 +150,14 @@
report_funcs = []
-def report(names):
+def report(name, required_fields):
def closure(func):
- report_funcs.append((names.split(","), func))
+ report_funcs.append((required_fields.split(","), name, func))
return func
return closure
-@report('hdd_test_rrd4k,hdd_test_rws4k')
+@report('HDD', 'hdd_test_rrd4k,hdd_test_rws4k')
def make_hdd_report(processed_results, path, lab_info):
make_plots(processed_results, path)
di = get_disk_info(processed_results)
@@ -182,18 +182,26 @@
try:
processed_results = process_disk_info(results)
+ res_fields = sorted(processed_results.keys())
- for fields, func in report_funcs:
+ for fields, name, func in report_funcs:
for field in fields:
- if field not in processed_results:
+ pos = bisect.bisect_left(res_fields, field)
+
+                if pos == len(res_fields):
+                    break
+
+                if not res_fields[pos].startswith(field):
break
else:
- func(processed_results, path, lab_info)
+ hpath = path.format(name)
+ func(processed_results, hpath, lab_info)
+ logger.debug(name + " report generated into " + hpath)
break
else:
logger.warning("No report generator found for this load")
except Exception as exc:
+ import traceback
+ traceback.print_exc()
logger.error("Failed to generate html report:" + str(exc))
- else:
- logger.info("Html report were stored in " + path)
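
The report registry above pairs each generator with a human-readable name and the result-key prefixes it requires, and the bisect lookup treats each required field as a prefix over the sorted result keys. A self-contained sketch of the matching logic, with illustrative key names (real keys come from process_disk_info):

    import bisect

    report_funcs = []

    def report(name, required_fields):
        # register func under a report name plus the result-key
        # prefixes it needs, e.g. @report('HDD', 'hdd_test_rrd4k,...')
        def closure(func):
            report_funcs.append((required_fields.split(","), name, func))
            return func
        return closure

    def find_report(processed_results):
        res_fields = sorted(processed_results)
        for fields, name, func in report_funcs:
            for field in fields:
                # a key matching the prefix, if any, sits exactly at
                # the bisect insertion point in the sorted key list
                pos = bisect.bisect_left(res_fields, field)
                if pos == len(res_fields) or \
                        not res_fields[pos].startswith(field):
                    break
            else:
                return name, func
        return None, None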
diff --git a/wally/run_test.py b/wally/run_test.py
index 891b9d3..9cea103 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -2,6 +2,7 @@
import os
import sys
+import time
import Queue
import pprint
import logging
@@ -16,11 +17,16 @@
from concurrent.futures import ThreadPoolExecutor
from wally import pretty_yaml
-from wally.sensors_utils import deploy_sensors_stage
from wally.discover import discover, Node, undiscover
from wally import utils, report, ssh_utils, start_vms
from wally.suits.itest import IOPerfTest, PgBenchTest
from wally.config import cfg_dict, load_config, setup_loggers
+from wally.sensors_utils import deploy_sensors_stage, SensorDatastore
+
+try:
+ from wally import webui
+except ImportError:
+ webui = None
logger = logging.getLogger("wally")
@@ -478,6 +484,22 @@
return load_data_from_file
+def start_web_ui(cfg, ctx):
+ if webui is None:
+        logger.error("Can't start webui: cherrypy module is not installed")
+ ctx.web_thread = None
+ else:
+ th = threading.Thread(None, webui.web_main_thread, "webui", (None,))
+ th.daemon = True
+ th.start()
+ ctx.web_thread = th
+
+
+def stop_web_ui(cfg, ctx):
+    if webui is not None and ctx.web_thread is not None:
+        webui.web_main_stop()
+        time.sleep(1)
+
+
def parse_args(argv):
descr = "Disk io performance test suite"
parser = argparse.ArgumentParser(prog='wally', description=descr)
@@ -547,11 +569,15 @@
ctx.build_meta['build_descrption'] = opts.build_description
ctx.build_meta['build_type'] = opts.build_type
ctx.build_meta['username'] = opts.username
+ ctx.sensors_data = SensorDatastore()
cfg_dict['keep_vm'] = opts.keep_vm
cfg_dict['no_tests'] = opts.no_tests
cfg_dict['dont_discover_nodes'] = opts.dont_discover_nodes
+ if cfg_dict.get('run_web_ui', False):
+ start_web_ui(cfg_dict, ctx)
+
try:
for stage in stages:
logger.info("Start {0.__name__} stage".format(stage))
@@ -580,6 +606,9 @@
logger.info("All info stored in {0} folder".format(cfg_dict['var_dir']))
+ if cfg_dict.get('run_web_ui', False):
+ stop_web_ui(cfg_dict, ctx)
+
if exc is None:
logger.info("Tests finished successfully")
return 0
diff --git a/wally/sensors/main.py b/wally/sensors/main.py
index e8489d5..55a9584 100644
--- a/wally/sensors/main.py
+++ b/wally/sensors/main.py
@@ -29,7 +29,7 @@
else:
msg = "Sensor {0!r} isn't available".format(sensor_name)
raise ValueError(msg)
- return time.time(), result
+ return result
def parse_args(args):
@@ -54,10 +54,25 @@
sender = create_protocol(opts.url)
prev = {}
+ next_data_record_time = time.time()
while True:
- gtime, data = get_values(required_sensors.items())
- curr = {'time': SensorInfo(gtime, True)}
+        # snap the report timestamp to the expected one-second grid;
+        # small (<= 2s) drift is corrected silently, larger drift means
+        # the loop is lagging and the actual time is reported instead
+        real_time = int(time.time())
+
+        if real_time < int(next_data_record_time):
+            if int(next_data_record_time) - real_time > 2:
+                print "Error: sensor loop woke up too early"
+            report_time = int(next_data_record_time)
+        elif real_time > int(next_data_record_time):
+            if real_time - int(next_data_record_time) > 2:
+                report_time = real_time
+            else:
+                report_time = int(next_data_record_time)
+        else:
+            report_time = real_time
+
+ data = get_values(required_sensors.items())
+ curr = {'time': SensorInfo(report_time, True)}
for name, val in data.items():
if val.is_accumulated:
if name in prev:
@@ -69,9 +84,11 @@
if source_id is not None:
curr['source_id'] = source_id
+ print report_time, int((report_time - time.time()) * 10) * 0.1
sender.send(curr)
- time.sleep(opts.timeout)
+        next_data_record_time = report_time + opts.timeout + 0.5
+        # never pass a negative value to sleep if the loop is behind
+        time.sleep(max(0, next_data_record_time - time.time()))
def pid_running(pid):
@@ -103,11 +120,13 @@
if pid_running(pid):
os.kill(pid, signal.SIGTERM)
- time.sleep(0.1)
+ time.sleep(0.5)
if pid_running(pid):
os.kill(pid, signal.SIGKILL)
+ time.sleep(0.5)
+
if os.path.isfile(pid_file):
os.unlink(pid_file)
elif opts.daemon == 'status':
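
The sensor loop above pins every sample to a fixed one-second grid instead of the wall-clock instant it was collected, so samples from different nodes can later be merged by timestamp. The timestamp policy in isolation, as a sketch (next_report_time is an illustrative name; the 2-second tolerance matches the code above):

    import time

    def next_report_time(next_expected, tolerance=2):
        # report on the expected grid point unless the loop has fallen
        # behind by more than `tolerance` seconds, in which case the
        # actual time is reported and the grid restarts from there
        real = int(time.time())
        expected = int(next_expected)
        if real > expected and real - expected > tolerance:
            return real
        return expected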
diff --git a/wally/sensors_utils.py b/wally/sensors_utils.py
index 51a4dcb..e5b4347 100644
--- a/wally/sensors_utils.py
+++ b/wally/sensors_utils.py
@@ -1,4 +1,5 @@
import time
+import array
import Queue
import logging
import threading
@@ -11,14 +12,91 @@
stop_and_remove_sensors)
-logger = logging.getLogger("wally")
+logger = logging.getLogger("wally.sensors")
DEFAULT_RECEIVER_URL = "udp://{ip}:5699"
-def save_sensors_data(data_q, mon_q, fd):
+class SensorDatastore(object):
+ def __init__(self, stime=None):
+ self.lock = threading.Lock()
+ self.stime = stime
+
+ self.min_size = 60 * 60
+ self.max_size = 60 * 61
+
+        self.data = {
+            'testnodes:io': array.array("H"),
+            'testnodes:cpu': array.array("H"),
+        }
+
+ def get_values(self, name, start, end):
+ assert end >= start
+ if end == start:
+ return []
+
+ with self.lock:
+ curr_arr = self.data[name]
+ if self.stime is None:
+ return []
+
+ sidx = start - self.stime
+ eidx = end - self.stime
+
+            if sidx < 0 and eidx < 0:
+                return [0] * (end - start)
+            elif sidx < 0:
+                # pad the part before the first sample with zeroes
+                return [0] * (-sidx) + list(curr_arr[:eidx])
+            return list(curr_arr[sidx:eidx])
+
+    def set_values(self, start_time, vals):
+        with self.lock:
+            return self.set_values_l(start_time, vals)
+
+ def set_values_l(self, start_time, vals):
+ max_cut = 0
+ for name, values in vals.items():
+ curr_arr = self.data.setdefault(name, array.array("H"))
+
+ if self.stime is None:
+ self.stime = start_time
+
+ curr_end_time = len(curr_arr) + self.stime
+
+ if curr_end_time < start_time:
+ curr_arr.extend([0] * (start_time - curr_end_time))
+ curr_arr.extend(values)
+ elif curr_end_time > start_time:
+                logger.warning("Overlapping sensor data")
+ sindex = len(curr_arr) + (start_time - curr_end_time)
+
+ if sindex < 0:
+ values = values[-sindex:]
+ sindex = 0
+                    logger.warning("Dropping data with timestamps"
+                                   " before the start of measurement")
+
+ curr_arr[sindex:sindex + len(values)] = values
+ else:
+ curr_arr.extend(values)
+
+ if len(curr_arr) > self.max_size:
+ max_cut = max(len(curr_arr) - self.min_size, max_cut)
+
+        if max_cut > 0:
+            # drop the oldest samples and advance the start time
+            self.stime += max_cut
+            for arr in self.data.values():
+                del arr[:max_cut]
+
+
+def save_sensors_data(data_q, mon_q, fd, data_store, source2roles_map):
fd.write("\n")
+ BUFFER = 3
observed_nodes = set()
+ testnodes_data = {
+ 'io': {},
+ 'cpu': {},
+ }
try:
while True:
@@ -32,8 +110,27 @@
mon_q.put(addr + (data['source_id'],))
observed_nodes.add(addr)
- fd.write("{0!s} : {1!r}\n".format(time.time(),
- repr((addr, data))))
+ fd.write(repr((addr, data)) + "\n")
+
+ source_id = data.pop('source_id')
+ rep_time = data.pop('time')
+            if 'testnode' in source2roles_map.get(source_id, []):
+                # bucket testnode io data by its report second
+                sum_io_q = testnodes_data['io'].get(rep_time, 0)
+                testnodes_data['io'][rep_time] = sum_io_q
+
+            etime = time.time() - BUFFER
+            for name, vals in testnodes_data.items():
+                new_vals = {}
+                for rtime, value in vals.items():
+                    if rtime < etime:
+                        data_store.set_values(rtime,
+                                              {"testnodes:" + name: [value]})
+                    else:
+                        new_vals[rtime] = value
+
+ vals.clear()
+ vals.update(new_vals)
+
except Exception:
logger.exception("Error in sensors thread")
logger.info("Sensors thread exits")
@@ -42,6 +139,7 @@
def get_sensors_config_for_nodes(cfg, nodes):
monitored_nodes = []
sensors_configs = []
+ source2roles_map = {}
receiver_url = cfg.get("receiver_url", DEFAULT_RECEIVER_URL)
assert '{ip}' in receiver_url
@@ -64,6 +162,7 @@
ext_ip = utils.get_ip_for_target(ip)
monitor_url = receiver_url.format(ip=ext_ip)
+ source2roles_map[node.get_conn_id()] = node.roles
monitored_nodes.append(node)
sens_cfg = SensorConfig(node.connection,
node.get_conn_id(),
@@ -72,18 +171,20 @@
monitor_url=monitor_url)
sensors_configs.append(sens_cfg)
- return monitored_nodes, sensors_configs
+ return monitored_nodes, sensors_configs, source2roles_map
-def start_sensor_process_thread(ctx, cfg, sensors_configs):
+def start_sensor_process_thread(ctx, cfg, sensors_configs, source2roles_map):
receiver_url = cfg.get('receiver_url', DEFAULT_RECEIVER_URL)
sensors_data_q, stop_sensors_loop = \
start_listener_thread(receiver_url.format(ip='0.0.0.0'))
mon_q = Queue.Queue()
fd = open(cfg_dict['sensor_storage'], "w")
+
+ params = sensors_data_q, mon_q, fd, ctx.sensors_data, source2roles_map
sensor_listen_th = threading.Thread(None, save_sensors_data, None,
- (sensors_data_q, mon_q, fd))
+ params)
sensor_listen_th.daemon = True
sensor_listen_th.start()
@@ -105,8 +206,8 @@
if nodes is None:
nodes = ctx.nodes
- monitored_nodes, sensors_configs = get_sensors_config_for_nodes(cfg,
- nodes)
+ monitored_nodes, sensors_configs, source2roles_map = \
+ get_sensors_config_for_nodes(cfg, nodes)
if len(monitored_nodes) == 0:
logger.info("Nothing to monitor, no sensors would be installed")
@@ -115,12 +216,13 @@
if ctx.sensors_mon_q is None:
logger.info("Start sensors data receiving thread")
ctx.sensors_mon_q = start_sensor_process_thread(ctx, cfg,
- sensors_configs)
+ sensors_configs,
+ source2roles_map)
if undeploy:
def remove_sensors_stage(cfg, ctx):
- _, sensors_configs = get_sensors_config_for_nodes(cfg['sensors'],
- nodes)
+ _, sensors_configs, _ = \
+ get_sensors_config_for_nodes(cfg['sensors'], nodes)
stop_and_remove_sensors(sensors_configs)
ctx.clear_calls_stack.append(remove_sensors_stage)
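
Taken together, SensorDatastore behaves like a time-indexed ring buffer: set_values() appends per-second samples (zero-filling gaps), get_values() zero-pads requests that start before the first sample, and everything older than min_size seconds is trimmed. A usage sketch with made-up values:

    ds = SensorDatastore()
    ds.set_values(1000, {'testnodes:io': [3, 5]})  # samples at t=1000, 1001
    ds.set_values(1003, {'testnodes:io': [7]})     # gap at t=1002 becomes 0

    print ds.get_values('testnodes:io', 998, 1003)  # [0, 0, 3, 5, 0]

save_sensors_data feeds it with a small delay: the last BUFFER seconds of per-testnode buckets are held in memory so late UDP packets can still land in the right second. The flush policy in isolation, under the same assumptions (flush_old_buckets is an illustrative name):

    import time

    def flush_old_buckets(buckets, data_store, name, buffer_s=3):
        # move buckets older than buffer_s seconds into the datastore;
        # keep newer ones in case more packets arrive for them
        etime = time.time() - buffer_s
        fresh = {}
        for rtime, value in buckets.items():
            if rtime < etime:
                data_store.set_values(rtime, {'testnodes:' + name: [value]})
            else:
                fresh[rtime] = value
        buckets.clear()
        buckets.update(fresh)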
diff --git a/wally/suits/io/agent.py b/wally/suits/io/agent.py
index 336b176..0a84ed1 100644
--- a/wally/suits/io/agent.py
+++ b/wally/suits/io/agent.py
@@ -290,12 +290,22 @@
return time
-def slice_config(sec_iter, runcycle=None, max_jobs=1000):
+def slice_config(sec_iter, runcycle=None, max_jobs=1000,
+ soft_runcycle=None):
jcount = 0
runtime = 0
curr_slice = []
+ prev_name = None
for pos, sec in enumerate(sec_iter):
+ if soft_runcycle is not None and prev_name != sec.name:
+ if runtime > soft_runcycle:
+ yield curr_slice
+ curr_slice = []
+ runtime = 0
+ jcount = 0
+
+ prev_name = sec.name
jc = sec.vals.get('numjobs', 1)
msg = "numjobs should be integer, not {0!r}".format(jc)
@@ -328,6 +338,7 @@
runtime = curr_task_time
jcount = jc
curr_slice = [sec]
+ prev_name = None
if curr_slice != []:
yield curr_slice
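
soft_runcycle is what makes the IO tests granular: the fio job stream is cut into chunks at job-name boundaries once the accumulated runtime crosses the threshold, so each chunk can be executed and its results parsed and reported separately. A toy illustration of the cutting rule (it ignores the numjobs and max_jobs accounting the real slice_config also performs):

    def soft_slices(sections, runtimes, soft_runcycle):
        # cut only when the job name changes, never inside a group of
        # sections that belong to the same job
        curr, runtime, prev_name = [], 0, None
        for sec, rt in zip(sections, runtimes):
            if prev_name != sec.name and runtime > soft_runcycle:
                yield curr
                curr, runtime = [], 0
            prev_name = sec.name
            curr.append(sec)
            runtime += rt
        if curr:
            yield curr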
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index 1e948fb..70fdb66 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -126,7 +126,8 @@
class IOPerfTest(IPerfTest):
tcp_conn_timeout = 30
- max_pig_timeout = 30
+ max_pig_timeout = 5
+ soft_runcycle = 5 * 60
def __init__(self, *dt, **mp):
IPerfTest.__init__(self, *dt, **mp)
@@ -146,12 +147,28 @@
self.pid_file = self.join_remote("pid")
self.task_file = self.join_remote("task.cfg")
self.use_sudo = self.options.get("use_sudo", True)
+ self.test_logging = self.options.get("test_logging", False)
fio_command_file = open_for_append_or_create(cmd_log)
- cfg_s_it = io_agent.compile_all_in_1(self.raw_cfg, self.config_params)
+ if self.test_logging:
+ soft_runcycle = self.soft_runcycle
+ else:
+ soft_runcycle = None
+
+ self.fio_configs = io_agent.parse_and_slice_all_in_1(
+ self.raw_cfg,
+ self.config_params,
+ soft_runcycle=soft_runcycle)
+
+ self.fio_configs = list(self.fio_configs)
splitter = "\n\n" + "-" * 60 + "\n\n"
- fio_command_file.write(splitter.join(cfg_s_it))
+
+ cfg = splitter.join(
+ map(io_agent.fio_config_to_str,
+ self.fio_configs))
+
+ fio_command_file.write(cfg)
self.fio_raw_results_file = open_for_append_or_create(raw_res)
def __str__(self):
@@ -315,18 +332,22 @@
while end_of_wait_time > time.time():
time.sleep(time_till_check)
- is_connected, is_running, pid, err = self.get_test_status()
+ is_connected, is_running, npid, err = self.get_test_status()
- if not is_running:
- if pid is None and time.time() > pid_get_timeout:
- msg = ("On node {0} pid file doesn't " +
- "appears in time")
- logger.error(msg.format(conn_id))
- raise StopTestError("Start timeout")
+ if is_connected and not is_running:
+ if pid is None:
+ if time.time() > pid_get_timeout:
+                        msg = ("On node {0} pid file doesn't " +
+                               "appear in time")
+ logger.error(msg.format(conn_id))
+ raise StopTestError("Start timeout")
else:
# execution finished
break
+ if npid is not None:
+ pid = npid
+
if is_connected and not curr_connected:
msg = "Connection with {0} is restored"
logger.debug(msg.format(conn_id))
@@ -337,6 +358,49 @@
curr_connected = is_connected
def run(self, barrier):
+ try:
+ if len(self.fio_configs) > 1:
+
+ exec_time = 0
+ for test in self.fio_configs:
+ exec_time += io_agent.calculate_execution_time(test)
+
+ exec_time_s = sec_to_str(exec_time)
+                logger.info("Entire test should take around " + exec_time_s)
+
+ for pos, fio_cfg_slice in enumerate(self.fio_configs):
+ names = [i.name for i in fio_cfg_slice]
+ msgs = []
+ already_processed = set()
+ for name in names:
+ if name not in already_processed:
+ already_processed.add(name)
+
+ if 1 == names.count(name):
+ msgs.append(name)
+ else:
+ frmt = "{0} * {1}"
+ msgs.append(frmt.format(name,
+ names.count(name)))
+
+ logger.info("Will run tests: " + ", ".join(msgs))
+
+ out_err = self.do_run(barrier, fio_cfg_slice, nolog=(pos != 0))
+
+ try:
+ for data in parse_output(out_err):
+ data['__meta__']['raw_cfg'] = self.raw_cfg
+ self.on_result_cb(data)
+ except (OSError, StopTestError):
+ raise
+ except Exception as exc:
+ msg_templ = "Error during postprocessing results: {0!s}"
+ raise RuntimeError(msg_templ.format(exc))
+
+ finally:
+ barrier.exit()
+
+ def do_run(self, barrier, cfg, nolog=False):
conn_id = self.node.get_conn_id()
cmd_templ = "screen -S {screen_name} -d -m " + \
@@ -353,8 +417,8 @@
params = "--params " + params
with self.node.connection.open_sftp() as sftp:
- save_to_remote(sftp,
- self.task_file, self.raw_cfg)
+ save_to_remote(sftp, self.task_file,
+ io_agent.fio_config_to_str(cfg))
screen_name = self.test_uuid
cmd = cmd_templ.format(self.io_py_remote,
@@ -364,61 +428,46 @@
pid_file=self.pid_file,
results_file=self.log_fl,
screen_name=screen_name)
- msg = "Thread for node {0} is waiting on barrier"
- logger.debug(msg.format(conn_id))
- exec_time = io_agent.calculate_execution_time(self.configs)
+
+ exec_time = io_agent.calculate_execution_time(cfg)
exec_time_str = sec_to_str(exec_time)
- try:
- timeout = int(exec_time * 2 + 300)
- barrier.wait()
+ timeout = int(exec_time * 2 + 300)
+ barrier.wait()
+ ssh_nolog = nolog or (not self.is_primary)
+ self.run_over_ssh(cmd, nolog=ssh_nolog)
- if self.is_primary:
- templ = "Test should takes about {0}." + \
- " Should finish at {1}," + \
- " will wait at most till {2}"
- now_dt = datetime.datetime.now()
- end_dt = now_dt + datetime.timedelta(0, exec_time)
- wait_till = now_dt + datetime.timedelta(0, timeout)
+ if self.is_primary:
+            templ = "Test should take about {0}." + \
+ " Should finish at {1}," + \
+ " will wait at most till {2}"
+ now_dt = datetime.datetime.now()
+ end_dt = now_dt + datetime.timedelta(0, exec_time)
+ wait_till = now_dt + datetime.timedelta(0, timeout)
- logger.info(templ.format(exec_time_str,
- end_dt.strftime("%H:%M:%S"),
- wait_till.strftime("%H:%M:%S")))
+ logger.info(templ.format(exec_time_str,
+ end_dt.strftime("%H:%M:%S"),
+ wait_till.strftime("%H:%M:%S")))
- self.run_over_ssh(cmd)
+ if not nolog:
+            msg = "Tests started in screen {0} on each testnode"
+            logger.debug(msg.format(screen_name))
- msg = "Test on node {0} started in screen {1}"
- logger.debug(msg.format(conn_id, screen_name))
+ # TODO: add monitoring socket
+ if self.node.connection is not Local:
+ self.node.connection.close()
- # TODO: add monitoring socket
- if self.node.connection is not Local:
- self.node.connection.close()
+ self.wait_till_finished(timeout)
+ if not nolog:
+ logger.debug("Test on node {0} is finished".format(conn_id))
- self.wait_till_finished(timeout)
- logger.debug("Done")
+ if self.node.connection is not Local:
+ conn_timeout = self.tcp_conn_timeout * 3
+ self.node.connection = connect(self.node.conn_url,
+ conn_timeout=conn_timeout)
- if self.node.connection is not Local:
- conn_timeout = self.tcp_conn_timeout * 3
- self.node.connection = connect(self.node.conn_url,
- conn_timeout=conn_timeout)
-
- with self.node.connection.open_sftp() as sftp:
- out_err = read_from_remote(sftp, self.log_fl)
-
- finally:
- barrier.exit()
-
- self.on_result(out_err, cmd)
-
- def on_result(self, out_err, cmd):
- try:
- for data in parse_output(out_err):
- self.on_result_cb(data)
- except (OSError, StopTestError):
- raise
- except Exception as exc:
- msg_templ = "Error during postprocessing results: {0!s}"
- raise RuntimeError(msg_templ.format(exc))
+ with self.node.connection.open_sftp() as sftp:
+ return read_from_remote(sftp, self.log_fl)
def merge_results(self, results):
if len(results) == 0:
@@ -426,19 +475,17 @@
merged_result = results[0]
merged_data = merged_result['res']
- expected_keys = set(merged_data.keys())
mergable_fields = ['bw', 'clat', 'iops', 'lat', 'slat']
for res in results[1:]:
assert res['__meta__'] == merged_result['__meta__']
-
data = res['res']
- diff = set(data.keys()).symmetric_difference(expected_keys)
-
- msg = "Difference: {0}".format(",".join(diff))
- assert len(diff) == 0, msg
for testname, test_data in data.items():
+ if testname not in merged_data:
+ merged_data[testname] = test_data
+ continue
+
res_test_data = merged_data[testname]
diff = set(test_data.keys()).symmetric_difference(