blob: 4dff18675dc4cdf84307e6396087183022a6ae62 [file] [log] [blame]
import re
import json
import collections
# from wally.utils import ssize_to_b
from wally.statistic import med_dev
PerfInfo = collections.namedtuple('PerfInfo',
('name',
'bw', 'iops', 'dev',
'lat', 'lat_dev', 'raw',
'meta'))
def split_and_add(data, block_size):
assert len(data) % block_size == 0
res = [0] * block_size
for idx, val in enumerate(data):
res[idx % block_size] += val
return res
def process_disk_info(test_output):
data = {}
for tp, pre_result in test_output:
if tp != 'io' or pre_result is None:
pass
vm_count = pre_result['__test_meta__']['testnodes_count']
for name, results in pre_result['res'].items():
assert len(results['bw']) % vm_count == 0
block_count = len(results['bw']) // vm_count
bw, bw_dev = med_dev(split_and_add(results['bw'], block_count))
iops, _ = med_dev(split_and_add(results['iops'],
block_count))
lat, lat_dev = med_dev(results['lat'])
dev = bw_dev / float(bw)
data[name] = PerfInfo(name, bw, iops, dev, lat, lat_dev, results,
pre_result['__test_meta__'])
return data
def parse_output(out_err):
err_start_patt = r"(?ims)=+\s+ERROR\s+=+"
err_end_patt = r"(?ims)=+\s+END OF ERROR\s+=+"
for block in re.split(err_start_patt, out_err)[1:]:
tb, garbage = re.split(err_end_patt, block)
msg = "Test fails with error:\n" + tb.strip() + "\n"
raise OSError(msg)
start_patt = r"(?ims)=+\s+RESULTS\(format=json\)\s+=+"
end_patt = r"(?ims)=+\s+END OF RESULTS\s+=+"
for block in re.split(start_patt, out_err)[1:]:
data, garbage = re.split(end_patt, block)
yield json.loads(data.strip())
start_patt = r"(?ims)=+\s+RESULTS\(format=eval\)\s+=+"
end_patt = r"(?ims)=+\s+END OF RESULTS\s+=+"
for block in re.split(start_patt, out_err)[1:]:
data, garbage = re.split(end_patt, block)
yield eval(data.strip())
def filter_data(name_prefix, fields_to_select, **filters):
def closure(data):
for result in data:
if name_prefix is not None:
if not result['jobname'].startswith(name_prefix):
continue
for k, v in filters.items():
if result.get(k) != v:
break
else:
yield map(result.get, fields_to_select)
return closure
def filter_processed_data(name_prefix, fields_to_select, **filters):
def closure(data):
for name, result in data.items():
if name_prefix is not None:
if not name.startswith(name_prefix):
continue
for k, v in filters.items():
if result.raw.get(k) != v:
break
else:
yield map(result.raw.get, fields_to_select)
return closure
# def load_data(raw_data):
# data = list(parse_output(raw_data))[0]
# for key, val in data['res'].items():
# val['blocksize_b'] = ssize_to_b(val['blocksize'])
# val['iops_mediana'], val['iops_stddev'] = med_dev(val['iops'])
# val['bw_mediana'], val['bw_stddev'] = med_dev(val['bw'])
# val['lat_mediana'], val['lat_stddev'] = med_dev(val['lat'])
# yield val
# def load_files(*fnames):
# for fname in fnames:
# for i in load_data(open(fname).read()):
# yield i