""" Analize test results for finding bottlenecks """
import re
import sys
import csv
import time
import bisect
import os.path
import argparse
import collections

import yaml
import texttable

try:
    import pygraphviz as pgv
except ImportError:
    pgv = None
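
# NOTE: hardcoded path to a local disk_perf_test_tool checkout, needed
# to make the wally package importable; adjust it for your environment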
sys.path.append("/mnt/other/work/disk_perf_test_tool")
from wally.run_test import load_data_from
from wally.utils import b2ssize, b2ssize_10


class SensorInfo(object):
    """Static description of a single sensor metric."""
    def __init__(self, name, print_name, native_ext, to_bytes_coef):
        self.name = name
        self.print_name = print_name
        self.native_ext = native_ext
        self.to_bytes_coef = to_bytes_coef


_SINFO = [
    SensorInfo('recv_bytes', 'net_recv', 'B', 1),
    SensorInfo('send_bytes', 'net_send', 'B', 1),
    SensorInfo('sectors_written', 'hdd_write', 'Sect', 512),
    SensorInfo('sectors_read', 'hdd_read', 'Sect', 512),
    SensorInfo('reads_completed', 'read_op', 'OP', None),
    SensorInfo('writes_completed', 'write_op', 'OP', None),
    SensorInfo('procs_blocked', 'blocked_procs', 'P', None),
]

SINFO_MAP = dict((sinfo.name, sinfo) for sinfo in _SINFO)

# sensor name -> coefficient converting native units into bytes;
# sensors measured in operations or processes have no coefficient
# (to_bytes_coef is None) and are skipped here
to_bytes = dict((sinfo.name, sinfo.to_bytes_coef)
                for sinfo in _SINFO
                if sinfo.to_bytes_coef is not None)


class NodeSensorsData(object):
    def __init__(self, source_id, hostname, headers, values):
        self.source_id = source_id
        self.hostname = hostname
        self.headers = headers
        self.values = values
        self.times = None

    def finalize(self):
        # cache the time column (first field of every row) for bisecting
        self.times = [v[0] for v in self.values]

    def get_data_for_interval(self, beg, end):
        # select the samples whose time falls into [beg, end]
        p1 = bisect.bisect_left(self.times, beg)
        p2 = bisect.bisect_right(self.times, end)

        obj = self.__class__(self.source_id,
                             self.hostname,
                             self.headers,
                             self.values[p1:p2])
        obj.times = self.times[p1:p2]
        return obj

    def __getitem__(self, name):
        # headers already contain the leading time column, so header
        # positions map one-to-one onto positions in each values row
        idx = self.headers.index(name.split('.'))
        return [val[idx] for val in self.values]
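
# Example column access (source id, device and sensor names are
# illustrative):
#   node_data = load_results_csv(open("0_0.csv"))["some_source_id"]
#   writes = node_data['sdb.sectors_written']  # one float per sample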


def load_results_csv(fd):
    """Parse a sensor dump: "NEW_DATA"-separated CSV blocks, each one
    headed by a row of source_id, hostname and dotted device.sensor
    column names, followed by rows of float samples with the timestamp
    in the first column."""
    data = fd.read()
    results = {}

    for block in data.split("NEW_DATA"):
        block = block.strip()
        if len(block) == 0:
            continue

        it = csv.reader(block.split("\n"))
        headers = next(it)
        sens_data = [map(float, vals) for vals in it]
        source_id, hostname = headers[:2]

        headers = ([(None, 'time')] +
                   [header.split('.') for header in headers[2:]])
        assert set(map(len, headers)) == set([2])

        results[source_id] = NodeSensorsData(source_id, hostname,
                                             headers, sens_data)

    return results
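
# A dump therefore looks roughly like this (names and values are
# illustrative); note that data rows carry one extra leading column,
# the timestamp, which is absent from the header row:
#
#   NEW_DATA
#   some_source_id,host1,sda.sectors_read,sda.sectors_written
#   1000.0,10,20
#   1001.0,11,25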


def load_test_timings(fname, max_diff=1000):
    """Return {test_name: [(start, stop), ...]} with the run intervals
    of each test deduplicated across nodes and adjacent intervals
    merged when the gap between them is below max_diff."""
    raw_map = collections.defaultdict(lambda: [])

    class data(object):
        # dummy namespace object, filled by load_data_from
        pass

    load_data_from(fname)(None, data)

    for test_type, test_results in data.results.items():
        if test_type == 'io':
            for tests_res in test_results:
                raw_map[tests_res.config.name].append(tests_res.run_interval)

    result = {}
    for name, intervals in raw_map.items():
        intervals.sort()
        curr_start, curr_stop = intervals[0]
        curr_result = []

        # drop duplicates: intervals whose start times differ by less
        # than max_diff are the same run observed from different nodes
        for (start, stop) in intervals[1:]:
            if abs(curr_start - start) < max_diff:
                assert abs(curr_stop - stop) < max_diff
            else:
                assert start + max_diff >= curr_stop
                assert stop > curr_stop
                curr_result.append((curr_start, curr_stop))
                curr_start, curr_stop = start, stop
        curr_result.append((curr_start, curr_stop))

        # merge consecutive intervals separated by less than max_diff
        merged_res = []
        curr_start, curr_stop = curr_result[0]
        for start, stop in curr_result[1:]:
            if abs(curr_stop - start) < max_diff:
                curr_stop = stop
            else:
                merged_res.append((curr_start, curr_stop))
                curr_start, curr_stop = start, stop
        merged_res.append((curr_start, curr_stop))

        result[name] = merged_res

    return result
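
# E.g. with max_diff=1000 the sorted run intervals
#   [(0, 5000), (10, 5005), (5500, 9000)]
# first collapse to [(0, 5000), (5500, 9000)] (the same run observed
# from two nodes) and then merge into [(0, 9000)], because the
# 500-unit gap between 5000 and 5500 is below max_diff.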


# per-sensor thresholds above which a sample counts as overloaded
critical_values = dict(
    io_queue=1,
    usage_percent=0.8,
    procs_blocked=1,
    procs_queue=1)


class AggregatedData(object):
    def __init__(self, sensor_name):
        self.sensor_name = sensor_name
        # (node, device): count
        self.per_device = collections.defaultdict(lambda: 0)
        # node: count
        self.per_node = collections.defaultdict(lambda: 0)
        # role: count
        self.per_role = collections.defaultdict(lambda: 0)
        # (role_or_node, device_or_*): count
        self.all_together = collections.defaultdict(lambda: 0)

    def __str__(self):
        res = "<AggregatedData({0})>\n".format(self.sensor_name)
        for (role_or_node, device), val in self.all_together.items():
            res += "    {0}:{1} = {2}\n".format(role_or_node, device, val)
        return res
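
# all_together ends up with entries like ('testnode', '*') for role
# totals, ('host1', '*') for node totals and ('host1', 'sda') for
# individual devices (host and device names are illustrative).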


def total_consumption(sensors_data, roles_map):
    result = {}

    # sum every sensor over time, per (node, device)
    for name, sensor_data in sensors_data.items():
        for pos, (dev, sensor) in enumerate(sensor_data.headers):
            if 'time' == sensor:
                continue

            try:
                ad = result[sensor]
            except KeyError:
                ad = result[sensor] = AggregatedData(sensor)

            val = sum(vals[pos] for vals in sensor_data.values)
            ad.per_device[(sensor_data.hostname, dev)] += val

    # roll the per-device sums up into per-node and per-role totals
    for ad in result.values():
        for (hostname, dev), val in ad.per_device.items():
            ad.per_node[hostname] += val

            for role in roles_map[hostname]:
                ad.per_role[role] += val

            ad.all_together[(hostname, dev)] = val

        for role, val in ad.per_role.items():
            ad.all_together[(role, '*')] = val

        for node, val in ad.per_node.items():
            ad.all_together[(node, '*')] = val

    return result


def avg_load(sensors_data):
    """Count, per (host, device, sensor), how many samples exceed the
    critical value; also return the covered time span."""
    load = collections.defaultdict(lambda: 0)

    min_time = 0xFFFFFFFFFFF
    max_time = 0

    for sensor_data in sensors_data.values():
        min_time = min(min_time, min(sensor_data.times))
        max_time = max(max_time, max(sensor_data.times))

        for name, max_val in critical_values.items():
            for pos, (dev, sensor) in enumerate(sensor_data.headers):
                if sensor == name:
                    for vals in sensor_data.values:
                        if vals[pos] > max_val:
                            load[(sensor_data.hostname, dev, sensor)] += 1

    return load, max_time - min_time


def print_bottlenecks(sensors_data, max_bottlenecks=15):
    load, duration = avg_load(sensors_data)

    if not load:
        return "\n*** No bottlenecks found ***\n"

    # flatten the (host, dev, sensor) keys into printable names
    rev_items = ((v, "{0}:{1}.{2}".format(*k)) for (k, v) in load.items())
    res = sorted(rev_items, reverse=True)[:max_bottlenecks]

    max_name_sz = max(len(name) for _, name in res)
    frmt = "{{0:>{0}}} | {{1:>4}}".format(max_name_sz)
    table = [frmt.format("Component", "% of time overloaded")]

    # count / duration approximates the fraction of time spent above
    # the critical value, assuming roughly one sample per time unit
    for (v, k) in res:
        table.append(frmt.format(k, int(v * 100.0 / duration + 0.5)))

    return "\n".join(table)


def print_consumption(agg, min_transfer=None):
    rev_items = []
    for (node_or_role, dev), v in agg.all_together.items():
        rev_items.append((int(v), node_or_role + ':' + dev))

    res = sorted(rev_items, reverse=True)

    if min_transfer is not None:
        res = [(v, k)
               for (v, k) in res
               if v >= min_transfer]

    if len(res) == 0:
        return None

    res = [(b2ssize(v) + "B", k) for (v, k) in res]

    max_name_sz = max(len(name) for _, name in res)
    max_val_sz = max(len(val) for val, _ in res)

    frmt = " {{0:>{0}}} | {{1:>{1}}} ".format(max_name_sz, max_val_sz)
    table = [frmt.format("Component", "Usage")]

    for (v, k) in res:
        table.append(frmt.format(k, v))

    return "\n".join(table)


def make_roles_mapping(source_id_mapping, source_id2hostname):
    result = {}

    for ssh_url, roles in source_id_mapping.items():
        # normalize the ssh url down to a 'host:port' source id
        if '@' in ssh_url:
            source_id = ssh_url.split('@')[1]
        else:
            source_id = ssh_url.split('://')[1]

        if source_id.count(':') == 2:
            source_id = source_id.rsplit(":", 1)[0]

        if source_id.endswith(':'):
            source_id += "22"  # default ssh port

        if source_id in source_id2hostname:
            result[source_id] = roles
            result[source_id2hostname[source_id]] = roles

    # nodes which have sensor data but no explicit roles are test nodes
    for testnode_src in (set(source_id2hostname) - set(result)):
        result[testnode_src] = ['testnode']
        result[source_id2hostname[testnode_src]] = ['testnode']

    return result
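
# E.g. both 'ssh://root@192.168.0.1:22:/path/to/key' and
# 'root@192.168.0.1:22' normalize to the source id '192.168.0.1:22'
# (the urls are illustrative).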


def get_testdata_size(consumption):
    """Largest amount of bytes moved by any byte-valued sensor on the
    test nodes."""
    max_data = 0

    for name, sens in SINFO_MAP.items():
        if sens.to_bytes_coef is not None:
            agg = consumption.get(name)
            if agg is not None:
                cdt = agg.per_role.get('testnode', 0) * sens.to_bytes_coef
                max_data = max(max_data, cdt)

    return max_data


def get_testop_count(consumption):
    """Largest operation count of any op-valued sensor on the test
    nodes."""
    max_op = 0

    for name, sens in SINFO_MAP.items():
        if sens.to_bytes_coef is None:
            agg = consumption.get(name)
            if agg is not None:
                max_op = max(max_op, agg.per_role.get('testnode', 0))

    return max_op


def get_data_for_intervals(data, intervals):
    """Per node, concatenate the sensor samples falling into every run
    interval of the test."""
    res = {}
    for name, node_data in data.items():
        chunks = [node_data.get_data_for_interval(begin, end)
                  for begin, end in intervals]
        ndata = chunks[0]
        for chunk in chunks[1:]:
            ndata.values.extend(chunk.values)
            ndata.times.extend(chunk.times)
        res[name] = ndata
    return res


class Host(object):
    """Per-host accumulator of device traffic for plot_consumption."""
    def __init__(self, name=None):
        self.name = name
        self.hdd_devs = {}
        self.net_devs = None


def plot_consumption(per_consumer_table, fields, refload):
    """Build a graphviz graph of per-host disk traffic and return it
    as a DOT string; edge widths are scaled relative to refload.
    Returns None when pygraphviz is not installed."""
    if pgv is None:
        return

    hosts = {}
    storage_sensors = ('sectors_written', 'sectors_read')

    for (hostname, dev), consumption in per_consumer_table.items():
        if hostname not in hosts:
            hosts[hostname] = Host(hostname)

        host = hosts[hostname]
        cons_map = dict(zip(fields, consumption))

        for sn in storage_sensors:
            vl = cons_map.get(sn, 0)
            if vl > 0:
                host.hdd_devs.setdefault(dev, {})[sn] = vl

    p = pgv.AGraph(name='system', directed=True)

    net = "Network"
    p.add_node(net)

    in_color = 'red'
    out_color = 'green'

    for host in hosts.values():
        g = p.subgraph(name="cluster_" + host.name, label=host.name,
                       color="blue")
        g.add_node(host.name, shape="diamond")
        p.add_edge(host.name, net)
        p.add_edge(net, host.name)

        for dev_name, values in host.hdd_devs.items():
            if dev_name == '*':
                continue

            to = values.get('sectors_written', 0)
            frm = values.get('sectors_read', 0)
            to_pw = 7 * to / refload
            frm_pw = 7 * frm / refload
            min_width = 0.1

            if to_pw > min_width or frm_pw > min_width:
                dev_fqn = host.name + "." + dev_name
                g.add_node(dev_fqn)

                if to_pw > min_width:
                    g.add_edge(host.name, dev_fqn,
                               label=b2ssize(to) + "B",
                               penwidth=to_pw,
                               fontcolor=out_color,
                               color=out_color)

                if frm_pw > min_width:
                    g.add_edge(dev_fqn, host.name,
                               label=b2ssize(frm) + "B",
                               penwidth=frm_pw,
                               color=in_color,
                               fontcolor=in_color)

    return p.string()
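
# NOTE: main() ignores the DOT text returned above; to inspect the
# graph, write it to a file and render it with graphviz, e.g.
# `dot -Tsvg system.dot -o system.svg` (file names are illustrative).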


def parse_args(args):
    parser = argparse.ArgumentParser()
    parser.add_argument('-t', '--time-period', nargs=2,
                        type=int, default=None,
                        help="Begin and end time for tests")
    parser.add_argument('-m', '--max-bottleneck', type=int,
                        default=15, help="Maximum number of bottlenecks"
                        " to show")
    parser.add_argument('-x', '--max-diff', type=int,
                        default=10, help="Minimal consumption to report,"
                        " in units of 0.1%% of the test nodes' total load")
    parser.add_argument('-d', '--debug-ver', action='store_true',
                        help="Full report with original data")
    parser.add_argument('-u', '--user-ver', action='store_true',
                        default=True, help="Avg load report")
    parser.add_argument('-s', '--select-loads', nargs='*', default=[])
    parser.add_argument('-f', '--fields', nargs='*', default=[])
    parser.add_argument('results_folder')
    return parser.parse_args(args[1:])
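
# Typical invocation (script and folder names are illustrative):
#   python bottlenecks.py -m 10 results_dir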


def main(argv):
    opts = parse_args(argv)

    stor_dir = os.path.join(opts.results_folder, 'sensor_storage')
    data = {}
    source_id2hostname = {}

    csv_files = os.listdir(stor_dir)
    for fname in csv_files:
        assert re.match(r"\d+_\d+\.csv$", fname)

    csv_files.sort(key=lambda x: int(x.split('_')[0]))

    # sensor data for one node may be spread over several csv files;
    # glue the pieces back together in file order
    for fname in csv_files:
        with open(os.path.join(stor_dir, fname)) as fd:
            for name, node_sens_data in load_results_csv(fd).items():
                if name in data:
                    assert data[name].hostname == node_sens_data.hostname
                    assert data[name].source_id == node_sens_data.source_id
                    assert data[name].headers == node_sens_data.headers
                    data[name].values.extend(node_sens_data.values)
                else:
                    data[name] = node_sens_data

    for nd in data.values():
        assert nd.source_id not in source_id2hostname
        source_id2hostname[nd.source_id] = nd.hostname
        nd.finalize()
    roles_file = os.path.join(opts.results_folder, 'nodes.yaml')
    with open(roles_file) as fd:
        src2roles = yaml.load(fd)

    timings = load_test_timings(opts.results_folder)
    roles_map = make_roles_mapping(src2roles, source_id2hostname)

    # convert the -x option from 0.1% units into a plain fraction
    max_diff = float(opts.max_diff) / 1000

    fields = ('recv_bytes', 'send_bytes',
              'sectors_read', 'sectors_written',
              'reads_completed', 'writes_completed')

    if opts.fields != []:
        fields = [field for field in fields if field in opts.fields]
    for test_name, intervals in sorted(timings.items()):
        if opts.select_loads != []:
            if test_name not in opts.select_loads:
                continue

        data_chunks = get_data_for_intervals(data, intervals)
        consumption = total_consumption(data_chunks, roles_map)
        bottlenecks = print_bottlenecks(data_chunks, opts.max_bottleneck)

        # consumption below these thresholds is considered noise
        # relative to the load generated by the test nodes themselves
        testdata_sz = get_testdata_size(consumption) * max_diff
        testop_count = get_testop_count(consumption) * max_diff

        per_consumer_table = {}
        per_consumer_table_str = {}

        all_consumers = set()
        for value in consumption.values():
            all_consumers = all_consumers | set(value.all_together)

        # filter per test, without narrowing the global field list
        test_fields = [field for field in fields if field in consumption]

        all_consumers_sum = []

        for consumer in all_consumers:
            tb_str = per_consumer_table_str[consumer] = []
            tb = per_consumer_table[consumer] = []
            vl = 0

            for name in test_fields:
                val = consumption[name].all_together[consumer]

                if SINFO_MAP[name].to_bytes_coef is None:
                    if val < testop_count:
                        tb_str.append('0')
                    else:
                        tb_str.append(b2ssize_10(int(val)))
                else:
                    val = int(val) * SINFO_MAP[name].to_bytes_coef
                    if val < testdata_sz:
                        tb_str.append('-')
                    else:
                        tb_str.append(b2ssize(val) + "B")

                tb.append(int(val))
                vl += int(val)

            all_consumers_sum.append((vl, consumer))

        all_consumers_sum.sort(reverse=True)

        plot_consumption(per_consumer_table, test_fields,
                         testdata_sz / max_diff)
        tt = texttable.Texttable(max_width=130)
        tt.set_cols_align(["l"] + ["r"] * len(test_fields))

        header = ["Name"]
        for fld in test_fields:
            if fld in SINFO_MAP:
                header.append(SINFO_MAP[fld].print_name)
            else:
                header.append(fld)
        tt.header(header)

        for summ, consumer in all_consumers_sum:
            if summ > 0:
                tt.add_row([":".join(consumer)] +
                           per_consumer_table_str[consumer])

        tt.set_deco(texttable.Texttable.VLINES | texttable.Texttable.HEADER)

        res = tt.draw()
        max_len = max(map(len, res.split("\n")))

        print test_name.center(max_len)
        print res
        print bottlenecks


if __name__ == "__main__":
    exit(main(sys.argv))