fix multiple tests in one run, update report generation
diff --git a/report_templates/base.html b/report_templates/base.html
new file mode 100644
index 0000000..c517ac9
--- /dev/null
+++ b/report_templates/base.html
@@ -0,0 +1,34 @@
+<!DOCTYPE html>
+<html>
+ <head>
+ <title>Performance Report</title>
+ <link rel="stylesheet"
+ href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/css/bootstrap.min.css">
+ </head>
+ <body>
+ <div class="page-header text-center">
+ <h2>Performance Report</h2>
+ </div>
+
+ <!--
+ 0) Menu
+ 1) Lab very short performance: max IOPS, max BW, EC2 VM count
+ 2) Engineering report
+ 3) boxplots
+ 4) BW/lat/IOPS = f(time) report
+         5) Bottleneck/consumption reports
+ 6) Excessive lab info
+ 7) Report description
+ -->
+
+ <div class="container-fluid text-center">
+ <div class="row" style="margin-bottom: 40px">
+ <div class="col-md-12">
+ <center>
+ {% block content %}{% endblock %}
+ </center>
+ </div>
+ </div>
+ </div>
+ </body>
+</html>
\ No newline at end of file
diff --git a/report_templates/report_all.html b/report_templates/report_all.html
new file mode 100644
index 0000000..85d3bf6
--- /dev/null
+++ b/report_templates/report_all.html
@@ -0,0 +1,46 @@
+{% extends "base.html" %}
+{% block content %}
+
+{% if ceph_summary is not None or OS_summary is not None %}
+ <h4>Summary</h4>
+ <table style="width: auto;" class="table table-bordered table-striped">
+ {% if ceph_summary is not None %}
+ <tr>
+ <td>OSD count</td><td>{{ceph_summary.osd_count}}</td>
+ <td>Total Ceph disks count</td><td>{{ceph_summary.OSD_hdd_count}}</td>
+ </tr>
+        {% elif OS_summary is not None %}
+ <td>Compute count</td><td>{{OS_summary.compute_count}}</td>
+ {% endif %}
+ </table>
+{% endif %}
+
+{% if perf_summary is not None %}
+ <table><tr><td>
+ <H4>Random direct performance,<br>4KiB blocks</H4>
+ {% make_table(2, 'style="width: auto;" class="table table-bordered table-striped"',
+ "Operation", "IOPS",
+ Read, {{perf_summary.direct_iops_r_max[0]}} ~ {{perf_summary.direct_iops_r_max[1]}}%,
+ Write, {{perf_summary.direct_iops_w_max[0]}} ~ {{perf_summary.direct_iops_w_max[1]}}%) %}
+ </td><td> </td><td>
+ <H4>Random direct performance,<br>16MiB blocks</H4>
+ {% make_table(2, 'style="width: auto;" class="table table-bordered table-striped"',
+ "Operation", "BW MiBps",
+ Read, {{perf_summary.bw_read_max[0]}} ~ {{perf_summary.bw_read_max[1]}}%,
+ Write, {{perf_summary.bw_write_max[0]}} ~ {{perf_summary.bw_write_max[1]}}%) %}
+ </td><td> </td><td>
+ <H4>Maximal sync random write IOPS<br> for given latency, 4KiB</H4>
+ {% make_table(2, 'style="width: auto;" class="table table-bordered table-striped">',
+ "Latency ms", "IOPS",
+ 10, {{perf_summary.rws4k_10ms}},
+ 30, {{perf_summary.rws4k_30ms}},
+ 100, {{perf_summary.rws4k_100ms}}) %}
+ </td></tr></table>
+<br>
+
+{% make_table_nh(2, "",
+ {{perf_summary.rand_read_iops}}, {{perf_summary.rand_write_iops}},
+ {{perf_summary.rand_read_bw}}, {{perf_summary.rand_write_bw}}) %}
+{% endif %}
+
+{% endblock %}
diff --git a/report_templates/report_hdd.html b/report_templates/report_hdd.html
index ea04eb5..72f7c0b 100644
--- a/report_templates/report_hdd.html
+++ b/report_templates/report_hdd.html
@@ -71,12 +71,8 @@
</div>
</div>
<div class="row">
- <div class="col-md-6">
- <img src="charts/rand_read_4k.{img_ext}"/>
- </div>
- <div class="col-md-6">
- <img src="charts/rand_write_4k.{img_ext}"/>
- </div>
+ <div class="col-md-6">{rand_read_4k}</div>
+ <div class="col-md-6">{rand_write_4k}</div>
</div>
<!--div class="row">
<table style="width: auto;" class="table table-bordered table-striped">
diff --git a/report_templates/report_iops_vs_lat.html b/report_templates/report_iops_vs_lat.html
new file mode 100644
index 0000000..fcb7ec2
--- /dev/null
+++ b/report_templates/report_iops_vs_lat.html
@@ -0,0 +1,29 @@
+<!DOCTYPE html>
+<html>
+<head>
+ <title>Report</title>
+ <link rel="stylesheet"
+ href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/css/bootstrap.min.css">
+</head>
+
+<body>
+<div class="page-header text-center">
+    <h2>Latency vs IOPS</h2>
+</div>
+<div class="container-fluid text-center">
+
+ <div class="row" style="margin-bottom: 40px">
+ <div class="col-md-12">
+ <center>
+ <H3>Latency vs IOPS vs requested IOPS, {oper_descr}</H3> <br>
+ <table><tr>
+ <td>{iops_vs_lat}</td>
+ <td>{iops_vs_requested}</td>
+ </tr>
+ </center>
+ </div>
+ </div>
+</div>
+</body>
+
+</html>
diff --git a/wally/config.py b/wally/config.py
index 2a09d47..e587a8e 100644
--- a/wally/config.py
+++ b/wally/config.py
@@ -16,18 +16,58 @@
cfg_dict = {}
+class NoData(object):
+ @classmethod
+ def get(cls, name, x):
+ return cls
+
+
+class Config(object):
+ def get(self, name, defval=None):
+ obj = self.__dict__
+ for cname in name.split("."):
+ obj = obj.get(cname, NoData)
+
+ if obj is NoData:
+ return defval
+ return obj
+
+
+cfg = Config()
+cfg.__dict__ = cfg_dict
+
+
def mkdirs_if_unxists(path):
if not os.path.exists(path):
os.makedirs(path)
+def get_test_files(results_dir):
+ in_var_dir = functools.partial(os.path.join, results_dir)
+
+ res = dict(
+ run_params_file='run_params.yaml',
+ saved_config_file='config.yaml',
+ vm_ids_fname='os_vm_ids',
+ html_report_file='{0}_report.html',
+ text_report_file='report.txt',
+ log_file='log.txt',
+ sensor_storage='sensor_storage',
+ nodes_report_file='nodes.yaml',
+ results='results',
+ hwinfo_directory='hwinfo',
+ hwreport_fname='hwinfo.txt',
+ raw_results='raw_results.yaml')
+
+ res = dict((k, in_var_dir(v)) for k, v in res.items())
+ res['var_dir'] = results_dir
+ return res
+
+
def load_config(file_name, explicit_folder=None):
- first_load = len(cfg_dict) == 0
cfg_dict.update(yaml.load(open(file_name).read()))
- if first_load:
- var_dir = cfg_dict.get('internal', {}).get('var_dir_root', '/tmp')
-
+ var_dir = cfg_dict.get('internal', {}).get('var_dir_root', '/tmp')
run_uuid = None
if explicit_folder is None:
for i in range(10):
@@ -42,27 +82,18 @@
else:
results_dir = explicit_folder
- cfg_dict['var_dir'] = results_dir
+ cfg_dict.update(get_test_files(results_dir))
mkdirs_if_unxists(cfg_dict['var_dir'])
- in_var_dir = functools.partial(os.path.join, cfg_dict['var_dir'])
- run_params_file = in_var_dir('run_params.yaml')
-
if explicit_folder is not None:
- with open(run_params_file) as fd:
+ with open(cfg_dict['run_params_file']) as fd:
cfg_dict['run_uuid'] = yaml.load(fd)['run_uuid']
run_uuid = cfg_dict['run_uuid']
else:
- with open(run_params_file, 'w') as fd:
+ with open(cfg_dict['run_params_file'], 'w') as fd:
fd.write(dumps({'run_uuid': cfg_dict['run_uuid']}))
- cfg_dict['vm_ids_fname'] = in_var_dir('os_vm_ids')
- cfg_dict['html_report_file'] = in_var_dir('{0}_report.html')
- cfg_dict['text_report_file'] = in_var_dir('report.txt')
- cfg_dict['log_file'] = in_var_dir('log.txt')
- cfg_dict['sensor_storage'] = in_var_dir('sensor_storage')
mkdirs_if_unxists(cfg_dict['sensor_storage'])
- cfg_dict['nodes_report_file'] = in_var_dir('nodes.yaml')
if 'sensors_remote_path' not in cfg_dict:
cfg_dict['sensors_remote_path'] = '/tmp/sensors'
@@ -72,11 +103,7 @@
cfg_dict['default_test_local_folder'] = \
testnode_log_dir.format(cfg_dict['run_uuid'])
- cfg_dict['test_log_directory'] = in_var_dir('test_logs')
- mkdirs_if_unxists(cfg_dict['test_log_directory'])
-
- cfg_dict['hwinfo_directory'] = in_var_dir('hwinfo')
- cfg_dict['hwreport_fname'] = in_var_dir('hwinfo.txt')
+ mkdirs_if_unxists(cfg_dict['results'])
mkdirs_if_unxists(cfg_dict['hwinfo_directory'])
diff --git a/wally/fuel_rest_api.py b/wally/fuel_rest_api.py
index 621e637..b110fc2 100644
--- a/wally/fuel_rest_api.py
+++ b/wally/fuel_rest_api.py
@@ -349,7 +349,8 @@
creds['username'] = access['user']['value']
creds['password'] = access['password']['value']
creds['tenant_name'] = access['tenant']['value']
- creds['os_auth_url'] = self.get_networks()['public_vip']
+ creds['os_auth_url'] = "http://{0}:5000/v2.0".format(
+ self.get_networks()['public_vip'])
return creds
def get_nodes(self):
diff --git a/wally/report.py b/wally/report.py
index 04577a9..650666b 100644
--- a/wally/report.py
+++ b/wally/report.py
@@ -66,6 +66,11 @@
self.concurence = self.params.vals.get('numjobs', 1)
+# disk_info = None
+# base = None
+# linearity = None
+
+
def group_by_name(test_data):
name_map = collections.defaultdict(lambda: [])
@@ -186,9 +191,11 @@
plt.xlabel("Block size")
plt.ylabel("IO time, ms")
- plt.legend(loc=0)
+ plt.subplots_adjust(top=0.85)
+ plt.legend(bbox_to_anchor=(0.5, 1.2), loc='upper center')
plt.grid()
iotime_plot = get_emb_data_svg(plt)
+ plt.clf()
_, ax1 = plt.subplots()
plt.boxplot(data)
@@ -197,6 +204,7 @@
plt.xlabel("Block size")
plt.ylabel("IOPS")
plt.grid()
+ plt.subplots_adjust(top=0.85)
iops_plot = get_emb_data_svg(plt)
@@ -218,26 +226,53 @@
@report('lat_vs_iops', 'lat_vs_iops')
def lat_vs_iops(processed_results, path, lab_info):
lat_iops = collections.defaultdict(lambda: [])
+ requsted_vs_real = collections.defaultdict(lambda: {})
+
for res in processed_results.values():
if res.name.startswith('lat_vs_iops'):
lat_iops[res.concurence].append((res.lat.average / 1000.0,
res.lat.deviation / 1000.0,
res.iops.average,
res.iops.deviation))
+ requested_iops = res.p.rate_iops * res.concurence
+ requsted_vs_real[res.concurence][requested_iops] = \
+ (res.iops.average, res.iops.deviation)
- colors = ['red', 'green', 'blue', 'orange', 'magenta', "teal"][::-1]
+ colors = ['red', 'green', 'blue', 'orange', 'magenta', "teal"]
+ colors_it = iter(colors)
for conc, lat_iops in sorted(lat_iops.items()):
lat, dev, iops, iops_dev = zip(*lat_iops)
plt.errorbar(iops, lat, xerr=iops_dev, yerr=dev, fmt='ro',
label=str(conc) + " threads",
- color=colors.pop())
+ color=next(colors_it))
plt.xlabel("IOPS")
plt.ylabel("Latency, ms")
plt.grid()
plt.legend(loc=0)
- plt.show()
- exit(1)
+ plt_iops_vs_lat = get_emb_data_svg(plt)
+ plt.clf()
+
+ colors_it = iter(colors)
+ for conc, req_vs_real in sorted(requsted_vs_real.items()):
+ req, real = zip(*sorted(req_vs_real.items()))
+ iops, dev = zip(*real)
+ plt.errorbar(req, iops, yerr=dev, fmt='ro',
+ label=str(conc) + " threads",
+ color=next(colors_it))
+ plt.xlabel("Requested IOPS")
+ plt.ylabel("Get IOPS")
+ plt.grid()
+ plt.legend(loc=0)
+ plt_iops_vs_requested = get_emb_data_svg(plt)
+
+ res1 = processed_results.values()[0]
+ params_map = {'iops_vs_lat': plt_iops_vs_lat,
+ 'iops_vs_requested': plt_iops_vs_requested,
+ 'oper_descr': get_test_lcheck_params(res1).capitalize()}
+
+ with open(path, 'w') as fd:
+ fd.write(get_template('report_iops_vs_lat.html').format(**params_map))
def render_all_html(dest, info, lab_description, images, templ_name):
diff --git a/wally/run_test.py b/wally/run_test.py
index 22856b5..5e6b4f9 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -14,6 +14,7 @@
import collections
import yaml
+import texttable
import faulthandler
from concurrent.futures import ThreadPoolExecutor
@@ -23,7 +24,8 @@
from wally.timeseries import SensorDatastore
from wally import utils, report, ssh_utils, start_vms
from wally.suits import IOPerfTest, PgBenchTest, MysqlTest
-from wally.config import cfg_dict, load_config, setup_loggers
+from wally.config import (cfg_dict, load_config, setup_loggers,
+ get_test_files)
from wally.sensors_utils import with_sensors_util, sensors_info_util
TOOL_TYPE_MAPPER = {
@@ -150,8 +152,8 @@
res_q.put(exc)
-def run_single_test(test_nodes, name, test_cls, params,
- test_local_folder, run_uuid, counter=[0]):
+def run_single_test(test_nodes, name, test_cls, params, log_directory,
+ test_local_folder, run_uuid):
logger.info("Starting {0} tests".format(name))
res_q = Queue.Queue()
threads = []
@@ -163,13 +165,6 @@
msg = "Starting {0} test on {1} node"
logger.debug(msg.format(name, node.conn_url))
- dname = "{0}_{1}_{2}".format(name, counter[0], node.get_ip())
- counter[0] += 1
- dr = os.path.join(cfg_dict['test_log_directory'], dname)
-
- if not os.path.exists(dr):
- os.makedirs(dr)
-
params = params.copy()
params['testnodes_count'] = len(test_nodes)
test = test_cls(options=params,
@@ -178,16 +173,18 @@
test_uuid=run_uuid,
node=node,
remote_dir=rem_folder,
- log_directory=dr,
+ log_directory=log_directory,
coordination_queue=coord_q,
total_nodes_count=len(test_nodes))
- th = threading.Thread(None, test_thread, None,
+ th = threading.Thread(None, test_thread,
+ "Test:" + node.get_conn_id(),
(test, node, barrier, res_q))
threads.append(th)
th.daemon = True
th.start()
- th = threading.Thread(None, test_cls.coordination_th, None,
+ th = threading.Thread(None, test_cls.coordination_th,
+ "Coordination thread",
(coord_q, barrier, len(threads)))
threads.append(th)
th.daemon = True
@@ -198,6 +195,7 @@
while len(threads) != 0:
nthreads = []
+ time.sleep(0.1)
for th in threads:
if not th.is_alive():
@@ -271,6 +269,24 @@
if 0 == len(curr_test_nodes):
continue
+ # make a directory for results
+ all_tests_dirs = os.listdir(cfg_dict['results'])
+
+ if 'name' in params:
+ dir_name = "{0}_{1}".format(name, params['name'])
+ else:
+ for idx in range(len(all_tests_dirs) + 1):
+ dir_name = "{0}_{1}".format(name, idx)
+ if dir_name not in all_tests_dirs:
+ break
+ else:
+ raise utils.StopTestError(
+ "Can't select directory for test results")
+
+ dir_path = os.path.join(cfg_dict['results'], dir_name)
+ if not os.path.exists(dir_path):
+ os.mkdir(dir_path)
+
if cfg.get('suspend_unused_vms', True):
pausable_nodes_ids = suspend_vm_nodes(unused_nodes)
@@ -287,8 +303,11 @@
sens_nodes = curr_test_nodes + not_test_nodes
with sensors_info_util(cfg, sens_nodes) as sensor_data:
t_start = time.time()
- res = run_single_test(curr_test_nodes, name, test_cls,
+ res = run_single_test(curr_test_nodes,
+ name,
+ test_cls,
params,
+ dir_path,
cfg['default_test_local_folder'],
cfg['run_uuid'])
t_end = time.time()
@@ -379,9 +398,7 @@
def reuse_vms_stage(cfg, ctx):
- p = cfg.get('clouds', {})
- p = p.get('openstack', {})
- p = p.get('vms', [])
+ p = cfg.get('clouds', {}).get('openstack', {}).get('vms', [])
for creds in p:
vm_name_pattern, conn_pattern = creds.split(",")
@@ -425,6 +442,8 @@
def get_OS_credentials(cfg, ctx):
creds = None
+ tenant = None
+
if 'openstack' in cfg['clouds']:
os_cfg = cfg['clouds']['openstack']
if 'OPENRC' in os_cfg:
@@ -434,18 +453,18 @@
elif 'ENV' in os_cfg:
logger.info("Using OS credentials from shell environment")
user, passwd, tenant, auth_url = start_vms.ostack_get_creds()
- else:
+ elif 'OS_TENANT_NAME' in os_cfg:
logger.info("Using predefined credentials")
tenant = os_cfg['OS_TENANT_NAME'].strip()
user = os_cfg['OS_USERNAME'].strip()
passwd = os_cfg['OS_PASSWORD'].strip()
auth_url = os_cfg['OS_AUTH_URL'].strip()
- elif 'fuel' in cfg['clouds'] and \
- 'openstack_env' in cfg['clouds']['fuel']:
+ if tenant is None and 'fuel' in cfg['clouds'] and \
+ 'openstack_env' in cfg['clouds']['fuel']:
logger.info("Using fuel creds")
creds = ctx.fuel_openstack_creds
- else:
+ elif tenant is None:
logger.error("Can't found OS credentials")
raise utils.StopTestError("Can't found OS credentials", None)
@@ -472,7 +491,9 @@
start_vms.nova_connect(**os_creds)
params.update(config)
- params['keypair_file_private'] = params['keypair_name'] + ".pem"
+ if 'keypair_file_private' not in params:
+ params['keypair_file_private'] = params['keypair_name'] + ".pem"
+
params['group_name'] = cfg_dict['run_uuid']
if not config.get('skip_preparation', False):
@@ -499,7 +520,7 @@
def run_tests_stage(cfg, ctx):
- ctx.results = []
+ ctx.results = collections.defaultdict(lambda: [])
if 'tests' not in cfg:
return
@@ -532,11 +553,12 @@
with with_sensors_util(cfg_dict, ctx.nodes):
for test_group in config.get('tests', []):
- ctx.results.extend(run_tests(cfg, test_group,
- ctx.nodes))
+ for tp, res in run_tests(cfg, test_group, ctx.nodes):
+ ctx.results[tp].extend(res)
else:
with with_sensors_util(cfg_dict, ctx.nodes):
- ctx.results.extend(run_tests(cfg, group, ctx.nodes))
+ for tp, res in run_tests(cfg, group, ctx.nodes):
+ ctx.results[tp].extend(res)
def shut_down_vms_stage(cfg, ctx):
@@ -575,14 +597,14 @@
def store_raw_results_stage(cfg, ctx):
- raw_results = os.path.join(cfg_dict['var_dir'], 'raw_results.yaml')
+ raw_results = cfg_dict['raw_results']
if os.path.exists(raw_results):
cont = yaml.load(open(raw_results).read())
else:
cont = []
- cont.extend(utils.yamable(ctx.results))
+ cont.extend(utils.yamable(ctx.results).items())
raw_data = pretty_yaml.dumps(cont)
with open(raw_results, "w") as fd:
@@ -590,30 +612,33 @@
def console_report_stage(cfg, ctx):
- for tp, data in ctx.results:
- if 'io' == tp and data is not None:
- dinfo = report.process_disk_info(data)
- text_rep_fname = cfg['text_report_file']
- rep = IOPerfTest.format_for_console(data, dinfo)
+ first_report = True
+ text_rep_fname = cfg['text_report_file']
+ with open(text_rep_fname, "w") as fd:
+ for tp, data in ctx.results.items():
+ if 'io' == tp and data is not None:
+ dinfo = report.process_disk_info(data)
+ rep = IOPerfTest.format_for_console(data, dinfo)
+ elif tp in ['mysql', 'pgbench'] and data is not None:
+ rep = MysqlTest.format_for_console(data)
+ else:
+ logger.warning("Can't generate text report for " + tp)
+ continue
- with open(text_rep_fname, "w") as fd:
- fd.write(rep)
- fd.write("\n")
- fd.flush()
+ fd.write(rep)
+ fd.write("\n")
- logger.info("Text report were stored in " + text_rep_fname)
+ if first_report:
+ logger.info("Text report were stored in " + text_rep_fname)
+ first_report = False
+
print("\n" + rep + "\n")
- if tp in ['mysql', 'pgbench'] and data is not None:
- print("\n")
- print(MysqlTest.format_for_console(data))
- print("\n")
-
def html_report_stage(cfg, ctx):
html_rep_fname = cfg['html_report_file']
found = False
- for tp, data in ctx.results:
+ for tp, data in ctx.results.items():
if 'io' == tp and data is not None:
if found:
logger.error("Making reports for more than one " +
@@ -635,10 +660,10 @@
def load_data_from(var_dir):
def load_data_from_file(_, ctx):
raw_results = os.path.join(var_dir, 'raw_results.yaml')
- ctx.results = []
+ ctx.results = {}
for tp, results in yaml.load(open(raw_results).read()):
cls = TOOL_TYPE_MAPPER[tp]
- ctx.results.append((tp, map(cls.load, results)))
+ ctx.results[tp] = map(cls.load, results)
return load_data_from_file
@@ -684,6 +709,7 @@
help="Skip html report", default=False)
parser.add_argument("--params", metavar="testname.paramname",
help="Test params", default=[])
+ parser.add_argument("--ls", action='store_true', default=False)
parser.add_argument("config_file")
return parser.parse_args(argv[1:])
@@ -696,7 +722,52 @@
return func.__name__ + " stage"
+def list_results(path):
+ results = []
+
+ for dname in os.listdir(path):
+
+ cfg = get_test_files(os.path.join(path, dname))
+
+ if not os.path.isfile(cfg['raw_results']):
+ continue
+
+ res_mtime = time.ctime(os.path.getmtime(cfg['raw_results']))
+ cfg = yaml.load(open(cfg['saved_config_file']).read())
+
+ test_names = []
+
+ for block in cfg['tests']:
+ assert len(block.items()) == 1
+ name, data = block.items()[0]
+ if name == 'start_test_nodes':
+ for in_blk in data['tests']:
+ assert len(in_blk.items()) == 1
+ in_name, _ = in_blk.items()[0]
+ test_names.append(in_name)
+ else:
+ test_names.append(name)
+ results.append((dname, test_names, res_mtime))
+
+ tab = texttable.Texttable(max_width=120)
+ tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
+ tab.set_cols_align(["l", "l", "l"])
+ results.sort(key=lambda x: x[2])
+
+ for data in results:
+ dname, tests, mtime = data
+ tab.add_row([dname, ', '.join(tests), mtime])
+
+ tab.header(["Name", "Tests", "etime"])
+
+ print(tab.draw())
+
+
def main(argv):
+ if '--ls' in argv:
+ list_results(argv[-1])
+ exit(0)
+
faulthandler.register(signal.SIGUSR1, all_threads=True)
opts = parse_args(argv)
load_config(opts.config_file, opts.post_process_only)
@@ -708,6 +779,10 @@
setup_loggers(level, cfg_dict['log_file'])
+ if not os.path.exists(cfg_dict['saved_config_file']):
+ with open(cfg_dict['saved_config_file'], 'w') as fd:
+ fd.write(open(opts.config_file).read())
+
if opts.post_process_only is not None:
stages = [
load_data_from(opts.post_process_only)
diff --git a/wally/sensors/deploy_sensors.py b/wally/sensors/deploy_sensors.py
deleted file mode 100644
index 82ab21a..0000000
--- a/wally/sensors/deploy_sensors.py
+++ /dev/null
@@ -1,88 +0,0 @@
-import time
-import json
-import os.path
-import logging
-
-from wally.ssh_utils import (copy_paths, run_over_ssh,
- save_to_remote, read_from_remote)
-
-logger = logging.getLogger('wally.sensors')
-
-
-def wait_all_ok(futures):
- return all(future.result() for future in futures)
-
-
-def deploy_and_start_sensors(sensor_configs,
- remote_path='/tmp/sensors/sensors'):
-
- paths = {os.path.dirname(__file__): remote_path}
- with ThreadPoolExecutor(max_workers=32) as executor:
- futures = []
-
- for node_sensor_config in sensor_configs:
- futures.append(executor.submit(deploy_and_start_sensor,
- paths,
- node_sensor_config,
- remote_path))
-
- if not wait_all_ok(futures):
- raise RuntimeError("Sensor deployment fails on some nodes")
-
-
-def deploy_and_start_sensor(paths, node_sensor_config, remote_path):
- try:
- copy_paths(node_sensor_config.conn, paths)
- with node_sensor_config.conn.open_sftp() as sftp:
- config_remote_path = os.path.join(remote_path, "conf.json")
-
- sensors_config = node_sensor_config.sensors.copy()
- sensors_config['source_id'] = node_sensor_config.source_id
- with sftp.open(config_remote_path, "w") as fd:
- fd.write(json.dumps(sensors_config))
-
- cmd_templ = 'env PYTHONPATH="{0}" python -m ' + \
- "sensors.main -d start -u {1} {2}"
-
- cmd = cmd_templ.format(os.path.dirname(remote_path),
- node_sensor_config.monitor_url,
- config_remote_path)
-
- run_over_ssh(node_sensor_config.conn, cmd,
- node=node_sensor_config.url)
-
- except:
- msg = "During deploing sensors on {0}".format(node_sensor_config.url)
- logger.exception(msg)
- return False
- return True
-
-
-def stop_and_remove_sensor(conn, url, remote_path):
- try:
- cmd = 'env PYTHONPATH="{0}" python -m sensors.main -d stop'
- cmd = cmd.format(remote_path)
- run_over_ssh(conn, cmd, node=url)
- # some magic
- time.sleep(0.3)
-
- # logger.warning("Sensors don't removed")
- run_over_ssh(conn, "rm -rf {0}".format(remote_path),
- node=url, timeout=10)
- except Exception as exc:
- msg = "Failed to remove sensors from node {0}: {1!s}"
- logger.error(msg.format(url, exc))
-
-
-def stop_and_remove_sensors(configs, remote_path='/tmp/sensors'):
- with ThreadPoolExecutor(max_workers=32) as executor:
- futures = []
-
- for node_sensor_config in configs:
- futures.append(executor.submit(stop_and_remove_sensor,
- node_sensor_config.conn,
- node_sensor_config.url,
- remote_path))
-
- wait(futures)
- logger.debug("Sensors stopped and removed")
diff --git a/wally/ssh_utils.py b/wally/ssh_utils.py
index 1e8b647..7085be4 100644
--- a/wally/ssh_utils.py
+++ b/wally/ssh_utils.py
@@ -387,11 +387,6 @@
return True
except OSError:
return False
- # try:
- # sftp.stat("/proc/{0}".format(pid))
- # return True
- # except (OSError, IOError, NameError):
- # return False
def kill(self, soft=True, use_sudo=True):
assert self.pid is not None
@@ -415,6 +410,11 @@
soft_end_of_wait_time = soft_timeout + time.time()
time_till_check = random.randint(5, 10)
+ time_till_first_check = random.randint(2, 6)
+ time.sleep(time_till_first_check)
+ if not self.check_running():
+ return True
+
while self.check_running() and time.time() < soft_end_of_wait_time:
time.sleep(soft_end_of_wait_time - time.time())
diff --git a/wally/start_vms.py b/wally/start_vms.py
index 3ab4383..c533162 100644
--- a/wally/start_vms.py
+++ b/wally/start_vms.py
@@ -5,7 +5,7 @@
import logging
import subprocess
-from concurrent.futures import ThreadPoolExecutor, wait
+from concurrent.futures import ThreadPoolExecutor
from novaclient.exceptions import NotFound
from novaclient.client import Client as n_client
diff --git a/wally/suits/io/__init__.py b/wally/suits/io/__init__.py
index 4828850..afb6ac1 100644
--- a/wally/suits/io/__init__.py
+++ b/wally/suits/io/__init__.py
@@ -73,9 +73,6 @@
self.config_params = self.options.get('params', {}).copy()
self.tool = self.options.get('tool', 'fio')
- raw_res = os.path.join(self.log_directory, "raw_results.txt")
- self.fio_raw_results_file = open_for_append_or_create(raw_res)
-
self.io_py_remote = self.join_remote("agent.py")
self.results_file = self.join_remote("results.json")
self.pid_file = self.join_remote("pid")
@@ -89,11 +86,6 @@
split_on_names=self.test_logging)
self.fio_configs = list(self.fio_configs)
- cmd_log = os.path.join(self.log_directory, "task_compiled.cfg")
- fio_command_file = open_for_append_or_create(cmd_log)
- splitter = "\n\n" + "-" * 60 + "\n\n"
- fio_command_file.write(splitter.join(map(str, self.fio_configs)))
-
def __str__(self):
return "{0}({1})".format(self.__class__.__name__,
self.node.get_conn_id())
@@ -227,7 +219,7 @@
logger.info("Will run tests: " + ", ".join(msgs))
nolog = (pos != 0) or not self.is_primary
- out_err, interval = self.do_run(barrier, fio_cfg_slice,
+ out_err, interval = self.do_run(barrier, fio_cfg_slice, pos,
nolog=nolog)
try:
@@ -270,9 +262,10 @@
finally:
barrier.exit()
- def do_run(self, barrier, cfg_slice, nolog=False):
+ def do_run(self, barrier, cfg_slice, pos, nolog=False):
# return open("/tmp/lit-sunshine/io/results.json").read(), (1, 2)
conn_id = self.node.get_conn_id()
+ fconn_id = conn_id.replace(":", "_")
cmd_templ = "fio --output-format=json --output={1} " + \
"--alloc-size=262144 {0}"
@@ -284,6 +277,10 @@
with self.node.connection.open_sftp() as sftp:
save_to_remote(sftp, self.task_file, task_fc)
+ fname = "{0}_{1}.fio".format(pos, fconn_id)
+ with open(os.path.join(self.log_directory, fname), "w") as fd:
+ fd.write(task_fc)
+
cmd = cmd_templ.format(self.task_file, self.results_file)
exec_time = sum(map(execution_time, cfg_slice))
@@ -305,6 +302,7 @@
end_dt.strftime("%H:%M:%S"),
wait_till.strftime("%H:%M:%S")))
+ self.run_over_ssh("cd " + os.path.dirname(self.task_file), nolog=True)
task = BGSSHTask(self.node, self.options.get("use_sudo", True))
begin = time.time()
task.start(cmd)
@@ -314,8 +312,40 @@
if not nolog:
logger.debug("Test on node {0} is finished".format(conn_id))
+ log_files = set()
+ for cfg in cfg_slice:
+ if 'write_lat_log' in cfg.vals:
+ fname = cfg.vals['write_lat_log']
+ log_files.add(fname + '_clat.log')
+ log_files.add(fname + '_lat.log')
+ log_files.add(fname + '_slat.log')
+
+ if 'write_iops_log' in cfg.vals:
+ fname = cfg.vals['write_iops_log']
+ log_files.add(fname + '_iops.log')
+
with self.node.connection.open_sftp() as sftp:
- return read_from_remote(sftp, self.results_file), (begin, end)
+ result = read_from_remote(sftp, self.results_file)
+ sftp.remove(self.results_file)
+
+ fname = "{0}_{1}.json".format(pos, fconn_id)
+ with open(os.path.join(self.log_directory, fname), "w") as fd:
+ fd.write(result)
+
+ for fname in log_files:
+ try:
+ fc = read_from_remote(sftp, fname)
+ except:
+ continue
+ sftp.remove(fname)
+
+ loc_fname = "{0}_{1}_{2}".format(pos, fconn_id,
+ fname.split('_')[-1])
+ loc_path = os.path.join(self.log_directory, loc_fname)
+ with open(loc_path, "w") as fd:
+ fd.write(fc)
+
+ return result, (begin, end)
@classmethod
def merge_results(cls, results):
diff --git a/wally/suits/io/check_linearity.cfg b/wally/suits/io/check_linearity.cfg
index f7c37fb..6af5fcc 100644
--- a/wally/suits/io/check_linearity.cfg
+++ b/wally/suits/io/check_linearity.cfg
@@ -6,6 +6,7 @@
size={TEST_FILE_SIZE}
ramp_time=5
runtime=30
+BLOCK_SIZES={% 512,1k,2k,4k,8k,16k,32k,128k,256k,512k,1m %}
# ---------------------------------------------------------------------
# check read and write linearity. oper_time = func(size)
diff --git a/wally/suits/io/hdd.cfg b/wally/suits/io/hdd.cfg
index 21166e5..4b242a8 100644
--- a/wally/suits/io/hdd.cfg
+++ b/wally/suits/io/hdd.cfg
@@ -7,6 +7,10 @@
NUMJOBS={% 1, 5, 10 %}
+write_lat_log=fio_log
+write_iops_log=fio_log
+log_avg_msec=500
+
size=10G
ramp_time=5
runtime=30
diff --git a/wally/suits/io/rrd.cfg b/wally/suits/io/rrd.cfg
index 89833b9..8eaffea 100644
--- a/wally/suits/io/rrd.cfg
+++ b/wally/suits/io/rrd.cfg
@@ -1,27 +1,17 @@
-[defaults]
-wait_for_previous
-group_reporting
-time_based
-buffered=0
-iodepth=1
-softrandommap=1
-filename={FILENAME}
-NUM_ROUNDS=35
+[global]
+include defaults.cfg
-# this is critical for correct results in multy-node run
-randrepeat=0
+size={TEST_FILE_SIZE}
+ramp_time=5
+runtime=40
-size=30G
-ramp_time=15
-runtime=60
+write_lat_log=fio_log
+write_iops_log=fio_log
+log_avg_msec=500
# ---------------------------------------------------------------------
-# check different thread count, direct read mode. (latency, iops) = func(th_count)
-# also check iops for randread
-# ---------------------------------------------------------------------
-[rrd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+[rws_{TEST_SUMM}]
blocksize=4k
-rw=randread
-direct=1
-numjobs=5
+rw=randwrite
+sync=1
diff --git a/wally/suits/io/verify.cfg b/wally/suits/io/verify.cfg
index 9ebfad1..b3162eb 100644
--- a/wally/suits/io/verify.cfg
+++ b/wally/suits/io/verify.cfg
@@ -1,18 +1,28 @@
[global]
include defaults.cfg
-size=50G
-ramp_time=5
-runtime=60
-NUM_ROUNDS=2
-# ---------------------------------------------------------------------
-#[verify_{TEST_SUMM}]
-#blocksize=4k
-#rw=randwrite
-#direct=1
+size={TEST_FILE_SIZE}
+ramp_time=5
+runtime=10
+
+write_lat_log=fio_log
+write_iops_log=fio_log
+log_avg_msec=500
# ---------------------------------------------------------------------
[verify_{TEST_SUMM}]
blocksize=4k
-rw=randread
+rw=randwrite
direct=1
+
+# ---------------------------------------------------------------------
+# [verify_{TEST_SUMM}]
+# blocksize=4k
+# rw=randwrite
+# sync=1
+
+# ---------------------------------------------------------------------
+# [verify_{TEST_SUMM}]
+# blocksize=4k
+# rw=randread
+# direct=1
diff --git a/wally/utils.py b/wally/utils.py
index cdee319..d94d333 100644
--- a/wally/utils.py
+++ b/wally/utils.py
@@ -184,7 +184,8 @@
rr = proc.communicate(input_data)
res.extend(rr)
- thread = threading.Thread(target=thread_func)
+ thread = threading.Thread(target=thread_func,
+ name="Local cmd execution")
thread.daemon = True
thread.start()
thread.join(timeout)