a lot of changes
diff --git a/TODO b/TODO
index a2ea785..84b8c5e 100644
--- a/TODO
+++ b/TODO
@@ -1,4 +1,9 @@
Finding bottlenecks (алена) - починить процессор
+unified_rw_reporting=1
+fadvise_hint=0
+Изменить в репорте сенсоров все на %
+Добавить в репорт количество операций
+посмотреть что с сетевыми картами
Resource consumption:
добавить процессор,
добавить время в IO,
diff --git a/report_templates/report_ceph.html b/report_templates/report_ceph.html
index fb9b875..d597ced 100644
--- a/report_templates/report_ceph.html
+++ b/report_templates/report_ceph.html
@@ -70,11 +70,11 @@
</div>
<center><br>
<table><tr>
- <td><img src="charts/rand_read_4k.png" /></td>
- <td><img src="charts/rand_write_4k.png" /></td>
+ <td><img src="charts/rand_read_4k.{img_ext}" /></td>
+ <td><img src="charts/rand_write_4k.{img_ext}" /></td>
</tr><tr>
- <td><img src="charts/rand_read_16m.png" /></td>
- <td><img src="charts/rand_write_16m.png" /></td>
+ <td><img src="charts/rand_read_16m.{img_ext}" /></td>
+ <td><img src="charts/rand_write_16m.{img_ext}" /></td>
</tr></table>
</center>
</center>
diff --git a/report_templates/report_hdd.html b/report_templates/report_hdd.html
index 40fadcc..ea04eb5 100644
--- a/report_templates/report_hdd.html
+++ b/report_templates/report_hdd.html
@@ -72,10 +72,10 @@
</div>
<div class="row">
<div class="col-md-6">
- <img src="charts/rand_read_4k.png"/>
+ <img src="charts/rand_read_4k.{img_ext}"/>
</div>
<div class="col-md-6">
- <img src="charts/rand_write_4k.png"/>
+ <img src="charts/rand_write_4k.{img_ext}"/>
</div>
</div>
<!--div class="row">
diff --git a/scripts/fio_tests_configs/io_task.cfg b/scripts/fio_tests_configs/io_task.cfg
index 38d0be4..821da37 100644
--- a/scripts/fio_tests_configs/io_task.cfg
+++ b/scripts/fio_tests_configs/io_task.cfg
@@ -1,13 +1,14 @@
[hdd_test]
blocksize=4k
-rw=randwrite
-sync=1
-group_reporting
-time_based
+rw=randread
+group_reporting=1
buffered=0
iodepth=1
-filename=/tmp/xxx
-ramp_time=5
-size=10Gb
-runtime=30
+direct=1
+filename=/tmp/t/x
+size=1m
+numjobs=4
+ioengine=null
+randrepeat=0
+# io_size=4g
diff --git a/scripts/postprocessing/bottleneck.py b/scripts/postprocessing/bottleneck.py
index a257d13..cfe9df2 100644
--- a/scripts/postprocessing/bottleneck.py
+++ b/scripts/postprocessing/bottleneck.py
@@ -1,12 +1,16 @@
""" Analize test results for finding bottlenecks """
import sys
+import csv
+import time
+import bisect
import os.path
import argparse
import collections
import yaml
+import texttable
from wally.utils import b2ssize
@@ -20,48 +24,124 @@
self.values = values # [((dev, sensor), value)]
-def load_results(fd):
- res = []
- source_id2nostname = {}
-
- for line in fd:
- line = line.strip()
- if line != "":
- _, data = eval(line)
- ctime = data.pop('time')
- source_id = data.pop('source_id')
- hostname = data.pop('hostname')
-
- data = [(k.split('.'), v) for k, v in data.items()]
-
- sd = SensorsData(source_id, hostname, ctime, data)
- res.append((ctime, sd))
- source_id2nostname[source_id] = hostname
-
- res.sort(key=lambda x: x[0])
- return res, source_id2nostname
-
-
-critical_values = dict(
- io_queue=1,
- mem_usage_percent=0.8)
-
-
class SensorInfo(object):
def __init__(self, name, native_ext, to_bytes_coef):
self.name = name
self.native_ext = native_ext
self.to_bytes_coef = to_bytes_coef
-SINFO = [
+_SINFO = [
SensorInfo('recv_bytes', 'B', 1),
SensorInfo('send_bytes', 'B', 1),
SensorInfo('sectors_written', 'Sect', 512),
SensorInfo('sectors_read', 'Sect', 512),
]
+SINFO_MAP = dict((sinfo.name, sinfo) for sinfo in _SINFO)
+to_bytes = dict((sinfo.name, sinfo.to_bytes_coef) for sinfo in _SINFO)
-SINFO_MAP = dict((sinfo.name, sinfo) for sinfo in SINFO)
+
+def load_results(fd):
+ data = fd.read(100)
+ fd.seek(0, os.SEEK_SET)
+
+ # t = time.time()
+ if '(' in data or '{' in data:
+ res, source_id2nostname = load_results_eval(fd)
+ else:
+ res, source_id2nostname = load_results_csv(fd)
+
+ # print int(((time.time() - t) * 1000000) / len(res)), len(res)
+
+ return res, source_id2nostname
+
+
+def load_results_csv(fd):
+
+ fields = {}
+ res = []
+ source_id2nostname = {}
+ coefs = {}
+
+ # cached for performance
+ ii = int
+ zz = zip
+ SD = SensorsData
+ ra = res.append
+
+ for row in csv.reader(fd):
+ if len(row) == 0:
+ continue
+ ip, port = row[:2]
+ ip_port = (ip, ii(port))
+
+ if ip_port not in fields:
+ sensors = [i.split('.') for i in row[4:]]
+ fields[ip_port] = row[2:4] + sensors
+ source_id2nostname[row[2]] = row[3]
+ coefs[ip_port] = [to_bytes.get(s[1], 1) for s in sensors]
+ else:
+ fld = fields[ip_port]
+ processed_data = []
+ a = processed_data.append
+
+ # this cycle is critical for performance
+ # don't "refactor" it, unless you are confident
+ # in what you are doing
+ for dev_sensor, val, coef in zz(fld[2:], row[3:], coefs[ip_port]):
+ a((dev_sensor, ii(val) * coef))
+
+ ctime = ii(row[2])
+ sd = SD(fld[0], fld[1], ctime, processed_data)
+ ra((ctime, sd))
+
+ res.sort(key=lambda x: x[0])
+ return res, source_id2nostname
+
+
+def load_results_eval(fd):
+ res = []
+ source_id2nostname = {}
+
+ for line in fd:
+ if line.strip() == "":
+ continue
+
+ _, data = eval(line)
+ ctime = data.pop('time')
+ source_id = data.pop('source_id')
+ hostname = data.pop('hostname')
+
+ processed_data = []
+ for k, v in data.items():
+ dev, sensor = k.split('.')
+ processed_data.append(((dev, sensor),
+ v * to_bytes.get(sensor, 1)))
+
+ sd = SensorsData(source_id, hostname, ctime, processed_data)
+ res.append((ctime, sd))
+ source_id2nostname[source_id] = hostname
+
+ res.sort(key=lambda x: x[0])
+ return res, source_id2nostname
+
+
+def load_test_timings(fd):
+ result = {} # test name - [(start_time, finish_time)]
+ data = yaml.load(fd.read())
+ assert len(data) == 1
+ test_type, test_data = data[0]
+ assert test_type == 'io'
+ for test_names, interval in test_data['__meta__']['timings']:
+ assert len(set(test_names)) == 1
+ if test_names[0] not in result:
+ result[test_names[0]] = interval
+ return result
+
+
+critical_values = dict(
+ io_queue=1,
+ mem_usage_percent=0.8)
class AggregatedData(object):
@@ -152,23 +232,22 @@
return "\n".join(table)
-def print_consumption(agg, roles, min_transfer=0):
+def print_consumption(agg, min_transfer=None):
rev_items = []
for (node_or_role, dev), v in agg.all_together.items():
rev_items.append((int(v), node_or_role + ':' + dev))
res = sorted(rev_items, reverse=True)
- sinfo = SINFO_MAP[agg.sensor_name]
- if sinfo.to_bytes_coef is not None:
+ if min_transfer is not None:
res = [(v, k)
for (v, k) in res
- if v * sinfo.to_bytes_coef >= min_transfer]
+ if v >= min_transfer]
if len(res) == 0:
return None
- res = [(b2ssize(v) + sinfo.native_ext, k) for (v, k) in res]
+ res = [(b2ssize(v) + "B", k) for (v, k) in res]
max_name_sz = max(len(name) for _, name in res)
max_val_sz = max(len(val) for val, _ in res)
@@ -227,12 +306,18 @@
max_data = 0
for sensor_name, agg in consumption.items():
if sensor_name in SINFO_MAP:
- tb = SINFO_MAP[sensor_name].to_bytes_coef
- if tb is not None:
- max_data = max(max_data, agg.per_role.get('testnode', 0) * tb)
+ max_data = max(max_data, agg.per_role.get('testnode', 0))
return max_data
+def get_data_for_interval(data, interval):
+ begin, end = interval
+ times = [ctime for ctime, _ in data]
+ b_p = bisect.bisect_left(times, begin)
+ e_p = bisect.bisect_right(times, end)
+ return data[b_p:e_p]
+
+
def main(argv):
opts = parse_args(argv)
@@ -242,8 +327,11 @@
roles_file = os.path.join(opts.results_folder,
'nodes.yaml')
- src2roles = yaml.load(open(roles_file))
+ raw_results_file = os.path.join(opts.results_folder,
+ 'raw_results.yaml')
+ src2roles = yaml.load(open(roles_file))
+ timings = load_test_timings(open(raw_results_file))
with open(sensors_data_fname) as fd:
data, source_id2hostname = load_results(fd)
@@ -252,23 +340,62 @@
# print print_bottlenecks(data, opts.max_bottlenek)
# print print_bottlenecks(data, opts.max_bottlenek)
- consumption = total_consumption(data, roles_map)
+ for name, interval in sorted(timings.items()):
+ print
+ print
+ print "-" * 30 + " " + name + " " + "-" * 30
+ print
- testdata_sz = get_testdata_size(consumption) // 1024
- for name in ('recv_bytes', 'send_bytes',
- 'sectors_read', 'sectors_written'):
- table = print_consumption(consumption[name], roles_map, testdata_sz)
- if table is None:
- print "Consumption of", name, "is negligible"
- else:
- ln = max(map(len, table.split('\n')))
- print '-' * ln
- print name.center(ln)
- print '-' * ln
- print table
- print '-' * ln
- print
+ data_chunk = get_data_for_interval(data, interval)
+ consumption = total_consumption(data_chunk, roles_map)
+
+ testdata_sz = get_testdata_size(consumption) // 100
+
+ fields = ('recv_bytes', 'send_bytes',
+ 'sectors_read', 'sectors_written')
+ per_consumer_table = {}
+
+ all_consumers = set(consumption.values()[0].all_together)
+ all_consumers_sum = []
+
+ for consumer in all_consumers:
+ tb = per_consumer_table[consumer] = []
+ vl = 0
+ for name in fields:
+ val = consumption[name].all_together[consumer]
+ if val < testdata_sz:
+ val = 0
+ vl += int(val)
+ tb.append(b2ssize(int(val)) + "B")
+ all_consumers_sum.append((vl, consumer))
+
+ all_consumers_sum.sort(reverse=True)
+ tt = texttable.Texttable(max_width=130)
+ tt.set_cols_align(["l"] + ["r"] * len(fields))
+ tt.header(["Name"] + list(fields))
+
+ for summ, consumer in all_consumers_sum:
+ if summ > 0:
+ tt.add_row([".".join(consumer)] +
+ [v if v != '0B' else '-'
+ for v in per_consumer_table[consumer]])
+
+ tt.set_deco(texttable.Texttable.VLINES | texttable.Texttable.HEADER)
+ print tt.draw()
+
+ # if name in consumption:
+ # table = print_consumption(consumption[name], testdata_sz)
+ # if table is None:
+ # print "Consumption of", name, "is negligible"
+ # else:
+ # ln = max(map(len, table.split('\n')))
+ # print '-' * ln
+ # print name.center(ln)
+ # print '-' * ln
+ # print table
+ # print '-' * ln
+ # print
if __name__ == "__main__":
exit(main(sys.argv))
diff --git a/wally/charts.py b/wally/charts.py
index 828438c..b4472a4 100644
--- a/wally/charts.py
+++ b/wally/charts.py
@@ -122,8 +122,7 @@
bar.legend(*legend)
bar.scale(*scale)
- img_name = file_name + ".png"
- img_path = os.path.join(cfg_dict['charts_img_path'], img_name)
+ img_path = file_name + ".png"
if not os.path.exists(img_path):
bar.save(img_path)
diff --git a/wally/discover/node.py b/wally/discover/node.py
index 9161a21..a3d58f9 100644
--- a/wally/discover/node.py
+++ b/wally/discover/node.py
@@ -10,6 +10,7 @@
self.conn_url = conn_url
self.connection = None
self.monitor_ip = None
+ self.os_vm_id = None
def get_ip(self):
if self.conn_url == 'local':
diff --git a/wally/report.py b/wally/report.py
index 5b4c858..b334fa5 100644
--- a/wally/report.py
+++ b/wally/report.py
@@ -132,7 +132,7 @@
linearity_report = report('linearity', 'linearity_test')(linearity_report)
-def render_all_html(dest, info, lab_description, templ_name):
+def render_all_html(dest, info, lab_description, img_ext, templ_name):
very_root_dir = os.path.dirname(os.path.dirname(wally.__file__))
templ_dir = os.path.join(very_root_dir, 'report_templates')
templ_file = os.path.join(templ_dir, templ_name)
@@ -151,16 +151,19 @@
data['bw_write_max'] = (data['bw_write_max'][0] // 1024,
data['bw_write_max'][1])
- report = templ.format(lab_info=lab_description, **data)
+ report = templ.format(lab_info=lab_description, img_ext=img_ext,
+ **data)
open(dest, 'w').write(report)
-def render_hdd_html(dest, info, lab_description):
- render_all_html(dest, info, lab_description, "report_hdd.html")
+def render_hdd_html(dest, info, lab_description, img_ext):
+ render_all_html(dest, info, lab_description, img_ext,
+ "report_hdd.html")
-def render_ceph_html(dest, info, lab_description):
- render_all_html(dest, info, lab_description, "report_ceph.html")
+def render_ceph_html(dest, info, lab_description, img_ext):
+ render_all_html(dest, info, lab_description, img_ext,
+ "report_ceph.html")
def io_chart(title, concurence,
@@ -172,49 +175,90 @@
legend = [legend]
iops_or_bw_per_vm = []
- for i in range(len(concurence)):
- iops_or_bw_per_vm.append(iops_or_bw[i] / concurence[i])
+ for iops, conc in zip(iops_or_bw, concurence):
+ iops_or_bw_per_vm.append(iops / conc)
bar_dev_bottom = []
bar_dev_top = []
- for i in range(len(bar_data)):
- bar_dev_top.append(bar_data[i] + bar_dev[i])
- bar_dev_bottom.append(bar_data[i] - bar_dev[i])
+ for val, err in zip(bar_data, bar_dev):
+ bar_dev_top.append(val + err)
+ bar_dev_bottom.append(val - err)
- ch = charts.render_vertical_bar(title, legend, [bar_data], [bar_dev_top],
- [bar_dev_bottom], file_name=fname,
- scale_x=concurence, label_x="clients",
- label_y=legend[0],
- lines=[
- (latv, "msec", "rr", "lat"),
- # (latv_min, None, None, "lat_min"),
- # (latv_max, None, None, "lat_max"),
- (iops_or_bw_per_vm, None, None,
- legend[0] + " per client")
- ])
- return str(ch)
+ charts.render_vertical_bar(title, legend, [bar_data], [bar_dev_top],
+ [bar_dev_bottom], file_name=fname,
+ scale_x=concurence, label_x="clients",
+ label_y=legend[0],
+ lines=[
+ (latv, "msec", "rr", "lat"),
+ # (latv_min, None, None, "lat_min"),
+ # (latv_max, None, None, "lat_max"),
+ (iops_or_bw_per_vm, None, None,
+ legend[0] + " per client")
+ ])
-def make_hdd_plots(processed_results, path):
+def io_chart_mpl(title, concurence,
+ latv, latv_min, latv_max,
+ iops_or_bw, iops_or_bw_err,
+ legend, fname):
+ points = " MiBps" if legend == 'BW' else ""
+ lc = len(concurence)
+ width = 0.35
+ xt = range(1, lc + 1)
+
+ op_per_vm = [v / c for v, c in zip(iops_or_bw, concurence)]
+ fig, p1 = plt.subplots()
+ xpos = [i - width / 2 for i in xt]
+
+ p1.bar(xpos, iops_or_bw, width=width, yerr=iops_or_bw_err,
+ color='y',
+ label=legend)
+
+ p1.set_yscale('log')
+ p1.grid(True)
+ p1.plot(xt, op_per_vm, label=legend + " per vm")
+ p1.legend()
+
+ p2 = p1.twinx()
+ p2.set_yscale('log')
+ p2.plot(xt, latv_max, label="latency max")
+ p2.plot(xt, latv, label="latency avg")
+ p2.plot(xt, latv_min, label="latency min")
+
+ plt.xlim(0.5, lc + 0.5)
+ plt.xticks(xt, map(str, concurence))
+ p1.set_xlabel("Threads")
+ p1.set_ylabel(legend + points)
+ p2.set_ylabel("Latency ms")
+ plt.title(title)
+ # plt.legend(, loc=2, borderaxespad=0.)
+ # plt.legend(bbox_to_anchor=(1.05, 1), loc=2)
+ plt.legend(loc=2)
+ plt.savefig(fname, format=fname.split('.')[-1])
+
+
+def make_hdd_plots(processed_results, charts_dir):
plots = [
('hdd_test_rrd4k', 'rand_read_4k', 'Random read 4k direct IOPS'),
('hdd_test_rws4k', 'rand_write_4k', 'Random write 4k sync IOPS')
]
- make_plots(processed_results, path, plots)
+ return make_plots(processed_results, charts_dir, plots)
-def make_ceph_plots(processed_results, path):
+def make_ceph_plots(processed_results, charts_dir):
plots = [
('ceph_test_rrd4k', 'rand_read_4k', 'Random read 4k direct IOPS'),
('ceph_test_rws4k', 'rand_write_4k', 'Random write 4k sync IOPS'),
- ('ceph_test_rrd16m', 'rand_read_16m', 'Random read 16m direct MiBps'),
+ ('ceph_test_rrd16m', 'rand_read_16m',
+ 'Random read 16m direct MiBps'),
('ceph_test_rwd16m', 'rand_write_16m',
'Random write 16m direct MiBps'),
]
- make_plots(processed_results, path, plots)
+ return make_plots(processed_results, charts_dir, plots)
-def make_plots(processed_results, path, plots, max_lat=400000):
+def make_plots(processed_results, charts_dir, plots):
+ file_ext = None
for name_pref, fname, desc in plots:
chart_data = []
@@ -231,9 +275,8 @@
# if x.lat.average < max_lat]
lat = [x.lat.average / 1000 for x in chart_data]
-
- lat_min = [x.lat.min / 1000 for x in chart_data if x.lat.min < max_lat]
- lat_max = [x.lat.max / 1000 for x in chart_data if x.lat.max < max_lat]
+ lat_min = [x.lat.min / 1000 for x in chart_data]
+ lat_max = [x.lat.max / 1000 for x in chart_data]
vm_count = x.meta['testnodes_count']
concurence = [x.raw['concurence'] * vm_count for x in chart_data]
@@ -247,8 +290,16 @@
data_dev = [x.iops.confidence for x in chart_data]
name = "IOPS"
- io_chart(desc, concurence, lat, lat_min, lat_max,
- data, data_dev, name, fname)
+ fname = os.path.join(charts_dir, fname)
+ if plt is not None:
+ io_chart_mpl(desc, concurence, lat, lat_min, lat_max,
+ data, data_dev, name, fname + '.svg')
+ file_ext = 'svg'
+ else:
+ io_chart(desc, concurence, lat, lat_min, lat_max,
+ data, data_dev, name, fname + '.png')
+ file_ext = 'png'
+ return file_ext
def find_max_where(processed_results, sync_mode, blocksize, rw, iops=True):
@@ -343,20 +394,20 @@
@report('HDD', 'hdd_test_rrd4k,hdd_test_rws4k')
-def make_hdd_report(processed_results, path, lab_info):
- make_hdd_plots(processed_results, path)
+def make_hdd_report(processed_results, path, charts_path, lab_info):
+ img_ext = make_hdd_plots(processed_results, charts_path)
di = get_disk_info(processed_results)
- render_hdd_html(path, di, lab_info)
+ render_hdd_html(path, di, lab_info, img_ext)
@report('Ceph', 'ceph_test')
-def make_ceph_report(processed_results, path, lab_info):
- make_ceph_plots(processed_results, path)
+def make_ceph_report(processed_results, path, charts_path, lab_info):
+ img_ext = make_ceph_plots(processed_results, charts_path)
di = get_disk_info(processed_results)
- render_ceph_html(path, di, lab_info)
+ render_ceph_html(path, di, lab_info, img_ext)
-def make_io_report(dinfo, results, path, lab_info=None):
+def make_io_report(dinfo, results, path, charts_path, lab_info=None):
lab_info = {
"total_disk": "None",
"total_memory": "None",
@@ -378,7 +429,7 @@
else:
hpath = path.format(name)
logger.debug("Generatins report " + name + " into " + hpath)
- func(dinfo, hpath, lab_info)
+ func(dinfo, hpath, charts_path, lab_info)
break
else:
logger.warning("No report generator found for this load")
diff --git a/wally/run_test.py b/wally/run_test.py
index 72ba4cf..dc6637f 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -9,8 +9,8 @@
import argparse
import functools
import threading
-import contextlib
import subprocess
+import contextlib
import collections
import yaml
@@ -152,6 +152,10 @@
test_nodes = [node for node in nodes
if 'testnode' in node.roles]
+ if len(test_nodes) == 0:
+ logger.error("No test nodes found")
+ return
+
test_number_per_type = {}
res_q = Queue.Queue()
@@ -257,7 +261,9 @@
if node.connection is None:
if 'testnode' in node.roles:
msg = "Can't connect to testnode {0}"
- raise RuntimeError(msg.format(node.get_conn_id()))
+ msg = msg.format(node.get_conn_id())
+ logger.error(msg)
+ raise utils.StopTestError(msg)
else:
msg = "Node {0} would be excluded - can't connect"
logger.warning(msg.format(node.get_conn_id()))
@@ -308,9 +314,12 @@
os_creds = get_OS_credentials(cfg, ctx, "clouds")
conn = start_vms.nova_connect(**os_creds)
- for ip in start_vms.find_vms(conn, vm_name_pattern):
+ for ip, vm_id in start_vms.find_vms(conn, vm_name_pattern):
node = Node(conn_pattern.format(ip=ip), ['testnode'])
+ node.os_vm_id = vm_id
ctx.nodes.append(node)
+ except utils.StopTestError:
+ raise
except Exception as exc:
msg = "Vm like {0} lookup failed".format(vm_name_pattern)
logger.exception(msg)
@@ -345,33 +354,27 @@
echo = 'echo "$OS_TENANT_NAME:$OS_USERNAME:$OS_PASSWORD@$OS_AUTH_URL"'
- p = subprocess.Popen(['/bin/bash'], shell=False,
- stdout=subprocess.PIPE,
- stdin=subprocess.PIPE,
- stderr=subprocess.STDOUT)
- p.stdin.write(fc + "\n" + echo)
- p.stdin.close()
- code = p.wait()
- data = p.stdout.read().strip()
-
- if code != 0:
+ try:
+ data = utils.run_locally(['/bin/bash'], input=fc + "\n" + echo)
+ except subprocess.CalledProcessError as exc:
msg = "Failed to get creads from openrc file: " + data
- logger.error(msg)
- raise RuntimeError(msg)
+ logger.exception(msg)
+ raise utils.StopTestError(msg, exc)
try:
user, tenant, passwd_auth_url = data.split(':', 2)
passwd, auth_url = passwd_auth_url.rsplit("@", 1)
assert (auth_url.startswith("https://") or
auth_url.startswith("http://"))
- except Exception:
+ except Exception as exc:
msg = "Failed to get creads from openrc file: " + data
logger.exception(msg)
- raise
+ raise utils.StopTestError(msg, exc)
else:
msg = "Creds {0!r} isn't supported".format(creds_type)
- raise ValueError(msg)
+ logger.error(msg)
+ raise utils.StopTestError(msg, None)
if creds is None:
creds = {'name': user,
@@ -385,7 +388,7 @@
@contextlib.contextmanager
-def create_vms_ctx(ctx, cfg, config):
+def create_vms_ctx(ctx, cfg, config, already_has_count=0):
params = cfg['vm_configs'][config['cfg_name']].copy()
os_nodes_ids = []
@@ -398,11 +401,13 @@
params['keypair_file_private'] = params['keypair_name'] + ".pem"
params['group_name'] = cfg_dict['run_uuid']
- start_vms.prepare_os_subpr(params=params, **os_creds)
+ if not config.get('skip_preparation', False):
+ start_vms.prepare_os_subpr(params=params, **os_creds)
new_nodes = []
try:
- for new_node, node_id in start_vms.launch_vms(params):
+ for new_node, node_id in start_vms.launch_vms(params,
+ already_has_count):
new_node.roles.append('testnode')
ctx.nodes.append(new_node)
os_nodes_ids.append(node_id)
@@ -435,7 +440,12 @@
logger.error(msg)
raise utils.StopTestError(msg)
- with create_vms_ctx(ctx, cfg, config['openstack']) as new_nodes:
+ num_test_nodes = sum(1 for node in ctx.nodes
+ if 'testnode' in node.roles)
+
+ vm_ctx = create_vms_ctx(ctx, cfg, config['openstack'],
+ num_test_nodes)
+ with vm_ctx as new_nodes:
connect_all(new_nodes, True)
for node in new_nodes:
@@ -534,6 +544,7 @@
found = True
dinfo = report.process_disk_info(data)
report.make_io_report(dinfo, data, html_rep_fname,
+ cfg['charts_img_path'],
lab_info=ctx.hw_info)
text_rep_fname = cfg_dict['text_report_file']
@@ -599,7 +610,7 @@
help="Skip html report", default=False)
parser.add_argument("--params", metavar="testname.paramname",
help="Test params", default=[])
- parser.add_argument("--reuse-vms", default=None, metavar="vm_name_prefix")
+ parser.add_argument("--reuse-vms", default=[], nargs='*')
parser.add_argument("config_file")
return parser.parse_args(argv[1:])
@@ -618,8 +629,8 @@
discover_stage
]
- if opts.reuse_vms is not None:
- pref, ssh_templ = opts.reuse_vms.split(',', 1)
+ for reuse_param in opts.reuse_vms:
+ pref, ssh_templ = reuse_param.split(',', 1)
stages.append(reuse_vms_stage(pref, ssh_templ))
stages.extend([
diff --git a/wally/sensors_utils.py b/wally/sensors_utils.py
index 81a2832..9350349 100644
--- a/wally/sensors_utils.py
+++ b/wally/sensors_utils.py
@@ -1,3 +1,4 @@
+import csv
import time
import Queue
import logging
@@ -18,8 +19,11 @@
fd.write("\n")
observed_nodes = set()
+ fields_list_for_nodes = {}
+ required_keys = set(['time', 'source_id', 'hostname'])
try:
+ csv_fd = csv.writer(fd)
while True:
val = data_q.get()
if val is None:
@@ -29,9 +33,20 @@
if addr not in observed_nodes:
mon_q.put(addr + (data['source_id'],))
observed_nodes.add(addr)
+ keys = set(data)
+ assert required_keys.issubset(keys)
+ keys -= required_keys
- fd.write(repr((addr, data)) + "\n")
+ fields_list_for_nodes[addr] = sorted(keys)
+ csv_fd.writerow([addr[0], addr[1],
+ data['source_id'], data['hostname']] +
+ fields_list_for_nodes[addr])
+ csv_fd.writerow([addr[0], addr[1]] +
+ map(data.__getitem__,
+ ['time'] + fields_list_for_nodes[addr]))
+
+ # fd.write(repr((addr, data)) + "\n")
# source_id = data.pop('source_id')
# rep_time = data.pop('time')
# if 'testnode' in source2roles_map.get(source_id, []):
diff --git a/wally/start_vms.py b/wally/start_vms.py
index 4e0698c..0bd2b9a 100644
--- a/wally/start_vms.py
+++ b/wally/start_vms.py
@@ -115,7 +115,7 @@
for ips in srv.addresses.values():
for ip in ips:
if ip.get("OS-EXT-IPS:type", None) == 'floating':
- yield ip['addr']
+ yield ip['addr'], srv.id
break
@@ -250,15 +250,21 @@
return [ip for ip in ip_list if ip.instance_id is None][:amount]
-def launch_vms(params):
+def launch_vms(params, already_has_count=0):
logger.debug("Starting new nodes on openstack")
count = params['count']
+ lst = NOVA_CONNECTION.services.list(binary='nova-compute')
+ srv_count = len([srv for srv in lst if srv.status == 'enabled'])
if isinstance(count, basestring):
- assert count.startswith("x")
- lst = NOVA_CONNECTION.services.list(binary='nova-compute')
- srv_count = len([srv for srv in lst if srv.status == 'enabled'])
- count = srv_count * int(count[1:])
+ if count.startswith("x"):
+ count = srv_count * int(count[1:])
+ else:
+ assert count.startswith('=')
+ count = int(count[1:]) - already_has_count
+
+ if count <= 0:
+ return
assert isinstance(count, (int, long))
@@ -291,6 +297,13 @@
yield Node(conn_uri, []), os_node.id
+def get_free_server_grpoups(nova, template=None):
+ for g in nova.server_groups.list():
+ if g.members == []:
+ if re.match(template, g.name):
+ yield str(g.name)
+
+
def create_vms_mt(nova, amount, group_name, keypair_name, img_name,
flavor_name, vol_sz=None, network_zone_name=None,
flt_ip_pool=None, name_templ='wally-{id}',
@@ -336,14 +349,21 @@
orig_scheduler_hints = scheduler_hints.copy()
- for idx, (name, flt_ip) in enumerate(zip(names, ips)):
+ MAX_SHED_GROUPS = 32
+ for start_idx in range(MAX_SHED_GROUPS):
+ pass
+
+ group_name_template = scheduler_hints['group'].format("\\d+")
+ groups = list(get_free_server_grpoups(nova, group_name_template + "$"))
+ groups.sort()
+
+ for idx, (name, flt_ip) in enumerate(zip(names, ips), 2):
scheduler_hints = None
if orig_scheduler_hints is not None and sec_group_size is not None:
if "group" in orig_scheduler_hints:
scheduler_hints = orig_scheduler_hints.copy()
- scheduler_hints['group'] = \
- scheduler_hints['group'].format(idx // sec_group_size)
+ scheduler_hints['group'] = groups[idx // sec_group_size]
if scheduler_hints is None:
scheduler_hints = orig_scheduler_hints.copy()
diff --git a/wally/suits/io/agent.py b/wally/suits/io/agent.py
index f6c3308..57ba229 100644
--- a/wally/suits/io/agent.py
+++ b/wally/suits/io/agent.py
@@ -293,15 +293,24 @@
def slice_config(sec_iter, runcycle=None, max_jobs=1000,
- soft_runcycle=None):
+ soft_runcycle=None, split_on_names=False):
jcount = 0
runtime = 0
curr_slice = []
prev_name = None
for pos, sec in enumerate(sec_iter):
- if soft_runcycle is not None and prev_name != sec.name:
- if runtime > soft_runcycle:
+
+ if prev_name is not None:
+ split_here = False
+
+ if soft_runcycle is not None and prev_name != sec.name:
+ split_here = (runtime > soft_runcycle)
+
+ if split_on_names and prev_name != sec.name:
+ split_here = True
+
+ if split_here:
yield curr_slice
curr_slice = []
runtime = 0
diff --git a/wally/suits/io/ceph.cfg b/wally/suits/io/ceph.cfg
index 5593181..a10adfb 100644
--- a/wally/suits/io/ceph.cfg
+++ b/wally/suits/io/ceph.cfg
@@ -1,12 +1,16 @@
[defaults]
-wait_for_previous
-group_reporting
-time_based
+wait_for_previous=1
+group_reporting=1
+time_based=1
buffered=0
iodepth=1
softrandommap=1
filename={FILENAME}
NUM_ROUNDS=7
+thread=1
+
+# this is critical for correct results in multi-node run
+randrepeat=0
NUMJOBS={% 1, 5, 10, 15, 40 %}
NUMJOBS_SHORT={% 1, 2, 3, 10 %}
diff --git a/wally/suits/io/rrd.cfg b/wally/suits/io/rrd.cfg
index 5593181..3f4c074 100644
--- a/wally/suits/io/rrd.cfg
+++ b/wally/suits/io/rrd.cfg
@@ -6,50 +6,19 @@
iodepth=1
softrandommap=1
filename={FILENAME}
-NUM_ROUNDS=7
-
-NUMJOBS={% 1, 5, 10, 15, 40 %}
-NUMJOBS_SHORT={% 1, 2, 3, 10 %}
+NUM_ROUNDS=35
size=30G
ramp_time=15
runtime=60
# ---------------------------------------------------------------------
-# check different thread count, sync mode. (latency, iops) = func(th_count)
-# ---------------------------------------------------------------------
-[ceph_test_{TEST_SUMM} * {NUM_ROUNDS}]
-blocksize=4k
-rw=randwrite
-sync=1
-numjobs={NUMJOBS}
-
-# ---------------------------------------------------------------------
-# direct write
-# ---------------------------------------------------------------------
-[ceph_test_{TEST_SUMM} * {NUM_ROUNDS}]
-blocksize=4k
-rw=randwrite
-direct=1
-numjobs=1
-
-# ---------------------------------------------------------------------
# check different thread count, direct read mode. (latency, iops) = func(th_count)
# also check iops for randread
# ---------------------------------------------------------------------
-[ceph_test_{TEST_SUMM} * {NUM_ROUNDS}]
+[rrd_test_{TEST_SUMM} * {NUM_ROUNDS}]
blocksize=4k
rw=randread
direct=1
-numjobs={NUMJOBS}
-
-# ---------------------------------------------------------------------
-# this is essentially sequential write/read operations
-# we can't use sequential with numjobs > 1 due to caching and block merging
-# ---------------------------------------------------------------------
-[ceph_test_{TEST_SUMM} * {NUM_ROUNDS}]
-blocksize=16m
-rw={% randread, randwrite %}
-direct=1
-numjobs={NUMJOBS_SHORT}
+numjobs=5
diff --git a/wally/suits/io/verify.cfg b/wally/suits/io/verify.cfg
index 4a66aac..58b8450 100644
--- a/wally/suits/io/verify.cfg
+++ b/wally/suits/io/verify.cfg
@@ -8,31 +8,21 @@
filename={FILENAME}
NUM_ROUNDS=1
-size=5G
+# this is critical for correct results in multi-node run
+randrepeat=0
+
+size=50G
ramp_time=5
-runtime=360
+runtime=60
# ---------------------------------------------------------------------
-# check different thread count, sync mode. (latency, iops) = func(th_count)
+[verify_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=4k
+rw=randwrite
+direct=1
+
# ---------------------------------------------------------------------
-[verify_{TEST_SUMM}]
-blocksize=4m
+[verify_{TEST_SUMM} * {NUM_ROUNDS}]
+blocksize=4k
rw=randread
direct=1
-numjobs=5
-
-# ---------------------------------------------------------------------
-# check different thread count, sync mode. (latency, iops) = func(th_count)
-# ---------------------------------------------------------------------
-# [verify_{TEST_SUMM}]
-# blocksize=4k
-# rw=randwrite
-# direct=1
-
-# ---------------------------------------------------------------------
-# direct write
-# ---------------------------------------------------------------------
-# [verify_{TEST_SUMM}]
-# blocksize=4k
-# rw=randread
-# direct=1
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index f0a1e8d..dd52f33 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -189,7 +189,8 @@
self.fio_configs = io_agent.parse_and_slice_all_in_1(
self.raw_cfg,
self.config_params,
- soft_runcycle=soft_runcycle)
+ soft_runcycle=soft_runcycle,
+ split_on_names=self.test_logging)
self.fio_configs = list(self.fio_configs)
splitter = "\n\n" + "-" * 60 + "\n\n"
diff --git a/wally/utils.py b/wally/utils.py
index d5d6f48..3792ba4 100644
--- a/wally/utils.py
+++ b/wally/utils.py
@@ -1,5 +1,6 @@
import re
import os
+import time
import socket
import logging
import threading
@@ -129,6 +130,28 @@
return "{0}{1}i".format(size // scale, name)
+def run_locally(cmd, input_data="", timeout=20):
+ shell = isinstance(cmd, basestring)
+
+ proc = subprocess.Popen(cmd,
+ shell=shell,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+
+ end_time = time.time() + timeout
+
+ while end_time > time.time():
+ if proc.poll() is None:
+ time.sleep(1)
+
+ out, err = proc.communicate()
+
+ if 0 != proc.returncode:
+ raise subprocess.CalledProcessError(proc.returncode, cmd, out + err)
+
+ return out
+
+
def get_ip_for_target(target_ip):
if not is_ip(target_ip):
target_ip = socket.gethostbyname(target_ip)
@@ -137,8 +160,7 @@
if first_dig == 127:
return '127.0.0.1'
- cmd = 'ip route get to'.split(" ") + [target_ip]
- data = subprocess.Popen(cmd, stdout=subprocess.PIPE).stdout.read()
+ data = run_locally('ip route get to'.split(" ") + [target_ip])
rr1 = r'{0} via [.0-9]+ dev (?P<dev>.*?) src (?P<ip>[.0-9]+)$'
rr1 = rr1.replace(" ", r'\s+')