bottleneck.py: rework sensor data loading and reporting

Load per-node sensor data from chunked CSV files in sensor_storage/
instead of the single sensor_storage.txt, slice it per test interval,
read test timings via wally.run_test.load_data_from, and enable the
pygraphviz consumption plot.
diff --git a/scripts/postprocessing/bottleneck.py b/scripts/postprocessing/bottleneck.py
index d8d4b4c..8507041 100644
--- a/scripts/postprocessing/bottleneck.py
+++ b/scripts/postprocessing/bottleneck.py
@@ -1,5 +1,6 @@
""" Analize test results for finding bottlenecks """
+import re
import sys
import csv
import time
@@ -17,17 +18,10 @@
except ImportError:
pgv = None
+from wally.run_test import load_data_from
from wally.utils import b2ssize, b2ssize_10
-class SensorsData(object):
- def __init__(self, source_id, hostname, ctime, values):
- self.source_id = source_id
- self.hostname = hostname
- self.ctime = ctime
- self.values = values # [((dev, sensor), value)]
-
-
class SensorInfo(object):
def __init__(self, name, print_name, native_ext, to_bytes_coef):
self.name = name
@@ -51,99 +45,68 @@
if sinfo.to_bytes_coef is not None)
-def load_results(fd):
- data = fd.read(100)
- fd.seek(0, os.SEEK_SET)
+class NodeSensorsData(object):
+ def __init__(self, source_id, hostname, headers, values):
+ self.source_id = source_id
+ self.hostname = hostname
+ self.headers = headers
+ self.values = values
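+        # each row of values is [ctime, v1, v2, ...], one column per header;
+        # times is filled from column 0 by finalize()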
+ self.times = None
- # t = time.time()
- if '(' in data or '{' in data:
- res, source_id2nostname = load_results_eval(fd)
- else:
- res, source_id2nostname = load_results_csv(fd)
+ def finalize(self):
+ self.times = [v[0] for v in self.values]
- # print int(((time.time() - t) * 1000000) / len(res)), len(res)
+ def get_data_for_interval(self, beg, end):
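+        # binary-search the sorted timestamps; requires finalize() first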
+ p1 = bisect.bisect_left(self.times, beg)
+ p2 = bisect.bisect_right(self.times, end)
- return res, source_id2nostname
+ obj = self.__class__(self.source_id,
+ self.hostname,
+ self.headers,
+ self.values[p1:p2])
+ obj.times = self.times[p1:p2]
+ return obj
+
+ def __getitem__(self, name):
+ idx = self.headers.index(name.split('.'))
+        # headers include the leading time column, so idx lines up with each row
+ return [val[idx] for val in self.values]
def load_results_csv(fd):
-
- fields = {}
- res = []
- source_id2nostname = {}
- coefs = {}
-
- # cached for performance
- ii = int
- zz = zip
- SD = SensorsData
- ra = res.append
-
- for row in csv.reader(fd):
- if len(row) == 0:
- continue
- ip, port = row[:2]
- ip_port = (ip, ii(port))
-
- if ip_port not in fields:
- sensors = [i.split('.') for i in row[4:]]
- fields[ip_port] = row[2:4] + sensors
- source_id2nostname[row[2]] = row[3]
- coefs[ip_port] = [to_bytes.get(s[1], 1) for s in sensors]
- else:
- fld = fields[ip_port]
- processed_data = []
- a = processed_data.append
-
- # this cycle is critical for performance
- # don't "refactor" it, unles you are confident
- # in what you are doing
- for dev_sensor, val, coef in zz(fld[2:], row[3:], coefs[ip_port]):
- a((dev_sensor, ii(val) * coef))
-
- ctime = ii(row[2])
- sd = SD(fld[0], fld[1], ctime, processed_data)
- ra((ctime, sd))
-
- res.sort(key=lambda x: x[0])
- return res, source_id2nostname
-
-
-def load_results_eval(fd):
- res = []
- source_id2nostname = {}
-
- for line in fd:
- if line.strip() == "":
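+    # sensor storage is a sequence of CSV blocks separated by "NEW_DATA"
+    # markers; each block has a header row of source_id, hostname,
+    # <dev>.<sensor>, ... followed by integer rows starting with a timestamp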
+ data = fd.read()
+ results = {}
+ for block in data.split("NEW_DATA"):
+ block = block.strip()
+ if len(block) == 0:
continue
- _, data = eval(line)
- ctime = data.pop('time')
- source_id = data.pop('source_id')
- hostname = data.pop('hostname')
+ it = csv.reader(block.split("\n"))
+ headers = next(it)
+ sens_data = [map(int, vals) for vals in it]
+ source_id, hostname = headers[:2]
+ headers = [(None, 'time')] + \
+ [header.split('.') for header in headers[2:]]
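+        # every non-time header must split into a (dev, sensor) pair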
+ assert set(map(len, headers)) == set([2])
- processed_data = []
- for k, v in data.items():
- dev, sensor = k.split('.')
- processed_data.append(((dev, sensor),
- v * to_bytes.get(sensor, 1)))
+ results[source_id] = NodeSensorsData(source_id, hostname,
+ headers, sens_data)
- sd = SensorsData(source_id, hostname, ctime, processed_data)
- res.append((ctime, sd))
- source_id2nostname[source_id] = hostname
-
- res.sort(key=lambda x: x[0])
- return res, source_id2nostname
+ return results
-def load_test_timings(fd, max_diff=1000):
+def load_test_timings(fname, max_diff=1000):
raw_map = collections.defaultdict(lambda: [])
- data = yaml.load(fd.read())
- for test_type, test_results in data:
+
+ class data(object):
+ pass
+
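+    # a bare class acts as an attribute namespace: load_data_from parses
+    # the results folder and stores what it finds on data.results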
+ load_data_from(fname)(None, data)
+
+ for test_type, test_results in data.results:
if test_type == 'io':
for tests_res in test_results:
- for test_res in tests_res['res']:
- raw_map[test_res['name']].append(test_res['run_interval'])
+ raw_map[tests_res.config.name].append(tests_res.run_interval)
result = {}
for name, intervals in raw_map.items():
@@ -208,15 +171,28 @@
def total_consumption(sensors_data, roles_map):
result = {}
- for _, item in sensors_data:
- for (dev, sensor), val in item.values:
+ for name, sensor_data in sensors_data.items():
+ for pos, (dev, sensor) in enumerate(sensor_data.headers):
+ if 'time' == sensor:
+ continue
try:
ad = result[sensor]
except KeyError:
ad = result[sensor] = AggregatedData(sensor)
- ad.per_device[(item.hostname, dev)] += val
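+            # total this (dev, sensor) column over the whole interval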
+ val = sum(vals[pos] for vals in sensor_data.values)
+
+ ad.per_device[(sensor_data.hostname, dev)] += val
+
for ad in result.values():
for (hostname, dev), val in ad.per_device.items():
@@ -299,25 +275,6 @@
return "\n".join(table)
-def parse_args(args):
- parser = argparse.ArgumentParser()
- parser.add_argument('-t', '--time_period', nargs=2,
- type=int, default=None,
- help="Begin and end time for tests")
- parser.add_argument('-m', '--max-bottlenek', type=int,
- default=15, help="Max bottlenek to show")
- parser.add_argument('-x', '--max-diff', type=int,
- default=10, help="Max bottlenek to show in" +
- "0.1% from test nodes summ load")
- parser.add_argument('-d', '--debug-ver', action='store_true',
- help="Full report with original data")
- parser.add_argument('-u', '--user-ver', action='store_true',
- default=True, help="Avg load report")
- parser.add_argument('-s', '--select-loads', nargs='*', default=[])
- parser.add_argument('results_folder')
- return parser.parse_args(args[1:])
-
-
def make_roles_mapping(source_id_mapping, source_id2hostname):
result = {}
for ssh_url, roles in source_id_mapping.items():
@@ -349,7 +306,8 @@
if sens.to_bytes_coef is not None:
agg = consumption.get(name)
if agg is not None:
- max_data = max(max_data, agg.per_role.get('testnode', 0))
+ cdt = agg.per_role.get('testnode', 0) * sens.to_bytes_coef
+ max_data = max(max_data, cdt)
return max_data
@@ -364,12 +322,11 @@
def get_data_for_intervals(data, intervals):
- res = []
+ res = {}
for begin, end in intervals:
- times = [ctime for ctime, _ in data]
- b_p = bisect.bisect_left(times, begin)
- e_p = bisect.bisect_right(times, end)
- res.extend(data[b_p:e_p])
+ for name, node_data in data.items():
+ ndata = node_data.get_data_for_interval(begin, end)
+ res[name] = ndata
return res
@@ -380,92 +337,181 @@
self.net_devs = None
-# def plot_consumption(per_consumer_table, fields):
-# hosts = {}
-# storage_sensors = ('sectors_written', 'sectors_read')
+def plot_consumption(per_consumer_table, fields, refload):
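+    # draw a graphviz digraph of per-host disk traffic; edge width is
+    # scaled relative to refload, so heavier flows get thicker arrows.
+    # pygraphviz is optional - skip plotting silently if it is missing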
+ if pgv is None:
+ return
-# for (hostname, dev), consumption in per_consumer_table.items():
-# if dev != '*':
-# continue
+ hosts = {}
+ storage_sensors = ('sectors_written', 'sectors_read')
-# if hostname not in hosts:
-# hosts[hostname] = Host(hostname)
+ for (hostname, dev), consumption in per_consumer_table.items():
+ if hostname not in hosts:
+ hosts[hostname] = Host(hostname)
-# cons_map = map(zip(fields, consumption))
+ host = hosts[hostname]
+ cons_map = dict(zip(fields, consumption))
-# for sn in storage_sensors:
-# vl = cons_map.get(sn, 0)
-# if vl > 0:
-# pass
+ for sn in storage_sensors:
+ vl = cons_map.get(sn, 0)
+ if vl > 0:
+ host.hdd_devs.setdefault(dev, {})[sn] = vl
+
+ p = pgv.AGraph(name='system', directed=True)
+
+ net = "Network"
+ p.add_node(net)
+
+ in_color = 'red'
+ out_color = 'green'
+
+ for host in hosts.values():
+ g = p.subgraph(name="cluster_" + host.name, label=host.name,
+ color="blue")
+ g.add_node(host.name, shape="diamond")
+ p.add_edge(host.name, net)
+ p.add_edge(net, host.name)
+
+ for dev_name, values in host.hdd_devs.items():
+ if dev_name == '*':
+ continue
+
+ to = values.get('sectors_written', 0)
+ frm = values.get('sectors_read', 0)
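+                # pen width is proportional to traffic: a flow equal to
+                # refload draws at width 7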
+ to_pw = 7 * to / refload
+ frm_pw = 7 * frm / refload
+                min_width = 0.1
+
+            if to_pw > min_width or frm_pw > min_width:
+ dev_fqn = host.name + "." + dev_name
+ g.add_node(dev_fqn)
+
+            if to_pw > min_width:
+ g.add_edge(host.name, dev_fqn,
+ label=b2ssize(to) + "B",
+ penwidth=to_pw,
+ fontcolor=out_color,
+ color=out_color)
+
+            if frm_pw > min_width:
+ g.add_edge(dev_fqn, host.name,
+ label=b2ssize(frm) + "B",
+ penwidth=frm_pw,
+ color=in_color,
+ fontcolor=in_color)
+
+ return p.string()
+
+
+def parse_args(args):
+ parser = argparse.ArgumentParser()
+ parser.add_argument('-t', '--time_period', nargs=2,
+ type=int, default=None,
+ help="Begin and end time for tests")
+    parser.add_argument('-m', '--max-bottlenek', type=int,
+                        default=15, help="Max number of bottlenecks to show")
+    parser.add_argument('-x', '--max-diff', type=int,
+                        default=10, help="Hide loads below this threshold, "
+                        "in units of 0.1%% of the test nodes' total load")
+ parser.add_argument('-d', '--debug-ver', action='store_true',
+ help="Full report with original data")
+ parser.add_argument('-u', '--user-ver', action='store_true',
+ default=True, help="Avg load report")
+ parser.add_argument('-s', '--select-loads', nargs='*', default=[])
+ parser.add_argument('-f', '--fields', nargs='*', default=[])
+ parser.add_argument('results_folder')
+ return parser.parse_args(args[1:])
def main(argv):
opts = parse_args(argv)
- sensors_data_fname = os.path.join(opts.results_folder,
- 'sensor_storage.txt')
+ stor_dir = os.path.join(opts.results_folder, 'sensor_storage')
+ data = {}
+ source_id2hostname = {}
+
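+    # sensor data is split into chunk files named <num>_<num>.csv;
+    # sort by the first number so chunks are read back in order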
+ csv_files = os.listdir(stor_dir)
+ for fname in csv_files:
+        assert re.match(r"\d+_\d+\.csv$", fname)
+
+ csv_files.sort(key=lambda x: int(x.split('_')[0]))
+
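+    # merge chunks that belong to the same node; chunk headers must agree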
+ for fname in csv_files:
+ with open(os.path.join(stor_dir, fname)) as fd:
+ for name, node_sens_data in load_results_csv(fd).items():
+ if name in data:
+ assert data[name].hostname == node_sens_data.hostname
+ assert data[name].source_id == node_sens_data.source_id
+ assert data[name].headers == node_sens_data.headers
+ data[name].values.extend(node_sens_data.values)
+ else:
+ data[name] = node_sens_data
+
+ for nd in data.values():
+ assert nd.source_id not in source_id2hostname
+ source_id2hostname[nd.source_id] = nd.hostname
+ nd.finalize()
roles_file = os.path.join(opts.results_folder,
'nodes.yaml')
- raw_results_file = os.path.join(opts.results_folder,
- 'raw_results.yaml')
-
src2roles = yaml.load(open(roles_file))
- timings = load_test_timings(open(raw_results_file))
- with open(sensors_data_fname) as fd:
- data, source_id2hostname = load_results(fd)
+
+ timings = load_test_timings(opts.results_folder)
roles_map = make_roles_mapping(src2roles, source_id2hostname)
max_diff = float(opts.max_diff) / 1000
- # print print_bottlenecks(data, opts.max_bottlenek)
- # print print_bottlenecks(data, opts.max_bottlenek)
+ fields = ('recv_bytes', 'send_bytes',
+ 'sectors_read', 'sectors_written',
+ 'reads_completed', 'writes_completed')
- for name, intervals in sorted(timings.items()):
+ if opts.fields != []:
+ fields = [field for field in fields if field in opts.fields]
+
+ for test_name, intervals in sorted(timings.items()):
if opts.select_loads != []:
- if name not in opts.select_loads:
+ if test_name not in opts.select_loads:
continue
- print
- print
- print "-" * 30 + " " + name + " " + "-" * 30
- print
+ data_chunks = get_data_for_intervals(data, intervals)
- data_chunk = get_data_for_intervals(data, intervals)
-
- consumption = total_consumption(data_chunk, roles_map)
+ consumption = total_consumption(data_chunks, roles_map)
testdata_sz = get_testdata_size(consumption) * max_diff
testop_count = get_testop_cout(consumption) * max_diff
- fields = ('recv_bytes', 'send_bytes',
- 'sectors_read', 'sectors_written',
- 'reads_completed', 'writes_completed')
per_consumer_table = {}
+ per_consumer_table_str = {}
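+        # per_consumer_table keeps raw numeric values for plot_consumption;
+        # per_consumer_table_str keeps the formatted cells for the text table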
all_consumers = set(consumption.values()[0].all_together)
+ fields = [field for field in fields if field in consumption]
all_consumers_sum = []
for consumer in all_consumers:
+ tb_str = per_consumer_table_str[consumer] = []
tb = per_consumer_table[consumer] = []
vl = 0
for name in fields:
val = consumption[name].all_together[consumer]
if SINFO_MAP[name].to_bytes_coef is None:
if val < testop_count:
- val = 0
- tb.append(b2ssize_10(int(val)))
+                    tb_str.append('-')
+ else:
+ tb_str.append(b2ssize_10(int(val)))
else:
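+                # convert from the sensor's native units to bytes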
+ val = int(val) * SINFO_MAP[name].to_bytes_coef
if val < testdata_sz:
- val = 0
- tb.append(b2ssize(int(val)) + "B")
+ tb_str.append('-')
+ else:
+ tb_str.append(b2ssize(val) + "B")
+ tb.append(int(val))
vl += int(val)
all_consumers_sum.append((vl, consumer))
all_consumers_sum.sort(reverse=True)
- # plot_consumption(per_consumer_table, fields)
- # continue
+
+ plot_consumption(per_consumer_table, fields,
+ testdata_sz / max_diff)
tt = texttable.Texttable(max_width=130)
tt.set_cols_align(["l"] + ["r"] * len(fields))
@@ -481,11 +527,13 @@
for summ, consumer in all_consumers_sum:
if summ > 0:
tt.add_row([":".join(consumer)] +
- [v if v not in ('0B', '0') else '-'
- for v in per_consumer_table[consumer]])
+ per_consumer_table_str[consumer])
tt.set_deco(texttable.Texttable.VLINES | texttable.Texttable.HEADER)
- print tt.draw()
+ res = tt.draw()
+ max_len = max(map(len, res.split("\n")))
+ print test_name.center(max_len)
+ print res
if __name__ == "__main__":