a lot of changes
diff --git a/TODO b/TODO
index ae60fcb..6271e18 100644
--- a/TODO
+++ b/TODO
@@ -3,15 +3,11 @@
offline sensors
run dd in the background and check it periodically
fix all hangs in all threads - dump stack traces on hang
-vary the number of VMs
-fix the charts
Finding bottlenecks (Alena) - fix CPU stats
-unified_rw_reporting=1
fadvise_hint=0
Change everything in the sensors report to %
-Add operation count to the report
check what is going on with the network cards
Resource consumption:
add CPU,
diff --git a/report_templates/report_ceph.html b/report_templates/report_ceph.html
index d597ced..4ac903b 100644
--- a/report_templates/report_ceph.html
+++ b/report_templates/report_ceph.html
@@ -10,10 +10,33 @@
<div class="page-header text-center">
<h2>Performance Report</h2>
</div>
+
+<!--
+0) Menu
+1) Very short lab performance summary: max IOPS, max BW, EC2 VM count
+2) Engineering report
+3) boxplots
+4) BW/lat/IOPS = f(time) report
+5) Bottleneck/consumption reports
+6) Excessive lab info
+7) Report description
+-->
+
<div class="container-fluid text-center">
+
<div class="row" style="margin-bottom: 40px">
<div class="col-md-12">
<center>
+
+ <h4>Summary</h4>
+ <table style="width: auto;" class="table table-bordered table-striped">
+ <tr>
+ <td>Compute count</td><td>computes</td>
+ <td>OSD count</td><td>OSD count</td>
+ <td>Total Ceph disks count</td><td>OSD_hdd_count</td>
+ </tr>
+ </table>
+
<table><tr><td>
<H4>Random direct performance,<br>4KiB blocks</H4>
<table style="width: auto;" class="table table-bordered table-striped">
@@ -70,11 +93,11 @@
</div>
<center><br>
<table><tr>
- <td><img src="charts/rand_read_4k.{img_ext}" /></td>
- <td><img src="charts/rand_write_4k.{img_ext}" /></td>
+ <td>{rand_read_4k}</td>
+ <td>{rand_write_4k}</td>
</tr><tr>
- <td><img src="charts/rand_read_16m.{img_ext}" /></td>
- <td><img src="charts/rand_write_16m.{img_ext}" /></td>
+ <td>{rand_read_16m}</td>
+ <td>{rand_write_16m}</td>
</tr></table>
</center>
</center>
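
Note: the chart cells above now receive pre-rendered markup through str.format
placeholders ({rand_read_4k}, ...) instead of <img src="charts/..."> links, so
the report becomes a single self-contained HTML file with inline SVG. A minimal
sketch of the fill step, mirroring render_all_html() in wally/report.py
(summary_data and the svg_* variables here are illustrative, not from the patch):

    # illustrative sketch: placeholders are filled with inline <svg> strings
    templ = open('report_templates/report_ceph.html').read()
    fields = dict(summary_data)                  # bw/iops/lat table values
    fields.update({'rand_read_4k': svg_rr4k,     # each is '<svg ...>...</svg>'
                   'rand_write_4k': svg_rw4k,
                   'rand_read_16m': svg_rr16m,
                   'rand_write_16m': svg_rw16m})
    html = templ.format(lab_info=lab_description, **fields)
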
diff --git a/report_templates/report_linearity.html b/report_templates/report_linearity.html
new file mode 100644
index 0000000..0fa7862
--- /dev/null
+++ b/report_templates/report_linearity.html
@@ -0,0 +1,29 @@
+<!DOCTYPE html>
+<html>
+<head>
+ <title>Report</title>
+ <link rel="stylesheet"
+ href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/css/bootstrap.min.css">
+</head>
+
+<body>
+<div class="page-header text-center">
+ <h2>IOPS vs Block size</h2>
+</div>
+<div class="container-fluid text-center">
+
+ <div class="row" style="margin-bottom: 40px">
+ <div class="col-md-12">
+ <center>
+      <H3>{descr[oper_descr]} VM_COUNT:{descr[vm_count]} Threads per VM:{descr[concurence]}</H3> <br>
+ <table><tr>
+ <td>{iops_vs_size}</td>
+ <td>{iotime_vs_size}</td>
+ </tr>
+ </center>
+ </div>
+ </div>
+</div>
+</body>
+
+</html>
\ No newline at end of file
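
Note: {descr[oper_descr]} uses str.format's element-access syntax, so a single
descr dict passed to format() feeds all three fields in the heading. For example:

    # str.format element access, as used by the template heading above
    descr = {'oper_descr': 'Sync direct randwrite',
             'vm_count': 4,
             'concurence': 8}
    print("{descr[oper_descr]} VM_COUNT:{descr[vm_count]}".format(descr=descr))
    # -> Sync direct randwrite VM_COUNT:4
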
diff --git a/wally/sensors/influx_exporter.py b/scripts/influx_exporter.py
similarity index 100%
rename from wally/sensors/influx_exporter.py
rename to scripts/influx_exporter.py
diff --git a/scripts/postprocessing/bottleneck.py b/scripts/postprocessing/bottleneck.py
index d8d4b4c..8507041 100644
--- a/scripts/postprocessing/bottleneck.py
+++ b/scripts/postprocessing/bottleneck.py
@@ -1,5 +1,6 @@
""" Analize test results for finding bottlenecks """
+import re
import sys
import csv
import time
@@ -17,17 +18,10 @@
except ImportError:
pgv = None
+from wally.run_test import load_data_from
from wally.utils import b2ssize, b2ssize_10
-class SensorsData(object):
- def __init__(self, source_id, hostname, ctime, values):
- self.source_id = source_id
- self.hostname = hostname
- self.ctime = ctime
- self.values = values # [((dev, sensor), value)]
-
-
class SensorInfo(object):
def __init__(self, name, print_name, native_ext, to_bytes_coef):
self.name = name
@@ -51,99 +45,68 @@
if sinfo.to_bytes_coef is not None)
-def load_results(fd):
- data = fd.read(100)
- fd.seek(0, os.SEEK_SET)
+class NodeSensorsData(object):
+ def __init__(self, source_id, hostname, headers, values):
+ self.source_id = source_id
+ self.hostname = hostname
+ self.headers = headers
+ self.values = values
+ self.times = None
- # t = time.time()
- if '(' in data or '{' in data:
- res, source_id2nostname = load_results_eval(fd)
- else:
- res, source_id2nostname = load_results_csv(fd)
+ def finalize(self):
+ self.times = [v[0] for v in self.values]
- # print int(((time.time() - t) * 1000000) / len(res)), len(res)
+ def get_data_for_interval(self, beg, end):
+ p1 = bisect.bisect_left(self.times, beg)
+ p2 = bisect.bisect_right(self.times, end)
- return res, source_id2nostname
+ obj = self.__class__(self.source_id,
+ self.hostname,
+ self.headers,
+ self.values[p1:p2])
+ obj.times = self.times[p1:p2]
+ return obj
+
+ def __getitem__(self, name):
+ idx = self.headers.index(name.split('.'))
+ # +1 as first is a time
+ return [val[idx] for val in self.values]
def load_results_csv(fd):
-
- fields = {}
- res = []
- source_id2nostname = {}
- coefs = {}
-
- # cached for performance
- ii = int
- zz = zip
- SD = SensorsData
- ra = res.append
-
- for row in csv.reader(fd):
- if len(row) == 0:
- continue
- ip, port = row[:2]
- ip_port = (ip, ii(port))
-
- if ip_port not in fields:
- sensors = [i.split('.') for i in row[4:]]
- fields[ip_port] = row[2:4] + sensors
- source_id2nostname[row[2]] = row[3]
- coefs[ip_port] = [to_bytes.get(s[1], 1) for s in sensors]
- else:
- fld = fields[ip_port]
- processed_data = []
- a = processed_data.append
-
- # this cycle is critical for performance
- # don't "refactor" it, unles you are confident
- # in what you are doing
- for dev_sensor, val, coef in zz(fld[2:], row[3:], coefs[ip_port]):
- a((dev_sensor, ii(val) * coef))
-
- ctime = ii(row[2])
- sd = SD(fld[0], fld[1], ctime, processed_data)
- ra((ctime, sd))
-
- res.sort(key=lambda x: x[0])
- return res, source_id2nostname
-
-
-def load_results_eval(fd):
- res = []
- source_id2nostname = {}
-
- for line in fd:
- if line.strip() == "":
+ data = fd.read()
+ results = {}
+ for block in data.split("NEW_DATA"):
+ block = block.strip()
+ if len(block) == 0:
continue
- _, data = eval(line)
- ctime = data.pop('time')
- source_id = data.pop('source_id')
- hostname = data.pop('hostname')
+ it = csv.reader(block.split("\n"))
+ headers = next(it)
+ sens_data = [map(int, vals) for vals in it]
+ source_id, hostname = headers[:2]
+ headers = [(None, 'time')] + \
+ [header.split('.') for header in headers[2:]]
+ assert set(map(len, headers)) == set([2])
- processed_data = []
- for k, v in data.items():
- dev, sensor = k.split('.')
- processed_data.append(((dev, sensor),
- v * to_bytes.get(sensor, 1)))
+ results[source_id] = NodeSensorsData(source_id, hostname,
+ headers, sens_data)
- sd = SensorsData(source_id, hostname, ctime, processed_data)
- res.append((ctime, sd))
- source_id2nostname[source_id] = hostname
-
- res.sort(key=lambda x: x[0])
- return res, source_id2nostname
+ return results
-def load_test_timings(fd, max_diff=1000):
+def load_test_timings(fname, max_diff=1000):
raw_map = collections.defaultdict(lambda: [])
- data = yaml.load(fd.read())
- for test_type, test_results in data:
+
+ class data(object):
+ pass
+
+ load_data_from(fname)(None, data)
+
+ for test_type, test_results in data.results:
if test_type == 'io':
for tests_res in test_results:
- for test_res in tests_res['res']:
- raw_map[test_res['name']].append(test_res['run_interval'])
+ raw_map[tests_res.config.name].append(tests_res.run_interval)
result = {}
for name, intervals in raw_map.items():
@@ -208,15 +171,28 @@
def total_consumption(sensors_data, roles_map):
result = {}
- for _, item in sensors_data:
- for (dev, sensor), val in item.values:
+ for name, sensor_data in sensors_data.items():
+ for pos, (dev, sensor) in enumerate(sensor_data.headers):
+ if 'time' == sensor:
+ continue
try:
ad = result[sensor]
except KeyError:
ad = result[sensor] = AggregatedData(sensor)
- ad.per_device[(item.hostname, dev)] += val
+ val = sum(vals[pos] for vals in sensor_data.values)
+
+ ad.per_device[(sensor_data.hostname, dev)] += val
+
+ # vals1 = sensors_data['localhost:22']['sdc.sectors_read']
+ # vals2 = sensors_data['localhost:22']['sdb.sectors_written']
+
+ # from matplotlib import pyplot as plt
+ # plt.plot(range(len(vals1)), vals1)
+ # plt.plot(range(len(vals2)), vals2)
+ # plt.show()
+ # exit(1)
for ad in result.values():
for (hostname, dev), val in ad.per_device.items():
@@ -299,25 +275,6 @@
return "\n".join(table)
-def parse_args(args):
- parser = argparse.ArgumentParser()
- parser.add_argument('-t', '--time_period', nargs=2,
- type=int, default=None,
- help="Begin and end time for tests")
- parser.add_argument('-m', '--max-bottlenek', type=int,
- default=15, help="Max bottlenek to show")
- parser.add_argument('-x', '--max-diff', type=int,
- default=10, help="Max bottlenek to show in" +
- "0.1% from test nodes summ load")
- parser.add_argument('-d', '--debug-ver', action='store_true',
- help="Full report with original data")
- parser.add_argument('-u', '--user-ver', action='store_true',
- default=True, help="Avg load report")
- parser.add_argument('-s', '--select-loads', nargs='*', default=[])
- parser.add_argument('results_folder')
- return parser.parse_args(args[1:])
-
-
def make_roles_mapping(source_id_mapping, source_id2hostname):
result = {}
for ssh_url, roles in source_id_mapping.items():
@@ -349,7 +306,8 @@
if sens.to_bytes_coef is not None:
agg = consumption.get(name)
if agg is not None:
- max_data = max(max_data, agg.per_role.get('testnode', 0))
+ cdt = agg.per_role.get('testnode', 0) * sens.to_bytes_coef
+ max_data = max(max_data, cdt)
return max_data
@@ -364,12 +322,11 @@
def get_data_for_intervals(data, intervals):
- res = []
+ res = {}
for begin, end in intervals:
- times = [ctime for ctime, _ in data]
- b_p = bisect.bisect_left(times, begin)
- e_p = bisect.bisect_right(times, end)
- res.extend(data[b_p:e_p])
+ for name, node_data in data.items():
+ ndata = node_data.get_data_for_interval(begin, end)
+ res[name] = ndata
return res
@@ -380,92 +337,181 @@
self.net_devs = None
-# def plot_consumption(per_consumer_table, fields):
-# hosts = {}
-# storage_sensors = ('sectors_written', 'sectors_read')
+def plot_consumption(per_consumer_table, fields, refload):
+ if pgv is None:
+ return
-# for (hostname, dev), consumption in per_consumer_table.items():
-# if dev != '*':
-# continue
+ hosts = {}
+ storage_sensors = ('sectors_written', 'sectors_read')
-# if hostname not in hosts:
-# hosts[hostname] = Host(hostname)
+ for (hostname, dev), consumption in per_consumer_table.items():
+ if hostname not in hosts:
+ hosts[hostname] = Host(hostname)
-# cons_map = map(zip(fields, consumption))
+ host = hosts[hostname]
+ cons_map = dict(zip(fields, consumption))
-# for sn in storage_sensors:
-# vl = cons_map.get(sn, 0)
-# if vl > 0:
-# pass
+ for sn in storage_sensors:
+ vl = cons_map.get(sn, 0)
+ if vl > 0:
+ host.hdd_devs.setdefault(dev, {})[sn] = vl
+
+ p = pgv.AGraph(name='system', directed=True)
+
+ net = "Network"
+ p.add_node(net)
+
+ in_color = 'red'
+ out_color = 'green'
+
+ for host in hosts.values():
+ g = p.subgraph(name="cluster_" + host.name, label=host.name,
+ color="blue")
+ g.add_node(host.name, shape="diamond")
+ p.add_edge(host.name, net)
+ p.add_edge(net, host.name)
+
+ for dev_name, values in host.hdd_devs.items():
+ if dev_name == '*':
+ continue
+
+ to = values.get('sectors_written', 0)
+ frm = values.get('sectors_read', 0)
+ to_pw = 7 * to / refload
+ frm_pw = 7 * frm / refload
+            min_width = 0.1
+
+            if to_pw > min_width or frm_pw > min_width:
+                dev_fqn = host.name + "." + dev_name
+                g.add_node(dev_fqn)
+
+                if to_pw > min_width:
+                    g.add_edge(host.name, dev_fqn,
+                               label=b2ssize(to) + "B",
+                               penwidth=to_pw,
+                               fontcolor=out_color,
+                               color=out_color)
+
+                if frm_pw > min_width:
+ g.add_edge(dev_fqn, host.name,
+ label=b2ssize(frm) + "B",
+ penwidth=frm_pw,
+ color=in_color,
+ fontcolor=in_color)
+
+ return p.string()
+
+
+def parse_args(args):
+ parser = argparse.ArgumentParser()
+ parser.add_argument('-t', '--time_period', nargs=2,
+ type=int, default=None,
+ help="Begin and end time for tests")
+    parser.add_argument('-m', '--max-bottlenek', type=int,
+                        default=15, help="Max bottlenecks to show")
+    parser.add_argument('-x', '--max-diff', type=int,
+                        default=10, help="Threshold, in 0.1%% units " +
+                        "of test nodes' total load")
+ parser.add_argument('-d', '--debug-ver', action='store_true',
+ help="Full report with original data")
+ parser.add_argument('-u', '--user-ver', action='store_true',
+ default=True, help="Avg load report")
+ parser.add_argument('-s', '--select-loads', nargs='*', default=[])
+ parser.add_argument('-f', '--fields', nargs='*', default=[])
+ parser.add_argument('results_folder')
+ return parser.parse_args(args[1:])
def main(argv):
opts = parse_args(argv)
- sensors_data_fname = os.path.join(opts.results_folder,
- 'sensor_storage.txt')
+ stor_dir = os.path.join(opts.results_folder, 'sensor_storage')
+ data = {}
+ source_id2hostname = {}
+
+ csv_files = os.listdir(stor_dir)
+ for fname in csv_files:
+        assert re.match(r"\d+_\d+\.csv$", fname)
+
+ csv_files.sort(key=lambda x: int(x.split('_')[0]))
+
+ for fname in csv_files:
+ with open(os.path.join(stor_dir, fname)) as fd:
+ for name, node_sens_data in load_results_csv(fd).items():
+ if name in data:
+ assert data[name].hostname == node_sens_data.hostname
+ assert data[name].source_id == node_sens_data.source_id
+ assert data[name].headers == node_sens_data.headers
+ data[name].values.extend(node_sens_data.values)
+ else:
+ data[name] = node_sens_data
+
+ for nd in data.values():
+ assert nd.source_id not in source_id2hostname
+ source_id2hostname[nd.source_id] = nd.hostname
+ nd.finalize()
roles_file = os.path.join(opts.results_folder,
'nodes.yaml')
- raw_results_file = os.path.join(opts.results_folder,
- 'raw_results.yaml')
-
src2roles = yaml.load(open(roles_file))
- timings = load_test_timings(open(raw_results_file))
- with open(sensors_data_fname) as fd:
- data, source_id2hostname = load_results(fd)
+
+ timings = load_test_timings(opts.results_folder)
roles_map = make_roles_mapping(src2roles, source_id2hostname)
max_diff = float(opts.max_diff) / 1000
- # print print_bottlenecks(data, opts.max_bottlenek)
- # print print_bottlenecks(data, opts.max_bottlenek)
+ fields = ('recv_bytes', 'send_bytes',
+ 'sectors_read', 'sectors_written',
+ 'reads_completed', 'writes_completed')
- for name, intervals in sorted(timings.items()):
+ if opts.fields != []:
+ fields = [field for field in fields if field in opts.fields]
+
+ for test_name, intervals in sorted(timings.items()):
if opts.select_loads != []:
- if name not in opts.select_loads:
+ if test_name not in opts.select_loads:
continue
- print
- print
- print "-" * 30 + " " + name + " " + "-" * 30
- print
+ data_chunks = get_data_for_intervals(data, intervals)
- data_chunk = get_data_for_intervals(data, intervals)
-
- consumption = total_consumption(data_chunk, roles_map)
+ consumption = total_consumption(data_chunks, roles_map)
testdata_sz = get_testdata_size(consumption) * max_diff
testop_count = get_testop_cout(consumption) * max_diff
- fields = ('recv_bytes', 'send_bytes',
- 'sectors_read', 'sectors_written',
- 'reads_completed', 'writes_completed')
per_consumer_table = {}
+ per_consumer_table_str = {}
all_consumers = set(consumption.values()[0].all_together)
+ fields = [field for field in fields if field in consumption]
all_consumers_sum = []
for consumer in all_consumers:
+ tb_str = per_consumer_table_str[consumer] = []
tb = per_consumer_table[consumer] = []
vl = 0
for name in fields:
val = consumption[name].all_together[consumer]
if SINFO_MAP[name].to_bytes_coef is None:
if val < testop_count:
- val = 0
- tb.append(b2ssize_10(int(val)))
+ tb_str.append('0')
+ else:
+ tb_str.append(b2ssize_10(int(val)))
else:
+ val = int(val) * SINFO_MAP[name].to_bytes_coef
if val < testdata_sz:
- val = 0
- tb.append(b2ssize(int(val)) + "B")
+ tb_str.append('-')
+ else:
+ tb_str.append(b2ssize(val) + "B")
+ tb.append(int(val))
vl += int(val)
all_consumers_sum.append((vl, consumer))
all_consumers_sum.sort(reverse=True)
- # plot_consumption(per_consumer_table, fields)
- # continue
+
+ plot_consumption(per_consumer_table, fields,
+ testdata_sz / max_diff)
tt = texttable.Texttable(max_width=130)
tt.set_cols_align(["l"] + ["r"] * len(fields))
@@ -481,11 +527,13 @@
for summ, consumer in all_consumers_sum:
if summ > 0:
tt.add_row([":".join(consumer)] +
- [v if v not in ('0B', '0') else '-'
- for v in per_consumer_table[consumer]])
+ per_consumer_table_str[consumer])
tt.set_deco(texttable.Texttable.VLINES | texttable.Texttable.HEADER)
- print tt.draw()
+ res = tt.draw()
+ max_len = max(map(len, res.split("\n")))
+ print test_name.center(max_len)
+ print res
if __name__ == "__main__":
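
Note: bottleneck.py now reads per-test sensor files from the sensor_storage/
directory (named <start_ts>_<end_ts>.csv, written by run_test.py below) instead
of a single sensor_storage.txt. Each file is a sequence of per-node CSV blocks;
load_results_csv() splits them on a NEW_DATA marker (the marker itself is
assumed to be emitted by the collector, it is not shown in this patch). Roughly:

    NEW_DATA
    192.168.1.10:22,node-1,sda.sectors_read,sda.sectors_written
    1430000000,120,3440
    1430000010,96,4104

    # header row: source_id, hostname, then dev.sensor column names;
    # data rows: time first, then one integer per sensor column
    # (all values above are made up)
    results = load_results_csv(open('sensor_storage/1430000000_1430000300.csv'))
    node = results['192.168.1.10:22']
    sectors = node['sda.sectors_read']     # via NodeSensorsData.__getitem__
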
diff --git a/wally/config.py b/wally/config.py
index 90fde3c..2a09d47 100644
--- a/wally/config.py
+++ b/wally/config.py
@@ -56,16 +56,17 @@
with open(run_params_file, 'w') as fd:
fd.write(dumps({'run_uuid': cfg_dict['run_uuid']}))
- cfg_dict['charts_img_path'] = in_var_dir('charts')
- mkdirs_if_unxists(cfg_dict['charts_img_path'])
-
cfg_dict['vm_ids_fname'] = in_var_dir('os_vm_ids')
cfg_dict['html_report_file'] = in_var_dir('{0}_report.html')
cfg_dict['text_report_file'] = in_var_dir('report.txt')
cfg_dict['log_file'] = in_var_dir('log.txt')
- cfg_dict['sensor_storage'] = in_var_dir('sensor_storage.txt')
+ cfg_dict['sensor_storage'] = in_var_dir('sensor_storage')
+ mkdirs_if_unxists(cfg_dict['sensor_storage'])
cfg_dict['nodes_report_file'] = in_var_dir('nodes.yaml')
+ if 'sensors_remote_path' not in cfg_dict:
+ cfg_dict['sensors_remote_path'] = '/tmp/sensors'
+
testnode_log_root = cfg_dict.get('testnode_log_root', '/var/wally')
testnode_log_dir = os.path.join(testnode_log_root, "{0}/{{name}}")
cfg_dict['default_test_local_folder'] = \
diff --git a/wally/discover/fuel.py b/wally/discover/fuel.py
index be10116..34adc07 100644
--- a/wally/discover/fuel.py
+++ b/wally/discover/fuel.py
@@ -68,7 +68,7 @@
nodes = []
ips_ports = []
- logger.info("Forwarding ssh ports from FUEL nodes localhost")
+ logger.info("Forwarding ssh ports from FUEL nodes to localhost")
fuel_usr, fuel_passwd = ssh_creds.split(":", 1)
ips = [str(fuel_node.get_ip(network)) for fuel_node in fuel_nodes]
port_fw = forward_ssh_ports(fuel_host, fuel_usr, fuel_passwd, ips)
diff --git a/wally/hw_info.py b/wally/hw_info.py
index 9d38913..5819eed 100644
--- a/wally/hw_info.py
+++ b/wally/hw_info.py
@@ -27,11 +27,18 @@
self.raw = None
self.storage_controllers = []
+ def get_HDD_count(self):
+ # SATA HDD COUNT, SAS 10k HDD COUNT, SAS SSD count, PCI-E SSD count
+ return []
+
def get_summary(self):
cores = sum(count for _, count in self.cores)
disks = sum(size for _, size in self.disks_info.values())
- return {'cores': cores, 'ram': self.ram_size, 'storage': disks}
+ return {'cores': cores,
+ 'ram': self.ram_size,
+ 'storage': disks,
+ 'disk_count': len(self.disks_info)}
def __str__(self):
res = []
@@ -92,7 +99,6 @@
class SWInfo(object):
def __init__(self):
- self.os = None
self.partitions = None
self.kernel_version = None
self.fio_version = None
@@ -105,6 +111,28 @@
def get_sw_info(conn):
res = SWInfo()
+
+ with conn.open_sftp() as sftp:
+ def get(fname):
+ try:
+ return ssh_utils.read_from_remote(sftp, fname)
+ except:
+ return None
+
+ res.kernel_version = get('/proc/version')
+ res.partitions = get('/etc/mtab')
+ res.OS_version = get('/etc/lsb-release')
+
+ def rr(cmd):
+ try:
+ return ssh_utils.run_over_ssh(conn, cmd, nolog=True)
+ except:
+ return None
+
+ res.libvirt_version = rr("virsh -v")
+ res.qemu_version = rr("qemu-system-x86_64 --version")
+ res.ceph_version = rr("ceph --version")
+
return res
diff --git a/wally/pretty_yaml.py b/wally/pretty_yaml.py
index ff1f3bc..699af7e 100644
--- a/wally/pretty_yaml.py
+++ b/wally/pretty_yaml.py
@@ -3,9 +3,12 @@
def dumps_simple(val):
- bad_symbols = set(" \r\t\n,':")
+ bad_symbols = set(" \r\t\n,':{}[]><;")
if isinstance(val, basestring):
+ if isinstance(val, unicode):
+ val = val.encode('utf8')
+
if len(bad_symbols & set(val)) != 0:
return repr(val)
return val
@@ -28,7 +31,7 @@
return all(isinstance(val, (int, float, long)) for val in vals)
-def dumpv(data, tab_sz=4, width=120, min_width=40):
+def dumpv(data, tab_sz=4, width=160, min_width=40):
tab = ' ' * tab_sz
if width < min_width:
@@ -44,6 +47,8 @@
one_line = "[{0}]".format(", ".join(map(dumps_simple, data)))
else:
one_line = "[{0}]".format(",".join(map(dumps_simple, data)))
+ elif len(data) == 0:
+ one_line = "[]"
else:
one_line = None
@@ -58,21 +63,40 @@
else:
res.append(one_line)
elif isinstance(data, dict):
- assert all(map(is_simple, data.keys()))
+ if len(data) == 0:
+ res.append("{}")
+ else:
+ assert all(map(is_simple, data.keys()))
- for k, v in data.items():
- key_str = dumps_simple(k) + ": "
- val_res = dumpv(v, tab_sz, width - tab_sz, min_width)
+ one_line = None
+ if all(map(is_simple, data.values())):
+ one_line = ", ".join(
+ "{0}: {1}".format(dumps_simple(k), dumps_simple(v))
+ for k, v in sorted(data.items()))
+ one_line = "{" + one_line + "}"
+ if len(one_line) > width:
+ one_line = None
- if len(val_res) == 1 and \
- len(key_str + val_res[0]) < width and \
- not isinstance(v, dict):
- res.append(key_str + val_res[0])
+ if one_line is None:
+ for k, v in data.items():
+ key_str = dumps_simple(k) + ": "
+ val_res = dumpv(v, tab_sz, width - tab_sz, min_width)
+
+ if len(val_res) == 1 and \
+ len(key_str + val_res[0]) < width and \
+ not isinstance(v, dict):
+ res.append(key_str + val_res[0])
+ else:
+ res.append(key_str)
+ res.extend(tab + i for i in val_res)
else:
- res.append(key_str)
- res.extend(tab + i for i in val_res)
+ res.append(one_line)
else:
- raise ValueError("Can't pack {0!r}".format(data))
+ try:
+ get_yamable = data.get_yamable
+ except AttributeError:
+ raise ValueError("Can't pack {0!r}".format(data))
+ res = dumpv(get_yamable(), tab_sz, width, min_width)
    return res
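
Note: dumpv() now short-circuits empty containers, collapses dicts whose values
are all scalars onto a single line (when it fits the width), and falls back to a
get_yamable() hook before giving up on unknown types. Expected behaviour,
sketched (output shown in comments is illustrative):

    # illustrative: dumpv returns a list of lines
    from wally.pretty_yaml import dumpv

    print("\n".join(dumpv({'cores': 4, 'ram': 8192})))
    # -> {cores: 4, ram: 8192}            (all-scalar dict, one line)

    class HWSummary(object):              # hypothetical class
        def get_yamable(self):
            return {'cores': 4, 'disk_count': 6}

    print("\n".join(dumpv(HWSummary())))
    # -> {cores: 4, disk_count: 6}        (serialized via the hook)
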
diff --git a/wally/report.py b/wally/report.py
index f46352d..04577a9 100644
--- a/wally/report.py
+++ b/wally/report.py
@@ -2,16 +2,19 @@
import bisect
import logging
import collections
+from cStringIO import StringIO
try:
+ import numpy
+    import scipy.optimize
import matplotlib.pyplot as plt
except ImportError:
plt = None
import wally
-from wally import charts
from wally.utils import ssize2b
from wally.statistic import round_3_digit, data_property
+from wally.suits.io.fio_task_parser import get_test_sync_mode
logger = logging.getLogger("wally.report")
@@ -31,35 +34,43 @@
report_funcs = []
+class Attrmapper(object):
+ def __init__(self, dct):
+ self.__dct = dct
+
+ def __getattr__(self, name):
+ try:
+ return self.__dct[name]
+ except KeyError:
+ raise AttributeError(name)
+
+
class PerfInfo(object):
- def __init__(self, name, intervals, params, testnodes_count):
+ def __init__(self, name, summary, intervals, params, testnodes_count):
self.name = name
self.bw = None
self.iops = None
self.lat = None
+
+ self.raw_bw = []
+ self.raw_iops = []
+ self.raw_lat = []
+
self.params = params
self.intervals = intervals
self.testnodes_count = testnodes_count
+ self.summary = summary
+ self.p = Attrmapper(self.params.vals)
-
-def split_and_add(data, block_size):
- assert len(data) % block_size == 0
- res = [0] * block_size
-
- for idx, val in enumerate(data):
- res[idx % block_size] += val
-
- return res
+ self.sync_mode = get_test_sync_mode(self.params)
+ self.concurence = self.params.vals.get('numjobs', 1)
def group_by_name(test_data):
name_map = collections.defaultdict(lambda: [])
- for block in test_data:
- for data in block['res']:
- data = data.copy()
- data['__meta__'] = block['__meta__']
- name_map[data['name']].append(data)
+ for data in test_data:
+ name_map[(data.config.name, data.summary())].append(data)
return name_map
@@ -67,37 +78,27 @@
def process_disk_info(test_data):
name_map = group_by_name(test_data)
data = {}
- for name, results in name_map.items():
- testnodes_count_set = set(dt['__meta__']['testnodes_count']
- for dt in results)
+ for (name, summary), results in name_map.items():
+ testnodes_count_set = set(dt.vm_count for dt in results)
assert len(testnodes_count_set) == 1
testnodes_count, = testnodes_count_set
assert len(results) % testnodes_count == 0
- block_count = len(results) // testnodes_count
- intervals = [result['run_interval'] for result in results]
+ intervals = [result.run_interval for result in results]
+ p = results[0].config
+ pinfo = PerfInfo(p.name, result.summary(), intervals,
+ p, testnodes_count)
- p = results[0]['params'].copy()
- rt = p.pop('ramp_time', 0)
+ pinfo.raw_bw = [result.results['bw'] for result in results]
+ pinfo.raw_iops = [result.results['iops'] for result in results]
+ pinfo.raw_lat = [result.results['lat'] for result in results]
- for result in results[1:]:
- tp = result['params'].copy()
- tp.pop('ramp_time', None)
- assert tp == p
+ pinfo.bw = data_property(map(sum, zip(*pinfo.raw_bw)))
+ pinfo.iops = data_property(map(sum, zip(*pinfo.raw_iops)))
+ pinfo.lat = data_property(sum(pinfo.raw_lat, []))
- p['ramp_time'] = rt
- pinfo = PerfInfo(name, intervals, p, testnodes_count)
-
- bw = [result['results']['bw'] for result in results]
- iops = [result['results']['iops'] for result in results]
- lat = [result['results']['lat'] for result in results]
-
- pinfo.bw = data_property(split_and_add(bw, block_count))
- pinfo.iops = data_property(split_and_add(iops, block_count))
- pinfo.lat = data_property(lat)
-
- data[name] = pinfo
+ data[(p.name, summary)] = pinfo
return data
@@ -108,70 +109,138 @@
return closure
-def linearity_report(processed_results, path, lab_info):
- names = {}
- for tp1 in ('rand', 'seq'):
- for oper in ('read', 'write'):
- for sync in ('sync', 'direct', 'async'):
- sq = (tp1, oper, sync)
- name = "{0} {1} {2}".format(*sq)
- names["".join(word[0] for word in sq)] = name
+def get_test_lcheck_params(pinfo):
+ res = [{
+ 's': 'sync',
+ 'd': 'direct',
+ 'a': 'async',
+ 'x': 'sync direct'
+ }[pinfo.sync_mode]]
- colors = ['red', 'green', 'blue', 'cyan',
- 'magenta', 'black', 'yellow', 'burlywood']
- markers = ['*', '^', 'x', 'o', '+', '.']
- color = 0
- marker = 0
+ res.append(pinfo.p.rw)
- plot_data = {}
-
- name_pref = 'linearity_test_rrd'
-
- for res in processed_results.values():
- if res.name.startswith(name_pref):
- iotime = 1000000. / res.iops
- iotime_max = iotime * (1 + res.dev * 3)
- bsize = ssize2b(res.raw['blocksize'])
- plot_data[bsize] = (iotime, iotime_max)
-
- min_sz = min(plot_data)
- min_iotime, _ = plot_data.pop(min_sz)
-
- x = []
- y = []
- e = []
-
- for k, (v, vmax) in sorted(plot_data.items()):
- y.append(v - min_iotime)
- x.append(k)
- e.append(y[-1] - (vmax - min_iotime))
-
- tp = 'rrd'
- plt.errorbar(x, y, e, linestyle='None', label=names[tp],
- color=colors[color], ecolor="black",
- marker=markers[marker])
- plt.yscale('log')
- plt.xscale('log')
- # plt.show()
-
- # ynew = approximate_line(ax, ay, ax, True)
- # plt.plot(ax, ynew, color=colors[color])
- # color += 1
- # marker += 1
- # plt.legend(loc=2)
- # plt.title("Linearity test by %i dots" % (len(vals)))
+ return " ".join(res)
-if plt:
- linearity_report = report('linearity', 'linearity_test')(linearity_report)
+def get_emb_data_svg(plt):
+ sio = StringIO()
+ plt.savefig(sio, format='svg')
+ img_start = "<!-- Created with matplotlib (http://matplotlib.org/) -->"
+ return sio.getvalue().split(img_start, 1)[1]
-def render_all_html(dest, info, lab_description, img_ext, templ_name):
+def get_template(templ_name):
very_root_dir = os.path.dirname(os.path.dirname(wally.__file__))
templ_dir = os.path.join(very_root_dir, 'report_templates')
templ_file = os.path.join(templ_dir, templ_name)
- templ = open(templ_file, 'r').read()
+ return open(templ_file, 'r').read()
+
+@report('linearity', 'linearity_test')
+def linearity_report(processed_results, path, lab_info):
+ labels_and_data = []
+
+ vls = processed_results.values()[0].params.vals.copy()
+ del vls['blocksize']
+
+ for res in processed_results.values():
+ if res.name.startswith('linearity_test'):
+ iotimes = [1000. / val for val in res.iops.raw]
+ labels_and_data.append([res.p.blocksize, res.iops.raw, iotimes])
+ cvls = res.params.vals.copy()
+ del cvls['blocksize']
+ assert cvls == vls
+
+ labels_and_data.sort(key=lambda x: ssize2b(x[0]))
+ _, ax1 = plt.subplots()
+
+ labels, data, iotimes = zip(*labels_and_data)
+ plt.boxplot(iotimes)
+
+ if len(labels_and_data) > 2 and ssize2b(labels_and_data[-2][0]) >= 4096:
+ xt = range(1, len(labels) + 1)
+
+ def io_time(sz, bw, initial_lat):
+ return sz / bw + initial_lat
+
+ x = numpy.array(map(ssize2b, labels))
+ y = numpy.array([sum(dt) / len(dt) for dt in iotimes])
+ popt, _ = scipy.optimize.curve_fit(io_time, x, y, p0=(100., 1.))
+
+ y1 = io_time(x, *popt)
+        plt.plot(xt, y1, linestyle='--', label='LS linear approximation')
+
+ for idx, (sz, _, _) in enumerate(labels_and_data):
+ if ssize2b(sz) >= 4096:
+ break
+
+ bw = (x[-1] - x[idx]) / (y[-1] - y[idx])
+ lat = y[-1] - x[-1] / bw
+ y2 = io_time(x, bw, lat)
+
+ plt.plot(xt, y2, linestyle='--',
+                 label='(4k & max) linear approximation')
+
+ plt.setp(ax1, xticklabels=labels)
+
+ plt.xlabel("Block size")
+ plt.ylabel("IO time, ms")
+
+ plt.legend(loc=0)
+ plt.grid()
+ iotime_plot = get_emb_data_svg(plt)
+
+ _, ax1 = plt.subplots()
+ plt.boxplot(data)
+ plt.setp(ax1, xticklabels=labels)
+
+ plt.xlabel("Block size")
+ plt.ylabel("IOPS")
+ plt.grid()
+
+ iops_plot = get_emb_data_svg(plt)
+
+ res1 = processed_results.values()[0]
+ descr = {
+ 'vm_count': res1.testnodes_count,
+ 'concurence': res1.concurence,
+ 'oper_descr': get_test_lcheck_params(res1).capitalize()
+ }
+
+ params_map = {'iotime_vs_size': iotime_plot,
+ 'iops_vs_size': iops_plot,
+ 'descr': descr}
+
+ with open(path, 'w') as fd:
+ fd.write(get_template('report_linearity.html').format(**params_map))
+
+
+@report('lat_vs_iops', 'lat_vs_iops')
+def lat_vs_iops(processed_results, path, lab_info):
+ lat_iops = collections.defaultdict(lambda: [])
+ for res in processed_results.values():
+ if res.name.startswith('lat_vs_iops'):
+ lat_iops[res.concurence].append((res.lat.average / 1000.0,
+ res.lat.deviation / 1000.0,
+ res.iops.average,
+ res.iops.deviation))
+
+ colors = ['red', 'green', 'blue', 'orange', 'magenta', "teal"][::-1]
+ for conc, lat_iops in sorted(lat_iops.items()):
+ lat, dev, iops, iops_dev = zip(*lat_iops)
+ plt.errorbar(iops, lat, xerr=iops_dev, yerr=dev, fmt='ro',
+ label=str(conc) + " threads",
+ color=colors.pop())
+
+ plt.xlabel("IOPS")
+ plt.ylabel("Latency, ms")
+ plt.grid()
+ plt.legend(loc=0)
+ plt.show()
+ exit(1)
+
+
+def render_all_html(dest, info, lab_description, images, templ_name):
data = info.__dict__.copy()
for name, val in data.items():
if not name.startswith('__'):
@@ -185,62 +254,25 @@
data['bw_write_max'] = (data['bw_write_max'][0] // 1024,
data['bw_write_max'][1])
- report = templ.format(lab_info=lab_description, img_ext=img_ext,
- **data)
- open(dest, 'w').write(report)
+ images.update(data)
+ report = get_template(templ_name).format(lab_info=lab_description,
+ **images)
-
-def render_hdd_html(dest, info, lab_description, img_ext):
- render_all_html(dest, info, lab_description, img_ext,
- "report_hdd.html")
-
-
-def render_ceph_html(dest, info, lab_description, img_ext):
- render_all_html(dest, info, lab_description, img_ext,
- "report_ceph.html")
+ with open(dest, 'w') as fd:
+ fd.write(report)
def io_chart(title, concurence,
latv, latv_min, latv_max,
- iops_or_bw, iops_or_bw_dev,
- legend, fname):
- bar_data = iops_or_bw
- bar_dev = iops_or_bw_dev
- legend = [legend]
-
- iops_or_bw_per_vm = []
- for iops, conc in zip(iops_or_bw, concurence):
- iops_or_bw_per_vm.append(iops / conc)
-
- bar_dev_bottom = []
- bar_dev_top = []
- for val, err in zip(bar_data, bar_dev):
- bar_dev_top.append(val + err)
- bar_dev_bottom.append(val - err)
-
- charts.render_vertical_bar(title, legend, [bar_data], [bar_dev_top],
- [bar_dev_bottom], file_name=fname,
- scale_x=concurence, label_x="clients",
- label_y=legend[0],
- lines=[
- (latv, "msec", "rr", "lat"),
- # (latv_min, None, None, "lat_min"),
- # (latv_max, None, None, "lat_max"),
- (iops_or_bw_per_vm, None, None,
- legend[0] + " per client")
- ])
-
-
-def io_chart_mpl(title, concurence,
- latv, latv_min, latv_max,
- iops_or_bw, iops_or_bw_err,
- legend, fname, log=False):
+ iops_or_bw, iops_or_bw_err,
+ legend, log=False,
+ boxplots=False):
points = " MiBps" if legend == 'BW' else ""
lc = len(concurence)
width = 0.35
xt = range(1, lc + 1)
- op_per_vm = [v / c for v, c in zip(iops_or_bw, concurence)]
+ op_per_vm = [v / (vm * th) for v, (vm, th) in zip(iops_or_bw, concurence)]
fig, p1 = plt.subplots()
xpos = [i - width / 2 for i in xt]
@@ -252,7 +284,7 @@
label=legend)
p1.grid(True)
- p1.plot(xt, op_per_vm, '--', label=legend + "/vm", color='black')
+ p1.plot(xt, op_per_vm, '--', label=legend + "/thread", color='black')
handles1, labels1 = p1.get_legend_handles_labels()
p2 = p1.twinx()
@@ -261,8 +293,8 @@
p2.plot(xt, latv_min, label="lat min")
plt.xlim(0.5, lc + 0.5)
- plt.xticks(xt, map(str, concurence))
- p1.set_xlabel("Threads")
+ plt.xticks(xt, ["{0} * {1}".format(vm, th) for (vm, th) in concurence])
+ p1.set_xlabel("VM Count * Thread per VM")
p1.set_ylabel(legend + points)
p2.set_ylabel("Latency ms")
plt.title(title)
@@ -270,39 +302,17 @@
plt.legend(handles1 + handles2, labels1 + labels2,
loc='center left', bbox_to_anchor=(1.1, 0.81))
- # fontsize='small')
if log:
p1.set_yscale('log')
p2.set_yscale('log')
- plt.subplots_adjust(right=0.7)
- # plt.show() # bbox_extra_artists=(leg,), bbox_inches='tight')
- # exit(1)
- plt.savefig(fname, format=fname.split('.')[-1])
+ plt.subplots_adjust(right=0.68)
+
+ return get_emb_data_svg(plt)
-def make_hdd_plots(processed_results, charts_dir):
- plots = [
- ('hdd_test_rrd4k', 'rand_read_4k', 'Random read 4k direct IOPS'),
- ('hdd_test_rws4k', 'rand_write_4k', 'Random write 4k sync IOPS')
- ]
- return make_plots(processed_results, charts_dir, plots)
-
-
-def make_ceph_plots(processed_results, charts_dir):
- plots = [
- ('ceph_test_rrd4k', 'rand_read_4k', 'Random read 4k direct IOPS'),
- ('ceph_test_rws4k', 'rand_write_4k', 'Random write 4k sync IOPS'),
- ('ceph_test_rrd16m', 'rand_read_16m',
- 'Random read 16m direct MiBps'),
- ('ceph_test_rwd16m', 'rand_write_16m',
- 'Random write 16m direct MiBps'),
- ]
- return make_plots(processed_results, charts_dir, plots)
-
-
-def make_plots(processed_results, charts_dir, plots):
- file_ext = None
+def make_plots(processed_results, plots):
+ files = {}
for name_pref, fname, desc in plots:
chart_data = []
@@ -313,9 +323,9 @@
if len(chart_data) == 0:
raise ValueError("Can't found any date for " + name_pref)
- use_bw = ssize2b(chart_data[0].params['blocksize']) > 16 * 1024
+ use_bw = ssize2b(chart_data[0].p.blocksize) > 16 * 1024
- chart_data.sort(key=lambda x: x.params['concurence'])
+ chart_data.sort(key=lambda x: x.concurence)
# if x.lat.average < max_lat]
lat = [x.lat.average / 1000 for x in chart_data]
@@ -323,7 +333,7 @@
lat_max = [x.lat.max / 1000 for x in chart_data]
testnodes_count = x.testnodes_count
- concurence = [x.params['concurence'] * testnodes_count
+ concurence = [(testnodes_count, x.concurence)
for x in chart_data]
if use_bw:
@@ -335,25 +345,24 @@
data_dev = [x.iops.confidence for x in chart_data]
name = "IOPS"
- fname = os.path.join(charts_dir, fname)
- if plt is not None:
- io_chart_mpl(desc, concurence, lat, lat_min, lat_max,
- data, data_dev, name, fname + '.svg')
- file_ext = 'svg'
- else:
- io_chart(desc, concurence, lat, lat_min, lat_max,
- data, data_dev, name, fname + '.png')
- file_ext = 'png'
- return file_ext
+ fc = io_chart(title=desc,
+ concurence=concurence,
+ latv=lat, latv_min=lat_min, latv_max=lat_max,
+ iops_or_bw=data,
+ iops_or_bw_err=data_dev,
+ legend=name)
+ files[fname] = fc
+
+ return files
def find_max_where(processed_results, sync_mode, blocksize, rw, iops=True):
result = None
attr = 'iops' if iops else 'bw'
for measurement in processed_results.values():
- ok = measurement.params['sync_mode'] == sync_mode
- ok = ok and (measurement.params['blocksize'] == blocksize)
- ok = ok and (measurement.params['rw'] == rw)
+ ok = measurement.sync_mode == sync_mode
+ ok = ok and (measurement.p.blocksize == blocksize)
+ ok = ok and (measurement.p.rw == rw)
if ok:
field = getattr(measurement, attr)
@@ -388,12 +397,12 @@
'd', '1m', 'read', False)
for res in processed_results.values():
- if res.params['sync_mode'] == 's' and res.params['blocksize'] == '4k':
- if res.params['rw'] != 'randwrite':
+ if res.sync_mode == 's' and res.p.blocksize == '4k':
+ if res.p.rw != 'randwrite':
continue
rws4k_iops_lat_th.append((res.iops.average,
res.lat.average,
- res.params['concurence']))
+ res.concurence))
rws4k_iops_lat_th.sort(key=lambda (_1, _2, conc): conc)
@@ -438,21 +447,33 @@
return hdi
-@report('HDD', 'hdd_test_rrd4k,hdd_test_rws4k')
-def make_hdd_report(processed_results, path, charts_path, lab_info):
- img_ext = make_hdd_plots(processed_results, charts_path)
+@report('HDD', 'hdd_test')
+def make_hdd_report(processed_results, path, lab_info):
+ plots = [
+ ('hdd_test_rrd4k', 'rand_read_4k', 'Random read 4k direct IOPS'),
+ ('hdd_test_rws4k', 'rand_write_4k', 'Random write 4k sync IOPS')
+ ]
+ images = make_plots(processed_results, plots)
di = get_disk_info(processed_results)
- render_hdd_html(path, di, lab_info, img_ext)
+ render_all_html(path, di, lab_info, images, "report_hdd.html")
@report('Ceph', 'ceph_test')
-def make_ceph_report(processed_results, path, charts_path, lab_info):
- img_ext = make_ceph_plots(processed_results, charts_path)
+def make_ceph_report(processed_results, path, lab_info):
+ plots = [
+ ('ceph_test_rrd4k', 'rand_read_4k', 'Random read 4k direct IOPS'),
+ ('ceph_test_rws4k', 'rand_write_4k', 'Random write 4k sync IOPS'),
+ ('ceph_test_rrd16m', 'rand_read_16m', 'Random read 16m direct MiBps'),
+ ('ceph_test_rwd16m', 'rand_write_16m',
+ 'Random write 16m direct MiBps'),
+ ]
+
+ images = make_plots(processed_results, plots)
di = get_disk_info(processed_results)
- render_ceph_html(path, di, lab_info, img_ext)
+ render_all_html(path, di, lab_info, images, "report_ceph.html")
-def make_io_report(dinfo, results, path, charts_path, lab_info=None):
+def make_io_report(dinfo, results, path, lab_info=None):
lab_info = {
"total_disk": "None",
"total_memory": "None",
@@ -461,7 +482,8 @@
}
try:
- res_fields = sorted(dinfo.keys())
+ res_fields = sorted(v.name for v in dinfo.values())
+
for fields, name, func in report_funcs:
for field in fields:
pos = bisect.bisect_left(res_fields, field)
@@ -474,7 +496,7 @@
else:
hpath = path.format(name)
logger.debug("Generatins report " + name + " into " + hpath)
- func(dinfo, hpath, charts_path, lab_info)
+ func(dinfo, hpath, lab_info)
break
else:
logger.warning("No report generator found for this load")
diff --git a/wally/run_test.py b/wally/run_test.py
index 5322432..22856b5 100755
--- a/wally/run_test.py
+++ b/wally/run_test.py
@@ -10,7 +10,6 @@
import argparse
import functools
import threading
-import subprocess
import contextlib
import collections
@@ -23,9 +22,15 @@
from wally.discover import discover, Node
from wally.timeseries import SensorDatastore
from wally import utils, report, ssh_utils, start_vms
+from wally.suits import IOPerfTest, PgBenchTest, MysqlTest
from wally.config import cfg_dict, load_config, setup_loggers
-from wally.suits.itest import IOPerfTest, PgBenchTest, MysqlTest
-from wally.sensors_utils import deploy_sensors_stage, gather_sensors_stage
+from wally.sensors_utils import with_sensors_util, sensors_info_util
+
+TOOL_TYPE_MAPPER = {
+ "io": IOPerfTest,
+ "pgbench": PgBenchTest,
+ "mysql": MysqlTest,
+}
try:
@@ -174,7 +179,8 @@
node=node,
remote_dir=rem_folder,
log_directory=dr,
- coordination_queue=coord_q)
+ coordination_queue=coord_q,
+ total_nodes_count=len(test_nodes))
th = threading.Thread(None, test_thread, None,
(test, node, barrier, res_q))
threads.append(th)
@@ -213,19 +219,33 @@
results.append(val)
- results = test_cls.merge_results(results)
return results
-def run_tests(cfg, test_block, nodes):
- tool_type_mapper = {
- "io": IOPerfTest,
- "pgbench": PgBenchTest,
- "mysql": MysqlTest,
- }
+def suspend_vm_nodes(unused_nodes):
+ pausable_nodes_ids = [node.os_vm_id for node in unused_nodes
+ if node.os_vm_id is not None]
+ non_pausable = len(unused_nodes) - len(pausable_nodes_ids)
+ if 0 != non_pausable:
+ logger.warning("Can't pause {0} nodes".format(
+ non_pausable))
+
+ if len(pausable_nodes_ids) != 0:
+ logger.debug("Try to pause {0} unused nodes".format(
+ len(pausable_nodes_ids)))
+ start_vms.pause(pausable_nodes_ids)
+
+ return pausable_nodes_ids
+
+
+def run_tests(cfg, test_block, nodes):
test_nodes = [node for node in nodes
if 'testnode' in node.roles]
+
+ not_test_nodes = [node for node in nodes
+ if 'testnode' not in node.roles]
+
if len(test_nodes) == 0:
logger.error("No test nodes found")
return
@@ -252,18 +272,7 @@
continue
if cfg.get('suspend_unused_vms', True):
- pausable_nodes_ids = [node.os_vm_id for node in unused_nodes
- if node.os_vm_id is not None]
- non_pausable = len(unused_nodes) - len(pausable_nodes_ids)
-
- if 0 != non_pausable:
- logger.warning("Can't pause {0} nodes".format(
- non_pausable))
-
- if len(pausable_nodes_ids) != 0:
- logger.debug("Try to pause {0} unused nodes".format(
- len(pausable_nodes_ids)))
- start_vms.pause(pausable_nodes_ids)
+ pausable_nodes_ids = suspend_vm_nodes(unused_nodes)
resumable_nodes_ids = [node.os_vm_id for node in curr_test_nodes
if node.os_vm_id is not None]
@@ -273,12 +282,16 @@
len(resumable_nodes_ids)))
start_vms.unpause(resumable_nodes_ids)
- test_cls = tool_type_mapper[name]
+ test_cls = TOOL_TYPE_MAPPER[name]
try:
- res = run_single_test(curr_test_nodes, name, test_cls,
- params,
- cfg['default_test_local_folder'],
- cfg['run_uuid'])
+ sens_nodes = curr_test_nodes + not_test_nodes
+ with sensors_info_util(cfg, sens_nodes) as sensor_data:
+ t_start = time.time()
+ res = run_single_test(curr_test_nodes, name, test_cls,
+ params,
+ cfg['default_test_local_folder'],
+ cfg['run_uuid'])
+ t_end = time.time()
finally:
if cfg.get('suspend_unused_vms', True):
if len(pausable_nodes_ids) != 0:
@@ -286,7 +299,14 @@
len(pausable_nodes_ids)))
start_vms.unpause(pausable_nodes_ids)
- results.append(res)
+ if sensor_data is not None:
+ fname = "{0}_{1}.csv".format(int(t_start), int(t_end))
+ fpath = os.path.join(cfg['sensor_storage'], fname)
+
+ with open(fpath, "w") as fd:
+ fd.write("\n\n".join(sensor_data))
+
+ results.extend(res)
yield name, results
@@ -365,7 +385,8 @@
for creds in p:
vm_name_pattern, conn_pattern = creds.split(",")
- try:
+ msg = "Vm like {0} lookup failed".format(vm_name_pattern)
+ with utils.log_error(msg):
msg = "Looking for vm with name like {0}".format(vm_name_pattern)
logger.debug(msg)
@@ -379,12 +400,6 @@
node = Node(conn_pattern.format(ip=ip), ['testnode'])
node.os_vm_id = vm_id
ctx.nodes.append(node)
- except utils.StopTestError:
- raise
- except Exception as exc:
- msg = "Vm like {0} lookup failed".format(vm_name_pattern)
- logger.exception(msg)
- raise utils.StopTestError(msg, exc)
def get_creds_openrc(path):
@@ -392,24 +407,19 @@
echo = 'echo "$OS_TENANT_NAME:$OS_USERNAME:$OS_PASSWORD@$OS_AUTH_URL"'
- try:
+ msg = "Failed to get creads from openrc file"
+ with utils.log_error(msg):
data = utils.run_locally(['/bin/bash'],
input_data=fc + "\n" + echo)
- except subprocess.CalledProcessError as exc:
- msg = "Failed to get creads from openrc file: " + data
- logger.exception(msg)
- raise utils.StopTestError(msg, exc)
- try:
+ msg = "Failed to get creads from openrc file: " + data
+ with utils.log_error(msg):
data = data.strip()
user, tenant, passwd_auth_url = data.split(':', 2)
passwd, auth_url = passwd_auth_url.rsplit("@", 1)
assert (auth_url.startswith("https://") or
auth_url.startswith("http://"))
- except Exception as exc:
- msg = "Failed to get creads from openrc file: " + data
- logger.exception(msg)
- raise utils.StopTestError(msg, exc)
+
return user, passwd, tenant, auth_url
@@ -512,6 +522,7 @@
num_test_nodes)
with vm_ctx as new_nodes:
if len(new_nodes) != 0:
+ logger.debug("Connecting to new nodes")
connect_all(new_nodes, True)
for node in new_nodes:
@@ -519,17 +530,13 @@
msg = "Failed to connect to vm {0}"
raise RuntimeError(msg.format(node.get_conn_id()))
- deploy_sensors_stage(cfg_dict,
- ctx,
- nodes=new_nodes,
- undeploy=False)
-
- for test_group in config.get('tests', []):
- test_res = run_tests(cfg, test_group, ctx.nodes)
- ctx.results.extend(test_res)
+ with with_sensors_util(cfg_dict, ctx.nodes):
+ for test_group in config.get('tests', []):
+ ctx.results.extend(run_tests(cfg, test_group,
+ ctx.nodes))
else:
- test_res = run_tests(cfg, group, ctx.nodes)
- ctx.results.extend(test_res)
+ with with_sensors_util(cfg_dict, ctx.nodes):
+ ctx.results.extend(run_tests(cfg, group, ctx.nodes))
def shut_down_vms_stage(cfg, ctx):
@@ -595,9 +602,7 @@
fd.flush()
logger.info("Text report were stored in " + text_rep_fname)
- print("\n")
- print(IOPerfTest.format_for_console(data, dinfo))
- print("\n")
+ print("\n" + rep + "\n")
if tp in ['mysql', 'pgbench'] and data is not None:
print("\n")
@@ -618,7 +623,6 @@
found = True
dinfo = report.process_disk_info(data)
report.make_io_report(dinfo, data, html_rep_fname,
- cfg['charts_img_path'],
lab_info=ctx.hw_info)
@@ -629,9 +633,13 @@
def load_data_from(var_dir):
- def load_data_from_file(cfg, ctx):
+ def load_data_from_file(_, ctx):
raw_results = os.path.join(var_dir, 'raw_results.yaml')
- ctx.results = yaml.load(open(raw_results).read())
+ ctx.results = []
+ for tp, results in yaml.load(open(raw_results).read()):
+ cls = TOOL_TYPE_MAPPER[tp]
+ ctx.results.append((tp, map(cls.load, results)))
+
return load_data_from_file
@@ -681,17 +689,25 @@
return parser.parse_args(argv[1:])
-# from plop.collector import Collector
+def get_stage_name(func):
+ if func.__name__.endswith("stage"):
+ return func.__name__
+ else:
+ return func.__name__ + " stage"
def main(argv):
- # collector = Collector()
- # collector.start()
-
faulthandler.register(signal.SIGUSR1, all_threads=True)
opts = parse_args(argv)
load_config(opts.config_file, opts.post_process_only)
+ if cfg_dict.get('logging', {}).get("extra_logs", False) or opts.extra_logs:
+ level = logging.DEBUG
+ else:
+ level = logging.WARNING
+
+ setup_loggers(level, cfg_dict['log_file'])
+
if opts.post_process_only is not None:
stages = [
load_data_from(opts.post_process_only)
@@ -711,10 +727,10 @@
stages.append(collect_hw_info_stage)
stages.extend([
- deploy_sensors_stage,
+ # deploy_sensors_stage,
run_tests_stage,
store_raw_results_stage,
- gather_sensors_stage
+ # gather_sensors_stage
])
report_stages = [
@@ -724,13 +740,6 @@
if not opts.no_html_report:
report_stages.append(html_report_stage)
- if cfg_dict.get('logging', {}).get("extra_logs", False) or opts.extra_logs:
- level = logging.DEBUG
- else:
- level = logging.WARNING
-
- setup_loggers(level, cfg_dict['log_file'])
-
logger.info("All info would be stored into {0}".format(
cfg_dict['var_dir']))
@@ -753,10 +762,7 @@
try:
for stage in stages:
- if stage.__name__.endswith("stage"):
- logger.info("Start {0.__name__}".format(stage))
- else:
- logger.info("Start {0.__name__} stage".format(stage))
+ logger.info("Start " + get_stage_name(stage))
stage(cfg_dict, ctx)
except utils.StopTestError as exc:
logger.error(msg_templ.format(stage, exc))
@@ -766,10 +772,7 @@
exc, cls, tb = sys.exc_info()
for stage in ctx.clear_calls_stack[::-1]:
try:
- if stage.__name__.endswith("stage"):
- logger.info("Start {0.__name__}".format(stage))
- else:
- logger.info("Start {0.__name__} stage".format(stage))
+ logger.info("Start " + get_stage_name(stage))
stage(cfg_dict, ctx)
except utils.StopTestError as cleanup_exc:
logger.error(msg_templ.format(stage, cleanup_exc))
@@ -779,14 +782,16 @@
logger.debug("Start utils.cleanup")
for clean_func, args, kwargs in utils.iter_clean_func():
try:
+ logger.info("Start " + get_stage_name(clean_func))
clean_func(*args, **kwargs)
except utils.StopTestError as cleanup_exc:
- logger.error(msg_templ.format(stage, cleanup_exc))
+ logger.error(msg_templ.format(clean_func, cleanup_exc))
except Exception:
- logger.exception(msg_templ_no_exc.format(stage))
+ logger.exception(msg_templ_no_exc.format(clean_func))
if exc is None:
for report_stage in report_stages:
+ logger.info("Start " + get_stage_name(report_stage))
report_stage(cfg_dict, ctx)
logger.info("All info stored in {0} folder".format(cfg_dict['var_dir']))
@@ -794,9 +799,6 @@
if cfg_dict.get('run_web_ui', False):
stop_web_ui(cfg_dict, ctx)
- # collector.stop()
- # open("plop.out", "w").write(repr(dict(collector.stack_counts)))
-
if exc is None:
logger.info("Tests finished successfully")
return 0
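
Note: the repeated try/except/log/re-raise blocks in run_test.py are folded
into a utils.log_error(msg) context manager. Its implementation is not part of
this patch; inferred from the call sites (which expect StopTestError to pass
through and everything else to be logged and wrapped), it plausibly looks like:

    # hypothetical sketch of wally.utils.log_error -- not shown in this patch
    import logging
    from contextlib import contextmanager

    logger = logging.getLogger("wally")

    class StopTestError(RuntimeError):     # the real class lives in wally.utils
        def __init__(self, msg, orig_exc=None):
            RuntimeError.__init__(self, msg)
            self.orig_exc = orig_exc

    @contextmanager
    def log_error(msg):
        try:
            yield
        except StopTestError:
            raise                          # already logged and wrapped upstream
        except Exception as exc:
            logger.exception(msg)
            raise StopTestError(msg, exc)
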
diff --git a/wally/sensors/api.py b/wally/sensors/api.py
index e8c6261..52d33ed 100644
--- a/wally/sensors/api.py
+++ b/wally/sensors/api.py
@@ -1,21 +1,15 @@
-import Queue
+import os
+import time
+import json
import logging
-import threading
+import contextlib
-from .deploy_sensors import (deploy_and_start_sensors,
- stop_and_remove_sensors)
-from .protocol import create_protocol, Timeout, CantUnpack
+from concurrent.futures import ThreadPoolExecutor
+
+from wally.ssh_utils import (copy_paths, run_over_ssh,
+ save_to_remote, read_from_remote)
-__all__ = ['Empty', 'recv_main',
- 'deploy_and_start_sensors',
- 'SensorConfig',
- 'stop_and_remove_sensors',
- 'start_listener_thread',
- ]
-
-
-Empty = Queue.Empty
logger = logging.getLogger("wally.sensors")
@@ -29,40 +23,71 @@
self.monitor_url = monitor_url
-def recv_main(proto, data_q, cmd_q):
- while True:
+@contextlib.contextmanager
+def with_sensors(sensor_configs, remote_path):
+ paths = {os.path.dirname(__file__):
+ os.path.join(remote_path, "sensors")}
+ config_remote_path = os.path.join(remote_path, "conf.json")
+
+ def deploy_sensors(node_sensor_config):
+ copy_paths(node_sensor_config.conn, paths)
+ with node_sensor_config.conn.open_sftp() as sftp:
+ sensors_config = node_sensor_config.sensors.copy()
+ sensors_config['source_id'] = node_sensor_config.source_id
+ save_to_remote(sftp, config_remote_path,
+ json.dumps(sensors_config))
+
+ def remove_sensors(node_sensor_config):
+ run_over_ssh(node_sensor_config.conn,
+ "rm -rf {0}".format(remote_path),
+ node=node_sensor_config.url, timeout=10)
+
+ logger.debug("Installing sensors on {0} nodes".format(len(sensor_configs)))
+ with ThreadPoolExecutor(max_workers=32) as executor:
+ list(executor.map(deploy_sensors, sensor_configs))
try:
- ip, packet = proto.recv(0.1)
- if packet is not None:
- data_q.put((ip, packet))
- except AssertionError as exc:
- logger.warning("Error in sensor data " + str(exc))
- except Timeout:
- pass
- except CantUnpack as exc:
- print exc
+ yield
+ finally:
+ list(executor.map(remove_sensors, sensor_configs))
+
+@contextlib.contextmanager
+def sensors_info(sensor_configs, remote_path):
+ config_remote_path = os.path.join(remote_path, "conf.json")
+
+ def start_sensors(node_sensor_config):
+ cmd_templ = 'env PYTHONPATH="{0}" python -m ' + \
+ "sensors.main -d start -u {1} {2}"
+
+ cmd = cmd_templ.format(remote_path,
+ node_sensor_config.monitor_url,
+ config_remote_path)
+
+ run_over_ssh(node_sensor_config.conn, cmd,
+ node=node_sensor_config.url)
+
+ def stop_and_gather_data(node_sensor_config):
+ cmd = 'env PYTHONPATH="{0}" python -m sensors.main -d stop'
+ cmd = cmd.format(remote_path)
+ run_over_ssh(node_sensor_config.conn, cmd,
+ node=node_sensor_config.url)
+        # give the sensors daemon a moment to flush its results to disk
+ time.sleep(1)
+
+ assert node_sensor_config.monitor_url.startswith("csvfile://")
+
+ res_path = node_sensor_config.monitor_url.split("//", 1)[1]
+ with node_sensor_config.conn.open_sftp() as sftp:
+ res = read_from_remote(sftp, res_path)
+
+ return res
+
+ results = []
+
+ logger.debug("Starting sensors on {0} nodes".format(len(sensor_configs)))
+ with ThreadPoolExecutor(max_workers=32) as executor:
+ list(executor.map(start_sensors, sensor_configs))
try:
- val = cmd_q.get(False)
-
- if val is None:
- return
-
- except Queue.Empty:
- pass
-
-
-def start_listener_thread(uri):
- data_q = Queue.Queue()
- cmd_q = Queue.Queue()
- logger.debug("Listening for sensor data on " + uri)
- proto = create_protocol(uri, receiver=True)
- th = threading.Thread(None, recv_main, None, (proto, data_q, cmd_q))
- th.daemon = True
- th.start()
-
- def stop_thread():
- cmd_q.put(None)
- th.join()
-
- return data_q, stop_thread
+ yield results
+ finally:
+ results.extend(executor.map(stop_and_gather_data, sensor_configs))
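
Note: sensor collection moves from a UDP push listener to a pull model:
with_sensors() deploys the sensor package and its config over SFTP and removes
them on exit, while sensors_info() starts the remote daemons and, on exit,
stops them and reads each node's csvfile:// results back over SFTP. Both fan
work out over a ThreadPoolExecutor. The intended nesting, mirroring how
run_test.py uses the sensors_utils wrappers (run_one_test and save_blobs are
hypothetical names):

    # sketch of the intended nesting
    with with_sensors(sensor_configs, '/tmp/sensors'):      # deploy once
        for test in tests:
            with sensors_info(sensor_configs, '/tmp/sensors') as raw_csv:
                run_one_test(test)
            # raw_csv is filled on exit: one CSV blob per monitored node
            save_blobs(raw_csv)
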
diff --git a/wally/sensors/deploy_sensors.py b/wally/sensors/deploy_sensors.py
index 4a1c5df..82ab21a 100644
--- a/wally/sensors/deploy_sensors.py
+++ b/wally/sensors/deploy_sensors.py
@@ -3,9 +3,8 @@
import os.path
import logging
-from concurrent.futures import ThreadPoolExecutor, wait
-
-from wally.ssh_utils import copy_paths, run_over_ssh
+from wally.ssh_utils import (copy_paths, run_over_ssh,
+ save_to_remote, read_from_remote)
logger = logging.getLogger('wally.sensors')
@@ -34,25 +33,23 @@
def deploy_and_start_sensor(paths, node_sensor_config, remote_path):
try:
copy_paths(node_sensor_config.conn, paths)
- sftp = node_sensor_config.conn.open_sftp()
+ with node_sensor_config.conn.open_sftp() as sftp:
+ config_remote_path = os.path.join(remote_path, "conf.json")
- config_remote_path = os.path.join(remote_path, "conf.json")
+ sensors_config = node_sensor_config.sensors.copy()
+ sensors_config['source_id'] = node_sensor_config.source_id
+ with sftp.open(config_remote_path, "w") as fd:
+ fd.write(json.dumps(sensors_config))
- sensors_config = node_sensor_config.sensors.copy()
- sensors_config['source_id'] = node_sensor_config.source_id
- with sftp.open(config_remote_path, "w") as fd:
- fd.write(json.dumps(sensors_config))
+ cmd_templ = 'env PYTHONPATH="{0}" python -m ' + \
+ "sensors.main -d start -u {1} {2}"
- cmd_templ = 'env PYTHONPATH="{0}" python -m ' + \
- "sensors.main -d start -u {1} {2}"
+ cmd = cmd_templ.format(os.path.dirname(remote_path),
+ node_sensor_config.monitor_url,
+ config_remote_path)
- cmd = cmd_templ.format(os.path.dirname(remote_path),
- node_sensor_config.monitor_url,
- config_remote_path)
-
- run_over_ssh(node_sensor_config.conn, cmd,
- node=node_sensor_config.url)
- sftp.close()
+ run_over_ssh(node_sensor_config.conn, cmd,
+ node=node_sensor_config.url)
except:
msg = "During deploing sensors on {0}".format(node_sensor_config.url)
diff --git a/wally/sensors/main.py b/wally/sensors/main.py
index 2d0bc81..20eedc5 100644
--- a/wally/sensors/main.py
+++ b/wally/sensors/main.py
@@ -35,7 +35,9 @@
def parse_args(args):
parser = argparse.ArgumentParser()
parser.add_argument('-d', '--daemon',
- choices=('start', 'stop', 'status'),
+ choices=('start', 'stop', 'status',
+ 'start_monitoring', 'stop_monitoring',
+ 'dump_ram_data'),
default=None)
parser.add_argument('-u', '--url', default='stdout://')
diff --git a/wally/sensors/protocol.py b/wally/sensors/protocol.py
index c053011..7c8aa0e 100644
--- a/wally/sensors/protocol.py
+++ b/wally/sensors/protocol.py
@@ -188,6 +188,7 @@
def send(self, data):
if self.headers is None:
self.headers = sorted(data)
+ self.headers.remove('source_id')
for pos, header in enumerate(self.headers):
self.line_format += "{%s:>%s}" % (pos,
@@ -197,6 +198,7 @@
print self.line_format.format(*self.headers)
if self.delta:
+
vals = [data[header].value - self.prev.get(header, 0)
for header in self.headers]
@@ -219,7 +221,7 @@
class CSVFileTransport(ITransport):
- required_keys = set(['time', 'source_id', 'hostname'])
+ required_keys = set(['time', 'source_id'])
def __init__(self, receiver, fname):
ITransport.__init__(self, receiver)
@@ -234,10 +236,25 @@
assert self.required_keys.issubset(keys)
keys -= self.required_keys
self.field_list = sorted(keys)
- self.csv_fd.writerow([data['source_id'], data['hostname']] +
+ self.csv_fd.writerow([data['source_id'], socket.getfqdn()] +
self.field_list)
+ self.field_list = ['time'] + self.field_list
- self.csv_fd.writerow(map(data.__getitem__, ['time'] + self.field_list))
+ self.csv_fd.writerow([data[sens].value for sens in self.field_list])
+
+
+class RAMTransport(ITransport):
+ def __init__(self, next_tr):
+ self.next = next_tr
+ self.data = []
+
+ def send(self, data):
+ self.data.append(data)
+
+ def flush(self):
+ for data in self.data:
+ self.next.send(data)
+ self.data = []
class UDPTransport(ITransport):
@@ -269,10 +286,11 @@
def create_protocol(uri, receiver=False):
- parsed_uri = urlparse(uri)
- if parsed_uri.scheme == 'stdout':
+ if uri == 'stdout':
return StdoutTransport(receiver)
- elif parsed_uri.scheme == 'udp':
+
+ parsed_uri = urlparse(uri)
+ if parsed_uri.scheme == 'udp':
ip, port = parsed_uri.netloc.split(":")
if receiver:
@@ -286,6 +304,9 @@
return FileTransport(receiver, parsed_uri.path)
elif parsed_uri.scheme == 'csvfile':
return CSVFileTransport(receiver, parsed_uri.path)
+ elif parsed_uri.scheme == 'ram':
+        internal_recv = CSVFileTransport(receiver, parsed_uri.path)
+        return RAMTransport(internal_recv)
else:
templ = "Can't instantiate transport from {0!r}"
raise ValueError(templ.format(uri))
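
The new 'ram' scheme buffers records in memory and only touches disk on
flush(), which pairs with the start_monitoring/stop_monitoring/dump_ram_data
daemon modes added to sensors/main.py above. A minimal usage sketch, assuming
a hypothetical collect_records() source of sensor dicts:

    # sketch: buffer sensor records in RAM, write them out once at the end
    transport = create_protocol("ram:///tmp/sensors.csv")
    for record in collect_records():   # hypothetical record source
        transport.send(record)         # appends to the in-memory list only
    transport.flush()                  # replays the buffer into the CSV file
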
diff --git a/wally/sensors_utils.py b/wally/sensors_utils.py
index 65de0ef..61a5c08 100644
--- a/wally/sensors_utils.py
+++ b/wally/sensors_utils.py
@@ -1,71 +1,19 @@
-import csv
-import time
-import Queue
+import os.path
import logging
-import threading
+import contextlib
-from wally import utils
-from wally.config import cfg_dict
-from wally.sensors.api import (start_listener_thread,
- deploy_and_start_sensors,
- SensorConfig,
- stop_and_remove_sensors)
+from wally.sensors.api import (with_sensors, sensors_info, SensorConfig)
+
logger = logging.getLogger("wally.sensors")
-DEFAULT_RECEIVER_URL = "udp://{ip}:5699"
-def save_sensors_data(data_q, mon_q, fd, data_store, source2roles_map):
- fd.write("\n")
-
- observed_nodes = set()
- fields_list_for_nodes = {}
- required_keys = set(['time', 'source_id', 'hostname'])
-
- try:
- csv_fd = csv.writer(fd)
- while True:
- val = data_q.get()
- if val is None:
- break
-
- addr, data = val
- if addr not in observed_nodes:
- mon_q.put(addr + (data['source_id'],))
- observed_nodes.add(addr)
- keys = set(data)
- assert required_keys.issubset(keys)
- keys -= required_keys
-
- fields_list_for_nodes[addr] = sorted(keys)
- csv_fd.writerow([addr[0], addr[1],
- data['source_id'], data['hostname']] +
- fields_list_for_nodes[addr])
-
- csv_fd.writerow([addr[0], addr[1]] +
- map(data.__getitem__,
- ['time'] + fields_list_for_nodes[addr]))
-
- # fd.write(repr((addr, data)) + "\n")
- # source_id = data.pop('source_id')
- # rep_time = data.pop('time')
- # if 'testnode' in source2roles_map.get(source_id, []):
- # sum_io_q = 0
- # data_store.update_values(rep_time,
- # {"testnodes:io": sum_io_q},
- # add=True)
- except Exception:
- logger.exception("Error in sensors thread")
- logger.info("Sensors thread exits")
-
-
-def get_sensors_config_for_nodes(cfg, nodes):
+def get_sensors_config_for_nodes(cfg, nodes, remote_path):
monitored_nodes = []
sensors_configs = []
source2roles_map = {}
- receiver_url = cfg.get("receiver_url", DEFAULT_RECEIVER_URL)
- assert '{ip}' in receiver_url
+ receiver_url = "csvfile://" + os.path.join(remote_path, "results.csv")
for role, sensors_str in cfg["roles_mapping"].items():
sensors = [sens.strip() for sens in sensors_str.split(",")]
@@ -74,137 +22,41 @@
for node in nodes:
if role in node.roles:
-
- if node.monitor_ip is not None:
- monitor_url = receiver_url.format(ip=node.monitor_ip)
- else:
- ip = node.get_ip()
- ext_ip = utils.get_ip_for_target(ip)
- monitor_url = receiver_url.format(ip=ext_ip)
-
source2roles_map[node.get_conn_id()] = node.roles
monitored_nodes.append(node)
sens_cfg = SensorConfig(node.connection,
node.get_conn_id(),
collect_cfg,
source_id=node.get_conn_id(),
- monitor_url=monitor_url)
+ monitor_url=receiver_url)
sensors_configs.append(sens_cfg)
return monitored_nodes, sensors_configs, source2roles_map
-def start_sensor_process_thread(ctx, cfg, sensors_configs, source2roles_map):
- receiver_url = cfg.get('receiver_url', DEFAULT_RECEIVER_URL)
- sensors_data_q, stop_sensors_loop = \
- start_listener_thread(receiver_url.format(ip='0.0.0.0'))
-
- mon_q = Queue.Queue()
- fd = open(cfg_dict['sensor_storage'], "w")
-
- params = sensors_data_q, mon_q, fd, ctx.sensors_data, source2roles_map
- sensor_listen_th = threading.Thread(None, save_sensors_data, None,
- params)
- sensor_listen_th.daemon = True
- sensor_listen_th.start()
-
- def stop_sensors_receiver(cfg, ctx):
- stop_sensors_loop()
- sensors_data_q.put(None)
- sensor_listen_th.join()
-
- ctx.clear_calls_stack.append(stop_sensors_receiver)
- return mon_q
-
-
-def deploy_sensors_stage(cfg, ctx, nodes=None, undeploy=True,
- recv_timeout=10, ignore_nodata=False):
-
- cfg = cfg.get('sensors')
- if cfg is None:
+@contextlib.contextmanager
+def with_sensors_util(cfg, nodes):
+ if 'sensors' not in cfg:
+ yield
return
- if nodes is None:
- nodes = ctx.nodes
-
monitored_nodes, sensors_configs, source2roles_map = \
- get_sensors_config_for_nodes(cfg, nodes)
+ get_sensors_config_for_nodes(cfg['sensors'], nodes,
+ cfg['sensors_remote_path'])
- if len(monitored_nodes) == 0:
- logger.info("Nothing to monitor, no sensors would be installed")
+ with with_sensors(sensors_configs, cfg['sensors_remote_path']):
+ yield source2roles_map
+
+
+@contextlib.contextmanager
+def sensors_info_util(cfg, nodes):
+ if 'sensors' not in cfg:
+ yield None
return
- is_online = cfg.get('online', False)
+ _, sensors_configs, _ = \
+ get_sensors_config_for_nodes(cfg['sensors'], nodes,
+ cfg['sensors_remote_path'])
- if is_online:
- if ctx.sensors_mon_q is None:
- logger.info("Start sensors data receiving thread")
- ctx.sensors_mon_q = start_sensor_process_thread(ctx, cfg,
- sensors_configs,
- source2roles_map)
-
- if undeploy:
- def remove_sensors_stage(cfg, ctx):
- _, sensors_configs, _ = \
- get_sensors_config_for_nodes(cfg['sensors'], nodes)
- stop_and_remove_sensors(sensors_configs)
-
- ctx.clear_calls_stack.append(remove_sensors_stage)
-
- num_monitoref_nodes = len(sensors_configs)
- logger.info("Deploing new sensors on {0} node(s)".format(
- num_monitoref_nodes))
-
- deploy_and_start_sensors(sensors_configs)
-
- if is_online:
- wait_for_new_sensors_data(ctx, monitored_nodes, recv_timeout,
- ignore_nodata)
-
-
-def gather_sensors_stage(cfg, ctx, nodes=None):
- cfg = cfg.get('sensors')
- if cfg is None:
- return
-
- is_online = cfg.get('online', False)
- if is_online:
- return
-
- if nodes is None:
- nodes = ctx.nodes
-
- _, sensors_configs, _ = get_sensors_config_for_nodes(cfg, nodes)
- gather_sensors_info(sensors_configs)
-
-
-def gather_sensors_info(sensors_configs):
- pass
-
-
-def wait_for_new_sensors_data(ctx, monitored_nodes, recv_timeout,
- ignore_nodata):
- etime = time.time() + recv_timeout
-
- msg = "Waiting at most {0}s till all {1} nodes starts report sensors data"
- nodes_ids = set(node.get_conn_id() for node in monitored_nodes)
- logger.debug(msg.format(recv_timeout, len(nodes_ids)))
-
- # wait till all nodes start sending data
- while len(nodes_ids) != 0:
- tleft = etime - time.time()
- try:
- source_id = ctx.sensors_mon_q.get(True, tleft)[2]
- except Queue.Empty:
- if not ignore_nodata:
- msg = "Node(s) {0} not sending any sensor data in {1}s"
- msg = msg.format(", ".join(nodes_ids), recv_timeout)
- raise RuntimeError(msg)
- else:
- return
-
- if source_id not in nodes_ids:
- msg = "Receive sensors from extra node: {0}".format(source_id)
- logger.warning(msg)
-
- nodes_ids.remove(source_id)
+ with sensors_info(sensors_configs, cfg['sensors_remote_path']) as res:
+ yield res
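
The module now boils down to two context managers over wally.sensors.api.
A sketch of the intended call pattern, with run_tests() standing in for the
actual test stage:

    # sketch: sensors live exactly as long as the with-blocks
    with with_sensors_util(cfg, ctx.nodes) as source2roles_map:
        with sensors_info_util(cfg, ctx.nodes) as sensors_data:
            run_tests(ctx)   # placeholder for the real test driver
    # on exit the sensors are stopped and removed from the nodes
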
diff --git a/wally/ssh_utils.py b/wally/ssh_utils.py
index 45ca892..1e8b647 100644
--- a/wally/ssh_utils.py
+++ b/wally/ssh_utils.py
@@ -1,6 +1,7 @@
import re
import time
import errno
+import random
import socket
import shutil
import logging
@@ -356,18 +357,78 @@
all_sessions = {}
-def start_in_bg(conn, cmd, capture_out=False, **params):
- assert not capture_out
- pid = run_over_ssh(conn, "nohup {0} 2>&1 >/dev/null & echo $!",
- timeout=10, **params)
- return int(pid.strip()), None, None
-
-
-def check_running(conn, pid):
-    try:
-        run_over_ssh(conn, "ls /proc/{0}", timeout=10, nolog=True)
-    except OSError:
-        return False
+class BGSSHTask(object):
+    def __init__(self, node, use_sudo):
+        self.node = node
+        self.pid = None
+        self.use_sudo = use_sudo
+
+    def start(self, orig_cmd, **params):
+        uniq_name = 'test'
+        cmd = "screen -S {0} -d -m {1}".format(uniq_name, orig_cmd)
+        run_over_ssh(self.node.connection, cmd,
+                     timeout=10, node=self.node.get_conn_id(),
+                     **params)
+        processes = run_over_ssh(self.node.connection, "ps aux", nolog=True)
+
+        for proc in processes.split("\n"):
+            if orig_cmd in proc and "SCREEN" not in proc:
+                self.pid = proc.split()[1]
+                break
+        else:
+            self.pid = -1
+
+ def check_running(self):
+ assert self.pid is not None
+ try:
+ run_over_ssh(self.node.connection,
+ "ls /proc/{0}".format(self.pid),
+ timeout=10, nolog=True)
+ return True
+ except OSError:
+ return False
+ # try:
+ # sftp.stat("/proc/{0}".format(pid))
+ # return True
+ # except (OSError, IOError, NameError):
+ # return False
+
+ def kill(self, soft=True, use_sudo=True):
+ assert self.pid is not None
+ try:
+ if soft:
+ cmd = "kill {0}"
+ else:
+ cmd = "kill -9 {0}"
+
+ if self.use_sudo:
+ cmd = "sudo " + cmd
+
+ run_over_ssh(self.node.connection,
+ cmd.format(self.pid), nolog=True)
+ return True
+ except OSError:
+ return False
+
+ def wait(self, soft_timeout, timeout):
+ end_of_wait_time = timeout + time.time()
+ soft_end_of_wait_time = soft_timeout + time.time()
+ time_till_check = random.randint(5, 10)
+
+ while self.check_running() and time.time() < soft_end_of_wait_time:
+ time.sleep(soft_end_of_wait_time - time.time())
+
+ while end_of_wait_time > time.time():
+ time.sleep(time_till_check)
+ if not self.check_running():
+ break
+ else:
+ self.kill()
+ time.sleep(3)
+ if self.check_running():
+ self.kill(soft=False)
+ return False
+ return True
def run_over_ssh(conn, cmd, stdin_data=None, timeout=60,
@@ -429,9 +490,14 @@
code = session.recv_exit_status()
finally:
+ found = False
with all_sessions_lock:
- del all_sessions[id(session)]
- session.close()
+ if id(session) in all_sessions:
+ found = True
+ del all_sessions[id(session)]
+
+ if found:
+ session.close()
if code != 0:
templ = "SSH:{0} Cmd {1!r} failed with code {2}. Output: {3}"
diff --git a/wally/start_vms.py b/wally/start_vms.py
index 7e1c687..3ab4383 100644
--- a/wally/start_vms.py
+++ b/wally/start_vms.py
@@ -259,7 +259,7 @@
return vol
-def wait_for_server_active(nova, server, timeout=240):
+def wait_for_server_active(nova, server, timeout=300):
t = time.time()
while True:
time.sleep(1)
@@ -291,7 +291,7 @@
def launch_vms(params, already_has_count=0):
- logger.debug("Starting new nodes on openstack")
+ logger.debug("Calculating new vm count")
count = params['count']
nova = nova_connect()
lst = nova.services.list(binary='nova-compute')
@@ -305,8 +305,11 @@
count = int(count[1:]) - already_has_count
if count <= 0:
+        logger.debug("No new VMs needed")
return
+ logger.debug("Starting new nodes on openstack")
+
assert isinstance(count, (int, long))
srv_params = "img: {image[name]}, flavor: {flavor[name]}".format(**params)
diff --git a/wally/statistic.py b/wally/statistic.py
index 8180619..74ce572 100644
--- a/wally/statistic.py
+++ b/wally/statistic.py
@@ -144,6 +144,7 @@
self.confidence = None
self.min = None
self.max = None
+ self.raw = None
def rounded_average_conf(self):
return round_deviation((self.average, self.confidence))
@@ -184,4 +185,5 @@
else:
res.confidence = res.deviation
+ res.raw = data[:]
return res
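
Keeping a copy of the raw samples on the result object lets later reporting
stages (boxplots, raw dumps) reuse the original data. An assumed access
pattern, with calc_stat_props() as a hypothetical stand-in for whatever
builds these objects:

    props = calc_stat_props(samples)         # hypothetical constructor
    avg, conf = props.rounded_average_conf()
    assert props.raw == samples              # untouched copy of the input
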
diff --git a/wally/suits/__init__.py b/wally/suits/__init__.py
index 7b6610e..c4e8854 100644
--- a/wally/suits/__init__.py
+++ b/wally/suits/__init__.py
@@ -1,3 +1,5 @@
-from .itest import TwoScriptTest, PgBenchTest, IOPerfTest
+from .io import IOPerfTest
+from .mysql import MysqlTest
+from .postgres import PgBenchTest
-__all__ = ["TwoScriptTest", "PgBenchTest", "IOPerfTest"]
+__all__ = ["MysqlTest", "PgBenchTest", "IOPerfTest"]
diff --git a/wally/suits/io/__init__.py b/wally/suits/io/__init__.py
index e69de29..4828850 100644
--- a/wally/suits/io/__init__.py
+++ b/wally/suits/io/__init__.py
@@ -0,0 +1,330 @@
+import time
+import json
+import os.path
+import logging
+import datetime
+
+from wally.utils import (ssize2b, open_for_append_or_create,
+ sec_to_str, StopTestError)
+
+from wally.ssh_utils import save_to_remote, read_from_remote, BGSSHTask
+
+from ..itest import IPerfTest, TestResults
+from .formatter import format_results_for_console
+from .fio_task_parser import (execution_time, fio_cfg_compile,
+ get_test_summary, FioJobSection)
+
+
+logger = logging.getLogger("wally")
+
+
+class IOTestResults(TestResults):
+ def summary(self):
+ return get_test_summary(self.config) + "vm" + str(self.vm_count)
+
+ def get_yamable(self):
+ return {
+ 'type': "fio_test",
+ 'params': self.params,
+ 'config': (self.config.name, self.config.vals),
+ 'results': self.results,
+ 'raw_result': self.raw_result,
+ 'run_interval': self.run_interval,
+ 'vm_count': self.vm_count
+ }
+
+ @classmethod
+ def from_yaml(cls, data):
+ name, vals = data['config']
+ sec = FioJobSection(name)
+ sec.vals = vals
+
+ return cls(sec, data['params'], data['results'],
+ data['raw_result'], data['run_interval'],
+ data['vm_count'])
+
+
+def get_slice_parts_offset(test_slice, real_interval):
+    calc_exec_time = sum(map(execution_time, test_slice))
+    coef = (real_interval[1] - real_interval[0]) / calc_exec_time
+    curr_offset = real_interval[0]
+ for section in test_slice:
+ slen = execution_time(section) * coef
+ yield (curr_offset, curr_offset + slen)
+ curr_offset += slen
+
+
+class IOPerfTest(IPerfTest):
+ tcp_conn_timeout = 30
+ max_pig_timeout = 5
+ soft_runcycle = 5 * 60
+
+ def __init__(self, *dt, **mp):
+ IPerfTest.__init__(self, *dt, **mp)
+ self.config_fname = self.options['cfg']
+
+ if '/' not in self.config_fname and '.' not in self.config_fname:
+ cfgs_dir = os.path.dirname(__file__)
+ self.config_fname = os.path.join(cfgs_dir,
+ self.config_fname + '.cfg')
+
+ self.alive_check_interval = self.options.get('alive_check_interval')
+
+ self.config_params = self.options.get('params', {}).copy()
+ self.tool = self.options.get('tool', 'fio')
+
+ raw_res = os.path.join(self.log_directory, "raw_results.txt")
+ self.fio_raw_results_file = open_for_append_or_create(raw_res)
+
+ self.io_py_remote = self.join_remote("agent.py")
+ self.results_file = self.join_remote("results.json")
+ self.pid_file = self.join_remote("pid")
+ self.task_file = self.join_remote("task.cfg")
+ self.use_sudo = self.options.get("use_sudo", True)
+ self.test_logging = self.options.get("test_logging", False)
+ self.raw_cfg = open(self.config_fname).read()
+ self.fio_configs = fio_cfg_compile(self.raw_cfg,
+ self.config_fname,
+ self.config_params,
+ split_on_names=self.test_logging)
+ self.fio_configs = list(self.fio_configs)
+
+ cmd_log = os.path.join(self.log_directory, "task_compiled.cfg")
+ fio_command_file = open_for_append_or_create(cmd_log)
+ splitter = "\n\n" + "-" * 60 + "\n\n"
+ fio_command_file.write(splitter.join(map(str, self.fio_configs)))
+
+ def __str__(self):
+ return "{0}({1})".format(self.__class__.__name__,
+ self.node.get_conn_id())
+
+ @classmethod
+ def load(cls, data):
+ return IOTestResults.from_yaml(data)
+
+ def cleanup(self):
+ # delete_file(conn, self.io_py_remote)
+ # Need to remove tempo files, used for testing
+ pass
+
+ def prefill_test_files(self):
+ files = {}
+ for cfg_slice in self.fio_configs:
+ for section in cfg_slice:
+ sz = ssize2b(section.vals['size'])
+ msz = sz / (1024 ** 2)
+
+ if sz % (1024 ** 2) != 0:
+ msz += 1
+
+ fname = section.vals['filename']
+
+                # if another test already uses the same file name,
+                # take the largest size
+ files[fname] = max(files.get(fname, 0), msz)
+
+ cmd_templ = "dd oflag=direct " + \
+ "if=/dev/zero of={0} bs={1} count={2}"
+
+ if self.use_sudo:
+ cmd_templ = "sudo " + cmd_templ
+
+ ssize = 0
+ stime = time.time()
+
+ for fname, curr_sz in files.items():
+ cmd = cmd_templ.format(fname, 1024 ** 2, curr_sz)
+ ssize += curr_sz
+ self.run_over_ssh(cmd, timeout=curr_sz)
+
+ ddtime = time.time() - stime
+ if ddtime > 1E-3:
+ fill_bw = int(ssize / ddtime)
+            mess = "Initial dd fill bw is {0} MiBps for this VM"
+ logger.info(mess.format(fill_bw))
+ self.coordinate(('init_bw', fill_bw))
+
+ def install_utils(self, max_retry=3, timeout=5):
+ need_install = []
+ for bin_name, package in (('fio', 'fio'), ('screen', 'screen')):
+ try:
+ self.run_over_ssh('which ' + bin_name, nolog=True)
+ except OSError:
+ need_install.append(package)
+
+ if len(need_install) == 0:
+ return
+
+ cmd = "sudo apt-get -y install " + " ".join(need_install)
+
+ for i in range(max_retry):
+ try:
+ self.run_over_ssh(cmd)
+ break
+ except OSError as err:
+ time.sleep(timeout)
+ else:
+ raise OSError("Can't install - " + str(err))
+
+ def pre_run(self):
+ try:
+ cmd = 'mkdir -p "{0}"'.format(self.remote_dir)
+ if self.use_sudo:
+ cmd = "sudo " + cmd
+ cmd += " ; sudo chown {0} {1}".format(self.node.get_user(),
+ self.remote_dir)
+
+ self.run_over_ssh(cmd)
+ except Exception as exc:
+ msg = "Failed to create folder {0} on remote {1}. Error: {2!s}"
+ msg = msg.format(self.remote_dir, self.node.get_conn_id(), exc)
+ logger.exception(msg)
+ raise StopTestError(msg, exc)
+
+ self.install_utils()
+
+ if self.options.get('prefill_files', True):
+ self.prefill_test_files()
+ elif self.is_primary:
+ logger.warning("Prefilling of test files is disabled")
+
+ def run(self, barrier):
+ try:
+ if len(self.fio_configs) > 1 and self.is_primary:
+
+ exec_time = 0
+ for test_slice in self.fio_configs:
+ exec_time += sum(map(execution_time, test_slice))
+
+                # +10% is a rough estimate for additional operations
+                # like sftp, etc
+ exec_time = int(exec_time * 1.1)
+
+ exec_time_s = sec_to_str(exec_time)
+ now_dt = datetime.datetime.now()
+ end_dt = now_dt + datetime.timedelta(0, exec_time)
+                msg = "Entire test should take around {0} and finish at {1}"
+ logger.info(msg.format(exec_time_s,
+ end_dt.strftime("%H:%M:%S")))
+
+ for pos, fio_cfg_slice in enumerate(self.fio_configs):
+ fio_cfg_slice = list(fio_cfg_slice)
+ names = [i.name for i in fio_cfg_slice]
+ msgs = []
+ already_processed = set()
+ for name in names:
+ if name not in already_processed:
+ already_processed.add(name)
+
+ if 1 == names.count(name):
+ msgs.append(name)
+ else:
+ frmt = "{0} * {1}"
+ msgs.append(frmt.format(name,
+ names.count(name)))
+
+ if self.is_primary:
+ logger.info("Will run tests: " + ", ".join(msgs))
+
+ nolog = (pos != 0) or not self.is_primary
+ out_err, interval = self.do_run(barrier, fio_cfg_slice,
+ nolog=nolog)
+
+ try:
+ full_raw_res = json.loads(out_err)
+
+ res = {"bw": [], "iops": [], "lat": [],
+ "clat": [], "slat": []}
+
+ for raw_result in full_raw_res['jobs']:
+ load_data = raw_result['mixed']
+
+ res["bw"].append(load_data["bw"])
+ res["iops"].append(load_data["iops"])
+ res["lat"].append(load_data["lat"]["mean"])
+ res["clat"].append(load_data["clat"]["mean"])
+ res["slat"].append(load_data["slat"]["mean"])
+
+ first = fio_cfg_slice[0]
+ p1 = first.vals.copy()
+ p1.pop('ramp_time', 0)
+
+ for nxt in fio_cfg_slice[1:]:
+ assert nxt.name == first.name
+ p2 = nxt.vals
+ p2.pop('_ramp_time', 0)
+
+ assert p1 == p2
+
+ tres = IOTestResults(first,
+ self.config_params, res,
+ full_raw_res, interval,
+ vm_count=self.total_nodes_count)
+ self.on_result_cb(tres)
+ except (OSError, StopTestError):
+ raise
+ except Exception as exc:
+ msg_templ = "Error during postprocessing results: {0!s}"
+ raise RuntimeError(msg_templ.format(exc))
+
+ finally:
+ barrier.exit()
+
+ def do_run(self, barrier, cfg_slice, nolog=False):
+ # return open("/tmp/lit-sunshine/io/results.json").read(), (1, 2)
+ conn_id = self.node.get_conn_id()
+
+ cmd_templ = "fio --output-format=json --output={1} " + \
+ "--alloc-size=262144 {0}"
+
+ if self.options.get("use_sudo", True):
+ cmd_templ = "sudo " + cmd_templ
+
+ task_fc = "\n\n".join(map(str, cfg_slice))
+ with self.node.connection.open_sftp() as sftp:
+ save_to_remote(sftp, self.task_file, task_fc)
+
+ cmd = cmd_templ.format(self.task_file, self.results_file)
+
+ exec_time = sum(map(execution_time, cfg_slice))
+ exec_time_str = sec_to_str(exec_time)
+
+ timeout = int(exec_time + max(300, exec_time))
+ soft_tout = exec_time
+ barrier.wait()
+
+ if self.is_primary:
+            templ = "Test should take about {0}." + \
+ " Should finish at {1}," + \
+ " will wait at most till {2}"
+ now_dt = datetime.datetime.now()
+ end_dt = now_dt + datetime.timedelta(0, exec_time)
+ wait_till = now_dt + datetime.timedelta(0, timeout)
+
+ logger.info(templ.format(exec_time_str,
+ end_dt.strftime("%H:%M:%S"),
+ wait_till.strftime("%H:%M:%S")))
+
+ task = BGSSHTask(self.node, self.options.get("use_sudo", True))
+ begin = time.time()
+ task.start(cmd)
+ task.wait(soft_tout, timeout)
+ end = time.time()
+
+ if not nolog:
+ logger.debug("Test on node {0} is finished".format(conn_id))
+
+ with self.node.connection.open_sftp() as sftp:
+ return read_from_remote(sftp, self.results_file), (begin, end)
+
+ @classmethod
+ def merge_results(cls, results):
+ merged = results[0]
+ for block in results[1:]:
+ assert block["__meta__"] == merged["__meta__"]
+ merged['res'].extend(block['res'])
+ return merged
+
+ @classmethod
+ def format_for_console(cls, data, dinfo):
+ return format_results_for_console(dinfo)
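
get_slice_parts_offset above rescales each section's planned runtime onto the
measured wall-clock interval, so per-section offsets stay proportional even
when the real run was faster or slower than planned. A worked example,
assuming execution_time() reports 30s and 60s for the two sections:

    # planned 30s + 60s, actually ran over (0, 45): coef = 45 / 90 = 0.5
    list(get_slice_parts_offset([sec_a, sec_b], (0.0, 45.0)))
    # -> [(0.0, 15.0), (15.0, 45.0)]
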
diff --git a/wally/suits/io/agent.py b/wally/suits/io/agent.py
deleted file mode 100644
index 3c3e436..0000000
--- a/wally/suits/io/agent.py
+++ /dev/null
@@ -1,672 +0,0 @@
-import os
-import sys
-import time
-import json
-import copy
-import select
-import pprint
-import os.path
-import argparse
-import traceback
-import subprocess
-import itertools
-from collections import OrderedDict
-
-
-SECTION = 0
-SETTING = 1
-
-
-class FioJobSection(object):
- def __init__(self, name):
- self.name = name
- self.vals = OrderedDict()
- self.format_params = {}
-
- def copy(self):
- return copy.deepcopy(self)
-
-
-def to_bytes(sz):
- sz = sz.lower()
- try:
- return int(sz)
- except ValueError:
- if sz[-1] == 'm':
- return (1024 ** 2) * int(sz[:-1])
- if sz[-1] == 'k':
- return 1024 * int(sz[:-1])
- if sz[-1] == 'g':
- return (1024 ** 3) * int(sz[:-1])
- raise
-
-
-def fio_config_lexer(fio_cfg):
- for lineno, line in enumerate(fio_cfg.split("\n")):
- try:
- line = line.strip()
-
- if line.startswith("#") or line.startswith(";"):
- continue
-
- if line == "":
- continue
-
- if line.startswith('['):
- assert line.endswith(']'), "name should ends with ]"
- yield lineno, SECTION, line[1:-1], None
- elif '=' in line:
- opt_name, opt_val = line.split('=', 1)
- yield lineno, SETTING, opt_name.strip(), opt_val.strip()
- else:
- yield lineno, SETTING, line, '1'
- except Exception as exc:
- pref = "During parsing line number {0}\n{1!s}".format(lineno, exc)
- raise ValueError(pref)
-
-
-def fio_config_parse(lexer_iter, format_params):
- orig_format_params_keys = set(format_params)
- format_params = format_params.copy()
- in_defaults = False
- curr_section = None
- defaults = OrderedDict()
-
- for lineno, tp, name, val in lexer_iter:
- if tp == SECTION:
- if curr_section is not None:
- yield curr_section
-
- if name == 'defaults':
- in_defaults = True
- curr_section = None
- else:
- in_defaults = False
- curr_section = FioJobSection(name)
- curr_section.format_params = format_params.copy()
- curr_section.vals = defaults.copy()
- else:
- assert tp == SETTING
- if name == name.upper():
- msg = "Param not in default section in line " + str(lineno)
- assert in_defaults, msg
- if name not in orig_format_params_keys:
- # don't make parse_value for PARAMS
- # they would be parsed later
- # or this would breakes arrays
- format_params[name] = val
- elif in_defaults:
- defaults[name] = parse_value(val)
- else:
- msg = "data outside section, line " + str(lineno)
- assert curr_section is not None, msg
- curr_section.vals[name] = parse_value(val)
-
- if curr_section is not None:
- yield curr_section
-
-
-def parse_value(val):
- try:
- return int(val)
- except ValueError:
- pass
-
- try:
- return float(val)
- except ValueError:
- pass
-
- if val.startswith('{%'):
- assert val.endswith("%}")
- content = val[2:-2]
- vals = list(i.strip() for i in content.split(','))
- return map(parse_value, vals)
- return val
-
-
-def process_repeats(sec_iter):
-
- for sec in sec_iter:
- if '*' in sec.name:
- msg = "Only one '*' allowed in section name"
- assert sec.name.count('*') == 1, msg
-
- name, count = sec.name.split("*")
- sec.name = name.strip()
- count = count.strip()
-
- try:
- count = int(count.strip().format(**sec.format_params))
- except KeyError:
- raise ValueError("No parameter {0} given".format(count[1:-1]))
- except ValueError:
- msg = "Parameter {0} nas non-int value {1!r}"
- raise ValueError(msg.format(count[1:-1],
- count.format(**sec.format_params)))
-
- yield sec.copy()
-
- if 'ramp_time' in sec.vals:
- sec = sec.copy()
- sec.vals['_ramp_time'] = sec.vals.pop('ramp_time')
-
- for _ in range(count - 1):
- yield sec.copy()
- else:
- yield sec
-
-
-def process_cycles(sec_iter):
- # insert parametrized cycles
- sec_iter = try_format_params_into_section(sec_iter)
-
- for sec in sec_iter:
-
- cycles_var_names = []
- cycles_var_values = []
-
- for name, val in sec.vals.items():
- if isinstance(val, (list, tuple)):
- cycles_var_names.append(name)
- cycles_var_values.append(val)
-
- if len(cycles_var_names) == 0:
- yield sec
- else:
- for combination in itertools.product(*cycles_var_values):
- new_sec = sec.copy()
- new_sec.vals.update(zip(cycles_var_names, combination))
- yield new_sec
-
-
-def try_format_params_into_section(sec_iter):
- for sec in sec_iter:
- params = sec.format_params
- for name, val in sec.vals.items():
- if isinstance(val, basestring):
- try:
- sec.vals[name] = parse_value(val.format(**params))
- except:
- pass
-
- yield sec
-
-
-def format_params_into_section_finall(sec_iter, counter=[0]):
- group_report_err_msg = "Group reporting should be set if numjobs != 1"
-
- for sec in sec_iter:
-
- num_jobs = int(sec.vals.get('numjobs', '1'))
- if num_jobs != 1:
- assert 'group_reporting' in sec.vals, group_report_err_msg
-
- assert sec.vals.get('unified_rw_reporting', '1') in (1, '1')
- sec.vals['unified_rw_reporting'] = '1'
-
- params = sec.format_params.copy()
-
- fsize = to_bytes(sec.vals['size'])
- params['PER_TH_OFFSET'] = fsize // num_jobs
-
- for name, val in sec.vals.items():
- if isinstance(val, basestring):
- sec.vals[name] = parse_value(val.format(**params))
- else:
- assert isinstance(val, (int, float))
-
- params['UNIQ'] = 'UN{0}'.format(counter[0])
- params['COUNTER'] = str(counter[0])
- counter[0] += 1
- params['TEST_SUMM'] = get_test_summary(sec.vals,
- params.get('VM_COUNT', 1))
- params.update(sec.vals)
- sec.name = sec.name.format(**params)
-
- yield sec
-
-
-def fio_config_to_str(sec_iter):
- res = ""
-
- for pos, sec in enumerate(sec_iter):
- if pos != 0:
- res += "\n"
-
- res += "[{0}]\n".format(sec.name)
-
- for name, val in sec.vals.items():
- if name.startswith('_'):
- continue
- res += "{0}={1}\n".format(name, val)
-
- return res
-
-
-def get_test_sync_mode(config):
- try:
- return config['sync_mode']
- except KeyError:
- pass
-
- is_sync = str(config.get("sync", "0")) == "1"
- is_direct = str(config.get("direct", "0")) == "1"
-
- if is_sync and is_direct:
- return 'x'
- elif is_sync:
- return 's'
- elif is_direct:
- return 'd'
- else:
- return 'a'
-
-
-def get_test_summary(params, testnodes_count):
- rw = {"randread": "rr",
- "randwrite": "rw",
- "read": "sr",
- "write": "sw"}[params["rw"]]
-
- sync_mode = get_test_sync_mode(params)
- th_count = params.get('numjobs')
-
- if th_count is None:
- th_count = params.get('concurence', 1)
-
- return "{0}{1}{2}th{3}vm{4}".format(rw,
- sync_mode,
- params['blocksize'],
- th_count,
- testnodes_count)
-
-
-def calculate_execution_time(sec_iter):
- time = 0
- for sec in sec_iter:
- time += sec.vals.get('ramp_time', 0)
- time += sec.vals.get('runtime', 0)
- return time
-
-
-def slice_config(sec_iter, runcycle=None, max_jobs=1000,
- soft_runcycle=None, split_on_names=False):
- jcount = 0
- runtime = 0
- curr_slice = []
- prev_name = None
-
- for pos, sec in enumerate(sec_iter):
-
- if prev_name is not None:
- split_here = False
-
- if soft_runcycle is not None and prev_name != sec.name:
- split_here = (runtime > soft_runcycle)
-
- if split_on_names and prev_name != sec.name:
- split_here = True
-
- if split_here:
- yield curr_slice
- curr_slice = []
- runtime = 0
- jcount = 0
-
- prev_name = sec.name
-
- jc = sec.vals.get('numjobs', 1)
- msg = "numjobs should be integer, not {0!r}".format(jc)
- assert isinstance(jc, int), msg
-
- curr_task_time = calculate_execution_time([sec])
-
- if jc > max_jobs:
- err_templ = "Can't process job {0!r} - too large numjobs"
- raise ValueError(err_templ.format(sec.name))
-
- if runcycle is not None and len(curr_slice) != 0:
- rc_ok = curr_task_time + runtime <= runcycle
- else:
- rc_ok = True
-
- if jc + jcount <= max_jobs and rc_ok:
- runtime += curr_task_time
- jcount += jc
- curr_slice.append(sec)
- continue
-
- assert len(curr_slice) != 0
- yield curr_slice
-
- if '_ramp_time' in sec.vals:
- sec.vals['ramp_time'] = sec.vals.pop('_ramp_time')
- curr_task_time = calculate_execution_time([sec])
-
- runtime = curr_task_time
- jcount = jc
- curr_slice = [sec]
- prev_name = None
-
- if curr_slice != []:
- yield curr_slice
-
-
-def parse_all_in_1(source, test_params):
- lexer_it = fio_config_lexer(source)
- sec_it = fio_config_parse(lexer_it, test_params)
- sec_it = process_cycles(sec_it)
- sec_it = process_repeats(sec_it)
- return format_params_into_section_finall(sec_it)
-
-
-def parse_and_slice_all_in_1(source, test_params, **slice_params):
- sec_it = parse_all_in_1(source, test_params)
- return slice_config(sec_it, **slice_params)
-
-
-def compile_all_in_1(source, test_params, **slice_params):
- slices_it = parse_and_slice_all_in_1(source, test_params, **slice_params)
- for slices in slices_it:
- yield fio_config_to_str(slices)
-
-
-def do_run_fio(config_slice):
- benchmark_config = fio_config_to_str(config_slice)
- cmd = ["fio", "--output-format=json", "--alloc-size=262144", "-"]
- p = subprocess.Popen(cmd,
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
-
- start_time = time.time()
- # set timeout
- raw_out, raw_err = p.communicate(benchmark_config)
- end_time = time.time()
-
- if 0 != p.returncode:
- msg = "Fio failed with code: {0}\nOutput={1}"
- raise OSError(msg.format(p.returncode, raw_err))
-
- # HACK
- raw_out = "{" + raw_out.split('{', 1)[1]
-
- try:
- parsed_out = json.loads(raw_out)["jobs"]
- except KeyError:
- msg = "Can't parse fio output {0!r}: no 'jobs' found"
- raw_out = raw_out[:100]
- raise ValueError(msg.format(raw_out))
-
- except Exception as exc:
- msg = "Can't parse fio output: {0!r}\nError: {1!s}"
- raw_out = raw_out[:100]
- raise ValueError(msg.format(raw_out, exc))
-
- return zip(parsed_out, config_slice), (start_time, end_time)
-
-
-class FioResult(object):
- def __init__(self, name, params, run_interval, results):
- self.params = params.copy()
- self.name = name
- self.run_interval = run_interval
- self.results = results
-
- def json_obj(self):
- return self.__dict__
-
-
-def make_job_results(section, job_output, slice_timings):
- # merge by section.merge_id
-
- raw_result = job_output['mixed']
-
- res = {
- "bw": raw_result["bw"],
- "iops": raw_result["iops"],
- "lat": raw_result["lat"]["mean"],
- "clat": raw_result["clat"]["mean"],
- "slat": raw_result["slat"]["mean"]
- }
-
- vls = section.vals.copy()
-
- vls['sync_mode'] = get_test_sync_mode(vls)
- vls['concurence'] = vls.get('numjobs', 1)
-
- return FioResult(section.name, vls, slice_timings, res)
-
-
-def get_slice_parts_offset(test_slice, real_inteval):
- calc_exec_time = calculate_execution_time(test_slice)
- coef = (real_inteval[1] - real_inteval[0]) / calc_exec_time
- curr_offset = real_inteval[0]
- for section in test_slice:
- slen = calculate_execution_time([section]) * coef
- yield (curr_offset, curr_offset + slen)
- curr_offset += slen
-
-
-def run_fio(sliced_it, raw_results_func=None):
- sliced_list = list(sliced_it)
-
- curr_test_num = 0
- executed_tests = 0
- result = []
-
- for i, test_slice in enumerate(sliced_list):
- test_slice = list(test_slice)
-
- res_cfg_it, slice_timings = do_run_fio(test_slice)
- sec_intervals = get_slice_parts_offset(test_slice,
- slice_timings)
- res_cfg_it = enumerate(zip(res_cfg_it, sec_intervals),
- curr_test_num)
-
- section_names = []
- for curr_test_num, ((job_output, section), interval) in res_cfg_it:
- executed_tests += 1
- section_names.append(section.name)
-
- if raw_results_func is not None:
- raw_results_func(executed_tests,
- [job_output, section])
-
- msg = "{0} != {1}".format(section.name, job_output["jobname"])
- assert section.name == job_output["jobname"], msg
-
- result.append(make_job_results(section, job_output, interval))
-
- curr_test_num += 1
- msg_template = "Done {0} tests from {1}. ETA: {2}"
-
- rest = sliced_list[i:]
- time_eta = sum(map(calculate_execution_time, rest))
- test_left = sum(map(len, rest))
- print msg_template.format(curr_test_num,
- test_left,
- sec_to_str(time_eta))
-
- return result
-
-
-def run_benchmark(binary_tp, *argv, **kwargs):
- if 'fio' == binary_tp:
- return run_fio(*argv, **kwargs)
- raise ValueError("Unknown behcnmark {0}".format(binary_tp))
-
-
-def read_config(fd, timeout=10):
- job_cfg = ""
- etime = time.time() + timeout
- while True:
- wtime = etime - time.time()
- if wtime <= 0:
- raise IOError("No config provided")
-
- r, w, x = select.select([fd], [], [], wtime)
- if len(r) == 0:
- raise IOError("No config provided")
-
- char = fd.read(1)
- if '' == char:
- return job_cfg
-
- job_cfg += char
-
-
-def sec_to_str(seconds):
- h = seconds // 3600
- m = (seconds % 3600) // 60
- s = seconds % 60
- return "{0}:{1:02d}:{2:02d}".format(h, m, s)
-
-
-def parse_args(argv):
- parser = argparse.ArgumentParser(
- description="Run fio' and return result")
- parser.add_argument("--type", metavar="BINARY_TYPE",
- choices=['fio'], default='fio',
- help=argparse.SUPPRESS)
- parser.add_argument("--start-at", metavar="START_AT_UTC", type=int,
- help="Start execution at START_AT_UTC")
- parser.add_argument("--json", action="store_true", default=False,
- help="Json output format")
- parser.add_argument("-o", "--output", default='-', metavar="FILE_PATH",
- help="Store results to FILE_PATH")
- parser.add_argument("--estimate", action="store_true", default=False,
- help="Only estimate task execution time")
- parser.add_argument("--compile", action="store_true", default=False,
- help="Compile config file to fio config")
- parser.add_argument("--num-tests", action="store_true", default=False,
- help="Show total number of tests")
- parser.add_argument("--runcycle", type=int, default=None,
- metavar="MAX_CYCLE_SECONDS",
- help="Max cycle length in seconds")
- parser.add_argument("--show-raw-results", action='store_true',
- default=False, help="Output raw input and results")
- parser.add_argument("--params", nargs="*", metavar="PARAM=VAL",
- default=[],
- help="Provide set of pairs PARAM=VAL to" +
- "format into job description")
- parser.add_argument("-p", "--pid-file", metavar="FILE_TO_STORE_PID",
- default=None, help="Store pid to FILE_TO_STORE_PID " +
- "and remove this file on exit")
- parser.add_argument("jobfile")
- return parser.parse_args(argv)
-
-
-def main(argv):
- argv_obj = parse_args(argv)
-
- if argv_obj.jobfile == '-':
- job_cfg = read_config(sys.stdin)
- else:
- job_cfg = open(argv_obj.jobfile).read()
-
- if argv_obj.output == '-':
- out_fd = sys.stdout
- else:
- out_fd = open(argv_obj.output, "w")
-
- if argv_obj.pid_file is not None:
- with open(argv_obj.pid_file, "w") as fd:
- fd.write(str(os.getpid()))
-
- try:
- params = {}
- for param_val in argv_obj.params:
- assert '=' in param_val
- name, val = param_val.split("=", 1)
- params[name] = val
-
- slice_params = {
- 'runcycle': argv_obj.runcycle,
- }
-
- sliced_it = parse_and_slice_all_in_1(job_cfg, params, **slice_params)
-
- if argv_obj.estimate:
- it = map(calculate_execution_time, sliced_it)
- print sec_to_str(sum(it))
- return 0
-
- if argv_obj.num_tests or argv_obj.compile:
- if argv_obj.compile:
- for test_slice in sliced_it:
- out_fd.write(fio_config_to_str(test_slice))
- out_fd.write("\n#" + "-" * 70 + "\n\n")
-
- if argv_obj.num_tests:
- print len(list(sliced_it))
-
- return 0
-
- if argv_obj.start_at is not None:
- ctime = time.time()
- if argv_obj.start_at >= ctime:
- time.sleep(ctime - argv_obj.start_at)
-
- def raw_res_func(test_num, data):
- pref = "========= RAW_RESULTS({0}) =========\n".format(test_num)
- out_fd.write(pref)
- out_fd.write(json.dumps(data))
- out_fd.write("\n========= END OF RAW_RESULTS =========\n")
- out_fd.flush()
-
- rrfunc = raw_res_func if argv_obj.show_raw_results else None
-
- job_res = run_benchmark(argv_obj.type,
- sliced_it, rrfunc)
-
- res = {'__meta__': {'params': params,
- 'testnodes_count': int(params.get('VM_COUNT', 1))},
- 'res': [j.json_obj() for j in job_res]}
-
- oformat = 'json' if argv_obj.json else 'eval'
-
- msg = "========= RESULTS(format={0}) =========\n"
- out_fd.write(msg.format(oformat))
- if argv_obj.json:
- out_fd.write(json.dumps(res))
- else:
- out_fd.write(pprint.pformat(res) + "\n")
- out_fd.write("\n========= END OF RESULTS =========\n")
-
- return 0
- except:
- out_fd.write("============ ERROR =============\n")
- out_fd.write(traceback.format_exc() + "\n")
- out_fd.write("============ END OF ERROR =============\n")
- return 1
- finally:
- try:
- if out_fd is not sys.stdout:
- out_fd.flush()
- os.fsync(out_fd)
- out_fd.close()
- except Exception:
- traceback.print_exc()
-
- if argv_obj.pid_file is not None:
- if os.path.exists(argv_obj.pid_file):
- os.unlink(argv_obj.pid_file)
-
-
-def fake_main(x):
- import yaml
- time.sleep(60)
- out_fd = sys.stdout
- fname = "/tmp/perf_tests/metempirical_alisha/raw_results.yaml"
- res = yaml.load(open(fname).read())[0][1]
- out_fd.write("========= RESULTS(format=json) =========\n")
- out_fd.write(json.dumps(res))
- out_fd.write("\n========= END OF RESULTS =========\n")
- return 0
-
-
-if __name__ == '__main__':
- # exit(fake_main(sys.argv[1:]))
- exit(main(sys.argv[1:]))
diff --git a/wally/suits/io/ceph.cfg b/wally/suits/io/ceph.cfg
index f38b37c..26aa65f 100644
--- a/wally/suits/io/ceph.cfg
+++ b/wally/suits/io/ceph.cfg
@@ -1,28 +1,19 @@
-[defaults]
-wait_for_previous=1
-group_reporting=1
-time_based=1
-buffered=0
-iodepth=1
-softrandommap=1
-filename={FILENAME}
-NUM_ROUNDS=7
-thread=1
-
-# this is critical for correct results in multy-node run
-randrepeat=0
+[global]
+include defaults.cfg
NUMJOBS={% 1, 5, 10, 15, 40 %}
NUMJOBS_SHORT={% 1, 2, 3, 10 %}
-size=100G
+TEST_FILE_SIZE=100G
+size={TEST_FILE_SIZE}
ramp_time=15
runtime=60
+NUM_ROUNDS=7
# ---------------------------------------------------------------------
# check different thread count, sync mode. (latency, iops) = func(th_count)
# ---------------------------------------------------------------------
-[ceph_test_{TEST_SUMM} * {NUM_ROUNDS}]
+[ceph_test_{TEST_SUMM}]
blocksize=4k
rw=randwrite
sync=1
@@ -31,7 +22,7 @@
# ---------------------------------------------------------------------
# direct write
# ---------------------------------------------------------------------
-[ceph_test_{TEST_SUMM} * {NUM_ROUNDS}]
+[ceph_test_{TEST_SUMM}]
blocksize=4k
rw=randwrite
direct=1
@@ -41,7 +32,7 @@
# check different thread count, direct read mode. (latency, iops) = func(th_count)
# also check iops for randread
# ---------------------------------------------------------------------
-[ceph_test_{TEST_SUMM} * {NUM_ROUNDS}]
+[ceph_test_{TEST_SUMM}]
blocksize=4k
rw=randread
direct=1
@@ -51,7 +42,7 @@
# this is essentially sequential write/read operations
# we can't use sequential with numjobs > 1 due to caching and block merging
# ---------------------------------------------------------------------
-[ceph_test_{TEST_SUMM} * {NUM_ROUNDS}]
+[ceph_test_{TEST_SUMM}]
blocksize=16m
rw={% randread, randwrite %}
direct=1
diff --git a/wally/suits/io/check_distribution.cfg b/wally/suits/io/check_distribution.cfg
index e7cafd9..4746f37 100644
--- a/wally/suits/io/check_distribution.cfg
+++ b/wally/suits/io/check_distribution.cfg
@@ -1,19 +1,13 @@
-[defaults]
+[global]
+include defaults.cfg
NUM_ROUNDS=301
-# this is critical for correct results in multy-node run
-randrepeat=0
-
-[distrubution_test_{TEST_SUMM} * {NUM_ROUNDS}]
+[distrubution_test_{TEST_SUMM}]
blocksize=4k
rw=randwrite
direct=1
+
ramp_time=5
-buffered=0
-wait_for_previous
-filename={FILENAME}
-iodepth=1
-size=10G
-time_based
runtime=30
-group_reporting
+
+size=10G
diff --git a/wally/suits/io/check_linearity.cfg b/wally/suits/io/check_linearity.cfg
index 670e8b3..f7c37fb 100644
--- a/wally/suits/io/check_linearity.cfg
+++ b/wally/suits/io/check_linearity.cfg
@@ -1,33 +1,26 @@
-[defaults]
+[global]
+
+include defaults.cfg
NUM_ROUNDS=7
-# this is critical for correct results in multy-node run
-randrepeat=0
-
-ramp_time=5
-buffered=0
-wait_for_previous
-filename={FILENAME}
-iodepth=1
-size=10G
-time_based
+size={TEST_FILE_SIZE}
ramp_time=5
runtime=30
# ---------------------------------------------------------------------
# check read and write linearity. oper_time = func(size)
# ---------------------------------------------------------------------
-[linearity_test_{TEST_SUMM} * {NUM_ROUNDS}]
-blocksize={% 512, 1k, 4k, 8k, 16k, 32k, 64k, 128k, 256k, 512k, 1m, 2m, 4m %}
-rw={% randwrite, randread %}
-direct=1
+# [linearity_test_{TEST_SUMM}]
+# blocksize={BLOCK_SIZES}
+# rw={% randwrite, randread %}
+# direct=1
# ---------------------------------------------------------------------
# check sync write linearity. oper_time = func(size)
# check sync BW as well
# ---------------------------------------------------------------------
-[linearity_test_{TEST_SUMM} * {NUM_ROUNDS}]
-blocksize={% 512, 1k, 4k, 8k, 16k, 32k, 64k, 128k, 256k, 512k, 1m, 2m, 4m %}
+[linearity_test_{TEST_SUMM}]
+blocksize={BLOCK_SIZES}
rw=randwrite
sync=1
diff --git a/wally/suits/io/defaults.cfg b/wally/suits/io/defaults.cfg
new file mode 100644
index 0000000..51a8145
--- /dev/null
+++ b/wally/suits/io/defaults.cfg
@@ -0,0 +1,14 @@
+buffered=0
+group_reporting=1
+iodepth=1
+softrandommap=1
+thread=1
+time_based=1
+wait_for_previous=1
+
+# this is critical for correct results in multi-node run
+randrepeat=0
+
+filename={FILENAME}
+
+
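
With the shared options factored into defaults.cfg, a job file shrinks to its
test-specific lines. A minimal sketch in the same format as the bundled
configs (section name and values are illustrative):

    [global]
    include defaults.cfg
    size=10G
    ramp_time=5
    runtime=30
    NUM_ROUNDS=3

    [example_test_{TEST_SUMM}]
    blocksize=4k
    rw=randwrite
    direct=1
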
diff --git a/wally/suits/io/fio_task_parser.py b/wally/suits/io/fio_task_parser.py
new file mode 100644
index 0000000..52c4bb3
--- /dev/null
+++ b/wally/suits/io/fio_task_parser.py
@@ -0,0 +1,458 @@
+import os
+import sys
+import copy
+import os.path
+import argparse
+import itertools
+from collections import OrderedDict, namedtuple
+
+
+from wally.utils import sec_to_str
+
+
+SECTION = 0
+SETTING = 1
+INCLUDE = 2
+
+
+Var = namedtuple('Var', ('name',))
+CfgLine = namedtuple('CfgLine', ('fname', 'lineno', 'oline',
+ 'tp', 'name', 'val'))
+
+
+class FioJobSection(object):
+ def __init__(self, name):
+ self.name = name
+ self.vals = OrderedDict()
+
+ def copy(self):
+ return copy.deepcopy(self)
+
+ def required_vars(self):
+ for name, val in self.vals.items():
+ if isinstance(val, Var):
+ yield name, val
+
+ def is_free(self):
+ return len(list(self.required_vars())) == 0
+
+ def __str__(self):
+ res = "[{0}]\n".format(self.name)
+
+ for name, val in self.vals.items():
+ if name.startswith('_') or name == name.upper():
+ continue
+ if isinstance(val, Var):
+ res += "{0}={{{1}}}\n".format(name, val.name)
+ else:
+ res += "{0}={1}\n".format(name, val)
+
+ return res
+
+
+def to_bytes(sz):
+ sz = sz.lower()
+ try:
+ return int(sz)
+ except ValueError:
+ if sz[-1] == 'm':
+ return (1024 ** 2) * int(sz[:-1])
+ if sz[-1] == 'k':
+ return 1024 * int(sz[:-1])
+ if sz[-1] == 'g':
+ return (1024 ** 3) * int(sz[:-1])
+ raise
+
+
+class ParseError(ValueError):
+ def __init__(self, msg, fname, lineno, line_cont=""):
+ ValueError.__init__(self, msg)
+ self.file_name = fname
+ self.lineno = lineno
+ self.line_cont = line_cont
+
+ def __str__(self):
+ msg = "In {0}:{1} ({2}) : {3}"
+ return msg.format(self.file_name,
+ self.lineno,
+ self.line_cont,
+ super(ParseError, self).__str__())
+
+
+def is_name(name):
+ if len(name) == 0:
+ return False
+
+ if name[0] != '_' and not name[0].isalpha():
+ return False
+
+ for ch in name[1:]:
+        if ch != '_' and not ch.isalnum():
+ return False
+
+ return True
+
+
+def parse_value(val):
+ try:
+ return int(val)
+ except ValueError:
+ pass
+
+ try:
+ return float(val)
+ except ValueError:
+ pass
+
+ if val.startswith('{%'):
+ assert val.endswith("%}")
+ content = val[2:-2]
+ vals = list(i.strip() for i in content.split(','))
+ return map(parse_value, vals)
+
+ if val.startswith('{'):
+ assert val.endswith("}")
+ assert is_name(val[1:-1])
+ return Var(val[1:-1])
+ return val
+
+
+def fio_config_lexer(fio_cfg, fname):
+ for lineno, oline in enumerate(fio_cfg.split("\n")):
+ try:
+ line = oline.strip()
+
+ if line.startswith("#") or line.startswith(";"):
+ continue
+
+ if line == "":
+ continue
+
+ if '#' in line:
+ raise ParseError("# isn't allowed inside line",
+ fname, lineno, oline)
+
+ if line.startswith('['):
+ yield CfgLine(fname, lineno, oline, SECTION,
+ line[1:-1].strip(), None)
+ elif '=' in line:
+ opt_name, opt_val = line.split('=', 1)
+ yield CfgLine(fname, lineno, oline, SETTING,
+ opt_name.strip(),
+ parse_value(opt_val.strip()))
+ elif line.startswith("include "):
+ yield CfgLine(fname, lineno, oline, INCLUDE,
+ line.split(" ", 1)[1], None)
+ else:
+ yield CfgLine(fname, lineno, oline, SETTING, line, '1')
+
+ except Exception as exc:
+ raise ParseError(str(exc), fname, lineno, oline)
+
+
+def fio_config_parse(lexer_iter):
+ in_globals = False
+ curr_section = None
+ glob_vals = OrderedDict()
+ sections_count = 0
+
+ lexed_lines = list(lexer_iter)
+ one_more = True
+ includes = {}
+
+ while one_more:
+ new_lines = []
+ one_more = False
+ for line in lexed_lines:
+ fname, lineno, oline, tp, name, val = line
+
+ if INCLUDE == tp:
+ if not os.path.exists(fname):
+ dirname = '.'
+ else:
+ dirname = os.path.dirname(fname)
+
+ new_fname = os.path.join(dirname, name)
+ includes[new_fname] = (fname, lineno)
+
+ try:
+ cont = open(new_fname).read()
+ except IOError as err:
+ msg = "Error while including file {0}: {1}"
+ raise ParseError(msg.format(new_fname, err),
+ fname, lineno, oline)
+
+ new_lines.extend(fio_config_lexer(cont, new_fname))
+ one_more = True
+ else:
+ new_lines.append(line)
+
+ lexed_lines = new_lines
+
+ for fname, lineno, oline, tp, name, val in lexed_lines:
+ if tp == SECTION:
+ if curr_section is not None:
+ yield curr_section
+ curr_section = None
+
+ if name == 'global':
+ if sections_count != 0:
+                    raise ParseError("Only one [global] section is" +
+                                     " allowed and it must come first",
+ fname, lineno, oline)
+ in_globals = True
+ else:
+ in_globals = False
+ curr_section = FioJobSection(name)
+ curr_section.vals = glob_vals.copy()
+ sections_count += 1
+ else:
+ assert tp == SETTING
+ if in_globals:
+ glob_vals[name] = val
+ elif name == name.upper():
+ raise ParseError("Param '" + name +
+ "' not in [global] section",
+ fname, lineno, oline)
+ elif curr_section is None:
+ raise ParseError("Data outside section",
+ fname, lineno, oline)
+ else:
+ curr_section.vals[name] = val
+
+ if curr_section is not None:
+ yield curr_section
+
+
+def process_repeats(sec):
+ sec = sec.copy()
+ count = sec.vals.pop('NUM_ROUNDS', 1)
+ assert isinstance(count, (int, long))
+
+ for _ in range(count):
+ yield sec.copy()
+
+ if 'ramp_time' in sec.vals:
+ sec.vals['_ramp_time'] = sec.vals.pop('ramp_time')
+
+
+def process_cycles(sec):
+ cycles = OrderedDict()
+
+ for name, val in sec.vals.items():
+ if isinstance(val, list) and name.upper() != name:
+ cycles[name] = val
+
+ if len(cycles) == 0:
+ yield sec
+ else:
+ for combination in itertools.product(*cycles.values()):
+ new_sec = sec.copy()
+ new_sec.vals.update(zip(cycles.keys(), combination))
+ yield new_sec
+
+
+def apply_params(sec, params):
+ processed_vals = OrderedDict()
+ processed_vals.update(params)
+ for name, val in sec.vals.items():
+ if name in params:
+ continue
+
+ if isinstance(val, Var):
+ if val.name in params:
+ val = params[val.name]
+ elif val.name in processed_vals:
+ val = processed_vals[val.name]
+ processed_vals[name] = val
+ sec = sec.copy()
+ sec.vals = processed_vals
+ return sec
+
+
+def finall_process(sec, counter=[0]):
+ sec = sec.copy()
+
+    if int(sec.vals.get('numjobs', '1')) != 1:
+ msg = "Group reporting should be set if numjobs != 1"
+ assert 'group_reporting' in sec.vals, msg
+
+ sec.vals['unified_rw_reporting'] = '1'
+
+ params = sec.vals.copy()
+ params['UNIQ'] = 'UN{0}'.format(counter[0])
+ params['COUNTER'] = str(counter[0])
+ params['TEST_SUMM'] = get_test_summary(sec)
+ sec.name = sec.name.format(**params)
+ counter[0] += 1
+
+ return sec
+
+
+def get_test_sync_mode(sec):
+ is_sync = str(sec.vals.get("sync", "0")) == "1"
+ is_direct = str(sec.vals.get("direct", "0")) == "1"
+
+ if is_sync and is_direct:
+ return 'x'
+ elif is_sync:
+ return 's'
+ elif is_direct:
+ return 'd'
+ else:
+ return 'a'
+
+
+def get_test_summary(sec):
+ rw = {"randread": "rr",
+ "randwrite": "rw",
+ "read": "sr",
+ "write": "sw"}[sec.vals["rw"]]
+
+ sync_mode = get_test_sync_mode(sec)
+ th_count = sec.vals.get('numjobs')
+
+ if th_count is None:
+ th_count = sec.vals.get('concurence', 1)
+
+ return "{0}{1}{2}th{3}".format(rw,
+ sync_mode,
+ sec.vals['blocksize'],
+ th_count)
+
+
+def execution_time(sec):
+ return sec.vals.get('ramp_time', 0) + sec.vals.get('runtime', 0)
+
+
+def slice_config(sec_iter, runcycle=None, max_jobs=1000, split_on_names=False):
+ jcount = 0
+ runtime = 0
+ curr_slice = []
+ prev_name = None
+
+ for pos, sec in enumerate(sec_iter):
+
+ if prev_name is not None:
+ split_here = False
+
+ if split_on_names and prev_name != sec.name:
+ split_here = True
+
+ if split_here:
+ yield curr_slice
+ curr_slice = []
+ runtime = 0
+ jcount = 0
+
+ prev_name = sec.name
+
+ jc = sec.vals.get('numjobs', 1)
+ msg = "numjobs should be integer, not {0!r}".format(jc)
+ assert isinstance(jc, int), msg
+
+ curr_task_time = execution_time(sec)
+
+ if jc > max_jobs:
+ err_templ = "Can't process job {0!r} - too large numjobs"
+ raise ValueError(err_templ.format(sec.name))
+
+ if runcycle is not None and len(curr_slice) != 0:
+ rc_ok = curr_task_time + runtime <= runcycle
+ else:
+ rc_ok = True
+
+ if jc + jcount <= max_jobs and rc_ok:
+ runtime += curr_task_time
+ jcount += jc
+ curr_slice.append(sec)
+ continue
+
+ assert len(curr_slice) != 0
+ yield curr_slice
+
+ if '_ramp_time' in sec.vals:
+ sec.vals['ramp_time'] = sec.vals.pop('_ramp_time')
+ curr_task_time = execution_time(sec)
+
+ runtime = curr_task_time
+ jcount = jc
+ curr_slice = [sec]
+ prev_name = None
+
+ if curr_slice != []:
+ yield curr_slice
+
+
+def parse_all_in_1(source, fname=None):
+ return fio_config_parse(fio_config_lexer(source, fname))
+
+
+def flatmap(func, inp_iter):
+ for val in inp_iter:
+ for res in func(val):
+ yield res
+
+
+def fio_cfg_compile(source, fname, test_params, **slice_params):
+ it = parse_all_in_1(source, fname)
+ it = (apply_params(sec, test_params) for sec in it)
+ it = flatmap(process_cycles, it)
+ it = flatmap(process_repeats, it)
+ it = itertools.imap(finall_process, it)
+ return slice_config(it, **slice_params)
+
+
+def parse_args(argv):
+ parser = argparse.ArgumentParser(
+ description="Run fio' and return result")
+ parser.add_argument("--runcycle", type=int, default=None,
+ metavar="MAX_CYCLE_SECONDS",
+ help="Max cycle length in seconds")
+ parser.add_argument("-p", "--params", nargs="*", metavar="PARAM=VAL",
+ default=[],
+ help="Provide set of pairs PARAM=VAL to" +
+ "format into job description")
+ parser.add_argument("action", choices=['estimate', 'compile', 'num_tests'])
+ parser.add_argument("jobfile")
+ return parser.parse_args(argv)
+
+
+def main(argv):
+ argv_obj = parse_args(argv)
+
+ if argv_obj.jobfile == '-':
+ job_cfg = sys.stdin.read()
+ else:
+ job_cfg = open(argv_obj.jobfile).read()
+
+ params = {}
+ for param_val in argv_obj.params:
+ assert '=' in param_val
+ name, val = param_val.split("=", 1)
+ params[name] = parse_value(val)
+
+ slice_params = {
+ 'runcycle': argv_obj.runcycle,
+ }
+
+ sliced_it = fio_cfg_compile(job_cfg, argv_obj.jobfile,
+ params, **slice_params)
+
+ if argv_obj.action == 'estimate':
+ sum_time = 0
+ for cfg_slice in sliced_it:
+ sum_time += sum(map(execution_time, cfg_slice))
+ print sec_to_str(sum_time)
+ elif argv_obj.action == 'num_tests':
+ print sum(map(len, map(list, sliced_it)))
+ elif argv_obj.action == 'compile':
+ splitter = "\n#" + "-" * 70 + "\n\n"
+ for cfg_slice in sliced_it:
+ print splitter.join(map(str, cfg_slice))
+
+ return 0
+
+
+if __name__ == '__main__':
+ exit(main(sys.argv[1:]))
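
End to end the new pipeline is: lex -> parse (with include expansion) ->
apply params -> expand value cycles -> expand NUM_ROUNDS repeats ->
finall_process -> slice. A sketch of driving it programmatically (path and
parameter values are illustrative):

    # sketch: compile a job file into printable fio config slices
    source = open("wally/suits/io/ceph.cfg").read()
    slices = fio_cfg_compile(source, "ceph.cfg", {'FILENAME': '/dev/vdb'})
    for cfg_slice in slices:
        for sec in cfg_slice:
            print str(sec)   # FioJobSection.__str__ renders one [job] block
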
diff --git a/wally/suits/io/formatter.py b/wally/suits/io/formatter.py
index 22c090f..84b0a13 100644
--- a/wally/suits/io/formatter.py
+++ b/wally/suits/io/formatter.py
@@ -2,15 +2,21 @@
from wally.utils import ssize2b
from wally.statistic import round_3_digit
-from wally.suits.io.agent import get_test_summary
+from .fio_task_parser import get_test_summary, get_test_sync_mode
def key_func(data):
- p = data.params
+ p = data.params.vals
+
+ th_count = data.params.vals.get('numjobs')
+
+ if th_count is None:
+ th_count = data.params.vals.get('concurence', 1)
+
return (p['rw'],
- p['sync_mode'],
+ get_test_sync_mode(data.params),
ssize2b(p['blocksize']),
- int(p['concurence']) * data.testnodes_count,
+ int(th_count) * data.testnodes_count,
data.name)
@@ -41,8 +47,7 @@
prev_k = curr_k
- descr = get_test_summary(data.params, data.testnodes_count)
- test_dinfo = dinfo[data.name]
+ test_dinfo = dinfo[(data.name, data.summary)]
iops, _ = test_dinfo.iops.rounded_average_conf()
@@ -61,7 +66,7 @@
bw = round_3_digit(bw)
params = (data.name.rsplit('_', 1)[0],
- descr, int(iops), int(bw), str(conf_perc),
+ data.summary, int(iops), int(bw), str(conf_perc),
str(dev_perc),
int(iops_per_vm), int(bw_per_vm), lat)
tab.add_row(params)
diff --git a/wally/suits/io/hdd.cfg b/wally/suits/io/hdd.cfg
index 17d0509..21166e5 100644
--- a/wally/suits/io/hdd.cfg
+++ b/wally/suits/io/hdd.cfg
@@ -1,16 +1,11 @@
-[defaults]
-wait_for_previous
-group_reporting
-time_based
-buffered=0
-iodepth=1
-softrandommap=1
-filename={FILENAME}
-NUM_ROUNDS=7
-NUMJOBS={% 1, 5, 10, 15, 20, 30, 40, 80 %}
-
-# this is critical for correct results in multi-node run
-randrepeat=0
+[global]
+include defaults.cfg
+
+NUM_ROUNDS=3
+
+# NUMJOBS={% 1, 5, 10, 15, 20, 30, 40, 80 %}
+
+NUMJOBS={% 1, 5, 10 %}
size=10G
ramp_time=5
@@ -19,7 +14,7 @@
# ---------------------------------------------------------------------
# check different thread count, sync mode. (latency, iops) = func(th_count)
# ---------------------------------------------------------------------
-[hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+[hdd_test_{TEST_SUMM}]
blocksize=4k
rw=randwrite
sync=1
@@ -29,7 +24,7 @@
# check different thread count, direct read mode. (latency, iops) = func(th_count)
# also check iops for randread
# ---------------------------------------------------------------------
-[hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+[hdd_test_{TEST_SUMM}]
blocksize=4k
rw=randread
direct=1
@@ -38,7 +33,7 @@
# ---------------------------------------------------------------------
# check IOPS randwrite.
# ---------------------------------------------------------------------
-[hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+[hdd_test_{TEST_SUMM}]
blocksize=4k
rw=randwrite
direct=1
@@ -47,7 +42,7 @@
 # No reason for th count > 1 in case of sequential operations
 # They become random
# ---------------------------------------------------------------------
-[hdd_test_{TEST_SUMM} * {NUM_ROUNDS}]
+[hdd_test_{TEST_SUMM}]
blocksize=1m
rw={% read, write %}
direct=1
diff --git a/wally/suits/io/lat_vs_iops.cfg b/wally/suits/io/lat_vs_iops.cfg
index a587a96..dbafcbb 100644
--- a/wally/suits/io/lat_vs_iops.cfg
+++ b/wally/suits/io/lat_vs_iops.cfg
@@ -1,29 +1,40 @@
-[defaults]
-wait_for_previous=1
-filename={FILENAME}
+[global]
+include defaults.cfg
-# this is critical for correct results in multy-node run
-randrepeat=0
-
-iodepth=1
-size=100G
-group_reporting=1
-
-IOPS_LIMIT={% 100, 500 %}
+TEST_FILE_SIZE=100G
+size={TEST_FILE_SIZE}
ramp_time=5
runtime=30
-time_based=1
-buffered=0
-NUMJOBS=1
+blocksize=4k
+rw=randwrite
+sync=1
# ---------------------------------------------------------------------
 # latency as a function of IOPS
# ---------------------------------------------------------------------
-[lat_vs_iops{rate_iops}_{TEST_SUMM} * {NUM_ROUNDS}]
-blocksize=4k
-rw=randread
-direct=1
-numjobs={NUMJOBS}
-rate_iops={IOPS_LIMIT}
+[lat_vs_iops{rate_iops}_{TEST_SUMM}]
+numjobs=1
+rate_iops={% 20, 40, 60, 80, 100, 120, 160, 200, 250, 300 %}
+
+# ---------------------------------------------------------------------
+# latency as a function of IOPS
+# ---------------------------------------------------------------------
+[lat_vs_iops{rate_iops}_{TEST_SUMM}]
+numjobs=3
+rate_iops={% 10, 20, 40, 60, 80, 100, 120, 160 %}
+
+# ---------------------------------------------------------------------
+# latency as a function of IOPS
+# ---------------------------------------------------------------------
+[lat_vs_iops{rate_iops}_{TEST_SUMM}]
+numjobs=7
+rate_iops={% 5, 10, 20, 40, 50, 60, 70 %}
+
+# ---------------------------------------------------------------------
+# latency as a function of IOPS
+# ---------------------------------------------------------------------
+[lat_vs_iops{rate_iops}_{TEST_SUMM}]
+numjobs=10
+rate_iops={% 5, 10, 20, 40, 50 %}
diff --git a/wally/suits/io/results_loader.py b/wally/suits/io/results_loader.py
deleted file mode 100644
index 988fe0e..0000000
--- a/wally/suits/io/results_loader.py
+++ /dev/null
@@ -1,56 +0,0 @@
-import re
-import json
-
-
-def parse_output(out_err):
- err_start_patt = r"(?ims)=+\s+ERROR\s+=+"
- err_end_patt = r"(?ims)=+\s+END OF ERROR\s+=+"
-
- for block in re.split(err_start_patt, out_err)[1:]:
- tb, garbage = re.split(err_end_patt, block)
- msg = "Test fails with error:\n" + tb.strip() + "\n"
- raise OSError(msg)
-
- start_patt = r"(?ims)=+\s+RESULTS\(format=json\)\s+=+"
- end_patt = r"(?ims)=+\s+END OF RESULTS\s+=+"
-
- for block in re.split(start_patt, out_err)[1:]:
- data, garbage = re.split(end_patt, block)
- yield json.loads(data.strip())
-
- start_patt = r"(?ims)=+\s+RESULTS\(format=eval\)\s+=+"
- end_patt = r"(?ims)=+\s+END OF RESULTS\s+=+"
-
- for block in re.split(start_patt, out_err)[1:]:
- data, garbage = re.split(end_patt, block)
- yield eval(data.strip())
-
-
-def filter_data(name_prefix, fields_to_select, **filters):
- def closure(data):
- for result in data:
- if name_prefix is not None:
- if not result['jobname'].startswith(name_prefix):
- continue
-
- for k, v in filters.items():
- if result.get(k) != v:
- break
- else:
- yield map(result.get, fields_to_select)
- return closure
-
-
-def filter_processed_data(name_prefix, fields_to_select, **filters):
- def closure(data):
- for name, result in data.items():
- if name_prefix is not None:
- if not name.startswith(name_prefix):
- continue
-
- for k, v in filters.items():
- if result.raw.get(k) != v:
- break
- else:
- yield map(result.raw.get, fields_to_select)
- return closure
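
For reference, filter_data above returned a closure over parsed fio results:
it kept only jobs whose name starts with the given prefix and whose fields
match the keyword filters exactly, yielding the selected fields per job. A
usage sketch (the field names are illustrative):

    # select iops and bw of all sync randwrite jobs of the hdd suite
    flt = filter_data('hdd_test_', ['iops', 'bw'], rw='randwrite', sync=1)
    for iops, bw in flt(parsed_results):
        print(iops, bw)
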
diff --git a/wally/suits/io/verify.cfg b/wally/suits/io/verify.cfg
index 58b8450..9ebfad1 100644
--- a/wally/suits/io/verify.cfg
+++ b/wally/suits/io/verify.cfg
@@ -1,28 +1,18 @@
-[defaults]
-wait_for_previous
-group_reporting
-time_based
-buffered=0
-iodepth=1
-softrandommap=1
-filename={FILENAME}
-NUM_ROUNDS=1
-
-# this is critical for correct results in multy-node run
-randrepeat=0
-
+[global]
+include defaults.cfg
size=50G
ramp_time=5
runtime=60
+NUM_ROUNDS=2
# ---------------------------------------------------------------------
-[verify_{TEST_SUMM} * {NUM_ROUNDS}]
-blocksize=4k
-rw=randwrite
-direct=1
+#[verify_{TEST_SUMM}]
+#blocksize=4k
+#rw=randwrite
+#direct=1
# ---------------------------------------------------------------------
-[verify_{TEST_SUMM} * {NUM_ROUNDS}]
+[verify_{TEST_SUMM}]
blocksize=4k
rw=randread
direct=1
diff --git a/wally/suits/itest.py b/wally/suits/itest.py
index 36d3fcf..09e93f0 100644
--- a/wally/suits/itest.py
+++ b/wally/suits/itest.py
@@ -1,35 +1,47 @@
import abc
-import time
-import socket
-import random
import os.path
-import logging
-import datetime
-
-from paramiko import SSHException, SFTPError
-import texttable
-
-from wally.utils import (ssize2b, open_for_append_or_create,
- sec_to_str, StopTestError)
-
-from wally.ssh_utils import (copy_paths, run_over_ssh,
- save_to_remote,
- # delete_file,
- connect, read_from_remote, Local,
- exists)
-
-from . import postgres
-from . import mysql
-from .io import agent as io_agent
-from .io import formatter as io_formatter
-from .io.results_loader import parse_output
-logger = logging.getLogger("wally")
+from wally.ssh_utils import run_over_ssh, copy_paths
+
+
+class TestResults(object):
+ def __init__(self, config, params, results,
+ raw_result, run_interval, vm_count):
+ self.config = config
+ self.params = params
+ self.results = results
+ self.raw_result = raw_result
+ self.run_interval = run_interval
+ self.vm_count = vm_count
+
+ def __str__(self):
+ res = "{0}({1}):\n results:\n".format(
+ self.__class__.__name__,
+ self.summary())
+
+ for name, val in self.results.items():
+ res += " {0}={1}\n".format(name, val)
+
+ res += " params:\n"
+
+ for name, val in self.params.items():
+ res += " {0}={1}\n".format(name, val)
+
+ return res
+
+ @abc.abstractmethod
+ def summary(self):
+ pass
+
+ @abc.abstractmethod
+ def get_yamable(self):
+ pass
class IPerfTest(object):
def __init__(self, options, is_primary, on_result_cb, test_uuid, node,
+ total_nodes_count,
log_directory=None,
coordination_queue=None,
remote_dir="/tmp/wally"):
@@ -42,6 +54,7 @@
self.remote_dir = remote_dir
self.is_primary = is_primary
self.stop_requested = False
+ self.total_nodes_count = total_nodes_count
def request_stop(self):
self.stop_requested = True
@@ -59,6 +72,11 @@
def cleanup(self):
pass
+ @classmethod
+ @abc.abstractmethod
+ def load(cls, data):
+ pass
+
@abc.abstractmethod
def run(self, barrier):
pass
@@ -118,470 +136,3 @@
def merge_results(self, results):
tpcm = sum([val[1] for val in results])
return {"res": {"TpmC": tpcm}}
-
-
-class PgBenchTest(TwoScriptTest):
- root = os.path.dirname(postgres.__file__)
- pre_run_script = os.path.join(root, "prepare.sh")
- run_script = os.path.join(root, "run.sh")
-
- @classmethod
- def format_for_console(cls, data):
- tab = texttable.Texttable(max_width=120)
- tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
- tab.header(["TpmC"])
- tab.add_row([data['res']['TpmC']])
- return tab.draw()
-
-
-class MysqlTest(TwoScriptTest):
- root = os.path.dirname(mysql.__file__)
- pre_run_script = os.path.join(root, "prepare.sh")
- run_script = os.path.join(root, "run.sh")
-
- @classmethod
- def format_for_console(cls, data):
- tab = texttable.Texttable(max_width=120)
- tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
- tab.header(["TpmC"])
- tab.add_row([data['res']['TpmC']])
- return tab.draw()
-
-
-class IOPerfTest(IPerfTest):
- tcp_conn_timeout = 30
- max_pig_timeout = 5
- soft_runcycle = 5 * 60
-
- def __init__(self, *dt, **mp):
- IPerfTest.__init__(self, *dt, **mp)
- self.config_fname = self.options['cfg']
-
- if '/' not in self.config_fname and '.' not in self.config_fname:
- cfgs_dir = os.path.dirname(io_agent.__file__)
- self.config_fname = os.path.join(cfgs_dir,
- self.config_fname + '.cfg')
-
- self.alive_check_interval = self.options.get('alive_check_interval')
-
- self.config_params = {}
- for name, val in self.options.get('params', {}).items():
- if isinstance(val, (list, tuple)):
- val = "{%" + ','.join(map(str, val)) + "%}"
- self.config_params[name] = val
-
- self.config_params['VM_COUNT'] = self.options['testnodes_count']
- self.tool = self.options.get('tool', 'fio')
- self.raw_cfg = open(self.config_fname).read()
- self.configs = list(io_agent.parse_all_in_1(self.raw_cfg,
- self.config_params))
-
- cmd_log = os.path.join(self.log_directory, "task_compiled.cfg")
- raw_res = os.path.join(self.log_directory, "raw_results.txt")
-
- self.io_py_remote = self.join_remote("agent.py")
- self.log_fl = self.join_remote("log.txt")
- self.pid_file = self.join_remote("pid")
- self.task_file = self.join_remote("task.cfg")
- self.use_sudo = self.options.get("use_sudo", True)
- self.test_logging = self.options.get("test_logging", False)
-
- fio_command_file = open_for_append_or_create(cmd_log)
-
- if self.test_logging:
- soft_runcycle = self.soft_runcycle
- else:
- soft_runcycle = None
-
- self.fio_configs = io_agent.parse_and_slice_all_in_1(
- self.raw_cfg,
- self.config_params,
- soft_runcycle=soft_runcycle,
- split_on_names=self.test_logging)
-
- self.fio_configs = list(self.fio_configs)
- splitter = "\n\n" + "-" * 60 + "\n\n"
-
- cfg = splitter.join(
- map(io_agent.fio_config_to_str,
- self.fio_configs))
-
- fio_command_file.write(cfg)
- self.fio_raw_results_file = open_for_append_or_create(raw_res)
-
- def __str__(self):
- return "{0}({1})".format(self.__class__.__name__,
- self.node.get_conn_id())
-
- def cleanup(self):
- # delete_file(conn, self.io_py_remote)
- # Need to remove tempo files, used for testing
- pass
-
- def prefill_test_files(self):
- files = {}
-
- for section in self.configs:
- sz = ssize2b(section.vals['size'])
- msz = sz / (1024 ** 2)
-
- if sz % (1024 ** 2) != 0:
- msz += 1
-
- fname = section.vals['filename']
-
- # if already has other test with the same file name
- # take largest size
- files[fname] = max(files.get(fname, 0), msz)
-
- cmd_templ = "dd oflag=direct " + \
- "if=/dev/zero of={0} bs={1} count={2}"
-
- # cmd_templ = "fio --rw=write --bs={1} --direct=1 --size={2} "
-
- if self.use_sudo:
- cmd_templ = "sudo " + cmd_templ
-
- ssize = 0
- stime = time.time()
-
- for fname, curr_sz in files.items():
- cmd = cmd_templ.format(fname, 1024 ** 2, curr_sz)
- ssize += curr_sz
- self.run_over_ssh(cmd, timeout=curr_sz)
-
- ddtime = time.time() - stime
- if ddtime > 1E-3:
- fill_bw = int(ssize / ddtime)
- mess = "Initiall dd fill bw is {0} MiBps for this vm"
- logger.info(mess.format(fill_bw))
- self.coordinate(('init_bw', fill_bw))
-
- def install_utils(self, max_retry=3, timeout=5):
- need_install = []
- for bin_name, package in (('fio', 'fio'), ('screen', 'screen')):
- try:
- self.run_over_ssh('which ' + bin_name, nolog=True)
- except OSError:
- need_install.append(package)
-
- if len(need_install) == 0:
- return
-
- cmd = "sudo apt-get -y install " + " ".join(need_install)
-
- for i in range(max_retry):
- try:
- self.run_over_ssh(cmd)
- break
- except OSError as err:
- time.sleep(timeout)
- else:
- raise OSError("Can't install - " + str(err))
-
- def pre_run(self):
- try:
- cmd = 'mkdir -p "{0}"'.format(self.remote_dir)
- if self.use_sudo:
- cmd = "sudo " + cmd
- cmd += " ; sudo chown {0} {1}".format(self.node.get_user(),
- self.remote_dir)
-
- self.run_over_ssh(cmd)
- except Exception as exc:
- msg = "Failed to create folder {0} on remote {1}. Error: {2!s}"
- msg = msg.format(self.remote_dir, self.node.get_conn_id(), exc)
- logger.exception(msg)
- raise StopTestError(msg, exc)
-
- self.install_utils()
-
- local_fname = os.path.splitext(io_agent.__file__)[0] + ".py"
- files_to_copy = {local_fname: self.io_py_remote}
- copy_paths(self.node.connection, files_to_copy)
-
- if self.options.get('prefill_files', True):
- self.prefill_test_files()
- elif self.is_primary:
- logger.warning("Prefilling of test files is disabled")
-
- def check_process_is_running(self, sftp, pid):
- try:
- sftp.stat("/proc/{0}".format(pid))
- return True
- except (OSError, IOError, NameError):
- return False
-
- def kill_remote_process(self, conn, pid, soft=True):
- try:
- if soft:
- cmd = "kill {0}"
- else:
- cmd = "kill -9 {0}"
-
- if self.use_sudo:
- cmd = "sudo " + cmd
-
- self.run_over_ssh(cmd.format(pid))
- return True
- except OSError:
- return False
-
- def get_test_status(self, res_file=None):
- found_res_file = False
- is_connected = None
- is_running = None
- pid = None
- err = None
-
- try:
- # conn = connect(self.node.conn_url,
- # conn_timeout=self.tcp_conn_timeout)
- # with conn:
- conn = self.node.connection
- with conn.open_sftp() as sftp:
- try:
- pid = read_from_remote(sftp, self.pid_file)
- is_running = True
- except (NameError, IOError, OSError) as exc:
- pid = None
- is_running = False
-
- if is_running:
- if not self.check_process_is_running(sftp, pid):
- try:
- sftp.remove(self.pid_file)
- except (IOError, NameError, OSError):
- pass
- is_running = False
-
- if res_file is not None:
- found_res_file = exists(sftp, res_file)
-
- is_connected = True
-
- except (socket.error, SSHException, EOFError, SFTPError) as exc:
- err = str(exc)
- is_connected = False
-
- return found_res_file, is_connected, is_running, pid, err
-
- def wait_till_finished(self, soft_timeout, timeout, res_fname=None):
- conn_id = self.node.get_conn_id()
- end_of_wait_time = timeout + time.time()
- soft_end_of_wait_time = soft_timeout + time.time()
-
- time_till_check = random.randint(5, 10)
- pid = None
- is_running = False
- pid_get_timeout = self.max_pig_timeout + time.time()
- curr_connected = True
-
- while end_of_wait_time > time.time():
- time.sleep(time_till_check)
-
- found_res_file, is_connected, is_running, npid, err = \
- self.get_test_status(res_fname)
-
- if found_res_file and not is_running:
- return
-
- if is_connected and not is_running:
- if pid is None:
- if time.time() > pid_get_timeout:
- msg = ("On node {0} pid file doesn't " +
- "appears in time")
- logger.error(msg.format(conn_id))
- raise StopTestError("Start timeout")
- else:
- # execution finished
- break
-
- if npid is not None:
- pid = npid
-
- if is_connected and pid is not None and is_running:
- if time.time() < soft_end_of_wait_time:
- time.sleep(soft_end_of_wait_time - time.time())
-
- if is_connected and not curr_connected:
- msg = "Connection with {0} is restored"
- logger.debug(msg.format(conn_id))
- elif not is_connected and curr_connected:
- msg = "Lost connection with " + conn_id + ". Error: " + err
- logger.debug(msg)
-
- curr_connected = is_connected
-
- def run(self, barrier):
- try:
- if len(self.fio_configs) > 1 and self.is_primary:
-
- exec_time = 0
- for test in self.fio_configs:
- exec_time += io_agent.calculate_execution_time(test)
-
- # +5% - is a rough estimation for additional operations
- # like sftp, etc
- exec_time = int(exec_time * 1.05)
-
- exec_time_s = sec_to_str(exec_time)
- now_dt = datetime.datetime.now()
- end_dt = now_dt + datetime.timedelta(0, exec_time)
- msg = "Entire test should takes aroud: {0} and finished at {1}"
- logger.info(msg.format(exec_time_s,
- end_dt.strftime("%H:%M:%S")))
-
- for pos, fio_cfg_slice in enumerate(self.fio_configs):
- names = [i.name for i in fio_cfg_slice]
- msgs = []
- already_processed = set()
- for name in names:
- if name not in already_processed:
- already_processed.add(name)
-
- if 1 == names.count(name):
- msgs.append(name)
- else:
- frmt = "{0} * {1}"
- msgs.append(frmt.format(name,
- names.count(name)))
-
- if self.is_primary:
- logger.info("Will run tests: " + ", ".join(msgs))
-
- nolog = (pos != 0) or not self.is_primary
- out_err = self.do_run(barrier, fio_cfg_slice, nolog=nolog)
-
- try:
- for data in parse_output(out_err):
- self.on_result_cb(data)
- except (OSError, StopTestError):
- raise
- except Exception as exc:
- msg_templ = "Error during postprocessing results: {0!s}"
- raise RuntimeError(msg_templ.format(exc))
-
- finally:
- barrier.exit()
-
- def do_run(self, barrier, cfg, nolog=False):
- conn_id = self.node.get_conn_id()
-
- cmd_templ = "screen -S {screen_name} -d -m " + \
- "env python2 {0} -p {pid_file} -o {results_file} " + \
- "--type {1} {2} --json {3}"
-
- if self.options.get("use_sudo", True):
- cmd_templ = "sudo " + cmd_templ
-
- params = []
- for k, v in self.config_params.items():
- if isinstance(v, basestring) and v.startswith("{%"):
- continue
- params.append("{0}={1}".format(k, v))
-
- if [] != params:
- params = "--params " + " ".join(params)
-
- with self.node.connection.open_sftp() as sftp:
- save_to_remote(sftp, self.task_file,
- io_agent.fio_config_to_str(cfg))
-
- screen_name = self.test_uuid
- cmd = cmd_templ.format(self.io_py_remote,
- self.tool,
- params,
- self.task_file,
- pid_file=self.pid_file,
- results_file=self.log_fl,
- screen_name=screen_name)
-
- exec_time = io_agent.calculate_execution_time(cfg)
- exec_time_str = sec_to_str(exec_time)
-
- timeout = int(exec_time + max(300, exec_time))
- soft_tout = exec_time
- barrier.wait()
- self.run_over_ssh(cmd, nolog=nolog)
- if self.is_primary:
- templ = "Test should takes about {0}." + \
- " Should finish at {1}," + \
- " will wait at most till {2}"
- now_dt = datetime.datetime.now()
- end_dt = now_dt + datetime.timedelta(0, exec_time)
- wait_till = now_dt + datetime.timedelta(0, timeout)
-
- logger.info(templ.format(exec_time_str,
- end_dt.strftime("%H:%M:%S"),
- wait_till.strftime("%H:%M:%S")))
-
- if not nolog:
- msg = "Tests started in screen {1} on each testnode"
- logger.debug(msg.format(conn_id, screen_name))
-
- # TODO: add monitoring socket
- # if not isinstance(self.node.connection, Local):
- # self.node.connection.close()
-
- self.wait_till_finished(soft_tout, timeout, self.log_fl)
- if not nolog:
- logger.debug("Test on node {0} is finished".format(conn_id))
-
- # if self.node.connection is not Local:
- # conn_timeout = self.tcp_conn_timeout * 3
- # self.node.connection = connect(self.node.conn_url,
- # conn_timeout=conn_timeout)
-
- with self.node.connection.open_sftp() as sftp:
- return read_from_remote(sftp, self.log_fl)
-
- @classmethod
- def merge_results(cls, results):
- merged = results[0]
- for block in results[1:]:
- assert block["__meta__"] == merged["__meta__"]
- merged['res'].extend(block['res'])
- return merged
-
- # @classmethod
- # def merge_results(cls, results):
- # if len(results) == 0:
- # return None
-
- # merged_result = results[0]
- # merged_data = merged_result['res']
- # mergable_fields = ['bw', 'clat', 'iops', 'lat', 'slat']
-
- # for res in results[1:]:
- # mm = merged_result['__meta__']
- # assert mm['raw_cfg'] == res['__meta__']['raw_cfg']
- # assert mm['params'] == res['__meta__']['params']
- # mm['timings'].extend(res['__meta__']['timings'])
-
- # data = res['res']
- # for testname, test_data in data.items():
- # if testname not in merged_data:
- # merged_data[testname] = test_data
- # continue
-
- # res_test_data = merged_data[testname]
-
- # diff = set(test_data.keys()).symmetric_difference(
- # res_test_data.keys())
-
- # msg = "Difference: {0}".format(",".join(diff))
- # assert len(diff) == 0, msg
-
- # for k, v in test_data.items():
- # if k in mergable_fields:
- # res_test_data[k].extend(v)
- # else:
- # msg = "{0!r} != {1!r}".format(res_test_data[k], v)
- # assert res_test_data[k] == v, msg
-
- # return merged_result
-
- @classmethod
- def format_for_console(cls, data, dinfo):
- return io_formatter.format_results_for_console(dinfo)
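
The new TestResults base carries the raw and processed data of one test run
and requires subclasses to provide summary() and get_yamable(). A hypothetical
minimal subclass, only to illustrate the contract (not part of this diff):

    class DummyTestResults(TestResults):
        def summary(self):
            # short test id used as the header in __str__
            return self.config.get("name", "dummy")

        def get_yamable(self):
            # plain structures, safe to pass to yaml.dump()
            return {"params": self.params,
                    "results": self.results,
                    "run_interval": list(self.run_interval),
                    "vm_count": self.vm_count}

    res = DummyTestResults(config={"name": "rw4k"},
                           params={"numjobs": 1},
                           results={"iops": 1200},
                           raw_result=None,
                           run_interval=(0.0, 30.0),
                           vm_count=1)
    print(res)   # TestResults.__str__ prints results and params line by line
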
diff --git a/wally/suits/mysql/__init__.py b/wally/suits/mysql/__init__.py
index e69de29..6c3a982 100644
--- a/wally/suits/mysql/__init__.py
+++ b/wally/suits/mysql/__init__.py
@@ -0,0 +1,19 @@
+import os.path
+
+import texttable
+
+from ..itest import TwoScriptTest
+
+
+class MysqlTest(TwoScriptTest):
+ root = os.path.dirname(__file__)
+ pre_run_script = os.path.join(root, "prepare.sh")
+ run_script = os.path.join(root, "run.sh")
+
+ @classmethod
+ def format_for_console(cls, data):
+ tab = texttable.Texttable(max_width=120)
+ tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
+ tab.header(["TpmC"])
+ tab.add_row([data['res']['TpmC']])
+ return tab.draw()
diff --git a/wally/suits/postgres/__init__.py b/wally/suits/postgres/__init__.py
index e69de29..06fdd21 100644
--- a/wally/suits/postgres/__init__.py
+++ b/wally/suits/postgres/__init__.py
@@ -0,0 +1,21 @@
+import os.path
+
+
+import texttable
+
+
+from ..itest import TwoScriptTest
+
+
+class PgBenchTest(TwoScriptTest):
+ root = os.path.dirname(__file__)
+ pre_run_script = os.path.join(root, "prepare.sh")
+ run_script = os.path.join(root, "run.sh")
+
+ @classmethod
+ def format_for_console(cls, data):
+ tab = texttable.Texttable(max_width=120)
+ tab.set_deco(tab.HEADER | tab.VLINES | tab.BORDER)
+ tab.header(["TpmC"])
+ tab.add_row([data['res']['TpmC']])
+ return tab.draw()
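
MysqlTest and PgBenchTest now live in their own packages but share the same
console formatting: data is expected in the {"res": {"TpmC": ...}} shape that
TwoScriptTest.merge_results() produces, e.g.:

    # prints a small one-column bordered table with header "TpmC"
    print(PgBenchTest.format_for_console({"res": {"TpmC": 4242}}))
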
diff --git a/wally/utils.py b/wally/utils.py
index 1fa74c5..cdee319 100644
--- a/wally/utils.py
+++ b/wally/utils.py
@@ -1,5 +1,6 @@
import re
import os
+import sys
import time
import psutil
import socket
@@ -31,6 +32,41 @@
self.orig_exc = orig_exc
+@contextlib.contextmanager
+def log_block(message, exc_logger=None):
+ logger.debug("Starts : " + message)
+ with log_error(message, exc_logger):
+ yield
+ # try:
+ # yield
+ # except Exception as exc:
+ # if isinstance(exc, types) and not isinstance(exc, StopIteration):
+ # templ = "Error during {0} stage: {1!s}"
+ # logger.debug(templ.format(action, exc))
+ # raise
+
+
+class log_error(object):
+ def __init__(self, message, exc_logger=None):
+ self.message = message
+ self.exc_logger = exc_logger
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, tp, value, traceback):
+ if value is None or isinstance(value, StopTestError):
+ return
+
+ if self.exc_logger is None:
+ exc_logger = sys._getframe(1).f_globals.get('logger', logger)
+ else:
+ exc_logger = self.exc_logger
+
+ exc_logger.exception(self.message, exc_info=(tp, value, traceback))
+ raise StopTestError(self.message, value)
+
+
def check_input_param(is_ok, message):
if not is_ok:
logger.error(message)
@@ -79,22 +115,6 @@
self.exited = True
-@contextlib.contextmanager
-def log_error(action, types=(Exception,)):
- if not action.startswith("!"):
- logger.debug("Starts : " + action)
- else:
- action = action[1:]
-
- try:
- yield
- except Exception as exc:
- if isinstance(exc, types) and not isinstance(exc, StopIteration):
- templ = "Error during {0} stage: {1!s}"
- logger.debug(templ.format(action, exc))
- raise
-
-
SMAP = dict(k=1024, m=1024 ** 2, g=1024 ** 3, t=1024 ** 4)
@@ -259,3 +279,13 @@
def iter_clean_func():
while CLEANING != []:
yield CLEANING.pop()
+
+
+def flatten(data):
+ res = []
+ for i in data:
+ if isinstance(i, (list, tuple, set)):
+ res.extend(flatten(i))
+ else:
+ res.append(i)
+ return res
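
Usage of the reworked helpers, as defined in this diff (the stage functions
are hypothetical):

    from wally.utils import log_block, log_error, flatten

    # log_block logs the stage start, then delegates to log_error
    with log_block("install test tools"):
        install_tools()

    # log_error is transparent on success; any exception other than
    # StopTestError is logged with its traceback via the caller's module
    # logger and re-raised as StopTestError(message, exc)
    with log_error("copy agent to node"):
        copy_agent()

    assert flatten([1, [2, (3, [4])], {5}]) == [1, 2, 3, 4, 5]
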