bottleneck.py: CSV sensor loader, per-test intervals, texttable report

* load_results() now sniffs the input format and dispatches to the
  old eval-based loader or a new, much faster CSV loader
* sensor values are converted to bytes once at load time, so the
  to_bytes_coef handling is dropped from the reporting code
* test timings are taken from raw_results.yaml, and resource
  consumption is reported per test interval as a texttable
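For reference, a minimal sketch of the CSV layout load_results_csv()
expects (hosts, sensor names and values below are illustrative, not
taken from a real run):

    header row, once per (ip, port) source:
        192.168.0.1,3000,src-01,node-1,sda1.sectors_written,eth0.recv_bytes
    data rows (ip, port, ctime, one raw value per sensor):
        192.168.0.1,3000,1422912839,124,5667
        192.168.0.1,3000,1422912840,130,5892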
diff --git a/scripts/postprocessing/bottleneck.py b/scripts/postprocessing/bottleneck.py
index a257d13..cfe9df2 100644
--- a/scripts/postprocessing/bottleneck.py
+++ b/scripts/postprocessing/bottleneck.py
@@ -1,12 +1,16 @@
""" Analize test results for finding bottlenecks """
import sys
+import csv
+import time
+import bisect
import os.path
import argparse
import collections
import yaml
+import texttable
from wally.utils import b2ssize
@@ -20,48 +24,124 @@
self.values = values # [((dev, sensor), value)]
-def load_results(fd):
- res = []
- source_id2nostname = {}
-
- for line in fd:
- line = line.strip()
- if line != "":
- _, data = eval(line)
- ctime = data.pop('time')
- source_id = data.pop('source_id')
- hostname = data.pop('hostname')
-
- data = [(k.split('.'), v) for k, v in data.items()]
-
- sd = SensorsData(source_id, hostname, ctime, data)
- res.append((ctime, sd))
- source_id2nostname[source_id] = hostname
-
- res.sort(key=lambda x: x[0])
- return res, source_id2nostname
-
-
-critical_values = dict(
- io_queue=1,
- mem_usage_percent=0.8)
-
-
class SensorInfo(object):
def __init__(self, name, native_ext, to_bytes_coef):
self.name = name
self.native_ext = native_ext
self.to_bytes_coef = to_bytes_coef
-SINFO = [
+_SINFO = [
SensorInfo('recv_bytes', 'B', 1),
SensorInfo('send_bytes', 'B', 1),
SensorInfo('sectors_written', 'Sect', 512),
SensorInfo('sectors_read', 'Sect', 512),
]
-SINFO_MAP = dict((sinfo.name, sinfo) for sinfo in SINFO)
+SINFO_MAP = dict((sinfo.name, sinfo) for sinfo in _SINFO)
+to_bytes = dict((sinfo.name, sinfo.to_bytes_coef) for sinfo in _SINFO)
+
+def load_results(fd):
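+    # peek at the head of the file: the legacy eval-format dumps
+    # contain '(' or '{', the CSV format does not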
+ data = fd.read(100)
+ fd.seek(0, os.SEEK_SET)
+
+ if '(' in data or '{' in data:
+ res, source_id2nostname = load_results_eval(fd)
+ else:
+ res, source_id2nostname = load_results_csv(fd)
+
+ return res, source_id2nostname
+
+
+def load_results_csv(fd):
+ fields = {}
+ res = []
+ source_id2nostname = {}
+ coefs = {}
+
+ # cached for performance
+ ii = int
+ zz = zip
+ SD = SensorsData
+ ra = res.append
+
+ for row in csv.reader(fd):
+ if len(row) == 0:
+ continue
+ ip, port = row[:2]
+ ip_port = (ip, ii(port))
+
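+        # the first row seen from each (ip, port) is a header:
+        #   ip, port, source_id, hostname, dev.sensor, ...
+        # later rows carry data: ip, port, ctime, value, ...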
+ if ip_port not in fields:
+ sensors = [i.split('.') for i in row[4:]]
+ fields[ip_port] = row[2:4] + sensors
+ source_id2nostname[row[2]] = row[3]
+ coefs[ip_port] = [to_bytes.get(s[1], 1) for s in sensors]
+ else:
+ fld = fields[ip_port]
+ processed_data = []
+ a = processed_data.append
+
+            # this loop is critical for performance
+            # don't "refactor" it unless you are confident
+            # in what you are doing
+ for dev_sensor, val, coef in zz(fld[2:], row[3:], coefs[ip_port]):
+ a((dev_sensor, ii(val) * coef))
+
+ ctime = ii(row[2])
+ sd = SD(fld[0], fld[1], ctime, processed_data)
+ ra((ctime, sd))
+
+ res.sort(key=lambda x: x[0])
+ return res, source_id2nostname
+
+
+def load_results_eval(fd):
+ res = []
+ source_id2nostname = {}
+
+ for line in fd:
+ if line.strip() == "":
+ continue
+
+ _, data = eval(line)
+ ctime = data.pop('time')
+ source_id = data.pop('source_id')
+ hostname = data.pop('hostname')
+
+ processed_data = []
+ for k, v in data.items():
+ dev, sensor = k.split('.')
+ processed_data.append(((dev, sensor),
+ v * to_bytes.get(sensor, 1)))
+
+ sd = SensorsData(source_id, hostname, ctime, processed_data)
+ res.append((ctime, sd))
+ source_id2nostname[source_id] = hostname
+
+ res.sort(key=lambda x: x[0])
+ return res, source_id2nostname
+
+
+def load_test_timings(fd):
+    result = {}  # test name -> (start_time, finish_time) of its first run
+ data = yaml.load(fd.read())
+ assert len(data) == 1
+ test_type, test_data = data[0]
+ assert test_type == 'io'
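+    # each timings entry is ([name, name, ...], (start_time, finish_time));
+    # all names within a single entry are expected to match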
+ for test_names, interval in test_data['__meta__']['timings']:
+ assert len(set(test_names)) == 1
+ if test_names[0] not in result:
+ result[test_names[0]] = interval
+ return result
+
+
+critical_values = dict(
+ io_queue=1,
+ mem_usage_percent=0.8)
class AggregatedData(object):
@@ -152,23 +232,22 @@
return "\n".join(table)
-def print_consumption(agg, roles, min_transfer=0):
+def print_consumption(agg, min_transfer=None):
rev_items = []
for (node_or_role, dev), v in agg.all_together.items():
rev_items.append((int(v), node_or_role + ':' + dev))
res = sorted(rev_items, reverse=True)
- sinfo = SINFO_MAP[agg.sensor_name]
- if sinfo.to_bytes_coef is not None:
+ if min_transfer is not None:
res = [(v, k)
for (v, k) in res
- if v * sinfo.to_bytes_coef >= min_transfer]
+ if v >= min_transfer]
if len(res) == 0:
return None
- res = [(b2ssize(v) + sinfo.native_ext, k) for (v, k) in res]
+ res = [(b2ssize(v) + "B", k) for (v, k) in res]
max_name_sz = max(len(name) for _, name in res)
max_val_sz = max(len(val) for val, _ in res)
@@ -227,12 +306,18 @@
max_data = 0
for sensor_name, agg in consumption.items():
if sensor_name in SINFO_MAP:
- tb = SINFO_MAP[sensor_name].to_bytes_coef
- if tb is not None:
- max_data = max(max_data, agg.per_role.get('testnode', 0) * tb)
+ max_data = max(max_data, agg.per_role.get('testnode', 0))
return max_data
+def get_data_for_interval(data, interval):
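+    # data is sorted by ctime, so binary search finds
+    # the [begin, end] window without a full scan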
+ begin, end = interval
+ times = [ctime for ctime, _ in data]
+ b_p = bisect.bisect_left(times, begin)
+ e_p = bisect.bisect_right(times, end)
+ return data[b_p:e_p]
+
+
def main(argv):
opts = parse_args(argv)
@@ -242,8 +327,11 @@
roles_file = os.path.join(opts.results_folder,
'nodes.yaml')
- src2roles = yaml.load(open(roles_file))
+ raw_results_file = os.path.join(opts.results_folder,
+ 'raw_results.yaml')
+ src2roles = yaml.load(open(roles_file))
+ timings = load_test_timings(open(raw_results_file))
with open(sensors_data_fname) as fd:
data, source_id2hostname = load_results(fd)
@@ -252,23 +340,62 @@
# print print_bottlenecks(data, opts.max_bottlenek)
# print print_bottlenecks(data, opts.max_bottlenek)
- consumption = total_consumption(data, roles_map)
+ for name, interval in sorted(timings.items()):
+ print
+ print
+ print "-" * 30 + " " + name + " " + "-" * 30
+ print
- testdata_sz = get_testdata_size(consumption) // 1024
- for name in ('recv_bytes', 'send_bytes',
- 'sectors_read', 'sectors_written'):
- table = print_consumption(consumption[name], roles_map, testdata_sz)
- if table is None:
- print "Consumption of", name, "is negligible"
- else:
- ln = max(map(len, table.split('\n')))
- print '-' * ln
- print name.center(ln)
- print '-' * ln
- print table
- print '-' * ln
- print
+ data_chunk = get_data_for_interval(data, interval)
+ consumption = total_consumption(data_chunk, roles_map)
+
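+    # zero out consumers below 1% of the peak testnode consumption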
+ testdata_sz = get_testdata_size(consumption) // 100
+
+ fields = ('recv_bytes', 'send_bytes',
+ 'sectors_read', 'sectors_written')
+ per_consumer_table = {}
+
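+    # every sensor's AggregatedData covers the same consumers,
+    # so take the key set from any one of them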
+ all_consumers = set(consumption.values()[0].all_together)
+ all_consumers_sum = []
+
+ for consumer in all_consumers:
+ tb = per_consumer_table[consumer] = []
+ vl = 0
+ for name in fields:
+ val = consumption[name].all_together[consumer]
+ if val < testdata_sz:
+ val = 0
+ vl += int(val)
+ tb.append(b2ssize(int(val)) + "B")
+ all_consumers_sum.append((vl, consumer))
+
+ all_consumers_sum.sort(reverse=True)
+ tt = texttable.Texttable(max_width=130)
+ tt.set_cols_align(["l"] + ["r"] * len(fields))
+ tt.header(["Name"] + list(fields))
+
+ for summ, consumer in all_consumers_sum:
+ if summ > 0:
+ tt.add_row([".".join(consumer)] +
+ [v if v != '0B' else '-'
+ for v in per_consumer_table[consumer]])
+
+ tt.set_deco(texttable.Texttable.VLINES | texttable.Texttable.HEADER)
+ print tt.draw()
+
if __name__ == "__main__":
exit(main(sys.argv))