| """ Analize test results for finding bottlenecks """ |
| |
| import re |
| import sys |
| import csv |
| import time |
| import bisect |
| import os.path |
| import argparse |
| import collections |
| |
| |
| import yaml |
| import texttable |
| |
| try: |
| import pygraphviz as pgv |
| except ImportError: |
| pgv = None |
| |
| sys.path.append("/mnt/other/work/disk_perf_test_tool") |
| from wally.run_test import load_data_from |
| from wally.utils import b2ssize, b2ssize_10 |
| |
| |
| class SensorInfo(object): |
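    """Static description of a single sensor: raw name, display name,
    native unit and a multiplier to convert values to bytes (None for
    sensors that count operations rather than data)."""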
    def __init__(self, name, print_name, native_ext, to_bytes_coef):
        self.name = name
        self.print_name = print_name
        self.native_ext = native_ext
        self.to_bytes_coef = to_bytes_coef


_SINFO = [
    SensorInfo('recv_bytes', 'net_recv', 'B', 1),
    SensorInfo('send_bytes', 'net_send', 'B', 1),
    SensorInfo('sectors_written', 'hdd_write', 'Sect', 512),
    SensorInfo('sectors_read', 'hdd_read', 'Sect', 512),
    SensorInfo('reads_completed', 'read_op', 'OP', None),
    SensorInfo('writes_completed', 'write_op', 'OP', None),
    SensorInfo('procs_blocked', 'blocked_procs', 'P', None),
]

SINFO_MAP = dict((sinfo.name, sinfo) for sinfo in _SINFO)
to_bytes = dict((sinfo.name, sinfo.to_bytes_coef)
                for sinfo in _SINFO
                if sinfo.to_bytes_coef is not None)


class NodeSensorsData(object):
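    """Sensor time series for one node: 'headers' holds (device, sensor)
    pairs and 'values' holds rows aligned with them, column 0 being the
    sample timestamp."""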
    def __init__(self, source_id, hostname, headers, values):
        self.source_id = source_id
        self.hostname = hostname
        self.headers = headers
        self.values = values
        self.times = None

    def finalize(self):
        self.times = [v[0] for v in self.values]

    def get_data_for_interval(self, beg, end):
        # keep only the rows whose timestamp falls into [beg, end]
        p1 = bisect.bisect_left(self.times, beg)
        p2 = bisect.bisect_right(self.times, end)

        obj = self.__class__(self.source_id,
                             self.hostname,
                             self.headers,
                             self.values[p1:p2])
        obj.times = self.times[p1:p2]
        return obj

    def __getitem__(self, name):
        # headers and value rows are aligned: column 0 is the timestamp,
        # so the header index is also the value column index
        idx = self.headers.index(name.split('.'))
        return [val[idx] for val in self.values]


def load_results_csv(fd):
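    """Parse a sensors dump: blocks separated by NEW_DATA lines, each block
    being a CSV with a header row (source_id, hostname, dev.sensor, ...)
    followed by value rows. Returns {source_id: NodeSensorsData}."""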
    data = fd.read()
    results = {}
    for block in data.split("NEW_DATA"):
        block = block.strip()
        if len(block) == 0:
            continue

        it = csv.reader(block.split("\n"))
        headers = next(it)
        sens_data = [map(float, vals) for vals in it]
        source_id, hostname = headers[:2]
        headers = [(None, 'time')] + \
                  [header.split('.') for header in headers[2:]]
        assert set(map(len, headers)) == set([2])

        results[source_id] = NodeSensorsData(source_id, hostname,
                                             headers, sens_data)

    return results


def load_test_timings(fname, max_diff=1000):
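    """Extract run intervals of IO tests from the results at fname, grouped
    by test name, merging intervals whose bounds differ by less than
    max_diff."""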
    raw_map = collections.defaultdict(lambda: [])

    class data(object):
        pass

    load_data_from(fname)(None, data)
    for test_type, test_results in data.results.items():
        if test_type == 'io':
            for tests_res in test_results:
                raw_map[tests_res.config.name].append(tests_res.run_interval)

    result = {}
    for name, intervals in raw_map.items():
        intervals.sort()
        curr_start, curr_stop = intervals[0]
        curr_result = []

        for (start, stop) in intervals[1:]:
            if abs(curr_start - start) < max_diff:
                # if abs(curr_stop - stop) > 2:
                #     print abs(curr_stop - stop)
                assert abs(curr_stop - stop) < max_diff
            else:
                assert start + max_diff >= curr_stop
                assert stop > curr_stop
                curr_result.append((curr_start, curr_stop))
                curr_start, curr_stop = start, stop
        curr_result.append((curr_start, curr_stop))

        merged_res = []
        curr_start, curr_stop = curr_result[0]
        for start, stop in curr_result[1:]:
            if abs(curr_stop - start) < max_diff:
                curr_stop = stop
            else:
                merged_res.append((curr_start, curr_stop))
                curr_start, curr_stop = start, stop
        merged_res.append((curr_start, curr_stop))
        result[name] = merged_res

    return result


# per-sensor limits; a sample above its limit counts as overload
critical_values = dict(
    io_queue=1,
    usage_percent=0.8,
    procs_blocked=1,
    procs_queue=1)


class AggregatedData(object):
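    """Per-sensor totals aggregated per (node, device), per node, per role
    and merged together into 'all_together'."""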
    def __init__(self, sensor_name):
        self.sensor_name = sensor_name

        # (node, device): total
        self.per_device = collections.defaultdict(lambda: 0)

        # node: total
        self.per_node = collections.defaultdict(lambda: 0)

        # role: total
        self.per_role = collections.defaultdict(lambda: 0)

        # (role_or_node, device_or_*): total
        self.all_together = collections.defaultdict(lambda: 0)

    def __str__(self):
        res = "<AggregatedData({0})>\n".format(self.sensor_name)
        for (role_or_node, device), val in self.all_together.items():
            res += " {0}:{1} = {2}\n".format(role_or_node, device, val)
        return res


def total_consumption(sensors_data, roles_map):
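    """Sum each sensor column over the interval and aggregate the totals per
    (host, device), per host and per role. Returns a mapping from sensor
    name to AggregatedData."""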
    result = {}

    for name, sensor_data in sensors_data.items():
        for pos, (dev, sensor) in enumerate(sensor_data.headers):

            if 'time' == sensor:
                continue

            try:
                ad = result[sensor]
            except KeyError:
                ad = result[sensor] = AggregatedData(sensor)

            val = sum(vals[pos] for vals in sensor_data.values)

            ad.per_device[(sensor_data.hostname, dev)] += val

    # vals1 = sensors_data['localhost:22']['sdc.sectors_read']
    # vals2 = sensors_data['localhost:22']['sdb.sectors_written']

    # from matplotlib import pyplot as plt
    # plt.plot(range(len(vals1)), vals1)
    # plt.plot(range(len(vals2)), vals2)
    # plt.show()
    # exit(1)

    for ad in result.values():
        for (hostname, dev), val in ad.per_device.items():
            ad.per_node[hostname] += val

            for role in roles_map[hostname]:
                ad.per_role[role] += val

            ad.all_together[(hostname, dev)] = val

        for role, val in ad.per_role.items():
            ad.all_together[(role, '*')] = val

        for node, val in ad.per_node.items():
            ad.all_together[(node, '*')] = val

    return result


def avg_load(sensors_data):
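    """Count, for every (host, device, sensor), how many samples exceed the
    critical value for that sensor. Returns (counters, observed time
    span)."""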
    load = collections.defaultdict(lambda: 0)

    min_time = 0xFFFFFFFFFFF
    max_time = 0

    for sensor_data in sensors_data.values():

        min_time = min(min_time, min(sensor_data.times))
        max_time = max(max_time, max(sensor_data.times))

        for name, max_val in critical_values.items():
            for pos, (dev, sensor) in enumerate(sensor_data.headers):
                if sensor == name:
                    for vals in sensor_data.values:
                        if vals[pos] > max_val:
                            load[(sensor_data.hostname, dev, sensor)] += 1
    return load, max_time - min_time


def print_bottlenecks(sensors_data, max_bottlenecks=15):
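    """Format a table of the components that exceeded their critical values
    most often, expressed as an approximate share of the test duration."""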
    load, duration = avg_load(sensors_data)

    if not load:
        return "\n*** No bottlenecks found *** \n"

    # use readable "host:dev.sensor" labels instead of raw tuple keys
    rev_items = ((v, "{0}:{1}.{2}".format(*k)) for (k, v) in load.items())

    res = sorted(rev_items, reverse=True)[:max_bottlenecks]

    max_name_sz = max(len(name) for _, name in res)
    frmt = "{{0:>{0}}} | {{1:>4}}".format(max_name_sz)
    table = [frmt.format("Component", "% of time above limit")]

    for (v, k) in res:
        table.append(frmt.format(k, int(v * 100.0 / duration + 0.5)))

    return "\n".join(table)


def print_consumption(agg, min_transfer=None):
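    """Format a usage table from an AggregatedData instance, dropping entries
    below min_transfer. Returns None when nothing is left to show."""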
    rev_items = []
    for (node_or_role, dev), v in agg.all_together.items():
        rev_items.append((int(v), node_or_role + ':' + dev))

    res = sorted(rev_items, reverse=True)

    if min_transfer is not None:
        res = [(v, k)
               for (v, k) in res
               if v >= min_transfer]

    if len(res) == 0:
        return None

    res = [(b2ssize(v) + "B", k) for (v, k) in res]

    max_name_sz = max(len(name) for _, name in res)
    max_val_sz = max(len(val) for val, _ in res)

    frmt = " {{0:>{0}}} | {{1:>{1}}} ".format(max_name_sz, max_val_sz)
    table = [frmt.format("Component", "Usage")]

    for (v, k) in res:
        table.append(frmt.format(k, v))

    return "\n".join(table)


def make_roles_mapping(source_id_mapping, source_id2hostname):
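    """Map both source ids and hostnames to their node roles. Sources that
    appear only in the sensor data are assumed to be test nodes."""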
    result = {}
    for ssh_url, roles in source_id_mapping.items():
        if '@' in ssh_url:
            source_id = ssh_url.split('@')[1]
        else:
            source_id = ssh_url.split('://')[1]

        if source_id.count(':') == 2:
            source_id = source_id.rsplit(":", 1)[0]

        if source_id.endswith(':'):
            source_id += "22"

        if source_id in source_id2hostname:
            result[source_id] = roles
            result[source_id2hostname[source_id]] = roles

    for testnode_src in (set(source_id2hostname) - set(result)):
        result[testnode_src] = ['testnode']
        result[source_id2hostname[testnode_src]] = ['testnode']

    return result


def get_testdata_size(consumption):
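    """Largest amount of data, in bytes, moved by the test nodes across all
    byte-convertible sensors."""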
    max_data = 0
    for name, sens in SINFO_MAP.items():
        if sens.to_bytes_coef is not None:
            agg = consumption.get(name)
            if agg is not None:
                cdt = agg.per_role.get('testnode', 0) * sens.to_bytes_coef
                max_data = max(max_data, cdt)
    return max_data


def get_testop_count(consumption):
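    """Largest number of operations performed by the test nodes across all
    counter (non byte-convertible) sensors."""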
    max_op = 0
    for name, sens in SINFO_MAP.items():
        if sens.to_bytes_coef is None:
            agg = consumption.get(name)
            if agg is not None:
                max_op = max(max_op, agg.per_role.get('testnode', 0))
    return max_op


def get_data_for_intervals(data, intervals):
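    """Slice every node's sensor data down to the given intervals. Note that
    when several intervals are passed, only data for the last one is kept
    for each node."""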
    res = {}
    for begin, end in intervals:
        for name, node_data in data.items():
            ndata = node_data.get_data_for_interval(begin, end)
            res[name] = ndata
    return res


class Host(object):
    def __init__(self, name=None):
        self.name = name
        self.hdd_devs = {}
        self.net_devs = None


def plot_consumption(per_consumer_table, fields, refload):
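    """Build a graphviz graph of per-host disk traffic, scaling edge widths
    relative to refload. Returns the DOT source, or None when pygraphviz is
    not available."""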
    if pgv is None:
        return

    hosts = {}
    storage_sensors = ('sectors_written', 'sectors_read')

    for (hostname, dev), consumption in per_consumer_table.items():
        if hostname not in hosts:
            hosts[hostname] = Host(hostname)

        host = hosts[hostname]
        cons_map = dict(zip(fields, consumption))

        for sn in storage_sensors:
            vl = cons_map.get(sn, 0)
            if vl > 0:
                host.hdd_devs.setdefault(dev, {})[sn] = vl

    p = pgv.AGraph(name='system', directed=True)

    net = "Network"
    p.add_node(net)

    in_color = 'red'
    out_color = 'green'

    for host in hosts.values():
        g = p.subgraph(name="cluster_" + host.name, label=host.name,
                       color="blue")
        g.add_node(host.name, shape="diamond")
        p.add_edge(host.name, net)
        p.add_edge(net, host.name)

        for dev_name, values in host.hdd_devs.items():
            if dev_name == '*':
                continue

            to = values.get('sectors_written', 0)
            frm = values.get('sectors_read', 0)
            to_pw = 7 * to / refload
            frm_pw = 7 * frm / refload
            min_width = 0.1

            if to_pw > min_width or frm_pw > min_width:
                dev_fqn = host.name + "." + dev_name
                g.add_node(dev_fqn)

                if to_pw > min_width:
                    g.add_edge(host.name, dev_fqn,
                               label=b2ssize(to) + "B",
                               penwidth=to_pw,
                               fontcolor=out_color,
                               color=out_color)

                if frm_pw > min_width:
                    g.add_edge(dev_fqn, host.name,
                               label=b2ssize(frm) + "B",
                               penwidth=frm_pw,
                               color=in_color,
                               fontcolor=in_color)

    return p.string()


def parse_args(args):
    parser = argparse.ArgumentParser()
    parser.add_argument('-t', '--time_period', nargs=2,
                        type=int, default=None,
                        help="Begin and end time for tests")
    parser.add_argument('-m', '--max-bottleneck', type=int,
                        default=15, help="Max number of bottlenecks to show")
    parser.add_argument('-x', '--max-diff', type=int,
                        default=10,
                        help="Hide consumers below this threshold, given "
                             "in units of 0.1%% of the test nodes' "
                             "total load")
    parser.add_argument('-d', '--debug-ver', action='store_true',
                        help="Full report with original data")
    parser.add_argument('-u', '--user-ver', action='store_true',
                        default=True, help="Average load report")
    parser.add_argument('-s', '--select-loads', nargs='*', default=[])
    parser.add_argument('-f', '--fields', nargs='*', default=[])
    parser.add_argument('results_folder')
    return parser.parse_args(args[1:])


def main(argv):
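    """Load sensor CSVs, node roles and test timings, then print a per-test
    resource consumption table followed by detected bottlenecks."""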
    opts = parse_args(argv)

    stor_dir = os.path.join(opts.results_folder, 'sensor_storage')
    data = {}
    source_id2hostname = {}

    csv_files = os.listdir(stor_dir)
    for fname in csv_files:
        assert re.match(r"\d+_\d+\.csv$", fname)

    csv_files.sort(key=lambda x: int(x.split('_')[0]))

    for fname in csv_files:
        with open(os.path.join(stor_dir, fname)) as fd:
            for name, node_sens_data in load_results_csv(fd).items():
                if name in data:
                    assert data[name].hostname == node_sens_data.hostname
                    assert data[name].source_id == node_sens_data.source_id
                    assert data[name].headers == node_sens_data.headers
                    data[name].values.extend(node_sens_data.values)
                else:
                    data[name] = node_sens_data

    for nd in data.values():
        assert nd.source_id not in source_id2hostname
        source_id2hostname[nd.source_id] = nd.hostname
        nd.finalize()

    roles_file = os.path.join(opts.results_folder,
                              'nodes.yaml')

    with open(roles_file) as fd:
        src2roles = yaml.load(fd)

    timings = load_test_timings(opts.results_folder)

    roles_map = make_roles_mapping(src2roles, source_id2hostname)
    max_diff = float(opts.max_diff) / 1000

    fields = ('recv_bytes', 'send_bytes',
              'sectors_read', 'sectors_written',
              'reads_completed', 'writes_completed')

    if opts.fields != []:
        fields = [field for field in fields if field in opts.fields]

    for test_name, intervals in sorted(timings.items()):
        if opts.select_loads != []:
            if test_name not in opts.select_loads:
                continue

        data_chunks = get_data_for_intervals(data, intervals)

        consumption = total_consumption(data_chunks, roles_map)

        bottlenecks = print_bottlenecks(data_chunks)

        # values below these thresholds are treated as noise in the report
        testdata_sz = get_testdata_size(consumption) * max_diff
        testop_count = get_testop_count(consumption) * max_diff

        per_consumer_table = {}
        per_consumer_table_str = {}

        all_consumers = set()
        for value in consumption.values():
            all_consumers = all_consumers | set(value.all_together)
        fields = [field for field in fields if field in consumption]
        all_consumers_sum = []

        for consumer in all_consumers:
            tb_str = per_consumer_table_str[consumer] = []
            tb = per_consumer_table[consumer] = []
            vl = 0
            for name in fields:
                val = consumption[name].all_together[consumer]
                if SINFO_MAP[name].to_bytes_coef is None:
                    # operation counters: hide values below the noise level
                    if val < testop_count:
                        tb_str.append('0')
                    else:
                        tb_str.append(b2ssize_10(int(val)))
                else:
                    # byte-like sensors: convert to bytes before comparing
                    val = int(val) * SINFO_MAP[name].to_bytes_coef
                    if val < testdata_sz:
                        tb_str.append('-')
                    else:
                        tb_str.append(b2ssize(val) + "B")
                tb.append(int(val))
                vl += int(val)
            all_consumers_sum.append((vl, consumer))

        all_consumers_sum.sort(reverse=True)

        plot_consumption(per_consumer_table, fields,
                         testdata_sz / max_diff)

        tt = texttable.Texttable(max_width=130)
        tt.set_cols_align(["l"] + ["r"] * len(fields))

        header = ["Name"]
        for fld in fields:
            if fld in SINFO_MAP:
                header.append(SINFO_MAP[fld].print_name)
            else:
                header.append(fld)
        tt.header(header)

        for summ, consumer in all_consumers_sum:
            if summ > 0:
                tt.add_row([":".join(consumer)] +
                           per_consumer_table_str[consumer])

        tt.set_deco(texttable.Texttable.VLINES | texttable.Texttable.HEADER)
        res = tt.draw()
        max_len = max(map(len, res.split("\n")))
        print test_name.center(max_len)
        print res
        print bottlenecks


if __name__ == "__main__":
    exit(main(sys.argv))